从网盘链接到图文发布:一个内容自动化工具的技术实现记录

最开始做这个项目时,我想解决的是一个非常具体的问题:网盘资源的整理和内容发布太依赖人工了。一个资源从拿到分享链接开始,通常还要经历转存、重新分享、整理文件信息、撰写介绍文案、补充配图、复制排版、再发布到内容平台。每一步单独看都不复杂,但当这些动作每天重复出现时,时间成本和出错概率都会被放大。

所以我把项目的核心目标定义成一条自动化链路:用户提交网盘链接,系统负责识别链接、转存资源、生成新的分享链接、调用 AI 生成文章、保存内容资产,并在需要时发布到平台。随着后续场景增多,这条链路又扩展出了多链接合集、公众号文章改写、公众号监听、多用户隔离、管理员配置和任务管理等能力。

这篇文章主要记录这个项目的技术实现思路,而不是功能介绍。我会从后端架构、任务编排、Provider 抽象、AI 调用、公众号文章处理和部署方式几个角度展开,也会放一些核心代码片段来说明实现细节。

一、整体架构:把复杂流程拆成可替换模块

这个项目采用 FastAPI + SQLAlchemy asyncio + MySQL 作为后端基础,前端使用 Ant Design Pro v6。整体上是前后端分离开发,但部署时由 FastAPI 挂载前端构建产物,因此最终可以作为一个服务运行。

后端没有把所有逻辑都堆在接口函数里,而是按照职责拆成几层:API 层负责参数接收和响应封装,Service 层负责编排业务流程,Repository 层负责数据库读写,Provider 层负责不同网盘平台的协议差异,Publisher 层负责不同内容平台的发布差异,AI 模块负责文本和图片接口调用。

API 路由的入口非常轻,只负责把各个业务模块挂载到统一版本前缀下:

from fastapi import APIRouter

from . import accounts, articles, auth, drives, help_docs, pipeline, publishers, system, wechat_monitor

api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(system.router, prefix="/system", tags=["system"])
api_router.include_router(accounts.router, prefix="/accounts", tags=["accounts"])
api_router.include_router(drives.router, prefix="/drives", tags=["drives"])
api_router.include_router(articles.router, prefix="/articles", tags=["articles"])
api_router.include_router(publishers.router, prefix="/publishers", tags=["publishers"])
api_router.include_router(pipeline.router, prefix="/pipeline", tags=["pipeline"])
api_router.include_router(wechat_monitor.router, prefix="/wechat-monitor", tags=["wechat-monitor"])
api_router.include_router(help_docs.router, prefix="/help-docs", tags=["help-docs"])

这种拆法的好处是,业务复杂度不会扩散到 Controller。比如“提交一个链路任务”在接口层只是创建任务,真正的解析、转存、生成和发布都放在 PipelineService 内部。后续我加入公众号改写、多链接合集和任务删除时,也没有破坏 API 层结构。

二、Pipeline:用后台任务承载长链路

网盘转存、AI 生成、图片处理和平台发布都不是毫秒级操作。如果把这些动作全部放在一次 HTTP 请求里同步执行,前端体验会很差,也很容易因为超时导致状态丢失。因此我把核心流程设计成后台任务。

用户提交任务后,系统先创建一条 PipelineTask​,然后用 asyncio.create_task 把真正的执行逻辑放到后台。前端拿到任务 ID 后,通过任务详情和日志接口轮询状态。

核心提交逻辑如下:

class PipelineService:
    def __init__(self, session: AsyncSession):
        self.session = session
        self.repo = TaskRepository(session)

    async def submit(self, dto: PipelineCreate, *, user_id: int = 1) -> PipelineTask:
        source_texts = dto.normalized_sources()
        task = await self.repo.create(
            user_id=user_id,
            source_url="\n".join(source_texts),
            source_password=dto.source_password,
            drive_account_id=dto.drive_account_id,
            publisher_account_id=dto.publisher_account_id,
            topic=dto.topic,
            style=dto.style,
            status="pending",
            progress=0,
        )
        await self.repo.commit()

        asyncio.create_task(self._run_in_background(task.id, dto))
        return task

