Java源码示例:org.redisson.api.BatchOptions
示例1
private void enableLocalCache(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) {
return;
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg);
}
try {
publishBatch.execute();
} catch (Exception e) {
// skip it. Disabled local cache entries are enabled once reach timeout.
}
}
示例2
public RedisCommonBatchExecutor(NodeSource source, RPromise<Void> mainPromise,
ConnectionManager connectionManager, BatchOptions options, Entry entry, AtomicInteger slots) {
super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, true, connectionManager, null);
this.options = options;
this.entry = entry;
this.slots = slots;
if (options.getRetryAttempts() > 0) {
this.attempts = options.getRetryAttempts();
}
if (options.getRetryInterval() > 0) {
this.retryInterval = options.getRetryInterval();
}
if (options.getResponseTimeout() > 0) {
this.responseTimeout = options.getResponseTimeout();
}
}
示例3
@Test
public void testBatch() throws InterruptedException {
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
RBucketReactive<Object> b1 = batch.getBucket("b1");
RBucketReactive<Object> b2 = batch.getBucket("b2");
RBucketReactive<Object> b3 = batch.getBucket("b3");
b2.set(b3);
b1.set(b2);
b3.set(b1);
sync(batch.execute());
batch = redisson.createBatch(BatchOptions.defaults());
batch.getBucket("b1").get();
batch.getBucket("b2").get();
batch.getBucket("b3").get();
List<RBucketReactive> result = (List<RBucketReactive>) sync(batch.execute()).getResponses();
assertEquals("b2", result.get(0).getName());
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
}
示例4
@Test
public void testReactiveToNormal() throws InterruptedException {
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
RBucketReactive<Object> b1 = batch.getBucket("b1");
RBucketReactive<Object> b2 = batch.getBucket("b2");
RBucketReactive<Object> b3 = batch.getBucket("b3");
b2.set(b3);
b1.set(b2);
b3.set(b1);
sync(batch.execute());
RedissonClient lredisson = Redisson.create(redisson.getConfig());
RBatch b = lredisson.createBatch(BatchOptions.defaults());
b.getBucket("b1").getAsync();
b.getBucket("b2").getAsync();
b.getBucket("b3").getAsync();
List<RBucket> result = (List<RBucket>)b.execute().getResponses();
assertEquals("b2", result.get(0).getName());
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
lredisson.shutdown();
}
示例5
@Override
public String changeSessionId() {
String oldId = delegate.getId();
String id = delegate.changeSessionId();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.getBucket(getExpiredKey(oldId)).remainTimeToLiveAsync();
batch.getBucket(getExpiredKey(oldId)).deleteAsync();
batch.getMap(map.getName(), map.getCodec()).readAllMapAsync();
batch.getMap(map.getName()).deleteAsync();
BatchResult<?> res = batch.execute();
List<?> list = res.getResponses();
Long remainTTL = (Long) list.get(0);
Map<String, Object> oldState = (Map<String, Object>) list.get(2);
if (remainTTL == -2) {
// Either:
// - a parallel request also invoked changeSessionId() on this session, and the
// expiredKey for oldId had been deleted
// - sessions do not expire
remainTTL = delegate.getMaxInactiveInterval().toMillis();
}
RBatch batchNew = redisson.createBatch();
batchNew.getMap(keyPrefix + id, map.getCodec()).putAllAsync(oldState);
if (remainTTL > 0) {
batchNew.getBucket(getExpiredKey(id)).setAsync("", remainTTL, TimeUnit.MILLISECONDS);
}
batchNew.execute();
map = redisson.getMap(keyPrefix + id, map.getCodec());
return id;
}
示例6
private BatchOptions createOptions() {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
int syncSlaves = entry.getAvailableSlaves();
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC);
return batchOptions;
}
示例7
private RFuture<BatchResult<?>> enableLocalCacheAsync(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg);
}
return publishBatch.executeAsync();
}
示例8
@SuppressWarnings("ParameterNumber")
public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index, AtomicBoolean executed) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager,
objectBuilder);
this.commands = commands;
this.options = options;
this.index = index;
this.executed = executed;
}
示例9
@SuppressWarnings("ParameterNumber")
public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index,
AtomicBoolean executed) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, options, index, executed);
}
示例10
@SuppressWarnings("ParameterNumber")
public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, AsyncSemaphore semaphore) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, options, index, executed);
this.connections = connections;
this.semaphore = semaphore;
}
示例11
private CommandBatchService createCommandBatchService() {
if (commandExecutor instanceof CommandBatchService) {
return (CommandBatchService) commandExecutor;
}
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getName());
BatchOptions options = BatchOptions.defaults()
.syncSlaves(entry.getAvailableSlaves(), 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor.getConnectionManager(), options);
}
示例12
@Parameterized.Parameters(name= "{index} - {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)},
{BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)}
});
}
示例13
@Before
public void before() throws IOException, InterruptedException {
super.before();
if (batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY) {
batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY);
}
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
}
}
示例14
@Test
public void testConnectionLeakAfterError() throws InterruptedException {
Config config = BaseTest.createConfig();
config.useSingleServer()
.setRetryInterval(100)
.setTimeout(200)
.setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonRxClient redisson = Redisson.createRx(config);
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 25000; i++) {
batch.getBucket("test").set(123);
}
try {
sync(batch.execute());
Assert.fail();
} catch (Exception e) {
// skip
}
sync(redisson.getBucket("test3").set(4));
assertThat(sync(redisson.getBucket("test3").get())).isEqualTo(4);
batch = redisson.createBatch(batchOptions);
batch.getBucket("test1").set(1);
batch.getBucket("test2").set(2);
sync(batch.execute());
assertThat(sync(redisson.getBucket("test1").get())).isEqualTo(1);
assertThat(sync(redisson.getBucket("test2").get())).isEqualTo(2);
redisson.shutdown();
}
示例15
@SuppressFBWarnings(justification = "This is intentional to avoid unnecessary sync")
protected A getCurrentAsyncCollection() {
// avoid synchronization if unnecessary by checking for null outside
// synchronized block
if (currentAsync == null) {
synchronized (this) {
// check again within synchronized block
if (currentAsync == null) {
currentBatch = client.createBatch(BatchOptions.defaults());
currentAsync = initAsyncCollection(currentBatch, setName, codec);
}
}
}
return currentAsync;
}
示例16
@Override
public RBatch createBatch(BatchOptions options) {
return redissonClient.createBatch(options);
}
示例17
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例18
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例19
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例20
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例21
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例22
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例23
@Override
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
}
示例24
@Override
public RFuture<Void> commitAsync() {
checkState();
checkTimeout();
BatchOptions batchOptions = createOptions();
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
String id = generateId();
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id, localCaches, operations);
future.onComplete((res, ex) -> {
if (ex != null) {
result.tryFailure(new TransactionException("Unable to execute transaction", ex));
return;
}
Map<HashKey, HashValue> hashes = future.getNow();
try {
checkTimeout();
} catch (TransactionTimeoutException e) {
enableLocalCacheAsync(id, hashes);
result.tryFailure(e);
return;
}
RFuture<BatchResult<?>> transactionFuture = transactionExecutor.executeAsync();
transactionFuture.onComplete((r, exc) -> {
if (exc != null) {
result.tryFailure(new TransactionException("Unable to execute transaction", exc));
return;
}
enableLocalCacheAsync(id, hashes);
executed.set(true);
result.trySuccess(null);
});
});
return result;
}
示例25
public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager, options);
}
示例26
public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager, options);
}
示例27
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
this.options = options;
}
示例28
public BatchOptions getOptions() {
return options;
}