爬虫技术:去重知识点

mac2022-06-30  69

爬虫技术:去重知识点

1. 去重的场景

  url去重:防止发送重复请求

  数据文本去重:防止储存重复数据

2.数据去重的原理

  什么类型的数据:

  重复的依据是什么:

  例如:  data1 = ["123",123,"456","qwe","qwe"]

  列表去重方法:

# 方法一:集合法:乱序 data = ["123",123,"qwe","qwe","456","123"] ret = list(set(data)) print(ret) # 方法二:字典键值法:有序 data = ["123",123,"qwe","qwe","456","123"] # {'123': None, 123: None, 'qwe': None, '456': None} ret_dict = {}.fromkeys(data) # dict_keys(['123', 123, 'qwe', '456']) ret_list = ret_dict.keys() # ['123', 123, 'qwe', '456'] print(list(ret_list))) # 方法三:循环判断法:有序 demo_list = list() for i in data: if i not in data: demo_list.append(i)# ['123', 123, 'qwe', '456'] print(demo_list)

  例如:  data1 = ["123",123,"456","qwe","qwe"]

  限制:"123"和123是重复的,进行去重

data = ["123",123,"qwe","qwe","456","123"] ret_list = list(set[str(i) for i in data]) print(ret_list)

  例如:对象去重

class Test(object): def __init__(self,v):   self.v = v t1 = Test(100) t2 = Test(100) t3 = Test(200) t4 = t1 data = [t1,t2,t3,t4] # [<__main__.Test object at 0x000000000227E208>, <__main__.Test object at 0x000000000227E2B0>, <__main__.Test object at 0x00000000026FE0F0>] print(list(set(data)))需求:剔除重复数据,Test对象的v相同则为重复数据 ret_list = list()ret_set = set()for i in range(len(data)): if data[i].v not in ret_list: ret_list.append(data[i].v) ret_set.add(data[i])# {<__main__.Test object at 0x00000000004E9320>, <__main__.Test object at 0x00000000004E9E48>}print((ret_set))需求:剔除重复数据,Test对象的继承的类相同则为重复数据 ret_list = list()ret_set = set()for i in range(len(data)): if data[i].__class__ not in ret_list: ret_list.append(data[i].__class__) ret_set.add(data[i])# {<__main__.Test object at 0x00000000026A9320>}print((ret_set))

  对即将产生的数据进行去重:容器去重(存储判断依据)

data = ["1",1,"2","3","3"] # 数据是一个一个过来,需要用一个容器进行记录,第一个数据肯定能放入容器中。 first_list = [] 判断依据 for i in data if i not in first_list: first_list = ["1"] first_list = ["1",1] first_list = ["1",1,"2"] first_list = ["1",1,"2","3"]

  1. 原始数据特别大,去重容器和存储容器都存储原始数据,会造成资源紧张。

  2. 判断是否重复,需要快速得出结果,因此可以去重容器放入内存中,进行判断,例如sign存md5的值,才16个字节,省空间。

  3. 提高效率

 去重容器的选择方案:临时容器(内置数据结构),持久化容器(数据库)

  set(), list()内置的数据结构。

  内置的数据结构缺点:无法实现共享无法实现数据的持久化,在分布式增量式爬虫,断电续爬,程序终止后,内存中的数据也就消失了。

  

  持久化的数据库存储:一般用redis,因为是内存形的数据库,速度快,但是当数据量特别大,可以考虑mysql数据库。或其他可以去重的数据库。可持久,可共享。

  

  常用的原始数据特征值计算方法:

    信息摘要hash算法:指纹

    SimHash算法:模糊文本的去重

    布隆过滤器方式:海量数据的过滤去重,考虑存储造成的内存压力。

  

3. 基于信息摘要算法的去重:指纹去重

  指纹的唯一性,可以将任意长度的文本,字节数据,通过算法得到固定长度的文本,MD5(128位),SHA1(160位)

from hashlib import md5 m = md5() # 必须对字符串进行编码,默认是utf-8 m.update("hello".encode()) # 对hello的第一次加密 ret = m.hexdigest() m.update("hello python".encode()) ret2 = m.hexdigest() # 对任意长度的字符串或者文本都返回固定长度的密码 print(ret) # 5d41402abc4b2a76b9719d911017c592 print(ret2) # f07d654f8d9b9123766264c2444d49b1 m.update("hello".encode()) # 对hello的第二次加密 ret3 = m.hexdigest() print(ret3) # 89f528a60d324822a45680021921b486 # 去重依据,唯一性,比较ret的值。

  指纹去重的优势:降低容器的存储空间的使用率,提高判断速度,正确率高。

 

 去重基类实现