后台执行时,我在内部定义了一个 step​ 函数,用来统一更新任务状态、进度和日志。这样每个阶段只需要调用一次 step,前端就能看到实时变化。

async def step(stage: str, progress: int, message: str = "", level: str = "INFO") -> None:
    task.status = stage
    task.progress = progress
    await repo.append_log(task_id, stage=stage, level=level, message=message or stage)
    await session.commit()
    logger.info("[task#{}] {} {}% {}", task_id, stage, progress, message)

这个设计对排查问题非常有帮助。比如任务停在 parsing​,说明链接识别阶段出问题;停在 transferring​,说明网盘转存或分享创建失败;停在 generating,通常就是 AI 或文章生成逻辑异常。用户看到的是可读的任务日志,开发时则可以结合后端日志继续深入定位。

三、Provider 抽象:隔离不同网盘的协议差异

项目最核心的扩展点是网盘 Provider。不同网盘平台表面上都提供“分享链接、转存、创建分享”这些能力,但实现细节差异非常大。如果业务层直接依赖各个平台的接口,整个 Pipeline 很快就会变得难以维护。

因此我定义了统一的数据结构和抽象基类:

@dataclass
class ParsedShare:
    provider: str
    raw_url: str
    share_id: str
    password: str = ""
    extra: dict[str, Any] = field(default_factory=dict)

@dataclass
class TransferResult:
    success: bool
    target_dir: str
    file_ids: list[str] = field(default_factory=list)
    transferred_paths: list[str] = field(default_factory=list)
    message: str = ""

@dataclass
class ShareResult:
    share_url: str
    password: str = ""
    expire_at: int = 0
    raw: dict[str, Any] = field(default_factory=dict)

class CloudDriveProvider(ABC):
    name: str = "base"
    display_name: str = "Base"

    @classmethod
    @abstractmethod
    def can_parse(cls, text: str) -> bool:
        pass

    @classmethod
    @abstractmethod
    def parse_share_url(cls, text: str) -> ParsedShare | None:
        pass

    @abstractmethod
    async def list_share_files(self, share: ParsedShare) -> list[FileEntry]:
        pass

    @abstractmethod
    async def transfer(self, share: ParsedShare, target_dir: str) -> TransferResult:
        pass

    @abstractmethod
    async def create_share(self, file_ids: list[str], *, password: str = "", expire_days: int = 0) -> ShareResult:
        pass

有了这层抽象之后,百度、夸克、迅雷、UC 这些平台都可以作为独立 Provider 实现。业务层不关心某个平台是 Cookie 登录、扫码登录,还是需要 captcha token;也不关心创建分享时是立即返回 share_id,还是需要轮询 task_id。业务层只接收 TransferResult​ 和 ShareResult

真正执行转存时,DriveService 会先解析链接,再校验 Provider 类型是否匹配,然后按照优先级创建分享:优先按转存后的子项路径分享,其次按文件 ID 分享,最后降级为目录直达链接。

async def _transfer_and_share_with_provider(
    self,
    text: str,
    provider: CloudDriveProvider,
    target_dir: str,
    log_fn: Callable[[str], Any] | None = None,
) -> tuple[ParsedShare, TransferResult, ShareResult]:
    parsed = self.parse_link(text)
    if provider.name != parsed.provider:
        raise AppError(
            f"链接类型为 {parsed.provider},请选择同类型网盘账号",
            code="DRIVE_PROVIDER_MISMATCH",
            status=400,
        )

    transfer = await provider.transfer(parsed, target_dir)
    if not transfer.success:
        raise AppError(f"转存失败: {transfer.message}", code="TRANSFER_FAIL", status=502)

    share: ShareResult | None = None
    if transfer.transferred_paths and hasattr(provider, "create_share_by_paths"):
        try:
            share = await provider.create_share_by_paths(transfer.transferred_paths)
        except Exception:
            share = None

    if share is None and transfer.file_ids:
        try:
            share = await provider.create_share(transfer.file_ids)
        except Exception:
            share = None

    if share is None:
        web_url = _dir_web_url(parsed.provider, target_dir)
        share = ShareResult(
            share_url=web_url or target_dir,
            password="",
            raw={"skipped": True, "dir_path": target_dir},
        )

    return parsed, transfer, share

