元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
概述
NATS简介
应用场景分析
对比传统消息队列
▶
安装配置
Linux环境安装
Docker容器部署
配置文件详解
客户端选择指南
▶
核心概念
主题与消息结构
发布订阅模式
请求响应模式
持久化机制
服务质量级别
▶
实际操作
第一个NATS程序
消息收发演练
错误处理技巧
性能调优基础
▶
应用整合
Web服务集成
微服务通信
设备物联网方案
▶
监控维护
健康检查方法
日志分析指南
集群管理基础
发布时间:
2025-04-07 18:05
↑
☰
# 消息收发演练 本文将带你深入了解NATS的消息收发机制,通过实际演练帮助你掌握NATS的各种消息传递模式和技巧。 ## 消息传递基础 NATS提供了多种消息传递模式,包括: 1. **发布-订阅 (Pub-Sub)**:一对多的消息分发 2. **请求-响应 (Request-Reply)**:一对一的双向通信 3. **队列组 (Queue Groups)**:负载均衡的消息处理 让我们通过实际代码示例来演练这些模式。 ## 发布-订阅模式演练 ### 基本发布-订阅 发布-订阅是NATS最基本的消息模式,允许一个发布者向多个订阅者发送消息。 #### 多订阅者示例 首先,让我们创建一个发布者和多个订阅者: **发布者 (publisher.go)** ```go package main import ( "fmt" "log" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 每秒发布一条消息 for i := 1; i <= 5; i++ { message := fmt.Sprintf("消息 #%d", i) err = nc.Publish("updates", []byte(message)) if err != nil { log.Fatal(err) } fmt.Printf("已发布: %s\n", message) time.Sleep(1 * time.Second) } // 确保所有消息都已发送 nc.Flush() } ``` **订阅者1 (subscriber1.go)** ```go package main import ( "fmt" "log" "runtime" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 订阅主题 _, err = nc.Subscribe("updates", func(msg *nats.Msg) { fmt.Printf("订阅者1收到: %s\n", string(msg.Data)) }) if err != nil { log.Fatal(err) } fmt.Println("订阅者1已启动,等待消息...") runtime.Goexit() } ``` **订阅者2 (subscriber2.go)** ```go package main import ( "fmt" "log" "runtime" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 订阅主题 _, err = nc.Subscribe("updates", func(msg *nats.Msg) { fmt.Printf("订阅者2收到: %s\n", string(msg.Data)) }) if err != nil { log.Fatal(err) } fmt.Println("订阅者2已启动,等待消息...") runtime.Goexit() } ``` 运行这些程序,你会看到两个订阅者都能接收到所有发布的消息。 ### 主题通配符 NATS支持使用通配符进行主题匹配,这使得消息路由更加灵活: - `*` 匹配任何主题层级中的一个标记 - `>` 匹配任何剩余的标记 **通配符订阅示例** ```go package main import ( "fmt" "log" "runtime" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 订阅所有天气更新 _, err = nc.Subscribe("weather.*", func(msg *nats.Msg) { fmt.Printf("收到天气更新 [%s]: %s\n", msg.Subject, string(msg.Data)) }) if err != nil { log.Fatal(err) } // 订阅所有市场数据 _, err = nc.Subscribe("market.>", func(msg *nats.Msg) { fmt.Printf("收到市场数据 [%s]: %s\n", msg.Subject, string(msg.Data)) }) if err != nil { log.Fatal(err) } fmt.Println("通配符订阅者已启动,等待消息...") runtime.Goexit() } ``` **发布到不同主题** ```go package main import ( "fmt" "log" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 发布到不同主题 subjects := []string{ "weather.beijing", "weather.shanghai", "market.stocks.tech", "market.forex.usd.eur", } for _, subject := range subjects { message := fmt.Sprintf("%s的最新数据", subject) err = nc.Publish(subject, []byte(message)) if err != nil { log.Fatal(err) } fmt.Printf("已发布到 %s\n", subject) time.Sleep(500 * time.Millisecond) } // 确保所有消息都已发送 nc.Flush() } ``` ## 队列组演练 队列组允许多个订阅者组成一个组,每条消息只会发送给组内的一个订阅者,实现负载均衡。 **队列组订阅者 (queue_worker.go)** ```go package main import ( "fmt" "log" "os" "runtime" "time" "github.com/nats-io/nats.go" ) func main() { // 获取工作者ID workerID := "1" if len(os.Args) > 1 { workerID = os.Args[1] } nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 加入队列组 _, err = nc.QueueSubscribe("tasks", "workers", func(msg *nats.Msg) { fmt.Printf("工作者 %s 处理任务: %s\n", workerID, string(msg.Data)) // 模拟处理时间 time.Sleep(1 * time.Second) fmt.Printf("工作者 %s 完成任务: %s\n", workerID, string(msg.Data)) }) if err != nil { log.Fatal(err) } fmt.Printf("工作者 %s 已启动,等待任务...\n", workerID) runtime.Goexit() } ``` **任务发布者 (task_publisher.go)** ```go package main import ( "fmt" "log" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 发布10个任务 for i := 1; i <= 10; i++ { task := fmt.Sprintf("任务 #%d", i) err = nc.Publish("tasks", []byte(task)) if err != nil { log.Fatal(err) } fmt.Printf("已发布: %s\n", task) time.Sleep(200 * time.Millisecond) } // 确保所有消息都已发送 nc.Flush() } ``` 运行方法: 1. 启动多个工作者实例: ``` go run queue_worker.go 1 go run queue_worker.go 2 go run queue_worker.go 3 ``` 2. 运行任务发布者: ``` go run task_publisher.go ``` 你会看到任务被均匀分配给不同的工作者处理。 ## 请求-响应模式演练 请求-响应模式允许客户端发送请求并等待响应,适用于需要双向通信的场景。 ### 同步请求-响应 **服务提供者 (service_provider.go)** ```go package main import ( "fmt" "log" "runtime" "strconv" "strings" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 提供计算服务 _, err = nc.Subscribe("service.calc", func(msg *nats.Msg) { fmt.Printf("收到计算请求: %s\n", string(msg.Data)) // 解析请求 parts := strings.Split(string(msg.Data), " ") if len(parts) != 3 { nc.Publish(msg.Reply, []byte("错误: 格式应为'数字 运算符 数字'")) return } a, err1 := strconv.ParseFloat(parts[0], 64) op := parts[1] b, err2 := strconv.ParseFloat(parts[2], 64) if err1 != nil || err2 != nil { nc.Publish(msg.Reply, []byte("错误: 无效的数字")) return } var result float64 var errMsg string switch op { case "+": result = a + b case "-": result = a - b case "*": result = a * b case "/": if b == 0 { errMsg = "错误: 除数不能为零" } else { result = a / b } default: errMsg = "错误: 不支持的运算符" } var response string if errMsg != "" { response = errMsg } else { response = fmt.Sprintf("%.2f", result) } // 发送响应 nc.Publish(msg.Reply, []byte(response)) fmt.Printf("已发送响应: %s\n", response) }) if err != nil { log.Fatal(err) } fmt.Println("计算服务已启动,等待请求...") runtime.Goexit() } ``` **客户端 (service_client.go)** ```go package main import ( "bufio" "fmt" "log" "os" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() fmt.Println("计算器客户端 (输入'退出'结束)") fmt.Println("格式: 数字 运算符 数字 (例如: 5 + 3)") scanner := bufio.NewScanner(os.Stdin) for { fmt.Print("> ") if !scanner.Scan() { break } input := scanner.Text() if input == "退出" { break } // 发送请求并等待响应 msg, err := nc.Request("service.calc", []byte(input), 2*time.Second) if err != nil { fmt.Printf("请求错误: %v\n", err) continue } fmt.Printf("结果: %s\n", string(msg.Data)) } } ``` ### 异步请求-响应 对于需要处理多个并发请求的场景,可以使用异步请求: **异步客户端 (async_client.go)** ```go package main import ( "fmt" "log" "sync" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 等待所有请求完成 var wg sync.WaitGroup // 发送多个并发请求 requests := []string{ "10 + 5", "20 - 8", "7 * 6", "100 / 4", } for i, req := range requests { wg.Add(1) // 发送异步请求 reqNum := i + 1 fmt.Printf("发送请求 #%d: %s\n", reqNum, req) nc.RequestWithContext(nc.Context(), "service.calc", []byte(req), func(msg *nats.Msg) { defer wg.Done() fmt.Printf("收到响应 #%d: %s\n", reqNum, string(msg.Data)) }) } // 等待所有响应 wg.Wait() fmt.Println("所有请求已完成") } ``` ## 消息持久性演练 NATS提供了JetStream功能,支持消息持久化。以下是一个简单的JetStream示例: **JetStream发布者 (jetstream_publisher.go)** ```go package main import ( "fmt" "log" "time" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 创建JetStream上下文 js, err := nc.JetStream() if err != nil { log.Fatal(err) } // 创建或获取流 stream := "ORDERS" subject := "orders.received" _, err = js.AddStream(&nats.StreamConfig{ Name: stream, Subjects: []string{subject}, }) if err != nil { log.Fatal(err) } // 发布消息到JetStream for i := 1; i <= 5; i++ { order := fmt.Sprintf("订单 #%d", i) // 使用JetStream发布 ack, err := js.Publish(subject, []byte(order)) if err != nil { log.Fatal(err) } fmt.Printf("已发布: %s, 序列号: %d\n", order, ack.Sequence) time.Sleep(500 * time.Millisecond) } } ``` **JetStream订阅者 (jetstream_subscriber.go)** ```go package main import ( "fmt" "log" "runtime" "github.com/nats-io/nats.go" ) func main() { nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer nc.Close() // 创建JetStream上下文 js, err := nc.JetStream() if err != nil { log.Fatal(err) } // 创建持久化订阅 sub, err := js.PullSubscribe( "orders.received", "order-processor", // 消费者名称 nats.AckExplicit(), // 显式确认 ) if err != nil { log.Fatal(err) } fmt.Println("JetStream订阅者已启动,拉取消息...") for { // 拉取最多10条消息,等待最多5秒 msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second)) if err != nil { if err == nats.ErrTimeout { continue // 超时,继续尝试 } log.Printf("拉取错误: %v\n", err) continue } for _, msg := range msgs { fmt.Printf("收到: %s, 序列号: %d\n", string(msg.Data), msg.Sequence) // 处理消息... // 确认消息 msg.Ack() } } } ``` ## 高级主题:消息过滤和重传 ### 使用消息头部 NATS支持消息头部,可以用于传递元数据: ```go // 发送带头部的消息 headers := nats.Header{} headers.Add("X-Priority", "High") headers.Add("X-Source", "Backend") nc.PublishMsg(&nats.Msg{ Subject: "notifications", Data: []byte("重要通知"), Header: headers, }) // 接收并处理头部 sub, _ := nc.Subscribe("notifications", func(msg *nats.Msg) { if msg.Header != nil { priority := msg.Header.Get("X-Priority") source := msg.Header.Get("X-Source") fmt.Printf("收到来自 %s 的 %s 优先级消息: %s\n", source, priority, string(msg.Data)) } }) ``` ### 消息重传和确认 JetStream支持消息重传和确认机制: ```go // 设置消息重传策略 sub, _ := js.PullSubscribe( "orders.received", "order-processor", nats.AckExplicit(), // 显式确认 nats.AckWait(30*time.Second), // 30秒内未确认则重传 nats.MaxDeliver(3), // 最多重传3次 ) // 处理消息并确认 msgs, _ := sub.Fetch(1) for _, msg := range msgs { fmt.Printf("收到: %s, 尝试次数: %d\n", string(msg.Data), msg.Metadata.NumDelivered) // 根据处理结果选择不同的确认方式 if processSuccessfully(msg) { msg.Ack() // 成功处理 } else if isTemporaryError() { msg.Nak() // 稍后重试 } else { msg.Term() // 终止处理,不再重试 } } ``` ## 总结 通过本文的消息收发演练,你已经学习了: 1. **基本的发布-订阅模式**:一对多的消息分发 2. **主题通配符**:灵活的消息路由 3. **队列组**:负载均衡的消息处理 4. **请求-响应模式**:同步和异步的双向通信 5. **消息持久性**:使用JetStream进行消息持久化 6. **高级特性**:消息头部、重传和确认机制 这些模式和技巧将帮助你构建灵活、高效的分布式系统。根据你的应用需求,选择合适的消息传递模式,并利用NATS提供的丰富功能来优化你的系统架构。 ## 下一步 - 学习[错误处理技巧](/article/nats/practice/error-handling)以构建更健壮的应用程序 - 了解[性能调优基础](/article/nats/practice/performance-tuning)以优化你的NATS应用程序 - 探索[Web服务集成](/article/nats/integration/web-service)和[微服务通信](/article/nats/integration/microservices)的实际应用