Java Code Examples: org.apache.crunch.types.avro.Avros
Example 1
public int run(String[] args) throws Exception {
  String fooInputPath = args[0];
  String barInputPath = args[1];
  String outputPath = args[2];
  int fooValMax = Integer.parseInt(args[3]);
  int joinValMax = Integer.parseInt(args[4]);
  int numberOfReducers = Integer.parseInt(args[5]);

  Pipeline pipeline = new MRPipeline(JoinFilterExampleCrunch.class, getConf()); //<1>
  PCollection<String> fooLines = pipeline.readTextFile(fooInputPath); //<2>
  PCollection<String> barLines = pipeline.readTextFile(barInputPath);

  PTable<Long, Pair<Long, Integer>> fooTable = fooLines.parallelDo( //<3>
      new FooIndicatorFn(),
      Avros.tableOf(Avros.longs(),
          Avros.pairs(Avros.longs(), Avros.ints())));

  fooTable = fooTable.filter(new FooFilter(fooValMax)); //<4>

  PTable<Long, Integer> barTable = barLines.parallelDo(new BarIndicatorFn(),
      Avros.tableOf(Avros.longs(), Avros.ints()));

  DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer> joinStrategy = //<5>
      new DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer>(numberOfReducers);

  PTable<Long, Pair<Pair<Long, Integer>, Integer>> joinedTable = joinStrategy //<6>
      .join(fooTable, barTable, JoinType.INNER_JOIN);

  PTable<Long, Pair<Pair<Long, Integer>, Integer>> filteredTable =
      joinedTable.filter(new JoinFilter(joinValMax));

  filteredTable.write(At.textFile(outputPath), WriteMode.OVERWRITE); //<7>

  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
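Example 1 references several helper classes (FooIndicatorFn, BarIndicatorFn, FooFilter, JoinFilter) that are not shown on this page. The sketch below is not the original source; it assumes tab-delimited foo records of the form fooId, barId, fooVal and keys the table on barId, and only illustrates the shapes that parallelDo and filter expect for the Avros.tableOf(...) type above.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.Pair;

// Hypothetical parser: emits (barId, (fooId, fooVal)); the real field layout may differ.
public static class FooIndicatorFn extends DoFn<String, Pair<Long, Pair<Long, Integer>>> {
  @Override
  public void process(String line, Emitter<Pair<Long, Pair<Long, Integer>>> emitter) {
    String[] cells = line.split("\t");
    emitter.emit(Pair.of(Long.parseLong(cells[1]),
        Pair.of(Long.parseLong(cells[0]), Integer.parseInt(cells[2]))));
  }
}

// Hypothetical filter: keeps foo rows whose value does not exceed fooValMax.
public static class FooFilter extends FilterFn<Pair<Long, Pair<Long, Integer>>> {
  private final int fooValMax;

  public FooFilter(int fooValMax) {
    this.fooValMax = fooValMax;
  }

  @Override
  public boolean accept(Pair<Long, Pair<Long, Integer>> input) {
    return input.second().second() <= fooValMax;
  }
}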
Example 2
@SuppressWarnings("unchecked")
private static <T> AvroType<T> ptype(View<T> view) {
  Class<T> recordClass = view.getType();
  if (GenericRecord.class.isAssignableFrom(recordClass)) {
    // Generic records carry no compiled schema, so build the type from the dataset's schema
    return (AvroType<T>) Avros.generics(
        view.getDataset().getDescriptor().getSchema());
  } else {
    // Specific or reflected records can derive their type from the class itself
    return Avros.records(recordClass);
  }
}
Example 3
@SuppressWarnings("unchecked")
private static <E> AvroType<E> toAvroType(View<E> view, Class<E> type) {
  if (type.isAssignableFrom(GenericData.Record.class)) {
    // The caller works with generic records: build the type from the dataset's schema
    return (AvroType<E>) Avros.generics(
        view.getDataset().getDescriptor().getSchema());
  } else {
    // Otherwise derive the type (specific or reflected) from the record class
    return Avros.records(type);
  }
}
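Examples 2 and 3 make the same choice in slightly different ways: build the AvroType from the dataset's schema with Avros.generics when working with generic records, or derive it from the record class with Avros.records otherwise. The illustrative fragment below shows the Avros factory methods that appear throughout this page; NewUserRecord is the generated Avro class used in Examples 9 and 10, SomePojo is a placeholder for any plain Java class, and the SchemaBuilder schema is only for illustration.

// Illustrative fragment only; NewUserRecord and SomePojo are placeholders.
Schema schema = SchemaBuilder.record("User").fields()
    .requiredString("username").endRecord();

AvroType<GenericData.Record> genericType = Avros.generics(schema);           // schema-driven generic type
AvroType<NewUserRecord> specificType = Avros.specifics(NewUserRecord.class); // generated SpecificRecord class
AvroType<SomePojo> reflectedType = Avros.reflects(SomePojo.class);           // plain POJO via reflection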
Example 4
private static <E> PCollection<E> partition(PCollection<E> collection,
                                            int numReducers) {
  PType<E> type = collection.getPType();
  // Turn each entity into an (entity, null) pair so the collection can be shuffled as a table
  PTableType<E, Void> tableType = Avros.tableOf(type, Avros.nulls());
  PTable<E, Void> table = collection.parallelDo(new AsKeyTable<E>(), tableType);
  // Group to force a shuffle, honoring the requested reducer count if one was given
  PGroupedTable<E, Void> grouped =
      numReducers > 0 ? table.groupByKey(numReducers) : table.groupByKey();
  return grouped.ungroup().keys();
}
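AsKeyTable is not shown here; it is presumably just a DoFn that pairs each entity with a null value so groupByKey can shuffle the collection. A minimal sketch under that assumption:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;

// Presumed implementation: key each entity by itself with a null value.
private static class AsKeyTable<E> extends DoFn<E, Pair<E, Void>> {
  @Override
  public void process(E entity, Emitter<Pair<E, Void>> emitter) {
    emitter.emit(Pair.of(entity, (Void) null));
  }
}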
Example 5
@Override
public int run(String[] args) throws Exception {
  // Turn debug on while in development.
  getPipeline().enableDebug();
  getPipeline().getConfiguration().set("crunch.log.job.progress", "true");

  Dataset<StandardEvent> eventsDataset = Datasets.load(
      "dataset:hdfs:/tmp/data/default/events", StandardEvent.class);

  View<StandardEvent> eventsToProcess;
  if (args.length == 0 || (args.length == 1 && args[0].equals("LATEST"))) {
    // get the current minute
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    long currentMinute = cal.getTimeInMillis();
    // restrict events to before the current minute
    // in the workflow, this also has a lower bound for the timestamp
    eventsToProcess = eventsDataset.toBefore("timestamp", currentMinute);
  } else if (isView(args[0])) {
    eventsToProcess = Datasets.load(args[0], StandardEvent.class);
  } else {
    eventsToProcess = FileSystemDatasets.viewForPath(eventsDataset, new Path(args[0]));
  }

  if (eventsToProcess.isEmpty()) {
    LOG.info("No records to process.");
    return 0;
  }

  // Create a parallel collection from the working partition
  PCollection<StandardEvent> events = read(
      CrunchDatasets.asSource(eventsToProcess));

  // Group events by user and cookie id, then create a session for each group
  PCollection<Session> sessions = events
      .by(new GetSessionKey(), Avros.strings())
      .groupByKey()
      .parallelDo(new MakeSession(), Avros.specifics(Session.class));

  // Write the sessions to the "sessions" Dataset
  getPipeline().write(sessions,
      CrunchDatasets.asTarget("dataset:hive:/tmp/data/default/sessions"),
      Target.WriteMode.APPEND);

  return run().succeeded() ? 0 : 1;
}
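GetSessionKey and MakeSession come from the Kite sessions example and are not reproduced on this page. As a rough sketch, GetSessionKey is a MapFn that builds the grouping key from each event; the exact fields it combines (assumed here to be the session id and user id exposed by StandardEvent) may differ in the original.

import org.apache.crunch.MapFn;

// Rough sketch, assuming StandardEvent exposes getSessionId() and getUserId().
private static class GetSessionKey extends MapFn<StandardEvent, String> {
  @Override
  public String map(StandardEvent event) {
    // Combine the ids so events from the same user session group together
    return event.getSessionId() + "+" + event.getUserId();
  }
}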
Example 6
@Override
public int run(String[] args) throws Exception {
  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job
  Pipeline pipeline = new MRPipeline(LegacyHdfs2Cass.class, getConf());

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  // Read records from Avro files in inputFolder
  PCollection<ByteBuffer> records =
      pipeline.read(From.avroFile(inputList(input), Avros.records(ByteBuffer.class)));

  // Transform the input
  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert ByteBuffers to ThriftRecords
        .parallelDo(new LegacyHdfsToThrift(), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
        // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  } else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert ByteBuffers to CQLRecords
        .parallelDo(new LegacyHdfsToCQL(), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
Example 7
@Override
public int run(String[] args) throws Exception {
  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job; disable speculative execution for both phases
  Configuration conf = getConf();
  conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, Boolean.FALSE);
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, Boolean.FALSE);
  Pipeline pipeline = new MRPipeline(Hdfs2Cass.class, conf);

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  // Read Avro records from the input files
  PCollection<GenericRecord> records =
      ((PCollection<GenericRecord>) (PCollection) pipeline.read(From.avroFile(inputList(input))));

  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert the Avro records to ThriftRecords
        .parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
        // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  } else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert the Avro records to CQLRecords
        .parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
Example 8
/**
 * Partitions {@code collection} to be stored efficiently in {@code View}.
 * <p>
 * This restructures the parallel collection so that all of the entities that
 * will be stored in a given partition will be evenly distributed across a specified
 * {@code numPartitionWriters}.
 * <p>
 * If the dataset is not partitioned, then this will structure all of the
 * entities to produce a number of files equal to {@code numWriters}.
 *
 * @param collection a collection of entities
 * @param view a {@link View} of a dataset to partition the collection for
 * @param numWriters the number of writers that should be used
 * @param numPartitionWriters the number of writers data for a single partition will be distributed across
 * @param <E> the type of entities in the collection and underlying dataset
 * @return an equivalent collection of entities partitioned for the view
 * @see #partition(PCollection, View)
 *
 * @since 1.1.0
 */
public static <E> PCollection<E> partition(PCollection<E> collection,
                                           View<E> view,
                                           int numWriters, int numPartitionWriters) {
  // ensure the number of writers is honored whether it is per partition or total.
  DatasetDescriptor descriptor = view.getDataset().getDescriptor();
  if (descriptor.isPartitioned()) {
    GetStorageKey<E> getKey = new GetStorageKey<E>(view, numPartitionWriters);
    PTable<Pair<GenericData.Record, Integer>, E> table = collection
        .by(getKey, Avros.pairs(Avros.generics(getKey.schema()), Avros.ints()));
    PGroupedTable<Pair<GenericData.Record, Integer>, E> grouped =
        numWriters > 0 ? table.groupByKey(numWriters) : table.groupByKey();
    return grouped.ungroup().values();
  } else {
    return partition(collection, numWriters);
  }
}
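A hypothetical usage of this method, assuming it is exposed on Kite's CrunchDatasets as the @see tag suggests: repartition a collection so that each dataset partition is spread over a few writers before appending. The dataset URI is a placeholder and pipeline stands in for an already-created Pipeline.

// Hypothetical usage: at most 40 reducers overall, with data for any single
// partition of the target dataset spread across up to 4 of them.
View<StandardEvent> eventsView =
    Datasets.load("dataset:hive:/tmp/data/default/events", StandardEvent.class);
PCollection<StandardEvent> events = pipeline.read(CrunchDatasets.asSource(eventsView));
PCollection<StandardEvent> partitioned =
    CrunchDatasets.partition(events, eventsView, 40, 4);
pipeline.write(partitioned, CrunchDatasets.asTarget(eventsView), Target.WriteMode.APPEND);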
Example 9
@Test
public void testUseReaderSchema() throws IOException {
  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the datasets
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());
  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());

  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();
  try {
    writer.write(oldUser);
  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // Read the data written with the old schema using the NewUserRecord class,
  // which carries the enhanced (reader) schema.
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));
  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));
  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();
  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());
  try {
    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());
  } finally {
    reader.close();
  }
}
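UserRecordIdentityFn is not shown in the test; given how it is used, it is presumably just an identity function that passes each record through so it is rewritten under the NewUserRecord schema. A minimal sketch under that assumption:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Presumed identity function: emits each record unchanged.
private static class UserRecordIdentityFn extends DoFn<NewUserRecord, NewUserRecord> {
  @Override
  public void process(NewUserRecord input, Emitter<NewUserRecord> emitter) {
    emitter.emit(input);
  }
}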
Example 10
@Test
public void testUseReaderSchemaParquet() throws IOException {
  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the datasets
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());
  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());

  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();
  try {
    writer.write(oldUser);
  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // Read the data written with the old schema using the NewUserRecord class,
  // which carries the enhanced (reader) schema.
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));
  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));
  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();
  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());
  try {
    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());
  } finally {
    reader.close();
  }
}