Java源码示例:org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner
示例1
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
String taskName,
long bufferTimeout) {
@SuppressWarnings("unchecked")
StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
示例2
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
String taskName,
long bufferTimeout) {
@SuppressWarnings("unchecked")
StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder()
.setChannelSelector(outputPartitioner)
.setTimeout(bufferTimeout)
.setTaskName(taskName)
.build(bufferWriter);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
示例3
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
String taskName,
long bufferTimeout) {
@SuppressWarnings("unchecked")
StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
.setChannelSelector(outputPartitioner)
.setTimeout(bufferTimeout)
.setTaskName(taskName)
.build(bufferWriter);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}