Java Source Code Examples: org.apache.beam.sdk.util.Sleeper
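Every example on this page follows the same retry idiom: build a BackOff (usually via FluentBackoff), invoke the operation in a loop, and between failed attempts call BackOffUtils.next(sleeper, backOff), which sleeps for the current interval and returns false once the retry budget is exhausted. Below is a minimal, self-contained sketch of that idiom. The RemoteCall interface and the backoff values are hypothetical, chosen only for illustration, and the companion classes are assumed to live alongside Sleeper in org.apache.beam.sdk.util.

import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

public class SleeperRetrySketch {

  /** Hypothetical stand-in for any operation that can fail transiently. */
  interface RemoteCall {
    void run() throws IOException;
  }

  static void runWithRetries(RemoteCall call) throws IOException, InterruptedException {
    Sleeper sleeper = Sleeper.DEFAULT;
    // Illustrative retry budget; the real examples below configure their own limits.
    BackOff backOff =
        FluentBackoff.DEFAULT
            .withMaxRetries(3)
            .withInitialBackoff(Duration.standardSeconds(1))
            .backoff();
    while (true) {
      try {
        call.run();
        return; // success, stop retrying
      } catch (IOException e) {
        // next() sleeps for the current backoff interval and returns false when retries run out.
        if (!BackOffUtils.next(sleeper, backOff)) {
          throw e;
        }
      }
    }
  }
}

Example 1 below shows the same loop with a retriable callback and per-attempt cleanup.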

Example 1
private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backOff = backoff.backoff();
  while (true) {
    try {
      retriable.execute();
      break;
    } catch (Exception e) {
      if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
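        // Clean up any partial state from the failed attempt before the loop retries.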
        retriable.cleanUpAfterFailure();
      } else {
        throw e;
      }
    }
  }
}
 
Example 2
@Override
public boolean matchesSafely(List<ShardedFile> outputFiles) {
  try {
    // Load output data
    List<String> outputLines = new ArrayList<>();
    for (ShardedFile outputFile : outputFiles) {
      outputLines.addAll(
          outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
    }

    // Since the windowing is nondeterministic we only check the sums
    actualCounts = new TreeMap<>();
    for (String line : outputLines) {
      String[] splits = line.split(": ", -1);
      String word = splits[0];
      long count = Long.parseLong(splits[1]);
      actualCounts.merge(word, count, (a, b) -> a + b);
    }

    return actualCounts.equals(expectedWordCounts);
  } catch (Exception e) {
    throw new RuntimeException(
        String.format("Failed to read from sharded output: %s due to exception", outputFiles),
        e);
  }
}
 
Example 3
@Test
public void testInvalidJson() throws Exception {
  File deadLetterFile = new File(tempFolder.getRoot(), "dead-letter-file");
  Files.write(
      tempFolder.newFile("test.json").toPath(), INVALID_JSON_TEXT.getBytes(Charsets.UTF_8));

  BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
  env.executeDdl(
      String.format(
          "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' "
              + "TBLPROPERTIES '{\"format\":\"json\", \"deadLetterFile\": \"%s\"}'",
          SQL_JSON_SCHEMA, tempFolder.getRoot(), deadLetterFile.getAbsoluteFile()));

  PCollection<Row> rows =
      BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));

  PAssert.that(rows).empty();

  pipeline.run();
  assertThat(
      new NumberedShardedFile(deadLetterFile.getAbsoluteFile() + "*")
          .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
      containsInAnyOrder(INVALID_JSON_TEXT));
}
 
Example 4
@Test
public void testWriteLines() throws Exception {
  File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
  BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
  env.executeDdl(
      String.format(
          "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"lines\"}'",
          SQL_LINES_SCHEMA, destinationFile.getAbsolutePath()));

  BeamSqlRelUtils.toPCollection(
      pipeline, env.parseQuery("INSERT INTO test VALUES ('hello'), ('goodbye')"));
  pipeline.run();

  assertThat(
      new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
          .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
      containsInAnyOrder("hello", "goodbye"));
}
 
Example 5
@Test
public void testWriteCsv() throws Exception {
  File destinationFile = new File(tempFolder.getRoot(), "csv-outputs");
  BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());

  // NumberedShardedFile
  env.executeDdl(
      String.format(
          "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\"}'",
          SQL_CSV_SCHEMA, destinationFile.getAbsolutePath()));

  BeamSqlRelUtils.toPCollection(
      pipeline, env.parseQuery("INSERT INTO test VALUES ('hello', 42), ('goodbye', 13)"));
  pipeline.run();

  assertThat(
      new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
          .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
      containsInAnyOrder("hello,42", "goodbye,13"));
}
 
Example 6
@Test
public void testWriteJson() throws Exception {
  File destinationFile = new File(tempFolder.getRoot(), "json-outputs");
  BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
  env.executeDdl(
      String.format(
          "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"json\"}'",
          SQL_JSON_SCHEMA, destinationFile.getAbsolutePath()));

  BeamSqlRelUtils.toPCollection(
      pipeline, env.parseQuery("INSERT INTO test(name, age) VALUES ('Jack', 13)"));
  pipeline.run();

  assertThat(
      new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
          .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
      containsInAnyOrder(JSON_TEXT));
}
 
Example 7
/**
 * Computes a checksum of the given sharded file. Not safe to call until the writing is complete.
 */
private String getActualChecksum(ShardedFile shardedFile) {
  // Load output data
  List<String> outputs;
  try {
    outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
  } catch (Exception e) {
    throw new RuntimeException(String.format("Failed to read from: %s", shardedFile), e);
  }

  // Verify outputs. Checksum is computed using SHA-1 algorithm
  actualChecksum = computeHash(outputs);
  LOG.debug("Generated checksum: {}", actualChecksum);

  return actualChecksum;
}
 
Example 8
private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception {
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = RUNQUERY_BACKOFF.backoff();
  while (true) {
    try {
      RunQueryResponse response = datastore.runQuery(request);
      rpcSuccesses.inc();
      return response;
    } catch (DatastoreException exception) {
      rpcErrors.inc();

      if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
        throw exception;
      }
      if (!BackOffUtils.next(sleeper, backoff)) {
        LOG.error("Aborting after {} retries.", MAX_RETRIES);
        throw exception;
      }
    }
  }
}
 
Example 9
@VisibleForTesting
static void processWork(
    DataflowWorkerHarnessOptions pipelineOptions,
    final BatchDataflowWorker worker,
    Sleeper sleeper)
    throws InterruptedException {
  int numThreads = chooseNumberOfThreads(pipelineOptions);
  ExecutorService executor = pipelineOptions.getExecutorService();
  final List<Callable<Boolean>> tasks = new ArrayList<>();

  LOG.debug("Starting {} worker threads", numThreads);
  // We start the appropriate number of threads.
  for (int i = 0; i < numThreads; ++i) {
    tasks.add(new WorkerThread(worker, sleeper));
  }

  LOG.debug("Waiting for {} worker threads", numThreads);
  // We wait forever unless there is a big problem.
  executor.invokeAll(tasks);
  LOG.error("All threads died.");
}
 
Example 10
private PubsubHelper(PubsubClient pubsubClient, String project) {
  this.pubsubClient = pubsubClient;
  this.project = project;
  createdTopics = new ArrayList<>();
  createdSubscriptions = new ArrayList<>();
  sleeper = Sleeper.DEFAULT;
  backOff =
      FluentBackoff.DEFAULT
          .withInitialBackoff(Duration.standardSeconds(1))
          .withMaxRetries(3)
          .backoff();
}
 
Example 11
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
  LOG.info("Writing batch of {} entities", entities.size());
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff =
      FluentBackoff.DEFAULT
          .withMaxRetries(MAX_RETRIES)
          .withInitialBackoff(INITIAL_BACKOFF)
          .backoff();

  while (true) {
    // Batch mutate entities.
    try {
      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
      for (Entity entity : entities) {
        commitRequest.addMutations(mutationBuilder.apply(entity));
      }
      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
      datastore.commit(commitRequest.build());
      // Break if the commit threw no exception.
      break;
    } catch (DatastoreException exception) {
      LOG.error(
          "Error writing to the Datastore ({}): {}",
          exception.getCode(),
          exception.getMessage());
      if (!BackOffUtils.next(sleeper, backoff)) {
        LOG.error("Aborting after {} retries.", MAX_RETRIES);
        throw exception;
      }
    }
  }
  LOG.info("Successfully wrote {} entities", entities.size());
  entities.clear();
}
 
Example 12
private void flush() throws Exception {
  BackOff backOff = retryBackoff.backoff();
  int attempt = 0;

  if (buffer.isEmpty()) {
    return;
  }

  batchSize.update(buffer.size());

  while (true) {
    try (ClickHouseStatement statement = connection.createStatement()) {
      statement.sendRowBinaryStream(
          insertSql(schema(), table()),
          stream -> {
            for (Row row : buffer) {
              ClickHouseWriter.writeRow(stream, schema(), row);
            }
          });
      buffer.clear();
      break;
    } catch (SQLException e) {
      if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
        throw e;
      } else {
        retries.inc();
        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), e);
        attempt++;
      }
    }
  }
}
 
Example 13
public List<Shard> listShards(final String streamName) throws TransientKinesisException {
  return wrapExceptions(
      () -> {
        List<Shard> shards = Lists.newArrayList();
        String lastShardId = null;

        // DescribeStream has limits that can be hit fairly easily if we are attempting
        // to configure multiple KinesisIO inputs in the same account. Retry up to
        // LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS times if we end up hitting that limit.
        //
        // Only pass the wrapped exception up once that limit is reached. Use FluentBackoff
        // to implement the retry policy.
        FluentBackoff retryBackoff =
            FluentBackoff.DEFAULT
                .withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
                .withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
        StreamDescription description = null;
        do {
          BackOff backoff = retryBackoff.backoff();
          Sleeper sleeper = Sleeper.DEFAULT;
          while (true) {
            try {
              description =
                  kinesis.describeStream(streamName, lastShardId).getStreamDescription();
              break;
            } catch (LimitExceededException exc) {
              if (!BackOffUtils.next(sleeper, backoff)) {
                throw exc;
              }
            }
          }

          shards.addAll(description.getShards());
          lastShardId = shards.get(shards.size() - 1).getShardId();
        } while (description.getHasMoreShards());

        return shards;
      });
}
 
Example 14
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  PublishRequest request = context.element();
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = retryBackoff.backoff();
  int attempt = 0;
  while (true) {
    attempt++;
    try {
      PublishResult pr = producer.publish(request);
      context.output(pr);
      break;
    } catch (Exception ex) {
      // Fail right away if there is no retry configuration
      if (spec.getRetryConfiguration() == null
          || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
        SNS_WRITE_FAILURES.inc();
        LOG.info("Unable to publish message {} due to {} ", request.getMessage(), ex);
        throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
      }

      if (!BackOffUtils.next(sleeper, backoff)) {
        throw new IOException(
            String.format(
                "Error writing to SNS after %d attempt(s). No more attempts allowed",
                attempt),
            ex);
      } else {
        // Note: this is used in test cases to verify behavior
        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
      }
    }
  }
}
 
Example 15
private void executeBatch() throws SQLException, IOException, InterruptedException {
  if (records.isEmpty()) {
    return;
  }
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = retryBackOff.backoff();
  while (true) {
    try (PreparedStatement preparedStatement =
        connection.prepareStatement(spec.getStatement().get())) {
      try {
        // add each record in the statement batch
        for (T record : records) {
          processRecord(record, preparedStatement);
        }
        // execute the batch
        preparedStatement.executeBatch();
        // commit the changes
        connection.commit();
        break;
      } catch (SQLException exception) {
        if (!spec.getRetryStrategy().apply(exception)) {
          throw exception;
        }
        LOG.warn("Deadlock detected, retrying", exception);
        // clean up the statement batch and the connection state
        preparedStatement.clearBatch();
        connection.rollback();
        if (!BackOffUtils.next(sleeper, backoff)) {
          // we tried the max number of times
          throw exception;
        }
      }
    }
  }
  records.clear();
}
 
Example 16
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  PublishRequest request =
      (PublishRequest) spec.getPublishRequestFn().apply(context.element());
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = retryBackoff.backoff();
  int attempt = 0;
  while (true) {
    attempt++;
    try {
      PublishResponse pr = producer.publish(request);
      context.output(pr);
      break;
    } catch (Exception ex) {
      // Fail right away if there is no retry configuration
      if (spec.getRetryConfiguration() == null
          || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
        SNS_WRITE_FAILURES.inc();
        LOG.info("Unable to publish message {} due to {} ", request.message(), ex);
        throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
      }

      if (!BackOffUtils.next(sleeper, backoff)) {
        throw new IOException(
            String.format(
                "Error writing to SNS after %d attempt(s). No more attempts allowed",
                attempt),
            ex);
      } else {
        // Note: this is used in test cases to verify behavior
        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
      }
    }
  }
}
 
Example 17
/** Initializes the worker and starts the actual read loop (in {@link #processWork}). */
public void run() throws InterruptedException {
  // Configure standard file systems.
  FileSystems.setDefaultPipelineOptions(pipelineOptions);

  DataflowWorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
  BatchDataflowWorker worker =
      BatchDataflowWorker.forBatchIntrinsicWorkerHarness(client, pipelineOptions);

  worker.startStatusServer();
  processWork(pipelineOptions, worker, Sleeper.DEFAULT);
}
 
Example 18
private void getConfig(String computation) {
  BackOff backoff =
      FluentBackoff.DEFAULT
          .withInitialBackoff(Duration.millis(100))
          .withMaxBackoff(Duration.standardMinutes(1))
          .withMaxCumulativeBackoff(Duration.standardMinutes(5))
          .backoff();
  while (running.get()) {
    try {
      if (windmillServiceEnabled) {
        getConfigFromDataflowService(computation);
      } else {
        getConfigFromWindmill(computation);
      }
      return;
    } catch (IllegalArgumentException | IOException e) {
      LOG.warn("Error fetching config: ", e);
      try {
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          return;
        }
      } catch (IOException ioe) {
        LOG.warn("Error backing off, will not retry: ", ioe);
        return;
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
 
Example 19
private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
  BackOff backoff = grpcBackoff();
  int rpcErrors = 0;
  while (true) {
    try {
      return function.get();
    } catch (StatusRuntimeException e) {
      try {
        if (++rpcErrors % 20 == 0) {
          LOG.warn(
              "Many exceptions calling gRPC. Last exception: {} with status {}",
              e,
              e.getStatus());
        }
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          throw new WindmillServerStub.RpcException(e);
        }
      } catch (IOException | InterruptedException i) {
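        // Backing off itself failed or was interrupted: restore the interrupt flag if needed,
        // then rethrow the original gRPC failure with the backoff error attached as suppressed.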
        if (i instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        WindmillServerStub.RpcException rpcException = new WindmillServerStub.RpcException(e);
        rpcException.addSuppressed(i);
        throw rpcException;
      }
    }
  }
}
 
Example 20
private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
  ResourceId output = FileBasedSink.convertToFileResourceIfPossible(options.getOutput());
  PerWindowFiles filenamePolicy = new PerWindowFiles(output);

  List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);

  for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
    final Instant windowStart =
        new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
    String filePrefix =
        filenamePolicy.filenamePrefixForWindow(
            new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))));
    expectedOutputFiles.add(
        new NumberedShardedFile(
            output
                    .getCurrentDirectory()
                    .resolve(filePrefix, StandardResolveOptions.RESOLVE_FILE)
                    .toString()
                + "*"));
  }

  ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));

  // For this integration test, input is tiny and we can build the expected counts
  SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
  for (String line :
      inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
    String[] words = line.split(ExampleUtils.TOKENIZER_PATTERN, -1);

    for (String word : words) {
      if (!word.isEmpty()) {
        expectedWordCounts.put(
            word, MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
      }
    }
  }

  WindowedWordCount.runWindowedWordCount(options);

  assertThat(expectedOutputFiles, containsWordCounts(expectedWordCounts));
}
 
Example 21
/**
 * Writes a batch of mutations to Cloud Datastore.
 *
 * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All mutations in
 * the batch will be committed again, even if the commit was partially successful. If the retry
 * limit is exceeded, the last exception from Cloud Datastore will be thrown.
 *
 * @throws DatastoreException if the commit fails or IOException or InterruptedException if
 *     backing off between retries fails.
 */
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
  LOG.debug("Writing batch of {} mutations", mutations.size());
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();

  while (true) {
    // Batch upsert entities.
    CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
    commitRequest.addAllMutations(mutations);
    commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
    long startTime = System.currentTimeMillis(), endTime;

    if (throttler.throttleRequest(startTime)) {
      LOG.info("Delaying request due to previous failures");
      throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
      sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
      continue;
    }

    try {
      datastore.commit(commitRequest.build());
      endTime = System.currentTimeMillis();

      writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
      throttler.successfulRequest(startTime);
      rpcSuccesses.inc();

      // Break if the commit threw no exception.
      break;
    } catch (DatastoreException exception) {
      if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
        /* Most errors are not related to request size, and should not change our expectation of
         * the latency of successful requests. DEADLINE_EXCEEDED can be taken into
         * consideration, though. */
        endTime = System.currentTimeMillis();
        writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
      }
      // Only log the code and message for potentially-transient errors. The entire exception
      // will be propagated upon the last retry.
      LOG.error(
          "Error writing batch of {} mutations to Datastore ({}): {}",
          mutations.size(),
          exception.getCode(),
          exception.getMessage());
      rpcErrors.inc();

      if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
        throw exception;
      }
      if (!BackOffUtils.next(sleeper, backoff)) {
        LOG.error("Aborting after {} retries.", MAX_RETRIES);
        throw exception;
      }
    }
  }
  LOG.debug("Successfully wrote {} mutations", mutations.size());
  mutations.clear();
  mutationsSize = 0;
}
 
Example 22
@Test
public void deadlineExceededRetries() throws InterruptedException {
  List<Mutation> mutationList = Arrays.asList(m((long) 1));

  // mock sleeper so that it does not actually sleep.
  WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

  // respond with 2 timeouts and a success.
  when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 1"))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 2"))
      .thenReturn(Timestamp.now());

  SpannerWriteResult result =
      pipeline
          .apply(Create.of(mutationList))
          .apply(
              SpannerIO.write()
                  .withProjectId("test-project")
                  .withInstanceId("test-instance")
                  .withDatabaseId("test-database")
                  .withServiceFactory(serviceFactory)
                  .withBatchSizeBytes(0)
                  .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));

  // all success, so verify no errors
  PAssert.that(result.getFailedMutations())
      .satisfies(
          m -> {
            assertEquals(0, Iterables.size(m));
            return null;
          });
  pipeline.run().waitUntilFinish();

  // 2 calls to sleeper
  verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong());
  // 3 write attempts for the single mutationGroup.
  verify(serviceFactory.mockDatabaseClient(), times(3)).writeAtLeastOnce(any());
}
 
Example 23
@Test
public void deadlineExceededFailsAfterRetries() throws InterruptedException {
  List<Mutation> mutationList = Arrays.asList(m((long) 1));

  // mock sleeper so that it does not actually sleep.
  WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

  // respond with all timeouts.
  when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout"));

  SpannerWriteResult result =
      pipeline
          .apply(Create.of(mutationList))
          .apply(
              SpannerIO.write()
                  .withProjectId("test-project")
                  .withInstanceId("test-instance")
                  .withDatabaseId("test-database")
                  .withServiceFactory(serviceFactory)
                  .withBatchSizeBytes(0)
                  .withMaxCumulativeBackoff(Duration.standardHours(2))
                  .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));

  // One error
  PAssert.that(result.getFailedMutations())
      .satisfies(
          m -> {
            assertEquals(1, Iterables.size(m));
            return null;
          });
  pipeline.run().waitUntilFinish();

  // Due to jitter in the backoff algorithm, we cannot test for an exact number of retries,
  // but there will be more than 16 (normally 18).
  int numSleeps = Mockito.mockingDetails(WriteToSpannerFn.sleeper).getInvocations().size();
  assertTrue(String.format("Should be least 16 sleeps, got %d", numSleeps), numSleeps > 16);
  long totalSleep =
      Mockito.mockingDetails(WriteToSpannerFn.sleeper).getInvocations().stream()
          .mapToLong(i -> i.getArgument(0))
          .reduce(0L, Long::sum);

  // Total sleep should be greater than 2x maxCumulativeBackoff: 120m,
  // because the batch is repeated individually due to REPORT_FAILURES.
  assertTrue(
      String.format("Should be least 7200s of sleep, got %d", totalSleep),
      totalSleep >= Duration.standardHours(2).getMillis());

  // Number of write attempts should be numSleeps + 2 write attempts:
  //      1 batch attempt, numSleeps/2 batch retries,
  // then 1 individual attempt + numSleeps/2 individual retries
  verify(serviceFactory.mockDatabaseClient(), times(numSleeps + 2)).writeAtLeastOnce(any());
}
 
Example 24
@Test
public void retryOnSchemaChangeException() throws InterruptedException {
  List<Mutation> mutationList = Arrays.asList(m((long) 1));

  String errString =
      "Transaction aborted. "
          + "Database schema probably changed during transaction, retry may succeed.";

  // mock sleeper so that it does not actually sleep.
  WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

  // respond with 2 aborted transactions and a success.
  when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenReturn(Timestamp.now());

  SpannerWriteResult result =
      pipeline
          .apply(Create.of(mutationList))
          .apply(
              SpannerIO.write()
                  .withProjectId("test-project")
                  .withInstanceId("test-instance")
                  .withDatabaseId("test-database")
                  .withServiceFactory(serviceFactory)
                  .withBatchSizeBytes(0)
                  .withFailureMode(FailureMode.FAIL_FAST));

  // all success, so verify no errors
  PAssert.that(result.getFailedMutations())
      .satisfies(
          m -> {
            assertEquals(0, Iterables.size(m));
            return null;
          });
  pipeline.run().waitUntilFinish();

  // 0 calls to sleeper
  verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
  // 3 write attempts for the single mutationGroup.
  verify(serviceFactory.mockDatabaseClient(), times(3)).writeAtLeastOnce(any());
}
 
Example 25
@Test
public void retryMaxOnSchemaChangeException() throws InterruptedException {
  List<Mutation> mutationList = Arrays.asList(m((long) 1));

  String errString =
      "Transaction aborted. "
          + "Database schema probably changed during transaction, retry may succeed.";

  // mock sleeper so that it does not actually sleep.
  WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

  // Respond with Aborted transaction
  when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString));

  // When Spanner aborts the transaction more than 5 times, pipeline execution stops with a
  // PipelineExecutionException.
  thrown.expect(PipelineExecutionException.class);
  thrown.expectMessage(errString);

  SpannerWriteResult result =
      pipeline
          .apply(Create.of(mutationList))
          .apply(
              SpannerIO.write()
                  .withProjectId("test-project")
                  .withInstanceId("test-instance")
                  .withDatabaseId("test-database")
                  .withServiceFactory(serviceFactory)
                  .withBatchSizeBytes(0)
                  .withFailureMode(FailureMode.FAIL_FAST));

  // One error
  PAssert.that(result.getFailedMutations())
      .satisfies(
          m -> {
            assertEquals(1, Iterables.size(m));
            return null;
          });
  pipeline.run().waitUntilFinish();

  // 0 calls to sleeper
  verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
  // 5 write attempts for the single mutationGroup.
  verify(serviceFactory.mockDatabaseClient(), times(5)).writeAtLeastOnce(any());
}
 
Example 26
@Test
public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException {
  List<Mutation> mutationList = Arrays.asList(m((long) 1));

  String errString =
      "Transaction aborted. "
          + "Database schema probably changed during transaction, retry may succeed.";

  // mock sleeper so that it does not actually sleep.
  WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

  // Respond with (1) Aborted transaction a couple of times (2) deadline exceeded
  // (3) Aborted transaction 3 times (4)  deadline exceeded and finally return success.
  when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 1"))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, errString))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.DEADLINE_EXCEEDED, "simulated Timeout 2"))
      .thenReturn(Timestamp.now());

  SpannerWriteResult result =
      pipeline
          .apply(Create.of(mutationList))
          .apply(
              SpannerIO.write()
                  .withProjectId("test-project")
                  .withInstanceId("test-instance")
                  .withDatabaseId("test-database")
                  .withServiceFactory(serviceFactory)
                  .withBatchSizeBytes(0)
                  .withFailureMode(FailureMode.FAIL_FAST));

  // Zero error
  PAssert.that(result.getFailedMutations())
      .satisfies(
          m -> {
            assertEquals(0, Iterables.size(m));
            return null;
          });
  pipeline.run().waitUntilFinish();

  // 2 calls to sleeper
  verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong());
  // 8 write attempts for the single mutationGroup.
  verify(serviceFactory.mockDatabaseClient(), times(8)).writeAtLeastOnce(any());
}
 
Example 27
@BeforeClass
public static void setup() throws IOException, InterruptedException {
  // network sharing doesn't work with ClassRule
  network = Network.newNetwork();

  zookeeper =
      new GenericContainer<>("zookeeper:3.4.13")
          .withStartupAttempts(10)
          .withExposedPorts(2181)
          .withNetwork(network)
          .withNetworkAliases("zookeeper");

  // so far zookeeper container always starts successfully, so no extra retries
  zookeeper.start();

  clickHouse =
      (ClickHouseContainer)
          new ClickHouseContainer(CLICKHOUSE_IMAGE)
              .withStartupAttempts(10)
              .withCreateContainerCmdModifier(
                  // type inference for `(CreateContainerCmd) -> cmd.` doesn't work
                  cmd ->
                      ((CreateContainerCmd) cmd)
                          .withMemory(256 * 1024 * 1024L)
                          .withMemorySwap(4L * 1024 * 1024 * 1024L))
              .withNetwork(network)
              .withClasspathResourceMapping(
                  "config.d/zookeeper_default.xml",
                  "/etc/clickhouse-server/config.d/zookeeper_default.xml",
                  BindMode.READ_ONLY);

  BackOff backOff =
      FluentBackoff.DEFAULT
          .withMaxRetries(3)
          .withInitialBackoff(Duration.standardSeconds(15))
          .backoff();

  // try to start clickhouse-server a couple of times, see BEAM-6639
  while (true) {
    try {
      Unreliables.retryUntilSuccess(
          10,
          () -> {
            DockerClientFactory.instance()
                .checkAndPullImage(DockerClientFactory.instance().client(), CLICKHOUSE_IMAGE);

            return null;
          });

      clickHouse.start();
      break;
    } catch (Exception e) {
      if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
        throw e;
      } else {
        List<Image> images =
            DockerClientFactory.instance().client().listImagesCmd().withShowAll(true).exec();
        String listImagesOutput = "listImagesCmd:\n" + Joiner.on('\n').join(images) + "\n";

        LOG.warn("failed to start clickhouse-server\n\n" + listImagesOutput, e);
      }
    }
  }
}
 
Example 28
private void flushBatch() throws IOException, InterruptedException {
  if (batch.isEmpty()) {
    return;
  }

  try {
    // Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
    // by tableName
    Map<String, List<WriteRequest>> mapTableRequest =
        batch.stream()
            .collect(
                Collectors.groupingBy(
                    KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));

    BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
    mapTableRequest
        .entrySet()
        .forEach(
            entry -> batchRequest.addRequestItemsEntry(entry.getKey(), entry.getValue()));

    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = retryBackoff.backoff();
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        client.batchWriteItem(batchRequest);
        break;
      } catch (Exception ex) {
        // Fail right away if there is no retry configuration
        if (spec.getRetryConfiguration() == null
            || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
          DYNAMO_DB_WRITE_FAILURES.inc();
          LOG.info(
              "Unable to write batch items {} due to {} ",
              batchRequest.getRequestItems().entrySet(),
              ex);
          throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
        }

        if (!BackOffUtils.next(sleeper, backoff)) {
          throw new IOException(
              String.format(
                  "Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
                  attempt),
              ex);
        } else {
          // Note: this is used in test cases to verify behavior
          LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
        }
      }
    }
  } finally {
    batch.clear();
  }
}
 
Example 29
private void flushBatch() throws IOException, InterruptedException {
  if (batch.isEmpty()) {
    return;
  }

  try {
    // Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
    // by tableName
    Map<String, List<WriteRequest>> mapTableRequest =
        batch.stream()
            .collect(
                Collectors.groupingBy(
                    KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));

    BatchWriteItemRequest batchRequest =
        BatchWriteItemRequest.builder().requestItems(mapTableRequest).build();

    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = retryBackoff.backoff();
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        client.batchWriteItem(batchRequest);
        break;
      } catch (Exception ex) {
        // Fail right away if there is no retry configuration
        if (spec.getRetryConfiguration() == null
            || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
          DYNAMO_DB_WRITE_FAILURES.inc();
          LOG.info(
              "Unable to write batch items {} due to {} ",
              batchRequest.requestItems().entrySet(),
              ex);
          throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
        }

        if (!BackOffUtils.next(sleeper, backoff)) {
          throw new IOException(
              String.format(
                  "Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
                  attempt),
              ex);
        } else {
          // Note: this is used in test cases to verify behavior
          LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
        }
      }
    }
  } finally {
    batch.clear();
  }
}
 
Example 30
private void flushBatch() throws IOException, InterruptedException {
  if (batch.isEmpty()) {
    return;
  }
  try {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.add(batch);

    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = retryBackoff.backoff();
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        solrClient.process(spec.getCollection(), updateRequest);
        break;
      } catch (Exception exception) {

        // fail immediately if there is no retry configuration or it doesn't handle this exception
        if (spec.getRetryConfiguration() == null
            || !spec.getRetryConfiguration().getRetryPredicate().test(exception)) {
          throw new IOException(
              "Error writing to Solr (no attempt made to retry)", exception);
        }

        // see if we can pause and try again
        if (!BackOffUtils.next(sleeper, backoff)) {
          throw new IOException(
              String.format(
                  "Error writing to Solr after %d attempt(s). No more attempts allowed",
                  attempt),
              exception);

        } else {
          // Note: this is used in test cases to verify behavior
          LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), exception);
        }
      }
    }
  } finally {
    batch.clear();
  }
}