Java源码示例:backtype.storm.task.WorkerTopologyContext

示例1
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
  
  System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
  this.targetTasks = targetTasks;
  this.targetTasksSize = this.targetTasks.size();

  Configuration conf = HBaseConfiguration.create();
  try {
    hTable = new HTable(conf, tableName);
    refreshRegionInfo(tableName);

    System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);

  } catch (IOException e) {
    e.printStackTrace();
  }

}
 
示例2
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
  this.targetTasks = targetTasks;
  int numTasks = targetTasks.size();
  if (numTasks % numShards != 0)
    throw new IllegalArgumentException("Number of tasks ("+numTasks+") should be a multiple of the number of shards ("+numShards+")!");

  this.tasksPerShard = numTasks/numShards;
  this.random = new UniformIntegerDistribution(0, tasksPerShard-1);

  CompositeIdRouter docRouter =  new CompositeIdRouter();
  this.ranges = docRouter.partitionRange(numShards, docRouter.fullRange());
}
 
示例3
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
  this.targetTasks = targetTasks;
  int numTasks = targetTasks.size();
  int numShards = initShardInfo(); // setup for doing shard to task mapping 
  if (numTasks % numShards != 0)
    throw new IllegalArgumentException("Number of tasks ("+numTasks+") should be a multiple of the number of shards ("+numShards+")!");

  this.tasksPerShard = numTasks/numShards;
  this.random = new UniformIntegerDistribution(0, tasksPerShard-1);
}
 
示例4
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    random = new Random();
    choices = new ArrayList<List<Integer>>(targetTasks.size());
    for (Integer i : targetTasks) {
        choices.add(Arrays.asList(i));
    }
    Collections.shuffle(choices, random);
    current = new AtomicInteger(0);
}
 
示例5
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.outdegreeTasks = new ArrayList<>(targetTasks);
    shuffleGroupingDelegate = new ShuffleGrouping();
    shuffleGroupingDelegate.prepare(context, stream, targetTasks);
    globalGroupingDelegate = new GlobalGrouping();
    globalGroupingDelegate.prepare(context, stream, targetTasks);
    connectedTargetIds = new HashMap<>();
    for (Integer targetId : targetTasks) {
        String targetComponentId = context.getComponentId(targetId);
        connectedTargetIds.put(targetComponentId, targetId);
    }
    LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(), ","));
}
 
示例6
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
	List<Integer> sources = context.getComponentTasks(stream.get_componentId());
	for(int i=0; i<sources.size(); i++){
		idMapping.put(sources.get(i), targetTasks.get(i%targetTasks.size()));
	}
}
 
示例7
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
	this.targetTasks = targetTasks;
	
	Fields tupleFields = context.getComponentOutputFields(stream);
	for(int i=0; i<tupleFields.size(); i++){
		if(tupleFields.get(i).equals(CVParticleSerializer.STREAMID)){
			streamIdIndex = i;
		}else if(tupleFields.get(i).equals(CVParticleSerializer.SEQUENCENR)){
			sequenceNrIndex = i;
		}
	}
}
 
示例8
public WorkerTopologyContext makeWorkerTopologyContext(StormTopology topology) {
    Map stormConf = workerData.getStormConf();
    String topologyId = workerData.getTopologyId();

    HashMap<String, Map<String, Fields>> componentToStreamToFields =
            workerData.generateComponentToStreamToFields(topology);

    return new WorkerTopologyContext(topology, stormConf, workerData.getTasksToComponent(),
            workerData.getComponentToSortedTasks(), componentToStreamToFields,
            topologyId, resourcePath, workerId, workerData.getPort(), workerTasks,
            workerData.getDefaultResources(), workerData.getUserResources());
}
 
示例9
public void updateKryoSerializer() {
    WorkerTopologyContext workerTopologyContext = contextMaker.makeWorkerTopologyContext(sysTopology);
    KryoTupleDeserializer kryoTupleDeserializer = new KryoTupleDeserializer(stormConf, workerTopologyContext, workerTopologyContext.getRawTopology());
    KryoTupleSerializer kryoTupleSerializer = new KryoTupleSerializer(stormConf, workerTopologyContext.getRawTopology());

    atomKryoDeserializer.getAndSet(kryoTupleDeserializer);
    atomKryoSerializer.getAndSet(kryoTupleSerializer);
}
 
