Java源码示例:org.elasticsearch.index.reindex.BulkByScrollResponse
示例1
/**
 * Deletes the documents of the given index and mapping type that match the supplied query.
 *
 * <p>The request is executed synchronously. When {@code isAutoClose} is set, {@code close()}
 * is called afterwards, whether or not the request succeeded.
 *
 * @param index         name of the index to delete from
 * @param type          mapping type to restrict the delete to
 * @param queryBuilders queries handed to {@code setQuery} in array order.
 *                      NOTE(review): {@code setQuery} presumably replaces the previous query
 *                      instead of accumulating, so only the last entry would take effect —
 *                      confirm, and combine the queries upstream if all of them must apply.
 * @return a map holding {@code "time"} (took, in milliseconds) and {@code "total"};
 *         {@code null} when any argument is {@code null}
 * @throws IOException propagated from the client call
 * @author pancm
 */
public static Map<String, Object> deleteByQuery(String index, String type, QueryBuilder[] queryBuilders) throws IOException {
    if (index == null || type == null || queryBuilders == null) {
        return null;
    }
    Map<String, Object> map = new HashMap<>();
    try {
        // The request constructor takes index names only; passing the type there would make
        // it a second target index. Set the type explicitly, as updateByQuery does.
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        request.setDocTypes(type);
        for (QueryBuilder queryBuilder : queryBuilders) {
            request.setQuery(queryBuilder);
        }
        // Synchronous execution.
        BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
        // Expose the headline figures of the response to the caller.
        map.put("time", bulkResponse.getTook().getMillis());
        map.put("total", bulkResponse.getTotal());
    } finally {
        if (isAutoClose) {
            close();
        }
    }
    return map;
}
示例2
/**
 * Runs a {@code DailyCron} once against a mocked {@code ClientUtil} whose
 * {@code execute(DeleteByQueryAction.INSTANCE, ...)} call answers according to {@code mode}:
 * an {@code IndexNotFoundException} failure, an {@code ElasticsearchException} failure, or a
 * mocked response reporting 10 deleted documents.
 *
 * @param mode selects which of the three listener outcomes the mock produces
 */
@SuppressWarnings("unchecked")
private void templateDailyCron(DailyCronTestExecutionMode mode) {
    Clock mockedClock = mock(Clock.class);
    ClientUtil mockedClientUtil = mock(ClientUtil.class);
    DailyCron dailyCron = new DailyCron(mockedClock, Duration.ofHours(24), mockedClientUtil);

    doAnswer(invocation -> {
        Object[] arguments = invocation.getArguments();
        String sizeMessage = String.format(
                "The size of args is %d. Its content is %s", arguments.length, Arrays.toString(arguments));
        assertTrue(sizeMessage, arguments.length == 3);
        assertTrue(arguments[2] instanceof ActionListener);

        // The third argument is the callback the cron registered for the delete-by-query.
        ActionListener<BulkByScrollResponse> callback = (ActionListener<BulkByScrollResponse>) arguments[2];

        if (mode == DailyCronTestExecutionMode.INDEX_NOT_EXIST) {
            callback.onFailure(new IndexNotFoundException("foo", "bar"));
            return null;
        }
        if (mode == DailyCronTestExecutionMode.FAIL) {
            callback.onFailure(new ElasticsearchException("bar"));
            return null;
        }

        BulkByScrollResponse successfulDelete = mock(BulkByScrollResponse.class);
        when(successfulDelete.getDeleted()).thenReturn(10L);
        callback.onResponse(successfulDelete);
        return null;
    }).when(mockedClientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any());

    dailyCron.run();
}
示例3
/**
 * Checks that {@code deleteData} hands back the response the stubbed client returns from
 * {@code deleteByQuery}.
 */
@Test
public void testDelete() throws Exception {
    BulkByScrollResponse stubbedResponse = buildBulkResponse();
    when(client.deleteByQuery(any(DeleteByQueryRequest.class), any())).thenReturn(stubbedResponse);

    ElasticSearchDataCleaner cleaner = new ElasticSearchDataCleaner();
    BulkByScrollResponse returned = cleaner.deleteData(client);

    assertEquals(returned, stubbedResponse);
}
示例4
/**
 * Checks that {@code deleteData} returns {@code null} when the stubbed client's
 * {@code deleteByQuery} throws an {@code IOException}.
 */
