Java源码示例:org.apache.nifi.controller.ProcessorNode
示例1
/**
 * Determines whether the processor wrapped by the given node declares memory as a
 * system resource consideration.
 *
 * The processor's class is checked first for a single {@code SystemResourceConsideration}
 * annotation and then for the {@code SystemResourceConsiderations} container annotation.
 *
 * @param procNode the node whose underlying processor class is inspected
 * @return {@code true} if any found consideration names {@code SystemResource.MEMORY},
 *         {@code false} otherwise
 */
private boolean isHighMemoryUtilizer(final ProcessorNode procNode) {
    final Class<? extends Processor> processorClass = procNode.getProcessor().getClass();

    // Single annotation form.
    final SystemResourceConsideration single = processorClass.getAnnotation(SystemResourceConsideration.class);
    if (single != null && single.resource() == SystemResource.MEMORY) {
        return true;
    }

    // Container form holding several considerations.
    final SystemResourceConsiderations container = processorClass.getAnnotation(SystemResourceConsiderations.class);
    if (container == null) {
        return false;
    }
    for (final SystemResourceConsideration entry : container.value()) {
        if (entry.resource() == SystemResource.MEMORY) {
            return true;
        }
    }
    return false;
}
示例2
/**
 * Returns the status history for the specified processor.
 *
 * When the current user is not authorized to read the processor, the name detail of the
 * returned history is overwritten with the processor id and the type detail with "Processor".
 *
 * @param processorId processor id
 * @return status history
 * @throws ResourceNotFoundException if the root group cannot find a processor with the given id
 */
public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
    final String rootGroupId = flowController.getRootGroupId();
    final ProcessorNode node = flowController.getGroup(rootGroupId).findProcessor(processorId);
    if (node == null) {
        // nothing with this id exists in the flow
        throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
    }

    final StatusHistoryDTO history = flowController.getProcessorStatusHistory(processorId);

    final boolean readable = node.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
    if (readable) {
        return history;
    }

    // not authorized: replace the name/type details with generic values
    history.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId);
    history.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor");
    return history;
}
示例3
/**
 * Gets the status for the specified processor.
 *
 * The processor is located from the root group, the status of its owning process group is
 * obtained for the current user, and the entry whose id matches is picked from that status.
 *
 * @param processorId processor id
 * @return the status for the specified processor
 * @throws ResourceNotFoundException if the processor, its group status, or the processor's
 *         entry within the group status cannot be found
 */
public ProcessorStatus getProcessorStatus(final String processorId) {
    final ProcessorNode node = flowController.getGroup(flowController.getRootGroupId()).findProcessor(processorId);
    if (node == null) {
        throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
    }

    // status is looked up through the group that owns the processor
    final String owningGroupId = node.getProcessGroup().getIdentifier();
    final ProcessGroupStatus groupStatus = flowController.getGroupStatus(owningGroupId, NiFiUserUtils.getNiFiUser());
    if (groupStatus == null) {
        throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", owningGroupId));
    }

    // first entry with a matching id wins
    for (final ProcessorStatus candidate : groupStatus.getProcessorStatus()) {
        if (processorId.equals(candidate.getId())) {
            return candidate;
        }
    }
    throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
}
示例4
/**
 * Validates that the Processor can be stopped when @OnScheduled blocks
 * indefinitely but is written to react to thread interrupts.
 *
 * NOTE(review): the scenario is configured through a helper named
 * {@code blockingInterruptableOnUnschedule}, which refers to OnUnschedule rather than
 * OnScheduled - confirm which lifecycle hook is actually meant to block.
 */
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
    final FlowManagerAndSystemBundle setup = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
    flowManager = setup.getFlowManager();

    // a process group is created; its reference is not used afterwards
    flowManager.createProcessGroup(UUID.randomUUID().toString());

    final ProcessorNode node = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            setup.getSystemBundle().getBundleDetails().getCoordinate());
    node.setProperties(properties);

    // sets the scenario for the processor to run
    final TestProcessor processorUnderTest = (TestProcessor) node.getProcessor();
    this.blockingInterruptableOnUnschedule(processorUnderTest);

    node.performValidation();

    // start, wait for RUNNING, then stop and wait for STOPPED
    processScheduler.startProcessor(node, true);
    assertCondition(() -> ScheduledState.RUNNING == node.getScheduledState(), SHORT_DELAY_TOLERANCE);

    processScheduler.stopProcessor(node);
    assertCondition(() -> ScheduledState.STOPPED == node.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
}
示例5
/**
 * Returns the status history for the specified processor.
 *
 * The current user's read authorization is evaluated first and passed on to the flow
 * controller. When the user is not authorized, the name detail of the returned history is
 * set to the processor id and the type detail to "Processor".
 *
 * @param processorId processor id
 * @return status history
 * @throws ResourceNotFoundException if the root group cannot find a processor with the given id
 */
