Java source code examples: co.paralleluniverse.fibers.FiberScheduler
Example 1
/**
* Record events in a separate Fiber using one or more Recorders.
*
* This method returns immediately after starting the Fiber for recording events.
*
* @param fiberName the name of the fiber that records events.
* @param fe an optional scheduler for the spawned recording fiber, null to use the default.
* @param rp a channel on which to receive TimingEvents, which are passed to the underlying
* Recorders.
* @param rs zero or more Recorders, each of which receives every event (after the delay period)
* in the order they are passed to this method.
* @param <T> the type of the Response object in each TimingEvent.
*/
@SafeVarargs
static <T> Fiber<Void> record(final String fiberName,
                              final FiberScheduler fe,
                              final ReceivePort<TimingEvent<T>> rp,
                              final Recorder<T>... rs)
{
  return new Fiber<Void>(fiberName, fe != null ? fe : DefaultFiberScheduler.getInstance(), () -> {
    while (true) {
      final TimingEvent<T> event = rp.receive();
      if (event == null) {
        break;
      }
      for (final Recorder<T> r : rs) {
        r.record(event);
      }
    }
    return null;
  }).start();
}
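A minimal usage sketch for the helper above. It is not part of the source: the response type MyResponse and the latencyRecorder instance are placeholders; only record, TimingEvent, Recorder, and the Quasar Channels/Fiber API appear in the examples on this page.

// Sketch: wire a channel of TimingEvents to a recorder running in its own fiber.
// MyResponse and latencyRecorder are placeholders, not names from the source.
Channel<TimingEvent<MyResponse>> events =
    Channels.newChannel(10_000, Channels.OverflowPolicy.BLOCK, false, true); // many producers, one consumer
Fiber<Void> recording = record("event-recorder", null, events, latencyRecorder);
// ... run the load test, sending TimingEvents into `events` ...
events.close();    // once closed and drained, receive() returns null and the loop above exits
recording.join();  // wait for the recording fiber to finish draining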
Example 2
/**
* Get the `FiberScheduler` for the current context. There should be only one instance per context.
* @return the scheduler
*/
@Suspendable
public static FiberScheduler getContextScheduler() {
  Context context = Vertx.currentContext();
  if (context == null) {
    throw new IllegalStateException("Not in context");
  }
  if (!context.isEventLoopContext()) {
    throw new IllegalStateException("Not on event loop");
  }
  // We maintain one scheduler per context
  FiberScheduler scheduler = context.get(FIBER_SCHEDULER_CONTEXT_KEY);
  if (scheduler == null) {
    Thread eventLoop = Thread.currentThread();
    scheduler = new FiberExecutorScheduler("vertx.contextScheduler", command -> {
      if (Thread.currentThread() != eventLoop) {
        context.runOnContext(v -> command.run());
      } else {
        // Just run directly
        command.run();
      }
    });
    context.put(FIBER_SCHEDULER_CONTEXT_KEY, scheduler);
  }
  return scheduler;
}
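A short sketch of how the per-context scheduler is typically consumed from event-loop code; doFiberBlockingWork is a placeholder for application code, not an API from the source.

// Sketch: start a fiber on the per-context scheduler so that its continuations
// always resume on the same Vert.x event loop that created it.
FiberScheduler scheduler = getContextScheduler();
new Fiber<Void>(scheduler, () -> {
    // Fiber-blocking calls are safe here and do not block the event-loop thread.
    doFiberBlockingWork(); // placeholder
}).start();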
Example 3
private static FiberScheduler defaultScheduler() {
  return instance;
}
Example 4
private static <Req, Res> void loadTestThroughput(final IntervalGenerator intervalGen,
                                                  int warmupRequests,
                                                  final ReceivePort<Req> requests,
                                                  final RequestExecutor<Req, Res> executor,
                                                  final SendPort<TimingEvent<Res>> eventChannel,
                                                  final FiberScheduler fiberScheduler,
                                                  final StrandFactory strandFactory)
    throws SuspendExecution, InterruptedException
{
  final long startNanos = System.nanoTime();
  try {
    long overageNanos = 0;
    long overageStart = System.nanoTime();
    final WaitGroup waitGroup = new WaitGroup();
    while (true) {
      final long receiveNanosStart = System.nanoTime();
      final Req request = requests.receive();
      LOG.trace("Receive request time: {}", System.nanoTime() - receiveNanosStart);
      if (request == null) {
        break;
      }
      // Wait before dispatching the request for as long as the generator says, minus the remaining
      // dispatching overhead to be compensated for (clamped so the wait never goes negative)
      long waitNanos = intervalGen.nextInterval(System.nanoTime() - startNanos);
      final long adjust = Math.min(waitNanos, overageNanos);
      waitNanos -= adjust;
      overageNanos -= adjust;
      // Sleep in the accepting fiber
      long sleepNanosStart = System.nanoTime();
      Strand.sleep(waitNanos, TimeUnit.NANOSECONDS);
      LOG.trace("Sleep time: {}", System.nanoTime() - sleepNanosStart);
      // Increment the wait group count for the new request handler
      waitGroup.add();
      final long curWaitNanos = waitNanos;
      final long curWarmupRequests = warmupRequests;
      final long curOverageNanos = overageNanos;
      final SuspendableCallable<Void> sc = () -> {
        try {
          final RequestExecOutcome<Res> outcome = executeRequest(request, executor);
          if (curWarmupRequests <= 0) {
            report(curWaitNanos, curOverageNanos, outcome, eventChannel);
          }
        } finally {
          // Complete, decrementing the wait group count
          waitGroup.done();
        }
        return null;
      };
      if (fiberScheduler != null) {
        new Fiber<>(fiberScheduler, sc).start();
      } else if (strandFactory != null) {
        strandFactory.newStrand(sc).start();
      } else {
        new Fiber<>(sc).start();
      }
      final long nowNanos = System.nanoTime();
      overageNanos += nowNanos - overageStart - waitNanos;
      overageStart = nowNanos;
      warmupRequests = Math.max(warmupRequests - 1, 0);
    }
    // Wait for all outstanding requests
    waitGroup.await();
  } finally {
    eventChannel.close();
  }
}
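The overageNanos bookkeeping above compensates for dispatch and receive overhead so the long-run request rate matches the interval generator. A standalone illustration of one pacing step, with made-up numbers:

public class PacingStepExample {
  public static void main(String[] args) {
    long waitNanos    = 5_000_000L; // interval requested by the generator (5 ms)
    long overageNanos = 2_000_000L; // overhead carried over from the previous iteration (2 ms)
    final long adjust = Math.min(waitNanos, overageNanos); // 2 ms
    waitNanos    -= adjust; // sleep only 3 ms this time
    overageNanos -= adjust; // the 2 ms debt is paid down to 0
    System.out.printf("sleep %d ns, remaining overage %d ns%n", waitNanos, overageNanos);
  }
}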
Example 5
private static <Req, Res> void loadTestConcurrency(final int concurrency,
                                                   int warmupRequests,
                                                   final ReceivePort<Req> requests,
                                                   final RequestExecutor<Req, Res> executor,
                                                   final SendPort<TimingEvent<Res>> eventChannel,
                                                   final FiberScheduler fiberScheduler,
                                                   final StrandFactory strandFactory)
    throws SuspendExecution, InterruptedException
{
  try {
    final WaitGroup waitGroup = new WaitGroup();
    // The semaphore caps the number of in-flight requests at `concurrency`
    final Semaphore running = new Semaphore(concurrency);
    while (true) {
      final Req request = requests.receive();
      if (request == null) {
        break;
      }
      // Block until one of the `concurrency` slots frees up, then track the new handler
      running.acquire();
      waitGroup.add();
      final long curWarmupRequests = warmupRequests;
      final SuspendableCallable<Void> sc = () -> {
        try {
          final RequestExecOutcome<Res> outcome = executeRequest(request, executor);
          if (curWarmupRequests <= 0) {
            report(0, 0, outcome, eventChannel);
          }
        } finally {
          running.release();
          waitGroup.done();
        }
        return null;
      };
      if (fiberScheduler != null) {
        new Fiber<>(fiberScheduler, sc).start();
      } else if (strandFactory != null) {
        strandFactory.newStrand(sc).start();
      } else {
        new Fiber<>(sc).start();
      }
      warmupRequests = Math.max(warmupRequests - 1, 0);
    }
    // Wait for all outstanding requests
    waitGroup.await();
  } finally {
    eventChannel.close();
  }
}
Example 6
public HandlerReceiverAdaptorImpl(FiberScheduler fiberScheduler) {
  this.fiberScheduler = fiberScheduler;
  channel = Channels.newChannel(-1, Channels.OverflowPolicy.DROP, true, true);
}
Example 7
public HandlerReceiverAdaptorImpl(FiberScheduler fiberScheduler, Channel<T> channel) {
  this.fiberScheduler = fiberScheduler;
  this.channel = channel;
}
Example 8
/**
* Run a load test with a given throughput, using as many fibers as necessary.
*
* This method can be run in any strand; however, thread-fiber synchronization is more expensive
* than fiber-fiber synchronization, so if requests are being performed by fibers it's best
* to call this method inside a fiber.
*
* @param intervalGen provides the interval between subsequent requests (in nanoseconds). This
* controls the throughput of the load test.
* @param warmupRequests the number of requests to use as "warmup" for the load tester and the
* service. These requests will not have TimingEvents generated in the
* eventChannel, but will be sent to the remote service at the requested
* rate.
* @param requests provides requests for the load test, must be closed by the caller to stop the
* load test (the load test will continue for as long as this channel is open,
* even if there are no requests arriving).
* @param executor executes the requests provided by the requests channel, returning a response
* object.
* @param eventChannel a TimingEvent is sent on this channel for every request executed during
* the load test (whether the request succeeds or not).
* @param fiberScheduler an optional scheduler for fibers that will perform the requests (the
* default one will be used if {@code null}).
* @param <Req> the request type.
* @param <Res> the response type.
* @throws SuspendExecution
* @throws InterruptedException
*/
public static <Req, Res> void loadTestThroughput(final IntervalGenerator intervalGen,
                                                 final int warmupRequests,
                                                 final ReceivePort<Req> requests,
                                                 final RequestExecutor<Req, Res> executor,
                                                 final SendPort<TimingEvent<Res>> eventChannel,
                                                 final FiberScheduler fiberScheduler)
    throws InterruptedException, SuspendExecution
{
  loadTestThroughput(intervalGen, warmupRequests, requests, executor, eventChannel, fiberScheduler, null);
}
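A usage sketch combining this method with the record helper from Example 1. MyRequest, MyResponse, myExecutor, and myRecorder are placeholders, and the lambda assumes IntervalGenerator is the single-method interface suggested by the nextInterval(...) call in Example 4.

// Sketch: drive roughly 1000 requests/second through myExecutor and record timings.
Channel<MyRequest> requests =
    Channels.newChannel(10_000, Channels.OverflowPolicy.BLOCK, true, true);   // one feeder, one consumer
Channel<TimingEvent<MyResponse>> events =
    Channels.newChannel(10_000, Channels.OverflowPolicy.BLOCK, false, true);  // many request fibers, one recorder
Fiber<Void> recording = record("recorder", null, events, myRecorder);

new Fiber<Void>(() -> {
    loadTestThroughput(nanosSinceStart -> 1_000_000L, // one dispatch per millisecond
                       1_000,                         // warmup requests, not reported
                       requests, myExecutor, events,
                       null);                         // null -> default FiberScheduler
    return null;
}).start();
// Feed MyRequest objects into `requests`, then close it to end the run;
// `events` is closed by loadTestThroughput itself, which also ends the recording fiber.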
Example 9
/**
* Run a load test with a given number of fibers, making as many requests as possible.
*
* This method can be run in any strand; however, thread-fiber synchronization is more expensive
* than fiber-fiber synchronization, so if requests are being performed by fibers it's best
* to call this method inside a fiber.
*
* @param concurrency the number of Fibers to run. Each Fiber will execute requests serially with
* as little overhead as possible.
* @param warmupRequests the number of requests to use when warming up the load tester and the
* remote service. These requests will not have TimingEvents generated
* in the eventChannel, but will be sent to the remote service.
* @param requests provides requests for the load test and must be closed by the caller to stop
* the load test (the load test will continue for as long as this channel is
* open, even if there are no requests arriving).
* @param executor executes the requests provided by the requests channel, returning a response
* object.
* @param eventChannel a TimingEvent is sent on this channel for every request executed during
* the load test (whether the request succeeds or not).
* @param fiberScheduler an optional scheduler for fibers that will perform the requests (the
* default one will be used if {@code null}).
* @param <Req> the request type.
* @param <Res> the response type.
* @throws SuspendExecution
* @throws InterruptedException
*/
public static <Req, Res> void loadTestConcurrency(final int concurrency,
                                                  final int warmupRequests,
                                                  final ReceivePort<Req> requests,
                                                  final RequestExecutor<Req, Res> executor,
                                                  final SendPort<TimingEvent<Res>> eventChannel,
                                                  final FiberScheduler fiberScheduler)
    throws SuspendExecution, InterruptedException
{
  loadTestConcurrency(concurrency, warmupRequests, requests, executor, eventChannel, fiberScheduler, null);
}
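For comparison, the concurrency-driven variant can be started the same way; requests, events, myExecutor, and myRecorder are the same placeholders as in the sketch after Example 8.

// Sketch: keep at most 64 requests in flight and go as fast as myExecutor allows.
new Fiber<Void>(() -> {
    loadTestConcurrency(64,     // at most 64 requests in flight at a time
                        1_000,  // warmup requests, not reported
                        requests, myExecutor, events,
                        null);  // null -> default FiberScheduler
    return null;
}).start();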
Example 10
/**
* Convert a standard handler to a handler that runs on a fiber. This is necessary if you want to perform
* fiber-blocking synchronous operations in your handler.
*
* @param handler the standard handler
* @param <T> the event type of the handler
* @return a wrapped handler that runs the handler on a fiber
*/
@Suspendable
public static <T> Handler<T> fiberHandler(Handler<T> handler) {
  FiberScheduler scheduler = getContextScheduler();
  return p -> new Fiber<Void>(scheduler, () -> handler.handle(p)).start();
}
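A sketch of wiring the wrapper into a Vert.x Web route, assuming it runs inside a verticle's start() on the event loop; the router and blockingLookup are placeholders for illustration.

// Sketch: the wrapped handler body runs on a fiber, so fiber-blocking calls
// inside it do not stall the event loop.
router.get("/items/:id").handler(fiberHandler(ctx -> {
    String item = blockingLookup(ctx.pathParam("id")); // placeholder for a fiber-blocking call
    ctx.response().end(item);
}));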