@Test
public void testDeleteIOException() throws Exception {
    when(client.deleteByQuery(any(DeleteByQueryRequest.class), any())).thenThrow(IOException.class);

    ElasticSearchDataCleaner cleaner = new ElasticSearchDataCleaner();
    BulkByScrollResponse returned = cleaner.deleteData(client);

    assertNull(returned);
}
示例5
/**
 * Checks that {@code deleteData} returns {@code null} when the stubbed client's
 * {@code deleteByQuery} throws a plain {@code Exception}.
 */
@Test
public void testDeleteException() throws Exception {
    when(client.deleteByQuery(any(DeleteByQueryRequest.class), any())).thenThrow(Exception.class);

    ElasticSearchDataCleaner cleaner = new ElasticSearchDataCleaner();
    BulkByScrollResponse returned = cleaner.deleteData(client);

    assertNull(returned);
}
示例6
/**
 * Runs a synchronous update-by-query against one index and mapping type.
 *
 * <p>When {@code isAutoClose} is set, {@code close()} is called afterwards, whether or not
 * the request succeeded.
 *
 * @param index         name of the index to update
 * @param type          mapping type to restrict the update to
 * @param queryBuilders optional queries, each handed to {@code setQuery} in order.
 *                      NOTE(review): later calls presumably replace earlier ones — confirm.
 * @return a map holding {@code "time"} (took, in milliseconds) and {@code "total"};
 *         {@code null} when {@code index} or {@code type} is {@code null}
 * @throws IOException propagated from the client call
 * @author pancm
 */
public static Map<String, Object> updateByQuery(String index, String type, QueryBuilder... queryBuilders) throws IOException {
    if (index == null || type == null) {
        return null;
    }

    Map<String, Object> summary = new HashMap<>();
    try {
        UpdateByQueryRequest updateRequest = new UpdateByQueryRequest();
        updateRequest.indices(index);
        updateRequest.setDocTypes(type);

        // A null varargs array simply means "no query supplied".
        int queryCount = (queryBuilders == null) ? 0 : queryBuilders.length;
        for (int i = 0; i < queryCount; i++) {
            updateRequest.setQuery(queryBuilders[i]);
        }

        // Synchronous execution.
        BulkByScrollResponse response = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);

        long tookMillis = response.getTook().getMillis();
        summary.put("time", tookMillis);
        summary.put("total", response.getTotal());
    } finally {
        if (isAutoClose) {
            close();
        }
    }
    return summary;
}
示例7
/**
 * Demo: deletes the documents of index {@code "test1"} (type {@code "_doc"}) whose
 * {@code uid} term equals 1234, then prints a one-line summary of the response.
 *
 * <p>Most locals below are read only to show which figures the response exposes.
 *
 * @throws IOException propagated from the client call
 */
private static void deleteByQuery() throws IOException {
    String type = "_doc";
    String index = "test1";
    // The constructor takes index names only; passing the type there would treat "_doc" as
    // a second index. Set the type explicitly instead.
    DeleteByQueryRequest request = new DeleteByQueryRequest(index);
    request.setDocTypes(type);
    // Query selecting the documents to delete.
    request.setQuery(QueryBuilders.termQuery("uid", 1234));
    // Synchronous execution.
    BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
    // Asynchronous alternative:
    // client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener);
    // Figures available on the response.
    TimeValue timeTaken = bulkResponse.getTook();
    boolean timedOut = bulkResponse.isTimedOut();
    long totalDocs = bulkResponse.getTotal();
    long updatedDocs = bulkResponse.getUpdated();
    long deletedDocs = bulkResponse.getDeleted();
    long batches = bulkResponse.getBatches();
    long noops = bulkResponse.getNoops();
    long versionConflicts = bulkResponse.getVersionConflicts();
    long bulkRetries = bulkResponse.getBulkRetries();
    long searchRetries = bulkResponse.getSearchRetries();
    TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
    TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
    List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
    List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
    // Report the deleted count: this is a delete, so the "updated" figure is not the relevant one.
    System.out.println("查询更新总共花费了:" + timeTaken.getMillis() + " 毫秒,总条数:" + totalDocs + ",删除数:" + deletedDocs);
}
示例8
/**
 * Executes a delete by translating it into a filter query and running a delete-by-query.
 *
 * @param delete  the delete to run; its query and update context feed the filter query
 * @param factory document factory passed to the filter-query builder
 * @return a result carrying the server-reported took time (ms) and the locally measured
 *         elapsed time
 * @throws SearchServerException wrapping any {@code ElasticsearchException} or
 *         {@code IOException} raised while deleting
 */
