Java源码示例:com.lmax.disruptor.AlertException

示例1
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,
                    final SequenceBarrier barrier) throws AlertException {
    long availableSequence;
    int counter = retries;

    // dependentSequence 该项目组织架构中,其实只是生产者的sequence,也就是cursor
    while ((availableSequence = dependentSequence.get()) < sequence) {
        counter = applyWaitMethod(barrier, counter);

        // 每睡眠一次,执行一次循环
        if (counter <= 0) {
            eventLoop.safeLoopOnce();
        }
    }
    return availableSequence;
}
 
示例2
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException {
    // 检查中断/终止信号
    barrier.checkAlert();

    if (counter > 100) {
        // 大于100时自旋
        --counter;
    } else if (counter > 0) {
        // 大于0时尝试让出Cpu
        --counter;
        Thread.yield();
    } else {
        // 等到最大次数了,睡眠等待
        LockSupport.parkNanos(sleepTimeNs);
    }
    return counter;
}
 
示例3
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,
                    final SequenceBarrier barrier) throws AlertException, InterruptedException {

    long availableSequence;
    int spinTries = 0;

    // dependentSequence 该项目组织架构中,其实只是生产者的sequence,也就是cursor
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        Thread.onSpinWait();

        if (++spinTries == loopOnceSpinTries) {
            spinTries = 0;
            eventLoop.safeLoopOnce();
        }
    }

    return availableSequence;
}
 
示例4
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    if (cursorSequence.get() < sequence) {
        this.lock.lock();

        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                this.processorNotifyCondition.await();
            }
        } finally {
            this.lock.unlock();
        }
    }

    long availableSequence;
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
    }

    return availableSequence;
}
 
示例5
private boolean replay(final boolean unbounded) {
  Sequence replayedSequence;
  MutableSignal<T> signal;
  while ((replayedSequence = processor.cancelledSequences.poll()) != null) {
    signal = processor.ringBuffer.get(replayedSequence.get() + 1L);
    try {
      if (signal.value == null) {
        barrier.waitFor(replayedSequence.get() + 1L);
      }
      readNextEvent(signal, unbounded);
      RingBufferSubscriberUtils.routeOnce(signal, subscriber);
      processor.ringBuffer.removeGatingSequence(replayedSequence);
    } catch (TimeoutException | InterruptedException | AlertException | CancelException ce) {
      processor.ringBuffer.removeGatingSequence(sequence);
      processor.cancelledSequences.add(replayedSequence);
      return true;
    }
  }
  return false;
}
 
