Java源码示例:org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility
示例1
/**
 * Translates the outcome of an Avro schema-pair compatibility check into the
 * corresponding Flink {@link TypeSerializerSchemaCompatibility} result.
 *
 * @param compatibility the Avro compatibility result to translate
 * @return {@code compatibleAfterMigration()} when Avro reports {@code COMPATIBLE};
 *         {@code incompatible()} for every other outcome
 */
private static <T> TypeSerializerSchemaCompatibility<T> avroCompatibilityToFlinkCompatibility(
        SchemaPairCompatibility compatibility) {
    switch (compatibility.getType()) {
        case COMPATIBLE:
            // Avro considers the schema pair compatible; this is surfaced to Flink as
            // "compatible after migration".
            // NOTE(review): an earlier comment here claimed that no migration is required,
            // which contradicts the value actually returned - confirm the intended wording.
            return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
        case INCOMPATIBLE:
        case RECURSION_IN_PROGRESS:
        default:
            // Anything that is not an explicit COMPATIBLE verdict is treated as incompatible.
            return TypeSerializerSchemaCompatibility.incompatible();
    }
}
示例2
/**
 * Resolves compatibility for this legacy snapshot by rebuilding an equivalent
 * {@link PojoSerializerSnapshot} from the legacy data and delegating the check to it.
 *
 * @param newSerializer the serializer to check against the restored snapshot data
 * @return whatever the rebuilt {@link PojoSerializerSnapshot} resolves for {@code newSerializer}
 */
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
    LinkedHashMap<String, TypeSerializerSnapshot<?>> snapshotsByFieldName =
        preprocessLegacySerializerSnapshotTuples(fieldToSerializerConfigSnapshot);

    int fieldCount = snapshotsByFieldName.size();
    ArrayList<Field> pojoFields = new ArrayList<>(fieldCount);
    ArrayList<TypeSerializerSnapshot<?>> snapshotsInFieldOrder = new ArrayList<>(fieldCount);

    // Walk the map in its iteration order so that both lists stay index-aligned.
    for (java.util.Map.Entry<String, TypeSerializerSnapshot<?>> entry : snapshotsByFieldName.entrySet()) {
        pojoFields.add(PojoFieldUtils.getField(entry.getKey(), getTypeClass()));
        snapshotsInFieldOrder.add(entry.getValue());
    }

    PojoSerializerSnapshot<T> delegate = new PojoSerializerSnapshot<>(
        getTypeClass(),
        pojoFields.toArray(new Field[fieldCount]),
        snapshotsInFieldOrder.toArray(new TypeSerializerSnapshot[fieldCount]),
        preprocessLegacySerializerSnapshotTuples(registeredSubclassesToSerializerConfigSnapshots),
        preprocessLegacySerializerSnapshotTuples(nonRegisteredSubclassesToSerializerConfigSnapshots));

    return delegate.resolveSchemaCompatibility(newSerializer);
}
示例3
/**
 * Resolves compatibility against an {@code Either} serializer by delegating to a fresh
 * {@code JavaEitherSerializerSnapshot} that is handed this snapshot's nested serializer snapshots.
 *
 * @param newSerializer the serializer to check
 * @return the delegated result when {@code newSerializer} is an {@code EitherSerializer};
 *         {@code incompatible()} otherwise
 */