这里的降级策略是实际迭代中总结出来的。比如百度网盘有时不能直接对某些资源创建二次分享,如果没有兜底,用户只能看到失败;但如果能给出一个登录后可访问的目录直达地址,至少可以保证文章里的资源入口不是完全断掉的。

四、多账号重试:把平台错误转成可恢复流程

多网盘适配中,一个比较典型的问题是夸克容量不足。早期如果自动选择到的账号容量满了,整个任务就会失败。但从用户角度看,如果他还有其他同类型账号,系统应该继续尝试,而不是直接终止。

因此 DriveService 在按 Provider 名称自动选择账号时,会遍历当前用户启用的同类型账号。只有识别到容量不足这类可恢复错误时,才继续尝试下一个账号;其他错误则直接抛出,避免掩盖真实问题。

async def transfer_and_share_with_provider(
    self,
    text: str,
    provider_name: str,
    *,
    user_id: int,
    log_fn: Callable[[str], Any] | None = None,
) -> tuple[ParsedShare, TransferResult, ShareResult, int]:
    providers = await self.get_providers_by_name(provider_name, user_id=user_id)
    errors: list[str] = []
    for provider, target_dir, account_id in providers:
        try:
            return (*await self._transfer_and_share_with_provider(text, provider, target_dir, log_fn=log_fn), account_id)
        except Exception as e:
            message = str(e)
            errors.append(f"账号 #{account_id}: {message}")
            if "QUARK_CAPACITY_LIMIT" not in repr(e) and "容量不足" not in message and "capacity limit" not in message.lower():
                raise
            logger.warning("[drive] {} 账号 #{} 容量不足,尝试下一个账号", provider_name, account_id)
    raise AppError(
        f"所有 {provider_name} 网盘账号均转存失败: {';'.join(errors)}",
        code="DRIVE_PROVIDER_ACCOUNTS_FAILED",
        status=400,
    )

这个逻辑看起来不复杂,但它体现了一个重要原则:不是所有异常都应该被重试。只有明确知道错误是账号容量不足,并且换账号可能解决时,才应该继续尝试。其他错误,比如凭证失效、链接解析失败、平台接口结构变化,都应该尽早暴露。

五、AI 调用:兼容标准 JSON 和 SSE 响应

AI 文本生成采用兼容 OpenAI 的 /v1/chat/completions​ 接口。但在实际接入中,我遇到过两种返回格式:一种是标准 JSON,一种是虽然请求没有开启 stream,但服务端仍然返回 data: 前缀的 SSE 片段。

为了让上层业务不关心这些差异,我把响应解析放在 AITextClient​ 内部。它会先判断是否是 JSON,再判断是否包含 SSE 的 data:​ 行,并把所有 delta.content 拼接起来。

def _parse_response_text(text: str, status_code: int) -> str:
    text = text.strip()
    if not text:
        raise ProviderError("AI 接口返回空响应", code="AI_TEXT_BAD_RESP")

    if text.startswith("{"):
        try:
            data = json.loads(text)
            choices = data.get("choices")
            if not isinstance(choices, list) or not choices:
                raise KeyError("choices")
            first = choices[0]
            message = first.get("message")
            if isinstance(message, dict):
                content = message.get("content")
                if content:
                    return str(content)
            delta = first.get("delta")
            if isinstance(delta, dict) and delta.get("content"):
                return str(delta["content"])
            raise KeyError("message.content")
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            raise ProviderError(f"AI 文本返回结构异常: {text[:200]}", code="AI_TEXT_BAD_RESP") from e

    if "data:" in text:
        parts: list[str] = []
        for line in text.splitlines():
            line = line.strip()
            if not line.startswith("data:"):
                continue
            payload = line[5:].strip()
            if payload == "[DONE]":
                break
            try:
                chunk = json.loads(payload)
                choices = chunk.get("choices")
                if not isinstance(choices, list) or not choices:
                    continue
                first = choices[0]
                delta = first.get("delta")
                message = first.get("message")
                if isinstance(delta, dict):
                    content = delta.get("content") or ""
                elif isinstance(message, dict):
                    content = message.get("content") or ""
                else:
                    content = ""
                if content:
                    parts.append(str(content))
            except (json.JSONDecodeError, TypeError):
                continue
        result = "".join(parts)
        if result:
            return result
        raise ProviderError("AI SSE 流响应中未找到有效内容", code="AI_TEXT_BAD_RESP")

    raise ProviderError(f"AI 接口返回未知格式: {text[:200]}", code="AI_TEXT_BAD_RESP")