@Override
public DeleteResult execute(Delete delete, DocumentFactory factory) {
    try {
        final StopWatch timer = StopWatch.createStarted();
        elasticClientLogger.debug(">>> delete({})", delete);

        final QueryBuilder filter =
                ElasticQueryBuilder.buildFilterQuery(delete.getQuery(), factory, delete.getUpdateContext());
        final BulkByScrollResponse deletion = elasticSearchClient.deleteByQuery(filter);
        timer.stop();

        final DeleteResult result = new DeleteResult(deletion.getTook().getMillis());
        return result.setElapsedTime(timer.getTime());
    } catch (ElasticsearchException | IOException e) {
        log.error("Cannot delete with query {}", delete.getQuery(), e);
        final String message = String.format("Cannot delete with query %s", delete.getQuery().toString());
        throw new SearchServerException(message, e);
    }
}
示例9
/**
 * Issues a blocking delete-by-query on {@code index}, filtered by the query built from
 * {@code inferred} and {@code contexts}; version conflicts do not abort the request.
 *
 * @param inferred passed through to {@code getQueryBuilder}
 * @param contexts passed through to {@code getQueryBuilder}
 */
@Override
public synchronized void clear(boolean inferred, Resource[] contexts) {
    BulkByScrollResponse outcome = DeleteByQueryAction.INSTANCE
            .newRequestBuilder(clientProvider.getClient())
            .filter(getQueryBuilder(null, null, null, inferred, contexts))
            .abortOnVersionConflict(false)
            .source(index)
            .get();
    // The deleted count is read but not used further.
    long removedCount = outcome.getDeleted();
}
示例10
/**
 * Applies {@code untagScript} to the documents with the given ids in the project's index via
 * a synchronous update-by-query that proceeds on version conflicts.
 *
 * @param prj         project whose id is used as the index name
 * @param documentIds ids of the documents to update
 * @param untagScript script run against every matched document
 * @return {@code true} when there were no bulk failures and at least one document was updated
 * @throws IOException propagated from the client call
 */
private boolean groupTagUntag(Project prj, List<String> documentIds, Script untagScript) throws IOException {
    UpdateByQueryRequest request = new UpdateByQueryRequest(prj.getId());

    String[] ids = documentIds.toArray(new String[0]);
    request.setQuery(termsQuery("_id", ids));
    request.setConflicts("proceed");
    request.setScript(untagScript);

    boolean refreshAfterwards = esCfg.refreshPolicy.getValue().equals("true");
    request.setRefresh(refreshAfterwards);

    BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
    if (!response.getBulkFailures().isEmpty()) {
        return false;
    }
    return response.getUpdated() > 0;
}
示例11
/**
 * Starts an asynchronous delete-by-query over the given indices and reports the outcome to
 * {@code resultHandler}.
 *
 * @param indices       names of the indices to delete from
 * @param options       optional settings; when non-null, each non-null option is copied onto
 *                      the request builder
 * @param resultHandler receives the mapped response on success; failures are routed through
 *                      {@code handleFailure}
 */
