五、ActiveMQ的Broker(嵌入到java程序)

这篇具有很好参考价值的文章主要介绍了五、ActiveMQ的Broker(嵌入到java程序)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、是什么

  • Broker相当于一个ActiveMQ服务器实例
  • 实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。
  • 用ActiveMQ Broker作为独立的消息服务器来构建Java应用。ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用。

二、Java应用嵌入ActiveMQ

  • 工作中还是以单独部署的ActiveMQ使用为主。

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qingsi.activemq</groupId>
    <artifactId>activemq_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <!-- activemq所需要的jar包配置 -->
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>
        <!--  下面是junit/logback等基础配置  -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

</project>

2.borker实现

package com.qingsi.activemq.Embed;

import org.apache.activemq.broker.BrokerService;

public class EmbedBroker {
    public static void main(String[] args) throws Exception {
        // ActiveMQ也支持在vm中通信基于嵌入的broker
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

  • 运行后,启动一个mini版的AcitveMQ
    五、ActiveMQ的Broker(嵌入到java程序),# ActiveMQ,java-activemq,activemq,java

3.生产者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce {

    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        //开启事务需要commit
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的生产者,并设置不持久化消息
        MessageProducer producer = session.createProducer(queue);
        //6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            //7.提交事务
            session.commit();
            System.out.println("消息发送完成");
        } catch (Exception e) {
            System.out.println("出现异常,消息回滚");
            session.rollback();
        } finally {
            //8.关闭资源
            producer.close();
            session.close();
            connection.close();
        }

    }

}

4.消费者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        //消费者开启了事务就必须手动提交,不然会重复消费消息
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的消费者,指定消费哪一个队列里面的消息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6.通过监听的方式消费消息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {

                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收broker的消息:   " + textMessage.getText());
                        // 手动签收
                        textMessage.acknowledge();
                    } catch (Exception e) {
                        System.out.println("出现异常,消费失败,放弃消费");
                    }
                }
            }
        });
        //7.关闭资源
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

文章来源地址https://www.toymoban.com/news/detail-827584.html

到了这里,关于五、ActiveMQ的Broker(嵌入到java程序)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ActiveMq学习⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

    引入消息中间件后如何保证其高可用? 基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅 提供主备方式的高可用集群功能,避免单点故障 。 http://activemq.apache.org/masterslave LevelDB,5.6版本之后推出了LecelDB的持久化引擎,它使用了自定义的索引代替常用的BTree索引,其持久化性能高于

    2024年02月05日
    浏览(56)
  • 记录一次老服务器启动ActiveMq时报的Could not create the Java Virtual Machine.错误

    服务器系统CentOS7  1、出现ActiveMq服务无法连接 2、查看activemq状态 service activemq status 显示activemq not running 3、找到ActiveMq的bin目录,# 后台启动 ./activemq console 提示Could not create the Java Virtual Machine.错误 可以判断是java运行环境的问题 4、再看看java版本 java -version 5、再看看activemq版

    2024年04月22日
    浏览(59)
  • 【ActiveMQ】Failed to start Apache ActiveMQ (localhost, ID_XXX)

    在尝试使用\\\"binwin64activemq.bat\\\"启动apache-activemq-5.18.2时,出现了以下错误: 错误原因是由于ActiveMQ无法将mqtt://0.0.0.0:1883端口绑定,因为该端口已经被其他进程占用。但是在命令行中输入以下命令并没有返回结果: 解决方法是修改 confactivemq.xml 文件,找到以下部分: 将端口号

    2024年02月10日
    浏览(35)
  • python接收activemq服务器的消息,转发到另外两个activemq服务器消息中

    要使用Python接收ActiveMQ服务器的消息并将其转发到另外两个ActiveMQ服务器,您可以使用Python的pika库。pika是一个流行的AMQP(高级消息队列协议)客户端库,可以与ActiveMQ等消息代理进行交互。 以下是一个简单的示例,演示如何使用pika从ActiveMQ服务器接收消息,并将其转发到另外

    2024年01月19日
    浏览(47)
  • Zabbix监控ActiveMQ

    当我们在线上使用了 ActiveMQ 后,我们需要对一些参数进行监控,比如 消息是否有阻塞,哪个消息队列阻塞了,总的消息数是多少等等。下面我们就通过 Zabbix 结合 Python 脚本来实现对 ActiveMQ 的监控。 一、创建 Activemq Python 监控脚本 因为 CentOS 系统默认安装的是 Python2.7 ,为了

    2024年02月14日
    浏览(42)
  • SpringBoot整合ActiveMQ

    🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,计算机系列(火速更新中) 💭 格言:种一棵树最好的时间是十年前,其次是现在 🏡动动小手,点个关注不迷路,感

    2024年02月19日
    浏览(36)
  • ActiveMQ面试题(一)

    什么是ActiveMQ ActiveMQ 服务器宕机怎么办? 丢消息怎么办 持节化消息非常慢 消息的不均匀消费 activeMQ 是一种开源的,实现了 JMS1.1 规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信 这得从 ActiveMQ 的储存机制说起。 在通常的

    2024年02月07日
    浏览(41)
  • 七、ActiveMQ的传输协议

    官网地址:http://activemq.apache.org/configuring-version-5-transports.html ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的transportConnectors标签之内。 URI描述信息的头部都是采用协议名称,唯独在进行op

    2024年02月19日
    浏览(36)
  • Activemq存储KahaDb详解

    ActiveMQ在不提供持久化的情况下,数据保存在内存中,一旦应用崩溃或者重启之后,数据都将会丢失,这显然在大部分情况下是我们所不希望的。对此ActiveMQ提供了两种持久化方式以供选择。 kahaDB是一个基于文件,支持事务的、可靠,高性能,可扩展的消息存储器,目前是a

    2024年02月04日
    浏览(34)
  • 二、ActiveMQ安装

    环境:Centos7.9 安装ActiveMQ版本:5.15.9 JDK8 安装教程:https://qingsi.blog.csdn.net/article/details/136129131 ActiveMQ版本:5.15.9 任意版本下载地址 5.15.9版本下载地址 如果有防火墙,需要开通端口 http://127.0.0.1:8161/admin 默认的用户名和密码是admin/admin

    2024年02月19日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包