Java源码示例:org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder
示例1
/**
 * Checks that one {@code dropWhile} processor builder can be applied to two
 * separate streams, dropping the leading elements smaller than 3 each time.
 */
@Test
public void dropWhileStageBuilderShouldBeReusable() {
    ProcessorBuilder<Integer, Integer> dropBelowThree =
        rs.<Integer>builder().dropWhile(value -> value < 3);

    // First use: 1 and 2 are dropped, the rest is collected.
    assertEquals(
        await(rs.of(1, 2, 3, 4).via(dropBelowThree).toList().run(getEngine())),
        Arrays.asList(3, 4));

    // Second use of the very same builder instance: 0 and 1 are dropped.
    assertEquals(
        await(rs.of(0, 1, 6, 7).via(dropBelowThree).toList().run(getEngine())),
        Arrays.asList(6, 7));
}
示例2
/**
 * Builds a processor that maps integers to their string form, turns it into a
 * list-collecting subscriber, feeds it the values 1..5 and checks the result.
 *
 * @throws ExecutionException if the stream completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void testToStringProcessorFromSpec() throws ExecutionException, InterruptedException {
    ProcessorBuilder<Integer, String> stringifier = ReactiveStreams.<Integer> builder().map(Object::toString);
    SubscriberBuilder<Integer, List<String>> collector = stringifier.toList();

    // Run the stream and block until the collected list is available.
    List<String> collected = ReactiveStreams.of(1, 2, 3, 4, 5)
        .to(collector)
        .run()
        .toCompletableFuture()
        .get();

    assertThat(collected).containsExactly("1", "2", "3", "4", "5");
}
示例3
/**
 * Derives three different builders (map, distinct, cancel) from the same base
 * builder and then looks up, through {@code getAddedStage}, the stage that each
 * derived builder added to its own graph.
 */
@Test
public void builderShouldBeImmutable() {
    ProcessorBuilder<Integer, Integer> base = builder();

    // All derivations are created from the same base before anything is checked.
    ProcessorBuilder<Integer, Integer> withMap = base.map(Function.identity());
    ProcessorBuilder<Integer, Integer> withDistinct = base.distinct();
    SubscriberBuilder<Integer, Void> withCancel = base.cancel();

    // Each derived graph is queried for the stage type of its own operation.
    getAddedStage(Stage.Map.class, graphFor(withMap));
    getAddedStage(Stage.Distinct.class, graphFor(withDistinct));
    getAddedStage(Stage.Cancel.class, graphFor(withCancel));
}
示例4
@Test
public void peekStageShouldBeReusable() {
ProcessorBuilder<Integer, Integer> peek = rs.<Integer>builder().peek(t -> {});
assertEquals(await(rs.of(1, 2, 3).via(peek).toList().run(getEngine())), Arrays.asList(1, 2, 3));
assertEquals(await(rs.of(4, 5, 6).via(peek).toList().run(getEngine())), Arrays.asList(4, 5, 6));
}
示例5
@Test
public void flatMapCsStageBuilderShouldBeResuable() {
ProcessorBuilder<Integer, Integer> mapper = rs.<Integer>builder()
.flatMapCompletionStage(i -> CompletableFuture.completedFuture(i + 1));
assertEquals(await(rs.of(1, 2, 3).via(mapper).toList().run(getEngine())), Arrays.asList(2, 3, 4));
assertEquals(await(rs.of(4, 5, 6).via(mapper).toList().run(getEngine())), Arrays.asList(5, 6, 7));
}
示例6
@Test
public void flatMapStageBuilderShouldBeReusable() {
ProcessorBuilder<PublisherBuilder<Integer>, Integer> flatMap =
rs.<PublisherBuilder<Integer>>builder().flatMap(Function.identity());
assertEquals(await(rs.of(rs.of(1, 2)).via(flatMap).toList().run(getEngine())), Arrays.asList(1, 2));
assertEquals(await(rs.of(rs.of(3, 4)).via(flatMap).toList().run(getEngine())), Arrays.asList(3, 4));
}
示例7
@Test
public void skipStageShouldBeReusable() {
ProcessorBuilder<Integer, Integer> skip = rs.<Integer>builder().skip(2);
assertEquals(await(rs.of(1, 2, 3, 4).via(skip).toList().run(getEngine())), Arrays.asList(3, 4));
assertEquals(await(rs.of(5, 6, 7, 8).via(skip).toList().run(getEngine())), Arrays.asList(7, 8));
}
示例8
/**
 * Creates a processor builder from a coupled subscriber and publisher.
 * Each argument is null-checked and converted to a graph with
 * {@code ReactiveStreamsGraphBuilder.rsBuilderToGraph}; the two graphs are then
 * wrapped in a {@code Stages.Coupled} stage.
 *
 * @param subscriber the subscriber side of the coupling, must not be null
 * @param publisher the publisher side of the coupling, must not be null
 * @return a new processor builder holding the coupled stage
 * @throws NullPointerException if {@code subscriber} or {@code publisher} is null
 */
