Java源码示例:org.apache.kafka.common.serialization.Serializer
示例1
@Override
public Serializer<T> serializer() {
return new Serializer<T>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, T data) {
serialisedObjectCache.putIfAbsent(Tuple2.of(topic, data.hashCode()), data);
return String.valueOf(data.hashCode()).getBytes(Charset.defaultCharset());
}
@Override
public void close() { }
};
}
示例2
@Test
public void testDelimitedSerialization() {
List<StructField> fields = Lists.newArrayList(
DataTypes.createStructField("field1", DataTypes.StringType, true),
DataTypes.createStructField("field2", DataTypes.IntegerType, true),
DataTypes.createStructField("field3", DataTypes.BooleanType, true)
);
Row row = new RowWithSchema(DataTypes.createStructType(fields), "hello", 1, false);
Map<String, String> configs = Maps.newHashMap();
configs.put(DelimitedSerializer.FIELD_DELIMITER_CONFIG_NAME, "||");
Serializer<Row> serializer = new DelimitedSerializer();
serializer.configure(configs, false);
byte[] serialized = serializer.serialize("test", row);
serializer.close();
assertEquals(new String(serialized), "hello||1||false");
}
示例3
public static <K, V> void edgesToTopic(
InputStream inputStream,
Parser<EdgeWithValue<K, V>> edgeParser,
Serializer<V> valueSerializer,
Properties props,
String topic,
int numPartitions,
short replicationFactor
) throws IOException {
ClientUtils.createTopic(topic, numPartitions, replicationFactor, props);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
Producer<Edge<K>, V> producer = new KafkaProducer<>(props, new KryoSerializer<>(), valueSerializer)) {
String line;
while ((line = reader.readLine()) != null) {
EdgeWithValue<K, V> edge = edgeParser.parse(line);
log.trace("read edge: ({}, {})", edge.source(), edge.target());
ProducerRecord<Edge<K>, V> producerRecord =
new ProducerRecord<>(topic, new Edge<>(edge.source(), edge.target()), edge.value());
producer.send(producerRecord);
}
producer.flush();
}
}
示例4
/**
* Create a {@link Producer} that is able to write to a topic in Kafka.
*
* @param kafkaHostname - The Kafka broker hostname. (not null)
* @param kafkaPort - The Kafka broker port.
* @param keySerializerClass - Serializes the keys. (not null)
* @param valueSerializerClass - Serializes the values. (not null)
* @return A {@link Producer} that can be used to write records to a topic.
*/
private static <K, V> Producer<K, V> makeProducer(
final String kafkaHostname,
final int kakfaPort,
final Class<? extends Serializer<K>> keySerializerClass,
final Class<? extends Serializer<V>> valueSerializerClass) {
requireNonNull(kafkaHostname);
requireNonNull(keySerializerClass);
requireNonNull(valueSerializerClass);
final Properties producerProps = new Properties();
producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
return new KafkaProducer<>(producerProps);
}
示例5
public static <K, V> void verticesToTopic(
InputStream inputStream,
Parser<VertexWithValue<K, V>> vertexParser,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Properties props,
String topic,
int numPartitions,
short replicationFactor
) throws IOException {
ClientUtils.createTopic(topic, numPartitions, replicationFactor, props);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
Producer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
String line;
while ((line = reader.readLine()) != null) {
VertexWithValue<K, V> vertex = vertexParser.parse(line);
log.trace("read vertex: {}", vertex.id());
ProducerRecord<K, V> producerRecord =
new ProducerRecord<>(topic, vertex.id(), vertex.value());
producer.send(producerRecord);
}
producer.flush();
}
}
示例6
@Test
public void testSerde() {
Serializer<String> stringSerializer = new StringSerializer();
Deserializer<String> stringDeserializer = new StringDeserializer();
Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();
String s = LiKafkaClientsTestUtils.getRandomString(100);
assertEquals(s.length(), 100);
byte[] stringBytes = stringSerializer.serialize("topic", s);
assertEquals(stringBytes.length, 100);
LargeMessageSegment segment =
new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes));
// String bytes + segment header
byte[] serializedSegment = segmentSerializer.serialize("topic", segment);
assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);
LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment);
assertEquals(deserializedSegment.messageId, segment.messageId);
assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes);
assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments);
assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber);
assertEquals(deserializedSegment.payload.limit(), 100);
String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray());
assertEquals(deserializedString.length(), s.length());
}
示例7
@Override
public Serializer<T> serializer() {
return new Serializer<T>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, T data) {
try {
return mapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
};
}
示例8
static Deserializer getDeserializer(Serializer serializer) {
if (serializer instanceof StringSerializer) {
return new StringDeserializer();
} else if (serializer instanceof LongSerializer) {
return new LongDeserializer();
} else if (serializer instanceof IntegerSerializer) {
return new IntegerDeserializer();
} else if (serializer instanceof DoubleSerializer) {
return new DoubleDeserializer();
} else if (serializer instanceof BytesSerializer) {
return new BytesDeserializer();
} else if (serializer instanceof ByteBufferSerializer) {
return new ByteBufferDeserializer();
} else if (serializer instanceof ByteArraySerializer) {
return new ByteArrayDeserializer();
} else {
throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
}
}
示例9
private <T> void testSerializer(Class<T> type, T val) {
final Serde<T> serde = VertxSerdes.serdeFrom(type);
final Deserializer<T> deserializer = serde.deserializer();
final Serializer<T> serializer = serde.serializer();
assertEquals("Should get the original value after serialization and deserialization",
val, deserializer.deserialize(topic, serializer.serialize(topic, val)));
assertEquals("Should support null in serialization and deserialization",
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
}
示例10
@Override
public Serializer<TopFiveSongs> serializer() {
return new Serializer<TopFiveSongs>() {
@Override
public void configure(final Map<String, ?> map, final boolean b) {
}
@Override
public byte[] serialize(final String s, final TopFiveSongs topFiveSongs) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final DataOutputStream
dataOutputStream =
new DataOutputStream(out);
try {
for (SongPlayCount songPlayCount : topFiveSongs) {
dataOutputStream.writeLong(songPlayCount.getSongId());
dataOutputStream.writeLong(songPlayCount.getPlays());
}
dataOutputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
return out.toByteArray();
}
};
}
示例11
private void testSchemaHeaderNames(String customKeySchemaHeaderName,
String customValueSchemaHeaderName) {
TestRecord record = new TestRecord();
record.setField1("Hello");
record.setField2("World");
Map<String, Object> configs = new HashMap<>();
configs.put(KafkaAvroSerde.KEY_SCHEMA_VERSION_ID_HEADER_NAME, customKeySchemaHeaderName);
configs.put(KafkaAvroSerde.VALUE_SCHEMA_VERSION_ID_HEADER_NAME, customValueSchemaHeaderName);
configs.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, "true");
configs.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
AvroSerDesHandler handler = new DefaultAvroSerDesHandler();
handler.handlePayloadSerialization(outputStream, record);
for (Boolean isKey : Arrays.asList(true, false)) {
KafkaAvroSerde serde = new KafkaAvroSerde(schemaRegistryClient);
final Serializer<Object> serializer = serde.serializer();
serializer.configure(configs, isKey);
Headers headers = new RecordHeaders();
final byte[] bytes = serializer.serialize(topic, headers, record);
Assert.assertArrayEquals(outputStream.toByteArray(), bytes);
Assert.assertEquals(isKey, headers.lastHeader(customKeySchemaHeaderName) != null);
Assert.assertEquals(!isKey, headers.lastHeader(customValueSchemaHeaderName) != null);
final Deserializer<Object> deserializer = serde.deserializer();
deserializer.configure(configs, isKey);
final TestRecord actual = (TestRecord) deserializer.deserialize(topic, headers, bytes);
Assert.assertEquals(record, actual);
}
}
示例12
public MessageSplitterImpl(int maxSegmentSize,
Serializer<LargeMessageSegment> segmentSerializer,
UUIDFactory uuidFactory) {
_maxSegmentSize = maxSegmentSize;
_segmentSerializer = segmentSerializer;
_uuidFactory = uuidFactory;
}
示例13
private static KafkaStreams createWikipediaStreamsInstance(String bootstrapServers) {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wikipedia-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
KStream<JsonNode, JsonNode> wikipediaRaw = builder.stream(jsonSerde, jsonSerde, "wikipedia-raw");
KStream<String, WikipediaMessage> wikipediaParsed =
wikipediaRaw.map(WikipediaMessage::parceIRC)
.filter(WikipediaMessage::filterNonNull)
.through(Serdes.String(), new JsonPOJOSerde<>(WikipediaMessage.class), "wikipedia-parsed");
KTable<String, Long> totalEditsByUser = wikipediaParsed
.filter((key, value) -> value.type == WikipediaMessage.Type.EDIT)
.countByKey(Serdes.String(), "wikipedia-edits-by-user");
//some print
totalEditsByUser.toStream().process(() -> new AbstractProcessor<String, Long>() {
@Override
public void process(String user, Long numEdits) {
System.out.println("USER: " + user + " num.edits: " + numEdits);
}
});
return new KafkaStreams(builder, props);
}
示例14
@Test(expectedExceptions = OffsetNotTrackedException.class)
public void testStartingOffsetWithNormalMessages() throws IOException {
Serializer<String> stringSerializer = new StringSerializer();
Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
ConsumerRecordsProcessor<String, String> consumerRecordsProcessor = createConsumerRecordsProcessor();
// Let consumer record 0 be a normal record.
byte[] message0Bytes = stringSerializer.serialize("topic", "message0");
byte[] message0WrappedBytes = wrapMessageBytes(segmentSerializer, message0Bytes);
ConsumerRecord<byte[], byte[]> consumerRecord0 =
new ConsumerRecord<>("topic", 0, 100L, 0L, TimestampType.CREATE_TIME, 0, 0, 0, "key".getBytes(), message0WrappedBytes);
// Construct the consumer records.
List<ConsumerRecord<byte[], byte[]>> recordList = new ArrayList<>();
recordList.add(consumerRecord0);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
recordsMap.put(new TopicPartition("topic", 0), recordList);
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(recordsMap);
consumerRecordsProcessor.process(records).consumerRecords();
TopicPartition tp = new TopicPartition("topic", 0);
assertEquals(consumerRecordsProcessor.startingOffset(tp, 100L), 100, "Should return 100 because there are no " +
"large messages in the partition.");
// Should throw exception when an offset cannot be found by the offset tracker.
consumerRecordsProcessor.startingOffset(tp, 0L);
}
示例15
public void produceStrings(int messageCount, Runnable completionCallback,
Supplier<ProducerRecord<String, String>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<String> valSer = new StringSerializer();
String randomId = UUID.randomUUID().toString();
this.produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
示例16
@Test
public void testConfluentAvroDeserializer() throws IOException, RestClientException {
WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L);
mockWorkUnitState.setProp("schema.registry.url", TEST_URL);
Schema schema = SchemaBuilder.record(TEST_RECORD_NAME)
.namespace(TEST_NAMESPACE).fields()
.name(TEST_FIELD_NAME).type().stringType().noDefault()
.endRecord();
GenericRecord testGenericRecord = new GenericRecordBuilder(schema).set(TEST_FIELD_NAME, "testValue").build();
SchemaRegistryClient mockSchemaRegistryClient = mock(SchemaRegistryClient.class);
when(mockSchemaRegistryClient.getByID(any(Integer.class))).thenReturn(schema);
Serializer<Object> kafkaEncoder = new KafkaAvroSerializer(mockSchemaRegistryClient);
Deserializer<Object> kafkaDecoder = new KafkaAvroDeserializer(mockSchemaRegistryClient);
ByteBuffer testGenericRecordByteBuffer =
ByteBuffer.wrap(kafkaEncoder.serialize(TEST_TOPIC_NAME, testGenericRecord));
KafkaSchemaRegistry<Integer, Schema> mockKafkaSchemaRegistry = mock(KafkaSchemaRegistry.class);
KafkaDeserializerExtractor kafkaDecoderExtractor =
new KafkaDeserializerExtractor(mockWorkUnitState,
Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, mockKafkaSchemaRegistry);
ByteArrayBasedKafkaRecord mockMessageAndOffset = getMockMessageAndOffset(testGenericRecordByteBuffer);
Assert.assertEquals(kafkaDecoderExtractor.decodeRecord(mockMessageAndOffset), testGenericRecord);
}
示例17
/**
* {@inheritDoc}
*/
@Override
public <K, V> SerializerPair<K, V> build(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
Serializer<K> newKeySerializer = new CryptoAwareSerializerWrapper<K>(keySerializer, keyReferenceExtractor, null);
Serializer<V> newvalueSerializer = new CryptoSerializer<>(valueSerializer, encryptor, null);
return new SerializerPair<>(newKeySerializer, newvalueSerializer);
}
示例18
/**
* Create a producer that can write to this broker
*
* @param keySerializer Key serializer class
* @param valueSerializer Valuer serializer class
* @param overrideConfig Producer config to override. Pass null if there aren't any.
* @param <K> Type of Key
* @param <V> Type of Value
* @return KafkaProducer
*/
public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer,
Properties overrideConfig) {
Properties conf = producerConfig();
if (overrideConfig != null) {
conf.putAll(overrideConfig);
}
keySerializer.configure(Maps.fromProperties(conf), true);
valueSerializer.configure(Maps.fromProperties(conf), false);
return new KafkaProducer<>(conf, keySerializer, valueSerializer);
}
示例19
@Override
protected Serializer<GenericRow> getSerializer(
Schema avroSchema,
org.apache.kafka.connect.data.Schema kafkaSchema,
String topicName
) {
return new KsqlDelimitedSerializer(kafkaSchema);
}
示例20
@Override
protected Serializer<GenericRow> getSerializer(
Schema avroSchema,
org.apache.kafka.connect.data.Schema kafkaSchema,
String topicName
) {
return new KsqlJsonSerializer(kafkaSchema);
}
示例21
@SuppressWarnings("rawtypes")
@Override
protected Class<? extends Serializer> serializerClass() {
return AvroKafkaSerializer.class;
}
示例22
public SchemalessConverter(Serializer<T> serializer, Deserializer<T> deserializer) {
setSerializer(serializer);
setDeserializer(deserializer);
}
示例23
protected Class<? extends Serializer> serializerClass() {
return Serializer.class;
}
示例24
public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new org.apache.kafka.clients.producer.KafkaProducer<>(config, keySerializer, valueSerializer));
}
示例25
@Override
public Serializer<T> serializer() {
return this;
}
示例26
public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
this.kafkaSerializer = serializer;
this.kafkaDeserializer = deserializer;
}
示例27
@Override
public void apply(BiFunctionReq request, StreamObserver<BiFunctionRes> responseObserver) {
String serviceName = request.getServiceName();
AsyncBiFunctionService.WithSerdes<Object, Object, Object> localService;
try {
// need to work in Object domain since this dispatcher is used for multiple local services of
// the same type but possibly different type parameter(s). Type safety is guaranteed nevertheless
// since distributed service is always initialized with a local service in a type-safe fashion
// together with a chosen service name. Service name therefore guarantees correct type parameters.
@SuppressWarnings("unchecked")
AsyncBiFunctionService.WithSerdes<Object, Object, Object> _abfs =
(AsyncBiFunctionService.WithSerdes) localServiceRegistry.get(serviceName);
localService = _abfs;
} catch (Exception e) {
responseObserver.onError(e);
return;
}
Object key = localService.keySerde().deserializer().deserialize(serviceName, request.getKey().isEmpty() ? null : request.getKey().toByteArray());
Object req = localService.reqSerde().deserializer().deserialize(serviceName, request.getReq().isEmpty() ? null : request.getReq().toByteArray());
Serializer<Object> resSerializer = localService.resSerde().serializer();
try {
localService
.apply(key, req)
.whenComplete((res, serviceExc) -> {
if (serviceExc != null) {
responseObserver.onError(serviceExc);
} else {
BiFunctionRes resProto = null;
try {
byte[] resBytes = resSerializer.serialize(serviceName, res);
resProto = BiFunctionRes
.newBuilder()
.setRes(resBytes == null ? ByteString.EMPTY : ByteString.copyFrom(resBytes))
.build();
} catch (Throwable serializeExc) {
responseObserver.onError(serializeExc);
}
if (resProto != null) {
responseObserver.onNext(resProto);
responseObserver.onCompleted();
}
}
});
} catch (Throwable applyExc) {
responseObserver.onError(applyExc);
}
}
示例28
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
return createShared(vertx, name, () -> KafkaWriteStream.create(vertx, new HashMap<>(config), keySerializer, valueSerializer));
}
示例29
@Override public Serializer<Set<String>> serializer() {
return new SpanNamesSerializer();
}
示例30
@Override public Serializer<DependencyLink> serializer() {
return new DependencyLinkSerializer();
}