【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费

这篇具有很好参考价值的文章主要介绍了【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


前言

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

消息队列是一种中间件 ,用于在不同的组件或系统之间传递消息(进程间通讯的一种)。 它提供了一种可靠的机制(AMQP)来存储和传递消息,并确保消息的顺序性和可靠性。消息队列需要存储消息。

Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。Kafka是一个分布式消息队列:生产者、消费者的功能。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

一、Kafka安装

1.下载并安装Java

Kafka 是基于 Java 开发的,因此需要先安装 Java 环境。如果你已经安装了 Java 环境,可以跳过这一步。

在命令行中输入以下命令:

sudo apt-get update
# linux命令行下,安装jdk
sudo apt-get install openjdk-8-jdk
# 查看安装结果
java -version

2.下载和解压 Kafka

下载 Kafka 压缩包,并解压到 /opt 目录下。

cd ~
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar -xzf kafka_2.13-3.4.0.tgz
cd /opt
sudo mv cd ~/kafka_2.13-3.4.0 kafka

3.配置 Kafka

接下来需要更改 Kafka 的配置文件。打开 config/server.properties 文件,并进行以下更改:

sudo vim /opt/kafka/config/server.properties

advertised.listeners=PLAINTEXT://<your-server-IP-address>:9092
listeners=PLAINTEXT://0.0.0.0:9092

确保将 <your-server-IP-address> 替换为实际的服务器 IP 地址。(ifconfig查看本机ip)

4.启动 Kafka

Kafka 启动有两种方式:单机模式和分布式模式。

  • 单机模式

在单机模式下,Kafka 只有一个 broker。在命令行中输入以下命令:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh  /opt/kafka/config/zookeeper.properties  # 在一个窗口,或后台运行
sudo /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties  # 另起一个窗口,或后台运行
  • 分布式模式

在分布式模式中,Kafka 包含多个 broker。在命令行中输入以下命令:

首先,编辑 config/server.properties 文件,设置以下属性:

broker.id=0 # 设置当前 broker 的 id,不能重复
listeners=PLAINTEXT://your.server.ip.address:9092 # 设置监听地址和端口
log.dirs=/tmp/kafka-logs # 设置日志目录

复制 broker.id=0 的行,修改 broker.id 的值,设置多个 broker 的 id。

接下来,启动 ZooKeeper:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties

最后,每个 broker 启动 Kafka:

cd /opt/kafka
sudo /opt/kafka/bin/kafka-server-start.sh config/server.properties

5.创建主题和生产者/消费者

可以使用 Kafka 自带的命令行工具创建主题、发送消息和消费消息。

  • 创建主题
cd /opt/kafka
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server your.server.ip.address:9092 --replication-factor 1 --partitions 1 --topic test-topic
  • 查看
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 删除
sudo /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic

6.发布和订阅消息

现在已经成功地部署了 Kafka,并创建了一个 topic。可以使用以下命令在 topic 中发布和订阅消息:

发布消息:# 另起一个窗口

sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 

订阅消息:# 另起一个窗口

sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

以上是在 Ubuntu 上部署 Kafka 的基本步骤,你可以根据实际情况进行修改。

二、Kafka+Django生产和消费

confluent-kafka 和kafka-python是python中处理kafka的三方包,这里以confluent-kafka为例。

pip install confluent-kafka

1.Django配置文件

在settings.py中加入配置

KAFKA_SETTINGS = {
    'bootstrap.servers': 'localhost:9092', # localhost替换为kafka服务的ip
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
}

2.通过django命令实现消费

  1. kafka消费处理kafka_consumer.py文件
from confluent_kafka import Consumer, KafkaError
from django.conf import settings

def kafka_handler():
    c = Consumer(settings.KAFKA_SETTINGS)
    c.subscribe(['test-topic'])

    while True:
        msg = c.poll(1.0)
		print(111,msg)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached')
            else:
                print('Error: {}'.format(msg.error()))
        else:
            print('Received message: {}'.format(msg.value()))

  1. 使用django的startapp新增kafka对app

     cd ptoject  #  manage.py的同级目录
     django-admin startapp kafka
     python manage.py startapp kafka
    
  2. 在django配置文件settings中添加kafka至INSTALLED_APPS

     INSTALLED_APPS = [
         'django.contrib.admin',
         'django.contrib.auth',
         'django.contrib.contenttypes',
         'django.contrib.sessions',
         'django.contrib.messages',
         'django.contrib.staticfiles',
         'kafka'
     ]
    
  3. 自定义django命令
    可以参看这个链接:https://www.osgeo.cn/django/howto/custom-management-commands.html

    在kafka下新建 management/commands二级文件夹
    【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费,Python开始入门,中间件,ubuntu,kafka

其中kafka_consumer是第一步创建的消费处理程序,my_消费是我们自定义的django命令程序。my_消费.py的内容如下:

# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_consumer import kafka_handler


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        kafka_handler()
  1. 启动消费命令

