Java源码示例:org.apache.beam.sdk.util.UserCodeException
示例1
/**
 * Creates an invocation of {@code targetMethod} made on behalf of {@code instrumentedMethod}.
 *
 * <p>The erased return type of the target is recorded, and a description of {@code
 * UserCodeException.wrap(Throwable)} is resolved reflectively.
 *
 * @param returnVarIndex must be {@code null} exactly when the target method returns {@code void}
 * @param targetMethod the method being invoked
 * @param instrumentedMethod the method the invocation is associated with
 * @throws RuntimeException if {@code UserCodeException.wrap(Throwable)} cannot be looked up
 */
UserCodeMethodInvocation(
    @Nullable Integer returnVarIndex,
    MethodDescription targetMethod,
    MethodDescription instrumentedMethod) {
  this.returnVarIndex = returnVarIndex;
  this.targetMethod = targetMethod;
  this.instrumentedMethod = instrumentedMethod;
  this.returnType = targetMethod.getReturnType().asErasure();

  // A return variable must be supplied exactly when the target produces a value.
  boolean hasReturnVar = returnVarIndex != null;
  boolean targetProducesValue = !TypeDescription.VOID.equals(this.returnType);
  checkArgument(
      hasReturnVar == targetProducesValue,
      "returnVarIndex should be defined if and only if the target method has a return value");

  try {
    this.createUserCodeException =
        new MethodDescription.ForLoadedMethod(
            UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
  } catch (NoSuchMethodException | SecurityException lookupFailure) {
    throw new RuntimeException("Unable to find UserCodeException.wrap", lookupFailure);
  }
}
示例2
/**
 * Begins a bundle: initializes state if nothing has been initialized yet, invokes the function's
 * start-bundle method, and then marks the bundle as started.
 *
 * <p>A {@link UserCodeException} raised by the invocation is handed to {@code
 * unwrapUserCodeException}.
 *
 * @deprecated Use {@link TestPipeline} with the {@code DirectRunner}.
 */
@Deprecated
public void startBundle() throws Exception {
  boolean readyForNewBundle = state == State.UNINITIALIZED || state == State.BUNDLE_FINISHED;
  checkState(readyForNewBundle, "Wrong state during startBundle: %s", state);

  if (state == State.UNINITIALIZED) {
    initializeState();
  }

  try {
    fnInvoker.invokeStartBundle(new TestStartBundleContext());
  } catch (UserCodeException wrapped) {
    unwrapUserCodeException(wrapped);
  }
  state = State.BUNDLE_STARTED;
}
示例3
/**
 * Ends the current bundle by invoking the function's finish-bundle method.
 *
 * <p>With {@code CLONE_PER_BUNDLE} cloning the function is also torn down and dropped, and the
 * state goes back to {@code UNINITIALIZED}; otherwise the state becomes {@code BUNDLE_FINISHED}.
 *
 * @deprecated Use {@link TestPipeline} with the {@code DirectRunner}.
 */
@Deprecated
public void finishBundle() throws Exception {
  checkState(
      state == State.BUNDLE_STARTED,
      "Must be inside bundle to call finishBundle, but was: %s",
      state);

  try {
    fnInvoker.invokeFinishBundle(new TestFinishBundleContext());
  } catch (UserCodeException wrapped) {
    unwrapUserCodeException(wrapped);
  }

  if (cloningBehavior != CloningBehavior.CLONE_PER_BUNDLE) {
    state = State.BUNDLE_FINISHED;
    return;
  }

  // Per-bundle clones are discarded once the bundle is done.
  fnInvoker.invokeTeardown();
  fn = null;
  fnInvoker = null;
  state = State.UNINITIALIZED;
}
示例4
/**
 * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner specified
 * by the options.
 *
 * <p>The pipeline is validated before it is handed to the runner. A {@link UserCodeException}
 * escaping the run is replaced by a {@code PipelineExecutionException} carrying its cause.
 */
public PipelineResult run(PipelineOptions options) {
  PipelineRunner<? extends PipelineResult> pipelineRunner = PipelineRunner.fromOptions(options);
  LOG.debug("Running {} via {}", this, pipelineRunner);
  try {
    // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
    // pipeline.
    validate(options);
    return pipelineRunner.run(this);
  } catch (UserCodeException wrapped) {
    // Rethrow with a stack that ends here and is caused by the user's original failure, so
    // the frames between the PipelineRunner and the call into user code are spliced out.
    Throwable userFailure = wrapped.getCause();
    throw new PipelineExecutionException(userFailure);
  }
}
示例5
/**
 * Checks that an exception thrown from a {@code @ProcessElement} method reaches the caller of the
 * invoker as a {@link UserCodeException} whose message contains the original message.
 */
@Test
public void testProcessElementException() throws Exception {
  DoFn<Integer, Integer> throwingFn =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(@SuppressWarnings("unused") ProcessContext c) {
          throw new IllegalArgumentException("bogus");
        }
      };
  DoFnInvoker<Integer, Integer> fnInvoker = DoFnInvokers.invokerFor(throwingFn);

  thrown.expect(UserCodeException.class);
  thrown.expectMessage("bogus");

  fnInvoker.invokeProcessElement(
      new FakeArgumentProvider<Integer, Integer>() {
        @Override
        public DoFn<Integer, Integer>.ProcessContext processContext(DoFn<Integer, Integer> fn) {
          return null;
        }
      });
}
示例6
/**
 * Runs a pipeline on {@code TestPipelineRunnerThrowingSdkException} and checks that the failure
 * surfaces as an {@link IllegalStateException} mentioning "SDK exception" rather than as a
 * {@link UserCodeException}.
 */
