Structured_Streaming和Kafka整合

这篇具有很好参考价值的文章主要介绍了Structured_Streaming和Kafka整合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

结构化编程模型

输出终端/位置

默认情况下,Spark的结构化流支持多种输出方案:

1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

File Sink

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('file_sink')\
        .master('local[1]')\
        .getOrCreate()

    # 输出到文件必须要设置checkpointLocation检查点路径
    spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/chk")

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        File Sink总结:
            1- 输出到文件必须要设置checkpointLocation检查点路径
            2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
            3- 我们可以使用触发器trigger来指定批处理的时间间隔,用来减少小文件的产生
    """
    init_df.writeStream\
        .format("csv")\
        .outputMode("append")\
        .option("sep",",")\
        .option("header","True")\
        .option("encoding","UTF-8")\
        .trigger(processingTime="20 seconds")\
        .start("file:///export/data/child")\
        .awaitTermination()
file sink总结:
1- 要设置检查点数据存放路径。否则会报如下错误
AnalysisException: checkpointLocation must be specified either through
2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
3- 可以通过触发器trigger解决小文件的问题。可以通过触发器来调整每一批次产生间隔的时间
4- 支持输出到本地文件系统和HDFS文件系统

foreach sink
允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。
使用方式主要有二种
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据
方式一:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        每输入一条数据都会调用foreach中的函数。
        取Row中某个字段的值:   
            方式一 row.字段名
            方式二 row['字段名']
    """
    def my_foreach_func(row):
        # 打开数据库连接

        # 存储到数据库中
        print(row,row.value)
        print(row['value'])

        # 关闭数据库连接

    init_df.writeStream.foreach(my_foreach_func).outputMode("append").start().awaitTermination()

方式二:这种方式的适用场景是需要和资源打交道的情况(例如:连接和关闭数据库、打开关闭文件等)。通过open和close来处理资源,通过process来对数据进行自定义的处理

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    # 自定义类
    """
        foreach sink方式二
        1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
        2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
        3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
        4- 一般会和trigger一起配合使用,用来减少消耗资源的操作
    """
    class MyForeachFunc:
        def open(self,partition_id, epoch_id):
            print(f"partition_id={partition_id},epoch_id={epoch_id}")
            # 打开数据库连接
            print("打开数据库连接")
            return True

        def process(self,row):
            # 存储到数据库中
            print(row, row.value)

        def close(self, error):
            # 关闭数据库连接
            print("关闭数据库连接")

    init_df.writeStream\
        .foreach(MyForeachFunc())\
        .trigger(processingTime="20 seconds")\
        .outputMode("append")\
        .start()\
        .awaitTermination()
说明:
	open: 在一个批次中只会调用一次。返回值是bool类型,当返回True的时候,process方法才会被调用
	process: 会被调用多次,该批次内有多少行数据,就会被调用多少次
	close: 在一个批次中只会调用一次。用来关闭在open打开的资源
	
1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
4- 一般会和trigger一起配合使用,用来减少消耗资源的操作

