Java源码示例:io.searchbox.core.BulkResult
示例1
private long doSync( List<DataMap> list,String table,String pk) throws IOException {
long posi=0;
//List<Map> list2=new ArrayList<>();
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(Const.ES_TYPE);
for(DataMap dataMap:list){
long id=dataMap.getLong(pk);
if(id>posi) posi=id;
Map map=(convertMysql2Es(dataMap));
logger.info("[full] {}={}",table,map);
Index index = new Index.Builder(map).id(""+id).build();
bulk.addAction(index);
}
BulkResult br = jest.getJestClient().execute(bulk.build());
if(!br.isSucceeded()){
logger.error("error={}, failItems={}",br.getErrorMessage(), JSON.toJSONString(br.getFailedItems()));
// br.getFailedItems().get(0).
throw new RuntimeException("bulk error");
}
return posi;
}
示例2
@Test
public void failoverHandlerIsNotExecutedImmediatelyIfBackoffPolicyShouldNotApply() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
when(backoffPolicy.shouldApply(any())).thenReturn(false);
Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
Function<Bulk, Boolean> listener = config.createBatchListener(failoverPolicy);
// when
listener.apply(bulk);
// then
ArgumentCaptor<FailedItemSource> captor = ArgumentCaptor.forClass(FailedItemSource.class);
verify(failoverPolicy, never()).deliver(captor.capture());
}
示例3
/**
* Process the next item in the queue.
*/
protected void processQueue() {
try {
Collection<RequestMetric> batch = new ArrayList<>(this.batchSize);
RequestMetric rm = queue.take();
batch.add(rm);
queue.drainTo(batch, this.batchSize - 1);
Builder builder = new Bulk.Builder();
for (RequestMetric metric : batch) {
Index index = new Index.Builder(metric).refresh(false)
.index(getIndexName())
.type("request").build(); //$NON-NLS-1$
builder.addAction(index);
}
BulkResult result = getClient().execute(builder.build());
if (!result.isSucceeded()) {
logger.warn("Failed to add metric(s) to ES"); //$NON-NLS-1$
}
} catch (Exception e) {
logger.warn("Error adding metric to ES"); //$NON-NLS-1$
return;
}
}
示例4
/**
* 批量新增数据
* @param indexName
* @param typeName
* @param objs
* @return
* @throws Exception
*/
public static boolean insertBatch(JestClient jestClient,String indexName, String typeName, List<Object> objs) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object obj : objs) {
Index index = new Index.Builder(obj).build();
bulk.addAction(index);
}
BulkResult br = jestClient.execute(bulk.build());
return br.isSucceeded();
}
示例5
/**
* 批量新增数据
* @param indexName
* @param typeName
* @param objs
* @return
* @throws Exception
*/
public static boolean insertBatch(JestClient jestClient,String indexName, String typeName, List<Object> objs) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object obj : objs) {
Index index = new Index.Builder(obj).build();
bulk.addAction(index);
}
BulkResult br = jestClient.execute(bulk.build());
return br.isSucceeded();
}
示例6
private void doSync(List<ConsumerRecord> records) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(Const.ES_TYPE);
for (ConsumerRecord record : records) {
logger.info("[incr] {}={}",indexName,record.value());
Row row = JSON.parseObject(record.value(), Row.class);
if(columnMap==null){
columnMap=databaseService.getColumnMap(row.getDatabase(),row.getTable());
}
String id = record.key();
if (row.getType().equalsIgnoreCase("insert") || (row.getType().equalsIgnoreCase("update"))) {
LinkedHashMap<String, Object> data = row.getData();
Map map = (convertKafka2Es(data));
Index index = new Index.Builder(map).id(id).build();
bulk.addAction(index);
} else if (row.getType().equalsIgnoreCase("delete")) {
Delete delete = new Delete.Builder(id).build();
bulk.addAction(delete);
} else {
//
}
}
BulkResult br = jest.getJestClient().execute(bulk.build());
if (!br.isSucceeded()) {
logger.error("error={}, failItems={}", br.getErrorMessage(), JSON.toJSONString(br.getFailedItems()));
// br.getFailedItems().get(0).
throw new RuntimeException("bulk error");
}
// buffer.add(record);
}
示例7
@Override
public BulkResponse execute(Bulk bulk) throws IOException {
final BulkResult result = client.execute(bulk);
if (result.isSucceeded()) {
return BulkResponse.success();
}
boolean retriable = true;
final List<Key> versionConflicts = new ArrayList<>();
final List<String> errors = new ArrayList<>();
for (BulkResult.BulkResultItem item : result.getItems()) {
if (item.error != null) {
final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
final String errorType = parsedError.get("type").asText("");
if ("version_conflict_engine_exception".equals(errorType)) {
versionConflicts.add(new Key(item.index, item.type, item.id));
} else if ("mapper_parse_exception".equals(errorType)) {
retriable = false;
errors.add(item.error);
} else {
errors.add(item.error);
}
}
}
if (!versionConflicts.isEmpty()) {
LOG.debug("Ignoring version conflicts for items: {}", versionConflicts);
if (errors.isEmpty()) {
// The only errors were version conflicts
return BulkResponse.success();
}
}
final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();
return BulkResponse.failure(retriable, errorInfo);
}
示例8
public Builder withBackoffPolicy(BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy) {
this.backoffPolicy = backoffPolicy;
return this;
}
示例9
@Test
public void failoverHandlerIsExecutedImmediatelyIfBackoffPolicyShouldApply() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
when(backoffPolicy.shouldApply(any())).thenReturn(true);
Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
Function<Bulk, Boolean> listener = config.createBatchListener(failoverPolicy);
// when
listener.apply(bulk);
// then
ArgumentCaptor<FailedItemSource> captor = ArgumentCaptor.forClass(FailedItemSource.class);
verify(failoverPolicy, times(1)).deliver(captor.capture());
assertEquals(payload1, captor.getValue().getSource());
}
示例10
@Test
public void responseHandlerDeregistersRequestFromBackoffPolicyAfterException() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
JestResultHandler<JestResult> responseHandler = config.createResultHandler(bulk, config.createFailureHandler(failoverPolicy));
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(false);
// when
responseHandler.completed(result);
// then
verify(backoffPolicy, times(1)).deregister(eq(bulk));
}
示例11
@Test
public void responseHandlerDeregistersRequestFromBackoffPolicyAfterSuccess() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
JestResultHandler<JestResult> responseHandler = config.createResultHandler(bulk, config.createFailureHandler(failoverPolicy));
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(true);
// when
responseHandler.completed(result);
// then
verify(backoffPolicy, times(1)).deregister(eq(bulk));
}
示例12
@Test
public void responseHandlerDeregistersRequestFromBackoffPolicyAfterException() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
BufferedJestHttpObjectFactory config = spy(builder.build());
ItemSource<ByteBuf> payload1 = createDefaultTestBuffereItemSource("test1");
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
JestResultHandler<JestResult> responseHandler = config.createResultHandler(bulk, config.createFailureHandler(failoverPolicy));
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(false);
// when
responseHandler.completed(result);
// then
verify(backoffPolicy, times(1)).deregister(eq(bulk));
}
示例13
@Test
public void responseHandlerDeregistersRequestFromBackoffPolicyAfterSuccess() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
BufferedJestHttpObjectFactory config = spy(builder.build());
ItemSource<ByteBuf> payload1 = createDefaultTestBuffereItemSource("test1");
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
JestResultHandler<JestResult> responseHandler = config.createResultHandler(bulk, config.createFailureHandler(failoverPolicy));
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(true);
// when
responseHandler.completed(result);
// then
verify(backoffPolicy, times(1)).deregister(eq(bulk));
}