Java源码示例:co.paralleluniverse.fibers.FiberScheduler

示例1
/**
 * Starts a fiber that drains a channel of timing events and forwards each event to every
 * supplied recorder.
 *
 * The fiber is started before this method returns; the caller gets the started fiber back.
 * The fiber keeps receiving until {@code rp.receive()} yields {@code null}, at which point it
 * finishes.
 *
 * @param fiberName the name given to the recording fiber.
 * @param fe the scheduler to run the recording fiber on; when {@code null},
 *           {@code DefaultFiberScheduler.getInstance()} is used instead.
 * @param rp the channel from which TimingEvents are received.
 * @param rs zero or more Recorders; every received event is handed to each of them, in the
 *           order they were passed to this method.
 * @param <T> the type of the Response object in each TimingEvent.
 * @return the started recording fiber.
 */
@SafeVarargs
static <T> Fiber<Void> record(final String fiberName,
                              final FiberScheduler fe,
                              final ReceivePort<TimingEvent<T>> rp,
                              final Recorder<T>... rs)
{
  final FiberScheduler scheduler = (fe == null) ? DefaultFiberScheduler.getInstance() : fe;

  final Fiber<Void> recordingFiber = new Fiber<Void>(fiberName, scheduler, () -> {
    TimingEvent<T> received;
    // A null event ends the recording loop.
    while ((received = rp.receive()) != null) {
      for (final Recorder<T> recorder : rs) {
        recorder.record(received);
      }
    }
    return null;
  });

  return recordingFiber.start();
}
 
示例2
/**
 * Returns the {@code FiberScheduler} bound to the current Vert.x context, creating and caching
 * it in the context on first use so that each context has a single scheduler.
 *
 * The created scheduler runs a command inline when invoked on the thread that created it, and
 * otherwise hands the command to the context via {@code runOnContext}.
 *
 * @return  the scheduler stored in (or newly created for) the current context
 * @throws IllegalStateException if there is no current context, or the current context is not
 *                               an event loop context
 */
@Suspendable
public static FiberScheduler getContextScheduler() {
  final Context ctx = Vertx.currentContext();
  if (ctx == null) {
    throw new IllegalStateException("Not in context");
  }
  if (!ctx.isEventLoopContext()) {
    throw new IllegalStateException("Not on event loop");
  }

  // Reuse the scheduler already stored in this context, if any.
  final FiberScheduler cached = ctx.get(FIBER_SCHEDULER_CONTEXT_KEY);
  if (cached != null) {
    return cached;
  }

  // First request for this context: build a scheduler tied to the calling thread.
  final Thread creatingThread = Thread.currentThread();
  final FiberScheduler created = new FiberExecutorScheduler("vertx.contextScheduler", task -> {
    if (Thread.currentThread() == creatingThread) {
      // Already on the creating thread, so run in place.
      task.run();
    } else {
      ctx.runOnContext(v -> task.run());
    }
  });
  ctx.put(FIBER_SCHEDULER_CONTEXT_KEY, created);
  return created;
}
 
示例3
/**
 * Returns the scheduler held in the {@code instance} field.
 *
 * NOTE(review): {@code instance} is declared outside this snippet; how and when it is
 * initialized cannot be determined from here.
 *
 * @return the current value of {@code instance}
 */
private static FiberScheduler defaultScheduler() {
    return instance;
}
 
示例4
/**
 * Dispatches requests at a rate governed by {@code intervalGen}, running each request in its
 * own strand, until {@code requests.receive()} returns {@code null}.
 *
 * Before each dispatch the calling strand sleeps for the generated interval, reduced by the
 * accumulated "overage" (time spent on dispatching beyond the requested waits). After the loop
 * ends the method waits for all started request handlers to complete, and the event channel is
 * closed in all cases.
 *
 * @param intervalGen supplies the wait before each request, given the nanoseconds elapsed since
 *                    this method started.
 * @param warmupRequests number of initial requests that are executed but not reported.
 * @param requests source of requests; a {@code null} from {@code receive()} ends the loop.
 * @param executor passed to {@code executeRequest} together with each request.
 * @param eventChannel passed to {@code report} for non-warmup requests; closed on exit.
 * @param fiberScheduler if non-null, each request handler runs in a Fiber on this scheduler.
 * @param strandFactory used to create the handler strand when {@code fiberScheduler} is null and
 *                      this is non-null; if both are null a Fiber with no explicit scheduler is used.
 * @param <Req> the request type.
 * @param <Res> the response type.
 * @throws SuspendExecution declared because this method calls suspendable operations.
 * @throws InterruptedException propagated from the blocking calls made here.
 */
private static <Req, Res> void loadTestThroughput(final IntervalGenerator intervalGen,
                                                  int warmupRequests,
                                                  final ReceivePort<Req> requests,
                                                  final RequestExecutor<Req, Res> executor,
                                                  final SendPort<TimingEvent<Res>> eventChannel,
                                                  final FiberScheduler fiberScheduler,
                                                  final StrandFactory strandFactory)
        throws SuspendExecution, InterruptedException
{
  // Reference point for the elapsed time handed to the interval generator.
  final long startNanos = System.nanoTime();

  try {
    // overageNanos: dispatch time not yet compensated for by shortening a wait.
    // overageStart: timestamp from which the current iteration's elapsed time is measured.
    long overageNanos = 0;
    long overageStart = System.nanoTime();

    final WaitGroup waitGroup = new WaitGroup();
    while (true) {
      final long receiveNanosStart = System.nanoTime();
      final Req request = requests.receive();
      LOG.trace("Receive request time: {}", System.nanoTime() - receiveNanosStart);
      // A null request ends the dispatch loop.
      if (request == null) {
        break;
      }

      // Wait before dispatching request as much as generated, minus the remaining dispatching overhead
      // to be compensated for (up to having 0 waiting time of course, not negative)
      long waitNanos = intervalGen.nextInterval(System.nanoTime() - startNanos);
      final long adjust = Math.min(waitNanos, overageNanos);
      waitNanos -= adjust;
      overageNanos -= adjust;

      // Sleep in the accepting (calling) strand, not in the request handler
      long sleepNanosStart = System.nanoTime();
      Strand.sleep(waitNanos, TimeUnit.NANOSECONDS);
      LOG.trace("Sleep time: {}", System.nanoTime() - sleepNanosStart);

      // Increment wait group count for new request handler
      waitGroup.add();
      // Snapshot the mutable loop state so the handler lambda captures this iteration's values.
      final long curWaitNanos = waitNanos;
      final long curWarmupRequests = warmupRequests;
      final long curOverageNanos = overageNanos;

      final SuspendableCallable<Void> sc = () -> {
        try {
          final RequestExecOutcome<Res> outcome = executeRequest(request, executor);
          // Only report once the warmup budget was already exhausted when this request was dispatched.
          if (curWarmupRequests <= 0) {
            report(curWaitNanos, curOverageNanos, outcome, eventChannel);
          }
        } finally {
          // Complete, decrementing wait group count
          waitGroup.done();
        }
        return null;
      };
      // Pick how the handler strand is started: explicit scheduler first, then the strand
      // factory, then a Fiber with no explicit scheduler.
      if (fiberScheduler != null) {
        new Fiber<>(fiberScheduler, sc).start();
      } else if (strandFactory != null) {
        strandFactory.newStrand(sc).start();
      } else {
        new Fiber<>(sc).start();
      }

      // Everything this iteration took beyond the intended wait is added to the overage,
      // to be subtracted from upcoming waits.
      final long nowNanos = System.nanoTime();
      overageNanos += nowNanos - overageStart - waitNanos;
      overageStart = nowNanos;
      // Consume one warmup request, never going below zero.
      warmupRequests = Math.max(warmupRequests - 1, 0);
    }

    // Wait for all outstanding requests
    waitGroup.await();
  } finally {
    // Always close the event channel, whether the loop ended normally or by exception.
    eventChannel.close();
  }
}
 
