Java源码示例:org.apache.tez.mapreduce.hadoop.MRHelpers

示例1
/**
 * Initializes the root input after probing for the relocalization test class.
 *
 * <p>The MRInput payload is parsed to recover the job configuration. If the class named by
 * {@code RELOCALIZATION_TEST_CLASS_NAME} can be loaded, a marker directory is created on the
 * file system obtained from that configuration; a {@link TezUncheckedException} from the
 * lookup is logged as "Class not found" and otherwise ignored.
 *
 * @param rootInputContext context supplying the user payload
 * @return whatever the superclass initializer returns
 * @throws Exception propagated from payload parsing, file system access or the superclass
 */
public List<Event> initialize(TezRootInputInitializerContext rootInputContext)  throws Exception {
  MRInputUserPayloadProto payloadProto =
      MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
  Configuration conf =
      MRHelpers.createConfFromByteString(payloadProto.getConfigurationBytes());

  try {
    ReflectionUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
    LOG.info("Class found");
    // Leave a marker on the file system so the presence of the class can be observed.
    FileSystem markerFs = FileSystem.get(conf);
    markerFs.mkdirs(new Path("/tmp/relocalizationfilefound"));
  } catch (TezUncheckedException e) {
    LOG.info("Class not found");
  }

  return super.initialize(rootInputContext);
}
 
示例2
/**
 * Generates the user payload for MRInput and, when it is used, MRInputAMSplitGenerator.
 * Tez invokes the InputFormat at DAG runtime to produce the input splits.
 *
 * @param conf
 *          Configuration for the InputFormat; it is copied, not modified
 * @param inputFormatClassName
 *          Name of the class of the InputFormat
 * @param useNewApi
 *          use new mapreduce API or old mapred API
 * @param groupSplitsInAM
 *          do grouping of splits in the AM. If true then splits generated by
 *          the InputFormat will be grouped in the AM based on available
 *          resources, locality etc. This option may be set to true only when
 *          using MRInputAMSplitGenerator as the initializer class in
 *          {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, Class)}
 * @return the user payload to be set on the InputDescriptor of MRInput
 * @throws IOException if the payload cannot be created
 */
public static byte[] createUserPayload(Configuration conf,
    String inputFormatClassName, boolean useNewApi, boolean groupSplitsInAM)
    throws IOException {
  Configuration inputConf = new JobConf(conf);

  // With AM-side grouping the configured format is the grouping wrapper, and the caller's
  // InputFormat name is handed to the payload builder separately.
  String configuredFormatName = groupSplitsInAM
      ? TezGroupedSplitsInputFormat.class.getName()
      : inputFormatClassName;
  inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, configuredFormatName);
  inputConf.setBoolean("mapred.mapper.new-api", useNewApi);

  MRHelpers.translateVertexConfToTez(inputConf);
  MRHelpers.doJobClientMagic(inputConf);

  if (!groupSplitsInAM) {
    return MRHelpers.createMRInputPayload(inputConf, null);
  }
  return MRHelpers.createMRInputPayloadWithGrouping(inputConf, inputFormatClassName);
}
 
示例3
/**
 * Creates the user payload to be set on the OutputDescriptor for MROutput.
 *
 * <p>The instance configuration is updated in place (API flags, optional output format class,
 * MR-to-Tez key translation) and then serialized.
 *
 * @return the serialized configuration
 * @throws TezUncheckedException wrapping any {@link IOException} raised during serialization
 */
private UserPayload createUserPayload() {
  // The API flavour is always recorded, for both the reducer and the mapper key.
  conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, useNewApi);
  conf.setBoolean(MRJobConfig.NEW_API_MAPPER_CONFIG, useNewApi);

  if (outputFormatProvided) {
    String formatKey = useNewApi
        ? MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR
        : "mapred.output.format.class";
    conf.set(formatKey, outputFormat.getName());
  }

  MRHelpers.translateMRConfToTez(conf);
  try {
    return TezUtils.createUserPayloadFromConf(conf);
  } catch (IOException e) {
    throw new TezUncheckedException(e);
  }
}
 
示例4
/**
 * Builds a data source that uses the caller-supplied {@code customInitializerDescriptor}.
 *
 * <p>The instance configuration is prepared and translated to Tez keys before the payload is
 * created. History text is attached to the input descriptor only when the corresponding
 * runtime flag is enabled in the configuration.
 *
 * @return the configured data source descriptor
 * @throws IOException propagated from configuration setup or payload creation
 */
private DataSourceDescriptor createCustomDataSource() throws IOException {
  setupBasicConf(conf);
  MRHelpers.translateMRConfToTez(conf);

  Collection<URI> credentialUris = maybeGetURIsForCredentials();

  UserPayload inputPayload =
      MRInputHelpersInternal.createMRInputPayload(conf, groupSplitsInAM, sortSplitsInAM);

  DataSourceDescriptor dataSource = DataSourceDescriptor.create(
      InputDescriptor.create(inputClassName).setUserPayload(inputPayload),
      customInitializerDescriptor,
      null);

  boolean publishHistoryText = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT);
  if (publishHistoryText) {
    dataSource.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
  }

  if (credentialUris != null) {
    dataSource.addURIsForCredentials(credentialUris);
  }
  return dataSource;
}
 
示例5
/**
 * Builds a data source whose initializer is {@code MRInputAMSplitGenerator}.
 *
 * <p>The instance configuration is prepared and translated to Tez keys before the payload is
 * created. History text is attached to the input descriptor only when the corresponding
 * runtime flag is enabled in the configuration.
 *
 * @return the configured data source descriptor
 * @throws IOException propagated from configuration setup or payload creation
 */
private DataSourceDescriptor createGeneratorDataSource() throws IOException {
  setupBasicConf(conf);
  MRHelpers.translateMRConfToTez(conf);

  Collection<URI> credentialUris = maybeGetURIsForCredentials();

  UserPayload inputPayload =
      MRInputHelpersInternal.createMRInputPayload(conf, groupSplitsInAM, sortSplitsInAM);

  DataSourceDescriptor dataSource = DataSourceDescriptor.create(
      InputDescriptor.create(inputClassName).setUserPayload(inputPayload),
      InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()),
      null);

  boolean publishHistoryText = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT);
  if (publishHistoryText) {
    dataSource.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
  }

  if (credentialUris != null) {
    dataSource.addURIsForCredentials(credentialUris);
  }
  return dataSource;
}
 
