Java源码示例:com.ning.http.client.AsyncHandler
示例1
/**
* Critical execute a request (will not lookup in cache nor load into cache).
*
* @param request Request to critical execute
* @param asyncHandler Request async handler
* @param <T> Response type
* @return {@literal CompletableFuture<T>}
*/
public <T> CompletableFuture<T> criticalExecute(
final ParsecAsyncHttpRequest request,
AsyncHandler<T> asyncHandler
) {
AsyncHandler<T> practicalAsyncHandler =
oldFashionProfiling? new ParsecAsyncHandlerWrapper<>(asyncHandler, request.getNingRequest()): asyncHandler;
if (request.getRetryStatusCodes().isEmpty() && request.getRetryExceptions().isEmpty()) {
return new ParsecCompletableFuture<>(
client.executeRequest(request.getNingRequest(), practicalAsyncHandler)
);
} else {
ParsecHttpRequestRetryCallable<T> retryCallable;
retryCallable = retryIntervalMillisOpt.map(retryIntervalMillis -> new ParsecHttpRequestRetryCallable<>(
client, request, practicalAsyncHandler, retryIntervalMillis))
.orElseGet(() -> new ParsecHttpRequestRetryCallable<>(client, request, practicalAsyncHandler));
return new ParsecCompletableFuture<>(executorService.submit(retryCallable));
}
}
示例2
/**
* Constructor.
*
* @param asyncHandler asyncHandler
* @param loggingPredicate
*/
public LoggingAsyncHandlerWrapper(final AsyncHandler<T> asyncHandler, final Request ningRequest,
BiPredicate<Request, ResponseOrThrowable> loggingPredicate,
NingRequestResponseFormatter formatter,
String loggerName) {
this.asyncHandler = asyncHandler;
extensions = (asyncHandler instanceof AsyncHandlerExtensions)
? (AsyncHandlerExtensions) asyncHandler : null;
progressAsyncHandler = (asyncHandler instanceof ProgressAsyncHandler)
? (ProgressAsyncHandler<T>) asyncHandler : null;
this.progress = new ParsecAsyncProgress();
this.ningRequest = ningRequest;
this.loggingPredicate = loggingPredicate;
this.formatter = formatter;
this.traceLogger = LoggerFactory.getLogger(loggerName);
}
示例3
@Test
public void test() throws Exception {
AsyncHttpClient client = new AsyncHttpClient();
try {
Future<Response> f = client.preparePost(webServer.getCallHttpUrl()).addParameter("param1", "value1").execute();
Response response = f.get();
} finally {
client.close();
}
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
String destinationId = webServer.getHostAndPort();
String httpUrl = webServer.getCallHttpUrl();
verifier.verifyTrace(event("ASYNC_HTTP_CLIENT", AsyncHttpClient.class.getMethod("executeRequest", Request.class, AsyncHandler.class), null, null, destinationId,
annotation("http.url", httpUrl)));
verifier.verifyTraceCount(0);
}
示例4
/**
* Constructor.
*
* @param client client
* @param request request
* @param asyncHandler async handler
* @param retryIntervalMillis retry interval in milliseconds
*/
public ParsecHttpRequestRetryCallable(
final AsyncHttpClient client,
final ParsecAsyncHttpRequest request,
final AsyncHandler<T> asyncHandler,
long retryIntervalMillis
) {
this.client = client;
this.request = request;
this.asyncHandler = asyncHandler;
this.retryDelayer = new RetryDelayer(retryIntervalMillis);
responses = new ArrayList<>();
}
示例5
/**
* Package-private Constructor. Only for unit-test usage.
*
* @param client client
* @param request request
* @param asyncHandler async handler
* @param retryDelayer custom retry Delayer
*/
ParsecHttpRequestRetryCallable(
final AsyncHttpClient client,
final ParsecAsyncHttpRequest request,
final AsyncHandler<T> asyncHandler,
RetryDelayer retryDelayer
) {
this.client = client;
this.request = request;
this.asyncHandler = asyncHandler;
this.retryDelayer = retryDelayer;
responses = new ArrayList<>();
}
示例6
@Override
public void transactionMarker() throws Exception {
AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger statusCode = new AtomicInteger();
asyncHttpClient.prepareGet("http://localhost:" + getPort() + "/hello3/")
.execute(new AsyncHandler<Response>() {
@Override
public STATE onBodyPartReceived(HttpResponseBodyPart part) {
return null;
}
@Override
public Response onCompleted() throws Exception {
latch.countDown();
return null;
}
@Override
public STATE onHeadersReceived(HttpResponseHeaders headers) {
return null;
}
@Override
public STATE onStatusReceived(HttpResponseStatus status) {
statusCode.set(status.getStatusCode());
return null;
}
@Override
public void onThrowable(Throwable t) {}
});
latch.await();
asyncHttpClient.close();
if (statusCode.get() != 200) {
throw new IllegalStateException("Unexpected status code: " + statusCode);
}
}
示例7
public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
// body arrived, flush headers
if (!responseSet) {
response = responseBuilder.build();
responseSet = true;
headersArrived.countDown();
}
bodyPart.writeTo(output);
return AsyncHandler.STATE.CONTINUE;
}
示例8
@DataProvider(value = {
"true | true",
"true | false",
"false | true",
"false | false"
}, splitBy = "\\|")
@Test
public void executeAsyncHttpRequest_sets_up_and_executes_call_as_expected(
boolean performSubspan, boolean currentTracingInfoNull
) {
// given
Whitebox.setInternalState(helperSpy, "performSubSpanAroundDownstreamCalls", performSubspan);
CircuitBreaker<Response> circuitBreakerMock = mock(CircuitBreaker.class);
doReturn(Optional.of(circuitBreakerMock)).when(helperSpy).getCircuitBreaker(any(RequestBuilderWrapper.class));
ManualModeTask<Response> cbManualTaskMock = mock(ManualModeTask.class);
doReturn(cbManualTaskMock).when(circuitBreakerMock).newManualModeTask();
String url = "http://localhost/some/path";
String method = "GET";
AsyncHttpClient.BoundRequestBuilder reqMock = mock(AsyncHttpClient.BoundRequestBuilder.class);
RequestBuilderWrapper rbw = new RequestBuilderWrapper(url, method, reqMock, Optional.empty(), false);
AsyncResponseHandler responseHandlerMock = mock(AsyncResponseHandler.class);
Span initialSpan = (currentTracingInfoNull) ? null : Tracer.getInstance().startRequestWithRootSpan("foo");
Deque<Span> initialSpanStack = (currentTracingInfoNull)
? null
: Tracer.getInstance().getCurrentSpanStackCopy();
Map<String, String> initialMdc = (currentTracingInfoNull) ? null : MDC.getCopyOfContextMap();
resetTracingAndMdc();
// when
CompletableFuture resultFuture = helperSpy.executeAsyncHttpRequest(
rbw, responseHandlerMock, initialSpanStack, initialMdc
);
// then
// Verify that the circuit breaker came from the getCircuitBreaker helper method and that its
// throwExceptionIfCircuitBreakerIsOpen() method was called.
verify(helperSpy).getCircuitBreaker(rbw);
verify(cbManualTaskMock).throwExceptionIfCircuitBreakerIsOpen();
// Verify that the inner request's execute method was called with a
// AsyncCompletionHandlerWithTracingAndMdcSupport for the handler.
ArgumentCaptor<AsyncHandler> executedHandlerCaptor = ArgumentCaptor.forClass(AsyncHandler.class);
verify(reqMock).execute(executedHandlerCaptor.capture());
AsyncHandler executedHandler = executedHandlerCaptor.getValue();
assertThat(executedHandler).isInstanceOf(AsyncCompletionHandlerWithTracingAndMdcSupport.class);
// Verify that the AsyncCompletionHandlerWithTracingAndMdcSupport was created with the expected args
AsyncCompletionHandlerWithTracingAndMdcSupport achwtams =
(AsyncCompletionHandlerWithTracingAndMdcSupport) executedHandler;
assertThat(achwtams.completableFutureResponse).isSameAs(resultFuture);
assertThat(achwtams.responseHandlerFunction).isSameAs(responseHandlerMock);
assertThat(achwtams.performSubSpanAroundDownstreamCalls).isEqualTo(performSubspan);
assertThat(achwtams.circuitBreakerManualTask).isEqualTo(Optional.of(cbManualTaskMock));
if (performSubspan) {
int initialSpanStackSize = (initialSpanStack == null) ? 0 : initialSpanStack.size();
assertThat(achwtams.distributedTraceStackToUse).hasSize(initialSpanStackSize + 1);
Span subspan = (Span) achwtams.distributedTraceStackToUse.peek();
assertThat(subspan.getSpanName())
.isEqualTo(initialSpanNameFromStrategy.get());
if (initialSpan != null) {
assertThat(subspan.getTraceId()).isEqualTo(initialSpan.getTraceId());
assertThat(subspan.getParentSpanId()).isEqualTo(initialSpan.getSpanId());
}
assertThat(achwtams.mdcContextToUse.get(SpanFieldForLoggerMdc.TRACE_ID.mdcKey)).isEqualTo(subspan.getTraceId());
}
else {
assertThat(achwtams.distributedTraceStackToUse).isSameAs(initialSpanStack);
assertThat(achwtams.mdcContextToUse).isSameAs(initialMdc);
}
// Verify that the trace headers were added (or not depending on state).
Span spanForDownstreamCall = achwtams.getSpanForCall();
if (initialSpan == null && !performSubspan) {
assertThat(spanForDownstreamCall).isNull();
verifyNoMoreInteractions(reqMock);
}
else {
assertThat(spanForDownstreamCall).isNotNull();
verify(reqMock).setHeader(TraceHeaders.TRACE_SAMPLED,
convertSampleableBooleanToExpectedB3Value(spanForDownstreamCall.isSampleable()));
verify(reqMock).setHeader(TraceHeaders.TRACE_ID, spanForDownstreamCall.getTraceId());
verify(reqMock).setHeader(TraceHeaders.SPAN_ID, spanForDownstreamCall.getSpanId());
if (spanForDownstreamCall.getParentSpanId() == null) {
verify(reqMock, never()).setHeader(eq(TraceHeaders.PARENT_SPAN_ID), anyString());
}
else {
verify(reqMock).setHeader(TraceHeaders.PARENT_SPAN_ID, spanForDownstreamCall.getParentSpanId());
}
verify(reqMock, never()).setHeader(eq(TraceHeaders.SPAN_NAME), anyString());
}
// Verify that any subspan had request tagging performed.
if (performSubspan) {
strategyRequestTaggingArgs.get().verifyArgs(spanForDownstreamCall, rbw, wingtipsTagAndNamingAdapterMock);
}
}
示例9
public Builder(AsyncHttpClientConfig config, Request request, AsyncHandler<T> asyncHandler) {
this.config = config;
this.request = request;
this.asyncHandler = asyncHandler;
this.future = null;
}
示例10
public Builder(AsyncHttpClientConfig config, Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future) {
this.config = config;
this.request = request;
this.asyncHandler = asyncHandler;
this.future = future;
}
示例11
public <T> Future<T> execute(final Request request, final AsyncHandler<T> asyncHandler) throws IOException {
return doConnect(request,asyncHandler, null);
}
示例12
private <T> Future<T> doConnect(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> f) throws IOException{
if (isClose.get()){
throw new IOException("Closed");
}
if (activeConnectionsCount.getAndIncrement() >= config.getMaxTotalConnections()) {
throw new IOException("Too many connections");
}
URI uri = createUri(request.getUrl());
if (log.isDebugEnabled())
log.debug("Lookup cache: " + uri.toString());
Channel channel = lookupInCache(uri);
if (channel != null && channel.isOpen()) {
HttpRequest nettyRequest = buildRequest(config,request,uri);
if (f == null) {
f = new NettyResponseFuture<T>(uri, request, asyncHandler,
nettyRequest, config.getRequestTimeoutInMs());
}
executeRequest(channel, config,f,nettyRequest);
return f;
}
ConnectListener<T> c = new ConnectListener.Builder<T>(config, request, asyncHandler,f).build();
configure(uri.getScheme().compareToIgnoreCase("https") == 0, c);
ChannelFuture channelFuture;
try{
if (config.getProxyServer() == null) {
channelFuture = bootstrap.connect(new InetSocketAddress(uri.getHost(), getPort(uri)));
} else {
channelFuture = bootstrap.connect(
new InetSocketAddress(config.getProxyServer().getHost(), config.getProxyServer().getPort()));
}
bootstrap.setOption("connectTimeout", config.getConnectionTimeoutInMs());
} catch (Throwable t){
activeConnectionsCount.decrementAndGet();
log.error(t);
c.future().abort(t.getCause());
return c.future();
}
channelFuture.addListener(c);
openChannels.add(channelFuture.getChannel());
return c.future();
}
示例13
@SuppressWarnings("unchecked")
private final boolean updateStatusAndInterrupt(AsyncHandler handler, HttpResponseStatus c) throws Exception {
return handler.onStatusReceived(c) != STATE.CONTINUE;
}
示例14
@SuppressWarnings("unchecked")
private final boolean updateHeadersAndInterrupt(AsyncHandler handler, HttpResponseHeaders c) throws Exception {
return handler.onHeadersReceived(c) != STATE.CONTINUE;
}
示例15
@SuppressWarnings("unchecked")
private final boolean updateBodyAndInterrupt(AsyncHandler handler, HttpResponseBodyPart c) throws Exception {
return handler.onBodyPartReceived(c) != STATE.CONTINUE;
}
示例16
private void doTest(Request request) throws InterruptedException, ExecutionException, IOException {
final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
AsyncHandler<Response> asyncHandler = new AsyncHandler<Response>() {
private final Response.ResponseBuilder builder = new Response.ResponseBuilder();
@Override
public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Exception {
content.writeTo(pipedOutputStream);
return STATE.CONTINUE;
}
@Override
public STATE onStatusReceived(final HttpResponseStatus status) throws Exception {
builder.accumulate(status);
return STATE.CONTINUE;
}
@Override
public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception {
builder.accumulate(headers);
return STATE.CONTINUE;
}
@Override
public Response onCompleted() throws Exception {
LOGGER.info("On complete called!");
pipedOutputStream.flush();
pipedOutputStream.close();
return builder.build();
}
@Override
public void onThrowable(Throwable arg0) {
// TODO Auto-generated method stub
LOGGER.error("Error: {}", arg0);
onTestFailed();
}
};
Future<Void> readingThreadFuture = Executors.newCachedThreadPool().submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(pipedInputStream));
String readPrediction;
int numPredictionsRead = 0;
while ((readPrediction = reader.readLine()) != null) {
//LOGGER.info("Got prediction: {}", readPrediction);
numPredictionsRead++;
}
LOGGER.info("Read a total of {} predictions", numPredictionsRead);
Assert.assertEquals(roundsOfDataToSubmit * 272274, numPredictionsRead);
return null;
}
});
Builder config = new AsyncHttpClientConfig.Builder();
config.setRequestTimeoutInMs(-1); //need to set this to -1, to indicate wait forever. setting to 0 actually means a 0 ms timeout!
AsyncHttpClient client = new AsyncHttpClient(config.build());
client.executeRequest(request, asyncHandler).get();
readingThreadFuture.get(); //verify no exceptions occurred when reading predictions
client.close();
Assert.assertFalse(getTestFailed());
}
示例17
public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
responseBuilder.reset();
responseBuilder.accumulate(responseStatus);
statusReceived = true;
return AsyncHandler.STATE.CONTINUE;
}
示例18
public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
responseBuilder.accumulate(headers);
return AsyncHandler.STATE.CONTINUE;
}
示例19
/**
* Constructor.
*
* @param client client
* @param request request
* @param asyncHandler async handler
*/
public ParsecHttpRequestRetryCallable(
final AsyncHttpClient client, final ParsecAsyncHttpRequest request, final AsyncHandler<T> asyncHandler) {
this(client, request, asyncHandler, DEFAULT_RETRY_INTERVAL);
}