1.准备工作
? 1) 下载安装,启动activemq
? 2) 下载activemq?? jar包导入项目
2.消息生产者
class="java">package com.activemq.demo1;
import javax.jms.*;
import org.apache.activemq.*;
/**
* 消息生产者,用于生成并发送消息
*/
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
/**
* 初始化
* @throws Exception
*/
private void initialize() throws Exception {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
/* 创建Session,参数解释:
第一个参数 是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,
没有回应则抛出异常,消息发送程序负责处理这个错误。
第二个参数 消息的确认模式:
AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,
但传输过程中可能因为错误而丢失消息。
CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法
(会通知消息提供者收到了消息)
DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息
(这种确认模式不在乎接收者收到重复的消息)。*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
//设置是否持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
* 发送消息
* @param message
* @throws Exception
*/
public void produceMessage(String message) throws Exception {
initialize();
//发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:-> send start.");
producer.send(msg);
System.out.println("Producer:-> send complete.");
close();
}
/**
* 关闭连接
* @throws JMSException
*/
public void close() throws JMSException {
System.out.println("Producer:->Closing Connection.");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
3.消息消费者
package com.activemq.demo1;
import javax.jms.*;
import javax.jms.Message;
import org.apache.activemq.*;
/**
* 消息消费者,用于接收消息
*/
public class ConsumerTool implements MessageListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
/**
* 初始化
* @throws JMSException
* @throws Exception
*/
private void initialize() throws Exception {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
/**
* 消费消息
* @throws Exception
*/
public void consumeMessage() throws Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
}
/**
* 关闭连接
* @throws JMSException
*/
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
/**
* 消息处理函数
*/
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received textMessage: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
4.测试类
package com.activemq.demo1;
import javax.jms.*;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws JMSException, Exception {
ConsumerTool consumer = new ConsumerTool();
ProducerTool producer = new ProducerTool();
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(500);
producer.produceMessage("Hello, world!");
}
}
?