Java源码示例:com.amazonaws.services.kinesisfirehose.model.Record
示例1
/**
* Method to perform PutRecordBatch operation with the given record list.
*
* @param recordList
* the collection of records
* @return the output of PutRecordBatch
*/
private PutRecordBatchResult putRecordBatch(List<Record> recordList) {
PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
putRecordBatchRequest.setRecords(recordList);
// Put Record Batch records. Max No.Of Records we can put in a
// single put record batch request is 500 and total size < 4MB
PutRecordBatchResult putRecordBatchResult = null;
try {
putRecordBatchResult = firehoseClient.putRecordBatch(putRecordBatchRequest);
}catch(AmazonKinesisFirehoseException akfe){
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
}catch(Exception e){
System.out.println("Connector Exception" + e.getLocalizedMessage());
}
return putRecordBatchResult;
}
示例2
/**
* @param sinkRecords
*/
private void putRecordsInBatch(Collection<SinkRecord> sinkRecords) {
List<Record> recordList = new ArrayList<Record>();
int recordsInBatch = 0;
int recordsSizeInBytes = 0;
for (SinkRecord sinkRecord : sinkRecords) {
Record record = DataUtility.createRecord(sinkRecord);
recordList.add(record);
recordsInBatch++;
recordsSizeInBytes += record.getData().capacity();
if (recordsInBatch == batchSize || recordsSizeInBytes > batchSizeInBytes) {
putRecordBatch(recordList);
recordList.clear();
recordsInBatch = 0;
recordsSizeInBytes = 0;
}
}
if (recordsInBatch > 0) {
putRecordBatch(recordList);
}
}
示例3
@Override
public boolean add(InternalEvent ievent) throws IllegalStateException, IOException {
if (dataRecords.size() >= MAX_RECORDS) {
logger.trace("hit record index max");
throw new IllegalStateException("reached max payload size");
}
byte[] record = this.serializer.serialize(ievent);
/*
* Restrict size of individual record
*/
if (record.length > MAX_RECORD_SIZE) {
throw new IOException(
"serialized event is " + record.length + " larger than max of " + MAX_RECORD_SIZE);
}
ByteBuffer data = ByteBuffer.wrap(record);
dataRecords.add(new Record().withData(data));
return true;
}
示例4
@Override
public void close() {
if (this.cos.getByteCount() != 0 && this.dataRecords.size() < MAX_RECORDS) {
logger.trace("flushing remainder of buffer");
ByteBuffer data = ByteBuffer.wrap(baos.toByteArray());
this.dataRecords.add(new Record().withData(data));
}
try {
this.baos.close();
} catch (IOException e) {
}
}
示例5
@Override
public ArrayList<Record> getInternalBuffer() {
return this.dataRecords;
}
示例6
@Override
public boolean add(InternalEvent ievent) throws IllegalStateException, IOException {
byte[] record = serializer.serialize(ievent);
/*
* Restrict size of individual record
*/
if (record.length > MAX_RECORD_SIZE) {
throw new IOException(
"serialized event is " + record.length + " larger than max of " + MAX_RECORD_SIZE);
}
/*
* Write record if there's room in buffer
*/
if (dataRecords.size() >= MAX_RECORDS) {
logger.trace("hit record index max");
throw new IllegalStateException("reached max payload size");
} else {
if (cos.getByteCount() + record.length < MAX_RECORD_SIZE) {
cos.write(record);
return true;
}
/*
* If current record is full then flush buffer to a Firehose Record and create a new buffer
*/
logger.trace("creating new datarecord");
ByteBuffer data = ByteBuffer.wrap(baos.toByteArray());
this.dataRecords.add(new Record().withData(data));
baos.reset();
cos.resetByteCount();
cos.resetCount();
/*
* If we hit the max number of Firehose Records (4) then notify IPC service that this buffer
* needs to be sent.
*/
if (dataRecords.size() >= MAX_RECORDS) {
logger.trace("hit record index max");
throw new IllegalStateException("reached max payload size");
}
/*
* Otherwise write the record to the empty internal buffer
*/
cos.write(record);
}
return true;
}
示例7
@Override
public ArrayList<Record> getInternalBuffer() {
return this.dataRecords;
}
示例8
/**
* Converts Kafka record into Kinesis record
*
* @param sinkRecord
* Kafka unit of message
* @return Kinesis unit of message
*/
public static Record createRecord(SinkRecord sinkRecord) {
return new Record().withData(parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
}
示例9
public abstract ArrayList<Record> getInternalBuffer();