基于docker的confluent-kafka搭建及python接口使用

这篇具有很好参考价值的文章主要介绍了基于docker的confluent-kafka搭建及python接口使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文介绍基于docker搭建的confluent-kafka及其python接口的使用。

本文只搭建了一个单Broker的confluent-kafka测试环境,考虑到高可用、高吞吐等因素,正式生产环境一般至少要3个节点。

本文采用的系统配置如下:

  • LinuxMint 20.3 (兼容 Ununtu 20.04)
  • docker 20.10.21
  • docker-compose 2.14.2
  • python 3.9.16
  • confluent-kafka(python包) 2.1.1

1. 安装docker以及docker-compose

1.1 安装docker

docker-compose依赖于docker,因此需要先安装docker。

curl -fsSL https://test.docker.com -o test-docker.sh
sudo sh test-docker.sh

1.2 安装docker-compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

要安装其他版本的 Compose,请替换 v2.14.2。

2. 安装confluent-kafka

新建文件并创建docker-compose.yml文件:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

注意 这里搭建的是本地环境,如果需要从网络中的另一位置访问kafka,需要将KAFKA_ADVERTISED_LISTENERS中的localhost换成kafka所在主机的真实地址/域名。

进入该文件夹并运行:

docker-compose -f docker-compose.yml up -d

运行后在docker中看到类似结果说明启动成功:

aa@bb:~/docker_scripts$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                       NAMES
e6fbc05d61b1   confluentinc/cp-kafka:7.0.1       "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   broker
58b04385f2bf   confluentinc/cp-zookeeper:7.0.1   "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   2181/tcp, 2888/tcp, 3888/tcp                zookeeper

这里kafka端口为9092。

关闭容器服务:

docker-compose -f docker-compose.yml down

3. python接口使用

3.1 安装依赖包

安装依赖包:

pip3 install confluent-kafka

3.2 创建、查看topic

进入kafka镜像:

docker exec -ti broker bash

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092

新建名为test001的topic:

[aa@bb ~]$ /bin/kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test001 --partitions 2
Created topic test001.

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092
test001

通过Ctrl + P + Q回到终端。

3.3 python接口-broker

创建producer代码producer1.py

import socket

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': socket.gethostname()
}

producer = Producer(conf)


def __publish_delivery_report(err, msg) -> None:
    if err is not None:
        print(f"send msg:{msg} fail, err is not None")
    else:
        print(f"send msg{msg} success")


def send_msg(topic: str, data):
    producer.produce(topic, data, callback=__publish_delivery_report)
    producer.flush()


if __name__ == '__main__':
    msg = "hello kafka"
    topic = "test001"
    send_msg(topic, msg)

运行结果:

aa@bb:~/codes/kafka_test$ python3 producer1.py
send msg<cimpl.Message object at 0x7f8d6fe6acc0> success

3.4 python接口-consumer

创建consumer代码consumer1.py

from confluent_kafka import Consumer
 

class KafkaConsumer:
    def __init__(self, brokers, group):
        config = dict()
        config['bootstrap.servers'] = brokers
        config['group.id'] = group
        config['auto.offset.reset'] = 'earliest'
        self.consumer = Consumer(config)
 
    def subscribe(self, topics):
        self.consumer.subscribe(topics=topics)
 
    def pull(self):
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))
 
    def close(self):
        self.consumer.close()
 
 
if __name__ == "__main__":
    consumer = KafkaConsumer("127.0.0.1:9092", "test_group1")
    consumer.subscribe(['test001'])
    consumer.pull()
    consumer.close()

运行结果:文章来源地址https://www.toymoban.com/news/detail-741444.html

aa@bb:~/codes/kafka_test$ python3 consumer1.py
Received message: hello kafka

参考链接

  1. Hello Kafka(八)——Confluent Kafka简介
  2. Docker Compose | 菜鸟教程
  3. confluent_kafka生产者 - luckygxf - 博客园
  4. Hello Kafka(十二)——Python客户端_kafka python客户端_天山老妖的博客-CSDN博客

