Java源码示例:backtype.storm.task.WorkerTopologyContext
示例1
/**
 * Prepares the HBase stream partitioner: caches the target task list and loads
 * the region layout of the configured table so tuples can be routed by region.
 *
 * @param context     worker-level topology context (unused here)
 * @param stream      the stream this grouping is attached to (logged only)
 * @param targetTasks task ids of the consuming bolt
 */
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
    this.targetTasks = targetTasks;
    this.targetTasksSize = this.targetTasks.size();
    Configuration conf = HBaseConfiguration.create();
    try {
        hTable = new HTable(conf, tableName);
        refreshRegionInfo(tableName);
        System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
    } catch (IOException e) {
        // Fail fast: the original code only called printStackTrace(), which left
        // hTable/region info uninitialized and would misroute every tuple later.
        throw new RuntimeException("Failed to initialize HBase table '" + tableName + "'", e);
    }
}
示例2
/**
 * Records the target tasks and precomputes the shard layout. The task count
 * must divide evenly across the shards so each shard owns the same number of
 * tasks; a uniform distribution then picks among a shard's tasks at runtime.
 */
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
    final int taskCount = targetTasks.size();
    if (taskCount % numShards != 0) {
        throw new IllegalArgumentException(
                "Number of tasks (" + taskCount + ") should be a multiple of the number of shards ("
                        + numShards + ")!");
    }
    this.tasksPerShard = taskCount / numShards;
    // Uniform pick over the task slots belonging to a single shard.
    this.random = new UniformIntegerDistribution(0, tasksPerShard - 1);
    CompositeIdRouter router = new CompositeIdRouter();
    this.ranges = router.partitionRange(numShards, router.fullRange());
}
示例3
/**
 * Records the target tasks, discovers the shard count via initShardInfo(),
 * and precomputes the per-shard task fan-out used when choosing tasks.
 */
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
    final int taskCount = targetTasks.size();
    final int shardCount = initShardInfo(); // setup for doing shard to task mapping
    if (taskCount % shardCount != 0) {
        throw new IllegalArgumentException(
                "Number of tasks (" + taskCount + ") should be a multiple of the number of shards ("
                        + shardCount + ")!");
    }
    this.tasksPerShard = taskCount / shardCount;
    // Uniform pick over the task slots belonging to a single shard.
    this.random = new UniformIntegerDistribution(0, tasksPerShard - 1);
}
示例4
/**
 * Builds one singleton task-list per target task, shuffles the order so
 * different workers start at different positions, and resets the rotation
 * counter used by chooseTasks.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    random = new Random();
    choices = new ArrayList<List<Integer>>(targetTasks.size());
    for (int i = 0; i < targetTasks.size(); i++) {
        choices.add(Arrays.asList(targetTasks.get(i)));
    }
    Collections.shuffle(choices, random);
    current = new AtomicInteger(0);
}
示例5
/**
 * Prepares the out-degree grouping: keeps the full target task list, wires up
 * shuffle and global delegate groupings over the same targets, and indexes the
 * target tasks by their component id.
 *
 * NOTE(review): connectedTargetIds maps componentId -> taskId, so when one
 * component has several tasks only the last task id is kept — confirm this is
 * the intended behavior for downstream routing.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.outdegreeTasks = new ArrayList<>(targetTasks);
    shuffleGroupingDelegate = new ShuffleGrouping();
    shuffleGroupingDelegate.prepare(context, stream, targetTasks);
    globalGroupingDelegate = new GlobalGrouping();
    globalGroupingDelegate.prepare(context, stream, targetTasks);
    connectedTargetIds = new HashMap<>();
    for (Integer targetId : targetTasks) {
        String targetComponentId = context.getComponentId(targetId);
        connectedTargetIds.put(targetComponentId, targetId);
    }
    // Fix: the message says "components", but the original joined values()
    // (task ids). keySet() holds the component ids the message refers to.
    LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.keySet(), ","));
}
示例6
/**
 * Maps each upstream (source) task to a downstream target task in round-robin
 * fashion, wrapping around when there are more sources than targets.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    List<Integer> sourceTasks = context.getComponentTasks(stream.get_componentId());
    final int targetCount = targetTasks.size();
    int position = 0;
    for (Integer sourceTask : sourceTasks) {
        idMapping.put(sourceTask, targetTasks.get(position % targetCount));
        position++;
    }
}
示例7
/**
 * Caches the target tasks and scans the stream's declared output fields to
 * remember at which tuple positions the stream-id and sequence-number live,
 * so chooseTasks can read them by index.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
    Fields outputFields = context.getComponentOutputFields(stream);
    for (int index = 0; index < outputFields.size(); index++) {
        String fieldName = outputFields.get(index);
        if (fieldName.equals(CVParticleSerializer.STREAMID)) {
            streamIdIndex = index;
        } else if (fieldName.equals(CVParticleSerializer.SEQUENCENR)) {
            sequenceNrIndex = index;
        }
    }
}
示例8
/**
 * Assembles a WorkerTopologyContext describing this worker's view of the
 * topology: configuration, task/component mappings, per-stream output fields,
 * and the worker's identity, port, tasks, and resources.
 */
