Java源码示例:com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput

示例1
/**
 * Creates one record processor per shard, initializes each of them, and returns them keyed by shard id.
 *
 * <p>Shard ids are generated as {@code shard-00000}, {@code shard-00001}, ... and every processor is
 * initialized with the extended sequence number {@code "0000"}.
 *
 * @param factory   factory used to create each processor; its products are cast to {@code KinesisRecordProcessor}
 * @param numShards number of processors to create; a non-positive value yields an empty map
 * @return map from generated shard id to the initialized processor for that shard
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  final Map<String, KinesisRecordProcessor> processorsByShard = new HashMap<>();
  for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
    final String shardId = String.format("shard-%05d", shardIndex);

    // Build the processor for this shard.
    final KinesisRecordProcessor shardProcessor = (KinesisRecordProcessor) factory.createProcessor();

    // Hand the processor its shard id and starting sequence number.
    final InitializationInput input = new InitializationInput()
        .withShardId(shardId)
        .withExtendedSequenceNumber(new ExtendedSequenceNumber("0000"));
    shardProcessor.initialize(input);

    processorsByShard.put(shardId, shardProcessor);
  }
  return processorsByShard;
}
 
示例2
/**
 * Called by the Amazon Kinesis Client Library before any records are handed to this processor
 * through processRecords.
 *
 * <p>Requires a listener to have been set, then remembers the shard id and the extended sequence
 * number supplied in the initialization input and logs the starting sequence number.
 *
 * @param initializationInput carries the shard id and the extended sequence number for this processor
 */
@Override
public void initialize(InitializationInput initializationInput) {
  Validate.notNull(listener, "There is no listener set for the processor.");

  shardId = initializationInput.getShardId();
  initSeqNumber = initializationInput.getExtendedSequenceNumber();

  final String startingSequence = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
  LOG.info("Initialization done for {} with sequence {}", this, startingSequence);
}
 
示例3
/**
 * {@inheritDoc}
 *
 * <p>Stores the shard id from the initialization input and, when debug logging is enabled, logs
 * the extended sequence number and the shard id.
 *
 * @param initializationInput carries the shard id and the extended sequence number for this processor
 */
@Override
public void initialize(InitializationInput initializationInput) {
  shardId = initializationInput.getShardId();
  if (LOG.isDebugEnabled()) {
    // Pass the sequence number object itself rather than calling toString() on it: the
    // parameterized logger renders it the same way, and a null value is logged as "null"
    // instead of throwing a NullPointerException from a debug-only code path.
    LOG.debug("Initializing record processor at: {}", initializationInput.getExtendedSequenceNumber());
    LOG.debug("Initializing record processor for shard: {}", shardId);
  }
}
 
示例4
/**
 * Resets the checkpoint counter to zero when the processor is initialized.
 *
 * @param initializationInput initialization details supplied by the caller; not read by this implementation
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // Start counting from zero for this processor.
    checkpointCounter = 0;
}
 
示例5
/**
 * Creates the record processor that feeds stream records into this replicator.
 *
 * <p>The returned processor unwraps each incoming {@code RecordAdapter} into its underlying
 * stream record, forwards the batch to {@code DynamoDBTableReplicator.this.processRecords}, and
 * checkpoints after every batch as well as on a {@code TERMINATE} shutdown.
 *
 * @return a new anonymous {@code IRecordProcessor} bound to this replicator instance
 */
protected IRecordProcessor createStreamProcessor() {
	return new IRecordProcessor() {

		/**
		 * No initialization work is performed; the input is ignored.
		 */
		@Override
		public void initialize(InitializationInput initializationInput) {
		}

		/**
		 * Unwraps the records that are {@code RecordAdapter} instances into their internal objects.
		 * Records of any other type are skipped.
		 *
		 * @param kinesisRecords records as delivered to the processor
		 * @return the unwrapped records, in the same relative order as the input
		 */
		public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
			List<Record> dynamoRecords = new ArrayList<>(kinesisRecords.size());

			for (com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) {
				if (kinesisRecord instanceof RecordAdapter) {
					Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject();
					dynamoRecords.add(dynamoRecord);
				}
			}

			return dynamoRecords;
		}

		/**
		 * Unwraps the delivered batch, passes it to the enclosing replicator, then checkpoints.
		 * The checkpoint is only reached when the replicator's processing returns normally.
		 */
		@Override
		public void processRecords(ProcessRecordsInput processRecordsInput) {
			List<Record> records = extractDynamoStreamRecords(processRecordsInput.getRecords());

			DynamoDBTableReplicator.this.processRecords(records);

			checkpoint(processRecordsInput.getCheckpointer());
		}

		/**
		 * Checkpoints when the shutdown reason is {@code TERMINATE}; does nothing for other reasons.
		 */
		@Override
		public void shutdown(ShutdownInput shutdownInput) {
			if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
				checkpoint(shutdownInput.getCheckpointer());
			}
		}

		/**
		 * Attempts a checkpoint and logs, rather than propagates, the checkpoint exceptions
		 * listed in the catch clause.
		 */
		void checkpoint(IRecordProcessorCheckpointer checkpointer) {
			try {
				checkpointer.checkpoint();
			} catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException e) {
				// Best effort: a failed checkpoint must not abort processing, but log it with a
				// descriptive message and the exception as the cause so the failure can be diagnosed.
				LOG.warn("Failed to checkpoint stream progress", e);
			}
		}
	};
}
 
示例6
/**
 * Intentionally does nothing: this implementation performs no work at initialization.
 *
 * @param initializationInput initialization details supplied by the caller; not read by this implementation
 */
@Override
public void initialize(InitializationInput initializationInput) {}
 
示例7
/**
 * Resets the checkpoint counter to zero when the processor is initialized.
 *
 * @param initializationInput initialization details supplied by the caller; not read by this implementation
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // Start counting from zero for this processor.
    checkpointCounter = 0;
}
 
示例8
/**
 * Remembers the id of the shard this processor has been given.
 *
 * @param initializationInput initialization details; only its shard id is read
 */
@Override
public void initialize(InitializationInput initializationInput) {
    final String assignedShardId = initializationInput.getShardId();
    kinesisShardId = assignedShardId;
}