@Override
public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
        TypeSerializer<Either<L, R>> newSerializer) {
    checkState(nestedSnapshot != null);

    // Any serializer other than EitherSerializer cannot be matched by this snapshot.
    if (!(newSerializer instanceof EitherSerializer)) {
        return TypeSerializerSchemaCompatibility.incompatible();
    }

    // Hand the actual compatibility check over to the new snapshot class.
    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer,
        new JavaEitherSerializerSnapshot<>(),
        nestedSnapshot.getNestedSerializerSnapshots());
}
示例4
/**
 * Delegates the compatibility check of this legacy snapshot to a {@link PojoSerializerSnapshot}
 * that is reconstructed from the legacy field and subclass serializer snapshot data.
 *
 * @param newSerializer the serializer to check against the restored snapshot data
 * @return the result resolved by the reconstructed {@link PojoSerializerSnapshot}
 */
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
    LinkedHashMap<String, TypeSerializerSnapshot<?>> fieldNameToSnapshot =
        preprocessLegacySerializerSnapshotTuples(fieldToSerializerConfigSnapshot);

    int size = fieldNameToSnapshot.size();
    ArrayList<Field> resolvedFields = new ArrayList<>(size);
    ArrayList<TypeSerializerSnapshot<?>> resolvedSnapshots = new ArrayList<>(size);

    // Both lists are filled in map iteration order, keeping field i paired with snapshot i.
    for (java.util.Map.Entry<String, TypeSerializerSnapshot<?>> fieldEntry : fieldNameToSnapshot.entrySet()) {
        resolvedFields.add(PojoFieldUtils.getField(fieldEntry.getKey(), getTypeClass()));
        resolvedSnapshots.add(fieldEntry.getValue());
    }

    PojoSerializerSnapshot<T> pojoSnapshot = new PojoSerializerSnapshot<>(
        getTypeClass(),
        resolvedFields.toArray(new Field[size]),
        resolvedSnapshots.toArray(new TypeSerializerSnapshot[size]),
        preprocessLegacySerializerSnapshotTuples(registeredSubclassesToSerializerConfigSnapshots),
        preprocessLegacySerializerSnapshotTuples(nonRegisteredSubclassesToSerializerConfigSnapshots));

    return pojoSnapshot.resolveSchemaCompatibility(newSerializer);
}
示例5
/**
 * Resolves compatibility against a map serializer. A {@code MapSerializer} is checked through a new
 * {@code MapSerializerSnapshot}; any other serializer falls back to the superclass implementation.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
    if (!(newSerializer instanceof MapSerializer)) {
        return super.resolveSchemaCompatibility(newSerializer);
    }

    List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nested = getNestedSerializersAndConfigs();

    // Forward the first two nested snapshots (indices 0 and 1) to the new MapSerializerSnapshot.
    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer,
        new MapSerializerSnapshot<>(),
        nested.get(0).f1,
        nested.get(1).f1);
}
示例6
/**
 * Maps an Avro schema-pair compatibility verdict onto a Flink
 * {@link TypeSerializerSchemaCompatibility} result.
 *
 * @param compatibility the Avro compatibility verdict
 * @return {@code compatibleAfterMigration()} for {@code COMPATIBLE}, otherwise {@code incompatible()}
 */
private static <T> TypeSerializerSchemaCompatibility<T> avroCompatibilityToFlinkCompatibility(
        SchemaPairCompatibility compatibility) {
    switch (compatibility.getType()) {
        case COMPATIBLE:
            // A COMPATIBLE Avro verdict is reported to Flink as "compatible after migration".
            // NOTE(review): the previous comment stated that no migration is required, which
            // does not match the returned value - confirm which one is intended.
            return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
        case INCOMPATIBLE:
        case RECURSION_IN_PROGRESS:
        default:
            // Every remaining verdict, including unknown ones, is reported as incompatible.
            return TypeSerializerSchemaCompatibility.incompatible();
    }
}
示例7
/**
 * Registers the new serializer for restored state and resolves its compatibility against the
 * previous serializer snapshot.
 *
 * <p>On an incompatible result, access to the current schema serializer is invalidated. The
 * registered serializer becomes the reconfigured serializer when the result carries one, and
 * {@code newSerializer} otherwise.
 *
 * @param newSerializer the serializer to register; must not be {@code null}
 * @return the compatibility result resolved by the previous serializer snapshot
 * @throws UnsupportedOperationException if a serializer has already been registered
 */