public WorkerTopologyContext makeWorkerTopologyContext(StormTopology topology) {
    Map conf = workerData.getStormConf();
    String id = workerData.getTopologyId();
    HashMap<String, Map<String, Fields>> streamFields =
            workerData.generateComponentToStreamToFields(topology);
    return new WorkerTopologyContext(
            topology,
            conf,
            workerData.getTasksToComponent(),
            workerData.getComponentToSortedTasks(),
            streamFields,
            id,
            resourcePath,
            workerId,
            workerData.getPort(),
            workerTasks,
            workerData.getDefaultResources(),
            workerData.getUserResources());
}
示例9
/**
 * Rebuilds the Kryo tuple serializer/deserializer pair against a freshly
 * created worker context and atomically swaps the new instances in.
 */
public void updateKryoSerializer() {
    WorkerTopologyContext ctx = contextMaker.makeWorkerTopologyContext(sysTopology);
    StormTopology rawTopology = ctx.getRawTopology();
    KryoTupleDeserializer newDeserializer =
            new KryoTupleDeserializer(stormConf, ctx, rawTopology);
    KryoTupleSerializer newSerializer =
            new KryoTupleSerializer(stormConf, rawTopology);
    atomKryoDeserializer.getAndSet(newDeserializer);
    atomKryoSerializer.getAndSet(newSerializer);
}
示例10
/**
 * Spawns the worker's deserialize threads. The thread count is the configured
 * ratio times the task count (rounded up); tasks are spread evenly across
 * threads by giving thread i the starting index rint(tasksPerThread * i).
 *
 * @return the accumulated list of deserialize threads
 */
protected List<AsyncLoopThread> setDeserializeThreads() {
    WorkerTopologyContext ctx = contextMaker.makeWorkerTopologyContext(sysTopology);
    final int taskCount = shutdownTasks.size();
    final double ratio = ConfigExtension.getWorkerDeserializeThreadRatio(stormConf);
    final int threadCount = Utils.getInt(Math.ceil(ratio * taskCount));
    if (threadCount > 0 && taskCount > 0) {
        double tasksPerThread = taskCount / (double) threadCount;
        for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
            int firstTaskIndex = Utils.getInt(Math.rint(tasksPerThread * threadIndex));
            deserializeThreads.add(new AsyncLoopThread(new WorkerDeserializeRunnable(
                    shutdownTasks, stormConf, ctx, firstTaskIndex, threadIndex)));
        }
    }
    return deserializeThreads;
}
示例11
/**
 * Spawns the worker's serialize threads. Mirrors the deserialize-thread setup:
 * thread count = configured ratio * task count (rounded up), with tasks spread
 * evenly so thread i starts at index rint(tasksPerThread * i).
 *
 * @return the accumulated list of serialize threads
 */
protected List<AsyncLoopThread> setSerializeThreads() {
    WorkerTopologyContext ctx = contextMaker.makeWorkerTopologyContext(sysTopology);
    final int taskCount = shutdownTasks.size();
    final double ratio = ConfigExtension.getWorkerSerializeThreadRatio(stormConf);
    final int threadCount = Utils.getInt(Math.ceil(ratio * taskCount));
    if (threadCount > 0 && taskCount > 0) {
        double tasksPerThread = taskCount / (double) threadCount;
        for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
            int firstTaskIndex = Utils.getInt(Math.rint(tasksPerThread * threadIndex));
            serializeThreads.add(new AsyncLoopThread(new WorkerSerializeRunnable(
                    shutdownTasks, stormConf, ctx, firstTaskIndex, threadIndex)));
        }
    }
    return serializeThreads;
}
示例12
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
Collections.sort(sourceTasks);
if (sourceTasks.size() != tasks.size()) {
throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
}
tasks = new ArrayList<Integer>(tasks);
Collections.sort(tasks);
for (int i = 0; i < sourceTasks.size(); i++) {
int s = sourceTasks.get(i);
int t = tasks.get(i);
_precomputed.put(s, Arrays.asList(t));
}
}
示例13
/**
 * Caches the target tasks, allocates one stats counter per target task, and —
 * only when field grouping is configured — resolves the stream's output fields.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
    // One long counter slot per target task.
    targetTaskStats = new long[targetTasks.size()];
    if (this.fields == null) {
        return; // no field grouping configured; output fields not needed
    }
    this.outFields = context.getComponentOutputFields(stream);
}
示例14
/**
 * Selects the _n lowest-numbered target tasks as the fixed output set.
 *
 * @param targetTasks task ids of the consuming bolt; must contain at least _n entries
 * @throws IllegalArgumentException if fewer than _n target tasks are available
 *         (the original code failed later with an opaque IndexOutOfBoundsException)
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    List<Integer> sorted = new ArrayList<Integer>(targetTasks);
    Collections.sort(sorted);
    if (_n > sorted.size()) {
        throw new IllegalArgumentException(
                "Requested " + _n + " output tasks but only " + sorted.size() + " target tasks are available");
    }
    _outTasks = new ArrayList<Integer>(sorted.subList(0, _n));
}
示例15
/**
 * Stores a defensive copy of the target task list so later mutation of the
 * caller's list cannot affect routing decisions.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    List<Integer> snapshot = new ArrayList<>(targetTasks);
    this.targetTasks = snapshot;
}
示例16
/**
 * Demonstrates routing via a custom grouping that alternates tuples between
 * the two tasks of green-bolt-1. Ignored by default: it blocks forever so the
 * local cluster can be observed interactively.
 */
