添加用户信息检索功能。引入一个驱动类用于API交互,并更新版本标题以保持一致。更新README和CHANGELOG以反映新功能。

This commit is contained in:
2025-07-13 22:45:35 +08:00
parent 4904506310
commit ca7a642c4f
7 changed files with 136 additions and 9 deletions

View File

@@ -1,4 +1,10 @@
## v0.1.0 (2025-07-12) ## v0.1.0 (2025-07-12)
- 初始版本发布 - 初始版本发布
- 实现了基本的api功能 - 实现了基本的api功能
## v0.1.1 (2025-07-13)
- 添加了日志功能
- 实现了用户信息查询功能
- 实现了根据path获取文件列表功能

View File

@@ -56,8 +56,8 @@
### 项目工具 ### 项目工具
- httpx异步HTTP客户端库用于发起HTTP请求。 - `httpx`异步HTTP客户端库用于发起HTTP请求。
- loguru日志库用于记录日志。 - `loguru`:日志库,用于记录日志。
## 项目进度 ## 项目进度

View File

@@ -1,8 +1,12 @@
from .__version__ import * from .__version__ import *
from ._api import * from ._api import *
from ._logger import *
from ._main import *
__all__ = [ __all__ = [
"API", "API",
"RateLimit", "RateLimit",
"logger",
"Driver",
] ]

View File

@@ -1,4 +1,4 @@
__title__ = "123driver" __title__ = "123Driver"
__description__ = "A Python library for 123netdisk API" __description__ = "A Python library for 123netdisk API"
__url__ = "https://github.com/RiyueYuwu/123driver" __url__ = "https://github.com/RiyueYuwu/123driver"
__author__ = "RiyueYuwu" __author__ = "RiyueYuwu"

View File

@@ -0,0 +1,5 @@
from loguru import logger
logger.remove()
logger.add("./logs/log_{time}.log", rotation="10 MB", retention="10 days", level="INFO")

112
_main.py
View File

@@ -1,7 +1,109 @@
import os from typing import Dict, List, Optional, Any
import time
from _api import API
from _logger import logger
from _utils import async_to_sync
def main(): class Driver:
print("Hello, world!")
time.sleep(5) def __init__(self, client_id: str, client_secret: str, base_url: str = "https://open-api.123pan.com"):
self.api = API(client_id=client_id, client_secret=client_secret, base_url=base_url)
self.api.check_access_token()
logger.info("Driver initialized.")
async def user_info(self):
"""获取用户信息"""
logger.info("Calling user_info()")
user_info = await self.api.get_user_info()
if user_info.get('code') == 0:
logger.info("User info fetched successfully.")
else:
logger.error(f"Error: {user_info.get('message')}")
logger.debug(f"user_info result: {user_info}")
return user_info
async def list_dir(
self,
dir: str = '/',
page: int = 1,
limit: int = 100,
):
"""获取目录下的文件列表"""
logger.info(f"Calling list_dir(dir={dir}, page={page}, limit={limit})")
parentFileId = 0
dir_list = dir.split('/')
files = await self.api.list_files_v2(parentFileId=parentFileId, limit=limit)
for i in dir_list:
logger.debug(f"Processing dir segment: '{i}' with parentFileId={parentFileId}")
if i:
parentFileId = await self._list_dir_fetch_parentFileId(parentFileId, files, i, limit)
logger.debug(f"Updated parentFileId: {parentFileId}")
files = await self.api.list_files_v2(parentFileId=parentFileId, limit=limit)
logger.info(f"Returning file list for dir={dir}")
return files['data']['fileList']
async def _list_dir_fetch_parentFileId(
self,
parentFileId: int,
files: Dict[str, Any],
filename: str,
limit: int = 100,
lastFileId: Optional[int] = None,
) -> int:
logger.info(f"_list_dir_fetch_parentFileId(parentFileId={parentFileId}, filename={filename}, limit={limit}, lastFileId={lastFileId})")
if await self._list_dir_in_files(files, filename) and lastFileId != -1:
logger.debug(f"Found {filename} in files, getting parentFileId.")
return await self._list_dir_get_parentFileId(files, filename)
elif lastFileId != -1:
logger.debug(f"Fetching more files for parentFileId={parentFileId} with lastFileId={lastFileId}")
files = await self.api.list_files_v2(parentFileId=parentFileId, limit=limit, lastFileId=lastFileId)
return await self._list_dir_fetch_parentFileId(parentFileId, files, filename, limit, files['lastFileId'])
else:
if await self._list_dir_in_files(files, filename):
logger.debug(f"Found {filename} in files after lastFileId exhausted.")
return await self._list_dir_get_parentFileId(files, filename)
else:
logger.error(f"Error: {filename} not found in {files['data']['fileList']}")
return 0
async def _list_dir_get_parentFileId(
self,
files: Dict[str, Any],
filename: str,
) -> int:
"""获取指定目录的parentFileId"""
logger.info(f"_list_dir_get_parentFileId(filename={filename})")
for f in files['data']['fileList']:
if f['type'] == 1 and f['filename'] == filename:
logger.debug(f"Found parentFileId: {f['fileId']} for filename: {filename}")
return f['fileId']
logger.warning(f"Directory {filename} not found in fileList.")
return 0
async def _list_dir_in_files(
self,
files: Dict[str, Any],
filename: str,
) -> bool:
"""判断目标文件是否在文件列表中"""
logger.info(f"_list_dir_in_files(filename={filename})")
for f in files['data']['fileList']:
if f['type'] == 1 and f['filename'] == filename:
logger.debug(f"Found directory {filename} in fileList.")
return True
logger.debug(f"Directory {filename} not found in fileList.")
return False
@async_to_sync
async def main() -> None:
from privacy import client_id, client_secret
logger.info("Starting main()")
driver = Driver(client_id=client_id, client_secret=client_secret)
dirs = await driver.list_dir(dir='/')
for dir in dirs:
print(dir)
if __name__ == '__main__':
_ = main()

View File

@@ -0,0 +1,10 @@
import asyncio
from functools import wraps
def async_to_sync(func):
"""装饰器: 将异步方法转换为同步方法, 自动处理asyncio.run()"""
@wraps(func)
def wrapper(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapper