利用七牛云的Python SDK实现文件上传、下载等操作。
import os import requests import qiniu from qiniu import BucketManager from qiniu import Auth from qiniu import utils # 你自己的accessKey accessKey = 'DjZp39jT_GN7yr0i_vf5Zs3vBnzYjzkxd7kt6tuK' # 你自己的secretKey secretKey = 'HouhuF7Essfu-VveQ3GYXTcEddgP1qrwtblWuMNa' # 你自己的bucket_domain bucket_domain = '7xjsrw.com3.w1.blg.clouddn.com' class Kpan(): def __init__(self, base_dir, ak=accessKey, sk=secretKey): """初始化Kpan类 base_dir:工作路径 ak:accessKey sk: secretKey """ self.auth = Auth(ak, sk) self.bucket_manager = BucketManager(self.auth) self.kpan_bucket = 'kpan' self.base_dir = os.path.abspath(base_dir) def upload_file(self, key): filePath = os.path.join(self.base_dir, key) # 计算当前文件的hash值,如果服务器上存在相同的副本取消上传 file_hash = utils.etag(filePath) if file_hash == self.is_file_exist(key): print('服务器已存在相同的副本,上传取消...') return upToken = self.auth.upload_token(self.kpan_bucket, key) ret, info = qiniu.put_file(upToken, key, filePath) print('%s上传成功...' % key) def download_file(self, key, toPath=None): '''下载单个文件 key:要下载的文件 toPath:存储路径,默认为base_dir ''' if not toPath: toPath = self.base_dir base_url = 'http://%s/%s' % (bucket_domain, key) # 设置token过期时间 private_url = self.auth.private_download_url(base_url, expires=3600) r = requests.get(private_url) path = os.path.join(toPath, key) dir, _ = os.path.split(path) if not os.path.exists(dir): os.makedirs(dir) with open(path, 'wb') as f: f.write(r.content) print('%s 文件保存成功' % key) def get_all_files(self): items = self.list_all_file() for item in items: self.download_file(item['key']) def delete_file(self, key): if not self.is_file_exist(key): print('%s 文件不存在...' % key) return ret, info = self.bucket_manager.delete(self.kpan_bucket, key) if not ret: print('%s 文件删除成功...' % key) else: print(ret) def list_all_file(self): ret, eof, info = self.bucket_manager.list(self.kpan_bucket) items = ret['items'] # 返回所有文件列表 return items def rename(self, key, tokey): if not self.is_file_exist(key) or self.is_file_exist(tokey): print('重命名失败,“%s”可能已存在' % tokey) return ret, info = self.bucket_manager.rename(self.kpan_bucket, key, tokey) if not ret: print('将“%s”重命名为“%s”成功...' % (key, tokey)) else: print('“%s”重命名失败...' % key) def is_file_exist(self, key): # 检查服务器上文件是否存在,如存在返回hash值 ret, info = self.bucket_manager.stat(self.kpan_bucket, key) if not ret: return False else: return ret['hash'] def upload_all_files(self): for fpathe, dirs, fs in os.walk(self.base_dir): for f in fs: _path = os.path.join(fpathe, f) key = os.path.relpath(_path, self.base_dir) if os.name == 'nt': key = key.replace('\\', '/') pan.upload_file(key) print('所有文件上次成功...') if __name__ == '__main__': # 测试文件上传 pan = Kpan('D:/Kpan') # pan.get_all_files() pan.upload_all_files()
转载于:https://www.cnblogs.com/lkpp/p/7400032.html
相关资源:.Net七牛云储存Demo