Java源码示例:org.apache.beam.sdk.coders.RowCoder
示例1
/**
 * Reads rows from JDBC using the configured query and data source, attaching the inferred Beam
 * schema to the resulting collection.
 */
@Override
public PCollection<Row> expand(PBegin input) {
  // Validate required configuration before touching the database.
  checkArgument(getQuery() != null, "withQuery() is required");
  checkArgument(
      getDataSourceProviderFn() != null,
      "withDataSourceConfiguration() or withDataSourceProviderFn() is required");

  // The same schema drives the coder, the row mapper, and the output row schema.
  final Schema beamSchema = inferBeamSchema();

  final PCollection<Row> output =
      input.apply(
          JdbcIO.<Row>read()
              .withDataSourceProviderFn(getDataSourceProviderFn())
              .withQuery(getQuery())
              .withCoder(RowCoder.of(beamSchema))
              .withRowMapper(SchemaUtil.BeamRowMapper.of(beamSchema))
              .withFetchSize(getFetchSize())
              .withOutputParallelization(getOutputParallelization())
              .withStatementPreparator(getStatementPreparator()));
  output.setRowSchema(beamSchema);
  return output;
}
示例2
/**
 * Returns the {@link RowCoder} cached for {@code tableName}, creating it from the schema of
 * {@code record} the first time the table is seen.
 *
 * <p>Uses a single {@code computeIfAbsent} call instead of a containsKey/put/get sequence, so the
 * map is consulted once. If {@code rowCoderMap} is a concurrent map (not visible here — confirm),
 * this also makes the lookup-and-insert atomic.
 *
 * @param tableName key under which the coder is cached
 * @param record a row whose schema is used to build the coder on first use
 * @return the cached (or newly created) coder for {@code tableName}
 */
private RowCoder getCoderForRow(String tableName, Row record) {
  // NOTE: the first record seen for a table fixes its coder; later records for the same table
  // reuse that coder regardless of their own schema.
  return rowCoderMap.computeIfAbsent(tableName, ignored -> RowCoder.of(record.getSchema()));
}
示例3
/**
 * Produces {@code size} rows with a single INT64 field {@code f0} holding 0..size-1, packed into
 * one {@link Iterable} element so they are intended to arrive as one big bundle.
 */
@Override
public PCollection<Row> expand(PBegin input) {
  final Schema rowSchema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT64));

  // Materialise every row up front; Create emits the whole Iterable as a single element.
  final Iterable<Row> singleBundle =
      IntStream.range(0, size)
          .mapToObj(i -> Row.withSchema(rowSchema).addValue((long) i).build())
          .collect(Collectors.toList());

  final PCollection<Iterable<Row>> wrapped =
      input
          .getPipeline()
          .apply(
              Create.<Iterable<Row>>of(singleBundle)
                  .withCoder(IterableCoder.of(RowCoder.of(rowSchema))));

  // Unpack the single element back into individual rows.
  return wrapped.apply(Flatten.iterables()).setRowSchema(rowSchema);
}
示例4
/**
 * Runs a two-stage Beam SQL pipeline on Nemo and writes the result to a file.
 *
 * <p>The first query keeps rows whose {@code c1} is greater than 1. The second groups those rows
 * by {@code c2} and sums their {@code c3} values.
 *
 * @param args command-line arguments; {@code args[0]} is the output file path.
 * @throws IllegalArgumentException if no output file path is supplied.
 */
