Java源码示例:org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer
示例1
@SuppressWarnings("unchecked")
public StreamTaskNetworkInput(
CheckpointedInputGate checkpointedInputGate,
TypeSerializer<?> inputSerializer,
IOManager ioManager,
int inputIndex) {
this.checkpointedInputGate = checkpointedInputGate;
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
new StreamElementSerializer<>(inputSerializer));
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
ioManager.getSpillingDirectoriesPaths());
}
this.inputIndex = inputIndex;
}
示例2
@SuppressWarnings("unchecked")
public StreamTaskNetworkInput(
CheckpointedInputGate checkpointedInputGate,
TypeSerializer<?> inputSerializer,
IOManager ioManager,
StatusWatermarkValve statusWatermarkValve,
int inputIndex) {
this.checkpointedInputGate = checkpointedInputGate;
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
new StreamElementSerializer<>(inputSerializer));
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
ioManager.getSpillingDirectoriesPaths());
}
this.statusWatermarkValve = checkNotNull(statusWatermarkValve);
this.inputIndex = inputIndex;
this.channelIndexes = getChannelIndexes(checkpointedInputGate);
}
示例3
/**
* Creates a new AbstractRecordReader that de-serializes records from the given input gate and
* can spill partial records to disk, if they grow large.
*
* @param inputGate The input gate to read from.
* @param tmpDirectories The temp directories. USed for spilling if the reader concurrently
* reconstructs multiple large records.
*/
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
super(inputGate);
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
}
}
示例4
/**
* Creates a new AbstractRecordReader that de-serializes records from the given input gate and
* can spill partial records to disk, if they grow large.
*
* @param inputGate The input gate to read from.
* @param tmpDirectories The temp directories. USed for spilling if the reader concurrently
* reconstructs multiple large records.
*/
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
super(inputGate);
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
}
}
示例5
/**
 * Creates a new AbstractRecordReader that de-serializes records from the given input gate and
 * can spill partial records to disk, if they grow large.
 *
 * <p>The deserializers are kept in a map keyed by the channel infos reported by the gate; every
 * channel info is mapped to its own {@link SpillingAdaptiveSpanningRecordDeserializer}.
 *
 * @param inputGate The input gate to read from.
 * @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
 *                       reconstructs multiple large records.
 */
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
    super(inputGate);

    // Key: the channel info itself; value: a fresh deserializer for that channel.
    this.recordDeserializers = inputGate
        .getChannelInfos()
        .stream()
        .collect(
            Collectors.toMap(
                Function.identity(),
                ignoredChannel -> new SpillingAdaptiveSpanningRecordDeserializer<>(tmpDirectories)));
}
示例6
/**
 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
 *
 * <p>Checks the number of buffers created by the provider, the number of buffers queued per
 * channel, and the deserialized content of every channel queue.
 */
@Test
public void testBroadcastEmitRecord() throws Exception {
    final int numberOfChannels = 4;
    final int bufferSize = 32;
    final int numValues = 8;
    final int serializationLength = 4;

    @SuppressWarnings("unchecked")
    final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
    for (int channel = 0; channel < queues.length; channel++) {
        queues[channel] = new ArrayDeque<>();
    }

    final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
    final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
    final RecordWriter<SerializationTestType> writer = createRecordWriter(partitionWriter);
    final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
        new String[]{ tempFolder.getRoot().getAbsolutePath() });

    // Remember every emitted record so the channel contents can be compared later.
    final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
    for (SerializationTestType record : Util.randomRecords(numValues, SerializationTestTypeFactory.INT)) {
        serializedRecords.add(record);
        writer.broadcastEmit(record);
    }

    // NOTE(review): the constant 4 is presumably the per-record length header - confirm.
    final int recordsPerBuffer = bufferSize / (4 + serializationLength);
    final int numRequiredBuffers = numValues / recordsPerBuffer;

    // A broadcast writer is expected to create each buffer once; otherwise once per channel.
    final int expectedCreatedBuffers = isBroadcastWriter
        ? numRequiredBuffers
        : numRequiredBuffers * numberOfChannels;
    assertEquals(expectedCreatedBuffers, bufferProvider.getNumberOfCreatedBuffers());

    for (Queue<BufferConsumer> channelQueue : queues) {
        assertEquals(numRequiredBuffers, channelQueue.size());
        verifyDeserializationResults(
            channelQueue,
            deserializer,
            serializedRecords.clone(),
            numRequiredBuffers,
            numValues);
    }
}
示例7
/**
 * Emits a fixed set of records through a writer configured with a {@code BROADCAST}
 * {@link OutputEmitter}, either with {@link RecordWriter#broadcastEmit(IOReadableWritable)} or
 * with the regular {@code emit}, and asserts that every target channel receives the complete
 * output in both cases.
 *
 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
 */