@Override
public void deleteByQuery(List<String> indices, DeleteByQueryOptions options, Handler<AsyncResult<com.hubrick.vertx.elasticsearch.model.DeleteByQueryResponse>> resultHandler) {
    final DeleteByQueryRequestBuilder builder = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE);

    final String[] indexNames = indices.toArray(new String[indices.size()]);
    builder.source(indexNames);

    if (options != null) {
        populateSearchRequestBuilder(builder.source(), options);

        if (options.getMaxRetries() != null) {
            builder.setMaxRetries(options.getMaxRetries());
        }
        if (options.getSlices() != null) {
            builder.setSlices(options.getSlices());
        }
        if (options.getWaitForActiveShards() != null) {
            builder.waitForActiveShards(ActiveShardCount.from(options.getWaitForActiveShards()));
        }
        if (options.getConflicts() != null) {
            // Abort on conflict unless the caller explicitly asked to proceed.
            builder.abortOnVersionConflict(!Conflicts.PROCEED.equals(options.getConflicts()));
        }
        if (options.getRequestsPerSecond() != null) {
            builder.setRequestsPerSecond(options.getRequestsPerSecond());
        }
    }

    builder.execute(new ActionListener<BulkByScrollResponse>() {
        @Override
        public void onResponse(BulkByScrollResponse esResponse) {
            resultHandler.handle(Future.succeededFuture(mapToDeleteByQueryResponse(esResponse)));
        }

        @Override
        public void onFailure(Exception failure) {
            handleFailure(resultHandler, failure);
        }
    });
}
示例12
/**
 * Copies the fields of an Elasticsearch {@code BulkByScrollResponse} into the Vert.x-facing
 * {@code DeleteByQueryResponse} model.
 *
 * <p>Search failures are added first, then bulk failures; each is converted by parsing its
 * {@code toString()} output as a {@code JsonObject}.
 *
 * @param esDeleteByQueryResponse the Elasticsearch response to convert
 * @return the populated model object
 */
public static com.hubrick.vertx.elasticsearch.model.DeleteByQueryResponse mapToDeleteByQueryResponse(BulkByScrollResponse esDeleteByQueryResponse) {
    final com.hubrick.vertx.elasticsearch.model.DeleteByQueryResponse mapped =
            new com.hubrick.vertx.elasticsearch.model.DeleteByQueryResponse();

    // Figures read directly from the response.
    mapped.setRawResponse(readResponse(esDeleteByQueryResponse));
    mapped.setTookMillis(esDeleteByQueryResponse.getTook().getMillis());
    mapped.setTimedOut(esDeleteByQueryResponse.isTimedOut());
    mapped.setDeleted(esDeleteByQueryResponse.getDeleted());
    mapped.setBatches(esDeleteByQueryResponse.getBatches());
    mapped.setRetries(
            new Retries(esDeleteByQueryResponse.getBulkRetries(), esDeleteByQueryResponse.getSearchRetries()));

    // Figures read from the nested status object.
    mapped.setThrottledMillis(esDeleteByQueryResponse.getStatus().getThrottled().getMillis());
    mapped.setRequestsPerSecond(esDeleteByQueryResponse.getStatus().getRequestsPerSecond());
    mapped.setThrottledUntilMillis(esDeleteByQueryResponse.getStatus().getThrottledUntil().getMillis());
    mapped.setTotal(esDeleteByQueryResponse.getStatus().getTotal());
    mapped.setVersionConflicts(esDeleteByQueryResponse.getVersionConflicts());

    if (esDeleteByQueryResponse.getSearchFailures() != null) {
        esDeleteByQueryResponse.getSearchFailures()
                .forEach(searchFailure -> mapped.addFailure(new JsonObject(searchFailure.toString())));
    }
    if (esDeleteByQueryResponse.getBulkFailures() != null) {
        esDeleteByQueryResponse.getBulkFailures()
                .forEach(bulkFailure -> mapped.addFailure(new JsonObject(bulkFailure.toString())));
    }
    return mapped;
}
示例13
/**
 * Deletes all documents from every index whose name starts with {@code ES_INDEX_NAME} or
 * {@code ES_EXTENDED_DATA_INDEX_NAME_PREFIX}, logging each index and its deleted count.
 *
 * @param searchIndex supplies the Elasticsearch client to use
 * @throws Exception propagated from listing the indices or running the deletes
 */