示例6
private synchronized List<Object> getConsumeBatch() throws AlertException, InterruptedException, TimeoutException {
    long endCursor = getAvailableConsumeCursor();
    long currCursor = _consumer.get();
    long eventNumber = endCursor - currCursor;
    List<Object> batch = new ArrayList<>((int) eventNumber);
    for (long curr = currCursor + 1; curr <= endCursor; curr++) {
        try {
            MutableObject mo = _buffer.get(curr);
            Object o = mo.o;
            mo.setObject(null);
            batch.add(o);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }
    _consumer.set(endCursor);

    return batch;
}
 
示例7
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException {
    // 检查中断、停止信号
    barrier.checkAlert();

    if (counter > 0) {
        --counter;
    } else {
        Thread.yield();
    }
    return counter;
}
 
示例8
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException {
    long availableSequence;
    int currentSleep = sleepTimeNsStart;

    while ((availableSequence = dependentSequence.get()) < sequence) {
        currentSleep = applyWaitMethod(barrier, currentSleep);
    }

    return availableSequence;
}
 
示例9
private int applyWaitMethod(final SequenceBarrier barrier, int currentSleep) throws AlertException {
    barrier.checkAlert();

    if (currentSleep < sleepTimeNsMax) {
        LockSupport.parkNanos(currentSleep);
        return currentSleep * 2;
    } else {
        LockSupport.parkNanos(sleepTimeNsMax);
        return currentSleep;
    }
}
 
示例10
public long waitFor(
	long sequence,
	Sequence cursor,
	Sequence dependentSequence,
	SequenceBarrier barrier
) throws AlertException, InterruptedException, TimeoutException {
	long availableSequence;
	if ((availableSequence = cursor.get()) < sequence) {
		flush();
		synchronized (lock) {
			++numWaiters;
			while ((availableSequence = cursor.get()) < sequence) {
				if (state == State.STOPPED) {
					disruptor.halt();
					throw AlertException.INSTANCE;
				}
				barrier.checkAlert();
				//*/
				lock.wait();
				/*/
				Thread.sleep(1);
				//*/
			}
			--numWaiters;
		}
	}
	while ((availableSequence = dependentSequence.get()) < sequence) {
		barrier.checkAlert();
	}

	return availableSequence;
}
 
示例11
private void readNextEvent(MutableSignal<T> event, final boolean unbounded) throws AlertException {
  //if event is Next Signal we need to handle backpressure (pendingRequests)
  if (event.type == MutableSignal.Type.NEXT) {
    if (event.value == null) {
      return;
    }

    //if bounded and out of capacity
    if (!unbounded && pendingRequest.addAndGet(-1l) < 0l) {
      //re-add the retained capacity
      pendingRequest.incrementAndGet();

      //if current sequence does not yet match the published one
      //if (nextSequence < cachedAvailableSequence) {
      //pause until request
      while (pendingRequest.addAndGet(-1l) < 0l) {
        pendingRequest.incrementAndGet();
        if (!running.get()) throw CancelException.INSTANCE;
        //Todo Use WaitStrategy?
        LockSupport.parkNanos(1l);
      }
    }
  } else if (event.type != null) {
    //Complete or Error are terminal events, we shutdown the processor and process the signal
    running.set(false);
    RingBufferSubscriberUtils.route(event, subscriber);
    Subscription s = processor.upstreamSubscription;
    if (s != null) {
      s.cancel();
    }
    throw CancelException.INSTANCE;
  }
}
 
示例12
@Override
public long waitFor(long sequence,
                    Sequence cursor,
                    Sequence dependentSequence,
                    SequenceBarrier barrier) throws AlertException,
  InterruptedException,
  TimeoutException {
  long availableSequence;
  while ((availableSequence = dependentSequence.get()) < sequence) {
    barrier.checkAlert();
    LockSupport.parkNanos(parkFor);
  }
  return availableSequence;
}
 
示例13
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
{
    long availableSequence;

    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        SpinHint.spinLoopHint();
        barrier.checkAlert();
    }

    return availableSequence;
}
 
示例14
public void asyncConsumeBatchToCursor(EventHandler<Object> handler) throws AlertException, InterruptedException, TimeoutException {
    List<Object> batch = getConsumeBatch();
    if (batch == null)
        return;

    for (int i = 0; i < batch.size(); i++) {
        try {
            handler.onEvent(batch.get(i), 0, i == (batch.size() - 1));
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }
}
 
示例15
public static <T> boolean waitRequestOrTerminalEvent(
  Sequence pendingRequest,
  RingBuffer<MutableSignal<T>> ringBuffer,
  SequenceBarrier barrier,
  Subscriber<? super T> subscriber,
  AtomicBoolean isRunning
) {
  final long waitedSequence = ringBuffer.getCursor() + 1L;
  try {
    MutableSignal<T> event = null;
    while (pendingRequest.get() < 0l) {
      //pause until first request
      if (event == null) {
        barrier.waitFor(waitedSequence);
        event = ringBuffer.get(waitedSequence);

        if (event.type == MutableSignal.Type.COMPLETE) {
          try {
            subscriber.onComplete();
            return false;
          } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            subscriber.onError(t);
            return false;
          }
        } else if (event.type == MutableSignal.Type.ERROR) {
          subscriber.onError(event.error);
          return false;
        }
      } else {
        barrier.checkAlert();
      }
      LockSupport.parkNanos(1l);
    }
  } catch (TimeoutException te) {
    //ignore
  } catch (AlertException ae) {
    if (!isRunning.get()) {
      return false;
    }
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
  }

  return true;
}
 
示例16
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
  throws AlertException, InterruptedException, TimeoutException {
  return currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier);
}
 
示例17
private long getAvailableConsumeCursor() throws AlertException, InterruptedException, TimeoutException {
    final long nextSequence = _consumer.get() + 1;
    return _barrier.waitFor(nextSequence);
}
 
示例18
@Override
public List<Object> retreiveAvailableBatch() throws AlertException, InterruptedException, TimeoutException {
    // get all events in disruptor queue
    return getConsumeBatch();
}