Java源码示例:software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler

示例1
/**
 * Consumes the shard subscription with Project Reactor. The onEventStream lifecycle callback hands over the raw
 * event publisher, which is wrapped in a Flux so that Reactor operators can be applied to it.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_Reactor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
        request,
        SubscribeToShardResponseHandler
            .builder()
            .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
            .onEventStream(publisher -> {
                Flux.from(publisher)
                    // Keep only the events of type SubscribeToShardEvent
                    .ofType(SubscribeToShardEvent.class)
                    // Flatten each event into its individual records
                    .flatMapIterable(SubscribeToShardEvent::records)
                    .limitRate(1000)
                    // Group the records into lists of 25 before printing
                    .buffer(25)
                    .subscribe(batch -> System.out.println("Record batch = " + batch));
            })
            .build());
}
 
示例2
/**
 * Consumes the shard subscription with RxJava. The onEventStream lifecycle callback hands over the raw event
 * publisher, which is converted into an Rx Flowable so that RxJava operators can be applied to it.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_RxJava(KinesisAsyncClient client, SubscribeToShardRequest request) {
    // snippet-start:[kinesis.java2.stream_rx_example.event_stream]
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
        .builder()
        .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
        .onEventStream(publisher -> {
            Flowable.fromPublisher(publisher)
                    // Keep only the events of type SubscribeToShardEvent
                    .ofType(SubscribeToShardEvent.class)
                    // Flatten each event into its individual records
                    .flatMapIterable(SubscribeToShardEvent::records)
                    .limit(1000)
                    // Group the records into lists of 25 before printing
                    .buffer(25)
                    .subscribe(batch -> System.out.println("Record batch = " + batch));
        })
        .build();
    // snippet-end:[kinesis.java2.stream_rx_example.event_stream]
    return client.subscribeToShard(request, responseHandler);
}
 
示例3
/**
 * Entry point: builds an asynchronous Kinesis client, subscribes to a shard with a MySubscriber-backed response
 * handler, and waits for the subscription future to complete before closing the client.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

    // snippet-start:[kinesis.java2.stream_example.setup]
    Region region = Region.US_EAST_1;
    KinesisAsyncClient client = KinesisAsyncClient.builder()
            .region(region)
            .build();

    // NOTE(review): the shardId value below looks like a stream ARN rather than a shard ID
    // (for example "shardId-000000000000") -- confirm against the target stream.
    SubscribeToShardRequest request = SubscribeToShardRequest.builder()
            .consumerARN(CONSUMER_ARN)
            .shardId("arn:aws:kinesis:us-east-1:814548047983:stream/StockTradeStream")
            .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();

    // snippet-end:[kinesis.java2.stream_example.setup]
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .subscriber(MySubscriber::new)
            .build();

    try {
        // subscribeToShard is asynchronous: block on the returned future so the client is not
        // closed while the subscription is still in flight.
        client.subscribeToShard(request, responseHandler).join();
    } finally {
        // Release the client whether the subscription completed normally or failed.
        client.close();
    }
}
 
示例4
/**
 * Handles one event from the shard subscription while holding the parent's lock. If the flow reports that the
 * subscription should be cancelled, the subscription is cancelled and the event is dropped; otherwise
 * SubscribeToShardEvent instances are forwarded to the flow.
 *
 * @param recordBatchEvent the event delivered by the subscription
 */
@Override
public void onNext(SubscribeToShardEventStream recordBatchEvent) {
    synchronized (parent.lockObject) {
        if (!flow.shouldSubscriptionCancel()) {
            // Visitor that hands record-carrying events to the flow
            SubscribeToShardResponseHandler.Visitor recordForwarder = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    flow.recordsReceived(event);
                }
            };
            recordBatchEvent.accept(recordForwarder);
        } else {
            log.debug(
                    "{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling",
                    parent.shardId, connectionStartedAt, subscribeToShardId);
            cancel();
        }
    }
}
 
示例5
/**
 * Subscribes to the shard with a client whose HTTP/2 initial window size is 16384 and expects the subscription
 * future to complete without failing ({@code join()} would rethrow any failure).
 */