示例6
/**
 * Assembles the "WordCount" DAG: a "tokenizer" vertex reading {@code inputPath} through
 * MRInput, connected by an ordered, partitioned key/value edge to a single-task "summer"
 * vertex that writes {@code outputPath} through MROutput.
 *
 * @param fs not used by this method
 * @param tezConf base configuration copied for the input and output payloads
 * @param localResources not used by this method
 * @param stagingDir not used by this method
 * @param inputPath value stored under {@code FileInputFormat.INPUT_DIR}
 * @param outputPath value stored under {@code FileOutputFormat.OUTDIR}
 * @return the assembled DAG
 * @throws IOException if a payload cannot be created
 */
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
    Map<String, LocalResource> localResources, Path stagingDir,
    String inputPath, String outputPath) throws IOException {

  // Source side: text input, new API, splits grouped in the AM.
  Configuration inputConf = new Configuration(tezConf);
  inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
  byte[] inputPayload = MRInput.createUserPayload(
      inputConf, TextInputFormat.class.getName(), true, true);
  InputDescriptor inputDescriptor =
      new InputDescriptor(MRInput.class.getName()).setUserPayload(inputPayload);

  // Sink side: text output, new API.
  Configuration outputConf = new Configuration(tezConf);
  outputConf.set(FileOutputFormat.OUTDIR, outputPath);
  byte[] outputPayload = MROutput.createUserPayload(
      outputConf, TextOutputFormat.class.getName(), true);
  OutputDescriptor outputDescriptor =
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(outputPayload);

  Vertex tokenizerVertex = new Vertex(
      "tokenizer",
      new ProcessorDescriptor(TokenProcessor.class.getName()),
      -1,
      MRHelpers.getMapResource(tezConf));
  tokenizerVertex.addInput("MRInput", inputDescriptor, MRInputAMSplitGenerator.class);

  Vertex summerVertex = new Vertex(
      "summer",
      new ProcessorDescriptor(SumProcessor.class.getName()),
      1,
      MRHelpers.getReduceResource(tezConf));
  summerVertex.addOutput("MROutput", outputDescriptor, MROutputCommitter.class);

  OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
      .newBuilder(Text.class.getName(), IntWritable.class.getName(),
          HashPartitioner.class.getName(), null)
      .build();

  DAG dag = new DAG("WordCount");
  Edge tokenizerToSummer =
      new Edge(tokenizerVertex, summerVertex, edgeConf.createDefaultEdgeProperty());
  dag.addVertex(tokenizerVertex)
      .addVertex(summerVertex)
      .addEdge(tokenizerToSummer);
  return dag;
}
 
示例7
/**
 * Assembles the "IntersectDataGen" DAG, which consists of a single "datagen" vertex with
 * three MROutput outputs (stream, hash and expected).
 *
 * <p>The requested sizes are divided evenly across tasks using integer division.
 *
 * @param tezConf base configuration
 * @param largeOutPath output location for the stream output
 * @param smallOutPath output location for the hash output
 * @param expectedOutputPath output location for the expected output
 * @param numTasks parallelism of the vertex; also the divisor for the per-task sizes
 * @param largeOutSize total size requested for the large output
 * @param smallOutSize total size requested for the small output
 * @return the assembled DAG
 * @throws IOException if a payload cannot be created
 */
private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
    Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
    throws IOException {

  final long perTaskLargeSize = largeOutSize / numTasks;
  final long perTaskSmallSize = smallOutSize / numTasks;

  DAG dag = new DAG("IntersectDataGen");

  byte[] streamPayload = createPayloadForOutput(largeOutPath, tezConf);
  byte[] hashPayload = createPayloadForOutput(smallOutPath, tezConf);
  byte[] expectedPayload = createPayloadForOutput(expectedOutputPath, tezConf);

  ProcessorDescriptor genDataProcessor =
      new ProcessorDescriptor(GenDataProcessor.class.getName())
          .setUserPayload(
              GenDataProcessor.createConfiguration(perTaskLargeSize, perTaskSmallSize));
  Vertex genDataVertex = new Vertex(
      "datagen", genDataProcessor, numTasks, MRHelpers.getMapResource(tezConf));

  // All three outputs use MROutput with the same committer; only name and payload differ.
  genDataVertex.addOutput(
      STREAM_OUTPUT_NAME,
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamPayload),
      MROutputCommitter.class);
  genDataVertex.addOutput(
      HASH_OUTPUT_NAME,
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashPayload),
      MROutputCommitter.class);
  genDataVertex.addOutput(
      EXPECTED_OUTPUT_NAME,
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedPayload),
      MROutputCommitter.class);

  dag.addVertex(genDataVertex);
  return dag;
}
 
示例8
/**
 * Creates the user payload to be set on the OutputDescriptor for MROutput.
 *
 * @param conf Configuration for the OutputFormat; it is copied, not modified
 * @param outputFormatName Name of the class of the OutputFormat
 * @param useNewApi Use new mapreduce API or old mapred API
 * @return the serialized configuration to use as the user payload
 * @throws IOException if the configuration cannot be serialized
 */
public static byte[] createUserPayload(Configuration conf,
    String outputFormatName, boolean useNewApi) throws IOException {
  // Work on a private copy so the caller's configuration stays untouched.
  Configuration payloadConf = new JobConf(conf);

  payloadConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
  payloadConf.setBoolean("mapred.mapper.new-api", useNewApi);

  MRHelpers.translateVertexConfToTez(payloadConf);
  MRHelpers.doJobClientMagic(payloadConf);

  byte[] serializedConf = TezUtils.createUserPayloadFromConf(payloadConf);
  return serializedConf;
}
 
示例9
/**
 * Populates the task environment for a map or reduce stage.
 *
 * <p>Before delegating to {@code MRHelpers.updateEnvironmentForMRTasks}, the user and admin
 * Java option strings for the selected task kind are passed to {@code warnForJavaLibPath}.
 *
 * @param jobConf configuration the option strings are read from
 * @param environment environment map handed to {@code MRHelpers} to be updated
 * @param isMap {@code true} to use the map keys, {@code false} to use the reduce keys
 * @throws IOException propagated from the helpers
 */
private void setupMapReduceEnv(Configuration jobConf,
    Map<String, String> environment, boolean isMap) throws IOException {

  final String taskKind = isMap ? "map" : "reduce";
  final String userOptsKey = isMap
      ? MRJobConfig.MAP_JAVA_OPTS
      : MRJobConfig.REDUCE_JAVA_OPTS;
  final String userEnvKey = isMap
      ? MRJobConfig.MAP_ENV
      : MRJobConfig.REDUCE_ENV;
  final String adminOptsKey = isMap
      ? MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS
      : MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS;

  // User-level options first, then admin-level options.
  warnForJavaLibPath(jobConf.get(userOptsKey, ""), taskKind, userOptsKey, userEnvKey);
  warnForJavaLibPath(jobConf.get(adminOptsKey, ""), taskKind, adminOptsKey,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);

  MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
}
 
示例10
/**
 * Prepares this input for use: parses the user payload, builds the {@code JobConf}, merges the
 * current user's credentials into it, records the task attempt identity, and looks up the
 * input-records counter.
 *
 * @return always {@code null}
 * @throws IOException propagated from payload parsing or credential lookup
 * @throws IllegalArgumentException if the payload unexpectedly carries split information
 */
public List<Event> initialize() throws IOException {
  getContext().requestInitialMemory(0L, null); // mandatory call

  MRRuntimeProtos.MRInputUserPayloadProto payloadProto =
      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
  Preconditions.checkArgument(!payloadProto.hasSplits(),
      "Split information not expected in " + this.getClass().getName());
  Configuration conf =
      MRHelpers.createConfFromByteString(payloadProto.getConfigurationBytes());

  this.jobConf = new JobConf(conf);
  // Add tokens to the jobConf - in case they are accessed within the RR / IF
  jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());

  // The MR task identity is derived from the application id plus the task index and
  // attempt number.
  String jtIdentifier =
      Long.toString(getContext().getApplicationId().getClusterTimestamp());
  TaskID taskId = new TaskID(
      jtIdentifier,
      getContext().getApplicationId().getId(),
      TaskType.MAP,
      getContext().getTaskIndex());
  TaskAttemptID taskAttemptId =
      new TaskAttemptID(taskId, getContext().getTaskAttemptNumber());

  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber());

  this.inputRecordCounter =
      getContext().getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);

  useNewApi = this.jobConf.getUseNewMapper();
  return null;
}
 