__init__.py

# 基于内存数据结果的去重 # 基于redis的去重 # 基于mysql的去重 import hashlib import six from idna import unicode class BaseFilter(object): def __init__(self,hash_name="md5"): # self.hash_func = hashlib.md5 openssl_md5而非原生的md5 self.hash_func = getattr(hashlib,hash_name) self.storage = self._get_storage() def _safe_data(self,data): """ 返回二进制的数据 :param data: :return: python2 str == python3 bytes python2 unicode == python3 str """ if six.PY3: if isinstance(data,bytes): return data elif isinstance(data,str): return data.encode() else: raise Exception("please support str type data") else: if isinstance(data,str): return data elif isinstance(data,unicode): return data.encode() else: raise Exception("please support str type data") def _gethash(self,data): """ 计算hash值 :param data: :return: """ hash_obj = self.hash_func() data =self._safe_data(data) hash_obj.update(data) hash_value = hash_obj.hexdigest() return hash_value def save(self,data): """ 根据data原始数据进行指纹计算,并存储指纹 :param data: 二进制类型的字符串 :return: 存储sign """ hash_value = self._gethash(data) return self._save(hash_value) def _save(self,hash_value): """ redis或mysql去重写这个方法,进行存储 :param hash_value: :return: """ pass def exist(self,data): hash_value = self._gethash(data) return self._is_exist(hash_value) def _is_exist(self,hash_value): """ 交给子类继承重写,进行判断 :param hash_value: :return: """ pass def _get_storage(self): pass # 验证,说明:这里使用的是openssl_md5 data = ["123", "223", "323", "哈哈", "qwe", "123", "哈哈"] f = BaseFilter() for d in data: print(f._gethash(d))

  202cb962ac59075b964b07152d234b70  115f89503138416a242f40fb7d7f338e  bc6dc48b743dc5d013b1abaebd2faed2  8c8fa3529ee34d4e69a0baafb7069da3  76d80224611fc919a5d54f0ff9fba446  202cb962ac59075b964b07152d234b70  8c8fa3529ee34d4e69a0baafb7069da3

memory.py

from . import BaseFilter class MemoryFilter(BaseFilter):def _save(self,hash_value):return self.storage.add(hash_value) def _is_exist(self,hash_value):if hash_value in self.storage: return True return False def _get_storage(self): """ 返回set对象 :return: """ return set()

momory_demo.py

from filter_class.memory import MemoryFilter filter = MemoryFilter() data = ["123", "223", "323", "哈哈", "qwe", "123", "哈哈"] for d in data: if filter.exist(d): print("发现重复数据", d) else: filter.save(d) print("保存数据成功", d) # 结果 保存数据成功 123 保存数据成功 223 保存数据成功 323 保存数据成功 哈哈 保存数据成功 qwe 发现重复数据 123 发现重复数据 哈哈

__init__.py  测试redis去重,__init__方法中增加了一些属性

# 基于内存数据结果的去重 # 基于redis的去重 # 基于mysql的去重 import hashlib import six from idna import unicode class BaseFilter(object): def __init__(self,hash_name="md5",redis_host="localhost",redis_port=6379,redis_db=0,redis_key="filter"): # self.hash_func = hashlib.md5 openssl_md5而非原生的md5 self.redis_host = redis_host self.redis_port = redis_port self.redis_db = redis_db self.hash_func = getattr(hashlib,hash_name) self.redis_key = redis_key # 集合的名字 self.storage = self._get_storage() def _safe_data(self,data): """ 返回二进制的数据 :param data: :return: python2 str == python3 bytes python2 unicode == python3 str """ if six.PY3: if isinstance(data,bytes): return data elif isinstance(data,str): return data.encode() else: raise Exception("please support str type data") else: if isinstance(data,str): return data elif isinstance(data,unicode): return data.encode() else: raise Exception("please support str type data") def _gethash(self,data): """ 计算hash值 :param data: :return: """ hash_obj = self.hash_func() data =self._safe_data(data) hash_obj.update(data) hash_value = hash_obj.hexdigest() return hash_value def save(self,data): """ 根据data原始数据进行指纹计算,并存储指纹 :param data: 二进制类型的字符串 :return: 存储sign """ hash_value = self._gethash(data) return self._save(hash_value) def _save(self,hash_value): """ redis或mysql去重写这个方法,进行存储 :param hash_value: :return: """ pass def exist(self,data): hash_value = self._gethash(data) return self._is_exist(hash_value) def _is_exist(self,hash_value): """ 交给子类继承重写,进行判断 :param hash_value: :return: """ pass def _get_storage(self): pass

