Java源码示例:com.couchbase.client.java.PersistTo
示例1
private void populateBucket() {
CouchbaseEnvironment env = DefaultCouchbaseEnvironment
.builder()
.socketConnectTimeout(60000)
.connectTimeout(60000)
.keepAliveInterval(60000)
.keyValueServiceConfig(KeyValueServiceConfig.create(60)) // If skip this config, we may get TimeoutException https://forums.couchbase.com/t/kv-upsert-throwing-timeoutexception-couchbase-4-5/9399
.build();
CouchbaseCluster cluster = CouchbaseCluster.create(env, bootstrapNodes);
Bucket bucket = cluster.openBucket(bucketName, password);
LOGGER.info("Connected to bucket - " + bucketName);
assertTrue(bucket.bucketManager().flush());
JsonDocument document = JsonDocument.create("foo", JsonObject.create().put("bar", 42));
bucket.upsert(document, PersistTo.MASTER);
bucket.close();
LOGGER.info("Bucket is closed after upserting data");
if (cluster != null) {
cluster.disconnect();
}
}
示例2
@Test
public void testStaticDocId() throws Exception {
String bucketName = "bucket-1";
String docId = "doc-a";
int expiry = 100;
long cas = 200L;
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(docId, expiry, inFileData, cas));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
}
示例3
@Test
public void testDurabilityConstraint() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE)))
.thenReturn(RawJsonDocument.create(docId, inFileData));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString());
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例4
@Test
public void testDocIdExp() throws Exception {
String docIdExp = "${'someProperty'}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(somePropertyValue, capture.getValue().id());
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例5
@Test
public void testInvalidDocIdExp() throws Exception {
String docIdExp = "${invalid_function(someProperty)}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
try {
testRunner.run();
fail("Exception should be thrown.");
} catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
}
示例6
@Test
public void testInputFlowFileUuid() throws Exception {
String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(uuid, inFileData));
setupMockBucket(bucket);
Map<String, String> properties = new HashMap<>();
properties.put(CoreAttributes.UUID.key(), uuid);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(uuid, capture.getValue().id());
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例7
@Test
public void testCouchbaseFailure() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
.thenThrow(new ServiceNotAvailableException());
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
try {
testRunner.run();
fail("ProcessException should be thrown.");
} catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
}
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
}
示例8
@Test
public void testCouchbaseTempFlowFileError() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
CouchbaseException exception = new DurabilityException();
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
.thenThrow(exception);
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
orgFile.assertContentEquals(inFileData);
orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
}
示例9
private static CouchbaseTargetConfig getDefaultConfig() {
CouchbaseTargetConfig config = new CouchbaseTargetConfig();
config.couchbase.nodes = "localhost";
config.couchbase.bucket = BUCKET;
config.couchbase.kvTimeout = 2500;
config.couchbase.connectTimeout = 5000;
config.couchbase.disconnectTimeout = 25000;
config.couchbase.tls.tlsEnabled = false;
config.credentials.version = AuthenticationType.USER;
config.credentials.userName = () -> USERNAME;
config.credentials.userPassword = () -> PASSWORD;
config.documentKeyEL = "myDocumentKey";
config.documentTtlEL = "0";
config.defaultWriteOperation = WriteOperationType.UPSERT;
config.unsupportedOperation = UnsupportedOperationType.TOERROR;
config.useCas = true;
config.allowSubdoc = true;
config.subdocPathEL = "";
config.subdocOperationEL = "";
config.replicateTo = ReplicateTo.NONE;
config.persistTo = PersistTo.NONE;
config.dataFormatConfig = new DataGeneratorFormatConfig();
config.dataFormatConfig.charset = "UTF-8";
config.dataFormat = DataFormat.JSON;
return config;
}
示例10
@Test
public void testStaticDocId() throws Exception {
String bucketName = "bucket-1";
String docId = "doc-a";
int expiry = 100;
long cas = 200L;
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(docId, expiry, inFileData, cas));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
}
示例11
@Test
public void testBinaryDoc() throws Exception {
String bucketName = "bucket-1";
String docId = "doc-a";
int expiry = 100;
long cas = 200L;
String inFileData = "12345";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(ByteArrayDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(ByteArrayDocument.create(docId, expiry, Unpooled.copiedBuffer(inFileData.getBytes(StandardCharsets.UTF_8)).array(), cas));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.name());
testRunner.run();
verify(bucket, times(1)).upsert(any(ByteArrayDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
}
示例12
@Test
public void testDurabilityConstraint() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE)))
.thenReturn(RawJsonDocument.create(docId, inFileData));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString());
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例13
@Test
public void testDocIdExp() throws Exception {
String docIdExp = "${'someProperty'}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(somePropertyValue, capture.getValue().id());
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例14
@Test
public void testInvalidDocIdExp() throws Exception {
String docIdExp = "${invalid_function(someProperty)}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
try {
testRunner.run();
fail("Exception should be thrown.");
} catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
}
示例15
@Test
public void testInputFlowFileUuid() throws Exception {
String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(uuid, inFileData));
setupMockBucket(bucket);
Map<String, String> properties = new HashMap<>();
properties.put(CoreAttributes.UUID.key(), uuid);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
示例16
@Test
public void testCouchbaseFailure() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
.thenThrow(new ServiceNotAvailableException());
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
try {
testRunner.run();
fail("ProcessException should be thrown.");
} catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
}
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
}
示例17
@Test
public void testCouchbaseTempFlowFileError() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
CouchbaseException exception = new DurabilityException();
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
.thenThrow(exception);
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_RETRY, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
orgFile.assertContentEquals(inFileData);
orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
}
示例18
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
try {
Document<?> doc = null;
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
case Binary: {
final ByteBuf buf = Unpooled.copiedBuffer(content);
doc = BinaryDocument.create(docId, buf);
break;
}
}
final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
session.transfer(flowFile, REL_SUCCESS);
} catch (final CouchbaseException e) {
String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
}
}
示例19
public PersistToChooserValues() {
super(PersistTo.class);
}
示例20
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
if (context.getProperty(DOC_ID).isSet()) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
try {
Document<?> doc = null;
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
case Binary: {
doc = ByteArrayDocument.create(docId, content);
break;
}
}
final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
final Bucket bucket = openBucket(context);
doc = bucket.upsert(doc, persistTo, replicateTo);
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), bucket.name());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(bucket, docId));
session.transfer(flowFile, REL_SUCCESS);
} catch (final CouchbaseException e) {
String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
}
}