Java源码示例:com.lmax.disruptor.AlertException
示例1
/**
 * Blocks the caller until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>While waiting, each pass hands the remaining retry budget to
 * {@code applyWaitMethod}; once that budget is no longer positive, one
 * {@code eventLoop.safeLoopOnce()} is run per pass.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            not read by this implementation
 * @param dependentSequence the sequence being polled; per the original author's note,
 *                          in this project it is simply the producer's sequence (the cursor)
 * @param barrier           passed through to {@code applyWaitMethod}
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException propagated from {@code applyWaitMethod}
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,
                    final SequenceBarrier barrier) throws AlertException {
    int remaining = retries;
    long available = dependentSequence.get();
    while (available < sequence) {
        remaining = applyWaitMethod(barrier, remaining);
        // Original author's note: run the event loop once for every sleep.
        if (remaining <= 0) {
            eventLoop.safeLoopOnce();
        }
        available = dependentSequence.get();
    }
    return available;
}
示例2
/**
 * Performs one waiting step and returns the updated retry counter.
 *
 * <ul>
 *   <li>{@code counter > 100}: only decrements (pure spin).</li>
 *   <li>{@code 0 < counter <= 100}: decrements and yields the CPU.</li>
 *   <li>{@code counter <= 0}: parks for {@code sleepTimeNs} nanoseconds and leaves the
 *       counter unchanged.</li>
 * </ul>
 *
 * @param barrier checked for an alert before anything else happens
 * @param counter the remaining retry budget
 * @return the counter to use for the next step
 * @throws AlertException if {@code barrier.checkAlert()} raises it
 */
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException {
    // Check for an interrupt / shutdown signal first.
    barrier.checkAlert();

    if (counter <= 0) {
        // Retry budget exhausted: sleep instead of spinning.
        LockSupport.parkNanos(sleepTimeNs);
        return counter;
    }
    if (counter <= 100) {
        // Low on retries: offer the CPU to other threads.
        Thread.yield();
    }
    return counter - 1;
}
示例3
/**
 * Spins until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>Every pass checks the barrier for an alert and issues {@link Thread#onSpinWait()}.
 * Each time the spin count equals {@code loopOnceSpinTries}, it is reset and
 * {@code eventLoop.safeLoopOnce()} is run.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            not read by this implementation
 * @param dependentSequence the sequence being polled; per the original author's note,
 *                          in this project it is simply the producer's sequence (the cursor)
 * @param barrier           checked for an alert on every pass
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException       if {@code barrier.checkAlert()} raises it
 * @throws InterruptedException declared by the signature; nothing in this body throws it directly
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,
                    final SequenceBarrier barrier) throws AlertException, InterruptedException {
    int spins = 0;
    for (;;) {
        final long available = dependentSequence.get();
        if (available >= sequence) {
            return available;
        }
        barrier.checkAlert();
        Thread.onSpinWait();
        spins++;
        if (spins == loopOnceSpinTries) {
            spins = 0;
            eventLoop.safeLoopOnce();
        }
    }
}
示例4
/**
 * Waits in two phases until {@code sequence} is available.
 *
 * <ol>
 *   <li>If {@code cursorSequence} is behind, takes {@code lock} and awaits
 *       {@code processorNotifyCondition} until the cursor has caught up.</li>
 *   <li>Then polls {@code dependentSequence}, parking for 1 ns between reads, until it has
 *       caught up as well.</li>
 * </ol>
 * The barrier is checked for an alert on every pass of either loop.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursorSequence    the sequence awaited under the lock in the first phase
 * @param dependentSequence the sequence polled in the second phase
 * @param barrier           checked for an alert while waiting
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException       if {@code barrier.checkAlert()} raises it
 * @throws InterruptedException if the condition await is interrupted
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    if (cursorSequence.get() < sequence) {
        lock.lock();
        try {
            // Re-check under the lock before every await.
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    long available = dependentSequence.get();
    while (available < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
        available = dependentSequence.get();
    }
    return available;
}
示例5
/**
 * Drains {@code processor.cancelledSequences}, re-delivering the signal that follows each
 * queued sequence to {@code subscriber}.
 *
 * @param unbounded forwarded unchanged to {@code readNextEvent}
 * @return {@code true} if a replay attempt was aborted by one of the caught exceptions (the
 *         affected sequence is put back on the queue first); {@code false} once the queue
 *         has been polled empty
 */