@Nonnull
@Override
@SuppressWarnings("ConstantConditions")
public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
    checkNotNull(newSerializer);
    if (registeredSerializer != null) {
        throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
    }

    TypeSerializerSchemaCompatibility<T> compatibility =
        previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);

    if (compatibility.isIncompatible()) {
        invalidateCurrentSchemaSerializerAccess();
    }

    // Prefer the reconfigured instance whenever the compatibility result provides one.
    this.registeredSerializer = compatibility.isCompatibleWithReconfiguredSerializer()
        ? compatibility.getReconfiguredSerializer()
        : newSerializer;

    return compatibility;
}
示例8
/**
 * Registers an incompatible serializer first, then supplies the previous snapshot, and checks that
 * the result is incompatible and that the current schema serializer can no longer be obtained.
 */
@Test
public void testEagerlyRegisterIncompatibleSerializer() {
    StateSerializerProvider<TestType> provider =
        StateSerializerProvider.fromNewRegisteredSerializer(new TestType.IncompatibleTestTypeSerializer());

    // Supplying the V1 snapshot afterwards should classify the registered serializer as incompatible.
    TypeSerializerSchemaCompatibility<TestType> compatibility =
        provider.setPreviousSerializerSnapshotForRestoredState(
            new TestType.V1TestTypeSerializer().snapshotConfiguration());
    assertTrue(compatibility.isIncompatible());

    try {
        // Requesting the current schema serializer is expected to throw from here on.
        provider.currentSchemaSerializer();
        fail();
    } catch (Exception expected) {
        // expected: the serializer is no longer accessible
    }
}
示例9
/**
 * Updates the restored key/value state meta info with the new namespace and state serializers.
 *
 * <p>The namespace serializer must neither require migration nor be incompatible. The state
 * serializer may require migration, in which case the state values are migrated, but it must not
 * be incompatible.
 *
 * @param oldStateInfo the restored column family handle paired with its meta info
 * @param stateDesc the descriptor of the state being registered
 * @param namespaceSerializer the new namespace serializer
 * @param stateSerializer the new state serializer
 * @return the updated restored meta info (the {@code f1} component of {@code oldStateInfo})
 * @throws StateMigrationException if either serializer fails the checks described above
 */
private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
        Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo,
        StateDescriptor<S, SV> stateDesc,
        TypeSerializer<N> namespaceSerializer,
        TypeSerializer<SV> stateSerializer) throws Exception {

    @SuppressWarnings("unchecked")
    RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredMetaInfo = oldStateInfo.f1;

    // Namespace serializer: anything short of directly usable is rejected.
    TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
        restoredMetaInfo.updateNamespaceSerializer(namespaceSerializer);
    if (namespaceCompatibility.isCompatibleAfterMigration() || namespaceCompatibility.isIncompatible()) {
        throw new StateMigrationException("The new namespace serializer must be compatible.");
    }

    restoredMetaInfo.checkStateMetaInfo(stateDesc);

    // State serializer: migrate when required, reject when incompatible.
    TypeSerializerSchemaCompatibility<SV> stateCompatibility =
        restoredMetaInfo.updateStateSerializer(stateSerializer);
    if (stateCompatibility.isCompatibleAfterMigration()) {
        migrateStateValues(stateDesc, oldStateInfo);
    } else if (stateCompatibility.isIncompatible()) {
        throw new StateMigrationException("The new state serializer cannot be incompatible.");
    }

    return restoredMetaInfo;
}
示例10
/**
 * Checks that a POJO snapshot whose HEIGHT field snapshot reports "compatible after migration"
 * resolves to "compatible after migration" for the POJO serializer as a whole.
 */
