Redis发布订阅机制学习|kafka相关经验

这篇具有很好参考价值的文章主要介绍了Redis发布订阅机制学习|kafka相关经验。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Redis
键值数据库 key value
NoSql

第一章: 差异
#1 structured 结构化
约束 primary unique unsigned
#2 relational 关联的
#3 SQL查询
例 select id, name, age from tb_user where id=1
redis get user:1
mongoDB db.users.find({_id: 1})
elasticsearch GET http://localhost:9200/users/1
#4 事务 ACID 基本一致 无事务
键值类型 Redis 文档类型 MongoDB 列类型 HBase Graph类型 Neo4j
存储方式 磁盘 内存
扩展性 垂直 水平
Remote Dictionary Server 远程词典服务器 基于内存的键值型NoSQL数据库
特征:
键值key-value型 value支持多种不同数据结构,功能丰富
单线程,每个命令具备原子性 低延迟 速度快 基于内存,IO多路复用
-支持数据持久化
-支持主从集群,分片集群
-支持多语言客户端

安装Redis依赖 redius是基于C语言编写 需要安装Redis所需要的gcc依赖:
yum install -y gcc tcl
解压: tar -zxvf redis-6.2.6.tar.gz
进入redis目录: cd redis-6.2.6
运行编辑命令: make && make install 编译 安装
内容: redis-cli 命令行客户端 redis-server
任意目录 redis-server 端口 6379
–默认启动方式
–指定配置启动 流程: 配置文件备份 cp redis.conf redis.conf.bck
修改redis.conf文件配置
bind 0.0.0.0 任意ip地址可以访问
#守护进程,修改为yes后即可后台运行 daemonize yes
#密码,设置后访问redis必须输入密码 requirepass xxxxx
其他常见配置 #监听端口 port
#工作目录 dir
#数据库数量 databases 默认有16个库,编号0-15
#能够使用的最大内存 maxmemory 512mb
#日志文件 默认为空 不记录日志 logfile “redis.log”
启动 redis-server redis.conf
—开机自启

redis客户端
1 命令行客户端
使用方式: redis-cli [options][commonds]
常见options -h IP地址 -p 端口 -a 访问密码
操作命令commonds ping 与redis服务端做心跳测试,服务端正常会返回pong
set name xxx
get name
SELECT 0 0号库

2 图形化客户端

3 编程客户端

redis常见命令
数据结构介绍 key-value
key一般是string类型
value多类型 string hash list set sortedset geo bitmap hyperlog
redis官网 commands 分组
命令行 help help @

redis通用命令 keys组下 @generic
KEYS: 查看符合模板的所有key(通配符) 例 KEYS * KEYS a* 以a开头
DEL: 删除一个指定的KEY
MSET: 批量插入
EXISTS: 判断key是否存在
EXPIRE: 给key设置有效期 有效期到期key被自动删除 EXPIRE key seconds
TTL: 查看KEY的剩余有效期

String类型 底层都是字节数组形式存储,编码方式不同。
SET GET MSET MGET INCR INCRBY 自增,自增指定步长 SETNX (前提key不存在 否则不执行)
Redis的key允许多个单词形成层级结构,多个单词用’:'隔开,
例: 项目名:业务名:类型:id

hash类型 散列 value是无序字典,类似java中的hashmap结构
常见命令 前缀加H field

List类型 双向链表结构 支持正向检索 反向检索

java客户端
Jedis

  1. 引入依赖

    redis.clients
    jedis
    3.7.0
  2. 建立连接
    private Jedis jedis;
    @BeforeEach
    void setUp(){
    //建立连接
    jedis = new Jedis(“xxx”, xxx);
    jedis.auth(“password”); //密码
    jedis.select(0);
    }

