Java源码示例:org.apache.kafka.common.serialization.Deserializer
示例1
/**
 * Extracts the typed key of a message.
 *
 * <p>Returns {@code null} when the message has no key. If the key schema is a
 * {@code PulsarKafkaSchema} wrapping a {@code StringDeserializer}, the key string is returned
 * as-is. Otherwise the key is treated as base64 text, decoded to bytes and handed to the key
 * schema (after the topic has been set on a {@code PulsarKafkaSchema}).
 *
 * @param topic topic name that is set on a {@code PulsarKafkaSchema} before decoding
 * @param msg message whose key is extracted
 * @return the decoded key, or {@code null} when the message carries no key
 */
@SuppressWarnings("unchecked")
private K getKey(String topic, Message<byte[]> msg) {
    if (msg.hasKey()) {
        if (keySchema instanceof PulsarKafkaSchema) {
            PulsarKafkaSchema<K> kafkaSchema = (PulsarKafkaSchema) keySchema;
            if (kafkaSchema.getKafkaDeserializer() instanceof StringDeserializer) {
                // String keys are stored verbatim, so no base64 round trip is needed.
                return (K) msg.getKey();
            }
            kafkaSchema.setTopic(topic);
        }
        // Assume base64 encoding
        byte[] rawKey = Base64.getDecoder().decode(msg.getKey());
        return keySchema.decode(rawKey);
    }
    return null;
}
示例2
/**
 * Convenience constructor that delegates to the full constructor using
 * {@code DEFAULT_CONSUMER_POLL_TIMEOUT} as the poll timeout and {@code 0L} / {@code null}
 * for the two trailing arguments (presumably the idle-ping timeout and idle-ping handler,
 * i.e. idle pings are not configured - confirm against the full constructor).
 *
 * @param consumerProperties Kafka consumer properties
 * @param keyDeserializer deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @param recordOrRecordsHandler either a per-record handler or a per-batch handler
 * @param consumerExceptionHandler callback receiving the consumer and a runtime exception
 */
public ConsumerContainer(
Properties consumerProperties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Oneof2<
java.util.function.Consumer<? super ConsumerRecord<K, V>>,
java.util.function.Consumer<? super ConsumerRecords<K, V>>
> recordOrRecordsHandler,
BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler
) {
// Pure delegation: every argument is forwarded unchanged, defaults fill the rest.
this(
consumerProperties,
keyDeserializer,
valueDeserializer,
DEFAULT_CONSUMER_POLL_TIMEOUT,
recordOrRecordsHandler,
consumerExceptionHandler,
0L, null
);
}
示例3
public ConsumerContainer(
Properties consumerProperties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
long consumerPollTimeout,
Oneof2<
java.util.function.Consumer<? super ConsumerRecord<K, V>>,
java.util.function.Consumer<? super ConsumerRecords<K, V>>
> recordOrRecordsHandler,
BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler,
long idlePingTimeout,
java.util.function.Consumer<? super TopicPartition> idlePingHandler
) {
this.consumerProperties = Objects.requireNonNull(consumerProperties);
this.keyDeserializer = Objects.requireNonNull(keyDeserializer);
this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
this.consumerPollTimeout = Duration.ofMillis(consumerPollTimeout);
this.recordHandler = recordOrRecordsHandler.isFirst() ? recordOrRecordsHandler.getFirst() : null;
this.recordsHandler = recordOrRecordsHandler.isSecond() ? recordOrRecordsHandler.getSecond() : null;
this.consumerExceptionHandler = Objects.requireNonNull(consumerExceptionHandler);
this.idlePingTimeout = idlePingTimeout;
this.idlePingHandler = /* optional */ idlePingHandler;
this.thread = new Thread(this::consumerLoop,
"kafka-consumer-container-" + containerCount.incrementAndGet());
thread.start();
}
示例4
/**
 * Returns a Kafka {@link Deserializer} that turns record bytes into {@code T} by calling
 * {@code mapper.readValue(data, cls)}.
 *
 * <p>A {@code null} payload (for example a tombstone record) is returned as {@code null}
 * instead of being passed to the mapper, following the recommendation of the Kafka
 * {@code Deserializer} contract. Any failure of the mapper is rethrown as a
 * {@link SerializationException} carrying the original exception as its cause.
 *
 * @return a stateless deserializer; its {@code configure} and {@code close} do nothing
 */