private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
    final int numberOfChannels = 4;
    final int bufferSize = 32;
    final int numValues = 8;
    final int serializationLength = 4;

    @SuppressWarnings("unchecked")
    final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
    for (int channel = 0; channel < queues.length; channel++) {
        queues[channel] = new ArrayDeque<>();
    }

    final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
    final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
    final ChannelSelector selector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
    final RecordWriter<SerializationTestType> writer = RecordWriter.createRecordWriter(partitionWriter, selector, 0, "test");
    final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
        new String[]{ tempFolder.getRoot().getAbsolutePath() });

    // Remember every emitted record so the channel contents can be compared later.
    final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
    for (SerializationTestType record : Util.randomRecords(numValues, SerializationTestTypeFactory.INT)) {
        serializedRecords.add(record);
        if (!isBroadcastEmit) {
            writer.emit(record);
        } else {
            writer.broadcastEmit(record);
        }
    }

    // NOTE(review): the constant 4 is presumably the per-record length header - confirm.
    final int recordsPerBuffer = bufferSize / (4 + serializationLength);
    final int requiredBuffers = numValues / recordsPerBuffer;

    for (Queue<BufferConsumer> channelQueue : queues) {
        assertEquals(requiredBuffers, channelQueue.size());

        // Each channel is checked against its own copy of the expected records.
        final ArrayDeque<SerializationTestType> expectedRecords = serializedRecords.clone();
        int deserializedCount = 0;
        for (int remaining = requiredBuffers; remaining > 0; remaining--) {
            final Buffer buffer = buildSingleBuffer(channelQueue.remove());
            deserializer.setNextBuffer(buffer);
            deserializedCount += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
        }
        Assert.assertEquals(numValues, deserializedCount);
    }
}
示例8
/**
 * Wires up a one-input stream processor.
 *
 * <p>The given gates are combined through {@code InputGateUtil.createInputGate}, a checkpoint
 * barrier handler is created for the combined gate, one
 * {@link SpillingAdaptiveSpanningRecordDeserializer} is created per input channel, and a
 * {@link StatusWatermarkValve} forwarding to the operator is set up. The barrier handler's
 * alignment duration is registered as the {@code checkpointAlignmentTime} gauge.
 *
 * @param inputGates gates to read from
 * @param inputSerializer serializer of the user records
 * @param checkpointedTask task passed to the barrier handler factory
 * @param checkpointMode checkpointing mode passed to the barrier handler factory
 * @param lock lock object; passed through {@code checkNotNull}
 * @param ioManager provides the spilling directory paths handed to each deserializer
 * @param taskManagerConfig configuration passed to the barrier handler factory
 * @param streamStatusMaintainer status maintainer; passed through {@code checkNotNull}
 * @param streamOperator operator receiving the input; passed through {@code checkNotNull}
 * @param metrics metric group the alignment gauge is registered on
 * @param watermarkGauge gauge stored for this input
 * @throws IOException declared by the constructor; the visible code does not show which call raises it
 */