AI 客户端还会从数据库读取运行时配置,包括模型、Base URL、API Key 和重试次数。这样每个用户都可以有自己的 AI 配置,管理员也可以配置兜底参数。API Key 不会明文返回给前端,凭证类数据统一通过 Fernet 加密后落库。

六、凭证安全与多用户隔离

项目从个人工具扩展成多用户应用后,账号凭证和 AI Key 的安全性变得非常重要。系统里既有网盘 Cookie、发布平台配置,也有 AI API Key,这些都不能明文暴露。

凭证加密使用 Fernet,对称密钥由项目的 secret_key 派生:

def _derive_key(secret: str) -> bytes:
    digest = hashlib.sha256(secret.encode("utf-8")).digest()
    return base64.urlsafe_b64encode(digest)

_fernet = Fernet(_derive_key(settings.secret_key))

def encrypt(plain: str | None) -> str | None:
    if plain is None or plain == "":
        return plain
    return _fernet.encrypt(plain.encode("utf-8")).decode("utf-8")

def decrypt(token: str | None) -> str | None:
    if token is None or token == "":
        return token
    try:
        return _fernet.decrypt(token.encode("utf-8")).decode("utf-8")
    except InvalidToken as e:
        raise AppError("凭证解密失败", code="CREDENTIAL_DECRYPT_ERROR", status=500) from e

用户认证使用密码哈希和 JWT。业务数据层面,网盘账号、发布账号、文章、任务和系统配置都绑定 user_id。接口通过当前登录用户过滤数据,管理员接口则显式区分普通用户和管理员权限。

这个改造的收益很大。AI 配置不会串用户,网盘账号不会被其他用户使用,任务列表也只展示当前用户自己的任务。管理员可以配置系统级兜底能力,但不会破坏用户个人配置的优先级。

七、公众号文章改写:HTML、Markdown、图片和链接替换

公众号文章改写是项目里比较复杂的一条支线。它不是简单地把一个 URL 发给 AI,而是需要先把原文处理成可控的中间格式。

我把公众号文章处理拆成单独服务,核心职责包括:识别公众号文章 URL、抓取 HTML、定位正文节点、提取标题和作者、把 HTML 转成 Markdown、识别正文中的网盘链接、下载微信图片到本地静态目录,并在必要时调用图片编辑能力处理水印。

文章解析器基于 Python 标准库的 HTMLParser 实现,只在进入正文节点后才收集内容,并跳过脚本、样式、视频、音频等不需要进入 Markdown 的节点:

class _WeChatMarkdownParser(HTMLParser):
    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.title_parts: list[str] = []
        self.author_parts: list[str] = []
        self.markdown_parts: list[str] = []
        self._title_depth = 0
        self._author_depth = 0
        self._content_depth = 0
        self._skip_depth = 0
        self._link_stack: list[str] = []
        self._list_stack: list[str] = []
        self._blockquote_depth = 0
        self._pre_depth = 0
        self._format_stack: list[str] = []

    @property
    def markdown(self) -> str:
        text = "".join(self.markdown_parts)
        text = _normalise_markdown(text)
        return text.strip()

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        attrs_map = {key.lower(): value or "" for key, value in attrs}
        node_id = attrs_map.get("id", "")
        tag = tag.lower()

        if node_id == "activity-name":
            self._title_depth = 1
            return
        if node_id == "js_name":
            self._author_depth = 1
            return
        if node_id == "js_content":
            self._content_depth = 1
        elif self._content_depth:
            self._content_depth += 1
        else:
            return

        if self._skip_depth:
            self._skip_depth += 1
            return
        if tag in SKIP_TAGS or (node_id != "js_content" and _is_hidden(attrs_map)):
            self._skip_depth += 1
            return

