我使用Spark 2.2.0。
如何使用pyspark提供Amazon SQS流以激发结构化流?
这个问题试图通过创建一个自定义接收器来回答非结构化流和scala的问题。
在pyspark中是否有类似的可能?
spark.readStream \
.format("s3-sqs") \
.option("fileFormat", "json") \
.option("queueUrl", ...) \
.schema(...) \
.load()
根据上面的数据库,接收器可用于S3-SQS文件源。然而,对于只有SQS的人来说,一种方法可能是什么。
我尝试理解从AWS-SQS-Receive_Message接收消息。然而,如何直接发送流到火花流并不清楚。
我对Amazon SQS一无所知,但是“我如何使用pyspark提供Amazon SQS流以激发结构化流。”对于任何外部消息传递系统或使用Spark结构化流(又名Spark“流”)的数据源都是不可能的。
在Spark结构化流式传输中,当Spark定期拉取数据时,情况正好相反(类似于Kafka消费者API的工作方式,它拉取数据时没有给出它)。
换句话说,Spark“Streams”只是来自Amazon SQS中“队列”的消息的另一个消费者。
每当我被要求将外部系统与Spark“Streams”集成时,我都会开始使用客户端/消费者API为系统编写客户端。
一旦我有了它,下一步就是为外部系统开发一个自定义的流源代码,例如Amazon SQS,使用上面的示例客户端代码。
在开发自定义流Source
时,您必须执行以下步骤:
>
编写一个实现Source
特征的Scala类
向SparkSQL注册Scala类(自定义Source
),使用META-INF/services/org. apache.park.sql.source.DataSource用完全限定的类名注册
文件,或者使用格式
的完全限定的类名
拥有自定义流式传输源是一个两部分的开发,开发源代码(并可选择将其注册到SparkSQL),并通过格式
方法在Spark结构化流式传输应用程序(Python)中使用它。