Java源码示例:org.apache.flink.api.java.typeutils.runtime.kryo.Serializers

示例1
/**
 * Adds the Kryo serializers required for Avro data to the given config, but only when
 * {@code type} is an Avro record type.
 *
 * <p>A type counts as an Avro record type when it is assignable to Avro's
 * {@code SpecificRecordBase} or {@code GenericData.Record}; for any other type this
 * method registers nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	final boolean avroRecordType =
		org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			|| org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (!avroRecordType) {
		return;
	}

	// The java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly on its own, so a dedicated serializer is registered.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports users working with untyped Avro records (GenericData.Record): Kryo copes with
	// everything inside such a record except the Schema. The schema serializer is very slow,
	// and sending GenericData.Record through Kryo is discouraged in general.
	// It is added as a *default* serializer because Avro uses a private sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
 
示例2
/**
 * Adds the Kryo serializers required for Avro data to the given config, but only when
 * {@code type} is an Avro record type.
 *
 * <p>A type counts as an Avro record type when it is assignable to Avro's
 * {@code SpecificRecordBase} or {@code GenericData.Record}; for any other type this
 * method registers nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	final boolean avroRecordType =
		org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			|| org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (!avroRecordType) {
		return;
	}

	// The java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly on its own, so a dedicated serializer is registered.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports users working with untyped Avro records (GenericData.Record): Kryo copes with
	// everything inside such a record except the Schema. The schema serializer is very slow,
	// and sending GenericData.Record through Kryo is discouraged in general.
	// It is added as a *default* serializer because Avro uses a private sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
 
示例3
/**
 * Adds the Kryo serializers required for Avro data to the given config, but only when
 * {@code type} is an Avro record type.
 *
 * <p>A type counts as an Avro record type when it is assignable to Avro's
 * {@code SpecificRecordBase} or {@code GenericData.Record}; for any other type this
 * method registers nothing.
 *
 * @param reg the execution config that receives the Kryo registrations
 * @param type the class to inspect
 */
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
	final boolean avroRecordType =
		org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
			|| org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type);
	if (!avroRecordType) {
		return;
	}

	// The java.util.List fields of Avro POJOs are GenericData.Array instances at runtime.
	// Kryo cannot serialize those properly on its own, so a dedicated serializer is registered.
	reg.registerTypeWithKryoSerializer(
		GenericData.Array.class,
		Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

	// Supports users working with untyped Avro records (GenericData.Record): Kryo copes with
	// everything inside such a record except the Schema. The schema serializer is very slow,
	// and sending GenericData.Record through Kryo is discouraged in general.
	// It is added as a *default* serializer because Avro uses a private sub-type at runtime.
	reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
 
示例4
/**
 * Puts a Kryo registration for Avro's {@code GenericData.Array} into the given map.
 *
 * <p>The entry is keyed by the fully qualified class name of {@code GenericData.Array} and
 * pairs that class with a {@code SpecificInstanceCollectionSerializerForArrayList} instance
 * wrapped in an {@code ExecutionConfig.SerializableSerializer}.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();

	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));

	kryoRegistrations.put(registrationKey, arrayRegistration);
}
 
示例5
/**
 * Puts a Kryo registration for Avro's {@code GenericData.Array} into the given map.
 *
 * <p>The entry is keyed by the fully qualified class name of {@code GenericData.Array} and
 * pairs that class with a {@code SpecificInstanceCollectionSerializerForArrayList} instance
 * wrapped in an {@code ExecutionConfig.SerializableSerializer}.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();

	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));

	kryoRegistrations.put(registrationKey, arrayRegistration);
}
 
示例6
/**
 * Puts a Kryo registration for Avro's {@code GenericData.Array} into the given map.
 *
 * <p>The entry is keyed by the fully qualified class name of {@code GenericData.Array} and
 * pairs that class with a {@code SpecificInstanceCollectionSerializerForArrayList} instance
 * wrapped in an {@code ExecutionConfig.SerializableSerializer}.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	final String registrationKey = GenericData.Array.class.getName();

	final KryoRegistration arrayRegistration = new KryoRegistration(
		GenericData.Array.class,
		new ExecutionConfig.SerializableSerializer<>(
			new Serializers.SpecificInstanceCollectionSerializerForArrayList()));

	kryoRegistrations.put(registrationKey, arrayRegistration);
}
 
示例7
/**
 * Verifies that a {@code GenericData.Record} survives a round trip through the Flink
 * serialization stack.
 *
 * <p>Avro users normally generate classes (POJOs) from their schemas. When generated classes
 * are unavailable, {@code GenericData.Record} can be used instead: an untyped key-value record
 * that relies on a schema to validate the correctness of its data.
 *
 * <p>Using {@code GenericData.Record} with Flink is not recommended; prefer generated POJOs.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> recordReader = new GenericDatumReader<>(userSchema);

	try (FileReader<GenericData.Record> avroFile = DataFileReader.openReader(testFile, recordReader)) {
		// Populate the record from the test file; that is simpler than building it by hand.
		GenericData.Record original = new GenericData.Record(userSchema);
		avroFile.next(original);

		// Sanity-check what was read from disk.
		assertNotNull(original);
		assertEquals("name not equal", TEST_NAME, original.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), original.get("type_enum").toString());
		// The first record in the file carries no value for this field.
		assertEquals(null, original.get("type_long_test"));

		// Push the record through the framework's own serialization.
		TypeInformation<GenericData.Record> typeInfo = TypeExtractor.createTypeInfo(GenericData.Record.class);
		assertEquals(GenericTypeInfo.class, typeInfo.getClass());

		ExecutionConfig config = new ExecutionConfig();
		Serializers.recursivelyRegisterType(typeInfo.getTypeClass(), config, new HashSet<>());

		TypeSerializer<GenericData.Record> serializer = typeInfo.createSerializer(config);

		// Exactly one default Kryo serializer is expected: the Avro schema serializer for Schema.
		assertEquals(1, config.getDefaultKryoSerializerClasses().size());
		assertTrue(
			config.getDefaultKryoSerializerClasses().containsKey(Schema.class)
				&& config.getDefaultKryoSerializerClasses().get(Schema.class)
					.equals(AvroKryoSerializerUtils.AvroSchemaSerializer.class));

		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer)) {
			serializer.serialize(original, target);
		}

		GenericData.Record roundTripped;
		try (DataInputViewStreamWrapper source =
				new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray()))) {
			roundTripped = serializer.deserialize(source);
		}

		// The deserialized copy must carry the same field values as the record read from disk.
		assertNotNull(roundTripped);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), roundTripped.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, roundTripped.get("name").toString());
		assertEquals(null, roundTripped.get("type_long_test"));
	}
}
 
示例8
/**
 * Puts a placeholder Kryo registration into the given map under the
 * {@code AVRO_GENERIC_DATA_ARRAY} key.
 *
 * <p>The entry pairs {@code Serializers.DummyAvroRegisteredClass} with
 * {@code Serializers.DummyAvroKryoSerializerClass} rather than with real Avro classes.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw Class cast is what the suppressed rawtypes/unchecked warnings refer to.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);

	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}
 
示例9
/**
 * Verifies that a {@code GenericData.Record} survives a round trip through the Flink
 * serialization stack.
 *
 * <p>Avro users normally generate classes (POJOs) from their schemas. When generated classes
 * are unavailable, {@code GenericData.Record} can be used instead: an untyped key-value record
 * that relies on a schema to validate the correctness of its data.
 *
 * <p>Using {@code GenericData.Record} with Flink is not recommended; prefer generated POJOs.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> recordReader = new GenericDatumReader<>(userSchema);

	try (FileReader<GenericData.Record> avroFile = DataFileReader.openReader(testFile, recordReader)) {
		// Populate the record from the test file; that is simpler than building it by hand.
		GenericData.Record original = new GenericData.Record(userSchema);
		avroFile.next(original);

		// Sanity-check what was read from disk.
		assertNotNull(original);
		assertEquals("name not equal", TEST_NAME, original.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), original.get("type_enum").toString());
		// The first record in the file carries no value for this field.
		assertEquals(null, original.get("type_long_test"));

		// Push the record through the framework's own serialization.
		TypeInformation<GenericData.Record> typeInfo = TypeExtractor.createTypeInfo(GenericData.Record.class);
		assertEquals(GenericTypeInfo.class, typeInfo.getClass());

		ExecutionConfig config = new ExecutionConfig();
		Serializers.recursivelyRegisterType(typeInfo.getTypeClass(), config, new HashSet<>());

		TypeSerializer<GenericData.Record> serializer = typeInfo.createSerializer(config);

		// Exactly one default Kryo serializer is expected: the Avro schema serializer for Schema.
		assertEquals(1, config.getDefaultKryoSerializerClasses().size());
		assertTrue(
			config.getDefaultKryoSerializerClasses().containsKey(Schema.class)
				&& config.getDefaultKryoSerializerClasses().get(Schema.class)
					.equals(AvroKryoSerializerUtils.AvroSchemaSerializer.class));

		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer)) {
			serializer.serialize(original, target);
		}

		GenericData.Record roundTripped;
		try (DataInputViewStreamWrapper source =
				new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray()))) {
			roundTripped = serializer.deserialize(source);
		}

		// The deserialized copy must carry the same field values as the record read from disk.
		assertNotNull(roundTripped);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), roundTripped.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, roundTripped.get("name").toString());
		assertEquals(null, roundTripped.get("type_long_test"));
	}
}
 
示例10
/**
 * Puts a placeholder Kryo registration into the given map under the
 * {@code AVRO_GENERIC_DATA_ARRAY} key.
 *
 * <p>The entry pairs {@code Serializers.DummyAvroRegisteredClass} with
 * {@code Serializers.DummyAvroKryoSerializerClass} rather than with real Avro classes.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw Class cast is what the suppressed rawtypes/unchecked warnings refer to.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);

	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}
 
示例11
/**
 * Verifies that a {@code GenericData.Record} survives a round trip through the Flink
 * serialization stack.
 *
 * <p>Avro users normally generate classes (POJOs) from their schemas. When generated classes
 * are unavailable, {@code GenericData.Record} can be used instead: an untyped key-value record
 * that relies on a schema to validate the correctness of its data.
 *
 * <p>Using {@code GenericData.Record} with Flink is not recommended; prefer generated POJOs.
 */
@Test
public void testDeserializeToGenericType() throws IOException {
	DatumReader<GenericData.Record> recordReader = new GenericDatumReader<>(userSchema);

	try (FileReader<GenericData.Record> avroFile = DataFileReader.openReader(testFile, recordReader)) {
		// Populate the record from the test file; that is simpler than building it by hand.
		GenericData.Record original = new GenericData.Record(userSchema);
		avroFile.next(original);

		// Sanity-check what was read from disk.
		assertNotNull(original);
		assertEquals("name not equal", TEST_NAME, original.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), original.get("type_enum").toString());
		// The first record in the file carries no value for this field.
		assertEquals(null, original.get("type_long_test"));

		// Push the record through the framework's own serialization.
		TypeInformation<GenericData.Record> typeInfo = TypeExtractor.createTypeInfo(GenericData.Record.class);
		assertEquals(GenericTypeInfo.class, typeInfo.getClass());

		ExecutionConfig config = new ExecutionConfig();
		Serializers.recursivelyRegisterType(typeInfo.getTypeClass(), config, new HashSet<>());

		TypeSerializer<GenericData.Record> serializer = typeInfo.createSerializer(config);

		// Exactly one default Kryo serializer is expected: the Avro schema serializer for Schema.
		assertEquals(1, config.getDefaultKryoSerializerClasses().size());
		assertTrue(
			config.getDefaultKryoSerializerClasses().containsKey(Schema.class)
				&& config.getDefaultKryoSerializerClasses().get(Schema.class)
					.equals(AvroKryoSerializerUtils.AvroSchemaSerializer.class));

		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer)) {
			serializer.serialize(original, target);
		}

		GenericData.Record roundTripped;
		try (DataInputViewStreamWrapper source =
				new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray()))) {
			roundTripped = serializer.deserialize(source);
		}

		// The deserialized copy must carry the same field values as the record read from disk.
		assertNotNull(roundTripped);
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), roundTripped.get("type_enum").toString());
		assertEquals("name not equal", TEST_NAME, roundTripped.get("name").toString());
		assertEquals(null, roundTripped.get("type_long_test"));
	}
}
 
示例12
/**
 * Puts a placeholder Kryo registration into the given map under the
 * {@code AVRO_GENERIC_DATA_ARRAY} key.
 *
 * <p>The entry pairs {@code Serializers.DummyAvroRegisteredClass} with
 * {@code Serializers.DummyAvroKryoSerializerClass} rather than with real Avro classes.
 *
 * @param kryoRegistrations the registration map to add the entry to
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
	// The raw Class cast is what the suppressed rawtypes/unchecked warnings refer to.
	final KryoRegistration placeholderRegistration = new KryoRegistration(
		Serializers.DummyAvroRegisteredClass.class,
		(Class) Serializers.DummyAvroKryoSerializerClass.class);

	kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, placeholderRegistration);
}