尚硅谷rabbitmq 2024 Federation配置 第60节答疑

news/2024/10/16 19:20:08 标签: rabbitmq, 分布式, java, 开发语言, 后端

rabbitmq联邦队列怎么做


要在 RabbitMQ 中设置联邦队列(Federated Queues),你需要遵循以下步骤。联邦队列允许你在不同的 RabbitMQ 实例之间共享队列,从而实现消息的分布式处理和高可用性。

### 步骤 1:安装 RabbitMQ 联邦插件

首先,确保你的 RabbitMQ 安装了联邦插件。你可以通过以下命令启用插件:

```bash
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
```

### 步骤 2:配置联邦 Upstream

联邦 Upstream 是指一个 RabbitMQ 服务器如何连接到另一个 RabbitMQ 服务器。你需要在 RabbitMQ 配置文件中定义这些连接。

编辑 RabbitMQ 配置文件(通常是 `rabbitmq.conf` 或 `advanced.config`),添加以下内容:

```erlang
[
  {rabbitmq_federation,
    [
      {upstream, "upstream-name",
        [
          {uri, "amqp://user:password@remote-rabbitmq-host"},
          {expires, 3600000}
        ]
      }
    ]
  }
].
```

### 步骤 3:定义联邦策略

接下来,你需要定义一个策略来应用联邦配置。策略可以通过 RabbitMQ 管理控制台或命令行工具设置。

通过命令行工具设置策略:

```bash
rabbitmqctl set_policy federate-me "^federated\." \
  '{"federation-upstream-set":"all"}' --apply-to queues
```

这个命令的意思是将所有以 `federated.` 开头的队列应用联邦策略,并使用之前配置的所有联邦 Upstream。

### 步骤 4:创建联邦队列

现在你可以创建一个队列,并确保它符合策略的命名规则:

```bash
rabbitmqadmin declare queue name=federated.myqueue
```

这个队列将自动应用你之前定义的联邦策略,并连接到配置的 Upstream。

### 验证配置

你可以通过 RabbitMQ 管理控制台(通常在 `http://localhost:15672`)检查联邦配置和队列的状态。在“Federation”标签下,你应该能够看到联邦链接的状态和活动情况。

### 结论