redis.py

# 基于redis持久化去重判断依据的实现 from .import BaseFilter import redis class RedisFliter(BaseFilter): def _get_storage(self): """ :return: 返回一个客户端的链接对象 """ # 因为redis的链接也是基于tcp的链接,每一次的验证,都要建立链接,关闭链接,对服务器由很大的损耗,因此可以采用连接池。 # 为了host和port,db不写死,更改父类的init方法,添加几个默认属性 pool = redis.ConnectionPool(host=self.redis_host,port=self.redis_port,db=self.redis_db) # client = redis.StrictRedis(host="",port=6379,db=0) client = redis.StrictRedis(connection_pool=pool) return client def _save(self,hash_value): """ 利用redis中的无序集合进行存储 :param hash_value: :return: """ # 为了redis_key不写死,更改父类的init,增加默认属性,给默认的0号仓库的redis_key(类似于表名,这里是集合名)集合里面添加hash_value return self.storage.sadd(self.redis_key,hash_value) def _is_exist(self,hash_value): """ redis中无序集合是否有相同的数据 :param hash_value: :return: """ # redis自带的判断函数 return self.storage.sismember(self.redis_key,hash_value)

redis_demo.py

from filter_class.redis import RedisFliter filter = RedisFliter() data = ["123", "223", "323", "哈哈", "qwe", "123", "哈哈"] for d in data: if filter.exist(d): print("发现重复数据", d) else: filter.save(d) print("保存数据成功", d) 结果: 保存数据成功 123 保存数据成功 223 保存数据成功 323 保存数据成功 哈哈 保存数据成功 qwe 发现重复数据 123 发现重复数据 哈哈

__init__.py  测试mysql去重,__init__方法中增加了一些属性

 

 布隆过滤器:海量数据的去重,但是有误差

  数据表示:hash表示一个很长的表,原始里面都存储的是0。首先对data1进行求md5,sha1,sha126三种hash算法的值,得到三个值,a1,a2,a3

       1. 假设hash表的占用空间长度为256m,则长度一共有:256 * 1024 * 1024 * 8 = 2147483648位(b)

       2. 假设data1 = "a"  求的的md5加密后的值是:0cc175b9c0f1b6a831c399e269772661 是32字节的16进制的字符串,转成十进制的字符串为:

        16955237001963240173058271559858726497 用这个数与hash表的位数 2147483648 进行求余数操作得到 :1769416289 ,则 hash[1769416289]

           处的值就从0被变成1。同理,sha1,sha128重复此操作,将剩余两个位置的数从0变成1。则用hash表中三个1的位置记录"a"这个数。若其余数的索引                               重复,则都此处的值还是1,不进行改变。映射:改值,从0改为1。

  判断重复:假设判断字符串b是否是重复的,也对b进行以上操作,加入,b的三个索引值至少有一个为0,则没有重复,若全是1,则重复。判断只取值,不改值

  缺陷:全是1会存在误判。

    

 

  实现redis版持久化的布隆过滤器

  redis中hash表示基于str类型的,最大的内存值为512m,也就是40多亿位。import hashlib

