Java源码示例:org.apache.flink.streaming.util.serialization.DeserializationSchema
示例1
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
this.topic = Preconditions.checkNotNull(topic, "Topic");
this.properties = Preconditions.checkNotNull(properties, "Properties");
this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
this.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
}
示例2
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param typeInfo Type information describing the result type.
*/
KafkaTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TypeInformation<Row> typeInfo) {
this.topic = Preconditions.checkNotNull(topic, "Topic");
this.properties = Preconditions.checkNotNull(properties, "Properties");
this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
this.fieldTypes = null;
this.fieldNames = null;
}
示例3
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
Class<?>[] fieldTypes) {
this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
}
示例4
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
示例5
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
示例6
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
示例7
/**
* Returns the version-specific Kafka consumer.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema);
示例8
/**
* Returns the deserialization schema.
*
* @return The deserialization schema
*/
protected DeserializationSchema<Row> getDeserializationSchema() {
return deserializationSchema;
}