public static void main(final String[] args) {
  // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException.
  if (args == null || args.length < 1) {
    throw new IllegalArgumentException("Expected one argument: the output file path");
  }
  final String outputFilePath = args[0];
  final PipelineOptions options = NemoPipelineOptionsFactory.create();
  options.setJobName("SimpleSumSQL");
  final Pipeline p = Pipeline.create(options);
  // define the input row format
  final Schema schema = Schema.builder()
      .addInt32Field("c1")
      .addStringField("c2")
      .addDoubleField("c3").build();
  // 10 rows: c1 = i, c2 = "row", c3 = (double) i, for i in 0 ~ 9.
  final List<Row> rows = IntStream.range(0, 10)
      .mapToObj(i -> Row.withSchema(schema).addValues(i, "row", (double) i).build())
      .collect(Collectors.toList());
  // Create a source PCollection
  final PCollection<Row> inputTable = PBegin.in(p)
      .apply(Create.of(rows).withCoder(RowCoder.of(schema)))
      .setRowSchema(schema);
  // Run 2 SQL queries
  // ==> First: keep rows with c1 > 1. Second: sum c3 per c2 group.
  final PCollection<Row> firstQueryResult =
      inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
  final PCollection<Row> secondQueryResult = PCollectionTuple
      .of(new TupleTag<>("FIRST_QUERY_RESULT"), firstQueryResult)
      .apply(SqlTransform.query("select c2, sum(c3) from FIRST_QUERY_RESULT group by c2"));
  // Write results to a file.
  // Every row has c2 = "row", so there is a single group whose sum of c3 is
  // 2 + 3 + 4 + ... + 9 = 44 (written as "row is 44.0").
  GenericSourceSink.write(secondQueryResult.apply(MapElements.via(new SimpleFunction<Row, String>() {
    @Override
    public String apply(final Row input) {
      final String c2 = input.getString(0);
      final Double c3 = input.getDouble(1);
      return c2 + " is " + c3;
    }
  })), outputFilePath);
  p.run().waitUntilFinish();
}
示例5
/**
 * Creates a decoder bound to {@code beamSchema}, keeping both the schema and a {@link RowCoder}
 * built from it.
 */
private DecodeRows(Schema beamSchema) {
  this.schema = beamSchema;
  this.coder = RowCoder.of(beamSchema);
}
示例6
/**
 * Runs a Beam SQL query that calls the {@code ParseUserAgent} UDF over a small set of user agents
 * and prints each result row.
 *
 * <p>NOTE: this test currently makes no assertions; see the FIXME near the end.
 */
@Test
@Category(NeedsRunner.class)
public void testUserAgentAnalysisSQL() {
  // ============================================================
  // The base test input.
  // Each entry is { userAgent, expectedDeviceClass, expectedAgentNameVersion }.
  List<List<String>> useragents = Arrays.asList(
      Arrays.asList(
          "Mozilla/5.0 (X11; Linux x86_64) " +
          "AppleWebKit/537.36 (KHTML, like Gecko) " +
          "Chrome/48.0.2564.82 Safari/537.36",
          "Desktop",
          "Chrome 48.0.2564.82"),
      Arrays.asList(
          "Mozilla/5.0 (Linux; Android 7.0; Nexus 6 Build/NBD90Z) " +
          "AppleWebKit/537.36 (KHTML, like Gecko) " +
          "Chrome/53.0.2785.124 Mobile Safari/537.36",
          "Phone",
          "Chrome 53.0.2785.124")
  );
  // ============================================================
  // Convert into a PCollection<Row>
  Schema inputSchema = Schema
      .builder()
      .addStringField("userAgent")
      .addStringField("expectedDeviceClass")
      .addStringField("expectedAgentNameVersion")
      .build();
  PCollection<Row> input = pipeline
      .apply(Create.of(useragents))
      .setCoder(ListCoder.of(StringUtf8Coder.of()))
      .apply(ParDo.of(new DoFn<List<String>, Row>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Each element is one three-string entry from 'useragents'.
          List<String> inputValue = c.element();
          // Map the list positions, in order, onto the three fields of inputSchema.
          Row appRow =
              Row
                  .withSchema(inputSchema)
                  .addValues(inputValue.get(0), inputValue.get(1), inputValue.get(2))
                  .build();
          c.output(appRow);
        }
      }))
      .setCoder(RowCoder.of(inputSchema));
  // ============================================================
  // Define a SQL query which calls the UDF
  String sql =
      "SELECT" +
      " userAgent AS userAgent " +
      ", expectedDeviceClass AS expectedDeviceClass " +
      ", expectedAgentNameVersion AS expectedAgentNameVersion " +
      ", ParseUserAgent(userAgent) AS parsedUserAgent " +
      ", ParseUserAgent(userAgent)['DeviceClass'] AS deviceClass " +
      ", ParseUserAgent(userAgent)['AgentNameVersion'] AS agentNameVersion " +
      "FROM InputStream";
  // Create and apply the PTransform representing the query.
  // Register the UDFs used in the query by calling '.registerUdf()' with
  // either a class which implements BeamSqlUdf or with
  // an instance of the SerializableFunction;
  PCollection<Row> result =
      // This way we give a name to the input stream for use in the SQL
      PCollectionTuple.of("InputStream", input)
          // Apply the SQL with the UDFs we need.
          .apply("Execute SQL", SqlTransform
              .query(sql)
              .registerUdf("ParseUserAgent", new ParseUserAgent())
          );
  result.apply(ParDo.of(new RowPrinter()));
  // TODO: assert on the results once supported, e.g.
  //   PAssert.that(result).containsInAnyOrder(...);
  // FIXME: This DOES NOT work at the time of writing.
  // waiting for https://issues.apache.org/jira/browse/BEAM-9267
  // to be implemented which depends on Calcite 1.22 to be released.
  pipeline.run().waitUntilFinish();
}
示例7
/**
 * Creates a {@link StateSpec} for a single {@link Row} value, encoded with a {@link RowCoder} built
 * from {@code schema}.
 */