@Test
public void testPipelineSDKExceptionHandling() {
  PipelineOptions pipelineOptions = TestPipeline.testingPipelineOptions();
  pipelineOptions.setRunner(TestPipelineRunnerThrowingSdkException.class);
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  // Check pipeline runner correctly catches SDK errors.
  try {
    pipeline.run();
    fail("Should have thrown an exception.");
  } catch (RuntimeException caught) {
    // SDK failures must not be reported as user-code failures.
    assertThat(caught, not(instanceOf(UserCodeException.class)));
    // The original message must be preserved.
    assertThat(caught.getMessage(), containsString("SDK exception"));
    // The concrete type must be IllegalStateException.
    assertThat(caught, instanceOf(IllegalStateException.class));
  }
}
示例7
/**
 * Checks that an exception thrown while processing an element is rethrown by the runner as a
 * {@link UserCodeException} whose cause is the exception the function threw.
 */
@Test
public void testProcessElementExceptionsWrappedAsUserCodeException() {
  ThrowingDoFn throwingFn = new ThrowingDoFn();
  DoFnRunner<String, String> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          throwingFn,
          NullSideInputReader.empty(),
          null,
          null,
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());

  // Expectations are registered before the call that is supposed to fail.
  thrown.expect(UserCodeException.class);
  thrown.expectCause(is(throwingFn.exceptionToThrow));

  doFnRunner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
}
示例8
/**
 * Checks that an exception thrown while starting a bundle is rethrown by the runner as a {@link
 * UserCodeException} whose cause is the exception the function threw.
 */
@Test
public void testStartBundleExceptionsWrappedAsUserCodeException() {
  ThrowingDoFn throwingFn = new ThrowingDoFn();
  DoFnRunner<String, String> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          throwingFn,
          NullSideInputReader.empty(),
          null,
          null,
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());

  // Expectations are registered before the call that is supposed to fail.
  thrown.expect(UserCodeException.class);
  thrown.expectCause(is(throwingFn.exceptionToThrow));

  doFnRunner.startBundle();
}
示例9
/**
 * Checks that an exception thrown while finishing a bundle is rethrown by the runner as a {@link
 * UserCodeException} whose cause is the exception the function threw.
 */
@Test
public void testFinishBundleExceptionsWrappedAsUserCodeException() {
  ThrowingDoFn throwingFn = new ThrowingDoFn();
  DoFnRunner<String, String> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          throwingFn,
          NullSideInputReader.empty(),
          null,
          null,
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());

  // Expectations are registered before the call that is supposed to fail.
  thrown.expect(UserCodeException.class);
  thrown.expectCause(is(throwingFn.exceptionToThrow));

  doFnRunner.finishBundle();
}
示例10
/**
 * Converts a failure into the exception reported to the caller.
 *
 * <p>For a {@code SparkException} with a cause, the result is a {@code
 * PipelineExecutionException} around that cause, or around the cause's own cause when it is a
 * {@link UserCodeException}. Everything else is delegated to {@code runtimeExceptionFrom}.
 */
private static RuntimeException beamExceptionFrom(final Throwable e) {
  // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
  // won't let you catch something that is not declared, so SparkException cannot be
  // caught directly; it is detected with an instanceof check instead.
  if (!(e instanceof SparkException)) {
    return runtimeExceptionFrom(e);
  }

  Throwable sparkCause = e.getCause();
  if (sparkCause == null) {
    return runtimeExceptionFrom(e);
  }
  if (sparkCause instanceof UserCodeException) {
    // Report the user's original failure rather than the wrapper.
    return new Pipeline.PipelineExecutionException(sparkCause.getCause());
  }
  return new Pipeline.PipelineExecutionException(sparkCause);
}
示例11
/**
 * Maps a failure to the exception that should be surfaced to the caller.
 *
 * <p>A {@code SparkException} that has a cause becomes a {@code PipelineExecutionException}: it
 * wraps the cause directly, unless the cause is a {@link UserCodeException}, in which case it
 * wraps that exception's cause. All other failures go through {@code runtimeExceptionFrom}.
 */
private static RuntimeException beamExceptionFrom(final Throwable e) {
  // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
  // won't let you catch something that is not declared, so SparkException cannot be
  // caught directly; it is detected with an instanceof check instead.
  Throwable cause = (e instanceof SparkException) ? e.getCause() : null;
  if (cause == null) {
    return runtimeExceptionFrom(e);
  }

  // Skip the UserCodeException wrapper so the user's own failure is reported.
  Throwable reported = (cause instanceof UserCodeException) ? cause.getCause() : cause;
  return new Pipeline.PipelineExecutionException(reported);
}
示例12
/**
 * Stores the collaborators, prepares an empty builder for unprocessed elements, and immediately
 * starts a bundle on {@code fnRunner}.
 *
 * @throws UserCodeException wrapping any exception thrown while starting the bundle
 */
private ParDoEvaluator(
    PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
    AppliedPTransform<?, ?, ?> transform,
    BundleOutputManager outputManager,
    DirectStepContext stepContext) {
  this.fnRunner = fnRunner;
  this.transform = transform;
  this.outputManager = outputManager;
  this.stepContext = stepContext;
  this.unprocessedElements = ImmutableList.builder();

  try {
    this.fnRunner.startBundle();
  } catch (Exception startFailure) {
    throw UserCodeException.wrap(startFailure);
  }
}
示例13
/**
 * Finishes the bundle on the underlying runner and assembles the result of this evaluation.
 *
 * <p>State is committed through the step context; when committed state exists, the result carries
 * that state and its earliest watermark hold, otherwise it is built without a hold. Outputs,
 * timer updates and unprocessed elements are then attached.
 *
 * @throws UserCodeException wrapping any exception thrown while finishing the bundle
 */
@Override
public TransformResult<InputT> finishBundle() {
  try {
    fnRunner.finishBundle();
  } catch (Exception finishFailure) {
    throw UserCodeException.wrap(finishFailure);
  }

  CopyOnAccessInMemoryStateInternals committedState = stepContext.commitState();
  StepTransformResult.Builder<InputT> builder;
  if (committedState == null) {
    builder = StepTransformResult.withoutHold(transform);
  } else {
    builder =
        StepTransformResult.<InputT>withHold(
                transform, committedState.getEarliestWatermarkHold())
            .withState(committedState);
  }

  builder = builder.addOutput(outputManager.bundles.values());
  builder = builder.withTimerUpdate(stepContext.getTimerUpdate());
  builder = builder.addUnprocessedElements(unprocessedElements.build());
  return builder.build();
}
示例14
/**
 * {@inheritDoc}.
 *
 * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
 * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
 * org.apache.beam.sdk.PipelineResult.State#FAILED}.
 */
