Java源码示例:software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler
示例1
/**
 * Subscribes to a shard and processes the event stream with Reactor.
 * <p>
 * The {@code onEventStream} lifecycle callback exposes the raw publisher, which is wrapped in a {@code Flux}.
 * The pipeline keeps only {@code SubscribeToShardEvent}s, flattens them into their records, applies
 * {@code limitRate(1000)}, groups the records into lists of 25 and prints each list.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_Reactor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .onEventStream(publisher -> Flux.from(publisher)
                            .ofType(SubscribeToShardEvent.class)
                            .flatMapIterable(SubscribeToShardEvent::records)
                            .limitRate(1000)
                            .buffer(25)
                            .subscribe(batch -> System.out.println("Record batch = " + batch)))
                    .build());
}
示例2
/**
 * Subscribes to a shard and processes the event stream with RxJava.
 * <p>
 * The {@code onEventStream} lifecycle callback exposes the raw publisher, which is converted into a
 * {@code Flowable}. The pipeline keeps only {@code SubscribeToShardEvent}s, flattens them into their records,
 * applies {@code limit(1000)}, groups the records into lists of 25 and prints each list.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_RxJava(KinesisAsyncClient client, SubscribeToShardRequest request) {
    // snippet-start:[kinesis.java2.stream_rx_example.event_stream]
    SubscribeToShardResponseHandler handler = SubscribeToShardResponseHandler
            .builder()
            .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
            .onEventStream(publisher -> Flowable.fromPublisher(publisher)
                    .ofType(SubscribeToShardEvent.class)
                    .flatMapIterable(SubscribeToShardEvent::records)
                    .limit(1000)
                    .buffer(25)
                    .subscribe(batch -> System.out.println("Record batch = " + batch)))
            .build();
    // snippet-end:[kinesis.java2.stream_rx_example.event_stream]
    return client.subscribeToShard(request, handler);
}
示例3
/**
 * Builds a Kinesis async client and a subscribe-to-shard request, subscribes with a {@code MySubscriber},
 * waits for the subscription to finish and then closes the client.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // snippet-start:[kinesis.java2.stream_example.setup]
    Region region = Region.US_EAST_1;
    KinesisAsyncClient client = KinesisAsyncClient.builder()
            .region(region)
            .build();
    // NOTE(review): the shardId value below looks like a stream ARN rather than a shard id
    // (e.g. "shardId-000000000000") -- confirm against the target stream.
    SubscribeToShardRequest request = SubscribeToShardRequest.builder()
            .consumerARN(CONSUMER_ARN)
            .shardId("arn:aws:kinesis:us-east-1:814548047983:stream/StockTradeStream")
            .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
    // snippet-end:[kinesis.java2.stream_example.setup]
    try {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .subscriber(MySubscriber::new)
                .build();
        // subscribeToShard is asynchronous: block until the returned future completes, otherwise the
        // client would be closed before the subscription has a chance to deliver any events.
        // join() rethrows a failed subscription as a CompletionException.
        client.subscribeToShard(request, responseHandler).join();
    } finally {
        // Release the client's resources whether the subscription succeeded or failed.
        client.close();
    }
}
示例4
/**
 * Handles one event from the shard subscription while holding the parent's lock.
 * <p>
 * If the flow reports that the subscription should be cancelled, the situation is logged and the
 * subscription is cancelled without processing the event. Otherwise the event is dispatched through a
 * visitor that forwards {@code SubscribeToShardEvent}s to the flow.
 *
 * @param recordBatchEvent the event received from the event stream
 */
@Override
public void onNext(SubscribeToShardEventStream recordBatchEvent) {
    synchronized (parent.lockObject) {
        if (!flow.shouldSubscriptionCancel()) {
            // Only SubscribeToShardEvent is overridden; other event types fall through to the
            // visitor's inherited handling.
            SubscribeToShardResponseHandler.Visitor recordForwarder = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    flow.recordsReceived(event);
                }
            };
            recordBatchEvent.accept(recordForwarder);
            return;
        }
        log.debug(
                "{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling",
                parent.shardId, connectionStartedAt, subscribeToShardId);
        cancel();
    }
}
示例5
/**
 * Subscribes through a client configured with a small HTTP/2 initial window and drains the event stream,
 * expecting the call to complete without failing.
 */
