Java源码示例:org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner

示例1
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
		StreamEdge edge,
		int outputIndex,
		Environment environment,
		String taskName,
		long bufferTimeout) {
	@SuppressWarnings("unchecked")
	StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

	LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);

	ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

	// we initialize the partitioner here with the number of key groups (aka max. parallelism)
	if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
		int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
		if (0 < numKeyGroups) {
			((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
		}
	}

	RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
		RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
	output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
	return output;
}
 
示例2
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
		StreamEdge edge,
		int outputIndex,
		Environment environment,
		String taskName,
		long bufferTimeout) {
	@SuppressWarnings("unchecked")
	StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

	LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);

	ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

	// we initialize the partitioner here with the number of key groups (aka max. parallelism)
	if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
		int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
		if (0 < numKeyGroups) {
			((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
		}
	}

	RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder()
		.setChannelSelector(outputPartitioner)
		.setTimeout(bufferTimeout)
		.setTaskName(taskName)
		.build(bufferWriter);
	output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
	return output;
}
 
示例3
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
		StreamEdge edge,
		int outputIndex,
		Environment environment,
		String taskName,
		long bufferTimeout) {
	@SuppressWarnings("unchecked")
	StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

	LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);

	ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

	// we initialize the partitioner here with the number of key groups (aka max. parallelism)
	if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
		int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
		if (0 < numKeyGroups) {
			((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
		}
	}

	RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
		.setChannelSelector(outputPartitioner)
		.setTimeout(bufferTimeout)
		.setTaskName(taskName)
		.build(bufferWriter);
	output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
	return output;
}