private boolean replay(final boolean unbounded) {
Sequence replayedSequence;
MutableSignal<T> signal;
// Keep polling until no cancelled sequence is left.
while ((replayedSequence = processor.cancelledSequences.poll()) != null) {
// The signal to replay lives in the slot right after the cancelled sequence.
signal = processor.ringBuffer.get(replayedSequence.get() + 1L);
try {
// A null value is treated as "slot not ready yet": wait on the barrier for it.
if (signal.value == null) {
barrier.waitFor(replayedSequence.get() + 1L);
}
readNextEvent(signal, unbounded);
RingBufferSubscriberUtils.routeOnce(signal, subscriber);
// Replay succeeded: the cancelled sequence no longer gates the ring buffer.
processor.ringBuffer.removeGatingSequence(replayedSequence);
} catch (TimeoutException | InterruptedException | AlertException | CancelException ce) {
// NOTE(review): this removes `sequence` (declared outside this method), not
// `replayedSequence` -- presumably intentional because this consumer is stopping; confirm.
// NOTE(review): an InterruptedException caught here does not restore the thread's
// interrupt flag.
processor.ringBuffer.removeGatingSequence(sequence);
// Put the sequence back so a later replay can pick it up again.
processor.cancelledSequences.add(replayedSequence);
return true;
}
}
return false;
}
示例6
/**
 * Removes and returns every entry between the consumer position and the currently
 * available cursor.
 *
 * <p>Each visited slot has its object taken out and the slot reset to {@code null}; afterwards
 * {@code _consumer} is advanced to the available cursor.
 *
 * @return the drained objects, in slot order
 * @throws AlertException       propagated from {@code getAvailableConsumeCursor()}
 * @throws InterruptedException propagated from {@code getAvailableConsumeCursor()}
 * @throws TimeoutException     propagated from {@code getAvailableConsumeCursor()}
 * @throws RuntimeException     wrapping any exception raised while reading a slot (logged first)
 */
private synchronized List<Object> getConsumeBatch() throws AlertException, InterruptedException, TimeoutException {
    final long lastAvailable = getAvailableConsumeCursor();
    final long lastConsumed = _consumer.get();

    // Pre-size the list to the number of slots about to be drained.
    final List<Object> drained = new ArrayList<>((int) (lastAvailable - lastConsumed));

    long slot = lastConsumed + 1;
    while (slot <= lastAvailable) {
        try {
            final MutableObject holder = _buffer.get(slot);
            final Object payload = holder.o;
            // Clear the slot so the buffer does not keep a reference to the payload.
            holder.setObject(null);
            drained.add(payload);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        slot++;
    }

    _consumer.set(lastAvailable);
    return drained;
}
示例7
/**
 * Performs one waiting step: counts down while the counter is positive, and yields the CPU
 * once it is not.
 *
 * @param barrier checked for an alert before anything else happens
 * @param counter the remaining spin budget
 * @return {@code counter - 1} while {@code counter > 0}, otherwise {@code counter} unchanged
 * @throws AlertException if {@code barrier.checkAlert()} raises it
 */
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException {
    // Check for an interrupt / stop signal first.
    barrier.checkAlert();

    if (counter <= 0) {
        Thread.yield();
        return counter;
    }
    return counter - 1;
}
示例8
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}, delegating every
 * waiting step to {@code applyWaitMethod} and feeding its result back in as the next sleep
 * value. The first step uses {@code sleepTimeNsStart}.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            not read by this implementation
 * @param dependentSequence the sequence being polled
 * @param barrier           passed through to {@code applyWaitMethod}
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException propagated from {@code applyWaitMethod}
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException {
    int sleepNs = sleepTimeNsStart;
    long available = dependentSequence.get();
    while (available < sequence) {
        sleepNs = applyWaitMethod(barrier, sleepNs);
        available = dependentSequence.get();
    }
    return available;
}
示例9
/**
 * Performs one back-off step: parks the thread and returns the sleep time for the next step.
 *
 * <p>While {@code currentSleep} is below {@code sleepTimeNsMax}, the thread parks for
 * {@code currentSleep} nanoseconds and the value is doubled. Otherwise it parks for
 * {@code sleepTimeNsMax} and the value is returned unchanged.
 *
 * <p>The doubling is done in {@code long} arithmetic and clamped to the ceiling. A plain
 * {@code int} multiply would wrap to a negative value, and then to 0, when the ceiling is
 * larger than 2^30, which silently turns the back-off into a busy spin. Clamping does not
 * change any park duration: a value at or above the ceiling always parks for
 * {@code sleepTimeNsMax}.
 *
 * <p>Note that a {@code currentSleep} of 0 never grows, because doubling 0 yields 0.
 *
 * @param barrier      checked for an alert before parking
 * @param currentSleep nanoseconds to park for in this step
 * @return the sleep time to pass to the next step
 * @throws AlertException if {@code barrier.checkAlert()} raises it
 */
private int applyWaitMethod(final SequenceBarrier barrier, int currentSleep) throws AlertException {
    barrier.checkAlert();

    if (currentSleep < sleepTimeNsMax) {
        LockSupport.parkNanos(currentSleep);
        // Double without int overflow, never exceeding the ceiling or the int range.
        final long doubled = currentSleep * 2L;
        final long ceiling = Math.min(sleepTimeNsMax, Integer.MAX_VALUE);
        return (int) Math.min(doubled, ceiling);
    }

    LockSupport.parkNanos(sleepTimeNsMax);
    return currentSleep;
}
示例10
/**
 * Waits until {@code sequence} is available, first on {@code cursor} and then on
 * {@code dependentSequence}.
 *
 * <p>If the cursor is behind, {@code flush()} is called and the thread waits on the
 * {@code lock} monitor until the cursor has caught up. While it waits it is counted in
 * {@code numWaiters}. The counter is decremented in a {@code finally} block so that it stays
 * accurate when the wait ends with an exception (alert, stop, or interrupt) instead of
 * normally.
 *
 * <p>After that, {@code dependentSequence} is polled in a tight loop until it has caught up.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            the sequence awaited on the monitor in the first phase
 * @param dependentSequence the sequence polled in the second phase
 * @param barrier           checked for an alert on every pass of either loop
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException       if the barrier raises it, or if {@code state} is
 *                              {@code State.STOPPED} while waiting (the disruptor is halted first)
 * @throws InterruptedException if the monitor wait is interrupted
 * @throws TimeoutException     declared by the signature; nothing in this body throws it directly
 */
public long waitFor(
        long sequence,
        Sequence cursor,
        Sequence dependentSequence,
        SequenceBarrier barrier
) throws AlertException, InterruptedException, TimeoutException {
    if (cursor.get() < sequence) {
        flush();
        synchronized (lock) {
            ++numWaiters;
            try {
                // Re-check under the monitor before every wait.
                while (cursor.get() < sequence) {
                    if (state == State.STOPPED) {
                        disruptor.halt();
                        throw AlertException.INSTANCE;
                    }
                    barrier.checkAlert();
                    lock.wait();
                }
            } finally {
                // Always undo the increment, including on the exceptional exits above.
                --numWaiters;
            }
        }
    }

    long availableSequence;
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }
    return availableSequence;
}
示例11
/**
 * Applies backpressure or terminal handling for a signal before it is delivered.
 *
 * <ul>
 *   <li>A signal with a {@code null} type is ignored.</li>
 *   <li>A {@code NEXT} signal with a {@code null} value is ignored. Otherwise, in bounded mode,
 *       one unit is taken from {@code pendingRequest}. If that drives the count below zero,
 *       the unit is handed back and the attempt is repeated, parking 1 ns between attempts,
 *       until it succeeds or {@code running} becomes {@code false}.</li>
 *   <li>Any other signal type is terminal: {@code running} is cleared, the signal is routed
 *       to {@code subscriber}, the upstream subscription (if any) is cancelled, and
 *       {@code CancelException.INSTANCE} is thrown.</li>
 * </ul>
 *
 * @param event     the signal about to be processed
 * @param unbounded when {@code true}, the {@code pendingRequest} accounting is skipped
 * @throws AlertException declared by the signature; nothing in this body throws it directly
 */
