Go 语言并发利器:chan struct{}
与 close()
的妙用
在 Go 语言的并发世界里,goroutine 和 channel 是构建强大并发程序的基石。今天,我们来深入探讨一种特殊但非常实用的 channel 类型:chan struct{}
,以及与之配合使用的 close()
函数。理解它们的机制和应用场景,能帮助我们写出更优雅、更高效的并发代码。
chan struct{}
:轻量级的信号通道
通常,channel 用于在 goroutine 之间传递数据。但有时,我们仅仅需要一个信号,通知某个或某些 goroutine 某个事件已经发生,而不需要传递任何具体的数据。这时,chan struct{}
就派上了用场。
struct{}
是一个零大小的类型,这意味着通过 chan struct{}
传递数据不会产生额外的内存开销。我们仅仅关心 channel 是否被发送或接收,以此作为事件发生的标志。
声明和使用 chan struct{}
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package main
import "fmt"
func worker(done chan struct{}) { fmt.Println("Worker is starting...") fmt.Println("Worker is done!") done <- struct{}{} }
func main() { done := make(chan struct{}) go worker(done) <-done fmt.Println("All done!") }
|
在上面的例子中,done
channel 的类型是 chan struct{}
。worker
goroutine 完成工作后,向 done
channel 发送一个空结构体 struct{}
。main
goroutine 通过接收这个信号得知 worker
已经完成。
close()
:优雅地广播“结束”信号
close()
函数用于关闭一个 channel。关闭一个 channel 会向所有正在等待从该 channel 接收数据的 goroutine 发送一个零值(对于 chan struct{}
来说就是零值,即可以接收到信号),并且后续不能再向已关闭的 channel 发送数据,否则会引发 panic。
接收方可以通过接收操作的第二个返回值来判断 channel 是否已经关闭:
如果 ok
是 true
,表示接收到了 channel 中发送的值;如果 ok
是 false
,表示 channel 已经关闭,并且 channel 中没有更多的数据可以接收。
使用 close()
通知多个 goroutine 停止工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package main
import ( "fmt" "sync" "time" )
func worker(id int, quit chan struct{}, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("Worker %d starting...\n", id) for { select { case <-quit: fmt.Printf("Worker %d received quit signal. Exiting.\n", id) return case <-time.After(time.Second): fmt.Printf("Worker %d is working...\n", id) } } }
func main() { numWorkers := 3 var wg sync.WaitGroup quit := make(chan struct{})
for i := 0; i < numWorkers; i++ { wg.Add(1) go worker(i+1, quit, &wg) }
time.Sleep(3 * time.Second) fmt.Println("Sending quit signal to all workers...") close(quit)
wg.Wait() fmt.Println("All workers have exited.") }
|
在这个例子中,我们创建了多个 worker
goroutine,它们都监听同一个 quit
channel。当 main
goroutine 调用 close(quit)
时,所有正在阻塞等待从 quit
接收数据的 worker
都会收到一个零值,并通过 select
语句中的 case <-quit:
分支退出。sync.WaitGroup
用于等待所有 worker goroutine 优雅地退出。
实际应用场景
1. 事件通知:更细致的场景
在微服务架构中,一个服务完成某个关键操作后,可能需要通知其他依赖它的服务。使用 chan struct{}
可以实现这种轻量级的通知机制。
场景:订单服务完成支付,通知库存服务扣减库存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main
import ( "fmt" "sync" "time" )
func inventoryService(orderPaid <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for range orderPaid { fmt.Println("库存服务:收到订单支付通知,正在扣减库存...") time.Sleep(1 * time.Second) fmt.Println("库存服务:库存扣减完成。") } fmt.Println("库存服务已停止。") }
func orderService(orderID string, paid chan<- struct{}) { fmt.Printf("订单服务:订单 %s 开始处理支付...\n", orderID) time.Sleep(2 * time.Second) fmt.Printf("订单服务:订单 %s 支付完成。\n", orderID) paid <- struct{}{} }
func main() { var wg sync.WaitGroup orderPaid := make(chan struct{})
wg.Add(1) go inventoryService(orderPaid, &wg)
go orderService("order-123", orderPaid)
time.Sleep(5 * time.Second) close(orderPaid) wg.Wait() fmt.Println("主服务已停止。") }
|
在这个例子中,orderPaid
是一个 chan struct{}
。当订单服务完成支付后,会向 orderPaid
发送一个信号。库存服务监听这个信号,一旦收到,就开始扣减库存。使用 chan struct{}
的好处是,我们只关心“支付完成”这个事件的发生,不需要传递订单的具体信息给库存服务(订单信息应该通过其他更适合的方式传递)。最后通过 close(orderPaid)
通知库存服务可以停止监听了。
2. 服务优雅关闭:更全面的考量
在构建长时间运行的服务(如 Web 服务器、后台任务处理程序)时,优雅关闭至关重要。这涉及到通知所有正在运行的 goroutine 停止工作,并等待它们清理资源后退出。
场景:一个简单的 Web 服务器,需要能够优雅地停止所有请求处理 goroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package main
import ( "fmt" "net/http" "os" "os/signal" "sync" "syscall" "time" )
func requestHandler(w http.ResponseWriter, r *http.Request, quit <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() select { case <-quit: fmt.Println("请求处理 goroutine 收到停止信号,正在退出...") return case <-time.After(5 * time.Second): fmt.Fprintf(w, "请求处理完成 at %s\n", time.Now().Format(time.RFC3339)) fmt.Println("请求处理完成。") } }
func main() { var wg sync.WaitGroup quit := make(chan struct{})
http.HandleFunc("/process", func(w http.ResponseWriter, r *http.Request) { wg.Add(1) go requestHandler(w, r, quit, &wg) })
server := &http.Server{Addr: ":8080", Handler: nil}
go func() { fmt.Println("Web 服务器正在监听 :8080") if err := server.ListenAndServe(); err != http.ErrServerClosed { fmt.Fatalf("HTTP server ListenAndServe: %v", err) } fmt.Println("Web 服务器已停止监听。") }()
sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig fmt.Println("\n收到停止信号,开始关闭服务器...")
close(quit)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { fmt.Printf("HTTP server shutdown error: %v\n", err) }
wg.Wait() fmt.Println("所有 goroutine 已退出,服务器已优雅关闭。") }
|
在这个更复杂的例子中:
- 我们创建了一个
quit
channel (chan struct{}
) 用于通知请求处理的 goroutine 停止。
- 当收到操作系统的停止信号时,我们首先
close(quit)
,向所有正在处理请求的 requestHandler
goroutine 发送停止信号。
- 然后,我们使用
http.Server.Shutdown
来优雅地关闭 HTTP 服务器,这会阻止新的连接,并等待正在处理的连接完成(在超时时间内)。
- 最后,我们使用
wg.Wait()
等待所有正在处理的 requestHandler
goroutine 退出。
close(quit)
在这里起到了广播停止信号的关键作用,使得多个并发的请求处理 goroutine 能够收到通知并开始清理退出。
3. 限流:令牌桶的简单实现
虽然 Go 语言有更专业的限流库,但使用带缓冲的 chan struct{}
可以实现一个简单的令牌桶限流器。
场景:限制某个操作的并发执行数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package main
import ( "fmt" "sync" "time" )
func worker(id int, rateLimiter <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() <-rateLimiter fmt.Printf("Worker %d started at %s\n", id, time.Now().Format(time.RFC3339)) time.Sleep(2 * time.Second) fmt.Printf("Worker %d finished at %s\n", id, time.Now().Format(time.RFC3339)) }
func main() { numWorkers := 5 concurrencyLimit := 2 var wg sync.WaitGroup rateLimiter := make(chan struct{}, concurrencyLimit)
for i := 0; i < concurrencyLimit; i++ { rateLimiter <- struct{}{} }
for i := 0; i < numWorkers; i++ { wg.Add(1) go worker(i+1, rateLimiter, &wg) time.Sleep(500 * time.Millisecond) select { case rateLimiter <- struct{}{}: default: } }
wg.Wait() fmt.Println("All workers finished.") }
|
在这个例子中,rateLimiter
是一个容量为 concurrencyLimit
的 chan struct{}
。每个 worker
在开始工作前需要从 rateLimiter
接收一个值(相当于获取一个“令牌”)。只有当 rateLimiter
中有可用的“令牌”时,worker
才能继续执行。虽然这个例子中的令牌补充比较简单,但它展示了如何使用带缓冲的 chan struct{}
来控制并发数量。