Java source code examples: org.apache.tez.mapreduce.hadoop.MRHelpers
Example 1
public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
MRInputUserPayloadProto userPayloadProto = MRHelpers
.parseMRInputPayload(rootInputContext.getUserPayload());
Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
try {
ReflectionUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
LOG.info("Class found");
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
} catch (TezUncheckedException e) {
LOG.info("Class not found");
}
return super.initialize(rootInputContext);
}
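For context, here is a sketch of the encode side that produces the payload parsed above. The method name is ours; it assumes the same 0.4.x-era classes (MRRuntimeProtos, TezUtils) that the surrounding examples use, and Examples 22-24 below build the payload the same way in test code.
private static byte[] buildMRInputPayload(Configuration conf,
    String inputFormatClassName) throws IOException {
  MRRuntimeProtos.MRInputUserPayloadProto.Builder builder =
      MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(inputFormatClassName);
  // Mirrors createConfFromByteString() on the read side.
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
  return builder.build().toByteArray();
}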
Example 2
/**
* Helper API to generate the user payload for the MRInput and
* MRInputAMSplitGenerator (if used). The InputFormat will be invoked by Tez
* at DAG runtime to generate the input splits.
*
* @param conf
* Configuration for the InputFormat
* @param inputFormatClassName
* Name of the class of the InputFormat
* @param useNewApi
* use new mapreduce API or old mapred API
* @param groupSplitsInAM
* do grouping of splits in the AM. If true then splits generated by
* the InputFormat will be grouped in the AM based on available
* resources, locality etc. This option may be set to true only when
* using MRInputAMSplitGenerator as the initializer class in
* {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, Class)}
* @return the user payload to be set on the InputDescriptor of MRInput
* @throws IOException
*/
public static byte[] createUserPayload(Configuration conf,
String inputFormatClassName, boolean useNewApi, boolean groupSplitsInAM)
throws IOException {
Configuration inputConf = new JobConf(conf);
String wrappedInputFormatClassName = null;
String configInputFormatClassName = null;
if (groupSplitsInAM) {
wrappedInputFormatClassName = inputFormatClassName;
configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
} else {
wrappedInputFormatClassName = null;
configInputFormatClassName = inputFormatClassName;
}
inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
configInputFormatClassName);
inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
MRHelpers.translateVertexConfToTez(inputConf);
MRHelpers.doJobClientMagic(inputConf);
if (groupSplitsInAM) {
return MRHelpers.createMRInputPayloadWithGrouping(inputConf,
wrappedInputFormatClassName);
} else {
return MRHelpers.createMRInputPayload(inputConf, null);
}
}
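A short usage sketch for the helper above (the wrapper method is ours): the returned payload is set on an InputDescriptor and, because groupSplitsInAM is true, MRInputAMSplitGenerator is registered as the initializer, exactly as Example 6 below does.
private static Vertex addGroupedTextInput(Vertex vertex, Configuration inputConf)
    throws IOException {
  byte[] payload = MRInput.createUserPayload(inputConf,
      TextInputFormat.class.getName(), true /* useNewApi */,
      true /* groupSplitsInAM */);
  InputDescriptor id = new InputDescriptor(MRInput.class.getName())
      .setUserPayload(payload);
  // Grouping happens in the AM, so the matching initializer is mandatory.
  return vertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
}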
Example 3
/**
* Creates the user payload to be set on the OutputDescriptor for MROutput
*/
private UserPayload createUserPayload() {
// always record which API (new mapreduce vs. old mapred) is in use
conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi);
conf.setBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, useNewApi);
if (outputFormatProvided) {
if (useNewApi) {
conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormat.getName());
} else {
conf.set("mapred.output.format.class", outputFormat.getName());
}
}
MRHelpers.translateMRConfToTez(conf);
try {
return TezUtils.createUserPayloadFromConf(conf);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
}
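At runtime the Configuration is recovered from this payload with the inverse call. A one-method sketch, assuming TezUtils.createConfFromUserPayload from the 0.5.x-era API this example targets:
private static Configuration readBack(UserPayload payload) throws IOException {
  // Inverse of createUserPayloadFromConf() above; keys rewritten by
  // translateMRConfToTez() come back in their Tez form.
  return TezUtils.createConfFromUserPayload(payload);
}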
Example 4
private DataSourceDescriptor createCustomDataSource() throws IOException {
setupBasicConf(conf);
MRHelpers.translateMRConfToTez(conf);
Collection<URI> uris = maybeGetURIsForCredentials();
UserPayload payload = MRInputHelpersInternal.createMRInputPayload(
conf, groupSplitsInAM, sortSplitsInAM);
DataSourceDescriptor ds = DataSourceDescriptor
.create(InputDescriptor.create(inputClassName).setUserPayload(payload),
customInitializerDescriptor, null);
if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
}
if (uris != null) {
ds.addURIsForCredentials(uris);
}
return ds;
}
Example 5
private DataSourceDescriptor createGeneratorDataSource() throws IOException {
setupBasicConf(conf);
MRHelpers.translateMRConfToTez(conf);
Collection<URI> uris = maybeGetURIsForCredentials();
UserPayload payload = MRInputHelpersInternal.createMRInputPayload(
conf, groupSplitsInAM, sortSplitsInAM);
DataSourceDescriptor ds = DataSourceDescriptor.create(
InputDescriptor.create(inputClassName).setUserPayload(payload),
InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), null);
if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
}
if (uris != null) {
ds.addURIsForCredentials(uris);
}
return ds;
}
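Either data source built above is attached to a vertex in the same way. A minimal sketch against the Vertex.addDataSource API these 0.5.x-style examples assume; the vertex name, processorClassName and taskResource are placeholders of ours:
Vertex mapVertex = Vertex.create("map",
    ProcessorDescriptor.create(processorClassName), -1, taskResource);
// The initializer named in the DataSourceDescriptor (custom in Example 4,
// MRInputAMSplitGenerator in Example 5) runs in the AM to generate splits.
mapVertex.addDataSource("MRInput", createGeneratorDataSource());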
Example 6
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
Configuration inputConf = new Configuration(tezConf);
inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
InputDescriptor id = new InputDescriptor(MRInput.class.getName())
.setUserPayload(MRInput.createUserPayload(inputConf,
TextInputFormat.class.getName(), true, true));
Configuration outputConf = new Configuration(tezConf);
outputConf.set(FileOutputFormat.OUTDIR, outputPath);
OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MROutput.createUserPayload(
outputConf, TextOutputFormat.class.getName(), true));
Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
Vertex summerVertex = new Vertex("summer",
new ProcessorDescriptor(
SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
summerVertex.addOutput("MROutput", od, MROutputCommitter.class);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
DAG dag = new DAG("WordCount");
dag.addVertex(tokenizerVertex)
.addVertex(summerVertex)
.addEdge(
new Edge(tokenizerVertex, summerVertex, edgeConf.createDefaultEdgeProperty()));
return dag;
}
Example 7
private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
throws IOException {
long largeOutSizePerTask = largeOutSize / numTasks;
long smallOutSizePerTask = smallOutSize / numTasks;
DAG dag = new DAG("IntersectDataGen");
byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);
byte[] expectedOutputPayload = createPayloadForOutput(expectedOutputPath, tezConf);
Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
genDataVertex.addOutput(STREAM_OUTPUT_NAME,
new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
MROutputCommitter.class);
genDataVertex.addOutput(HASH_OUTPUT_NAME,
new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
MROutputCommitter.class);
genDataVertex.addOutput(EXPECTED_OUTPUT_NAME,
new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
MROutputCommitter.class);
dag.addVertex(genDataVertex);
return dag;
}
Example 8
/**
* Creates the user payload to be set on the OutputDescriptor for MROutput
* @param conf Configuration for the OutputFormat
* @param outputFormatName Name of the class of the OutputFormat
* @param useNewApi Use new mapreduce API or old mapred API
* @return the user payload to be set on the OutputDescriptor of MROutput
* @throws IOException
*/
public static byte[] createUserPayload(Configuration conf,
String outputFormatName, boolean useNewApi) throws IOException {
Configuration outputConf = new JobConf(conf);
outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
outputConf.setBoolean("mapred.mapper.new-api", useNewApi);
MRHelpers.translateVertexConfToTez(outputConf);
MRHelpers.doJobClientMagic(outputConf);
return TezUtils.createUserPayloadFromConf(outputConf);
}
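A usage sketch for this helper (the wrapper method is ours), mirroring the output wiring shown in Example 6:
private static Vertex addTextOutput(Vertex vertex, Configuration outputConf)
    throws IOException {
  byte[] payload = MROutput.createUserPayload(outputConf,
      TextOutputFormat.class.getName(), true /* useNewApi */);
  OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
      .setUserPayload(payload);
  // MROutputCommitter bridges to the underlying MapReduce OutputCommitter.
  return vertex.addOutput("MROutput", od, MROutputCommitter.class);
}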
Example 9
private void setupMapReduceEnv(Configuration jobConf,
Map<String, String> environment, boolean isMap) throws IOException {
if (isMap) {
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
"map",
MRJobConfig.MAP_JAVA_OPTS,
MRJobConfig.MAP_ENV);
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
"map",
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_ADMIN_USER_ENV);
} else {
warnForJavaLibPath(
jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
"reduce",
MRJobConfig.REDUCE_JAVA_OPTS,
MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
"reduce",
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_ADMIN_USER_ENV);
}
MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
}
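The warnForJavaLibPath() helper is not part of this listing. A hypothetical reconstruction, modeled on the similarly named helper in Hadoop's MRApps (LOG is assumed to be the enclosing class's logger):
private static void warnForJavaLibPath(String opts, String component,
    String javaConf, String envConf) {
  // Flag -Djava.library.path in JVM opts: native-library paths should be
  // passed through the task environment (envConf) instead.
  if (opts != null && opts.contains("-Djava.library.path")) {
    LOG.warn("Usage of -Djava.library.path in " + javaConf
        + " can cause programs to no longer function if hadoop native"
        + " libraries are used. These values should be set as part of the"
        + " LD_LIBRARY_PATH in the " + component + " JVM env using "
        + envConf + " config settings.");
  }
}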
Example 10
public List<Event> initialize() throws IOException {
getContext().requestInitialMemory(0L, null); // mandatory call
MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
MRHelpers.parseMRInputPayload(getContext().getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
"Split information not expected in " + this.getClass().getName());
Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);
// Add tokens to the jobConf - in case they are accessed within the RR / IF
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
TaskAttemptID taskAttemptId = new TaskAttemptID(
new TaskID(
Long.toString(getContext().getApplicationId().getClusterTimestamp()),
getContext().getApplicationId().getId(), TaskType.MAP,
getContext().getTaskIndex()),
getContext().getTaskAttemptNumber());
jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
taskAttemptId.toString());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
this.inputRecordCounter = getContext().getCounters().findCounter(
TaskCounter.INPUT_RECORDS_PROCESSED);
useNewApi = this.jobConf.getUseNewMapper();
return null;
}
Example 11
private void setupMapReduceEnv(Configuration jobConf,
Map<String, String> environment, boolean isMap) throws IOException {
if (isMap) {
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
"map",
MRJobConfig.MAP_JAVA_OPTS,
MRJobConfig.MAP_ENV);
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
"map",
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_ADMIN_USER_ENV);
} else {
warnForJavaLibPath(
jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
"reduce",
MRJobConfig.REDUCE_JAVA_OPTS,
MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(
jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
"reduce",
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.MAPRED_ADMIN_USER_ENV);
}
MRHelpers.updateEnvBasedOnMRTaskEnv(jobConf, environment, isMap);
}
Example 12
private DataSourceDescriptor createDistributorDataSource() throws IOException {
InputSplitInfo inputSplitInfo;
setupBasicConf(conf);
try {
inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(conf, false, true, 0);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
MRHelpers.translateMRConfToTez(conf);
UserPayload payload = MRInputHelpersInternal.createMRInputPayload(conf,
inputSplitInfo.getSplitsProto());
Credentials credentials = null;
if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
credentials = inputSplitInfo.getCredentials();
}
DataSourceDescriptor ds = DataSourceDescriptor.create(
InputDescriptor.create(inputClassName).setUserPayload(payload),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
inputSplitInfo.getNumTasks(), credentials,
VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), null);
if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
}
return ds;
}
Example 13
private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
throws IOException {
DAG dag = new DAG("IntersectValidate");
// Configuration for src1
Configuration lhsInputConf = new Configuration(tezConf);
lhsInputConf.set(FileInputFormat.INPUT_DIR, lhs.toUri().toString());
byte[] streamInputPayload = MRInput.createUserPayload(lhsInputConf,
TextInputFormat.class.getName(), true, false);
// Configuration for src2
Configuration rhsInputConf = new Configuration(tezConf);
rhsInputConf.set(FileInputFormat.INPUT_DIR, rhs.toUri().toString());
byte[] hashInputPayload = MRInput.createUserPayload(rhsInputConf,
TextInputFormat.class.getName(), true, false);
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
// better mechanism to configure the IOs.
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
// Change the way resources are setup - no MRHelpers
Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
ForwardingProcessor.class.getName()), -1,
MRHelpers.getMapResource(tezConf)).addInput("lhs", new InputDescriptor(
MRInput.class.getName()).setUserPayload(streamInputPayload),
MRInputAMSplitGenerator.class);
Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
ForwardingProcessor.class.getName()), -1,
MRHelpers.getMapResource(tezConf)).addInput("rhs", new InputDescriptor(
MRInput.class.getName()).setUserPayload(hashInputPayload),
MRInputAMSplitGenerator.class);
Vertex intersectValidateVertex = new Vertex("intersectvalidate",
new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
numPartitions, MRHelpers.getReduceResource(tezConf));
Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
.addEdge(e2);
return dag;
}
Example 14
private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
int numPartitions) throws IOException {
DAG dag = new DAG("IntersectExample");
// Configuration for src1
Configuration streamInputConf = new Configuration(tezConf);
streamInputConf.set(FileInputFormat.INPUT_DIR, streamPath.toUri().toString());
byte[] streamInputPayload = MRInput.createUserPayload(streamInputConf,
TextInputFormat.class.getName(), true, false);
// Configuration for src2
Configuration hashInputConf = new Configuration(tezConf);
hashInputConf.set(FileInputFormat.INPUT_DIR, hashPath.toUri().toString());
byte[] hashInputPayload = MRInput.createUserPayload(hashInputConf,
TextInputFormat.class.getName(), true, false);
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
// better mechanism to configure the IOs.
UnorderedPartitionedKVEdgeConfigurer edgeConf =
UnorderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
Configuration finalOutputConf = new Configuration(tezConf);
finalOutputConf.set(FileOutputFormat.OUTDIR, outPath.toUri().toString());
byte[] finalOutputPayload = MROutput.createUserPayload(finalOutputConf,
TextOutputFormat.class.getName(), true);
// Change the way resources are setup - no MRHelpers
Vertex streamFileVertex = new Vertex("partitioner1",
new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1,
MRHelpers.getMapResource(tezConf)).addInput("streamfile",
new InputDescriptor(MRInput.class.getName())
.setUserPayload(streamInputPayload), MRInputAMSplitGenerator.class);
Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
ForwardingProcessor.class.getName()), -1,
MRHelpers.getMapResource(tezConf)).addInput("hashfile",
new InputDescriptor(MRInput.class.getName())
.setUserPayload(hashInputPayload), MRInputAMSplitGenerator.class);
Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
IntersectProcessor.class.getName()), numPartitions,
MRHelpers.getReduceResource(tezConf)).addOutput("finalOutput",
new OutputDescriptor(MROutput.class.getName())
.setUserPayload(finalOutputPayload), MROutputCommitter.class);
Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
Edge e2 = new Edge(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(intersectVertex)
.addEdge(e1).addEdge(e2);
return dag;
}
Example 15
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
JobConf mrConf = new JobConf(tezConf);
int numBroadcastTasks = 2;
int numOneToOneTasks = 3;
if (doLocalityCheck) {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(tezConf);
yarnClient.start();
int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
yarnClient.stop();
// create enough 1-1 tasks to run in parallel
numOneToOneTasks = numNMs - numBroadcastTasks - 1; // reserve 1 container for the AM
if (numOneToOneTasks < 1) {
numOneToOneTasks = 1;
}
}
byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};
System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
InputProcessor.class.getName()),
numBroadcastTasks, MRHelpers.getMapResource(mrConf));
Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
InputProcessor.class.getName()).setUserPayload(procPayload),
numOneToOneTasks, MRHelpers.getMapResource(mrConf));
Vertex oneToOneVertex = new Vertex("OneToOne",
new ProcessorDescriptor(
OneToOneProcessor.class.getName()).setUserPayload(procPayload),
-1, MRHelpers.getReduceResource(mrConf));
oneToOneVertex.setVertexManagerPlugin(
new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
DAG dag = new DAG("BroadcastAndOneToOneExample");
dag.addVertex(inputVertex)
.addVertex(broadcastVertex)
.addVertex(oneToOneVertex)
.addEdge(
new Edge(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
.addEdge(
new Edge(broadcastVertex, oneToOneVertex,
edgeConf.createDefaultBroadcastEdgeProperty()));
return dag;
}
Example 16
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
DAG dag = new DAG("UnionExample");
int numMaps = -1;
Configuration inputConf = new Configuration(tezConf);
inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
InputDescriptor id = new InputDescriptor(MRInput.class.getName())
.setUserPayload(MRInput.createUserPayload(inputConf,
TextInputFormat.class.getName(), true, true));
Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
mapVertex1.addInput("MRInput", id, MRInputAMSplitGenerator.class);
Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
mapVertex2.addInput("MRInput", id, MRInputAMSplitGenerator.class);
Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
mapVertex3.addInput("MRInput", id, MRInputAMSplitGenerator.class);
Vertex checkerVertex = new Vertex("checker",
new ProcessorDescriptor(
UnionProcessor.class.getName()),
1, MRHelpers.getReduceResource(tezConf));
Configuration outputConf = new Configuration(tezConf);
outputConf.set(FileOutputFormat.OUTDIR, outputPath);
OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MROutput.createUserPayload(
outputConf, TextOutputFormat.class.getName(), true));
checkerVertex.addOutput("union", od, MROutputCommitter.class);
Configuration allPartsConf = new Configuration(tezConf);
allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MROutput.createUserPayload(
allPartsConf, TextOutputFormat.class.getName(), true));
checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class);
Configuration partsConf = new Configuration(tezConf);
partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MROutput.createUserPayload(
partsConf, TextOutputFormat.class.getName(), true));
unionVertex.addOutput("parts", od1, MROutputCommitter.class);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
dag.addVertex(mapVertex1)
.addVertex(mapVertex2)
.addVertex(mapVertex3)
.addVertex(checkerVertex)
.addEdge(
new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
.addEdge(
new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
new InputDescriptor(
ConcatenatedMergedKeyValuesInput.class.getName())));
return dag;
}
Example 17
private Vertex createVertexForStage(Configuration stageConf,
Map<String, LocalResource> jobLocalResources,
List<TaskLocationHint> locations, int stageNum, int totalStages)
throws IOException {
// stageNum starts from 0, goes till numStages - 1
boolean isMap = false;
if (stageNum == 0) {
isMap = true;
}
int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
: stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
String processorName = isMap ? MapProcessor.class.getName()
: ReduceProcessor.class.getName();
String vertexName = null;
if (isMap) {
vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
} else {
if (stageNum == totalStages - 1) {
vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
} else {
vertexName = MultiStageMRConfigUtil
.getIntermediateStageVertexName(stageNum);
}
}
Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
: MRHelpers.getReduceResource(stageConf);
stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
setUserPayload(vertexUserPayload),
numTasks, taskResource);
if (isMap) {
byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
MRHelpers.addMRInput(vertex, mapInputPayload, null);
}
// The final stage (the map stage itself for map-only jobs) gets the MR output.
if (stageNum == totalStages - 1) {
MRHelpers.addMROutputLegacy(vertex, vertexUserPayload);
}
Map<String, String> taskEnv = new HashMap<String, String>();
setupMapReduceEnv(stageConf, taskEnv, isMap);
Map<String, LocalResource> taskLocalResources =
new TreeMap<String, LocalResource>();
// PRECOMMIT Remove split localization for reduce tasks if it's being set
// here
taskLocalResources.putAll(jobLocalResources);
String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
: MRHelpers.getReduceJavaOpts(stageConf);
vertex.setTaskEnvironment(taskEnv)
.setTaskLocalFiles(taskLocalResources)
.setTaskLocationsHint(locations)
.setTaskLaunchCmdOpts(taskJavaOpts);
if (!isMap) {
vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
ShuffleVertexManager.class.getName()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding vertex to DAG" + ", vertexName="
+ vertex.getName() + ", processor="
+ vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+ vertex.getParallelism() + ", javaOpts=" + vertex.getTaskLaunchCmdOpts()
+ ", resources=" + vertex.getTaskResource()
// TODO Add localResources and Environment
);
}
return vertex;
}
Example 18
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
MRSplitProto splitProto, Configuration conf) throws IOException {
SerializationFactory serializationFactory = new SerializationFactory(conf);
return MRHelpers.createNewFormatSplitFromUserPayload(
splitProto, serializationFactory);
}
Example 19
@Private
public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
throws IOException {
SerializationFactory serializationFactory = new SerializationFactory(conf);
return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
}
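Both helpers consume an MRSplitProto, which typically arrives inside an event payload. A condensed sketch of the consuming side (the wrapper method is ours), grounded in how Examples 25-26 below call these helpers:
private static InputSplit splitFromEvent(RootInputDataInformationEvent event,
    Configuration conf) throws IOException {
  // Deserialize the proto carried by the event, then rebuild the split;
  // use getNewSplitDetailsFromEvent() when the new mapreduce API is in use.
  MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
  return MRInputUtils.getOldSplitDetailsFromEvent(splitProto, conf);
}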
Example 20
@Override
public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
throws IOException {
Stopwatch sw = null;
if (LOG.isDebugEnabled()) {
sw = new Stopwatch().start();
}
MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
JobConf jobConf = new JobConf(conf);
boolean useNewApi = jobConf.getUseNewMapper();
sendSerializedEvents = conf.getBoolean(
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
LOG.info("Emitting serialized splits: " + sendSerializedEvents);
this.splitsProto = userPayloadProto.getSplits();
MRInputUserPayloadProto.Builder updatedPayloadBuilder = MRInputUserPayloadProto.newBuilder(userPayloadProto);
updatedPayloadBuilder.clearSplits();
List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1);
RootInputUpdatePayloadEvent updatePayloadEvent = new RootInputUpdatePayloadEvent(
updatedPayloadBuilder.build().toByteArray());
events.add(updatePayloadEvent);
int count = 0;
for (MRSplitProto mrSplit : this.splitsProto.getSplitsList()) {
RootInputDataInformationEvent diEvent;
if (sendSerializedEvents) {
// Unnecessary array copy, can be avoided by using ByteBuffer instead of
// a raw array.
diEvent = new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
} else {
if (useNewApi) {
org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
.getNewSplitDetailsFromEvent(mrSplit, conf);
diEvent = new RootInputDataInformationEvent(count++, newInputSplit);
} else {
org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
.getOldSplitDetailsFromEvent(mrSplit, conf);
diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
}
}
events.add(diEvent);
}
return events;
}
Example 21
@Test
public void testMapProcessor() throws Exception {
String dagName = "mrdag0";
String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);
MRHelpers.translateVertexConfToTez(jobConf);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
new InputDescriptor(MRInputLegacy.class.getName())
.setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
1);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
new OutputDescriptor(LocalOnFileSorterOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec),
Collections.singletonList(mapOutputSpec));
task.initialize();
task.run();
task.close();
TezInputContext inputContext = task.getInputContexts().iterator().next();
TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier());
// TODO NEWTEZ FIXME OutputCommitter verification
// MRTask mrTask = (MRTask)t.getProcessor();
// Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
// .getCommitter().getClass().getName());
// t.close();
Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
DataInputBuffer valueBuf = new DataInputBuffer();
long prev = Long.MIN_VALUE;
while (reader.nextRawKey(keyBuf)) {
reader.nextRawValue(valueBuf);
key.readFields(keyBuf);
value.readFields(valueBuf);
if (prev != Long.MIN_VALUE) {
assert(prev <= key.get());
}
// update prev on every record so the sort-order assertion actually runs
prev = key.get();
LOG.info("key = " + key.get() + "; value = " + value);
}
reader.close();
}
Example 22
@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
JobConf jobConf = new JobConf(defaultConf);
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
builder.setInputFormatName(SequenceFileInputFormat.class.getName());
builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
byte[] payload = builder.build().toByteArray();
TezInputContext inputContext = createTezInputContext(payload);
MultiMRInput input = new MultiMRInput();
input.setNumPhysicalInputs(1);
input.initialize(inputContext);
List<Event> eventList = new ArrayList<Event>();
String file1 = "file1";
LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
10);
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
InputSplit[] splits = format.getSplits(jobConf, 1);
assertEquals(1, splits.length);
MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
RootInputDataInformationEvent event = new RootInputDataInformationEvent(0,
splitProto.toByteArray());
eventList.clear();
eventList.add(event);
input.handleEvents(eventList);
int readerCount = 0;
for (KeyValueReader reader : input.getKeyValueReaders()) {
readerCount++;
while (reader.next()) {
if (data1.size() == 0) {
fail("Found more records than expected");
}
Object key = reader.getCurrentKey();
Object val = reader.getCurrentValue();
assertEquals(val, data1.remove(key));
}
}
assertEquals(1, readerCount);
}
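The createInputData() helper used by these tests is not shown in this listing. A hypothetical sketch of what it evidently does, judging by its call sites: write LongWritable/Text records into a SequenceFile and return them for later verification.
private static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs,
    Path workDir, JobConf job, String fileName, long start, long end)
    throws IOException {
  LinkedHashMap<LongWritable, Text> data =
      new LinkedHashMap<LongWritable, Text>();
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, job,
      new Path(workDir, fileName), LongWritable.class, Text.class);
  try {
    for (long i = start; i < end; i++) {
      LongWritable key = new LongWritable(i);
      Text value = new Text("value-" + i); // arbitrary record payload
      writer.append(key, value);
      data.put(key, value);
    }
  } finally {
    writer.close();
  }
  return data;
}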
Example 23
@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
JobConf jobConf = new JobConf(defaultConf);
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
builder.setInputFormatName(SequenceFileInputFormat.class.getName());
builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
byte[] payload = builder.build().toByteArray();
TezInputContext inputContext = createTezInputContext(payload);
MultiMRInput input = new MultiMRInput();
input.setNumPhysicalInputs(2);
input.initialize(inputContext);
List<Event> eventList = new ArrayList<Event>();
LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
String file1 = "file1";
LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
10);
String file2 = "file2";
LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
20);
data.putAll(data1);
data.putAll(data2);
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
InputSplit[] splits = format.getSplits(jobConf, 2);
assertEquals(2, splits.length);
MRSplitProto splitProto1 = MRHelpers.createSplitProto(splits[0]);
RootInputDataInformationEvent event1 = new RootInputDataInformationEvent(0,
splitProto1.toByteArray());
MRSplitProto splitProto2 = MRHelpers.createSplitProto(splits[1]);
RootInputDataInformationEvent event2 = new RootInputDataInformationEvent(1,
splitProto2.toByteArray());
eventList.clear();
eventList.add(event1);
eventList.add(event2);
input.handleEvents(eventList);
int readerCount = 0;
for (KeyValueReader reader : input.getKeyValueReaders()) {
readerCount++;
while (reader.next()) {
if (data.size() == 0) {
fail("Found more records than expected");
}
Object key = reader.getCurrentKey();
Object val = reader.getCurrentValue();
assertEquals(val, data.remove(key));
}
}
assertEquals(2, readerCount);
}
Example 24
@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
Path workDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
JobConf jobConf = new JobConf(defaultConf);
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
builder.setInputFormatName(SequenceFileInputFormat.class.getName());
builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
byte[] payload = builder.build().toByteArray();
TezInputContext inputContext = createTezInputContext(payload);
MultiMRInput input = new MultiMRInput();
input.setNumPhysicalInputs(1);
input.initialize(inputContext);
List<Event> eventList = new ArrayList<Event>();
String file1 = "file1";
createInputData(localFs, workDir, jobConf, file1, 0, 10);
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
InputSplit[] splits = format.getSplits(jobConf, 1);
assertEquals(1, splits.length);
MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
RootInputDataInformationEvent event1 = new RootInputDataInformationEvent(0,
splitProto.toByteArray());
RootInputDataInformationEvent event2 = new RootInputDataInformationEvent(1,
splitProto.toByteArray());
eventList.clear();
eventList.add(event1);
eventList.add(event2);
try {
input.handleEvents(eventList);
fail("Expecting Exception due to too many events");
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"Unexpected event. All physical sources already initialized"));
}
}
Example 25
@Test
public void testSerializedPayload() throws IOException {
Configuration conf = new Configuration(false);
conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD, true);
ByteString confByteString = MRHelpers.createByteStringFromConf(conf);
InputSplit split1 = new InputSplitForTest(1);
InputSplit split2 = new InputSplitForTest(2);
MRSplitProto proto1 = MRHelpers.createSplitProto(split1);
MRSplitProto proto2 = MRHelpers.createSplitProto(split2);
MRSplitsProto.Builder splitsProtoBuilder = MRSplitsProto.newBuilder();
splitsProtoBuilder.addSplits(proto1);
splitsProtoBuilder.addSplits(proto2);
MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
payloadProto.setSplits(splitsProtoBuilder.build());
payloadProto.setConfigurationBytes(confByteString);
byte[] userPayload = payloadProto.build().toByteArray();
TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
List<Event> events = splitDist.initialize(context);
assertEquals(3, events.size());
assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
assertTrue(events.get(1) instanceof RootInputDataInformationEvent);
assertTrue(events.get(2) instanceof RootInputDataInformationEvent);
RootInputDataInformationEvent diEvent1 = (RootInputDataInformationEvent) (events.get(1));
RootInputDataInformationEvent diEvent2 = (RootInputDataInformationEvent) (events.get(2));
assertNull(diEvent1.getDeserializedUserPayload());
assertNull(diEvent2.getDeserializedUserPayload());
assertNotNull(diEvent1.getUserPayload());
assertNotNull(diEvent2.getUserPayload());
MRSplitProto event1Proto = MRSplitProto.parseFrom(diEvent1.getUserPayload());
InputSplit is1 = MRInputUtils.getOldSplitDetailsFromEvent(event1Proto, new Configuration());
assertTrue(is1 instanceof InputSplitForTest);
assertEquals(1, ((InputSplitForTest) is1).identifier);
MRSplitProto event2Proto = MRSplitProto.parseFrom(diEvent2.getUserPayload());
InputSplit is2 = MRInputUtils.getOldSplitDetailsFromEvent(event2Proto, new Configuration());
assertTrue(is2 instanceof InputSplitForTest);
assertEquals(2, ((InputSplitForTest) is2).identifier);
}
Example 26
@Test
public void testDeserializedPayload() throws IOException {
Configuration conf = new Configuration(false);
conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD, false);
ByteString confByteString = MRHelpers.createByteStringFromConf(conf);
InputSplit split1 = new InputSplitForTest(1);
InputSplit split2 = new InputSplitForTest(2);
MRSplitProto proto1 = MRHelpers.createSplitProto(split1);
MRSplitProto proto2 = MRHelpers.createSplitProto(split2);
MRSplitsProto.Builder splitsProtoBuilder = MRSplitsProto.newBuilder();
splitsProtoBuilder.addSplits(proto1);
splitsProtoBuilder.addSplits(proto2);
MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
payloadProto.setSplits(splitsProtoBuilder.build());
payloadProto.setConfigurationBytes(confByteString);
byte[] userPayload = payloadProto.build().toByteArray();
TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
List<Event> events = splitDist.initialize(context);
assertEquals(3, events.size());
assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
assertTrue(events.get(1) instanceof RootInputDataInformationEvent);
assertTrue(events.get(2) instanceof RootInputDataInformationEvent);
RootInputDataInformationEvent diEvent1 = (RootInputDataInformationEvent) (events.get(1));
RootInputDataInformationEvent diEvent2 = (RootInputDataInformationEvent) (events.get(2));
assertNull(diEvent1.getUserPayload());
assertNull(diEvent2.getUserPayload());
assertNotNull(diEvent1.getDeserializedUserPayload());
assertNotNull(diEvent2.getDeserializedUserPayload());
assertTrue(diEvent1.getDeserializedUserPayload() instanceof InputSplitForTest);
assertEquals(1, ((InputSplitForTest) diEvent1.getDeserializedUserPayload()).identifier);
assertTrue(diEvent2.getDeserializedUserPayload() instanceof InputSplitForTest);
assertEquals(2, ((InputSplitForTest) diEvent2.getDeserializedUserPayload()).identifier);
}
Example 27
private Vertex createVertexForStage(Configuration stageConf,
Map<String, LocalResource> jobLocalResources,
List<TaskLocationHint> locations, int stageNum, int totalStages)
throws IOException {
// stageNum starts from 0, goes till numStages - 1
boolean isMap = false;
if (stageNum == 0) {
isMap = true;
}
int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
: stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
String processorName = isMap ? MapProcessor.class.getName()
: ReduceProcessor.class.getName();
String vertexName = null;
if (isMap) {
vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
} else {
if (stageNum == totalStages - 1) {
vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
} else {
vertexName = MultiStageMRConfigUtil
.getIntermediateStageVertexName(stageNum);
}
}
Resource taskResource = isMap ? MRHelpers.getResourceForMRMapper(stageConf)
: MRHelpers.getResourceForMRReducer(stageConf);
stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
UserPayload vertexUserPayload = TezUtils.createUserPayloadFromConf(stageConf);
Vertex vertex = Vertex.create(vertexName,
ProcessorDescriptor.create(processorName).setUserPayload(vertexUserPayload),
numTasks, taskResource);
if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
vertex.getProcessorDescriptor().setHistoryText(TezUtils.convertToHistoryText(stageConf));
}
if (isMap) {
vertex.addDataSource("MRInput",
configureMRInputWithLegacySplitsGenerated(stageConf, true));
}
// The final stage (the map stage itself for map-only jobs) gets the MR output.
if (stageNum == totalStages - 1) {
OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
.setUserPayload(vertexUserPayload);
if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
od.setHistoryText(TezUtils.convertToHistoryText(stageConf));
}
vertex.addDataSink("MROutput", DataSinkDescriptor.create(od,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
}
Map<String, String> taskEnv = new HashMap<String, String>();
setupMapReduceEnv(stageConf, taskEnv, isMap);
Map<String, LocalResource> taskLocalResources =
new TreeMap<String, LocalResource>();
// PRECOMMIT Remove split localization for reduce tasks if it's being set
// here
taskLocalResources.putAll(jobLocalResources);
String taskJavaOpts = isMap ? MRHelpers.getJavaOptsForMRMapper(stageConf)
: MRHelpers.getJavaOptsForMRReducer(stageConf);
vertex.setTaskEnvironment(taskEnv)
.addTaskLocalFiles(taskLocalResources)
.setLocationHint(VertexLocationHint.create(locations))
.setTaskLaunchCmdOpts(taskJavaOpts);
if (!isMap) {
vertex.setVertexManagerPlugin((ShuffleVertexManager.createConfigBuilder(stageConf).build()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding vertex to DAG" + ", vertexName="
+ vertex.getName() + ", processor="
+ vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+ vertex.getParallelism() + ", javaOpts=" + vertex.getTaskLaunchCmdOpts()
+ ", resources=" + vertex.getTaskResource()
// TODO Add localResources and Environment
);
}
return vertex;
}
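Examples 17 and 27 are the same stage-to-vertex translation written against the pre-0.5 and 0.5.x APIs respectively. As far as these two examples show, the helpers line up as follows:
// Pre-0.5 (Example 17)                 0.5.x (Example 27)
// MRHelpers.getMapResource(conf)    -> MRHelpers.getResourceForMRMapper(conf)
// MRHelpers.getReduceResource(conf) -> MRHelpers.getResourceForMRReducer(conf)
// MRHelpers.getMapJavaOpts(conf)    -> MRHelpers.getJavaOptsForMRMapper(conf)
// MRHelpers.getReduceJavaOpts(conf) -> MRHelpers.getJavaOptsForMRReducer(conf)
// MRHelpers.createUserPayloadFromConf(conf)
//                                   -> TezUtils.createUserPayloadFromConf(conf)
// MRHelpers.addMRInput(...) / addMROutputLegacy(...)
//                                   -> vertex.addDataSource(...) / addDataSink(...)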
Example 28
@Test(timeout = 5000)
public void testMapProcessor() throws Exception {
String dagName = "mrdag0";
String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);
MRHelpers.translateMRConfToTez(jobConf);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
InputDescriptor.create(MRInputLegacy.class.getName())
.setUserPayload(UserPayload.create(ByteBuffer.wrap(
MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
.toByteArray()))),
1);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
sharedExecutor);
task.initialize();
task.run();
task.close();
sharedExecutor.shutdownNow();
OutputContext outputContext = task.getOutputContexts().iterator().next();
TezTaskOutput mapOutputs = new TezTaskOutputFiles(
jobConf, outputContext.getUniqueIdentifier(),
outputContext.getDagIdentifier());
// TODO NEWTEZ FIXME OutputCommitter verification
// MRTask mrTask = (MRTask)t.getProcessor();
// Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
// .getCommitter().getClass().getName());
// t.close();
Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
DataInputBuffer valueBuf = new DataInputBuffer();
long prev = Long.MIN_VALUE;
while (reader.nextRawKey(keyBuf)) {
reader.nextRawValue(valueBuf);
key.readFields(keyBuf);
value.readFields(valueBuf);
if (prev != Long.MIN_VALUE) {
assert(prev <= key.get());
}
// update prev on every record so the sort-order assertion actually runs
prev = key.get();
LOG.info("key = " + key.get() + "; value = " + value);
}
reader.close();
}
Example 29
@Test(timeout = 30000)
public void testMapProcessorProgress() throws Exception {
String dagName = "mrdag0";
String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);
MRHelpers.translateMRConfToTez(jobConf);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);
InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
InputDescriptor.create(MRInputLegacy.class.getName())
.setUserPayload(UserPayload.create(ByteBuffer.wrap(
MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
.toByteArray()))),
1);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(
localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec),
Collections.singletonList(mapOutputSpec), sharedExecutor);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Thread monitorProgress = new Thread(new Runnable() {
@Override
public void run() {
float prog = task.getProgress();
if (prog > 0.0f && prog < 1.0f) {
progressUpdate = prog;
}
}
});
task.initialize();
scheduler.scheduleAtFixedRate(monitorProgress, 0, 1,
TimeUnit.MILLISECONDS);
task.run();
Assert.assertTrue("Progress Updates should be captured!",
progressUpdate > 0.0f && progressUpdate < 1.0f);
task.close();
sharedExecutor.shutdownNow();
}