@SuppressWarnings("unchecked")
public StreamInputProcessor(
        InputGate[] inputGates,
        TypeSerializer<IN> inputSerializer,
        StreamTask<?, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        Object lock,
        IOManager ioManager,
        Configuration taskManagerConfig,
        StreamStatusMaintainer streamStatusMaintainer,
        OneInputStreamOperator<IN, ?> streamOperator,
        TaskIOMetricGroup metrics,
        WatermarkGauge watermarkGauge) throws IOException {

    final InputGate unionGate = InputGateUtil.createInputGate(inputGates);

    this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
        checkpointedTask,
        checkpointMode,
        ioManager,
        unionGate,
        taskManagerConfig);

    this.lock = checkNotNull(lock);

    final StreamElementSerializer<IN> elementSerializer = new StreamElementSerializer<>(inputSerializer);
    this.deserializationDelegate = new NonReusingDeserializationDelegate<>(elementSerializer);

    // One deserializer for each channel of the combined gate.
    final int channelCount = unionGate.getNumberOfInputChannels();
    this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[channelCount];
    for (int channel = 0; channel < channelCount; channel++) {
        this.recordDeserializers[channel] =
            new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
    }

    this.numInputChannels = unionGate.getNumberOfInputChannels();

    this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
    this.streamOperator = checkNotNull(streamOperator);

    final ForwardingValveOutputHandler valveOutput = new ForwardingValveOutputHandler(streamOperator, lock);
    this.statusWatermarkValve = new StatusWatermarkValve(this.numInputChannels, valveOutput);

    this.watermarkGauge = watermarkGauge;
    metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
}
示例9
/**
 * Wires up a two-input stream processor.
 *
 * <p>Both gate collections are combined through {@code InputGateUtil.createInputGate}, a
 * checkpoint barrier handler is created for the combined gate, and one
 * {@link SpillingAdaptiveSpanningRecordDeserializer} is created per input channel. The channel
 * count of the first input is the sum over {@code inputGates1}; the remaining channels of the
 * combined gate are attributed to the second input. Each input gets its own
 * {@link StatusWatermarkValve}, and both stream statuses start as {@code StreamStatus.ACTIVE}.
 *
 * @param inputGates1 gates of the first input
 * @param inputGates2 gates of the second input
 * @param inputSerializer1 serializer of the first input's records
 * @param inputSerializer2 serializer of the second input's records
 * @param checkpointedTask task passed to the barrier handler factory
 * @param checkpointMode checkpointing mode passed to the barrier handler factory
 * @param lock lock object; passed through {@code checkNotNull}
 * @param ioManager provides the spilling directory paths handed to each deserializer
 * @param taskManagerConfig configuration passed to the barrier handler factory
 * @param streamStatusMaintainer status maintainer; passed through {@code checkNotNull}
 * @param streamOperator operator receiving both inputs; passed through {@code checkNotNull}
 * @param metrics metric group the alignment gauge is registered on
 * @param input1WatermarkGauge gauge stored for the first input
 * @param input2WatermarkGauge gauge stored for the second input
 * @throws IOException declared by the constructor; the visible code does not show which call raises it
 */
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
        Collection<InputGate> inputGates1,
        Collection<InputGate> inputGates2,
        TypeSerializer<IN1> inputSerializer1,
        TypeSerializer<IN2> inputSerializer2,
        TwoInputStreamTask<IN1, IN2, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        Object lock,
        IOManager ioManager,
        Configuration taskManagerConfig,
        StreamStatusMaintainer streamStatusMaintainer,
        TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
        TaskIOMetricGroup metrics,
        WatermarkGauge input1WatermarkGauge,
        WatermarkGauge input2WatermarkGauge) throws IOException {

    final InputGate unionGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

    this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
        checkpointedTask,
        checkpointMode,
        ioManager,
        unionGate,
        taskManagerConfig);

    this.lock = checkNotNull(lock);

    final StreamElementSerializer<IN1> firstSerializer = new StreamElementSerializer<>(inputSerializer1);
    this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(firstSerializer);

    final StreamElementSerializer<IN2> secondSerializer = new StreamElementSerializer<>(inputSerializer2);
    this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(secondSerializer);

    // One deserializer for each channel of the combined gate.
    final int channelCount = unionGate.getNumberOfInputChannels();
    this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[channelCount];
    for (int channel = 0; channel < channelCount; channel++) {
        this.recordDeserializers[channel] =
            new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
    }

    // Split the unioned channels: input 1 owns the channels of its gates, input 2 the rest.
    int firstInputChannels = 0;
    for (InputGate firstInputGate : inputGates1) {
        firstInputChannels += firstInputGate.getNumberOfInputChannels();
    }
    this.numInputChannels1 = firstInputChannels;
    this.numInputChannels2 = unionGate.getNumberOfInputChannels() - firstInputChannels;

    this.firstStatus = StreamStatus.ACTIVE;
    this.secondStatus = StreamStatus.ACTIVE;

    this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
    this.streamOperator = checkNotNull(streamOperator);

    this.statusWatermarkValve1 = new StatusWatermarkValve(
        firstInputChannels,
        new ForwardingValveOutputHandler1(streamOperator, lock));
    this.statusWatermarkValve2 = new StatusWatermarkValve(
        this.numInputChannels2,
        new ForwardingValveOutputHandler2(streamOperator, lock));

    this.input1WatermarkGauge = input1WatermarkGauge;
    this.input2WatermarkGauge = input2WatermarkGauge;
    metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
}
示例10
/**
 * Emits a fixed set of records through a writer built with a {@code BROADCAST}
 * {@link OutputEmitter} as channel selector, either with
 * {@link RecordWriter#broadcastEmit(IOReadableWritable)} or with the regular {@code emit}, and
 * asserts that every target channel receives the complete output in both cases.
 *
 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
 */
