Java源码示例:org.apache.pulsar.client.api.Message
示例1
protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarBrokerUrl)
.build();
Consumer consumer = client.newConsumer()
.topic(TEST_TOPIC)
.subscriptionName("test-subs")
.subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(TEST_TOPIC)
.create()
) {
producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);
assertThat(new String(message.getData()))
.isEqualTo("test containers");
}
}
示例2
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testEmptyCompactionLedger(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
producer.newMessage().key("1").value("1".getBytes()).send();
producer.newMessage().key("2").value("2".getBytes()).send();
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
}
}
示例3
private ConsumerContext(
@Nonnull ILogger logger,
@Nonnull PulsarClient client,
@Nonnull List<String> topics,
@Nonnull Map<String, Object> consumerConfig,
@Nonnull SupplierEx<Schema<M>> schemaSupplier,
@Nonnull SupplierEx<BatchReceivePolicy> batchReceivePolicySupplier,
@Nonnull FunctionEx<Message<M>, T> projectionFn
) throws PulsarClientException {
this.logger = logger;
this.projectionFn = projectionFn;
this.client = client;
this.consumer = client.newConsumer(schemaSupplier.get())
.topics(topics)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.loadConf(consumerConfig)
.batchReceivePolicy(batchReceivePolicySupplier.get())
.subscriptionType(SubscriptionType.Shared)
.subscribe();
}
示例4
/**
* Receive the messages as a batch. The {@link BatchReceivePolicy} is
* configured while creating the Pulsar {@link Consumer}.
* In this method, emitted items are created by applying the projection function
* to the messages received from Pulsar client. If there is an event time
* associated with the message, it sets the event time as the timestamp of the
* emitted item. Otherwise, it sets the publish time(which always exists)
* of the message as the timestamp.
*/
private void fillBuffer(SourceBuilder.TimestampedSourceBuffer<T> sourceBuffer) throws PulsarClientException {
Messages<M> messages = consumer.batchReceive();
for (Message<M> message : messages) {
if (message.getEventTime() != 0) {
sourceBuffer.add(projectionFn.apply(message), message.getEventTime());
} else {
sourceBuffer.add(projectionFn.apply(message), message.getPublishTime());
}
}
consumer.acknowledgeAsync(messages)
.exceptionally(t -> {
logger.warning(buildLogMessage(messages));
return null;
});
}
示例5
@Test
public void testBatchingAwareness() throws Exception {
Message<?> msg = mock(Message.class);
when(msg.getKey()).thenReturn(null);
Clock clock = mock(Clock.class);
RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
HashingScheme.JavaStringHash, 0, true, 10, clock);
TopicMetadataImpl metadata = new TopicMetadataImpl(100);
// time at `12345*` milliseconds
for (int i = 0; i < 10; i++) {
when(clock.millis()).thenReturn(123450L + i);
assertEquals(45, router.choosePartition(msg, metadata));
}
// time at `12346*` milliseconds
for (int i = 0; i < 10; i++) {
when(clock.millis()).thenReturn(123460L + i);
assertEquals(46, router.choosePartition(msg, metadata));
}
}
示例6
/**
* Receive the messages as a batch.
* In this method, emitted items are created by applying the projection function
* to the messages received from Pulsar client. If there is an event time
* associated with the message, it sets the event time as the timestamp of the
* emitted item. Otherwise, it sets the publish time(which always exists)
* of the message as the timestamp.
*/
private void fillBuffer(SourceBuilder.TimestampedSourceBuffer<T> sourceBuffer) throws PulsarClientException {
if (reader == null) {
createReader();
}
int count = 0;
while (!queue.isEmpty() && count++ < MAX_FILL_MESSAGES) {
Message<M> message = queue.poll();
long timestamp;
if (message.getEventTime() != 0) {
timestamp = message.getEventTime();
} else {
timestamp = message.getPublishTime();
}
T item = projectionFn.apply(message);
offset = message.getMessageId();
if (item != null) {
sourceBuffer.add(item, timestamp);
}
}
}
示例7
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactionWithLastDeletedKey(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
producer.newMessage().key("1").value("1".getBytes()).send();
producer.newMessage().key("2").value("2".getBytes()).send();
producer.newMessage().key("3").value("3".getBytes()).send();
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
Set<String> expected = Sets.newHashSet("3");
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertTrue(expected.remove(m.getKey()));
}
}
示例8
public static void main(String[] args) throws PulsarClientException, JsonProcessingException {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of
(SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())) //
.topic("persistent://my-property/use/my-ns/my-topic") //
.subscriptionName("my-subscription-name").subscribe();
Message<JsonPojo> msg = null;
for (int i = 0; i < 100; i++) {
msg = consumer.receive();
// do something
System.out.println("Received: " + msg.getValue().content);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
pulsarClient.close();
}
示例9
private void triggerZeroQueueSizeListener(final Message<T> message) {
checkNotNull(listener, "listener can't be null");
checkNotNull(message, "unqueued message can't be null");
listenerExecutor.execute(() -> {
stats.updateNumMsgsReceived(message);
try {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription,
message.getMessageId());
}
waitingOnListenerForZeroQueueSize = true;
trackMessage(message);
listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message));
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,
message.getMessageId(), t);
}
increaseAvailablePermits(cnx());
waitingOnListenerForZeroQueueSize = false;
});
}
示例10
@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
if (!conf.isRetryEnable()) {
throw new PulsarClientException("reconsumeLater method not support!");
}
try {
reconsumeLaterAsync(message, delayTime, unit).get();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
}
}
示例11
private void failPendingReceive() {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
if (receiveFuture != null) {
receiveFuture.completeExceptionally(
new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
} else {
break;
}
}
}
} finally {
lock.readLock().unlock();
}
}
示例12
@Override
public int convert(BatchMaker batchMaker, Source.Context context, String messageId, Message message)
throws StageException {
byte[] payload = message.getData();
int count = 0;
if (payload.length > 0) {
try {
for (Record record : ServicesUtil.parseAll(context, context, messageConfig.produceSingleRecordPerMessage,
messageId, payload)) {
Map<String, String> messageProperties = message.getProperties();
messageProperties.forEach((key, value) -> record.getHeader().setAttribute(key, value == null ? "" : value));
batchMaker.addRecord(record);
++count;
}
} catch (StageException e) {
handleException(context, messageId, e);
}
}
return count;
}
示例13
@Override
protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
if (message == null) {
return null;
}
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
} else {
return null;
}
}
}
示例14
@Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
switch (getState()) {
case Ready:
case Connecting:
break; // Ok
case Closing:
case Closed:
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Producer already closed"));
case Terminated:
return FutureUtil.failedFuture(new PulsarClientException.TopicTerminatedException("Topic was terminated"));
case Failed:
case Uninitialized:
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy: " + partition);
return producers.get(partition).internalSendAsync(message);
}
示例15
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_WorkNormally() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
MessageImpl message = mock(MessageImpl.class);
ConsumerImpl<ConsumerImpl> spy = spy(consumer);
consumer.pendingReceives.add(receiveFuture);
doReturn(message).when(spy).beforeConsume(any());
doNothing().when(spy).messageProcessed(message);
spy.notifyPendingReceivedCallback(message, null);
Message<ConsumerImpl> receivedMessage = receiveFuture.join();
verify(spy, times(1)).beforeConsume(message);
verify(spy, times(1)).messageProcessed(message);
Assert.assertTrue(receiveFuture.isDone());
Assert.assertFalse(receiveFuture.isCompletedExceptionally());
Assert.assertEquals(receivedMessage, message);
}
示例16
private void testReadMessages(String topic, boolean enableBatch) throws Exception {
int numKeys = 10;
Set<String> keys = publishMessages(topic, numKeys, enableBatch);
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.readerName(subscription)
.create();
while (reader.hasMessageAvailable()) {
Message<byte[]> message = reader.readNext();
Assert.assertTrue(keys.remove(message.getKey()));
}
Assert.assertTrue(keys.isEmpty());
Reader<byte[]> readLatest = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.readerName(subscription + "latest").create();
Assert.assertFalse(readLatest.hasMessageAvailable());
}
示例17
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
pulsarProducerBuilder.messageRouter(new MessageRouter() {
private static final long serialVersionUID = 1L;
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
// https://kafka.apache.org/08/documentation.html#producerapi
// The default partitioner is based on the hash of the key.
return partitioner.partition(msg.getKey(), metadata.numPartitions());
}
});
log.info("Creating producer for topic {} with config {}", topic, pulsarProducerBuilder.toString());
return pulsarProducerBuilder.clone().topic(topic).create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
示例18
private static Map<Integer, String> consumeCompactedTopic(PulsarClient client,
String topic,
String subscription,
int numKeys) throws PulsarClientException {
Map<Integer, String> keys = Maps.newHashMap();
try (Consumer<byte[]> consumer = client.newConsumer()
.readCompacted(true)
.topic(topic)
.subscriptionName(subscription)
.subscribe()
) {
for (int i = 0; i < numKeys; i++) {
Message<byte[]> m = consumer.receive();
keys.put(Integer.parseInt(m.getKey()), new String(m.getValue(), UTF_8));
}
}
return keys;
}
示例19
@Test
public void testPartitions() throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrlTls())
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("sample", tenantInfo);
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/partitioned-topic")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
// Create a consumer directly attached to broker
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
.subscriptionName("my-sub").subscribe();
for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
}
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
checkNotNull(msg);
}
client.close();
}
示例20
private static void publishAndConsumeAvroMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class))
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
CustomObject co = new CustomObject(i);
producer.send(co);
}
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive();
assertEquals("value-" + i, msg.getValue());
}
}
示例21
@Test(timeOut = 20000)
public void testSimpleTerminationReader() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
MessageId msgId1 = producer.send("test-msg-1".getBytes());
MessageId msgId2 = producer.send("test-msg-2".getBytes());
MessageId msgId3 = producer.send("test-msg-3".getBytes());
MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get();
assertEquals(lastMessageId, msgId3);
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
Message<byte[]> msg1 = reader.readNext();
assertEquals(msg1.getMessageId(), msgId1);
Message<byte[]> msg2 = reader.readNext();
assertEquals(msg2.getMessageId(), msgId2);
Message<byte[]> msg3 = reader.readNext();
assertEquals(msg3.getMessageId(), msgId3);
Message<byte[]> msg4 = reader.readNext(100, TimeUnit.MILLISECONDS);
assertNull(msg4);
Thread.sleep(100);
assertTrue(reader.hasReachedEndOfTopic());
}
示例22
@Test
public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
for (int i = 0; i < 10; i++) {
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
}
producer.flush();
producer.close();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
Foo value = fooSchema.decode(msg.getValue());
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), i);
pulsarConsumer.acknowledge(msg);
}
}
示例23
private String buildLogMessage(Messages<M> messages) {
StringBuilder builder = new StringBuilder();
builder.append("Received batch with message ids: ");
String prefix = "";
for (Message<M> message : messages) {
builder.append(prefix);
prefix = ", ";
builder.append(message.getMessageId());
}
builder.append(" cannot be acknowledged.");
return builder.toString();
}
示例24
@Override
public Message<T> receive() throws PulsarClientException {
if (listener != null) {
throw new PulsarClientException.InvalidConfigurationException(
"Cannot use receive() when a listener has been set");
}
verifyConsumerState();
return internalReceive();
}
示例25
@Test(dataProvider = "batch")
public void testSendTimeout(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic5")
.subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic5")
.batchingMaxMessages(5)
.batchingMaxPublishDelay(2 * BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.enableBatching(batchMessageDelayMs != 0)
.sendTimeout(1, TimeUnit.SECONDS)
.create();
final String message = "my-message";
// Trigger the send timeout
stopBroker();
Future<MessageId> future = producer.sendAsync(message.getBytes());
try {
future.get();
Assert.fail("Send operation should have failed");
} catch (ExecutionException e) {
// Expected
}
startBroker();
// We should not have received any message
Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNull(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
示例26
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Message<byte[]> message = topics.getMessageById(persistentTopic, ledgerId, entryId);
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
示例27
@Test
public void testReadEntriesAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
producer.newMessage().key("key0").value("content0".getBytes()).send();
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
producer.newMessage().key("key0").value("content3".getBytes()).send();
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
Assert.assertEquals(m.getKey(), "key0");
Assert.assertEquals(m.getData(), "content2".getBytes());
m = consumer.receive();
Assert.assertEquals(m.getKey(), "key0");
Assert.assertEquals(m.getData(), "content3".getBytes());
}
}
示例28
/**
* Test that sequence id from a producer is correct when there are send errors
*/
@Test
public void testCheckSequenceId() throws Exception {
admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
String topicName = "prop/my-test/my-topic";
int N = 10;
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
// Create consumer
Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
.subscribe();
// Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
// broker to fail subsequent send operation and it will trigger a recover
ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();
clientFactory.close();
// Create a producer
Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < N; i++) {
producer.send("Hello-" + i);
}
for (int i = 0; i < N; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "Hello-" + i);
assertEquals(msg.getSequenceId(), i);
consumer.acknowledge(msg);
}
client.close();
}
示例29
@POST
@ApiOperation(
value = "Triggers a Pulsar Function with a user-specified value or file data",
response = Message.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String triggerFunction(
@ApiParam(value = "The tenant of a Pulsar Function")
final @PathParam("tenant") String tenant,
@ApiParam(value = "The namespace of a Pulsar Function")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Function")
final @PathParam("functionName") String functionName,
@ApiParam(value = "The value with which you want to trigger the Pulsar Function")
final @FormDataParam("data") String triggerValue,
@ApiParam(value = "The path to the file that contains the data with which you'd like to trigger the Pulsar Function")
final @FormDataParam("dataStream") InputStream triggerStream,
@ApiParam(value = "The specific topic name that the Pulsar Function consumes from which you want to inject the data to")
final @FormDataParam("topic") String topic) {
return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
}
示例30
@Test
public void testAppendWithSerializedLayout() throws Exception {
final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithSerializedLayout");
final LogEvent logEvent = createLogEvent();
appender.append(logEvent);
final Message<byte[]> item;
synchronized (history) {
assertEquals(1, history.size());
item = history.get(0);
}
assertNotNull(item);
assertFalse(item.hasKey());
assertEquals(LOG_MESSAGE, deserializeLogEvent(item.getData()).getMessage().getFormattedMessage());
}