public void clearIndices(Elasticsearch7SearchIndex searchIndex) throws Exception {
    String[] allIndexNames = searchIndex.getClient().admin().indices().prepareGetIndex().execute().get().indices();
    for (String indexName : allIndexNames) {
        boolean isTestIndex = indexName.startsWith(ES_INDEX_NAME)
                || indexName.startsWith(ES_EXTENDED_DATA_INDEX_NAME_PREFIX);
        if (!isTestIndex) {
            continue;
        }

        LOGGER.info("clearing test index: %s", indexName);
        BulkByScrollResponse deletion =
                new DeleteByQueryRequestBuilder(searchIndex.getClient(), DeleteByQueryAction.INSTANCE)
                        .source(indexName)
                        .filter(QueryBuilders.matchAllQuery())
                        .get();
        LOGGER.info("removed %d documents", deletion.getDeleted());
    }
}
示例14
/**
 * Runs a delete-by-query (with no explicit filter) on every index whose name starts with
 * {@code ES_INDEX_NAME} or {@code ES_EXTENDED_DATA_INDEX_NAME_PREFIX}, logging each index and
 * its deleted count.
 *
 * @param searchIndex supplies the Elasticsearch client to use
 * @throws Exception propagated from listing the indices or running the deletes
 */
public void clearIndices(Elasticsearch5SearchIndex searchIndex) throws Exception {
    String[] allIndexNames = searchIndex.getClient().admin().indices().prepareGetIndex().execute().get().indices();
    for (String indexName : allIndexNames) {
        boolean isTestIndex = indexName.startsWith(ES_INDEX_NAME)
                || indexName.startsWith(ES_EXTENDED_DATA_INDEX_NAME_PREFIX);
        if (!isTestIndex) {
            continue;
        }

        LOGGER.info("clearing test index: %s", indexName);
        BulkByScrollResponse deletion = DeleteByQueryAction.INSTANCE
                .newRequestBuilder(searchIndex.getClient())
                .source(indexName)
                .get();
        LOGGER.info("removed %d documents", deletion.getDeleted());
    }
}
示例15
/**
 * Deletes the documents whose {@code timeBucketColumnName} value is less than or equal to
 * {@code endTimeBucket}; version conflicts do not abort the request.
 *
 * @param indexName            index name, passed through {@code formatIndexName} before use
 * @param timeBucketColumnName field the range query is applied to
 * @param endTimeBucket        inclusive upper bound of the range
 * @return always {@code HttpStatus.SC_OK}; the response itself is only logged at debug level
 * @throws IOException propagated from the client call
 */
public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
    final String targetIndex = formatIndexName(indexName);

    DeleteByQueryRequest request = new DeleteByQueryRequest(targetIndex);
    request.setAbortOnVersionConflict(false);
    request.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));

    BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
    log.debug("delete indexName: {}, by query request: {}, response: {}", targetIndex, request, response);

    return HttpStatus.SC_OK;
}
示例16
/**
 * Delegates a delete-by-query to the underlying client.
 *
 * @param deleteByQueryRequest the request to execute
 * @param requestOptions       options for the call; {@code RequestOptions.DEFAULT} is used
 *                             when this is {@code null}
 * @return the client's response
 * @throws Exception propagated from the client call
 */
public BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions requestOptions)
        throws Exception {
    // Honour the caller's options instead of silently discarding them; the null fallback
    // keeps callers that passed null working as before.
    RequestOptions effectiveOptions = (requestOptions != null) ? requestOptions : RequestOptions.DEFAULT;
    return client.deleteByQuery(deleteByQueryRequest, effectiveOptions);
}
示例17
/**
 * Creates a mocked {@code BulkByScrollResponse} with no stubbing applied.
 *
 * @return the mock
 */
public BulkByScrollResponse buildBulkResponse() {
    return mock(BulkByScrollResponse.class);
}
示例18
/**
 * Demo: copies documents from index {@code "user"} into {@code "dest_test"} with a
 * synchronous reindex request, then prints a one-line summary of the response.
 *
 * <p>Most locals after the call are read only to show which figures the response exposes.
 *
 * @throws IOException propagated from the client call
 */
