在本文中,我们使用两个示例学习一下go中的rpc的基本使用。这是一个非常经典的 RPC 应用场景:Master/Worker,其中Master负责分发任务,Worker负责接收并执行任务,然后返回结果。
Push模式
这个例子可以理解为:
Master有很多的任务要做,但是不可能自己去做,他要检查自己有哪些Worker,然后并发调用Worker的RPC函数得到结果,仿佛是自己完成了所有工作。
- 项目结构
1 2 3 4 5 6 7 8
| rpc-master-worker/ ├── go.mod ├── master/ │ └── main.go ├── shared/ │ └── common.go └── worker/ └── main.go
|
- 共享数据结构(shared/common.go)
为了让 Master 和 Worker 能够共享数据结构(如任务参数和回复),我们创建一个 shared 包。
这是 Master 和 Worker 之间的“契约”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package shared
import "fmt"
type TaskArgs struct { TaskID int TaskData string }
type TaskReply struct { WorkerID int ResultData string }
const WorkerServiceName = "WorkerService"
func (t *TaskArgs) String() string { return fmt.Sprintf("Task(ID: %d, Data: '%s')", t.TaskID, t.TaskData) }
func (r *TaskReply) String() string { return fmt.Sprintf("Reply(Worker: %d, Result: '%s')", r.WorkerID, r.ResultData) }
|
- 服务端(worker/main.go)
Worker 是这个例子中的 RPC 服务端。它注册一个 WorkerService,实现一个 DoTask 方法,然后在指定的端口上监听来自 Master 的连接。
我们将使用 flag 包允许从命令行指定每个 Worker 的 ID 和端口,这样我们就可以轻松地启动多个 Worker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package main
import ( "flag" "fmt" "log" "net" "net/rpc" "rpc-master-worker/shared" "strings" )
type WorkerService struct { ID int }
func (w *WorkerService) DoTask(args *shared.TaskArgs, reply *shared.TaskReply) error { log.Printf("Worker %d: 接收到任务 %s\n", w.ID, args)
result := strings.ToUpper(args.TaskData)
reply.WorkerID = w.ID reply.ResultData = result
log.Printf("Worker %d: 完成任务 %d, 结果: %s\n", w.ID, args.TaskID, result) return nil }
func main() { workerID := flag.Int("id", 0, "Worker ID") port := flag.String("port", ":9001", "Worker 监听的端口 (例如 :9001)") flag.Parse()
workerService := &WorkerService{ ID: *workerID, }
err := rpc.RegisterName(shared.WorkerServiceName, workerService) if err != nil { log.Fatalf("Worker %d: 注册服务失败: %v", *workerID, err) }
listener, err := net.Listen("tcp", *port) if err != nil { log.Fatalf("Worker %d: 监听 %s 失败: %v", *workerID, *port, err) } defer listener.Close()
log.Printf("Worker %d: 正在 %s 上监听 RPC 请求...\n", *workerID, *port)
rpc.Accept(listener) }
|
- 客户端(master/main.go)
Master 是这个例子中的 RPC 客户端。它持有一组任务和一个 Worker 地址列表(理解为我需要使用的服务们)。
它将并发地(使用 goroutine 和 WaitGroup)将任务分发给 Worker,并收集所有的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package main
import ( "log" "net/rpc" "rpc-master-worker/shared" "sync" "time" )
func main() { log.Println("Master 启动...")
workerAddresses := []string{ "localhost:9001", "localhost:9002", }
tasks := []*shared.TaskArgs{ {TaskID: 1, TaskData: "hello world"}, {TaskID: 2, TaskData: "go rpc is simple"}, {TaskID: 3, TaskData: "learn by example"}, {TaskID: 4, TaskData: "concurrency is fun"}, {TaskID: 5, TaskData: "master worker pattern"}, }
var wg sync.WaitGroup numWorkers := len(workerAddresses)
for i, task := range tasks { wg.Add(1)
workerAddr := workerAddresses[i%numWorkers] go func(task *shared.TaskArgs, addr string) { defer wg.Done()
client, err := rpc.Dial("tcp", addr) if err != nil { log.Printf("Master: 连接 Worker %s 失败: %v", addr, err) return } defer client.Close()
var reply shared.TaskReply
log.Printf("Master: 分发任务 %s 到 Worker %s\n", task, addr)
err = client.Call(shared.WorkerServiceName+".DoTask", task, &reply) if err != nil { log.Printf("Master: 调用 Worker %s 失败: %v", addr, err) return }
log.Printf("Master: 收到结果: %s\n", &reply)
}(task, workerAddr) time.Sleep(100 * time.Millisecond) }
wg.Wait() log.Println("Master: 所有任务已完成。") }
|
- 运行
打开 3 个终端窗口。
1 2
| cd rpc-master-worker go run . -id=1 -port=:9001
|
1 2
| cd rpc-master-worker go run . -id=2 -port=:9002
|
等待两个 Worker 都启动后,运行 Master:
1 2
| cd rpc-master-worker go run .
|
Pull模式
这个例子可以理解为:
Master有很多的任务要做,但是不可能自己去做,他的Worker会不停的询问他有没有待完成的工作,如果有请求+工作,Master就会把工作分发出去,否则阻塞工作进程。另外,我们再加入一点容错机制,例如超时机制(一个worker领取了任务但是没有及时完成,则任务会被重置等待新的Worker来领取)。
- 项目结构
和之前一样
- 共享数据结构(shared/common.go)
依旧定义一些约定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package shared
import ( "fmt" "time" )
const MasterServiceName = "MasterService"
const MasterTaskTimeout = 3 * time.Second
type RequestTaskArgs struct { WorkerID string }
type Task struct { ID int Data string }
type RequestTaskReply struct { Task *Task NoMoreTasks bool }
type SubmitTaskArgs struct { WorkerID string TaskID int ResultData string }
type SubmitTaskReply struct { Acknowledged bool }
func (t *Task) String() string { return fmt.Sprintf("Task(ID: %d, Data: '%s')", t.ID, t.Data) }
func (t *Task) Process() string { fmt.Printf("... 正在处理任务 %d ...\n", t.ID) time.Sleep(1 * time.Second) return fmt.Sprintf("Processed data from task %d", t.ID) }
|
- 客户端(Worker/main.go)
Worker 现在是客户端。它在一个无限循环中工作(RPC注册/连接后):请求 -> 处理 -> 提交。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| package main
import ( "flag" "fmt" "io" "log" "net/rpc" "rpc/shared" "time" )
type Worker struct { ID string MasterAddr string client *rpc.Client }
func NewWorker(id, masterAddr string) *Worker { return &Worker{ ID: id, MasterAddr: masterAddr, } }
func (w *Worker) connect() error { client, err := rpc.Dial("tcp", w.MasterAddr) if err != nil { return fmt.Errorf("连接 Master (%s) 失败: %v", w.MasterAddr, err) } log.Printf("Worker %s: 已连接到 Master %s\n", w.ID, w.MasterAddr) w.client = client return nil }
func (w *Worker) Run() { if err := w.connect(); err != nil { log.Fatalf(err.Error()) } defer w.client.Close()
log.Printf("Worker %s: 开始请求任务...\n", w.ID)
for { reqArgs := &shared.RequestTaskArgs{WorkerID: w.ID} reply := &shared.RequestTaskReply{}
err := w.client.Call(shared.MasterServiceName+".RequestTask", reqArgs, &reply)
if err != nil { if err == io.EOF || err == rpc.ErrShutdown { log.Printf("Worker %s: 与 Master 的连接已断开。退出。\n", w.ID) return } log.Printf("Worker %s: 请求任务失败: %v. 正在重试...\n", w.ID, err) time.Sleep(1 * time.Second) continue }
if reply.NoMoreTasks { log.Printf("Worker %s: Master 通知所有任务已完成。退出。\n", w.ID) return }
if reply.Task == nil { log.Printf("Worker %s: 暂时没有任务,1秒后重试...\n", w.ID) time.Sleep(1 * time.Second) continue }
task := reply.Task log.Printf("Worker %s: 领到任务 %s\n", w.ID, task)
if task.ID == 3 && w.ID == "w01" { log.Printf("Worker %s: !!! 模拟处理任务 3 超时 !!!", w.ID) time.Sleep(shared.MasterTaskTimeout + 2*time.Second) }
resultData := task.Process()
submitArgs := &shared.SubmitTaskArgs{ WorkerID: w.ID, TaskID: task.ID, ResultData: resultData, } submitReply := &shared.SubmitTaskReply{}
err = w.client.Call(shared.MasterServiceName+".SubmitTaskResult", submitArgs, &submitReply) if err != nil { log.Printf("Worker %s: 提交任务 %d 结果失败: %v\n", w.ID, task.ID, err) } else if submitReply.Acknowledged { log.Printf("Worker %s: 成功提交任务 %d 结果\n", w.ID, task.ID) } } }
func main() { workerID := flag.String("id", "worker-01", "Worker 的唯一ID") masterAddr := flag.String("master", "localhost:9000", "Master 的地址") flag.Parse()
worker := NewWorker(*workerID, *masterAddr) worker.Run() }
|
- 服务端(Master/main.go)
Master 现在是有状态的服务端。它需要维护任务队列,并处理并发(因为多个 Worker 会同时请求)。我们将使用 Mutex (互斥锁) 来保护任务状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
| package main
import ( "fmt" "log" "net" "net/rpc" "rpc/shared" "sync" "time" )
type TaskStatus int
const ( StatusIdle TaskStatus = 0 StatusInProgress TaskStatus = 1 StatusCompleted TaskStatus = 2 )
type MasterTask struct { Task *shared.Task Status TaskStatus WorkerID string StartTime time.Time }
type Master struct { mu sync.Mutex tasks map[int]*MasterTask taskQueue chan int totalTasks int completedTasks int }
func NewMaster() *Master { m := &Master{ tasks: make(map[int]*MasterTask), taskQueue: make(chan int, 100), }
m.initTasks() m.totalTasks = len(m.tasks) log.Printf("Master 初始化完成,总共有 %d 个任务待处理。\n", m.totalTasks)
go m.runScanner()
return m }
func (m *Master) runScanner() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop()
for range ticker.C { m.mu.Lock()
if m.totalTasks == m.completedTasks { m.mu.Unlock() return }
log.Println("Scanner: 正在扫描超时任务...")
var tasksToRequeue []int for taskID, masterTask := range m.tasks { if masterTask.Status == StatusInProgress && time.Since(masterTask.StartTime) > shared.MasterTaskTimeout {
log.Printf("Master: 任务 %d 在 Worker %s 上已超时! 准备重新排队。", taskID, masterTask.WorkerID)
masterTask.Status = StatusIdle masterTask.WorkerID = "" tasksToRequeue = append(tasksToRequeue, taskID) } }
m.mu.Unlock()
for _, taskID := range tasksToRequeue { m.taskQueue <- taskID } } }
func (m *Master) initTasks() { tasksData := []string{ "task data 1", "task data 2", "task data 3", "task data 4", "task data 5", } for i, data := range tasksData { taskID := i + 1 m.tasks[taskID] = &MasterTask{ Task: &shared.Task{ID: taskID, Data: data}, Status: StatusIdle, } m.taskQueue <- taskID } }
func (m *Master) RequestTask(args *shared.RequestTaskArgs, reply *shared.RequestTaskReply) error { taskID, ok := <-m.taskQueue if !ok { reply.NoMoreTasks = true return nil }
m.mu.Lock() defer m.mu.Unlock()
masterTask, exists := m.tasks[taskID] if !exists { return fmt.Errorf("内部错误:任务ID %d 不存在", taskID) }
masterTask.Status = StatusInProgress masterTask.WorkerID = args.WorkerID masterTask.StartTime = time.Now()
reply.Task = masterTask.Task reply.NoMoreTasks = false
log.Printf("Master: 分配任务 %d 给 Worker %s\n", taskID, args.WorkerID) return nil }
func (m *Master) SubmitTaskResult(args *shared.SubmitTaskArgs, reply *shared.SubmitTaskReply) error { m.mu.Lock() defer m.mu.Unlock()
taskID := args.TaskID masterTask, exists := m.tasks[taskID]
if !exists { return fmt.Errorf("任务 %d 不存在", taskID) }
if masterTask.Status == StatusIdle { log.Printf("Master: 收到任务 %d 的【过时】提交 (来自 %s),任务已超时并正在等待重新分配。忽略。", taskID, args.WorkerID) reply.Acknowledged = true return nil }
if masterTask.Status == StatusInProgress && masterTask.WorkerID != args.WorkerID { log.Printf("Master: 收到 Worker %s 对任务 %d 的【过时】提交 (任务现分配给 %s)。忽略此结果。", args.WorkerID, taskID, masterTask.WorkerID) reply.Acknowledged = true return nil }
if masterTask.Status == StatusCompleted { log.Printf("Master: 收到任务 %d 的【重复】提交 (来自 %s),任务已由 %s 完成。忽略。", taskID, args.WorkerID, masterTask.WorkerID) reply.Acknowledged = true return nil }
masterTask.Status = StatusCompleted masterTask.WorkerID = args.WorkerID m.completedTasks++
log.Printf("Master: 收到 Worker %s 对任务 %d 的结果: '%s'\n", args.WorkerID, taskID, args.ResultData)
if m.completedTasks == m.totalTasks { log.Println("--- 所有任务已完成!即将关闭任务队列。 ---") close(m.taskQueue) }
reply.Acknowledged = true return nil }
func main() { master := NewMaster()
err := rpc.RegisterName(shared.MasterServiceName, master) if err != nil { log.Fatalf("注册 Master 服务失败: %v", err) }
port := ":9000" listener, err := net.Listen("tcp", port) if err != nil { log.Fatalf("监听 %s 失败: %v", port, err) } defer listener.Close()
log.Printf("Master 服务端正在 %s 上监听...", port)
rpc.Accept(listener) }
|
谁来管理状态?
这是一个经典问题。答案是:“尽可能地将状态推给服务端(接收方),让客户端保持无状态。”
我们来分析不同的模式:
模式一:服务端管理状态(Stateful Server)- 我们的 Pull 模式
谁管理? 接收方(Master)。
状态:Master 维护着所有任务的完整状态(空闲、处理中、已完成)、哪个 Worker 在处理、什么时候开始的。
客户端(Worker):完全无状态。它不记得自己做过什么,它只知道“请求 -> 执行 -> 提交”。如果一个 Worker 崩溃重启了,它会像一个新 Worker 一样简单地重新加入工作池,开始请求任务。
实例:
我们的 Master/Worker:这是最佳实践。
数据库:数据库(服务端)管理所有数据状态,客户端只是发出 CRUD 请求。
Web Sessions (服务端存储):用户登录后,服务器创建一个 Session ID,并将用户信息(如购物车)存储在服务器的内存或 Redis 中。
优点:
缺点:
服务端成为瓶颈:Master 成了“单点”,所有人都来问它要任务,它的并发和容错能力(SPOF - Single Point of Failure)要求极高。
模式二:客户端管理状态(Stateless Server)- 我们的 Push 模式(第一个例子)
谁管理? 请求方(Master)。
状态:Master(请求方)维护着任务列表,以及 Worker 的地址列表。它需要自己决定“任务1给Worker A,任务2给Worker B”。
服务端(Worker):无状态。它不记得 Master,它只是被动地接收一个请求,处理,然后返回。
实例:
我们的第一个 Push 示例。
RESTful API (如 JWT 认证):
Server (API) 是无状态的。它不记得你是否登录。
Client (App) 必须管理自己的状态(即保存 JWT Token),并在每一次请求时,都主动在 Header 中带上 Authorization: Bearer 来证明自己的身份。
函数计算 (Serverless):云函数本身是无状态的,它被调用时,需要调用方(或事件源)提供所有必需的上下文数据。
优点:
服务端(Worker)极易扩展:因为 Worker 是无状态的,你可以轻松地从1个扩展到1000个,放在负载均衡器后面即可。
缺点:
客户端(Master)逻辑复杂:Master 不仅要管理任务,还要管理 Worker 的“健康状态”和“地址列表”(即服务发现)。如果 Worker A 挂了,Master 怎么知道?它如何重试分配给 A 的任务?这非常复杂。
模式三:双方管理状态(Hybrid)
谁管理? 双方都管理一部分。
实例:TCP 连接。
Server 和 Client 都必须维护一个关于连接状态(ESTABLISHED, FIN_WAIT, CLOSE_WAIT…)的“状态机”。如果一方的状态机与另一方不同步(例如一方认为连接已关闭,另一方还认为已连接),通信就会失败。
优点:
适用于需要持久、双向通信的场景。
缺点:
复杂度极高,状态同步是分布式系统中最难的问题之一。应尽量避免。
结论与最佳实践
对于“任务分发”这个场景:
最佳实践永远是“模式一”(服务端管理状态,即我们的 Pull 模式)。
让 Master 成为“状态的唯一事实来源 (Single Source of Truth)”,Worker 则是“无状态的劳动力 (Stateless Workforce)”。
这种架构的弹性是最好的:
Worker 挂了? 没关系,Master 会通过超时机制把任务交给别人。
要加100个 Worker? 没关系,启动它们,它们会自动去 Master 领活。
Master 挂了? ……这才是你唯一需要担心的问题。你需要为 Master 做高可用(HA)集群(例如使用 Raft/Etcd 来保证 Master 状态的一致性),但这是另一个更高级的话题了。