切换至虚拟环境,目录切换至django项目目录,及mnange.py的同级目录。

	python manage.py my_消费

【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费,Python开始入门,中间件,ubuntu,kafka

【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费,Python开始入门,中间件,ubuntu,kafka

3.通过Django生产

【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费,Python开始入门,中间件,ubuntu,kafka

  1. 创建kafka_producer.py生产文件
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: kafka_producer.py
@time: 2023/7/16 0016  12:28
"""
from confluent_kafka import Producer
from django.conf import settings


def send_message(message):
    p = Producer({'bootstrap.servers': settings.KAFKA_SETTINGS.get('bootstrap.servers')})
    topic = 'test-topic'
    p.produce(topic, message.encode('utf-8'))
    p.flush()


if __name__ == '__main__':
    send_message('测试')
  1. 通过django命令生产消息
    这里还是用的自定义命令,自定义文件为my_消费.py
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_producer import send_message


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        send_message('111')

【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费,Python开始入门,中间件,ubuntu,kafka
启动方式与消费一样,python manage.py my_生产

  1. 通过django程序生产消息

kafka_producer文件中的send_message方法可以在程序中被调用,我们在处理用户请求时,可以通过异步的方式处理send_message。文章来源地址https://www.toymoban.com/news/detail-573315.html

到了这里,关于【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 中间件 kafka

    Kafka(Apache Kafka)是一个非常流行的开源分布式流数据平台。它最初由LinkedIn开发,后来捐赠给了Apache基金会,并成为顶级项目。Kafka被设计用于处理实时数据流,具有高吞吐量、可扩展性和持久性。 Kafka 的主要特点和用途包括: 发布-订阅模型: Kafka 提供了一种发布-订阅(

    2024年02月13日
    浏览(38)
  • Kafka消息中间件(Kafka与MQTT区别)

    Kafka是一个分布式流处理平台,它可以快速地处理大量的数据流。Kafka的核心原理是基于 发布/订阅 模式的消息队列。Kafka允许多个生产者将数据写入主题(topic)中,同时也允许多个消费者从主题中读取数据。 Kafka重要原理 Kafka的设计原则之一是高可用性和可扩展性,因此它

    2024年02月03日
    浏览(34)
  • 消息中间件(二)——kafka

    在大数据中,会使用到大量的数据。面对这些海量的数据,我们一是需要做到能够 收集 这些数据,其次是要能够 分析和处理 这些海量数据。在此过程中,需要一套消息系统。 Kafka专门为分 布式高吞吐量 系统设计。作为一个消息代理的替代品,Kafka往往做的比其他消息中间

    2024年02月07日
    浏览(47)
  • 大数据中间件——Kafka

    Kafka安装配置 首先我们把kafka的安装包上传到虚拟机中: 解压到对应的目录并修改对应的文件名: 首先我们来到kafka的config目录,我们第一个要修改的文件就是server.properties文件,修改内容如下: 主要修改三个部分,一个是唯一标识id,kafka的文件存储路径,一个是zookeeper的节

    2024年02月07日
    浏览(33)
  • 中间件(三)- Kafka(二)

    6.1 Kafka的高效读写 顺序写磁盘 Kafka的producer生产数据,需要写入到log文件中,写的过程是追加到文件末端,顺序写的方式,官网有数据表明,同样的磁盘,顺序写能够到600M/s,而随机写只有200K/s,这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时

    2024年02月07日
    浏览(27)
  • 消息中间件 —— 初识Kafka

    1.1.1、为什么要有消息队列? 1.1.2、消息队列 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO)。 入队、出

    2024年02月13日
    浏览(42)
  • 消息中间件之Kafka(二)

    1.1 为什么要对topic下数据进行分区存储? 1.commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上, 相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据 2.提高并行度 1.2 如何在多个partition中保证顺序消费? 方案一

    2024年01月21日
    浏览(38)
  • 消息中间件之Kafka(一)

    高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,并且内部使用了零拷贝技术,性能比较好 Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术 Kafka是最初由Linkedin公司开发,

    2024年01月20日
    浏览(39)
  • 【Java面试丨消息中间件】Kafka

    1. 介绍 使用kafka在消息的收发过程都有可能会出现消息丢失 (1)生产者发送消息到broker丢失 (2)消息在broker中存储丢失 (3)消费者从broker接收消息丢失 2. 生产者发送消息到broker丢失 设置异步发送:同步发送会发生阻塞,一般使用异步发送方式发送消息 消息重试:由于网

    2024年02月11日
    浏览(34)
  • 常用中间件redis,kafka及其测试方法

    一、中间件的使用场景 引入中间件的目的一般有两个: 1、提升性能 产品架构中的性能设计: 常用的中间件: 1) 高速缓存:redis 基于内存,所以比mysql块(存在磁盘io) 为什么查询速度快? 单进程+IO多路复用去提高性能 基于内存 做缓存,极大缓解了数据库压力 非常适合

    2024年04月11日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包