@Override
public Deserializer<T> deserializer() {
    return new Deserializer<T>() {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Nothing to configure.
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                // Null payloads carry no value to decode.
                return null;
            }
            try {
                return mapper.readValue(data, cls);
            } catch (Exception e) {
                throw new SerializationException(e);
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    };
}
示例5
/**
 * Drains the given output topic of the test driver and converts every record into a
 * {@code RatingCount} built from the record's key and value.
 *
 * @param testDriver driver to read from
 * @param outputTopic topic to drain
 * @param keyDeserializer deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @return all records read, in read order; empty if the topic has no output
 */
private List<RatingCount> readOutputTopic(TopologyTestDriver testDriver,
                                          String outputTopic,
                                          Deserializer<String> keyDeserializer,
                                          Deserializer<String> valueDeserializer) {
    final List<RatingCount> ratingCounts = new ArrayList<>();
    ProducerRecord<String, String> next;
    // readOutput yields null once the topic is exhausted.
    while ((next = testDriver.readOutput(outputTopic, keyDeserializer, valueDeserializer)) != null) {
        ratingCounts.add(new RatingCount(next.key().toString(), next.value()));
    }
    return ratingCounts;
}
示例6
/**
 * Wraps a small payload with the segment serializer, feeds it to a message assembler and
 * checks that message bytes are produced and that both reported offsets are 0.
 */
@Test
public void testSingleMessageSegment() {
    // Serializer / deserializer pair for large-message segments.
    final Serializer<LargeMessageSegment> serializer = new DefaultSegmentSerializer();
    final Deserializer<LargeMessageSegment> deserializer = new DefaultSegmentDeserializer();

    final byte[] wrapped = wrapMessageBytes(serializer, "message".getBytes());
    final MessageAssembler assembler = new MessageAssemblerImpl(100, 100, true, deserializer);
    final TopicPartition partition = new TopicPartition("topic", 0);

    final MessageAssembler.AssembleResult result = assembler.assemble(partition, 0, wrapped);

    assertNotNull(result.messageBytes());
    assertEquals(result.messageStartingOffset(), 0, "The message starting offset should be 0");
    assertEquals(result.messageEndingOffset(), 0, "The message ending offset should be 0");
}
示例7
/**
 * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka
 * starting at the earliest point by default.
 *
 * <p>Both the group ID and the client ID are fresh random UUIDs, so every consumer created
 * here is independent of previously created ones.
 *
 * @param kafkaHostname - The Kafka broker hostname. (not null)
 * @param kafkaPort - The Kafka broker port.
 * @param keyDeserializerClass - Deserializes the keys. (not null)
 * @param valueDeserializerClass - Deserializes the values. (not null)
 * @return A {@link Consumer} that can be used to read records from a topic.
 * @throws NullPointerException if any of the "not null" arguments is null
 */
private static <K, V> Consumer<K, V> fromStartConsumer(
        final String kafkaHostname,
        final int kafkaPort,
        final Class<? extends Deserializer<K>> keyDeserializerClass,
        final Class<? extends Deserializer<V>> valueDeserializerClass) {
    requireNonNull(kafkaHostname);
    requireNonNull(keyDeserializerClass);
    requireNonNull(valueDeserializerClass);

    final Properties consumerProps = new Properties();
    consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
    // Random IDs keep this consumer isolated from any other group / client.
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
    return new KafkaConsumer<>(consumerProps);
}
示例8
/**
 * Round-trips a random 100-character string through the string serializer and the
 * large-message segment serializer / deserializer, checking sizes and segment metadata at
 * every step.
 */