@Test
public void subscribeToShard_smallWindow_doesNotTimeOutReads() {
    // We want sufficiently large records (relative to the initial window
    // size we're choosing) so the client has to send multiple
    // WINDOW_UPDATEs to receive them
    for (int i = 0; i < 16; ++i) {
        putRecord(64 * 1024);
    }

    // try-with-resources closes the dedicated client on every exit path
    try (KinesisAsyncClient smallWindowAsyncClient = KinesisAsyncClient.builder()
            .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
            .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                .http2Configuration(Http2Configuration.builder()
                        .initialWindowSize(16384)
                        .build()))
            .build()) {
        smallWindowAsyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
                        .shardId(shardId)
                        .startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
                SubscribeToShardResponseHandler.builder()
                        // Drain every event; the content is irrelevant for this test
                        .onEventStream(es -> Flowable.fromPublisher(es).forEach(e -> {}))
                        .onResponse(this::verifyHttpMetadata)
                        .build())
                .join();
    }
}
 
示例6
/**
 * Publishes records on a schedule while subscribed to the shard and asserts that the received data appears as a
 * contiguous sequence within the produced data.
 */
@Test
public void subscribeToShard_ReceivesAllData() {
    List<SdkBytes> producedData = new ArrayList<>();
    List<SdkBytes> receivedData = new ArrayList<>();
    ScheduledExecutorService producer = Executors.newScheduledThreadPool(1);
    try {
        // Delay it a bit to allow us to subscribe first
        producer.scheduleAtFixedRate(() -> putRecord().ifPresent(producedData::add), 10, 1, TimeUnit.SECONDS);

        // Add every event's data to the receivedData list
        Consumer<SubscribeToShardEvent> eventConsumer = s -> receivedData.addAll(
            s.records().stream()
             .map(Record::data)
             .collect(Collectors.toList()));
        asyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
                                           .shardId(shardId)
                                           .startingPosition(s -> s.type(ShardIteratorType.LATEST)),
                                     SubscribeToShardResponseHandler.builder()
                                                                    .onEventStream(p -> p.filter(SubscribeToShardEvent.class)
                                                                                         .subscribe(eventConsumer))
                                                                    .onResponse(this::verifyHttpMetadata)
                                                                    .build())
                   .join();
    } finally {
        // Stop the producer even when subscribing fails, so its scheduler thread is not left running
        producer.shutdown();
    }
    // Make sure that all the data we received was data we published; we may have published more
    // if the producer isn't shut down immediately after we finish subscribing.
    assertThat(producedData).containsSequence(receivedData);
}
 
示例7
/**
 * Calls subscribeToShard with an empty request and gathers every delivered event into a list.
 *
 * @return the events received before the subscription future completed
 * @throws Throwable the cause of the CompletionException raised by {@code join()}, if the call fails
 */
private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
    List<SubscribeToShardEventStream> collected = new ArrayList<>();
    try {
        SubscribeToShardResponseHandler collectingHandler =
            SubscribeToShardResponseHandler.builder()
                                           .subscriber(collected::add)
                                           .build();
        client.subscribeToShard(SubscribeToShardRequest.builder().build(), collectingHandler)
              .join();
    } catch (CompletionException wrapped) {
        // Surface the underlying failure rather than the CompletionException wrapper
        throw wrapped.getCause();
    }
    return collected;
}
 
