commit 4904506310fa79f47efef3937e77cc143f52f54e
Author: 日月与吾 <2995749773@qq.com>
Date: Sat Jul 12 23:00:16 2025 +0800
1. 初始化仓库
2. 编辑了README文档
3. 初步实现了123云盘的基本api调用
4. 初步确定了项目结构
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..fdc241e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,4 @@
+## v0.1.0 (2025-07-12)
+
+ - 初始版本发布
+ - 实现了基本的api功能
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7184a92
--- /dev/null
+++ b/README.md
@@ -0,0 +1,95 @@
+
+
123Driver
+
+
+## 项目背景
+
+123云盘作为一个优秀的云盘服务商,相较于其他云盘服务商,有着更高的上传速度、下载速度、存储空间、安全性等优势。因此123云盘是我的主要使用的云盘服务。
+
+但是,由于个人需求,经常想要将123云盘的功能集成到自己的应用中,而且我也看了下官方的开发文档,发现123云盘提供了丰富的API接口,包括文件管理、文件预览、文件搜索、文件下载等功能。
+
+于是,我萌生了开发一个Python模块来调用123云盘API的想法,以方便自己和他人使用123云盘的API。
+
+就此,123Driver项目诞生了。
+
+## 项目目标
+
+- ✅ 支持所有官方API端点
+- ✅ 内置速率限制控制
+- ✅ 异步HTTP客户端
+- ✅ 完整的类型注解
+- ✅ 错误处理和重试机制
+- ✅ 详细的文档和示例
+
+## 项目方案
+
+利用httpx库实现123云盘api的调用,
+
+## 项目结构
+
+### 项目目录
+
+```
+123driver #项目根目录
+├──__init__.py
+├──__version__.py
+├──_api.py
+├──_logger.py
+├──_utils.py
+├──_main.py
+├──CHANGELOG.md
+├──README.md
+├──requirements.txt
+├── ...
+```
+
+### 项目模块
+
+- `_api.py`接口模块:封装123云盘的交互接口,包括登录、查询、预订等功能。
+- `_main.py`主模块:封装项目的入口函数,包括命令行参数解析、日志配置等。
+- `_logger.py`日志模块:封装项目的日志操作,包括记录日志、输出日志等。
+- `_utils.py`工具模块:封装项目的工具函数,包括时间戳转换、随机数生成等。
+
+### 项目流程
+
+
+
+### 项目工具
+
+- httpx:异步HTTP客户端库,用于发起HTTP请求。
+- loguru:日志库,用于记录日志。
+
+## 项目进度
+
+- [x] 项目背景
+- [x] 项目目标
+- [x] 项目方案
+- [x] 项目结构
+- [x] 项目模块
+- [ ] 项目流程
+- [x] 项目工具
+- [ ] 项目进度
+- [ ] 项目总结
+- [ ] 项目反思
+- [ ] 项目后续规划
+- [ ] 项目参考资料
+- [ ] 项目实例
+- [x] 项目贡献者
+
+## 项目总结
+
+## 项目反思
+
+## 项目后续规划
+
+## 项目参考资料
+
+## 项目实例
+
+## 项目贡献者
+
+感谢以下[代码贡献者](https://github.com/RiyueYuwu/123driver/graphs/contributors)和社区里其他成员对 `123Driver` 的贡献:
+
+
+
+
\ No newline at end of file
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..62ff14e
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,8 @@
+from .__version__ import *
+from ._api import *
+
+
+__all__ = [
+ "API",
+ "RateLimit",
+]
\ No newline at end of file
diff --git a/__version__.py b/__version__.py
new file mode 100644
index 0000000..ab7b582
--- /dev/null
+++ b/__version__.py
@@ -0,0 +1,8 @@
+__title__ = "123driver"
+__description__ = "A Python library for 123netdisk API"
+__url__ = "https://github.com/RiyueYuwu/123driver"
+__author__ = "RiyueYuwu"
+__author_email__ = "2995749773@qq.com"
+__version__ = "0.1.0"
+__license__ = "MIT"
+__copyright__ = "Copyright 2025 RiyueYuwu"
\ No newline at end of file
diff --git a/_api.py b/_api.py
new file mode 100644
index 0000000..7d61976
--- /dev/null
+++ b/_api.py
@@ -0,0 +1,465 @@
+import re
+from tabnanny import check
+import time
+import json
+import asyncio
+from typing import Dict, List, Optional, Any
+from dataclasses import dataclass
+from datetime import date, datetime
+import httpx
+
+
+@dataclass
+class RateLimit:
+ """速率限制配置"""
+ endpoint: str
+ max_requests_per_second: int
+ last_request_time: float = 0.0
+ request_count: int = 0
+
+
+class API:
+ """123Driver API Moudle"""
+
+ def __init__(self, client_id: str, client_secret: str, base_url: str = "https://open-api.123pan.com"):
+ self.client_id = client_id
+ self.client_secret = client_secret
+ self.base_url = base_url.rstrip('/')
+ self.access_token: str = ''
+ self.token_expires_at: float = 0.0
+
+ # 初始化速率限制配置
+ self.rate_limits = {
+ "api/v1/access_token": RateLimit("api/v1/access_token", 1),
+ "api/v1/user/info": RateLimit("api/v1/user/info", 1),
+ "api/v1/file/move": RateLimit("api/v1/file/move", 1),
+ "api/v1/file/delete": RateLimit("api/v1/file/delete", 1),
+ "api/v1/file/list": RateLimit("api/v1/file/list", 4),
+ "api/v2/file/list": RateLimit("api/v2/file/list", 3),
+ "upload/v1/file/mkdir": RateLimit("upload/v1/file/mkdir", 2),
+ "upload/v1/file/create": RateLimit("upload/v1/file/create", 2),
+ "upload/v1/file/upload_async_result": RateLimit("upload/v1/file/upload_async_result", 1),
+ "api/v1/share/list": RateLimit("api/v1/share/list", 10),
+ "api/v1/share/list/info": RateLimit("api/v1/share/list/info", 10),
+ "api/v1/transcode/folder/info": RateLimit("api/v1/transcode/folder/info", 20),
+ "api/v1/transcode/upload/from_cloud_disk": RateLimit("api/v1/transcode/upload/from_cloud_disk", 1),
+ "api/v1/transcode/delete": RateLimit("api/v1/transcode/delete", 10),
+ "api/v1/transcode/video/resolutions": RateLimit("api/v1/transcode/video/resolutions", 1),
+ "api/v1/transcode/video": RateLimit("api/v1/transcode/video", 3),
+ "api/v1/transcode/video/record": RateLimit("api/v1/transcode/video/record", 20),
+ "api/v1/transcode/video/result": RateLimit("api/v1/transcode/video/result", 20),
+ "api/v1/transcode/file/download": RateLimit("api/v1/transcode/file/download", 10),
+ "api/v1/transcode/m3u8_ts/download": RateLimit("api/v1/transcode/m3u8_ts/download", 20),
+ "api/v1/transcode/file/download/all": RateLimit("api/v1/transcode/file/download/all", 1)
+ }
+
+ self.client = httpx.AsyncClient(timeout=30.0)
+
+ async def _enforce_rate_limit(self, endpoint: str):
+ """强制执行速率限制"""
+ if endpoint not in self.rate_limits:
+ return
+
+ rate_limit = self.rate_limits[endpoint]
+ current_time = time.time()
+
+ # 如果距离上次请求不足1秒,需要等待
+ if current_time - rate_limit.last_request_time < 1.0:
+ wait_time = 1.0 - (current_time - rate_limit.last_request_time)
+ await asyncio.sleep(wait_time)
+
+ # 检查是否超过每秒请求限制
+ if rate_limit.request_count >= rate_limit.max_requests_per_second:
+ # 等待到下一秒
+ await asyncio.sleep(1.0)
+ rate_limit.request_count = 0
+
+ rate_limit.last_request_time = time.time()
+ rate_limit.request_count += 1
+
+ async def _get_headers(self) -> Dict[str, str]:
+ """获取请求头"""
+ headers = {
+ "Content-Type": "application/json",
+ 'Platform': 'open_platform'
+ }
+
+ if self.access_token:
+ headers["Authorization"] = f"Bearer {self.access_token}"
+
+ return headers
+
+ async def _make_request(self, method: str, endpoint: str,headers: dict = {}, **kwargs) -> Dict[str, Any]:
+ """发送HTTP请求"""
+ await self._enforce_rate_limit(endpoint)
+
+ url = f"{self.base_url}/{endpoint}"
+ if not headers:
+ headers = await self._get_headers()
+
+ response = await self.client.request(
+ method=method,
+ url=url,
+ headers=headers,
+ **kwargs
+ )
+
+ response.raise_for_status()
+ return response.json()
+
+ async def get_access_token(self) -> Dict[str, Any]:
+ """获取访问令牌"""
+ data = {
+ "clientID": self.client_id,
+ "clientSecret": self.client_secret
+ }
+
+ response = await self._make_request("POST", "api/v1/access_token", json=data)
+ if not response['code']:
+ self.access_token = response['data']["access_token"]
+ self.token_expires_at = datetime.fromisoformat(response['data']["expiredAt"]).timestamp()
+ await self.save_access_token()
+ return response
+
+ async def save_access_token(self) -> None:
+ """保存访问令牌"""
+ with open("access_token.json", "w") as f:
+ json.dump({"acceseToken": self.access_token, "expiredAt": self.token_expires_at}, f)
+
+ def check_access_token(self) -> bool:
+ """检查访问令牌是否有效"""
+ try:
+ with open("access_token.json", "r") as f:
+ data = json.load(f)
+ self.access_token = data["accessToken"]
+ self.token_expires_at = data["expiredAt"]
+ if self.token_expires_at < datetime.now().timestamp():
+ return False
+ return True
+ except FileNotFoundError:
+ return False
+
+ async def refresh_access_token(self) -> None:
+ """刷新访问令牌"""
+ if not self.check_access_token():
+ await self.get_access_token()
+
+ async def get_user_info(self) -> Dict[str, Any]:
+ """获取用户信息"""
+ return await self._make_request("GET", "api/v1/user/info")
+
+ async def get_file_info(self, fileId: int) -> Dict[str, Any]:
+ """获取单个文件信息"""
+ return await self._make_request("GET", f"api/v1/file/detail?fileId={fileId}")
+
+ async def fet_files_info(self, fileIds: List[int]) -> Dict[str, Any]:
+ """获取多个文件信息"""
+ data = {"fileIDs": fileIds}
+ return await self._make_request("POST", "api/v1/file/infos", json=data)
+
+ async def move_file(self, fileIDs: List[int], toParentFileID: int) -> Dict[str, Any]:
+ """移动文件"""
+ data = {
+ "fileIDs": fileIDs,
+ "toParentFileID": toParentFileID
+ }
+ return await self._make_request("POST", "api/v1/file/move", json=data)
+
+ async def rename_single_file(self, fileId: int, fileName: str) -> Dict[str, Any]:
+ """单个文件重命名"""
+ data = {
+ "fileID": fileId,
+ "fileName": fileName
+ }
+ return await self._make_request("PUT", "api/v1/file/name", json=data)
+
+ async def rename_files(self, renameList: List[str]) -> Dict[str, Any]:
+ """批量文件重命名"""
+ data = {"renameList": renameList}
+ return await self._make_request("POST", "api/v1/file/rename", json=data)
+
+ async def file_trash(self, fileIDs: List[int]) -> Dict[Any, Any]:
+ """文件移入回收站"""
+ results = []
+ batch_size = 100
+ if len(fileIDs) > batch_size:
+ for i in range(0, len(fileIDs), batch_size):
+ batch = fileIDs[i:i+batch_size]
+ data = {"fileIDs": batch}
+ result = await self._make_request("POST", "api/v1/file/trash", json=data)
+ results.append(result)
+ return {i: result for i, result in enumerate(results)}
+ else:
+ data = {"fileIDs": fileIDs}
+ return await self._make_request("POST", "api/v1/file/trash", json=data)
+
+ async def recover_file(self, fileIDs: List[int]) -> Dict[Any, Any]:
+ """从回收站恢复文件"""
+ results = []
+ batch_size = 100
+ if len(fileIDs) > batch_size:
+ for i in range(0, len(fileIDs), batch_size):
+ batch = fileIDs[i:i+batch_size]
+ data = {"fileIDs": batch}
+ result = await self._make_request("POST", "api/v1/file/recover", json=data)
+ results.append(result)
+ return {i: result for i, result in enumerate(results)}
+ else:
+ data = {"fileIDs": fileIDs}
+ return await self._make_request("POST", "api/v1/file/recover", json=data)
+
+ async def delete_file(self, fileIDs: List[int]) -> Dict[Any, Any]:
+ """彻底删除文件"""
+ results = []
+ batch_size = 100
+ if len(fileIDs) > batch_size:
+ for i in range(0, len(fileIDs), batch_size):
+ batch = fileIDs[i:i+batch_size]
+ data = {"fileIDs": batch}
+ result = await self._make_request("POST", "api/v1/file/delete", json=data)
+ results.append(result)
+ return {i: result for i, result in enumerate(results)}
+ else:
+ data = {"fileIDs": fileIDs}
+ return await self._make_request("POST", "api/v1/file/delete", json=data)
+
+ async def list_files_v1(self, parentFileId: int = 0, page: int = 1, limit: int = 100, orderBy: str = "file_name", orderDirection: str = "asc", trashed: bool = False, searchData: Optional[str] = None) -> Dict[str, Any]:
+ """获取文件列表 (v1)"""
+ params = {
+ "parentFileId": parentFileId,
+ "page": page,
+ "limit": limit,
+ "orderBy": orderBy,
+ "orderDirection": orderDirection,
+ "trashed": int(trashed)
+ }
+ if searchData:
+ params["searchData"] = searchData
+ return await self._make_request("GET", "api/v1/file/list", params=params)
+
+ async def list_files_v2(self, parentFileId: int = 0, limit: int = 100, searchData: Optional[str] = None, searchMode: Optional[int] = None, lastFileId: Optional[int] = None) -> Dict[str, Any]:
+ """获取文件列表 (v2)"""
+ params: Dict[str, Any] = {
+ "parentFileId": parentFileId,
+ "limit": limit
+ }
+ if searchData is not None:
+ params["searchData"] = searchData
+ if searchMode is not None:
+ params["searchMode"] = searchMode
+ if lastFileId is not None:
+ params["lastFileId"] = lastFileId
+ return await self._make_request("GET", "api/v2/file/list", params=params)
+
+ async def create_folder(self, name: str, parentID: int = 0) -> Dict[str, Any]:
+ """创建文件夹"""
+ data = {"name": name, "parentID": parentID}
+ return await self._make_request("POST", "upload/v1/file/mkdir", json=data)
+
+ async def create_file_v1(self, parentFileID: int, filename: str, etag: str, size: int, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
+ """创建文件 (v1)"""
+ data = {
+ "parentFileID": parentFileID,
+ "filename": filename,
+ "etag": etag,
+ "size": size,
+ "duplicate": duplicate,
+ "containDir": containDir
+ }
+ return await self._make_request("POST", "upload/v1/file/create", json=data)
+
+ async def get_upload_url_v1(self, preuploadID: str, sliceNo: int) -> Dict[str, Any]:
+ """获取上传URL (v1)"""
+ data = {
+ "preuploadID": preuploadID,
+ "sliceNo": sliceNo
+ }
+ return await self._make_request("POST", "upload/v1/file/get_upload_url", json=data)
+
+ async def list_upload_parts_v1(self, preuploadID: str) -> Dict[str, Any]:
+ """列举已上传分片 (v1)"""
+ data = {"preuploadID": preuploadID}
+ return await self._make_request("POST", "upload/v1/file/list_upload_parts", json=data)
+
+ async def upload_complete_v1(self, preuploadID: str) -> Dict[str, Any]:
+ """完成上传 (v1)"""
+ data = {"preuploadID": preuploadID}
+ return await self._make_request("POST", "upload/v1/file/upload_complete", json=data)
+
+ async def upload_async_result_v1(self, preuploadID: str) -> Dict[str, Any]:
+ """异步轮询获取上传结果(v1)"""
+ data = {"preuploadID": preuploadID}
+ return await self._make_request("POST", "upload/v1/file/upload_async_result", json=data)
+
+ async def create_file_v2(self, parentFileID: int, filename: str, etag: str, size: int, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
+ """创建文件 (v2)"""
+ data = {
+ "parentFileID": parentFileID,
+ "filename": filename,
+ "etag": etag,
+ "size": size,
+ "duplicate": duplicate,
+ "containDir": containDir
+ }
+ return await self._make_request("POST", "upload/v2/file/create", json=data)
+
+ async def upload_slice_v2(self, preuploadID: str, sliceNo: int, sliceMD5: str, slice: bytes) -> Dict[str, Any]:
+ """上传分片 (v2)"""
+ headers = await self._get_headers()
+ headers["Content-Type"] = "multipart/form-data"
+ data = {
+ "preuploadID": preuploadID,
+ "sliceNo": sliceNo,
+ "sliceMD5": sliceMD5,
+ "slice": slice
+ }
+ return await self._make_request("POST", "upload/v2/file/slice", headers=headers, data=data)
+
+ async def upload_complete_v2(self, preuploadID: str) -> Dict[str, Any]:
+ """上传完毕 (v2)"""
+ data = {"preuploadID": preuploadID}
+ return await self._make_request("POST", "upload/v2/file/complete", json=data)
+
+ async def get_upload_domain_v2(self):
+ """获取上传域名 (v2)"""
+ return await self._make_request("GET", "upload/v2/file/domain")
+
+ async def single_upload_v2(self, parentFileID: int, filename: str, etag: str, size: int, file: bytes, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
+ """单步上传文件 (v2)"""
+ data = {
+ "parentFileID": parentFileID,
+ "filename": filename,
+ "etag": etag,
+ "size": size,
+ "duplicate": duplicate,
+ "containDir": containDir
+ }
+ return await self._make_request("POST", "upload/v2/file/single/create", json=data, data=file)
+
+ async def create_offline_downlod(self, url: str, dirID: int, fileName: Optional[str] = None, callBackUrl: Optional[str] = None) -> Dict[str, Any]:
+ """创建离线下载任务"""
+ data: Dict[str, Any] = {
+ "url": url,
+ "dirID": dirID,
+ "fileName": fileName
+ }
+ if callBackUrl is not None:
+ data["callBackUrl"] = callBackUrl
+ return await self._make_request("POST", "/api/v1/offline/download", json=data)
+
+ async def offline_progress(self, taskID: int) -> Dict[str, Any]:
+ """离线下载进度"""
+ params = {"taskID": taskID}
+ return await self._make_request("GET", "/api/v1/offline/download/progress", params=params)
+
+ async def share_payment_files(self, shareName: str, fileIDList: str, payAmount: int, resourceDesc: str, isReward: bool|int = False) -> Dict[str, Any]:
+ """分享付费文件"""
+ data = {
+ "shareName": shareName,
+ "fileIDList": fileIDList,
+ "payAmount": payAmount,
+ "resourceDesc": resourceDesc,
+ "isReward": int(isReward)
+ }
+ return await self._make_request("POST", "/api/v1/share/content-payment/create", json=data)
+
+ async def create_share(self, shareName: str, shareExpire: int, fileIDList: str, sharePwd: Optional[str] = None, trafficSwitch: Optional[int] = None, trafficLimitSwitch: Optional[int] = None, trafficLimit: Optional[int] = None) -> Dict[str, Any]:
+ """创建分享"""
+ data: Dict[str, Any] = {
+ "shareName": shareName,
+ "shareExpire": shareExpire,
+ "fileIDList": fileIDList,
+ }
+ if sharePwd is not None:
+ data["sharePwd"] = sharePwd
+ if trafficSwitch is not None:
+ data["trafficSwitch"] = trafficSwitch
+ if trafficLimitSwitch is not None:
+ data["trafficLimitSwitch"] = trafficLimitSwitch
+ if trafficLimit is not None:
+ data["trafficLimit"] = trafficLimit
+ return await self._make_request("POST", "api/v1/share/create", json=data)
+
+ async def edit_share(self, shareIdList: List[int], trafficSwitch: Optional[int] = None, trafficLimitSwitch: Optional[int] = None, trafficLimit: Optional[int] = None) -> Dict[str, Any]:
+ """编辑分享"""
+ data: Dict[str, Any] = {
+ "shareIdList": shareIdList,
+ }
+ if trafficSwitch is not None:
+ data["trafficSwitch"] = trafficSwitch
+ if trafficLimitSwitch is not None:
+ data["trafficLimitSwitch"] = trafficLimitSwitch
+ if trafficLimit is not None:
+ data["trafficLimit"] = trafficLimit
+ return await self._make_request("PUT", "api/v1/share/list/info", json=data)
+
+ async def get_share_list(self, limit: int = 100, lastShareId: int = 0) -> Dict[str, Any]:
+ """获取分享列表"""
+ params = {"limit": limit, "lastShareId": lastShareId}
+ return await self._make_request("GET", "api/v1/share/list", params=params)
+
+ async def get_transcode_folder_info(self, folder_path: str) -> Dict[str, Any]:
+ """获取转码文件夹信息"""
+ params = {"folder_path": folder_path}
+ return await self._make_request("GET", "api/v1/transcode/folder/info", params=params)
+
+ async def upload_from_cloud_disk(self, source_path: str, target_path: str) -> Dict[str, Any]:
+ """从云盘上传文件进行转码"""
+ data = {
+ "source_path": source_path,
+ "target_path": target_path
+ }
+ return await self._make_request("POST", "api/v1/transcode/upload/from_cloud_disk", json=data)
+
+ async def delete_transcode(self, transcode_id: str) -> Dict[str, Any]:
+ """删除转码任务"""
+ data = {"transcode_id": transcode_id}
+ return await self._make_request("POST", "api/v1/transcode/delete", json=data)
+
+ async def get_video_resolutions(self) -> Dict[str, Any]:
+ """获取视频分辨率列表"""
+ return await self._make_request("GET", "api/v1/transcode/video/resolutions")
+
+ async def transcode_video(self, file_path: str, resolution: str, output_format: str = "mp4") -> Dict[str, Any]:
+ """转码视频"""
+ data = {
+ "file_path": file_path,
+ "resolution": resolution,
+ "output_format": output_format
+ }
+ return await self._make_request("POST", "api/v1/transcode/video", json=data)
+
+ async def get_transcode_record(self, transcode_id: str) -> Dict[str, Any]:
+ """获取转码记录"""
+ params = {"transcode_id": transcode_id}
+ return await self._make_request("GET", "api/v1/transcode/video/record", params=params)
+
+ async def get_transcode_result(self, transcode_id: str) -> Dict[str, Any]:
+ """获取转码结果"""
+ params = {"transcode_id": transcode_id}
+ return await self._make_request("GET", "api/v1/transcode/video/result", params=params)
+
+ async def download_transcode_file(self, transcode_id: str, file_path: str) -> Dict[str, Any]:
+ """下载转码文件"""
+ params = {
+ "transcode_id": transcode_id,
+ "file_path": file_path
+ }
+ return await self._make_request("GET", "api/v1/transcode/file/download", params=params)
+
+ async def download_m3u8_ts(self, m3u8_url: str, ts_file: str) -> Dict[str, Any]:
+ """下载M3U8 TS文件"""
+ params = {
+ "m3u8_url": m3u8_url,
+ "ts_file": ts_file
+ }
+ return await self._make_request("GET", "api/v1/transcode/m3u8_ts/download", params=params)
+
+ async def download_all_transcode_files(self, transcode_id: str) -> Dict[str, Any]:
+ """下载所有转码文件"""
+ params = {"transcode_id": transcode_id}
+ return await self._make_request("GET", "api/v1/transcode/file/download/all", params=params)
+
+ async def close(self):
+ """关闭客户端"""
+ await self.client.aclose()
\ No newline at end of file
diff --git a/_logger.py b/_logger.py
new file mode 100644
index 0000000..e69de29
diff --git a/_main.py b/_main.py
new file mode 100644
index 0000000..0496edb
--- /dev/null
+++ b/_main.py
@@ -0,0 +1,7 @@
+import os
+import time
+
+
+def main():
+ print("Hello, world!")
+ time.sleep(5)
\ No newline at end of file
diff --git a/_utils.py b/_utils.py
new file mode 100644
index 0000000..e69de29