Java源码示例:com.amazonaws.services.kinesis.model.PutRecordResult
示例1
/**
 * Stores one record in this stream and reports where it was placed.
 *
 * <p>The target shard is taken from a rotating cursor ({@code nextShard}) that wraps back to
 * index 0 once it equals the number of shards. The partition key is stored on the record but
 * is not consulted when picking the shard. The record's approximate arrival timestamp is set
 * to the current time minus 50000 milliseconds.
 *
 * @param data         payload to store on the record
 * @param partitionKey partition key to store on the record
 * @return a result carrying the sequence number given to the record and the id of the shard
 *         that received it
 */
public PutRecordResult putRecord(ByteBuffer data, String partitionKey)
{
    // Wrap the round-robin cursor before it is used as an index.
    if (nextShard == shards.size()) {
        nextShard = 0;
    }
    InternalShard target = shards.get(nextShard);

    String sequence = String.valueOf(sequenceNo);
    Date arrival = new Date(System.currentTimeMillis() - 50000);

    Record stored = new Record()
            .withData(data)
            .withPartitionKey(partitionKey)
            .withSequenceNumber(sequence);
    stored.setApproximateArrivalTimestamp(arrival);
    target.addRecord(stored);

    PutRecordResult outcome = new PutRecordResult();
    outcome.setSequenceNumber(sequence);
    outcome.setShardId(target.getShardId());

    // Advance both counters for the next call.
    nextShard++;
    sequenceNo++;
    return outcome;
}
示例2
/**
 * Adds every entry of a batch request to the stream named in the request, one record at a time.
 *
 * @param putRecordsRequest the batch request; its stream name selects the target stream and
 *                          each of its entries supplies data and a partition key
 * @return a result whose entries hold, in request order, the shard id and sequence number
 *         reported for each stored record
 * @throws AmazonClientException if no stream is found for the requested name
 */
@Override
public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest)
        throws AmazonClientException
{
    InternalStream target = this.getStream(putRecordsRequest.getStreamName());
    if (target == null) {
        throw new AmazonClientException("This stream does not exist!");
    }

    List<PutRecordsResultEntry> written = new ArrayList<>();
    for (PutRecordsRequestEntry requested : putRecordsRequest.getRecords()) {
        PutRecordResult single = target.putRecord(requested.getData(), requested.getPartitionKey());
        PutRecordsResultEntry summary = new PutRecordsResultEntry()
                .withShardId(single.getShardId())
                .withSequenceNumber(single.getSequenceNumber());
        written.add(summary);
    }

    PutRecordsResult batchResult = new PutRecordsResult();
    batchResult.setRecords(written);
    return batchResult;
}
示例3
/**
 * Creates the callback used for asynchronous put-record calls.
 *
 * <p>On success the result is emitted to {@code resultMonoProcessor} and the processor is
 * completed. On failure the exception is forwarded to the same processor so that anything
 * waiting on it is notified instead of waiting indefinitely.
 *
 * @return a handler bridging put-record outcomes to {@code resultMonoProcessor}
 */
@Bean
public AsyncHandler<PutRecordRequest, PutRecordResult> asyncHandler() {
    return new AsyncHandler<PutRecordRequest, PutRecordResult>() {

        @Override
        public void onError(Exception exception) {
            // Previously an empty body: a failed put was silently dropped and the processor
            // never received a terminal signal.
            // NOTE(review): assumes resultMonoProcessor exposes onError(Throwable), as its use
            // of onNext/onComplete suggests - confirm against its declaration.
            ProcessorConfiguration.this.resultMonoProcessor.onError(exception);
        }

        @Override
        public void onSuccess(PutRecordRequest request, PutRecordResult putRecordsResult) {
            ProcessorConfiguration.this.resultMonoProcessor.onNext(putRecordsResult);
            ProcessorConfiguration.this.resultMonoProcessor.onComplete();
        }

    };
}
示例4
/**
 * Publishes a completed aggregated record to the destination Kinesis stream.
 *
 * <p>A {@code null} argument means there is nothing to send yet and the call does nothing.
 * Any exception raised while publishing is logged and not rethrown.
 *
 * @param logger    The LambdaLogger from the input Context
 * @param aggRecord The aggregated record to transmit, or null if the record isn't full yet.
 */