示例10
protected List<AsyncLoopThread> setDeserializeThreads() {
    WorkerTopologyContext workerTopologyContext = contextMaker.makeWorkerTopologyContext(sysTopology);
    int tasksNum = shutdownTasks.size();
    double workerRatio = ConfigExtension.getWorkerDeserializeThreadRatio(stormConf);
    int workerDeserThreadNum = Utils.getInt(Math.ceil(workerRatio * tasksNum));
    if (workerDeserThreadNum > 0 && tasksNum > 0) {
        double average = tasksNum / (double) workerDeserThreadNum;
        for (int i = 0; i < workerDeserThreadNum; i++) {
            int startRunTaskIndex = Utils.getInt(Math.rint(average * i));
            deserializeThreads.add(new AsyncLoopThread(new WorkerDeserializeRunnable(
                    shutdownTasks, stormConf, workerTopologyContext, startRunTaskIndex, i)));
        }
    }
    return deserializeThreads;
}
 
示例11
protected List<AsyncLoopThread> setSerializeThreads() {
    WorkerTopologyContext workerTopologyContext = contextMaker.makeWorkerTopologyContext(sysTopology);
    int tasksNum = shutdownTasks.size();
    double workerRatio = ConfigExtension.getWorkerSerializeThreadRatio(stormConf);
    int workerSerialThreadNum = Utils.getInt(Math.ceil(workerRatio * tasksNum));
    if (workerSerialThreadNum > 0 && tasksNum > 0) {
        double average = tasksNum / (double) workerSerialThreadNum;
        for (int i = 0; i < workerSerialThreadNum; i++) {
            int startRunTaskIndex = Utils.getInt(Math.rint(average * i));
            serializeThreads.add(new AsyncLoopThread(new WorkerSerializeRunnable(
                    shutdownTasks, stormConf, workerTopologyContext, startRunTaskIndex, i)));
        }
    }
    return serializeThreads;
}
 
示例12
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
    List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
    Collections.sort(sourceTasks);
    if (sourceTasks.size() != tasks.size()) {
        throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
    }
    tasks = new ArrayList<Integer>(tasks);
    Collections.sort(tasks);
    for (int i = 0; i < sourceTasks.size(); i++) {
        int s = sourceTasks.get(i);
        int t = tasks.get(i);
        _precomputed.put(s, Arrays.asList(t));
    }
}
 
示例13
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
    targetTaskStats = new long[this.targetTasks.size()];
    if (this.fields != null) {
        this.outFields = context.getComponentOutputFields(stream);
    }
}
 
示例14
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    targetTasks = new ArrayList<Integer>(targetTasks);
    Collections.sort(targetTasks);
    _outTasks = new ArrayList<Integer>();
    for (int i = 0; i < _n; i++) {
        _outTasks.add(targetTasks.get(i));
    }
}
 
示例15
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = new ArrayList<>(targetTasks);
}
 
示例16
@Ignore
@Test
public void testRoutingByCustomGrouping() throws Exception {
    Config conf = new Config();
    conf.setNumWorkers(2); // use two worker processes
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint

    topologyBuilder.setBolt("green-bolt-1", new GreenBolt(0)).setNumTasks(2)
        .customGrouping("blue-spout", new CustomStreamGrouping() {
            int count = 0;
            List<Integer> targetTask;

            @Override
            public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
                this.targetTask = targetTasks;
            }

            @Override
            public List<Integer> chooseTasks(int taskId, List<Object> values) {
                if (count % 2 == 0) {
                    count++;
                    return Arrays.asList(targetTask.get(0));
                } else {
                    count++;
                    return Arrays.asList(targetTask.get(1));
                }
            }
        });

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());

    while (true) {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
示例17
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
	this.targetTasks = targetTasks;		
}
 
示例18
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
	this.targetTasks = targetTasks;
}
 
示例19
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
	this.targetTasks = targetTasks;
}
 
示例20
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId streamId, List<Integer> targetTasks) {
	this.targetTasks = targetTasks;
}
 
示例21
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    _targets = targetTasks;
}
 
示例22
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
    List<Integer> sorted = new ArrayList<Integer>(targets);
    Collections.sort(sorted);
    target = Arrays.asList(sorted.get(0));
}
 
示例23
/**
 * Tells the stream grouping at runtime the tasks in the target bolt.
 * This information should be used in chooseTasks to determine the target tasks.
 * <p>
 * It also tells the grouping the metadata on the stream this grouping will be used on.
 */
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
 
示例24
/**
 * Tells the stream grouping at runtime the tasks in the target bolt.
 * This information should be used in chooseTasks to determine the target tasks.
 * It also tells the grouping the metadata on the stream this grouping will be used on.
 */
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
 
示例25
/**
 * This method is called when a worker is started
 *
 * @param stormConf The Storm configuration for this worker
 * @param context This object can be used to get information about this worker's place within the topology
 */
@Override
public void start(Map stormConf, WorkerTopologyContext context) {
    // NOOP
}
 
示例26
/**
 * This method is called when a worker is started
 *
 * @param stormConf The Storm configuration for this worker
 * @param context This object can be used to get information about this worker's place within the topology
 */
void start(Map stormConf, WorkerTopologyContext context);