private void readNextEvent(MutableSignal<T> event, final boolean unbounded) throws AlertException {
    if (event.type != MutableSignal.Type.NEXT) {
        if (event.type != null) {
            // Complete or Error are terminal events: shut the processor down and
            // process the signal.
            running.set(false);
            RingBufferSubscriberUtils.route(event, subscriber);
            Subscription upstream = processor.upstreamSubscription;
            if (upstream != null) {
                upstream.cancel();
            }
            throw CancelException.INSTANCE;
        }
        return;
    }

    // Next signal: handle backpressure (pendingRequest).
    if (event.value == null) {
        return;
    }
    if (unbounded) {
        return;
    }
    if (pendingRequest.addAndGet(-1L) >= 0L) {
        // Capacity was available and has now been consumed.
        return;
    }

    // Out of capacity: re-add the retained unit, then pause until a request arrives.
    pendingRequest.incrementAndGet();
    while (pendingRequest.addAndGet(-1L) < 0L) {
        pendingRequest.incrementAndGet();
        if (!running.get()) {
            throw CancelException.INSTANCE;
        }
        // TODO: use the WaitStrategy here?
        LockSupport.parkNanos(1L);
    }
}
示例12
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}, parking for
 * {@code parkFor} nanoseconds between reads.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            not read by this implementation
 * @param dependentSequence the sequence being polled
 * @param barrier           checked for an alert before every park
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException       if {@code barrier.checkAlert()} raises it
 * @throws InterruptedException declared by the signature; nothing in this body throws it directly
 * @throws TimeoutException     declared by the signature; nothing in this body throws it directly
 */
@Override
public long waitFor(long sequence,
                    Sequence cursor,
                    Sequence dependentSequence,
                    SequenceBarrier barrier) throws AlertException,
                                                    InterruptedException,
                                                    TimeoutException {
    for (;;) {
        final long available = dependentSequence.get();
        if (available >= sequence) {
            return available;
        }
        barrier.checkAlert();
        LockSupport.parkNanos(parkFor);
    }
}
示例13
/**
 * Busy-spins until {@code dependentSequence} has reached {@code sequence}, issuing
 * {@code SpinHint.spinLoopHint()} and then checking the barrier on every pass.
 *
 * @param sequence          the sequence the caller needs to become available
 * @param cursor            not read by this implementation
 * @param dependentSequence the sequence being polled
 * @param barrier           checked for an alert on every pass
 * @return the first observed value of {@code dependentSequence} that is {@code >= sequence}
 * @throws AlertException       if {@code barrier.checkAlert()} raises it
 * @throws InterruptedException declared by the signature; nothing in this body throws it directly
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    long available = dependentSequence.get();
    while (available < sequence) {
        SpinHint.spinLoopHint();
        barrier.checkAlert();
        available = dependentSequence.get();
    }
    return available;
}
示例14
/**
 * Fetches the currently available batch and feeds each element to {@code handler}.
 *
 * <p>The handler always receives {@code 0} as the sequence argument. The end-of-batch flag is
 * {@code true} only for the last element.
 *
 * @param handler receives every element of the batch, in order
 * @throws AlertException       propagated from {@code getConsumeBatch()}
 * @throws InterruptedException propagated from {@code getConsumeBatch()}
 * @throws TimeoutException     propagated from {@code getConsumeBatch()}
 * @throws RuntimeException     wrapping any exception raised while handling an element (logged first)
 */
