Java源码示例:org.elasticsearch.transport.ReceiveTimeoutTransportException

示例1
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
    final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());

    final ComponentLog logger = getLogger();
    try {

        logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
        final long startNanos = System.nanoTime();

        GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
        if (authToken != null) {
            getRequestBuilder.putHeader("Authorization", authToken);
        }
        final GetResponse getResponse = getRequestBuilder.execute().actionGet();

        if (getResponse == null || !getResponse.isExists()) {
            logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
                    new Object[]{index, docType, docId});

            // We couldn't find the document, so penalize it and send it to "not found"
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_NOT_FOUND);
        } else {
            flowFile = session.putAttribute(flowFile, "filename", docId);
            flowFile = session.putAttribute(flowFile, "es.index", index);
            flowFile = session.putAttribute(flowFile, "es.type", docType);
            flowFile = session.write(flowFile, new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    out.write(getResponse.getSourceAsString().getBytes(charset));
                }
            });
            logger.debug("Elasticsearch document " + docId + " fetched, routing to success");

            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
            session.getProvenanceReporter().fetch(flowFile, uri, millis);
            session.transfer(flowFile, REL_SUCCESS);
        }
    } catch (NoNodeAvailableException
            | ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException
            | NodeClosedException exceptionToRetry) {
        logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
                        + "(hosts, username/password, etc.). Routing to retry",
                new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
        session.transfer(flowFile, REL_RETRY);
        context.yield();

    } catch (Exception e) {
        logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
    }
}
 
示例2
@Test
public void testPutElasticsearchOnTriggerWithExceptions() throws IOException {
    PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(PutElasticsearch.INDEX, "doc");
    runner.setProperty(PutElasticsearch.TYPE, "status");
    runner.setValidateExpressionUsage(true);
    runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652142");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652143");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652144");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
}
 
示例3
@Test
public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException {
    FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(FetchElasticsearch.INDEX, "doc");
    runner.setProperty(FetchElasticsearch.TYPE, "status");
    runner.setValidateExpressionUsage(true);
    runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1);
}
 
示例4
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    synchronized (esClient) {
        if(esClient.get() == null) {
            super.setup(context);
        }
    }

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
    final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());

    final ComponentLog logger = getLogger();
    try {

        logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
        GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
        final GetResponse getResponse = getRequestBuilder.execute().actionGet();

        if (getResponse == null || !getResponse.isExists()) {
            logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
                    new Object[]{index, docType, docId});

            // We couldn't find the document, so penalize it and send it to "not found"
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_NOT_FOUND);
        } else {
            flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
                put("filename", docId);
                put("es.index", index);
                put("es.type", docType);
            }});
            flowFile = session.write(flowFile, new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    out.write(getResponse.getSourceAsString().getBytes(charset));
                }
            });
            logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
            // The document is JSON, so update the MIME type of the flow file
            flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            session.getProvenanceReporter().fetch(flowFile, getResponse.remoteAddress().getAddress());
            session.transfer(flowFile, REL_SUCCESS);
        }
    } catch (NoNodeAvailableException
            | ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException
            | NodeClosedException exceptionToRetry) {
        logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
                        + "(hosts, username/password, etc.), or this issue may be transient. Routing to retry",
                new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
        session.transfer(flowFile, REL_RETRY);
        context.yield();

    } catch (Exception e) {
        logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
    }
}
 
示例5
@Test
public void testPutElasticsearch5OnTriggerWithExceptions() throws IOException {
    PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setValidateExpressionUsage(true);
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652142");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652143");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652144");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(PutElasticsearch5.REL_FAILURE, 1);
}
 
示例6
@Test
public void testFetchElasticsearch5OnTriggerWithExceptions() throws IOException {
    FetchElasticsearch5TestProcessor processor = new FetchElasticsearch5TestProcessor(true);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(FetchElasticsearch5.INDEX, "doc");
    runner.setProperty(FetchElasticsearch5.TYPE, "status");
    runner.setValidateExpressionUsage(true);
    runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(FetchElasticsearch5.REL_FAILURE, 1);
}
 
示例7
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
    final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());

    final ComponentLog logger = getLogger();
    try {

        logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
        final long startNanos = System.nanoTime();

        GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
        if (authToken != null) {
            getRequestBuilder.putHeader("Authorization", authToken);
        }
        final GetResponse getResponse = getRequestBuilder.execute().actionGet();

        if (getResponse == null || !getResponse.isExists()) {
            logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
                    new Object[]{index, docType, docId});

            // We couldn't find the document, so penalize it and send it to "not found"
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_NOT_FOUND);
        } else {
            flowFile = session.putAttribute(flowFile, "filename", docId);
            flowFile = session.putAttribute(flowFile, "es.index", index);
            flowFile = session.putAttribute(flowFile, "es.type", docType);
            flowFile = session.write(flowFile, new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    out.write(getResponse.getSourceAsString().getBytes(charset));
                }
            });
            logger.debug("Elasticsearch document " + docId + " fetched, routing to success");

            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
            session.getProvenanceReporter().fetch(flowFile, uri, millis);
            session.transfer(flowFile, REL_SUCCESS);
        }
    } catch (NoNodeAvailableException
            | ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException
            | NodeClosedException exceptionToRetry) {
        logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
                        + "(hosts, username/password, etc.). Routing to retry",
                new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
        session.transfer(flowFile, REL_RETRY);
        context.yield();

    } catch (Exception e) {
        logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
    }
}
 
