Python3 实现 GoBeansDB 集群节点变更以及固定副本平衡

mac2025-12-17  8

目录

一、容灾问题

1、GoBeansproxy 之 sharding 问题❓

2、集群节点变更问题(新增或移除)❓

3、集群内固定副本(N)的平衡问题❓

二、容灾方案

1、Python3 采集 GoBeansproxy 日志路由数据(SET/DELETE)存储到 MySQL

2、Python3 实现 GoBeansDB 集群移除节点后的数据副本平衡

3、以上实例输出结果


一、容灾问题

1、GoBeansproxy 之 sharding 问题❓

自动随机 sharding,只需调整 GoBeansproxy 配置文件 proxy.yaml 提供的 NWR(R ≤ N ≥ W),举个例子即 N=3, R=1, W=1。与 GoBeansproxy 的 route.yaml 配置文件中的 buckets 参数手动分配无关。

N=3, R=1, W=1

N:一份数据(或一个 key-value)最终保证有 3 副本数据一致,根据 W 的配置可能出现短时间内的数据不一致;

R:读时只要从一个节点读到数据就返回(GoBeansproxy 不记录每个 key 具体路由,而是从各个节点去查找,只要从一个节点读到数据就返回);

W:写时需要写到一个节点就表示写成功(GoBeansproxy 不记录写入 key 的具体路由)。

2、集群节点变更问题(新增或移除)❓

新增或移除节点只需更新 route.yaml 配置文件,重启 GoBeansproxy 即可。因为 GoBeansproxy 不记录路由,所以只要保证集群中至少有一份数据(或一个 key-value)即可正常使用。

如果加入的节点本身就有数据,通过集群也可正常使用访问等操作,当然这是特殊情况。

3、集群内固定副本(N)的平衡问题❓

一是集群新增节点不会影响固定副本的变化。

二是分布式架构模式下,如果移除集群任意一节点,而且不采取任何措施补充一份该节点数据,就会造成该节点的数据在集群中丢失一个副本,原因就是 GoBeansproxy 不记录数据路由,不能像 MongoDB 分片那样自平衡。

所以需要借助 Python 脚本管理,保证集群内固定副本(N)的平衡。

二、容灾方案

1、Python3 采集 GoBeansproxy 日志路由数据(SET/DELETE)存储到 MySQL

2、Python3 实现 GoBeansDB 集群移除节点后的数据副本平衡