@Ignore
@Test
public void testRoutingByCustomGrouping() throws Exception {
    Config conf = new Config();
    conf.setNumWorkers(2); // use two worker processes
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
    topologyBuilder.setBolt("green-bolt-1", new GreenBolt(0)).setNumTasks(2)
        .customGrouping("blue-spout", new CustomStreamGrouping() {
            int count = 0;
            List<Integer> targetTask;
            @Override
            public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
                this.targetTask = targetTasks;
            }
            @Override
            public List<Integer> chooseTasks(int taskId, List<Object> values) {
                // Alternate between the first two target tasks.
                if (count % 2 == 0) {
                    count++;
                    return Arrays.asList(targetTask.get(0));
                } else {
                    count++;
                    return Arrays.asList(targetTask.get(1));
                }
            }
        });
    LocalCluster cluster = new LocalCluster();
    // Fix: submit with the prepared Config — the original passed a raw
    // `new HashMap()`, which silently discarded setNumWorkers(2).
    cluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());
    // Block forever so the local cluster keeps running for observation.
    while (true) {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
示例17
/**
 * Remembers the target task list for use in chooseTasks.
 * No other state is derived at prepare time.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
}
示例18
/**
 * Captures the consuming bolt's task ids; routing decisions in chooseTasks
 * index into this list.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
}
示例19
/**
 * Stores the downstream task ids handed to this grouping at topology start.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
}
示例20
/**
 * Keeps a reference to the target tasks of the stream this grouping serves.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId streamId, List<Integer> targetTasks) {
    this.targetTasks = targetTasks;
}
示例21
/**
 * Records the candidate target tasks; chooseTasks selects among them later.
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    _targets = targetTasks;
}
示例22
/**
 * Global-style grouping: always routes to the single lowest-numbered target
 * task. Uses Collections.min (O(n)) instead of copy-sort-take-first
 * (O(n log n)) — the result is identical.
 *
 * @param targets non-empty list of target task ids
 */
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
    target = Arrays.asList(Collections.min(targets));
}
示例23
/**
 * Tells the stream grouping at runtime the tasks in the target bolt.
 * This information should be used in chooseTasks to determine the target tasks.
 * <p>
 * It also tells the grouping the metadata on the stream this grouping will be used on.
 *
 * @param context     worker-level topology context for the worker this grouping runs in
 * @param stream      identifies the component and stream this grouping is applied to
 * @param targetTasks task ids of the target bolt; chooseTasks must return a subset of these
 */
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
示例24
/**
 * Tells the stream grouping at runtime the tasks in the target bolt.
 * This information should be used in chooseTasks to determine the target tasks.
 * It also tells the grouping the metadata on the stream this grouping will be used on.
 *
 * @param context     worker-level topology context for the worker this grouping runs in
 * @param stream      identifies the component and stream this grouping is applied to
 * @param targetTasks task ids of the target bolt; chooseTasks must return a subset of these
 */
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
示例25
/**
 * This method is called when a worker is started
 *
 * @param stormConf The Storm configuration for this worker
 * @param context This object can be used to get information about this worker's place within the topology
 */
@Override
public void start(Map stormConf, WorkerTopologyContext context) {
    // NOOP — this implementation intentionally performs no work at worker startup.
}
示例26
/**
 * This method is called when a worker is started
 *
 * @param stormConf The Storm configuration for this worker
 * @param context This object can be used to get information about this worker's place within the topology
 */
void start(Map stormConf, WorkerTopologyContext context);