示例11
/**
 * Populates the task environment for a map or reduce stage.
 *
 * <p>Before delegating to {@code MRHelpers.updateEnvBasedOnMRTaskEnv}, the user and admin
 * Java option strings for the selected task kind are passed to {@code warnForJavaLibPath}.
 *
 * @param jobConf configuration the option strings are read from
 * @param environment environment map handed to {@code MRHelpers} to be updated
 * @param isMap {@code true} to use the map keys, {@code false} to use the reduce keys
 * @throws IOException propagated from the helpers
 */
private void setupMapReduceEnv(Configuration jobConf,
    Map<String, String> environment, boolean isMap) throws IOException {

  final String taskKind;
  final String userOptsKey;
  final String userEnvKey;
  final String adminOptsKey;
  if (isMap) {
    taskKind = "map";
    userOptsKey = MRJobConfig.MAP_JAVA_OPTS;
    userEnvKey = MRJobConfig.MAP_ENV;
    adminOptsKey = MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS;
  } else {
    taskKind = "reduce";
    userOptsKey = MRJobConfig.REDUCE_JAVA_OPTS;
    userEnvKey = MRJobConfig.REDUCE_ENV;
    adminOptsKey = MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS;
  }

  // User-level options first, then admin-level options.
  warnForJavaLibPath(jobConf.get(userOptsKey, ""), taskKind, userOptsKey, userEnvKey);
  warnForJavaLibPath(jobConf.get(adminOptsKey, ""), taskKind, adminOptsKey,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);

  MRHelpers.updateEnvBasedOnMRTaskEnv(jobConf, environment, isMap);
}
 
示例12
/**
 * Builds a data source whose initializer is {@code MRInputSplitDistributor}, with the input
 * splits generated up front and embedded in the payload.
 *
 * <p>Credentials from split generation are attached only when
 * {@code getCredentialsForSourceFilesystem} is set and split generation produced some.
 *
 * @return the configured data source descriptor
 * @throws IOException propagated from configuration setup or payload creation
 * @throws TezUncheckedException wrapping any exception raised while generating splits
 */
private DataSourceDescriptor createDistributorDataSource() throws IOException {
  setupBasicConf(conf);

  InputSplitInfo splitInfo;
  try {
    splitInfo = MRInputHelpers.generateInputSplitsToMem(conf, false, true, 0);
  } catch (Exception e) {
    throw new TezUncheckedException(e);
  }
  // Translation happens after split generation, matching the original ordering.
  MRHelpers.translateMRConfToTez(conf);

  UserPayload inputPayload =
      MRInputHelpersInternal.createMRInputPayload(conf, splitInfo.getSplitsProto());

  Credentials sourceCredentials =
      (getCredentialsForSourceFilesystem && splitInfo.getCredentials() != null)
          ? splitInfo.getCredentials()
          : null;

  DataSourceDescriptor dataSource = DataSourceDescriptor.create(
      InputDescriptor.create(inputClassName).setUserPayload(inputPayload),
      InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
      splitInfo.getNumTasks(),
      sourceCredentials,
      VertexLocationHint.create(splitInfo.getTaskLocationHints()),
      null);

  boolean publishHistoryText = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT);
  if (publishHistoryText) {
    dataSource.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
  }

  return dataSource;
}
 
示例13
/**
 * Assembles the "IntersectValidate" DAG: two forwarding vertices, one per input path, both
 * feeding an "intersectvalidate" vertex over ordered, partitioned key/value edges.
 *
 * @param tezConf base configuration copied for each input payload
 * @param lhs first input path
 * @param rhs second input path
 * @param numPartitions parallelism of the validating vertex
 * @return the assembled DAG
 * @throws IOException if a payload cannot be created
 */
private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
    throws IOException {
  DAG dag = new DAG("IntersectValidate");

  // Configuration for src1
  Configuration lhsInputConf = new Configuration(tezConf);
  lhsInputConf.set(FileInputFormat.INPUT_DIR, lhs.toUri().toString());
  byte[] lhsPayload = MRInput.createUserPayload(
      lhsInputConf, TextInputFormat.class.getName(), true, false);

  // Configuration for src2
  Configuration rhsInputConf = new Configuration(tezConf);
  rhsInputConf.set(FileInputFormat.INPUT_DIR, rhs.toUri().toString());
  byte[] rhsPayload = MRInput.createUserPayload(
      rhsInputConf, TextInputFormat.class.getName(), true, false);

  // Configuration for intermediate output - shared by Vertex1 and Vertex2
  // This should only be setting selective keys from the underlying conf. Fix after there's a
  // better mechanism to configure the IOs.
  OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
      .newBuilder(Text.class.getName(), NullWritable.class.getName(),
          HashPartitioner.class.getName(), null)
      .build();

  // Change the way resources are setup - no MRHelpers
  Vertex lhsVertex = new Vertex(
      LHS_INPUT_NAME,
      new ProcessorDescriptor(ForwardingProcessor.class.getName()),
      -1,
      MRHelpers.getMapResource(tezConf))
      .addInput(
          "lhs",
          new InputDescriptor(MRInput.class.getName()).setUserPayload(lhsPayload),
          MRInputAMSplitGenerator.class);

  Vertex rhsVertex = new Vertex(
      RHS_INPUT_NAME,
      new ProcessorDescriptor(ForwardingProcessor.class.getName()),
      -1,
      MRHelpers.getMapResource(tezConf))
      .addInput(
          "rhs",
          new InputDescriptor(MRInput.class.getName()).setUserPayload(rhsPayload),
          MRInputAMSplitGenerator.class);

  Vertex validateVertex = new Vertex(
      "intersectvalidate",
      new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
      numPartitions,
      MRHelpers.getReduceResource(tezConf));

  Edge lhsEdge = new Edge(lhsVertex, validateVertex, edgeConf.createDefaultEdgeProperty());
  Edge rhsEdge = new Edge(rhsVertex, validateVertex, edgeConf.createDefaultEdgeProperty());

  dag.addVertex(lhsVertex)
      .addVertex(rhsVertex)
      .addVertex(validateVertex)
      .addEdge(lhsEdge)
      .addEdge(rhsEdge);
  return dag;
}
 
示例14
/**
 * Assembles the "IntersectExample" DAG: two forwarding vertices ("partitioner1" for the stream
 * file, "partitioner2" for the hash file) feeding an "intersect" vertex over unordered,
 * partitioned key/value edges. The intersect vertex writes {@code outPath} through MROutput.
 *
 * @param tezConf base configuration copied for each payload
 * @param streamPath input path for the stream side
 * @param hashPath input path for the hash side
 * @param outPath output path of the intersect vertex
 * @param numPartitions parallelism of the intersect vertex
 * @return the assembled DAG
 * @throws IOException if a payload cannot be created
 */
