Java源码示例:org.apache.pulsar.client.api.TypedMessageBuilder
示例1
/**
 * Converts {@code item} into a Pulsar message and hands it to the producer asynchronously.
 *
 * <p>The value extractor is always applied; key, properties and event time are only set
 * when the corresponding extractor function is configured.
 *
 * @param item the element to publish
 */
private void add(E item) {
    TypedMessageBuilder<M> builder = producer.newMessage().value(extractValueFn.apply(item));

    if (null != extractKeyFn) {
        builder = builder.key(extractKeyFn.apply(item));
    }
    if (null != extractPropertiesFn) {
        builder = builder.properties(extractPropertiesFn.apply(item));
    }
    if (null != extractTimestampFn) {
        builder.eventTime(extractTimestampFn.apply(item));
    }

    // NOTE(review): the future returned by exceptionally(...) is discarded, so whatever
    // sneakyThrow raises inside the callback is not observed by this method - confirm intended.
    builder.sendAsync()
            .thenApply(messageId -> CompletableFuture.completedFuture(messageId))
            .exceptionally(failure -> {
                ExceptionUtil.sneakyThrow(failure);
                return null;
            });
}
示例2
/**
 * Serializes {@code value} and publishes it asynchronously through the producer.
 *
 * <p>Event time is set when the context carries a timestamp, and the key is set when the
 * key extractor yields a non-null key. When {@code flushOnCheckpoint} is enabled the
 * pending-record counter is incremented under {@code pendingRecordsLock} before sending.
 *
 * @param value   the record to publish
 * @param context the sink context supplying the optional timestamp
 * @throws Exception propagated from {@code checkErroneous()} or serialization
 */
@Override
public void invoke(T value, Context context) throws Exception {
    checkErroneous();

    final byte[] payload = schema.serialize(value);
    TypedMessageBuilder<byte[]> builder = producer.newMessage();

    if (context.timestamp() != null) {
        builder = builder.eventTime(context.timestamp());
    }

    final String key = flinkPulsarKeyExtractor.getKey(value);
    if (key != null) {
        builder = builder.key(key);
    }

    if (flushOnCheckpoint) {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }

    builder.value(payload)
            .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
            .sendAsync()
            .thenApply(successCallback)
            .exceptionally(failureCallback);
}
示例3
/**
 * Creates a message builder on the producer dedicated to the record's topic and partition.
 *
 * <p>The destination topic falls back to the sink's configured topic when the record does
 * not name one. The producer is looked up under the name {@code "<topic>-<partitionId>"}.
 *
 * @param record the record about to be published
 * @return a builder bound to the record's schema when present, otherwise a schema-less builder
 * @throws RuntimeException if the record carries no partition id
 */
@Override
public TypedMessageBuilder<T> newMessage(Record<T> record) {
    if (!record.getPartitionId().isPresent()) {
        throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
    }

    final String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
    final String partitionId = record.getPartitionId().get();
    final String producerName = String.format("%s-%s", topic, partitionId);

    Producer<T> producer = getProducer(producerName, partitionId, topic, record.getSchema());

    if (record.getSchema() == null) {
        return producer.newMessage();
    }
    return producer.newMessage(record.getSchema());
}
示例4
/**
 * Test fixture: rebuilds the instance config, the mocked logger/client and the
 * {@code ContextImpl} under test before every test method.
 */
@BeforeMethod
public void setup() {
config = new InstanceConfig();
// Function details with an empty user-config string.
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setUserConfig("")
.build();
config.setFunctionDetails(functionDetails);
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
// newProducer() hands back a real builder bound to the mocked client.
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
// Producer creation and schema lookup resolve immediately.
// NOTE(review): `producer` is not assigned in this method - presumably a mock initialized elsewhere.
when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
// Spy on a real builder; its sendAsync() returns a fresh future that this method never completes.
TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, null);
// Current message context is a Record lambda that returns null.
context.setCurrentMessageContext((Record<String>) () -> null);
}
示例5
/**
 * Appends {@code "!"} to the input and publishes the result to a user-configurable topic.
 *
 * <p>The target topic is read from the {@code "publish-topic"} user config value
 * (default {@code "publishtopic"}). The outgoing message carries the current record's
 * properties plus an {@code "input_topic"} property, the current record's key when present,
 * and the current wall-clock time as event time. The send is asynchronous and its
 * resulting future is not awaited.
 *
 * @param input   the incoming message payload
 * @param context the function context providing config, the current record and the publisher
 * @return always {@code null}
 */