示例8
/**
 * Because a Flux is also a publisher, the publisherTransformer method integrates nicely with Reactor. Notice that
 * you must adapt to an SdkPublisher. The transformer only reshapes the publisher, so a subscriber is registered as
 * well to consume the transformed events.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_OnEventStream_Reactor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
        .builder()
        .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
        .publisherTransformer(p -> Flux.from(p).limitRate(100).as(SdkPublisher::adapt))
        // Must supply some type of subscriber; the transformer alone does not consume the stream
        .subscriber(e -> System.out.println("Received event - " + e))
        .build();
    return client.subscribeToShard(request, responseHandler);
}
 
示例9
/**
 * Because a Flowable is also a publisher, the publisherTransformer method integrates nicely with RxJava. Notice that
 * you must adapt to an SdkPublisher. The transformer only reshapes the publisher, so a subscriber is registered as
 * well to consume the transformed events.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder_OnEventStream_RxJava(KinesisAsyncClient client, SubscribeToShardRequest request) {
    // snippet-start:[kinesis.java2.stream_rx_example.publish_transform]
    SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
        .builder()
        .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
        .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100)))
        // Must supply some type of subscriber; the transformer alone does not consume the stream
        .subscriber(e -> System.out.println("Received event - " + e))
        .build();
    // snippet-end:[kinesis.java2.stream_rx_example.publish_transform]
    return client.subscribeToShard(request, responseHandler);
}
 
示例10
/**
 * Subscribes using a handler assembled with the builder: an error callback, a completion callback, and a simple
 * consumer that prints each event.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .onComplete(() -> System.out.println("All records stream successfully"))
                    // Must supply some type of subscriber
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
 
示例11
/**
 * Subscribes using the SubscribeToShardResponseHandler.Builder with a plain consumer that prints every event.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderConsumer(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
 
示例12
/**
 * Customizes the publisher with publisherTransformer before subscribing: only SubscribeToShardEvent instances are
 * kept and the stream is limited to 100 events, each of which is printed by the subscriber.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.publish_transformer]
private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .publisherTransformer(events -> events.filter(SubscribeToShardEvent.class::isInstance).limit(100))
                    .subscriber(event -> System.out.println("Received event - " + event))
                    .build());
}
 
示例13
/**
 * Subscribes with a SubscribeToShardResponseHandler.Visitor produced by the visitor builder, which registers a
 * handler per event type of interest instead of requiring an implementation of the interface.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.visitor]
private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
    // Visitor that prints each SubscribeToShardEvent
    SubscribeToShardResponseHandler.Visitor printingVisitor = SubscribeToShardResponseHandler.Visitor
            .builder()
            .onSubscribeToShardEvent(event -> System.out.println("Received subscribe to shard event " + event))
            .build();
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .subscriber(printingVisitor)
                    .build());
}
 
示例14
/**
 * Subscribes with a hand-written implementation of the SubscribeToShardResponseHandler.Visitor interface that
 * prints each SubscribeToShardEvent.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler.Visitor printingVisitor = new SubscribeToShardResponseHandler.Visitor() {
        @Override
        public void visit(SubscribeToShardEvent shardEvent) {
            System.out.println("Received subscribe to shard event " + shardEvent);
        }
    };
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .subscriber(printingVisitor)
                    .build());
}
 
示例15
/**
 * Subscribes with a SubscribeToShardResponseHandler written the classic way, as an anonymous implementation of
 * the interface with all four lifecycle callbacks overridden.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.custom_handler]
private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) {
    SubscribeToShardResponseHandler classicHandler = new SubscribeToShardResponseHandler() {

        @Override
        public void responseReceived(SubscribeToShardResponse initialResponse) {
            System.out.println("Receieved initial response");
        }

        @Override
        public void exceptionOccurred(Throwable failure) {
            System.err.println("Error during stream - " + failure.getMessage());
        }

        @Override
        public void complete() {
            System.out.println("All records stream successfully");
        }

        @Override
        public void onEventStream(SdkPublisher<SubscribeToShardEventStream> eventPublisher) {
            eventPublisher
                    // Keep only the SubscribeToShardEvents
                    .filter(SubscribeToShardEvent.class)
                    // Flatten the events into a publisher of individual records
                    .flatMapIterable(SubscribeToShardEvent::records)
                    // Stop after 1000 records in total
                    .limit(1000)
                    // Collect the records into lists of 25
                    .buffer(25)
                    // Print each list of records
                    .subscribe(recordBatch -> System.out.println("Record Batch - " + recordBatch));
        }
    };
    return client.subscribeToShard(request, classicHandler);
}
 
示例16
/**
 * Subscribes using the SubscribeToShardResponseHandler.Builder with a traditional subscriber, supplied through
 * the MySubscriber constructor reference.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
// snippet-start:[kinesis.java2.stream_example.subscribe]
private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .subscriber(MySubscriber::new)
                    .build());
}
 
示例17
/**
 * Subscribes through the onEventStream lifecycle callback, which exposes the publisher itself and therefore
 * allows transformation methods (such as map and buffer) to be applied before subscribing. Here the publisher is
 * filtered to SubscribeToShardEvent instances and handed to a new MySubscriber.
 *
 * @param client  the asynchronous Kinesis client used to open the subscription
 * @param request the subscribe-to-shard request to send
 * @return the future returned by {@code subscribeToShard}
 */
private static CompletableFuture<Void> responseHandlerBuilderOnEventStream(KinesisAsyncClient client, SubscribeToShardRequest request) {
    return client.subscribeToShard(
            request,
            SubscribeToShardResponseHandler
                    .builder()
                    .onError(failure -> System.err.println("Error during stream - " + failure.getMessage()))
                    .onEventStream(publisher -> {
                        publisher.filter(SubscribeToShardEvent.class)
                                 .subscribe(new MySubscriber());
                    })
                    .build());
}