ActiveMQ 02 常用API

这篇具有很好参考价值的文章主要介绍了ActiveMQ 02 常用API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Active MQ 02

常用API

事务

session.commit();
session.rollback();

用来提交/回滚事务

Purge

清理消息

签收模式

签收代表接收端的session已收到消息的一次确认,反馈给broker

ActiveMQ支持自动签收与手动签收

Session.AUTO_ACKNOWLEDGE

当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

Session.CLIENT_ACKNOWLEDGE

客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

Session.DUPS_OK_ACKNOWLEDGE

Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。

持久化

默认持久化是开启的

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

优先级

可以打乱消费顺序

producer.setPriority

配置文件需要指定使用优先级的目的地

<policyEntry queue="queue1" prioritizedMessages="true" />

消息超时/过期

producer.setTimeToLive

设置了消息超时的消息,消费端在超时后无法在消费到此消息。

给消息设置一个超时时间 -> 死信队列 -> 拿出来 -> 重发

死信

此类消息会进入到ActiveMQ.DLQ队列且不会自动清除,称为死信

此处有消息堆积的风险

修改死信队列名称
			  <policyEntry queue="f" prioritizedMessages="true" >
				<deadLetterStrategy> 

					<individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true" /> 

				</deadLetterStrategy> 
			  </policyEntry>

useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信

让非持久化的消息也进入死信队列
			<individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true"  processNonPersistent="true" /> 

processNonPersistent=“true”

过期消息不进死信队列
<individualDeadLetterStrategy   processExpired="false"  /> 

独占消费者

	Queue queue = session.createQueue("xxoo?consumer.exclusive=true");

还可以设置优先级

Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");

消息类型

object

发送端
		Girl girl = new Girl("qiqi",25,398.0);
		
		Message message = session.createObjectMessage(girl);
接受端
		if(message instanceof ActiveMQObjectMessage) {
			
			Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject();
			
			System.out.println(girl);
			System.out.println(girl.getName());
		}

如果遇到此类报错

Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
	at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
	at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
	... 1 more

需要添加信任

		connectionFactory.setTrustedPackages(
				new ArrayList<String>(
						Arrays.asList(
								new String[]{
										Girl.class.getPackage().getName()
										}
								
								)
						)
				
				);

bytesMessage

发送端
		BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes("str".getBytes());
        bytesMessage.writeUTF("哈哈");
接受端
		if(message instanceof BytesMessage) {
			BytesMessage bm = (BytesMessage)message;
			
			 byte[] b = new byte[1024];
             int len = -1;
             while ((len = bm.readBytes(b)) != -1) {
                 System.out.println(new String(b, 0, len));
             }
		}

还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序

bm.readBoolean()
bm.readUTF()
写入文件
                    FileOutputStream out = null;
                    try {
                        out = new FileOutputStream("d:/aa.txt");
                    } catch (FileNotFoundException e2) {
                        e2.printStackTrace();
                    }
                    byte[] by = new byte[1024];
                    int len = 0 ;
                    try {
                        while((len = bm.readBytes(by))!= -1){
                            out.write(by,0,len);
                        }
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }

MapMessage

发送端
		MapMessage mapMessage = session.createMapMessage();
      
		
		mapMessage.setString("name","lucy");
        mapMessage.setBoolean("yihun",false);
		mapMessage.setInt("age", 17);
		
		producer.send(mapMessage);
接收端
		Message message = consumer.receive();
		MapMessage mes = (MapMessage) message;
		
		System.out.println(mes);

		System.out.println(mes.getString("name"));

消息发送原理

同步与异步

开启事务 关闭事务
持久化 异步 同步
非持久化 异步 异步

我们可以通过以下几种方式来设置异步发送:

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"admin",
				"admin",
				"tcp://localhost:61616"
				);
		// 2.获取一个向ActiveMQ的连接
		connectionFactory.setUseAsyncSend(true);
		ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
		connection.setUseAsyncSend(true);

消息堆积

producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。

brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576

destinationUri中设置: myQueue?producer.windowSize=1048576

延迟消息投递

首先在配置文件中开启延迟和调度

schedulerSupport=“true”

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延迟发送

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);

带间隔的重复发送

		long delay = 10 * 1000;
		long period = 2 * 1000;
		int repeat = 9;
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
		message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
		createProducer.send(message);