@Test
public void testSerde() {
    final String topic = "topic";

    // Serialize a random string into bytes.
    final Serializer<String> stringSerializer = new StringSerializer();
    final String original = LiKafkaClientsTestUtils.getRandomString(100);
    assertEquals(original.length(), 100);
    final byte[] payloadBytes = stringSerializer.serialize(topic, original);
    assertEquals(payloadBytes.length, 100);

    // Wrap the bytes into a segment and serialize it.
    final Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
    final LargeMessageSegment outbound = new LargeMessageSegment(
        LiKafkaClientsUtils.randomUUID(), 0, 2, payloadBytes.length, ByteBuffer.wrap(payloadBytes));
    // String bytes + segment header
    final byte[] wireBytes = segmentSerializer.serialize(topic, outbound);
    assertEquals(wireBytes.length, 1 + payloadBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);

    // Deserialize the segment and compare its metadata with what was sent.
    final Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();
    final LargeMessageSegment inbound = segmentDeserializer.deserialize(topic, wireBytes);
    assertEquals(inbound.messageId, outbound.messageId);
    assertEquals(inbound.messageSizeInBytes, outbound.messageSizeInBytes);
    assertEquals(inbound.numberOfSegments, outbound.numberOfSegments);
    assertEquals(inbound.sequenceNumber, outbound.sequenceNumber);
    assertEquals(inbound.payload.limit(), 100);

    // Finally decode the payload back into a string.
    final Deserializer<String> stringDeserializer = new StringDeserializer();
    final String roundTripped = stringDeserializer.deserialize(topic, inbound.payloadArray());
    assertEquals(roundTripped.length(), original.length());
}
示例9
/**
 * Creates a topic consumer from the given context.
 *
 * <p>The constructor tries to instantiate the class named by the {@code value.deserializer}
 * property (best effort: a missing property is ignored, a failing instantiation is logged and
 * the consumer continues without a deserializer), opens the consumer connector and creates
 * the fetch and default process thread pools.
 *
 * @param context consumer context providing the Kafka properties, the message handlers
 *        (their count sizes the fetch pool) and the maximum number of process threads
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
    this.consumerContext = context;
    try {
        String deserializerClassName = context.getProperties().getProperty("value.deserializer");
        if (deserializerClassName != null) {
            Class<?> deserializerClass = Class.forName(deserializerClassName);
            deserializer = (Deserializer<Object>) deserializerClass.newInstance();
        }
    } catch (Exception e) {
        // Best effort: keep going without a deserializer, but do not hide the reason.
        logger.warn("Unable to instantiate the configured value.deserializer, continuing without it", e);
    }
    this.connector = kafka.consumer.Consumer
        .createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));

    // One fetch thread per registered message handler.
    int poolSize = consumerContext.getMessageHandlers().size();
    this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
        poolSize, new StandardThreadFactory("KafkaFetcher"));
    this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
        30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
        new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());
    logger.info(
        "Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ",
        poolSize, context.getMaxProcessThreads());
}
示例10
/**
 * Builds a {@code WebKafkaConsumerFactory} wired with placeholder plugin factories, a secret
 * manager using a fixed passphrase, a Kafka consumer factory and a {@code null} last argument.
 *
 * @return the assembled factory
 */
private WebKafkaConsumerFactory createDefaultFactory() {
    final PluginFactory<Deserializer> deserializers = new PluginFactory<>("not/used", Deserializer.class);
    final PluginFactory<RecordFilter> recordFilters = new PluginFactory<>("not/used", RecordFilter.class);
    final SecretManager secrets = new SecretManager("Passphrase");

    final KafkaClientConfigUtil clientConfigUtil = new KafkaClientConfigUtil("not/used", "MyPrefix");
    final KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory(clientConfigUtil);

    return new WebKafkaConsumerFactory(deserializers, recordFilters, secrets, consumerFactory, null);
}
示例11
/**
 * Initializes this instance from the serialized configuration stored as a servlet context
 * init parameter named {@code ConfigUtils.class.getName() + ".serialized"}.
 *
 * <p>Reads the update-topic settings, the read-only flag, the input-topic settings (only when
 * not read-only), the model manager class name and the update decoder class, then checks that
 * the maximum message size is positive.
 *
 * @param context servlet context holding the serialized configuration
 * @throws NullPointerException if the init parameter is absent
 * @throws IllegalArgumentException presumably, if the configured max message size is not
 *         positive (thrown by {@code Preconditions.checkArgument})
 */
@SuppressWarnings("unchecked")
void init(ServletContext context) {
    final String paramName = ConfigUtils.class.getName() + ".serialized";
    final String serializedConfig = context.getInitParameter(paramName);
    this.config = ConfigUtils.deserialize(Objects.requireNonNull(serializedConfig));

    // Update topic settings.
    this.updateTopic = config.getString("oryx.update-topic.message.topic");
    this.maxMessageSize = config.getInt("oryx.update-topic.message.max-size");
    this.updateTopicLockMaster = config.getString("oryx.update-topic.lock.master");
    this.updateTopicBroker = config.getString("oryx.update-topic.broker");

    // Input topic settings are only read when writes are allowed.
    this.readOnly = config.getBoolean("oryx.serving.api.read-only");
    if (!readOnly) {
        this.inputTopic = config.getString("oryx.input-topic.message.topic");
        this.inputTopicLockMaster = config.getString("oryx.input-topic.lock.master");
        this.inputTopicBroker = config.getString("oryx.input-topic.broker");
    }

    this.modelManagerClassName = config.getString("oryx.serving.model-manager-class");
    final String decoderClassName = config.getString("oryx.update-topic.message.decoder-class");
    this.updateDecoderClass =
        (Class<? extends Deserializer<U>>) ClassUtils.loadClass(decoderClassName, Deserializer.class);

    Preconditions.checkArgument(maxMessageSize > 0);
}
示例12
/**
 * Creates a consumer factory for raw byte-array keys and values, connecting to the brokers
 * named by the binder configuration, with auto-commit disabled and a fixed test group ID.
 *
 * @return the configured consumer factory
 */
