Java源码示例:com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput

示例1
/**
 * Handles shutdown of this record processor. When the shutdown reason is
 * {@code TERMINATE}, a final checkpoint is attempted; for any other reason
 * nothing is done.
 *
 * @param shutdownInput supplies the shutdown reason and the checkpointer
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
    final boolean terminating = shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE;
    if (!terminating) {
        return;
    }

    try {
        shutdownInput.getCheckpointer().checkpoint();
    } catch (Exception checkpointFailure) {
        // Best effort: a failed checkpoint is only reported on stderr and never propagated.
        checkpointFailure.printStackTrace();
    }
}
 
示例2
/**
 * Checkpoints once on shutdown, but only when the reported reason is
 * {@code TERMINATE}. Any exception raised while checkpointing is printed
 * and otherwise ignored.
 *
 * @param shutdownInput carries the shutdown reason and the checkpointer to use
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
    ShutdownReason reason = shutdownInput.getShutdownReason();
    if (ShutdownReason.TERMINATE == reason) {
        try {
            shutdownInput.getCheckpointer().checkpoint();
        } catch (Exception ex) {
            // Failure is reported on stderr only; shutdown continues.
            ex.printStackTrace();
        }
    }
}
 
示例3
/**
 * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
 * RecordProcessor instance.
 *
 * <p>Fails if called more than once for this processor. On {@code TERMINATE}, blocks (polling) until the last
 * processed record has been checkpointed, then commits a final checkpoint. The listener is always notified at the
 * end, even if the final checkpoint fails or the wait is interrupted.
 *
 * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
 *        processor.
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
  LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());

  Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
  shutdownRequested = true;
  // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
  // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
  // progress.
  if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
    // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
    // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
    // to consume from the child shard(s).
    try {
      LOG.info("Waiting for all the records for {} to be processed.", this);
      // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
      // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
      // be null.
      while (lastProcessedRecordSeqNumber != null
          && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
        Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
      }
      LOG.info("Final checkpoint for {} before shutting down.", this);
      shutdownInput.getCheckpointer().checkpoint();
    } catch (InterruptedException e) {
      // Thread.sleep clears the interrupt flag when it throws; restore it so the calling thread can still observe
      // that it was asked to stop instead of the interruption being silently lost.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
    } catch (Exception e) {
      LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
    }
  }
  // The listener is notified regardless of the shutdown reason or of any checkpoint failure above.
  listener.onShutdown(ssp);
}
 
示例4
/**
 * Test helper: builds a mocked {@code ShutdownInput} that reports the given reason and the
 * checkpointer obtained via {@code getCheckpointer(processor)}, then calls
 * {@code processor.shutdown(...)} with it.
 *
 * @param processor the record processor to shut down
 * @param reason the shutdown reason the mocked input should report
 * @throws RuntimeException wrapping any exception raised while mocking or shutting down
 */
static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
  try {
    ShutdownInput mockedInput = Mockito.mock(ShutdownInput.class);
    when(mockedInput.getShutdownReason()).thenReturn(reason);
    when(mockedInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
    processor.shutdown(mockedInput);
  } catch (Exception cause) {
    // Surface any failure as unchecked so callers need no throws clause.
    throw new RuntimeException(cause);
  }
}
 
示例5
/**
 * Logs the shutdown and, when the reason is {@code TERMINATE}, checkpoints the shard one last time.
 *
 * <p>We don't checkpoint on SHUTDOWN_REQUESTED because we currently always
 * checkpoint each batch in {@link #processRecords}.
 *
 * @param shutdownInput {@inheritDoc}
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
  LOG.info("Shutting down record processor for shard: {}", shardId);

  final boolean shardFinished = ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason());
  if (!shardFinished) {
    return;
  }

  // Shard is closed / finished processing. Checkpoint all processing up to here.
  try {
    shutdownInput.getCheckpointer().checkpoint();
    LOG.debug("Checkpointed due to record processor shutdown request.");
  } catch (InvalidStateException | ShutdownException checkpointError) {
    // Only these two failure types are logged and suppressed; anything else propagates to the caller.
    LOG.error("Error checkpointing batch: {}", checkpointError.toString(), checkpointError);
  }
}
 
示例6
/**
 * Creates an anonymous {@code IRecordProcessor} that unwraps adapted stream records, hands them to
 * the enclosing {@code DynamoDBTableReplicator}, and checkpoints after every batch as well as on a
 * {@code TERMINATE} shutdown.
 *
 * @return a new record processor instance
 */
protected IRecordProcessor createStreamProcessor() {
	return new IRecordProcessor() {

		/** No initialization work is performed. */
		@Override
		public void initialize(InitializationInput initializationInput) {
		}

		/**
		 * Collects the internal objects of every input record that is a {@code RecordAdapter};
		 * records of any other type are skipped.
		 *
		 * @param kinesisRecords the Kinesis-typed records to unwrap
		 * @return the unwrapped records, in input order
		 */
		public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
			final List<Record> unwrapped = new ArrayList<>(kinesisRecords.size());

			for (com.amazonaws.services.kinesis.model.Record candidate : kinesisRecords) {
				if (!(candidate instanceof RecordAdapter)) {
					continue;
				}
				final RecordAdapter adapter = (RecordAdapter) candidate;
				unwrapped.add(adapter.getInternalObject());
			}

			return unwrapped;
		}

		/** Processes the batch through the enclosing replicator, then checkpoints. */
		@Override
		public void processRecords(ProcessRecordsInput processRecordsInput) {
			final List<Record> batch = extractDynamoStreamRecords(processRecordsInput.getRecords());
			DynamoDBTableReplicator.this.processRecords(batch);

			// Checkpoint only after the whole batch has been handed to the replicator.
			checkpoint(processRecordsInput.getCheckpointer());
		}

		/** Checkpoints on shutdown only when the reason is {@code TERMINATE}. */
		@Override
		public void shutdown(ShutdownInput shutdownInput) {
			if (shutdownInput.getShutdownReason() != ShutdownReason.TERMINATE) {
				return;
			}
			checkpoint(shutdownInput.getCheckpointer());
		}

		/** Attempts a checkpoint; the listed checkpoint failures are logged as warnings and suppressed. */
		void checkpoint(IRecordProcessorCheckpointer checkpointer) {
			try {
				checkpointer.checkpoint();
			} catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException checkpointFailure) {
				LOG.warn(checkpointFailure);
			}
		}
	};
}
 
示例7
/**
 * Intentionally a no-op: this processor performs no work (and no checkpoint) on shutdown.
 *
 * @param shutdownInput shutdown reason and checkpointer; unused
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {}
 
示例8
/**
 * Logs that the record processor for this shard is shutting down, then passes the
 * shutdown input's checkpointer to {@code checkpoint(...)}.
 *
 * <p>The checkpoint call is made regardless of the shutdown reason.
 * NOTE(review): {@code checkpoint(...)} is defined outside this snippet; confirm it
 * tolerates checkpoint failures for reasons other than TERMINATE.
 *
 * @param shutdownInput supplies the checkpointer used for the final checkpoint
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
    log.info("Shutting down record processor for shard: {}", kinesisShardId);
    checkpoint(shutdownInput.getCheckpointer());
}