private void checkAndForwardRecords(LambdaLogger logger, AggRecord aggRecord)
{
    if (aggRecord != null)
    {
        logger.log("Forwarding " + aggRecord.getNumUserRecords() + " as an aggregated record.");

        PutRecordRequest outbound = aggRecord.toPutRecordRequest(DESTINATION_STREAM_NAME);
        try
        {
            PutRecordResult published = this.kinesisForwarder.putRecord(outbound);
            String sequence = published.getSequenceNumber();
            String shard = published.getShardId();
            logger.log("Successfully published record Seq #" + sequence + " to shard " + shard);
        }
        catch (Exception failure)
        {
            // Forwarding is best effort: report the failure and carry on.
            logger.log("ERROR: Failed to forward Kinesis records to destination stream: " + failure.getMessage());
        }
    }
}
示例5
/**
 * Appends a record to one of this stream's shards, cycling through the shards in order.
 *
 * <p>The record is given the current value of {@code sequenceNo} as its sequence number and
 * an approximate arrival timestamp 50000 milliseconds before the current time. After the
 * record is added, both the shard cursor and the sequence counter are incremented.
 *
 * @param data         record payload
 * @param partitionKey partition key recorded on the entry; shard choice does not depend on it
 * @return the sequence number assigned to the record and the id of the shard it was added to
 */
public PutRecordResult putRecord(ByteBuffer data, String partitionKey)
{
    final Date arrivedAt = new Date(System.currentTimeMillis() - 50000);

    Record entry = new Record();
    entry = entry.withData(data);
    entry = entry.withPartitionKey(partitionKey);
    entry = entry.withSequenceNumber(String.valueOf(sequenceNo));
    entry.setApproximateArrivalTimestamp(arrivedAt);

    // Round robin: start over at the first shard once the cursor runs off the end.
    if (nextShard == shards.size()) {
        nextShard = 0;
    }
    final InternalShard chosen = shards.get(nextShard);
    chosen.addRecord(entry);

    final PutRecordResult reply = new PutRecordResult();
    reply.setShardId(chosen.getShardId());
    reply.setSequenceNumber(String.valueOf(sequenceNo));

    sequenceNo++;
    nextShard++;
    return reply;
}
示例6
/**
 * Writes each entry of the batch to the requested stream and collects the per-record outcome.
 *
 * @param putRecordsRequest batch request naming the stream and listing the entries to store
 * @return a result listing, for each entry in order, the shard id and sequence number that
 *         the stream reported
 * @throws AmazonServiceException declared by the overridden signature; not thrown directly here
 * @throws AmazonClientException  if the requested stream cannot be found
 */
@Override
public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) throws AmazonServiceException, AmazonClientException
{
    final InternalStream stream = this.getStream(putRecordsRequest.getStreamName());
    if (stream == null) {
        throw new AmazonClientException("This stream does not exist!");
    }

    final PutRecordsResult response = new PutRecordsResult();
    final ArrayList<PutRecordsResultEntry> outcomes = new ArrayList<PutRecordsResultEntry>();
    for (PutRecordsRequestEntry item : putRecordsRequest.getRecords()) {
        final PutRecordResult stored = stream.putRecord(item.getData(), item.getPartitionKey());
        final String shardId = stored.getShardId();
        final String sequenceNumber = stored.getSequenceNumber();
        outcomes.add((new PutRecordsResultEntry()).withShardId(shardId).withSequenceNumber(sequenceNumber));
    }
    response.setRecords(outcomes);
    return response;
}
示例7
/**
 * Worker loop: takes events from {@code eventsQueue} and publishes each one to the configured
 * stream as a single put-record call, logging the returned sequence number.
 *
 * <p>A failure while handling one event is logged and the loop moves on to the next event.
 * The loop ends when the running thread is interrupted; the interrupt flag is restored before
 * returning so callers further up can observe it.
 */
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            // Get the next event from the queue (the original author notes this call blocks
            // until there is work to do).
            Event event = eventsQueue.take();

            PutRecordRequest put = new PutRecordRequest();
            put.setStreamName(this.streamName);
            put.setData(event.getData());
            put.setPartitionKey(event.getPartitionKey());

            PutRecordResult result = kinesisClient.putRecord(put);
            // Pass the sequence number as an argument rather than splicing it into the
            // format string.
            logger.info("{}: {}", result.getSequenceNumber(), this);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Interruption is a stop request: swallowing it would leave this loop
                // impossible to shut down. Restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            }
            // Didn't get or publish the record - report it and move on to the next one.
            logger.error("Failed to publish event to Kinesis", e);
        }
    }
}
示例8
/**
 * Handles one click event: takes it from {@code inputQueue}, publishes its payload to the
 * stream using the session id as partition key, then reports a "RecordsPut" metric data point
 * of value 1.0 under the "MySampleProducer" namespace.
 *
 * <p>The {@code recordsPut} counter is incremented before the put call is made.
 *
 * @throws Exception if taking the event, encoding the payload, the put call, the local host
 *                   lookup or the metric call fails
 */
