Spring集成 JMS OpenMQ
前段时间采用JMS API直接访问OpenMQ JMS服务器时出现了一个现象:当JMS服务器down掉或者重启以后,JMS的接收端将无法继续工作。如果将程序改成MDB的方式,又违背了我们的初衷,所以采用一个相对折中的办法:用Spring整合JMS OpenMQ。
不仅可以解决我们现在存在的问题,并且有以下优势:
1、占用资源少,对硬件配置要求低
2、部署简单、灵活,不限制于某种特定的J2EE容器
3、依托微容器管理扩展性强
代码如下:
连接器
package com.javabloger.jms;
import java.util.Enumeration;
import java.util.Properties;
import javax.jms.XAConnectionFactory;
/**
 * Builds an OpenMQ {@code XAConnectionFactory} from a set of configuration
 * properties. Intended to be used as a factory bean: inject the properties via
 * {@link #setProperties(Properties)}, then call
 * {@link #createConnectionFactory()}.
 */
public class OpenMqConnectionFactory {
    private Properties props;

    /**
     * Sets the OpenMQ connection properties that will be copied onto the
     * connection factory.
     *
     * @param props property names and values to apply
     */
    public void setProperties(Properties props) {
        this.props = props;
    }

    /**
     * Creates a new OpenMQ XA connection factory and applies every configured
     * property to it.
     *
     * @return the configured connection factory
     * @throws IllegalStateException if no properties have been set
     * @throws RuntimeException if reading or applying a property fails; the
     *         original exception is preserved as the cause
     */
    public XAConnectionFactory createConnectionFactory() {
        // Fail with an explicit message instead of a wrapped NullPointerException.
        if (props == null) {
            throw new IllegalStateException(
                "OpenMqConnectionFactory.createConnectionFactory() failed: "
                    + "properties have not been set");
        }
        com.sun.messaging.XAConnectionFactory cf =
            new com.sun.messaging.XAConnectionFactory();
        try {
            Enumeration<?> keys = props.propertyNames();
            while (keys.hasMoreElements()) {
                String name = (String) keys.nextElement();
                String value = props.getProperty(name);
                cf.setProperty(name, value);
            }
        } catch (Exception e) {
            // Broad catch kept on purpose: callers have always received an
            // unchecked exception for any configuration failure.
            throw new RuntimeException(
                "OpenMqConnectionFactory.createConnectionFactory() failed: "
                    + e.getMessage(), e);
        }
        return cf;
    }
}
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <!-- Factory object that carries the OpenMQ connection properties
         (here: the broker address list). -->
    <bean id="connectionfactoryfactory" class="com.javabloger.jms.OpenMqConnectionFactory">
        <property name="properties">
            <props>
                <prop key="imqAddressList">192.168.20.210:7677,192.168.20.211:7677</prop>
            </props>
        </property>
    </bean>

    <!-- The JMS connection factory, obtained by calling
         createConnectionFactory() on the factory object above. -->
    <bean id="mqConnectionFactory" factory-bean="connectionfactoryfactory"
        factory-method="createConnectionFactory" />

    <!-- The OpenMQ queue named "testmq", shared by the template and the listener container. -->
    <bean id="testmq" class="com.sun.messaging.Queue">
        <constructor-arg type="java.lang.String" value="testmq" />
    </bean>

    <!-- JmsTemplate using testmq as its default destination. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="defaultDestination" ref="testmq" />
        <property name="receiveTimeout" value="20000" />
    </bean>

    <!-- Adapter around SimpleMessageListener; the message converter is
         explicitly set to null. -->
    <bean id="messageListener1"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.javabloger.jms.SimpleMessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="receive" />
        <property name="messageConverter">
            <null />
        </property>
    </bean>

    <!-- Listener container that consumes from testmq with transacted sessions
         and dispatches to messageListener1. -->
    <bean id="consumercontainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory"/>
        <property name="destination" ref="testmq"/>
        <property name="messageListener" ref="messageListener1"/>
        <property name="transactionTimeout" value="180000"/>
        <property name="receiveTimeout" value="180000"/>
        <property name="sessionTransacted" value="true" />
    </bean>

    <!-- Alternative container with default settings, kept disabled:
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="testmq" />
        <property name="messageListener" ref="messageListener1" />
    </bean>
    -->
</beans>
接收端
package com.javabloger.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * JMS listener that prints the body of every received text message to
 * standard output and rejects any other message type.
 */
public class SimpleMessageListener implements MessageListener {

    /**
     * Prints the text of the given message.
     *
     * @param message the delivered JMS message; must be a {@code TextMessage}
     * @throws IllegalArgumentException if the message is not a {@code TextMessage}
     * @throws RuntimeException wrapping the {@code JMSException} if the text
     *         cannot be read
     */
    public void onMessage(Message message) {
        // Reject unsupported message types up front.
        if (!(message instanceof TextMessage)) {
            throw new IllegalArgumentException(
                "MessageListener.onMessage(): Message must be of type TextMessage");
        }

        TextMessage textMessage = (TextMessage) message;
        try {
            String body = textMessage.getText();
            System.out.println("Received message: " + body);
        } catch (JMSException failure) {
            String reason = failure.getMessage();
            System.out.println(
                "SimpleMessageListener.onMessage(): got exception: " + reason);
            failure.printStackTrace();
            throw new RuntimeException(failure);
        }
    }
}
启动Spring与JMS集成的客户端代码
package com.javabloger.jms;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point for the receiving side: loads the Spring configuration file
 * {@code jms.xml} from the classpath and prints the id of the resulting
 * application context.
 */
public class MessageConsumer {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        ApplicationContext springContext = new ClassPathXmlApplicationContext("jms.xml");
        String contextId = springContext.getId();
        System.out.println(contextId);
    }
}
发送端代码
package com.javabloger.jms;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.Queue;
/**
 * Stand-alone sender that publishes ten text messages to the OpenMQ queue
 * {@code testmq} using the plain JMS API.
 */
public class QueueSender {
    /**
     * Connects to the broker, sends ten numbered text messages to the
     * {@code testmq} queue, and closes the connection.
     *
     * @param args command-line arguments (not used)
     * @throws JMSException if configuring the factory, connecting, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory myConnFactory = new com.sun.messaging.ConnectionFactory();
        myConnFactory.setProperty(ConnectionConfiguration.imqAddressList, "mq://192.168.20.211:7677");
        myConnFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");

        Connection myConn = myConnFactory.createConnection();
        try {
            myConn.start();
            // Non-transacted session with automatic acknowledgement.
            Session mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue myQueue = new com.sun.messaging.Queue("testmq");
            MessageProducer myMsgProducer = mySess.createProducer(myQueue);
            // A single message object is reused; only its text changes per send.
            TextMessage myTextMsg = mySess.createTextMessage();
            for (int i = 0; i < 10; i++) {
                myTextMsg.setText("Queue Msg ID: " + i + " " + new java.util.Date());
                System.out.println(myTextMsg.getText());
                myMsgProducer.send(myTextMsg);
            }
            mySess.close();
        } finally {
            // Always release the connection, even when sending fails part-way.
            myConn.close();
        }
    }
}
相关文章
大型系统中使用JMS优化技巧–Sun OpenMQ
GlassFish JMS 集群
Sun OpenMQ Topic消息收/发 —Tips
GlassFish OpenMQ JDBC
–end–

本文由J2ee企业顾问-黄毅创作,并已采用创作共用署名2.5中国大陆版许可证授权。





