使用 Raft 实现 VIP 功能

背景

在部署应用时想要应用是高可用,通常会在应用前放置一个 HAProxy,当任何一个 Server 故障,HAProxy 会自动切换,但是 HAProxy 也存在单点故障,因此需要多个 HAProxy 来保证业务不中断,这时候我们需要另一个软件配合:Keepalived。通常我用 Keepalived 仅用来提供 VIP,保证当一个 Keepalived 故障,VIP 自动在其他 Keepalived 节点配置。

Keepalived 有一个问题是 virtual route ID 必须是同一网段内唯一的,当我们想要在一个网段内部署多个集群时,就需要人为的介入去分配 virtual route ID,不方便。这次来使用 Raft 自己实现 VIP 逻辑。

hashicorp/raft

Raft 有很多开源实现,其中 Hashicorp 实现的 Raft 库 已经被 Consul 等软件使用,且接口友善,选择使用它来实现。在 Github 上有很多 Raft 的使用示例,比较简单且完整的是 otoolep/hraftd,我们来看看他是怎么使用的。

otoolep/hraftd

main.go

在 main.go 中主要做了 4 件事情:store.New, store.Open, http.New, http.Start,先来看看程序是如何启动的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func init() {
// 设置命令行参数
flag.BoolVar(&inmem, "inmem", false, "Use in-memory storage for Raft")
...
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <raft-data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}

func main() {
// 解析命令行参数
flag.Parse()
...
// 创建一个 Store 对象
s := store.New(inmem)
s.RaftDir = raftDir
s.RaftBind = raftAddr
// 运行 Store
if err := s.Open(joinAddr == "", nodeID); err != nil {
log.Fatalf("failed to open store: %s", err.Error())
}
// 新建一个 http 对象并运行
h := httpd.New(httpAddr, s)
if err := h.Start(); err != nil {
log.Fatalf("failed to start HTTP service: %s", err.Error())
}

// 如果 joinAddr 参数不为空,则处理 join 请求
if joinAddr != "" {
if err := join(joinAddr, raftAddr, nodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", joinAddr, err.Error())
}
}

log.Println("hraftd started successfully")
// 监听系统信号,若接收到 os.Interrupt 则程序退出
terminate := make(chan os.Signal, 1)
signal.Notify(terminate, os.Interrupt)
<-terminate
log.Println("hraftd exiting")
}

http

看了 main.go 我们知道调用了 http.Start, 先不管 Store 是什么,先来看看 http 相关实现:

1
2
3
4
5
6
7
8
// Start starts the service.
func (s *Service) Start() error {
// Service 实现了 ServeHTTP 方法
http.Handle("/", s)
go func() {
err := server.Serve(s.ln)
...
}
1
2
3
4
5
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/key") {
s.handleKeyRequest(w, r)
// 先忽略其他分支
}

