尚硅谷rabbitmq 2024 流式队列2024指定偏移量 第55节答疑

rabbitmq的stream:
4、对比
autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家,忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘,回来就完)
这两种分别怎么写?java


在 RabbitMQ 中,Stream 是一种新的消息传递机制,允许高吞吐量和低延迟的消息传递。你提到的两种消费方式分别是自动跟踪策略(Auto Tracking Strategy)和指定偏移量(Offset)的消费方式。下面我们将分别演示这两种方式的 Java 实现。

### 1. 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会一直监听 Stream 中的新消息,并不断处理这些消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 2. 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message


============
为什么没写.autoTrackingStrategy()?

你提到的 `.autoTrackingStrategy()` 是 RabbitMQ Stream API 中的一个重要方法,它用于指定消费者的自动跟踪策略。让我们详细解释一下如何在代码中使用这个方法。

### 自动跟踪策略(Auto Tracking Strategy)

自动跟踪策略允许消费者始终监听 Stream 中的新消息,并自动跟踪消息的偏移量。这种方式下,消费者会持续消费新消息,而不会停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {


============
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /
-----这里指定了 .autoTrackingStrategy() /就不用指定.offset(OffsetSpecification.first())了吧?


确实,如果你使用 `.autoTrackingStrategy()`,那么你不需要再指定 `.offset(OffsetSpecification.first())`,因为自动跟踪策略会自动从最新的消息开始消费并持续监听新的消息。让我们分别看看两种方式的完整代码示例。

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
           


============
那里之前为什么要写:            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /

你说得对,如果你使用 `.autoTrackingStrategy()`,就不需要再指定 `.offset(OffsetSpecification.first())`。这是一个错误的示例。使用 `.autoTrackingStrategy()` 时,消费者会自动从最新的消息开始消费,并持续监听新的消息。

让我们修正并简化这两种方式的示例代码:

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/890268.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis主从复制机制详解

目录 一、主从复制介绍二、搭建主从复制三、主从复制流程四、关于Replication ID五、主从复制核心知识六、主从复制应用场景七、主从复制的注意事项八、读写分离实战 一、主从复制介绍 1、什么是主从复制? 2、为什么要使用主从复制? redis-server单点…

谷歌浏览器 文件下载提示网络错误

情况描述: 谷歌版本:129.0.6668.90 (正式版本) (64 位) (cohort: Control)其他浏览器,比如火狐没有问题,但是谷歌会下载失败,故推断为谷歌浏览器导致的问题小文件比如1、2M会成功,大…

【问题分析】使用gperftools分析排查内存问题

背景 当程序长时间允许时(压测、服务器程序),就会面临更大的挑战,其中内存泄漏就是一类典型的问题,内存泄漏往往不易发现,导致的现象更是千奇百怪,本文主要介绍如何借助gperftools分析一个模块的内存泄漏 案例代码 …

yum仓库安装rabbitmq

yum仓库安装rabbitmq 1、配置yum仓库 vim /etc/yum.repos.d/rabbitmq.repo # In /etc/yum.repos.d/rabbitmq.repo## ## Zero dependency Erlang ##[rabbitmq_erlang] namerabbitmq_erlang baseurlhttps://packagecloud.io/rabbitmq/erlang/el/7/$basearch repo_gpgcheck1 gpg…

软件工程:需求规格说明书(图书管理系统)

目录 1 导言 1.1 编写目的 1.2 参考资料 2 项目介绍 2.1 项目背景 2.2 项目目标 3 应用环境 3.1 系统运行网络环境 ​编辑 3.2 系统软硬件环境 4 功能模型 4.1 功能角色分析 4.1.1 图书管理员 4.1.2 普通读者 4.1.3 邮件系统 4.2 功能性需求 4.2.1 预定图…

【一步步开发AI运动小程序】二十、AI运动小程序如何适配相机全屏模式?

引言 受小程序camera组件预览和抽帧图像不一致的特性影响,一直未全功能支持全屏模式,详见本系列文件第四节小程序如何抽帧;随着插件在云上赛事、健身锻炼、AI体测、AR互动场景的深入应用,各开发者迫切的希望能在全屏模式下应用&am…

Excel中Ctrl+e的用法

重点:想要使用ctrle,前提是整合或拆分后的结果放置的单元格必须和被提取信息的单元格相邻,且被提取信息的单元格也必须相连。 下图为错误示例 这样则可以使用ctrle 1、信息整合 2、提取信息 3、添加符号 4、信息顺序调换 5、数字提取 crtle还…

Vue3 + Element plus 实现切换el-radio前二次确认

Vue3 Element plus 实现切换el-radio前二次确认 场景:点击切换el-radio之前判断当前内容是否有改变,如有改变弹窗提示切换el-radio将销毁操作,弹窗二次确认是否切换 问题: el-radio 没有提供类似于beforeUpdate这样的钩子去处理这…

手写mybatis之细化XML语句构建器,完善静态SQL解析

前言 1:在流程上,通过 DefaultSqlSession#selectOne 方法调用执行器,并通过预处理语句处理器 PreparedStatementHandler 执行参数设置和结果查询。 2:那么这个流程中我们所处理的参数信息,也就是每个 SQL 执行时&#…

基于yolov10的芒果成熟度检测系统,支持图像、视频和摄像实时检测【pytorch框架、python】

更多目标检测和图像分类识别项目可看我主页其他文章 功能演示: yolov10,芒果成熟度检测系统,支持图像、视频和摄像实时检测【pytorch框架、python】_哔哩哔哩_bilibili (一)简介 基于yolov10的芒果成熟度检测系统是…

npm install报错一堆sass gyp ERR!

执行npm install ,出现一堆gyp含有sass错误的情况下。 解决办法: 首页可能是node版本问题,太高或者太低,也会导致npm install安装错误(不会自动生成node_modules文件),本次试验,刚开…

【JavaEE】——初始网络原理

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:局域网 1:概念 二:局域网的连接方式 1:网线直连 …

flask项目框架搭建

目录结构 blueprints python包,蓝图文件,相当于路由组的概念,方便模块化开发 例如auth.py文件 from flask import Blueprint, render_templatebp Blueprint("auth", __name__, url_prefix"/auth")bp.route("/login") d…

空间解析几何3-空间点到线段和平面的距离【附MATLAB代码】

目录 空间中点到线段的距离 空间中点到平面的投影和距离 matlab代码 空间中点到线段的距离 空间中点到平面的投影和距离 matlab代码 function [dis,P2,t] point2Line (A1,B1,C1) %求空间一点到一线段的最短距离 %[dis,P2,Q2]pointSegmentDistance(A,B,C) %A B为线段首末端…

问卷调查毕设计算机毕业设计投票系统SpringBootSSM框架

目录 一、引言‌ ‌二、需求分析‌ 用户角色‌: ‌功能需求‌: ‌非功能需求‌: ‌三、系统设计‌ ‌技术选型‌: ‌数据库设计‌: ‌界面设计‌: ‌四、实现步骤‌ ‌后端实现‌: …

蓝桥杯【物联网】零基础到国奖之路:十八. 扩展模块之光敏和AS312

蓝桥杯【物联网】零基础到国奖之路:十八.扩展模块之光敏和AS312 第一节 硬件解读第二节 CubeMX配置第二节 代码 第一节 硬件解读 光敏和AS312如下图: 光敏电阻接到了扩展模块的5号引脚,5号引脚接了2个电阻,R8和光敏电阻。我们通过ADC读取这…

vue+ElementUI—实现基础后台管理布局(sideBar+header+appMain)(附源码)

后台管理的模板很多,vue本身就提供了完整的vue-template-admin,vue-admin-beautiful等后台管理系统化框架,但是这些框架正是因为成体系而显得繁重。假如你想搭建一个静态的后台管理模板页面和几个单独的菜单页面,直接就上框架是否…

维生素对于生活的重要性

在探索健康奥秘的旅途中,维生素作为人体不可或缺的微量营养素,扮演着至关重要的角色。它们虽不直接提供能量,却是酶促反应、细胞代谢、免疫功能乃至心理健康的基石。今天,让我们一同深入探讨人体所需补充的维生素,这些…

VSCode 使用 EmmyLua 对lua进行调试

时间:2024年10月 其他:win10,EmmyLua v0.8.20 参考:https://blog.csdn.net/ShenHaoDeHao/article/details/140268354 有几个概念搞清楚就好理解了。一般开发中,我们编写的lua文件由宿主程序的来解析、执行&#xff1…

【计算机网络 - 基础问题】每日 3 题(三十九)

✍个人博客:https://blog.csdn.net/Newin2020?typeblog 📣专栏地址:http://t.csdnimg.cn/fYaBd 📚专栏简介:在这个专栏中,我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞…