JMS和ActiveMQ介绍(2)_JMS

9-30 1,297 views

1幻灯片7
1幻灯片8
JMSAPI可以分为3个主要部分:公共API、队列API和主题API。
JMSAPI中,ConnectionFactory和Destination既可以作为受管对象,由JMS提供者创建,并使用JNDI从提供者获得,也可以直接动态创建;
其他接口通过工厂方法创建,比如Session可以通过Connection创建;
消息生产者和消费者一般仅创建一个连接(Connection),而可以创建多个会话(Session);
由Session保存用于消息发送的事务性工作单元。
通过Session可以创建Message,MessageProducer,MessageConsumer。
幻灯片9
消息可分为3部分:消息头、属性和有效负载。
其中消息又分为自动分配消息头和开发者分配消息头。
自动分配的消息头:
1)JMSDestination,消息目的地。
2)JMSDeliveryMode,消息传送模式,默认设置为持久性模式。
3)JMSMessageID,消息ID。
4)JMSTimestamp,JMS提供者接收消息的时间。
5)JMSExpiration,消息过期时间,默认值为0,即永不过期。
6)JMSRedelivered,是否是重发的消息。
7)JMSPriority,0~4级是普通优先级,而5~9级则是加急优先级。
开发者分配的消息头:
1)JMSReplyTo,接收消息后发送响应消息的目的地。
2)JMSCorrelationID,与消息相关联的特定ID,在大多数情况下,用于接收消息后发送响应消息时,记录上一个消息的ID。
3)JMSType,消息类型。
属性:
与消息头类似,由开发者添加,支持String、Int、Boolean、Double、Float、Byte、Long、Short、Object等类型,可用于消息选择;
有效负载,即实际传输的消息内容。
根据要携带的有效负载类型,JMS定义了6种消息接口:
Message,不含有效负载;
TextMessage,携带一个String作为有效负载;
ObjectMessage,携带一个可序列化Java对象作为有效负载;
BytesMessage,携带一组原始类型字节流;
StreamMessage,携带一个Java原始数据类型流;
MapMessage,携带一组键值对作为有效负载。
消息持久化,通过JMSDeliveryMode设定。在持久性模式下,消息在服务器端被持久化保存,直至消费者接收到该消息,因此可以保证消息成功发送有且仅有一次,而非持久性模式只能保证消息最多发送一次,而不能保证消息被成功接收。
幻灯片10
对于消息生产者相关API的使用,我们分队列和主题分别介绍。
在队列下:
通过ActiveMQ提供的连接工厂类创建queueConnection。
queueConnection调用start方法真正建立与消息服务器的长连接。
调用queueConnectoin的createQueueSession方法创建负责消息相关操作的会话单元。
调用queueSession的createQueue、createSender、createMessage方法分别创建队列、发送者和消息。
最后调用queueSender的send方法发送消息,消息发送是一个同步操作,在send方法中线程会被阻塞,直到接收到来自消息服务器的确认。
在主题下,消息生产过程中各接口的调用方法与队列下的基本类似,只是使用了与主题相关的一些接口。因此,对于消息生产,可以使用公共API中的接口来完成。
幻灯片11
使用公共API中的接口实现消息生产者,其中使用了公共API中的接口Connection、Session、Destination和MessageProducer。
在创建destination时,调用createTopic方法和createQueue方法来分别创建主题和队列。
幻灯片12
对于消息消费者相关API的使用,我们也分队列和主题分别介绍。
在队列下,调用queueSession的createReceiver方法创建接收者。
调用queueReceiver的receive方法接收消息,在receive方法中,线程将被阻塞直至接收到消息。也可以在调用receive方法时,设置最长延时,若该时间内未收到消息,receive方法将直接返回。
在主题下,调用topicSession的createSubscriber方法创建订阅者。
另外需要通过实现MessageListener接口,实现消息侦听器,并重写侦听器的onMessage方法。通过topicSubscriber的setMessageListener将侦听器注册到订阅者中。当消息服务器将一条消息推向topicSubscriber时,topicSubscriber将调用侦听器的onMessage方法对消息进行具体处理。
幻灯片13
使用公共API中的接口实现消息消费者。
对于队列和主题,除了在创建destination时的方法不同外,在消息接收时,队列下的consumer是通过receive方法阻塞线程主动接收消息,主题下的comsumer是通过设置侦听器侦听推送过来的消息。
幻灯片14
对于持久订阅者,它与普通订阅者的区别是:对于普通订阅者,在不连接时,发送的消息将被丢失,连接后这些消息不会被接收,而对于持久订阅者,在不连接时,发送的消息将被保存,连接后这些消息会被正常接收。
持久订阅者的创建与普通订阅者的创建有两点不同:一是在连接中需要设置ClientID,二是在创建订阅者时调用createDurableSubscriber方法创建持久订阅者,并设置订阅者名称。消息服务器将ClientID和订阅者名称作为持久订阅者的唯一标识符,在不连接时,为其保留一份消息副本,在连接时,将消息发送至该持久订阅者。
若要彻底关闭持久订阅者,使消息服务器不再为其保留消息副本,则需要调用unsubscribe方法。
持久订阅者的优点是不管订阅者是否连接,消息都能被正常接收。
持久订阅者的缺点是若订阅者一直不连接,消息将被一直保存,造成存储压力。
因此是否选用持久订阅者要根据具体应用场景来决定,若需要保证订阅者在有连接丢失的情况下仍能接收到所有消息,则采用持久订阅者,若只需要订阅者在连接时接收消息,且允许有一定的消息丢失,则采用普通订阅者。
幻灯片15
队列中的消息只能被一个消费者消费,若查看队列中的消息,但不消费消息,可以使用QueueBrowser。
通过调用getEnumeration方法可以按照消息接收顺序获取到当前队列中的所有消息。
幻灯片16
对于消费者,有时候需要处理满足特定条件的消息,一个可行的解决方案是在处理消息前,增加条件判断,对于不满足条件的消息不作处理,但这个方案存在的一个问题是由于消息已经被该消费者接收,但不会被处理,从而造成网络带宽的浪费,另外不作处理的消息也不能再被其他可能会处理这些消息的消费者接收。
我们可以通过消息选择器解决上述问题。使用消息选择器可以实现消息的过滤,只接收满足特定要求的消息。
消息选择器可以增加的条件包括消息头和消息属性。消息头中可以用作选择条件的有JMSDeliveryMode,JMSPriority,JMSMessageID,JMSTimestamp,JMSCorrelationID,JMSType,例如可以选择权重值较高的消息。在消息属性中可以设置业务属性,这样就可以选择满足特定业务需求的消息。
消息选择器的基本表达式由标识符、比较运算符和常量组成,例如,JMSPriority> 5,即选择权重大于5的消息。
多个基本表达式可以由AND、OR组成复杂表达式,例如,JMSPriority> 5 AND Oper= ‘Delete’,即选择权重大于5且业务上操作类型为“Delete”的消息。
消息选择器在创建消费者时设置,如下所示。
幻灯片17
对于持久化消息,消息服务器会对消息进行确认,以保证消息成功发送。
消息确认机制有如下几种:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,CLIENT_ACKNOWLEDGE,
消息确认机制可以在创建session时指定。
AUTO_ACKNOWLEDGE是由消息服务器自动完成消息确认。
消息的发送和接收是两个异步过程,因此分别讨论这两个过程中AUTO机制的消息确认。
消息发送时:
1)生产者发送消息,生产者阻塞线程等待消息服务器接收到消息的确认。
2)消息服务器接收到消息后,对消息进行持久化。
3)持久化成功后,消息服务器向生成者发送确认通知。
4)生产者接收到确认通知后,从发送方法返回。
消息接收时:
5)消费者接收消息。
6)消费者发送确认通知。
7)消息服务器从存储中删除消息。
DUPS_OK_ACKNOWLEDGE不同于AUTO对单条消息进行确认,是一种延时、批量确认机制,这样做的一个好处是可以减少因单条消息确认而带来的系统开销,但带来的一个问题是可能将一条消息向同一目的地发送两次以上,因此,适用于可以重复接收消息的应用场景。
CLIENT_ACKNOWLEDGE是由消费者通过调用消息的acknowledge方法进行确认,如果消费者使用CLIENT机制,且调用acknowledge方法,那么在上次确认之间所发的未被确认的消息都将被确认。
INDIVIDUAL_ACKNOWLEDGE也是由消费者通过调用消息的acknowledge方法进行确认,但不同于CLIENT机制,每条消息都需要调用acknowledge方法进行确认。
幻灯片18
消息事务可以保证:
如果在一个会话中,只有消息生产者或只有消息消费者,则对于消息生产者,事务将保证其所有消息都发送至服务器或都不发送,而对于消息消费者,事务将保证其接收所有消息或都不接收。
如果在一个会话中,既有消息生产者,又有消息消费者,则事务将保证所有消息都发送并接收或都不发送和接收。
创建会话时可以设置是否使用事务。
多次操作后通过调用commit()提交。在出现异常时,通过调用rollback()回滚。
幻灯片19
Spring对JMS提供支持,在客户端可以通过Spring配置连接、生产者和消费者。
通过spring提供的DefaultMessageListenerContainer可以设置消息的侦听,通过spring提供的JmsTemplate可以进行消息的发送和接收。

标签:

基于WebSocket实现微信小程序的消息推送

微信小程序支持通过基于WebSocket进行消息推送,提供了相应的API,例如创建连接示例代码: JavaScript wx.connectSocket({ ...

阅读全文

基于nginx-sticky-module-ng实现会话保持(Sticky Sessions)

对服务进行集群部署,前端进行负载均衡时,需要实现会话保持,对于同一会话的多个请求,通过集群中的一个节点来提供服务。系统的部署结构如图所示,通过Resin...

阅读全文

ActiveMQ基于Zookeeper和LevelDB实现Master/Slave

ActiveMQ的Master/Slave目前支持三种实现方式: 1)Shared File System Master Slave; 2)JDBC Master Slave; 3)Replicated LevelDB Store。 对于第三种方...

阅读全文