@Override
public Void process(String input, Context context) {
    String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
    String output = String.format("%s!", input);

    Map<String, String> properties = new HashMap<>();
    properties.put("input_topic", context.getCurrentRecord().getTopicName().get());
    properties.putAll(context.getCurrentRecord().getProperties());

    try {
        TypedMessageBuilder<String> messageBuilder = context.newOutputMessage(publishTopic, Schema.STRING)
                .value(output)
                .properties(properties);
        context.getCurrentRecord().getKey().ifPresent(messageBuilder::key);
        messageBuilder.eventTime(System.currentTimeMillis()).sendAsync();
    } catch (PulsarClientException e) {
        // Pass the exception itself so the stack trace is kept in the log.
        context.getLogger().error("Failed to publish message to topic " + publishTopic, e);
    }
    return null;
}
示例6
/**
 * Publishes a batch of log messages to Pulsar and blocks until every send is acknowledged.
 *
 * <p>Each message is sent asynchronously (with its key bytes when one is set), then the
 * producer is flushed and every returned future is awaited. On success, throughput, latency
 * and message-count metrics are reported; on failure the failure counter is incremented.
 *
 * @param messages the batch to publish
 * @throws LogStreamWriterException if flushing or any individual send fails, or if the
 *         calling thread is interrupted while waiting (the interrupt flag is restored)
 */
@Override
public void writeLogMessages(List<LogMessage> messages) throws LogStreamWriterException {
    long bytesWritten = 0;
    List<CompletableFuture<MessageId>> messageFutures = new ArrayList<>(messages.size());
    final long startTimeMs = System.currentTimeMillis();

    for (LogMessage m : messages) {
        TypedMessageBuilder<byte[]> message = producer.newMessage();
        if (m.isSetKey()) {
            message.keyBytes(m.getKey());
            bytesWritten += m.getKey().length;
        }
        messageFutures.add(message.value(m.getMessage()).sendAsync());
        bytesWritten += m.getMessage().length;
    }

    try {
        producer.flush();
        for (CompletableFuture<MessageId> future : messageFutures) {
            future.get();
        }
    } catch (PulsarClientException | InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        }
        OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + metricTag,
                "host=" + HOSTNAME, "logname=" + logName);
        throw new LogStreamWriterException("Message delivery failed", e);
    }

    final long writeLatencyMs = System.currentTimeMillis() - startTimeMs;
    OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + metricTag,
            "host=" + HOSTNAME, "logname=" + logName);
    OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, writeLatencyMs, "topic=" + metricTag,
            "host=" + HOSTNAME, "logname=" + logName);
    OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + metricTag,
            "host=" + HOSTNAME, "logname=" + logName);
    LOG.info("Completed batch writes to Pulsar topic:" + metricTag + " size:" + messages.size());
}
示例7
/**
 * Publishes {@code value} asynchronously, either to the forced default topic or to the
 * topic derived from the value by the topic/key extractor.
 *
 * <p>In the extractor path a {@code null} topic either raises an exception (when
 * {@code failOnWrite} is set) or silently drops the record. A non-null extracted key is
 * attached as key bytes. When {@code flushOnCheckpoint} is enabled the pending-record
 * counter is incremented under {@code pendingRecordsLock} before the send.
 *
 * @param value   the record to publish
 * @param context the sink context (not read by this method)
 * @throws Exception propagated from {@code checkErroneous()} or producer lookup
 */