private ConsumerFactory<byte[], byte[]> consumerFactory() {
    final Map<String, Object> consumerProps = new HashMap<>();
    final KafkaBinderConfigurationProperties binderProps = createConfigurationProperties();

    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, binderProps.getKafkaConnectionString());
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");

    final Deserializer<byte[]> valueBytes = new ByteArrayDeserializer();
    final Deserializer<byte[]> keyBytes = new ByteArrayDeserializer();
    return new DefaultKafkaConsumerFactory<>(consumerProps, keyBytes, valueBytes);
}
示例13
/**
 * Serializes a protobuf {@code TestCmmn.UUID} with auto-registration, waits for the schema,
 * deserializes the bytes into a {@code DynamicMessage} and verifies the {@code lsb} and
 * {@code msb} fields.
 *
 * @param supplier supplies the registry service used by serializer and deserializer
 */
@RegistryServiceTest
public void testProto(Supplier<RegistryService> supplier) throws Exception {
    try (ProtobufKafkaSerializer<TestCmmn.UUID> serializer = new ProtobufKafkaSerializer<TestCmmn.UUID>(supplier.get());
         Deserializer<DynamicMessage> deserializer = new ProtobufKafkaDeserializer(supplier.get())) {

        serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>());

        final TestCmmn.UUID uuid = TestCmmn.UUID.newBuilder().setLsb(2).setMsb(1).build();
        final String artifactId = generateArtifactId();
        final byte[] payload = serializer.serialize(artifactId, uuid);

        waitForSchema(supplier.get(), payload);

        final DynamicMessage decoded = deserializer.deserialize(artifactId, payload);
        final Descriptors.Descriptor type = decoded.getDescriptorForType();

        final Descriptors.FieldDescriptor lsbField = type.findFieldByName("lsb");
        Assertions.assertNotNull(lsbField);
        Assertions.assertEquals(2L, decoded.getField(lsbField));

        final Descriptors.FieldDescriptor msbField = type.findFieldByName("msb");
        Assertions.assertNotNull(msbField);
        Assertions.assertEquals(1L, decoded.getField(msbField));
    }
}
示例14
/**
 * Asynchronously consumes messages from the cluster on a dedicated thread, handing each
 * record to the supplied function.
 *
 * <p>The thread polls while {@code continuation} returns {@code true}. After each record is
 * consumed an asynchronous offset commit is issued when a commit callback was given. When the
 * loop ends (normally or exceptionally) the consumer is closed and {@code completion} is run.
 *
 * @param groupId the name of the group; may not be null
 * @param clientId the name of the client; may not be null
 * @param autoOffsetReset how to pick a starting offset when there is no initial offset or if an
 *            offset is out of range; may be null for the default to be used
 * @param keyDeserializer the deserializer for the keys; may not be null
 * @param valueDeserializer the deserializer for the values; may not be null
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param offsetCommitCallback the callback that should be used after committing offsets; may be
 *            null if offsets are not to be committed
 * @param completion the function to call when the consumer terminates; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion,
                           Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    final Properties consumerProps = getConsumerProperties(groupId, clientId, autoOffsetReset);

    final Runnable pollLoop = () -> {
        LOGGER.infof("Starting consumer %s to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProps, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            while (continuation.getAsBoolean()) {
                for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(10))) {
                    LOGGER.infof("Consumer %s: consuming message %s", clientId, record);
                    consumerFunction.accept(record);
                    if (offsetCommitCallback != null) {
                        consumer.commitAsync(offsetCommitCallback);
                    }
                }
            }
        } finally {
            // Always notify the caller, even if polling or processing failed.
            if (completion != null) {
                completion.run();
            }
            LOGGER.debugf("Stopping consumer %s", clientId);
        }
    };

    final Thread worker = new Thread(pollLoop);
    worker.setName(clientId + "-thread");
    worker.start();
}
示例15
/**
 * Serializes a generic Avro record with a Confluent serializer backed by a mocked schema
 * registry client, then checks that a {@code KafkaDeserializerExtractor} configured for
 * {@code CONFLUENT_AVRO} decodes the bytes back into an equal record.
 */