到了这里,关于基于docker的confluent-kafka搭建及python接口使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Jenkins+Python+Ubuntu+Docker的接口/UI自动化测试环境部署详细过程

    学习官网:Jenkins官网,Jenkins中文官网; Jenkins 是一款开源 CICD 软件,用于自动化各种任务,包括构建、测试和部署软件; 用 Java 语言编写的,可在 Tomcat 、 Docker 等流行的容器中运行,也可独立运行。 通俗的讲,比如把编译、打包、上传、部署到Tomcat中的过程交由Jenkins,

    2024年02月13日
    浏览(41)
  • docker搭建kafka

    注意:云服务器需要设置安全策略放行2181与9092端口,否则访问失败

    2024年02月06日
    浏览(30)
  • docker快速搭建kafka集群

    一、准备工作 1、拉取kafka镜像 2、拉取kafka可视化管理工具镜像 3、安装docker-compose工具 4、创建相关文件夹 5、创建网络,用于kafka和zookeeper共享一个网络段 6、构建zookeeper集群 kafka集群需要用到zookeeper集群,因此需要先构建zookeeper集群,请查看文章 docker快速搭建zookeeper集群

    2024年02月13日
    浏览(28)
  • Docker 搭建 zookeeper、kafka 集群

    首先创建一个自定义网络,后续的所有容器都放入同一个内网中,容器之间还可以通过容器名称进行直接访问,在后续的配置中只需要写明容器名称即可,会自动找到对应的IP地址,防止重启容器后IP地址发生变化时,还要去修改配置文件的操作 创建目录 启动zookeeper 进入zo

    2024年02月10日
    浏览(29)
  • 『Kafka』在Docker中快速部署Kafka及其管理平台搭建

    📣读完这篇文章里你能收获到 在Docker中快速部署Kafka 在Docker中快速部署Zookeeper 搭建Kafka管理平台 Kafka部署测试 感谢点赞+收藏,避免下次找不到~ 参数说明: -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己 -e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeep

    2024年02月16日
    浏览(30)
  • docker搭建kafka集群并测试完整版

    1.安装docker desktop. 打开docker官网,下载docker desktop,这里直接给出网址:Install Docker Desktop on Windows | Docker Docs 如下图,点击下载即可。 下载好后 点击运行exe文件,我们采用交互式安装程序。 安装完成后直接重启即可,默认安装在c盘,如果不想安装在c盘就采用命令行的方式安装

    2024年02月02日
    浏览(29)
  • 【Docker】手把手教你使用Docker搭建kafka【详细教程】

    目录 前提条件 1.安装Zookeeper 1.1运行ZooKeeper容器 2.运行Kafka容器 2.1启动Kafka容器 3.验证 3.1进入Kafka容器 3.2查看容器状态 3.3查看容器日志 3.4重新启动容器 3.5创建测试主题 1. 安装Docker: 确保你已经在你的Windows机器上安装了Docker。你可以从Docker官方网站下载并安装Docker Desktop。

    2024年02月04日
    浏览(52)
  • Ubuntu18.04 docker kafka 本地测试环境搭建

    Kafka是一种分布式流处理平台,也是一个高吞吐量的分布式发布订阅消息系统。它由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 Kafka的设计目标是能够处理大规模的消息流,并提供持久性、高吞吐量和低延迟的特性。它的核心概念是发布-订阅模型,其中消息被组

    2024年02月15日
    浏览(52)
  • docker搭建Elk+Kafka+Filebeat分布式日志收集系统

    目录 一、介绍 二、集群环境 三、ES集群 四、Kibana  五、Logstash 六、Zookeeper 七、Kafka 八、Filebeat 八、Nginx (一)架构图  (二)组件介绍 1.Elasticsearch 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于

    2024年02月04日
    浏览(40)
  • 基于 kRaft 搭建单机 kafka 测试环境

    使用 docker-compose 在单机搭建有三个节点的 kafka 集群。

    2024年02月14日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包