Spring集成 JMS OpenMQ

29 七月, 2010 (14:15) | GlassFish, J2EE服务器, spring3, 性能 繁体 English    DeliciOus    分享到新浪微博
作者: H.E. | 您可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://www.javabloger.com/article/spring-jms-openmq.html
豆瓣读书 向你推荐有关 GlassFishJ2EE服务器spring3性能、 类别的图书。

前段时间采用 JMS API 直接访问 OpenMQ JMS 服务器时出现了一个现象:当 JMS 服务器 down 掉或者重启以后,JMS 的接收端将无法继续工作。如果将程序改成 MDB 的方式,又违背了我们的初衷,所以采用一个相对折中的办法:用 Spring 整合 JMS OpenMQ。

不仅可以解决我们现在存在的问题,并且有以下优势:

1、占用资源少,对硬件配置要求低
2、部署简单、灵活,不限制于某种特定的J2EE容器
3、依托微容器管理扩展性强

代码如下:

连接器

package com.javabloger.jms;

import java.util.Enumeration;
import java.util.Properties;

import javax.jms.XAConnectionFactory;

public class OpenMqConnectionFactory {
   
    private Properties props;

    public void setProperties(Properties props) {
        this.props = props;
    }

    public XAConnectionFactory createConnectionFactory(){
        com.sun.messaging.XAConnectionFactory cf =
                new com.sun.messaging.XAConnectionFactory();
        try{
            Enumeration<?> keys = props.propertyNames();
            while (keys.hasMoreElements()) {
                String name = (String)keys.nextElement();
                String value = props.getProperty(name);
                cf.setProperty(name, value);
            }
        } catch (Exception e){
            throw new RuntimeException(
            "MQConnectionFactoryFactory.createConnectionFactory() failed: "+
            e.getMessage(), e);
        }
        return cf;
    }
}
 

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring wiring for an OpenMQ JMS consumer: a connection factory built from
    OpenMQ properties, the target queue, a JmsTemplate, and a message listener
    container that delivers messages from the queue to SimpleMessageListener.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <!-- Holds the OpenMQ settings; imqAddressList lists the broker addresses to connect to. -->
    <bean id="connectionfactoryfactory" class="com.javabloger.jms.OpenMqConnectionFactory">
        <property name="properties">
            <props>
                <prop key="imqAddressList">192.168.20.210:7677,192.168.20.211:7677</prop>
            </props>
        </property>
    </bean>

    <!-- The actual JMS connection factory, produced by the factory bean above. -->
    <bean id="mqConnectionFactory" factory-bean="connectionfactoryfactory"
        factory-method="createConnectionFactory" />

    <!-- Destination: the OpenMQ queue named "testmq". -->
    <bean id="testmq" class="com.sun.messaging.Queue">
        <constructor-arg type="java.lang.String" value="testmq" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="defaultDestination" ref="testmq" />
        <property name="receiveTimeout" value="20000" />
    </bean>

    <!-- Adapter around SimpleMessageListener; message conversion is disabled (converter set to null). -->
    <bean id="messageListener1"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.javabloger.jms.SimpleMessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="receive" />
        <property name="messageConverter">
            <null />
        </property>
    </bean>

    <!--
        Listener container that consumes from testmq using locally transacted sessions.
        NOTE(review): Spring documents transactionTimeout in seconds and receiveTimeout
        in milliseconds; confirm that 180000 is the intended value for both.
    -->
    <bean id="consumercontainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="testmq" />
        <property name="messageListener" ref="messageListener1" />
        <property name="transactionTimeout" value="180000" />
        <property name="receiveTimeout" value="180000" />
        <property name="sessionTransacted" value="true" />
    </bean>

    <!-- Alternative container without transaction settings, kept disabled:
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="testmq" />
        <property name="messageListener" ref="messageListener1" />
    </bean>
    -->
</beans>

接收端

package com.javabloger.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class SimpleMessageListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println("Received message: "+
                                      ((TextMessage) message).getText());
           
            }
            catch (JMSException ex) {
                System.out.println(
                "SimpleMessageListener.onMessage(): got exception: "+ex.getMessage());
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException(
            "MessageListener.onMessage(): Message must be of type TextMessage");
        }
    }
}
 

启动 Spring 与 JMS 集成的客户端代码

package com.javabloger.jms;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageConsumer {

    public static void main(String[] args) {
       
        ApplicationContext context =  new ClassPathXmlApplicationContext( "jms.xml");
          System.out.println( context.getId()   ) ;
         
    }

}

发送端代码

package com.javabloger.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.Queue;

public class QueueSender {

    /**
     * @param args
     * @throws JMSException
     */
    public static void main(String[] args) throws JMSException {
 
         ConnectionFactory myConnFactory;
         myConnFactory = new com.sun.messaging.ConnectionFactory();
         myConnFactory.setProperty(ConnectionConfiguration.imqAddressList,  "mq://192.168.20.211:7677");
         myConnFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
         Connection myConn = myConnFactory.createConnection();
         myConn.start();
         //Step 4:
         //Create a session within the connection.
         Session mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue myTopic=new com.sun.messaging.Queue("testmq");
         MessageProducer myMsgProducer = mySess.createProducer(myTopic);
        
        
         TextMessage myTextMsg = mySess.createTextMessage();
         for (int i=0;i<10;i++){
             myTextMsg.setText("Queue Msg ID: "+i+"  "+ new java.util.Date() );
             System.out.println( myTextMsg.getText()); 
             myMsgProducer.send(myTextMsg);
         }
     
        
         mySess.close();
         myConn.close();
    }

}
 

相关文章
大型系统中使用JMS优化技巧–Sun OpenMQ
GlassFish JMS 集群
Sun OpenMQ Topic消息收/发 —Tips
GlassFish OpenMQ JDBC

–end–

豆瓣读书  向你推荐有关 GlassFish J2EE服务器 spring3 性能、 类别的图书。



Creative Commons License
本文由J2ee企业顾问-黄毅创作,并已采用创作共用署名2.5中国大陆版许可证授权。

评论

评论也是有版权的!




4071