Java源码示例:org.apache.pulsar.client.api.TypedMessageBuilder

示例1
private void add(E item) {
    TypedMessageBuilder<M> messageBuilder = producer.newMessage()
                                                    .value(extractValueFn.apply(item));
    if (extractKeyFn != null) {
        messageBuilder = messageBuilder.key(extractKeyFn.apply(item));
    }
    if (extractPropertiesFn != null) {
        messageBuilder = messageBuilder.properties(extractPropertiesFn.apply(item));
    }
    if (extractTimestampFn != null) {
        messageBuilder.eventTime(extractTimestampFn.apply(item));
    }
    messageBuilder.sendAsync()
                  .thenApply(CompletableFuture::completedFuture)
                  .exceptionally(t -> {
                      ExceptionUtil.sneakyThrow(t);
                      return null;
                  });
}
 
示例2
@Override
public void invoke(T value, Context context) throws Exception {
    checkErroneous();

    byte[] serializedValue = schema.serialize(value);

    TypedMessageBuilder<byte[]> msgBuilder = producer.newMessage();
    if (null != context.timestamp()) {
        msgBuilder = msgBuilder.eventTime(context.timestamp());
    }
    String msgKey = flinkPulsarKeyExtractor.getKey(value);
    if (null != msgKey) {
        msgBuilder = msgBuilder.key(msgKey);
    }

    if (flushOnCheckpoint) {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }
    msgBuilder.value(serializedValue)
            .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
            .sendAsync()
            .thenApply(successCallback)
            .exceptionally(failureCallback);
}
 
示例3
@Override
public TypedMessageBuilder<T> newMessage(Record<T> record) {
    if (!record.getPartitionId().isPresent()) {
        throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
    }

    Producer<T> producer = getProducer(
            String.format("%s-%s",record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), record.getPartitionId().get()),
            record.getPartitionId().get(),
            record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
            record.getSchema()
    );
    if (record.getSchema() != null) {
        return producer.newMessage(record.getSchema());
    } else {
        return producer.newMessage();
    }
}
 
示例4
@BeforeMethod
public void setup() {
    config = new InstanceConfig();
    FunctionDetails functionDetails = FunctionDetails.newBuilder()
        .setUserConfig("")
        .build();
    config.setFunctionDetails(functionDetails);
    logger = mock(Logger.class);
    client = mock(PulsarClientImpl.class);
    when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
    when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
            .thenReturn(CompletableFuture.completedFuture(producer));
    when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
    when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));

    TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
    doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
    when(producer.newMessage()).thenReturn(messageBuilder);
    context = new ContextImpl(
        config,
        logger,
        client,
        new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
            FunctionDetails.ComponentType.FUNCTION, null, null);
    context.setCurrentMessageContext((Record<String>) () -> null);
}
 
示例5
@Override
public Void process(String input, Context context) {
    String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
    String output = String.format("%s!", input);

    Map<String, String> properties = new HashMap<>();
    properties.put("input_topic", context.getCurrentRecord().getTopicName().get());
    properties.putAll(context.getCurrentRecord().getProperties());

    try {
        TypedMessageBuilder messageBuilder = context.newOutputMessage(publishTopic, Schema.STRING).
                value(output).properties(properties);
        if (context.getCurrentRecord().getKey().isPresent()){
            messageBuilder.key(context.getCurrentRecord().getKey().get());
        }
        messageBuilder.eventTime(System.currentTimeMillis()).sendAsync();
    } catch (PulsarClientException e) {
        context.getLogger().error(e.toString());
    }
    return null;
}
 
示例6
@Override
public void writeLogMessages(List<LogMessage> messages) throws LogStreamWriterException {
  long bytesWritten = 0;
  List<CompletableFuture<MessageId>> messsageFutures = new ArrayList<>();
  long maxPulsarWriteLatency = System.currentTimeMillis();
  for (LogMessage m : messages) {
    TypedMessageBuilder<byte[]> message = producer.newMessage();
    if (m.isSetKey()) {
      message.keyBytes(m.getKey());
      bytesWritten += m.getKey().length;
    }
    CompletableFuture<MessageId> sendAsync = message.value(m.getMessage()).sendAsync();
    messsageFutures.add(sendAsync);
    bytesWritten += m.getMessage().length;
  }
  try {
    producer.flush();
    for (CompletableFuture<MessageId> future : messsageFutures) {
      future.get();
    }
  } catch (PulsarClientException | InterruptedException | ExecutionException e) {
    OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + metricTag,
        "host=" + HOSTNAME, "logname=" + logName);
    throw new LogStreamWriterException("Message delivery failed", e);
  }

  maxPulsarWriteLatency = System.currentTimeMillis() - maxPulsarWriteLatency;
  OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + metricTag,
      "host=" + HOSTNAME, "logname=" + logName);
  OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, maxPulsarWriteLatency, "topic=" + metricTag,
      "host=" + HOSTNAME, "logname=" + logName);
  OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + metricTag,
      "host=" + HOSTNAME, "logname=" + logName);
  LOG.info("Completed batch writes to Pulsar topic:" + metricTag + " size:" + messages.size());

}
 
