Files
123driver/_main.py

184 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import os
from typing import Dict, List, Optional, Any
from _api import API
from _logger import logger
from _utils import async_to_sync, Utils
class Driver:
    """Convenience wrapper around the 123pan open-API client.

    Resolves slash-separated remote paths to directory IDs, lists
    directories (consulting the ``Utils`` page cache first), looks files up
    by name and downloads them.
    """

    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://open-api.123pan.com"):
        """Build the API client and check its access token.

        Args:
            client_id: Forwarded to ``API``.
            client_secret: Forwarded to ``API``.
            base_url: Root URL forwarded to ``API``.
        """
        self.utils = Utils()
        self.api = API(client_id=client_id, client_secret=client_secret, base_url=base_url)
        # NOTE(review): invoked synchronously; presumably validates/refreshes
        # the token — confirm against the API class.
        self.api.check_access_token()
        logger.info("Driver initialized.")

    async def user_info(self):
        """Fetch the current user's information.

        Returns:
            The raw response of ``API.get_user_info``. ``code == 0`` is
            treated as success, but only for logging; the response is
            returned unchanged either way.
        """
        logger.info("Calling user_info()")
        user_info = await self.api.get_user_info()
        if user_info.get('code') == 0:
            logger.info("User info fetched successfully.")
        else:
            logger.error(f"Error: {user_info.get('message')}")
        logger.debug(f"user_info result: {user_info}")
        return user_info

    async def list_dir(
        self,
        dir: str = '/',
        page: int = 1,
        limit: int = 100,
        return_parentFileId: bool = False,
    ):
        """List the entries of a remote directory given its path.

        The path is walked segment by segment starting from the root
        (``parentFileId`` 0); empty segments (leading, trailing or doubled
        slashes) are skipped.

        Args:
            dir: Slash-separated remote path. The name shadows the builtin
                but is kept because callers pass it by keyword.
            page: Page forwarded to the cached listing helper.
            limit: Page size forwarded to the listing helpers.
            return_parentFileId: When true, also return the resolved
                directory ID.

        Returns:
            The ``fileList`` of the resolved directory, or a
            ``(fileList, parentFileId)`` tuple when ``return_parentFileId``
            is set.

        NOTE(review): a segment that cannot be found resolves to ID 0, so a
        non-existent path silently falls back to the root directory.
        """
        logger.info(f"Calling list_dir(dir={dir}, page={page}, limit={limit})")
        parentFileId = 0
        files = await self._list_dir_fetch_or_cache(parentFileId=parentFileId, page=page, limit=limit)
        for segment in dir.split('/'):
            logger.debug(f"Processing dir segment: '{segment}' with parentFileId={parentFileId}")
            if not segment:
                continue
            parentFileId = await self._list_dir_fetch_parentFileId(parentFileId, files, segment, limit)
            logger.debug(f"Updated parentFileId: {parentFileId}")
            files = await self._list_dir_fetch_or_cache(parentFileId=parentFileId, page=page, limit=limit)
        logger.info(f"Returning file list for dir={dir}")
        if return_parentFileId:
            return files['data']['fileList'], parentFileId
        return files['data']['fileList']

    async def _list_dir_fetch_or_cache(
        self,
        parentFileId: int,
        page: int = 1,
        limit: int = 100,
        lastFileId: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Return a directory listing from the cache, falling back to the API.

        Iterates the pages produced by ``Utils.computing_page``. Cached pages
        are collected and merged with ``Utils.merge_files``. On the first
        cache miss the page is fetched from the API, stored in the cache and
        returned immediately.

        NOTE(review): the early return on a cache miss is the original
        behaviour and is kept as-is; it drops any cached pages collected so
        far and does not advance ``lastFileId`` — confirm this is intended.
        """
        cached_pages = []
        for p in self.utils.computing_page(page=page, limit=limit):
            logger.debug(f"_list_dir_fetch_or_cache(parentFileId={parentFileId}, page={p})")
            files = self.utils.get_cached_files(parentFileId=parentFileId, page=p)
            if not files:
                logger.debug(f"No cached files found for parentFileId={parentFileId}, fetching from API.")
                files = await self.api.list_files_v2(parentFileId=parentFileId, limit=limit, lastFileId=lastFileId)
                self.utils.cache_files(files=files, parentFileId=parentFileId, page=p)
                return files
            logger.debug(f"Cached files found for parentFileId={parentFileId}, returning from cache.")
            cached_pages.append(files)
        return self.utils.merge_files(cached_pages)

    async def _list_dir_fetch_parentFileId(
        self,
        parentFileId: int,
        files: Dict[str, Any],
        filename: str,
        limit: int = 100,
        lastFileId: Optional[int] = None,
    ) -> int:
        """Find the ID of sub-directory ``filename`` inside ``parentFileId``.

        ``files`` is searched first. While the directory is not found and
        ``lastFileId`` is not ``-1``, the next page is requested from the API
        using ``lastFileId`` as the cursor. Implemented as a loop so that a
        directory spanning many pages cannot exhaust the recursion limit.

        Returns:
            The directory's ``fileId``, or 0 when every page has been
            searched without a match.
        """
        logger.info(f"_list_dir_fetch_parentFileId(parentFileId={parentFileId}, filename={filename}, limit={limit}, lastFileId={lastFileId})")
        while True:
            if await self._list_dir_in_files(files, filename):
                logger.debug(f"Found {filename} in files, getting parentFileId.")
                return await self._list_dir_get_parentFileId(files, filename)
            if lastFileId == -1:
                # -1 marks the end of the listing: nothing left to search.
                logger.error(f"Error: {filename} not found in {files['data']['fileList']}")
                return 0
            logger.debug(f"Fetching more files for parentFileId={parentFileId} with lastFileId={lastFileId}")
            files = await self.api.list_files_v2(parentFileId=parentFileId, limit=limit, lastFileId=lastFileId)
            lastFileId = files['data']['lastFileId']

    async def _list_dir_get_parentFileId(
        self,
        files: Dict[str, Any],
        filename: str,
    ) -> int:
        """Return the ``fileId`` of the directory entry named ``filename``.

        Only entries with ``type == 1`` are considered. Returns 0 when no
        such entry exists in ``files``.
        """
        logger.info(f"_list_dir_get_parentFileId(filename={filename})")
        for f in files['data']['fileList']:
            if f['type'] == 1 and f['filename'] == filename:
                logger.debug(f"Found parentFileId: {f['fileId']} for filename: {filename}")
                return f['fileId']
        logger.warning(f"Directory {filename} not found in fileList.")
        return 0

    async def _list_dir_in_files(
        self,
        files: Dict[str, Any],
        filename: str,
    ) -> bool:
        """Tell whether ``files`` contains a directory entry (``type == 1``) named ``filename``."""
        logger.info(f"_list_dir_in_files(filename={filename})")
        for f in files['data']['fileList']:
            if f['type'] == 1 and f['filename'] == filename:
                logger.debug(f"Found directory {filename} in fileList.")
                return True
        logger.debug(f"Directory {filename} not found in fileList.")
        return False

    async def fetch_file(self, parentFileId: int, filename: str, lastFileId: Optional[int] = None, page: int = 1) -> Dict[str, Any]:
        """Look up a file entry by name inside a directory.

        Pages are read from the cache when present, otherwise fetched from
        the API (100 entries per request) and cached under the current page
        number. The search continues page by page, using the ``lastFileId``
        reported by each page as the cursor for the next request.

        Args:
            parentFileId: ID of the directory to search.
            filename: Name of the file (entries with ``type == 0``).
            lastFileId: Cursor for the first API request.
            page: Cache page number of the first page.

        Returns:
            The matching entry, or ``{}`` when no page contains it.
        """
        logger.info(f"Calling fetch_file(parentFileID={parentFileId}, filename={filename})")
        while True:
            files = self.utils.get_cached_files(parentFileId=parentFileId, page=page)
            if not files:
                files = await self.api.list_files_v2(parentFileId=parentFileId, limit=100, lastFileId=lastFileId)
                self.utils.cache_files(files=files, parentFileId=parentFileId, page=page)
            for f in files['data']['fileList']:
                if f['type'] == 0 and f['filename'] == filename:
                    logger.debug(f"Found file {filename} in fileList.")
                    return f
            # Advance with the cursor reported by this page. A page without
            # a cursor is treated as the last one so the loop always ends.
            lastFileId = files['data'].get('lastFileId', -1)
            if lastFileId == -1:
                break
            logger.debug(f"Fetching more files for parentFileId={parentFileId} with lastFileId={lastFileId}")
            page += 1
        logger.error(f"Error: {filename} not found in {files['data']['fileList']}")
        return {}

    async def download_file(
        self,
        file_path: str,
        save_path: str,
        progress_bar: bool = True,
    ):
        """Download a remote file to a local path.

        Args:
            file_path: Remote path of the file; its directory part is
                resolved with ``list_dir``.
            save_path: Local destination handed to ``Utils.download_file``.
            progress_bar: Forwarded to ``Utils.download_file``.

        Returns:
            The response of ``API.download_file``, or ``{}`` when the file
            does not exist remotely.
        """
        logger.info(f"Calling download_file(file_path={file_path}, save_path={save_path})")
        dirname = os.path.dirname(file_path)
        _, parentFileId = await self.list_dir(dir=dirname, return_parentFileId=True)
        file = await self.fetch_file(parentFileId=parentFileId, filename=os.path.basename(file_path))
        if not file:
            logger.error(f"Error: {file_path} not found.")
            return {}
        logger.debug(f"Downloading file {file_path} to {save_path}")
        download_info = await self.api.download_file(fileId=file['fileId'])
        if download_info.get('code') == 0:
            logger.debug(f"Downloading file {file_path} to {save_path} started.")
            # NOTE(review): called without await; if this helper blocks it
            # will stall the event loop for the whole transfer — confirm.
            self.utils.download_file(url=download_info['data']['downloadUrl'], file_path=save_path, progress_bar=progress_bar)
        else:
            logger.error(f"Error: {download_info.get('message')}")
        return download_info
@async_to_sync
async def main() -> None:
    """Manual smoke test: build a ``Driver`` and list the remote root directory.

    Credentials are read from the local ``privacy`` module. The number of
    entries returned and the elapsed time are logged so that the run
    produces a visible result.
    """
    # Imported here so that importing this module never requires the
    # credentials file; ``time`` comes from the standard library directly
    # rather than through a re-export of the project's ``_utils`` module.
    import time
    from privacy import client_id, client_secret

    logger.info("Starting main()")
    driver = Driver(client_id=client_id, client_secret=client_secret)
    start_time = time.perf_counter()
    dirs = await driver.list_dir(dir='/')
    elapsed = time.perf_counter() - start_time
    logger.info(f"Listed {len(dirs)} entries in '/' in {elapsed:.2f}s")
# Script entry point: run the smoke test only when executed directly.
if __name__ == "__main__":
    # The decorated main() is called like a plain function; its return
    # value is not needed and is bound to a throwaway name.
    _ = main()