Java源码示例:org.apache.crunch.types.avro.Avros

示例1
public int run(String[] args) throws Exception {

    String fooInputPath = args[0];
    String barInputPath = args[1];
    String outputPath = args[2];
    int fooValMax = Integer.parseInt(args[3]);
    int joinValMax = Integer.parseInt(args[4]);
    int numberOfReducers = Integer.parseInt(args[5]);

    Pipeline pipeline = new MRPipeline(JoinFilterExampleCrunch.class, getConf()); //<1>
    
    PCollection<String> fooLines = pipeline.readTextFile(fooInputPath);  //<2>
    PCollection<String> barLines = pipeline.readTextFile(barInputPath);

    PTable<Long, Pair<Long, Integer>> fooTable = fooLines.parallelDo(  //<3>
        new FooIndicatorFn(),
        Avros.tableOf(Avros.longs(),
        Avros.pairs(Avros.longs(), Avros.ints())));

    fooTable = fooTable.filter(new FooFilter(fooValMax));  //<4>

    PTable<Long, Integer> barTable = barLines.parallelDo(new BarIndicatorFn(),
        Avros.tableOf(Avros.longs(), Avros.ints()));

    DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer> joinStrategy =   //<5>
        new DefaultJoinStrategy
          <Long, Pair<Long, Integer>, Integer>
          (numberOfReducers);

    PTable<Long, Pair<Pair<Long, Integer>, Integer>> joinedTable = joinStrategy //<6>
        .join(fooTable, barTable, JoinType.INNER_JOIN);

    PTable<Long, Pair<Pair<Long, Integer>, Integer>> filteredTable = joinedTable.filter(new JoinFilter(joinValMax));

    filteredTable.write(At.textFile(outputPath), WriteMode.OVERWRITE); //<7>

    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
 
示例2
@SuppressWarnings("unchecked")
private static <T> AvroType<T> ptype(View<T> view) {
  Class<T> recordClass = view.getType();
  if (GenericRecord.class.isAssignableFrom(recordClass)) {
    return (AvroType<T>) Avros.generics(
        view.getDataset().getDescriptor().getSchema());
  } else {
    return Avros.records(recordClass);
  }
}
 
示例3
@SuppressWarnings("unchecked")
private static <E> AvroType<E> toAvroType(View<E> view, Class<E> type) {
  if (type.isAssignableFrom(GenericData.Record.class)) {
    return (AvroType<E>) Avros.generics(
      view.getDataset().getDescriptor().getSchema());
  } else {
    return Avros.records(type);
  }
}
 
示例4
private static <E> PCollection<E> partition(PCollection<E> collection,
                                            int numReducers) {
  PType<E> type = collection.getPType();
  PTableType<E, Void> tableType = Avros.tableOf(type, Avros.nulls());
  PTable<E, Void> table = collection.parallelDo(new AsKeyTable<E>(), tableType);
  PGroupedTable<E, Void> grouped =
      numReducers > 0 ? table.groupByKey(numReducers) : table.groupByKey();
  return grouped.ungroup().keys();
}
 
示例5
@Override
public int run(String[] args) throws Exception {
  // Turn debug on while in development.
  getPipeline().enableDebug();
  getPipeline().getConfiguration().set("crunch.log.job.progress", "true");

  Dataset<StandardEvent> eventsDataset = Datasets.load(
      "dataset:hdfs:/tmp/data/default/events", StandardEvent.class);

  View<StandardEvent> eventsToProcess;
  if (args.length == 0 || (args.length == 1 && args[0].equals("LATEST"))) {
    // get the current minute
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    long currentMinute = cal.getTimeInMillis();
    // restrict events to before the current minute
    // in the workflow, this also has a lower bound for the timestamp
    eventsToProcess = eventsDataset.toBefore("timestamp", currentMinute);

  } else if (isView(args[0])) {
    eventsToProcess = Datasets.load(args[0], StandardEvent.class);
  } else {
    eventsToProcess = FileSystemDatasets.viewForPath(eventsDataset, new Path(args[0]));
  }

  if (eventsToProcess.isEmpty()) {
    LOG.info("No records to process.");
    return 0;
  }

  // Create a parallel collection from the working partition
  PCollection<StandardEvent> events = read(
      CrunchDatasets.asSource(eventsToProcess));

  // Group events by user and cookie id, then create a session for each group
  PCollection<Session> sessions = events
      .by(new GetSessionKey(), Avros.strings())
      .groupByKey()
      .parallelDo(new MakeSession(), Avros.specifics(Session.class));

  // Write the sessions to the "sessions" Dataset
  getPipeline().write(sessions,
      CrunchDatasets.asTarget("dataset:hive:/tmp/data/default/sessions"),
      Target.WriteMode.APPEND);

  return run().succeeded() ? 0 : 1;
}
 
示例6
@Override
public int run(String[] args) throws Exception {

  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job
  Pipeline pipeline = new MRPipeline(LegacyHdfs2Cass.class, getConf());

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  // Read records from Avro files in inputFolder
  PCollection<ByteBuffer> records =
      pipeline.read(From.avroFile(inputList(input), Avros.records(ByteBuffer.class)));

  // Transform the input
  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert ByteBuffers to ThriftRecords
        .parallelDo(new LegacyHdfsToThrift(), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
        // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  }
  else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert ByteBuffers to CQLRecords
        .parallelDo(new LegacyHdfsToCQL(), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
 
示例7
@Override
public int run(String[] args) throws Exception {

  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job
  Configuration conf = getConf();
  conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, Boolean.FALSE);
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, Boolean.FALSE);
  Pipeline pipeline = new MRPipeline(Hdfs2Cass.class, conf);

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  PCollection<GenericRecord> records =
      ((PCollection<GenericRecord>)(PCollection) pipeline.read(From.avroFile(inputList(input))));

  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert ByteBuffers to ThriftRecords
        .parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
         // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  }
  else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert ByteBuffers to CQLRecords
        .parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
 
示例8
/**
 * Partitions {@code collection} to be stored efficiently in {@code View}.
 * <p>
 * This restructures the parallel collection so that all of the entities that
 * will be stored in a given partition will be evenly distributed across a specified
 * {@code numPartitionWriters}.
 * <p>
 * If the dataset is not partitioned, then this will structure all of the
 * entities to produce a number of files equal to {@code numWriters}.
 *
 * @param collection a collection of entities
 * @param view a {@link View} of a dataset to partition the collection for
 * @param numWriters the number of writers that should be used
 * @param numPartitionWriters the number of writers data for a single partition will be distributed across
 * @param <E> the type of entities in the collection and underlying dataset
 * @return an equivalent collection of entities partitioned for the view
 * @see #partition(PCollection, View)
 *
 * @since 1.1.0
 */
public static <E> PCollection<E> partition(PCollection<E> collection,
                                           View<E> view,
                                           int numWriters, int numPartitionWriters) {
  //ensure the number of writers is honored whether it is per partition or total.
  DatasetDescriptor descriptor = view.getDataset().getDescriptor();
  if (descriptor.isPartitioned()) {
    GetStorageKey<E> getKey = new GetStorageKey<E>(view, numPartitionWriters);
    PTable<Pair<GenericData.Record, Integer>, E> table = collection
        .by(getKey, Avros.pairs(Avros.generics(getKey.schema()), Avros.ints()));
    PGroupedTable<Pair<GenericData.Record, Integer>, E> grouped =
        numWriters > 0 ? table.groupByKey(numWriters) : table.groupByKey();
    return grouped.ungroup().values();
  } else {
    return partition(collection, numWriters);
  }
}
 
示例9
@Test
public void testUseReaderSchema() throws IOException {

  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the dataset
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());
  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());
  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();

  try {

    writer.write(oldUser);

  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // read data from updated dataset that has the new schema.
  // At this point, User class has the old schema
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));

  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));

  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();

  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());

  try {

    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());

  } finally {
    reader.close();
  }
}
 
示例10
@Test
public void testUseReaderSchemaParquet() throws IOException {

  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the dataset
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());

  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());
  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();

  try {

    writer.write(oldUser);

  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // read data from updated dataset that has the new schema.
  // At this point, User class has the old schema
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));

  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));

  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();

  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());

  try {

    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());

  } finally {
    reader.close();
  }
}