activemq特性是什么

72次阅读
没有评论

共计 10362 个字符,预计需要花费 26 分钟才能阅读完成。

这篇文章主要介绍“activemq 特性是什么”,在日常操作中,相信很多人在 activemq 特性是什么问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”activemq 特性是什么”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

activemq 特点:用通配符订阅多个 destination, 用组合发布多重 destionation
activemq 支持 destination 的层次结构【topic 和 queen】便于归类和管理。
通配符有三个:
.  用来分隔路径
* 用来匹配路径中的一节
用来匹配任意节的路径
opics: sport League . team  
。例如:football.division.leeds。如果 leeds 参加两种运动 –Scccer 和 Rugby, 为了方便,我们希望通过一个消息消费者而看到 Leeds 两种运动的最新战绩,这个时候,通配符就有用武之地了
.  : used to separate elements in the destination name
*  : used to match one element   
  : match one or all trailing elements
所以,对于上面的例子,你可以订阅这样的主题:*.*.Leeds
如果你想知道 division1 这个赛区的所有分数,你可以订阅这个:soccer.division1.*
如果你想知道 Rugby 的分数:你可以订阅这个:rugby. .
然而,通配符中是为消费者服务的,如果你发送了这样的一个主题:rugby. .,这个消息仅会发送到命名了 rugby. . 的主题,并不是所有的主题都是以 rugby 开头的。
这里有一种   方法,使消息生产者能将一条
消息发送到多个目的地。通过使用   composite destination。
将同一条消息发送到不同的目的地是很有用的。比如一个用来存储信息的应用,会发送一条消息给队列
同时也要将这条消息广播给监控的所有系统。通常,你会通过用两个 producer 发送两次消息来达到这个目的。composite destination 就是用来解决这种情况的
例如,如果你创建了名子为:store.order.backoffice,store.order.warehouse 的 Queue, 这样 就会发送同时两个 Queue。
订阅信息     解释
PRICE.    Any price for any product on any exchange
PRICE.STOCK.    Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.*    Any stock price on NASDAQ
PRICE.STOCK.*.IBM    Any IBM stock price on any exchange
从 5.5 版本以后,可以自定义路径分隔符:

  plugins
  …..
  destinationPathSeparatorPlugin/
  /plugins

