package cluster
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/namespace"
"github.com/topfreegames/pitaya/config"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/util"
)
type etcdServiceDiscovery
struct {
cli
*clientv3
.Client
config
*config
.Config
syncServersInterval time
.Duration
heartbeatTTL time
.Duration
logHeartbeat
bool
lastHeartbeatTime time
.Time
leaseID clientv3
.LeaseID
mapByTypeLock sync
.RWMutex
serverMapByType
map[string]map[string]*Server
serverMapByID sync
.Map
etcdEndpoints
[]string
etcdPrefix
string
etcdDialTimeout time
.Duration
running
bool
server
*Server
stopChan
chan bool
stopLeaseChan
chan bool
lastSyncTime time
.Time
listeners
[]SDListener
revokeTimeout time
.Duration
grantLeaseTimeout time
.Duration
grantLeaseMaxRetries
int
grantLeaseInterval time
.Duration
shutdownDelay time
.Duration
appDieChan
chan bool
}
func NewEtcdServiceDiscovery(
config
*config
.Config
,
server
*Server
,
appDieChan
chan bool,
cli
...*clientv3
.Client
,
) (ServiceDiscovery
, error) {
var client
*clientv3
.Client
if len(cli
) > 0 {
client
= cli
[0]
}
sd
:= &etcdServiceDiscovery
{
config
: config
,
running
: false,
server
: server
,
serverMapByType
: make(map[string]map[string]*Server
),
listeners
: make([]SDListener
, 0),
stopChan
: make(chan bool),
stopLeaseChan
: make(chan bool),
appDieChan
: appDieChan
,
cli
: client
,
}
sd
.configure()
return sd
, nil
}
func (sd
*etcdServiceDiscovery
) configure() {
sd
.etcdEndpoints
= sd
.config
.GetStringSlice("pitaya.cluster.sd.etcd.endpoints")
sd
.etcdDialTimeout
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.dialtimeout")
sd
.etcdPrefix
= sd
.config
.GetString("pitaya.cluster.sd.etcd.prefix")
sd
.heartbeatTTL
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.heartbeat.ttl")
sd
.logHeartbeat
= sd
.config
.GetBool("pitaya.cluster.sd.etcd.heartbeat.log")
sd
.syncServersInterval
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.syncservers.interval")
sd
.revokeTimeout
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.revoke.timeout")
sd
.grantLeaseTimeout
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.grantlease.timeout")
sd
.grantLeaseMaxRetries
= sd
.config
.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries")
sd
.grantLeaseInterval
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval")
sd
.shutdownDelay
= sd
.config
.GetDuration("pitaya.cluster.sd.etcd.shutdown.delay")
}
func (sd
*etcdServiceDiscovery
) watchLeaseChan(c
<-chan *clientv3
.LeaseKeepAliveResponse
) {
failedGrantLeaseAttempts
:= 0
for {
select {
case <-sd
.stopChan
:
return
case <-sd
.stopLeaseChan
:
return
case leaseKeepAliveResponse
:= <-c
:
if leaseKeepAliveResponse
!= nil {
if sd
.logHeartbeat
{
logger
.Log
.Debugf("sd: etcd lease %x renewed", leaseKeepAliveResponse
.ID
)
}
failedGrantLeaseAttempts
= 0
continue
}
logger
.Log
.Warn("sd: error renewing etcd lease, reconfiguring")
for {
err
:= sd
.renewLease()
if err
!= nil {
failedGrantLeaseAttempts
= failedGrantLeaseAttempts
+ 1
if err
== constants
.ErrEtcdGrantLeaseTimeout
{
logger
.Log
.Warn("sd: timed out trying to grant etcd lease")
if sd
.appDieChan
!= nil {
sd
.appDieChan
<- true
}
return
}
if failedGrantLeaseAttempts
>= sd
.grantLeaseMaxRetries
{
logger
.Log
.Warn("sd: exceeded max attempts to renew etcd lease")
if sd
.appDieChan
!= nil {
sd
.appDieChan
<- true
}
return
}
logger
.Log
.Warnf("sd: error granting etcd lease, will retry in %d seconds", uint64(sd
.grantLeaseInterval
.Seconds()))
time
.Sleep(sd
.grantLeaseInterval
)
continue
}
return
}
}
}
}
func (sd
*etcdServiceDiscovery
) renewLease() error {
c
:= make(chan error)
go func() {
defer close(c
)
logger
.Log
.Infof("waiting for etcd lease")
err
:= sd
.grantLease()
if err
!= nil {
c
<- err
return
}
err
= sd
.bootstrapServer(sd
.server
)
c
<- err
}()
select {
case err
:= <-c
:
return err
case <-time
.After(sd
.grantLeaseTimeout
):
return constants
.ErrEtcdGrantLeaseTimeout
}
}
func (sd
*etcdServiceDiscovery
) grantLease() error {
l
, err
:= sd
.cli
.Grant(context
.TODO(), int64(sd
.heartbeatTTL
.Seconds()))
if err
!= nil {
return err
}
sd
.leaseID
= l
.ID
logger
.Log
.Debugf("sd: got leaseID: %x", l
.ID
)
c
, err
:= sd
.cli
.KeepAlive(context
.TODO(), sd
.leaseID
)
if err
!= nil {
return err
}
<-c
go sd
.watchLeaseChan(c
)
return nil
}
func (sd
*etcdServiceDiscovery
) addServerIntoEtcd(server
*Server
) error {
_, err
:= sd
.cli
.Put(
context
.TODO(),
getKey(server
.ID
, server
.Type
),
server
.AsJSONString(),
clientv3
.WithLease(sd
.leaseID
),
)
return err
}
func (sd
*etcdServiceDiscovery
) bootstrapServer(server
*Server
) error {
if err
:= sd
.addServerIntoEtcd(server
); err
!= nil {
return err
}
sd
.SyncServers()
return nil
}
func (sd
*etcdServiceDiscovery
) AddListener(listener SDListener
) {
sd
.listeners
= append(sd
.listeners
, listener
)
}
func (sd
*etcdServiceDiscovery
) AfterInit() {
}
func (sd
*etcdServiceDiscovery
) notifyListeners(act Action
, sv
*Server
) {
for _, l
:= range sd
.listeners
{
if act
== DEL
{
l
.RemoveServer(sv
)
} else if act
== ADD
{
l
.AddServer(sv
)
}
}
}
func (sd
*etcdServiceDiscovery
) writeLockScope(f
func()) {
sd
.mapByTypeLock
.Lock()
defer sd
.mapByTypeLock
.Unlock()
f()
}
func (sd
*etcdServiceDiscovery
) deleteServer(serverID
string) {
if actual
, ok
:= sd
.serverMapByID
.Load(serverID
); ok
{
sv
:= actual
.(*Server
)
sd
.serverMapByID
.Delete(sv
.ID
)
sd
.writeLockScope(func() {
if svMap
, ok
:= sd
.serverMapByType
[sv
.Type
]; ok
{
delete(svMap
, sv
.ID
)
}
})
sd
.notifyListeners(DEL
, sv
)
}
}
func (sd
*etcdServiceDiscovery
) deleteLocalInvalidServers(actualServers
[]string) {
sd
.serverMapByID
.Range(func(key
interface{}, value
interface{}) bool {
k
:= key
.(string)
if !util
.SliceContainsString(actualServers
, k
) {
logger
.Log
.Warnf("deleting invalid local server %s", k
)
sd
.deleteServer(k
)
}
return true
})
}
// getKey builds the etcd key of a server, "servers/<serverType>/<serverID>".
// Note the argument order: the ID comes first but appears last in the key.
func getKey(serverID, serverType string) string {
	const keyLayout = "servers/%s/%s"
	return fmt.Sprintf(keyLayout, serverType, serverID)
}
func (sd
*etcdServiceDiscovery
) getServerFromEtcd(serverType
, serverID
string) (*Server
, error) {
svKey
:= getKey(serverID
, serverType
)
svEInfo
, err
:= sd
.cli
.Get(context
.TODO(), svKey
)
if err
!= nil {
return nil, fmt
.Errorf("error getting server: %s from etcd, error: %s", svKey
, err
.Error())
}
if len(svEInfo
.Kvs
) == 0 {
return nil, fmt
.Errorf("didn't found server: %s in etcd", svKey
)
}
return parseServer(svEInfo
.Kvs
[0].Value
)
}
func (sd
*etcdServiceDiscovery
) GetServersByType(serverType
string) (map[string]*Server
, error) {
sd
.mapByTypeLock
.RLock()
defer sd
.mapByTypeLock
.RUnlock()
if m
, ok
:= sd
.serverMapByType
[serverType
]; ok
&& len(m
) > 0 {
ret
:= make(map[string]*Server
, len(sd
.serverMapByType
))
for k
, v
:= range sd
.serverMapByType
[serverType
] {
ret
[k
] = v
}
return ret
, nil
}
return nil, constants
.ErrNoServersAvailableOfType
}
func (sd
*etcdServiceDiscovery
) GetServers() []*Server
{
ret
:= make([]*Server
, 0)
sd
.serverMapByID
.Range(func(k
, v
interface{}) bool {
ret
= append(ret
, v
.(*Server
))
return true
})
return ret
}
func (sd
*etcdServiceDiscovery
) bootstrap() error {
if err
:= sd
.grantLease(); err
!= nil {
return err
}
if err
:= sd
.bootstrapServer(sd
.server
); err
!= nil {
return err
}
return nil
}
func (sd
*etcdServiceDiscovery
) GetServer(id
string) (*Server
, error) {
if sv
, ok
:= sd
.serverMapByID
.Load(id
); ok
{
return sv
.(*Server
), nil
}
return nil, constants
.ErrNoServerWithID
}
func (sd
*etcdServiceDiscovery
) Init() error {
sd
.running
= true
var cli
*clientv3
.Client
var err
error
if sd
.cli
== nil {
cli
, err
= clientv3
.New(clientv3
.Config
{
Endpoints
: sd
.etcdEndpoints
,
DialTimeout
: sd
.etcdDialTimeout
,
})
if err
!= nil {
return err
}
sd
.cli
= cli
}
sd
.cli
.KV
= namespace
.NewKV(sd
.cli
.KV
, sd
.etcdPrefix
)
sd
.cli
.Watcher
= namespace
.NewWatcher(sd
.cli
.Watcher
, sd
.etcdPrefix
)
sd
.cli
.Lease
= namespace
.NewLease(sd
.cli
.Lease
, sd
.etcdPrefix
)
if err
= sd
.bootstrap(); err
!= nil {
return err
}
syncServersTicker
:= time
.NewTicker(sd
.syncServersInterval
)
go func() {
for sd
.running
{
select {
case <-syncServersTicker
.C
:
err
:= sd
.SyncServers()
if err
!= nil {
logger
.Log
.Errorf("error resyncing servers: %s", err
.Error())
}
case <-sd
.stopChan
:
return
}
}
}()
go sd
.watchEtcdChanges()
return nil
}
// parseEtcdKey splits a "servers/<type>/<id>" key into its server type and
// server ID. A key without exactly three "/"-separated segments is rejected,
// which means server names cannot contain "/". The first segment's value is
// not checked.
func parseEtcdKey(key string) (string, string, error) {
	parts := strings.Split(key, "/")
	if len(parts) == 3 {
		return parts[1], parts[2], nil
	}
	return "", "", fmt.Errorf("error parsing etcd key %s (server name can't contain /)", key)
}
func parseServer(value
[]byte) (*Server
, error) {
var sv
*Server
err
:= json
.Unmarshal(value
, &sv
)
if err
!= nil {
logger
.Log
.Warnf("failed to load server %s, error: %s", sv
, err
.Error())
}
return sv
, nil
}
func (sd
*etcdServiceDiscovery
) printServers() {
sd
.mapByTypeLock
.RLock()
defer sd
.mapByTypeLock
.RUnlock()
for k
, v
:= range sd
.serverMapByType
{
logger
.Log
.Debugf("type: %s, servers: %+v", k
, v
)
}
}
func (sd
*etcdServiceDiscovery
) SyncServers() error {
keys
, err
:= sd
.cli
.Get(
context
.TODO(),
"servers/",
clientv3
.WithPrefix(),
clientv3
.WithKeysOnly(),
)
if err
!= nil {
return err
}
allIds
:= make([]string, 0)
for _, kv
:= range keys
.Kvs
{
svType
, svID
, err
:= parseEtcdKey(string(kv
.Key
))
if err
!= nil {
logger
.Log
.Warnf("failed to parse etcd key %s, error: %s", kv
.Key
, err
.Error())
}
allIds
= append(allIds
, svID
)
if _, ok
:= sd
.serverMapByID
.Load(svID
); !ok
{
logger
.Log
.Debugf("loading info from missing server: %s/%s", svType
, svID
)
sv
, err
:= sd
.getServerFromEtcd(svType
, svID
)
if err
!= nil {
logger
.Log
.Errorf("error getting server from etcd: %s, error: %s", svID
, err
.Error())
continue
}
sd
.addServer(sv
)
}
}
sd
.deleteLocalInvalidServers(allIds
)
sd
.printServers()
sd
.lastSyncTime
= time
.Now()
return nil
}
func (sd
*etcdServiceDiscovery
) BeforeShutdown() {
sd
.revoke()
time
.Sleep(sd
.shutdownDelay
)
}
func (sd
*etcdServiceDiscovery
) Shutdown() error {
sd
.running
= false
close(sd
.stopChan
)
return nil
}
func (sd
*etcdServiceDiscovery
) revoke() error {
close(sd
.stopLeaseChan
)
c
:= make(chan error)
defer close(c
)
go func() {
logger
.Log
.Debug("waiting for etcd revoke")
_, err
:= sd
.cli
.Revoke(context
.TODO(), sd
.leaseID
)
c
<- err
logger
.Log
.Debug("finished waiting for etcd revoke")
}()
select {
case err
:= <-c
:
return err
case <-time
.After(sd
.revokeTimeout
):
logger
.Log
.Warn("timed out waiting for etcd revoke")
return nil
}
}
func (sd
*etcdServiceDiscovery
) addServer(sv
*Server
) {
if _, loaded
:= sd
.serverMapByID
.LoadOrStore(sv
.ID
, sv
); !loaded
{
sd
.writeLockScope(func() {
mapSvByType
, ok
:= sd
.serverMapByType
[sv
.Type
]
if !ok
{
mapSvByType
= make(map[string]*Server
)
sd
.serverMapByType
[sv
.Type
] = mapSvByType
}
mapSvByType
[sv
.ID
] = sv
})
if sv
.ID
!= sd
.server
.ID
{
sd
.notifyListeners(ADD
, sv
)
}
}
}
func (sd
*etcdServiceDiscovery
) watchEtcdChanges() {
w
:= sd
.cli
.Watch(context
.Background(), "servers/", clientv3
.WithPrefix())
go func(chn clientv3
.WatchChan
) {
for sd
.running
{
select {
case wResp
:= <-chn
:
for _, ev
:= range wResp
.Events
{
switch ev
.Type
{
case clientv3
.EventTypePut
:
var sv
*Server
var err
error
if sv
, err
= parseServer(ev
.Kv
.Value
); err
!= nil {
logger
.Log
.Errorf("Failed to parse server from etcd: %v", err
)
continue
}
sd
.addServer(sv
)
logger
.Log
.Debugf("server %s added", ev
.Kv
.Key
)
sd
.printServers()
case clientv3
.EventTypeDelete
:
_, svID
, err
:= parseEtcdKey(string(ev
.Kv
.Key
))
if err
!= nil {
logger
.Log
.Warnf("failed to parse key from etcd: %s", ev
.Kv
.Key
)
continue
}
sd
.deleteServer(svID
)
logger
.Log
.Debugf("server %s deleted", svID
)
sd
.printServers()
}
}
case <-sd
.stopChan
:
return
}
}
}(w
)
}