Java源码示例:org.apache.beam.sdk.util.Sleeper
示例1
private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff = backoff.backoff();
while (true) {
try {
retriable.execute();
break;
} catch (Exception e) {
if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
retriable.cleanUpAfterFailure();
} else {
throw e;
}
}
}
}
示例2
@Override
public boolean matchesSafely(List<ShardedFile> outputFiles) {
try {
// Load output data
List<String> outputLines = new ArrayList<>();
for (ShardedFile outputFile : outputFiles) {
outputLines.addAll(
outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
}
// Since the windowing is nondeterministic we only check the sums
actualCounts = new TreeMap<>();
for (String line : outputLines) {
String[] splits = line.split(": ", -1);
String word = splits[0];
long count = Long.parseLong(splits[1]);
actualCounts.merge(word, count, (a, b) -> a + b);
}
return actualCounts.equals(expectedWordCounts);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to read from sharded output: %s due to exception", outputFiles),
e);
}
}
示例3
@Test
public void testInvalidJson() throws Exception {
File deadLetterFile = new File(tempFolder.getRoot(), "dead-letter-file");
Files.write(
tempFolder.newFile("test.json").toPath(), INVALID_JSON_TEXT.getBytes(Charsets.UTF_8));
BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' "
+ "TBLPROPERTIES '{\"format\":\"json\", \"deadLetterFile\": \"%s\"}'",
SQL_JSON_SCHEMA, tempFolder.getRoot(), deadLetterFile.getAbsoluteFile()));
PCollection<Row> rows =
BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
PAssert.that(rows).empty();
pipeline.run();
assertThat(
new NumberedShardedFile(deadLetterFile.getAbsoluteFile() + "*")
.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
containsInAnyOrder(INVALID_JSON_TEXT));
}
示例4
@Test
public void testWriteLines() throws Exception {
File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"lines\"}'",
SQL_LINES_SCHEMA, destinationFile.getAbsolutePath()));
BeamSqlRelUtils.toPCollection(
pipeline, env.parseQuery("INSERT INTO test VALUES ('hello'), ('goodbye')"));
pipeline.run();
assertThat(
new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
containsInAnyOrder("hello", "goodbye"));
}
示例5
@Test
public void testWriteCsv() throws Exception {
File destinationFile = new File(tempFolder.getRoot(), "csv-outputs");
BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
// NumberedShardedFile
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\"}'",
SQL_CSV_SCHEMA, destinationFile.getAbsolutePath()));
BeamSqlRelUtils.toPCollection(
pipeline, env.parseQuery("INSERT INTO test VALUES ('hello', 42), ('goodbye', 13)"));
pipeline.run();
assertThat(
new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
containsInAnyOrder("hello,42", "goodbye,13"));
}
示例6
@Test
public void testWriteJson() throws Exception {
File destinationFile = new File(tempFolder.getRoot(), "json-outputs");
BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"json\"}'",
SQL_JSON_SCHEMA, destinationFile.getAbsolutePath()));
BeamSqlRelUtils.toPCollection(
pipeline, env.parseQuery("INSERT INTO test(name, age) VALUES ('Jack', 13)"));
pipeline.run();
assertThat(
new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
containsInAnyOrder(JSON_TEXT));
}
示例7
/**
* Computes a checksum of the given sharded file. Not safe to call until the writing is complete.
*/
private String getActualChecksum(ShardedFile shardedFile) {
// Load output data
List<String> outputs;
try {
outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to read from: %s", shardedFile), e);
}
// Verify outputs. Checksum is computed using SHA-1 algorithm
actualChecksum = computeHash(outputs);
LOG.debug("Generated checksum: {}", actualChecksum);
return actualChecksum;
}
示例8
private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = RUNQUERY_BACKOFF.backoff();
while (true) {
try {
RunQueryResponse response = datastore.runQuery(request);
rpcSuccesses.inc();
return response;
} catch (DatastoreException exception) {
rpcErrors.inc();
if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
throw exception;
}
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
}
}
}
}
示例9
@VisibleForTesting
static void processWork(
DataflowWorkerHarnessOptions pipelineOptions,
final BatchDataflowWorker worker,
Sleeper sleeper)
throws InterruptedException {
int numThreads = chooseNumberOfThreads(pipelineOptions);
ExecutorService executor = pipelineOptions.getExecutorService();
final List<Callable<Boolean>> tasks = new ArrayList<>();
LOG.debug("Starting {} worker threads", numThreads);
// We start the appropriate number of threads.
for (int i = 0; i < numThreads; ++i) {
tasks.add(new WorkerThread(worker, sleeper));
}
LOG.debug("Waiting for {} worker threads", numThreads);
// We wait forever unless there is a big problem.
executor.invokeAll(tasks);
LOG.error("All threads died.");
}
示例10
private PubsubHelper(PubsubClient pubsubClient, String project) {
this.pubsubClient = pubsubClient;
this.project = project;
createdTopics = new ArrayList<>();
createdSubscriptions = new ArrayList<>();
sleeper = Sleeper.DEFAULT;
backOff =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxRetries(3)
.backoff();
}
示例11
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.info("Writing batch of {} entities", entities.size());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RETRIES)
.withInitialBackoff(INITIAL_BACKOFF)
.backoff();
while (true) {
// Batch mutate entities.
try {
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
for (Entity entity : entities) {
commitRequest.addMutations(mutationBuilder.apply(entity));
}
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
datastore.commit(commitRequest.build());
// Break if the commit threw no exception.
break;
} catch (DatastoreException exception) {
LOG.error(
"Error writing to the Datastore ({}): {}",
exception.getCode(),
exception.getMessage());
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
}
}
}
LOG.info("Successfully wrote {} entities", entities.size());
entities.clear();
}
示例12
private void flush() throws Exception {
BackOff backOff = retryBackoff.backoff();
int attempt = 0;
if (buffer.isEmpty()) {
return;
}
batchSize.update(buffer.size());
while (true) {
try (ClickHouseStatement statement = connection.createStatement()) {
statement.sendRowBinaryStream(
insertSql(schema(), table()),
stream -> {
for (Row row : buffer) {
ClickHouseWriter.writeRow(stream, schema(), row);
}
});
buffer.clear();
break;
} catch (SQLException e) {
if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
throw e;
} else {
retries.inc();
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), e);
attempt++;
}
}
}
}
示例13
public List<Shard> listShards(final String streamName) throws TransientKinesisException {
return wrapExceptions(
() -> {
List<Shard> shards = Lists.newArrayList();
String lastShardId = null;
// DescribeStream has limits that can be hit fairly easily if we are attempting
// to configure multiple KinesisIO inputs in the same account. Retry up to
// LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS times if we end up hitting that limit.
//
// Only pass the wrapped exception up once that limit is reached. Use FluentBackoff
// to implement the retry policy.
FluentBackoff retryBackoff =
FluentBackoff.DEFAULT
.withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
.withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
StreamDescription description = null;
do {
BackOff backoff = retryBackoff.backoff();
Sleeper sleeper = Sleeper.DEFAULT;
while (true) {
try {
description =
kinesis.describeStream(streamName, lastShardId).getStreamDescription();
break;
} catch (LimitExceededException exc) {
if (!BackOffUtils.next(sleeper, backoff)) {
throw exc;
}
}
}
shards.addAll(description.getShards());
lastShardId = shards.get(shards.size() - 1).getShardId();
} while (description.getHasMoreShards());
return shards;
});
}
示例14
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
PublishRequest request = context.element();
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
attempt++;
try {
PublishResult pr = producer.publish(request);
context.output(pr);
break;
} catch (Exception ex) {
// Fail right away if there is no retry configuration
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
SNS_WRITE_FAILURES.inc();
LOG.info("Unable to publish message {} due to {} ", request.getMessage(), ex);
throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
}
if (!BackOffUtils.next(sleeper, backoff)) {
throw new IOException(
String.format(
"Error writing to SNS after %d attempt(s). No more attempts allowed",
attempt),
ex);
} else {
// Note: this used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
}
}
}
}
示例15
private void executeBatch() throws SQLException, IOException, InterruptedException {
if (records.isEmpty()) {
return;
}
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackOff.backoff();
while (true) {
try (PreparedStatement preparedStatement =
connection.prepareStatement(spec.getStatement().get())) {
try {
// add each record in the statement batch
for (T record : records) {
processRecord(record, preparedStatement);
}
// execute the batch
preparedStatement.executeBatch();
// commit the changes
connection.commit();
break;
} catch (SQLException exception) {
if (!spec.getRetryStrategy().apply(exception)) {
throw exception;
}
LOG.warn("Deadlock detected, retrying", exception);
// clean up the statement batch and the connection state
preparedStatement.clearBatch();
connection.rollback();
if (!BackOffUtils.next(sleeper, backoff)) {
// we tried the max number of times
throw exception;
}
}
}
}
records.clear();
}
示例16
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
PublishRequest request =
(PublishRequest) spec.getPublishRequestFn().apply(context.element());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
attempt++;
try {
PublishResponse pr = producer.publish(request);
context.output(pr);
break;
} catch (Exception ex) {
// Fail right away if there is no retry configuration
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
SNS_WRITE_FAILURES.inc();
LOG.info("Unable to publish message {} due to {} ", request.message(), ex);
throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
}
if (!BackOffUtils.next(sleeper, backoff)) {
throw new IOException(
String.format(
"Error writing to SNS after %d attempt(s). No more attempts allowed",
attempt),
ex);
} else {
// Note: this used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
}
}
}
}
示例17
/** Initializes the worker and starts the actual read loop (in {@link #processWork}). */
public void run() throws InterruptedException {
// Configure standard file systems.
FileSystems.setDefaultPipelineOptions(pipelineOptions);
DataflowWorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
BatchDataflowWorker worker =
BatchDataflowWorker.forBatchIntrinsicWorkerHarness(client, pipelineOptions);
worker.startStatusServer();
processWork(pipelineOptions, worker, Sleeper.DEFAULT);
}
示例18
private void getConfig(String computation) {
BackOff backoff =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.millis(100))
.withMaxBackoff(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardMinutes(5))
.backoff();
while (running.get()) {
try {
if (windmillServiceEnabled) {
getConfigFromDataflowService(computation);
} else {
getConfigFromWindmill(computation);
}
return;
} catch (IllegalArgumentException | IOException e) {
LOG.warn("Error fetching config: ", e);
try {
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
return;
}
} catch (IOException ioe) {
LOG.warn("Error backing off, will not retry: ", ioe);
return;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}
示例19
private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
BackOff backoff = grpcBackoff();
int rpcErrors = 0;
while (true) {
try {
return function.get();
} catch (StatusRuntimeException e) {
try {
if (++rpcErrors % 20 == 0) {
LOG.warn(
"Many exceptions calling gRPC. Last exception: {} with status {}",
e,
e.getStatus());
}
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new WindmillServerStub.RpcException(e);
}
} catch (IOException | InterruptedException i) {
if (i instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
WindmillServerStub.RpcException rpcException = new WindmillServerStub.RpcException(e);
rpcException.addSuppressed(i);
throw rpcException;
}
}
}
}
示例20
private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
ResourceId output = FileBasedSink.convertToFileResourceIfPossible(options.getOutput());
PerWindowFiles filenamePolicy = new PerWindowFiles(output);
List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
final Instant windowStart =
new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
String filePrefix =
filenamePolicy.filenamePrefixForWindow(
new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))));
expectedOutputFiles.add(
new NumberedShardedFile(
output
.getCurrentDirectory()
.resolve(filePrefix, StandardResolveOptions.RESOLVE_FILE)
.toString()
+ "*"));
}
ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
// For this integration test, input is tiny and we can build the expected counts
SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
for (String line :
inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
String[] words = line.split(ExampleUtils.TOKENIZER_PATTERN, -1);
for (String word : words) {
if (!word.isEmpty()) {
expectedWordCounts.put(
word, MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
}
}
}
WindowedWordCount.runWindowedWordCount(options);
assertThat(expectedOutputFiles, containsWordCounts(expectedWordCounts));
}
示例21
/**
* Writes a batch of mutations to Cloud Datastore.
*
* <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All mutations in
* the batch will be committed again, even if the commit was partially successful. If the retry
* limit is exceeded, the last exception from Cloud Datastore will be thrown.
*
* @throws DatastoreException if the commit fails or IOException or InterruptedException if
* backing off between retries fails.
*/
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.debug("Writing batch of {} mutations", mutations.size());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
while (true) {
// Batch upsert entities.
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.addAllMutations(mutations);
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
long startTime = System.currentTimeMillis(), endTime;
if (throttler.throttleRequest(startTime)) {
LOG.info("Delaying request due to previous failures");
throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
continue;
}
try {
datastore.commit(commitRequest.build());
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
throttler.successfulRequest(startTime);
rpcSuccesses.inc();
// Break if the commit threw no exception.
break;
} catch (DatastoreException exception) {
if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
/* Most errors are not related to request size, and should not change our expectation of
* the latency of successful requests. DEADLINE_EXCEEDED can be taken into
* consideration, though. */
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
}
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error(
"Error writing batch of {} mutations to Datastore ({}): {}",
mutations.size(),
exception.getCode(),
exception.getMessage());
rpcErrors.inc();
if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
throw exception;
}
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
}
}
}
LOG.debug("Successfully wrote {} mutations", mutations.size());
mutations.clear();
mutationsSize = 0;
}
示例22
@Test
public void deadlineExceededRetries() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(m((long) 1));
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
// respond with 2 timeouts and a success.
when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 1"))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 2"))
.thenReturn(Timestamp.now());
SpannerWriteResult result =
pipeline
.apply(Create.of(mutationList))
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withBatchSizeBytes(0)
.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
// all success, so veryify no errors
PAssert.that(result.getFailedMutations())
.satisfies(
m -> {
assertEquals(0, Iterables.size(m));
return null;
});
pipeline.run().waitUntilFinish();
// 2 calls to sleeper
verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong());
// 3 write attempts for the single mutationGroup.
verify(serviceFactory.mockDatabaseClient(), times(3)).writeAtLeastOnce(any());
}
示例23
@Test
public void deadlineExceededFailsAfterRetries() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(m((long) 1));
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
// respond with all timeouts.
when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout"));
SpannerWriteResult result =
pipeline
.apply(Create.of(mutationList))
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withBatchSizeBytes(0)
.withMaxCumulativeBackoff(Duration.standardHours(2))
.withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
// One error
PAssert.that(result.getFailedMutations())
.satisfies(
m -> {
assertEquals(1, Iterables.size(m));
return null;
});
pipeline.run().waitUntilFinish();
// Due to jitter in backoff algorithm, we cannot test for an exact number of retries,
// but there will be more than 16 (normally 18).
int numSleeps = Mockito.mockingDetails(WriteToSpannerFn.sleeper).getInvocations().size();
assertTrue(String.format("Should be least 16 sleeps, got %d", numSleeps), numSleeps > 16);
long totalSleep =
Mockito.mockingDetails(WriteToSpannerFn.sleeper).getInvocations().stream()
.mapToLong(i -> i.getArgument(0))
.reduce(0L, Long::sum);
// Total sleep should be greater then 2x maxCumulativeBackoff: 120m,
// because the batch is repeated inidividually due REPORT_FAILURES.
assertTrue(
String.format("Should be least 7200s of sleep, got %d", totalSleep),
totalSleep >= Duration.standardHours(2).getMillis());
// Number of write attempts should be numSleeps + 2 write attempts:
// 1 batch attempt, numSleeps/2 batch retries,
// then 1 individual attempt + numSleeps/2 individual retries
verify(serviceFactory.mockDatabaseClient(), times(numSleeps + 2)).writeAtLeastOnce(any());
}
示例24
@Test
public void retryOnSchemaChangeException() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(m((long) 1));
String errString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
// respond with 2 timeouts and a success.
when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenReturn(Timestamp.now());
SpannerWriteResult result =
pipeline
.apply(Create.of(mutationList))
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withBatchSizeBytes(0)
.withFailureMode(FailureMode.FAIL_FAST));
// all success, so veryify no errors
PAssert.that(result.getFailedMutations())
.satisfies(
m -> {
assertEquals(0, Iterables.size(m));
return null;
});
pipeline.run().waitUntilFinish();
// 0 calls to sleeper
verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
// 3 write attempts for the single mutationGroup.
verify(serviceFactory.mockDatabaseClient(), times(3)).writeAtLeastOnce(any());
}
示例25
@Test
public void retryMaxOnSchemaChangeException() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(m((long) 1));
String errString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
// Respond with Aborted transaction
when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString));
// When spanner aborts transaction for more than 5 time, pipeline execution stops with
// PipelineExecutionException
thrown.expect(PipelineExecutionException.class);
thrown.expectMessage(errString);
SpannerWriteResult result =
pipeline
.apply(Create.of(mutationList))
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withBatchSizeBytes(0)
.withFailureMode(FailureMode.FAIL_FAST));
// One error
PAssert.that(result.getFailedMutations())
.satisfies(
m -> {
assertEquals(1, Iterables.size(m));
return null;
});
pipeline.run().waitUntilFinish();
// 0 calls to sleeper
verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
// 5 write attempts for the single mutationGroup.
verify(serviceFactory.mockDatabaseClient(), times(5)).writeAtLeastOnce(any());
}
示例26
@Test
public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(m((long) 1));
String errString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
// Respond with (1) Aborted transaction a couple of times (2) deadline exceeded
// (3) Aborted transaction 3 times (4) deadline exceeded and finally return success.
when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 1"))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
.thenThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 2"))
.thenReturn(Timestamp.now());
SpannerWriteResult result =
pipeline
.apply(Create.of(mutationList))
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withBatchSizeBytes(0)
.withFailureMode(FailureMode.FAIL_FAST));
// Zero error
PAssert.that(result.getFailedMutations())
.satisfies(
m -> {
assertEquals(0, Iterables.size(m));
return null;
});
pipeline.run().waitUntilFinish();
// 2 calls to sleeper
verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong());
// 8 write attempts for the single mutationGroup.
verify(serviceFactory.mockDatabaseClient(), times(8)).writeAtLeastOnce(any());
}
示例27
@BeforeClass
public static void setup() throws IOException, InterruptedException {
// network sharing doesn't work with ClassRule
network = Network.newNetwork();
zookeeper =
new GenericContainer<>("zookeeper:3.4.13")
.withStartupAttempts(10)
.withExposedPorts(2181)
.withNetwork(network)
.withNetworkAliases("zookeeper");
// so far zookeeper container always starts successfully, so no extra retries
zookeeper.start();
clickHouse =
(ClickHouseContainer)
new ClickHouseContainer(CLICKHOUSE_IMAGE)
.withStartupAttempts(10)
.withCreateContainerCmdModifier(
// type inference for `(CreateContainerCmd) -> cmd.` doesn't work
cmd ->
((CreateContainerCmd) cmd)
.withMemory(256 * 1024 * 1024L)
.withMemorySwap(4L * 1024 * 1024 * 1024L))
.withNetwork(network)
.withClasspathResourceMapping(
"config.d/zookeeper_default.xml",
"/etc/clickhouse-server/config.d/zookeeper_default.xml",
BindMode.READ_ONLY);
BackOff backOff =
FluentBackoff.DEFAULT
.withMaxRetries(3)
.withInitialBackoff(Duration.standardSeconds(15))
.backoff();
// try to start clickhouse-server a couple of times, see BEAM-6639
while (true) {
try {
Unreliables.retryUntilSuccess(
10,
() -> {
DockerClientFactory.instance()
.checkAndPullImage(DockerClientFactory.instance().client(), CLICKHOUSE_IMAGE);
return null;
});
clickHouse.start();
break;
} catch (Exception e) {
if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
throw e;
} else {
List<Image> images =
DockerClientFactory.instance().client().listImagesCmd().withShowAll(true).exec();
String listImagesOutput = "listImagesCmd:\n" + Joiner.on('\n').join(images) + "\n";
LOG.warn("failed to start clickhouse-server\n\n" + listImagesOutput, e);
}
}
}
}
示例28
private void flushBatch() throws IOException, InterruptedException {
if (batch.isEmpty()) {
return;
}
try {
// Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
// by tableName
Map<String, List<WriteRequest>> mapTableRequest =
batch.stream()
.collect(
Collectors.groupingBy(
KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
mapTableRequest
.entrySet()
.forEach(
entry -> batchRequest.addRequestItemsEntry(entry.getKey(), entry.getValue()));
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
attempt++;
try {
client.batchWriteItem(batchRequest);
break;
} catch (Exception ex) {
// Fail right away if there is no retry configuration
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
DYNAMO_DB_WRITE_FAILURES.inc();
LOG.info(
"Unable to write batch items {} due to {} ",
batchRequest.getRequestItems().entrySet(),
ex);
throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
}
if (!BackOffUtils.next(sleeper, backoff)) {
throw new IOException(
String.format(
"Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
attempt),
ex);
} else {
// Note: this used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
}
}
}
} finally {
batch.clear();
}
}
示例29
private void flushBatch() throws IOException, InterruptedException {
if (batch.isEmpty()) {
return;
}
try {
// Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
// by tableName
Map<String, List<WriteRequest>> mapTableRequest =
batch.stream()
.collect(
Collectors.groupingBy(
KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
BatchWriteItemRequest batchRequest =
BatchWriteItemRequest.builder().requestItems(mapTableRequest).build();
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
attempt++;
try {
client.batchWriteItem(batchRequest);
break;
} catch (Exception ex) {
// Fail right away if there is no retry configuration
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
DYNAMO_DB_WRITE_FAILURES.inc();
LOG.info(
"Unable to write batch items {} due to {} ",
batchRequest.requestItems().entrySet(),
ex);
throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
}
if (!BackOffUtils.next(sleeper, backoff)) {
throw new IOException(
String.format(
"Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
attempt),
ex);
} else {
// Note: this used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
}
}
}
} finally {
batch.clear();
}
}
示例30
private void flushBatch() throws IOException, InterruptedException {
if (batch.isEmpty()) {
return;
}
try {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(batch);
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
attempt++;
try {
solrClient.process(spec.getCollection(), updateRequest);
break;
} catch (Exception exception) {
// fail immediately if no retry configuration doesn't handle this
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(exception)) {
throw new IOException(
"Error writing to Solr (no attempt made to retry)", exception);
}
// see if we can pause and try again
if (!BackOffUtils.next(sleeper, backoff)) {
throw new IOException(
String.format(
"Error writing to Solr after %d attempt(s). No more attempts allowed",
attempt),
exception);
} else {
// Note: this used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), exception);
}
}
}
} finally {
batch.clear();
}
}