@Override
public State waitUntilFinish(Duration duration) {
  State startState = this.state;
  if (startState.isTerminal()) {
    // Already finished; there is nothing to wait for.
    return this.state;
  }

  try {
    state = executor.waitUntilFinish(duration);
  } catch (UserCodeException uce) {
    // Emulates the behavior of Pipeline#run(), where a stack trace caused by a
    // UserCodeException is truncated and replaced with the stack starting at this call.
    throw new Pipeline.PipelineExecutionException(uce.getCause());
  } catch (Exception failure) {
    if (failure instanceof InterruptedException) {
      // Restore the interrupt flag before propagating.
      Thread.currentThread().interrupt();
    }
    throw (failure instanceof RuntimeException)
        ? (RuntimeException) failure
        : new RuntimeException(failure);
  }
  return this.state;
}
示例15
/**
 * Merges the accumulators carried by {@code element} and emits the extracted output under the
 * element's key.
 *
 * <p>The input accumulators are merged between two freshly created accumulators. The element must
 * be in exactly one window.
 *
 * @throws UserCodeException wrapping any exception thrown while merging, extracting or emitting
 */
@Override
public void processElement(WindowedValue<KV<K, Iterable<AccumT>>> element) throws Exception {
  int windowCount = element.getWindows().size();
  checkState(
      windowCount == 1,
      "Expected inputs to %s to be in exactly one window. Got %s",
      MergeAccumulatorsAndExtractOutputEvaluator.class.getSimpleName(),
      windowCount);

  Iterable<AccumT> inputAccumulators = element.getValue().getValue();
  try {
    AccumT leading = combineFn.createAccumulator();
    AccumT trailing = combineFn.createAccumulator();
    Iterable<AccumT> allAccumulators =
        Iterables.concat(
            Collections.singleton(leading), inputAccumulators, Collections.singleton(trailing));
    AccumT merged = combineFn.mergeAccumulators(allAccumulators);
    OutputT extracted = combineFn.extractOutput(merged);
    K key = element.getValue().getKey();
    output.add(element.withValue(KV.of(key, extracted)));
  } catch (Exception combineFailure) {
    throw UserCodeException.wrap(combineFailure);
  }
}
示例16
/**
 * Executes a script that imports Java security classes and declares a {@code SecureClassLoader}
 * subclass whose {@code getPermissions} grants {@code AllPermission}; the test is declared to
 * pass only when a {@link UserCodeException} is thrown.
 *
 * <p>NOTE(review): the {@code PyException} branch returns normally after its assertions, which
 * does not satisfy {@code expected = UserCodeException.class}. Its expected message ("No module
 * named os") also does not obviously match the script, which never imports {@code os}. Confirm
 * whether that branch is still intended.
 */
@Test(expected = UserCodeException.class)
public void test_limitCode() throws Throwable {
// Script under test, assembled line by line; it ends by copying `input` to `output`.
String command =
"from java.security import AccessControlException, Permissions, AllPermission, SecureClassLoader, CodeSource\n"
+ "from java.net import URL\n" + "import java.security\n" + "\n"
+ "class MagicClassLoader(SecureClassLoader):\n" + " def _init_(self):\n"
+ " SecureClassLoader._init_(self)\n" + " self.datamap = {}\n"
+ " self.codeSource = CodeSource(URL('file:/pwn'), None)\n" + "\n"
+ " def addClass(self, name, data):\n" + " self.datamap[name] = data\n" + "\n"
+ " def findClass(self, name):\n" + " data = self.datamap[name]\n"
+ " return self.super_defineClass(name, data, 0, len(data), self.codeSource)\n" + " \n"
+ " def getPermissions(self, codesource):\n" + " permissions = Permissions()\n"
+ " permissions.add(AllPermission())\n" + " return permissions \n" + "\n"
+ "output = input\n";
try {
execute(command);
} catch (PyException pyEx) {
// A script-level failure is only accepted if it is the specific ImportError below.
assertEquals("ImportError", ((PyType) pyEx.type).getName());
assertEquals("No module named os", ((PyBaseExceptionDerived) pyEx.value).getMessage().toString());
return;
}
// Reached only when execute() returned without throwing: force a failure.
assertTrue(false);
}
示例17
/**
 * Delivers {@code output} to every receiver in {@code consumers}.
 *
 * <p>When the current watermark estimator observes timestamps, it is shown the output's timestamp
 * first. Anything thrown by a receiver is rethrown wrapped via {@link UserCodeException#wrap}.
 */
