Java源码示例:org.apache.beam.sdk.coders.RowCoder

示例1
@Override
public PCollection<Row> expand(PBegin input) {
  // Fail fast on missing configuration, before the schema is inferred.
  checkArgument(getQuery() != null, "withQuery() is required");
  checkArgument(
      getDataSourceProviderFn() != null,
      "withDataSourceConfiguration() or withDataSourceProviderFn() is required");

  // One inferred schema drives the element coder, the JDBC row mapper and the output schema.
  final Schema rowSchema = inferBeamSchema();
  return input
      .apply(
          JdbcIO.<Row>read()
              .withDataSourceProviderFn(getDataSourceProviderFn())
              .withQuery(getQuery())
              .withCoder(RowCoder.of(rowSchema))
              .withRowMapper(SchemaUtil.BeamRowMapper.of(rowSchema))
              .withFetchSize(getFetchSize())
              .withOutputParallelization(getOutputParallelization())
              .withStatementPreparator(getStatementPreparator()))
      .setRowSchema(rowSchema);
}
 
示例2
/**
 * Returns the {@link RowCoder} cached for {@code tableName}. On the first call for a table, the
 * coder is built from the schema of {@code record} and cached.
 *
 * <p>NOTE(review): the cache is keyed by table name only. Later rows for the same table reuse
 * the coder built from the first row's schema, even if their own schema differs. Confirm that a
 * table's schema cannot change while this cache is alive.
 *
 * @param tableName key under which the coder is cached
 * @param record row whose schema is used to build the coder when the table is not yet cached
 * @return the cached (or newly created) coder for {@code tableName}
 */
private RowCoder getCoderForRow(String tableName, Row record) {
  // A single map operation replaces the containsKey/put/get sequence. The coder is only
  // constructed when the table has no entry yet.
  return rowCoderMap.computeIfAbsent(tableName, key -> RowCoder.of(record.getSchema()));
}
 
示例3
@Override
public PCollection<Row> expand(PBegin input) {
  // Every row has a single INT64 field "f0".
  Schema singleLongSchema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT64));

  // Materialize rows 0 .. size-1 up front so they can travel as one Iterable element.
  Iterable<Row> oneBundle =
      IntStream.range(0, size)
          .mapToObj(i -> Row.withSchema(singleLongSchema).addValue((long) i).build())
          .collect(Collectors.toList());

  // Emitting all rows as a single element and flattening afterwards is meant to
  // yield one big bundle downstream.
  PCollection<Iterable<Row>> wrapped =
      input
          .getPipeline()
          .apply(
              Create.<Iterable<Row>>of(oneBundle)
                  .withCoder(IterableCoder.of(RowCoder.of(singleLongSchema))));
  return wrapped.apply(Flatten.iterables()).setRowSchema(singleLongSchema);
}
 
示例4
/**
 * Runs a two-stage Beam SQL pipeline over ten in-memory rows and writes the aggregated result.
 *
 * <p>The first query keeps rows whose {@code c1} is greater than 1. The second query sums
 * {@code c3} over those rows, grouped by {@code c2}. Every generated row has
 * {@code c2 = "row"}, so the single expected output record is {@code row is 44.0}.
 *
 * @param args command-line arguments; {@code args[0]} is the output file path.
 * @throws IllegalArgumentException if no output file path is given.
 */
public static void main(final String[] args) {
  // Report a clear usage error instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 1) {
    throw new IllegalArgumentException(
        "Missing argument: expected the output file path as the first argument");
  }
  final String outputFilePath = args[0];

  final PipelineOptions options = NemoPipelineOptionsFactory.create();
  options.setJobName("SimpleSumSQL");
  final Pipeline p = Pipeline.create(options);

  // Input row format: (c1 INT32, c2 STRING, c3 DOUBLE).
  final Schema schema = Schema.builder()
    .addInt32Field("c1")
    .addStringField("c2")
    .addDoubleField("c3").build();

  // 10 rows of the form (i, "row", (double) i) for i = 0 .. 9.
  final List<Row> rows = IntStream.range(0, 10)
    .mapToObj(i -> Row.withSchema(schema).addValues(i, "row", (double) i).build())
    .collect(Collectors.toList());

  // Create a source PCollection with an explicit row coder and schema.
  final PCollection<Row> inputTable = PBegin.in(p)
    .apply(Create.of(rows).withCoder(RowCoder.of(schema)))
    .setRowSchema(schema);

  // Query 1: keep only the rows whose c1 is larger than 1.
  final PCollection<Row> firstQueryResult =
    inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
  // Query 2: sum c3 of the remaining rows, grouped by c2 (the first result is referenced by
  // its tuple tag name).
  final PCollection<Row> secondQueryResult = PCollectionTuple
    .of(new TupleTag<>("FIRST_QUERY_RESULT"), firstQueryResult)
    .apply(SqlTransform.query("select c2, sum(c3) from FIRST_QUERY_RESULT group by c2"));

  // Format each result row as "<c2> is <sum(c3)>" and write it to the output path.
  // All rows share c2 = "row", so the expected sum is 2 + 3 + 4 + ... + 9 = 44.
  GenericSourceSink.write(secondQueryResult.apply(MapElements.via(new SimpleFunction<Row, String>() {
    @Override
    public String apply(final Row input) {
      final String c2 = input.getString(0);
      final Double c3 = input.getDouble(1);
      return c2 + " is " + c3;
    }
  })), outputFilePath);

  p.run().waitUntilFinish();
}
 
示例5
/** Keeps {@code beamSchema} and a {@link RowCoder} derived from it. */
private DecodeRows(Schema beamSchema) {
  this.schema = beamSchema;
  this.coder = RowCoder.of(this.schema);
}
 
示例6
@Test
@Category(NeedsRunner.class)
public void testUserAgentAnalysisSQL() {
  // Test cases, each of the form: { user agent, expected DeviceClass, expected AgentNameVersion }.
  List<List<String>> useragents =
      Arrays.asList(
          Arrays.asList(
              "Mozilla/5.0 (X11; Linux x86_64) "
                  + "AppleWebKit/537.36 (KHTML, like Gecko) "
                  + "Chrome/48.0.2564.82 Safari/537.36",
              "Desktop",
              "Chrome 48.0.2564.82"),
          Arrays.asList(
              "Mozilla/5.0 (Linux; Android 7.0; Nexus 6 Build/NBD90Z) "
                  + "AppleWebKit/537.36 (KHTML, like Gecko) "
                  + "Chrome/53.0.2785.124 Mobile Safari/537.36",
              "Phone",
              "Chrome 53.0.2785.124"));

  // Row layout fed into SQL; field order matches the positions inside each test case above.
  Schema inputSchema =
      Schema.builder()
          .addStringField("userAgent")
          .addStringField("expectedDeviceClass")
          .addStringField("expectedAgentNameVersion")
          .build();

  // Turn every List<String> test case into a Row with the schema above.
  PCollection<Row> input =
      pipeline
          .apply(Create.of(useragents))
          .setCoder(ListCoder.of(StringUtf8Coder.of()))
          .apply(
              ParDo.of(
                  new DoFn<List<String>, Row>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      List<String> testCase = c.element();
                      c.output(
                          Row.withSchema(inputSchema)
                              .addValues(testCase.get(0), testCase.get(1), testCase.get(2))
                              .build());
                    }
                  }))
          .setCoder(RowCoder.of(inputSchema));

  // Query that calls the ParseUserAgent UDF and indexes into its result by key.
  String sql =
      "SELECT"
          + " userAgent                                         AS userAgent "
          + ", expectedDeviceClass                              AS expectedDeviceClass "
          + ", expectedAgentNameVersion                         AS expectedAgentNameVersion "
          + ", ParseUserAgent(userAgent)                        AS parsedUserAgent "
          + ", ParseUserAgent(userAgent)['DeviceClass']         AS deviceClass "
          + ", ParseUserAgent(userAgent)['AgentNameVersion']    AS agentNameVersion "
          + "FROM InputStream";

  // The tuple tag gives the input the name "InputStream" used in the FROM clause.
  PCollectionTuple namedInput = PCollectionTuple.of("InputStream", input);
  // UDFs are registered on the transform, here with a SerializableFunction-style instance.
  SqlTransform userAgentQuery =
      SqlTransform.query(sql).registerUdf("ParseUserAgent", new ParseUserAgent());
  PCollection<Row> result = namedInput.apply("Execute SQL", userAgentQuery);

  result.apply(ParDo.of(new RowPrinter()));

  // FIXME: This DOES NOT work at the time of writing. It is waiting for
  //  https://issues.apache.org/jira/browse/BEAM-9267 to be implemented, which depends on
  //  Calcite 1.22 being released. Assertion to enable once it does:
  //    PAssert.that(result).containsInAnyOrder((Row) null);

  pipeline.run().waitUntilFinish();
}
 
示例7
/**
 * Create a {@link StateSpec} for a single {@link Row} value with the specified schema.
 *
 * @param schema schema of the stored row; values are encoded with a {@link RowCoder}
 * @return a spec for {@code ValueState<Row>}
 */
public static StateSpec<ValueState<Row>> rowValue(Schema schema) {
  final RowCoder rowCoder = RowCoder.of(schema);
  return value(rowCoder);
}
 
示例8
/**
 * Returns a {@link SchemaCoder} for {@link Row} instances with the given {@code schema}.
 *
 * <p>The returned coder is a {@link RowCoder} built for {@code schema}.
 */
public static SchemaCoder<Row> of(Schema schema) {
  final RowCoder rowCoder = RowCoder.of(schema);
  return rowCoder;
}
 
示例9
/**
 * Create a {@link StateSpec} for a {@link BagState}, optimized for adding values frequently and
 * occasionally retrieving all the values that have been added.
 *
 * <p>This method is for storing row elements with the given schema.
 *
 * @param schema schema of the stored rows; elements are encoded with a {@link RowCoder}
 * @return a spec for {@code BagState<Row>}
 */
public static StateSpec<BagState<Row>> rowBag(Schema schema) {
  final RowCoder elementCoder = RowCoder.of(schema);
  return new BagStateSpec<>(elementCoder);
}
 
示例10
/**
 * Create a {@link StateSpec} for a {@link SetState}, optimized for checking membership.
 *
 * <p>This method is for storing row elements with the given schema.
 *
 * @param schema schema of the stored rows; elements are encoded with a {@link RowCoder}
 * @return a spec for {@code SetState<Row>}
 */
public static StateSpec<SetState<Row>> rowSet(Schema schema) {
  final RowCoder elementCoder = RowCoder.of(schema);
  return new SetStateSpec<>(elementCoder);
}
 
示例11
/**
 * Create a {@link StateSpec} for a {@link MapState}, optimized for key lookups and writes.
 *
 * <p>This method is for storing maps where both the keys and the values are rows with the
 * specified schemas.
 *
 * @param keySchema schema of the map keys; keys are encoded with a {@link RowCoder}
 * @param valueSchema schema of the map values; values are encoded with a {@link RowCoder}
 * @return a spec for {@code MapState<Row, Row>}
 * @see #map(Coder, Coder)
 */
public static StateSpec<MapState<Row, Row>> rowMap(Schema keySchema, Schema valueSchema) {
  return new MapStateSpec<>(RowCoder.of(keySchema), RowCoder.of(valueSchema));
}