Jedis本身线程不安全 频繁创建和销毁会有性能损耗 推荐使用连接池
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(); //最大连接
jedisPoolConfig.setMaxIdle(); //最大空闲连接
jedisPoolConfig.setMaxIdle(); //最小空闲连接
jedisPoolConfig.setMaxWaitMillis(); //设置最长等待时间
jedisPool = new JedisPool(jedisPoolConfig, xxx, xxx, timeout, password)

public static Jedis getJedis(){
return jedisPool.getResource();
}

Redis 发布订阅机制
简介:
Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 PUBLISH发布消息,订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。主要由「发布者」、「订阅者」、「Channel」三个部分组成。
发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。

1 基于频道的发布订阅
//在redisServer中,有一个字典类型字段pubsub_channels 用来保存订阅信息,其中key为频道,value为订阅该频道的客户端
struct redisServer{
pid_t pid;
//将频道映射到已订阅客户端的列表
dict *pubsub_channels
}

2 基于模式的发布订阅
//在redisServer中有一个pubsub_patterns属性,该属性表示一个链表,链表中保存着所有和模式相关的信息
struct redisServer{
list *pubsub_patterns;
}
typedef struct pubsubPattern{
client *client; – 订阅模式客户端
robj *pattern; --被订阅的模式
} pubsubPattern;

需要注意的是,发布消息与监听消息要运行在不同的 JVM,如果使用同一个 redissonClient 发布的话,不会监听到自己的消息。

缺陷:
发布者不知道订阅者是否收到发布的消息
订阅者不知道自己是否收到了发布者发出的所有消息
发送者不能获知订阅者的执行情况
没人知道订阅者何时开始收到消息

实现
生产者代码

 * 发布消息到 Topic
 * @param message 消息
 * @return 接收消息的客户端数量
public long sendMessage(String message) {
    RTopic topic = redissonClient.getTopic(CHANNEL);
    long publish = topic.publish(message);
    log.info("生产者发送消息成功,msg = {}", message);
    return publish;
}

消费者代码

public void onMessage() {
  // in other thread or JVM
  RTopic topic = redissonClient.getTopic(CHANNEL);
  topic.addListener(String.class, (channel, msg) -> {
    log.info("channel: {} 收到消息 {}.",  channel, msg);
  });
}

Spring boot整合redis