@Override
protected void runOnce() throws Exception {
    ClickEvent click = inputQueue.take();
    String sessionKey = click.getSessionId();
    byte[] encoded = click.getPayload().getBytes("UTF-8");
    ByteBuffer body = ByteBuffer.wrap(encoded);

    recordsPut.getAndIncrement();
    PutRecordResult putOutcome = kinesis.putRecord(STREAM_NAME, body, sessionKey);

    // Dimensions attached to the metric: stream, receiving shard and local host.
    Dimension streamDimension = new Dimension().withName("StreamName").withValue(STREAM_NAME);
    Dimension shardDimension = new Dimension().withName("ShardId").withValue(putOutcome.getShardId());
    Dimension hostDimension = new Dimension().withName("Host").withValue(
            InetAddress.getLocalHost().toString());

    MetricDatum datum = new MetricDatum()
            .withDimensions(streamDimension, shardDimension, hostDimension)
            .withValue(1.0)
            .withMetricName("RecordsPut");

    PutMetricDataRequest metricRequest = new PutMetricDataRequest()
            .withMetricData(datum)
            .withNamespace("MySampleProducer");
    cw.putMetricData(metricRequest);
}
示例9
/**
 * Adds a single record to the stream named in the request.
 *
 * @param putRecordRequest request supplying the stream name, data and partition key
 * @return the result produced by the stream for the stored record
 * @throws AmazonClientException if no stream is found for the requested name
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest)
        throws AmazonClientException
{
    InternalStream target = this.getStream(putRecordRequest.getStreamName());
    if (target == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    return target.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
示例10
/**
 * Builds a Kinesis client whose {@code putRecord(PutRecordRequest)} does nothing and returns
 * an empty result.
 *
 * @param kinesisProperties not read by this implementation
 * @return a client based on {@code AbstractAmazonKinesis} with a no-op put-record operation
 */
public com.amazonaws.services.kinesis.AmazonKinesis build(AmazonKinesis kinesisProperties) {
    com.amazonaws.services.kinesis.AmazonKinesis noOpClient = new AbstractAmazonKinesis() {
        @Override
        public PutRecordResult putRecord(PutRecordRequest request) {
            // Intentionally does no work: hand back a fresh, empty result.
            PutRecordResult emptyResult = new PutRecordResult();
            return emptyResult;
        }
    };
    return noOpClient;
}
示例11
/**
 * Publishes one event to Kinesis and logs running throughput.
 *
 * <p>The target stream is the {@code "stream"} entry of {@code producerConfig}, falling back
 * to this instance's {@code streamName} when that entry is absent. The event counter is
 * incremented before the put is attempted, so it counts attempts, including failed ones.
 * When {@code maxRecords} is non-zero and the counter equals it, the producer is shut down
 * and the JVM exits with status 0.
 *
 * @param event          the event to publish; converted to record data by {@code generateData}
 * @param producerConfig per-call configuration; only the {@code "stream"} key is read here
 */
@Override
public void logEvent(String event, Map<String, Object> producerConfig) {
    String streamName = (String) producerConfig.get("stream");
    if (streamName == null) {
        streamName = this.streamName;
    }

    sequenceNumber.getAndIncrement();
    try {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(generateData(event));
        putRecordRequest.setPartitionKey(TIMESTAMP);
        // The returned result is not needed, so it is not kept in a local.
        kinesisClient.putRecord(putRecordRequest);
    } catch (Exception ex) {
        // Publishing is best effort: any failure of the put call is logged and the
        // bookkeeping below still runs.
        log.error("Error while publishing events : ", ex);
    }

    long totalTimeElapsed = System.currentTimeMillis() - startTimeFull;
    log.info("Events Published : " + sequenceNumber + " events in " + (totalTimeElapsed / 1000) + " secs");

    if (this.maxRecords != 0 && sequenceNumber.intValue() == maxRecords) {
        shutdown();
        System.exit(0);
    }
}
示例12
/**
 * Stores the request's data in the stream it names and returns the stream's result.
 *
 * @param putRecordRequest request carrying the stream name, payload and partition key
 * @return the result reported by the stream for the new record
 * @throws AmazonServiceException declared by the overridden signature; not thrown directly here
 * @throws AmazonClientException  if the named stream cannot be found
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) throws AmazonServiceException, AmazonClientException
{
    final InternalStream stream = this.getStream(putRecordRequest.getStreamName());
    if (stream == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    return stream.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
示例13
/**
 * Publishes a text message to a Kinesis stream as a single record and logs the sequence
 * number returned for it.
 *
 * <p>The message is encoded as UTF-8. The previous implementation used the platform default
 * charset, so the bytes written depended on the JVM the producer happened to run on.
 *
 * @param topic name of the stream to publish to
 * @param msg   message text to send
 */