private <T> void outputTo(
    Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {
  boolean estimatorObservesTimestamps =
      currentWatermarkEstimator instanceof TimestampObservingWatermarkEstimator;
  if (estimatorObservesTimestamps) {
    ((TimestampObservingWatermarkEstimator) currentWatermarkEstimator)
        .observeTimestamp(output.getTimestamp());
  }

  try {
    for (FnDataReceiver<WindowedValue<T>> receiver : consumers) {
      receiver.accept(output);
    }
  } catch (Throwable deliveryFailure) {
    throw UserCodeException.wrap(deliveryFailure);
  }
}
示例18
/**
 * Rethrows the cause of {@code e} when that cause is an {@link Exception} or an {@link Error};
 * otherwise rethrows {@code e} itself. This method never returns normally.
 */
private static void unwrapUserCodeException(UserCodeException e) throws Exception {
  Throwable cause = e.getCause();
  if (cause instanceof Exception) {
    throw (Exception) cause;
  }
  if (cause instanceof Error) {
    throw (Error) cause;
  }
  // No cause, or a cause that is neither Exception nor Error: surface the wrapper unchanged.
  throw e;
}
示例19
/**
 * Wraps {@code value} together with a list coder built from {@code elementCoder}, and eagerly
 * encodes the array's contents to a byte array.
 *
 * @throws UserCodeException wrapping the {@code CoderException} if encoding fails
 */
public SerializableArrayViaCoder(Coder<T> elementCoder, T[] value) {
  this.coder = ListCoder.of(elementCoder);
  this.value = value;
  try {
    // The array is encoded as a list of its elements.
    this.encodedValue = CoderUtils.encodeToByteArray(this.coder, Arrays.asList(value));
  } catch (CoderException encodingFailure) {
    throw UserCodeException.wrap(encodingFailure);
  }
}
示例20
/**
 * Checks that an exception thrown from a {@code @ProcessElement} method that returns a {@code
 * ProcessContinuation} and takes a restriction tracker reaches the caller of the invoker as a
 * {@link UserCodeException} whose message contains the original message.
 */
@Test
public void testProcessElementExceptionWithReturn() throws Exception {
  thrown.expect(UserCodeException.class);
  thrown.expectMessage("bogus");

  DoFn<Integer, Integer> throwingFn =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public ProcessContinuation processElement(
            @SuppressWarnings("unused") ProcessContext c,
            RestrictionTracker<SomeRestriction, Void> tracker) {
          throw new IllegalArgumentException("bogus");
        }

        @GetInitialRestriction
        public SomeRestriction getInitialRestriction(@Element Integer element) {
          return null;
        }

        @NewTracker
        public SomeRestrictionTracker newTracker(@Restriction SomeRestriction restriction) {
          return null;
        }
      };

  DoFnInvokers.invokerFor(throwingFn)
      .invokeProcessElement(
          new FakeArgumentProvider<Integer, Integer>() {
            @Override
            public DoFn.ProcessContext processContext(DoFn<Integer, Integer> doFn) {
              return null; // will not be touched
            }

            @Override
            public RestrictionTracker<?, ?> restrictionTracker() {
              return null; // will not be touched
            }
          });
}
示例21
/**
 * Test of Write to a non-existent table: the pipeline run is expected to fail with a cause of
 * type {@link UserCodeException} and a message naming both {@code HCatException} and {@code
 * NoSuchObjectException}.
 */
@Test
public void testWriteFailureTableDoesNotExist() {
  thrown.expectCause(isA(UserCodeException.class));
  thrown.expectMessage(containsString("org.apache.hive.hcatalog.common.HCatException"));
  thrown.expectMessage(containsString("NoSuchObjectException"));

  final String missingTable = "myowntable";
  defaultPipeline
      .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT)))
      .apply(
          HCatalogIO.write()
              .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
              .withTable(missingTable));

  defaultPipeline.run();
}
示例22
/**
 * Checks that an exception thrown while firing a timer is rethrown by the runner as a {@link
 * UserCodeException} whose cause is the exception the function threw.
 */
@Test
public void testOnTimerExceptionsWrappedAsUserCodeException() {
  ThrowingDoFn throwingFn = new ThrowingDoFn();
  DoFnRunner<String, String> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          throwingFn,
          NullSideInputReader.empty(),
          null,
          null,
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());

  // Expectations are registered before the call that is supposed to fail.
  thrown.expect(UserCodeException.class);
  thrown.expectCause(is(throwingFn.exceptionToThrow));

  doFnRunner.onTimer(
      TimerDeclaration.PREFIX + ThrowingDoFn.TIMER_ID,
      "",
      null,
      GlobalWindow.INSTANCE,
      new Instant(0),
      new Instant(0),
      TimeDomain.EVENT_TIME);
}
示例23
/**
 * Demonstrates that attempting to output an element before the timestamp of the current element
 * with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew} throws.
 */
@Test
public void testBackwardsInTimeNoSkew() {
  SkewingDoFn skewingFn = new SkewingDoFn(Duration.ZERO);
  DoFnRunner<Duration, Duration> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          skewingFn,
          NullSideInputReader.empty(),
          new ListOutputManager(),
          new TupleTag<>(),
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());
  doFnRunner.startBundle();

  // An element output at the current timestamp is fine.
  doFnRunner.processElement(
      WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0)));

  String expectedInputTimestamp =
      String.format("timestamp of the current input (%s)", new Instant(0).toString());
  String expectedAllowedSkew =
      String.format(
          "the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod()));
  thrown.expect(UserCodeException.class);
  thrown.expectCause(isA(IllegalArgumentException.class));
  thrown.expectMessage("must be no earlier");
  thrown.expectMessage(expectedInputTimestamp);
  thrown.expectMessage(expectedAllowedSkew);

  // An element output before (current time - skew) is forbidden
  doFnRunner.processElement(
      WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
}
示例24
/**
 * Demonstrates that attempting to output an element before the timestamp of the current element
 * plus the value of {@link DoFn#getAllowedTimestampSkew()} throws, but between that value and the
 * current timestamp succeeds.
 */
@Test
public void testSkew() {
  SkewingDoFn skewingFn = new SkewingDoFn(Duration.standardMinutes(10L));
  DoFnRunner<Duration, Duration> doFnRunner =
      new SimpleDoFnRunner<>(
          null,
          skewingFn,
          NullSideInputReader.empty(),
          new ListOutputManager(),
          new TupleTag<>(),
          Collections.emptyList(),
          mockStepContext,
          null,
          Collections.emptyMap(),
          WindowingStrategy.of(new GlobalWindows()),
          DoFnSchemaInformation.create(),
          Collections.emptyMap());
  doFnRunner.startBundle();

  // Outputting between "now" and "now - allowed skew" succeeds.
  doFnRunner.processElement(
      WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0)));

  String expectedInputTimestamp =
      String.format("timestamp of the current input (%s)", new Instant(0).toString());
  String expectedAllowedSkew =
      String.format(
          "the allowed skew (%s)",
          PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod()));
  thrown.expect(UserCodeException.class);
  thrown.expectCause(isA(IllegalArgumentException.class));
  thrown.expectMessage("must be no earlier");
  thrown.expectMessage(expectedInputTimestamp);
  thrown.expectMessage(expectedAllowedSkew);

  // Outputting before "now - allowed skew" fails.
  doFnRunner.processElement(
      WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0)));
}
示例25
/**
 * Applies the {@code toValue} function to the value carried by {@code input}.
 *
 * @throws UserCodeException wrapping any exception thrown while obtaining or converting the value
 */
private ValueT toValue(WindowedValue<InputT> input) {
  final ValueT converted;
  try {
    converted = toValue.call(input.getValue());
  } catch (Exception conversionFailure) {
    throw UserCodeException.wrap(conversionFailure);
  }
  return converted;
}
示例26
/**
 * Converts the payload of {@code value} by calling the {@code toValue} function on it.
 *
 * @throws UserCodeException wrapping any exception thrown while obtaining or converting the
 *     payload
 */
