Java source code examples: org.apache.kafka.common.serialization.Serializer
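All of the examples on this page implement or call Kafka's Serializer interface, whose contract is a configure → serialize → close lifecycle: configure(configs, isKey) is invoked once with the client configuration, serialize(topic, data) turns a value into a byte[] (by convention null in, null out), and close() releases any resources. For orientation, here is a minimal sketch of a custom implementation; ToStringSerializer is illustrative only and is not taken from any of the examples below.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class ToStringSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // Serialize null to null so tombstone records pass through unchanged.
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

An instance like this can be passed directly to the KafkaProducer(Properties, Serializer, Serializer) constructor, as in Examples 3, 5 and 18, or registered by class name via ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG / ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, as in Example 4.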

Example 1
@Override
public Serializer<T> serializer() {
    return new Serializer<T>() {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {}

        @Override
        public byte[] serialize(String topic, T data) {
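            // Cache the original object under (topic, hashCode) and send only the hash code bytes over the wire.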
            serialisedObjectCache.putIfAbsent(Tuple2.of(topic, data.hashCode()), data);
            return String.valueOf(data.hashCode()).getBytes(Charset.defaultCharset());
        }

        @Override
        public void close() { }
    };
}
 
Example 2
@Test
public void testDelimitedSerialization() {
  List<StructField> fields = Lists.newArrayList(
      DataTypes.createStructField("field1", DataTypes.StringType, true),
      DataTypes.createStructField("field2", DataTypes.IntegerType, true),
      DataTypes.createStructField("field3", DataTypes.BooleanType, true)
  );
  Row row = new RowWithSchema(DataTypes.createStructType(fields), "hello", 1, false);
  
  Map<String, String> configs = Maps.newHashMap();
  configs.put(DelimitedSerializer.FIELD_DELIMITER_CONFIG_NAME, "||");
  Serializer<Row> serializer = new DelimitedSerializer();
  serializer.configure(configs, false);
  
  byte[] serialized = serializer.serialize("test", row);
  serializer.close();
  
  assertEquals(new String(serialized), "hello||1||false");
}
 
Example 3
public static <K, V> void edgesToTopic(
    InputStream inputStream,
    Parser<EdgeWithValue<K, V>> edgeParser,
    Serializer<V> valueSerializer,
    Properties props,
    String topic,
    int numPartitions,
    short replicationFactor
) throws IOException {
    ClientUtils.createTopic(topic, numPartitions, replicationFactor, props);
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
         Producer<Edge<K>, V> producer = new KafkaProducer<>(props, new KryoSerializer<>(), valueSerializer)) {
        String line;
        while ((line = reader.readLine()) != null) {
            EdgeWithValue<K, V> edge = edgeParser.parse(line);
            log.trace("read edge: ({}, {})", edge.source(), edge.target());
            ProducerRecord<Edge<K>, V> producerRecord =
                new ProducerRecord<>(topic, new Edge<>(edge.source(), edge.target()), edge.value());
            producer.send(producerRecord);
        }
        producer.flush();
    }
}
 
Example 4
/**
 * Create a {@link Producer} that is able to write to a topic in Kafka.
 *
 * @param kafkaHostname - The Kafka broker hostname. (not null)
 * @param kafkaPort - The Kafka broker port.
 * @param keySerializerClass - Serializes the keys. (not null)
 * @param valueSerializerClass - Serializes the values. (not null)
 * @return A {@link Producer} that can be used to write records to a topic.
 */
private static <K, V> Producer<K, V> makeProducer(
        final String kafkaHostname,
        final int kafkaPort,
        final Class<? extends Serializer<K>> keySerializerClass,
        final Class<? extends Serializer<V>> valueSerializerClass) {
    requireNonNull(kafkaHostname);
    requireNonNull(keySerializerClass);
    requireNonNull(valueSerializerClass);

    final Properties producerProps = new Properties();
    producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
    return new KafkaProducer<>(producerProps);
}
 
Example 5
public static <K, V> void verticesToTopic(
    InputStream inputStream,
    Parser<VertexWithValue<K, V>> vertexParser,
    Serializer<K> keySerializer,
    Serializer<V> valueSerializer,
    Properties props,
    String topic,
    int numPartitions,
    short replicationFactor
) throws IOException {
    ClientUtils.createTopic(topic, numPartitions, replicationFactor, props);
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
         Producer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
        String line;
        while ((line = reader.readLine()) != null) {
            VertexWithValue<K, V> vertex = vertexParser.parse(line);
            log.trace("read vertex: {}", vertex.id());
            ProducerRecord<K, V> producerRecord =
                new ProducerRecord<>(topic, vertex.id(), vertex.value());
            producer.send(producerRecord);
        }
        producer.flush();
    }
}
 
Example 6
@Test
public void testSerde() {
  Serializer<String> stringSerializer = new StringSerializer();
  Deserializer<String> stringDeserializer = new StringDeserializer();
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  String s = LiKafkaClientsTestUtils.getRandomString(100);
  assertEquals(s.length(), 100);
  byte[] stringBytes = stringSerializer.serialize("topic", s);
  assertEquals(stringBytes.length, 100);
  LargeMessageSegment segment =
      new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes));
  // String bytes + segment header
  byte[] serializedSegment = segmentSerializer.serialize("topic", segment);
  assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);

  LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment);
  assertEquals(deserializedSegment.messageId, segment.messageId);
  assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes);
  assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments);
  assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber);
  assertEquals(deserializedSegment.payload.limit(), 100);
  String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray());
  assertEquals(deserializedString.length(), s.length());
}
 
Example 7
@Override
public Serializer<T> serializer() {
    return new Serializer<T>() {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, T data) {
            try {
                return mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }

        @Override
        public void close() { }
    };
}
 
Example 8
static Deserializer getDeserializer(Serializer serializer) {
    if (serializer instanceof StringSerializer) {
        return new StringDeserializer();
    } else if (serializer instanceof LongSerializer) {
        return new LongDeserializer();
    } else if (serializer instanceof IntegerSerializer) {
        return new IntegerDeserializer();
    } else if (serializer instanceof DoubleSerializer) {
        return new DoubleDeserializer();
    } else if (serializer instanceof BytesSerializer) {
        return new BytesDeserializer();
    } else if (serializer instanceof ByteBufferSerializer) {
        return new ByteBufferDeserializer();
    } else if (serializer instanceof ByteArraySerializer) {
        return new ByteArrayDeserializer();
    } else {
        throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
    }
}
 
Example 9
private <T> void testSerializer(Class<T> type, T val) {
  final Serde<T> serde = VertxSerdes.serdeFrom(type);
  final Deserializer<T> deserializer = serde.deserializer();
  final Serializer<T> serializer = serde.serializer();

  assertEquals("Should get the original value after serialization and deserialization",
    val, deserializer.deserialize(topic, serializer.serialize(topic, val)));

  assertEquals("Should support null in serialization and deserialization",
    null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
}
 
Example 10
@Override
public Serializer<TopFiveSongs> serializer() {
    return new Serializer<TopFiveSongs>() {
        @Override
        public void configure(final Map<String, ?> map, final boolean b) {
        }

        @Override
        public byte[] serialize(final String s, final TopFiveSongs topFiveSongs) {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final DataOutputStream dataOutputStream = new DataOutputStream(out);
            try {
                // Write each song's id and play count as a pair of longs.
                for (SongPlayCount songPlayCount : topFiveSongs) {
                    dataOutputStream.writeLong(songPlayCount.getSongId());
                    dataOutputStream.writeLong(songPlayCount.getPlays());
                }
                dataOutputStream.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return out.toByteArray();
        }
    };
}
 
Example 11
private void testSchemaHeaderNames(String customKeySchemaHeaderName,
                                   String customValueSchemaHeaderName) {
    TestRecord record = new TestRecord();
    record.setField1("Hello");
    record.setField2("World");

    Map<String, Object> configs = new HashMap<>();
    configs.put(KafkaAvroSerde.KEY_SCHEMA_VERSION_ID_HEADER_NAME, customKeySchemaHeaderName);
    configs.put(KafkaAvroSerde.VALUE_SCHEMA_VERSION_ID_HEADER_NAME, customValueSchemaHeaderName);
    configs.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, "true");
    configs.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);

    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    AvroSerDesHandler handler = new DefaultAvroSerDesHandler();
    handler.handlePayloadSerialization(outputStream, record);

    for (Boolean isKey : Arrays.asList(true, false)) {
        KafkaAvroSerde serde = new KafkaAvroSerde(schemaRegistryClient);
        final Serializer<Object> serializer = serde.serializer();
        serializer.configure(configs, isKey);

        Headers headers = new RecordHeaders();
        final byte[] bytes = serializer.serialize(topic, headers, record);
        Assert.assertArrayEquals(outputStream.toByteArray(), bytes);
        Assert.assertEquals(isKey, headers.lastHeader(customKeySchemaHeaderName) != null);
        Assert.assertEquals(!isKey, headers.lastHeader(customValueSchemaHeaderName) != null);

        final Deserializer<Object> deserializer = serde.deserializer();
        deserializer.configure(configs, isKey);
        final TestRecord actual = (TestRecord) deserializer.deserialize(topic, headers, bytes);
        Assert.assertEquals(record, actual);
    }
}
 
Example 12
public MessageSplitterImpl(int maxSegmentSize,
                           Serializer<LargeMessageSegment> segmentSerializer,
                           UUIDFactory uuidFactory) {
  _maxSegmentSize = maxSegmentSize;
  _segmentSerializer = segmentSerializer;
  _uuidFactory = uuidFactory;
}
 
Example 13
private static KafkaStreams createWikipediaStreamsInstance(String bootstrapServers) {
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStreamBuilder builder = new KStreamBuilder();
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wikipedia-streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);


    KStream<JsonNode, JsonNode> wikipediaRaw = builder.stream(jsonSerde, jsonSerde, "wikipedia-raw");

    KStream<String, WikipediaMessage> wikipediaParsed =
            wikipediaRaw.map(WikipediaMessage::parceIRC)
                    .filter(WikipediaMessage::filterNonNull)
                    .through(Serdes.String(), new JsonPOJOSerde<>(WikipediaMessage.class), "wikipedia-parsed");

    KTable<String, Long> totalEditsByUser = wikipediaParsed
            .filter((key, value) -> value.type == WikipediaMessage.Type.EDIT)
            .countByKey(Serdes.String(), "wikipedia-edits-by-user");

    // Print each user's running edit count.
    totalEditsByUser.toStream().process(() -> new AbstractProcessor<String, Long>() {
        @Override
        public void process(String user, Long numEdits) {
            System.out.println("USER: " + user + " num.edits: " + numEdits);
        }
    });

    return new KafkaStreams(builder, props);

}
 
Example 14
@Test(expectedExceptions = OffsetNotTrackedException.class)
public void testStartingOffsetWithNormalMessages() throws IOException {
  Serializer<String> stringSerializer = new StringSerializer();
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  ConsumerRecordsProcessor<String, String> consumerRecordsProcessor = createConsumerRecordsProcessor();

  // Let consumer record 0 be a normal record.
  byte[] message0Bytes = stringSerializer.serialize("topic", "message0");
  byte[] message0WrappedBytes = wrapMessageBytes(segmentSerializer, message0Bytes);
  ConsumerRecord<byte[], byte[]> consumerRecord0 =
      new ConsumerRecord<>("topic", 0, 100L, 0L, TimestampType.CREATE_TIME, 0, 0, 0, "key".getBytes(), message0WrappedBytes);

  // Construct the consumer records.
  List<ConsumerRecord<byte[], byte[]>> recordList = new ArrayList<>();
  recordList.add(consumerRecord0);
  Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
  recordsMap.put(new TopicPartition("topic", 0), recordList);
  ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(recordsMap);

  consumerRecordsProcessor.process(records).consumerRecords();

  TopicPartition tp = new TopicPartition("topic", 0);
  assertEquals(consumerRecordsProcessor.startingOffset(tp, 100L), 100, "Should return 100 because there are no " +
      "large messages in the partition.");

  // Should throw exception when an offset cannot be found by the offset tracker.
  consumerRecordsProcessor.startingOffset(tp, 0L);
}
 
Example 15
public void produceStrings(int messageCount, Runnable completionCallback,
        Supplier<ProducerRecord<String, String>> messageSupplier) {
    Serializer<String> keySer = new StringSerializer();
    Serializer<String> valSer = new StringSerializer();
    String randomId = UUID.randomUUID().toString();
    this.produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
 
Example 16
@Test
public void testConfluentAvroDeserializer() throws IOException, RestClientException {
  WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L);

  mockWorkUnitState.setProp("schema.registry.url", TEST_URL);

  Schema schema = SchemaBuilder.record(TEST_RECORD_NAME)
      .namespace(TEST_NAMESPACE).fields()
      .name(TEST_FIELD_NAME).type().stringType().noDefault()
      .endRecord();

  GenericRecord testGenericRecord = new GenericRecordBuilder(schema).set(TEST_FIELD_NAME, "testValue").build();

  SchemaRegistryClient mockSchemaRegistryClient = mock(SchemaRegistryClient.class);
  when(mockSchemaRegistryClient.getByID(any(Integer.class))).thenReturn(schema);

  Serializer<Object> kafkaEncoder = new KafkaAvroSerializer(mockSchemaRegistryClient);
  Deserializer<Object> kafkaDecoder = new KafkaAvroDeserializer(mockSchemaRegistryClient);

  ByteBuffer testGenericRecordByteBuffer =
      ByteBuffer.wrap(kafkaEncoder.serialize(TEST_TOPIC_NAME, testGenericRecord));

  KafkaSchemaRegistry<Integer, Schema> mockKafkaSchemaRegistry = mock(KafkaSchemaRegistry.class);
  KafkaDeserializerExtractor kafkaDecoderExtractor =
      new KafkaDeserializerExtractor(mockWorkUnitState,
          Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, mockKafkaSchemaRegistry);

  ByteArrayBasedKafkaRecord mockMessageAndOffset = getMockMessageAndOffset(testGenericRecordByteBuffer);

  Assert.assertEquals(kafkaDecoderExtractor.decodeRecord(mockMessageAndOffset), testGenericRecord);
}
 
Example 17
/**
 * {@inheritDoc}
 */
@Override
public <K, V> SerializerPair<K, V> build(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    Serializer<K> newKeySerializer = new CryptoAwareSerializerWrapper<K>(keySerializer, keyReferenceExtractor, null);
    Serializer<V> newValueSerializer = new CryptoSerializer<>(valueSerializer, encryptor, null);
    return new SerializerPair<>(newKeySerializer, newValueSerializer);
}
 
Example 18
/**
 * Create a producer that can write to this broker
 *
 * @param keySerializer   Key serializer class
 * @param valueSerializer Value serializer class
 * @param overrideConfig  Producer config to override. Pass null if there aren't any.
 * @param <K>             Type of Key
 * @param <V>             Type of Value
 * @return KafkaProducer
 */
public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer,
                                                 Properties overrideConfig) {
    Properties conf = producerConfig();
    if (overrideConfig != null) {
        conf.putAll(overrideConfig);
    }
    keySerializer.configure(Maps.fromProperties(conf), true);
    valueSerializer.configure(Maps.fromProperties(conf), false);
    return new KafkaProducer<>(conf, keySerializer, valueSerializer);
}
 
Example 19
@Override
protected Serializer<GenericRow> getSerializer(
    Schema avroSchema,
    org.apache.kafka.connect.data.Schema kafkaSchema,
    String topicName
) {
  return new KsqlDelimitedSerializer(kafkaSchema);
}
 
Example 20
@Override
protected Serializer<GenericRow> getSerializer(
    Schema avroSchema,
    org.apache.kafka.connect.data.Schema kafkaSchema,
    String topicName
) {
  return new KsqlJsonSerializer(kafkaSchema);
}
 
Example 21
@SuppressWarnings("rawtypes")
@Override
protected Class<? extends Serializer> serializerClass() {
    return AvroKafkaSerializer.class;
}
 
Example 22
public SchemalessConverter(Serializer<T> serializer, Deserializer<T> deserializer) {
    setSerializer(serializer);
    setDeserializer(deserializer);
}
 
Example 23
protected Class<? extends Serializer> serializerClass() {
    return Serializer.class;
}
 
Example 24
public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new org.apache.kafka.clients.producer.KafkaProducer<>(config, keySerializer, valueSerializer));
}
 
Example 25
@Override
public Serializer<T> serializer() {
    return this;
}
 
Example 26
public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
    this.kafkaSerializer = serializer;
    this.kafkaDeserializer = deserializer;
}
 
Example 27
@Override
public void apply(BiFunctionReq request, StreamObserver<BiFunctionRes> responseObserver) {
    String serviceName = request.getServiceName();
    AsyncBiFunctionService.WithSerdes<Object, Object, Object> localService;
    try {
        // need to work in Object domain since this dispatcher is used for multiple local services of
        // the same type but possibly different type parameter(s). Type safety is guaranteed nevertheless
        // since distributed service is always initialized with a local service in a type-safe fashion
        // together with a chosen service name. Service name therefore guarantees correct type parameters.
        @SuppressWarnings("unchecked")
        AsyncBiFunctionService.WithSerdes<Object, Object, Object> _abfs =
            (AsyncBiFunctionService.WithSerdes) localServiceRegistry.get(serviceName);
        localService = _abfs;
    } catch (Exception e) {
        responseObserver.onError(e);
        return;
    }

    Object key = localService.keySerde().deserializer().deserialize(serviceName, request.getKey().isEmpty() ? null : request.getKey().toByteArray());
    Object req = localService.reqSerde().deserializer().deserialize(serviceName, request.getReq().isEmpty() ? null : request.getReq().toByteArray());
    Serializer<Object> resSerializer = localService.resSerde().serializer();

    try {
        localService
            .apply(key, req)
            .whenComplete((res, serviceExc) -> {
                if (serviceExc != null) {
                    responseObserver.onError(serviceExc);
                } else {
                    BiFunctionRes resProto = null;
                    try {
                        byte[] resBytes = resSerializer.serialize(serviceName, res);
                        resProto = BiFunctionRes
                            .newBuilder()
                            .setRes(resBytes == null ? ByteString.EMPTY : ByteString.copyFrom(resBytes))
                            .build();
                    } catch (Throwable serializeExc) {
                        responseObserver.onError(serializeExc);
                    }
                    if (resProto != null) {
                        responseObserver.onNext(resProto);
                        responseObserver.onCompleted();
                    }
                }
            });
    } catch (Throwable applyExc) {
        responseObserver.onError(applyExc);
    }
}
 
Example 28
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  return createShared(vertx, name, () -> KafkaWriteStream.create(vertx, new HashMap<>(config), keySerializer, valueSerializer));
}
 
Example 29
@Override public Serializer<Set<String>> serializer() {
  return new SpanNamesSerializer();
}
 
Example 30
@Override public Serializer<DependencyLink> serializer() {
  return new DependencyLinkSerializer();
}