消息队列工具 ActiveMQ

FAILED
小兔叽 可爱的博主

时间: 2020-10-14 阅读: 88 字数:9256

sjxi.cn
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

消息队列工具 ActiveMQ

1 、简介

  • 同类产品: RabbitMQ 、 Kafka、Redis(List)

1.1 对比RabbitMQ

  • 最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco),而ActiveMQ使用的是JMS(Java Messaging Service )协议。顾名思义JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。而AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。
    另外一点就是RabbitMQ用Erlang开发,安装前要装Erlang环境,比较麻烦。ActiveMQ解压即可用不用任何安装。

1.2 对比KafKa

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好。
    弊端是:
    在传输过程中可能会出现消息重复的情况,
    不保证发送顺序
    一些传统MQ的功能没有,比如消息的事务功能。
    所以通常用Kafka处理大数据日志。

1.3 对比Redis

  • 其实Redis本身利用List可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。

2 安装 ActiveMQ

拷贝apache-activemq-5.14.4-bin.tar.gz到Linux服务器的/opt下

解压缩

  1. tar -zxvf apache-activemq-5.14.4-bin.tar.gz

重命名

  1. mv apache-activemq-5.14.4 activemq

更改配置文件

  1. vim /opt/activemq/bin/activemq

增加两行

  1. JAVA_HOME="/opt/jdk1.8.0_152"
  2. JAVA_CMD="/opt/jdk1.8.0_152/bin"

图片的描述

注册服务

  1. ln -s /opt/activemq/bin/activemq /etc/init.d/activemq
  2. chkconfig --add activemq

启动服务

  1. service activemq start

图片的描述

关闭服务

  1. service activemq stop

通过netstat 查看端口

图片的描述

activemq两个重要的端口,一个是提供消息队列的默认端口:61616
另一个是控制台端口8161

通过控制台测试
启动消费端

图片的描述

进入网页控制台

图片的描述

账号/密码默认: admin/admin

3 在Java中使用消息队列

3.1 在gmall-service-util中导入依赖坐标

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. <exclusions>
  5. <exclusion>
  6. <groupId>org.slf4j</groupId>
  7. <artifactId>slf4j-log4j12</artifactId>
  8. </exclusion>
  9. </exclusions>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.activemq</groupId>
  13. <artifactId>activemq-pool</artifactId>
  14. <version>5.15.2</version>
  15. <exclusions>
  16. <exclusion>
  17. <groupId>org.slf4j</groupId>
  18. <artifactId>slf4j-log4j12</artifactId>
  19. </exclusion>
  20. </exclusions>
  21. </dependency>

3.2 producer端

  1. public static void main(String[] args) {
  2. ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
  3. try {
  4. Connection connection = connect.createConnection();
  5. connection.start();
  6. //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
  7. Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
  8. Queue testqueue = session.createQueue("TEST1");
  9. MessageProducer producer = session.createProducer(testqueue);
  10. TextMessage textMessage=new ActiveMQTextMessage();
  11. textMessage.setText("今天天气真好!");
  12. producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  13. producer.send(textMessage);
  14. session.commit();
  15. connection.close();
  16. } catch (JMSException e) {
  17. e.printStackTrace();
  18. }
  19. }

3.3 consumer端

  1. public static void main(String[] args) {
  2. ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");
  3. try {
  4. Connection connection = connect.createConnection();
  5. connection.start();
  6. //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
  7. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  8. Destination testqueue = session.createQueue("TEST1");
  9. MessageConsumer consumer = session.createConsumer(testqueue);
  10. consumer.setMessageListener(new MessageListener() {
  11. @Override
  12. public void onMessage(Message message) {
  13. if(message instanceof TextMessage){
  14. try {
  15. String text = ((TextMessage) message).getText();
  16. System.out.println(text);
  17. //session.rollback();
  18. } catch (JMSException e) {
  19. // TODO Auto-generated catch block
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. });
  25. }catch (Exception e){
  26. e.printStackTrace();;
  27. }
  28. }

3.4 关于事务控制

producer**提交时的事务** 事务开启 只执行**send并不会提交到队列中,只有当执行session.commit()**时,消息才被真正的提交到队列中。
事务不开启 只要执行**send**,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写 Session.SESSION_TRANSACTED 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了**session.rollback()**那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择 Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择 Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择 Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

3.5 持久化与非持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置
持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

四 与springboot整合

1 配置类ActiveMQConfig

  1. @Configuration
  2. public class ActiveMQConfig {
  3. @Value("${spring.activemq.broker-url:disabled}")
  4. String brokerURL ;
  5. @Value("${activemq.listener.enable:disabled}")
  6. String listenerEnable;
  7. @Bean
  8. public ActiveMQUtil getActiveMQUtil() throws JMSException {
  9. if(brokerURL.equals("disabled")){
  10. return null;
  11. }
  12. ActiveMQUtil activeMQUtil=new ActiveMQUtil();
  13. activeMQUtil.init(brokerURL);
  14. return activeMQUtil;
  15. }
  16. //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
  17. @Bean(name = "jmsQueueListener")
  18. public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
  19. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  20. if(!listenerEnable.equals("true")){
  21. return null;
  22. }
  23. factory.setConnectionFactory(activeMQConnectionFactory);
  24. //设置并发数
  25. factory.setConcurrency("5");
  26. //重连间隔时间
  27. factory.setRecoveryInterval(5000L);
  28. factory.setSessionTransacted(false);
  29. factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
  30. return factory;
  31. }
  32. @Bean
  33. public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
  34. /* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
  35. url=brokerURL;
  36. }*/
  37. ActiveMQConnectionFactory activeMQConnectionFactory =
  38. new ActiveMQConnectionFactory( brokerURL);
  39. return activeMQConnectionFactory;
  40. }
  41. }

2 工具类ActiveMQUtil

  1. public class ActiveMQUtil {
  2. PooledConnectionFactory pooledConnectionFactory=null;
  3. public ConnectionFactory init(String brokerUrl) {
  4. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
  5. //加入连接池
  6. pooledConnectionFactory=new PooledConnectionFactory(factory);
  7. //出现异常时重新连接
  8. pooledConnectionFactory.setReconnectOnException(true);
  9. //
  10. pooledConnectionFactory.setMaxConnections(5);
  11. pooledConnectionFactory.setExpiryTimeout(10000);
  12. return pooledConnectionFactory;
  13. }
  14. public ConnectionFactory getConnectionFactory(){
  15. return pooledConnectionFactory;
  16. }
  17. }
本文章网址:sjxi.cn/detil/5bf30cd1a4b24a78a64d6f4d1d71bb48

打赏作者

本站为非盈利网站,如果您喜欢这篇文章,欢迎支持我们继续运营!

最新评论
当前未登陆哦
登陆后才可评论哦

    湘ICP备2021009447号

    ×

    (穷逼博主)在线接单

    QQ: 1164453243

    邮箱: abcdsjx@126.com

    前端项目代做
    前后端分离
    Python 爬虫脚本
    Java 后台开发
    各种脚本编写
    服务器搭建
    个人博客搭建
    Web 应用开发
    Chrome 插件编写
    Bug 修复