@Test
public void subscribeToShard_smallWindow_doesNotTimeOutReads() {
    // The records must be large relative to the initial window size chosen below, so that the client
    // has to send several WINDOW_UPDATEs in order to receive them.
    final int recordCount = 16;
    final int recordSizeBytes = 64 * 1024;
    int published = 0;
    while (published < recordCount) {
        putRecord(recordSizeBytes);
        published++;
    }

    Http2Configuration smallWindowConfiguration = Http2Configuration.builder()
            .initialWindowSize(16384)
            .build();
    KinesisAsyncClient smallWindowAsyncClient = KinesisAsyncClient.builder()
            .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
            .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                    .http2Configuration(smallWindowConfiguration))
            .build();
    try {
        // Consume and discard every event; the response metadata is checked when the response arrives.
        SubscribeToShardResponseHandler drainingHandler = SubscribeToShardResponseHandler.builder()
                .onEventStream(stream -> Flowable.fromPublisher(stream).forEach(event -> {}))
                .onResponse(this::verifyHttpMetadata)
                .build();
        smallWindowAsyncClient.subscribeToShard(
                        req -> req.consumerARN(consumerArn)
                                .shardId(shardId)
                                .startingPosition(pos -> pos.type(ShardIteratorType.TRIM_HORIZON)),
                        drainingHandler)
                .join();
    } finally {
        smallWindowAsyncClient.close();
    }
}
示例6
/**
 * Publishes records on a background schedule while subscribed to the shard, then checks that the data
 * received through the subscription appears as a sequence within the data that was published.
 */
@Test
public void subscribeToShard_ReceivesAllData() {
    // NOTE(review): producedData is appended to from the producer thread and read from the test thread,
    // and receivedData is filled by the subscription callback; both are plain ArrayLists.
    List<SdkBytes> producedData = new ArrayList<>();
    List<SdkBytes> receivedData = new ArrayList<>();
    ScheduledExecutorService producer = Executors.newScheduledThreadPool(1);
    try {
        // Start producing after an initial delay so that the subscription below is established first
        producer.scheduleAtFixedRate(() -> putRecord().ifPresent(producedData::add), 10, 1, TimeUnit.SECONDS);
        // Add every event's data to the receivedData list
        Consumer<SubscribeToShardEvent> eventConsumer = s -> receivedData.addAll(
                s.records().stream()
                        .map(Record::data)
                        .collect(Collectors.toList()));
        asyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
                                .shardId(shardId)
                                .startingPosition(s -> s.type(ShardIteratorType.LATEST)),
                        SubscribeToShardResponseHandler.builder()
                                .onEventStream(p -> p.filter(SubscribeToShardEvent.class)
                                        .subscribe(eventConsumer))
                                .onResponse(this::verifyHttpMetadata)
                                .build())
                .join();
    } finally {
        // Stop the producer even when the subscription fails; otherwise its scheduled task and
        // worker thread would keep running after the test.
        producer.shutdown();
    }
    // Make sure that all the data we received was data we published; we may have published more
    // if the producer isn't shut down immediately after we finish subscribing.
    assertThat(producedData).containsSequence(receivedData);
}
示例7
/**
 * Subscribes with an empty request, collects every event handed to the subscriber and waits for the
 * subscription to finish.
 *
 * @return the events received, in the order they were delivered
 * @throws Throwable the cause of the {@code CompletionException} raised by {@code join()}, or the
 *                   {@code CompletionException} itself when it carries no cause
 */