示例5
/**
 * Executes requests with at most {@code concurrency} of them in flight at once, until
 * {@code requests.receive()} returns {@code null}.
 *
 * A semaphore permit is acquired before each request handler is started and released when the
 * handler finishes. After the loop ends the method waits for all started handlers to complete,
 * and the event channel is closed in all cases.
 *
 * @param concurrency the number of semaphore permits, i.e. the cap on in-flight requests.
 * @param warmupRequests number of initial requests that are executed but not reported.
 * @param requests source of requests; a {@code null} from {@code receive()} ends the loop.
 * @param executor passed to {@code executeRequest} together with each request.
 * @param eventChannel passed to {@code report} for non-warmup requests; closed on exit.
 * @param fiberScheduler if non-null, each request handler runs in a Fiber on this scheduler.
 * @param strandFactory used to create the handler strand when {@code fiberScheduler} is null and
 *                      this is non-null; if both are null a Fiber with no explicit scheduler is used.
 * @param <Req> the request type.
 * @param <Res> the response type.
 * @throws SuspendExecution declared because this method calls suspendable operations.
 * @throws InterruptedException propagated from the blocking calls made here.
 */
private static <Req, Res> void loadTestConcurrency(final int concurrency,
                                                   int warmupRequests,
                                                   final ReceivePort<Req> requests,
                                                   final RequestExecutor<Req, Res> executor,
                                                   final SendPort<TimingEvent<Res>> eventChannel,
                                                   final FiberScheduler fiberScheduler,
                                                   final StrandFactory strandFactory)
        throws SuspendExecution, InterruptedException
{
  try {
    final WaitGroup inFlight = new WaitGroup();
    final Semaphore permits = new Semaphore(concurrency);

    Req received;
    // A null request ends the dispatch loop.
    while ((received = requests.receive()) != null) {
      final Req request = received;

      // Block until a slot is free, then register the new handler.
      permits.acquire();
      inFlight.add();

      // Captured per iteration: was the warmup budget still open when this request was dispatched?
      final boolean warmingUp = warmupRequests > 0;
      final SuspendableCallable<Void> handler = () -> {
        try {
          final RequestExecOutcome<Res> outcome = executeRequest(request, executor);
          if (!warmingUp) {
            report(0, 0, outcome, eventChannel);
          }
        } finally {
          // Free the slot first, then mark the handler as complete.
          permits.release();
          inFlight.done();
        }
        return null;
      };

      // Explicit scheduler first, then the strand factory, then a Fiber with no explicit scheduler.
      if (fiberScheduler != null) {
        new Fiber<>(fiberScheduler, handler).start();
      } else if (strandFactory != null) {
        strandFactory.newStrand(handler).start();
      } else {
        new Fiber<>(handler).start();
      }

      // Consume one warmup request, never going below zero.
      warmupRequests = Math.max(warmupRequests - 1, 0);
    }

    // Wait for all outstanding requests.
    inFlight.await();
  } finally {
    eventChannel.close();
  }
}
 
示例6
/**
 * Creates an adaptor that uses the given scheduler and a channel it creates itself.
 *
 * NOTE(review): the channel arguments are presumably (bufferSize, overflow policy,
 * singleProducer, singleConsumer) per Quasar's {@code Channels.newChannel} — confirm.
 *
 * @param fiberScheduler the scheduler stored in this adaptor
 */
public HandlerReceiverAdaptorImpl(FiberScheduler fiberScheduler) {
  this.channel = Channels.newChannel(-1, Channels.OverflowPolicy.DROP, true, true);
  this.fiberScheduler = fiberScheduler;
}
 
示例7
/**
 * Creates an adaptor that uses the given scheduler and a caller-supplied channel.
 *
 * @param fiberScheduler the scheduler stored in this adaptor
 * @param channel the channel stored in this adaptor
 */
public HandlerReceiverAdaptorImpl(FiberScheduler fiberScheduler, Channel<T> channel) {
  this.channel = channel;
  this.fiberScheduler = fiberScheduler;
}
 
示例8
/**
 * Runs a throughput-driven load test, starting as many fibers as needed to keep up with the
 * requested rate.
 *
 * This method may be called from any strand. Thread-fiber synchronization is more expensive
 * than fiber-fiber synchronization, though, so when requests are performed by fibers it's best
 * to call this method from inside a fiber.
 *
 * This overload delegates to the seven-argument variant with no strand factory.
 *
 * @param intervalGen provides the interval between subsequent requests (in nanoseconds), which
 *                    controls the throughput of the load test.
 * @param warmupRequests the number of requests used to warm up the load tester and the service.
 *                       No TimingEvents are generated in the eventChannel for these requests,
 *                       but they are still sent at the requested rate.
 * @param requests provides the requests for the load test. The caller must close it to stop
 *                 the load test, which otherwise keeps running for as long as the channel is
 *                 open, even if no requests arrive.
 * @param executor executes the requests provided by the requests channel, returning a response
 *                 object.
 * @param eventChannel receives a TimingEvent for every request executed during the load test,
 *                     whether the request succeeds or not.
 * @param fiberScheduler an optional scheduler for the fibers that perform the requests; the
 *                       default one is used if {@code null}.
 * @param <Req> the request type.
 * @param <Res> the response type.
 * @throws SuspendExecution declared because this method calls suspendable operations.
 * @throws InterruptedException if propagated from the underlying load test.
 */
public static <Req, Res> void loadTestThroughput(final IntervalGenerator intervalGen,
                                                 final int warmupRequests,
                                                 final ReceivePort<Req> requests,
                                                 final RequestExecutor<Req, Res> executor,
                                                 final SendPort<TimingEvent<Res>> eventChannel,
                                                 final FiberScheduler fiberScheduler)
        throws InterruptedException, SuspendExecution
{
  // No strand factory: request handlers are always started as fibers.
  final StrandFactory noStrandFactory = null;
  loadTestThroughput(
      intervalGen, warmupRequests, requests, executor, eventChannel, fiberScheduler, noStrandFactory);
}
 
示例9
/**
 * Runs a concurrency-driven load test: a fixed number of requests are kept in flight, making
 * as many requests as possible.
 *
 * This method may be called from any strand. Thread-fiber synchronization is more expensive
 * than fiber-fiber synchronization, though, so when requests are performed by fibers it's best
 * to call this method from inside a fiber.
 *
 * This overload delegates to the seven-argument variant with no strand factory.
 *
 * @param concurrency the maximum number of requests executing at the same time.
 * @param warmupRequests the number of requests used to warm up the load tester and the remote
 *                       service. No TimingEvents are generated in the eventChannel for these
 *                       requests, but they are still sent to the remote service.
 * @param requests provides the requests for the load test. The caller must close it to stop
 *                 the load test, which otherwise keeps running for as long as the channel is
 *                 open, even if no requests arrive.
 * @param executor executes the requests provided by the requests channel, returning a response
 *                 object.
 * @param eventChannel receives a TimingEvent for every request executed during the load test,
 *                     whether the request succeeds or not.
 * @param fiberScheduler an optional scheduler for the fibers that perform the requests; the
 *                       default one is used if {@code null}.
 * @param <Req> the request type.
 * @param <Res> the response type.
 * @throws SuspendExecution declared because this method calls suspendable operations.
 * @throws InterruptedException if propagated from the underlying load test.
 */
public static <Req, Res> void loadTestConcurrency(final int concurrency,
                                                  final int warmupRequests,
                                                  final ReceivePort<Req> requests,
                                                  final RequestExecutor<Req, Res> executor,
                                                  final SendPort<TimingEvent<Res>> eventChannel,
                                                  final FiberScheduler fiberScheduler)
        throws SuspendExecution, InterruptedException
{
  // No strand factory: request handlers are always started as fibers.
  final StrandFactory noStrandFactory = null;
  loadTestConcurrency(
      concurrency, warmupRequests, requests, executor, eventChannel, fiberScheduler, noStrandFactory);
}
 
示例10
/**
 * Wraps a standard handler so that each event is handled on a newly started fiber. Use this
 * when the handler needs to perform fiber-blocking synchronous operations.
 *
 * The scheduler is looked up once, via {@code getContextScheduler()}, when this method is
 * called; every fiber started by the returned handler uses that scheduler.
 *
 * @param handler  the standard handler to run on a fiber
 * @param <T>  the event type of the handler
 * @return  a handler that, for each event, starts a fiber which invokes {@code handler}
 */
@Suspendable
public static <T> Handler<T> fiberHandler(Handler<T> handler) {
  final FiberScheduler contextScheduler = getContextScheduler();
  return event -> {
    final Fiber<Void> fiber = new Fiber<Void>(contextScheduler, () -> {
      handler.handle(event);
    });
    fiber.start();
  };
}