@Test
public void testConfluentAvroDeserializer() throws IOException, RestClientException {
    final WorkUnitState workUnitState = getMockWorkUnitState(0L,10L);
    workUnitState.setProp("schema.registry.url", TEST_URL);

    // Single-field record schema and a matching record instance.
    final Schema recordSchema = SchemaBuilder.record(TEST_RECORD_NAME)
        .namespace(TEST_NAMESPACE).fields()
        .name(TEST_FIELD_NAME).type().stringType().noDefault()
        .endRecord();
    final GenericRecord expectedRecord =
        new GenericRecordBuilder(recordSchema).set(TEST_FIELD_NAME, "testValue").build();

    // The registry client answers every ID lookup with the schema above.
    final SchemaRegistryClient registryClient = mock(SchemaRegistryClient.class);
    when(registryClient.getByID(any(Integer.class))).thenReturn(recordSchema);

    final Serializer<Object> avroSerializer = new KafkaAvroSerializer(registryClient);
    final Deserializer<Object> avroDeserializer = new KafkaAvroDeserializer(registryClient);
    final ByteBuffer serializedRecord =
        ByteBuffer.wrap(avroSerializer.serialize(TEST_TOPIC_NAME, expectedRecord));

    final KafkaSchemaRegistry<Integer, Schema> schemaRegistry = mock(KafkaSchemaRegistry.class);
    final KafkaDeserializerExtractor extractor =
        new KafkaDeserializerExtractor(workUnitState,
            Optional.fromNullable(Deserializers.CONFLUENT_AVRO), avroDeserializer, schemaRegistry);

    final ByteArrayBasedKafkaRecord kafkaRecord = getMockMessageAndOffset(serializedRecord);
    Assert.assertEquals(extractor.decodeRecord(kafkaRecord), expectedRecord);
}
示例16
/**
 * Creates a sink bridge endpoint with an initially empty list of topic subscriptions.
 *
 * @param vertx Vert.x instance
 * @param bridgeConfig Bridge configuration
 * @param format embedded format for the key/value in the Kafka message
 * @param keyDeserializer Kafka deserializer for the message key
 * @param valueDeserializer Kafka deserializer for the message value
 */
public SinkBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig,
                          EmbeddedFormat format, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    // Collaborators supplied by the caller.
    this.vertx = vertx;
    this.bridgeConfig = bridgeConfig;
    this.format = format;
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;

    // Internal state.
    this.topicSubscriptions = new ArrayList<>();
}
示例17
@SuppressWarnings("unchecked")
public SpeedLayer(Config config) {
super(config);
this.updateBroker = config.getString("oryx.update-topic.broker");
this.updateTopic = config.getString("oryx.update-topic.message.topic");
this.maxMessageSize = config.getInt("oryx.update-topic.message.max-size");
this.modelManagerClassName = config.getString("oryx.speed.model-manager-class");
this.updateDecoderClass = (Class<? extends Deserializer<U>>) ClassUtils.loadClass(
config.getString("oryx.update-topic.message.decoder-class"), Deserializer.class);
Preconditions.checkArgument(maxMessageSize > 0);
}
示例18
/**
 * Test creating a Deserializer: loads the plugin class from a jar on the test classpath,
 * verifies its path, name and class loader, instantiates it and invokes
 * {@code deserialize} once.
 */
@Test
public void testWithDeserializer() throws LoaderException {
    final String jarFilename = "testPlugins.jar";
    final String classPath = "examples.deserializer.ExampleDeserializer";

    // Locate the jar on the filesystem and use its directory as the plugin root.
    final URL jarUrl = getClass().getClassLoader().getResource("testDeserializer/" + jarFilename);
    final String pluginDir = new File(jarUrl.getFile()).getParent();

    // Create factory
    final PluginFactory<Deserializer> pluginFactory = new PluginFactory<>(pluginDir, Deserializer.class);

    // Validate path is correct
    final Path resolvedJarPath = pluginFactory.getPathForJar(jarFilename);
    assertEquals("Has expected Path", jarUrl.getPath(), resolvedJarPath.toString());

    // Get class instance and validate it
    final Class<? extends Deserializer> pluginClass = pluginFactory.getPluginClass(jarFilename, classPath);
    assertNotNull(pluginClass);
    assertEquals("Has expected name", classPath, pluginClass.getName());
    assertTrue("Validate came from correct class loader", pluginClass.getClassLoader() instanceof PluginClassLoader);

    // Create Deserializer instance
    final Deserializer plugin = pluginFactory.getPlugin(jarFilename, classPath);
    assertNotNull(plugin);
    assertEquals("Has correct name", classPath, plugin.getClass().getName());

    // Call method on interface
    final String value = "MyValue";
    final String result = (String) plugin.deserialize("MyTopic", value.getBytes(StandardCharsets.UTF_8));
}
示例19
/**
 * Tests loading a deserializer that is not packaged in an external jar: the factory must
 * resolve {@code StringDeserializer} by its class name.
 */
