REDIGERING 2018-01-27:
Det viser sig, at dette problem er relateret til DirectRunner. Hvis du kører den samme pipeline ved hjælp af DataflowRunner, bør du få batches, der faktisk er op til 1.000 poster. DirectRunner'en opretter altid bundter i størrelse 1 efter en grupperingsoperation.
Oprindeligt svar:
Jeg er stødt på det samme problem, når jeg skriver til cloud-databaser ved hjælp af Apache Beams JdbcIO. Problemet er, at selvom JdbcIO understøtter skrivning af op til 1.000 poster i én batch, har jeg faktisk aldrig set den skrive mere end 1 række ad gangen (jeg må indrømme:Dette brugte altid DirectRunner i et udviklingsmiljø).
Jeg har derfor tilføjet en funktion til JdbcIO, hvor du selv kan styre størrelsen på batchene ved at gruppere dine data sammen og skrive hver gruppe som én batch. Nedenfor er et eksempel på, hvordan man bruger denne funktion baseret på det originale WordCount-eksempel på Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Forskellen med den normale skrivemetode for JdbcIO er den nye metode writeIterable()
der tager en PCollection<Iterable<RowT>>
som input i stedet for PCollection<RowT>
. Hver Iterable skrives som én batch til databasen.
Versionen af JdbcIO med denne tilføjelse kan findes her:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Hele eksempelprojektet, der indeholder eksemplet ovenfor, kan findes her:https://github.com/ olavloite/spænde-stråle-eksempel
(Der er også en afventende pull-anmodning på Apache Beam for at inkludere dette i projektet)