public static StateSpec<ValueState<Row>> rowValue(Schema schema) {
  final RowCoder coderForSchema = RowCoder.of(schema);
  return value(coderForSchema);
}
示例8
/**
 * Builds a {@link SchemaCoder} for {@link Row} instances conforming to {@code schema}.
 *
 * <p>Delegates to {@link RowCoder#of(Schema)}.
 */
public static SchemaCoder<Row> of(Schema schema) {
  final SchemaCoder<Row> rowCoder = RowCoder.of(schema);
  return rowCoder;
}
示例9
/**
 * Creates a {@link StateSpec} for a {@link BagState} holding {@link Row} elements, optimized for
 * frequent additions and occasional retrieval of everything added so far.
 *
 * <p>Elements are encoded with a {@link RowCoder} built from {@code schema}.
 */
public static StateSpec<BagState<Row>> rowBag(Schema schema) {
  final RowCoder elementCoder = RowCoder.of(schema);
  return new BagStateSpec<>(elementCoder);
}
示例10
/**
 * Creates a {@link StateSpec} for a {@link SetState} of {@link Row} elements, optimized for
 * membership checks.
 *
 * <p>Elements are encoded with a {@link RowCoder} built from {@code schema}.
 */
public static StateSpec<SetState<Row>> rowSet(Schema schema) {
  final RowCoder elementCoder = RowCoder.of(schema);
  return new SetStateSpec<>(elementCoder);
}
示例11
/**
 * Create a {@link StateSpec} for a {@link MapState}, optimized for key lookups and writes.
 *
 * <p>This method is for storing maps where both the keys and the values are rows with the
 * specified schemas.
 *
 * @param keySchema schema of the row keys; keys are encoded with {@code RowCoder.of(keySchema)}
 * @param valueSchema schema of the row values; values are encoded with
 *     {@code RowCoder.of(valueSchema)}
 * @return a {@link StateSpec} for a {@link MapState} from {@link Row} keys to {@link Row} values
 * @see #map(Coder, Coder)
 */
public static StateSpec<MapState<Row, Row>> rowMap(Schema keySchema, Schema valueSchema) {
  return new MapStateSpec<>(RowCoder.of(keySchema), RowCoder.of(valueSchema));
}