private static void reindex() throws IOException {
    ReindexRequest reindexRequest = new ReindexRequest();

    // Source and destination indices.
    reindexRequest.setSourceIndices("user");
    reindexRequest.setDestIndex("dest_test");

    // "create" op type and "proceed" on version conflicts.
    reindexRequest.setDestOpType("create");
    reindexRequest.setConflicts("proceed");

    // Restrict the source to type "userindex" and to documents whose "user" term is "pancm".
    reindexRequest.setSourceDocTypes("userindex");
    reindexRequest.setSourceQuery(new TermQueryBuilder("user", "pancm"));

    // Size 10, source batch size 100.
    reindexRequest.setSize(10);
    reindexRequest.setSourceBatchSize(100);

    // Sorting could be added here, e.g. addSortField("postDate", SortOrder.DESC).

    // Two slices, a two-minute timeout, and refresh enabled.
    reindexRequest.setSlices(2);
    reindexRequest.setTimeout(TimeValue.timeValueMinutes(2));
    reindexRequest.setRefresh(true);

    // Synchronous execution; an asynchronous variant would use client.reindexAsync(...).
    BulkByScrollResponse response = client.reindex(reindexRequest, RequestOptions.DEFAULT);

    // Figures available on the response.
    TimeValue took = response.getTook();
    boolean timedOut = response.isTimedOut();
    long total = response.getTotal();
    long updated = response.getUpdated();
    long created = response.getCreated();
    long deleted = response.getDeleted();
    long batchCount = response.getBatches();
    long noopCount = response.getNoops();
    long conflictCount = response.getVersionConflicts();
    long bulkRetryCount = response.getBulkRetries();
    long searchRetryCount = response.getSearchRetries();
    TimeValue throttled = response.getStatus().getThrottled();
    TimeValue throttledUntil = response.getStatus().getThrottledUntil();
    List<ScrollableHitSource.SearchFailure> searchFailureList = response.getSearchFailures();
    List<BulkItemResponse.Failure> bulkFailureList = response.getBulkFailures();

    String summary = "索引复制总共花费了:" + took.getMillis() + " 毫秒,总条数:" + total + ",创建数:" + created
            + ",更新数:" + updated;
    System.out.println(summary);
}
示例19
/**
 * Demo: runs a synchronous delete-by-query on index {@code "user"} for documents whose
 * {@code "user"} term is {@code "pancm"}, then prints a one-line summary of the response.
 *
 * <p>Most locals after the call are read only to show which figures the response exposes.
 *
 * @throws IOException propagated from the client call
 */
private static void deleteByQuery() throws IOException {
    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest("user");

    // Query selecting the documents to delete.
    deleteRequest.setQuery(new TermQueryBuilder("user", "pancm"));

    // Size 10, batch size 100.
    deleteRequest.setSize(10);
    deleteRequest.setBatchSize(100);

    // Routing, a two-minute timeout, refresh enabled, and lenient index options.
    deleteRequest.setRouting("=cat");
    deleteRequest.setTimeout(TimeValue.timeValueMinutes(2));
    deleteRequest.setRefresh(true);
    deleteRequest.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

    // Synchronous execution; an asynchronous variant would use client.deleteByQueryAsync(...).
    BulkByScrollResponse response = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);

    // Figures available on the response.
    TimeValue took = response.getTook();
    boolean timedOut = response.isTimedOut();
    long total = response.getTotal();
    long deleted = response.getDeleted();
    long batchCount = response.getBatches();
    long noopCount = response.getNoops();
    long conflictCount = response.getVersionConflicts();
    long bulkRetryCount = response.getBulkRetries();
    long searchRetryCount = response.getSearchRetries();
    TimeValue throttled = response.getStatus().getThrottled();
    TimeValue throttledUntil = response.getStatus().getThrottledUntil();
    List<ScrollableHitSource.SearchFailure> searchFailureList = response.getSearchFailures();
    List<BulkItemResponse.Failure> bulkFailureList = response.getBulkFailures();

    String summary = "查询更新总共花费了:" + took.getMillis() + " 毫秒,总条数:" + total + ",删除数:" + deleted;
    System.out.println(summary);
}
示例20
/**
 * Copies documents from one index to another with a synchronous reindex request.
 *
 * <p>The request uses op type {@code "create"}, proceeds on version conflicts, and has a
 * two-minute timeout. When {@code isAutoClose} is set, {@code close()} is called afterwards,
 * whether or not the request succeeded.
 *
 * @param index         source index
 * @param destIndex     destination index
 * @param queryBuilders optional source queries, each handed to {@code setSourceQuery} in
 *                      order. NOTE(review): later calls presumably replace earlier ones —
 *                      confirm.
 * @return a map holding {@code "time"} (took, in milliseconds), {@code "total"},
 *         {@code "createdDocs"} and {@code "updatedDocs"}; {@code null} when {@code index}
 *         or {@code destIndex} is {@code null}
 * @throws IOException propagated from the client call
 * @author pancm
 */