private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
    List<SubscribeToShardEventStream> events = new ArrayList<>();
    try {
        client.subscribeToShard(SubscribeToShardRequest.builder().build(),
                        SubscribeToShardResponseHandler.builder()
                                .subscriber(events::add)
                                .build())
                .join();
        return events;
    } catch (CompletionException e) {
        // Unwrap so callers see the real failure; "throw null" would surface as a
        // NullPointerException, so fall back to the wrapper when there is no cause.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
    }
}
示例8
/**
 * Because a Flux is also a publisher, the publisherTransformer method integrates nicely with Reactor. Notice that
 * you must adapt the result back to an SdkPublisher.
 * <p>
 * The transformer only reshapes the publisher (here by applying {@code limitRate(100)}); a subscriber is
 * still registered so that something actually consumes the transformed stream.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_OnEventStream_Reactor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .publisherTransformer(p -> Flux.from(p).limitRate(100).as(SdkPublisher::adapt))
            // Without a subscriber (or onEventStream callback) nothing subscribes to the publisher.
            .subscriber(e -> System.out.println("Received event - " + e))
            .build();
    return client.subscribeToShard(request, responseHandler);
}
示例9
/**
 * Because a Flowable is also a publisher, the publisherTransformer method integrates nicely with RxJava. Notice that
 * you must adapt the result back to an SdkPublisher.
 * <p>
 * The transformer only reshapes the publisher (here by applying {@code limit(100)}); a subscriber is
 * still registered so that something actually consumes the transformed stream.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_OnEventStream_RxJava(KinesisAsyncClient client, SubscribeToShardRequest request) {
    // snippet-start:[kinesis.java2.stream_rx_example.publish_transform]
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100)))
            // Without a subscriber (or onEventStream callback) nothing subscribes to the publisher.
            .subscriber(e -> System.out.println("Received event - " + e))
            .build();
    // snippet-end:[kinesis.java2.stream_rx_example.publish_transform]
    return client.subscribeToShard(request, responseHandler);
}
示例10
/**
 * Subscribes using a handler assembled with the builder: errors are printed to standard error, a message
 * is printed on completion, and every received event is printed to standard output.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .onComplete(() -> System.out.println("All records stream successfully"))
                    // Some kind of subscriber is mandatory
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
示例11
/**
 * Subscribes via the SubscribeToShardResponseHandler.Builder, registering a plain consumer that prints
 * each event it receives.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderConsumer(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
示例12
/**
 * Customizes the publisher through publisherTransformer before it is subscribed to: only
 * {@code SubscribeToShardEvent} instances are kept and {@code limit(100)} is applied. Each event that
 * reaches the subscriber is printed.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.publish_transformer]
private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .publisherTransformer(events -> events.filter(SubscribeToShardEvent.class::isInstance).limit(100))
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
示例13
/**
 * Builds a SubscribeToShardResponseHandler.Visitor with its builder, registering a handler only for the
 * events of interest rather than implementing the interface, and subscribes with that visitor.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.visitor]
private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler.Visitor printingVisitor = SubscribeToShardResponseHandler.Visitor
            .builder()
            .onSubscribeToShardEvent(event -> System.out.println("Received subscribe to shard event " + event))
            .build();
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .subscriber(printingVisitor)
                    .build());
}
示例14
/**
 * Subscribes with a visitor created by implementing the SubscribeToShardResponseHandler.Visitor interface
 * directly; the visitor prints every {@code SubscribeToShardEvent} it is given.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler.Visitor printingVisitor = new SubscribeToShardResponseHandler.Visitor() {
        @Override
        public void visit(SubscribeToShardEvent shardEvent) {
            System.out.println("Received subscribe to shard event " + shardEvent);
        }
    };
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .subscriber(printingVisitor)
                    .build());
}
示例15
/**
 * Subscribes with a SubscribeToShardResponseHandler written the classic way, as an anonymous
 * implementation of the interface with every lifecycle method spelled out.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.custom_handler]
private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(request, new SubscribeToShardResponseHandler() {
        @Override
        public void exceptionOccurred(Throwable error) {
            System.err.println("Error during stream - " + error.getMessage());
        }

        @Override
        public void responseReceived(SubscribeToShardResponse initialResponse) {
            System.out.println("Receieved initial response");
        }

        @Override
        public void onEventStream(SdkPublisher<SubscribeToShardEventStream> eventStream) {
            // Keep only the SubscribeToShardEvents
            SdkPublisher<SubscribeToShardEvent> shardEvents = eventStream.filter(SubscribeToShardEvent.class);
            shardEvents
                    // Turn the events into a publisher of individual records
                    .flatMapIterable(SubscribeToShardEvent::records)
                    // Stop after 1000 records in total
                    .limit(1000)
                    // Group the records into lists of 25
                    .buffer(25)
                    // Print every batch of records
                    .subscribe(records -> System.out.println("Record Batch - " + records));
        }

        @Override
        public void complete() {
            System.out.println("All records stream successfully");
        }
    });
}
示例16
/**
 * Subscribes via the SubscribeToShardResponseHandler.Builder using a traditional subscriber; a supplier
 * provides a new {@code MySubscriber} for the subscription.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.subscribe]
private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .subscriber(() -> new MySubscriber())
                    .build());
}
示例17
/**
 * Subscribes from inside the onEventStream lifecycle callback. The callback receives the publisher itself,
 * giving more control over it and access to its transformation methods (such as map and buffer); here the
 * stream is narrowed to {@code SubscribeToShardEvent}s before a {@code MySubscriber} is attached.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderOnEventStream(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(error -> System.err.println("Error during stream - " + error.getMessage()))
                    .onEventStream(publisher -> {
                        publisher.filter(SubscribeToShardEvent.class).subscribe(new MySubscriber());
                    })
                    .build());
}