Cron表达式定时发送

Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式:

Seconds Minutes Hours DayofMonth Month DayofWeek Year或

Seconds Minutes Hours DayofMonth Month DayofWeek

每一个域可出现的字符如下:

Seconds:可出现", - * /"四个字符,有效范围为0-59的整数

Minutes:可出现", - * /"四个字符,有效范围为0-59的整数

Hours:可出现", - * /"四个字符,有效范围为0-23的整数

DayofMonth:可出现", - * / ? L W C"八个字符,有效范围为0-31的整数

Month:可出现", - * /"四个字符,有效范围为1-12的整数或JAN-DEc

DayofWeek:可出现", - * / ? L C #"四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推

Year:可出现", - * /"四个字符,有效范围为1970-2099年

每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:

(1):表示匹配该域的任意值,假如在Minutes域使用, 即表示每分钟都会触发事件。

(2)?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。

(3)-:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次

(4)/:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.

(5),:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。

(6)L:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。

(7)W: 表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一 到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份

(8)LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。

(9)#:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。

举几个例子:

0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务

0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业

0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作

一个cron表达式有至少6个(也可能7个)有空格分隔的时间元素。

按顺序依次为

秒(0~59)

分钟(0~59)

小时(0~23)

天(月)(0~31,但是你需要考虑你月的天数)

月(0~11)

天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT)

年份(1970-2099)

其中每个元素可以是一个值(如6),一个连续区间(9-12),一个间隔时间(8-18/4)(/表示每隔4小时),一个列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"这两个元素互斥的,必须要对其中一个设置?

0 0 10,14,16 * * ? 每天上午10点,下午2点,4点

0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时

0 0 12 ? * WED 表示每个星期三中午12点

“0 0 12 * * ?” 每天中午12点触发

“0 15 10 ? * *” 每天上午10:15触发

“0 15 10 * * ?” 每天上午10:15触发

“0 15 10 * * ? *” 每天上午10:15触发

“0 15 10 * * ? 2005” 2005年的每天上午10:15触发

“0 * 14 * * ?” 在每天下午2点到下午2:59期间的每1分钟触发

“0 0/5 14 * * ?” 在每天下午2点到下午2:55期间的每5分钟触发

“0 0/5 14,18 * * ?” 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发

“0 0-5 14 * * ?” 在每天下午2点到下午2:05期间的每1分钟触发

“0 10,44 14 ? 3 WED” 每年三月的星期三的下午2:10和2:44触发

“0 15 10 ? * MON-FRI” 周一至周五的上午10:15触发

“0 15 10 15 * ?” 每月15日上午10:15触发

“0 15 10 L * ?” 每月最后一日的上午10:15触发

“0 15 10 ? * 6L” 每月的最后一个星期五上午10:15触发

“0 15 10 ? * 6L 2002-2005” 2002年至2005年的每月的最后一个星期五上午10:15触发

“0 15 10 ? * 6#3” 每月的第三个星期五上午10:15触发

监听器

可以使用监听器来处理消息接收

consumer.setMessageListener(new MyListener());

需要实现接口MessageListener

public class MyListener implements MessageListener {

	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		TextMessage textMessage = (TextMessage)message;
		try {
			System.out.println("xxoo" + textMessage.getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

当收到消息后会调起onMessage方法文章来源地址https://www.toymoban.com/news/detail-853379.html

消息过滤

消息发送

		MapMessage msg1 = session.createMapMessage();
		msg1.setString("name", "qiqi");
		msg1.setString("age", "18");
		
		msg1.setStringProperty("name", "qiqi");
		msg1.setIntProperty("age", 18);
		MapMessage msg2 = session.createMapMessage();
		msg2.setString("name", "lucy");
		msg2.setString("age", "18");
		msg2.setStringProperty("name", "lucy");
		msg2.setIntProperty("age", 18);
		MapMessage msg3 = session.createMapMessage();
		msg3.setString("name", "qianqian");
		msg3.setString("age", "17");
		msg3.setStringProperty("name", "qianqian");
		msg3.setIntProperty("age", 17);

消息接收

	
		String selector1 = "age > 17";
		String selector2 = "name = 'lucy'";
		MessageConsumer consumer = session.createConsumer(queue,selector2);

到了这里,关于ActiveMQ 02 常用API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ActiveMq学习⑦__ActiveMq协议

    问题一、默认的61616端口如何更改? 问题二、你生产上的链接协议如何配置的?使用tcp吗? ActiveMQ 支持的client-broker 通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置TransportConnector 的文件在ActiveMQ 安装目录的conf/activemq.xml 中的标签之内。 activemq 传输协议的官方文档:htt

    2024年02月05日
    浏览(42)
  • docker-compose安装和使用(自启、redis、mysql、rabbitmq、activemq、es、nginx、java应用)

    1.在线安装docker-compose: 参考官网:https://docs.docker.com/compose/install/other/ docker-compose安装及简单入门 [Docker] docker-compose使用教程 Docker系列教程22-docker-compose.yml常用命令 2、离线安装docker-compose: 参考:Docker - 离线安装 docker-compose(以CentOS系统为例) (1)首先访问 docker-compose 的

    2024年02月05日
    浏览(55)
  • ActiveMq学习⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

    引入消息中间件后如何保证其高可用? 基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅 提供主备方式的高可用集群功能,避免单点故障 。 http://activemq.apache.org/masterslave LevelDB,5.6版本之后推出了LecelDB的持久化引擎,它使用了自定义的索引代替常用的BTree索引,其持久化性能高于

    2024年02月05日
    浏览(56)
  • 记录一次老服务器启动ActiveMq时报的Could not create the Java Virtual Machine.错误

    服务器系统CentOS7  1、出现ActiveMq服务无法连接 2、查看activemq状态 service activemq status 显示activemq not running 3、找到ActiveMq的bin目录,# 后台启动 ./activemq console 提示Could not create the Java Virtual Machine.错误 可以判断是java运行环境的问题 4、再看看java版本 java -version 5、再看看activemq版

    2024年04月22日
    浏览(59)
  • 【ActiveMQ】Failed to start Apache ActiveMQ (localhost, ID_XXX)

    在尝试使用\\\"binwin64activemq.bat\\\"启动apache-activemq-5.18.2时,出现了以下错误: 错误原因是由于ActiveMQ无法将mqtt://0.0.0.0:1883端口绑定,因为该端口已经被其他进程占用。但是在命令行中输入以下命令并没有返回结果: 解决方法是修改 confactivemq.xml 文件,找到以下部分: 将端口号

    2024年02月10日
    浏览(35)
  • python接收activemq服务器的消息,转发到另外两个activemq服务器消息中

    要使用Python接收ActiveMQ服务器的消息并将其转发到另外两个ActiveMQ服务器,您可以使用Python的pika库。pika是一个流行的AMQP(高级消息队列协议)客户端库,可以与ActiveMQ等消息代理进行交互。 以下是一个简单的示例,演示如何使用pika从ActiveMQ服务器接收消息,并将其转发到另外

    2024年01月19日
    浏览(47)
  • Zabbix监控ActiveMQ

    当我们在线上使用了 ActiveMQ 后,我们需要对一些参数进行监控,比如 消息是否有阻塞,哪个消息队列阻塞了,总的消息数是多少等等。下面我们就通过 Zabbix 结合 Python 脚本来实现对 ActiveMQ 的监控。 一、创建 Activemq Python 监控脚本 因为 CentOS 系统默认安装的是 Python2.7 ,为了

    2024年02月14日
    浏览(42)
  • SpringBoot整合ActiveMQ

    🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,计算机系列(火速更新中) 💭 格言:种一棵树最好的时间是十年前,其次是现在 🏡动动小手,点个关注不迷路,感

    2024年02月19日
    浏览(36)
  • ActiveMQ面试题(一)

    什么是ActiveMQ ActiveMQ 服务器宕机怎么办? 丢消息怎么办 持节化消息非常慢 消息的不均匀消费 activeMQ 是一种开源的,实现了 JMS1.1 规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信 这得从 ActiveMQ 的储存机制说起。 在通常的

    2024年02月07日
    浏览(41)
  • 七、ActiveMQ的传输协议

    官网地址:http://activemq.apache.org/configuring-version-5-transports.html ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的transportConnectors标签之内。 URI描述信息的头部都是采用协议名称,唯独在进行op

    2024年02月19日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包