private ValueT toValue(WindowedValue<InputT> value) {
  try {
    InputT payload = value.getValue();
    return toValue.call(payload);
  } catch (Exception conversionFailure) {
    throw UserCodeException.wrap(conversionFailure);
  }
}
示例27
/**
 * Some of the Beam unit tests rely on the exception message for their assertions. This walks the
 * cause chain of {@code throwable}, starting with {@code throwable} itself, and returns the first
 * {@link UserCodeException} found so its message is exposed directly.
 *
 * @return the first {@link UserCodeException} in the chain, or {@code throwable} unchanged when
 *     the chain contains none
 */
private static Throwable getUserCodeException(Throwable throwable) {
  for (Throwable current = throwable; current != null; current = current.getCause()) {
    if (current instanceof UserCodeException) {
      return current;
    }
  }
  return throwable;
}
示例28
/**
 * Return the {@link WorkItemServiceState} resulting from sending an error completion status.
 *
 * <p>A {@link UserCodeException} is reported through its cause. The failure is logged and its
 * formatted form becomes the error message; out-of-memory failures get an extra hint in front.
 *
 * @throws IOException declared by this method; presumably raised when sending the status fails
 */
public synchronized WorkItemServiceState reportError(Throwable e) throws IOException {
  checkState(!finalStateSent, "cannot reportUpdates after sending a final state");
  WorkItemStatus status = createStatusUpdate(true);

  // TODO: Provide more structure representation of error, e.g., the serialized exception object.
  // TODO: Look into moving the stack trace thinning into the client.
  final Throwable failure = (e instanceof UserCodeException) ? e.getCause() : e;

  Status error = new Status();
  error.setCode(2); // Code.UNKNOWN. TODO: Replace with a generated definition.

  // TODO: Attach the stack trace as exception details, not to the message.
  String logPrefix = String.format("Failure processing work item %s", uniqueWorkId());
  final String errorMessage;
  if (!isOutOfMemoryError(failure)) {
    LOG.error(
        "{}: Uncaught exception occurred during work unit execution. This will be retried.",
        logPrefix,
        failure);
    errorMessage = DataflowWorkerLoggingHandler.formatException(failure);
  } else {
    String memoryHint =
        "An OutOfMemoryException occurred. Consider specifying higher memory "
            + "instances in PipelineOptions.\n";
    LOG.error("{}: {}", logPrefix, memoryHint);
    errorMessage = memoryHint + DataflowWorkerLoggingHandler.formatException(failure);
  }
  error.setMessage(errorMessage);

  status.setErrors(ImmutableList.of(error));
  return execute(status);
}
示例29
/**
 * Reads the source carried by {@code element} into a new output bundle.
 *
 * <p>Every record the reader produces is added to the bundle in the global window with the
 * reader's timestamp. After reading, the result of the dynamic-split task is collected; a
 * non-null residual source is recorded as an unprocessed element.
 *
 * @throws UserCodeException wrapping the cause of a failed split attempt
 * @throws Exception anything else thrown by the reader or while waiting for the split result
 */
@Override
public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
throws Exception {
BoundedSource<OutputT> source = element.getValue().getSource();
// try-with-resources closes the reader on every exit path.
try (final BoundedReader<OutputT> reader = source.createReader(options)) {
boolean contentsRemaining = reader.start();
// The split task is started only after the reader has been started.
Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
UncommittedBundle<OutputT> output = evaluationContext.createBundle(outputPCollection);
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
}
resultBuilder.addOutput(output);
try {
// Blocks until the split task has produced its result.
BoundedSource<OutputT> residual = residualFuture.get();
if (residual != null) {
resultBuilder.addUnprocessedElements(
element.withValue(BoundedSourceShard.of(residual)));
}
} catch (ExecutionException exex) {
// Un-and-rewrap the exception thrown by attempting to split
throw UserCodeException.wrap(exex.getCause());
}
}
}
示例30
/**
 * Processes {@code element} in the windows that are ready and records whatever the runner hands
 * back as unprocessed.
 *
 * @throws UserCodeException wrapping any exception thrown while processing or recording
 */
@Override
public void processElement(WindowedValue<InputT> element) {
  try {
    Iterable<WindowedValue<InputT>> pushedBack = fnRunner.processElementInReadyWindows(element);
    unprocessedElements.addAll(pushedBack);
  } catch (Exception processingFailure) {
    throw UserCodeException.wrap(processingFailure);
  }
}