@Test
public void testResolveSchemaCompatibilityWithCompatibleAfterMigrationFieldSerializers() {
    // Old snapshot: the HEIGHT field carries a snapshot that demands migration.
    final PojoSerializerSnapshot<TestPojo> oldSnapshot = buildTestSnapshot(Arrays.asList(
        ID_FIELD,
        NAME_FIELD,
        mockFieldSerializerSnapshot(
            HEIGHT_FIELD,
            SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration())));

    // New serializer: the HEIGHT field uses the matching testing serializer.
    final PojoSerializer<TestPojo> newSerializer = buildTestNewPojoSerializer(Arrays.asList(
        ID_FIELD,
        NAME_FIELD,
        mockFieldSerializer(HEIGHT_FIELD, new SchemaCompatibilityTestingSerializer())));

    final TypeSerializerSchemaCompatibility<TestPojo> compatibility =
        oldSnapshot.resolveSchemaCompatibility(newSerializer);

    assertTrue(compatibility.isCompatibleAfterMigration());
}
示例11
/**
 * Resolves compatibility by building a {@code TraversableSerializerSnapshot} from the restored
 * serializer's {@code cbfCode()} and delegating the check, together with the single nested
 * serializer snapshot, to it.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
    TraversableSerializer<T, E> restored = (TraversableSerializer<T, E>) restoreSerializer();

    TraversableSerializerSnapshot<T, E> delegate = new TraversableSerializerSnapshot<>(restored.cbfCode());

    TypeSerializerSnapshot<?> nestedSnapshot = getSingleNestedSerializerAndConfig().f1;

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, delegate, nestedSnapshot);
}
示例12
/**
 * Registers a V2 serializer first, then supplies a V1 snapshot, and checks that the result is
 * "compatible after migration" with V2 as the current and V1 as the previous schema serializer.
 */
@Test
public void testEagerlyRegisterCompatibleAfterMigrationSerializer() {
    StateSerializerProvider<TestType> provider =
        StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V2TestTypeSerializer());

    // Supplying the V1 snapshot afterwards should require a migration to the V2 serializer.
    TypeSerializerSchemaCompatibility<TestType> compatibility =
        provider.setPreviousSerializerSnapshotForRestoredState(
            new TestType.V1TestTypeSerializer().snapshotConfiguration());

    assertTrue(compatibility.isCompatibleAfterMigration());
    assertTrue(provider.currentSchemaSerializer() instanceof TestType.V2TestTypeSerializer);
    assertTrue(provider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
}
示例13
/**
 * Resolves compatibility by collecting the snapshot half ({@code f1}) of every nested
 * serializer/config pair and delegating the check to a new {@code RowSerializerSnapshot}.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<Row> resolveSchemaCompatibility(TypeSerializer<Row> newSerializer) {
    // Keep only the snapshot component of each nested pair, preserving order.
    TypeSerializerSnapshot<?>[] fieldSnapshots =
        getNestedSerializersAndConfigs().stream()
            .map(serializerAndSnapshot -> serializerAndSnapshot.f1)
            .toArray(TypeSerializerSnapshot[]::new);

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, new RowSerializerSnapshot(), fieldSnapshots);
}
示例14
/**
 * Starts from a V1 snapshot, lazily registers a serializer that requires reconfiguration, and
 * checks that the current schema serializer ends up being exactly a {@code V1TestTypeSerializer}.
 */
@Test
public void testLazilyRegisterNewSerializerRequiringReconfiguration() {
    StateSerializerProvider<TestType> provider = StateSerializerProvider.fromPreviousSerializerSnapshot(
        new TestType.V1TestTypeSerializer().snapshotConfiguration());

    // Registering a serializer that needs reconfiguration should yield the reconfigured
    // instance as the current schema serializer.
    TypeSerializerSchemaCompatibility<TestType> compatibility =
        provider.registerNewSerializerForRestoredState(
            new TestType.ReconfigurationRequiringTestTypeSerializer());

    assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
    assertTrue(provider.currentSchemaSerializer().getClass() == TestType.V1TestTypeSerializer.class);
}
示例15
/**
 * Checks that a snapshot containing only the HEIGHT field resolves to "compatible after
 * migration" against a new POJO serializer that additionally has the ID and NAME fields.
 */
@Test
public void testResolveSchemaCompatibilityWithNewFields() {
    // Old snapshot knows a single field.
    final PojoSerializerSnapshot<TestPojo> oldSnapshot =
        buildTestSnapshot(Collections.singletonList(HEIGHT_FIELD));

    // New serializer knows two more fields than the snapshot.
    final PojoSerializer<TestPojo> newSerializer =
        buildTestNewPojoSerializer(Arrays.asList(ID_FIELD, NAME_FIELD, HEIGHT_FIELD));

    final TypeSerializerSchemaCompatibility<TestPojo> compatibility =
        oldSnapshot.resolveSchemaCompatibility(newSerializer);

    assertTrue(compatibility.isCompatibleAfterMigration());
}
示例16
/**
 * Registers a V1 serializer first, then supplies a V1 snapshot, and checks that the result is
 * "compatible as is" with V1 serving as both the current and the previous schema serializer.
 */