private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
    int numPartitions) throws IOException {
  DAG dag = new DAG("IntersectExample");

  // Configuration for src1
  Configuration streamInputConf = new Configuration(tezConf);
  streamInputConf.set(FileInputFormat.INPUT_DIR, streamPath.toUri().toString());
  byte[] streamPayload = MRInput.createUserPayload(
      streamInputConf, TextInputFormat.class.getName(), true, false);

  // Configuration for src2
  Configuration hashInputConf = new Configuration(tezConf);
  hashInputConf.set(FileInputFormat.INPUT_DIR, hashPath.toUri().toString());
  byte[] hashPayload = MRInput.createUserPayload(
      hashInputConf, TextInputFormat.class.getName(), true, false);

  // Configuration for intermediate output - shared by Vertex1 and Vertex2
  // This should only be setting selective keys from the underlying conf. Fix after there's a
  // better mechanism to configure the IOs.
  UnorderedPartitionedKVEdgeConfigurer edgeConf = UnorderedPartitionedKVEdgeConfigurer
      .newBuilder(Text.class.getName(), NullWritable.class.getName(),
          HashPartitioner.class.getName(), null)
      .build();

  Configuration finalOutputConf = new Configuration(tezConf);
  finalOutputConf.set(FileOutputFormat.OUTDIR, outPath.toUri().toString());
  byte[] outputPayload = MROutput.createUserPayload(
      finalOutputConf, TextOutputFormat.class.getName(), true);

  // Change the way resources are setup - no MRHelpers
  Vertex streamFileVertex = new Vertex(
      "partitioner1",
      new ProcessorDescriptor(ForwardingProcessor.class.getName()),
      -1,
      MRHelpers.getMapResource(tezConf))
      .addInput(
          "streamfile",
          new InputDescriptor(MRInput.class.getName()).setUserPayload(streamPayload),
          MRInputAMSplitGenerator.class);

  Vertex hashFileVertex = new Vertex(
      "partitioner2",
      new ProcessorDescriptor(ForwardingProcessor.class.getName()),
      -1,
      MRHelpers.getMapResource(tezConf))
      .addInput(
          "hashfile",
          new InputDescriptor(MRInput.class.getName()).setUserPayload(hashPayload),
          MRInputAMSplitGenerator.class);

  Vertex intersectVertex = new Vertex(
      "intersect",
      new ProcessorDescriptor(IntersectProcessor.class.getName()),
      numPartitions,
      MRHelpers.getReduceResource(tezConf))
      .addOutput(
          "finalOutput",
          new OutputDescriptor(MROutput.class.getName()).setUserPayload(outputPayload),
          MROutputCommitter.class);

  Edge streamEdge =
      new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
  Edge hashEdge =
      new Edge(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());

  dag.addVertex(streamFileVertex)
      .addVertex(hashFileVertex)
      .addVertex(intersectVertex)
      .addEdge(streamEdge)
      .addEdge(hashEdge);
  return dag;
}
 
示例15
/**
 * Assembles the "BroadcastAndOneToOneExample" DAG: an "Input" vertex connected one-to-one to a
 * "OneToOne" vertex, plus a "Broadcast" vertex connected to the same "OneToOne" vertex by a
 * broadcast edge.
 *
 * <p>When {@code doLocalityCheck} is set, the number of one-to-one tasks is derived from the
 * number of running node managers reported by YARN (minus the broadcast tasks and one slot for
 * the AM), with a floor of one. The YARN client is always stopped, even when the node query
 * fails.
 *
 * @param fs not used by this method
 * @param tezConf base configuration; also used to initialize the YARN client
 * @param stagingDir not used by this method
 * @param doLocalityCheck whether to size the one-to-one stage from the cluster and flag the
 *     processors (first payload byte) accordingly
 * @return the assembled DAG
 * @throws IOException if the cluster query fails with an I/O error
 * @throws YarnException if the cluster query fails inside YARN
 */
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
    Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {

  JobConf mrConf = new JobConf(tezConf);

  int numBroadcastTasks = 2;
  int numOneToOneTasks = 3;
  if (doLocalityCheck) {
    YarnClient yarnClient = YarnClient.createYarnClient();
    int numNMs;
    try {
      yarnClient.init(tezConf);
      yarnClient.start();
      numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
    } finally {
      // Release the client on every path; previously a failing query leaked it.
      yarnClient.stop();
    }
    // create enough 1-1 tasks to run in parallel
    numOneToOneTasks = Math.max(1, numNMs - numBroadcastTasks - 1); // 1 AM
  }
  byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};

  System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");

  Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
      InputProcessor.class.getName()),
      numBroadcastTasks, MRHelpers.getMapResource(mrConf));

  Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
      InputProcessor.class.getName()).setUserPayload(procPayload),
      numOneToOneTasks, MRHelpers.getMapResource(mrConf));

  // Parallelism -1: left to be determined at runtime by the vertex manager plugin set below.
  Vertex oneToOneVertex = new Vertex("OneToOne",
      new ProcessorDescriptor(
          OneToOneProcessor.class.getName()).setUserPayload(procPayload),
          -1, MRHelpers.getReduceResource(mrConf));
  oneToOneVertex.setVertexManagerPlugin(
          new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));

  UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
      .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();

  DAG dag = new DAG("BroadcastAndOneToOneExample");
  dag.addVertex(inputVertex)
      .addVertex(broadcastVertex)
      .addVertex(oneToOneVertex)
      .addEdge(
          new Edge(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
      .addEdge(
          new Edge(broadcastVertex, oneToOneVertex,
              edgeConf.createDefaultBroadcastEdgeProperty()));
  return dag;
}
 
示例16
/**
 * Assembles the "UnionExample" DAG.
 *
 * <p>Three map vertices read the same input. "map1" and "map2" form the vertex group "union",
 * which has its own "parts" output and feeds the "checker" vertex through a group input edge.
 * "map3" feeds "checker" through a regular edge. The checker writes the "union" and
 * "all-parts" outputs.
 *
 * @param fs not used by this method
 * @param tezConf base configuration copied for each payload
 * @param localResources not used by this method
 * @param stagingDir not used by this method
 * @param inputPath value stored under {@code FileInputFormat.INPUT_DIR}
 * @param outputPath base output path; "-all-parts" and "-parts" are appended for the
 *     additional outputs
 * @return the assembled DAG
 * @throws IOException if a payload cannot be created
 */
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
    Map<String, LocalResource> localResources, Path stagingDir,
    String inputPath, String outputPath) throws IOException {
  DAG dag = new DAG("UnionExample");

  final int numMaps = -1;
  Configuration inputConf = new Configuration(tezConf);
  inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
  byte[] inputPayload = MRInput.createUserPayload(
      inputConf, TextInputFormat.class.getName(), true, true);
  // The same input descriptor instance is attached to all three map vertices.
  InputDescriptor sharedInput =
      new InputDescriptor(MRInput.class.getName()).setUserPayload(inputPayload);

  Vertex mapVertex1 = new Vertex(
      "map1",
      new ProcessorDescriptor(TokenProcessor.class.getName()),
      numMaps,
      MRHelpers.getMapResource(tezConf));
  mapVertex1.addInput("MRInput", sharedInput, MRInputAMSplitGenerator.class);

  Vertex mapVertex2 = new Vertex(
      "map2",
      new ProcessorDescriptor(TokenProcessor.class.getName()),
      numMaps,
      MRHelpers.getMapResource(tezConf));
  mapVertex2.addInput("MRInput", sharedInput, MRInputAMSplitGenerator.class);

  Vertex mapVertex3 = new Vertex(
      "map3",
      new ProcessorDescriptor(TokenProcessor.class.getName()),
      numMaps,
      MRHelpers.getMapResource(tezConf));
  mapVertex3.addInput("MRInput", sharedInput, MRInputAMSplitGenerator.class);

  Vertex checkerVertex = new Vertex(
      "checker",
      new ProcessorDescriptor(UnionProcessor.class.getName()),
      1,
      MRHelpers.getReduceResource(tezConf));

  // Checker output #1: the union result.
  Configuration outputConf = new Configuration(tezConf);
  outputConf.set(FileOutputFormat.OUTDIR, outputPath);
  byte[] unionPayload = MROutput.createUserPayload(
      outputConf, TextOutputFormat.class.getName(), true);
  OutputDescriptor unionOutput =
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(unionPayload);
  checkerVertex.addOutput("union", unionOutput, MROutputCommitter.class);

  // Checker output #2: all parts.
  Configuration allPartsConf = new Configuration(tezConf);
  allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
  byte[] allPartsPayload = MROutput.createUserPayload(
      allPartsConf, TextOutputFormat.class.getName(), true);
  OutputDescriptor allPartsOutput =
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(allPartsPayload);
  checkerVertex.addOutput("all-parts", allPartsOutput, MROutputCommitter.class);

  // Output attached to the vertex group formed by map1 and map2.
  Configuration partsConf = new Configuration(tezConf);
  partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");

  VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
  byte[] partsPayload = MROutput.createUserPayload(
      partsConf, TextOutputFormat.class.getName(), true);
  OutputDescriptor partsOutput =
      new OutputDescriptor(MROutput.class.getName()).setUserPayload(partsPayload);
  unionVertex.addOutput("parts", partsOutput, MROutputCommitter.class);

  OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
      .newBuilder(Text.class.getName(), IntWritable.class.getName(),
          HashPartitioner.class.getName(), null)
      .build();

  dag.addVertex(mapVertex1)
      .addVertex(mapVertex2)
      .addVertex(mapVertex3)
      .addVertex(checkerVertex)
      .addEdge(new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
      .addEdge(new GroupInputEdge(
          unionVertex,
          checkerVertex,
          edgeConf.createDefaultEdgeProperty(),
          new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName())));
  return dag;
}
 
