使用 ActiveMQ 当做 JMS 提供者,写一个简单的 demo。

主要是模拟一个聊天室的样子,下面是源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.mycompany.app;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* Hello world!
*
*/
public class App implements MessageListener {

private TopicSession pubSession;
private TopicPublisher publisher;
private TopicConnection connection;
private String username;
private static String user = ActiveMQConnection.DEFAULT_USER;
private static String password = ActiveMQConnection.DEFAULT_PASSWORD;
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public App(String username) throws Exception {
this.username = username;//聊天室的用户名
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, url);//创建ActiveMQ提供的链接工厂
connection = factory.createTopicConnection();
pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);//创建生产者 session
TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);//创建消费者 session

Topic topic = subSession.createTopic("myTopic");//创建一个目的地
publisher = pubSession.createPublisher(topic);//生产者 session 创建生产者,目的地是上面的
TopicSubscriber subscriber = subSession.createSubscriber(topic, null, true);//消费者创建,目的地相同
subscriber.setMessageListener(this);//消费者监听本身
connection.start();
}

public static void main(String[] args) throws Exception {
App app = new App(args[0]);

BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

while (true) {
String line = reader.readLine();
if ("exit".equals(line)) {
app.close();
} else {
app.writeMessage(line);
}
}
}

public void onMessage(Message msg) {
TextMessage message = (TextMessage) msg;
try {
System.out.println(message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

public void writeMessage(String text) throws Exception {
TextMessage message = pubSession.createTextMessage();
message.setText(username + " : " + text);
publisher.publish(message);
}

public void close() throws Exception {
connection.close();
}
}

安装 ActiveMQ

mac 安装比较简单

install activemq```
1
2
3
4
5
6
7
8
9
10
11
12
13

ActiveMQ 会被默认安装到 /usr/local/Cellar/activemq。

先运行 `activemq setup ~/.activemqrc` 来指定 activemq 的环境配置文件。

运行 activemq start 可以在一个独立进程中启动 activemq。

```bash
$ activemq start
INFO: Loading '/usr/local/Cellar/activemq/5.11.1/libexec/bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/Cellar/activemq/5.11.1/libexec/data/activemq.pid' (pid '4880')

终止 ActiveMQ 的运行有两种方式。一种是使用 activemq stop。

另一种则是暴力的杀死进程,即 kill 4880。

运行

通过 maven 启动 3个 main 方法:

1
2
3
4
5
mvn exec:java -Dexec.mainClass="com.mycompany.app.App" -Dexec.args="rcx1"

mvn exec:java -Dexec.mainClass="com.mycompany.app.App" -Dexec.args="rcx2"

mvn exec:java -Dexec.mainClass="com.mycompany.app.App" -Dexec.args="rcx3"

然后在其中一个里面说一句话,其他都会收到信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// rcx2
rcx1 : hello
hello two
rcx3 : 大家好

// rcx1
hello
rcx2 : hello two
rcx3 : 大家好

// rcx3
大家好
rcx1 : hello
rcx2 : hello two

【参考资料】

  1. java消息服务

—EOF—