@Override
public void invoke(T value, Context context) throws Exception {
    checkErroneous();
    initializeSendCallback();

    final TypedMessageBuilder<T> message;
    if (!forcedTopic) {
        final byte[] keyBytes = topicKeyExtractor.serializeKey(value);
        final String targetTopic = topicKeyExtractor.getTopic(value);
        if (targetTopic == null) {
            if (!failOnWrite) {
                return;
            }
            throw new NullPointerException("no topic present in the data.");
        }
        message = (TypedMessageBuilder<T>) getProducer(targetTopic).newMessage().value(value);
        if (keyBytes != null) {
            message.keyBytes(keyBytes);
        }
    } else {
        message = (TypedMessageBuilder<T>) getProducer(defaultTopic).newMessage().value(value);
    }

    if (flushOnCheckpoint) {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }

    message.sendAsync().whenComplete(sendCallback);
}
示例8
/**
 * Sends {@code payload} asynchronously, attaching {@code key} when one is supplied.
 *
 * @param key     optional message key
 * @param payload message body
 * @return a future that completes with {@code null} once the send completes
 */
@Override
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
    final TypedMessageBuilder<byte[]> builder = producer.newMessage().value(payload);
    key.ifPresent(builder::key);
    return builder.sendAsync().thenApply(ignoredId -> null);
}
示例9
/**
 * Builds a {@code RecordMetadata} describing a message that was published to Pulsar.
 *
 * @param topic      topic the message was sent to
 * @param msgBuilder builder used for the message; must be a {@code TypedMessageBuilderImpl}
 * @param messageId  id returned by the send; must be a {@code MessageIdImpl}
 * @param size       serialized value size passed through to the metadata
 * @return metadata carrying the topic/partition, the offset derived from the message id,
 *         the builder's publish time, the key length (0 when there is no key) and {@code size}
 */
private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
        int size) {
    final MessageIdImpl id = (MessageIdImpl) messageId;

    // Offset is computed from the message id by MessageIdUtils.
    final long offset = MessageIdUtils.getOffset(id);
    final TopicPartition topicPartition = new TopicPartition(topic, id.getPartitionIndex());

    final TypedMessageBuilderImpl<byte[]> builderImpl = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
    int keyLength = 0;
    if (builderImpl.hasKey()) {
        keyLength = builderImpl.getKey().length();
    }

    return new RecordMetadata(topicPartition, offset, 0L, builderImpl.getPublishTime(), 0L, keyLength, size);
}
示例10
/**
 * Synchronously sends {@code messages} payloads of the form {@code "test-<i>"}.
 *
 * <p>Payloads are encoded as UTF-8 so the bytes do not depend on the platform default charset.
 * NOTE(review): the same builder instance is reused for every send - confirm that is intended.
 *
 * @param messages       number of messages to send
 * @param messageBuilder builder used to set the value and send each message
 * @throws Exception if a send fails
 */
void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
    log.info("Start sending messages");
    for (int i = 0; i < messages; i++) {
        final String m = "test-" + i;
        messageBuilder.value(m.getBytes(java.nio.charset.StandardCharsets.UTF_8)).send();
        log.info("Sent message {}", m);
    }
}
示例11
/**
 * Verifies that a message restricted to replication cluster {@code r1} is received by the
 * local consumer and is not delivered to a consumer connected through the second client.
 *
 * <p>Clients, producer and consumers are opened in try-with-resources so they are closed
 * even when an assertion fails.
 */
@Test
public void testReplicatedCluster() throws Exception {
    log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");

    final String namespace = "pulsar/global/repl";
    final String topicName = String.format("persistent://%s/topic1-%d", namespace, System.currentTimeMillis());
    admin1.namespaces().createNamespace(namespace);
    admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
    admin1.topics().createPartitionedTopic(topicName, 4);

    try (PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
         PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
         Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
         org.apache.pulsar.client.api.Consumer<byte[]> consumer1 =
                 client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
         org.apache.pulsar.client.api.Consumer<byte[]> consumer2 =
                 client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe()) {

        byte[] value = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // publish message local only
        TypedMessageBuilder<byte[]> msg = producer1.newMessage()
                .replicationClusters(Lists.newArrayList("r1"))
                .value(value);
        msg.send();
        assertEquals(consumer1.receive().getValue(), value);

        // Nothing should arrive on the second client within the timeout.
        Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
        if (msg2 != null) {
            fail("msg should have not been replicated to remote cluster");
        }
    }
}
示例12
/**
 * Publishes {@code msg} through the producer, if one is available.
 *
 * <p>When a key is configured and contains {@code "${"}, it is first resolved through the
 * logger configuration's string substitutor. Depending on {@code syncSend} the message is
 * sent synchronously or asynchronously; in both cases failures are logged, not rethrown.
 *
 * @param msg the message payload
 */