import six from idna import unicode import redis class MultipleHash(object): def __init__(self,salts,hash_func_name="md5"): if len(salts) < 3        raise ("please support more than three place list") self.salts = salts self.hash_func_name = hash_func_name self.hash_func = getattr(hashlib,self.hash_func_name)     # 加密 def get_hash_values(self,data): hash_values = list() for s in self.salts: md = self.hash_func() md.update(self._safe_hash_value(data)) md.update(self._safe_hash_value(s)) ret = md.hexdigest() ret = int(ret,16) hash_values.append(ret) return hash_values def _safe_hash_value(self,data): if six.PY3: if isinstance(data,str): return data.encode("utf-8") elif isinstance(data,bytes): return data else: raise Exception("please support str type data") else: if isinstance(data,str): return data elif isinstance(data,unicode): return data.encode("utf-8") class BloomFilter(object): def __init__(self,salts,redis_key="bloomfilter",redis_host = "localhost",redis_port=6379,redis_db=0): self.redis_key = redis_key self.redis_host = redis_host self.redis_port = redis_port self.redis_db = redis_db self.redis_cli = self._get_redis_client() self.multiple_hash = MultipleHash(salts) def _get_redis_client(self): pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=self.redis_db) client = redis.StrictRedis(connection_pool=pool) return client def save(self,data): hash_values = self.multiple_hash.get_hash_values(data) for value in hash_values: offset = self._get_offset(value) self.redis_cli.setbit(self.redis_key,offset,1) return True def _get_offset(self,value): ret = value % (256 * 1024 * 1024 * 8) return ret def is_exist(self,data): hash_values = self.multiple_hash.get_hash_values(data) for value in hash_values: offset = self._get_offset(value) ret = self.redis_cli.getbit(self.redis_key, offset) if ret == 0: return False return True if __name__ == '__main__': salts = ["3","2","1"] b = BloomFilter(salts) data = ["z","x","c","v","g","g","h","h"] for d in data: if not b.is_exist(d): b.save(d) print("映射数据成功",d) else: print("发现重复数据",d)# 第一次运行

映射数据成功 z映射数据成功 x映射数据成功 c映射数据成功 v映射数据成功 g发现重复数据 g映射数据成功 h发现重复数据 h

 # 第二次运行

发现重复数据 z发现重复数据 x发现重复数据 c发现重复数据 v发现重复数据 g发现重复数据 g发现重复数据 h发现重复数据 h

 请求去重

  判断依据

    请求方法:

    请求地址: url

    请求参数:url查询参数?kw=xxxxx  ?q=str1

    请求体:

  去重方案

    信息摘要指纹去重:md5,sha1,sha256:save方法和is_exist方法

    布隆过滤器去重:

  请求数据处理

    统一大小写:(method,url)

 

 

     url参数排序

    请求体排序

 

队列与python

  临时队列分类:

    内置队列模快:queue,多线程的queue模块。 详细讲解地址:https://www.rddoc.com/doc/Python/3.6.0/zh/library/queue/#module-queue

    asyncio中的队列模块,详细讲解地址:https://www.rddoc.com/doc/Python/3.6.0/zh/library/queue/#module-queue

    gevent中的队列模块,详细讲解地址:https://www.rddoc.com/doc/Python/3.6.0/zh/library/queue/#module-queue

    tornado中的队列模块。详细讲解地址:https://www.rddoc.com/doc/Python/3.6.0/zh/library/queue/#module-queue

  持久化队列分类:

    queuelib:disk_queue  scrapy的子模块

    基于redis实现的queue:如pyspider中的redis_queue

  基于redis实现三种持久化队列

    先进先出队列实现原理:redis中的list具有 lpush,rpop左进右出,rpush,lpop右进先出

    后进先出队列实现原理:lpush,lpop左进左出。

    redis中底层存储的都是二进制,当要将对象存储到redis中,实际上会先做一个类似于序列化的操作,变成二进制,再进行存储。

    取的时候,也就要进行反序列化。

    pickle.dumps(obj)----> 二进制     pickle.loads(二进制) --- > 对象

复制大神的代码进行细节调整,实现redis队列,先进先出和后进先出

#!/usr/bin/env python # -*- encoding: utf-8 -*- # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: # Author: Binux<roy@binux.me> # http://binux.me # Created on 2015-04-27 22:48:04 # code from github/binux/pyspider/message_queue/redis_queue.py # 要对这个方法进行修改,替换umsgpack为python自带的pickle模块. import pickle import time import redis # import umsgpack from six.moves import queue as BaseQueue class BaseRedisQueue(object): """ A Queue like message built over redis """ Empty = BaseQueue.Empty Full = BaseQueue.Full max_timeout = 0.3 def __init__(self, name, host='localhost', port=6379, db=0, maxsize=0, lazy_limit=True, password=None, cluster_nodes=None): """ Constructor for RedisQueue maxsize: an integer that sets the upperbound limit on the number of items that can be placed in the queue. lazy_limit: redis queue is shared via instance, a lazy size limit is used for better performance. """ self.name = name if(cluster_nodes is not None): from rediscluster import StrictRedisCluster self.redis = StrictRedisCluster(startup_nodes=cluster_nodes) else: self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password) self.maxsize = maxsize self.lazy_limit = lazy_limit self.last_qsize = 0 def qsize(self): self.last_qsize = self.redis.llen(self.name) return self.last_qsize def empty(self): if self.qsize() == 0: return True else: return False def full(self): if self.maxsize and self.qsize() >= self.maxsize: return True else: return False def put_nowait(self, obj): if self.lazy_limit and self.last_qsize < self.maxsize: pass elif self.full(): raise self.Full # self.last_qsize = self.redis.rpush(self.name, umsgpack.packb(obj)) self.last_qsize = self.redis.rpush(self.name, pickle.dumps(obj)) return True def put(self, obj, block=True, timeout=None): if not block: return self.put_nowait(obj) start_time = time.time() while True: try: return self.put_nowait(obj) except self.Full: if timeout: lasted = time.time() - start_time if timeout > lasted: time.sleep(min(self.max_timeout, timeout - lasted)) else: raise else: time.sleep(self.max_timeout) def get_nowait(self): ret = self.redis.lpop(self.name) if ret is None: raise self.Empty # return umsgpack.unpackb(ret) return pickle.loads(ret) def get(self, block=True, timeout=None): if not block: return self.get_nowait() start_time = time.time() while True: try: return self.get_nowait() except self.Empty: if timeout: lasted = time.time() - start_time if timeout > lasted: time.sleep(min(self.max_timeout, timeout - lasted)) else: raise else: time.sleep(self.max_timeout) # Queue = RedisQueue # 先进先出 from .base import BaseRedisQueue FifoRedisQueue = BaseRedisQueue() # 先进后出 import pickle from .base import BaseRedisQueue class LifoRedisQueue(BaseRedisQueue): def get_nowait(self): ret = self.redis.rpop(self.name) if ret is None: raise self.Empty return pickle.loads(ret)

    优先级队列的实现:redis中的有序集合

      设置有序集合zset 权重值从1-5的值分别是:a,b,c,d,e

      获取元素:zrange zset 0 0 :获取索引值为0的元素,排在最前面的元素

           zrange zset 0 5  : 获取索引范围为 0, 5 的元素:权重小的排在前面,权重大的排在后面。

          zrangebyscore zset 0 5 : 获取索引范围内

          zrem zset a :删除a

          数据的访问:zrange zset 0 0 结果是ret

          数据的获取:zrem zset ret 

import pickle from .base import BaseRedisQueue class PriorityReidisQueue(BaseRedisQueue): """利用redis有序集合的权重,实现优先级队列""" def qsize(self): self.last_qsize = self.redis.zcard(self.name) return self.last_qsize def put_nowait(self,obj): """ obj = (score, value) """ if self.lazy_limit and self.last_qsize < self.maxsize: pass elif self.full(): raise self.Full # 权重 + 数据,用给有序集合添加值的方式模拟队列的put self.last_qsize = self.redis.zadd(self.name,obj[0],pickle.dumps(obj[1])) return True def get_nowait(self): ret = self.redis.zrange(self.name,-1,-1) # 列表为空,抛出异常 if not ret: raise self.Empty # ret 是一个列表 [b"\0\x\"],用删除队列中的值的方式,且删除权重最大的值,模拟队列的get self.redis.zrem(self.name,ret[0]) return pickle.loads(ret[0])

 消息队列与python

  基于消费者与生产者模式,结合队列的特征,实现数据传递的一种技术。

  常见的有:卡夫卡

  Producer:生产数据

  Borker:服务器,需要端口,存储数据至Disk,可持久化存储,可创建多个Broker进行集群。

  Consumer:获得数据

  Disk:存储数据

Broker集群概念

Broker中的队列分区概念

 

posted on 2019-09-07 23:07  张京墨 阅读( ...) 评论( ...) 编辑 收藏

转载于:https://www.cnblogs.com/meloncodezhang/p/11483748.html

相关资源:美团点评技术团队博客
最新回复(0)