motor helper

mac2022-06-30  19

# -*- coding: utf-8 -*- # @Time : 2019-02-13 10:44 # @Author : cxa # @File : mongohelper.py # @Software: PyCharm import asyncio from logger.log import storage import pathlib import datetime from motor.motor_asyncio import AsyncIOMotorClient try: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except ImportError: pass db_configs = { 'type': 'mongo', 'host': '121.7.0.170', 'port': '27017', 'user': 'admin', 'passwd': '1234qwer', 'db_name': 'spider_data' } class MotorOperation(): def __init__(self): self.__dict__.update(**db_configs) if self.user: self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}" else: self.motor_uri = f"mongodb://{self.host}:{self.port}/{self.db_name}" self.client = AsyncIOMotorClient(self.motor_uri) self.mb = self.client[self.db_name] def get_use_list(self): with open("tangxinqun_details.json", "r", encoding="utf-8") as fs: data = fs.read() return data async def save_file_to_mongo(self): items = self.get_use_list() await self.mb.tangxinqun_details.insert_many(eval(items)) async def save_data(self, items): storage.info(f"此时的items:{items}") if isinstance(items, list): for item in items: try: item["primary_key"] = item["primary_key"] await self.mb.tangxinqun_data.update_one({ 'primary_key': item.get("primary_key")}, {'$set': item}, upsert=True) except Exception as e: storage.error(f"数据插入出错:{e.args}此时的item是:{item}") elif isinstance(items, dict): try: items["primary_key"] = items["primary_key"] await self.mb.tangxinqun_data.update_one({ 'primary_key': items.get("primary_key")}, {'$set': items}, upsert=True) except Exception as e: storage.error(f"数据插入出错:{e.args}此时的item是:{items}") async def change_status(self, condition, status_code=0): # status_code 0:初始,1:开始下载,2下载完了 try: item = {} item["status"] = status_code storage.info(f"修改状态,此时的数据是:{item}") await self.mb.tangxinqun_details.update_one(condition, {'$set': item}) except Exception as e: storage.error(f"修改状态出错:{e.args}此时的数据是:{item}") async def get_detail_datas(self): data = self.mb.tangxinqun_details.find({'status': 0}) async for item in data: print(item) return data async def reset_status(self): await self.mb.tangxinqun_details.update_many({'status': 1}, {'$set': {"status": 0}}) async def reset_all_status(self): await self.mb.tangxinqun_details.update_many({}, {'$set': {"status": 0}}) async def find_data(self): curosr = self.mb.tangxinqun_details.find({'status': 0}, {"_id": 0}) async_gen = (item async for item in curosr) return async_gen async def do_delete_many(self): await self.mb.tangxinqun_data.delete_many({"flag": 0}) if __name__ == '__main__': m = MotorOperation() loop = asyncio.get_event_loop() loop.run_until_complete(m.save_file_to_mongo())

转载于:https://www.cnblogs.com/c-x-a/p/10536362.html

相关资源:JAVA上百实例源码以及开源项目
最新回复(0)