private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
    final int numberOfChannels = 4;
    final int bufferSize = 32;
    final int numValues = 8;
    final int serializationLength = 4;

    @SuppressWarnings("unchecked")
    final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
    for (int channel = 0; channel < queues.length; channel++) {
        queues[channel] = new ArrayDeque<>();
    }

    final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
    final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
    final ChannelSelector selector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
    final RecordWriter<SerializationTestType> writer = new RecordWriterBuilder()
        .setChannelSelector(selector)
        .setTimeout(0)
        .build(partitionWriter);
    final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
        new String[]{ tempFolder.getRoot().getAbsolutePath() });

    // Remember every emitted record so the channel contents can be compared later.
    final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
    for (SerializationTestType record : Util.randomRecords(numValues, SerializationTestTypeFactory.INT)) {
        serializedRecords.add(record);
        if (!isBroadcastEmit) {
            writer.emit(record);
        } else {
            writer.broadcastEmit(record);
        }
    }

    // NOTE(review): the constant 4 is presumably the per-record length header - confirm.
    final int recordsPerBuffer = bufferSize / (4 + serializationLength);
    final int requiredBuffers = numValues / recordsPerBuffer;

    for (Queue<BufferConsumer> channelQueue : queues) {
        assertEquals(requiredBuffers, channelQueue.size());

        // Each channel is checked against its own copy of the expected records.
        final ArrayDeque<SerializationTestType> expectedRecords = serializedRecords.clone();
        int deserializedCount = 0;
        for (int remaining = requiredBuffers; remaining > 0; remaining--) {
            final Buffer buffer = buildSingleBuffer(channelQueue.remove());
            deserializer.setNextBuffer(buffer);
            deserializedCount += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
        }
        Assert.assertEquals(numValues, deserializedCount);
    }
}
示例11
/**
 * Wires up a two-input stream processor.
 *
 * <p>Both gate collections are combined through {@code InputGateUtil.createInputGate}, the
 * barrier handler is obtained from {@code InputProcessorUtil.createCheckpointedInputGate}, and
 * one {@link SpillingAdaptiveSpanningRecordDeserializer} is created per input channel. The
 * channel count of the first input is the sum over {@code inputGates1}; the remaining channels
 * of the combined gate are attributed to the second input. Each input gets its own
 * {@link StatusWatermarkValve} and an initially empty {@link BitSet} of finished channels, and
 * both stream statuses start as {@code StreamStatus.ACTIVE}.
 *
 * @param inputGates1 gates of the first input
 * @param inputGates2 gates of the second input
 * @param inputSerializer1 serializer of the first input's records
 * @param inputSerializer2 serializer of the second input's records
 * @param checkpointedTask task passed to the checkpointed-gate factory
 * @param checkpointMode checkpointing mode passed to the checkpointed-gate factory
 * @param lock lock object; passed through {@code checkNotNull}
 * @param ioManager provides the spilling directory paths handed to each deserializer
 * @param taskManagerConfig configuration passed to the checkpointed-gate factory
 * @param streamStatusMaintainer status maintainer; passed through {@code checkNotNull}
 * @param streamOperator operator receiving both inputs; passed through {@code checkNotNull}
 * @param metrics metric group the alignment gauge is registered on
 * @param input1WatermarkGauge gauge stored for the first input
 * @param input2WatermarkGauge gauge stored for the second input
 * @param taskName task name passed to the checkpointed-gate factory
 * @param operatorChain operator chain; passed through {@code checkNotNull}
 * @throws IOException declared by the constructor; the visible code does not show which call raises it
 */
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
        Collection<InputGate> inputGates1,
        Collection<InputGate> inputGates2,
        TypeSerializer<IN1> inputSerializer1,
        TypeSerializer<IN2> inputSerializer2,
        TwoInputStreamTask<IN1, IN2, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        Object lock,
        IOManager ioManager,
        Configuration taskManagerConfig,
        StreamStatusMaintainer streamStatusMaintainer,
        TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
        TaskIOMetricGroup metrics,
        WatermarkGauge input1WatermarkGauge,
        WatermarkGauge input2WatermarkGauge,
        String taskName,
        OperatorChain<?, ?> operatorChain) throws IOException {

    final InputGate unionGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

    this.barrierHandler = InputProcessorUtil.createCheckpointedInputGate(
        checkpointedTask, checkpointMode, ioManager, unionGate, taskManagerConfig, taskName);

    this.lock = checkNotNull(lock);

    final StreamElementSerializer<IN1> firstSerializer = new StreamElementSerializer<>(inputSerializer1);
    this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(firstSerializer);

    final StreamElementSerializer<IN2> secondSerializer = new StreamElementSerializer<>(inputSerializer2);
    this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(secondSerializer);

    // One deserializer for each channel of the combined gate.
    final int channelCount = unionGate.getNumberOfInputChannels();
    this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[channelCount];
    for (int channel = 0; channel < channelCount; channel++) {
        this.recordDeserializers[channel] =
            new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
    }

    // Split the unioned channels: input 1 owns the channels of its gates, input 2 the rest.
    int firstInputChannels = 0;
    for (InputGate firstInputGate : inputGates1) {
        firstInputChannels += firstInputGate.getNumberOfInputChannels();
    }
    this.numInputChannels1 = firstInputChannels;
    this.numInputChannels2 = unionGate.getNumberOfInputChannels() - firstInputChannels;

    this.firstStatus = StreamStatus.ACTIVE;
    this.secondStatus = StreamStatus.ACTIVE;

    this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
    this.streamOperator = checkNotNull(streamOperator);

    this.statusWatermarkValve1 = new StatusWatermarkValve(
        firstInputChannels,
        new ForwardingValveOutputHandler1(streamOperator, lock));
    this.statusWatermarkValve2 = new StatusWatermarkValve(
        this.numInputChannels2,
        new ForwardingValveOutputHandler2(streamOperator, lock));

    this.input1WatermarkGauge = input1WatermarkGauge;
    this.input2WatermarkGauge = input2WatermarkGauge;
    metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

    this.operatorChain = checkNotNull(operatorChain);

    this.finishedChannels1 = new BitSet();
    this.finishedChannels2 = new BitSet();
}
示例12
/**
 * Tests the number of requested buffers and results are correct in the case of switching
 * modes between {@link BroadcastRecordWriter#broadcastEmit(IOReadableWritable)} and
 * {@link BroadcastRecordWriter#randomEmit(IOReadableWritable)}.
 *
 * <p>Every record is first emitted to a single channel (chosen round-robin) and then broadcast
 * to all channels; the expected per-channel content is tracked alongside and compared at the end.
 */
