Java源码示例:org.apache.flink.core.io.IOReadableWritable
示例1
/**
 * Copies a writable by serializing it into an in-memory buffer and reading that buffer
 * back into an object obtained from {@code instantiate(original.getClass())}.
 *
 * @param original Object to clone; may be {@code null}
 * @param <T> Type of the object to clone
 * @return The copy, or {@code null} when {@code original} is {@code null}
 * @throws IOException Thrown if writing or reading the serialized form fails.
 */
public static <T extends IOReadableWritable> T createCopyWritable(T original) throws IOException {
    if (original != null) {
        // Step 1: let the source object write itself into a byte buffer.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputViewStreamWrapper writer = new DataOutputViewStreamWrapper(buffer)) {
            original.write(writer);
        }

        // Step 2: let a new instance of the same runtime class read those bytes back.
        final byte[] serialized = buffer.toByteArray();
        try (DataInputViewStreamWrapper reader =
                new DataInputViewStreamWrapper(new ByteArrayInputStream(serialized))) {
            @SuppressWarnings("unchecked")
            final T clone = (T) instantiate(original.getClass());
            clone.read(reader);
            return clone;
        }
    }
    return null;
}
示例2
/**
 * Sets up the outputs of this task: the regular outputs (delegated to the superclass),
 * the collector for the final solution, and the record writer for the sync gate.
 *
 * @throws Exception if the number of step-function writers plus final-result writers does not
 *                   equal the configured index of the sync output, or if a delegated call fails
 */
@Override
protected void initOutputs() throws Exception {
    // The regular outputs (the ones into the step function) have to exist first.
    super.initOutputs();

    // Next, create the outputs that carry the final solution.
    final List<RecordWriter<?>> finalResultWriters = new ArrayList<RecordWriter<?>>();
    final TaskConfig finalResultConfig = this.config.getIterationHeadFinalOutputConfig();
    final ClassLoader classLoader = getUserCodeClassLoader();
    this.finalOutputCollector = BatchTask.getOutputCollector(
            this,
            finalResultConfig,
            classLoader,
            finalResultWriters,
            config.getNumOutputs(),
            finalResultConfig.getNumOutputs());

    // Sanity check: the configured sync gate index must equal the number of writers created so far.
    final int writersSoFar = this.eventualOutputs.size() + finalResultWriters.size();
    final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
    if (writersSoFar != syncGateIndex) {
        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
    }

    // The setup is consistent, so the sync gate can be instantiated.
    this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
    this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
}
示例3
/**
 * Copies a writable by serializing it into an in-memory buffer and reading that buffer
 * back into an object obtained from {@code instantiate(original.getClass())}.
 *
 * @param original Object to clone; may be {@code null}
 * @param <T> Type of the object to clone
 * @return The copy, or {@code null} when {@code original} is {@code null}
 * @throws IOException Thrown if writing or reading the serialized form fails.
 */
public static <T extends IOReadableWritable> T createCopyWritable(T original) throws IOException {
    if (original != null) {
        // Step 1: let the source object write itself into a byte buffer.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputViewStreamWrapper writer = new DataOutputViewStreamWrapper(buffer)) {
            original.write(writer);
        }

        // Step 2: let a new instance of the same runtime class read those bytes back.
        final byte[] serialized = buffer.toByteArray();
        try (DataInputViewStreamWrapper reader =
                new DataInputViewStreamWrapper(new ByteArrayInputStream(serialized))) {
            @SuppressWarnings("unchecked")
            final T clone = (T) instantiate(original.getClass());
            clone.read(reader);
            return clone;
        }
    }
    return null;
}
示例4
/**
 * Copies a writable by serializing it into an in-memory buffer and reading that buffer
 * back into an object obtained from {@code instantiate(original.getClass())}.
 *
 * @param original Object to clone; may be {@code null}
 * @param <T> Type of the object to clone
 * @return The copy, or {@code null} when {@code original} is {@code null}
 * @throws IOException Thrown if writing or reading the serialized form fails.
 */
