博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
activemq 的那些事1
阅读量:5168 次
发布时间:2019-06-13

本文共 6356 字,大约阅读时间需要 21 分钟。

 

#关于事务:

activemq 遇到的不能消息确认的问题。

Session session = connection.createSession(Boolean.FALSE,   Session.AUTO_ACKNOWLEDGE); 

有两种情况:

1 开启事务

当createSession第一个参数为true时,表示创建的session被标记为transactional的,确认消息就通过确认和校正来自动地处理,第二个参数应该是没用的。   此时需要我们手动 session.commit(); , 否则此session发送的消息是不会发送到activemq 中去,而只是存在于当前jvm 内存..

 

2 不开启事务

当createSession的第一个参数为false时,表示创建的session没有标记为transactional, 此时不能执行session.commit(), 否则报错的, 因为: 

public void commit() throws JMSException {        this.checkClosed();        if (!this.getTransacted()) {            throw new IllegalStateException("Not a transacted session");        } else {            if (LOG.isDebugEnabled()) {                LOG.debug(this.getSessionId() + " Transaction Commit :" + this.transactionContext.getTransactionId());            }            this.transactionContext.commit();        }    }

 

此时有三种用于消息确认的选项: 

**AUTO_ACKNOWLEDGE session将自动地确认收到的一则消息; 
**CLIENT_ACKNOWLEDGE 客户端程序将确认收到的一则消息,调用这则消息的确认方法; 
**DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能出错。 

 

#关于消息持久化:

有两种方式NON_PERSISTENT、PERSISTENT

MS有两种消息传递方式。标记为NON_PERSISTENT的消息最多传递一次,而标记为PERSISTENT的消息将使用暂存后再转发的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失,但是得等到这个服务恢复联机的时候才会被传递。所以默认的消息传递方式是非持久性的,虽然使用非持久性消息可能降低内存和需要的存储器,但这种传递方式只有当你不需要接收所有消息时才使用。

 

我自己的理解:  持久化的消息呢, activemq 服务器重启后, 消息还在,  非持久化的消息activemq 服务器重启后, 消息就不在了.. 

 

 

#关于消息无法消费:

消息无法消费通常有各种各样的原因,  这里说的是connection 关闭过早的问题:

下面的代码, 如果  consume();后面直接,      close(); 的话, MessageListener 是消费不到的, 因为 还没来得及消费, connection 就已经关闭了啊..

有一种情况例外,:  consumer.receive(TIME_IN_MILLISECONDS); 方法,    我测试的时候, 如果把 setMessageListener 改成receive, 那么,  我们是可以消费到一条消息的( 只要queue 存在未消费消息 )... 

public class TestActiveMq{    private static final int SEND_NUMBER = 5;    static ConnectionFactory connectionFactory;    // Connection :JMS 客户端到JMS Provider 的连接    static Connection connection = null;    static String admin = "admin";    static String pass = "admin";    static String brokerUrl = "tcp://10.10.10.123:61616";    public static void main(String[] args) throws Exception {        init();        consume();        Thread.sleep(2000);        close();    }    public static void init() {        // ConnectionFactory :连接工厂,JMS 用它创建连接        // Session: 一个发送或接收消息的线程        // TextMessage message;        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar        connectionFactory = new ActiveMQConnectionFactory(                admin,                pass,//                ActiveMQConnection.DEFAULT_PASSWORD,                brokerUrl);        ((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true);        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接        } catch (Exception e) {            e.printStackTrace();        } finally {        }    }    public static void close() {        try {            if (null != connection)                connection.close();        } catch (Throwable ignore) {        }    }    /**     *     * 队列操作     */    public static void consume() throws JMSException {        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);        String qu = null;        qu = "test-persistence";        Queue queue = session.createQueue(qu);        MessageConsumer consumer = session.createConsumer(queue);        System.out.println("consumer = " + consumer);        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                System.out.println("message00 ============ " + message);                try {                    message.acknowledge();                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }    public static void sendMessage()            throws Exception {        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // MessageProducer:消息发送者        MessageProducer producer;        session = connection.createSession(Boolean.FALSE,                Session.AUTO_ACKNOWLEDGE);        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置        destination = session.createQueue("test-persistence");        // 得到消息生成者【发送者】        producer = session.createProducer(destination);        // 设置不持久化,可以更改        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        for (int i = 1; i <= SEND_NUMBER; i++) {            TextMessage message = session                    .createTextMessage("ActiveMq 发送的消息" + i);            // 发送消息到目的地方              System.out.println("发送消息:" + i);            long delay = 5 * 1000;            long period = 10 * 1000;            int repeat = 5;//            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);//            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);//            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);//            message.setJMSExpiration(2);//            message.setJMSTimestamp(period);//            producer.setTimeToLive(0);            producer.send(message);        }        // session.commit();    }    /**     *     * 队列操作     */    public static Object browser() throws JMSException {        Session session = connection.createSession(false, 2);        String qu = null;        qu = "test-persistence";        Queue queue = session.createQueue(qu);        QueueBrowser browser = session.createBrowser(queue);        List
msgList = new ArrayList<>(); Enumeration enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) {// ObjectMessage o = (ObjectMessage) enumeration.nextElement(); Object o = enumeration.nextElement(); System.out.println("o = " + o);// MessageDto object = (MessageDto) o.getObject();// System.out.println("object = " + object);// msgList.add(object.toString()); } return enumeration; }}

 

 

参考

http://riddickbryant.iteye.com/blog/441890

http://blog.sina.com.cn/s/blog_4d22b9720102uxyr.html

https://blog.csdn.net/zishan007/article/details/45037731

https://blog.csdn.net/wsyyyyy/article/details/79888793

https://blog.csdn.net/seven__________7/article/details/69317106

 

posted on
2018-07-21 16:55 阅读(
...) 评论(
...)

转载于:https://www.cnblogs.com/FlyAway2013/p/9347174.html

你可能感兴趣的文章
macOS Mojave 10.14 无法安装brew缺少Command Line Tools for Xcode的解决办法
查看>>
要么甲必胜,要么乙必胜
查看>>
第三周笔记
查看>>
基于Storm构建实时热力分布项目实战
查看>>
【BZOJ1270】1270: [BeijingWc2008]雷涛的小猫 DP
查看>>
gpload的简单实用
查看>>
第二章 flex输入输出结构
查看>>
Uva 572 Oil Deposits
查看>>
关于Cocos2d-x物理引擎用到的类和使用
查看>>
关于Unity中变量和函数的定义
查看>>
读build to win之感
查看>>
支持向量机
查看>>
Vim 编辑器指令
查看>>
iOS 七大手势之轻拍,长按,旋转手势识别器方法-赵小波
查看>>
NS2入门指导 ---SeaSon & crabhit From DB lab of HIT
查看>>
2016.6.20 eclipse中的jsp文件的字体大小在哪里修改
查看>>
2017.6.30 码云--生成公钥
查看>>
Web框架之Tornado
查看>>
Android 中Webview 自适应屏幕
查看>>
Android自定义 Dialog 对话框
查看>>