foreachBatch Sink
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        batch_df:有界的DataFrame,可以调用show()方法
        batch_id:批次ID
    """
    def my_foreach_batch_func(batch_df, batch_id):
        batch_df.show()
        print(batch_id)

    init_df.writeStream.foreachBatch(my_foreach_batch_func).outputMode("append").start().awaitTermination()
说明: process_fun(batch_df, batch_id)
batch_df: 该批次中的数据形成的有界DataFrame
batch_id: 批次的编号

设置触发器Trigger

触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据

主要提供如下几种触发器:

  • 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短(常用)
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .start()\
    .awaitTermination()
  • 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .trigger(processingTime='5 seconds')\
    .start()\
    .awaitTermination()
    
情形说明:
1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
  • 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .trigger(once=True)\
    .start()\
    .awaitTermination()

Checkpoint检查点目录设置

设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题

检查点目录主要包含以下几个目录位置:
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据

1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录
2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次
3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id
4-数据源sources: 是数据源(Source)各个批次的读取的详情
5-数据接收端sinks: 是数据接收端各个批次的写出的详情
6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态

如何设置检查点:

1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")

推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径

Spark和Kafka整合

从kafka中读取数据

spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

流式处理

官方给出方案:

# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅一个Topic,并且指定header信息
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")


# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

从某一个Topic中读取消息数据:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test01")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    # 5- 启动流式任务
    """
        如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
    """
    result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
    result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
    result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

按规则订阅Topic:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select("topic",F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("topic","cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    # 5- 启动流式任务
    """
        如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
    """
    result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
    result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
    result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

对接kafka后,返回的结果数据内容:

key: 发送数据的key值。如果没有,就为null
value: 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
topic: 表示消息是从哪个Topic中消费出来
partition: 分区编号。表示消费到的该条数据来源于Topic的哪个分区
offset: 表示消息偏移量

timestamp: 接收的时间戳
timestampType: 时间戳类型(无意义)

类型的说明:

列名 类型
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
headers (optional) array

批处理

官方给出方案:

# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 
# 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

订阅一个Topic:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从Topic开头一直消费到结尾
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    print("result_df1")
    result_df1.show()

    print("result_df2")
    result_df2.show()

    print("result_df3")
    result_df3.show()

    # 5- 释放资源
    spark.stop()

指定startingOffsets参数:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从Topic开头一直消费到结尾
    # 对每个分区指定具体消费的offset,了解即可。实际工作很少用
    # init_df = spark.read\
    #     .format("kafka")\
    #     .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
    #     .option("subscribe","test02,test03")\
    #     .option("startingOffsets","""{"test02":{"0":1}}""")\
    #     .load()

    init_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
        .option("subscribe", "test02,test03") \
        .option("startingOffsets", "earliest") \
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    print("result_df1")
    result_df1.show()

    print("result_df2")
    result_df2.show()

    print("result_df3")
    result_df3.show(n=100)

    # 5- 释放资源
    spark.stop()

可能遇到的错误:
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据

原因: 如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset

-1: latest,最新的地方
-2: earliest,最旧的地方

必备参数:

选项 说明
assign 通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]} 设置使用特定的TopicPartitions
subscribe 以逗号分隔的Topic主题列表 要订阅的主题列表
subscribePattern 正则表达式字符串 订阅匹配符合条件的Topic。assign、subscribe、subscribePattern任意指定一个。
kafka.bootstrap.servers 以英文逗号分隔的host:port列表 指定kafka服务的地址

数据写入Kafka中

流式处理

官方给出方案:

# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()


# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()

写出到指定Topic

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test01")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .start()\
        .awaitTermination()

从数据内容中解析得到Topic,然后写入Kafka:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    # 错误写法:缺少topic字段
    # result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
    result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("console").outputMode("append").start()

    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .start()\
        .awaitTermination()

可能遇到的错误一:
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据

原因: 当从数据中解析得到Topic信息的时候,最终输出到Kafka的那个DataFrame中必须要有topic

可能遇到的错误二:
Structured_Streaming和Kafka整合,kafka,分布式,spark,大数据

原因: 输出到Kafka中的数据,需要命名value。而且数据类型需要是string或者binary(二进制)
备注 Column 数据类型
可选字段 key string or binary
必填字段 value string or binary
可选字段 headers string or binary
必填字段 topic string
可选字段 partition int

批处理

官方给出方案:文章来源地址https://www.toymoban.com/news/detail-803244.html

# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()
  

# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.write.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .save()
备注 Column 数据类型
可选字段 key string or binary
必填字段 value string or binary
可选字段 headers array
必填字段 topic string
可选字段 partition int

到了这里,关于Structured_Streaming和Kafka整合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 结构化流(Structured Streaming)

    有界数据: 无界数据: 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针

    2024年01月17日
    浏览(56)
  • Spark Structured Streaming使用教程

    Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。 Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表

    2024年02月03日
    浏览(56)
  • 数据平台的实时处理:Streaming和Apache Kafka

    随着数据的增长和数据处理的复杂性,实时数据处理变得越来越重要。实时数据处理是指在数据产生时或者数据产生后的很短时间内对数据进行处理的技术。这种技术在各个领域都有广泛的应用,如实时推荐、实时监控、实时分析、实时语言翻译等。 在实时数据处理中,St

    2024年04月14日
    浏览(42)
  • Flink与Spark Streaming在与kafka结合的区别!

    首先,我们先看下图,这是一张生产消息到kafka,从kafka消费消息的结构图。 当然, 这张图很简单,拿这张图的目的是从中可以得到的跟本节文章有关的消息,有以下两个: 1,kafka中的消息不是kafka主动去拉去的,而必须有生产者往kafka写消息。 2,kafka是不会主动往消费者发

    2024年04月17日
    浏览(50)
  • 【Spark编程基础】第7章 Structured Streaming

    7.1.1 基本概念 Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发

    2024年02月08日
    浏览(57)
  • 2 Data Streaming Pipelines With Flink and Kafka

    作者:禅与计算机程序设计艺术 数据流是一个连续不断的、产生、存储和处理数据的过程。传统上,数据流编程都是基于特定平台(比如:消息队列,数据仓库,事件溯源)的SDK或者API进行开发,但随着云计算和容器技术的发展,越来越多的企业选择使用开源工具实现自己的

    2024年02月08日
    浏览(54)
  • pyspark之Structured Streaming file文件案例1

    # generate_file.py  # 生成数据 生成500个文件,每个文件1000条数据 # 生成数据格式:eventtime name province action ()时间 用户名 省份 动作) import os  import time import shutil import time FIRST_NAME = [\\\'Zhao\\\', \\\'Qian\\\', \\\'Sun\\\', \\\'Li\\\', \\\'Zhou\\\', \\\'Wu\\\', \\\'Zheng\\\', \\\'Wang\\\'] SECOND_NAME = [\\\'San\\\', \\\'Si\\\', \\\'Wu\\\', \\\'Chen\\\', \\\'Yang\\\', \\\'Min\\\', \\\'Jie\\\', \\\'Qi

    2024年01月21日
    浏览(35)
  • Structured Streaming: Apache Spark的流处理引擎

    欢迎来到我们的技术博客!今天,我们要探讨的主题是Apache Spark的一个核心组件——Structured Streaming。作为一个可扩展且容错的流处理引擎,Structured Streaming使得处理实时数据流变得更加高效和简便。 Structured Streaming是基于Apache Spark SQL引擎构建的高级流处理框架。它允许用户

    2024年01月25日
    浏览(52)
  • Spark编程实验五:Spark Structured Streaming编程

    目录 一、目的与要求 二、实验内容 三、实验步骤 1、Syslog介绍 2、通过Socket传送Syslog到Spark 3、Syslog日志拆分为DateFrame 4、对Syslog进行查询 四、结果分析与实验体会 1、通过实验掌握Structured Streaming的基本编程方法; 2、掌握日志分析的常规操作,包括拆分日志方法和分析场景

    2024年02月20日
    浏览(39)
  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包