ActiveMQ 介绍 安装

mac2025-02-15  10

ActiveMQ 官网    ActiveMQ 下载     视频教程  腾讯云 ActiveMQ  腾讯云 ActiveMQ  1、ActiveMQ 介绍功能特性 是Apache软件基金会所研发的开放源代码消息中间件(框架);由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。 ● 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP ● 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) ● 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 ● 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA ● 支持通过JDBC和journal提供高速的消息持久化 ● 自动重连功能 ● 从设计上保证了高性能的集群,客户端-服务器,点对点 ● 支持Ajax ● 支持与Axis的整合 ● 可以很容易的调用内嵌JMS provider,进行测试 ● 默认端口:61616,URL地址:ACTIVEMQ_Url = "tcp://192.168.111.222:61616" *缺点 吞吐量低,由于ActiveMQ需要建立索引,导致吞吐量下降,无分片功能,时效性 ms 级 对队列数较多的情况支持不好, ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;版本迭代很慢;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。两种消息传送模式(Queue模式队列 Topic模式队列) 点对点模式(Queue) Point-to-Point(1->1)三个重点:队列(queue)生产者(sender)消费者(receiver) ● 每个消息只要一个消费者(即一条消息只发送给一个消费者,所以消费者再多,性能也不会明显降低) ● 发送者和接收者在时间上是没有时间的约束,也就是说发送者在发送完消息之后,不管接收者有没有接受消息,都不会影响发送方发送消息到消息队列中; ● 发送方不管是否在发送消息,接收方都可以从消息队列中去到消息; ● 接收方在接收完消息之后,需要向消息队列应答成功。 ● 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息,即不可重复消费发布订阅模式(Pub/Sub):一对多模式(Topic) Publish/Subscribe(1->Many)三个重点:主题(topic)发布者(Publisher)订阅者(Subscriber) ● 一个消息可以传递给多个订阅者(即:一个消息可以有多个接受方); ● 发布者与订阅者具有时间约束,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态; ● 实现订阅消息的推送,一定是先开启消费者; ● 为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。Topic 与 Queue 区别两种消费方式 同步阻塞方式(receive()) 异步非阻塞方式(监听器)消息正文类型 TextMessage:字符串 StreamMessage:数据流 BytesMessage:字节 MapMessage:key/value方式的键值对 ObjectMessage:序列化对象2、Windows 安装 ActiveMQ 从 ActiveMQ 官网 下载文件,解压下载的压缩包; 进入bin目录,目录下有两个目录,一个是32位系统,一个是64位系统启动脚本,启动之前请确保自己已经安装了jdk,并且配置了环境变量; 双击 activemq.bat 脚本文件运行; 启动成功后,提示结果 Connector  vm://localhost Started; 浏览器地址栏输入:http://localhost:8161/admin/,登陆的用户名密码:admin/admin (密码具体的配置在:conf/jetty-realm.properties) 登陆成功后的页面 *3、.NET 中 ActiveMQ 类库添加引用 管理 NuGet 程序包:Apache.NMS,Apache.NMS.ActiveMQ *4、.NET Core 中 ActiveMQ 类库添加引用 管理 NuGet 程序包:Apache.NMS.ActiveMQ.NetCore *5、ActiveMQ Broker 消息代理,它是ActiveMQ服务端角色,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。 ActiveMQ Broker 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。 * *6、ActiveMQ 集群 *7、ActiveMQ 消息存款、持久化 消息存储有三种:存储到内存、存储到文件、存储到数据库 ● AMQ,是 ActiveMQ 5.0及以前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。在此方案下消息本身以日志的形式实现持久化,存放在 Data Log 里。并且还对日志里的消息做了引用索引,方便快速取回消息。 ● KahaDB,基于文件并具有支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性。 ● JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来说比较慢,所以 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提高了性能。 ● 内存存储,是指将所有要持久化的消息放到内存中,因为这里没有动态的缓存,所以需要注意设置消息服务器的 JVM 和内存大小。 ● LevelDB,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替常用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式还是 KahaDB,但是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 Zookeeper 的数据复制方式,作为 Master-Slave 方式的首选数据复制方案。JDBC 持久化(数据库存储,Mysql、SQL Server、Oracle、DB2,下面以MySql为例) 配置安装目录下 conf/acticvemq.xml 文件 ● 定义一个 mysql-ds 的 MySql 数据源

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">   <property name="driverClassName" value="com.mysql.jdbc.Driver" />   <property name="url" value="jdbc:mysql://192.168.25.131:3306/test?characterEncoding=utf-8" />   <property name="username" value="root" />   <property name="password" value="123456" />   <property name="poolPreparedStatements" value="true"/> </bean>

● 在 persistenceAdapter 节点中配置 jdbcPersistenceAdapter 并且引用刚才定义的数据源   dataDirectory:需要配置和broker 的dataDirectory 一致;   dataSource:数据源的选择,关联数据库的具体配置,下文会具体说明;   useDatabaseLock:是否使用数据库锁,主要是在程序启动的时候会同步查询数据,导致数据库锁;   createTablesOnStartup:是否启动时创建表,默认true,每次启动会创建数据表,一般第一次启动设置true,之后设置false;

<persistenceAdapter>   <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->   <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds" useDatabaseLock="false" createTablesOnStartup="true" /> </persistenceAdapter>

● 启动成功后,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock ● 数据库连接池问题,启动activemq如果提示数据库的连接池有问题,这可能是少了lib,增加    ● mysql-connector-java-5.1.30.jar    ● commons-dbcp2-2.1.1.jar    ● commons-pool2-2.4.2.jar    三个包,放到lib目录即可 * *8、消息生产者编码(发送消息) 创建连接工厂 - ConnectionFactory factory = new ConnectionFactory("tcp://192.168.111.222:61616") 创建连接 - using (IConnection conn = factory.CreateConnection()) { } 启动访问 - conn.Start() 创建会话session - using (ISession sessio = connection.CreateSession()) { }创建生产者Producer - IMessageProducer prod = session.CreateProducer() 创建目的地(队列 or 主题)- ITemporaryQueue queue = session.CreateTemporaryQueue() 或 ITemporaryTopic topic = session.CreateTemporaryTopic() 创建消息 - ITextMessage textMessage = session.CreateTextMessage("消息---" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")); 发送消息 - prod.Send(queue) 释放关闭资源 - prod.Close(),session.Close(),conn.Close() 完整代码如下 *9、消息消费者编码(接收消息) 创建连接工厂 - ConnectionFactory factory = new ConnectionFactory("tcp://192.168.111.222:61616") 创建连接 - using (IConnection conn = factory.CreateConnection()) { } 启动访问 - conn.Start() 创建会话session - using (ISession sessio = connection.CreateSession()) { } 创建目的地(队列 or 主题)- ITemporaryQueue queue = session.CreateTemporaryQueue() 或 ITemporaryTopic topic = session.CreateTemporaryTopic()创建消费者Consumer - IMessageConsumer consumer = session.CreateConsumer(queue) 接收消息:同步阻塞接收消息 while (true) {     ITextMessage message = (ITextMessage)consumer.Receive();     string msg = message.Text; } 接收消息:监听接收消息 consumer.Listener += (IMessage message) =>  {     ITextMessage msg = (ITextMessage)message;     string content = msg.Text; }; 或 consumer.Listener += new MessageListener(consumer_Listener); void consumer_Listener(IMessage message) {     ITextMessage msg = (ITextMessage)message; } 释放关闭资源 - consumer.Close(),session.Close(),conn.Close() 完整代码如下 *10、发送消息、接收消息代码示例 * 11、 * 12、 * 13、 * 14、 * 15、 * 16、 * 17、 * 18、 * 19、 * 20

最新回复(0)