raft-example

在etcd当中,提供了一个raft-example,该程序并非构建了一个完整的Raft模块,而是对Raft模块的的基本使用。并在此基础上构建了一个简单的KV存储结构。而Raft模块是作为一个包的形式存在的,Raft模块只提供如Leader Election Log Replication Snapshot等Raft的基本逻辑功能,而在此之上的存储、网络通信等都交给了使用Raft包的开发者来自行决定。

因此最终的结构就分为了三层:

  • 最底层的Raft Package,只提供Raft最基本的逻辑功能
  • Raft服务器,调用底层的Raft Package的相关API,自定义日志和快照的存储以及网络通信等
  • 内存数据库,和Raft服务器之间通过channel进行通信,向下提交日志,并且接受Raft服务器等Commit信息,之后将其应用于自身的存储模块当中。

kvstore

此处的设计基本上与MIT6.824一致,首先看一下kvstore的结构体

1
2
3
4
5
6
type kvstore struct {
	proposeC    chan<- string // channel for proposing updates
	mu          sync.RWMutex
	kvStore     map[string]string // current committed key-value pairs
	snapshotter *snap.Snapshotter
}
  • proposeC用于向下层的Raft服务器提交新的请求,之后下层的Raft服务器拿到之后调用Raft Package当中的相关API封装为日志,之后形成共识
  • kvStore即为内存数据库,但是可以通过快照 + WAL日志来提供持久化稳定存储
  • snapshotter用于处理快照等相关消息,如Load Save等,在etcdserver/api/snap当中实现

创建

kvStore的创建过程也比较简单,首先尝试从快照当中恢复数据,之后就单独开启一个goroutine,监听从下层Raft传来的日志提交信息,如果提交,就将其应用到自身

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
	s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
	snapshot, err := s.loadSnapshot()
	if err != nil {
		log.Panic(err)
	}
	if snapshot != nil {
		log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
		if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
			log.Panic(err)
		}
	}
	// read commits from raft into kvStore map until error
	go s.readCommits(commitC, errorC)
	return s
}

readCommit的实现和MIT6.824当中的实现稍有不同,由于ectd的快照是真实存储的,因此下层的Raft只需要通过一个nil​来告知一下上层有了新的快照,之后上层就可以去进行读取,之后就是读取commit信息应用于自身,最后通过close chan的形式通知下层Raft上层应用完毕。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
	for commit := range commitC {
		// 相比于MIT 6.824 此处的snapshot为真正持久化的,因此无需通过channel传输
		// 因此在chan当中只需要发送一个nil用于通知即可
		if commit == nil {
			// signaled to load snapshot
			snapshot, err := s.loadSnapshot()
			if err != nil {
				log.Panic(err)
			}
			if snapshot != nil {
				log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
				if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
					log.Panic(err)
				}
			}
			continue
		}
		// handle the commit data
		// ....
		close(commit.applyDoneC)
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}

除了几个从Snapshot当中加载数据的函数之外,就只剩写入和读取了。写入操作和MIT6.824一样,当提交了一个写入请求之后,向下提交一个日志,此时kvstore会一直阻塞,直至下层的Raft形成了共识并向上传递了commit的信息,之后才会通知客户端写入成功。

但是对于读取操作,在raft-example当中并没有通过Raft日志来保证强一致性,而是直接在Leader处进行本地读的操作,可以提高读操作的qps,但是相对的,强一致性就无法得到保证。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (s *kvstore) Lookup(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.kvStore[key]
	return v, ok
}

func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	// 将kv写入到下层,等待raft完成共识
	s.proposeC <- buf.String()
}