@Test
public void testLoadingDefaultDeserializer() throws LoaderException {
    final String expectedClassName = StringDeserializer.class.getName();

    // Create factory
    final PluginFactory<Deserializer> pluginFactory = new PluginFactory<>("/tmp", Deserializer.class);

    // Resolve the class through the factory
    final Class<? extends Deserializer> loadedClass = pluginFactory.getPluginClass(expectedClassName);

    // Validate
    assertNotNull(loadedClass);
    assertEquals("Has expected name", expectedClassName, loadedClass.getName());
}
示例20
/**
 * Builds the stock-performance topology (source, processor with an in-memory state store,
 * sink and printer), starts the Kafka Streams application together with a mock data
 * producer, lets it run for 70 seconds and then shuts both down.
 *
 * @param args unused
 * @throws Exception if configuration, start-up or the sleep fails
 */
public static void main(String[] args) throws Exception {
    StreamsConfig config = new StreamsConfig(getProperties());

    // Serializers / deserializers used at the edges of the topology.
    Deserializer<String> keyDeserializer = Serdes.String().deserializer();
    Serializer<String> keySerializer = Serdes.String().serializer();
    Serde<StockPerformance> performanceSerde = StreamsSerdes.StockPerformanceSerde();
    Serializer<StockPerformance> performanceSerializer = performanceSerde.serializer();
    Serde<StockTransaction> transactionSerde = StreamsSerdes.StockTransactionSerde();
    Deserializer<StockTransaction> transactionDeserializer = transactionSerde.deserializer();

    Topology topology = new Topology();
    String stocksStateStore = "stock-performance-store";
    double differentialThreshold = 0.02;

    // In-memory key/value store backing the performance processor.
    KeyValueBytesStoreSupplier inMemorySupplier = Stores.inMemoryKeyValueStore(stocksStateStore);
    StoreBuilder<KeyValueStore<String, StockPerformance>> performanceStore =
        Stores.keyValueStoreBuilder(inMemorySupplier, Serdes.String(), performanceSerde);

    topology.addSource("stocks-source", keyDeserializer, transactionDeserializer,"stock-transactions")
        .addProcessor("stocks-processor", () -> new StockPerformanceProcessor(stocksStateStore, differentialThreshold), "stocks-source")
        .addStateStore(performanceStore,"stocks-processor")
        .addSink("stocks-sink", "stock-performance", keySerializer, performanceSerializer, "stocks-processor");

    topology.addProcessor("stocks-printer", new KStreamPrinter("StockPerformance"), "stocks-processor");

    KafkaStreams streams = new KafkaStreams(topology, config);
    MockDataProducer.produceStockTransactionsWithKeyFunction(50,50, 25, StockTransaction::getSymbol);
    System.out.println("Stock Analysis App Started");
    streams.cleanUp();
    streams.start();

    // Let the application process data for a while before stopping it.
    Thread.sleep(70000);

    System.out.println("Shutting down the Stock Analysis App now");
    streams.close();
    MockDataProducer.shutdown();
}
示例21
/**
 * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the
 * deserializer argument using the {@link Coder} registry.
 *
 * <p>Only the interfaces directly implemented by the deserializer class are inspected.
 *
 * @param coderRegistry registry used to look up the coder for the deserialized type
 * @return a nullable coder for the type argument of {@code Deserializer<T>}
 * @throws RuntimeException if the deserializer class does not directly implement a
 *         parameterized {@code Deserializer}, or if the registry cannot provide a coder (the
 *         {@link CannotProvideCoderException} is attached as the cause)
 */
