activemq
参考链接:https://www.cnblogs.com/charlesblc/p/6045238.html
消息队列-推/拉模式学习 & ActiveMQ及JMS学习
消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供类似功能的两种域模型:Exchange 和 Message queue。
AMQP的更多内容可以看这里: http://www.cnblogs.com/charlesblc/p/6058799.html
一种分类是推和拉 。
还有一种分类是 Queue 和 Pub/Sub 。
先看的这一篇:http://blog.csdn.net/heyutao007/article/details/50131089
先讲了JMS和遵守JMS的ActiveMQ。Java Message Service,JMS,指的是面向消息中间件(MOM),用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1 | AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。 |
JMS中定义了两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。主要区别就是是否能重复消费。
点对点:Queue,不可重复消费
1 | 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。 |
发布/订阅:Topic,可以重复消费
1 | 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。 |
支持订阅组的发布订阅模式:
1 | 发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。 |
注:queue和topic在ActiveMQ里面的实现和对比,可以参考:《ActiveMQ的queue以及topic两种消息处理机制分析》
有完整queue和topic对比的代码可以看这里:http://blog.csdn.net/zmx729618/article/details/51082844
可以看出区别 topic 是 session.createTopic(“FirstTopic”); 而queue是 createQueue.
流行模型比较
传统企业型消息队列ActiveMQ遵循了JMS规范,实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ、Kafka并没有遵循JMS规范。
3.1、RabbitMQ
RabbitMQ实现了AMQP协议,AMQP协议定义了消息路由规则和方式。
(更多AMQP内容,看这里:http://www.cnblogs.com/charlesblc/p/6058799.html)
生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。
(1)点对点
1 | 生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。 |
(2)多订阅
1 | 当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费不同的Queue。 |
3.2、Kafka
Kafka只支持消息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会立即删除,会保留历史消息。
因此支持多订阅时,消息只会存储一份就可以了。但是可能产生重复消费的情况。
(1)点对点&多订阅(因为不删消息,所以这两种就不区分了)
1 | 发布者生产一条消息到topic中,不同订阅组消费此消息。 |
上面是三种最流行MQ的比较(ActiveMQ, RabbitMQ, Kafka,没有涉及C++的zeorq)。
下面这篇文章针对ActiveMQ的推拉模型进行介绍。
http://www.cnblogs.com/hapjin/p/5683648.html
对于消费者而言有两种方式从消息中间件获取消息:
1 | ①Push方式:由消息中间件主动地将消息推送给消费者; |
看一段官网对Push方式的解释:
1 | To be able to achieve high performance it is important to stream messages to consumers as fast as possible |
比较:
1 | 采用Push方式,可以尽可能快地将消息发送给消费者(stream messages to consumers as fast as possible) |
但是,Push方式会有一个坏处:
1 | 如果消费者的处理消息的能力很弱(一条消息需要很长的时间处理),而消息中间件不断地向消费者Push消息,消费者的缓冲区可能会溢出。 |
ActiveMQ是怎么解决这个问题的呢?那就是 prefetch limit
1 | prefetch limit 规定了一次可以向消费者Push(推送)多少条消息。 |
prefetch limit设置的大小根据场景而定:
1 | 那prefetch limit的值设置为多少合适?视具体的应用场景而定。 |
prefetch limit 设置成0意味着什么?意味着变成 拉pull模式。
1 | Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time, |
prefetch Limit>0即为prefetch,=0为Pull,看起来没有不prefetch的push,push都要设置prefetch。
另外,对于prefetch模式(,那么消费需要进行响应ACK。因为服务器需要知道consumer消费的情况。
1 | perfetch limit是“消息预取”的值,这是针对消息中间件如何向消费者发消息 而设置的。 |
1 | 如果prefetchACK为true,那么prefetch必须大于0;当prefetchACK为false时,你可以指定prefetch为0以及任意大小的正数。 |
在程序中如何采用Push方式或者Pull方式呢?
1 | 从是否阻塞来看,消费者有两种方式获取消息。同步方式和异步方式。 |
同步方式:
1 | 使用同步方式receive()方法获取消息时,prefetch limit即可以设置为0,也可以设置为大于0 |
异步方式:
1 | 当使用MessageListener异步获取消息时,prefetch limit必须大于零了。 |
此外,还有一个要注意的地方,即消费者采用同步获取消息(receive方法) 与 异步获取消息的方法(MessageListener) ,对消息的确认时机是不同的。
这里提到了这篇文章:http://shift-alt-ctrl.iteye.com/blog/2020182 文章名《ActiveMQ消息传送机制以及ACK机制详解》
ActiveMQ消息传送机制
1 | Producer客户端使用来发送消息的, Consumer客户端用来消费消息; |
一条消息的生命周期如下:
1 | 一条消息从producer端发出之后,一旦被broker正确保存,那么它将会被consumer消费,然后ACK,broker端才会删除; |
上面的图里面写的很清晰。
上半部分是producer的流程,下半部分consumer的流程分为两块,同步的consumer.receive和异步的MessageListener。从图中可以看出异步的MessageLister也是一条一条处理的,由delivered队列控制的。
1 | 这张图片中简单的描述了:1)producer端如何发送消息 2) consumer端如何消费消息 3) broker端如何调度。 |
Prefetch和optimizeACK
我们需要在brokerUrl指定optimizeACK选项,在destinationUri中指定prefetchSize(预获取)选项。
1 | 其中brokerUrl参数选项是全局的,即当前factory下所有的connection/session/consumer都会默认使用这些值; |
关于prefetchAck、同步、异步api(上面讲过了,温习一下):
1 | 如果prefetchACK为true,那么prefetch必须大于0; |
重发选项:
1 | 我们还可以brokerUrl中配置“redelivery”策略,比如当一条消息处理异常时,broker端可以重发的最大次数; |
consumer消费快慢,决定了架构和设计如何处理:
1 | 按照良好的设计准则, |
其他情景:
1 | 如果consumer端消费速度很快,但是producer端生成消息的速率较慢,而且我们还部署了多个consumer, |
错误处理与重发:
1 | 既然optimizeACK是”延迟“确认,那么就引入一种潜在的风险: |
从上面的图可以看出,没有ACK的情况下,队列是blocking的。
1 | 无论如何设定此值,client持有的消息条数最大为:prefetch + “DELIVERED_ACK_TYPE消息条数”(DELIVERED_ACK_TYPE参见下文) |
optimizeACK其他注意:
1 | 即使当optimizeACK为true,也只会当session的ACK模式为AUTO_ACKNOWLEDGE时才会生效,即在其他类型的ACK模式时consumer端仍然不会“延迟确认”,即: |
ACK模式与类型介绍
JMS API中约定了Client端可以使用四种ACK模式
1 | 在javax.jms.Session接口中: |
对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)与Broker之间建立一种简单的“担保”机制.
1 | Client端指定了ACK模式,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型, |
ACK的基本流程见下图:
Consumer消费消息的风格有2种: 同步/异步. 使用consumer.receive()就是同步,使用messageListener就是异步。
在同一个consumer中,我们不能同时使用这2种风格,比如在使用listener的情况下,当调用receive()方法将会获得一个Exception。
两种风格下,消息确认时机有所不同。
1 | "同步"伪代码: |
ACK模式详解
AUTO_ACKNOWLEDGE :
1 | 自动确认,这就意味着消息的确认时机将有consumer择机确认. |
CLIENT_ACKNOWLEDGE :
1 | 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。 |
注意防止不ack而hang住:
1 | 如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认, |
broker依据ack速率进行负载平衡:
1 | 在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPE的ACK指令, |
DUPS_OK_ACKNOWLEDGE :
1 | "消息可重复"确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。 |
发生作用的时机:
1 | 1) 在ActiveMQ中,如果在Destination是Queue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)” |
SESSION_TRANSACTED :
当session使用事务时,就是使用此模式。
1 | 在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。 |
确认过程,以及确认ACK的发送时机:
1 | 事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE); |
INDIVIDUAL_ACKNOWLEDGE :
很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样
1 | 当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条), |