public void asyncConsumeBatchToCursor(EventHandler<Object> handler) throws AlertException, InterruptedException, TimeoutException {
    final List<Object> batch = getConsumeBatch();
    if (batch == null) {
        return;
    }

    final int lastIndex = batch.size() - 1;
    int index = 0;
    while (index <= lastIndex) {
        try {
            handler.onEvent(batch.get(index), 0, index == lastIndex);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        index++;
    }
}
示例15
/**
 * Waits while {@code pendingRequest} is negative, watching the next ring-buffer slot for a
 * terminal signal in the meantime.
 *
 * <p>On the first pass the barrier is awaited for the slot after the current cursor. If that
 * slot holds {@code COMPLETE} or {@code ERROR}, the subscriber is notified and the method
 * returns {@code false}. On later passes only the barrier's alert state is checked. Every
 * pass ends with a 1 ns park.
 *
 * @param pendingRequest the demand counter; waiting continues while it is below zero
 * @param ringBuffer     source of the cursor and of the inspected slot
 * @param barrier        used to wait for the slot and to check for alerts
 * @param subscriber     notified if a terminal signal is found
 * @param isRunning      consulted when an {@code AlertException} is caught
 * @param <T>            the signal payload type
 * @return {@code false} if a terminal signal was delivered, or if an alert was raised while
 *         {@code isRunning} is {@code false}; {@code true} otherwise (including after a
 *         timeout or an interrupt, in which case the interrupt flag is restored)
 */
public static <T> boolean waitRequestOrTerminalEvent(
        Sequence pendingRequest,
        RingBuffer<MutableSignal<T>> ringBuffer,
        SequenceBarrier barrier,
        Subscriber<? super T> subscriber,
        AtomicBoolean isRunning
) {
    final long waitedSequence = ringBuffer.getCursor() + 1L;
    try {
        MutableSignal<T> event = null;
        while (pendingRequest.get() < 0L) {
            if (event != null) {
                // Slot already inspected: just keep watching for an alert.
                barrier.checkAlert();
            } else {
                // Pause until the first request, looking at the next slot once.
                barrier.waitFor(waitedSequence);
                event = ringBuffer.get(waitedSequence);

                if (event.type == MutableSignal.Type.COMPLETE) {
                    try {
                        subscriber.onComplete();
                    } catch (Throwable t) {
                        Exceptions.throwIfFatal(t);
                        subscriber.onError(t);
                    }
                    return false;
                }
                if (event.type == MutableSignal.Type.ERROR) {
                    subscriber.onError(event.error);
                    return false;
                }
            }
            LockSupport.parkNanos(1L);
        }
    } catch (TimeoutException ignored) {
        // A timeout is deliberately ignored; the caller is told to carry on.
    } catch (AlertException alert) {
        if (!isRunning.get()) {
            return false;
        }
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
    }
    return true;
}
示例16
/**
 * Delegates the wait to {@code currentStrategy}, forwarding all arguments unchanged.
 *
 * @param sequence          forwarded to the delegate
 * @param cursor            forwarded to the delegate
 * @param dependentSequence forwarded to the delegate
 * @param barrier           forwarded to the delegate
 * @return whatever the delegate returns
 * @throws AlertException       propagated from the delegate
 * @throws InterruptedException propagated from the delegate
 * @throws TimeoutException     propagated from the delegate
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException {
    final long available = currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier);
    return available;
}
示例17
/**
 * Waits on {@code _barrier} for the sequence right after the current consumer position.
 *
 * @return the value returned by {@code _barrier.waitFor}
 * @throws AlertException       propagated from the barrier
 * @throws InterruptedException propagated from the barrier
 * @throws TimeoutException     propagated from the barrier
 */
private long getAvailableConsumeCursor() throws AlertException, InterruptedException, TimeoutException {
    return _barrier.waitFor(_consumer.get() + 1);
}
示例18
/**
 * Returns all events currently available in the disruptor queue by delegating to
 * {@code getConsumeBatch()}.
 *
 * @return the batch produced by {@code getConsumeBatch()}
 * @throws AlertException       propagated from {@code getConsumeBatch()}
 * @throws InterruptedException propagated from {@code getConsumeBatch()}
 * @throws TimeoutException     propagated from {@code getConsumeBatch()}
 */
@Override
public List<Object> retreiveAvailableBatch() throws AlertException, InterruptedException, TimeoutException {
    final List<Object> available = getConsumeBatch();
    return available;
}