Mysql
 sql >> Datenbank >  >> RDS >> Mysql

Google Dataflow (Apache Beam) JdbcIO-Masseneinfügung in MySQL-Datenbank

BEARBEITEN 2018-01-27:

Es stellt sich heraus, dass dieses Problem mit DirectRunner zusammenhängt. Wenn Sie dieselbe Pipeline mit DataflowRunner ausführen, sollten Sie Batches erhalten, die tatsächlich bis zu 1.000 Datensätze umfassen. Der DirectRunner erstellt nach einem Gruppierungsvorgang immer Bündel der Größe 1.

Ursprüngliche Antwort:

Ich bin auf das gleiche Problem gestoßen, als ich mit JdbcIO von Apache Beam in Cloud-Datenbanken geschrieben habe. Das Problem ist, dass JdbcIO zwar das Schreiben von bis zu 1.000 Datensätzen in einem Stapel unterstützt, aber ich habe noch nie gesehen, dass es mehr als 1 Zeile gleichzeitig schreibt (ich muss zugeben:Dies war immer die Verwendung von DirectRunner in einer Entwicklungsumgebung).

Ich habe JdbcIO daher eine Funktion hinzugefügt, mit der Sie die Größe der Stapel selbst steuern können, indem Sie Ihre Daten zusammenfassen und jede Gruppe als einen Stapel schreiben. Nachfolgend finden Sie ein Beispiel für die Verwendung dieser Funktion basierend auf dem ursprünglichen WordCount-Beispiel von Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

Der Unterschied zur normalen Write-Methode von JdbcIO ist die neue Methode writeIterable() das nimmt eine PCollection<Iterable<RowT>> als Eingabe anstelle von PCollection<RowT> . Jedes Iterable wird als ein Batch in die Datenbank geschrieben.

Die Version von JdbcIO mit diesem Zusatz finden Sie hier:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Das gesamte Beispielprojekt, das das obige Beispiel enthält, finden Sie hier:https://github.com/ olavloite/spanner-beam-Beispiel

(Es gibt auch eine Pull-Anforderung an Apache Beam, um dies in das Projekt aufzunehmen)