public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
    final ProcessorNode node = getRootGroup().findProcessor(processorId);
    if (node == null) {
        // nothing with this id exists in the flow
        throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
    }

    final boolean readable = node.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
    final StatusHistoryDTO history = flowController.getProcessorStatusHistory(processorId, readable);
    if (readable) {
        return history;
    }

    // not authorized: replace the name/type details with generic values
    history.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId);
    history.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor");
    return history;
}
示例6
/**
 * Intentionally does nothing; see the explanation inside the method body.
 *
 * @param procNode the processor that yielded (unused)
 */
@Override
public void yield(final ProcessorNode procNode) {
    // This exists in the ProcessScheduler so that the scheduler can take advantage of the
    // fact that the Processor was yielded and, as a result, avoid scheduling the Processor
    // to potentially run (thereby skipping the overhead of the context switches) if nothing
    // can be done.
    //
    // We used to implement this feature by canceling all futures for the given Processor
    // and re-submitting them with a delay. However, this became problematic, because we
    // have situations where a Processor will wait several seconds (often 30 seconds in the
    // case of a network timeout), and then yield the context. If this Processor has X number
    // of threads, we end up submitting X new tasks while the previous X-1 tasks are still
    // running. At this point, another thread could finish and do the same thing, resulting
    // in an additional X-1 extra tasks being submitted.
    //
    // As a result, we simply removed this buggy implementation, as it was a very minor
    // performance optimization that gave very bad results.
}
示例7
/**
 * Validates that the Processor can be stopped when @OnScheduled constantly
 * fails. Basically validates that the re-try loop breaks if the user initiated
 * stopProcessor.
 */
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
    final FlowManagerAndSystemBundle setup = this.buildFlowControllerForTest();
    flowManager = setup.getFlowManager();

    // a process group is created; its reference is not used afterwards
    flowManager.createProcessGroup(UUID.randomUUID().toString());

    final ProcessorNode node = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            setup.getSystemBundle().getBundleDetails().getCoordinate());
    node.setProperties(properties);

    // sets the scenario for the processor to run: the test processor is flagged to
    // generate an exception on scheduling, Integer.MAX_VALUE times
    final TestProcessor processorUnderTest = (TestProcessor) node.getProcessor();
    this.longRunningOnUnschedule(processorUnderTest, 100);
    processorUnderTest.generateExceptionOnScheduled = true;
    processorUnderTest.keepFailingOnScheduledTimes = Integer.MAX_VALUE;

    node.performValidation();

    // start, wait for RUNNING, then stop and wait for STOPPED
    processScheduler.startProcessor(node, true);
    assertCondition(() -> ScheduledState.RUNNING == node.getScheduledState(), SHORT_DELAY_TOLERANCE);

    processScheduler.stopProcessor(node);
    assertCondition(() -> ScheduledState.STOPPED == node.getScheduledState(), SHORT_DELAY_TOLERANCE);
}
示例8
/**
 * Serializes a flow containing a processor with raw comments and a root-group variable,
 * then checks that the output holds the serialized forms, does not hold the raw forms,
 * and does not contain the U+0001 character.
 */
@Test
public void testSerializationEscapingAndFiltering() throws Exception {
    // build the flow: one processor carrying the raw comments plus one raw variable
    final ProcessorNode processor = controller.getFlowManager().createProcessor(
            DummyScheduledProcessor.class.getName(),
            UUID.randomUUID().toString(),
            systemBundle.getBundleDetails().getCoordinate());
    processor.setComments(RAW_COMMENTS);
    controller.getFlowManager().getRootGroup().addProcessor(processor);
    controller.getFlowManager().getRootGroup().setVariables(Collections.singletonMap(RAW_VARIABLE_NAME, RAW_VARIABLE_VALUE));

    // serialize the controller
    final ByteArrayOutputStream flowBytes = new ByteArrayOutputStream();
    final Document flowDocument = serializer.transform(controller, ScheduledStateLookup.IDENTITY_LOOKUP);
    serializer.serialize(flowDocument, flowBytes);

    // verify the results contain the serialized strings and none of the raw ones
    final String flowXml = new String(flowBytes.toByteArray(), StandardCharsets.UTF_8);
    assertTrue(flowXml.contains(SERIALIZED_COMMENTS));
    assertFalse(flowXml.contains(RAW_COMMENTS));
    assertTrue(flowXml.contains(SERIALIZED_VARIABLE_NAME));
    assertFalse(flowXml.contains(RAW_VARIABLE_NAME));
    assertTrue(flowXml.contains(SERIALIZED_VARIABLE_VALUE));
    assertFalse(flowXml.contains(RAW_VARIABLE_VALUE));
    assertFalse(flowXml.contains("\u0001"));
}
示例9
/**
 * Stops the given processor through the scheduler while holding the read lock.
 *
 * Does nothing when the processor is already in the STOPPED state.
 *
 * @param processor the processor to stop
 * @throws IllegalStateException if the processor's id is not registered in this group's
 *         processor map, or if the processor is DISABLED
 */
