Java源码示例:org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer

示例1
@SuppressWarnings("unchecked")
public StreamTaskNetworkInput(
		CheckpointedInputGate checkpointedInputGate,
		TypeSerializer<?> inputSerializer,
		IOManager ioManager,
		int inputIndex) {
	this.checkpointedInputGate = checkpointedInputGate;
	this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer));

	// Initialize one deserializer per input channel
	this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
	for (int i = 0; i < recordDeserializers.length; i++) {
		recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
			ioManager.getSpillingDirectoriesPaths());
	}

	this.inputIndex = inputIndex;
}
 
示例2
@SuppressWarnings("unchecked")
public StreamTaskNetworkInput(
		CheckpointedInputGate checkpointedInputGate,
		TypeSerializer<?> inputSerializer,
		IOManager ioManager,
		StatusWatermarkValve statusWatermarkValve,
		int inputIndex) {
	this.checkpointedInputGate = checkpointedInputGate;
	this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer));

	// Initialize one deserializer per input channel
	this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
	for (int i = 0; i < recordDeserializers.length; i++) {
		recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
			ioManager.getSpillingDirectoriesPaths());
	}

	this.statusWatermarkValve = checkNotNull(statusWatermarkValve);
	this.inputIndex = inputIndex;
	this.channelIndexes = getChannelIndexes(checkpointedInputGate);
}
 
示例3
/**
 * Creates a new AbstractRecordReader that de-serializes records from the given input gate and
 * can spill partial records to disk, if they grow large.
 *
 * @param inputGate The input gate to read from.
 * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently
 *                       reconstructs multiple large records.
 */
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
	super(inputGate);

	// Initialize one deserializer per input channel
	this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
	for (int i = 0; i < recordDeserializers.length; i++) {
		recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
	}
}
 
示例4
/**
 * Creates a new AbstractRecordReader that de-serializes records from the given input gate and
 * can spill partial records to disk, if they grow large.
 *
 * @param inputGate The input gate to read from.
 * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently
 *                       reconstructs multiple large records.
 */
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
	super(inputGate);

	// Initialize one deserializer per input channel
	this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
	for (int i = 0; i < recordDeserializers.length; i++) {
		recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
	}
}
 
示例5
/**
 * Creates a new AbstractRecordReader reading from the given input gate. The deserializers are
 * kept in a map keyed by the gate's channel infos, with one
 * {@link SpillingAdaptiveSpanningRecordDeserializer} per entry.
 *
 * @param inputGate The input gate to read from; its channel infos become the map keys.
 * @param tmpDirectories The temp directories given to the deserializers. Used for spilling if
 *                       the reader concurrently reconstructs multiple large records.
 */
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
	super(inputGate);

	// channel info -> freshly created deserializer for that channel
	this.recordDeserializers = inputGate
		.getChannelInfos()
		.stream()
		.collect(
			Collectors.toMap(
				Function.identity(),
				info -> new SpillingAdaptiveSpanningRecordDeserializer<>(tmpDirectories)));
}
 
示例6
/**
 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
 *
 * <p>The test broadcasts a fixed number of INT records, checks the number of buffers created by
 * the buffer provider, and then verifies for every channel queue that it holds the expected
 * number of buffers and deserializes to the full record sequence.
 */