在kvstore之上还有一层httpApi,用于对外提供网络服务,比较简单就放一下代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	defer r.Body.Close()
	switch {
	case r.Method == "PUT":
		v, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}

		h.store.Propose(key, string(v))

		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			w.Write([]byte(v))
		} else {
			http.Error(w, "Failed to GET", http.StatusNotFound)
		}
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on POST (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "DELETE":
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on DELETE", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

Raft

Raft模块,或者说Raft服务器,构建于Raft的包之上,Raft包提供的Leader Eelction Log Replication功能之上,再提供日志和快照的存储形式、节点之间的通信方式等功能,构建出一个完整的Raft。

首先还是看一下Raft的结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v)
	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
	commitC     chan<- *commit           // entries committed to log (k,v)
	errorC      chan<- error             // errors from raft session

	id          int      // client ID for raft session
	peers       []string // raft peer URLs
	join        bool     // node is joining an existing cluster
	waldir      string   // path to WAL directory
	snapdir     string   // path to snapshot directory
	getSnapshot func() ([]byte, error)

	confState     raftpb.ConfState
	snapshotIndex uint64
	appliedIndex  uint64

	// raft backing for the commit/error channel
	node        raft.Node
	raftStorage *raft.MemoryStorage
	wal         *wal.WAL

	snapshotter      *snap.Snapshotter
	snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport
	stopc     chan struct{} // signals proposal channel closed
	httpstopc chan struct{} // signals http server to shutdown
	httpdonec chan struct{} // signals http server shutdown complete

	logger *zap.Logger
}
proposeC从 kvstore当中接受新的请求,创建为日志
confChangeCRaft集群配置更新的相关消息
commitC向kvstore发送日志提交的信息
errorC传递错误信息
snapshotIndex快照化的最后一条日志的Index,用于在故障恢复之后找到Index
appliedIndex应用于kvstorte的最后一条日志Index,用于故障恢复
nodeRaft Package对外提供的API接口
raftStorage稳定存储日志,由于使用了WAL,因此可以使用内存的形式稳定存储
wal对WAL日志操作的封装
transportraft节点之间的通信

此外还有三个struct{}的 chan用于进行消息通知,接收方使用select阻塞,发送方向其中发送一个空结构体即可使其从阻塞当中恢复。

Raft的创建

定义一个newRaftNode供上层的kvstore进行调用,传入一个propose的channel用于接受kvstore的新的请求,之后初始化raftNode的部分值,并将commitC与errorC返回交给kvstore。之后再单独开启一个goroutine调用startRaft来启动一些raft的服务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		logger: zap.NewExample(),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}

初始化

startRaft

在startRaft当中继续完成一部分初始化的工作,当前的节点有可能是之前宕机之后重启,因此需要首先检查快照与写入的WAL,从中读取快照并通知上层的kvstore去应用快照恢复内存数据库,和恢复Raft日志到memoryStorage当中。

之后再对底层的Raft Package进行一些相关的配置,如设置心跳信息等

etcd当中使用的为逻辑时钟,即通过Tick来推进时钟,如将heartbeat的间隔设置为1次tick,选举的时间间隔设置为10次tick

之后再定义raft节点之间的传输协议之后,初始化基本完成,之后分别开启一个协程去负责节点之间的通信和一个用于处理和kvstore的channel和raft package的channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (rc *raftNode) startRaft() {
	// handle snapshot and wal
	// ...
	c := &raft.Config{
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}
	// 在创建节点时,如果通过原本的WAL日志进行log和snapshot的加载
	// 或者为中途加入到集群当中的节点相比于新节点少了通过bootstrap进行初始化加载
	if oldwal || rc.join {
		rc.node = raft.RestartNode(c)
	} else {
		rc.node = raft.StartNode(c, rpeers)
	}

	rc.transport = &rafthttp.Transport{
		Logger:      rc.logger,
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}

	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}
	// serveRaft对外监听端口提供服务
	// serveChannels处理存储层发起的请求,和raftNode所传来的相关信息
	go rc.serveRaft()
	go rc.serveChannels()
}

channels处理

重点看一下serveChannels

在serverChannels当中,又单独开了一个goroutine,其中通过select 监听kvstore的propose channel和集群配置的channel,调用Raft Package的API交给下层的Raft Package进行处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

而serverChannels自身的goroutine用于处理Raft Package处理完毕的消息,共select 两个channel,一个用于处理定时器,如果通过这个channel收到了消息那么就调用Tick函数推进逻辑时钟。

另外一个Channel是通过raftnode.Ready()函数返回的一个 chan Ready

Ready的结构体定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}

此处我们只需要HardState、Entries、Snapshot、Messages、CommitedEntries

就像上文所说的那样,Raft Package并不负责通信和存储,这两部分都要交给RaftServer处理,因此将HardState、Entries写入到WAL当中,Snapshot同样进行保存。之后再把Entries添加到raftStorage当中。

而Message即为当前节点产生的所有的需要发送给其他节点的消息,都需要在此处进行发送,通过之前定义的transport模块进行发送。

而对于底层Raft Package已经达成共识认定为committed 的Entries,通过publishEntries​处理后通过commitC通知上层的kvstore。