主要处理请求的是 s.handleKeyRequest 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *Service) handleKeyRequest(w http.ResponseWriter, r *http.Request) {
...
switch r.Method {
case "GET":
k := getKey()
if k == "" {
w.WriteHeader(http.StatusBadRequest)
}
v, err := s.store.Get(k)
...
io.WriteString(w, string(b))
case "POST":
// Read the value from the POST body.
m := map[string]string{}
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for k, v := range m {
if err := s.store.Set(k, v); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
...

s.handleKeyRequest 中根据请求方法,去调用 store 对应的方法,那么 store 实现了哪些接口呢?这也是在 http 模块中定义的:

1
2
3
4
5
6
type Store interface {
Get(key string) (string, error)
Set(key, value string) error
Delete(key string) error
Join(nodeID string, addr string) error
}

除了 Join 看着比较奇怪,其他的都是一个 K/V 系统该有的接口,接下来就看看 Store 具体方法的实现。

store

这个模块的编写涉及到了 Raft 中的具体概念,建议先阅读 siddontang 写的 Raft 相关博客快速了解(链接在参考链接列出)。

以下以设置 Key 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *Store) Set(key, value string) error {
if s.raft.State() != raft.Leader {
return fmt.Errorf("not leader")
}

// 封装具体执行的动作
c := &command{
Op: "set",
Key: key,
Value: value,
}
b, err := json.Marshal(c)
...
// 将 command 应用于 FSM
f := s.raft.Apply(b, raftTimeout)
return f.Error()
}

查看 FSM Apply 方法实现:

1
2
3
4
5
6
7
8
9
10
// Apply applies a Raft log entry to the key-value store.
func (f *fsm) Apply(l *raft.Log) interface{} {
var c command
// 根据操作动作的不同,执行不同的方法,这里以设置 Key 为例
switch c.Op {
case "set":
return f.applySet(c.Key, c.Value)
...
}
}

1
2
3
4
5
6
7
8
func (f *fsm) applySet(key, value string) interface{} {
// 互斥锁
f.mu.Lock()
defer f.mu.Unlock()
// 设置 Map 中的 Key/Value
f.m[key] = value
return nil
}

raft

看了上面的具体动作实现,接下来看看 Raft 具体启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (s *Store) Open(enableSingle bool, localID string) error {
// 设置 Raft 配置
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(localID)
// 设置 Raft 通信
addr, err := net.ResolveTCPAddr("tcp", s.RaftBind)
...
transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
// 设置 Raft 存储对象
snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
var logStore raft.LogStore
var stableStore raft.StableStore
if s.inmem {
logStore = raft.NewInmemStore()
stableStore = raft.NewInmemStore()
} else {
boltDB, err := raftboltdb.NewBoltStore(filepath.Join(s.RaftDir, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
logStore = boltDB
stableStore = boltDB
}
// 创建 raft 示例,并使用该 raft 实例启动集群
ra, err := raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
s.raft = ra
if enableSingle {
...
ra.BootstrapCluster(configuration)
}
}

这里可以看到 Raft 所需接口为 FSM 和 Snapshot,具体的实现方式根据需求来实现,一般与 hraftd 相仿,大概了解了 hashicorp/raft 的使用,那么我们来实现具体的 VIP 功能。

VIP

network

既然是跟 IP 相关,那么肯定需要给对应网卡配置时间,在 Linux 中我们可以通过 ip 命令来设置,Golang 中使用 vishvananda/netlink 来实现。

netlink.AddrAdd 可以在指定的网络设备上添加 IP 地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (nc *NetworkConfig) addIP() error {
res, err := nc.IsSet()
if err != nil {
return errors.Wrap(err, "ip check in AddIP failed")

}
if res {
return nil
}
if err := netlink.AddrAdd(nc.link, nc.address); err != nil {
return errors.Wrap(err, "could not add ip")
}
return nil
}

netlink.AddrDel 可以将 IP 从指定网络设备上删除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (nc *NetworkConfig) delIP() error {
res, err := nc.IsSet()
if err != nil {
return errors.Wrap(err, "ip check in DelIP failed")
}

if !res {
return nil
}

if err := netlink.AddrDel(nc.link, nc.address); err != nil {
return errors.Wrap(err, "could not delete ip")
}

return nil
}

raft

实现了 IP 设置相关,我们不需要提供 HTTP 接口,直接编写 Raft 相关实现,跟 hraftd 实现不同,在 hraftd 中需要进行信息写入读取,而我们的 VIP 仅依赖于 Raft 选举 Leader,所以只需要编写好对应的方法,不需要做额外操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type FSM struct {
}

func (fsm FSM) Apply(log *raft.Log) interface{} {
return nil
}

func (fsm FSM) Restore(snap io.ReadCloser) error {
return nil
}

func (fsm FSM) Snapshot() (raft.FSMSnapshot, error) {
return Snapshot{}, nil
}

type Snapshot struct {
}

func (snapshot Snapshot) Persist(sink raft.SnapshotSink) error {
return nil
}

func (snapshot Snapshot) Release() {
}

serve

基础方法都已经实现了,那么接下来编写集群启动逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (manager *VIPManager) Start() error {
// 初始化 raft 配置、存储对象、通信
for id, ip := range manager.peers {
configuration.Servers = append(configuration.Servers, raft.Server{ID: raft.ServerID(id), Address: raft.ServerAddress(ip)})
}

// 启动 Raft 集群,这里与 hraftd 不同,需要注意
if error := raft.BootstrapCluster(config, logStore, stableStore, snapshots, transport, configuration); error != nil {
return error
}

// 创建 raft 实例
raftServer, error := raft.NewRaft(config, manager.fsm, logStore, stableStore, snapshots, transport)
...
ticker := time.NewTicker(time.Second)
isLeader := false
// 服务启动时先删除 VIP,防止集群中同时存在节点都配置了 VIP
manager.deleteIP(false)
go func() {
for {
select {
// 如果 当前节点是 Leader 节点,则设置 VIP
case leader := <-raftServer.LeaderCh():
if leader {
isLeader = true
log.Info("Leading")
manager.addIP(true)

}
// 定时检测,如果是 Leader,则检测 VIP 是否正确设置,如果没有就再次配置 VIP
case <-ticker.C:
if isLeader {
result, error := manager.networkConfigurator.IsSet()
if error != nil {
log.WithFields(log.Fields{"error": error, "ip": manager.networkConfigurator.IP(), "interface": manager.networkConfigurator.Interface()}).Error("Could not check ip")
}

if result == false {
log.Error("Lost IP")

manager.addIP(true)
}
}
...
}
}
}()
}

这里与 hraftd 的实现不同,hraftd 先实例 raft,然后使用该 raft 实例启动集群,这样做的好处是哪怕集群只有一个节点,就是第一个节点,那么集群也是可以正常工作的,坏处是集群启动顺序是固定的,必须要先启动第一个节点,然后其他节点通过 Join 请求添加到 Raft 集群中(我们忽略了 Join 的走读)。

重新想一下我们的需求:集群、高可用、故障。当这几个词放在一起,我们就知道 hraftd 的方法不适合我们,有以下原因:

  1. 集群节点启动顺序要求
  2. 各个节点配置文件不同,有的需要 Join 参数

所以我们是直接使用 raft.BootstrapCluster 来启动集群,虽然只有一个节点集群无法正常工作,但是这个是可以容忍的。

ARP

在实现完上述功能后,我以为大功告成了,就开始自测,但是测试过程中发现了很诡异的现象,虽然我们通过 Raft 自身选举实现了 VIP 的故障自动漂移,但是实际测试中发现业务访问随着 VIP 的重建并没有立即恢复,,检查 ARP 记录发现集群中各个节点关于 VIP 的 ARP 记录各不相同,甚至是完全不同。

我们来重温下 ARP 协议内容:

地址解析协议(英語:Address Resolution Protocol,缩写:ARP)是一个通过解析网络层地址来找寻数据链路层地址的网络传输协议,它在IPv4中极其重要。ARP最初在1982年的RFC 826(征求意见稿)[1]中提出并纳入互联网标准 STD 37. ARP 也可能指是在多数操作系统中管理其相关地址的一个进程。

在以太网协议中规定,同一局域网中的一台主机要和另一台主机进行直接通信,必须要知道目标主机的MAC地址。而在TCP/IP协议中,网络层和传输层只关心目标主机的IP地址。这就导致在以太网中使用IP协议时,数据链路层的以太网协议接到上层IP协议提供的数据中,只包含目的主机的IP地址。于是需要一种方法,根据目的主机的IP地址,获得其MAC地址。这就是ARP协议要做的事情。所谓地址解析(address resolution)就是主机在发送帧前将目标IP地址转换成目标MAC地址的过程。

如果 VIP 与真实节点对应的 MAC 地址不同,就相当于 ARP 攻击了,所以我们的 Leader 节点设置完 VIP 后,还需要发送 ARP 请求广播,告诉广播域中的其他节点 VIP 正确的 MAC 地址。采用的方式是 gratuitous ARP(免费 ARP)。这里我们直接找一个开源的 ARP 实现来完成这个需求:

google/seesaw 是一个负载均衡器,里面实现了这个功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
// ARPSendGratuitous sends a gratuitous ARP message via the specified interface.
func (ncc *SeesawNCC) ARPSendGratuitous(arp *ncctypes.ARPGratuitous, out *int) error {
iface, err := net.InterfaceByName(arp.IfaceName)
if err != nil {
return fmt.Errorf("failed to get interface %q: %v", arp.IfaceName, err)
}
log.V(2).Infof("Sending gratuitous ARP for %s (%s) via %s", arp.IP, iface.HardwareAddr, iface.Name)
m, err := gratuitousARPReply(arp.IP, iface.HardwareAddr)
if err != nil {
return err
}
return sendARP(iface, m)
}

总结

使用 hashicorp/raft 可以很简单方便的实现需要选举 Leader 的分布式应用,虽然我司的 Slogan 是 Make IT Simple ,但是愈发感觉 Hashicorp 才是这句话的忠实体现,他们的 Terraform、Vault、Consul、Nomad、Vagrant 等软件,都是让基础设施的适用与管理更简单方便的,还是很爽的。

本文的具体实现在 sparrow 可以看到。

参考链接