@Test
public void testBroadcastEmitRecord() throws Exception {
	final int numberOfChannels = 4;
	final int bufferSize = 32;
	final int numValues = 8;
	final int serializationLength = 4;

	@SuppressWarnings("unchecked")
	final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
	for (int channel = 0; channel < numberOfChannels; channel++) {
		queues[channel] = new ArrayDeque<>();
	}

	final TestPooledBufferProvider bufferProvider =
		new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
	final RecordWriter<SerializationTestType> writer = createRecordWriter(partitionWriter);

	final String[] spillDirectories = new String[]{ tempFolder.getRoot().getAbsolutePath() };
	final RecordDeserializer<SerializationTestType> deserializer =
		new SpillingAdaptiveSpanningRecordDeserializer<>(spillDirectories);

	// Remember every emitted record so the per-channel output can be compared against it.
	final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
	final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
	for (SerializationTestType record : records) {
		serializedRecords.add(record);
		writer.broadcastEmit(record);
	}

	// NOTE(review): the extra 4 is presumably the per-record length header in bytes; confirm.
	final int recordsPerBuffer = bufferSize / (4 + serializationLength);
	final int numRequiredBuffers = numValues / recordsPerBuffer;

	// A broadcast writer is expected to create each buffer once; otherwise once per channel.
	final int expectedCreatedBuffers = isBroadcastWriter
		? numRequiredBuffers
		: numRequiredBuffers * numberOfChannels;
	assertEquals(expectedCreatedBuffers, bufferProvider.getNumberOfCreatedBuffers());

	for (int channel = 0; channel < numberOfChannels; channel++) {
		final Queue<BufferConsumer> channelQueue = queues[channel];
		assertEquals(numRequiredBuffers, channelQueue.size());
		verifyDeserializationResults(
			channelQueue,
			deserializer,
			serializedRecords.clone(),
			numRequiredBuffers,
			numValues);
	}
}
 
示例7
/**
 * Emitting records through a broadcast channel selector and broadcasting records directly must
 * give the same result: every target channel receives the complete output.
 *
 * <p>The helper emits a fixed number of INT records in the selected mode and then checks, per
 * channel, the number of queued buffers and the number of records that deserialize from them.
 *
 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
 */
private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
	final int numberOfChannels = 4;
	final int bufferSize = 32;
	final int numValues = 8;
	final int serializationLength = 4;

	@SuppressWarnings("unchecked")
	final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
	for (int channel = 0; channel < numberOfChannels; channel++) {
		queues[channel] = new ArrayDeque<>();
	}

	final TestPooledBufferProvider bufferProvider =
		new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
	final ChannelSelector selector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
	final RecordWriter<SerializationTestType> writer =
		RecordWriter.createRecordWriter(partitionWriter, selector, 0, "test");

	final String[] spillDirectories = new String[]{ tempFolder.getRoot().getAbsolutePath() };
	final RecordDeserializer<SerializationTestType> deserializer =
		new SpillingAdaptiveSpanningRecordDeserializer<>(spillDirectories);

	// Remember every emitted record so the per-channel output can be compared against it.
	final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
	final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
	for (SerializationTestType record : records) {
		serializedRecords.add(record);

		if (!isBroadcastEmit) {
			writer.emit(record);
		} else {
			writer.broadcastEmit(record);
		}
	}

	// NOTE(review): the extra 4 is presumably the per-record length header in bytes; confirm.
	final int recordsPerBuffer = bufferSize / (4 + serializationLength);
	final int requiredBuffers = numValues / recordsPerBuffer;

	for (int channel = 0; channel < numberOfChannels; channel++) {
		final Queue<BufferConsumer> channelQueue = queues[channel];
		assertEquals(requiredBuffers, channelQueue.size());

		// Each channel is checked against its own copy of the expected records.
		final ArrayDeque<SerializationTestType> expectedRecords = serializedRecords.clone();
		int deserializedCount = 0;
		for (int bufferIndex = 0; bufferIndex < requiredBuffers; bufferIndex++) {
			final Buffer buffer = buildSingleBuffer(channelQueue.remove());
			deserializer.setNextBuffer(buffer);

			deserializedCount += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
		}
		Assert.assertEquals(numValues, deserializedCount);
	}
}
 
示例8
/**
 * Creates the input processor for a one-input stream operator.
 *
 * <p>The given input gates are combined into a single gate via {@link InputGateUtil}, a checkpoint
 * barrier handler is created for it, one {@link SpillingAdaptiveSpanningRecordDeserializer} is
 * set up per input channel, and a {@link StatusWatermarkValve} covering all channels is created.
 * Finally a gauge named {@code checkpointAlignmentTime} is registered on the metric group.
 *
 * @param inputGates gates to be combined into the single gate that is read
 * @param inputSerializer serializer of the input type; wrapped in a {@link StreamElementSerializer}
 * @param checkpointedTask task passed to the barrier handler factory
 * @param checkpointMode checkpointing mode passed to the barrier handler factory
 * @param lock lock object; must not be {@code null}
 * @param ioManager used by the barrier handler factory and as the source of spilling directories
 * @param taskManagerConfig configuration passed to the barrier handler factory
 * @param streamStatusMaintainer must not be {@code null}
 * @param streamOperator operator handed to the valve output handler; must not be {@code null}
 * @param metrics metric group on which the alignment-time gauge is registered
 * @param watermarkGauge gauge stored on this processor
 * @throws IOException declared by the constructor; presumably raised while creating the barrier handler
 */
@SuppressWarnings("unchecked")
public StreamInputProcessor(
		InputGate[] inputGates,
		TypeSerializer<IN> inputSerializer,
		StreamTask<?, ?> checkpointedTask,
		CheckpointingMode checkpointMode,
		Object lock,
		IOManager ioManager,
		Configuration taskManagerConfig,
		StreamStatusMaintainer streamStatusMaintainer,
		OneInputStreamOperator<IN, ?> streamOperator,
		TaskIOMetricGroup metrics,
		WatermarkGauge watermarkGauge) throws IOException {

	final InputGate inputGate = InputGateUtil.createInputGate(inputGates);

	this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
		checkpointedTask,
		checkpointMode,
		ioManager,
		inputGate,
		taskManagerConfig);

	this.lock = checkNotNull(lock);

	this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer));

	// Java cannot create generic arrays directly, so a raw array is allocated and filled below.
	this.recordDeserializers =
		new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
	for (int channel = 0; channel < this.recordDeserializers.length; channel++) {
		this.recordDeserializers[channel] =
			new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
	}

	this.numInputChannels = inputGate.getNumberOfInputChannels();

	this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
	this.streamOperator = checkNotNull(streamOperator);

	this.statusWatermarkValve = new StatusWatermarkValve(
		this.numInputChannels,
		new ForwardingValveOutputHandler(streamOperator, lock));

	this.watermarkGauge = watermarkGauge;
	metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
}
 