public static Map<String, Object> reindexByQuery(String index, String destIndex, QueryBuilder[] queryBuilders) throws IOException {
    if (index == null || destIndex == null) {
        return null;
    }

    Map<String, Object> summary = new HashMap<>();
    try {
        ReindexRequest reindexRequest = new ReindexRequest();
        reindexRequest.setSourceIndices(index);
        reindexRequest.setDestIndex(destIndex);

        // A null array simply means "no source query supplied".
        int queryCount = (queryBuilders == null) ? 0 : queryBuilders.length;
        for (int i = 0; i < queryCount; i++) {
            reindexRequest.setSourceQuery(queryBuilders[i]);
        }

        reindexRequest.setDestOpType("create");
        reindexRequest.setConflicts("proceed");
        // Size and source batch size are deliberately left at their defaults here.
        reindexRequest.setTimeout(TimeValue.timeValueMinutes(2));

        // Synchronous execution.
        BulkByScrollResponse response = client.reindex(reindexRequest, RequestOptions.DEFAULT);

        long tookMillis = response.getTook().getMillis();
        summary.put("time", tookMillis);
        summary.put("total", response.getTotal());
        summary.put("createdDocs", response.getCreated());
        summary.put("updatedDocs", response.getUpdated());
    } finally {
        if (isAutoClose) {
            close();
        }
    }
    return summary;
}
示例21
/**
 * Demo: runs a synchronous update-by-query on index {@code "test1"} (type {@code "_doc"})
 * for documents whose {@code "user"} term is {@code "pancm"}, then prints a one-line summary
 * of the response.
 *
 * <p>Most locals after the call are read only to show which figures the response exposes.
 *
 * @throws IOException propagated from the client call
 */
private static void updateByQuery() throws IOException {
    String type = "_doc";
    String index = "test1";
    // The constructor takes index names only; passing the type there would treat "_doc" as
    // a second index. Set the type explicitly instead.
    UpdateByQueryRequest request = new UpdateByQueryRequest(index);
    request.setDocTypes(type);
    // Query selecting the documents to update.
    request.setQuery(new TermQueryBuilder("user", "pancm"));
    // Size 10.
    request.setSize(10);
    // Batch size 100.
    request.setBatchSize(100);
    // Two-minute timeout.
    request.setTimeout(TimeValue.timeValueMinutes(2));
    // Lenient index options.
    request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
    // Synchronous execution.
    BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
    // Asynchronous alternative:
    // client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
    // Figures available on the response.
    TimeValue timeTaken = bulkResponse.getTook();
    boolean timedOut = bulkResponse.isTimedOut();
    long totalDocs = bulkResponse.getTotal();
    long updatedDocs = bulkResponse.getUpdated();
    long deletedDocs = bulkResponse.getDeleted();
    long batches = bulkResponse.getBatches();
    long noops = bulkResponse.getNoops();
    long versionConflicts = bulkResponse.getVersionConflicts();
    long bulkRetries = bulkResponse.getBulkRetries();
    long searchRetries = bulkResponse.getSearchRetries();
    TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
    TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
    List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
    List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
    System.out.println("查询更新总共花费了:" + timeTaken.getMillis() + " 毫秒,总条数:" + totalDocs + ",更新数:" + updatedDocs);
}
示例22
/**
 * Runs a synchronous delete-by-query for {@code query} against {@code defaultIndex}.
 *
 * @param query selects the documents to delete
 * @return the client's response
 * @throws IOException propagated from the client call
 */
public BulkByScrollResponse deleteByQuery(QueryBuilder query) throws IOException {
    final DeleteByQueryRequest deleteRequest =
            ElasticRequestUtils.getDeleteByQueryRequest(defaultIndex, query);
    final BulkByScrollResponse response = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
    return response;
}