Java源码示例:com.lmax.disruptor.BusySpinWaitStrategy
示例1
/**
 * Builds the disruptor using the wait strategy named by {@code waitStrategy},
 * starts it, and keeps the ring buffer returned by {@code start()}.
 *
 * <p>Any name other than the three recognised ones (including an unset value)
 * selects {@link YieldingWaitStrategy}.
 */
@Override
public void afterPropertiesSet() throws Exception {
    // String.valueOf maps an unset name to "null", which lands in the default branch.
    switch (String.valueOf(waitStrategy)) {
        case "BusySpinWaitStrategy":
            disruptor = new Disruptor<FastEvent>(
                    new FastEventFactory(),
                    65536,
                    DaemonThreadFactory.INSTANCE,
                    ProducerType.MULTI,
                    new BusySpinWaitStrategy());
            break;
        case "SleepingWaitStrategy":
            disruptor = new Disruptor<FastEvent>(
                    new FastEventFactory(),
                    65536,
                    DaemonThreadFactory.INSTANCE,
                    ProducerType.MULTI,
                    new SleepingWaitStrategy());
            break;
        case "BlockingWaitStrategy":
            disruptor = new Disruptor<FastEvent>(
                    new FastEventFactory(),
                    65536,
                    DaemonThreadFactory.INSTANCE,
                    ProducerType.MULTI,
                    new BlockingWaitStrategy());
            break;
        default:
            disruptor = new Disruptor<FastEvent>(
                    new FastEventFactory(),
                    65536,
                    DaemonThreadFactory.INSTANCE,
                    ProducerType.MULTI,
                    new YieldingWaitStrategy());
            break;
    }
    ringBuffer = disruptor.start();
}
示例2
/**
 * Creates the wait strategy selected by a configuration property.
 *
 * <p>The property value is compared case-insensitively (upper-cased with
 * {@link Locale#ROOT}). {@code SLEEP}, {@code YIELD}, {@code BLOCK} and
 * {@code BUSYSPIN} map to their matching strategies; {@code TIMEOUT}, a missing
 * property, and any unrecognised value all produce a
 * {@link TimeoutBlockingWaitStrategy}.
 *
 * @param propertyName  name of the property holding the strategy keyword
 * @param timeoutMillis timeout, in milliseconds, used for the timeout-blocking strategy
 * @return a new wait strategy instance
 */
static WaitStrategy createWaitStrategy(final String propertyName, final long timeoutMillis) {
    final String configured = PropertiesUtil.getProperties().getStringProperty(propertyName, "TIMEOUT");
    LOGGER.trace("property {}={}", propertyName, configured);
    // TODO Refactor into Strings.toRootUpperCase(String)
    final String keyword = configured.toUpperCase(Locale.ROOT);
    // TODO Define a DisruptorWaitStrategy enum?
    if ("SLEEP".equals(keyword)) {
        return new SleepingWaitStrategy();
    }
    if ("YIELD".equals(keyword)) {
        return new YieldingWaitStrategy();
    }
    if ("BLOCK".equals(keyword)) {
        return new BlockingWaitStrategy();
    }
    if ("BUSYSPIN".equals(keyword)) {
        return new BusySpinWaitStrategy();
    }
    // "TIMEOUT" and every unrecognised keyword share the same result.
    return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
}
示例3
/**
 * Creates and starts the disruptor that processes events, unless one already exists.
 *
 * <p>On the first call this builds a single-producer disruptor with a busy-spin
 * wait strategy, attaches {@code eventHandler}, starts it, and stores its ring
 * buffer. Later calls return without doing anything.
 */
public void start() {
    if (disruptor != null) {
        // Already created by an earlier call; nothing to do.
        return;
    }

    disruptor = new Disruptor<BufferEvent>(
            BufferEvent::new,
            queueDepth,
            threadFactory,
            ProducerType.SINGLE,
            new BusySpinWaitStrategy());

    // The handler must be registered before the disruptor is started.
    disruptor.handleEventsWith(eventHandler);
    disruptor.start();

    ringBuffer = disruptor.getRingBuffer();
}
示例4
/**
 * Registers the wait-strategy and processor bindings for this module.
 *
 * <p>The named wait strategies ("PersistenceStrategy", "ReplyStrategy",
 * "RetryStrategy") depend on {@code config.getWaitStrategyEnum()}, and the
 * request/persistence processor implementations depend on
 * {@code config.getLowLatency()}. Reply and retry processors are always bound
 * to their standard implementations.
 */