@Override
public void stopProcessor(final ProcessorNode processor) {
    readLock.lock();
    try {
        final String processorId = processor.getIdentifier();
        if (!processors.containsKey(processorId)) {
            throw new IllegalStateException("No processor with ID " + processorId + " belongs to this Process Group");
        }

        final ScheduledState currentState = processor.getScheduledState();
        if (currentState == ScheduledState.DISABLED) {
            throw new IllegalStateException("Processor is disabled");
        }
        // already stopped: nothing to hand to the scheduler
        if (currentState != ScheduledState.STOPPED) {
            scheduler.stopProcessor(processor);
        }
    } finally {
        readLock.unlock();
    }
}
示例10
/**
 * Disables the given processor through the scheduler while holding the read lock.
 *
 * Does nothing when the processor is already in the DISABLED state.
 *
 * @param processor the processor to disable
 * @throws IllegalStateException if the processor's id is not registered in this group's
 *         processor map, or if the processor is RUNNING
 */
@Override
public void disableProcessor(final ProcessorNode processor) {
    readLock.lock();
    try {
        final String processorId = processor.getIdentifier();
        if (!processors.containsKey(processorId)) {
            throw new IllegalStateException("No Processor with ID " + processorId + " belongs to this Process Group");
        }

        final ScheduledState currentState = processor.getScheduledState();
        if (currentState == ScheduledState.RUNNING) {
            throw new IllegalStateException("Processor is currently running");
        }
        // already disabled: nothing to hand to the scheduler
        if (currentState != ScheduledState.DISABLED) {
            scheduler.disableProcessor(processor);
        }
    } finally {
        readLock.unlock();
    }
}
示例11
/**
 * Checks that a freshly created processor reports STOPPED, that disabling it (twice)
 * leaves it DISABLED, and that asking the scheduler to start it afterwards keeps its
 * physical scheduled state at DISABLED.
 */
@Test
public void validateDisableOperation() throws Exception {
    final FlowManagerAndSystemBundle setup = this.buildFlowControllerForTest();
    flowManager = setup.getFlowManager();

    // a process group is created; its reference is not used afterwards
    flowManager.createProcessGroup(UUID.randomUUID().toString());

    final ProcessorNode node = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            setup.getSystemBundle().getBundleDetails().getCoordinate());
    node.setProperties(properties);

    assertCondition(() -> ScheduledState.STOPPED == node.getScheduledState());
    assertCondition(() -> ScheduledState.STOPPED == node.getPhysicalScheduledState());

    // validates idempotency: disable is invoked twice in a row
    node.disable();
    node.disable();
    assertCondition(() -> ScheduledState.DISABLED == node.getScheduledState());
    assertCondition(() -> ScheduledState.DISABLED == node.getPhysicalScheduledState());

    // a start request must not move the processor out of DISABLED
    node.performValidation();
    processScheduler.startProcessor(node, true);
    assertCondition(() -> ScheduledState.DISABLED == node.getPhysicalScheduledState());
}
示例12
/**
 * Validates that stop calls are harmless and idempotent if the processor is not
 * in STARTING or RUNNING state: after stopping a never-started processor it is
 * still STOPPED and the test processor has recorded no operation names.
 */
@Test
public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
    fc = this.buildFlowControllerForTest();

    final ProcessGroup rootForTest = fc.createProcessGroup(UUID.randomUUID().toString());
    this.setControllerRootGroup(fc, rootForTest);

    final ProcessorNode node = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
    node.setProperties(properties);
    final TestProcessor processorUnderTest = (TestProcessor) node.getProcessor();
    assertTrue(ScheduledState.STOPPED == node.getScheduledState());

    // sets the scenario for the processor to run
    this.randomOnTriggerDelay(processorUnderTest, 3000);

    // stop without ever having started
    fc.getProcessScheduler().stopProcessor(node);

    assertTrue(ScheduledState.STOPPED == node.getScheduledState());
    assertTrue(processorUnderTest.operationNames.size() == 0);
}
示例13
/**
 * Intentionally does nothing; see the explanation inside the method body.
 *
 * @param procNode the processor that yielded (unused)
 */
@Override
public void yield(final ProcessorNode procNode) {
    // This exists in the ProcessScheduler so that the scheduler can take advantage of the
    // fact that the Processor was yielded and, as a result, avoid scheduling the Processor
    // to potentially run (thereby skipping the overhead of the context switches) if nothing
    // can be done.
    //
    // We used to implement this feature by canceling all futures for the given Processor
    // and re-submitting them with a delay. However, this became problematic, because we
    // have situations where a Processor will wait several seconds (often 30 seconds in the
    // case of a network timeout), and then yield the context. If this Processor has X number
    // of threads, we end up submitting X new tasks while the previous X-1 tasks are still
    // running. At this point, another thread could finish and do the same thing, resulting
    // in an additional X-1 extra tasks being submitted.
    //
    // As a result, we simply removed this buggy implementation, as it was a very minor
    // performance optimization that gave very bad results.
}
示例14
/**
 * Imports a versioned flow whose processor references the sensitive parameter
 * "secret-param" into a group that has an empty parameter context, and checks that the
 * context afterwards holds exactly that parameter, marked sensitive and with a null value.
 */
@Test
public void testParameterCreatedWithNullValueOnImportWithSensitivePropertyReference() {
    // Processor whose password property points at the parameter
    final ProcessorNode referencingProcessor = createProcessorNode(UsernamePasswordProcessor.class);
    referencingProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));

    // Flow snapshot containing that processor and the (valueless) sensitive parameter
    final ParameterDescriptor secretDescriptor = new ParameterDescriptor.Builder()
            .name("secret-param")
            .sensitive(true)
            .build();
    final Parameter secretParameter = new Parameter(secretDescriptor, null);
    final VersionedFlowSnapshot snapshot = createFlowSnapshot(
            Collections.emptyList(),
            Collections.singletonList(referencingProcessor),
            Collections.singleton(secretParameter));

    // Child group that receives the import
    final ProcessGroup childGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
    childGroup.setName("Inner Group");
    getRootGroup().addProcessGroup(childGroup);

    // Fresh, empty parameter context bound to the child group
    final ParameterReferenceManager referenceManager = new StandardParameterReferenceManager(getFlowController().getFlowManager());
    final ParameterContext context = new StandardParameterContext("param-context-id", "parameter-context", referenceManager, null);
    childGroup.setParameterContext(context);
    assertTrue(context.getParameters().isEmpty());

    childGroup.updateFlow(snapshot, null, true, true, true);

    // Exactly one parameter is expected after the import
    final Collection<Parameter> importedParameters = context.getParameters().values();
    assertEquals(1, importedParameters.size());

    final Parameter imported = importedParameters.iterator().next();
    assertEquals("secret-param", imported.getDescriptor().getName());
    assertTrue(imported.getDescriptor().isSensitive());
    assertNull(imported.getValue());
}
示例15
/**
 * Enables a counter controller service, points a processor's "Counter Service" property
 * at it, triggers the processor once and expects the service's value to be 1.
 */
@Test
public void testCallingControllerService() throws ExecutionException, InterruptedException {
    final ProcessorNode processor = createProcessorNode(ControllerServiceReferencingProcessor.class.getName());
    final ControllerServiceNode counterServiceNode = createControllerServiceNode(CounterControllerService.class.getName());

    // the service has to validate before it is enabled; get() waits for enabling to finish
    assertSame(ValidationStatus.VALID, counterServiceNode.performValidation());
    getFlowController().getControllerServiceProvider().enableControllerService(counterServiceNode).get();

    processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
    processor.setProperties(Collections.singletonMap("Counter Service", counterServiceNode.getIdentifier()));

    triggerOnce(processor);

    final Counter counterService = (Counter) counterServiceNode.getControllerServiceImplementation();
    assertEquals(1, counterService.getValue());
}
示例16
/**
 * Searches for "pressure" and for "back pressure" and expects, in both cases, a single
 * connection result listing the mocked back pressure data size and object count.
 */
