从网盘链接到图文发布:一个内容自动化工具的技术实现记录
从网盘链接到图文发布:一个内容自动化工具的技术实现记录
最开始做这个项目时,我想解决的是一个非常具体的问题:网盘资源的整理和内容发布太依赖人工了。一个资源从拿到分享链接开始,通常还要经历转存、重新分享、整理文件信息、撰写介绍文案、补充配图、复制排版、再发布到内容平台。每一步单独看都不复杂,但当这些动作每天重复出现时,时间成本和出错概率都会被放大。
所以我把项目的核心目标定义成一条自动化链路:用户提交网盘链接,系统负责识别链接、转存资源、生成新的分享链接、调用 AI 生成文章、保存内容资产,并在需要时发布到平台。随着后续场景增多,这条链路又扩展出了多链接合集、公众号文章改写、公众号监听、多用户隔离、管理员配置和任务管理等能力。
这篇文章主要记录这个项目的技术实现思路,而不是功能介绍。我会从后端架构、任务编排、Provider 抽象、AI 调用、公众号文章处理和部署方式几个角度展开,也会放一些核心代码片段来说明实现细节。
一、整体架构:把复杂流程拆成可替换模块
这个项目采用 FastAPI + SQLAlchemy asyncio + MySQL 作为后端基础,前端使用 Ant Design Pro v6。整体上是前后端分离开发,但部署时由 FastAPI 挂载前端构建产物,因此最终可以作为一个服务运行。
后端没有把所有逻辑都堆在接口函数里,而是按照职责拆成几层:API 层负责参数接收和响应封装,Service 层负责编排业务流程,Repository 层负责数据库读写,Provider 层负责不同网盘平台的协议差异,Publisher 层负责不同内容平台的发布差异,AI 模块负责文本和图片接口调用。
API 路由的入口非常轻,只负责把各个业务模块挂载到统一版本前缀下:
from fastapi import APIRouter
from . import accounts, articles, auth, drives, help_docs, pipeline, publishers, system, wechat_monitor
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(system.router, prefix="/system", tags=["system"])
api_router.include_router(accounts.router, prefix="/accounts", tags=["accounts"])
api_router.include_router(drives.router, prefix="/drives", tags=["drives"])
api_router.include_router(articles.router, prefix="/articles", tags=["articles"])
api_router.include_router(publishers.router, prefix="/publishers", tags=["publishers"])
api_router.include_router(pipeline.router, prefix="/pipeline", tags=["pipeline"])
api_router.include_router(wechat_monitor.router, prefix="/wechat-monitor", tags=["wechat-monitor"])
api_router.include_router(help_docs.router, prefix="/help-docs", tags=["help-docs"])
这种拆法的好处是,业务复杂度不会扩散到 Controller。比如“提交一个链路任务”在接口层只是创建任务,真正的解析、转存、生成和发布都放在 PipelineService 内部。后续我加入公众号改写、多链接合集和任务删除时,也没有破坏 API 层结构。
二、Pipeline:用后台任务承载长链路
网盘转存、AI 生成、图片处理和平台发布都不是毫秒级操作。如果把这些动作全部放在一次 HTTP 请求里同步执行,前端体验会很差,也很容易因为超时导致状态丢失。因此我把核心流程设计成后台任务。
用户提交任务后,系统先创建一条 PipelineTask,然后用 asyncio.create_task 把真正的执行逻辑放到后台。前端拿到任务 ID 后,通过任务详情和日志接口轮询状态。
核心提交逻辑如下:
class PipelineService:
def __init__(self, session: AsyncSession):
self.session = session
self.repo = TaskRepository(session)
async def submit(self, dto: PipelineCreate, *, user_id: int = 1) -> PipelineTask:
source_texts = dto.normalized_sources()
task = await self.repo.create(
user_id=user_id,
source_url="\n".join(source_texts),
source_password=dto.source_password,
drive_account_id=dto.drive_account_id,
publisher_account_id=dto.publisher_account_id,
topic=dto.topic,
style=dto.style,
status="pending",
progress=0,
)
await self.repo.commit()
asyncio.create_task(self._run_in_background(task.id, dto))
return task
后台执行时,我在内部定义了一个 step 函数,用来统一更新任务状态、进度和日志。这样每个阶段只需要调用一次 step,前端就能看到实时变化。
async def step(stage: str, progress: int, message: str = "", level: str = "INFO") -> None:
task.status = stage
task.progress = progress
await repo.append_log(task_id, stage=stage, level=level, message=message or stage)
await session.commit()
logger.info("[task#{}] {} {}% {}", task_id, stage, progress, message)
这个设计对排查问题非常有帮助。比如任务停在 parsing,说明链接识别阶段出问题;停在 transferring,说明网盘转存或分享创建失败;停在 generating,通常就是 AI 或文章生成逻辑异常。用户看到的是可读的任务日志,开发时则可以结合后端日志继续深入定位。
三、Provider 抽象:隔离不同网盘的协议差异
项目最核心的扩展点是网盘 Provider。不同网盘平台表面上都提供“分享链接、转存、创建分享”这些能力,但实现细节差异非常大。如果业务层直接依赖各个平台的接口,整个 Pipeline 很快就会变得难以维护。
因此我定义了统一的数据结构和抽象基类:
@dataclass
class ParsedShare:
provider: str
raw_url: str
share_id: str
password: str = ""
extra: dict[str, Any] = field(default_factory=dict)
@dataclass
class TransferResult:
success: bool
target_dir: str
file_ids: list[str] = field(default_factory=list)
transferred_paths: list[str] = field(default_factory=list)
message: str = ""
@dataclass
class ShareResult:
share_url: str
password: str = ""
expire_at: int = 0
raw: dict[str, Any] = field(default_factory=dict)
class CloudDriveProvider(ABC):
name: str = "base"
display_name: str = "Base"
@classmethod
@abstractmethod
def can_parse(cls, text: str) -> bool:
pass
@classmethod
@abstractmethod
def parse_share_url(cls, text: str) -> ParsedShare | None:
pass
@abstractmethod
async def list_share_files(self, share: ParsedShare) -> list[FileEntry]:
pass
@abstractmethod
async def transfer(self, share: ParsedShare, target_dir: str) -> TransferResult:
pass
@abstractmethod
async def create_share(self, file_ids: list[str], *, password: str = "", expire_days: int = 0) -> ShareResult:
pass
有了这层抽象之后,百度、夸克、迅雷、UC 这些平台都可以作为独立 Provider 实现。业务层不关心某个平台是 Cookie 登录、扫码登录,还是需要 captcha token;也不关心创建分享时是立即返回 share_id,还是需要轮询 task_id。业务层只接收 TransferResult 和 ShareResult。
真正执行转存时,DriveService 会先解析链接,再校验 Provider 类型是否匹配,然后按照优先级创建分享:优先按转存后的子项路径分享,其次按文件 ID 分享,最后降级为目录直达链接。
async def _transfer_and_share_with_provider(
self,
text: str,
provider: CloudDriveProvider,
target_dir: str,
log_fn: Callable[[str], Any] | None = None,
) -> tuple[ParsedShare, TransferResult, ShareResult]:
parsed = self.parse_link(text)
if provider.name != parsed.provider:
raise AppError(
f"链接类型为 {parsed.provider},请选择同类型网盘账号",
code="DRIVE_PROVIDER_MISMATCH",
status=400,
)
transfer = await provider.transfer(parsed, target_dir)
if not transfer.success:
raise AppError(f"转存失败: {transfer.message}", code="TRANSFER_FAIL", status=502)
share: ShareResult | None = None
if transfer.transferred_paths and hasattr(provider, "create_share_by_paths"):
try:
share = await provider.create_share_by_paths(transfer.transferred_paths)
except Exception:
share = None
if share is None and transfer.file_ids:
try:
share = await provider.create_share(transfer.file_ids)
except Exception:
share = None
if share is None:
web_url = _dir_web_url(parsed.provider, target_dir)
share = ShareResult(
share_url=web_url or target_dir,
password="",
raw={"skipped": True, "dir_path": target_dir},
)
return parsed, transfer, share
这里的降级策略是实际迭代中总结出来的。比如百度网盘有时不能直接对某些资源创建二次分享,如果没有兜底,用户只能看到失败;但如果能给出一个登录后可访问的目录直达地址,至少可以保证文章里的资源入口不是完全断掉的。
四、多账号重试:把平台错误转成可恢复流程
多网盘适配中,一个比较典型的问题是夸克容量不足。早期如果自动选择到的账号容量满了,整个任务就会失败。但从用户角度看,如果他还有其他同类型账号,系统应该继续尝试,而不是直接终止。
因此 DriveService 在按 Provider 名称自动选择账号时,会遍历当前用户启用的同类型账号。只有识别到容量不足这类可恢复错误时,才继续尝试下一个账号;其他错误则直接抛出,避免掩盖真实问题。
async def transfer_and_share_with_provider(
self,
text: str,
provider_name: str,
*,
user_id: int,
log_fn: Callable[[str], Any] | None = None,
) -> tuple[ParsedShare, TransferResult, ShareResult, int]:
providers = await self.get_providers_by_name(provider_name, user_id=user_id)
errors: list[str] = []
for provider, target_dir, account_id in providers:
try:
return (*await self._transfer_and_share_with_provider(text, provider, target_dir, log_fn=log_fn), account_id)
except Exception as e:
message = str(e)
errors.append(f"账号 #{account_id}: {message}")
if "QUARK_CAPACITY_LIMIT" not in repr(e) and "容量不足" not in message and "capacity limit" not in message.lower():
raise
logger.warning("[drive] {} 账号 #{} 容量不足,尝试下一个账号", provider_name, account_id)
raise AppError(
f"所有 {provider_name} 网盘账号均转存失败: {';'.join(errors)}",
code="DRIVE_PROVIDER_ACCOUNTS_FAILED",
status=400,
)
这个逻辑看起来不复杂,但它体现了一个重要原则:不是所有异常都应该被重试。只有明确知道错误是账号容量不足,并且换账号可能解决时,才应该继续尝试。其他错误,比如凭证失效、链接解析失败、平台接口结构变化,都应该尽早暴露。
五、AI 调用:兼容标准 JSON 和 SSE 响应
AI 文本生成采用兼容 OpenAI 的 /v1/chat/completions 接口。但在实际接入中,我遇到过两种返回格式:一种是标准 JSON,一种是虽然请求没有开启 stream,但服务端仍然返回 data: 前缀的 SSE 片段。
为了让上层业务不关心这些差异,我把响应解析放在 AITextClient 内部。它会先判断是否是 JSON,再判断是否包含 SSE 的 data: 行,并把所有 delta.content 拼接起来。
def _parse_response_text(text: str, status_code: int) -> str:
text = text.strip()
if not text:
raise ProviderError("AI 接口返回空响应", code="AI_TEXT_BAD_RESP")
if text.startswith("{"):
try:
data = json.loads(text)
choices = data.get("choices")
if not isinstance(choices, list) or not choices:
raise KeyError("choices")
first = choices[0]
message = first.get("message")
if isinstance(message, dict):
content = message.get("content")
if content:
return str(content)
delta = first.get("delta")
if isinstance(delta, dict) and delta.get("content"):
return str(delta["content"])
raise KeyError("message.content")
except (json.JSONDecodeError, KeyError, TypeError) as e:
raise ProviderError(f"AI 文本返回结构异常: {text[:200]}", code="AI_TEXT_BAD_RESP") from e
if "data:" in text:
parts: list[str] = []
for line in text.splitlines():
line = line.strip()
if not line.startswith("data:"):
continue
payload = line[5:].strip()
if payload == "[DONE]":
break
try:
chunk = json.loads(payload)
choices = chunk.get("choices")
if not isinstance(choices, list) or not choices:
continue
first = choices[0]
delta = first.get("delta")
message = first.get("message")
if isinstance(delta, dict):
content = delta.get("content") or ""
elif isinstance(message, dict):
content = message.get("content") or ""
else:
content = ""
if content:
parts.append(str(content))
except (json.JSONDecodeError, TypeError):
continue
result = "".join(parts)
if result:
return result
raise ProviderError("AI SSE 流响应中未找到有效内容", code="AI_TEXT_BAD_RESP")
raise ProviderError(f"AI 接口返回未知格式: {text[:200]}", code="AI_TEXT_BAD_RESP")
AI 客户端还会从数据库读取运行时配置,包括模型、Base URL、API Key 和重试次数。这样每个用户都可以有自己的 AI 配置,管理员也可以配置兜底参数。API Key 不会明文返回给前端,凭证类数据统一通过 Fernet 加密后落库。
六、凭证安全与多用户隔离
项目从个人工具扩展成多用户应用后,账号凭证和 AI Key 的安全性变得非常重要。系统里既有网盘 Cookie、发布平台配置,也有 AI API Key,这些都不能明文暴露。
凭证加密使用 Fernet,对称密钥由项目的 secret_key 派生:
def _derive_key(secret: str) -> bytes:
digest = hashlib.sha256(secret.encode("utf-8")).digest()
return base64.urlsafe_b64encode(digest)
_fernet = Fernet(_derive_key(settings.secret_key))
def encrypt(plain: str | None) -> str | None:
if plain is None or plain == "":
return plain
return _fernet.encrypt(plain.encode("utf-8")).decode("utf-8")
def decrypt(token: str | None) -> str | None:
if token is None or token == "":
return token
try:
return _fernet.decrypt(token.encode("utf-8")).decode("utf-8")
except InvalidToken as e:
raise AppError("凭证解密失败", code="CREDENTIAL_DECRYPT_ERROR", status=500) from e
用户认证使用密码哈希和 JWT。业务数据层面,网盘账号、发布账号、文章、任务和系统配置都绑定 user_id。接口通过当前登录用户过滤数据,管理员接口则显式区分普通用户和管理员权限。
这个改造的收益很大。AI 配置不会串用户,网盘账号不会被其他用户使用,任务列表也只展示当前用户自己的任务。管理员可以配置系统级兜底能力,但不会破坏用户个人配置的优先级。
七、公众号文章改写:HTML、Markdown、图片和链接替换
公众号文章改写是项目里比较复杂的一条支线。它不是简单地把一个 URL 发给 AI,而是需要先把原文处理成可控的中间格式。
我把公众号文章处理拆成单独服务,核心职责包括:识别公众号文章 URL、抓取 HTML、定位正文节点、提取标题和作者、把 HTML 转成 Markdown、识别正文中的网盘链接、下载微信图片到本地静态目录,并在必要时调用图片编辑能力处理水印。
文章解析器基于 Python 标准库的 HTMLParser 实现,只在进入正文节点后才收集内容,并跳过脚本、样式、视频、音频等不需要进入 Markdown 的节点:
class _WeChatMarkdownParser(HTMLParser):
def __init__(self) -> None:
super().__init__(convert_charrefs=True)
self.title_parts: list[str] = []
self.author_parts: list[str] = []
self.markdown_parts: list[str] = []
self._title_depth = 0
self._author_depth = 0
self._content_depth = 0
self._skip_depth = 0
self._link_stack: list[str] = []
self._list_stack: list[str] = []
self._blockquote_depth = 0
self._pre_depth = 0
self._format_stack: list[str] = []
@property
def markdown(self) -> str:
text = "".join(self.markdown_parts)
text = _normalise_markdown(text)
return text.strip()
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
attrs_map = {key.lower(): value or "" for key, value in attrs}
node_id = attrs_map.get("id", "")
tag = tag.lower()
if node_id == "activity-name":
self._title_depth = 1
return
if node_id == "js_name":
self._author_depth = 1
return
if node_id == "js_content":
self._content_depth = 1
elif self._content_depth:
self._content_depth += 1
else:
return
if self._skip_depth:
self._skip_depth += 1
return
if tag in SKIP_TAGS or (node_id != "js_content" and _is_hidden(attrs_map)):
self._skip_depth += 1
return
公众号改写和普通网盘任务最大的区别是,它需要在 Markdown 中替换链接。系统会先识别原文中的网盘链接,逐条转存,成功后替换成新的分享链接;如果某条链接转存失败,则从 Markdown 中移除,避免最终文章里残留不可用资源。
最近加入的“无网盘链接时仍改写”选项,也是基于这套流程扩展出来的。开启后,如果正文里没有识别到网盘链接,Pipeline 会跳过转存阶段,直接把 Markdown 交给 AI 重写。这样公众号能力就不再局限于资源文章,而是变成一个通用的文章改写入口。
八、多链接合集:从单资源到批量资源编排
单链接任务比较直接:一个链接对应一次转存和一篇文章。但多链接合集要复杂一些,因为每条链接可能来自不同平台,也可能需要使用不同账号。
我的处理方式是先对所有链接进行解析,得到每条链接对应的 Provider,再根据用户配置选择账号。如果用户为某个平台指定了账号,就使用指定账号;否则就按当前用户启用的同类型账号自动匹配。转存过程中每条链接单独记录成功或失败,多链接场景允许部分失败,最终文章只包含成功转存的资源。
在文章生成上,多链接合集支持 AI 生成和固定模板两种模式。AI 模式适合需要自然语言介绍的场景,固定模板模式适合批量发布、格式统一和低成本生成。这个设计让系统在“内容质量”和“生成稳定性”之间有了可选择空间。
九、文章发布与前端复制体验
生成文章只是第一步,最终还要让用户能方便地使用这些内容。因此文章管理页支持在线阅读、Markdown 原文查看和公众号排版复制。
公众号复制功能看似是前端细节,但实际影响很大。直接复制 Markdown 到公众号编辑器并不好用,所以我在文章详情页里生成富文本内容,并在复制前清理组件属性、补全图片和链接地址。如果浏览器支持 ClipboardItem,就写入 HTML;如果处在非安全上下文或浏览器不支持 Clipboard API,就回退到 DOM 选区复制。
发布层则继续使用 Publisher 抽象。微信公众号发布时,会经历 token 获取、图片上传、HTML 组装和草稿创建几个阶段,每个阶段都写入任务日志。这样当用户发布失败时,不会只看到“发布失败”,而是能知道失败发生在哪个阶段。
十、部署:单服务挂载前端和运行时资产
部署方式上,我最终选择让 FastAPI 同时提供 API、前端页面和运行时静态资源。前端构建后的 frontend/dist 挂载到根路径,AI 生成图片和公众号本地化图片挂载到 /static/assets。
这让部署变得更简单:容器里只需要启动一个 Python 服务,用户访问同一个地址即可使用前端和 API。Dockerfile 则负责在构建阶段安装前端依赖并生成 dist,再复制到 Python runtime 阶段。为了避免浏览器缓存旧 chunk,前端静态文件挂载时还增加了 no-cache 处理。
这种方式不是所有项目都适合,但对这个工具来说很合适。它减少了前后端分开部署的复杂度,也方便在 1Panel、Docker Compose 或单机环境中快速运行。
十一、总结
回头看这个项目,最重要的技术决策不是某一个具体接口怎么调,而是把不稳定的外部能力都收敛到清晰的边界里。网盘平台的差异由 Provider 处理,内容平台的差异由 Publisher 处理,长流程由 Pipeline 承载,AI 返回格式由客户端内部兼容,公众号 HTML 解析由独立服务负责。
这些边界让项目在不断迭代时还能保持可维护性。新增网盘时,不需要改主链路;新增发布平台时,不需要改文章生成;AI 接口返回格式变化时,也不需要影响业务服务。即使外部平台经常出现验证码、异步任务、容量不足、空响应这类问题,系统也能通过日志、任务状态和降级策略给出可排查、可恢复的结果。
目前这个项目已经形成了从资源链接到文章生成,再到内容发布的完整闭环。后续我更想继续加强三个方向:第一,把后台任务队列和失败重试做得更系统;第二,增加更多文章模板和素材管理能力;第三,继续扩展 Provider 和 Publisher,让它覆盖更多资源平台和内容平台。