1. 首先安装google 和 protobuf
pip install google
pip install protobuf
2. 配置protobuf文件
以我最近使用的头条DMP包配置文件为例, 文件名 toutiao_dmp.proto
// This schema is written in proto2 syntax (it uses required/optional labels).
syntax="proto2";
// Protobuf package that the messages below belong to.
package toutiao.dmp;
// Outer class name used by the Java code generator.
option java_outer_classname = "DmpDataProto";
message DmpData { // Each line of the uploaded file is one base64-encoded string; each string holds the binary bytes of one complete DmpData message.
repeated IdItem idList = 1; // The idList carried by a single line must not exceed 10000 entries.
}
// One id record: the id string, its type, and an optional timestamp and tags.
message IdItem {
optional uint32 timestamp = 1; // If unset, the creation time of the uploaded file is used as the creation time of this record.
required DataType dataType = 2; // Type of this id, e.g. IMEI or IDFA.
required string id = 3; // The id string in the form matching dataType; must be lowercase.
repeated string tags = 4; // Business tag strings attached to this id.
// Supported id types. Note that the value 3 is not assigned.
enum DataType {
IMEI = 0;
IDFA = 1;
UID = 2;
IMEI_MD5 = 4;
IDFA_MD5 = 5;
MOBILE_HASH_SHA256 = 6;
}
}
3. 根据配置文件 生成protobuf python文件
进入 toutiao_dmp.proto 所在目录后执行下面的命令(需要事先安装 protoc 编译器),会在当前目录生成 toutiao_dmp_pb2.py
protoc --python_out=. toutiao_dmp.proto
4. 生成待上传的DMP数据文件(每行一个base64编码的DmpData消息, 最后打包成zip)
# -*- coding: utf-8 -*-
import time
import base64
import toutiao_dmp_pb2 # 由pb文件生成的python代码, 使用Protocol Buffer2
import argparse
import zipfile
import os
def pb2(data, file_type, timestamp, target_file_name):
    """Append one base64-encoded ``DmpData`` message to ``target_file_name``.

    Every entry of ``data`` becomes one ``IdItem`` with the same data type
    and timestamp.  The whole batch is serialized into a single ``DmpData``
    message and written as one line of the target file.

    Args:
        data: Iterable of id strings forming one batch.  The caller decides
            the batch size (the schema comments limit ``idList`` to 10000
            entries per line).
        file_type: Numeric ``IdItem.DataType`` value given as a string:
            '0' IMEI, '1' IDFA, '2' UID, '4' IMEI_MD5, '5' IDFA_MD5,
            '6' MOBILE_HASH_SHA256.
        timestamp: Value stored in the ``timestamp`` field of every item.
        target_file_name: Path of the output file; it is opened in append
            mode and created if missing.

    Raises:
        ValueError: If ``file_type`` is not one of the supported values.
    """
    data_type_names = {
        '0': 'IMEI',
        '1': 'IDFA',
        '2': 'UID',
        '4': 'IMEI_MD5',
        '5': 'IDFA_MD5',
        '6': 'MOBILE_HASH_SHA256',
    }
    # Fail early with a clear message: an item without dataType/id cannot be
    # serialized because both fields are declared ``required``.
    if file_type not in data_type_names:
        raise ValueError('unsupported file_type: %r' % (file_type,))
    data_type = getattr(toutiao_dmp_pb2.IdItem, data_type_names[file_type])
    timestamp = int(timestamp)

    dmp_data = toutiao_dmp_pb2.DmpData()
    for line in data:
        # Plain attribute assignment instead of exec(): ids containing quotes
        # or backslashes are stored verbatim and can never run as code.
        id_item = dmp_data.idList.add()
        id_item.dataType = data_type
        id_item.id = line
        id_item.timestamp = timestamp

    # Binary append mode: b64encode() returns bytes.  Opening always creates
    # the file so callers that post-process it still find it, but an empty
    # batch adds no (blank) line.
    with open(target_file_name, 'ab') as target_file:
        if len(dmp_data.idList) > 0:
            encoded = base64.b64encode(dmp_data.SerializeToString())
            target_file.write(encoded + b'\n')
def zip_files(file, zip_name):
    """Create the deflate-compressed archive ``zip_name`` holding ``file``.

    Args:
        file: Path of the single file to add to the archive.
        zip_name: Path of the zip archive to write; the archive is opened
            in ``'w'`` mode.
    """
    # The context manager closes the archive even if write() raises, so the
    # file handle is never leaked.
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as archive:
        archive.write(file)