@Override
public NullableCoder<T> getCoder(CoderRegistry coderRegistry) {
    for (Type type : deserializer.getGenericInterfaces()) {
        if (!(type instanceof ParameterizedType)) {
            continue;
        }

        // This does not recurse: we will not infer from a class that extends
        // a class that extends Deserializer<T>.
        ParameterizedType parameterizedType = (ParameterizedType) type;

        if (parameterizedType.getRawType() == Deserializer.class) {
            Type parameter = parameterizedType.getActualTypeArguments()[0];

            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) parameter;

            try {
                return NullableCoder.of(coderRegistry.getCoder(clazz));
            } catch (CannotProvideCoderException e) {
                // Keep the registry's exception as the cause so the root problem stays visible.
                throw new RuntimeException(
                    String.format(
                        "Unable to automatically infer a Coder for "
                            + "the Kafka Deserializer %s: no coder registered for type %s",
                        deserializer, clazz),
                    e);
            }
        }
    }

    throw new RuntimeException(
        String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
}
示例22
@VisibleForTesting
KafkaDeserializerExtractor(WorkUnitState state, Optional<Deserializers> deserializerType,
Deserializer<?> kafkaDeserializer, KafkaSchemaRegistry<?, ?> kafkaSchemaRegistry) {
super(state);
this.kafkaDeserializer = kafkaDeserializer;
this.kafkaSchemaRegistry = kafkaSchemaRegistry;
this.latestSchema =
(deserializerType.equals(Optional.of(Deserializers.CONFLUENT_AVRO))) ? (Schema) getSchema() : null;
}
示例23
/**
 * Constructor. The option maps are copied, so later changes to the passed-in maps do not
 * affect this instance.
 *
 * @param keyDeserializerClass Class for deserializer for keys.
 * @param keyDeserializerOptions Options for the key deserializer; must not be null.
 * @param valueDeserializerClass Class for deserializer for values.
 * @param valueDeserializerOptions Options for the value deserializer; must not be null.
 */
private DeserializerConfig(
    final Class<? extends Deserializer> keyDeserializerClass,
    final Map<String, String> keyDeserializerOptions,
    final Class<? extends Deserializer> valueDeserializerClass,
    final Map<String, String> valueDeserializerOptions
) {
    // Key side.
    this.keyDeserializerClass = keyDeserializerClass;
    this.keyDeserializerOptions = new HashMap<>(keyDeserializerOptions);

    // Value side.
    this.valueDeserializerClass = valueDeserializerClass;
    this.valueDeserializerOptions = new HashMap<>(valueDeserializerOptions);
}
示例24
/**
 * Convenience constructor that wraps the raw configuration map in a
 * {@code LiKafkaConsumerConfig} and delegates to the constructor taking that config type.
 *
 * @param configs raw consumer configuration
 * @param keyDeserializer deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @param largeMessageSegmentDeserializer deserializer for large-message segments
 * @param consumerAuditor auditor forwarded to the delegate constructor
 */
public LiKafkaConsumerImpl(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
Auditor<K, V> consumerAuditor) {
// All other arguments are forwarded unchanged.
this(new LiKafkaConsumerConfig(configs), keyDeserializer, valueDeserializer, largeMessageSegmentDeserializer, consumerAuditor);
}
示例25
/**
 * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the
 * deserializer argument using the {@link Coder} registry.
 *
 * <p>Only the interfaces directly implemented by {@code deserializer} are inspected.
 *
 * @param coderRegistry registry used to look up the coder for the deserialized type
 * @param deserializer deserializer class to inspect; must not be null
 * @return a nullable coder for the type argument of {@code Deserializer<T>}
 * @throws RuntimeException if the class does not directly implement a parameterized
 *         {@code Deserializer}, or if the registry cannot provide a coder (the
 *         {@link CannotProvideCoderException} is attached as the cause)
 */
@VisibleForTesting
static <T> NullableCoder<T> inferCoder(
    CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
    checkNotNull(deserializer);

    for (Type type : deserializer.getGenericInterfaces()) {
        if (!(type instanceof ParameterizedType)) {
            continue;
        }

        // This does not recurse: we will not infer from a class that extends
        // a class that extends Deserializer<T>.
        ParameterizedType parameterizedType = (ParameterizedType) type;

        if (parameterizedType.getRawType() == Deserializer.class) {
            Type parameter = parameterizedType.getActualTypeArguments()[0];

            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) parameter;

            try {
                return NullableCoder.of(coderRegistry.getCoder(clazz));
            } catch (CannotProvideCoderException e) {
                // Keep the registry's exception as the cause so the root problem stays visible.
                throw new RuntimeException(
                    String.format(
                        "Unable to automatically infer a Coder for "
                            + "the Kafka Deserializer %s: no coder registered for type %s",
                        deserializer, clazz),
                    e);
            }
        }
    }

    throw new RuntimeException(
        String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
}
示例26
/**
 * Returns the sensor-reading deserializer matching the {@code smile} flag: the Smile
 * configuration when the flag is set, the default configuration otherwise.
 *
 * @return the deserializer for {@code SensorReading} values
 */