@Override
public <T, R> ProcessorBuilder<T, R> coupled(SubscriberBuilder<? super T, ?> subscriber,
        PublisherBuilder<? extends R> publisher) {
    // The subscriber is validated and converted before the publisher is looked at.
    Objects.requireNonNull(subscriber, "Subscriber must not be null");
    Graph subscriberGraph = ReactiveStreamsGraphBuilder.rsBuilderToGraph(subscriber);

    Objects.requireNonNull(publisher, "Publisher must not be null");
    Graph publisherGraph = ReactiveStreamsGraphBuilder.rsBuilderToGraph(publisher);

    // NOTE(review): the meaning of the null second argument is defined by ProcessorBuilderImpl.
    return new ProcessorBuilderImpl<>(new Stages.Coupled(subscriberGraph, publisherGraph), null);
}
示例9
/**
 * Sets {@code this.processor} for a method that returns a processor builder
 * working on messages: the method is invoked, a null result is rejected, and
 * the returned builder is chained after the stage produced by
 * {@code managePreProcessingAck()}.
 *
 * @throws NullPointerException if the invoked method returned null
 */
private void processMethodReturningAProcessorBuilderOfMessages() {
    ProcessorBuilder<Message<?>, Message<?>> returnedBuilder = invoke();
    Objects.requireNonNull(returnedBuilder, msg.methodReturnedNull(configuration.methodAsString()));

    this.processor = ReactiveStreams.<Message<?>> builder()
        .flatMapCompletionStage(managePreProcessingAck())
        .via(returnedBuilder)
        .buildRs();
}
示例10
/**
 * Sets {@code this.processor} for a method that returns a processor builder
 * working on payloads: the method is invoked, a null result is rejected, and
 * the returned builder is placed between a stage extracting the payload of each
 * message and a stage wrapping each result back into a message.
 *
 * @throws NullPointerException if the invoked method returned null
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void processMethodReturningAProcessorBuilderOfPayloads() {
    // Raw type on purpose: the payload types of the user's builder are not known here.
    ProcessorBuilder userBuilder = Objects.requireNonNull(invoke(),
        msg.methodReturnedNull(configuration.methodAsString()));

    this.processor = ReactiveStreams.<Message<?>> builder()
        .flatMapCompletionStage(managePreProcessingAck())
        .map(Message::getPayload)
        .via(userBuilder)
        .map(Message::of)
        .buildRs();
}
示例11
/**
 * Payload processor declared with the {@code NONE} acknowledgment strategy.
 * Every incoming string is emitted twice, and each emitted item is passed to
 * {@code processed(NO_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(NO_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.NONE)
@Outgoing("sink-" + NO_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<String, String> processorWithNoAckWithBuilder() {
    ProcessorBuilder<String, String> pipeline = ReactiveStreams.<String> builder()
        .flatMap(payload -> ReactiveStreams.of(payload, payload))
        .peek(emitted -> processed(NO_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例12
/**
 * Payload processor declared without an explicit acknowledgment strategy.
 * Every incoming string is emitted twice, and each emitted item is passed to
 * {@code processed(DEFAULT_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(DEFAULT_ACKNOWLEDGMENT_BUILDER)
@Outgoing("sink-" + DEFAULT_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<String, String> processorWithAutoAckBuilder() {
    ProcessorBuilder<String, String> pipeline = ReactiveStreams.<String> builder()
        .flatMap(payload -> ReactiveStreams.of(payload, payload))
        .peek(emitted -> processed(DEFAULT_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例13
/**
 * Payload processor declared with the {@code PRE_PROCESSING} acknowledgment
 * strategy. Every incoming string is emitted twice, and each emitted item is
 * passed to {@code processed(PRE_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(PRE_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Outgoing("sink-" + PRE_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<String, String> processorWithPreAckBuilder() {
    ProcessorBuilder<String, String> pipeline = ReactiveStreams.<String> builder()
        .flatMap(payload -> ReactiveStreams.of(payload, payload))
        .peek(emitted -> processed(PRE_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例14
/**
 * Message processor declared with the {@code MANUAL} acknowledgment strategy.
 * Each incoming message is acknowledged explicitly; once the acknowledgment
 * stage completes, two new messages carrying the same payload are emitted, and
 * each of them is passed to {@code processed(MANUAL_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(MANUAL_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Outgoing("sink-" + MANUAL_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<Message<String>, Message<String>> processorWithAckWithBuilder() {
    ProcessorBuilder<Message<String>, Message<String>> pipeline = ReactiveStreams.<Message<String>> builder()
        // Acknowledge first, then continue with the original message.
        .flatMapCompletionStage(incoming -> incoming.ack().thenApply(ignored -> incoming))
        .flatMap(acked -> ReactiveStreams.of(
            Message.of(acked.getPayload()),
            Message.of(acked.getPayload())))
        .peek(emitted -> processed(MANUAL_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例15
/**
 * Message processor declared with the {@code NONE} acknowledgment strategy.
 * For each incoming message, two new messages carrying the same payload are
 * emitted, and each of them is passed to
 * {@code processed(NO_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(NO_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.NONE)
@Outgoing("sink-" + NO_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<Message<String>, Message<String>> processorWithNoAckWithBuilder() {
    ProcessorBuilder<Message<String>, Message<String>> pipeline = ReactiveStreams.<Message<String>> builder()
        .flatMap(incoming -> ReactiveStreams.of(
            Message.of(incoming.getPayload()),
            Message.of(incoming.getPayload())))
        .peek(emitted -> processed(NO_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例16
/**
 * Message processor declared without an explicit acknowledgment strategy.
 * For each incoming message, two new messages carrying the same payload are
 * emitted; the incoming message's {@code ack} is registered as the
 * {@code onComplete} action of that two-element inner stream. Each emitted
 * message is passed to {@code processed(DEFAULT_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(DEFAULT_ACKNOWLEDGMENT_BUILDER)
@Outgoing("sink-" + DEFAULT_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<Message<String>, Message<String>> processorWithAutoAckBuilder() {
    ProcessorBuilder<Message<String>, Message<String>> pipeline = ReactiveStreams.<Message<String>> builder()
        .flatMap(incoming -> ReactiveStreams
            .of(Message.of(incoming.getPayload()), Message.of(incoming.getPayload()))
            .onComplete(incoming::ack))
        .peek(emitted -> processed(DEFAULT_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例17
/**
 * Message processor declared with the {@code PRE_PROCESSING} acknowledgment
 * strategy. For each incoming message, two new messages carrying the same
 * payload are emitted, and each of them is passed to
 * {@code processed(PRE_ACKNOWLEDGMENT_BUILDER, item)}.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming(PRE_ACKNOWLEDGMENT_BUILDER)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Outgoing("sink-" + PRE_ACKNOWLEDGMENT_BUILDER)
public ProcessorBuilder<Message<String>, Message<String>> processorWithPreAckBuilder() {
    ProcessorBuilder<Message<String>, Message<String>> pipeline = ReactiveStreams.<Message<String>> builder()
        .flatMap(incoming -> ReactiveStreams.of(
            Message.of(incoming.getPayload()),
            Message.of(incoming.getPayload())))
        .peek(emitted -> processed(PRE_ACKNOWLEDGMENT_BUILDER, emitted));
    return pipeline;
}
示例18
/**
 * Message processor from channel "count" to channel "sink": unwraps the integer
 * payload, adds one, emits the result twice, converts it to a string and wraps
 * it in a new message.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Message<Integer>, Message<String>> process() {
    ProcessorBuilder<Message<Integer>, Message<String>> pipeline = ReactiveStreams.<Message<Integer>> builder()
        .map(Message::getPayload)
        .map(value -> value + 1)
        // Each incremented value is duplicated.
        .flatMapRsPublisher(value -> Flowable.just(value, value))
        .map(value -> Integer.toString(value))
        .map(Message::of);
    return pipeline;
}
示例19
/**
 * Payload processor from channel "count" to channel "sink": adds one to each
 * integer, emits the result twice and converts it to a string.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Integer, String> process() {
    ProcessorBuilder<Integer, String> pipeline = ReactiveStreams.<Integer> builder()
        .map(value -> value + 1)
        // Each incremented value is duplicated.
        .flatMapRsPublisher(value -> Flowable.just(value, value))
        .map(value -> Integer.toString(value));
    return pipeline;
}
示例20
/**
 * Message processor from channel "count" to channel "sink", annotated with
 * {@code @Blocking}: unwraps the integer payload, adds one, emits the result
 * twice, converts it to a string and wraps it in a new message.
 *
 * @return the processor builder describing this pipeline
 */
@Blocking
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Message<Integer>, Message<String>> process() {
    ProcessorBuilder<Message<Integer>, Message<String>> pipeline = ReactiveStreams.<Message<Integer>> builder()
        .map(Message::getPayload)
        .map(value -> value + 1)
        // Each incremented value is duplicated.
        .flatMapRsPublisher(value -> Flowable.just(value, value))
        .map(value -> Integer.toString(value))
        .map(Message::of);
    return pipeline;
}
示例21
/**
 * Payload processor from channel "count" to channel "sink", annotated with
 * {@code @Blocking}: adds one to each integer, emits the result twice and
 * converts it to a string.
 *
 * @return the processor builder describing this pipeline
 */
@Blocking
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Integer, String> process() {
    ProcessorBuilder<Integer, String> pipeline = ReactiveStreams.<Integer> builder()
        .map(value -> value + 1)
        // Each incremented value is duplicated.
        .flatMapRsPublisher(value -> Flowable.just(value, value))
        .map(value -> Integer.toString(value));
    return pipeline;
}
示例22
/**
 * Message processor between "publisher-for-processor-builder-message" and
 * "processor-builder-message". Calls {@code increment} with the incoming
 * channel name when the builder is requested, then returns a pipeline that
 * unwraps the integer payload, adds one, emits the result twice, converts it
 * to a string and wraps it in a new message.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming("publisher-for-processor-builder-message")
@Outgoing("processor-builder-message")
public ProcessorBuilder<Message<Integer>, Message<String>> processorBuilderOfMessages() {
    // Runs once per call of this method, not once per processed item.
    increment("publisher-for-processor-builder-message");

    ProcessorBuilder<Message<Integer>, Message<String>> pipeline = ReactiveStreams.<Message<Integer>>builder()
        .map(Message::getPayload)
        .map(value -> value + 1)
        .flatMap(value -> ReactiveStreams.of(value, value))
        .map(value -> Integer.toString(value))
        .map(Message::of);
    return pipeline;
}
示例23
/**
 * Payload processor between "publisher-for-processor-builder-payload" and
 * "processor-builder-payload". Calls {@code increment} with the incoming
 * channel name when the builder is requested, then returns a pipeline that
 * adds one to each integer, emits the result twice and converts it to a string.
 *
 * @return the processor builder describing this pipeline
 */
@Incoming("publisher-for-processor-builder-payload")
@Outgoing("processor-builder-payload")
public ProcessorBuilder<Integer, String> processorBuilderOfPayloads() {
    // Runs once per call of this method, not once per processed item.
    increment("publisher-for-processor-builder-payload");

    ProcessorBuilder<Integer, String> pipeline = ReactiveStreams.<Integer>builder()
        .map(value -> value + 1)
        .flatMap(value -> ReactiveStreams.of(value, value))
        .map(value -> Integer.toString(value));
    return pipeline;
}
示例24
/**
 * Sample showing {@code via}: a processor builder that converts integers to
 * strings is attached to a two-element stream. The stream is only assembled
 * here; nothing in this method runs it.
 * NOTE(review): the tag/end markers look like documentation include tags, so
 * the code between them is presumably published verbatim; confirm before editing.
 */
public void via() {
//tag::via[]
ProcessorBuilder<Integer, String> processor = ReactiveStreams
.<Integer>builder().map(i -> Integer.toString(i));
ReactiveStreams.of(1, 2)
.via(processor); // ("1", "2")
//end::via[]
}
示例25
/**
 * Creates a processor builder that emits every incoming integer twice.
 *
 * @return a builder whose single stage maps each element to a two-element list of itself
 */
private ProcessorBuilder<Integer, Integer> duplicateProcessorBuilder() {
    ProcessorBuilder<Integer, Integer> duplicator = ReactiveStreams.<Integer> builder()
        .flatMapIterable(value -> Arrays.asList(value, value));
    return duplicator;
}
示例26
/**
 * Creates a processor builder that converts every incoming integer to its
 * {@code toString()} representation.
 *
 * @return a builder with a single mapping stage from integer to string
 */