@Test
public void testBroadcastMixedRandomEmitRecord() throws Exception {
    final int numberOfChannels = 4;
    final int numberOfRecords = 8;
    final int bufferSize = 32;

    @SuppressWarnings("unchecked")
    final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
    for (int channel = 0; channel < queues.length; channel++) {
        queues[channel] = new ArrayDeque<>();
    }

    final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
    final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
    final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
    final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
        new String[]{ tempFolder.getRoot().getAbsolutePath() });

    // Generate the configured number of int values as the global record set.
    final Iterable<SerializationTestType> records = Util.randomRecords(numberOfRecords, SerializationTestTypeFactory.INT);

    // Store the expected record set for every input channel.
    final Map<Integer, ArrayDeque<SerializationTestType>> serializedRecords = new HashMap<>();
    for (int channel = 0; channel < numberOfChannels; channel++) {
        serializedRecords.put(channel, new ArrayDeque<>());
    }

    // Every record of the global set is emitted into one channel and then broadcast to all channels.
    int emittedSoFar = 0;
    for (SerializationTestType record : records) {
        final int targetChannel = emittedSoFar % numberOfChannels;
        emittedSoFar++;

        writer.randomEmit(record, targetChannel);
        serializedRecords.get(targetChannel).add(record);

        writer.broadcastEmit(record);
        for (ArrayDeque<SerializationTestType> expectedForChannel : serializedRecords.values()) {
            expectedForChannel.add(record);
        }
    }

    // Verify the number of requested buffers; the expectation is one new buffer per random emit.
    final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers();
    assertEquals(numberOfRecords, numberOfCreatedBuffers);

    for (int channel = 0; channel < numberOfChannels; channel++) {
        // Every channel is expected to queue as many buffers as were created above.
        assertEquals(numberOfRecords, queues[channel].size());

        // Channels below the remainder received one extra single-channel record.
        int numberOfRandomRecords = numberOfRecords / numberOfChannels;
        if (channel < numberOfRecords % numberOfChannels) {
            numberOfRandomRecords++;
        }
        final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;

        // Verify the data correctness in every channel queue.
        verifyDeserializationResults(
            queues[channel],
            deserializer,
            serializedRecords.get(channel),
            numberOfCreatedBuffers,
            numberOfTotalRecords);
    }
}