@Override
public Deserializer<SensorReading> getDeserializer() {
    if (!smile) {
        return JacksonReadingSerializer.defaultConfig();
    }
    return JacksonReadingSerializer.smileConfig();
}
示例27
/**
 * Constructs a {@link Deserializer}, using the value of {@link #KAFKA_DESERIALIZER_TYPE}.
 *
 * <p>When a deserializer type is given, its deserializer class is instantiated; otherwise the
 * class named by the {@code KAFKA_DESERIALIZER_TYPE} property is loaded and instantiated. The
 * new instance is then configured as a value (non-key) deserializer with the given properties.
 *
 * @param props properties used to look up the class name and to configure the instance
 * @param deserializerType optional predefined deserializer type
 * @return the configured deserializer
 * @throws ReflectiveOperationException if the class cannot be loaded or instantiated
 */
private static Deserializer<?> getDeserializer(Properties props, Optional<Deserializers> deserializerType) throws ReflectiveOperationException {
    final Deserializer<?> deserializer;
    if (!deserializerType.isPresent()) {
        // No predefined type: fall back to the class named in the properties.
        final Class<?> configuredClass = Class.forName(props.getProperty(KAFKA_DESERIALIZER_TYPE));
        deserializer = Deserializer.class.cast(ConstructorUtils.invokeConstructor(configuredClass));
    } else {
        deserializer = ConstructorUtils.invokeConstructor(deserializerType.get().getDeserializerClass());
    }
    deserializer.configure(PropertiesUtils.propsToStringKeyMap(props), false);
    return deserializer;
}
示例28
/**
 * Loads the deserializer class named by {@code deserializerClass}, falling back to
 * {@code defaultDeserializerClass} when no name is present.
 *
 * @param deserializerClass optional fully qualified class name
 * @param defaultDeserializerClass class name used when the optional is empty
 * @return the loaded class, cast (unchecked) to a deserializer class
 * @throws IllegalStateException if the class cannot be loaded
 */
private static <T> Class<Deserializer<T>> createDeserializer(Optional<String> deserializerClass,
                                                             String defaultDeserializerClass) {
    final String className = deserializerClass.orElse(defaultDeserializerClass);
    try {
        return (Class<Deserializer<T>>) Class.forName(className);
    } catch (Exception e) {
        throw new IllegalStateException("Unable to create a deserializer: " + className + ": " + e.getMessage(), e);
    }
}
示例29
/**
 * Creates the row deserializer matching the given serialization format.
 *
 * @param schema schema handed to the created deserializer
 * @param dataSourceSerDe serialization format: JSON, AVRO or DELIMITED
 * @return a deserializer producing {@code GenericRow} values
 * @throws KsqlException if the format is not one of the supported ones
 */
private Deserializer<GenericRow> getDeserializer(Schema schema,
                                                 DataSource.DataSourceSerDe dataSourceSerDe) {
    final Deserializer<GenericRow> rowDeserializer;
    switch (dataSourceSerDe) {
        case JSON:
            rowDeserializer = new KsqlJsonDeserializer(schema);
            break;
        case AVRO:
            rowDeserializer = new KsqlGenericRowAvroDeserializer(schema,
                this.schemaRegistryClient,
                false);
            break;
        case DELIMITED:
            rowDeserializer = new KsqlDelimitedDeserializer(schema);
            break;
        default:
            throw new KsqlException("Format not supported: " + dataSourceSerDe);
    }
    return rowDeserializer;
}
示例30
/**
 * Verifies that constructing a {@code ProcessingKafkaConsumer} with a {@code null} config and
 * explicit deserializers throws an {@link IllegalArgumentException}.
 */
@Test
public void constructorWithDeserializers_nullConfig() {
    final Deserializer<String> keys = new StringDeserializer();
    final Deserializer<String> values = new StringDeserializer();

    try {
        new ProcessingKafkaConsumer(null, keys, values);
        Assert.fail("Expected IllegalArgumentException to be thrown");
    } catch (IllegalArgumentException expected) {
        // This is the outcome the test is looking for.
    }
}