Java源码示例:com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput
示例1
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
示例2
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
示例3
/**
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
* RecordProcessor instance.
*
* @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
* processor.
*/
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
shutdownRequested = true;
// shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
// records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
// progress.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
// We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
// complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
// to consume from the child shard(s).
try {
LOG.info("Waiting for all the records for {} to be processed.", this);
// Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
// where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
// be null.
while (lastProcessedRecordSeqNumber != null
&& !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
}
LOG.info("Final checkpoint for {} before shutting down.", this);
shutdownInput.getCheckpointer().checkpoint();
} catch (Exception e) {
LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
}
}
listener.onShutdown(ssp);
}
示例4
static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
try {
ShutdownInput shutdownInput = Mockito.mock(ShutdownInput.class);
when(shutdownInput.getShutdownReason()).thenReturn(reason);
when(shutdownInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
processor.shutdown(shutdownInput);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
示例5
/**
* We don't checkpoint on SHUTDOWN_REQUESTED because we currently always
* checkpoint each batch in {@link #processRecords}.
*
* @param shutdownInput {@inheritDoc}
*/
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOG.info("Shutting down record processor for shard: {}", shardId);
if (ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason())) {
// Shard is closed / finished processing. Checkpoint all processing up to here.
try {
shutdownInput.getCheckpointer().checkpoint();
LOG.debug("Checkpointed due to record processor shutdown request.");
} catch (InvalidStateException | ShutdownException e) {
LOG.error("Error checkpointing batch: {}", e.toString(), e);
}
}
}
示例6
protected IRecordProcessor createStreamProcessor() {
return new IRecordProcessor() {
@Override
public void initialize(InitializationInput initializationInput) {
}
public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
List<Record> dynamoRecords = new ArrayList<>(kinesisRecords.size());
for(com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) {
if (kinesisRecord instanceof RecordAdapter) {
Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject();
dynamoRecords.add(dynamoRecord);
}
}
return dynamoRecords;
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = extractDynamoStreamRecords(processRecordsInput.getRecords());
DynamoDBTableReplicator.this.processRecords(records);
checkpoint(processRecordsInput.getCheckpointer());
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
checkpoint(shutdownInput.getCheckpointer());
}
}
void checkpoint(IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException e) {
LOG.warn(e);
}
}
};
}
示例7
@Override
public void shutdown(ShutdownInput shutdownInput) {}
示例8
@Override
public void shutdown(ShutdownInput shutdownInput) {
log.info("Shutting down record processor for shard: {}", kinesisShardId);
checkpoint(shutdownInput.getCheckpointer());
}