消息队列工具 ActiveMQ
1 、简介
- 同类产品: RabbitMQ 、 Kafka、Redis(List)
1.1 对比RabbitMQ
- 最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco),而ActiveMQ使用的是JMS(Java Messaging Service )协议。顾名思义JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。而AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。
另外一点就是RabbitMQ用Erlang开发,安装前要装Erlang环境,比较麻烦。ActiveMQ解压即可用不用任何安装。
1.2 对比KafKa
- Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好。
弊端是:
在传输过程中可能会出现消息重复的情况,
不保证发送顺序
一些传统MQ的功能没有,比如消息的事务功能。
所以通常用Kafka处理大数据日志。
1.3 对比Redis
- 其实Redis本身利用List可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。
2 安装 ActiveMQ
拷贝apache-activemq-5.14.4-bin.tar.gz到Linux服务器的/opt下
解压缩
text
1
tar -zxvf apache-activemq-5.14.4-bin.tar.gz
重命名
text
1
mv apache-activemq-5.14.4 activemq
更改配置文件
text
1
vim /opt/activemq/bin/activemq
增加两行
text
1
2
JAVA_HOME="/opt/jdk1.8.0_152"
JAVA_CMD="/opt/jdk1.8.0_152/bin"

注册服务
text
1
2
ln -s /opt/activemq/bin/activemq /etc/init.d/activemq
chkconfig --add activemq
启动服务
text
1
service activemq start

关闭服务
text
1
service activemq stop
通过netstat 查看端口

activemq两个重要的端口,一个是提供消息队列的默认端口:61616
另一个是控制台端口8161
通过控制台测试
启动消费端

进入网页控制台

账号/密码默认: admin/admin
3 在Java中使用消息队列
3.1 在gmall-service-util中导入依赖坐标
text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
3.2 producer端
text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue testqueue = session.createQueue("TEST1");
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("今天天气真好!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
3.3 consumer端
text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("TEST1");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
//session.rollback();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
3.4 关于事务控制
| producer****提交时的事务 | 事务开启 | *只执行send并不会提交到队列中,只有当执行***session.commit()**时,消息才被真正的提交到队列中。 |
|---|---|---|
| 事务不开启 | 只要执行send,就进入到队列中。 | |
| consumer 接收时的事务 | 事务开启,签收必须写 Session.SESSION_TRANSACTED | *收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了***session.rollback()**那么消息会释放,重新回到队列中被别的消费端再次消费。 |
| 事务不开启,签收方式选择 Session.AUTO_ACKNOWLEDGE | 只要调用comsumer.receive方法 ,自动确认。 | |
| 事务不开启,签收方式选择 Session.CLIENT_ACKNOWLEDGE | 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。 | |
| 事务不开启,签收方式选择 Session.DUPS_OK_ACKNOWLEDGE | 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。 |
3.5 持久化与非持久化
通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置
持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。
四 与springboot整合
1 配置类ActiveMQConfig
text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if(brokerURL.equals("disabled")){
return null;
}
ActiveMQUtil activeMQUtil=new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(!listenerEnable.equals("true")){
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
//设置并发数
factory.setConcurrency("5");
//重连间隔时间
factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
url=brokerURL;
}*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory( brokerURL);
return activeMQConnectionFactory;
}
}
2 工具类ActiveMQUtil
text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ActiveMQUtil {
PooledConnectionFactory pooledConnectionFactory=null;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
//加入连接池
pooledConnectionFactory=new PooledConnectionFactory(factory);
//出现异常时重新连接
pooledConnectionFactory.setReconnectOnException(true);
//
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory(){
return pooledConnectionFactory;
}
}





评论
登录后即可评论
分享你的想法,与作者互动
暂无评论