此时 FOO.BAR.* 可以表示为 FOO/BAR/*
也可以通过 pathSeparator 属性定义其他符号位路径分隔符。
  public void subscribeToLeeds() throws JMSException {
  String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic allLeeds = session.createTopic(*.*.Leeds
  MessageConsumer consumer = session.createConsumer(allLeeds);
  Message result = consumer.receive();
  }
   11.1.2 发送一个 message 到多重 destinations
    发送相同的 message 到不同的 destination 上:案列发送一个 [queen,opic] 组合模式,默认的组合 destination 用,分隔
    列如 store.order.backoffice,store.order.warehouse
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Queue ordersDestination = session.createQueue(store.orders, topic://store.orders
  MessageProducer producer = session.createProducer(ordersDestination);
  Message order = session.createObjectMessage();
  producer.send(order);
11.2 通知消息
单的说就是实现了 ActiveMQ 的 broker 上各种操作的记录跟踪和通知。

使用这个功能,你可以实时的知道 broker 上

  创建或销毁了连接,
  添加或删除了生存者或消费者,
  添加或删除了主题或队列,
  有消息发送和接收,
  什么时候有慢消费者,
  什么时候有快生产者
  什么时候什么消息被丢弃
  什么时候 broker 被添加到集群(主从或是网络连接)

这个机制是 ActiveMQ 对 JMS 协议的重要补充,也是基于 JMS 实现的 ActiveMQ 的可管理性的一部分。多个 ActiveMQ 的相互协调和互操作的基础设置。
 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
  MessageConsumer consumer = session.createConsumer(connectionAdvisory);
  ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
  DataStructure data = (DataStructure) message.getDataStructure();
  if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
  ConnectionInfo connectionInfo = (ConnectionInfo) data;
  System.out.println(Connection started: + connectionInfo);
  } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
  RemoveInfo removeInfo = (RemoveInfo) data;
  System.out.println(Connection stopped: + removeInfo.getObjectId());
  } else {
  System.err.println(Unknown message + data);
  }
      大多数 advisor 消息都是完整的对于 destiation,但是呢 advisorysupport 类有一些方法来决定监听哪个 advisorytopic,你也能使用通配符 -
      String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Lets first create a Consumer to listen too
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  // Lets first create a Consumer to listen too
  Queue queue = session.createQueue(test.Queue
  MessageConsumer testConsumer = session.createConsumer(queue);
  // so lets listen for the Consumer starting/stoping
  Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
  MessageConsumer consumer = session.createConsumer(advisoryTopic);
  consumer.setMessageListener(new MessageListener() {
  public void onMessage(Message m) {
  ActiveMQMessage message = (ActiveMQMessage) m;
  try {
  System.out.println(Consumer Count = + m.getStringProperty( consumerCount));
  DataStructure data = (DataStructure) message.getDataStructure();
  if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
  ConsumerInfo consumerInfo = (ConsumerInfo) data;
  System.out.println(Consumer started: + consumerInfo);
  } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
  RemoveInfo removeInfo = (RemoveInfo) data;
  System.out.println(Consumer stopped: + removeInfo.getObjectId());
  } else {
  System.err.println(Unknown message + data);
  }
  } catch (JMSException e) {
  e.printStackTrace();
  }
  }
  });
  testConsumer.close();
      destinationPolicy
  policyMap
  policyEntries
  policyEntry topic= advisoryForSlowConsumers= true
  /policyEntry
  /policyEntries
  /policyMap
  /destinationPolicy

ActiveMQ 中,topic 只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每个持久订阅者,都相当于一个持久化的 queue 的客户端,它会收取所有消息。这种情况下存在两个问题:

1.  同一应用内 consumer 端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个 consumer 来共同承担消息处理功能。因为每个都会获取所有消息。queue 模式可以解决这个问题,broker
端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能 jms 规范本身是没有的。
2.  同一应用内 consumer 端 failover 的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高, 为了解决这两个问题,ActiveMQ 中实现了虚拟
Topic 的功能。使用起来非常简单。对于消息发布者来说,就是一个正常的 Topic,名称以 VirtualTopic. 开头。例如 VirtualTopic.TEST。对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为
队列的名称,即可表明自己的身份即可实现消费端应用分组。例如 Consumer.A.VirtualTopic.TEST,说明它是名称为 A 的消费端,同理 Consumer.B.VirtualTopic.TEST 说明是一个名称为 B 的客户端。
可以在同一个应用里使用多个 consumer 消费此 queue,则可以实现上面两个功能。又因为不同应用使用的 queue 名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订
阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
   
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection consumerConnection = connectionFactory.createConnection();
  consumerConnection.start();
   
  Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  Queue consumerAQueue = consumerSessionA.createQueue(Consumer.A.VirtualTopic.orders
  MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
   
  Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  Queue consumerBQueue = consumerSessionB.createQueue(Consumer.B.VirtualTopic.orders
  MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
   
  //setup the sender
  Connection senderConnection = connectionFactory.createConnection();
  senderConnection.start();
  Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic ordersDestination = senerSession.createTopic(VirtualTopic.orders
  MessageProducer producer = senerSession.createProducer(ordersDestination);
同样 queue 名称的消费者会平分所有消息。
从 queue 接收到的消息,message.getJMSDestination().toString()为 topic://VirtualTopic.TEST,即原始的 destination。消息的 persistent 属性为 true,即每个相当于一个持久订阅。
Virtual Topic 这个功能特性在 broker 上有个总开关,useVirtualTopics 属性,默认为 true,设置为 false 即可关闭此功能。
当此功能开启,并且使用了持久化的存储时,broker 启动的时候会从持久化存储里拿到所有的 destinations 的名称,如果名称模式与 Virtual Topics 匹配,则把它们添加到系统的 Virtual Topics 列表中去。
当然,没有显式定义的 Virtual Topics,也可以直接使用的,系统会自动创建对应的实际 topic。
当有 consumer 访问此 VirtualTopics 时,系统会自动创建持久化的 queue,并在每次 Topic 收到消息时,分发到具体的 queue。

可追溯”消费者,只对 Topic 有效,如果 consumer 是可追溯的,那么它可以获取实例创建之前的消息。通常而言,订阅者不可能获取实例创建之前的消息,因为 broker 根本不知道它的存在。对于 broker 而言,如果
一个 Topic 通道创建,且有发布者发布消息 (Publisher), 那么 broker 将会在内存中(非持久化) 或者磁盘中 (持久化) 保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么 retroactive
类型的订阅者,就可以获取这些原本不属于自己但 broker 上还保存的旧消息,就像我们订阅一种 Feed,可以立即获取旧的内容列表一样。如果此订阅者不是 durable(耐久的),它可以获取最近发布的一些消息;如果是 durable,它可以获取存储器中尚未删除的所有的旧消息。[下文会详细介绍 Topic 的数据转发模型]
// 在 destinationUrl 中设置, 默认为 false
feedTopic?consumer.retroactive=true
在 broker 端,可以配置当前 Topic 默认为“可追溯的”,不过 Topic 并不会在此种情况下额外的保存消息,只不过表示订阅者默认都是可追溯的而已。
!– 只对 topic 有效,默认为 false —
policyEntry topic= feedTopic alwaysRetroactive= true /
  String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic topic = session.createTopic(soccer.division1.leeds?consumer.retroactive=true
  MessageConsumer consumer = session.createConsumer(topic);
  Message result = consumer.receive();

 redeliveryPolicy
   consumer 使用的重发策略,当消息在 client 端处理失败 (比如 onMessage 方法抛出异常,事务回滚等),将会触发消息重发。对于 Broker 端,需要重发的消息将会被立即发送(如果 broker 端使用异步发送,
且发送队列中还有其他消息,那么重发的消息可能不会被立即到达 Consumer)。我们通过此 Policy 配置最大重发次数、重发频率等, 如果你的 Consumer 客户端处于不良网络环境中,可以适当调整相关参数。参数列表,
请参见(RedeliveryPolicy)
// 在 brokerUrl 中设置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
 . redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)

DLQ- 死信队列 (Dead Letter Queue) 用来保存处理失败或者过期的消息。
出现以下情况时,消息会被 redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

当一个消息被 redelivered 超过 maximumRedeliveries(缺省为 6 次,具体设置请参考后面的链接)次数时,会给 broker 发送一个 Poison ack,这个消息被认为是 a poison pill,这时 broker 会将这
消息发送到 DLQ,以便后续处理。缺省的死信队列是 ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。缺省持久消息过期,会被送到 DLQ,非持久消息不会送到 DLQ 可以通过配置文件 (activemq.xml)
来调整死信发送策略
destinationPolicy

  policyMap

  policyEntries

  !— 设置所有队列,使用,否则用队列名称 —

  policyEntry queue=

  deadLetterStrategy

  !–

  queuePrefix: 设置死信队列前缀

  useQueueForQueueMessages: 设置使用队列保存死信,还可以设置 useQueueForTopicMessages,使用 Topic 来保存死信

  —

  individualDeadLetterStrategy  queuePrefix= DLQ. useQueueForQueueMessages= true   processExpired= false processNonPersistent= false /

  /deadLetterStrategy

  /policyEntry

  /policyEntries

  /policyMap

  /destinationPolicy

  …

/broker

  在一个电子系统中可能接受来自不同供应商的各种订单信息,不同类型的订单走的流程不尽相同,为了快速处理各种不同的订单完成不同的业务。特定义不同的路由 信息。根据路由信息的不同,将消息进行不同的处理。如果采用 ActiveMQ 那么最好采用 apache-camel 整合,使不同的消息根据不同的流程自动 处理到不同的队列中去。

beans

broker brokerName= testBroker

transportConnectors

transportConnector uri= tcp://localhos:61616

/transportConnectors

import resource= camel.xml

/beans

到此,关于“activemq 特性是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计10362字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)