Redis发布订阅模式

young 429 2021-10-18

可以通过队列的rpush和lpop实现

但是会存在的问题:

  1. 当生产速度远大于消费速度,会在内存中堆积过多的数据,占用大量的内存
  2. 消息实时性
  3. 一对多不支持

发布订阅

redis中提供了发布订阅模式

订阅频道:可以一次订阅多个

subscribe channel-1 channel-2 channel-3

向指定频道发布消息

publish channel-1 2673

取消订阅(不能在订阅状态下使用):

unsubscribe channel-1

消费者只能接受订阅后收到的消息,不能接收到订阅之前的消息,发布的消息不会被持久化

订阅频道

消费端1,关注运动信息:

psubscribe *sport

消费端2,关注所以新闻:

psubscribe news*

消费端3,关注天气新闻:

psubscribe news-weather

使用Java代码实现订阅发布

Jedis jedis = new Jedis("10.0.16.6",6379);
// 使用模式匹配的方式设置频道
// 会阻塞
final MyListener listener = new MyListener();
jedis.psubscribe(listener,"young-*");

创建监听器,实现相关方法

public class MyListener extends JedisPubSub {
    /** 取得订阅的消息后的处理*/
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(channel + "=" + message);
    }

    @Override
    /** 初始化订阅时候的处理*/
    public void onSubscribe(String channel, int subscribedChannels) {
        // System.out.println(channel + "=" + subscribedChannels);
    }

    @Override
    /** 取消订阅时候的处理*/
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // System.out.println(channel + "=" + subscribedChannels);
    }

    @Override
    /** 初始化按表达式的方式订阅时候的处理*/
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // System.out.println(pattern + "=" + subscribedChannels);
    }

    /** 取消按表达式的方式订阅时候的处理*/
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // System.out.println(pattern + "=" + subscribedChannels);
    }

    /** 取得按表达式的方式订阅的消息后的处理*/
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println(pattern + "=" + channel + "=" + message);
    }
}

发布消息

Jedis jedis = new Jedis("10.0.16.6",6379);
jedis.publish("young-123","aaa");
jedis.publish("young-456","bbb");