#关于事务:
activemq 遇到的不能消息确认的问题。
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
有两种情况:
1 开启事务
当createSession第一个参数为true时,表示创建的session被标记为transactional的,确认消息就通过确认和校正来自动地处理,第二个参数应该是没用的。 此时需要我们手动 session.commit(); , 否则此session发送的消息是不会发送到activemq 中去,而只是存在于当前jvm 内存..
2 不开启事务
当createSession的第一个参数为false时,表示创建的session没有标记为transactional, 此时不能执行session.commit(), 否则报错的, 因为:public void commit() throws JMSException { this.checkClosed(); if (!this.getTransacted()) { throw new IllegalStateException("Not a transacted session"); } else { if (LOG.isDebugEnabled()) { LOG.debug(this.getSessionId() + " Transaction Commit :" + this.transactionContext.getTransactionId()); } this.transactionContext.commit(); } }
此时有三种用于消息确认的选项:
**AUTO_ACKNOWLEDGE session将自动地确认收到的一则消息; **CLIENT_ACKNOWLEDGE 客户端程序将确认收到的一则消息,调用这则消息的确认方法; **DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能出错。
#关于消息持久化:
有两种方式NON_PERSISTENT、PERSISTENT
MS有两种消息传递方式。标记为NON_PERSISTENT的消息最多传递一次,而标记为PERSISTENT的消息将使用暂存后再转发的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失,但是得等到这个服务恢复联机的时候才会被传递。所以默认的消息传递方式是非持久性的,虽然使用非持久性消息可能降低内存和需要的存储器,但这种传递方式只有当你不需要接收所有消息时才使用。
我自己的理解: 持久化的消息呢, activemq 服务器重启后, 消息还在, 非持久化的消息activemq 服务器重启后, 消息就不在了..
#关于消息无法消费:
消息无法消费通常有各种各样的原因, 这里说的是connection 关闭过早的问题:
下面的代码, 如果 consume();后面直接, close(); 的话, MessageListener 是消费不到的, 因为 还没来得及消费, connection 就已经关闭了啊..
有一种情况例外,: consumer.receive(TIME_IN_MILLISECONDS); 方法, 我测试的时候, 如果把 setMessageListener 改成receive, 那么, 我们是可以消费到一条消息的( 只要queue 存在未消费消息 )...
public class TestActiveMq{ private static final int SEND_NUMBER = 5; static ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 static Connection connection = null; static String admin = "admin"; static String pass = "admin"; static String brokerUrl = "tcp://10.10.10.123:61616"; public static void main(String[] args) throws Exception { init(); consume(); Thread.sleep(2000); close(); } public static void init() { // ConnectionFactory :连接工厂,JMS 用它创建连接 // Session: 一个发送或接收消息的线程 // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( admin, pass,// ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); ((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 } catch (Exception e) { e.printStackTrace(); } finally { } } public static void close() { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } /** * * 队列操作 */ public static void consume() throws JMSException { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); String qu = null; qu = "test-persistence"; Queue queue = session.createQueue(qu); MessageConsumer consumer = session.createConsumer(queue); System.out.println("consumer = " + consumer); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("message00 ============ " + message); try { message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); } public static void sendMessage() throws Exception { Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-persistence"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,可以更改 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + i); long delay = 5 * 1000; long period = 10 * 1000; int repeat = 5;// message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);// message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);// message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);// message.setJMSExpiration(2);// message.setJMSTimestamp(period);// producer.setTimeToLive(0); producer.send(message); } // session.commit(); } /** * * 队列操作 */ public static Object browser() throws JMSException { Session session = connection.createSession(false, 2); String qu = null; qu = "test-persistence"; Queue queue = session.createQueue(qu); QueueBrowser browser = session.createBrowser(queue); ListmsgList = new ArrayList<>(); Enumeration enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) {// ObjectMessage o = (ObjectMessage) enumeration.nextElement(); Object o = enumeration.nextElement(); System.out.println("o = " + o);// MessageDto object = (MessageDto) o.getObject();// System.out.println("object = " + object);// msgList.add(object.toString()); } return enumeration; }}
参考
http://riddickbryant.iteye.com/blog/441890
http://blog.sina.com.cn/s/blog_4d22b9720102uxyr.html
https://blog.csdn.net/zishan007/article/details/45037731
https://blog.csdn.net/wsyyyyy/article/details/79888793
https://blog.csdn.net/seven__________7/article/details/69317106