Java源码示例:org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
示例1
/**
 * Serializes a bucket state that carries no in-progress file and no pending
 * committables, reads it back, and verifies that the recovered state has the
 * same bucket path and is still empty.
 */
@Test
public void testSerializationEmpty() throws IOException {
    final File bucketDir = tempFolder.newFolder();
    final FileSystem fileSystem = FileSystem.get(bucketDir.toURI());
    final RecoverableWriter recoverableWriter = fileSystem.createRecoverableWriter();
    final Path bucketPath = new Path(bucketDir.getPath(), "test");

    // No resumable file (null) and an empty map of committables.
    final BucketState<String> emptyState =
        new BucketState<>("test", bucketPath, Long.MAX_VALUE, null, new HashMap<>());

    final SimpleVersionedSerializer<BucketState<String>> stateSerializer =
        new BucketStateSerializer<>(
            recoverableWriter.getResumeRecoverableSerializer(),
            recoverableWriter.getCommitRecoverableSerializer(),
            SimpleVersionedStringSerializer.INSTANCE);

    final byte[] serialized =
        SimpleVersionedSerialization.writeVersionAndSerialize(stateSerializer, emptyState);
    final BucketState<String> restored =
        SimpleVersionedSerialization.readVersionAndDeSerialize(stateSerializer, serialized);

    Assert.assertEquals(bucketPath, restored.getBucketPath());
    Assert.assertNull(restored.getInProgressResumableFile());
    Assert.assertTrue(restored.getCommittableFilesPerCheckpoint().isEmpty());
}
示例2
/**
 * Checks that an empty bucket state (null in-progress file, no committable
 * files) survives a write/read cycle through the versioned serializer.
 */
@Test
public void testSerializationEmpty() throws IOException {
    final File folder = tempFolder.newFolder();
    final FileSystem localFs = FileSystem.get(folder.toURI());
    final RecoverableWriter fsWriter = localFs.createRecoverableWriter();
    final Path expectedBucketPath = new Path(folder.getPath(), "test");

    final BucketState<String> original = new BucketState<>(
        "test",
        expectedBucketPath,
        Long.MAX_VALUE,
        null,
        new HashMap<>());

    final SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer =
        new BucketStateSerializer<>(
            fsWriter.getResumeRecoverableSerializer(),
            fsWriter.getCommitRecoverableSerializer(),
            SimpleVersionedStringSerializer.INSTANCE);

    // Round trip: state -> versioned bytes -> state.
    final byte[] payload =
        SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, original);
    final BucketState<String> roundTripped =
        SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer, payload);

    Assert.assertEquals(expectedBucketPath, roundTripped.getBucketPath());
    Assert.assertNull(roundTripped.getInProgressResumableFile());
    Assert.assertTrue(roundTripped.getCommittableFilesPerCheckpoint().isEmpty());
}
示例3
/**
 * Serializes a bucket state that only has an in-progress (resumable) file, closes
 * the underlying stream, deserializes the state again, and verifies that the
 * bucket path is preserved and that exactly one in-progress file is present in
 * the bucket's parent directory.
 */
@Test
public void testSerializationOnlyInProgress() throws IOException {
    final File testFolder = tempFolder.newFolder();
    final FileSystem fs = FileSystem.get(testFolder.toURI());

    final Path testBucket = new Path(testFolder.getPath(), "test");

    final RecoverableWriter writer = fs.createRecoverableWriter();
    final RecoverableFsDataOutputStream stream = writer.open(testBucket);
    stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));

    // Persist to obtain the resume point that is stored inside the bucket state.
    final RecoverableWriter.ResumeRecoverable current = stream.persist();

    final BucketState<String> bucketState = new BucketState<>(
        "test", testBucket, Long.MAX_VALUE, current, new HashMap<>());

    final SimpleVersionedSerializer<BucketState<String>> serializer =
        new BucketStateSerializer<>(
            writer.getResumeRecoverableSerializer(),
            writer.getCommitRecoverableSerializer(),
            SimpleVersionedStringSerializer.INSTANCE
        );

    final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);

    // to simulate that everything is over for file.
    stream.close();

    final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);

    Assert.assertEquals(testBucket, recoveredState.getBucketPath());

    FileStatus[] statuses = fs.listStatus(testBucket.getParent());
    Assert.assertEquals(1L, statuses.length);

    // Compare path component against path component. Path#toString() may render
    // the location differently from Path#getPath() (e.g. with a scheme, or without
    // the leading slash of a Windows drive path), which would break startsWith().
    Assert.assertTrue(
        statuses[0].getPath().getPath().startsWith(
            (new Path(testBucket.getParent(), ".test.inprogress")).getPath())
    );
}
示例4
/**
 * Round-trips a bucket state whose only content is a resumable in-progress file
 * and then checks the bucket path plus the single in-progress file left in the
 * bucket's parent directory.
 */
@Test
public void testSerializationOnlyInProgress() throws IOException {
    final File baseDir = tempFolder.newFolder();
    final FileSystem fileSystem = FileSystem.get(baseDir.toURI());
    final Path bucketPath = new Path(baseDir.getPath(), "test");

    final RecoverableWriter recoverableWriter = fileSystem.createRecoverableWriter();
    final RecoverableFsDataOutputStream out = recoverableWriter.open(bucketPath);
    out.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
    final RecoverableWriter.ResumeRecoverable resumePoint = out.persist();

    final BucketState<String> inProgressState =
        new BucketState<>("test", bucketPath, Long.MAX_VALUE, resumePoint, new HashMap<>());

    final SimpleVersionedSerializer<BucketState<String>> stateSerializer =
        new BucketStateSerializer<>(
            recoverableWriter.getResumeRecoverableSerializer(),
            recoverableWriter.getCommitRecoverableSerializer(),
            SimpleVersionedStringSerializer.INSTANCE);

    final byte[] serialized =
        SimpleVersionedSerialization.writeVersionAndSerialize(stateSerializer, inProgressState);

    // to simulate that everything is over for file.
    out.close();

    final BucketState<String> restored =
        SimpleVersionedSerialization.readVersionAndDeSerialize(stateSerializer, serialized);
    Assert.assertEquals(bucketPath, restored.getBucketPath());

    final FileStatus[] children = fileSystem.listStatus(bucketPath.getParent());
    Assert.assertEquals(1L, children.length);

    // Both sides use the bare path component so the prefix check compares like with like.
    final String actualFilePath = children[0].getPath().getPath();
    final String expectedPrefix = new Path(bucketPath.getParent(), ".test.inprogress").getPath();
    Assert.assertTrue(actualFilePath.startsWith(expectedPrefix));
}
示例5
/**
 * Writes the given lines through a {@link StreamingFileSink} built on the supplied
 * bulk writer factory and returns the output directory.
 *
 * <p>All elements are assigned to the single bucket id {@code "bucket"}. After the
 * elements are processed, one snapshot with checkpoint id 1 is taken and its
 * completion is notified before the harness is closed.
 *
 * @param writer the bulk writer factory handed to the sink
 * @param lines the elements to send through the sink, in order
 * @return the freshly created folder the sink was pointed at
 * @throws Exception if creating the folder or driving the test harness fails
 */
private File prepareCompressedFile(CompressWriterFactory<String> writer, List<String> lines) throws Exception {
    final File outDir = TEMPORARY_FOLDER.newFolder();

    final BucketAssigner<String, String> singleBucketAssigner = new BucketAssigner<String, String>() {
        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            return "bucket";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    };

    final StreamingFileSink<String> fileSink = StreamingFileSink
        .forBulkFormat(new Path(outDir.toURI()), writer)
        .withBucketAssigner(singleBucketAssigner)
        .build();

    try (OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(fileSink), 1, 1, 0)) {
        harness.setup();
        harness.open();

        // Elements get timestamps 1..n; the snapshot uses the next value, n + 1.
        int nextTimestamp = 1;
        for (String line : lines) {
            harness.processElement(new StreamRecord<>(line, nextTimestamp++));
        }

        harness.snapshot(1, nextTimestamp);
        harness.notifyOfCompletedCheckpoint(1);
    }

    return outDir;
}
示例6
/**
 * Builds a {@link BucketStateSerializer} for String bucket ids from the
 * in-progress and pending file serializers exposed by the properties of the
 * writer returned by {@code createBucketWriter()}.
 *
 * @return the assembled bucket state serializer
 * @throws IOException declared by this helper; propagated from the calls it makes
 */
private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
    final RowWiseBucketWriter rowWiseWriter = createBucketWriter();

    final SimpleVersionedSerializer<BucketState<String>> stateSerializer = new BucketStateSerializer<>(
        rowWiseWriter.getProperties().getInProgressFileRecoverableSerializer(),
        rowWiseWriter.getProperties().getPendingFileRecoverableSerializer(),
        SimpleVersionedStringSerializer.INSTANCE);

    return stateSerializer;
}
示例7
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例8
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例9
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例10
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例11
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例12
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例13
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例14
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例15
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例16
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例17
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例18
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例19
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例20
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例21
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
示例22
/**
 * Returns the static {@code SimpleVersionedStringSerializer.INSTANCE} as the
 * {@code SimpleVersionedSerializer<String>} of this object.
 *
 * <p>NOTE(review): the enclosing class is not visible in this snippet; presumably it
 * is a bucket assigner whose String bucket ids are serialized with it — confirm.
 */
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}