• 售前

  • 售后

热门帖子
入门百科

【RabbitMQ】Go语言实现六种消息中间件模型

[复制链接]
麻辣鸡翅 显示全部楼层 发表于 2022-1-12 14:52:53 |阅读模式 打印 上一主题 下一主题
文章目录



写在前面

   本文是使用Go语言实现各种RabbitMQ的中间件模型
  1. 介绍

1.1 什么是MQ

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。
别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
目前市面上有很多消息中间件:RabbitMQ,RocketMQ,Kafka等等…
1.2 什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。
1.3 AMQP 协议

AMQP(advanced message queuing protocol) 在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。
顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2. Go语言操作RabbitMQ

2.1 下载

下载rabbitmq过程就省了,可以直接到官网网站下载安装,像安装qq一样。
2.2 引入驱动



  • 驱动
go get github.com/streadway/amqp


  • 连接
  1. var MQ *amqp.Connection
  2. // RabbitMQ 链接
  3. func RabbitMQ(connString string) {
  4.         conn, err := amqp.Dial(connString)
  5.         if err != nil {
  6.                 panic(err)
  7.         }
  8.         MQ = conn
  9. }
复制代码
2.3 HelloWorld 模型


P代表生产者,C代表消费者,红色部分是队列。
生产者生成消息到队列中,消费者进行消费,直连单点模式。
2.3.1 生产者



  • 声明连接对象
  1. var ProductMQ *amqp.Connection
复制代码


  • 声明通道
  1. ch, err := ProductMQ.Channel()
复制代码


  • 创建队列
  1. q, err := ch.QueueDeclare("hello",         // 队列名字
  2.         false,           // 是否持久化,
  3.         false,   // 不用的时候是否自动删除
  4.         false,           // 用来指定是否独占队列
  5.         false,           // no-wait
  6.         nil,                     // 其他参数
  7. )
复制代码
参数1(name):队列名字
参数2(durable):持久化,队列中所有的数据都是在内存中的,如果为true的话,这个通道关闭之后,数据就会存在磁盘中持久化,false的话就会丢弃
参数3(autoDelete):不需要用到队列的时候,是否将消息删除
参数4(exclusive):是否独占队列,true的话,就是只能是这个进程独占这个队列,其他都不能对这个队列进行读写
参数5(noWait):是否阻塞
参数6(args):其他参数


  • 发布消息
  1. body := "Hello World!"
  2. err = ch.Publish(
  3.         "",     // 交换机
  4.         q.Name, // 队列名字
  5.         false,  // 是否强制性
  6.         // 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者
  7.         // 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
  8.         false, //当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
  9.         // 是否立刻
  10.         /**
  11.         概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
  12.         **/
  13.         amqp.Publishing{
  14.                 ContentType: "text/plain",
  15.                 Body:        []byte(body), // 发送的消息
  16.         })
复制代码
参数1(exchange):交换机,后续会讲到
参数2(route-key):队列名字
参数3(mandatory):是否强制性,
   当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者
当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
  参数4(immediate):是否立即处理
   当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
  也就是说,mandatory 标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
参数5(msg):发布的消息,ContentType是传输类型,Body是发送的消息。
2.3.2 消费者



  • 声明通道
  1. ch, err := ConsumerMQ.Channel()
复制代码


  • 创建队列
  1. q, err := ch.QueueDeclare(
  2.         "hello",
  3.         false,
  4.         false,
  5.         false,
  6.         false,
  7.         nil,
  8. )
复制代码


  • 读取队列消息
  1. msgs, err := ch.Consume(
  2.         q.Name,
  3.         "",   
  4.         true,  
  5.         false,  
  6.         false,  
  7.         false,
  8.         nil,   
  9. )
复制代码
由于消费者端需要一直监听,所以我们要用一个for循环+channel去阻塞主进程,使得主进程一直处于监听状态。
[code]forever := make(chan bool)go func() {        for d := range msgs {                fmt.Printf("Received a message: %s", d.Body)        }}()fmt.Printf("
  • Waiting for messages. To exit press CTRL+C")
  • 本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有帐号?立即注册

    x

    帖子地址: 

    回复

    使用道具 举报

    分享
    推广
    火星云矿 | 预约S19Pro,享500抵1000!
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
    • 官方手机版

    • 微信公众号

    • 商务合作