示例7
@Override
public void invoke(T value, Context context) throws Exception {
    checkErroneous();
    initializeSendCallback();

    TypedMessageBuilder<T> mb;

    if (forcedTopic) {
        mb = (TypedMessageBuilder<T>) getProducer(defaultTopic).newMessage().value(value);
    } else {
        byte[] key = topicKeyExtractor.serializeKey(value);
        String topic = topicKeyExtractor.getTopic(value);

        if (topic == null) {
            if (failOnWrite) {
                throw new NullPointerException("no topic present in the data.");
            }
            return;
        }

        mb = (TypedMessageBuilder<T>) getProducer(topic).newMessage().value(value);
        if (key != null) {
            mb.keyBytes(key);
        }
    }

    if (flushOnCheckpoint) {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }
    mb.sendAsync().whenComplete(sendCallback);
}
 
示例8
@Override
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
    TypedMessageBuilder<byte[]> msgBuilder = producer.newMessage().value(payload);
    if (key.isPresent()) {
        msgBuilder.key(key.get());
    }

    return msgBuilder.sendAsync().thenApply(msgId -> null);
}
 
示例9
private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
        int size) {
    MessageIdImpl msgId = (MessageIdImpl) messageId;

    // Combine ledger id and entry id to form offset
    long offset = MessageIdUtils.getOffset(msgId);
    int partition = msgId.getPartitionIndex();

    TopicPartition tp = new TopicPartition(topic, partition);
    TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
    return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
}
 
示例10
void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
    log.info("Start sending messages");
    for (int i = 0; i < messages; i++) {
        final String m = new String("test-" + i);
        messageBuilder.value(m.getBytes()).send();
        log.info("Sent message {}", m);
    }
}
 
示例11
@Test
public void testReplicatedCluster() throws Exception {

    log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");

    final String namespace = "pulsar/global/repl";
    final String topicName = String.format("persistent://%s/topic1-%d", namespace, System.currentTimeMillis());
    admin1.namespaces().createNamespace(namespace);
    admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
    admin1.topics().createPartitionedTopic(topicName, 4);

    PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
            .build();
    PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
            .build();

    Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
    org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
    org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
    byte[] value = "test".getBytes();

    // publish message local only
    TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
    msg.send();
    assertEquals(consumer1.receive().getValue(), value);

    Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
    if (msg2 != null) {
        fail("msg should have not been replicated to remote cluster");
    }

    consumer1.close();
    consumer2.close();
    producer1.close();

}
 
示例12
public void send(final byte[] msg)  {
    if (producer != null) {
        String newKey = null;

        if(key != null && key.contains("${")) {
            newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
        } else if (key != null) {
            newKey = key;
        }


        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage()
                .value(msg);

        if (newKey != null) {
            messageBuilder.key(newKey);
        }

        if (syncSend) {
            try {
                messageBuilder.send();
            } catch (PulsarClientException e) {
                LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e);
            }
        } else {
            messageBuilder.sendAsync()
                .exceptionally(cause -> {
                    LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
                    return null;
                });
        }
    }
}
 
示例13
@Override
public TypedMessageBuilder<T> newMessage(Record<T> record) {
    if (record.getSchema() != null) {
        return getProducer(record
                .getDestinationTopic()
                .orElse(pulsarSinkConfig.getTopic()), record.getSchema())
                .newMessage(record.getSchema());
    } else {
        return getProducer(record
                .getDestinationTopic()
                .orElse(pulsarSinkConfig.getTopic()), record.getSchema())
                .newMessage();
    }
}
 
示例14
@Override
public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {

    if (!record.getRecordSequence().isPresent()) {
        throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
    }

    // assign sequence id to output message for idempotent producing
    msg.sequenceId(record.getRecordSequence().get());
    CompletableFuture<MessageId> future = msg.sendAsync();

    future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record, true));
}
 
示例15
@Override
public void write(Record<T> record) {
    TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);

    if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema &&
            ((KeyValueSchema) record.getSchema()).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED)) {
        msg.key(record.getKey().get());
    }

    msg.value(record.getValue());

    if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) {
        msg.properties(record.getProperties());
    }

    SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
    if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
        PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) sinkRecord.getSourceRecord();
        // forward user properties to sink-topic
        msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
           .property("__pfn_input_msg_id__",
                     new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
    } else {
        // It is coming from some source
        Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
        eventTime.ifPresent(msg::eventTime);
    }

    pulsarSinkProcessor.sendOutputMessage(msg, record);
}
 
示例16
@Test
public void testExecuteWithLateTupleStream() throws Exception {

    windowConfig.setLateDataTopic("$late");
    doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
            .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
    TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
    when(typedMessageBuilder.value(any())).thenReturn(typedMessageBuilder);
    when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
    when(context.newOutputMessage(anyString(), any())).thenReturn(typedMessageBuilder);

    long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
    List<Long> events = new ArrayList<>(timestamps.length);

    for (long ts : timestamps) {
        events.add(ts);
        Record<?> record = mock(Record.class);
        doReturn(Optional.of("test-topic")).when(record).getTopicName();
        doReturn(record).when(context).getCurrentRecord();
        doReturn(ts).when(record).getValue();
        testWindowedPulsarFunction.process(ts, context);

        //Update the watermark to this timestamp
        testWindowedPulsarFunction.waterMarkEventGenerator.run();
    }
    System.out.println(testWindowedPulsarFunction.windows);
    long event = events.get(events.size() - 1);
}
 
示例17
@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    String receivedMessage = tuple.getString(0);
    // message processing
    String processedMsg = receivedMessage + "-processed";
    return msgBuilder.value(processedMsg.getBytes());
}
 
示例18
@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
        return null;
    }
    if ("throw exception".equals(new String(tuple.getBinary(0)))) {
        throw new RuntimeException();
    }
    return msgBuilder.value(tuple.getBinary(0));
}
 
示例19
/**
 * Set the value on a message builder to prepare the message to be published from the Bolt.
 *
 * @param tuple
 * @return
 */
default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    // Default implementation provided for backward compatibility
    Message<byte[]> msg = toMessage(tuple);
    msgBuilder.value(msg.getData())
        .properties(msg.getProperties());
    if (msg.hasKey()) {
        msgBuilder.key(msg.getKey());
    }
    return msgBuilder;
}
 
示例20
public TypedMessageBuilder<T> newMessage(Transaction txn) {
    checkArgument(txn instanceof TransactionImpl);

    // check the producer has proper settings to send transactional messages
    if (conf.getSendTimeoutMs() > 0) {
        throw new IllegalArgumentException("Only producers disabled sendTimeout are allowed to"
            + " produce transactional messages");
    }

    return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
}
 
示例21
@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
    if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
        KeyValueSchema kvSchema = (KeyValueSchema) schema;
        checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
                "This method is not allowed to set keys when in encoding type is SEPARATED");
        if (key == null) {
            msgMetadataBuilder.setNullPartitionKey(true);
            return this;
        }
    }
    msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
    msgMetadataBuilder.setPartitionKeyB64Encoded(true);
    return this;
}
 
示例22
@Override
public TypedMessageBuilder<T> property(String name, String value) {
    checkArgument(name != null, "Need Non-Null name");
    checkArgument(value != null, "Need Non-Null value for name: " + name);
    msgMetadataBuilder.addProperties(KeyValue.newBuilder().setKey(name).setValue(value).build());
    return this;
}
 
示例23
@Override
public TypedMessageBuilder<T> replicationClusters(List<String> clusters) {
    Preconditions.checkNotNull(clusters);
    msgMetadataBuilder.clearReplicateTo();
    msgMetadataBuilder.addAllReplicateTo(clusters);
    return this;
}
 
示例24
@SuppressWarnings("unchecked")
@Override
public TypedMessageBuilder<T> loadConf(Map<String, Object> config) {
    config.forEach((key, value) -> {
        if (key.equals(CONF_KEY)) {
            this.key(checkType(value, String.class));
        } else if (key.equals(CONF_PROPERTIES)) {
            this.properties(checkType(value, Map.class));
        } else if (key.equals(CONF_EVENT_TIME)) {
            this.eventTime(checkType(value, Long.class));
        } else if (key.equals(CONF_SEQUENCE_ID)) {
            this.sequenceId(checkType(value, Long.class));
        } else if (key.equals(CONF_REPLICATION_CLUSTERS)) {
            this.replicationClusters(checkType(value, List.class));
        } else if (key.equals(CONF_DISABLE_REPLICATION)) {
            boolean disableReplication = checkType(value, Boolean.class);
            if (disableReplication) {
                this.disableReplication();
            }
        } else if (key.equals(CONF_DELIVERY_AFTER_SECONDS)) {
            this.deliverAfter(checkType(value, Long.class), TimeUnit.SECONDS);
        } else if (key.equals(CONF_DELIVERY_AT)) {
            this.deliverAt(checkType(value, Long.class));
        } else {
            throw new RuntimeException("Invalid message config key '" + key + "'");
        }
    });
    return this;
}
 
示例25
@Override
public TypedMessageBuilder<byte[]> disableReplication() {
  return this;
}
 
示例26
@Override
public TypedMessageBuilder<byte[]> eventTime(long arg0) {
  return this;
}
 
示例27
@Override
public TypedMessageBuilder<byte[]> key(String arg0) {
  return this;
}
 
示例28
@Override
public TypedMessageBuilder<byte[]> keyBytes(byte[] arg0) {
  return this;
}
 
示例29
@Override
public TypedMessageBuilder<byte[]> properties(Map<String, String> arg0) {
  return this;
}
 
示例30
@Override
public TypedMessageBuilder<byte[]> property(String arg0, String arg1) {
  return this;
}