@Test
public void testSearchBasedOnBackPressure() {
    final String expectedSizeMatch = "Back pressure data size: 100 KB";
    final String expectedCountMatch = "Back pressure count: 5";

    // given
    final ProcessorNode source = getProcessorNode("processor1", "processor1Name", AUTHORIZED);
    final ProcessorNode destination = getProcessorNode("processor2", "processor2Name", AUTHORIZED);
    final Connection connection = getConnection("connection", "connectionName", getBasicRelationships(), source, destination, AUTHORIZED);

    final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
    Mockito.when(queue.getBackPressureDataSizeThreshold()).thenReturn("100 KB");
    Mockito.when(queue.getBackPressureObjectThreshold()).thenReturn(5L);
    Mockito.when(connection.getFlowFileQueue()).thenReturn(queue);

    givenRootProcessGroup()
            .withProcessor(source)
            .withProcessor(destination)
            .withConnection(connection);

    // when
    whenExecuteSearch("pressure");

    // then
    thenResultConsists()
            .ofConnection(getSimpleResultFromRoot("connection", "connectionName", expectedSizeMatch, expectedCountMatch))
            .validate(results);

    // when
    whenExecuteSearch("back pressure");

    // then
    thenResultConsists()
            .ofConnection(getSimpleResultFromRoot("connection", "connectionName", expectedSizeMatch, expectedCountMatch))
            .validate(results);
}
示例17
/**
 * Returns one entity per processor that the DAO reports for the given group.
 *
 * @param groupId id of the group whose processors are requested
 * @return the set of entities created from those processors
 */
@Override
public Set<ProcessorEntity> getProcessors(final String groupId) {
    return processorDAO.getProcessors(groupId)
            .stream()
            .map(node -> createProcessorEntity(node))
            .collect(Collectors.toSet());
}
示例18
/**
 * Builds the DTO describing one property of a processor.
 *
 * If the processor returns no descriptor for the property name, a dynamic descriptor with
 * the same name and {@code Validator.INVALID} attached is used instead.
 *
 * @param id processor id
 * @param property property name
 * @return the descriptor DTO, created against the id of the processor's process group
 */
@Override
public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) {
    final ProcessorNode node = processorDAO.getProcessor(id);
    final PropertyDescriptor known = node.getPropertyDescriptor(property);

    // fall back to an invalid descriptor if the processor doesn't support this property
    final PropertyDescriptor effective = known != null
            ? known
            : new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();

    return dtoFactory.createPropertyDescriptorDto(effective, node.getProcessGroup().getIdentifier());
}
示例19
/**
 * Looks up a connectable component by id.
 *
 * Candidates are tried in this order: processor, input port, output port, funnel, and
 * finally a remote group port found via the root group.
 *
 * @param id identifier of the component
 * @return the first match in the order above, or {@code null} if none of the lookups match
 */
public Connectable findConnectable(final String id) {
    final ProcessorNode processor = getProcessorNode(id);
    if (processor != null) {
        return processor;
    }

    final Port inputPort = getInputPort(id);
    if (inputPort != null) {
        return inputPort;
    }

    final Port outputPort = getOutputPort(id);
    if (outputPort != null) {
        return outputPort;
    }

    final Funnel funnel = getFunnel(id);
    if (funnel != null) {
        return funnel;
    }

    // last candidate: its result (possibly null) is the answer
    return getRootGroup().findRemoteGroupPort(id);
}
示例20
/**
 * Updates a group first to a flow version whose sensitive property references a parameter
 * and then to a version where the same property carries an explicit value, and checks
 * that the property's raw value is non-null after the first update and null after the second.
 */
@Test
public void testChangeVersionFromParameterToExplicitValueSensitiveProperty() {
    final String passwordProperty = UsernamePasswordProcessor.PASSWORD.getName();

    // Version 1: processor whose sensitive property references a parameter
    final ProcessorNode templateProcessor = createProcessorNode(UsernamePasswordProcessor.class);
    templateProcessor.setProperties(Collections.singletonMap(passwordProperty, "#{secret-param}"));

    final ParameterDescriptor secretDescriptor = new ParameterDescriptor.Builder()
            .name("secret-param")
            .sensitive(true)
            .build();
    final Parameter secretParameter = new Parameter(secretDescriptor, null);
    final VersionedFlowSnapshot snapshotWithReference = createFlowSnapshot(
            Collections.emptyList(),
            Collections.singletonList(templateProcessor),
            Collections.singleton(secretParameter));

    // Version 2: same processor, now with an explicit value; snapshot taken after the change
    templateProcessor.setProperties(Collections.singletonMap(passwordProperty, "secret-value"));
    final VersionedFlowSnapshot snapshotWithExplicitValue = createFlowSnapshot(
            Collections.emptyList(),
            Collections.singletonList(templateProcessor),
            null);

    // Child group starts out on version 1
    final ProcessGroup childGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
    childGroup.setName("Inner Group");
    getRootGroup().addProcessGroup(childGroup);
    childGroup.updateFlow(snapshotWithReference, null, true, true, true);

    final ProcessorNode afterFirstUpdate = childGroup.getProcessors().iterator().next();
    assertNotNull(afterFirstUpdate.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());

    // Move the group to version 2
    childGroup.updateFlow(snapshotWithExplicitValue, null, true, true, true);

    // The sensitive property no longer references a parameter; the test expects it to be unset
    final ProcessorNode afterSecondUpdate = childGroup.getProcessors().iterator().next();
    assertNull(afterSecondUpdate.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
}
示例21
/**
 * Creates a processor from the given DTO, adds it to the target group and configures it.
 *
 * The update is verified before the processor is added to the group, and configuration is
 * applied after it has been added.
 *
 * @param groupId id of the group the processor is added to
 * @param processorDTO description of the processor to create
 * @return the newly created processor node
 * @throws IllegalArgumentException if the DTO names a parent group that is not the same as
 *         {@code groupId}, or if the DTO has no type
 * @throws NiFiCoreException wrapping any IllegalStateException or ComponentLifeCycleException
 *         raised while creating, verifying, adding or configuring the processor
 */
@Override
public ProcessorNode createProcessor(final String groupId, ProcessorDTO processorDTO) {
    final String declaredParentId = processorDTO.getParentGroupId();
    if (declaredParentId != null && !flowController.getFlowManager().areGroupsSame(groupId, declaredParentId)) {
        throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Processor is being added.");
    }

    // ensure the type is specified
    final String processorType = processorDTO.getType();
    if (processorType == null) {
        throw new IllegalArgumentException("The processor type must be specified.");
    }

    // get the group to add the processor to
    final ProcessGroup targetGroup = locateProcessGroup(flowController, groupId);

    try {
        // attempt to create the processor
        final BundleCoordinate coordinate = BundleUtils.getBundle(flowController.getExtensionManager(), processorType, processorDTO.getBundle());
        final ProcessorNode created = flowController.getFlowManager().createProcessor(processorType, processorDTO.getId(), coordinate);

        // ensure we can perform the update before we add the processor to the flow
        verifyUpdate(created, processorDTO);

        // add to the group first, then configure
        targetGroup.addProcessor(created);
        configureProcessor(created, processorDTO);
        return created;
    } catch (final IllegalStateException | ComponentLifeCycleException e) {
        throw new NiFiCoreException(e.getMessage(), e);
    }
}
示例22
/**
 * Searches for "expire" and for "expires" and expects, in both cases, a single connection
 * result listing the mocked FlowFile expiration.
 */
@Test
public void testSearchBasedOnExpiration() {
    final String expectedMatch = "FlowFile expiration: 5";

    // given
    final ProcessorNode source = getProcessorNode("processor1", "processor1Name", AUTHORIZED);
    final ProcessorNode destination = getProcessorNode("processor2", "processor2Name", AUTHORIZED);
    final Connection connection = getConnection("connection", "connectionName", getBasicRelationships(), source, destination, AUTHORIZED);

    final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
    Mockito.when(queue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(5);
    Mockito.when(queue.getFlowFileExpiration()).thenReturn("5");
    Mockito.when(connection.getFlowFileQueue()).thenReturn(queue);

    givenRootProcessGroup()
            .withProcessor(source)
            .withProcessor(destination)
            .withConnection(connection);

    // when
    whenExecuteSearch("expire");

    // then
    thenResultConsists()
            .ofConnection(getSimpleResultFromRoot("connection", "connectionName", expectedMatch))
            .validate(results);

    // when
    whenExecuteSearch("expires");

    // then
    thenResultConsists()
            .ofConnection(getSimpleResultFromRoot("connection", "connectionName", expectedMatch))
            .validate(results);
}
示例23
/**
 * Checks variable resolution when a child group overrides a variable of the root group.
 *
 * The processor lives in the child group and references "number", which both groups
 * define. The test expects the processor to be reported as affected only by the child's
 * variable, expects the root's variable to be changeable while the processor is running,
 * and expects changing the child's variable in that state to fail with an
 * IllegalStateException.
 *
 * The processor is stopped in a {@code finally} block so that a failed expectation does
 * not leave it running.
 */
@Test
public void testComponentsAffectedByVariableOverridden() {
    final ProcessGroup child = getFlowController().getFlowManager().createProcessGroup("child");
    child.setName("Child");
    child.setVariables(Collections.singletonMap("number", "5"));

    getRootGroup().setVariables(Collections.singletonMap("number", "1"));
    getRootGroup().addProcessGroup(child);

    final ProcessorNode processor = createProcessorNode(NumberRefProcessor.class);
    processor.setProperties(Collections.singletonMap(NumberRefProcessor.NUMBER.getName(), "${number}"));
    moveProcessor(processor, child);

    // the child's definition is the one the processor is affected by
    final Set<ComponentNode> componentsAffected = child.getComponentsAffectedByVariable("number");
    assertEquals(1, componentsAffected.size());
    assertTrue(componentsAffected.contains(processor));

    final Set<ComponentNode> rootAffected = getRootGroup().getComponentsAffectedByVariable("number");
    assertTrue(rootAffected.isEmpty());

    processor.setScheduldingPeriod("1 hour");
    child.startProcessor(processor, false);
    try {
        // no exception is expected when the root's variable changes
        getRootGroup().setVariables(Collections.singletonMap("number", "2"));

        try {
            child.setVariables(Collections.singletonMap("number", "10"));
            Assert.fail("Updated variable that is referenced by a running processor");
        } catch (final IllegalStateException expected) {
            // Expected
        }
    } finally {
        // always stop the processor, even when an expectation above fails
        child.stopProcessor(processor);
    }
}
示例24
/**
 * Reports the "trigger when any destination available" flag of the connectable.
 *
 * @return {@code false} when the connectable is not of type PROCESSOR; otherwise the value
 *         reported by the connectable cast to {@code ProcessorNode}
 */
private boolean isTriggerWhenAnyDestinationAvailable() {
    // the cast is only evaluated once the type check has passed
    return connectable.getConnectableType() == ConnectableType.PROCESSOR
            && ((ProcessorNode) connectable).isTriggerWhenAnyDestinationAvailable();
}
示例25
/**
 * Returns the id of the group that contains the specified processor.
 *
 * @param processorId processor id
 * @return group id, or {@code null} if the root group finds no processor with that id
 */
public String findProcessGroupIdForProcessor(String processorId) {
    final ProcessorNode node = getRootGroup().findProcessor(processorId);
    return node == null ? null : node.getProcessGroup().getIdentifier();
}
示例26
/**
 * Runs a generate -> update-attribute -> terminate flow whose update-attribute properties
 * contain only parameter references, and checks that the FlowFile queued after the update
 * step carries attributes with the parameter value "unit" substituted in.
 */
@Test
public void testParametersWhereELSupportedButNotPresent() throws ExecutionException, InterruptedException {
    final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
    final ProcessorNode attributeUpdater = createProcessorNode(UpdateAttributeWithEL.class);
    final ProcessorNode terminator = getTerminateProcessor();

    // only the second connection's queue is inspected later
    connect(generator, attributeUpdater, REL_SUCCESS);
    final Connection outputConnection = connect(attributeUpdater, terminator, REL_SUCCESS);

    // root group gets a context with a single parameter: test = unit
    final ParameterReferenceManager referenceManager = new StandardParameterReferenceManager(getFlowController().getFlowManager());
    final ParameterContext context = new StandardParameterContext(UUID.randomUUID().toString(), "param-context", referenceManager, null);
    final ParameterDescriptor testDescriptor = new ParameterDescriptor.Builder().name("test").build();
    context.setParameters(Collections.singletonMap("test", new Parameter(testDescriptor, "unit")));
    getRootGroup().setParameterContext(context);

    // a lone reference, two adjacent references, and a reference embedded in text
    final Map<String, String> referencingProperties = new HashMap<>();
    referencingProperties.put("foo", "#{test}");
    referencingProperties.put("bar", "#{test}#{test}");
    referencingProperties.put("baz", "foo#{test}bar");
    attributeUpdater.setProperties(referencingProperties);

    triggerOnce(generator);
    triggerOnce(attributeUpdater);

    final FlowFileRecord updated = outputConnection.getFlowFileQueue().poll(Collections.emptySet());
    assertEquals("unit", updated.getAttribute("foo"));
    assertEquals("unitunit", updated.getAttribute("bar"));
    assertEquals("foounitbar", updated.getAttribute("baz"));
}
示例27
/**
 * Looks up a connectable component of this group by id while holding the read lock.
 *
 * The maps are consulted in this order: processors, input ports, output ports, funnels.
 *
 * @param id identifier of the component
 * @return the first match in the order above, or {@code null} if no map contains the id
 */
@Override
public Connectable getConnectable(final String id) {
    readLock.lock();
    try {
        final ProcessorNode processor = processors.get(id);
        if (processor != null) {
            return processor;
        }

        // input ports take precedence over output ports
        Port port = inputPorts.get(id);
        if (port == null) {
            port = outputPorts.get(id);
        }
        if (port != null) {
            return port;
        }

        // last candidate: its result (possibly null) is the answer
        return funnels.get(id);
    } finally {
        readLock.unlock();
    }
}
示例28
/**
 * Validates the given processor and, if it is valid, starts it through the root process group.
 *
 * @param procNode the processor to start
 * @return the future returned by the root process group's {@code startProcessor} call
 * @throws IllegalStateException if validation yields anything other than
 *         {@code ValidationStatus.VALID}; the message includes the validation errors
 */
protected final Future<Void> start(final ProcessorNode procNode) {
    if (procNode.performValidation() != ValidationStatus.VALID) {
        throw new IllegalStateException("Processor is invalid: " + procNode + ": " + procNode.getValidationErrors());
    }
    return rootProcessGroup.startProcessor(procNode, true);
}
示例29
/**
 * Runs a generate processor followed by a processor that changes both attributes and
 * content, and checks the provenance repository: event 0 is a CREATE without the new
 * attributes, and event 1 is a CONTENT_MODIFIED event carrying them.
 */
@Test
public void testAttributesModifiedNotCreatedIfContentModified() throws ExecutionException, InterruptedException, IOException {
    final ProcessorNode generator = createGenerateProcessor(0);

    // second processor: adds two attributes, rewrites the content, routes to success
    final ProcessorNode modifier = createProcessorNode((ctx, sess) -> {
        FlowFile incoming = sess.get();

        final Map<String, String> addedAttributes = new HashMap<>();
        addedAttributes.put("test", "integration");
        addedAttributes.put("integration", "true");

        incoming = sess.putAllAttributes(incoming, addedAttributes);
        incoming = sess.write(incoming, stream -> stream.write('A'));
        sess.transfer(incoming, REL_SUCCESS);
    }, REL_SUCCESS);

    connect(generator, modifier, REL_SUCCESS);
    connect(modifier, getTerminateAllProcessor(), REL_SUCCESS);

    triggerOnce(generator);
    triggerOnce(modifier);

    // The highest event id must be 1; the events with ids 0 and 1 are inspected below.
    final ProvenanceEventRepository provenance = getProvenanceRepository();
    assertEquals(1L, provenance.getMaxEventId().longValue());

    final ProvenanceEventRecord createEvent = provenance.getEvent(0L);
    assertEquals(ProvenanceEventType.CREATE, createEvent.getEventType());
    assertNull(createEvent.getAttribute("test"));
    assertNull(createEvent.getAttribute("integration"));

    final ProvenanceEventRecord modifyEvent = provenance.getEvent(1L);
    assertEquals(ProvenanceEventType.CONTENT_MODIFIED, modifyEvent.getEventType());
    assertEquals("integration", modifyEvent.getAttribute("test"));
    assertEquals("true", modifyEvent.getAttribute("integration"));
}
示例30
/**
 * Runs a single processor that creates a FlowFile with two attributes and routes it to an
 * auto-terminated relationship, and checks the provenance repository: event 0 is a CREATE
 * and event 1 is a DROP, both carrying the attributes.
 */
@Test
public void testDropEventIfRoutedToAutoTerminatedRelationship() throws ExecutionException, InterruptedException, IOException {
    // processor: creates a FlowFile, adds two attributes, routes to success
    final ProcessorNode creator = createProcessorNode((ctx, sess) -> {
        FlowFile created = sess.create();

        final Map<String, String> addedAttributes = new HashMap<>();
        addedAttributes.put("test", "integration");
        addedAttributes.put("integration", "true");

        created = sess.putAllAttributes(created, addedAttributes);
        sess.transfer(created, REL_SUCCESS);
    }, REL_SUCCESS);

    // success is auto-terminated, so the FlowFile has nowhere to go
    creator.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));

    triggerOnce(creator);

    // The highest event id must be 1; the events with ids 0 and 1 are inspected below.
    final ProvenanceEventRepository provenance = getProvenanceRepository();
    assertEquals(1L, provenance.getMaxEventId().longValue());

    final ProvenanceEventRecord createEvent = provenance.getEvent(0L);
    assertEquals(ProvenanceEventType.CREATE, createEvent.getEventType());
    assertEquals("integration", createEvent.getAttribute("test"));
    assertEquals("true", createEvent.getAttribute("integration"));

    final ProvenanceEventRecord dropEvent = provenance.getEvent(1L);
    assertEquals(ProvenanceEventType.DROP, dropEvent.getEventType());
    assertEquals("integration", dropEvent.getAttribute("test"));
    assertEquals("true", dropEvent.getAttribute("integration"));
}