@Override
protected void configure() {
    final Class<? extends WaitStrategy> persistenceStrategy;
    final Class<? extends WaitStrategy> replyStrategy;
    final Class<? extends WaitStrategy> retryStrategy;

    switch (config.getWaitStrategyEnum()) {
        case LOW_CPU:
            // Low-CPU configuration intended for local/test environments.
            persistenceStrategy = BlockingWaitStrategy.class;
            replyStrategy = BlockingWaitStrategy.class;
            retryStrategy = BlockingWaitStrategy.class;
            break;
        case HIGH_THROUGHPUT:
        default:
            // Default high-CPU configuration aimed at high throughput in production.
            persistenceStrategy = BusySpinWaitStrategy.class;
            replyStrategy = BusySpinWaitStrategy.class;
            retryStrategy = YieldingWaitStrategy.class;
            break;
    }

    bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(persistenceStrategy);
    bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(replyStrategy);
    bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(retryStrategy);

    if (config.getLowLatency()) {
        bind(RequestProcessor.class).to(RequestProcessorSkipCT.class).in(Singleton.class);
        bind(PersistenceProcessor.class).to(PersitenceProcessorNullImpl.class).in(Singleton.class);
    } else {
        bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
        bind(RequestProcessor.class).to(RequestProcessorPersistCT.class).in(Singleton.class);
    }

    bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
    bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class);
}
示例5
/**
 * Prepares the benchmark state: response queues, the burst payload, and a
 * started multi-producer disruptor wired to a fresh {@code Handler}.
 *
 * @throws InterruptedException declared for the wait on handler start-up
 */
@Setup
public synchronized void setup() throws InterruptedException
{
    // One response queue per potential thread.
    int queueIndex = 0;
    while (queueIndex < MAX_THREAD_COUNT)
    {
        responseQueues[queueIndex] = new OneToOneConcurrentArrayQueue<>(RESPONSE_QUEUE_CAPACITY);
        queueIndex++;
    }

    // Payload counts up from -burstLength to -1.
    values = new int[burstLength];
    for (int remaining = burstLength; remaining > 0; remaining--)
    {
        values[burstLength - remaining] = -remaining;
    }

    handler = new Handler(responseQueues);

    final ThreadFactory plainThreads = Thread::new;
    disruptor = new Disruptor<>(
        Message::new,
        Configuration.SEND_QUEUE_CAPACITY,
        plainThreads,
        ProducerType.MULTI,
        new BusySpinWaitStrategy());
    disruptor.handleEventsWith(handler);
    disruptor.start();

    // Do not return until the handler reports that it has started.
    handler.waitForStart();
}
示例6
/**
 * Checks that each {@code WaitStrategyType} constant yields an instance of the
 * corresponding Disruptor wait-strategy class.
 */
@Test
public void test_All_WaitStrategies() {
    assertTrue(BlockingWaitStrategy.class.isInstance(WaitStrategyType.BLOCKING.instance()));
    assertTrue(BusySpinWaitStrategy.class.isInstance(WaitStrategyType.BUSY_SPIN.instance()));
    assertTrue(LiteBlockingWaitStrategy.class.isInstance(WaitStrategyType.LITE_BLOCKING.instance()));
    assertTrue(SleepingWaitStrategy.class.isInstance(WaitStrategyType.SLEEPING_WAIT.instance()));
    assertTrue(YieldingWaitStrategy.class.isInstance(WaitStrategyType.YIELDING.instance()));
}
示例7
/**
 * Gives each test a fresh {@link BusySpinWaitStrategy}.
 */
@Before
public void setUp() throws Exception {
    this.waitStrategy = new BusySpinWaitStrategy();
}