Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ)

这篇具有很好参考价值的文章主要介绍了Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

0 前言

        认识一个新框架的时候,先要知道这个东西干什么用的,具体有哪些实际应用场景,根据它的应用场景去初步推测它的架构(包括数据结构,设计模式等)是怎样的,而不是上来就看定义概念。

1.1 Kafka应用场景

1.1.1 异步处理

        电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

1.1.2 系统解耦

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

1.1.3 流量削峰

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

1.1.4 日志处理

        大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

1.2 生产者、消费者模型

        我们之前学习过Java的服务器开发,Java服务器端开发的交互模型是这样的:

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        我们之前还学习过使用Java JDBC来访问操作MySQL数据库,它的交互模型是这样的:

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。

        而如果我们基于消息队列来编程,此时的交互模式成为:生产者、消费者模型。

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

1.3 消息队列的两种模式

1.3.1 点对点模式

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

        点对点模式特点:

  1.  每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  2.  发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  3.  接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

1.3.2 发布订阅模式

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

发布/订阅模式特点:

  1.  每个消息可以有多个订阅者;
  2.  发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3.  为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

        此时你脑海里想到了一些什么东西?是不是有观察者模式?但他们还是有一些区别的。

(1)从组成成分上看:

  • 观察者模式里,只有两个角色:观察者 和 目标者(也可以叫被观察者)。其中 被观察者 是重点。
  • 发布订阅模式里,不仅仅只有 发布者 和 订阅者,还有一个 事件中心。其中 事件中心 是重点。

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

(2)从关系上看 

  • 观察者和目标者,是松耦合的关系
  • 发布者和订阅者,则完全不存在耦合

(3)从使用角度上看

  • 观察者模式,多用于 单个应用内部 (Vue中的数据响应式变化就是观察者模式)
  • 发布订阅模式,更多用于 跨应用的模式(比如我们常用的 消息中间件

1.4 Kafka的概念

        看了它大概是个什么样子的,我们再来看它的概念就很好理解了。

        官方文档:Apache Kafka

        官方描述:Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。

        Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力

        1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统;

        2. 以容错的持久化方式存储数据流;

        3. 处理数据流。

        英文原版

        1. Publish and subscribe to streams of records, similar to a message queue or enterprise

messaging system.

        2. Store streams of records in a fault-tolerant durable way.

         3. Process streams of records as they occur.

        重点关键三个部分的关键词:

        Publish and subscribe:发布与订阅

        Store:存储

        Process:处理


        通常将Apache Kafka用在两类程序:

        1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据

        2. 构建实时流应用程序,以转换或响应数据流

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        上图,我们可以看到:

        1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中;

        2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来;

        3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中;

        4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。


        前面我们了解到,消息队列中间件有很多,为什么我们要选择Kafka?

特性

ActiveMQ

RabbitMQ

Kafka

RocketMQ

所属社区/公司

Apache

Mozilla Public License

Apache

Apache/Ali

成熟度

成熟

成熟

成熟

比较成熟

生产者-消费者模式

支持

支持

支持

支持

发布-订阅

支持

支持

支持

支持

REQUEST-REPLY

支持

支持

-

支持

API完备性

低(静态配置)

多语言支持

支持JAVA优先

语言无关

支持,JAVA优先

支持

单机呑吐量

万级(最差)

万级

十万级

十万级(最高)

消息延迟

-

微秒级

毫秒级

-

可用性

高(主从)

高(主从)

非常高(分布式)

消息丢失

-

理论上不会丢失

-

消息重复

-

可控制

理论上会有重复

-

事务

支持

不支持

支持

支持

文档的完备性

提供快速入门

首次部署难度

-

        在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。 

2.1 Kafka的基本配置与使用(已完成,可忽略)

        1. 将Kafka的安装包上传到集群,并解压

cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/

         2. 修改 server.properties

cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1.sjxy.cn:2181,node2.sjxy.cn:2181,node3.sjxy.cn:2181

        3. 将安装好的kafka复制到另外两台服务器

cd /export/server
scp -r kafka_2.12-2.4.1/ node2:$PWD
scp -r kafka_2.12-2.4.1/ node3:$PWD

修改另外两个节点的broker.id分别为1和2
---------node2.sjxy.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=1

--------node3.sjxy.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2

        另外,kafka数据位置目录里面有个meta文件也需要对应修改broker.id(下面是node2的) 

vim /export/server/kafka/data/meta.properties


cluster.id=xgr8_xgpTiG8GUZZzUUt5Q
version=0
broker.id=1

        4. 配置KAFKA_HOME环境变量

vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}

分发到各个节点
scp /etc/profile node2:$PWD
scp /etc/profile node3:$PWD
每个节点加载环境变量
source /etc/profile

        5. 启动服务器

# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.sjxy.cn:9092 --list

2.2 Kafka本课程中使用步骤

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

2.2.1 启动Kafka集群        

        在/export/onekey中有一键启停Kafka集群的脚本

./start-kafka.sh
./stop-kafka.sh

2.2.2 创建topic

        Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.sjxy.cn:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.sjxy.cn:9092

2.2.3 生产消息到Kafka

        使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中

bin/kafka-console-producer.sh --broker-list node1.sjxy.cn:9092 --topic test

2.2.4 从Kafka消费消息

        使用下面的命令来消费 test 主题中的消息

bin/kafka-console-consumer.sh --bootstrap-server node1.sjxy.cn:9092 --topic test --from-beginning

2.3 使用Kafka Tools操作Kafka

        配置见下面

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        创建topic

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        编写代码并运行

package com.sjxy.flink.stream.source.customsource

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

/*
验证flinkkafkaconsumer如何消费kafka中的数据
 */
object TestFlinkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    //1 创建一个流处理的运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2 添加自定义的数据源 ,泛型限定了从kafka读取数据的类型
    //2.1 构建properties对象
    val prop = new Properties()
    //kafka 集群地址
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
    //消费者组
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink")
    //动态分区检测
    prop.setProperty("flink.partition-discovery.interval-millis", "5000")
    //设置kv的反序列化使用的类
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    //设置默认消费的偏移量起始值
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") //从最新处消费
    //定义topic
    val topic = "test"

    //获得了kafkaconsumer对象
    val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    val kafkaDs: DataStream[String] = env.addSource(flinkKafkaConsumer)
    //3 打印数据
    kafkaDs.print()
    //4 启动
    env.execute()
  }
}

        在集群中使用下面命令(生产消息)

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic test

        产生一些数据

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据         观察IDEA中控制台输出

Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ),大数据实时计算,flink,大数据

        数据在源源不断过来表示成功了。文章来源地址https://www.toymoban.com/news/detail-856115.html

到了这里,关于Lecture 8 Flink流处理-Kafka简介与基本使用(Appendix Ⅰ)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)

           编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_ti

    2024年03月24日
    浏览(53)
  • Flink(七)Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(49)
  • Flink的简介以及基本概念

    有界流和无界流 有状态的流处理 2.1集群角色 2.2 部署模式 会话模式(Session Mode) 单作业模式(Per-Job Mode) 应用模式(Application Mode) 3.1  系统架构 1 )作业管理器(JobManager) JobManager是一个Flink集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用

    2024年04月09日
    浏览(61)
  • 7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月14日
    浏览(47)
  • 数据流处理框架Flink与Kafka

    在大数据时代,数据流处理技术已经成为了一种重要的技术手段,用于处理和分析大量实时数据。Apache Flink和Apache Kafka是两个非常重要的开源项目,它们在数据流处理领域具有广泛的应用。本文将深入探讨Flink和Kafka的关系以及它们在数据流处理中的应用,并提供一些最佳实践

    2024年04月23日
    浏览(41)
  • Flink 处理函数(1)—— 基本处理函数

    在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以 自定义处理逻辑 在处理函数中,我们直面的就是数据流中最基本的元素: 数据事件(event)、状态(state)以及时间(time) 。这就相当于 对流有了完全的控制权 基本处理函数主要是定

    2024年01月18日
    浏览(42)
  • 【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

    需求描述: 1、数据从 Kafka 写入 Hive。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。 5、先在 Hive 中创建表然后动态获取 Hive 的表

    2024年02月03日
    浏览(57)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 实现 Kafka 数据写入 ClickHouse

    需求描述: 1、数据从 Kafka 写入 ClickHouse。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、先在 ClickHouse 中创建表然后动态获取 ClickHouse 的表结构。 5、Kafka 数据为 Json 格式,通过 FlatMap 扁平

    2024年02月03日
    浏览(47)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包