示例17
/**
 * Creates the Tez vertex for one stage of a multi-stage MapReduce job.
 *
 * <p>Stage 0 is the map stage; every other stage is a reduce stage. The map stage gets an
 * MRInput, the last stage gets a legacy MROutput, and reduce stages are assigned the
 * {@code ShuffleVertexManager} plugin. {@code stageConf} is modified: the MROutput file name
 * prefix is set to "part" before the configuration is serialized into the vertex payload.
 *
 * @param stageConf configuration of this stage (mutated, see above)
 * @param jobLocalResources local resources copied onto the vertex
 * @param locations task location hints set on the vertex
 * @param stageNum zero-based stage index
 * @param totalStages total number of stages
 * @return the configured vertex
 * @throws IOException propagated from payload creation or environment setup
 */
private Vertex createVertexForStage(Configuration stageConf,
    Map<String, LocalResource> jobLocalResources,
    List<TaskLocationHint> locations, int stageNum, int totalStages)
    throws IOException {
  // stageNum starts from 0, goes till numStages - 1
  final boolean isMap = (stageNum == 0);
  final boolean isLastStage = (stageNum == totalStages - 1);

  final int numTasks;
  final String processorName;
  final String vertexName;
  final Resource taskResource;
  if (isMap) {
    numTasks = stageConf.getInt(MRJobConfig.NUM_MAPS, 0);
    processorName = MapProcessor.class.getName();
    vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
    taskResource = MRHelpers.getMapResource(stageConf);
  } else {
    numTasks = stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
    processorName = ReduceProcessor.class.getName();
    vertexName = isLastStage
        ? MultiStageMRConfigUtil.getFinalReduceVertexName()
        : MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum);
    taskResource = MRHelpers.getReduceResource(stageConf);
  }

  // Must be set before the configuration is serialized into the payload below.
  stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");

  byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
  ProcessorDescriptor processor =
      new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload);
  Vertex vertex = new Vertex(vertexName, processor, numTasks, taskResource);

  if (isMap) {
    byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
    MRHelpers.addMRInput(vertex, mapInputPayload, null);
  }
  // Map only jobs.
  if (isLastStage) {
    MRHelpers.addMROutputLegacy(vertex, vertexUserPayload);
  }

  Map<String, String> taskEnv = new HashMap<String, String>();
  setupMapReduceEnv(stageConf, taskEnv, isMap);

  // PRECOMMIT Remove split localization for reduce tasks if it's being set
  // here
  Map<String, LocalResource> taskLocalResources =
      new TreeMap<String, LocalResource>();
  taskLocalResources.putAll(jobLocalResources);

  String taskJavaOpts;
  if (isMap) {
    taskJavaOpts = MRHelpers.getMapJavaOpts(stageConf);
  } else {
    taskJavaOpts = MRHelpers.getReduceJavaOpts(stageConf);
  }

  vertex.setTaskEnvironment(taskEnv)
      .setTaskLocalFiles(taskLocalResources)
      .setTaskLocationsHint(locations)
      .setTaskLaunchCmdOpts(taskJavaOpts);

  if (!isMap) {
    vertex.setVertexManagerPlugin(
        new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName()));
  }

  if (LOG.isDebugEnabled()) {
    // TODO Add localResources and Environment
    LOG.debug("Adding vertex to DAG, vertexName=" + vertex.getName()
        + ", processor=" + vertex.getProcessorDescriptor().getClassName()
        + ", parallelism=" + vertex.getParallelism()
        + ", javaOpts=" + vertex.getTaskLaunchCmdOpts()
        + ", resources=" + vertex.getTaskResource());
  }

  return vertex;
}
 
示例18
/**
 * Rebuilds a new-API (mapreduce) input split from its protobuf form.
 *
 * @param splitProto serialized split
 * @param conf configuration used to create the {@code SerializationFactory}
 * @return the split produced by {@code MRHelpers.createNewFormatSplitFromUserPayload}
 * @throws IOException propagated from the helper
 */
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
    MRSplitProto splitProto, Configuration conf) throws IOException {
  return MRHelpers.createNewFormatSplitFromUserPayload(
      splitProto, new SerializationFactory(conf));
}
 
示例19
/**
 * Rebuilds an old-API (mapred) input split from its protobuf form.
 *
 * @param splitProto serialized split
 * @param conf configuration used to create the {@code SerializationFactory}
 * @return the split produced by {@code MRHelpers.createOldFormatSplitFromUserPayload}
 * @throws IOException propagated from the helper
 */
@Private
public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
    throws IOException {
  return MRHelpers.createOldFormatSplitFromUserPayload(
      splitProto, new SerializationFactory(conf));
}
 
示例20
/**
 * Turns the splits embedded in the MRInput payload into root input events.
 *
 * <p>The first event carries the payload with its splits cleared; it is followed by one
 * data-information event per split, indexed from 0. Depending on
 * {@code MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD}, each split is sent either as its
 * serialized protobuf bytes or as a deserialized new-API / old-API split object.
 *
 * @param rootInputContext context supplying the user payload
 * @return the payload-update event followed by one event per split
 * @throws IOException propagated from payload parsing or split deserialization
 */
