Java源码示例:org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
示例1
/**
 * Registers the Kryo serializers that Avro record types need, but only when {@code type} is an
 * Avro record, i.e. assignable to {@code SpecificRecordBase} or {@code GenericData.Record}.
 * For any other type this method does nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	// De Morgan form of the original "specific || generic" check; it short-circuits identically.
	final boolean isNotAvroRecord =
		!org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			&& !org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (isNotAvroRecord) {
		return;
	}

	// java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly, so a dedicated collection serializer is used.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports untyped Avro records (GenericData.Record). Kryo can handle everything in them except
	// the Schema. This serializer is slow, but serializing GenericData.Record via Kryo is a bad idea
	// in general anyway. It is added as a *default* serializer because Avro uses a private Schema
	// sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
示例2
/**
 * Registers the Kryo serializers that Avro record types need, but only when {@code type} is an
 * Avro record, i.e. assignable to {@code SpecificRecordBase} or {@code GenericData.Record}.
 * For any other type this method does nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	// De Morgan form of the original "specific || generic" check; it short-circuits identically.
	final boolean isNotAvroRecord =
		!org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			&& !org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (isNotAvroRecord) {
		return;
	}

	// java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly, so a dedicated collection serializer is used.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports untyped Avro records (GenericData.Record). Kryo can handle everything in them except
	// the Schema. This serializer is slow, but serializing GenericData.Record via Kryo is a bad idea
	// in general anyway. It is added as a *default* serializer because Avro uses a private Schema
	// sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
示例3
/**
 * Registers the Kryo serializers that Avro record types need, but only when {@code type} is an
 * Avro record, i.e. assignable to {@code SpecificRecordBase} or {@code GenericData.Record}.
 * For any other type this method does nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	// De Morgan form of the original "specific || generic" check; it short-circuits identically.
	final boolean isNotAvroRecord =
		!org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			&& !org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (isNotAvroRecord) {
		return;
	}

	// java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly, so a dedicated collection serializer is used.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports untyped Avro records (GenericData.Record). Kryo can handle everything in them except
	// the Schema. This serializer is slow, but serializing GenericData.Record via Kryo is a bad idea
	// in general anyway. It is added as a *default* serializer because Avro uses a private Schema
	// sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
示例4
/**
 * Adds a Kryo registration for Avro's {@code GenericData.Array}, keyed by its class name, that
 * serializes instances with a {@code SpecificInstanceCollectionSerializerForArrayList}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();
	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));
	kryoRegistrations.put(registrationKey, arrayRegistration);
}
示例5
/**
 * Adds a Kryo registration for Avro's {@code GenericData.Array}, keyed by its class name, that
 * serializes instances with a {@code SpecificInstanceCollectionSerializerForArrayList}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();
	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));
	kryoRegistrations.put(registrationKey, arrayRegistration);
}
示例6
/**
 * Adds a Kryo registration for Avro's {@code GenericData.Array}, keyed by its class name, that
 * serializes instances with a {@code SpecificInstanceCollectionSerializerForArrayList}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();
	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));
	kryoRegistrations.put(registrationKey, arrayRegistration);
}
示例7
/**
 * Test if the Flink serialization is able to properly process GenericData.Record types.
 * Usually users of Avro generate classes (POJOs) from Avro schemas.
 * However, if generated classes are not available, one can also use GenericData.Record.
 * It is an untyped key-value record which is using a schema to validate the correctness of the data.
 *
 * <p>The test reads the first record of the test file, registers the Kryo serializers Flink derives
 * for {@code GenericData.Record}, round-trips the record through the resulting serializer and checks
 * that the field values survive.
 *
 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
	try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
		// initialize Record by reading it from disk (that's easier than creating it by hand).
		// Avro's next(reuse) only *may* reuse the passed instance, so always keep the returned record.
		GenericData.Record rec = new GenericData.Record(userSchema);
		rec = dataFileReader.next(rec);

		// check if record has been read correctly
		assertNotNull(rec);
		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.

		// now serialize it with our framework:
		TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
		ExecutionConfig ec = new ExecutionConfig();
		assertEquals(GenericTypeInfo.class, te.getClass());

		Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());

		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
		// Exactly one default Kryo serializer is expected: the one for Avro's Schema.
		assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
		// A missing key yields null from get(), so this also covers the containsKey check.
		assertEquals(
			"Schema must be serialized by AvroSchemaSerializer",
			AvroKryoSerializerUtils.AvroSchemaSerializer.class,
			ec.getDefaultKryoSerializerClasses().get(Schema.class));

		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
			tser.serialize(rec, outView);
		}

		GenericData.Record newRec;
		try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
				new ByteArrayInputStream(out.toByteArray()))) {
			newRec = tser.deserialize(inView);
		}

		// check if it is still the same
		assertNotNull(newRec);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
		assertEquals(null, newRec.get("type_long_test"));
	}
}
示例8
/**
 * Adds a placeholder Kryo registration under the {@code AVRO_GENERIC_DATA_ARRAY} key. It pairs
 * {@code Serializers.DummyAvroRegisteredClass} with {@code Serializers.DummyAvroKryoSerializerClass}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw (Class) cast bridges the class literal to the constructor's serializer-class parameter;
	// presumably the dummy serializer class is generic, hence the suppressed raw/unchecked warnings.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);
	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}
示例9
/**
 * Test if the Flink serialization is able to properly process GenericData.Record types.
 * Usually users of Avro generate classes (POJOs) from Avro schemas.
 * However, if generated classes are not available, one can also use GenericData.Record.
 * It is an untyped key-value record which is using a schema to validate the correctness of the data.
 *
 * <p>The test reads the first record of the test file, registers the Kryo serializers Flink derives
 * for {@code GenericData.Record}, round-trips the record through the resulting serializer and checks
 * that the field values survive.
 *
 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
	try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
		// initialize Record by reading it from disk (that's easier than creating it by hand).
		// Avro's next(reuse) only *may* reuse the passed instance, so always keep the returned record.
		GenericData.Record rec = new GenericData.Record(userSchema);
		rec = dataFileReader.next(rec);

		// check if record has been read correctly
		assertNotNull(rec);
		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.

		// now serialize it with our framework:
		TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
		ExecutionConfig ec = new ExecutionConfig();
		assertEquals(GenericTypeInfo.class, te.getClass());

		Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());

		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
		// Exactly one default Kryo serializer is expected: the one for Avro's Schema.
		assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
		// A missing key yields null from get(), so this also covers the containsKey check.
		assertEquals(
			"Schema must be serialized by AvroSchemaSerializer",
			AvroKryoSerializerUtils.AvroSchemaSerializer.class,
			ec.getDefaultKryoSerializerClasses().get(Schema.class));

		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
			tser.serialize(rec, outView);
		}

		GenericData.Record newRec;
		try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
				new ByteArrayInputStream(out.toByteArray()))) {
			newRec = tser.deserialize(inView);
		}

		// check if it is still the same
		assertNotNull(newRec);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
		assertEquals(null, newRec.get("type_long_test"));
	}
}
示例10
/**
 * Adds a placeholder Kryo registration under the {@code AVRO_GENERIC_DATA_ARRAY} key. It pairs
 * {@code Serializers.DummyAvroRegisteredClass} with {@code Serializers.DummyAvroKryoSerializerClass}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw (Class) cast bridges the class literal to the constructor's serializer-class parameter;
	// presumably the dummy serializer class is generic, hence the suppressed raw/unchecked warnings.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);
	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}
示例11
/**
 * Test if the Flink serialization is able to properly process GenericData.Record types.
 * Usually users of Avro generate classes (POJOs) from Avro schemas.
 * However, if generated classes are not available, one can also use GenericData.Record.
 * It is an untyped key-value record which is using a schema to validate the correctness of the data.
 *
 * <p>The test reads the first record of the test file, registers the Kryo serializers Flink derives
 * for {@code GenericData.Record}, round-trips the record through the resulting serializer and checks
 * that the field values survive.
 *
 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
	try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
		// initialize Record by reading it from disk (that's easier than creating it by hand).
		// Avro's next(reuse) only *may* reuse the passed instance, so always keep the returned record.
		GenericData.Record rec = new GenericData.Record(userSchema);
		rec = dataFileReader.next(rec);

		// check if record has been read correctly
		assertNotNull(rec);
		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.

		// now serialize it with our framework:
		TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
		ExecutionConfig ec = new ExecutionConfig();
		assertEquals(GenericTypeInfo.class, te.getClass());

		Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());

		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
		// Exactly one default Kryo serializer is expected: the one for Avro's Schema.
		assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
		// A missing key yields null from get(), so this also covers the containsKey check.
		assertEquals(
			"Schema must be serialized by AvroSchemaSerializer",
			AvroKryoSerializerUtils.AvroSchemaSerializer.class,
			ec.getDefaultKryoSerializerClasses().get(Schema.class));

		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
			tser.serialize(rec, outView);
		}

		GenericData.Record newRec;
		try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
				new ByteArrayInputStream(out.toByteArray()))) {
			newRec = tser.deserialize(inView);
		}

		// check if it is still the same
		assertNotNull(newRec);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
		assertEquals(null, newRec.get("type_long_test"));
	}
}
示例12
/**
 * Adds a placeholder Kryo registration under the {@code AVRO_GENERIC_DATA_ARRAY} key. It pairs
 * {@code Serializers.DummyAvroRegisteredClass} with {@code Serializers.DummyAvroKryoSerializerClass}.
 *
 * @param kryoRegistrations the registrations map to add to; an existing entry under the same
 *     key is replaced
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw (Class) cast bridges the class literal to the constructor's serializer-class parameter;
	// presumably the dummy serializer class is generic, hence the suppressed raw/unchecked warnings.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);
	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}