Kafka与MySQL的组合使用

这篇具有很好参考价值的文章主要介绍了Kafka与MySQL的组合使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

  1. 根据上面给出的student表,编写Python程序完成如下操作:

(1)读取student表的数据内容,将其转为JSON格式,发送给Kafka;

创建Student表的SQL语句如下:

create table student(
sno char(5),
sname char(10),
ssex char(2),
sage int
);

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

向student表中插入两条记录的SQL语句如下:

insert into student values(‘95001’,’John’,’M’,23);
insert into student values(‘95002’,’Tom’,’M’,23);

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

 启动zookeeper和kafka的服务

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

编写一个生产者程序mysql_producer.py:

from kafka import KafkaProducer
import json
import pymysql.cursors

producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))

connect=pymysql.Connect(
    host='localhost',
    port=3306,
    user='root',
    passwd='123456',
    db='zhangna',
    charset='utf8'
)
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data=cursor.fetchall()
connect.commit()

for message in data:
    zn={}
    zn['sno']=message[0]
    zn['sname']=message[1]
    zn['sex']=message[2]
    zn['age']=message[3]
    producer.send('mysql_topic',zn)

connect.close()
producer.close()

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

(2)再从Kafka中获取到JSON格式数据,打印出来;

编写一个消费者程序mysql_consumer.py:

from kafka import KafkaConsumer
import json
import pymysql.cursors

consumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:
	msg1=str(msg.value,encoding="utf-8")
	data=json.loads(msg1)
	print(data)

kafka连接mysql,大数据采集与预处理,mysql,数据库,kafka

终于出来了,出错的原因是encoding,我写成了encodings的缘故

为什么我会出现两条重复记录,原因是我生产者程序运行了多次,生产者多运行一次,消费者程序就会多一次查询文章来源地址https://www.toymoban.com/news/detail-754648.html

到了这里,关于Kafka与MySQL的组合使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)

           编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_ti

    2024年03月24日
    浏览(53)
  • 使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

    使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

    2024年02月11日
    浏览(42)
  • 使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

    在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍

    2024年04月15日
    浏览(53)
  • 【数据采集与预处理】数据接入工具Kafka

    目录 一、Kafka简介 (一)消息队列 (二)什么是Kafka 二、Kafka架构 三、Kafka工作流程分析 (一)Kafka核心组成 (二)写入流程 (三)Zookeeper 存储结构 (四)Kafka 消费过程 四、Kafka准备工作 (一)Kafka安装配置 (二)启动Kafka (三)测试Kafka是否正常工作 五、编写Spark Str

    2024年01月19日
    浏览(73)
  • filebeat采集日志数据到kafka(一)(filebeat->kafka->logstash->es)

    一、filebeat安装 filebeat-kafka版本适配 1、安装包下载 https://www.elastic.co/cn/downloads/past-releases#filebeat 解压 2、新建yml配置文件,如test.yml 3、Filebeat启动和停止 启动:./filebeat -c test.yml 停止:kill -9 PID

    2024年02月16日
    浏览(89)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(42)
  • Flume采集端口数据kafka消费

    1.flume单独搭建 2.Flume采集端口数据kafka消费

    2024年02月06日
    浏览(49)
  • Debezium同步Mysql数据到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安装好mysql,开启binlog (1)下载kafka (2)下载mysql-connector插件 (3)编辑配置文件 (4)启动kafka自带的zk (5)启动kafka (6)启动connect (7)调用api 注意:当成功调用api,创建此连接器后会有如下主题产生:dbhistory.inventory、mysql1、

    2024年02月10日
    浏览(42)
  • Canal+Kafka实现Mysql数据同步

    canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    2024年02月12日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包