# rabbitmq **Repository Path**: stevenmaster/rabbitmq ## Basic Information - **Project Name**: rabbitmq - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-07-10 - **Last Updated**: 2025-07-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README 文章链接 ~~~bash https://gitee.com/fakerlove/rabbitmq ~~~ # RabbitMQ 实战教程 # 1.MQ引言 ## 1.1 什么是MQ `MQ`(Message Quene) : 翻译为 `消息队列`,通过典型的 `生产者`和`消费者`模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 `消息中间件` 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。 ## 1.2 MQ有哪些 当今市面上有很多主流的消息中间件,如老牌的`ActiveMQ`、`RabbitMQ`,炙手可热的`Kafka`,阿里巴巴自主开发`RocketMQ`等。 ## 1.3 不同MQ特点 ```markdown # 1.ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎! # 2.Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费, 追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求, 适合产生大量数据的互联网服务的数据收集业务。 # 3.RocketMQ RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起 源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消 息推送、日志流式处理、binglog分发等场景。 # 4.RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和 发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在 其次。 ``` > RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。 --- # 2.RabbitMQ 的安装 [原文链接](https://gitee.com/fakerlove/rabbitmq) @[TOC] ## 2.1 安装包下载安装 ### 2.2.1 下载 rabbitmq 是基于 erlang 编程语言的,所以需要环境的 首先注意系统版本,自己的服务器是linux 还是windows 下面的网址是下载erlang 的 ~~~bash https://www.erlang-solutions.com/resources/download.html ~~~ 下面的比较慢 ~~~bash https://www.erlang.org/downloads ~~~ 下面是下载rabbitmq 的 ~~~bash https://www.rabbitmq.com/download.html ~~~ ![image-20190925220115235](https://gitee.com/fakerlove/picture_1/raw/master/image-20190925220115235.png) > `最新版本`: 3.7.18 ### 2.2.2 下载的安装包 rabbit 如果能够运行,需要两个东西 erlang 和socket 的包 ![image-20190925220343521](https://gitee.com/fakerlove/picture_1/raw/master/image-20190925220343521.png) > `注意`:这里的安装包是centos7安装的包 ### 2.2.3 安装步骤 1.将rabbitmq安装包上传到linux系统中 ~~~bash erlang-22.0.7-1.el7.x86_64.rpm rabbitmq-server-3.7.18-1.el7.noarch.rpm ~~~ 2.安装依赖包 rpm安装方式 就需要三个rpm 格式的东西 ~~~bash rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm ~~~ yum 安装方式 * 安装erlang需要的依赖环境 ``` # 添加仓库 curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash Detected operating system as centos/8. # 安装erlang dnf install erlang ``` * 安装socat ~~~bash wget http://www.dest-unreach.org/socat/download/socat-1.7.0.1.tar.gz tar -zxvf socat-1.7.0.1.tar.gz cd socat-1.7.0.1 ./configure --disable-fips make && make install ~~~ ~~~bash # 如果是centos 8 http://www.dest-unreach.org/socat/download/socat-1.7.4.0.tar.gz ~~~ * 安装 logrotate ~~~bash yum -y install logrotate ~~~ * 问题 centos 7 的socat.rpm ~~~bash wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm ~~~ 二、导入密钥 ``` rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc ``` 三、在/etc/yum.repos.d目录下添加rabbitmq.repo文件,内容如下: ```bash [bintray-rabbitmq-server] name=bintray-rabbitmq-rpm baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/8/ gpgcheck=0 repo_gpgcheck=0 enabled=1 ``` centos7 的配置 ~~~bash [bintray-rabbitmq-server] name=bintray-rabbitmq-rpm baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/ gpgcheck=0 repo_gpgcheck=0 enabled=1 ~~~ 3.安装RabbitMQ安装包(需要联网) ~~~bash yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm ~~~ 4.复制配置文件 ~~~bash cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config ~~~ > 注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要 > 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config 5.查看配置文件位置 ~~~bash ls /etc/rabbitmq/rabbitmq.config ~~~ 6.修改配置文件(参见下图:) ```markdown vim /etc/rabbitmq/rabbitmq.config ``` ![image-20190925222230260](https://gitee.com/fakerlove/picture_1/raw/master/image-20190925222230260-3836271-16300347915721.png) 将上图中配置文件中红色部分去掉`%%`,以及最后的`,`逗号 修改为下图: ![image-20190925222329200](https://gitee.com/fakerlove/picture_1/raw/master/image-2019092522sdaasasda2329200-3836312.png) 7.执行如下命令,启动rabbitmq中的插件管理 ~~~bash rabbitmq-plugins enable rabbitmq_management ~~~ > 出现如下说明: > Enabling plugins on node rabbit@localhost: > rabbitmq_management > The following plugins have been configured: > rabbitmq_management > rabbitmq_management_agent > rabbitmq_web_dispatch > Applying plugin configuration to rabbit@localhost... > The following plugins have been enabled: > rabbitmq_management > rabbitmq_management_agent > rabbitmq_web_dispatch > > set 3 plugins. > Offline change; changes will take effect at broker restart. 8.启动RabbitMQ的服务 ~~~bash systemctl start rabbitmq-server systemctl restart rabbitmq-server systemctl stop rabbitmq-server ~~~ 9.查看服务状态(见下图:) ~~~bash systemctl status rabbitmq-server ~~~ ```markdown ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled) Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago Main PID: 2904 (beam.smp) Status: "Initialized" CGroup: /system.slice/rabbitmq-server.service ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf - MBlmbcs... ├─3220 erl_child_setup 32768 ├─3243 inet_gethost 4 └─3244 inet_gethost 4 ......... ``` ![image-20190925222743776](https://gitee.com/fakerlove/picture_1/raw/master/image-20190925222743776-3836511.png) 10.关闭防火墙服务 ~~~bash systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service. systemctl stop firewalld ~~~ 11.访问web管理界面 ```markdown http://10.15.0.8:15672/ ``` ![image-20190926194738708](https://gitee.com/fakerlove/picture_1/raw/master/image-20190926194738708-3836601.png) 12.登录管理界面 ```markdown username: guest password: guest ``` ![image-20190926194954822](https://gitee.com/fakerlove/picture_1/raw/master/image-20190926194954822-3836665.png) ## 2.2 docker 安装 rabbitmq 下载镜像,并且运行 ~~~bash docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management ~~~ 最新版本的话 ~~~bash docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management ~~~ 访问网址 ~~~bash http://你的主机名(比如localhost):15672/#/ ~~~ ## 2.3 yum 安装 检查谁提供了rabbItmq ~~~bash yum provides rabbitmq-server ~~~ 安装 ~~~bash yum install rabbitmq-server # 启动rabbitmq-server systemctl start rabbitmq-server # 查看rabbitmq-server状态 systemctl status rabbitmq-server # 启动web管理功能,端口号15672 rabbitmq-plugins enable rabbitmq_management # 重启rabbitmq-server systemctl restart rabbitmq-server ~~~ ## 2.4 RabbitMQ 简介 ### 2.4.1 简介 > 基于`AMQP`协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。 ![image-20190925215603036](https://gitee.com/fakerlove/picture_1/raw/master/image-20190925215603036-9419777.png) `官网`: https://www.rabbitmq.com/ `官方教程`: https://www.rabbitmq.com/#getstarted ```markdown # AMQP 协议 AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型: ``` ![image-20200311182438041](https://gitee.com/fakerlove/picture_1/raw/master/image-20200311182438041.png) ### 2.4.2 应用场景 RabbitMQ除了像兔子一样跑的很快以外,还有这些特点: - 开源、性能优秀,稳定性保障 - 提供可靠性消息投递模式、返回模式 - 与Spring AMQP完美整合,API丰富 - 集群模式丰富,表达式配置,HA模式,镜像队列模型 - 保证数据不丢失的前提做到高可靠性、可用性 MQ典型应用场景: - 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。 - 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。 - 日志处理 - 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。 ### 2.4.3 AMQP协议 提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。 先了解一下AMQP协议中间的几个重要概念: - Server:接收客户端的连接,实现AMQP实体服务。 - Connection:连接,应用程序与Server的网络连接,TCP连接。 - Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。 - Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。 - Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。 - Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。 - Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。 - RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。 - Queue:消息队列,用来保存消息,供消费者消费。 > 我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢? > 试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。 下图是AMQP的协议模型: ![img](https://gitee.com/fakerlove/picture_1/raw/master/1538609-20190720105435977-1170222541.png) 正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。 生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。 接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。 最后还要关闭信道和连接。 RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样。 ![img](https://gitee.com/fakerlove/picture_1/raw/master/1538609-20190720105508727-442219527.png) ### 2.4.4 常用交换器 RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。 Direct Exchange 该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。 ![img](https://gitee.com/fakerlove/picture_1/raw/master/1538609-20190720105736817-253615143.png) Topic Exchange 该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。 Exchange将RoutingKey和某Topic进行模糊匹配,其中“*”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"login.*"只能匹配到“com.rabbitmq”。 ![img](https://gitee.com/fakerlove/picture_1/raw/master/1538609-20190720105754635-2077492605.png) Fanout Exchange 该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。 ![img](https://gitee.com/fakerlove/picture_1/raw/master/1538609-20190720105808645-873494263.png) Headers Exchange 该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。 # 3. 配置 ## 3.1 rabbitmq 所有命令 ~~~bash rabbitmq-defaults rabbitmq-env rabbitmq-queues rabbitmq-upgrade rabbitmqctl rabbitmq-diagnostics rabbitmq-plugins rabbitmq-server rabbitmqadmin ~~~ 有这么多命令 ## 3.2 命令介绍 ### 启动 ~~~bash rabbitmq-server ~~~ 后台启动 ~~~bash rabbitmq-server -detached ~~~ ### 查看状态 ~~~bash rabbitmqctl status ~~~ ### 关闭 ~~~bash rabbitmqctl stop ~~~ ### 所有ctl 指令 ~~~bash Help: autocomplete Provides command name autocomplete variants help Displays usage information for a command version Displays CLI tools version Nodes: await_startup Waits for the RabbitMQ application to start on the target node reset Instructs a RabbitMQ node to leave the cluster and return to its virgin state rotate_logs Instructs the RabbitMQ node to perform internal log rotation shutdown Stops RabbitMQ and its runtime (Erlang VM). Monitors progress for local nodes. Does not require a PID file path. start_app Starts the RabbitMQ application but leaves the runtime (Erlang VM) running stop Stops RabbitMQ and its runtime (Erlang VM). Requires a local node pid file path to monitor progress. stop_app Stops the RabbitMQ application, leaving the runtime (Erlang VM) running wait Waits for RabbitMQ node startup by monitoring a local PID file. See also 'rabbitmqctl await_online_nodes' Cluster: await_online_nodes Waits for nodes to join the cluster change_cluster_node_type Changes the type of the cluster node cluster_status Displays all the nodes in the cluster grouped by node type, together with the currently running nodes force_boot Forces node to start even if it cannot contact or rejoin any of its previously known peers force_reset Forcefully returns a RabbitMQ node to its virgin state forget_cluster_node Removes a node from the cluster join_cluster Instructs the node to become a member of the cluster that the specified node is in rename_cluster_node Renames cluster nodes in the local database update_cluster_nodes Instructs a cluster member node to sync the list of known cluster members from Replication: cancel_sync_queue Instructs a synchronising mirrored queue to stop synchronising itself sync_queue Instructs a mirrored queue with unsynchronised mirrors (follower replicas) to synchronise them Users: add_user Creates a new user in the internal database authenticate_user Attempts to authenticate a user. Exits with a non-zero code if authentication fails. change_password Changes the user password clear_password Clears (resets) password and disables password login for a user delete_user Removes a user from the internal database. Has no effect on users provided by external backends such as LDAP list_users List user names and tags set_user_tags Sets user tags Access Control: clear_permissions Revokes user permissions for a vhost clear_topic_permissions Clears user topic permissions for a vhost or exchange list_permissions Lists user permissions in a virtual host list_topic_permissions Lists topic permissions in a virtual host list_user_permissions Lists permissions of a user across all virtual hosts list_user_topic_permissions Lists user topic permissions list_vhosts Lists virtual hosts set_permissions Sets user permissions for a vhost set_topic_permissions Sets user topic permissions for an exchange Monitoring, observability and health checks: list_bindings Lists all bindings on a vhost list_channels Lists all channels in the node list_ciphers Lists cipher suites supported by encoding commands list_connections Lists AMQP 0.9.1 connections for the node list_consumers Lists all consumers for a vhost list_exchanges Lists exchanges list_hashes Lists hash functions supported by encoding commands list_queues Lists queues and their properties list_unresponsive_queues Tests queues to respond within timeout. Lists those which did not respond ping Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it report Generate a server status report containing a concatenation of all server status information for support purposes schema_info Lists schema database tables and their properties status Displays status of a node Parameters: clear_global_parameter Clears a global runtime parameter clear_parameter Clears a runtime parameter. list_global_parameters Lists global runtime parameters list_parameters Lists runtime parameters for a virtual host set_global_parameter Sets a runtime parameter. set_parameter Sets a runtime parameter. Policies: clear_operator_policy Clears an operator policy clear_policy Clears (removes) a policy list_operator_policies Lists operator policy overrides for a virtual host list_policies Lists all policies in a virtual host set_operator_policy Sets an operator policy that overrides a subset of arguments in user policies set_policy Sets or updates a policy Virtual hosts: add_vhost Creates a virtual host clear_vhost_limits Clears virtual host limits delete_vhost Deletes a virtual host list_vhost_limits Displays configured virtual host limits restart_vhost Restarts a failed vhost data stores and queues set_vhost_limits Sets virtual host limits trace_off trace_on Configuration and Environment: decode Decrypts an encrypted configuration value encode Encrypts a sensitive configuration value environment Displays the name and value of each variable in the application environment for each running application set_cluster_name Sets the cluster name set_disk_free_limit Sets the disk_free_limit setting set_log_level Sets log level in the running node set_vm_memory_high_watermark Sets the vm_memory_high_watermark setting Definitions: export_definitions Exports definitions in JSON or compressed Erlang Term Format. import_definitions Imports definitions in JSON or compressed Erlang Term Format. Feature flags: enable_feature_flag Enables a feature flag on target node list_feature_flags Lists feature flags Operations: close_all_connections Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node close_connection Instructs the broker to close the connection associated with the Erlang process id eval Evaluates a snippet of Erlang code on the target node eval_file Evaluates a file that contains a snippet of Erlang code on the target node exec Evaluates a snippet of Elixir code on the CLI node force_gc Makes all Erlang processes on the target node perform/schedule a full sweep garbage collection resume_listeners Resumes client connection listeners making them accept client connections again suspend_listeners Suspends client connection listeners so that no new client connections are accepted Queues: delete_queue Deletes a queue purge_queue Purges a queue (removes all messages in it) Deprecated: hipe_compile DEPRECATED. This command is a no-op. HiPE is no longer supported by modern Erlang versions node_health_check DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. See https://www.rabbitmq.com/monitoring.html#health-checks instead Use 'rabbitmqctl help ' to learn more about a specific command ~~~ ### 所有插件 ~~~bash rabbitmq-plugins list ~~~ ## 3.3 Web 页面 ### 3.3.1 页面介绍 ![image-20191126162026720](https://gitee.com/fakerlove/picture_1/raw/master/image-20191126162026720.png) - `connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况` - `channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。` - `Exchanges:交换机,用来实现消息的路由` - `Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。` ### 3.3.2 Admin用户和虚拟主机管理 #### 添加用户 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107160055720.png) - `超级管理员(administrator)` 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - `监控者(monitoring)` 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - `策略制定者(policymaker)` 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 - `普通管理者(management)` 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - `其他` 无法登陆管理控制台,通常就是普通的生产者和消费者。 #### 创建虚拟主机 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107160211451.png) > 为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。 #### 绑定虚拟主机和用户 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107160528807.png) ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107160636824.png) # 4. Java 使用rabbitmq ## 4.1 直连模型--Helloword ![image-20191126165840602](https://gitee.com/fakerlove/picture_1/raw/master/image-20191126165840602.png) ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107162920921.png) ### 引入依赖 ~~~xml 4.0.0 org.example helloword 1.0-SNAPSHOT com.rabbitmq amqp-client 5.10.0 ~~~ ### 创建开发生产者 ~~~java package com.ak.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider { @Test public void testSendMessage(){ ConnectionFactory connectionFactory=new ConnectionFactory(); // 设置主机名 connectionFactory.setHost("47.100.104.187"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接的虚拟主机的名字 connectionFactory.setVirtualHost("/joker"); // 设置虚拟机的用户名和密码 connectionFactory.setUsername("joker"); connectionFactory.setPassword("123456"); // 获取连接对象 生产者----> 队列 try { // 获取连接对象 Connection connection=connectionFactory.newConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); // 通道绑定对应消息队列 /** * 参数一:队列名字,队列不存在自动创建 * 参数二,是否持久化 * 参数三:是否独占队列 true 是独占队列 ,false 不独占 * 参数四: 是否在消费完成后删除队列 * 参数五:额外附加参数 */ channel.queueDeclare("hello",false,false,false,null); // 发布消息 /** * 参数一:交换机名称 * 参数二:队列名称 * 参数三: 传递消息额外设置 * 参数四:消息的内容 * */ channel.basicPublish("","hello",null,"hello rabbit".getBytes()); channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { } } } ~~~ ### 发布成功 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107164726713.png) ### 建立消费者 ~~~java package com.ak.test; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MyConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); // 设置主机名 connectionFactory.setHost("47.100.104.187"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接的虚拟主机的名字 connectionFactory.setVirtualHost("/joker"); // 设置虚拟机的用户名和密码 connectionFactory.setUsername("joker"); connectionFactory.setPassword("123456"); // 获取连接对象 生产者----> 队列 try { // 获取连接对象 Connection connection=connectionFactory.newConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); // 通道绑定对应消息队列 /** * 参数一:队列名字,队列不存在自动创建 * 参数二,是否持久化 * 参数三:是否独占队列 true 是独占队列 ,false 不独占 * 参数四: 是否在消费完成后删除队列 * 参数五:额外附加参数 */ channel.queueDeclare("hello",false,false,false,null); // 发布消息 /** * 参数一:队列名称 * 参数二:开始消费的自动确认机制 * 参数三: 消费时的回调接口 * */ channel.basicConsume("",true, new DefaultConsumer(channel){ /** * 参数回调 * @param consumerTag * @param envelope * @param properties * @param body 消息队列中取出的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { } } } ~~~ ### 查看是否被消费 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107165855526.png) ### 工具类 ~~~java package com.ak.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtils { private static ConnectionFactory connectionFactory; static { connectionFactory=new ConnectionFactory(); connectionFactory.setHost("47.100.104.187"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接的虚拟主机的名字 connectionFactory.setVirtualHost("/joker"); // 设置虚拟机的用户名和密码 connectionFactory.setUsername("joker"); connectionFactory.setPassword("123456"); } public static Connection getConnection(){ // ConnectionFactory connectionFactory=new ConnectionFactory(); // 设置主机名 try { // 获取连接对象 Connection connection=connectionFactory.newConnection(); return connection; } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { } return null; } public static void closeConnectionAndChanel(Channel channel,Connection connection){ try { if(channel!=null){ channel.close(); } if(connection!=null){ connection.close(); } }catch (Exception e){ e.printStackTrace(); } } } ~~~ ### 使用测试 ~~~java package com.ak; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Provider { public static void main(String[] args) { // 获取连接对象 生产者----> 队列 try { Connection connection= RabbitMQUtils.getConnection(); Channel channel=connection.createChannel(); channel.queueDeclare("hello",false,false,false,null); channel.basicPublish("","hello",null,"hello rabbit".getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel,connection); } catch (IOException e) { e.printStackTrace(); } finally { } } } ~~~ ### 项目结构 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107193732651.png) ## 4.2 work quene 任务模型 > `Work queues`,也被称为(`Task queues`),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:**让多个消费者绑定到一个队列,共同消费队列中的消息**。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。 ![image-20200314221002008](https://gitee.com/fakerlove/picture_1/raw/master/image-20200314221002008.png) 角色: - P:生产者:任务的发布者 - C1:消费者-1,领取任务并且完成任务,假设完成速度较慢 - C2:消费者-2:领取任务并完成任务,假设完成速度快 ### 创建生产者 ~~~java package com.ak.demo_2; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; public class Provider { public static void main(String[] args) { Connection connection= RabbitMQUtils.getConnection(); try { Channel channel=connection.createChannel(); channel.queueDeclare("work",true,false,false,null); for(int i=0;i<1000;i++){ channel.basicPublish("","work",null,("hello"+i).getBytes()); } RabbitMQUtils.closeConnectionAndChanel(channel,connection); } catch (IOException e) { e.printStackTrace(); } } } ~~~ ### 创建消费者 ~~~java package com.ak.demo_2; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_1 { public static void main(String[] args) { try { // 获取连接对象 Connection connection=RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); // RabbitMQUtils.closeConnectionAndChanel(channel,connection); } catch (IOException e) { e.printStackTrace(); } finally { } } } ~~~ ~~~java package com.ak.demo_2; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_2 { public static void main(String[] args) { try { // 获取连接对象 Connection connection=RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); // RabbitMQUtils.closeConnectionAndChanel(channel,connection); } catch (IOException e) { e.printStackTrace(); } finally { } } } ~~~ > `总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。` ### 消息自动确认机制 > 如何实现能者多劳的任务模型。需要手动确认信息 ~~~java package com.ak.demo_2; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_1_Information { public static void main(String[] args) { try { // 获取连接对象 Connection connection=RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); channel.basicQos(1); channel.queueDeclare("workquene",true,false,false,null); channel.basicConsume("",false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); // 手动确认 ,参数1:手动确认信息标识,参数2:false 每次确认一个 channel.basicAck(envelope.getDeliveryTag(),false); } }); // RabbitMQUtils.closeConnectionAndChanel(channel,connection); } catch (IOException e) { e.printStackTrace(); } finally { } } } ~~~ ## 4.3 fanout 模型 ![image-20191126213115873](https://gitee.com/fakerlove/picture_1/raw/master/image-20191126213115873.png) 在广播模式下,消息发送流程是这样的: - 可以有多个消费者 - 每个**消费者有自己的queue**(队列) - 每个**队列都要绑定到Exchange**(交换机) - **生产者发送的消息,只能发送到交换机**,交换机来决定要发给哪个队列,生产者无法决定。 - 交换机把消息发送给绑定过的所有队列 - 队列的消费者都能拿到消息。实现一条消息被多个消费者消费 ### 开发生产者 ~~~java package com.ak.fanout; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; public class Provider { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); Channel channel=connection.createChannel(); // 参数一:为交换机名称,参数二:fanout 为交换机 channel.exchangeDeclare("joker","fanout"); // 发送信息 channel.basicPublish("joker","",null,"fanout type message".getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel,connection); } } ~~~ ### 开发消费者 ~~~java package com.ak.fanout; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class MyConsumer_3 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel=connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("joker","fanout"); // 临时队列 String queneName=channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queneName,"joker",""); // 消费信息 channel.basicConsume(queneName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } } ~~~ ## 4.4 Routing ### 4.4.1 直连 `在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。` 在Direct模型下: - 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息 流程: ![image-20191126220145375](https://gitee.com/fakerlove/picture_1/raw/master/image-20191126220145375.png) 图解: - P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 - X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 - C1:消费者,其所在队列指定了需要routing key 为 error 的消息 - C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息 > 我希望只有错误日志的时候,才能存储到磁盘 > > 其他日志在控制台打印 #### 开发生产者 ~~~java package com.ak.routedirect; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Random; public class Provider { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); Channel channel=connection.createChannel(); // 参数一:为交换机名称,参数二:fanout 为交换机 channel.exchangeDeclare("log_router","direct"); String []routeKey={"error","info","waring","debug"}; // 发送信息 for(int i=0;i<10;i++){ int temp=new Random().nextInt(100)%4; channel.basicPublish("log_router",routeKey[temp],null,("发送的信息为 "+routeKey[temp]).getBytes()); } RabbitMQUtils.closeConnectionAndChanel(channel,connection); } } ~~~ #### 开发消费者 ~~~java package com.ak.routedirect; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_1 { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); channel.exchangeDeclare("log_router","direct"); // 临时队列 String queneName=channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queneName,"log_router","error"); // 消费信息 channel.basicConsume(queneName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2 "+new String(body)); } }); } } ~~~ ~~~java package com.ak.routedirect; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_2 { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); channel.exchangeDeclare("log_router","direct"); // 临时队列 String queneName=channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queneName,"log_router","error"); channel.queueBind(queneName,"log_router","info"); channel.queueBind(queneName,"log_router","waring"); channel.queueBind(queneName,"log_router","debug"); // 消费信息 channel.basicConsume(queneName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 "+new String(body)); } }); } } ~~~ #### 检验 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107214118576.png) ### 4.4.2 Routing 之订阅模型-Topic `Topic`类型的`Exchange`与`Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。只不过`Topic`类型`Exchange`可以让队列在绑定`Routing key` 的时候使用通配符!这种模型`Routingkey` 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: `item.insert` ![image-20191127121900255](https://gitee.com/fakerlove/picture_1/raw/master/image-20191127121900255.png) ``` markdown # 统配符 * (star) can substitute for exactly one word. 匹配不多不少恰好1个词 # (hash) can substitute for zero or more words. 匹配一个或多个词 # 如: audit.# 匹配audit.irs.corporate或者 audit.irs 等 audit.* 只能匹配 audit.irs ``` ##### #### 创建生产者 ~~~java package com.ak.routeTopic; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Random; public class Provider { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); Channel channel=connection.createChannel(); // 参数一:为交换机名称,参数二:fanout 为交换机 String channelName="log_top"; channel.exchangeDeclare(channelName,"topic"); String []routeKey={"user.save","user.add","admin.add","admin.save"}; for (int i=0;i<10;i++){ int temp=new Random().nextInt(100)%4; channel.basicPublish(channelName,routeKey[temp],null,("这个是topics 发布的信息"+routeKey[temp]).getBytes()); } RabbitMQUtils.closeConnectionAndChanel(channel,connection); } } ~~~ #### 创建消费者 ~~~java package com.ak.routeTopic; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_1 { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); String channelName="log_top"; channel.exchangeDeclare(channelName,"topic"); // 临时队列 String queneName=channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queneName,channelName,"user.*"); // 消费信息 channel.basicConsume(queneName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者user 的信息 "+new String(body)); } }); } } ~~~ ~~~java package com.ak.routeTopic; import com.ak.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer_2 { public static void main(String[] args) throws IOException { Connection connection= RabbitMQUtils.getConnection(); // 获取连接中通道 Channel channel=connection.createChannel(); String channelName="log_top"; channel.exchangeDeclare(channelName,"topic"); // 临时队列 String queneName=channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queneName,channelName,"admin.*"); // 消费信息 channel.basicConsume(queneName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者admin 的信息 "+new String(body)); } }); } } ~~~ #### 检查 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210107215833763.png) # 5. 整合SpringBoot ## 5.1 helloword 模型 ### 引入依赖 ~~~xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.1 org.example demo1 1.0-SNAPSHOT 11 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp org.springframework.amqp spring-rabbit-test test org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.boot spring-boot-maven-plugin ~~~ ### 创建生产者 ~~~java package com.ak.demo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = RabbitmqSpringApplication.class) public class MyTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test(){ rabbitTemplate.convertAndSend("hello","hello world"); } } ~~~ ### 创建消费者 ~~~java package com.ak.demo.hello; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 默认持久化队列 */ @Component @RabbitListener(queuesToDeclare =@Queue("hello")) public class HelloCustomer { @RabbitHandler public void kk(String message){ System.out.println(message); } } ~~~ ### 目录结构 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210108090136641.png) ## 5.2 workquene 模型 ### 修改work 类 ~~~java @Test public void test2(){ for(int i=0;i<10;i++){ rabbitTemplate.convertAndSend("work","work 模型"); } } ~~~ ### 创建消费者 ~~~java package com.ak.demo.work; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkConsumer { @RabbitListener(queuesToDeclare =@Queue("work")) public void receive(String messaage){ System.out.println("消费者1---"+messaage); } @RabbitListener(queuesToDeclare =@Queue("work")) public void receive2(String messaage){ System.out.println("消费者2--"+messaage); } } ~~~ ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210108092648830.png) ### 创建确认机制 ## 5.3 广播模式 ### 创建生产者 ~~~java /** * 广播形式的发布信息 */ @Test public void test3(){ for(int i=0;i<10;i++){ rabbitTemplate.convertAndSend("kk","","广播信息"); } } ~~~ ### 创建消费者 ~~~java package com.ak.demo.fanout; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanOutConsumer { @RabbitListener(bindings = { @QueueBinding( value =@Queue, exchange = @Exchange(value = "kk",type = "fanout") ) }) public void receive(String message){ System.out.println("----"); System.out.println("广播信息1"+message); } @RabbitListener(bindings = { @QueueBinding( value =@Queue, exchange = @Exchange(value = "kk",type = "fanout") ) }) public void receive2(String message){ System.out.println("----"); System.out.println("广播信息2"+message); } } ~~~ ### 测试 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210108100051098.png) ## 5.4 直连模式 ### 创建生产者 ~~~java /** * 测试路由模型 */ @Test public void testDirect(){ String []routeKey={"error","info","waring","debug"}; for(int i=0;i<10;i++){ int temp=new Random().nextInt(100)%4; rabbitTemplate.convertAndSend("directs",routeKey[temp],routeKey[temp]+"的日志信息"); } } ~~~ ### 创建消费者 ~~~java package com.ak.demo.router; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectConsumer { String []routeKey={"error","info","waring","debug"}; @RabbitListener(bindings = { @QueueBinding( value = @Queue,// 临时队列 exchange = @Exchange(value = "directs",type = "direct"), key={"error","info","waring","debug"} ) }) public void receive(String message){ System.out.println("接受全部信息--"+message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,// 临时队列 exchange = @Exchange(value = "directs",type = "direct"), key={"error"} ) }) public void receive2(String message){ System.out.println("只接受error--"+message); } } ~~~ ### 检查 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210108100907225.png) ## 5.5 Topic 模式 ### 创建生产者 ~~~java @Test public void testFive(){ String []routeKey={"user.save","user.add","admin.add","admin.save"}; for(int i=0;i<10;i++){ int temp=new Random().nextInt(100)%4; rabbitTemplate.convertAndSend("topics",routeKey[temp],routeKey[temp]+"信息"); } } ~~~ ### 消费者 ~~~java package com.ak.demo.topic; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics",type = "topic"), key={"user.*"} ) }) public void receive(String message){ System.out.println("user类---"+message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics",type = "topic"), key={"admin.*"} ) }) public void receive2(String message){ System.out.println("admin 类--"+message); } } ~~~ ### 测试 ![](https://gitee.com/fakerlove/picture_1/raw/master/image-20210108101624614.png) # 6. 搭建集群 ## 6.1 命令行搭建 `默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问` 1. ##### 架构图 ![image-20200320094147471](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320094147471.png) ​ 核心解决问题: `当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份` 2. ##### 集群搭建 ```markdown # 0.集群规划 node1: 10.15.0.3 mq1 master 主节点 node2: 10.15.0.4 mq2 repl1 副本节点 node3: 10.15.0.5 mq3 repl2 副本节点 # 1.克隆三台机器主机名和ip映射 vim /etc/hosts加入: 10.15.0.3 mq1 10.15.0.4 mq2 10.15.0.5 mq3 node1: vim /etc/hostname 加入: mq1 node2: vim /etc/hostname 加入: mq2 node3: vim /etc/hostname 加入: mq3 # 2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行: scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/ scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/ # 3.查看cookie是否一致: node1: cat /var/lib/rabbitmq/.erlang.cookie node2: cat /var/lib/rabbitmq/.erlang.cookie node3: cat /var/lib/rabbitmq/.erlang.cookie # 4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面: rabbitmq-server -detached # 5.在node2和node3执行加入集群命令: 1.关闭 rabbitmqctl stop_app 2.加入集群 rabbitmqctl join_cluster rabbit@mq1 3.启动服务 rabbitmqctl start_app # 6.查看集群状态,任意节点执行: rabbitmqctl cluster_status # 7.如果出现如下显示,集群搭建成功: Cluster status of node rabbit@mq3 ... [{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]}, {running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}, {cluster_name,<<"rabbit@mq1">>}, {partitions,[]}, {alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}] # 8.登录管理界面,展示如下状态: ``` ![image-20200320095613586](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320095613586.png) ```markdown # 9.测试集群在node1上,创建队列 ``` ![image-20200320095743935](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320095743935.png) ```markdown # 10.查看node2和node3节点: ``` ![image-20200320095827688](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320095827688.png) ![image-20200320095843370](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320095843370.png) ```markdown # 11.关闭node1节点,执行如下命令,查看node2和node3: rabbitmqctl stop_app ``` ![image-20200320100000347](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320100000347.png) ![image-20200320100010968](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320100010968.png) ## 6.2 docker 搭建 ![image-20200320113423235](https://gitee.com/fakerlove/picture_1/raw/master/image-20200320113423235.png) > 使用docker 进行搭建 删除所有镜像 ~~~bash docker stop myrabbit1 myrabbit2 myrabbit3 docker rm myrabbit1 myrabbit2 myrabbit3 ~~~ 自定义网络 ~~~bash docker network create --driver bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 myrediswork ~~~ 创建集群 ~~~bash docker run -d --hostname rabbit1 --name myrabbit1 -p 5675:5672 -p 15673:15672 -v ~/mydata/rabbitmq/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management docker run -d --hostname rabbit2 --name myrabbit2 -p 5673:5672 -p 15674:15672 -v ~/mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq --link myrabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management docker run -d --hostname rabbit3 --name myrabbit3 -p 5674:5672 -p 15675:15672 -v ~/mydata/rabbitmq/rabbitmq03:/var/lib/rabbitmq --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management ~~~ 进入每个集群中,然后运行命令 ~~~bash docker exec -it myrabbit1 /bin/bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app exit ~~~ **这里主义 myrabbit2 是不一样的** ~~~bash docker exec -it myrabbit2 bash rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app exit ~~~ ~~~bash docker exec -it myrabbit3 bash rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app exit ~~~ 访问网址 ~~~bash http://www.jokerak.com:15673/#/ ~~~ **问题** > Error response from daemon: Pool overlaps with other one on this address space > > 192.168.0.0 网段已经被使用了,换个网段即可