1. 初始化仓库

2. 编辑了README文档
3. 初步实现了123云盘的基本api调用
4. 初步确定了项目结构
This commit is contained in:
2025-07-12 23:00:16 +08:00
commit 4904506310
8 changed files with 587 additions and 0 deletions

4
CHANGELOG.md Normal file
View File

@@ -0,0 +1,4 @@
## v0.1.0 (2025-07-12)
- 初始版本发布
- 实现了基本的api功能

95
README.md Normal file
View File

@@ -0,0 +1,95 @@
<p align="center">
<h1 align="center">123Driver</h1>
</p>
## 项目背景
123云盘作为一个优秀的云盘服务商相较于其他云盘服务商有着更高的上传速度、下载速度、存储空间、安全性等优势。因此123云盘是我的主要使用的云盘服务。
但是由于个人需求经常想要将123云盘的功能集成到自己的应用中而且我也看了下官方的开发文档发现123云盘提供了丰富的API接口包括文件管理、文件预览、文件搜索、文件下载等功能。
于是我萌生了开发一个Python模块来调用123云盘API的想法以方便自己和他人使用123云盘的API。
就此123Driver项目诞生了。
## 项目目标
- ✅ 支持所有官方API端点
- ✅ 内置速率限制控制
- ✅ 异步HTTP客户端
- ✅ 完整的类型注解
- ✅ 错误处理和重试机制
- ✅ 详细的文档和示例
## 项目方案
利用httpx库实现123云盘api的调用
## 项目结构
### 项目目录
```
123driver #项目根目录
├──__init__.py
├──__version__.py
├──_api.py
├──_logger.py
├──_utils.py
├──_main.py
├──CHANGELOG.md
├──README.md
├──requirements.txt
├── ...
```
### 项目模块
- `_api.py`接口模块封装123云盘的交互接口包括登录、查询、预订等功能。
- `_main.py`主模块:封装项目的入口函数,包括命令行参数解析、日志配置等。
- `_logger.py`日志模块:封装项目的日志操作,包括记录日志、输出日志等。
- `_utils.py`工具模块:封装项目的工具函数,包括时间戳转换、随机数生成等。
### 项目流程
![123Driver流程图](https://i.loli.net/2021/08/15/2y.png)
### 项目工具
- httpx异步HTTP客户端库用于发起HTTP请求。
- loguru日志库用于记录日志。
## 项目进度
- [x] 项目背景
- [x] 项目目标
- [x] 项目方案
- [x] 项目结构
- [x] 项目模块
- [ ] 项目流程
- [x] 项目工具
- [ ] 项目进度
- [ ] 项目总结
- [ ] 项目反思
- [ ] 项目后续规划
- [ ] 项目参考资料
- [ ] 项目实例
- [x] 项目贡献者
## 项目总结
## 项目反思
## 项目后续规划
## 项目参考资料
## 项目实例
## 项目贡献者
感谢以下[代码贡献者](https://github.com/RiyueYuwu/123driver/graphs/contributors)和社区里其他成员对 `123Driver` 的贡献:
<a href="https://github.com/RiyueYuwu/123driver/graphs/contributors">
<img src="https://contrib.rocks/image?repo=RiyueYuwu/123driver" />
</a>

8
__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from .__version__ import *
from ._api import *
__all__ = [
"API",
"RateLimit",
]

8
__version__.py Normal file
View File

@@ -0,0 +1,8 @@
__title__ = "123driver"
__description__ = "A Python library for 123netdisk API"
__url__ = "https://github.com/RiyueYuwu/123driver"
__author__ = "RiyueYuwu"
__author_email__ = "2995749773@qq.com"
__version__ = "0.1.0"
__license__ = "MIT"
__copyright__ = "Copyright 2025 RiyueYuwu"

465
_api.py Normal file
View File

@@ -0,0 +1,465 @@
import re
from tabnanny import check
import time
import json
import asyncio
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import date, datetime
import httpx
@dataclass
class RateLimit:
"""速率限制配置"""
endpoint: str
max_requests_per_second: int
last_request_time: float = 0.0
request_count: int = 0
class API:
"""123Driver API Moudle"""
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://open-api.123pan.com"):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = base_url.rstrip('/')
self.access_token: str = ''
self.token_expires_at: float = 0.0
# 初始化速率限制配置
self.rate_limits = {
"api/v1/access_token": RateLimit("api/v1/access_token", 1),
"api/v1/user/info": RateLimit("api/v1/user/info", 1),
"api/v1/file/move": RateLimit("api/v1/file/move", 1),
"api/v1/file/delete": RateLimit("api/v1/file/delete", 1),
"api/v1/file/list": RateLimit("api/v1/file/list", 4),
"api/v2/file/list": RateLimit("api/v2/file/list", 3),
"upload/v1/file/mkdir": RateLimit("upload/v1/file/mkdir", 2),
"upload/v1/file/create": RateLimit("upload/v1/file/create", 2),
"upload/v1/file/upload_async_result": RateLimit("upload/v1/file/upload_async_result", 1),
"api/v1/share/list": RateLimit("api/v1/share/list", 10),
"api/v1/share/list/info": RateLimit("api/v1/share/list/info", 10),
"api/v1/transcode/folder/info": RateLimit("api/v1/transcode/folder/info", 20),
"api/v1/transcode/upload/from_cloud_disk": RateLimit("api/v1/transcode/upload/from_cloud_disk", 1),
"api/v1/transcode/delete": RateLimit("api/v1/transcode/delete", 10),
"api/v1/transcode/video/resolutions": RateLimit("api/v1/transcode/video/resolutions", 1),
"api/v1/transcode/video": RateLimit("api/v1/transcode/video", 3),
"api/v1/transcode/video/record": RateLimit("api/v1/transcode/video/record", 20),
"api/v1/transcode/video/result": RateLimit("api/v1/transcode/video/result", 20),
"api/v1/transcode/file/download": RateLimit("api/v1/transcode/file/download", 10),
"api/v1/transcode/m3u8_ts/download": RateLimit("api/v1/transcode/m3u8_ts/download", 20),
"api/v1/transcode/file/download/all": RateLimit("api/v1/transcode/file/download/all", 1)
}
self.client = httpx.AsyncClient(timeout=30.0)
async def _enforce_rate_limit(self, endpoint: str):
"""强制执行速率限制"""
if endpoint not in self.rate_limits:
return
rate_limit = self.rate_limits[endpoint]
current_time = time.time()
# 如果距离上次请求不足1秒需要等待
if current_time - rate_limit.last_request_time < 1.0:
wait_time = 1.0 - (current_time - rate_limit.last_request_time)
await asyncio.sleep(wait_time)
# 检查是否超过每秒请求限制
if rate_limit.request_count >= rate_limit.max_requests_per_second:
# 等待到下一秒
await asyncio.sleep(1.0)
rate_limit.request_count = 0
rate_limit.last_request_time = time.time()
rate_limit.request_count += 1
async def _get_headers(self) -> Dict[str, str]:
"""获取请求头"""
headers = {
"Content-Type": "application/json",
'Platform': 'open_platform'
}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
return headers
async def _make_request(self, method: str, endpoint: str,headers: dict = {}, **kwargs) -> Dict[str, Any]:
"""发送HTTP请求"""
await self._enforce_rate_limit(endpoint)
url = f"{self.base_url}/{endpoint}"
if not headers:
headers = await self._get_headers()
response = await self.client.request(
method=method,
url=url,
headers=headers,
**kwargs
)
response.raise_for_status()
return response.json()
async def get_access_token(self) -> Dict[str, Any]:
"""获取访问令牌"""
data = {
"clientID": self.client_id,
"clientSecret": self.client_secret
}
response = await self._make_request("POST", "api/v1/access_token", json=data)
if not response['code']:
self.access_token = response['data']["access_token"]
self.token_expires_at = datetime.fromisoformat(response['data']["expiredAt"]).timestamp()
await self.save_access_token()
return response
async def save_access_token(self) -> None:
"""保存访问令牌"""
with open("access_token.json", "w") as f:
json.dump({"acceseToken": self.access_token, "expiredAt": self.token_expires_at}, f)
def check_access_token(self) -> bool:
"""检查访问令牌是否有效"""
try:
with open("access_token.json", "r") as f:
data = json.load(f)
self.access_token = data["accessToken"]
self.token_expires_at = data["expiredAt"]
if self.token_expires_at < datetime.now().timestamp():
return False
return True
except FileNotFoundError:
return False
async def refresh_access_token(self) -> None:
"""刷新访问令牌"""
if not self.check_access_token():
await self.get_access_token()
async def get_user_info(self) -> Dict[str, Any]:
"""获取用户信息"""
return await self._make_request("GET", "api/v1/user/info")
async def get_file_info(self, fileId: int) -> Dict[str, Any]:
"""获取单个文件信息"""
return await self._make_request("GET", f"api/v1/file/detail?fileId={fileId}")
async def fet_files_info(self, fileIds: List[int]) -> Dict[str, Any]:
"""获取多个文件信息"""
data = {"fileIDs": fileIds}
return await self._make_request("POST", "api/v1/file/infos", json=data)
async def move_file(self, fileIDs: List[int], toParentFileID: int) -> Dict[str, Any]:
"""移动文件"""
data = {
"fileIDs": fileIDs,
"toParentFileID": toParentFileID
}
return await self._make_request("POST", "api/v1/file/move", json=data)
async def rename_single_file(self, fileId: int, fileName: str) -> Dict[str, Any]:
"""单个文件重命名"""
data = {
"fileID": fileId,
"fileName": fileName
}
return await self._make_request("PUT", "api/v1/file/name", json=data)
async def rename_files(self, renameList: List[str]) -> Dict[str, Any]:
"""批量文件重命名"""
data = {"renameList": renameList}
return await self._make_request("POST", "api/v1/file/rename", json=data)
async def file_trash(self, fileIDs: List[int]) -> Dict[Any, Any]:
"""文件移入回收站"""
results = []
batch_size = 100
if len(fileIDs) > batch_size:
for i in range(0, len(fileIDs), batch_size):
batch = fileIDs[i:i+batch_size]
data = {"fileIDs": batch}
result = await self._make_request("POST", "api/v1/file/trash", json=data)
results.append(result)
return {i: result for i, result in enumerate(results)}
else:
data = {"fileIDs": fileIDs}
return await self._make_request("POST", "api/v1/file/trash", json=data)
async def recover_file(self, fileIDs: List[int]) -> Dict[Any, Any]:
"""从回收站恢复文件"""
results = []
batch_size = 100
if len(fileIDs) > batch_size:
for i in range(0, len(fileIDs), batch_size):
batch = fileIDs[i:i+batch_size]
data = {"fileIDs": batch}
result = await self._make_request("POST", "api/v1/file/recover", json=data)
results.append(result)
return {i: result for i, result in enumerate(results)}
else:
data = {"fileIDs": fileIDs}
return await self._make_request("POST", "api/v1/file/recover", json=data)
async def delete_file(self, fileIDs: List[int]) -> Dict[Any, Any]:
"""彻底删除文件"""
results = []
batch_size = 100
if len(fileIDs) > batch_size:
for i in range(0, len(fileIDs), batch_size):
batch = fileIDs[i:i+batch_size]
data = {"fileIDs": batch}
result = await self._make_request("POST", "api/v1/file/delete", json=data)
results.append(result)
return {i: result for i, result in enumerate(results)}
else:
data = {"fileIDs": fileIDs}
return await self._make_request("POST", "api/v1/file/delete", json=data)
async def list_files_v1(self, parentFileId: int = 0, page: int = 1, limit: int = 100, orderBy: str = "file_name", orderDirection: str = "asc", trashed: bool = False, searchData: Optional[str] = None) -> Dict[str, Any]:
"""获取文件列表 (v1)"""
params = {
"parentFileId": parentFileId,
"page": page,
"limit": limit,
"orderBy": orderBy,
"orderDirection": orderDirection,
"trashed": int(trashed)
}
if searchData:
params["searchData"] = searchData
return await self._make_request("GET", "api/v1/file/list", params=params)
async def list_files_v2(self, parentFileId: int = 0, limit: int = 100, searchData: Optional[str] = None, searchMode: Optional[int] = None, lastFileId: Optional[int] = None) -> Dict[str, Any]:
"""获取文件列表 (v2)"""
params: Dict[str, Any] = {
"parentFileId": parentFileId,
"limit": limit
}
if searchData is not None:
params["searchData"] = searchData
if searchMode is not None:
params["searchMode"] = searchMode
if lastFileId is not None:
params["lastFileId"] = lastFileId
return await self._make_request("GET", "api/v2/file/list", params=params)
async def create_folder(self, name: str, parentID: int = 0) -> Dict[str, Any]:
"""创建文件夹"""
data = {"name": name, "parentID": parentID}
return await self._make_request("POST", "upload/v1/file/mkdir", json=data)
async def create_file_v1(self, parentFileID: int, filename: str, etag: str, size: int, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
"""创建文件 (v1)"""
data = {
"parentFileID": parentFileID,
"filename": filename,
"etag": etag,
"size": size,
"duplicate": duplicate,
"containDir": containDir
}
return await self._make_request("POST", "upload/v1/file/create", json=data)
async def get_upload_url_v1(self, preuploadID: str, sliceNo: int) -> Dict[str, Any]:
"""获取上传URL (v1)"""
data = {
"preuploadID": preuploadID,
"sliceNo": sliceNo
}
return await self._make_request("POST", "upload/v1/file/get_upload_url", json=data)
async def list_upload_parts_v1(self, preuploadID: str) -> Dict[str, Any]:
"""列举已上传分片 (v1)"""
data = {"preuploadID": preuploadID}
return await self._make_request("POST", "upload/v1/file/list_upload_parts", json=data)
async def upload_complete_v1(self, preuploadID: str) -> Dict[str, Any]:
"""完成上传 (v1)"""
data = {"preuploadID": preuploadID}
return await self._make_request("POST", "upload/v1/file/upload_complete", json=data)
async def upload_async_result_v1(self, preuploadID: str) -> Dict[str, Any]:
"""异步轮询获取上传结果(v1)"""
data = {"preuploadID": preuploadID}
return await self._make_request("POST", "upload/v1/file/upload_async_result", json=data)
async def create_file_v2(self, parentFileID: int, filename: str, etag: str, size: int, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
"""创建文件 (v2)"""
data = {
"parentFileID": parentFileID,
"filename": filename,
"etag": etag,
"size": size,
"duplicate": duplicate,
"containDir": containDir
}
return await self._make_request("POST", "upload/v2/file/create", json=data)
async def upload_slice_v2(self, preuploadID: str, sliceNo: int, sliceMD5: str, slice: bytes) -> Dict[str, Any]:
"""上传分片 (v2)"""
headers = await self._get_headers()
headers["Content-Type"] = "multipart/form-data"
data = {
"preuploadID": preuploadID,
"sliceNo": sliceNo,
"sliceMD5": sliceMD5,
"slice": slice
}
return await self._make_request("POST", "upload/v2/file/slice", headers=headers, data=data)
async def upload_complete_v2(self, preuploadID: str) -> Dict[str, Any]:
"""上传完毕 (v2)"""
data = {"preuploadID": preuploadID}
return await self._make_request("POST", "upload/v2/file/complete", json=data)
async def get_upload_domain_v2(self):
"""获取上传域名 (v2)"""
return await self._make_request("GET", "upload/v2/file/domain")
async def single_upload_v2(self, parentFileID: int, filename: str, etag: str, size: int, file: bytes, duplicate: int = 1, containDir: bool = False) -> Dict[str, Any]:
"""单步上传文件 (v2)"""
data = {
"parentFileID": parentFileID,
"filename": filename,
"etag": etag,
"size": size,
"duplicate": duplicate,
"containDir": containDir
}
return await self._make_request("POST", "upload/v2/file/single/create", json=data, data=file)
async def create_offline_downlod(self, url: str, dirID: int, fileName: Optional[str] = None, callBackUrl: Optional[str] = None) -> Dict[str, Any]:
"""创建离线下载任务"""
data: Dict[str, Any] = {
"url": url,
"dirID": dirID,
"fileName": fileName
}
if callBackUrl is not None:
data["callBackUrl"] = callBackUrl
return await self._make_request("POST", "/api/v1/offline/download", json=data)
async def offline_progress(self, taskID: int) -> Dict[str, Any]:
"""离线下载进度"""
params = {"taskID": taskID}
return await self._make_request("GET", "/api/v1/offline/download/progress", params=params)
async def share_payment_files(self, shareName: str, fileIDList: str, payAmount: int, resourceDesc: str, isReward: bool|int = False) -> Dict[str, Any]:
"""分享付费文件"""
data = {
"shareName": shareName,
"fileIDList": fileIDList,
"payAmount": payAmount,
"resourceDesc": resourceDesc,
"isReward": int(isReward)
}
return await self._make_request("POST", "/api/v1/share/content-payment/create", json=data)
async def create_share(self, shareName: str, shareExpire: int, fileIDList: str, sharePwd: Optional[str] = None, trafficSwitch: Optional[int] = None, trafficLimitSwitch: Optional[int] = None, trafficLimit: Optional[int] = None) -> Dict[str, Any]:
"""创建分享"""
data: Dict[str, Any] = {
"shareName": shareName,
"shareExpire": shareExpire,
"fileIDList": fileIDList,
}
if sharePwd is not None:
data["sharePwd"] = sharePwd
if trafficSwitch is not None:
data["trafficSwitch"] = trafficSwitch
if trafficLimitSwitch is not None:
data["trafficLimitSwitch"] = trafficLimitSwitch
if trafficLimit is not None:
data["trafficLimit"] = trafficLimit
return await self._make_request("POST", "api/v1/share/create", json=data)
async def edit_share(self, shareIdList: List[int], trafficSwitch: Optional[int] = None, trafficLimitSwitch: Optional[int] = None, trafficLimit: Optional[int] = None) -> Dict[str, Any]:
"""编辑分享"""
data: Dict[str, Any] = {
"shareIdList": shareIdList,
}
if trafficSwitch is not None:
data["trafficSwitch"] = trafficSwitch
if trafficLimitSwitch is not None:
data["trafficLimitSwitch"] = trafficLimitSwitch
if trafficLimit is not None:
data["trafficLimit"] = trafficLimit
return await self._make_request("PUT", "api/v1/share/list/info", json=data)
async def get_share_list(self, limit: int = 100, lastShareId: int = 0) -> Dict[str, Any]:
"""获取分享列表"""
params = {"limit": limit, "lastShareId": lastShareId}
return await self._make_request("GET", "api/v1/share/list", params=params)
async def get_transcode_folder_info(self, folder_path: str) -> Dict[str, Any]:
"""获取转码文件夹信息"""
params = {"folder_path": folder_path}
return await self._make_request("GET", "api/v1/transcode/folder/info", params=params)
async def upload_from_cloud_disk(self, source_path: str, target_path: str) -> Dict[str, Any]:
"""从云盘上传文件进行转码"""
data = {
"source_path": source_path,
"target_path": target_path
}
return await self._make_request("POST", "api/v1/transcode/upload/from_cloud_disk", json=data)
async def delete_transcode(self, transcode_id: str) -> Dict[str, Any]:
"""删除转码任务"""
data = {"transcode_id": transcode_id}
return await self._make_request("POST", "api/v1/transcode/delete", json=data)
async def get_video_resolutions(self) -> Dict[str, Any]:
"""获取视频分辨率列表"""
return await self._make_request("GET", "api/v1/transcode/video/resolutions")
async def transcode_video(self, file_path: str, resolution: str, output_format: str = "mp4") -> Dict[str, Any]:
"""转码视频"""
data = {
"file_path": file_path,
"resolution": resolution,
"output_format": output_format
}
return await self._make_request("POST", "api/v1/transcode/video", json=data)
async def get_transcode_record(self, transcode_id: str) -> Dict[str, Any]:
"""获取转码记录"""
params = {"transcode_id": transcode_id}
return await self._make_request("GET", "api/v1/transcode/video/record", params=params)
async def get_transcode_result(self, transcode_id: str) -> Dict[str, Any]:
"""获取转码结果"""
params = {"transcode_id": transcode_id}
return await self._make_request("GET", "api/v1/transcode/video/result", params=params)
async def download_transcode_file(self, transcode_id: str, file_path: str) -> Dict[str, Any]:
"""下载转码文件"""
params = {
"transcode_id": transcode_id,
"file_path": file_path
}
return await self._make_request("GET", "api/v1/transcode/file/download", params=params)
async def download_m3u8_ts(self, m3u8_url: str, ts_file: str) -> Dict[str, Any]:
"""下载M3U8 TS文件"""
params = {
"m3u8_url": m3u8_url,
"ts_file": ts_file
}
return await self._make_request("GET", "api/v1/transcode/m3u8_ts/download", params=params)
async def download_all_transcode_files(self, transcode_id: str) -> Dict[str, Any]:
"""下载所有转码文件"""
params = {"transcode_id": transcode_id}
return await self._make_request("GET", "api/v1/transcode/file/download/all", params=params)
async def close(self):
"""关闭客户端"""
await self.client.aclose()

0
_logger.py Normal file
View File

7
_main.py Normal file
View File

@@ -0,0 +1,7 @@
import os
import time
def main():
print("Hello, world!")
time.sleep(5)

0
_utils.py Normal file
View File