通过以上步骤,你就可以在 RabbitMQ 中设置联邦队列,实现不同 RabbitMQ 实例之间的消息共享和分布式处理。如果你有更多的定制需求,可以参考 [RabbitMQ 官方文档](https://www.rabbitmq.com/federation.html) 获取更详细的信息。

===========
'{"federation-upstream-set":"all"}'  这个是什么,不是应该定义"federation-upstream":"upstream-name"吗?


你说得对,`"federation-upstream-set":"all"` 和 `"federation-upstream":"upstream-name"` 是不同的配置方式。

- `"federation-upstream-set":"all"`:这是用来应用所有定义的 upstream 集合。
- `"federation-upstream":"upstream-name"`:这是用来应用特定的 upstream。

在你的例子中,如果你只想使用特定的 upstream,你应该使用 `"federation-upstream":"upstream-name"`。以下是更新后的步骤:

### 步骤 1:安装 RabbitMQ 联邦插件

确保你的 RabbitMQ 安装了联邦插件:

```bash
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
```

### 步骤 2:配置联邦 Upstream

在 RabbitMQ 配置文件中定义 upstream:

```erlang
[
  {rabbitmq_federation,
    [
      {upstream, "upstream-name",
        [
          {uri, "amqp://user:password@remote-rabbitmq-host"},
          {expires, 3600000}
        ]
      }
    ]
  }
].
```

### 步骤 3:定义联邦策略

通过命令行工具设置策略,应用特定的 upstream:

```bash
rabbitmqctl set_policy federate-me "^federated\." \
  '{"federation-upstream":"upstream-name"}' --apply-to queues
```

这个命令的意思是将所有以 `federated.` 开头的队列应用联邦策略,并使用你定义的 `"upstream-name"`。

### 步骤 4:创建联邦队列

创建一个符合策略命名规则的队列:

```bash
rabbitmqadmin declare queue name=federated.myqueue
```

这个队列将自动应用你之前定义的联邦策略,并连接到配置的 upstream。

### 验证配置

你可以通过 RabbitMQ 管理控制台(通常在 `http://localhost:15672`)检查联邦配置和队列的状态。在“Federation”标签下,你应该能够看到联邦链接的状态和活动情况。

### 结论

通过以上步骤,你可以在 RabbitMQ 中设置联邦队列,并指定特定的 upstream。如果你有更多的定制需求,可以参考 [RabbitMQ 官方文档](https://www.rabbitmq.com/federation.html) 获取更详细的信息。


============
但此时发现下游节点中联邦队列并没有接收到消息,这是为什么呢?这里就体现出了联邦队列和联邦交换机工作逻辑的区别。对联邦队列来说,如果没有监听联邦队列的消费端程序,它是不会到上游去拉取消息的!如果有消费端监听联邦队列,那么首先消费联邦队列重身的消息;如果联邦队列为空,这时候才会到上游队列节点中拉取消息。所以现在的测试效果需要消费端程序配合才能看到:

这话什么意思?具体演示一下,联邦队列和联邦交换机使用什么区别,好像说在ui界面上,上游发消息下游看不到,但是在springboot里面消费就可以?


你提到的联邦队列和联邦交换机的区别确实是 RabbitMQ 联邦特性中的一个关键点。让我详细解释一下,并通过一个具体的例子来演示它们的工作机制。

### 联邦队列 vs 联邦交换机

#### 联邦队列(Federated Queues)

联邦队列的工作原理是“按需拉取”。这意味着下游节点上的联邦队列只有在有消费者监听时才会从上游节点拉取消息。如果没有消费者监听,联邦队列不会主动拉取消息。

#### 联邦交换机(Federated Exchanges)

联邦交换机的工作原理是“主动推送”。这意味着上游节点上的消息会主动推送到下游节点的交换机,然后路由到相应的队列中。无论下游是否有消费者监听,消息都会被推送到下游节点。

### 具体演示

#### 环境准备

假设我们有两个 RabbitMQ 实例:
- 上游节点(Upstream):`upstream-rabbitmq`
- 下游节点(Downstream):`downstream-rabbitmq`

#### 配置联邦队列

1. **在上游节点创建队列**

   ```bash
   rabbitmqadmin -H upstream-rabbitmq declare queue name=upstream.queue
   ```

2. **在下游节点配置联邦 Upstream**

   在 `downstream-rabbitmq` 的配置文件中添加:

   ```erlang
   [
     {rabbitmq_federation,
       [
         {upstream, "upstream-name",
           [
             {uri, "amqp://user:password@upstream-rabbitmq"},
             {expires, 3600000}
           ]
         }
       ]
     }
   ].
   ```

3. **在下游节点设置联邦策略**

   ```bash
   rabbitmqctl -H downstream-rabbitmq set_policy federate-me "^federated\." \
     '{"federation-upstream":"upstream-name"}' --apply-to queues
   ```

4. **在下游节点创建联邦队列**

   ```bash
   rabbitmqadmin -H downstream-rabbitmq declare queue name=federated.queue
   ```

5. **在上游节点发送消息**

   ```bash
   rabbitmqadmin -H upstream-rabbitmq publish routing_key=upstream.queue payload="Hello, World!"
   ```

6. **在下游节点启动消费者**

   使用 Spring Boot 应用监听 `federated.queue`:

   ```java
   @Service
   public class RabbitMqConsumer {

       @RabbitListener(queues = "federated.queue")
       public void listen(String message) {
           System.out.println("Received message: " + message);
       }
   }
   ```

#### 配置联邦交换机

1. **在上游节点创建交换机和队列**

   ```bash
   rabbitmqadmin -H upstream-rabbitmq declare exchange name=upstream.exchange type=direct
   rabbitmqadmin -H upstream-rabbitmq declare queue name=upstream.queue
   rabbitmqadmin -H upstream-rabbitmq declare binding source=upstream.exchange destination=upstream.queue routing_key=key
   ```

2. **在下游节点配置联邦 Upstream**

   在 `downstream-rabbitmq` 的配置文件中添加:

   ```erlang
   [
     {rabbitmq_federation,
       [
         {upstream, "upstream-name",
           [
             {uri, "amqp://user:password@upstream-rabbitmq"},
             {expires, 3600000}
           ]
         }
       ]
     }
   ].
   ```

3. **在下游节点设置联邦策略**

   ```bash
   rabbitmqctl -H downstream-rabbitmq set_policy federate-me "^federated\." \
     '{"federation-upstream":"upstream-name"}' --apply-to exchanges
   ```

4. **在下游节点创建联邦交换机**

   ```bash
   rabbitmqadmin -H downstream-rabbitmq declare exchange name=federated.exchange type=direct
   ```

5. **在下游节点创建队列并绑定到联邦交换机**

   ```bash
   rabbitmqadmin -H downstream-rabbitmq declare queue name=downstream.queue
   rabbitmqadmin -H downstream-rabbitmq declare binding source=federated.exchange destination=downstream.queue routing_key=key
   ```

6. **在上游节点发送消息**

   ```bash
   rabbitmqadmin -H upstream-rabbitmq publish exchange=upstream.exchange routing_key=key payload="Hello, Federated Exchange!"
   ```

### 结果验证

- **联邦队列**:如果在下游节点没有消费者监听 `federated.queue`,消息不会被拉取。当你启动 Spring Boot 应用监听 `federated.queue` 时,消息才会被拉取并消费。
- **联邦交换机**:无论下游节点是否有消费者监听 `downstream.queue`,消息都会被推送到 `federated.exchange`,并路由到 `downstream.queue`。你可以在 RabbitMQ 管理界面中看到消息已经到达下游节点的队列中。

通过这些步骤,你可以看到联邦队列和联邦交换机在工作逻辑上的区别。希望这能帮助你更好地理解和使用 RabbitMQ 的联邦特性。


===========
shovel从一个位置移动另一个位置,有什么用呢,为什么不直接用连一个队列呢?


Shovel 和联邦队列(Federated Queues)或联邦交换机(Federated Exchanges)在功能和使用场景上有所不同。Shovel 是一种插件,它允许你将消息从一个 RabbitMQ 服务器(或集群)移动到另一个 RabbitMQ 服务器(或集群)。以下是一些使用 Shovel 的场景和它的优势:

### Shovel 的使用场景

1. **跨数据中心消息传递**:
   - 如果你有多个数据中心,并且需要在这些数据中心之间传递消息,Shovel 可以帮助你实现这一点。它可以将消息从一个数据中心的 RabbitMQ 服务器移动到另一个数据中心的 RabbitMQ 服务器。

2. **灾难恢复**:
   - Shovel 可以用于灾难恢复场景。你可以将消息从一个主要的 RabbitMQ 服务器移动到一个备用的 RabbitMQ 服务器,以确保在主要服务器出现故障时消息不会丢失。

3. **数据迁移**:
   - 如果你需要将消息从一个旧的 RabbitMQ 服务器迁移到一个新的 RabbitMQ 服务器,Shovel 可以帮助你完成这一任务。

4. **不同网络环境**:
   - 当你的 RabbitMQ 服务器位于不同的网络环境中(例如,一个在内网,一个在外网),Shovel 可以帮助你在这些不同的网络环境之间传递消息。

### Shovel 的优势

- **独立于消费者**:Shovel 是一个独立的进程,它会持续地将消息从源队列移动到目标队列,无需在目标 RabbitMQ 服务器上有消费者。
- **可靠性**:Shovel 可以配置成持久化的,以确保在 RabbitMQ 服务器重启或发生故障时消息不会丢失。
- **灵活性**:Shovel 支持多种配置选项,可以根据你的需求进行定制。

### 为什么不直接连接一个队列?

直接连接队列(例如,通过联邦队列或联邦交换机)虽然在某些情况下是合适的,但在以下场景中可能不太适用:

1. **网络隔离**:
   - 当两个 RabbitMQ 服务器位于不同的网络环境中,直接连接可能会受到网络隔离的限制,而 Shovel 可以跨越这些限制。

2. **不同的 RabbitMQ 版本**:
   - 如果源和目标 RabbitMQ 服务器运行不同的版本,直接连接可能会有兼容性问题,而 Shovel 可以在这种情况下工作。

3. **复杂的路由需求**:
   - Shovel 可以配置复杂的路由规则,将消息从一个队列移动到另一个队列,甚至可以重命名队列或交换机。

### Shovel 配置示例

以下是一个 Shovel 配置的示例:

```erlang
[
  {rabbitmq_shovel,
    [
      {shovels,
        [
          {my_shovel,
            [
              {sources, [{broker, "amqp://user:password@source-rabbitmq"}]},
              {destinations, [{broker, "amqp://user:password@destination-rabbitmq"}]},
              {queue, <<"source-queue">>},
              {queue, <<"destination-queue">>},
              {ack_mode, on_confirm},
              {reconnect_delay, 5}
            ]
          }
        ]
      }
    ]
  }
].
```

在这个配置中:
- `sources` 定义了源 RabbitMQ 服务器。
- `destinations` 定义了目标 RabbitMQ 服务器。
- `queue` 指定了源和目标队列。
- `ack_mode` 设置为 `on_confirm` 以确保消息被确认。
- `reconnect_delay` 设置为 5 秒,以在连接断开时进行重连。

### 结论

Shovel 提供了一种灵活且可靠的方式,将消息从一个 RabbitMQ 服务器移动到另一个 RabbitMQ 服务器。它在跨数据中心消息传递、灾难恢复和数据迁移等场景中非常有用。如果你需要在不同网络环境或不同版本的 RabbitMQ 服务器之间传递消息,Shovel 是一个很好的选择。

===============
Shovel Status
Shovel Management
-------------ui管理界面为什么要有两个菜单

在 RabbitMQ 管理界面中,`Shovel Status` 和 `Shovel Management` 是两个不同的菜单项,它们提供了不同的功能和信息:

### Shovel Status

`Shovel Status` 菜单项主要用于监控和查看当前 Shovel 的运行状态。它提供了以下信息:

1. **Shovel 名称**:显示每个 Shovel 的名称。
2. **源和目标**:显示每个 Shovel 的源和目标队列或交换机的信息。
3. **状态**:显示 Shovel 的当前状态,例如是否正在运行、暂停或出错。
4. **统计信息**:显示 Shovel 的消息传输统计信息,例如传输的消息数量、传输速率等。

通过 `Shovel Status`,你可以实时监控 Shovel 的健康状况和性能,确保消息传输的可靠性。

### Shovel Management

`Shovel Management` 菜单项主要用于管理和配置 Shovel。它提供了以下功能:

1. **创建 Shovel**:允许你通过 UI 界面创建新的 Shovel。你可以指定源和目标 RabbitMQ 服务器、队列或交换机,以及其他配置选项。
2. **编辑 Shovel**:允许你修改现有 Shovel 的配置。你可以更新源和目标、调整传输参数等。
3. **删除 Shovel**:允许你删除不再需要的 Shovel。
4. **查看详细配置**:你可以查看每个 Shovel 的详细配置参数,了解其具体设置。

通过 `Shovel Management`,你可以方便地管理 Shovel 的生命周期,从创建到修改再到删除,所有操作都可以在 UI 界面上完成。

### 为什么要有两个菜单项?

1. **职责分离**:
   - `Shovel Status` 专注于监控和查看 Shovel 的运行状态,提供实时的性能和健康状况信息。
   - `Shovel Management` 专注于管理和配置 Shovel,提供创建、编辑和删除 Shovel 的功能。

2. **用户角色**:
   - 不同的用户角色可能需要访问不同的功能。例如,运维人员可能更关注 `Shovel Status` 以监控系统健康状况,而开发人员或管理员可能更关注 `Shovel Management` 以配置和管理 Shovel。

3. **简化操作**:
   - 将监控和管理功能分开,可以简化用户操作,使界面更加清晰和易于使用。用户可以快速找到他们需要的功能,而不必在一个复杂的界面中查找。

### 示例

假设你在 RabbitMQ 管理界面中:

- **Shovel Status**:
  - 你可以看到一个名为 `my_shovel` 的 Shovel 正在运行。
  - 它的源是 `amqp://user:password@source-rabbitmq`,目标是 `amqp://user:password@destination-rabbitmq`。
  - 当前状态是 `running`,传输速率是 `10 messages/second`。

- **Shovel Management**:
  - 你可以点击 `Create Shovel` 按钮,输入名称 `new_shovel`,配置源和目标 RabbitMQ 服务器、队列或交换机,然后点击 `Save` 创建新的 Shovel。
  - 你可以点击现有的 `my_shovel`,修改其配置,例如更改目标队列,然后点击 `Save` 保存修改。
  - 你可以点击 `Delete` 按钮,删除不再需要的 Shovel。

通过这两个菜单项,你可以全面地管理和监控 RabbitMQ 中的 Shovel,确保消息传输的可靠性和高效性。
 


http://www.niftyadmin.cn/n/5708414.html

相关文章

Java避坑案例 - ConcurrentHashMap 的使用陷阱

文章目录 Pre概述场景问题复现现象 问题分析解决方案&#xff1a;加锁版本Fix效果 改进与更优方案&#xff1a;使用原子操作避免加锁使用 computeIfAbsent 等原子方法单线程执行填充操作 总结 Pre J.U.C Review - 并发容器集合解析 Java Review - 并发组件ConcurrentHashMap使…

java-重要知识01

1. 各个语言的擅长点 C&#xff1a;几乎其他语言的全部功能 速度快 C 速度快 JAVA 大型web开发&#xff0c;手机安卓 本来有桌面开发&#xff0c;后来被C#挖人 GO 大型web开发 C# 中小型web&#xff0c;桌面程序开发 Python 数学处理&#xff0c;中小型网站 性…

【linux】Microsoft Edge 的 Bookmarks 文件存储位置

在 Linux 系统中&#xff0c;Microsoft Edge 的书签&#xff08;Bookmarks&#xff09;文件存储在用户的配置目录下。具体路径通常如下&#xff1a; ~/.config/microsoft-edge/Default/Bookmarks说明&#xff1a; 路径解释&#xff1a; ~ 表示当前用户的主目录。.config 是一个…

【开源】第三期:数字货币程序化交易终端开源

关于初衷&#xff1a; 这篇文章&#xff0c;其实应该在六年前发出来&#xff0c;但是受制于各种杂事和生活琐事&#xff0c;一直拖到现在&#xff0c;想必有朋友看到在"终端"那期里&#xff0c;聊到的数字货币交易的实践&#xff0c;那个时候遍地都是数字货币交易所&…

如何查看GB28181流媒体平台LiveGBS对GB28181视频数据的统计信息

LiveGBS流媒体平台GB/T28181常见问题-如何快速查看推流上来的摄像头并停止摄像头推流&#xff1f; 1、负载信息2、负载信息说明3、会话列表查看3.1、会话列表 4、停止会话5、搭建GB28181视频直播平台 1、负载信息 实时展示直播、回放、播放、录像、H265、级联等使用数目 2、负…

00 springboot项目创建

我们创建SpringBoot项目有两种方式: Spring Initializr spring initerzie 方式创建: 启动类, 依赖 生成,但是需要网络maven的方式 maven方式创建: 启动类, 依赖, 这些都需要手动编写,但是不需要网络 springboot系列&#xff0c;最近持续更新中&#xff0c;如需要请关注 如果…

网络安全 IP地址防泄漏指南

IP地址作为每个上网人的“门牌标识号”&#xff0c;如果产生泄露&#xff0c;可能会导致个人行踪曝光、数据被窃取甚至遭受网络攻击&#xff0c;要防止IP地址不被窃取&#xff0c;我们可以尝试以下方法&#xff1a; 利用专用网络加强隐私保护 通过加密在公共网络上创建一条安全…

Springboot 整合 Java DL4J 实现农产品质量检测系统

&#x1f9d1; 博主简介&#xff1a;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编程&#xff0c;…