Mysql
 sql >> Datenbank >  >> RDS >> Mysql

Wie werden reaktive Streams in Slick zum Einfügen von Daten verwendet?

Serienbeilagen

Der einfachste Weg wäre Einfügungen innerhalb eines Sink.foreach .

Angenommen, Sie haben die Schema-Code-Generierung verwendet und weiter davon aus, dass Ihre Tabelle "NumberTable" heißt

//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig"

Wir können eine Funktion schreiben, die das Einfügen durchführt

def insertIntoDb(num : Int) = 
  numberTableDB run (Numbertable += NumbertableRow(num))

Und diese Funktion kann in der Senke platziert werden

val insertSink = Sink[Int] foreach insertIntoDb

Source(0 to 100) runWith insertSink

Batch-Einfügungen

Sie könnten die Sink-Methodik weiter erweitern, indem Sie N Einfügungen gleichzeitig stapeln:

def batchInsertIntoDb(nums : Seq[Int]) = 
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb

Diese gestapelte Senke kann von einem Flow gespeist werden was die Batch-Gruppierung durchführt:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(batchInsertSink)