我有几个Nifi流程组,我想在推广到生产之前运行集成测试。问题是我似乎找不到任何关于如何这样做的留档。
数据来源似乎是一个很有前途的工具来实现我想要的,但是,在流文件的生命周期中,数据被发布到kafka或文件系统。因此,流文件UUID更改,因此我无法使用nifi-api
查询它。
此外,我知道Nifi提供了一个TestRunner
库来运行测试,但是,这似乎只适用于通过代码生成的处理器/处理器组,而不是UI。
有人知道集成和单元测试nifi进程组的工具、框架或模式吗?理想情况下,这将是一个解决方案,您可以在不修改现有工作流的情况下以编程方式比较处理器/处理器组的输入/输出。
随着Apache NiFi注册表的引入,我们已经看到用户将流从开发/沙盒环境提升到测试/QE环境,其中存在围绕“被测流”的现有“测试工具”流,以便他们可以通过流发送可重复和确定性的(或真实生产数据的匿名样本),并将结果与期望值进行比较。
正如您所指出的,有一个TestRunner
类和一个用于单元测试的完整测试框架。虽然手动将UI构建的流转换为编程构建可能很困难,但您也可以创建类似翻译器的东西来接受流模板或flow. xml.gz文件并将其转换为测试框架可处理的内容。
也许管道工会帮你做流量测试。
我们还想测试整个NiFi流,而不仅仅是单个处理器,所以我们创建了这个库并决定开源它。Scala中的简单示例:
// read flow previously exported from NiFi
val template = TemplateDeserializer.deserialize(this.getClass.getClassLoader.getResourceAsStream("exported-flow.xml"))
val flow = NifiTemplateFlowFactory(template).create()
// enqueue some data to any processor
flow.enqueueByName("csv row,12,another value,true", "CsvParserProcessor")
// run entire flow once
flow.run(1)
// get the results from any processor
val records = flow.resultsFromProcessorRelation("LastProcessorInFlow","successRelation")
records should have size 1
这个库仍在开发中,因此欢迎改进和想法!:)