Spark读取kafka(流式和批数据)

这篇具有很好参考价值的文章主要介绍了Spark读取kafka(流式和批数据)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spark读取kafka(批数据处理)

Spark读取kafka(流式和批数据),Spark阶段,spark,kafka,大数据

Spark读取kafka(流式和批数据),Spark阶段,spark,kafka,大数据

# 按照偏移量读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# spark读取kafka
options = {
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itcast',# 读取的主题不存在会自动创建
    # todo 注意一:连接的配置
    #       主题名称 ,分区编号,偏移量
    # 指定起始偏移量   {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,
    # 指定结束偏移量  {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """
    # 注意点  : 偏移量的区间是左闭右开 ,结束偏移的指定按照最大偏移量加一 ,所有分区都要指定
}
# 读取
# format 指定读取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:这一步的数据处理(将value转化为字符串类型)是必须做的,不然你看不懂数据。
#       可以用df.的方式,那我后来怎么都没怎么见过了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df数据
# todo 注意三:这里使用.show()的方式的,是因为它是有界表
df_select.show()

Spark读取kafka(流式和批数据),Spark阶段,spark,kafka,大数据

spark读取kafka(流数据处理)

Spark读取kafka(流式和批数据),Spark阶段,spark,kafka,大数据文章来源地址https://www.toymoban.com/news/detail-811711.html

# 流式读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# todo 注意一:定义kafka的连接配置
options={
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itheima'  # 读取的主题不存在会自动创建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必须将value转化为string类型

# 计算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')

# 输出
# todo 注意三:输出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()

到了这里,关于Spark读取kafka(流式和批数据)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark 数据读取保存

    Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统: 文件格式: Text 文件、 Json 文件、 csv 文件、 Sequence 文件以及 Object 文件 文件系统:本地文件系统、 HDFS、Hbase 以及数据库 text/hdfs 类型的文件读都可以用 textFile(path) ,保存使用 saveAsTextFile(path)

    2024年02月09日
    浏览(42)
  • Spark连接Hive读取数据

            Ubuntu 16.04 LTS         ubuntu-16.04.6-desktop-i386.iso          spark-3.0.0-bin-without-hadoop.tgz           hadoop-3.1.3.tar.gz         apache-hive-3.1.2-bin.tar.gz         spark-hive_2.12-3.2.2.jar         openjdk 1.8.0_292         mysql-connector-java-5.1.40.tar.gz         

    2024年02月01日
    浏览(39)
  • spark读取数据写入hive数据表

    目录 spark 读取数据 spark从某hive表选取数据写入另一个表的一个模板 概述: create_tabel建表函数,定义日期分区 删除原有分区drop_partition函数 generate_data 数据处理函数,将相关数据写入定义的表中  注: 关于 insert overwrite/into 中partition时容易出的分区报错问题:  添加分区函数

    2024年01月19日
    浏览(52)
  • 【Spark大数据习题】习题_Spark SQL&&&Kafka&& HBase&&Hive

    PDF资源路径-Spark1 PDF资源路径-Spark2 一、填空题 1、Scala语言的特性包含面向对象编程、函数式编程的、静态类型的、可扩展的、可以交互操作的。 2、在Scala数据类型层级结构的底部有两个数据类型,分别是 Nothing和Null。 3、在Scala中,声明变量的有var声明变量和val声明常

    2024年02月06日
    浏览(43)
  • Spark Doris Connector 可以支持通过 Spark 读取 Doris 数据类型不兼容报错解决

    doris版本: 1.2.8 Spark Connector for Apache Doris 版本: spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT spark版本:spark-3.3.1 Spark Doris Connector - Apache Doris 目前最新发布版本: Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub 2.1、Spark Doris Connector概述 Spark Doris Connector 可

    2024年01月23日
    浏览(47)
  • 使用Spark SQL读取阿里云OSS的数据

    创建一个table,并关联OSS目录路径 如果数据文件是 Parquet 格式的,可以自动推断出表的schema,很方便。 这样就可以使用sql语句读取数据了。 首先创建一个关联OSS目录的 database : 现在就可以通过sql写入数据到OSS了,如下:

    2024年02月02日
    浏览(52)
  • HDFS常用操作以及使用Spark读取文件系统数据

    掌握在Linux虚拟机中安装Hadoop和Spark的方法; 熟悉HDFS的基本使用方法; 掌握使用Spark访问本地文件和HDFS文件的方法。 启动Hadoop,在HDFS中创建用户目录“/user/hadoop” 在Linux系统的本地文件系统的“/home/hadoop”目录下新建一个文本文件test.txt,并在该文件中随便输入一些内容,

    2024年04月22日
    浏览(41)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(52)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(43)
  • 【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消

    2024年02月03日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包