七牛云数据存储Demo

mac2022-06-30  95

利用七牛云的Python SDK实现文件上传、下载等操作。

import os import requests import qiniu from qiniu import BucketManager from qiniu import Auth from qiniu import utils # 你自己的accessKey accessKey = 'DjZp39jT_GN7yr0i_vf5Zs3vBnzYjzkxd7kt6tuK' # 你自己的secretKey secretKey = 'HouhuF7Essfu-VveQ3GYXTcEddgP1qrwtblWuMNa' # 你自己的bucket_domain bucket_domain = '7xjsrw.com3.w1.blg.clouddn.com' class Kpan():     def __init__(self, base_dir, ak=accessKey, sk=secretKey):         """初始化Kpan类             base_dir:工作路径             ak:accessKey             sk: secretKey         """         self.auth = Auth(ak, sk)         self.bucket_manager = BucketManager(self.auth)         self.kpan_bucket = 'kpan'         self.base_dir = os.path.abspath(base_dir)     def upload_file(self, key):         filePath = os.path.join(self.base_dir, key)         # 计算当前文件的hash值,如果服务器上存在相同的副本取消上传         file_hash = utils.etag(filePath)         if file_hash == self.is_file_exist(key):             print('服务器已存在相同的副本,上传取消...')             return         upToken = self.auth.upload_token(self.kpan_bucket, key)         ret, info = qiniu.put_file(upToken, key, filePath)         print('%s上传成功...' % key)     def download_file(self, key, toPath=None):         '''下载单个文件            key:要下载的文件            toPath:存储路径,默认为base_dir         '''         if not toPath:             toPath = self.base_dir         base_url = 'http://%s/%s' % (bucket_domain, key)         # 设置token过期时间         private_url = self.auth.private_download_url(base_url, expires=3600)         r = requests.get(private_url)         path = os.path.join(toPath, key)         dir, _ = os.path.split(path)         if not os.path.exists(dir):             os.makedirs(dir)         with open(path, 'wb') as f:             f.write(r.content)             print('%s 文件保存成功' % key)     def get_all_files(self):         items = self.list_all_file()         for item in items:             self.download_file(item['key'])     def delete_file(self, key):         if not self.is_file_exist(key):             print('%s 文件不存在...' % key)             return         ret, info = self.bucket_manager.delete(self.kpan_bucket, key)         if not ret:             print('%s 文件删除成功...' % key)         else:             print(ret)     def list_all_file(self):         ret, eof, info = self.bucket_manager.list(self.kpan_bucket)         items = ret['items']         # 返回所有文件列表         return items     def rename(self, key, tokey):         if not self.is_file_exist(key) or self.is_file_exist(tokey):             print('重命名失败,“%s”可能已存在' % tokey)             return         ret, info = self.bucket_manager.rename(self.kpan_bucket, key, tokey)         if not ret:             print('将“%s”重命名为“%s”成功...' % (key, tokey))         else:             print('“%s”重命名失败...' % key)     def is_file_exist(self, key):         # 检查服务器上文件是否存在,如存在返回hash值         ret, info = self.bucket_manager.stat(self.kpan_bucket, key)         if not ret:             return False         else:             return ret['hash']     def upload_all_files(self):         for fpathe, dirs, fs in os.walk(self.base_dir):             for f in fs:                 _path = os.path.join(fpathe, f)                 key = os.path.relpath(_path, self.base_dir)                 if os.name == 'nt':                     key = key.replace('\\', '/')                 pan.upload_file(key)         print('所有文件上次成功...') if __name__ == '__main__':     # 测试文件上传     pan = Kpan('D:/Kpan')     # pan.get_all_files()     pan.upload_all_files()

 

 

转载于:https://www.cnblogs.com/lkpp/p/7400032.html

相关资源:.Net七牛云储存Demo
最新回复(0)