public static <T extends IOReadableWritable> T createCopyWritable(T original) throws IOException {
    if (original != null) {
        // Step 1: let the source object write itself into a byte buffer.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputViewStreamWrapper writer = new DataOutputViewStreamWrapper(buffer)) {
            original.write(writer);
        }

        // Step 2: let a new instance of the same runtime class read those bytes back.
        final byte[] serialized = buffer.toByteArray();
        try (DataInputViewStreamWrapper reader =
                new DataInputViewStreamWrapper(new ByteArrayInputStream(serialized))) {
            @SuppressWarnings("unchecked")
            final T clone = (T) instantiate(original.getClass());
            clone.read(reader);
            return clone;
        }
    }
    return null;
}
示例5
/**
 * Creates one record reader per logical task input, as counted by {@link #getNumTaskInputs()}.
 *
 * <p>A logical input whose configured group size is 1 reads from a single input gate; a group
 * size above 1 reads from a {@link UnionInputGate} over that many consecutive input gates.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if a configured group size is below 1, or if the group sizes do not add up
 *                   to the configured number of inputs
 */
protected void initInputReaders() throws Exception {
    final int inputCount = getNumTaskInputs();
    final MutableReader<?>[] readers = new MutableReader<?>[inputCount];

    // Index of the first physical input gate belonging to the current logical input.
    int gateOffset = 0;
    for (int input = 0; input < inputCount; input++) {
        final int groupSize = this.config.getGroupSize(input);
        if (groupSize < 1) {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }

        if (groupSize == 1) {
            // Non-union case: the logical input maps to exactly one gate.
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(gateOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            // Union case: the logical input spans several consecutive gates.
            final InputGate[] unionedGates = new InputGate[groupSize];
            for (int k = 0; k < groupSize; k++) {
                unionedGates[k] = getEnvironment().getInputGate(gateOffset + k);
            }
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(unionedGates),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        }
        gateOffset += groupSize;
    }
    this.inputReaders = readers;

    // Final sanity check: every configured input gate must have been assigned to a group.
    if (gateOffset != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例6
/**
 * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
 *
 * <p>Broadcast input gates are addressed after the regular ones: the gate offset starts at
 * {@code config.getNumInputs()} and advances by the group size of each broadcast input.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if the configured group size of a broadcast input is below 1
 */
protected void initBroadcastInputReaders() throws Exception {
    final int numBroadcastInputs = this.config.getNumBroadcastInputs();
    final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];

    int currentReaderOffset = config.getNumInputs();

    // Bound the loop by the same value that sized the array, rather than re-reading
    // the configuration on every iteration.
    for (int i = 0; i < numBroadcastInputs; i++) {
        // ---------------- create the input readers ---------------------
        // in case where a logical input unions multiple physical inputs, create a union reader
        final int groupSize = this.config.getBroadcastGroupSize(i);
        if (groupSize == 1) {
            // non-union case
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(currentReaderOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else if (groupSize > 1) {
            // union case
            InputGate[] readers = new InputGate[groupSize];
            for (int j = 0; j < groupSize; ++j) {
                readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
            }
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(readers),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }
        currentReaderOffset += groupSize;
    }
    this.broadcastInputReaders = broadcastInputReaders;
}
示例7
/**
 * Initializes the input reader of the DataSinkTask, the serializer factory for input 0,
 * and the iterator that reads records through both.
 *
 * @throws Exception
 *         Thrown in case of invalid task input configuration.
 */
@SuppressWarnings("unchecked")
private void initInputReaders() throws Exception {
    final int groupSize = this.config.getGroupSize(0);
    if (groupSize < 1) {
        throw new Exception("Illegal input group size in task configuration: " + groupSize);
    }

    // ---------------- create the input reader ---------------------
    if (groupSize == 1) {
        // Non-union case: read directly from gate 0.
        inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(
                getEnvironment().getInputGate(0),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    } else {
        // Union case: read from a union over all input gates of the environment.
        inputReader = new MutableRecordReader<IOReadableWritable>(
                new UnionInputGate(getEnvironment().getAllInputGates()),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    }

    this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());

    @SuppressWarnings({ "rawtypes" })
    final MutableObjectIterator<?> rawIterator =
            new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
    this.reader = (MutableObjectIterator<IT>) rawIterator;

    // Final sanity check: only input 0 is consulted here, so its group size alone
    // has to equal the configured number of inputs.
    if (groupSize != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例8
/**
 * Creates one record reader per logical task input, as counted by {@link #getNumTaskInputs()}.
 *
 * <p>A logical input whose configured group size is 1 reads from a single input gate; a group
 * size above 1 reads from a {@link UnionInputGate} over that many consecutive input gates.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if a configured group size is below 1, or if the group sizes do not add up
 *                   to the configured number of inputs
 */
protected void initInputReaders() throws Exception {
    final int inputCount = getNumTaskInputs();
    final MutableReader<?>[] readers = new MutableReader<?>[inputCount];

    // Index of the first physical input gate belonging to the current logical input.
    int gateOffset = 0;
    for (int input = 0; input < inputCount; input++) {
        final int groupSize = this.config.getGroupSize(input);
        if (groupSize < 1) {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }

        if (groupSize == 1) {
            // Non-union case: the logical input maps to exactly one gate.
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(gateOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            // Union case: the logical input spans several consecutive gates.
            final InputGate[] unionedGates = new InputGate[groupSize];
            for (int k = 0; k < groupSize; k++) {
                unionedGates[k] = getEnvironment().getInputGate(gateOffset + k);
            }
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(unionedGates),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        }
        gateOffset += groupSize;
    }
    this.inputReaders = readers;

    // Final sanity check: every configured input gate must have been assigned to a group.
    if (gateOffset != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例9
/**
 * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
 *
 * <p>Broadcast input gates are addressed after the regular ones: the gate offset starts at
 * {@code config.getNumInputs()} and advances by the group size of each broadcast input.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if the configured group size of a broadcast input is below 1
 */
protected void initBroadcastInputReaders() throws Exception {
    final int numBroadcastInputs = this.config.getNumBroadcastInputs();
    final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];

    int currentReaderOffset = config.getNumInputs();

    // Bound the loop by the same value that sized the array, rather than re-reading
    // the configuration on every iteration.
    for (int i = 0; i < numBroadcastInputs; i++) {
        // ---------------- create the input readers ---------------------
        // in case where a logical input unions multiple physical inputs, create a union reader
        final int groupSize = this.config.getBroadcastGroupSize(i);
        if (groupSize == 1) {
            // non-union case
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(currentReaderOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else if (groupSize > 1) {
            // union case
            InputGate[] readers = new InputGate[groupSize];
            for (int j = 0; j < groupSize; ++j) {
                readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
            }
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(readers),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }
        currentReaderOffset += groupSize;
    }
    this.broadcastInputReaders = broadcastInputReaders;
}
示例10
/**
 * Initializes the input reader of the DataSinkTask, the serializer factory for input 0,
 * and the iterator that reads records through both.
 *
 * @throws Exception
 *         Thrown in case of invalid task input configuration.
 */
@SuppressWarnings("unchecked")
private void initInputReaders() throws Exception {
    final int groupSize = this.config.getGroupSize(0);
    if (groupSize < 1) {
        throw new Exception("Illegal input group size in task configuration: " + groupSize);
    }

    // ---------------- create the input reader ---------------------
    if (groupSize == 1) {
        // Non-union case: read directly from gate 0.
        inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(
                getEnvironment().getInputGate(0),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    } else {
        // Union case: read from a union over all input gates of the environment.
        inputReader = new MutableRecordReader<IOReadableWritable>(
                new UnionInputGate(getEnvironment().getAllInputGates()),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    }

    this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());

    @SuppressWarnings({ "rawtypes" })
    final MutableObjectIterator<?> rawIterator =
            new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
    this.reader = (MutableObjectIterator<IT>) rawIterator;

    // Final sanity check: only input 0 is consulted here, so its group size alone
    // has to equal the configured number of inputs.
    if (groupSize != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例11
/**
 * Creates one record reader per logical task input, as counted by {@link #getNumTaskInputs()}.
 *
 * <p>A logical input whose configured group size is 1 reads from a single input gate; a group
 * size above 1 reads from a {@link UnionInputGate} over that many consecutive input gates.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if a configured group size is below 1, or if the group sizes do not add up
 *                   to the configured number of inputs
 */
protected void initInputReaders() throws Exception {
    final int inputCount = getNumTaskInputs();
    final MutableReader<?>[] readers = new MutableReader<?>[inputCount];

    // Index of the first physical input gate belonging to the current logical input.
    int gateOffset = 0;
    for (int input = 0; input < inputCount; input++) {
        final int groupSize = this.config.getGroupSize(input);
        if (groupSize < 1) {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }

        if (groupSize == 1) {
            // Non-union case: the logical input maps to exactly one gate.
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(gateOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            // Union case: the logical input spans several consecutive gates.
            final IndexedInputGate[] unionedGates = new IndexedInputGate[groupSize];
            for (int k = 0; k < groupSize; k++) {
                unionedGates[k] = getEnvironment().getInputGate(gateOffset + k);
            }
            readers[input] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(unionedGates),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        }
        gateOffset += groupSize;
    }
    this.inputReaders = readers;

    // Final sanity check: every configured input gate must have been assigned to a group.
    if (gateOffset != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例12
/**
 * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
 *
 * <p>Broadcast input gates are addressed after the regular ones: the gate offset starts at
 * {@code config.getNumInputs()} and advances by the group size of each broadcast input.
 *
 * <p>This method requires that the task configuration, the driver, and the user-code class loader are set.
 *
 * @throws Exception if the configured group size of a broadcast input is below 1
 */
protected void initBroadcastInputReaders() throws Exception {
    final int numBroadcastInputs = this.config.getNumBroadcastInputs();
    final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];

    int currentReaderOffset = config.getNumInputs();

    // Bound the loop by the same value that sized the array, rather than re-reading
    // the configuration on every iteration.
    for (int i = 0; i < numBroadcastInputs; i++) {
        // ---------------- create the input readers ---------------------
        // in case where a logical input unions multiple physical inputs, create a union reader
        final int groupSize = this.config.getBroadcastGroupSize(i);
        if (groupSize == 1) {
            // non-union case
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    getEnvironment().getInputGate(currentReaderOffset),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else if (groupSize > 1) {
            // union case
            IndexedInputGate[] readers = new IndexedInputGate[groupSize];
            for (int j = 0; j < groupSize; ++j) {
                readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
            }
            broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
                    new UnionInputGate(readers),
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }
        currentReaderOffset += groupSize;
    }
    this.broadcastInputReaders = broadcastInputReaders;
}
示例13
/**
 * Initializes the input reader of the DataSinkTask, the serializer factory for input 0,
 * and the iterator that reads records through both.
 *
 * @throws Exception
 *         Thrown in case of invalid task input configuration.
 */
@SuppressWarnings("unchecked")
private void initInputReaders() throws Exception {
    final int groupSize = this.config.getGroupSize(0);
    if (groupSize < 1) {
        throw new Exception("Illegal input group size in task configuration: " + groupSize);
    }

    // ---------------- create the input reader ---------------------
    if (groupSize == 1) {
        // Non-union case: read directly from gate 0.
        inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(
                getEnvironment().getInputGate(0),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    } else {
        // Union case: read from a union over all input gates of the environment.
        inputReader = new MutableRecordReader<IOReadableWritable>(
                new UnionInputGate(getEnvironment().getAllInputGates()),
                getEnvironment().getTaskManagerInfo().getTmpDirectories());
    }

    this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());

    @SuppressWarnings({ "rawtypes" })
    final MutableObjectIterator<?> rawIterator =
            new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
    this.reader = (MutableObjectIterator<IT>) rawIterator;

    // Final sanity check: only input 0 is consulted here, so its group size alone
    // has to equal the configured number of inputs.
    if (groupSize != this.config.getNumInputs()) {
        throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
    }
}
示例14
/**
 * Lets the target read itself from this instance and reports whether anything remains afterwards.
 *
 * @param target object whose {@code read} method is invoked with this instance as its source
 * @return {@code LAST_RECORD_FROM_BUFFER} when {@code remaining()} is 0 after the read,
 *         otherwise {@code INTERMEDIATE_RECORD_FROM_BUFFER}
 * @throws IOException if the read raises an {@link IndexOutOfBoundsException}, if
 *                     {@code remaining()} is negative afterwards, or if the read itself fails
 */
DeserializationResult readInto(IOReadableWritable target) throws IOException {
    try {
        target.read(this);
    } catch (IndexOutOfBoundsException outOfBounds) {
        // Surface an out-of-bounds read as a serialization error, keeping the cause.
        throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, outOfBounds);
    }

    final int leftover = remaining();
    if (leftover == 0) {
        return LAST_RECORD_FROM_BUFFER;
    }
    if (leftover > 0) {
        return INTERMEDIATE_RECORD_FROM_BUFFER;
    }
    // A negative remainder means the read went past the available data.
    throw new IOException(
            BROKEN_SERIALIZATION_ERROR_MESSAGE,
            new IndexOutOfBoundsException("Remaining = " + leftover));
}
示例15
/**
 * Returns the current channel index and advances it by one, as long as the index is below
 * {@code numberOfChannels}. Once the index has reached {@code numberOfChannels}, every call
 * returns 0 and the index is left unchanged.
 *
 * @param record not inspected by this selector
 * @return the channel index before the increment, or 0 once the index is no longer below
 *         {@code numberOfChannels}
 */
@Override
public int selectChannel(IOReadableWritable record) {
    if (channelIndex < numberOfChannels) {
        final int selected = channelIndex;
        channelIndex++;
        return selected;
    }
    return 0;
}