public void send(final byte[] msg) {
    if (producer == null) {
        return;
    }

    final String resolvedKey;
    if (key == null) {
        resolvedKey = null;
    } else if (key.contains("${")) {
        resolvedKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
    } else {
        resolvedKey = key;
    }

    final TypedMessageBuilder<byte[]> outgoing = producer.newMessage().value(msg);
    if (resolvedKey != null) {
        outgoing.key(resolvedKey);
    }

    if (!syncSend) {
        outgoing.sendAsync().exceptionally(cause -> {
            LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
            return null;
        });
        return;
    }

    try {
        outgoing.send();
    } catch (PulsarClientException e) {
        LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e);
    }
}
示例13
/**
 * Creates a message builder on the producer for the record's destination topic.
 *
 * <p>The destination falls back to the sink's configured topic when the record names none.
 *
 * @param record the record about to be published
 * @return a builder bound to the record's schema when present, otherwise a schema-less builder
 */
@Override
public TypedMessageBuilder<T> newMessage(Record<T> record) {
    final String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());

    if (record.getSchema() == null) {
        return getProducer(topic, record.getSchema()).newMessage();
    }
    return getProducer(topic, record.getSchema()).newMessage(record.getSchema());
}
示例14
/**
 * Sends {@code msg} asynchronously using the record's sequence number as the message
 * sequence id, acknowledging the record once the send succeeds.
 *
 * @param msg    the prepared outgoing message
 * @param record the source record; must carry a record sequence
 * @throws RuntimeException if the record has no record sequence
 */
@Override
public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
    // The sequence id is what makes the publish idempotent.
    msg.sequenceId(record.getRecordSequence().orElseThrow(() -> new RuntimeException(
            "RecordSequence needs to be specified for every record while in Effectively-once mode")));

    msg.sendAsync()
            .thenAccept(messageId -> record.ack())
            .exceptionally(getPublishErrorHandler(record, true));
}
示例15
/**
 * Converts {@code record} into an outgoing Pulsar message and hands it to the sink processor.
 *
 * <p>The record key is applied unless the schema is a {@code KeyValueSchema} with
 * {@code SEPARATED} encoding. Record properties are forwarded only when non-empty and
 * forwarding is enabled in the sink config. For records originating from a
 * {@code PulsarRecord}, the input topic and the Base64-encoded input message id are attached
 * as properties; for other sources the source event time, if any, is propagated.
 *
 * @param record the record to publish; must be a {@code SinkRecord}
 */
@Override
public void write(Record<T> record) {
    TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);

    if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema &&
            ((KeyValueSchema) record.getSchema()).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED)) {
        msg.key(record.getKey().get());
    }

    msg.value(record.getValue());

    if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) {
        msg.properties(record.getProperties());
    }

    SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
    if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
        PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) sinkRecord.getSourceRecord();
        // Tag the output with its origin; encodeToString avoids a platform-charset decode.
        msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                .property("__pfn_input_msg_id__",
                        Base64.getEncoder().encodeToString(pulsarRecord.getMessageId().toByteArray()));
    } else {
        // It is coming from some source
        Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
        eventTime.ifPresent(msg::eventTime);
    }

    pulsarSinkProcessor.sendOutputMessage(msg, record);
}
示例16
/**
 * Feeds a series of timestamps through the windowed function with a late-data topic
 * configured, running the watermark generator after each one.
 *
 * NOTE(review): this method contains no assertions or verify(...) calls, and the final
 * local {@code event} is never used - confirm whether checks were dropped.
 */