消息监听配置
@Configuration
public class RedisSubConfig {
    public static final String SUB_KEY = "chat";//频道channel
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个频道
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisSubConfig.SUB_KEY));
        return container;
    }
    
   * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    /**
     * redis 读取内容的template
     * @param connectionFactory
     * @return
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

接收消息

@Service
public class RedisReceiver {
    public void receiveMessage(String message) {
        System.out.println("接收消息:" + message);
    }
}

采用定时器发布消息

@EnableScheduling //开启定时器功能
@Component
public class MessageSender {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Scheduled(fixedRate = 5000) //间隔5s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息
    public void sendMessage(){
        stringRedisTemplate.convertAndSend("chat", "hello "+ new Date());
    }
}

kafka相关
消息头格式
RecordHeaders(headers = [RecordHeader(key = messageType, value = [0, 0, 0, 1]), RecordHeader(key = operationCode, value = [0, 0, 0, 1]), RecordHeader(key = messageId, value = [52, 52, 52, 53, 53, 53])], isReadOnly = false)
使用java读取消息头

private MsgHeader parseMsgHeaders(Headers headers) {
        MsgHeader msgHeader = new MsgHeader();
        Header xxxHeader = headers.lastHeader("xxx");
        if (xxxHeader != null) {
            msgHeader.setXXX(new String(xxxHeader.value()));
        }
        return msgHeader;
    }

使用go发送消息头文章来源地址https://www.toymoban.com/news/detail-606628.html

headers := []sarma.RecordHeader{
    sarama.RecordHeader{
        Key: []byte("kkk"),
        Value: []byte("vvv"),
}}
msg := &sarama.ProducerMessage{
    Topic: "topic",
    Key: sarama.StringEncoder("  "),
    Value: sarama.StringEncode("  "),
    Headers: headers,
}

到了这里,关于Redis发布订阅机制学习|kafka相关经验的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis发布订阅

    Redis 发布订阅(pub/sub)是一种 消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。 订阅/发布消息图: 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系: 当有新消息通过 PUBLISH

    2024年02月11日
    浏览(38)
  • Redis(03)——发布订阅

    基于频道 publish channel message:将信号发送到指定的频道 pubsub subcommand [argument [argyment]]:查看订阅或发布系统状态 subscribe channel [channel]:订阅一个或多个频道的信息 unsubscribe [channel [channel]]:退订指定的频道,若没有指定频道,则默认退订所有频道 基于模式 psubcribe pattern [pa

    2024年02月20日
    浏览(36)
  • redis 发布和订阅

    目录 一、简介  二、常用命令 三、示例 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。下图展示了频道 channel1 ,以及订阅这个频道的三个客户端 —— client1 、client2 和 client3 之间的关系: 当

    2024年02月12日
    浏览(35)
  • JAVA 实现 Redis 发布订阅

    发布订阅: 消息发布者发布消息 和 消息订阅者接收消息 ,两者之间通过某种媒介联系起来 例如订杂志,当自己订阅了爱格杂志,每个月会发刊一本。到发布的时候派送员将杂志送到自己手上就能看到杂志内容。只有我们订阅了该杂志才会派送给我们 Redis 发布订阅(pub/su

    2024年02月14日
    浏览(37)
  • redis发布订阅模式的应用

    小体量系统,某些特定场景需要做异步处理。如操作日志记录、发送消息、数据excel导入等。并发量不大,主要作用是异步批处理数据,提高响应速度,改善用户体验,不至于页面卡半天。用消息队列的话显得很笨重,牛刀杀鸡,不利于项目的快速布署和影响项目稳定。在这

    2024年02月11日
    浏览(43)
  • Redis 消息队列和发布订阅

    采用redis 三种方案: ● 生产者消费者:一个消息只能有一个消费者 ● 发布者订阅者:一个消息可以被多个消费者收到 ● stream模式:实现队列和广播模式 Producer调用redis的lpush往特定key里放消息,Consumer调用brpop去不断监听key。 1、利用redis的链表,存储数据,实现队列模式

    2024年01月18日
    浏览(42)
  • 【Redis】Pub/Sub(发布/订阅)

    Pub/Sub(发布/订阅)是一种消息传递模式,其中消息发送者(发布者)将消息发布到一个或多个主题(topics)或频道(channels),而消息接收者(订阅者)订阅特定的主题或频道以接收消息。 在Pub/Sub模式中,发布者和订阅者不直接通信,而是通过一个中介(通常称为消息代理

    2024年02月16日
    浏览(35)
  • Redis消息传递:发布订阅模式详解

    目录 1.Redis发布订阅简介 2.发布/订阅使用    2.1 基于频道(Channel)的发布/订阅    2.2 基于模式(pattern)的发布/订阅 3.深入理解Redis的订阅发布机制    3.1 基于频道(Channel)的发布/订阅如何实现的?    3.2 基于模式(Pattern)的发布/订阅如何实现的?    3.3 SpringBoot结合Redis发布

    2024年02月12日
    浏览(42)
  • Redis实现消息的发布和订阅

    4.1 发送消息 4.2 接收消息

    2024年02月13日
    浏览(40)
  • Redis(发布订阅、事务、redis整合springboot、集成 Spring Cache)

    目录 一.redis的发布订阅 1、什么 是发布和订阅 2、Redis的发布和订阅 3、发布订阅的代码实现 二.Redis事务 1.事务简介 1、在事务执行之前 如果监听的key的值有变化就不能执行 2、在事务执行之前 如果监听的key的值没有变化就能执行 3、Exec之前就出现错误 4、Exec之后出现的错误

    2024年01月24日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包