公众号改写和普通网盘任务最大的区别是,它需要在 Markdown 中替换链接。系统会先识别原文中的网盘链接,逐条转存,成功后替换成新的分享链接;如果某条链接转存失败,则从 Markdown 中移除,避免最终文章里残留不可用资源。

最近加入的“无网盘链接时仍改写”选项,也是基于这套流程扩展出来的。开启后,如果正文里没有识别到网盘链接,Pipeline 会跳过转存阶段,直接把 Markdown 交给 AI 重写。这样公众号能力就不再局限于资源文章,而是变成一个通用的文章改写入口。

八、多链接合集:从单资源到批量资源编排

单链接任务比较直接:一个链接对应一次转存和一篇文章。但多链接合集要复杂一些,因为每条链接可能来自不同平台,也可能需要使用不同账号。

我的处理方式是先对所有链接进行解析,得到每条链接对应的 Provider,再根据用户配置选择账号。如果用户为某个平台指定了账号,就使用指定账号;否则就按当前用户启用的同类型账号自动匹配。转存过程中每条链接单独记录成功或失败,多链接场景允许部分失败,最终文章只包含成功转存的资源。

在文章生成上,多链接合集支持 AI 生成和固定模板两种模式。AI 模式适合需要自然语言介绍的场景,固定模板模式适合批量发布、格式统一和低成本生成。这个设计让系统在“内容质量”和“生成稳定性”之间有了可选择空间。

九、文章发布与前端复制体验

生成文章只是第一步,最终还要让用户能方便地使用这些内容。因此文章管理页支持在线阅读、Markdown 原文查看和公众号排版复制。

公众号复制功能看似是前端细节,但实际影响很大。直接复制 Markdown 到公众号编辑器并不好用,所以我在文章详情页里生成富文本内容,并在复制前清理组件属性、补全图片和链接地址。如果浏览器支持 ClipboardItem,就写入 HTML;如果处在非安全上下文或浏览器不支持 Clipboard API,就回退到 DOM 选区复制。

发布层则继续使用 Publisher 抽象。微信公众号发布时,会经历 token 获取、图片上传、HTML 组装和草稿创建几个阶段,每个阶段都写入任务日志。这样当用户发布失败时,不会只看到“发布失败”,而是能知道失败发生在哪个阶段。

十、部署:单服务挂载前端和运行时资产

部署方式上,我最终选择让 FastAPI 同时提供 API、前端页面和运行时静态资源。前端构建后的 frontend/dist​ 挂载到根路径,AI 生成图片和公众号本地化图片挂载到 /static/assets

这让部署变得更简单:容器里只需要启动一个 Python 服务,用户访问同一个地址即可使用前端和 API。Dockerfile 则负责在构建阶段安装前端依赖并生成 dist,再复制到 Python runtime 阶段。为了避免浏览器缓存旧 chunk,前端静态文件挂载时还增加了 no-cache 处理。

这种方式不是所有项目都适合,但对这个工具来说很合适。它减少了前后端分开部署的复杂度,也方便在 1Panel、Docker Compose 或单机环境中快速运行。

十一、总结

回头看这个项目,最重要的技术决策不是某一个具体接口怎么调,而是把不稳定的外部能力都收敛到清晰的边界里。网盘平台的差异由 Provider 处理,内容平台的差异由 Publisher 处理,长流程由 Pipeline 承载,AI 返回格式由客户端内部兼容,公众号 HTML 解析由独立服务负责。

这些边界让项目在不断迭代时还能保持可维护性。新增网盘时,不需要改主链路;新增发布平台时,不需要改文章生成;AI 接口返回格式变化时,也不需要影响业务服务。即使外部平台经常出现验证码、异步任务、容量不足、空响应这类问题,系统也能通过日志、任务状态和降级策略给出可排查、可恢复的结果。

目前这个项目已经形成了从资源链接到文章生成,再到内容发布的完整闭环。后续我更想继续加强三个方向:第一,把后台任务队列和失败重试做得更系统;第二,增加更多文章模板和素材管理能力;第三,继续扩展 Provider 和 Publisher,让它覆盖更多资源平台和内容平台。