@Override
public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
    throws IOException {
  Stopwatch sw = null;
  if (LOG.isDebugEnabled()) {
    sw = new Stopwatch().start();
  }
  MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
  // Guard on the stopwatch itself: the log level can change between the two checks, and
  // re-testing isDebugEnabled() alone could dereference a null stopwatch.
  if (sw != null) {
    sw.stop();
    LOG.debug("Time to parse MRInput payload into prot: "
        + sw.elapsedMillis());
  }
  Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
      .getConfigurationBytes());
  JobConf jobConf = new JobConf(conf);
  boolean useNewApi = jobConf.getUseNewMapper();
  sendSerializedEvents = conf.getBoolean(
      MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
      MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
  LOG.info("Emitting serialized splits: " + sendSerializedEvents);

  this.splitsProto = userPayloadProto.getSplits();

  // The payload forwarded to tasks no longer needs the splits; they travel as events.
  MRInputUserPayloadProto.Builder updatedPayloadBuilder = MRInputUserPayloadProto.newBuilder(userPayloadProto);
  updatedPayloadBuilder.clearSplits();

  // One slot per split plus one for the payload-update event.
  List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1);
  RootInputUpdatePayloadEvent updatePayloadEvent = new RootInputUpdatePayloadEvent(
      updatedPayloadBuilder.build().toByteArray());

  events.add(updatePayloadEvent);
  int count = 0;

  for (MRSplitProto mrSplit : this.splitsProto.getSplitsList()) {

    RootInputDataInformationEvent diEvent;

    if (sendSerializedEvents) {
      // Unnecessary array copy, can be avoided by using ByteBuffer instead of
      // a raw array.
      diEvent = new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
    } else {
      if (useNewApi) {
        org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
            .getNewSplitDetailsFromEvent(mrSplit, conf);
        diEvent = new RootInputDataInformationEvent(count++, newInputSplit);
      } else {
        org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
            .getOldSplitDetailsFromEvent(mrSplit, conf);
        diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
      }
    }
    events.add(diEvent);
  }

  return events;
}
 
示例21
/**
 * Runs a single map task end to end and checks that the keys in its output file are in
 * non-decreasing order.
 */
@Test
  public void testMapProcessor() throws Exception {
    String dagName = "mrdag0";
    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
    JobConf jobConf = new JobConf(defaultConf);
    setUpJobConf(jobConf);

    MRHelpers.translateVertexConfToTez(jobConf);
    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);

    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);

    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
        "localized-resources").toUri().toString());

    Path mapInput = new Path(workDir, "map0");

    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);

    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
        new InputDescriptor(MRInputLegacy.class.getName())
            .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
        1);
    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
        new OutputDescriptor(LocalOnFileSorterOutput.class.getName())
            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);

    LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
        new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
        Collections.singletonList(mapInputSpec),
        Collections.singletonList(mapOutputSpec));

    task.initialize();
    task.run();
    task.close();

    TezInputContext inputContext = task.getInputContexts().iterator().next();
    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, inputContext.getUniqueIdentifier());

    // TODO NEWTEZ FIXME OutputCommitter verification
//    MRTask mrTask = (MRTask)t.getProcessor();
//    Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
//        .getCommitter().getClass().getName());
//    t.close();

    Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
    LOG.info("mapOutputFile = " + mapOutputFile);
    IFile.Reader reader =
        new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
    try {
      LongWritable key = new LongWritable();
      Text value = new Text();
      DataInputBuffer keyBuf = new DataInputBuffer();
      DataInputBuffer valueBuf = new DataInputBuffer();
      long prev = Long.MIN_VALUE;
      while (reader.nextRawKey(keyBuf)) {
        reader.nextRawValue(valueBuf);
        key.readFields(keyBuf);
        value.readFields(valueBuf);
        // Every key must be >= its predecessor. prev starts at Long.MIN_VALUE, so the first
        // record always passes. Previously prev was only updated inside a guard that was
        // never true, so this ordering check was dead code.
        assert prev <= key.get()
            : "map output not sorted: " + prev + " precedes " + key.get();
        prev = key.get();
        LOG.info("key = " + key.get() + "; value = " + value);
      }
    } finally {
      // Close the reader even when reading or the ordering check fails.
      reader.close();
    }
  }
 
示例22
/**
 * Initializes a {@code MultiMRInput} with one physical input, delivers a single
 * split event for a generated sequence file, and checks that the records read
 * back match the map returned by {@code createInputData} (presumably the
 * records written to the file) with none missing and none extra.
 *
 * @throws Exception if input setup or reading fails
 */
@Test(timeout = 5000)
public void testSingleSplit() throws Exception {

  Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);

  MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(SequenceFileInputFormat.class.getName());
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
  byte[] payload = builder.build().toByteArray();

  TezInputContext inputContext = createTezInputContext(payload);

  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(1);
  input.initialize(inputContext);
  List<Event> eventList = new ArrayList<Event>();

  String file1 = "file1";
  LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
      10);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);

  MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
  RootInputDataInformationEvent event = new RootInputDataInformationEvent(0,
      splitProto.toByteArray());

  eventList.add(event);
  input.handleEvents(eventList);

  int readerCount = 0;
  for (KeyValueReader reader : input.getKeyValueReaders()) {
    readerCount++;
    while (reader.next()) {
      if (data1.size() == 0) {
        fail("Found more records than expected");
      }
      Object key = reader.getCurrentKey();
      Object val = reader.getCurrentValue();
      // Each record read is removed from the expected set as it is matched.
      assertEquals(val, data1.remove(key));
    }
  }
  assertEquals(1, readerCount);
  // Every expected record must have been consumed; without this check a reader
  // that returns no records at all would let the test pass.
  assertEquals(0, data1.size());
}
 
示例23
/**
 * Initializes a {@code MultiMRInput} with two physical inputs, delivers one
 * split event per generated sequence file, and checks that the two readers
 * together return exactly the union of the maps returned by
 * {@code createInputData} (presumably the records written to the files).
 *
 * @throws Exception if input setup or reading fails
 */
@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {

  Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);

  MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(SequenceFileInputFormat.class.getName());
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
  byte[] payload = builder.build().toByteArray();

  TezInputContext inputContext = createTezInputContext(payload);

  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(2);
  input.initialize(inputContext);
  List<Event> eventList = new ArrayList<Event>();

  LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();

  String file1 = "file1";
  LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
      10);

  String file2 = "file2";
  LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
      20);

  data.putAll(data1);
  data.putAll(data2);

  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 2);
  assertEquals(2, splits.length);

  MRSplitProto splitProto1 = MRHelpers.createSplitProto(splits[0]);
  RootInputDataInformationEvent event1 = new RootInputDataInformationEvent(0,
      splitProto1.toByteArray());

  // NOTE(review): both events are created with index 0 — confirm whether the
  // second one is meant to carry index 1.
  MRSplitProto splitProto2 = MRHelpers.createSplitProto(splits[1]);
  RootInputDataInformationEvent event2 = new RootInputDataInformationEvent(0,
      splitProto2.toByteArray());

  eventList.add(event1);
  eventList.add(event2);
  input.handleEvents(eventList);

  int readerCount = 0;
  for (KeyValueReader reader : input.getKeyValueReaders()) {
    readerCount++;
    while (reader.next()) {
      if (data.size() == 0) {
        fail("Found more records than expected");
      }
      Object key = reader.getCurrentKey();
      Object val = reader.getCurrentValue();
      // Each record read is removed from the expected set as it is matched.
      assertEquals(val, data.remove(key));
    }
  }
  assertEquals(2, readerCount);
  // Every expected record from both files must have been consumed; without
  // this check readers that return nothing would let the test pass.
  assertEquals(0, data.size());
}
 
示例24
/**
 * Verifies that a {@code MultiMRInput} configured for a single physical input
 * rejects a second split event with an "Unexpected event" error.
 *
 * @throws Exception if input setup fails
 */