@Test
public void testExecuteWithLateTupleStream() throws Exception {
// Configure the late-data topic and expose the window config to the function as a Map
// (round-tripped through Gson).
windowConfig.setLateDataTopic("$late");
doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
.when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
// Mocked output builder: value(...) returns the mock itself; the context hands it out
// for any newOutputMessage call.
TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
when(typedMessageBuilder.value(any())).thenReturn(typedMessageBuilder);
when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
when(context.newOutputMessage(anyString(), any())).thenReturn(typedMessageBuilder);
// The last timestamp (600) is lower than all preceding ones - presumably the late tuple.
long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
List<Long> events = new ArrayList<>(timestamps.length);
for (long ts : timestamps) {
events.add(ts);
// Each timestamp is delivered as the value of a mocked current record.
Record<?> record = mock(Record.class);
doReturn(Optional.of("test-topic")).when(record).getTopicName();
doReturn(record).when(context).getCurrentRecord();
doReturn(ts).when(record).getValue();
testWindowedPulsarFunction.process(ts, context);
//Update the watermark to this timestamp
testWindowedPulsarFunction.waterMarkEventGenerator.run();
}
System.out.println(testWindowedPulsarFunction.windows);
// Last submitted timestamp; currently unused.
long event = events.get(events.size() - 1);
}
示例17
/**
 * Sets the message value to the tuple's first string field with {@code "-processed"} appended.
 *
 * @param msgBuilder builder to populate
 * @param tuple      tuple whose field 0 holds the received message text
 * @return the builder returned by {@code msgBuilder.value(...)}
 */
@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    // message processing: tag the received text as processed
    final String processed = tuple.getString(0) + "-processed";
    final byte[] payload = processed.getBytes();
    return msgBuilder.value(payload);
}
示例18
/**
 * Maps the tuple's first binary field to a message value, with two special payloads:
 * {@code "message to be dropped"} yields {@code null} and {@code "throw exception"} raises
 * a {@code RuntimeException}.
 *
 * @param msgBuilder builder to populate
 * @param tuple      tuple whose field 0 holds the payload bytes
 * @return the populated builder, or {@code null} for the drop payload
 */
@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    switch (new String(tuple.getBinary(0))) {
        case "message to be dropped":
            return null;
        case "throw exception":
            throw new RuntimeException();
        default:
            return msgBuilder.value(tuple.getBinary(0));
    }
}
示例19
/**
 * Populates a message builder from a tuple so the message can be published from the Bolt.
 *
 * <p>This default implementation exists for backward compatibility: it delegates to
 * {@code toMessage(tuple)} and copies the resulting message's data, properties and,
 * when present, key onto {@code msgBuilder}.
 *
 * @param msgBuilder the builder to populate
 * @param tuple      the tuple to convert
 * @return {@code msgBuilder}
 */
default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
    final Message<byte[]> converted = toMessage(tuple);

    msgBuilder.value(converted.getData()).properties(converted.getProperties());

    if (!converted.hasKey()) {
        return msgBuilder;
    }
    msgBuilder.key(converted.getKey());
    return msgBuilder;
}
示例20
/**
 * Creates a message builder bound to the given transaction.
 *
 * @param txn the transaction; must be a {@code TransactionImpl}
 * @return a new builder for this producer, its schema and the transaction
 * @throws IllegalArgumentException if the producer's send timeout is greater than zero
 */
public TypedMessageBuilder<T> newMessage(Transaction txn) {
    checkArgument(txn instanceof TransactionImpl);

    // Transactional messages are only permitted when the send timeout is not positive.
    final boolean sendTimeoutEnabled = conf.getSendTimeoutMs() > 0;
    if (sendTimeoutEnabled) {
        throw new IllegalArgumentException(
                "Only producers disabled sendTimeout are allowed to produce transactional messages");
    }

    final TransactionImpl transaction = (TransactionImpl) txn;
    return new TypedMessageBuilderImpl<>(this, schema, transaction);
}
示例21
/**
 * Sets the partition key from raw bytes, stored Base64-encoded in the message metadata.
 *
 * <p>For {@code KEY_VALUE} schemas the call is rejected when the encoding type is
 * {@code SEPARATED}, and a {@code null} key marks the partition key as null instead.
 * NOTE(review): for other schema types a {@code null} key reaches the Base64 encoder
 * unguarded - confirm callers never pass null there.
 *
 * @param key the key bytes
 * @return this builder
 */
@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
    if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
        final KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
        checkArgument(keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
                "This method is not allowed to set keys when in encoding type is SEPARATED");
        if (null == key) {
            msgMetadataBuilder.setNullPartitionKey(true);
            return this;
        }
    }

    final String encodedKey = Base64.getEncoder().encodeToString(key);
    msgMetadataBuilder.setPartitionKey(encodedKey);
    msgMetadataBuilder.setPartitionKeyB64Encoded(true);
    return this;
}
示例22
/**
 * Adds a single name/value property to the message metadata.
 *
 * @param name  property name; must not be {@code null}
 * @param value property value; must not be {@code null}
 * @return this builder
 */