@Test
public void testEagerlyRegisterNewCompatibleAsIsSerializer() {
    StateSerializerProvider<TestType> provider =
        StateSerializerProvider.fromNewRegisteredSerializer(new TestType.V1TestTypeSerializer());

    // A V1 snapshot paired with a V1 serializer should need no changes at all.
    TypeSerializerSchemaCompatibility<TestType> compatibility =
        provider.setPreviousSerializerSnapshotForRestoredState(
            new TestType.V1TestTypeSerializer().snapshotConfiguration());

    assertTrue(compatibility.isCompatibleAsIs());
    assertTrue(provider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
    assertTrue(provider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
}
示例17
/**
 * Resolves compatibility by building a {@code NullableSerializerSnapshot} from the restored
 * serializer's {@code nullPaddingLength()} and delegating the check, together with the single
 * nested serializer snapshot, to it.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
    NullableSerializer<T> restored = (NullableSerializer<T>) restoreSerializer();

    NullableSerializerSnapshot<T> delegate = new NullableSerializerSnapshot<>(restored.nullPaddingLength());

    TypeSerializerSnapshot<?> nestedSnapshot = getSingleNestedSerializerAndConfig().f1;

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, delegate, nestedSnapshot);
}
示例18
/**
 * Resolves compatibility against a new serializer. Only a {@code KryoSerializer} whose type is the
 * very same class as the one recorded in the snapshot data is examined further; everything else
 * is reported as incompatible.
 *
 * @param newSerializer the serializer to check
 * @return the result of the Kryo-specific check, or {@code incompatible()} on a type mismatch
 */
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
    if (newSerializer instanceof KryoSerializer) {
        KryoSerializer<T> newKryoSerializer = (KryoSerializer<T>) newSerializer;
        if (newKryoSerializer.getType() == snapshotData.getTypeClass()) {
            // NOTE(review): presumably binds to an overload taking KryoSerializer<T> that is
            // declared elsewhere in this class - it is not visible in this excerpt.
            return resolveSchemaCompatibility(newKryoSerializer);
        }
    }
    return TypeSerializerSchemaCompatibility.incompatible();
}
示例19
/**
 * Checks that resolving an {@code EnumSerializer} against a snapshot with a different constant
 * order yields a reconfigured serializer that keeps the snapshot's order first and appends the
 * remaining constants.
 */
@Test
public void testReconfiguration() {
    // Pretend the previously persisted constant order was BAR, PAULA, NATHANIEL.
    PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};

    // A fresh serializer starts out with the declaration order of the enum (the "new wrong order").
    EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);

    // Before reconfiguration every checked constant maps to its own ordinal.
    PublicEnum[] checkedConstants = {
        PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
        PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
    for (PublicEnum constant : checkedConstants) {
        assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
    }

    // Resolve against the mocked snapshot and make sure a reconfigured serializer is offered.
    EnumSerializer.EnumSerializerSnapshot serializerSnapshot =
        new EnumSerializer.EnumSerializerSnapshot(PublicEnum.class, mockPreviousOrder);
    TypeSerializerSchemaCompatibility compatibility = serializerSnapshot.resolveSchemaCompatibility(serializer);
    assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());

    // Expected: the original BAR, PAULA, NATHANIEL first, then the new constants FOO, PETER, EMMA.
    PublicEnum[] expectedOrder = {
        PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL,
        PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
    EnumSerializer<PublicEnum> configuredSerializer =
        (EnumSerializer<PublicEnum>) compatibility.getReconfiguredSerializer();

    for (int position = 0; position < expectedOrder.length; position++) {
        assertEquals(position, configuredSerializer.getValueToOrdinal().get(expectedOrder[position]).intValue());
    }
    assertTrue(Arrays.equals(expectedOrder, configuredSerializer.getValues()));
}
示例20
/**
 * Resolves compatibility by delegating to a new {@code UnionSerializerSnapshot}, passing along
 * the snapshots of the first two nested serializers.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<TaggedUnion<T1, T2>> resolveSchemaCompatibility(TypeSerializer<TaggedUnion<T1, T2>> newSerializer) {
    List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nested = getNestedSerializersAndConfigs();

    // Only the snapshot component (f1) of the pairs at index 0 and 1 is forwarded.
    TypeSerializerSnapshot<?> firstSnapshot = nested.get(0).f1;
    TypeSerializerSnapshot<?> secondSnapshot = nested.get(1).f1;

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer,
        new UnionSerializerSnapshot<>(),
        firstSnapshot,
        secondSnapshot);
}
示例21
/**
 * Checks that a snapshot in which the ID and HEIGHT fields are mocked as removed resolves to
 * "compatible after migration" against a new POJO serializer that only has the NAME field.
 */