#!/usr/bin/env python3 # -*- coding: UTF-8 -*- '''=========================================================== @Project -> File :GoBeansDB -> gobeansdb_balance_remove_node.py @IDE :PyCharm @Author :Mr. Wufei @Date :2019/11/1 15:45 @Desc :Python3 实现 GoBeansDB 集群移除节点后的数据副本平衡 ============================================================''' import mysqlconn import struct import socket import python3_libmc_beansdb as plb class BalanceReplica(object): def __init__(self, mysql_conn_dict, gobeansproxy_conn_list): """ :param mysql_conn_dict: 一个包含 MySQL 连接所需信息的字典(格式:{'host': '127.0.0.1', 'port': 3306, 'user': 'test', 'passwd': 'test', 'db': 'test'}) :param gobeansproxy_conn_list: GoBeansproxy 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2']) """ self.__mysql_conn_dict = mysql_conn_dict self.__gobeansproxy_conn_list = gobeansproxy_conn_list def _parse_ip_int(self, ip_str): """ 将 IP 转换为 int 类型 :param ip_str: 一个字符串 IP :return: int 类型 IP """ ip_int = struct.unpack('!I', socket.inet_aton(ip_str))[0] return ip_int def _parse_ip_str(self, ip_int): """ 将 int 类型 IP 转换为字符串类型 :param ip_int: 一个 int 类型 IP :return: 字符串 IP """ ip_str = socket.inet_ntoa(struct.pack('!I', ip_int)) return ip_str def _construct_sql(self, gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip): """ 构造 SQL :param gobeansdb_old_ip: 被移除 GoBeansDB 的 IP :param gobeansdb_port: 被移除 GoBeansDB 的 PORT :param gobeansdb_new_ip: 接收数据的新 GoBeansDB 的 IP :return: 两个标准 SQL 语句 """ old_int_ip = self._parse_ip_int(gobeansdb_old_ip) new_int_ip = self._parse_ip_int(gobeansdb_new_ip) sql_update = ''' update gobeansproxy_record set gobeansdb_ip = %d where gobeansdb_ip = %d and gobeansdb_port = %d ''' %(new_int_ip, old_int_ip, gobeansdb_port) sql_select = ''' select request_key from gobeansproxy_record where gobeansdb_ip = %d and method_status = 1 and gobeansdb_port = %d ''' %(new_int_ip, gobeansdb_port) return sql_update, sql_select def _mysql_operations(self, mysql_conn_dict, sql_update, sql_select): """ MySQL 操作 :param mysql_conn_dict: 一个包含 MySQL 连接所需信息的字典 :param sql_update: 一个可执行的更新 SQL 语句 :param sql_select: 一个可执行的查询 SQL 语句 :return: 一个或多个图片分布式存储路径(即图片在 BeansDB/GoBeansDB 的 key)的集合(用集合主要是为了去重,因为可能存在被 DELETE 的 key) """ request_key_set = set() with mysqlconn.MySqlDB(host = mysql_conn_dict['host'], port = mysql_conn_dict['port'], user = mysql_conn_dict['user'], passwd = mysql_conn_dict['passwd'], db = mysql_conn_dict['db']) as mysqldb: update_status = mysqldb.execute(sql_update) if update_status == 1: mysqldb.execute(sql_select) for request_key_dict in mysqldb: request_key_set.add(request_key_dict['request_key']) else: return update_status return request_key_set def _get_multi_list(self, ips_port_list, image_keys_list): """ 获取所有(一个或多个)给定 key 的值 :param ips_port_list: GoBeansproxy 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2']) :param image_keys_list: 一个或多个图片分布式存储路径(即图片在 BeansDB/GoBeansDB 的 key)的列表(格式:[key1, key2, key3]) :return: GoBeansDB 一个或多个 key-value 的字典(格式:{key1 : value1, key2 : value2 , key3 : value3})【针对图片存储,上限为 720,最优为 100】 """ mc = plb.InteractiveConn(ips_port_list) image_keys_values_dict = mc.get_multi_list(image_keys_list) return image_keys_values_dict def _set_multi_dict(self, ips_port_list, image_keys_values_dict): """ GoBeansDB 同时设置一个或多个 key-value :param ips_port_list: GoBeansDB 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2']) :param image_keys_values_dict: GoBeansDB 一个或多个 key-value 的字典(格式:{key1 : value1, key2 : value2 , key3 : value3}) :return: set_status(True/False) """ mc = plb.InteractiveConn(ips_port_list) set_status = mc.set_multi_dict(image_keys_values_dict) return set_status def data_migration(self, gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip): """ 可调用数据迁移操作函数 :param gobeansdb_old_ip: 被移除 GoBeansDB 的 IP :param gobeansdb_port: 被移除 GoBeansDB 的 PORT :param gobeansdb_new_ip: 接收数据的新 GoBeansDB 的 IP :return: """ sql_update, sql_select = self._construct_sql(gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip) request_key_set = self._mysql_operations(self.__mysql_conn_dict, sql_update, sql_select) if request_key_set == 0: return 'Do not repeat this operation if additional nodes are to be programmed.' gobeansdb_conn_new_str = str(gobeansdb_new_ip) + ':' + str(gobeansdb_port) gobeansdb_conn_new_list = [gobeansdb_conn_new_str] image_keys_list = list(request_key_set) list_len = len(image_keys_list) print('当前数据迁移 key-value 数量:', list_len) # image_keys_values_dict 针对图片存储,上限为 720,最优为 100,当长度大于 360 时每次 100 循环执行 if list_len > 360: # 取整除,列表切片准则 exactly_divisible = list_len // 100 list_num = 0 while list_num <= exactly_divisible: slice_start = list_num * 100 # 判断是否为最后一次循环,当为最后一次是 stop 即为列表长度 if list_num == exactly_divisible: slice_stop = list_len else: slice_stop = list_num * 100 + 100 # 连接BeansDB/GoBeansDB,获取所有(一个或多个)给定 key 的值 image_keys_values_dict = self._get_multi_list(self.__gobeansproxy_conn_list, image_keys_list[slice_start: slice_stop]) # 连接BeansDB/GoBeansDB,同时设置一个或多个 key-value(原理还是相当于依次执行 set) set_status = self._set_multi_dict(gobeansdb_conn_new_list, image_keys_values_dict) if not set_status: # 如果为 False,不是整个 image_keys_values_dict 执行失败,而是个别 set 的值不存在,可忽略(因为可通过 Python 分析日志找出 set 失败的key) # 当然,如果数据量小,可不用采取批量形式,set_key_value 即可 print('Data Migration Fail, Value does not exist.') else: print('>>> list_len: %d; slice_start: %d; slice_stop: %d' % (list_len, slice_start, slice_stop)) list_num += 1 else: # 连接BeansDB/GoBeansDB,获取所有(一个或多个)给定 key 的值 image_keys_values_dict = self._get_multi_list(self.__gobeansproxy_conn_list, image_keys_list) # 连接BeansDB/GoBeansDB,同时设置一个或多个 key-value set_status = self._set_multi_dict(gobeansdb_conn_new_list, image_keys_values_dict) if not set_status: print('Data Migration Fail', image_keys_list) else: print('>>> list_len: %d' % (list_len)) return 'Data Migration Successful' """ if __name__ == '__main__': mysql_xxxx_gobeansdb_dict = {'host': 'xx.xx.2.35', 'port': 3701, 'user': 'xxxx_test', 'passwd': 'xxxxtest', 'db': 'xxxx_gobeansdb'} gobeansproxy_ips_port_list = ['xx.xx.3.36:7905'] br = BalanceReplica(mysql_xxxx_gobeansdb_dict, gobeansproxy_ips_port_list) result = br.data_migration('xx.xx.2.35', 7981, 'xx.xx.3.36') print(result) """