def main(file, type, batch_size=10000):
    """Convert a text file of ids into a zipped DMP upload file.

    The input is read line by line.  Ids are grouped into batches and each
    batch is handed to ``pb2``, which appends it to an intermediate file
    named ``toutiao_dmp_<timestamp>``.  That file is then zipped and removed.

    Args:
        file: Path of the input text file containing one id per line.
        type: Data type of all ids, passed through to ``pb2`` as
            ``file_type``.
        batch_size: Maximum number of ids per ``pb2`` call.  Defaults to
            10000, the per-line ``idList`` limit stated in the schema
            comments.

    Returns:
        Name of the zip archive that was created.
    """
    timestamp = int(time.time())
    target_file_name = 'toutiao_dmp_' + str(timestamp)
    zip_name = target_file_name + '.zip'

    batch = []
    wrote_batch = False
    with open(file, 'r') as f:
        for line in f:
            id_value = line.strip()
            if not id_value:
                # Blank lines would otherwise become items with an empty id.
                continue
            batch.append(id_value)
            if len(batch) >= batch_size:
                # Write the full batch first, then start a new one; resetting
                # before writing would silently drop the collected ids.
                pb2(batch, type, timestamp, target_file_name)
                wrote_batch = True
                batch = []

    # Flush the remainder.  pb2 is also called when nothing has been written
    # yet, so that the intermediate file exists for the zip step below.
    if batch or not wrote_batch:
        pb2(batch, type, timestamp, target_file_name)

    zip_files(target_file_name, zip_name)
    os.remove(target_file_name)
    return zip_name
if __name__ == '__main__':
    # Command-line entry point: build the zipped upload file from an id list.
    # NOTE: the first positional argument of ArgumentParser is ``prog``, so
    # this text is shown as the program name in usage/help output.
    parser = argparse.ArgumentParser(u"头条dmp工具")
    # Both options are required: an empty default only led to an obscure
    # failure later, when opening '' or resolving the data type.
    parser.add_argument('-f', '--file', required=True,
                        help='input text file with one id per line')
    parser.add_argument('-t', '--type', required=True,
                        choices=['0', '1', '2', '4', '5', '6'],
                        help='id type: 0 IMEI, 1 IDFA, 2 UID, 4 IMEI_MD5, '
                             '5 IDFA_MD5, 6 MOBILE_HASH_SHA256')
    args = parser.parse_args()
    main(args.file, args.type)
5. 校验生成的zip文件中各id的格式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64
import chardet
import codecs
import zipfile
import sys
import re
import toutiao_dmp_pb2
# Regular expression that an id must match, keyed by the numeric data type
# passed to validate_id_format (the IdItem.DataType value of the item).
# Backslashes are written as ``\\`` so the literals contain no invalid escape
# sequences; the resulting pattern strings are unchanged.
PATTERNS = {
    0: u'^[a-zA-Z0-9]{15}$',  # IMEI
    1: u'^[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}$',  # IDFA
    2: u'^\\d+$',  # UID
    # NOTE(review): 3 is not a value of the DataType enum in
    # toutiao_dmp.proto; the pattern looks like an 11-digit mobile number
    # -- TODO confirm whether it is still needed.
    3: u'^1[34578]{1}\\d{9}$',
    4: u'^[a-zA-Z0-9]{32}$',  # IMEI_MD5
    5: u'^[a-zA-Z0-9]{32}$',  # IDFA_MD5
    6: u'^[a-fA-F0-9]{64}$',  # MOBILE_HASH_SHA256
}
def validate_id_format(data_type, id_data):
    """Tell whether ``id_data`` has the format expected for ``data_type``.

    Args:
        data_type: Key into ``PATTERNS`` selecting the expected format.
        id_data: The id string to check.

    Returns:
        True if a pattern is registered for ``data_type`` and ``id_data``
        matches it; False otherwise, including for unknown data types.
    """
    expected = PATTERNS.get(data_type)
    if not expected:
        # No pattern registered for this type: treat the id as invalid.
        return False
    matched = re.match(expected, id_data)
    return matched is not None
def main(zip_path=None):
    """Validate every id in a zipped DMP upload file and print the totals.

    Each member of the archive is read line by line.  Every line is
    base64-decoded and parsed as a ``DmpData`` message, and each contained
    ``IdItem`` is checked with ``validate_id_format``.  The detected encoding
    of each member and the final valid/invalid counts are printed.

    Args:
        zip_path: Path of the zip archive to check.  When omitted, the
            path that was originally hard-coded in this script is used.
    """
    if zip_path is None:
        zip_path = '/Users/wangzhongjie/Desktop/toutiao/dmp/toutiao_dmp_1572525899.zip'
    valid_num = 0
    invalid_num = 0
    # Context managers make sure the archive and its members are closed.
    with zipfile.ZipFile(zip_path) as zip_file:
        for inside_file in zip_file.namelist():
            # Default read mode: the 'rU' mode is rejected by current
            # versions of zipfile, and lines are stripped below anyway.
            with zip_file.open(inside_file) as f:
                # chardet reports None when it cannot guess (e.g. an empty
                # member); base64 text is plain ASCII, so fall back to that.
                encoding = chardet.detect(f.peek()).get('encoding') or 'ascii'
                print(encoding)
                decoded_file = codecs.iterdecode(f, encoding, errors='ignore')
                for data_line in decoded_file:
                    binary_message = base64.b64decode(data_line.strip())
                    dmp_data = toutiao_dmp_pb2.DmpData()
                    dmp_data.ParseFromString(binary_message)
                    for id_item in dmp_data.idList:
                        if validate_id_format(id_item.dataType, id_item.id):
                            valid_num += 1
                        else:
                            invalid_num += 1
    # print() with a single argument behaves the same on Python 2 and 3.
    print('valid_num: %s' % valid_num)
    print('invalid_num: %s' % invalid_num)


if __name__ == '__main__':
    # An optional first command-line argument selects the archive to check.
    main(sys.argv[1] if len(sys.argv) > 1 else None)