@Override
public TypedMessageBuilder<T> property(String name, String value) {
    checkArgument(null != name, "Need Non-Null name");
    checkArgument(null != value, "Need Non-Null value for name: " + name);

    final KeyValue entry = KeyValue.newBuilder()
            .setKey(name)
            .setValue(value)
            .build();
    msgMetadataBuilder.addProperties(entry);
    return this;
}
示例23
/**
 * Replaces the message's replication targets with the given cluster list.
 *
 * @param clusters cluster names to replicate to; must not be {@code null}
 * @return this builder
 */
@Override
public TypedMessageBuilder<T> replicationClusters(List<String> clusters) {
    Preconditions.checkNotNull(clusters);

    // Drop any previously configured targets before adding the new ones.
    msgMetadataBuilder.clearReplicateTo();
    msgMetadataBuilder.addAllReplicateTo(clusters);

    return this;
}
示例24
/**
 * Applies message settings from a configuration map to this builder.
 *
 * <p>Each recognized key is type-checked via {@code checkType} and forwarded to the
 * matching builder method. {@code CONF_DISABLE_REPLICATION} only has an effect when its
 * value is {@code true}; {@code CONF_DELIVERY_AFTER_SECONDS} is interpreted in seconds.
 *
 * @param config map of configuration keys to values
 * @return this builder
 * @throws RuntimeException if the map contains an unrecognized key
 */
@SuppressWarnings("unchecked")
@Override
public TypedMessageBuilder<T> loadConf(Map<String, Object> config) {
    for (Map.Entry<String, Object> entry : config.entrySet()) {
        final String name = entry.getKey();
        final Object setting = entry.getValue();

        if (name.equals(CONF_KEY)) {
            this.key(checkType(setting, String.class));
        } else if (name.equals(CONF_PROPERTIES)) {
            this.properties(checkType(setting, Map.class));
        } else if (name.equals(CONF_EVENT_TIME)) {
            this.eventTime(checkType(setting, Long.class));
        } else if (name.equals(CONF_SEQUENCE_ID)) {
            this.sequenceId(checkType(setting, Long.class));
        } else if (name.equals(CONF_REPLICATION_CLUSTERS)) {
            this.replicationClusters(checkType(setting, List.class));
        } else if (name.equals(CONF_DISABLE_REPLICATION)) {
            final boolean replicationDisabled = checkType(setting, Boolean.class);
            if (replicationDisabled) {
                this.disableReplication();
            }
        } else if (name.equals(CONF_DELIVERY_AFTER_SECONDS)) {
            this.deliverAfter(checkType(setting, Long.class), TimeUnit.SECONDS);
        } else if (name.equals(CONF_DELIVERY_AT)) {
            this.deliverAt(checkType(setting, Long.class));
        } else {
            throw new RuntimeException("Invalid message config key '" + name + "'");
        }
    }
    return this;
}
示例25
/**
 * No-op stub: performs no work and returns this builder.
 *
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> disableReplication() {
return this;
}
示例26
/**
 * No-op stub: ignores the supplied event time and returns this builder.
 *
 * @param arg0 event time (unused)
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> eventTime(long arg0) {
return this;
}
示例27
/**
 * No-op stub: ignores the supplied key and returns this builder.
 *
 * @param arg0 message key (unused)
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> key(String arg0) {
return this;
}
示例28
/**
 * No-op stub: ignores the supplied key bytes and returns this builder.
 *
 * @param arg0 key bytes (unused)
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> keyBytes(byte[] arg0) {
return this;
}
示例29
/**
 * No-op stub: ignores the supplied properties map and returns this builder.
 *
 * @param arg0 message properties (unused)
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> properties(Map<String, String> arg0) {
return this;
}
示例30
/**
 * No-op stub: ignores the supplied property name and value and returns this builder.
 *
 * @param arg0 property name (unused)
 * @param arg1 property value (unused)
 * @return this builder
 */
@Override
public TypedMessageBuilder<byte[]> property(String arg0, String arg1) {
return this;
}