`
dwj147258
  • 浏览: 187714 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

项目中实现JMS消息的发送

阅读更多

   一、JMS简介

       JMS消息可以有效的调动程序中的各种动作,例如,当我们完成一个动作后,我们需要一些程序完成他们自己相应的动作,这时候我们只需要发送一个消息出来,当他们接收到这个消息时,就可以完成自己的事情,是一个很方便的技术,现在用到的JMS消息一般都是通过ActiveMQ来完成,ActIveMQ是一个成熟的框架,可以通过tcp发送JMS,还可以在程序内发送JMS,下面来通过一个实例来介绍。

二、环境搭建

      这个实例是在Spring的基础上完成的,所以需要导入spring的所有jar包,当然还需要导入activemq的jar包,下载activemq以后就可以找到一个activemq-all.jar的jar包,导入即可

三、配置文件简介

      首先application.xml配置文件内容如下

 

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"  
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          xmlns:context="http://www.springframework.org/schema/context"
          xmlns:jms="http://www.springframework.org/schema/jms"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
        	  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
              http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
              http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
        <!-- 激活组件扫描功能,在包cn.ysh.studio.spring.aop及其子包下面自动扫描通过注解配置的组件 -->
     <context:annotation-config />  
	<context:component-scan base-package="main.java.com"/>
	<!-- 激活自动代理功能 -->
	<aop:aspectj-autoproxy proxy-target-class="true"/>
	<import resource="jms_client.xml"/>
  </beans>

 这里的spring就是完成了组件扫描以及aop代理的一些配置,然后就是导入配置activemq的配置文件,如下

 

 

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"  
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          xmlns:context="http://www.springframework.org/schema/context"
          xmlns:jms="http://www.springframework.org/schema/jms"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
        	  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
              http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
              http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
        <!-- 激活组件扫描功能,在包cn.ysh.studio.spring.aop及其子包下面自动扫描通过注解配置的组件 -->
	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">   
     <property name="config" value="classpath:main/java/conf/ActiveMQConfig.xml" />   
     <property name="start"  value="true" />   
    </bean>    
     <bean id="myamqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">    
         <property name="brokerURL" value="tcp://10.10.11.37:61616"/>    
         <property name="trustedPackages">  
        <list>  
            <value>main.java</value>  
        </list>  
    </property>   
    </bean>  
     <bean id="myconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
        <property name="targetConnectionFactory" ref="myamqConnectionFactory"></property>  
        <property name="sessionCacheSize" value="100" />  
    </bean>
    <bean id="myjmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="false" />
    </bean>

    <bean id="myjmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="true" />
    </bean>
  </beans>

 这里第一步因为不是完整的activemq,是spring嵌入式activemq,所以需要启动一个broker来启动activemq,这里需要用到的是一个另外的activemq配置,内容如下:

 

 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
    <broker useJmx="false" persistent="false"
        xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://10.10.11.37:61616"/>
        </transportConnectors>
    </broker>
</beans>

 启动完broker之后就算是完成了acrivemqmq的启动,接下来就开始定义连接工厂了,连接工厂需要制定brokerURL和信任包(不知道有什么卵用),接着就是吧activemq的连接工厂转换成spring的连接工厂,最后两个就不多说了,一个是队列消息templete一个是主题消息template,都是非常简单的,其中如果我们把tcp改为vm则是在程序内发送JMS消息

 

四、一个服务

     在这里我们需要写一个服务用来注册消息监听器,发送消息 , 先把代码贴出来吧

 

package main.java.com.consumer;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component("msgService")
public class MsgServiceImpl {
	//存放注册了的监听器
	private Map<String ,Object> consumers = new ConcurrentHashMap<String , Object>();
	
	@Autowired
	@Qualifier("myjmsQueueTemplate")
	private JmsTemplate queueTemplate ;
	
	@Autowired
	@Qualifier("myjmsTopicTemplate")
	private JmsTemplate topicTemplate ;
	
	@Autowired
	@Qualifier("myconnectionFactory")
	private ConnectionFactory connectionFactory ;
	
	public void sendMsg(String destation ,final Serializable message){
		if(consumers.containsKey(destation)){
			if(consumers.get(destation) instanceof Queue){
				queueTemplate.send(destation , new MessageCreator() {
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createObjectMessage(message);
					}
				});
				return ;
			}
			if(consumers.get(destation) instanceof Topic){
				topicTemplate.send(destation , new MessageCreator() {
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createObjectMessage(message);
					}
				});
			}
			return ;
		}
	}
	
	public void registerConsumer(Consumer consumer , JMSConstant type){
		try {
			Connection connection = connectionFactory.createConnection() ;
			connection.start();
			Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE) ;
			if(type != null && type.equals(JMSConstant.TYPE_TOPIC)){
				Topic topic = new ActiveMQTopic(consumer.getName()) ;
				MessageConsumer messageConsumer = session.createConsumer(topic) ;
				messageConsumer.setMessageListener(new JmsMessageListener(consumer));
				consumers.put(consumer.getName(), topic) ;
			}else{
				Queue queue = new ActiveMQQueue(consumer.getName()) ;
				MessageConsumer messageConsumer = session.createConsumer(queue) ;
				messageConsumer.setMessageListener(new JmsMessageListener(consumer));
				consumers.put(consumer.getName(), queue) ;
			}
			
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 简单说一下吧,首先把这个类注册到spring中,名字为msgService 然后定义一个map来存放目的地,接着就是开始定义的template和从测测提哦你Factory了这些都不多说,都是开始定义了的东西

第一个方法:

sendMsg

可以看到他的参数有两个,一个为目的地,一个为消息内容  ,如果目的地未注册则返回不发送消息,发送消息我们选择发送Object消息,当然所有的Object实例都需要实现Serializable序列化,然后创建消息发送出去

 

第二个方法
registerConsumer

注册监听器,这里先介绍接口Consumer

package main.java.com.consumer;

import java.io.Serializable;

public interface Consumer {
	public String getName() ;
	
	public void onMessage(Serializable msg);
}

 所有的监听器都需要实现这个接口,

然后自己定义一个JmsMessageListener :

package main.java.com.consumer;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

public class JmsMessageListener implements MessageListener{
	private Consumer consumer ;
	
	public JmsMessageListener(Consumer consumer){
		this.consumer = consumer ;
	}
	@Override
	public void onMessage(Message msg) {
		if(msg != null){
			try{
				consumer.onMessage(createObjectMsg(msg)) ;
			}catch(Exception e){
				e.printStackTrace() ;
			}
		}
	}
	
	private Serializable createObjectMsg(Message msg) throws Exception {
		Serializable message = ((ObjectMessage) msg).getObject() ;
		return message ;
	}
	
}

 这个很简单不多说,这是这个服务两个简单必须的方法、

这样我们只需定义一个监听器,

package main.java.com.consumer;

import java.io.Serializable;

public class JmsConsumer1 implements Consumer{

	@Override
	public String getName() {
		return MsgTypeInfo.TEST.name();
	}

	@Override
	public void onMessage(Serializable msg) {
		System.out.println("JmsConsumer1 收到 消息   "+ msg);
	}
	
}

 然后把它注册到服务,msgService.registerConsumer(new JmsConsumer1(), JMSConstant.TYPE_TOPIC) ;

我们发送一条消息:

msgService.sendMsg(MsgTypeInfo.TEST.name(), "132") ;

就可以看到控制台打印到了接收到的消息。

0
0
分享到:
评论

相关推荐

    Spring 实现远程访问详解——jms和activemq

    把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...

    ehcache-jms-wan-replicator:Ehcache 的 JMS 复制模块的可定制实现; 可用于在 GSLBd 应用程序中通过 Nevado SQSSNS 通过 WAN 连接的不同数据中心之间广播流线型的 Ehcache 事件

    ehcache-jms-wan-replicator 这是一个可定制的模块,它建立在 Ehcache 的复制框架(特别是 JMS 复制缓存模块)之上,它允许在...在简化的 JMS TextMessage 中添加事件批处理,优化用于在一条消息中发送数百个事件; 与

    JMShub:与JMS相关的应用程序的父项目

    JMS提供者:实现JMS接口并提供管理控制的消息系统。 JMS客户端:产生和使用消息的组件。 Java EE或Java SE应用程序。 消息:传达信息的对象。 受管理的对象:用于配置的对象。 示例:目标工厂和连接工厂。 这...

    activeMQ点对点map消息

    使用ActiveMQ实现了JMS的点对点的消息模型,将发送者和接受者分别放在了两个不同的项目中。

    微服务架构的分布式事务解决方案,完整31讲-龙果学院

    第05节--JMS规范的消息发送与接收特点 第06节--消息重复发送问题及业务接口的幂等性设计 第07节--可靠消息服务方案1(本地消息服务)的分析与设计 第08节--可靠消息服务方案2(独立消息服务)的分析与设计 第09...

    JAVA上百实例源码以及开源项目源代码

    发送消息,同时对文本进行少量修改,发送end-of-messages消息,最后关闭连接。 Tcp服务端与客户端的JAVA实例源代码 2个目标文件 摘要:Java源码,文件操作,TCP,服务器  Tcp服务端与客户端的JAVA实例源代码,一个简单...

    消息队列服务HazelcastMQ.zip

    hazelcastmq-core:核心 MQ 库,提供 JMS 2.0 类似的 API ,用来发送和接收信息hazelcastmq-camel:Apache Camel 组件,支持 Camel 的集成矿井爱和企业集成模式(EIP)hazelcastmq-jms:JMS 1.1 实现hazelcastmq-...

    Docker学习之搭建ActiveMQ消息服务的方法步骤

    在生产项目中,很多时候需要消息中间件来进行分布式系统间的通信。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能。本篇主要介绍ActiveMQ 相关概念以及安装说明,后面会着重介绍 SpringBoot 集成...

    weixin-popular-mutil-account:weixin-popular-mutil-account

    用户发送的消息首先到weiXin服务器,weiXin服务器发送消息到我们的指定地址,由w-server接收,w-server接收到消息后发送消息到jms队列并返回空字符串给weiXin 服务器,其它的webApp异步监听jms队列。 发送消息 当...

    JAVA上百实例源码以及开源项目

    发送消息,同时对文本进行少量修改,发送end-of-messages消息,最后关闭连接。 Tcp服务端与客户端的JAVA实例源代码 2个目标文件 摘要:Java源码,文件操作,TCP,服务器  Tcp服务端与客户端的JAVA实例源代码,一个简单...

    cdp-jms:该项目提供收集订单、将这些订单交付给出租车服务、处理它们并创建报告的架构

    cdp-jms 该测试项目旨在学习下一组技术: SpringMVC Hibernate联合管理系统REST-服务REST 请求JSP(Javascript,Ajax) 它由以下模块组成: 核。 它有域模型和一些实用程序类。 发件人。 客户可以根据需要订购出租车...

    经典JAVA.EE企业应用实战.基于WEBLOGIC_JBOSS的JSF_EJB3_JPA整合开发.pdf

    第6章 利用JMS实现企业消息处理 226 6.1 面向消息的架构和JMS概述 227 6.1.1 面向消息的应用架构 227 6.1.2 JMS的基础知识和优势 228 6.1.3 JMS的两个重要版本 229 6.2 PTP类型的JMS 230 6.2.1 配置PTP的JMS服务器 ...

    springCloud

    管理和传播所有分布式项目中的消息,本质是利用了MQ的广播机制在分布式的系统中传播消息,目前常用的有Kafka和RabbitMQ 。 下面是一个配置中心刷新配置的例子 1、提交代码触发post请求给bus/refresh 2、server端...

    PHP实现的AMQPphp-amqplib.zip

    php-amqplib是Advanced ...作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用。 标签:phpamqplib

    java开源包3

    LemonSMS 这个Java库可以让开发者在应用程序中集成使用GSM调制解调器或兼容电话来发送SMS消息。 远程桌面 Java Remote Desktop.tar Java Remote Desktop 是一个Java 的远程桌面软件,支持很多特性例如文件传输、...

    java开源包4

    LemonSMS 这个Java库可以让开发者在应用程序中集成使用GSM调制解调器或兼容电话来发送SMS消息。 远程桌面 Java Remote Desktop.tar Java Remote Desktop 是一个Java 的远程桌面软件,支持很多特性例如文件传输、...

    Spring中文帮助文档

    2.4.4. 异步的JMS 2.4.5. JDBC 2.5. Web层 2.5.1. Spring MVC合理的默认值 2.5.2. Portlet 框架 2.5.3. 基于Annotation的控制器 2.5.4. Spring MVC的表单标签库 2.5.5. 对Tiles 2 支持 2.5.6. 对JSF 1.2支持...

    java 面试题 总结

    在实现中,assertion就是在程序中的一条语句,它对一个boolean表达式进行检查,一个正确程序必须保证这个boolean表达式的值为true;如果该值为false,说明程序已经处于不正确的状态下,系统将给出警告或退出。...

    java面试宝典

    201、你在项目中用到了xml技术的哪些方面?如何实现的? 48 202、用jdom解析xml文件时如何解决中文问题?如何解析? 48 203、编程用JAVA解析XML的方式. 49 204、EJB2.0有哪些内容?分别用在什么场合? EJB2.0和EJB1.1的...

Global site tag (gtag.js) - Google Analytics