Java源码示例:com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput
示例1
/**
 * Builds one initialized {@link KinesisRecordProcessor} per shard index.
 *
 * <p>For every index in {@code [0, numShards)} a shard id of the form {@code shard-NNNNN}
 * is generated, a processor is obtained from {@code factory}, and the processor is
 * initialized with that shard id and an extended sequence number of {@code "0000"}.
 *
 * @param factory   source of new processors; each created processor is cast to
 *                  {@link KinesisRecordProcessor}
 * @param numShards number of shards to create processors for; values of zero or less
 *                  yield an empty map
 * @return map from generated shard id to its initialized processor
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
    Map<String, KinesisRecordProcessor> processorsByShard = new HashMap<>();
    for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
        String shardId = String.format("shard-%05d", shardIndex);

        // Obtain a fresh processor for this shard.
        KinesisRecordProcessor shardProcessor = (KinesisRecordProcessor) factory.createProcessor();

        // Describe the shard and its starting position, then hand both to the processor.
        ExtendedSequenceNumber startingSequence = new ExtendedSequenceNumber("0000");
        InitializationInput shardInit = new InitializationInput()
                .withShardId(shardId)
                .withExtendedSequenceNumber(startingSequence);
        shardProcessor.initialize(shardInit);

        processorsByShard.put(shardId, shardProcessor);
    }
    return processorsByShard;
}
示例2
/**
 * Called by the Amazon Kinesis Client Library before any records are handed to this
 * processor through {@code processRecords}.
 *
 * <p>Checks that a listener has been set, stores the extended sequence number and shard id
 * carried by {@code initializationInput}, and logs the sequence number at info level.
 *
 * @param initializationInput initialization details (shard id and extended sequence number)
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // A processor without a listener is not usable; fail validation before touching any state.
    Validate.notNull(listener, "There is no listener set for the processor.");

    initSeqNumber = initializationInput.getExtendedSequenceNumber();
    shardId = initializationInput.getShardId();

    final String startingSequence = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
    LOG.info("Initialization done for {} with sequence {}", this, startingSequence);
}
示例3
/**
 * {@inheritDoc}
 *
 * <p>Stores the shard id from {@code initializationInput}. When debug logging is enabled,
 * also logs the extended sequence number and the shard id.
 */
@Override
public void initialize(InitializationInput initializationInput) {
    shardId = initializationInput.getShardId();

    // Everything below is diagnostic output only.
    if (!LOG.isDebugEnabled()) {
        return;
    }

    final String startingPoint = initializationInput.getExtendedSequenceNumber().toString();
    LOG.debug("Initializing record processor at: {}", startingPoint);
    LOG.debug("Initializing record processor for shard: {}", shardId);
}
示例4
/**
 * Resets the checkpoint counter to zero; the supplied initialization details are not used.
 *
 * @param initializationInput initialization details (ignored)
 */
@Override
public void initialize(final InitializationInput initializationInput) {
    // Start counting from zero on every initialization.
    checkpointCounter = 0;
}
示例5
/**
 * Creates the record processor that forwards stream records to the enclosing
 * {@code DynamoDBTableReplicator}.
 *
 * <p>The returned processor does nothing on initialization, checkpoints after every
 * processed batch, and checkpoints on shutdown only when the reason is
 * {@link ShutdownReason#TERMINATE}.
 *
 * @return a new anonymous {@link IRecordProcessor}
 */
protected IRecordProcessor createStreamProcessor() {
    return new IRecordProcessor() {

        /** No per-shard setup is performed. */
        @Override
        public void initialize(InitializationInput initializationInput) {
        }

        /**
         * Unwraps the records that are {@link RecordAdapter} instances; any other record is skipped.
         *
         * @param kinesisRecords records as delivered by the Kinesis client library
         * @return the unwrapped records, in their original relative order
         */
        public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
            List<Record> unwrapped = new ArrayList<>(kinesisRecords.size());
            for (com.amazonaws.services.kinesis.model.Record candidate : kinesisRecords) {
                if (!(candidate instanceof RecordAdapter)) {
                    continue;
                }
                RecordAdapter adapter = (RecordAdapter) candidate;
                unwrapped.add(adapter.getInternalObject());
            }
            return unwrapped;
        }

        /** Hands the unwrapped batch to the enclosing replicator, then checkpoints. */
        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            DynamoDBTableReplicator.this.processRecords(
                    extractDynamoStreamRecords(processRecordsInput.getRecords()));
            checkpoint(processRecordsInput.getCheckpointer());
        }

        /** Checkpoints only when the shutdown reason is TERMINATE. */
        @Override
        public void shutdown(ShutdownInput shutdownInput) {
            if (shutdownInput.getShutdownReason() != ShutdownReason.TERMINATE) {
                return;
            }
            checkpoint(shutdownInput.getCheckpointer());
        }

        /** Attempts a checkpoint; the listed checkpoint failures are logged as warnings, not rethrown. */
        void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException
                    | InvalidStateException
                    | ThrottlingException
                    | ShutdownException checkpointFailure) {
                LOG.warn(checkpointFailure);
            }
        }
    };
}
示例6
/**
 * Intentionally does nothing.
 *
 * @param initializationInput initialization details (ignored)
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // No initialization work is performed.
}
示例7
/**
 * Sets the checkpoint counter back to zero; the initialization details are not read.
 *
 * @param initializationInput initialization details (ignored)
 */
@Override
public void initialize(final InitializationInput initializationInput) {
    // Counter restarts at zero whenever the processor is initialized.
    checkpointCounter = 0;
}
示例8
/**
 * Records the id of the shard carried by the initialization details.
 *
 * @param initializationInput initialization details; only its shard id is read
 */
@Override
public void initialize(final InitializationInput initializationInput) {
    // Keep the shard id for later use by this processor.
    kinesisShardId = initializationInput.getShardId();
}