访问etcd首先要创建client,它需要传入一个Config配置.
Endpoints:etcd的多个节点服务地址。 DialTimeout:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,不用再关心后续底层连接的状态了,client内部会重连。
cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout: 5 * time.Second, })返回的 client,它的类型具体如下:
type Client struct { Cluster KV Lease Watcher Auth Maintenance Username string Password string }类型中的成员是etcd客户端几何核心功能模块的具体实现:
Cluster:向集群里增加删除etcd节点,集群管理
KV:K-V键值库
Lease:租约相关操作,租约过期会删除授权的key
Watcher:观察订阅,从而监听最新的数据变化
Auth:管理etcd的用户和权限,属于管理员操作
Maintenance:维护etcd,比如主动迁移etcd的leader节点
kv对象实例获取 kv := clientev3.NewKV(client)
存储Put
putResp, err := kv.Put(context.TODO(),"/key/example", "hello")Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
参数介绍: ctx: Context包对象,是用来跟踪上下文的,列如超时控制 key: 存储对象的key val: 存储对象的value opts: 可变参数,额外选项
查询Get getResp, err := kv.Get(context.TODO(), "/key/example")
删除Delete delREsp, err := kv.Delete(context.TODO(), "/key/example")
WithPrefix
rangeResp, err := kv.Get(context.TODO(), "/key/example", clientv3.WithPrefix())WithPrefix()是指查找以/key/example为前缀的所有key
etcd中事务是原子执行的,针对kv存储操作的if … then … else结构.将请求归并到一起放在原子块中.事务可以保护key不受其他并发更新操作的影响.
etcd的实现方式有些不同,它的事务是基于cas方式实现的。在事务执行过程中,client和etcd之间没有维护事务会话,在commit事务时,它的“冲突判断(If)和执行过程Then/Else”一次性提交给etcd,etcd来作为一个原子过程来执行“If-Then-Else”。所以,etcd事务不会发生阻塞,无论成功,还是失败,都会立即返回,需要应用进行失败(发生冲突)重试。因此,这也就要求业务代码是可重试的。
type Txn interface { // If takes a list of comparison. If all comparisons passed in succeed, // the operations passed into Then() will be executed. Or the operations // passed into Else() will be executed. If(cs ...Cmp) Txn // Then takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() succeed. Then(ops ...Op) Txn // Else takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() fail. Else(ops ...Op) Txn // Commit tries to commit the transaction. Commit() (*TxnResponse, error) } 通过kv对象开启一个事务if then else 书写事务最后Commit提交整个Txn事务 txn := kv.Txn(context.TODO()) txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=","hello")). Then(clientv3.OpGet("/hi")). Else(clientv3.OpGet("/test/",clientv3.WithPrefix())). Commit() if txnResp.Succeeded { // If = true fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs) } else { // If =false fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs) }这个Value(“/hi”)返回的Cmp表达了:“/hi这个key对应的value” 利用Compare函数来继续为”主语”增加描述, 形成完整的条件语句:/hi这个key对应的value必须等于"hello"
watch接口
type Watcher interface { // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. // 'opts' can be: 'WithRev' and/or 'WithPrefix'. Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // Close closes the watcher and cancels all watch requests. Close() error }Watch 方法返回一个WatchChan 类似的变量,该通道传递WatchResponse类型
//创建监听 wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV()) //range 监听事件 for v := range wc { for _, e := range v.Events { log.Printf("type:%v kv:%v val:%v prevKey:%v \n ", e.Type, string(e.Kv.Key),e.Kv.Value, e.PrevKv) } }