示例9
/**
 * Creates the input processor for a two-input stream operator.
 *
 * <p>Both gate collections are combined into one unioned gate via {@link InputGateUtil}. A
 * checkpoint barrier handler is created for that gate, one
 * {@link SpillingAdaptiveSpanningRecordDeserializer} is set up per unioned channel, and a
 * separate {@link StatusWatermarkValve} is created for each logical input. Finally a gauge named
 * {@code checkpointAlignmentTime} is registered on the metric group.
 *
 * @param inputGates1 gates of the first input; their channel counts are summed
 * @param inputGates2 gates of the second input
 * @param inputSerializer1 serializer of the first input type
 * @param inputSerializer2 serializer of the second input type
 * @param checkpointedTask task passed to the barrier handler factory
 * @param checkpointMode checkpointing mode passed to the barrier handler factory
 * @param lock lock object; must not be {@code null}
 * @param ioManager used by the barrier handler factory and as the source of spilling directories
 * @param taskManagerConfig configuration passed to the barrier handler factory
 * @param streamStatusMaintainer must not be {@code null}
 * @param streamOperator operator handed to both valve output handlers; must not be {@code null}
 * @param metrics metric group on which the alignment-time gauge is registered
 * @param input1WatermarkGauge gauge stored for the first input
 * @param input2WatermarkGauge gauge stored for the second input
 * @throws IOException declared by the constructor; presumably raised while creating the barrier handler
 */
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
		Collection<InputGate> inputGates1,
		Collection<InputGate> inputGates2,
		TypeSerializer<IN1> inputSerializer1,
		TypeSerializer<IN2> inputSerializer2,
		TwoInputStreamTask<IN1, IN2, ?> checkpointedTask,
		CheckpointingMode checkpointMode,
		Object lock,
		IOManager ioManager,
		Configuration taskManagerConfig,
		StreamStatusMaintainer streamStatusMaintainer,
		TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
		TaskIOMetricGroup metrics,
		WatermarkGauge input1WatermarkGauge,
		WatermarkGauge input2WatermarkGauge) throws IOException {

	final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

	this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
		checkpointedTask,
		checkpointMode,
		ioManager,
		inputGate,
		taskManagerConfig);

	this.lock = checkNotNull(lock);

	this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer1));
	this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer2));

	// Java cannot create generic arrays directly, so a raw array is allocated and filled below.
	this.recordDeserializers =
		new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
	for (int channel = 0; channel < this.recordDeserializers.length; channel++) {
		this.recordDeserializers[channel] =
			new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
	}

	// Split the unioned channels: input 1 owns the sum of its gates' channels, input 2 the rest.
	final int channelsOfInput1 = inputGates1.stream()
		.mapToInt(InputGate::getNumberOfInputChannels)
		.sum();
	final int channelsOfInput2 = inputGate.getNumberOfInputChannels() - channelsOfInput1;

	this.numInputChannels1 = channelsOfInput1;
	this.numInputChannels2 = channelsOfInput2;

	this.firstStatus = StreamStatus.ACTIVE;
	this.secondStatus = StreamStatus.ACTIVE;

	this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
	this.streamOperator = checkNotNull(streamOperator);

	this.statusWatermarkValve1 = new StatusWatermarkValve(
		channelsOfInput1,
		new ForwardingValveOutputHandler1(streamOperator, lock));
	this.statusWatermarkValve2 = new StatusWatermarkValve(
		channelsOfInput2,
		new ForwardingValveOutputHandler2(streamOperator, lock));

	this.input1WatermarkGauge = input1WatermarkGauge;
	this.input2WatermarkGauge = input2WatermarkGauge;
	metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
}
 
示例10
/**
 * Emitting records through a broadcast channel selector and broadcasting records directly must
 * give the same result: every target channel receives the complete output.
 *
 * <p>The writer is assembled with a {@code RecordWriterBuilder}. After emitting a fixed number of
 * INT records in the selected mode, each channel is checked for the number of queued buffers and
 * the number of records that deserialize from them.
 *
 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
 */
private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
	final int numberOfChannels = 4;
	final int bufferSize = 32;
	final int numValues = 8;
	final int serializationLength = 4;

	@SuppressWarnings("unchecked")
	final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
	for (int channel = 0; channel < numberOfChannels; channel++) {
		queues[channel] = new ArrayDeque<>();
	}

	final TestPooledBufferProvider bufferProvider =
		new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
	final ChannelSelector selector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
	final RecordWriter<SerializationTestType> writer = new RecordWriterBuilder()
		.setChannelSelector(selector)
		.setTimeout(0)
		.build(partitionWriter);

	final String[] spillDirectories = new String[]{ tempFolder.getRoot().getAbsolutePath() };
	final RecordDeserializer<SerializationTestType> deserializer =
		new SpillingAdaptiveSpanningRecordDeserializer<>(spillDirectories);

	// Remember every emitted record so the per-channel output can be compared against it.
	final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
	final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
	for (SerializationTestType record : records) {
		serializedRecords.add(record);

		if (!isBroadcastEmit) {
			writer.emit(record);
		} else {
			writer.broadcastEmit(record);
		}
	}

	// NOTE(review): the extra 4 is presumably the per-record length header in bytes; confirm.
	final int recordsPerBuffer = bufferSize / (4 + serializationLength);
	final int requiredBuffers = numValues / recordsPerBuffer;

	for (int channel = 0; channel < numberOfChannels; channel++) {
		final Queue<BufferConsumer> channelQueue = queues[channel];
		assertEquals(requiredBuffers, channelQueue.size());

		// Each channel is checked against its own copy of the expected records.
		final ArrayDeque<SerializationTestType> expectedRecords = serializedRecords.clone();
		int deserializedCount = 0;
		for (int bufferIndex = 0; bufferIndex < requiredBuffers; bufferIndex++) {
			final Buffer buffer = buildSingleBuffer(channelQueue.remove());
			deserializer.setNextBuffer(buffer);

			deserializedCount += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
		}
		Assert.assertEquals(numValues, deserializedCount);
	}
}
 