@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);

  // Payload carrying the input format name and the serialized job configuration.
  byte[] payload = MRInputUserPayloadProto.newBuilder()
      .setInputFormatName(SequenceFileInputFormat.class.getName())
      .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf))
      .build()
      .toByteArray();

  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(1);
  input.initialize(createTezInputContext(payload));

  createInputData(localFs, workDir, jobConf, "file1", 0, 10);
  InputSplit[] splits =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(jobConf, 1);
  assertEquals(1, splits.length);

  // Two events built from the same split: one more than the input expects.
  MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
  List<Event> tooManyEvents = new ArrayList<Event>();
  tooManyEvents.add(new RootInputDataInformationEvent(0, splitProto.toByteArray()));
  tooManyEvents.add(new RootInputDataInformationEvent(1, splitProto.toByteArray()));

  Exception rejection = null;
  try {
    input.handleEvents(tooManyEvents);
  } catch (Exception e) {
    rejection = e;
  }
  if (rejection == null) {
    fail("Expecting Exception due to too many events");
  }
  assertTrue(rejection.getMessage().contains(
      "Unexpected event. All physical sources already initialized"));
}
 
示例25
/**
 * With {@code MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD} set to true,
 * checks that {@code MRInputSplitDistributor} emits one payload-update event
 * followed by two data-information events whose splits are carried as
 * serialized bytes that parse back to the original test splits.
 *
 * @throws IOException if building or parsing the payloads fails
 */
@Test
public void testSerializedPayload() throws IOException {

  Configuration conf = new Configuration(false);
  conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD, true);
  ByteString serializedConf = MRHelpers.createByteStringFromConf(conf);

  // Two test splits, identified as 1 and 2, packed into the user payload.
  MRSplitProto firstSplit = MRHelpers.createSplitProto(new InputSplitForTest(1));
  MRSplitProto secondSplit = MRHelpers.createSplitProto(new InputSplitForTest(2));
  byte[] userPayload = MRInputUserPayloadProto.newBuilder()
      .setSplits(MRSplitsProto.newBuilder()
          .addSplits(firstSplit)
          .addSplits(secondSplit)
          .build())
      .setConfigurationBytes(serializedConf)
      .build()
      .toByteArray();

  TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
  List<Event> events = new MRInputSplitDistributor().initialize(context);

  assertEquals(3, events.size());
  assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
  for (int i = 1; i <= 2; i++) {
    assertTrue(events.get(i) instanceof RootInputDataInformationEvent);
  }

  RootInputDataInformationEvent[] dataEvents = {
      (RootInputDataInformationEvent) events.get(1),
      (RootInputDataInformationEvent) events.get(2)
  };

  // Only the serialized form of the payload is expected to be populated.
  for (RootInputDataInformationEvent dataEvent : dataEvents) {
    assertNull(dataEvent.getDeserializedUserPayload());
  }
  for (RootInputDataInformationEvent dataEvent : dataEvents) {
    assertNotNull(dataEvent.getUserPayload());
  }

  // Each serialized payload must parse back to the split with the matching identifier.
  for (int i = 0; i < dataEvents.length; i++) {
    MRSplitProto parsed = MRSplitProto.parseFrom(dataEvents[i].getUserPayload());
    InputSplit recovered = MRInputUtils.getOldSplitDetailsFromEvent(parsed, new Configuration());
    assertTrue(recovered instanceof InputSplitForTest);
    assertEquals(i + 1, ((InputSplitForTest) recovered).identifier);
  }
}
 
示例26
/**
 * With {@code MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD} set to false,
 * checks that {@code MRInputSplitDistributor} emits one payload-update event
 * followed by two data-information events that carry the splits as
 * deserialized objects rather than as bytes.
 *
 * @throws IOException if building the payload fails
 */
@Test
public void testDeserializedPayload() throws IOException {

  Configuration conf = new Configuration(false);
  conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD, false);
  ByteString serializedConf = MRHelpers.createByteStringFromConf(conf);

  // Two test splits, identified as 1 and 2, packed into the user payload.
  MRSplitProto firstSplit = MRHelpers.createSplitProto(new InputSplitForTest(1));
  MRSplitProto secondSplit = MRHelpers.createSplitProto(new InputSplitForTest(2));
  byte[] userPayload = MRInputUserPayloadProto.newBuilder()
      .setSplits(MRSplitsProto.newBuilder()
          .addSplits(firstSplit)
          .addSplits(secondSplit)
          .build())
      .setConfigurationBytes(serializedConf)
      .build()
      .toByteArray();

  TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
  List<Event> events = new MRInputSplitDistributor().initialize(context);

  assertEquals(3, events.size());
  assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
  for (int i = 1; i <= 2; i++) {
    assertTrue(events.get(i) instanceof RootInputDataInformationEvent);
  }

  RootInputDataInformationEvent[] dataEvents = {
      (RootInputDataInformationEvent) events.get(1),
      (RootInputDataInformationEvent) events.get(2)
  };

  // Only the deserialized form of the payload is expected to be populated.
  for (RootInputDataInformationEvent dataEvent : dataEvents) {
    assertNull(dataEvent.getUserPayload());
  }
  for (RootInputDataInformationEvent dataEvent : dataEvents) {
    assertNotNull(dataEvent.getDeserializedUserPayload());
  }

  // Each event must hold the test split with the matching identifier.
  for (int i = 0; i < dataEvents.length; i++) {
    assertTrue(dataEvents[i].getDeserializedUserPayload() instanceof InputSplitForTest);
    assertEquals(i + 1,
        ((InputSplitForTest) dataEvents[i].getDeserializedUserPayload()).identifier);
  }
}
 
示例27
/**
 * Builds the Tez {@link Vertex} for one stage of a multi-stage MR job.
 *
 * <p>Stage 0 is treated as the map stage and gets an "MRInput" data source;
 * the last stage ({@code totalStages - 1}) gets an "MROutput" data sink. A
 * job with a single stage therefore gets both. Non-map stages are given a
 * {@code ShuffleVertexManager} plugin.
 *
 * @param stageConf configuration for this stage; the output file name prefix
 *          is set on it before it is serialized into the vertex payload
 * @param jobLocalResources local resources copied into the task's local files
 * @param locations task location hints for the vertex
 * @param stageNum zero-based stage index
 * @param totalStages total number of stages in the job
 * @return the configured vertex
 * @throws IOException declared by the signature; presumably raised by the
 *           payload and input configuration helpers
 */