3、以上实例输出结果

当前数据迁移 key-value 数量: 7392 >>> list_len: 7392; slice_start: 0; slice_stop: 100 >>> list_len: 7392; slice_start: 100; slice_stop: 200 >>> list_len: 7392; slice_start: 200; slice_stop: 300 >>> list_len: 7392; slice_start: 300; slice_stop: 400 >>> list_len: 7392; slice_start: 400; slice_stop: 500 >>> list_len: 7392; slice_start: 500; slice_stop: 600 >>> list_len: 7392; slice_start: 600; slice_stop: 700 >>> list_len: 7392; slice_start: 700; slice_stop: 800 >>> list_len: 7392; slice_start: 800; slice_stop: 900 >>> list_len: 7392; slice_start: 900; slice_stop: 1000 >>> list_len: 7392; slice_start: 1000; slice_stop: 1100 >>> list_len: 7392; slice_start: 1100; slice_stop: 1200 >>> list_len: 7392; slice_start: 1200; slice_stop: 1300 >>> list_len: 7392; slice_start: 1300; slice_stop: 1400 >>> list_len: 7392; slice_start: 1400; slice_stop: 1500 >>> list_len: 7392; slice_start: 1500; slice_stop: 1600 >>> list_len: 7392; slice_start: 1600; slice_stop: 1700 >>> list_len: 7392; slice_start: 1700; slice_stop: 1800 >>> list_len: 7392; slice_start: 1800; slice_stop: 1900 >>> list_len: 7392; slice_start: 1900; slice_stop: 2000 >>> list_len: 7392; slice_start: 2000; slice_stop: 2100 >>> list_len: 7392; slice_start: 2100; slice_stop: 2200 >>> list_len: 7392; slice_start: 2200; slice_stop: 2300 >>> list_len: 7392; slice_start: 2300; slice_stop: 2400 >>> list_len: 7392; slice_start: 2400; slice_stop: 2500 >>> list_len: 7392; slice_start: 2500; slice_stop: 2600 >>> list_len: 7392; slice_start: 2600; slice_stop: 2700 >>> list_len: 7392; slice_start: 2700; slice_stop: 2800 >>> list_len: 7392; slice_start: 2800; slice_stop: 2900 >>> list_len: 7392; slice_start: 2900; slice_stop: 3000 >>> list_len: 7392; slice_start: 3000; slice_stop: 3100 >>> list_len: 7392; slice_start: 3100; slice_stop: 3200 >>> list_len: 7392; slice_start: 3200; slice_stop: 3300 >>> list_len: 7392; slice_start: 3300; slice_stop: 3400 >>> list_len: 7392; slice_start: 3400; slice_stop: 3500 >>> list_len: 7392; slice_start: 3500; slice_stop: 3600 >>> list_len: 7392; slice_start: 3600; slice_stop: 3700 >>> list_len: 7392; slice_start: 3700; slice_stop: 3800 >>> list_len: 7392; slice_start: 3800; slice_stop: 3900 >>> list_len: 7392; slice_start: 3900; slice_stop: 4000 >>> list_len: 7392; slice_start: 4000; slice_stop: 4100 >>> list_len: 7392; slice_start: 4100; slice_stop: 4200 >>> list_len: 7392; slice_start: 4200; slice_stop: 4300 >>> list_len: 7392; slice_start: 4300; slice_stop: 4400 >>> list_len: 7392; slice_start: 4400; slice_stop: 4500 >>> list_len: 7392; slice_start: 4500; slice_stop: 4600 >>> list_len: 7392; slice_start: 4600; slice_stop: 4700 >>> list_len: 7392; slice_start: 4700; slice_stop: 4800 >>> list_len: 7392; slice_start: 4800; slice_stop: 4900 >>> list_len: 7392; slice_start: 4900; slice_stop: 5000 >>> list_len: 7392; slice_start: 5000; slice_stop: 5100 >>> list_len: 7392; slice_start: 5100; slice_stop: 5200 >>> list_len: 7392; slice_start: 5200; slice_stop: 5300 >>> list_len: 7392; slice_start: 5300; slice_stop: 5400 >>> list_len: 7392; slice_start: 5400; slice_stop: 5500 >>> list_len: 7392; slice_start: 5500; slice_stop: 5600 >>> list_len: 7392; slice_start: 5600; slice_stop: 5700 >>> list_len: 7392; slice_start: 5700; slice_stop: 5800 >>> list_len: 7392; slice_start: 5800; slice_stop: 5900 >>> list_len: 7392; slice_start: 5900; slice_stop: 6000 >>> list_len: 7392; slice_start: 6000; slice_stop: 6100 >>> list_len: 7392; slice_start: 6100; slice_stop: 6200 >>> list_len: 7392; slice_start: 6200; slice_stop: 6300 >>> list_len: 7392; slice_start: 6300; slice_stop: 6400 >>> list_len: 7392; slice_start: 6400; slice_stop: 6500 >>> list_len: 7392; slice_start: 6500; slice_stop: 6600 >>> list_len: 7392; slice_start: 6600; slice_stop: 6700 >>> list_len: 7392; slice_start: 6700; slice_stop: 6800 >>> list_len: 7392; slice_start: 6800; slice_stop: 6900 >>> list_len: 7392; slice_start: 6900; slice_stop: 7000 >>> list_len: 7392; slice_start: 7000; slice_stop: 7100 >>> list_len: 7392; slice_start: 7100; slice_stop: 7200 >>> list_len: 7392; slice_start: 7200; slice_stop: 7300 >>> list_len: 7392; slice_start: 7300; slice_stop: 7392 Data Migration Successful

 

最新回复(0)