示例11
/**
 * Creates the input processor for a two-input stream operator.
 *
 * <p>Both gate collections are combined into one unioned gate via {@link InputGateUtil}, which is
 * then wrapped by {@code InputProcessorUtil.createCheckpointedInputGate}. One
 * {@link SpillingAdaptiveSpanningRecordDeserializer} is set up per unioned channel and a separate
 * {@link StatusWatermarkValve} is created for each logical input. A gauge named
 * {@code checkpointAlignmentTime} is registered, and two empty bit sets are created for tracking
 * finished channels.
 *
 * @param inputGates1 gates of the first input; their channel counts are summed
 * @param inputGates2 gates of the second input
 * @param inputSerializer1 serializer of the first input type
 * @param inputSerializer2 serializer of the second input type
 * @param checkpointedTask task passed to the checkpointed-gate factory
 * @param checkpointMode checkpointing mode passed to the checkpointed-gate factory
 * @param lock lock object; must not be {@code null}
 * @param ioManager used by the checkpointed-gate factory and as the source of spilling directories
 * @param taskManagerConfig configuration passed to the checkpointed-gate factory
 * @param streamStatusMaintainer must not be {@code null}
 * @param streamOperator operator handed to both valve output handlers; must not be {@code null}
 * @param metrics metric group on which the alignment-time gauge is registered
 * @param input1WatermarkGauge gauge stored for the first input
 * @param input2WatermarkGauge gauge stored for the second input
 * @param taskName task name passed to the checkpointed-gate factory
 * @param operatorChain operator chain stored on this processor; must not be {@code null}
 * @throws IOException declared by the constructor; presumably raised while creating the checkpointed gate
 */
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
		Collection<InputGate> inputGates1,
		Collection<InputGate> inputGates2,
		TypeSerializer<IN1> inputSerializer1,
		TypeSerializer<IN2> inputSerializer2,
		TwoInputStreamTask<IN1, IN2, ?> checkpointedTask,
		CheckpointingMode checkpointMode,
		Object lock,
		IOManager ioManager,
		Configuration taskManagerConfig,
		StreamStatusMaintainer streamStatusMaintainer,
		TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
		TaskIOMetricGroup metrics,
		WatermarkGauge input1WatermarkGauge,
		WatermarkGauge input2WatermarkGauge,
		String taskName,
		OperatorChain<?, ?> operatorChain) throws IOException {

	final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

	this.barrierHandler = InputProcessorUtil.createCheckpointedInputGate(
		checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig, taskName);

	this.lock = checkNotNull(lock);

	this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer1));
	this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(
		new StreamElementSerializer<>(inputSerializer2));

	// Java cannot create generic arrays directly, so a raw array is allocated and filled below.
	this.recordDeserializers =
		new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
	for (int channel = 0; channel < this.recordDeserializers.length; channel++) {
		this.recordDeserializers[channel] =
			new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
	}

	// Split the unioned channels: input 1 owns the sum of its gates' channels, input 2 the rest.
	int channelsOfInput1 = 0;
	for (InputGate firstInputGate : inputGates1) {
		channelsOfInput1 += firstInputGate.getNumberOfInputChannels();
	}

	this.numInputChannels1 = channelsOfInput1;
	this.numInputChannels2 = inputGate.getNumberOfInputChannels() - channelsOfInput1;

	this.firstStatus = StreamStatus.ACTIVE;
	this.secondStatus = StreamStatus.ACTIVE;

	this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
	this.streamOperator = checkNotNull(streamOperator);

	this.statusWatermarkValve1 = new StatusWatermarkValve(
		channelsOfInput1,
		new ForwardingValveOutputHandler1(streamOperator, lock));
	this.statusWatermarkValve2 = new StatusWatermarkValve(
		this.numInputChannels2,
		new ForwardingValveOutputHandler2(streamOperator, lock));

	this.input1WatermarkGauge = input1WatermarkGauge;
	this.input2WatermarkGauge = input2WatermarkGauge;
	metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

	this.operatorChain = checkNotNull(operatorChain);

	// Both sets start out empty.
	this.finishedChannels1 = new BitSet();
	this.finishedChannels2 = new BitSet();
}
 
示例12
/**
 * Tests the number of requested buffers and results are correct in the case of switching
 * modes between {@link BroadcastRecordWriter#broadcastEmit(IOReadableWritable)} and
 * {@link BroadcastRecordWriter#randomEmit(IOReadableWritable)}.
 *
 * <p>Every generated record is first emitted to a single channel (chosen round-robin) and then
 * broadcast to all channels; the expected per-channel record sequences are tracked alongside.
 */
@Test
public void testBroadcastMixedRandomEmitRecord() throws Exception {
	final int numberOfChannels = 4;
	final int numberOfRecords = 8;
	final int bufferSize = 32;

	@SuppressWarnings("unchecked")
	final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
	for (int channel = 0; channel < numberOfChannels; channel++) {
		queues[channel] = new ArrayDeque<>();
	}

	final TestPooledBufferProvider bufferProvider =
		new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
	final BroadcastRecordWriter<SerializationTestType> writer =
		new BroadcastRecordWriter<>(partitionWriter, 0, "test");

	final String[] spillDirectories = new String[]{ tempFolder.getRoot().getAbsolutePath() };
	final RecordDeserializer<SerializationTestType> deserializer =
		new SpillingAdaptiveSpanningRecordDeserializer<>(spillDirectories);

	// generate the configured number of int values as the global record set
	final Iterable<SerializationTestType> records = Util.randomRecords(numberOfRecords, SerializationTestTypeFactory.INT);

	// keep the expected record sequence for every channel
	final Map<Integer, ArrayDeque<SerializationTestType>> serializedRecords = new HashMap<>();
	for (int channel = 0; channel < numberOfChannels; channel++) {
		serializedRecords.put(channel, new ArrayDeque<>());
	}

	// every record of the global set is emitted into one channel and also broadcast to all channels
	int emittedSoFar = 0;
	for (SerializationTestType record : records) {
		final int randomChannel = emittedSoFar % numberOfChannels;
		emittedSoFar++;

		writer.randomEmit(record, randomChannel);
		serializedRecords.get(randomChannel).add(record);

		writer.broadcastEmit(record);
		for (ArrayDeque<SerializationTestType> channelRecords : serializedRecords.values()) {
			channelRecords.add(record);
		}
	}

	// verify the expected number of requested buffers; a new buffer is always requested while random emitting
	final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers();
	assertEquals(numberOfRecords, numberOfCreatedBuffers);

	for (int channel = 0; channel < numberOfChannels; channel++) {
		// every channel queues the number of buffers created above
		assertEquals(numberOfRecords, queues[channel].size());

		// channels below the remainder received one additional randomly emitted record
		int numberOfRandomRecords = numberOfRecords / numberOfChannels;
		if (channel < numberOfRecords % numberOfChannels) {
			numberOfRandomRecords++;
		}
		final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;

		// verify the data correctness in every channel queue
		verifyDeserializationResults(
			queues[channel],
			deserializer,
			serializedRecords.get(channel),
			numberOfCreatedBuffers,
			numberOfTotalRecords);
	}
}