@Test
public void testResolveSchemaCompatibilityWithRemovedFields() {
    // Old snapshot: two of the three fields are marked as removed.
    final PojoSerializerSnapshot<TestPojo> oldSnapshot = buildTestSnapshot(Arrays.asList(
        mockRemovedField(ID_FIELD),
        NAME_FIELD,
        mockRemovedField(HEIGHT_FIELD)));

    // New serializer: only the surviving field remains.
    final PojoSerializer<TestPojo> newSerializer =
        buildTestNewPojoSerializer(Collections.singletonList(NAME_FIELD));

    final TypeSerializerSchemaCompatibility<TestPojo> compatibility =
        oldSnapshot.resolveSchemaCompatibility(newSerializer);

    assertTrue(compatibility.isCompatibleAfterMigration());
}
示例22
/**
 * Verifies that the compatibility result is INCOMPATIBLE if the data type has changed.
 *
 * <p>The snapshot of a {@code KryoSerializer<TestClassA>} is written to a byte array, read back,
 * and then resolved against a {@code KryoSerializer<TestClassB>}; the result is asserted to be
 * incompatible.
 */
@Test
public void testMigrationStrategyWithDifferentKryoType() throws Exception {
KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig());
// snapshot configuration and serialize to bytes
TypeSerializerSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot, kryoSerializerForA);
serializedConfig = out.toByteArray();
}
// serializer for a different type (TestClassB) than the one the snapshot was taken from
KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
// read configuration again from bytes; the variable is reassigned to the deserialized snapshot
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForB);
}
// the snapshot variable is raw-typed, hence the unchecked assignment of the resolved result
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestClassB> compatResult =
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForB);
assertTrue(compatResult.isIncompatible());
}
示例23
/**
 * Resolves compatibility by collecting the snapshot half ({@code f1}) of every nested
 * serializer/config pair and delegating the check to a new {@code StateSerializerSnapshot}.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<State<TXN, CONTEXT>> resolveSchemaCompatibility(
        TypeSerializer<State<TXN, CONTEXT>> newSerializer) {
    // Keep only the snapshot component of each nested pair, preserving order.
    final TypeSerializerSnapshot<?>[] legacySnapshots =
        getNestedSerializersAndConfigs().stream()
            .map(serializerAndSnapshot -> serializerAndSnapshot.f1)
            .toArray(TypeSerializerSnapshot[]::new);

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, new StateSerializerSnapshot<>(), legacySnapshots);
}
示例24
/**
 * Resolves compatibility against a new serializer. Only a {@code BinaryRowDataSerializer} with the
 * same number of fields as recorded in this snapshot is compatible as is; anything else is
 * incompatible.
 *
 * @param newSerializer the serializer to check
 * @return {@code compatibleAsIs()} on a matching field count, otherwise {@code incompatible()}
 */