private Vertex createVertexForStage(Configuration stageConf,
    Map<String, LocalResource> jobLocalResources,
    List<TaskLocationHint> locations, int stageNum, int totalStages)
    throws IOException {
  // stageNum starts from 0, goes till numStages - 1
  final boolean isMap = (stageNum == 0);
  final boolean isLastStage = (stageNum == totalStages - 1);

  final int numTasks;
  final String processorName;
  final String vertexName;
  if (isMap) {
    numTasks = stageConf.getInt(MRJobConfig.NUM_MAPS, 0);
    processorName = MapProcessor.class.getName();
    vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
  } else {
    numTasks = stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
    processorName = ReduceProcessor.class.getName();
    vertexName = isLastStage
        ? MultiStageMRConfigUtil.getFinalReduceVertexName()
        : MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum);
  }

  final Resource taskResource;
  if (isMap) {
    taskResource = MRHelpers.getResourceForMRMapper(stageConf);
  } else {
    taskResource = MRHelpers.getResourceForMRReducer(stageConf);
  }

  // Set before serializing so the prefix is part of the vertex payload.
  stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");

  UserPayload vertexUserPayload = TezUtils.createUserPayloadFromConf(stageConf);
  ProcessorDescriptor processorDescriptor =
      ProcessorDescriptor.create(processorName).setUserPayload(vertexUserPayload);
  Vertex vertex = Vertex.create(vertexName, processorDescriptor, numTasks, taskResource);
  if (stageConf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
    vertex.getProcessorDescriptor().setHistoryText(TezUtils.convertToHistoryText(stageConf));
  }

  if (isMap) {
    vertex.addDataSource("MRInput",
        configureMRInputWithLegacySplitsGenerated(stageConf, true));
  }

  // The last stage writes the job output (this is also stage 0 for map-only jobs).
  if (isLastStage) {
    OutputDescriptor outputDescriptor =
        OutputDescriptor.create(MROutputLegacy.class.getName())
            .setUserPayload(vertexUserPayload);
    if (stageConf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
      outputDescriptor.setHistoryText(TezUtils.convertToHistoryText(stageConf));
    }
    OutputCommitterDescriptor committerDescriptor =
        OutputCommitterDescriptor.create(MROutputCommitter.class.getName());
    vertex.addDataSink("MROutput",
        DataSinkDescriptor.create(outputDescriptor, committerDescriptor, null));
  }

  Map<String, String> taskEnv = new HashMap<String, String>();
  setupMapReduceEnv(stageConf, taskEnv, isMap);

  // PRECOMMIT Remove split localization for reduce tasks if it's being set
  // here
  Map<String, LocalResource> taskLocalResources =
      new TreeMap<String, LocalResource>(jobLocalResources);

  final String taskJavaOpts;
  if (isMap) {
    taskJavaOpts = MRHelpers.getJavaOptsForMRMapper(stageConf);
  } else {
    taskJavaOpts = MRHelpers.getJavaOptsForMRReducer(stageConf);
  }

  vertex.setTaskEnvironment(taskEnv)
      .addTaskLocalFiles(taskLocalResources)
      .setLocationHint(VertexLocationHint.create(locations))
      .setTaskLaunchCmdOpts(taskJavaOpts);

  if (!isMap) {
    vertex.setVertexManagerPlugin(
        ShuffleVertexManager.createConfigBuilder(stageConf).build());
  }

  if (LOG.isDebugEnabled()) {
    // TODO Add localResources and Environment
    StringBuilder description = new StringBuilder("Adding vertex to DAG");
    description.append(", vertexName=").append(vertex.getName());
    description.append(", processor=").append(vertex.getProcessorDescriptor().getClassName());
    description.append(", parallelism=").append(vertex.getParallelism());
    description.append(", javaOpts=").append(vertex.getTaskLaunchCmdOpts());
    description.append(", resources=").append(vertex.getTaskResource());
    LOG.debug(description.toString());
  }

  return vertex;
}
 
示例28
/**
 * Runs one map task built from an {@code MRInputLegacy} input and an
 * {@code OrderedPartitionedKVOutput} output, then reads the resulting map
 * output file back through {@code IFile.Reader} and verifies that the keys
 * appear in non-decreasing order.
 *
 * @throws Exception if task setup, execution, or reading the output fails
 */
@Test(timeout = 5000)
  public void testMapProcessor() throws Exception {
    String dagName = "mrdag0";
    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
    JobConf jobConf = new JobConf(defaultConf);
    setUpJobConf(jobConf);

    MRHelpers.translateMRConfToTez(jobConf);
    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);

    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);

    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
        "localized-resources").toUri().toString());

    Path mapInput = new Path(workDir, "map0");

    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);

    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
        InputDescriptor.create(MRInputLegacy.class.getName())
            .setUserPayload(UserPayload.create(ByteBuffer.wrap(
                MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                    .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
                    .toByteArray()))),
        1);
    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
        OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);

    TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
    LogicalIOProcessorRuntimeTask task;
    try {
      task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
          new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
          Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
          sharedExecutor);

      task.initialize();
      task.run();
      task.close();
    } finally {
      // Shut the executor down even when the task fails, so it is not left running.
      sharedExecutor.shutdownNow();
    }

    OutputContext outputContext = task.getOutputContexts().iterator().next();

    // TODO NEWTEZ FIXME OutputCommitter verification
//    MRTask mrTask = (MRTask)t.getProcessor();
//    Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
//        .getCommitter().getClass().getName());
//    t.close();

    Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
    LOG.info("mapOutputFile = " + mapOutputFile);
    IFile.Reader reader =
        new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
    try {
      LongWritable key = new LongWritable();
      Text value = new Text();
      DataInputBuffer keyBuf = new DataInputBuffer();
      DataInputBuffer valueBuf = new DataInputBuffer();
      // Long.MIN_VALUE is <= every possible key, so the first record always passes.
      long prev = Long.MIN_VALUE;
      while (reader.nextRawKey(keyBuf)) {
        reader.nextRawValue(valueBuf);
        key.readFields(keyBuf);
        value.readFields(valueBuf);
        // Thrown explicitly so the check still runs when JVM assertions (-ea) are off.
        if (prev > key.get()) {
          throw new AssertionError(
              "Map output keys out of order: " + prev + " > " + key.get());
        }
        // Must be updated for every record; otherwise the ordering check never fires.
        prev = key.get();
        LOG.info("key = " + key.get() + "; value = " + value);
      }
    } finally {
      // Release the reader even when a read or the ordering check fails.
      reader.close();
    }
  }
 
示例29
/**
 * Runs a map task over a generated input (size argument 100000) while a
 * scheduled job samples {@code task.getProgress()} every millisecond, and
 * asserts that at least one sample strictly between 0 and 1 was recorded in
 * {@code progressUpdate}.
 *
 * @throws Exception if task setup or execution fails
 */
@Test(timeout = 30000)
public void testMapProcessorProgress() throws Exception {
  String dagName = "mrdag0";
  String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
  JobConf jobConf = new JobConf(defaultConf);
  setUpJobConf(jobConf);

  MRHelpers.translateMRConfToTez(jobConf);
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);

  jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);

  jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
      "localized-resources").toUri().toString());

  Path mapInput = new Path(workDir, "map0");

  MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);

  InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
      InputDescriptor.create(MRInputLegacy.class.getName())
          .setUserPayload(UserPayload.create(ByteBuffer.wrap(
              MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                  .setConfigurationBytes(TezUtils.createByteStringFromConf
                      (jobConf)).build()
                  .toByteArray()))),
      1);
  OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
      OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
          .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);

  TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  try {
    final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
        (localFs, workDir, jobConf, 0,
            new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
            Collections.singletonList(mapInputSpec),
            Collections.singletonList(mapOutputSpec), sharedExecutor);

    // A plain Runnable is enough: the scheduler supplies the thread that runs it.
    Runnable monitorProgress = new Runnable() {
      @Override
      public void run() {
        float prog = task.getProgress();
        // Only intermediate values are recorded; 0 and 1 say nothing about mid-run reporting.
        if (prog > 0.0f && prog < 1.0f) {
          progressUpdate = prog;
        }
      }
    };

    task.initialize();
    scheduler.scheduleAtFixedRate(monitorProgress, 0, 1,
        TimeUnit.MILLISECONDS);
    task.run();
    Assert.assertTrue("Progress Updates should be captured!",
        progressUpdate > 0.0f && progressUpdate < 1.0f);
    task.close();
  } finally {
    // Stop the sampler and the shared executor on every path; otherwise the
    // scheduler's worker thread keeps polling the task after the test ends.
    scheduler.shutdownNow();
    sharedExecutor.shutdownNow();
  }
}