Serienbeilagen
Der einfachste Weg wäre Einfügungen
innerhalb eines Sink.foreach
.
Angenommen, Sie haben die Schema-Code-Generierung verwendet und weiter davon aus, dass Ihre Tabelle "NumberTable" heißt
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
Wir können eine Funktion schreiben, die das Einfügen durchführt
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
Und diese Funktion kann in der Senke platziert werden
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
Batch-Einfügungen
Sie könnten die Sink-Methodik weiter erweitern, indem Sie N Einfügungen gleichzeitig stapeln:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
Diese gestapelte Senke kann von einem Flow
gespeist werden was die Batch-Gruppierung durchführt:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)