@Override
public TypeSerializerSchemaCompatibility<BinaryRowData> resolveSchemaCompatibility(
        TypeSerializer<BinaryRowData> newSerializer) {
    if (!(newSerializer instanceof BinaryRowDataSerializer)) {
        return TypeSerializerSchemaCompatibility.incompatible();
    }

    BinaryRowDataSerializer candidate = (BinaryRowDataSerializer) newSerializer;
    if (previousNumFields == candidate.numFields) {
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
    return TypeSerializerSchemaCompatibility.incompatible();
}
示例25
/**
 * Checks that resolving an {@code EnumSerializer} against a snapshot with a different constant
 * order yields a reconfigured serializer with the expected constant ordering.
 *
 * <p>NOTE(review): despite the method name, no serialization round trip of the serializer is
 * visible in this method body - confirm whether part of the test is missing.
 */
@Test
public void testSerializeReconfiguredEnumSerializer() throws Exception {
    // Pretend the previously persisted constant order was BAR, PAULA, NATHANIEL.
    PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};

    // A fresh serializer starts out with the declaration order of the enum (the "new wrong order").
    EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);

    // Before reconfiguration every checked constant maps to its own ordinal.
    PublicEnum[] checkedConstants = {
        PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
        PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
    for (PublicEnum constant : checkedConstants) {
        assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
    }

    // Resolve against the mocked snapshot and make sure a reconfigured serializer is offered.
    EnumSerializer.EnumSerializerSnapshot serializerSnapshot =
        new EnumSerializer.EnumSerializerSnapshot(PublicEnum.class, mockPreviousOrder);
    TypeSerializerSchemaCompatibility compatibility = serializerSnapshot.resolveSchemaCompatibility(serializer);
    assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());

    // Expected ordering of the reconfigured serializer: previous constants first, new ones after.
    PublicEnum[] expectedOrder = {
        PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL,
        PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
    EnumSerializer<PublicEnum> configuredSerializer =
        (EnumSerializer<PublicEnum>) compatibility.getReconfiguredSerializer();

    for (int position = 0; position < expectedOrder.length; position++) {
        assertEquals(position, configuredSerializer.getValueToOrdinal().get(expectedOrder[position]).intValue());
    }
    assertTrue(Arrays.equals(expectedOrder, configuredSerializer.getValues()));
}
示例26
/**
 * Delegates the compatibility check to a new {@code StateSerializerSnapshot}, handing it the
 * snapshot component ({@code f1}) of each nested serializer/config pair.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<State<TXN, CONTEXT>> resolveSchemaCompatibility(
        TypeSerializer<State<TXN, CONTEXT>> newSerializer) {
    // Project every nested pair onto its snapshot, in the original order.
    final TypeSerializerSnapshot<?>[] snapshotsOnly =
        getNestedSerializersAndConfigs().stream()
            .map(pair -> pair.f1)
            .toArray(TypeSerializerSnapshot[]::new);

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, new StateSerializerSnapshot<>(), snapshotsOnly);
}
示例27
/**
 * Resolves an {@code EnumSerializer} against a snapshot holding a different constant order and
 * checks that the reconfigured serializer uses the expected ordering.
 *
 * <p>NOTE(review): the method name mentions serialization, but this body never serializes or
 * reads back the serializer - confirm whether a step was dropped.
 */
