pitaya框架中etcd实现服务发现源码注释

mac2025-09-16  7

package cluster import ( "context" "encoding/json" "fmt" "strings" "sync" "time" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/namespace" "github.com/topfreegames/pitaya/config" "github.com/topfreegames/pitaya/constants" "github.com/topfreegames/pitaya/logger" "github.com/topfreegames/pitaya/util" ) //ETCD实现服务发现的核心思想: key存储、 租约、 key监听 // 本地使用一个结构Server保存服务的信息 id type // 将id和type组装成key server进行Json序列化 // 连接ETCD生成etcd client // 生成一个租约设置租约时间并且对租约keepalive,同时对租约续租响应通道进行监听(如果keepalive失败则重新一遍本部及下一步的流程) // 同时注册本地server到ETCD授权租约之后同步ETCD上的server到本地 // 开启定时器定期同步ETCD上服务器信息(get key) // 使用key监听的方式 监听ETCD上传来的Server的添加或者删除用来更新本地的信息 //server想etcd注册一个key(type/id)写入自身的信息,同时开启定时器获取ETCD上的server同步到本地, //监听ETCD上指定前缀的key用来监听ETCD上的服务端的变化同时同步到本地 type etcdServiceDiscovery struct { cli *clientv3.Client //etcd v3客户端 config *config.Config //viper配置 syncServersInterval time.Duration //servers 同步时间间隔 heartbeatTTL time.Duration //心跳间隔 logHeartbeat bool //是否日志心跳 lastHeartbeatTime time.Time //上次心跳 leaseID clientv3.LeaseID //租约id mapByTypeLock sync.RWMutex //读写所 serverMapByType map[string]map[string]*Server //类型 server映射 serverMapByID sync.Map //同步的map etcdEndpoints []string //etcd 节点 etcdPrefix string etcdDialTimeout time.Duration running bool server *Server //本机服务 stopChan chan bool stopLeaseChan chan bool lastSyncTime time.Time //上次同步 事件 listeners []SDListener //服务发现监听 revokeTimeout time.Duration //撤销超时 grantLeaseTimeout time.Duration //设置租约时间超时 grantLeaseMaxRetries int grantLeaseInterval time.Duration shutdownDelay time.Duration appDieChan chan bool } // NewEtcdServiceDiscovery ctor func NewEtcdServiceDiscovery( config *config.Config, server *Server, appDieChan chan bool, cli ...*clientv3.Client, ) (ServiceDiscovery, error) { var client *clientv3.Client if len(cli) > 0 { client = cli[0] } sd := &etcdServiceDiscovery{ config: config, running: false, server: server, serverMapByType: make(map[string]map[string]*Server), listeners: make([]SDListener, 0), stopChan: make(chan bool), stopLeaseChan: make(chan bool), appDieChan: appDieChan, cli: client, } sd.configure() return sd, nil } // etcdServiceDiscovery 设置服务发现的配置 func (sd *etcdServiceDiscovery) configure() { sd.etcdEndpoints = sd.config.GetStringSlice("pitaya.cluster.sd.etcd.endpoints") //逗号分隔的etcd端点列表 sd.etcdDialTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.dialtimeout") //拨号超时值传递给服务发现etcd客户端 sd.etcdPrefix = sd.config.GetString("pitaya.cluster.sd.etcd.prefix") //避免不同的pitaya冲突 服务器必须具有相同的前缀才能相互看到 sd.heartbeatTTL = sd.config.GetDuration("pitaya.cluster.sd.etcd.heartbeat.ttl") //etcd租约的心跳间隔 sd.logHeartbeat = sd.config.GetBool("pitaya.cluster.sd.etcd.heartbeat.log") //是否应在调试模式下记录etcd心跳 sd.syncServersInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.syncservers.interval") //服务发现模块执行的服务器同步之间的间隔 sd.revokeTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.revoke.timeout") //etcd的撤销功能超时 sd.grantLeaseTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.timeout") //etcd租期超时 sd.grantLeaseMaxRetries = sd.config.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries") //etcd授予租约的最大尝试次数 sd.grantLeaseInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval") //每次授予租约尝试之间的间隔 sd.shutdownDelay = sd.config.GetDuration("pitaya.cluster.sd.etcd.shutdown.delay") //从服务发现注销后等待关闭的时间 } // watchLeaseChan 监听续租的相应管道LeaseKeepAliveResponse 会收到续租相应的事件 func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) { failedGrantLeaseAttempts := 0 for { select { case <-sd.stopChan: //服务发现停止 return case <-sd.stopLeaseChan: //是否停止续住监听 return case leaseKeepAliveResponse := <-c: //从续租响应管道获取到通知事件 if leaseKeepAliveResponse != nil { if sd.logHeartbeat { logger.Log.Debugf("sd: etcd lease %x renewed", leaseKeepAliveResponse.ID) } failedGrantLeaseAttempts = 0 continue } logger.Log.Warn("sd: error renewing etcd lease, reconfiguring") for { err := sd.renewLease() if err != nil { failedGrantLeaseAttempts = failedGrantLeaseAttempts + 1 if err == constants.ErrEtcdGrantLeaseTimeout { logger.Log.Warn("sd: timed out trying to grant etcd lease") if sd.appDieChan != nil { sd.appDieChan <- true } return } if failedGrantLeaseAttempts >= sd.grantLeaseMaxRetries { logger.Log.Warn("sd: exceeded max attempts to renew etcd lease") if sd.appDieChan != nil { sd.appDieChan <- true } return } logger.Log.Warnf("sd: error granting etcd lease, will retry in %d seconds", uint64(sd.grantLeaseInterval.Seconds())) time.Sleep(sd.grantLeaseInterval) continue } return } } } } // renewLease reestablishes connection with etcd 创新生成租约重新启动后服务 func (sd *etcdServiceDiscovery) renewLease() error { c := make(chan error) go func() { defer close(c) logger.Log.Infof("waiting for etcd lease") err := sd.grantLease() if err != nil { c <- err return } err = sd.bootstrapServer(sd.server) c <- err }() select { case err := <-c: return err case <-time.After(sd.grantLeaseTimeout): return constants.ErrEtcdGrantLeaseTimeout } } // grantLease创建租约并且续租同时监听续租的相应管道 func (sd *etcdServiceDiscovery) grantLease() error { // 创建一个租约到期时间即心跳时间 l, err := sd.cli.Grant(context.TODO(), int64(sd.heartbeatTTL.Seconds())) if err != nil { return err } //记录租约id sd.leaseID = l.ID logger.Log.Debugf("sd: got leaseID: %x", l.ID) // this will keep alive forever, when channel c is closed // it means we probably have to rebootstrap the lease //租约定期续租保持alive 返回租约续租相应管道 c, err := sd.cli.KeepAlive(context.TODO(), sd.leaseID) if err != nil { return err } // need to receive here as per etcd docs //读取管道的值 <-c go sd.watchLeaseChan(c) return nil } //服务注册 将Server的type和id组装成key 序列化信息做为value将 kv写入etcd func (sd *etcdServiceDiscovery) addServerIntoEtcd(server *Server) error { _, err := sd.cli.Put( context.TODO(), getKey(server.ID, server.Type), //servers/serverType/serverID 组装key server.AsJSONString(), //server的信息序列化为json数据 clientv3.WithLease(sd.leaseID), //给key授权租约 ) return err } // bootstrapServer 启动服务 func (sd *etcdServiceDiscovery) bootstrapServer(server *Server) error { //注册服务 将服务信息作为kv写入etcd同时授权租约 if err := sd.addServerIntoEtcd(server); err != nil { return err } //湖区etcd上的注册的服务器信息对本地保存的服务器信息进行同步 sd.SyncServers() return nil } // AddListener adds a listener to etcd service discovery func (sd *etcdServiceDiscovery) AddListener(listener SDListener) { sd.listeners = append(sd.listeners, listener) } // AfterInit executes after Init func (sd *etcdServiceDiscovery) AfterInit() { } // notifyListeners 添加删除服务器时进行监听通知 func (sd *etcdServiceDiscovery) notifyListeners(act Action, sv *Server) { for _, l := range sd.listeners { if act == DEL { l.RemoveServer(sv) } else if act == ADD { l.AddServer(sv) } } } // 加锁执行f func (sd *etcdServiceDiscovery) writeLockScope(f func()) { sd.mapByTypeLock.Lock() defer sd.mapByTypeLock.Unlock() f() } //根据id删除本地的server信息记录 func (sd *etcdServiceDiscovery) deleteServer(serverID string) { if actual, ok := sd.serverMapByID.Load(serverID); ok { sv := actual.(*Server) sd.serverMapByID.Delete(sv.ID) sd.writeLockScope(func() { if svMap, ok := sd.serverMapByType[sv.Type]; ok { delete(svMap, sv.ID) } }) sd.notifyListeners(DEL, sv) } } func (sd *etcdServiceDiscovery) deleteLocalInvalidServers(actualServers []string) { sd.serverMapByID.Range(func(key interface{}, value interface{}) bool { k := key.(string) if !util.SliceContainsString(actualServers, k) { logger.Log.Warnf("deleting invalid local server %s", k) sd.deleteServer(k) } return true }) } // getKey 类型和id组装成key servers/serverType/serverID func getKey(serverID, serverType string) string { return fmt.Sprintf("servers/%s/%s", serverType, serverID) } // getServerFromEtcd 查询etcd上的server序列化信息 func (sd *etcdServiceDiscovery) getServerFromEtcd(serverType, serverID string) (*Server, error) { svKey := getKey(serverID, serverType) svEInfo, err := sd.cli.Get(context.TODO(), svKey) if err != nil { return nil, fmt.Errorf("error getting server: %s from etcd, error: %s", svKey, err.Error()) } if len(svEInfo.Kvs) == 0 { return nil, fmt.Errorf("didn't found server: %s in etcd", svKey) } return parseServer(svEInfo.Kvs[0].Value) } // GetServersByType returns a slice with all the servers of a certain type func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string]*Server, error) { sd.mapByTypeLock.RLock() defer sd.mapByTypeLock.RUnlock() if m, ok := sd.serverMapByType[serverType]; ok && len(m) > 0 { // Create a new map to avoid concurrent read and write access to the // map, this also prevents accidental changes to the list of servers // kept by the service discovery. ret := make(map[string]*Server, len(sd.serverMapByType)) for k, v := range sd.serverMapByType[serverType] { ret[k] = v } return ret, nil } return nil, constants.ErrNoServersAvailableOfType } // GetServers returns a slice with all the servers func (sd *etcdServiceDiscovery) GetServers() []*Server { ret := make([]*Server, 0) sd.serverMapByID.Range(func(k, v interface{}) bool { ret = append(ret, v.(*Server)) return true }) return ret } func (sd *etcdServiceDiscovery) bootstrap() error { //生成租约 监听续租响应管道 if err := sd.grantLease(); err != nil { return err } //将sd.server信息写入ETCD同时授权租约,之后同步ETCD 上的Server到本地 if err := sd.bootstrapServer(sd.server); err != nil { return err } return nil } // GetServer returns a server given it's id func (sd *etcdServiceDiscovery) GetServer(id string) (*Server, error) { if sv, ok := sd.serverMapByID.Load(id); ok { return sv.(*Server), nil } return nil, constants.ErrNoServerWithID } // Init starts the service discovery client 注册模块儿的时候回进行调用 func (sd *etcdServiceDiscovery) Init() error { sd.running = true var cli *clientv3.Client var err error if sd.cli == nil { //连接ETCD cli, err = clientv3.New(clientv3.Config{ Endpoints: sd.etcdEndpoints, DialTimeout: sd.etcdDialTimeout, }) if err != nil { return err } sd.cli = cli } // namespaced etcd :) Package namespace is a clientv3 wrapper that translates all keys to begin with a given prefix. sd.cli.KV = namespace.NewKV(sd.cli.KV, sd.etcdPrefix) // 重写ectd client interfaces 给所有的key 监听 租约加上前缀 sd.cli.Watcher = namespace.NewWatcher(sd.cli.Watcher, sd.etcdPrefix) //监听 sd.cli.Lease = namespace.NewLease(sd.cli.Lease, sd.etcdPrefix) //租约 //生成租约且设置租约时间并且租约定期续租 同时注册本地server到ETCD授权租约之后同步ETCD上的server到本地 if err = sd.bootstrap(); err != nil { return err } // update servers 开启定时器定期同步服务器信息 syncServersTicker := time.NewTicker(sd.syncServersInterval) go func() { for sd.running { select { case <-syncServersTicker.C: err := sd.SyncServers() if err != nil { logger.Log.Errorf("error resyncing servers: %s", err.Error()) } case <-sd.stopChan: return } } }() //使用key监听的方式 监听ETCD上传来的Server的添加或者删除用来更新本地的信息 go sd.watchEtcdChanges() return nil } func parseEtcdKey(key string) (string, string, error) { splittedServer := strings.Split(key, "/") if len(splittedServer) != 3 { return "", "", fmt.Errorf("error parsing etcd key %s (server name can't contain /)", key) } svType := splittedServer[1] svID := splittedServer[2] return svType, svID, nil } func parseServer(value []byte) (*Server, error) { var sv *Server err := json.Unmarshal(value, &sv) if err != nil { logger.Log.Warnf("failed to load server %s, error: %s", sv, err.Error()) } return sv, nil } func (sd *etcdServiceDiscovery) printServers() { sd.mapByTypeLock.RLock() defer sd.mapByTypeLock.RUnlock() for k, v := range sd.serverMapByType { logger.Log.Debugf("type: %s, servers: %+v", k, v) } } // SyncServers gets all servers from etcd 从etcd获取左右的server对本地保存的server信息进行同步(增删) func (sd *etcdServiceDiscovery) SyncServers() error { //读取servers目录下的所有key keys, err := sd.cli.Get( context.TODO(), "servers/", clientv3.WithPrefix(), clientv3.WithKeysOnly(), ) if err != nil { return err } // delete invalid servers (local ones that are not in etcd) allIds := make([]string, 0) // filter servers I need to grab info 获取所有etcd上的server根据key获取server然后添加到本地 for _, kv := range keys.Kvs { svType, svID, err := parseEtcdKey(string(kv.Key)) //根据etcd的key解析出 svType 和svID if err != nil { logger.Log.Warnf("failed to parse etcd key %s, error: %s", kv.Key, err.Error()) } allIds = append(allIds, svID) // TODO is this slow? if so we can paralellize if _, ok := sd.serverMapByID.Load(svID); !ok { //查看id是否存在 如果本地存在记录 logger.Log.Debugf("loading info from missing server: %s/%s", svType, svID) //根据svType 和svId组装成key获取Etcd上的Server序列化信息 sv, err := sd.getServerFromEtcd(svType, svID) if err != nil { logger.Log.Errorf("error getting server from etcd: %s, error: %s", svID, err.Error()) continue } //将sever添加到本地存储server的两种map sd.addServer(sv) } } //删除ETCD上没有本地存在的server sd.deleteLocalInvalidServers(allIds) sd.printServers() sd.lastSyncTime = time.Now() //记录下同步的时间 return nil } // BeforeShutdown executes before shutting down and will remove the server from the list func (sd *etcdServiceDiscovery) BeforeShutdown() { sd.revoke() time.Sleep(sd.shutdownDelay) // Sleep for a short while to ensure shutdown has propagated } // Shutdown executes on shutdown and will clean etcd func (sd *etcdServiceDiscovery) Shutdown() error { sd.running = false close(sd.stopChan) return nil } // revoke prevents Pitaya from crashing when etcd is not available func (sd *etcdServiceDiscovery) revoke() error { close(sd.stopLeaseChan) c := make(chan error) defer close(c) go func() { logger.Log.Debug("waiting for etcd revoke") _, err := sd.cli.Revoke(context.TODO(), sd.leaseID) c <- err logger.Log.Debug("finished waiting for etcd revoke") }() select { case err := <-c: return err // completed normally case <-time.After(sd.revokeTimeout): logger.Log.Warn("timed out waiting for etcd revoke") return nil // timed out } } // addServer 将Server存入serverMapByID(map[id]*Server) 和 serverMapByType(map[type]map[id]*Server) func (sd *etcdServiceDiscovery) addServer(sv *Server) { if _, loaded := sd.serverMapByID.LoadOrStore(sv.ID, sv); !loaded { //在对serverMapByType进行修改时使用加锁 sd.writeLockScope(func() { mapSvByType, ok := sd.serverMapByType[sv.Type] if !ok { mapSvByType = make(map[string]*Server) sd.serverMapByType[sv.Type] = mapSvByType } mapSvByType[sv.ID] = sv }) if sv.ID != sd.server.ID { //通知监听添加服务器 sd.notifyListeners(ADD, sv) } } } // 监听所有ETCD上的Server的key func (sd *etcdServiceDiscovery) watchEtcdChanges() { //监视etcd上key前缀为servers的key w := sd.cli.Watch(context.Background(), "servers/", clientv3.WithPrefix()) go func(chn clientv3.WatchChan) { for sd.running { //sd在运行 select { case wResp := <-chn: //监听到key有变化 for _, ev := range wResp.Events { switch ev.Type { case clientv3.EventTypePut: //添加或者更新事件 var sv *Server var err error if sv, err = parseServer(ev.Kv.Value); err != nil { logger.Log.Errorf("Failed to parse server from etcd: %v", err) continue } sd.addServer(sv) //添加服务到本地 logger.Log.Debugf("server %s added", ev.Kv.Key) sd.printServers() case clientv3.EventTypeDelete: //删除服务事件 _, svID, err := parseEtcdKey(string(ev.Kv.Key)) if err != nil { logger.Log.Warnf("failed to parse key from etcd: %s", ev.Kv.Key) continue } sd.deleteServer(svID) //删除本地服务 logger.Log.Debugf("server %s deleted", svID) sd.printServers() } } case <-sd.stopChan: //停止 return } } }(w) }
最新回复(0)