Part-A: 选主
核心内容
这一章要解决的核心问题是:在一个去中心化的系统里,当老大(Leader)挂了,群众(Followers)如何自动选出一个新老大,并且保证只选出一个?
节点角色
- Follower(跟随者/群众):
- Candidate(候选人/竞选者):
- Leader(领导者):
选举机制
1. 心跳与超时
Raft 的选举完全依靠时间来驱动。这里有两个至关重要的时间概念,请务必区分清楚:
- 心跳间隔 (Heartbeat Interval):
- 谁发? Leader。
- 频率:很高(例如每 50ms - 100ms)。
- 作用:Leader 不断给所有 Follower/Candidate 发空消息(AppendRpc),收到的节点需要保持或切换为“Follower”。
- 选举超时 (Election Timeout):
- 谁用? Follower。
- 时长:随机的(例如 150ms - 300ms 之间)。
- 作用:Follower 进程中持有一个倒计时器,每次收到 Leader 的心跳,倒计时器就清零重置。
- 触发:如果倒计时归零了,Follower 就会认为 Leader 挂了,立马造反,切换为Candidate触发选举。
选举流程
- 正常状态。老leader不断发送心跳(空AppendRpc),Follower和已经变成Candidate的Follower收到立即保持Follower(即使已经有节点投给他票了,也要立即作为普通节点);
- 领导下线。老leader宕机,停止心跳。
- 触发超时。Follower切换到Candidate,增加自己任期,给自己头上一票先。
- 发起选举。Candidate广播发送RequestVoteRpc,其中说明自己自己的任期和其他一些信息(按下不表先)。
- 群众投票。其他节点(包括Candidate),在投票有两个原则“唯任期论”和“先到先得”,都是顾名思义。
- 统计结果。Candidate在处理Rpc结果的同时统计得票数,超过半数立即自封Leader,不足则继续尝试,直到成功或收到心跳(已经有其他人上位成功了,变回跟随着)。
实现思路
一些关键的代码实现细节总结
首先考虑这个机制下Raft节点需要维护的属性
1 2 3 4 5
| currentTerm int votedFor int state State
lastElectionReset time.Time
|
然后从流程出发,先看看正常运行下需要实现的东西,显然是“心跳机制”
在Raft算法中,一共只有两种Rpc:
Raft 节点之间只通过两种主要的消息(RPC - 远程过程调用)进行沟通:
- RequestVote RPC:用于选举。
- AppendEntries RPC:用于日志复制和心跳。
- Leader:“这是新数据,记下来。” 或者 “我还活着(不带数据)。”
所以我们先实现AppendEntries RPC:
不要忘记RPC属性大写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| type AppendEntriesArgs struct { Term int Leader int Entries []any
}
type AppendEntriesReply struct { Term int Success bool }
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock()
if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.Success = false return }
rf.currentTerm = args.Term
rf.lastElectionReset = time.Now()
rf.state = StateFollower
reply.Term = rf.currentTerm reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) return ok }
|
然后我们想到什么时候发送心跳Rpc呢?显然是leader的日常工作(循环),找到节点的ticker:
为了看起来清晰一点,就去掉一些并发锁之类的,这当然是非常重要的,这里先不讨论(我自己感觉写的也不咋地)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (rf *Raft) ticker() { for rf.killed() == false {
if rf.state == StateLeader { rf.broadcastHeartbeat() }else if time.Since(rf.lastElectionReset) > timeout { rf.startElection() }
time.Sleep(50 * time.Millisecond) } }
|
然后我们考虑一下怎么广播心跳信号,1.构造一个“当前状态”的AppendRpc,2.目标是除自己外的所有节点,3.并发发送,4.处理回收(看看有没有真领导,即任期更大的节点,有的话直接降级为Follower跟着真领导混):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func (rf *Raft) broadcastHeartbeat() {
if rf.state != StateLeader { return }
for peer := range rf.peers { if peer == rf.me { continue } go func(server int) { args := &AppendEntriesArgs{ Term: rf.currentTerm, Leader: rf.me, Entries: nil, } reply := &AppendEntriesReply{} if rf.sendAppendEntries(peer, args, reply) {
if rf.state != StateLeader { return }
if reply.Term > rf.currentTerm { rf.currentTerm = reply.Term rf.state = StateFollower rf.votedFor = -1 rf.lastElectionReset = time.Now() } } }(peer) } }
|
那么显然这个功能只剩下最重要的“选举”环节了,上文我们已经定义了触发时机(超时),现在我们定义一下这个Rpc:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| type RequestVoteArgs struct { Term int CandidateId int }
type RequestVoteReply struct { Term int VoteGranted bool }
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.state = StateFollower rf.votedFor = -1 }
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm { return }
if rf.votedFor == -1 || rf.votedFor == args.CandidateId { reply.VoteGranted = true rf.votedFor = args.CandidateId rf.lastElectionReset = time.Now() } }
|
有了Rpc就可以发起选举了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| func (rf *Raft) startElection() {
if rf.state == StateLeader || time.Since(rf.lastElectionReset) < 10*time.Millisecond { return }
rf.state = StateCandidate rf.currentTerm++
rf.votedFor = rf.me votes := 1
rf.lastElectionReset = time.Now()
for peer := range rf.peers { if peer == rf.me { continue } go func(serverID int) { args := &RequestVoteArgs{ Term: rf.currentTerm, CandidateId: rf.me, } reply := &RequestVoteReply{} if rf.sendRequestVote(peer, args, reply) {
if reply.Term > rf.currentTerm { rf.state = StateFollower rf.currentTerm = reply.Term rf.votedFor = -1 return }
if rf.state != StateCandidate || rf.currentTerm != args.Term { return }
if reply.VoteGranted { votes++ if votes > len(rf.peers)/2 { rf.state = StateLeader } } } }(peer) } }
|
以上就是Lab3-A的核心内容了,其中的并发加锁解锁处理没有讨论,读者自己折腾吧