async

mac2022-06-30  20

# -*- coding: utf-8 -*- # @Time : 2019/1/7 2:11 PM # @Author : cxa # @File : motortesdt.py # @Software: PyCharm import motor.motor_asyncio import asyncio import pprint from bson import SON db_configs = { 'type': 'mongo', 'host': '123.22.3.11', 'port': '27017', 'user': 'admin', 'passwd': '12344', 'db_name': 'spider_data' } class Motor(): def __init__(self): self.__dict__.update(**db_configs) self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}?authSource={self.user}" self.client = motor.motor_asyncio.AsyncIOMotorClient(self.motor_uri) self.db = self.client.spider_data async def do_insert(self): document = {'key': 'value'} result = await self.db.表名.insert_one(document) print(f'result{result.inserted_id}') async def do_find_one(self): document = await self.db.表名.find_one({}) pprint.pprint(document) async def do_find(self): # cursor = db.表名.find({}) # for document in await cursor.to_list(length=2): # pprint.pprint(document) c = self.db.表名.find({}) c.sort('value', -1).limit(2).skip(1) async for item in c: pprint.pprint(item) async def do_replace(self): try: coll = self.db.表名 old_document = await coll.find_one({'value': "50"}) print(f'found document:{pprint.pformat(old_document)}') _id = old_document['_id'] old_document["value"] = "12" result = await coll.replace_one({'_id': _id}, old_document) print(result) new_document = await coll.find_one({'_id': _id}) print(new_document) except TypeError as e: print("找不到value为50的数据") async def do_update(self): try: condition = {"value": 50} coll = self.db.表名 item = await coll.find_one(condition) item["value"] = 10 result = await coll.update_one(condition, {'$set': item}, upsert=True) # 更新多条update_many print(f'updated {result.modified_count} document') new_document = await coll.find_one(condition) print(f'document is now {pprint.pformat(new_document)}') except TypeError as e: print("找不到value为50的数据") async def do_delete_many(self): coll = self.db.表名 n = await coll.count() print('%s documents before calling delete_many()' % n) result = await self.db.test_collection.delete_many() print('%s documents after' % (await coll.count())) async def use_count_command(self): response = await self.db.command(SON([("count", "表名")])) print(f'response:{pprint.pformat(response)}') if __name__ == '__main__': m = Motor() loop = asyncio.get_event_loop() loop.run_until_complete(m.use_count_command())

转载于:https://www.cnblogs.com/c-x-a/p/10233271.html

最新回复(0)