示例8
@Test
public void testPutElasticsearchOnTriggerWithExceptions() throws IOException {
    PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(PutElasticsearch.INDEX, "doc");
    runner.setProperty(PutElasticsearch.TYPE, "status");
    runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652142");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652143");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652144");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
}
 
示例9
@Test
public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException {
    FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(FetchElasticsearch.INDEX, "doc");
    runner.setProperty(FetchElasticsearch.TYPE, "status");
    runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1);
}
 
示例10
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    synchronized (esClient) {
        if(esClient.get() == null) {
            super.setup(context);
        }
    }

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
    final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());

    final ComponentLog logger = getLogger();
    try {

        logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
        GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
        final GetResponse getResponse = getRequestBuilder.execute().actionGet();

        if (getResponse == null || !getResponse.isExists()) {
            logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
                    new Object[]{index, docType, docId});

            // We couldn't find the document, so penalize it and send it to "not found"
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_NOT_FOUND);
        } else {
            flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
                put("filename", docId);
                put("es.index", index);
                put("es.type", docType);
            }});
            flowFile = session.write(flowFile, new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    out.write(getResponse.getSourceAsString().getBytes(charset));
                }
            });
            logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
            // The document is JSON, so update the MIME type of the flow file
            flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            session.getProvenanceReporter().fetch(flowFile, getResponse.remoteAddress().getAddress());
            session.transfer(flowFile, REL_SUCCESS);
        }
    } catch (NoNodeAvailableException
            | ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException
            | NodeClosedException exceptionToRetry) {
        logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
                        + "(hosts, username/password, etc.), or this issue may be transient. Routing to retry",
                new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
        session.transfer(flowFile, REL_RETRY);
        context.yield();

    } catch (Exception e) {
        logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
    }
}
 
示例11
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    synchronized (esClient) {
        if(esClient.get() == null) {
            setup(context);
        }
    }

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
    final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();

    final ComponentLog logger = getLogger();

    if ( StringUtils.isBlank(index) ) {
        logger.debug("Index is required but was empty {}", new Object [] { index });
        flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Index is required but was empty");
        session.transfer(flowFile,REL_FAILURE);
        return;
    }
    if ( StringUtils.isBlank(documentType) ) {
        logger.debug("Document type is required but was empty {}", new Object [] { documentType });
        flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document type is required but was empty");
        session.transfer(flowFile,REL_FAILURE);
        return;
    }
    if ( StringUtils.isBlank(documentId) ) {
        logger.debug("Document id is required but was empty {}", new Object [] { documentId });
        flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document id is required but was empty");
        session.transfer(flowFile,REL_FAILURE);
        return;
    }

    flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
        put(ES_FILENAME, documentId);
        put(ES_INDEX, index);
        put(ES_TYPE, documentType);
    }});

    try {

        logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId});
        DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType);
        final DeleteResponse response = doDelete(requestBuilder);

        if (response.status() != RestStatus.OK)  {
            logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}",
                    new Object[]{index, documentType, documentId, response.status()});
            flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE);
            flowFile = session.putAttribute(flowFile, ES_REST_STATUS, response.status().toString());
            context.yield();
            if ( response.status() ==  RestStatus.NOT_FOUND ) {
                   session.transfer(flowFile, REL_NOT_FOUND);
            } else {
                session.transfer(flowFile, REL_FAILURE);
            }
        } else {
            logger.debug("Elasticsearch document " + documentId + " deleted");
            session.transfer(flowFile, REL_SUCCESS);
        }
    } catch ( ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException exception) {
        logger.error("Failed to delete document {} from Elasticsearch due to {}",
                new Object[]{documentId, exception.getLocalizedMessage()}, exception);
        flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, exception.getLocalizedMessage());
        session.transfer(flowFile, REL_RETRY);
        context.yield();

    } catch (Exception e) {
        logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e);
        flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage());
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
    }
}
 
示例12
@Test
public void testPutElasticsearch5OnTriggerWithExceptions() throws IOException {
    PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652142");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652143");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652144");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(PutElasticsearch5.REL_FAILURE, 1);
}
 
示例13
@Test
public void testFetchElasticsearch5OnTriggerWithExceptions() throws IOException {
    FetchElasticsearch5TestProcessor processor = new FetchElasticsearch5TestProcessor(true);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(FetchElasticsearch5.INDEX, "doc");
    runner.setProperty(FetchElasticsearch5.TYPE, "status");
    runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652140");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    runner.enqueue(docExample, new HashMap<String, String>() {{
        put("doc_id", "28039652141");
    }});
    runner.run(1, true, true);

    // This test generates an exception on execute(),routes to failure
    runner.assertTransferCount(FetchElasticsearch5.REL_FAILURE, 1);
}
 
示例14
private static boolean isTimeoutOrNodeNotReachable(Throwable t) {
    return t instanceof ReceiveTimeoutTransportException
        || t instanceof ConnectTransportException;
}
 
示例15
/**
 * Check if the input exception indicates connection issues.
 *
 * @param e exception
 * @return true if we get disconnected from the node or the node is not in the
 *         right state (being closed) or transport request times out (sent from TimeoutHandler.run)
 */
private boolean hasConnectionIssue(Throwable e) {
    return e instanceof ConnectTransportException || e instanceof NodeClosedException || e instanceof ReceiveTimeoutTransportException;
}