public void sendMessage(String topic, String msg) {
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(topic);
    // Every record uses the same fixed partition key.
    putRecordRequest.setPartitionKey("fakePartitionKey");
    // Fully qualified so this method does not rely on an import the file may not have.
    putRecordRequest.setData(ByteBuffer.wrap(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
    LOG.info("added record: {}", putRecordResult.getSequenceNumber());
}
示例14
/**
 * Called when a log record has been sent to Kinesis successfully.
 *
 * <p>Increments the success counter and, when debug logging is enabled and the combined
 * number of successful and failed requests is a multiple of 3000, writes a progress line.
 * Not very useful in production, but handy for debugging while tuning appender parameters.
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
    successfulRequestCount++;

    if (!logger.isDebugEnabled()) {
        return;
    }
    if ((successfulRequestCount + failedRequestCount) % 3000 != 0) {
        return;
    }

    String elapsed = PeriodFormat.getDefault().print(new Period(startTime, DateTime.now()));
    logger.debug("Appender (" + appenderName + ") made " + successfulRequestCount
        + " successful put requests out of total " + (successfulRequestCount + failedRequestCount)
        + " in " + elapsed + " since start");
}
示例15
/**
 * Three-argument put-record overload; not supported by this client.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1)
        throws AmazonServiceException, AmazonClientException
{
    final String reason = "MockKinesisClient doesn't support this.";
    throw new UnsupportedOperationException(reason);
}
示例16
/**
 * Four-argument put-record overload; not supported by this client.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1, String s2)
        throws AmazonServiceException, AmazonClientException
{
    final String reason = "MockKinesisClient doesn't support this.";
    throw new UnsupportedOperationException(reason);
}
示例17
/**
 * Verifies that a failure reported by the asynchronous put-record call is delivered to the
 * producer's error channel.
 *
 * <p>The Kinesis client is replaced by a mock whose {@code putRecordAsync} immediately calls
 * the handler's {@code onError} with a known exception. The test then sends one message and
 * asserts that the "ec.0.errors" channel receives an {@code ErrorMessage} whose payload is an
 * {@code AwsRequestFailureException} wrapping that exception and the original request data.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
KinesisTestBinder binder = getBinder();
// The exception the mocked client reports to the handler.
final RuntimeException putRecordException = new RuntimeException(
"putRecordRequestEx");
// Captures the data of the request the mock received, for the identity check at the end.
final AtomicReference<Object> sent = new AtomicReference<>();
AmazonKinesis​Async amazonKinesisMock = mock(AmazonKinesisAsync.class);
BDDMockito
.given(amazonKinesisMock.putRecordAsync(any(PutRecordRequest.class),
any(AsyncHandler.class)))
.willAnswer((Answer<Future<PutRecordResult>>) (invocation) -> {
PutRecordRequest request = invocation.getArgument(0);
sent.set(request.getData());
AsyncHandler<?, ?> handler = invocation.getArgument(1);
// Fail the call from inside the stub by invoking the error callback directly.
handler.onError(putRecordException);
return mock(Future.class);
});
// Swap the binder's "amazonKinesis" property for the mock.
new DirectFieldAccessor(binder.getBinder()).setPropertyValue("amazonKinesis",
amazonKinesisMock);
ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties();
producerProps.setErrorChannelEnabled(true);
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProps));
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0",
moduleOutputChannel, producerProps);
// Look up the error channel bean for the "ec.0" binding in the binder's application context.
ApplicationContext applicationContext = TestUtils.getPropertyValue(
binder.getBinder(), "applicationContext", ApplicationContext.class);
SubscribableChannel ec = applicationContext.getBean("ec.0.errors",
SubscribableChannel.class);
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
// Record the first error message and release the latch so the test can proceed.
ec.subscribe((message) -> {
errorMessage.set(message);
latch.countDown();
});
String messagePayload = "oops";
moduleOutputChannel.send(new GenericMessage<>(messagePayload.getBytes()));
// Wait up to 10 seconds for the error message to arrive.
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
assertThat(errorMessage.get().getPayload())
.isInstanceOf(AwsRequestFailureException.class);
AwsRequestFailureException exception = (AwsRequestFailureException) errorMessage
.get().getPayload();
// The failure must carry the very exception and request data seen by the mock.
assertThat(exception.getCause()).isSameAs(putRecordException);
assertThat(((PutRecordRequest) exception.getRequest()).getData())
.isSameAs(sent.get());
producerBinding.unbind();
}
示例18
/**
 * Cancels the pending put-record future, if one has been assigned, allowing interruption of
 * the running task.
 */
@Override
protected void doCancel() {
    // Read the field once so the null check and the call act on the same reference.
    Future<PutRecordResult> pending = future;
    if (pending == null) {
        return;
    }
    pending.cancel(true);
}
示例19
/**
 * Reports whether the pending put-record future has been cancelled.
 *
 * @return {@code false} when no future has been assigned; otherwise the future's cancelled state
 */
@Override
protected boolean doIsCanceled() {
    // Read the field once so the null check and the query act on the same reference.
    Future<PutRecordResult> pending = future;
    if (pending == null) {
        return false;
    }
    return pending.isCancelled();
}
示例20
/**
 * Signals success to the wrapped callback. Neither the request nor the result is passed on;
 * the callback is invoked with {@code null}.
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
    callback.onSuccess(null);
}
示例21
/**
 * Not implemented by this client.
 *
 * @param putRecordRequest ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; this is a {@code RuntimeException} subclass,
 *         so callers catching {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
    // Use the standard exception type for an unimplemented operation instead of a raw
    // RuntimeException.
    throw new UnsupportedOperationException("Not implemented");
}
示例22
/**
 * Not implemented by this client.
 *
 * @param streamName   ignored
 * @param data         ignored
 * @param partitionKey ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; this is a {@code RuntimeException} subclass,
 *         so callers catching {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
    // Use the standard exception type for an unimplemented operation instead of a raw
    // RuntimeException.
    throw new UnsupportedOperationException("Not implemented");
}
示例23
/**
 * Not implemented by this client.
 *
 * @param streamName                ignored
 * @param data                      ignored
 * @param partitionKey              ignored
 * @param sequenceNumberForOrdering ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; this is a {@code RuntimeException} subclass,
 *         so callers catching {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(
        String streamName, ByteBuffer data, String partitionKey, String sequenceNumberForOrdering) {
    // Use the standard exception type for an unimplemented operation instead of a raw
    // RuntimeException.
    throw new UnsupportedOperationException("Not implemented");
}
示例24
/**
 * Put-record overload taking a stream name, data and partition key; this client rejects it.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1) throws AmazonServiceException, AmazonClientException
{
    final UnsupportedOperationException unsupported =
            new UnsupportedOperationException("MockKinesisClient doesn't support this.");
    throw unsupported;
}
示例25
/**
 * Put-record overload taking four arguments; this client rejects it.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1, String s2) throws AmazonServiceException, AmazonClientException
{
    final UnsupportedOperationException unsupported =
            new UnsupportedOperationException("MockKinesisClient doesn't support this.");
    throw unsupported;
}
示例26
/**
 * Writes one datum to the configured Kinesis stream and stores the returned sequence number
 * back on the datum.
 *
 * <p>The datum's document is converted to a string and encoded as UTF-8. The previous
 * implementation used the platform default charset, so the bytes written depended on the JVM
 * the writer happened to run on.
 *
 * @param entry the datum to write; its id is used as the partition key, and its sequence id
 *              is overwritten with the sequence number returned by the put call
 */
@Override
public void write(StreamsDatum entry) {
    String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);

    // Fully qualified so this method does not rely on an import the file may not have.
    byte[] payload = document.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    PutRecordRequest putRecordRequest = new PutRecordRequest()
            .withStreamName(config.getStream())
            .withPartitionKey(entry.getId())
            .withData(ByteBuffer.wrap(payload));

    PutRecordResult putRecordResult = client.putRecord(putRecordRequest);

    // The sequence number comes back as a string; it is parsed into a BigInteger here.
    entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));

    LOGGER.debug("Wrote {}", entry);
}