private ProcessorBuilder<Integer, String> asStringProcessorBuilder() {
    ProcessorBuilder<Integer, String> stringifier = ReactiveStreams.<Integer> builder()
        .map(Object::toString);
    return stringifier;
}
示例27
/**
 * Returns the graph for the given processor builder by delegating to
 * {@code objGraphFor}.
 *
 * @param pb the processor builder to inspect
 * @return the graph returned by {@code objGraphFor(pb)}
 */
protected Graph graphFor(ProcessorBuilder<?, ?> pb) {
    Graph graph = objGraphFor(pb);
    return graph;
}
示例28
/**
 * Expects a {@link NullPointerException} when {@code via} is called with a
 * null processor builder.
 */
@Test(expectedExceptions = NullPointerException.class)
public void viaNull() {
    // Declared with the raw ProcessorBuilder type so the argument has the same
    // static type as the original cast expression.
    ProcessorBuilder nullProcessorBuilder = null;
    builder().via(nullProcessorBuilder);
}
示例29
/**
 * Creates the processor builder used by the tests: an integer builder from
 * {@code rs} with a single identity {@code map} stage.
 *
 * @return a processor builder that maps each integer to itself
 */
private ProcessorBuilder<Integer, Integer> builder() {
    ProcessorBuilder<Integer, Integer> empty = rs.<Integer>builder();
    return empty.map(Function.identity());
}
示例30
/**
 * Expects a {@link NullPointerException} when {@code via} is called with a
 * null processor builder.
 */
@Test(expectedExceptions = NullPointerException.class)
public void viaNull() {
    // Declared with the raw ProcessorBuilder type so the argument has the same
    // static type as the original cast expression.
    ProcessorBuilder nullProcessorBuilder = null;
    builder().via(nullProcessorBuilder);
}