@Test
public void testSerializeReconfiguredEnumSerializer() throws Exception {
    // Simulated previous constant order: BAR, PAULA, NATHANIEL.
    PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};

    // The newly created serializer initially follows the enum's own ordinals.
    EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);

    // Sanity check of the initial configuration: value-to-ordinal equals ordinal().
    PublicEnum[] initialConstants = {
        PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
        PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
    for (PublicEnum constant : initialConstants) {
        assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
    }

    // Reconfigure through the snapshot and verify the reported compatibility.
    EnumSerializer.EnumSerializerSnapshot serializerSnapshot =
        new EnumSerializer.EnumSerializerSnapshot(PublicEnum.class, mockPreviousOrder);
    TypeSerializerSchemaCompatibility compatibility = serializerSnapshot.resolveSchemaCompatibility(serializer);
    assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());

    // The reconfigured ordering lists the previous constants first, then the remaining ones.
    PublicEnum[] expectedOrder = {
        PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL,
        PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
    EnumSerializer<PublicEnum> configuredSerializer =
        (EnumSerializer<PublicEnum>) compatibility.getReconfiguredSerializer();

    for (int index = 0; index < expectedOrder.length; index++) {
        assertEquals(index, configuredSerializer.getValueToOrdinal().get(expectedOrder[index]).intValue());
    }
    assertTrue(Arrays.equals(expectedOrder, configuredSerializer.getValues()));
}
示例28
/**
 * Resolves compatibility by collecting the snapshot half ({@code f1}) of every nested
 * serializer/config pair and delegating the check to a new {@code TimerSerializerSnapshot}.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<TimerHeapInternalTimer<K, N>> resolveSchemaCompatibility(
        TypeSerializer<TimerHeapInternalTimer<K, N>> newSerializer) {
    // Keep only the snapshot component of each nested pair, preserving order.
    final TypeSerializerSnapshot<?>[] legacySnapshots =
        getNestedSerializersAndConfigs().stream()
            .map(serializerAndSnapshot -> serializerAndSnapshot.f1)
            .toArray(TypeSerializerSnapshot[]::new);

    return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
        newSerializer, new TimerSerializerSnapshot<>(), legacySnapshots);
}
示例29
/**
 * Checks that a POJO snapshot whose ID field snapshot requires reconfiguration resolves to
 * "compatible with reconfigured serializer", and that the reconfigured serializer is a
 * {@code PojoSerializer} carrying the expected field serializers.
 */
@Test
public void testResolveSchemaCompatibilityWithCompatibleWithReconfigurationFieldSerializers() {
    // Old snapshot: the ID field carries a snapshot that demands reconfiguration.
    final PojoSerializerSnapshot<TestPojo> oldSnapshot = buildTestSnapshot(Arrays.asList(
        mockFieldSerializerSnapshot(
            ID_FIELD,
            SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration()),
        NAME_FIELD,
        HEIGHT_FIELD));

    // New serializer: the ID field uses the matching testing serializer.
    final PojoSerializer<TestPojo> newSerializer = buildTestNewPojoSerializer(Arrays.asList(
        mockFieldSerializer(ID_FIELD, new SchemaCompatibilityTestingSerializer()),
        NAME_FIELD,
        HEIGHT_FIELD));

    final TypeSerializerSchemaCompatibility<TestPojo> compatibility =
        oldSnapshot.resolveSchemaCompatibility(newSerializer);
    assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());

    // The reconfigured serializer must be exactly a PojoSerializer.
    final TypeSerializer<TestPojo> reconfigured = compatibility.getReconfiguredSerializer();
    assertSame(reconfigured.getClass(), PojoSerializer.class);

    final TypeSerializer<?>[] actualFieldSerializers =
        ((PojoSerializer<TestPojo>) reconfigured).getFieldSerializers();
    final TypeSerializer<?>[] expectedFieldSerializers = new TypeSerializer[] {
        new SchemaCompatibilityTestingSerializer(),
        StringSerializer.INSTANCE,
        DoubleSerializer.INSTANCE };

    assertArrayEquals(expectedFieldSerializers, actualFieldSerializers);
}
示例30
/**
 * Resolves compatibility by delegating to a new {@code MigratedNFASerializerSnapshot}, passing
 * along this snapshot's nested serializer snapshots.
 *
 * @param newSerializer the serializer to check
 * @return the delegated compatibility result
 */
@Override
public TypeSerializerSchemaCompatibility<MigratedNFA<T>> resolveSchemaCompatibility(TypeSerializer<MigratedNFA<T>> newSerializer) {
    TypeSerializerSchemaCompatibility<MigratedNFA<T>> delegatedResult =
        CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
            newSerializer,
            new MigratedNFASerializerSnapshot<>(),
            getNestedSerializerSnapshots());
    return delegatedResult;
}