Java源码示例:org.eclipse.microprofile.reactive.messaging.Incoming
示例1
@Blocking
@Incoming("count")
public Uni<Void> consume(Message<String> message) {
return Uni.createFrom().item(() -> {
list.add(message.getPayload());
return null;
});
}
示例2
@Incoming("my-dummy-stream")
@Outgoing("toUpperCase")
public Multi<Message<String>> toUppercase(Multi<Message<String>> input) {
return input
.map(Message::getPayload)
.map(String::toUpperCase)
.map(Message::of);
}
示例3
@Blocking
@Incoming("count")
@Outgoing("sink")
public CompletionStage<Message<String>> process(Message<Integer> value) {
return CompletableFuture.supplyAsync(() -> Integer.toString(value.getPayload() + 1))
.thenApply(Message::of);
}
示例4
@Incoming("count")
@Outgoing("sink")
public Publisher<String> process(Multi<Integer> source) {
return source
.map(i -> i + 1)
.flatMap(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i));
}
示例5
@Incoming(MANUAL_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Outgoing("sink-" + MANUAL_ACKNOWLEDGMENT_BUILDER)
public PublisherBuilder<Message<String>> processorWithAckWithBuilder(Message<String> message) {
return ReactiveStreams.of(message)
.flatMapCompletionStage(m -> m.ack().thenApply(x -> m))
.flatMap(m -> ReactiveStreams.of(Message.of(m.getPayload()), Message.of(m.getPayload())))
.peek(m -> processed(MANUAL_ACKNOWLEDGMENT_BUILDER, m));
}
示例6
@Blocking
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Message<Integer>, Message<String>> process() {
return ReactiveStreams.<Message<Integer>> builder()
.map(Message::getPayload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i))
.map(Message::of);
}
示例7
@Incoming(NO_ACKNOWLEDGMENT_PAYLOAD)
@Acknowledgment(Acknowledgment.Strategy.NONE)
public Subscriber<String> subWithNoAckWithPayload() {
return ReactiveStreams.<String> builder()
.forEach(m -> {
processed(NO_ACKNOWLEDGMENT_PAYLOAD, m);
microNap();
})
.build();
}
示例8
@Incoming("publisher-for-processor-builder-message")
@Outgoing("processor-builder-message")
public ProcessorBuilder<Message<Integer>, Message<String>> processorBuilderOfMessages() {
increment("publisher-for-processor-builder-message");
return ReactiveStreams.<Message<Integer>>builder()
.map(Message::getPayload)
.map(i -> i + 1)
.flatMap(i -> ReactiveStreams.of(i, i))
.map(i -> Integer.toString(i))
.map(Message::of);
}
示例9
@Incoming(POST_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
@Outgoing("sink-" + POST_ACKNOWLEDGMENT)
public String processorWithPostAck(String input) {
processed(POST_ACKNOWLEDGMENT, input);
return input + "1";
}
示例10
@Incoming("C")
@Outgoing("CC")
public PublisherBuilder<Message<String>> process(PublisherBuilder<Message<Integer>> publisher) {
return publisher
.flatMapCompletionStage(m -> CompletableFuture
.supplyAsync(() -> Message.of(Integer.toString(m.getPayload())), executor));
}
示例11
@Incoming("publisher-for-processor-publisher-payload")
@Outgoing("processor-publisher-payload")
public Publisher<String> processorOfPayloads(int value) {
return ReactiveStreams.of(value)
.map(i -> i + 1)
.flatMap(i -> ReactiveStreams.of(i, i))
.map(i -> Integer.toString(i))
.buildRs();
}
示例12
@Incoming(PRE_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Outgoing("sink")
public Message<String> processorWithPreAck(Message<String> input) {
processed(PRE_ACKNOWLEDGMENT, input);
return Message.of(input.getPayload() + "1");
}
示例13
@Incoming("prices")
@Outgoing("my-data-stream")
@Broadcast
public double process(byte[] priceRaw) {
int priceInUsd = Integer.parseInt(new String(priceRaw));
System.out.println("Receiving price: " + priceInUsd);
return priceInUsd * CONVERSION_RATE;
}
示例14
@Incoming("in")
@Outgoing("out-1")
public Multi<String> processPayload(String s) {
if (s.equalsIgnoreCase("skip")) {
return Multi.createFrom().empty();
}
return Multi.createFrom().item(s.toUpperCase());
}
示例15
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<String> m) {
String s = m.getPayload();
if (s.equalsIgnoreCase("skip")) {
return null;
}
return m.withPayload(s.toUpperCase());
}
示例16
@Incoming("a")
@Incoming("b")
@Incoming("c")
@Outgoing("out")
public Message<String> process(Message<String> s) {
return Message.of(s.getPayload().toUpperCase());
}
示例17
@Incoming("A")
@Outgoing("AA")
public Publisher<Message<String>> process(Publisher<Message<Integer>> publisher) {
return ReactiveStreams.fromPublisher(publisher)
.flatMapCompletionStage(m -> CompletableFuture
.supplyAsync(() -> Message.of(Integer.toString(m.getPayload())), executor))
.buildRs();
}
示例18
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
Scheduler scheduler = Schedulers.from(executor);
return values
.observeOn(scheduler)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.doOnError(err -> {
downstreamFailure = err;
});
}
示例19
@Incoming("in")
@Outgoing("out")
public Publisher<String> transformPayload(Multi<String> stream) {
return stream
// The incoming messages are already acknowledged
.map(String::toUpperCase);
}
示例20
@Incoming("sink")
public Subscriber<Message<String>> sink() {
CompletionSubscriber<Message<String>, List<Message<String>>> subscriber = ReactiveStreams.<Message<String>> builder()
.toList().build();
subscriber.getCompletion().thenAccept(result::addAll);
return subscriber;
}
示例21
@SuppressWarnings({ "rawtypes" })
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
list.add(price.getPayload());
Optional<IncomingKafkaRecordMetadata> metadata = price.getMetadata(IncomingKafkaRecordMetadata.class);
metadata.orElseThrow(() -> new IllegalArgumentException("Metadata are missing"));
// Acknowledge the incoming message (commit the offset)
return price.ack();
}
示例22
@Incoming(DEFAULT_ACKNOWLEDGMENT_UNI)
@Outgoing("sink")
public Uni<Message<String>> processorWithDefaultAckUni(Message<String> input) {
return Uni.createFrom().item(() -> {
processed(DEFAULT_ACKNOWLEDGMENT_UNI, input);
return input.withPayload(input.getPayload() + "1");
});
}
示例23
@Incoming("count")
@Outgoing("sink")
public Publisher<Message<String>> process(Flux<Message<Integer>> source) {
return source
.map(Message::getPayload)
.map(i -> i + 1)
.flatMap(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i))
.map(Message::of);
}
示例24
@Blocking
@Incoming("count")
@Outgoing("sink")
public PublisherBuilder<Message<String>> process(Message<Integer> message) {
return ReactiveStreams.of(message)
.map(Message::getPayload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i))
.map(Message::of);
}
示例25
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message, marking the AMQP message as `accepted`.
return price.ack();
}
示例26
@Incoming("in")
@Outgoing("out-2")
public Multi<Message<String>> processMessage(Message<String> m) {
String s = m.getPayload();
if (s.equalsIgnoreCase("skip")) {
return Multi.createFrom().empty();
}
return Multi.createFrom().item(m.withPayload(s.toUpperCase()));
}
示例27
@Incoming("asynchronous-message")
public void getMessgesFromProcessorBuilderOfMessages(String value) {
add("processor-builder-message", value);
}
示例28
@Incoming("source")
public void consume(final String message) {
messages.add(message);
}
示例29
@Incoming("my-stream")
public void consume(int value) {
System.out.println("Received: " + value);
}
示例30
@Incoming("out-4")
public void consumeOut4(String s) {
out4.add(s);
}