agentic_huge_data_base / wiki
页面 RAGFlow · 6.6 数据源连接器·DeepWiki 中文全文译文

6.6 · 数据源连接器(Data Source Connectors)

复杂文档理解与引用检索 · 聚焦本章的模块关系、源码依据与实现要点。

项目RAGFlow 章节6.6 状态全文译文 模块测试、发布与运维、界面与交互、认证、权限与安全、系统架构
源码线索
  • api/db/services/connector_service.py
  • common/constants.py
  • common/data_source/__init__.py
  • common/data_source/blob_connector.py
  • common/data_source/config.py
  • common/data_source/confluence_connector.py
  • common/data_source/discord_connector.py
  • common/data_source/interfaces.py
  • common/data_source/models.py
  • common/data_source/notion_connector.py
模块标签
  • 测试、发布与运维
  • 界面与交互
  • 认证、权限与安全
  • 系统架构
  • 存储与持久化

章节正文

数据源连接器

数据源连接器

相关源文件

以下文件为本维基页面的生成提供了上下文:

  • api/db/services/connector_service.py
  • common/constants.py
  • common/data_source/__init__.py
  • common/data_source/blob_connector.py
  • common/data_source/config.py
  • common/data_source/confluence_connector.py
  • common/data_source/discord_connector.py
  • common/data_source/interfaces.py
  • common/data_source/models.py
  • common/data_source/notion_connector.py
  • common/data_source/seafile_connector.py
  • common/data_source/sharepoint_connector.py
  • common/data_source/slack_connector.py
  • common/data_source/teams_connector.py
  • common/data_source/utils.py
  • rag/svr/sync_data_source.py
  • test/unit_test/common/test_blob_connector_fingerprint.py
  • web/src/assets/svg/data-source/seafile.svg
  • web/src/components/dynamic-form.tsx
  • web/src/pages/dataflow-result/components/parse-editer/index.tsx
  • web/src/pages/dataset/dataset-overview/dataset-common.ts
  • web/src/pages/dataset/dataset-overview/overview-table.tsx
  • web/src/pages/dataset/dataset-setting/components/link-data-source.tsx
  • web/src/pages/user-setting/data-source/add-datasource-modal.tsx
  • web/src/pages/user-setting/data-source/component/added-source-card.tsx
  • web/src/pages/user-setting/data-source/component/box-token-field.tsx
  • web/src/pages/user-setting/data-source/component/gmail-token-field.tsx
  • web/src/pages/user-setting/data-source/component/google-drive-token-field.tsx
  • web/src/pages/user-setting/data-source/constant/index.tsx
  • web/src/pages/user-setting/data-source/constant/s3-constant.tsx
  • web/src/pages/user-setting/data-source/constant/seafile-constant.tsx
  • web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx
  • web/src/pages/user-setting/data-source/hooks.ts
  • web/src/pages/user-setting/data-source/index.tsx
  • web/src/services/data-source-service.ts

RAGFlow 提供了一个强大的外部数据源连接器系统,允许用户将来自 30 多个外部平台的内容直接同步到知识库中。该系统可以自动完成文档、电子表格、代码仓库和通信日志的入库,并将其集成到 RAGFlow 的文档处理管线中。

概述

连接器系统基于多层架构构建,负责处理认证、定期轮询、增量同步和错误恢复。它将各种 API(REST、GraphQL、WebDAV 等)抽象为统一的 Document 模型,RAGFlow 可以对其进行解析和索引。

支持的数据源

支持的数据源在 FileSource 枚举 common/constants.py:114-147DocumentSource 枚举 common/data_source/config.py:41-71 中定义,包括:

  • 存储:S3、Google Cloud Storage、OCI Storage、R2、Dropbox、Box、Seafile、WebDAV。
  • 协作:Notion、Confluence、Jira、Airtable、Asana、Zendesk。
  • 通信:Slack、Discord、Gmail、IMAP。
  • 开发:GitHub、GitLab、Bitbucket。
  • 数据库:MySQL、PostgreSQL。

系统架构与数据流

连接器系统在外部 API 空间与 RAGFlow 的内部存储及任务执行层之间架起了桥梁。

代码实体映射

下图展示了逻辑组件与其具体代码实现之间的对应关系。

图:连接器系统代码实体

RAGFlow · 代码实体映射 · 图 1
RAGFlow · 代码实体映射 · 图 1

来源:web/src/pages/user-setting/data-source/constant/index.tsx:18-50api/db/services/connector_service.py:35-36rag/svr/sync_data_source.py:85-92common/data_source/__init__.py:26-47

同步管线

同步过程由 rag/svr/sync_data_source.py 管理。它作为一个后台工作进程运行,负责轮询已调度的任务。该工作进程使用名为 task_limiterasyncio.Semaphore 来控制并发度,默认允许 5 个并发任务 rag/svr/sync_data_source.py:81-82

图:同步管线数据流

RAGFlow · 同步管线 · 图 2
RAGFlow · 同步管线 · 图 2

来源:rag/svr/sync_data_source.py:129-160api/db/services/connector_service.py:142-160rag/svr/sync_data_source.py:81-82

核心组件

1. 连接器接口与模型

每个连接器都必须实现或扩展 common/data_source/ 中的逻辑。关键模型包括:

  • Document:标准数据结构,包含 idblob(内容)、extensionsize_bytesmetadata common/data_source/models.py:89-102
  • SyncBase:服务器层同步任务的基类 rag/svr/sync_data_source.py:85
  • BlobStorageConnector:针对 S3、R2 和 GCS 等对象存储源的特化基类 common/data_source/blob_connector.py:52-70
  • TextSectionImageSection:用于从 Discord 或 Notion 等数据源进行结构化提取 common/data_source/models.py:77-87
2. 数据库服务

ConnectorServiceSyncLogsService 负责管理连接器配置和执行历史的持久化。

  • ConnectorService.resume():切换连接器的状态,并调度或更新任务 api/db/services/connector_service.py:38-57
  • ConnectorService.rebuild():删除与连接器关联的所有文档,并调度一次完整的重新索引 api/db/services/connector_service.py:72-81
  • SyncLogsService.list_sync_tasks():根据 refresh_freq 检索需要轮询的任务 api/db/services/connector_service.py:146-190
  • ConnectorService.cleanup_stale_documents_for_task():通过对比远程快照(file_list),识别并移除 RAGFlow 中已不存在于外部数据源的文档 api/db/services/connector_service.py:84-139
3. 认证与 OAuth

RAGFlow 为 Google Drive、Gmail 和 Box 等服务处理复杂的 OAuth2 流程。

  • 令牌处理:像 OnyxConfluence 这样的连接器实现了自定义的凭证续期逻辑,并使用 Redis 实现分布式锁 common/data_source/confluence_connector.py:126-176
  • 凭证管理CredentialsProviderInterface 定义了连接器如何动态或静态地获取密钥 common/data_source/confluence_connector.py:35-42
4. 前端集成

前端使用 DataSourceKey 枚举来管理 30 多个数据源的可见性和配置 web/src/pages/user-setting/data-source/constant/index.tsx:18-50

  • 功能可见性DataSourceFeatureVisibilityMap 决定了哪些数据源支持 syncDeletedFiles 等高级功能 web/src/pages/user-setting/data-source/constant/index.tsx:58-139
  • 图标系统:系统使用集中的 generateDataSourceInfo 函数为 UI 提供名称、描述和图标 web/src/pages/user-setting/data-source/constant/index.tsx:152-240

实现细节:Confluence、Discord 与关系型数据库

Confluence 连接器
  • 扩展支持:使用 body.storage.valuehistory.lastUpdated 等特定字段来获取完整的页面内容 common/data_source/config.py:97-105
  • 增量同步:使用 CONFLUENCE_SYNC_TIME_BUFFER_SECONDS 确保在重叠的轮询窗口内不会遗漏任何更新 common/data_source/config.py:198-200
  • 速率限制OnyxConfluence 实现了内置的回退和重试逻辑,以管理 Atlassian API 的交互 common/data_source/confluence_connector.py:63-116
Discord 连接器
  • 消息转换:将 Discord 消息和线程历史转换为 Document 对象,并附带"频道"和"线程"等元数据 common/data_source/discord_connector.py:29-75
  • 异步桥接:使用 _manage_async_retrieval 将异步的 discord.py 客户端桥接到同步管线所需的同步迭代模式中 common/data_source/discord_connector.py:154-188
RDBMS 连接器(MySQL/PostgreSQL)
  • 灵活入库:支持从自定义 SQL 查询或整个表中读取行数据 common/data_source/rdbms_connector.py:176-179
  • 增量同步:使用 timestamp_column 作为有序游标,仅获取已更新的行 common/data_source/rdbms_connector.py:69-70
  • ID 映射:允许将特定的数据库列映射到文档 ID,或从内容生成哈希值 common/data_source/rdbms_connector.py:38-40
对象存储连接器
  • 多提供商支持:通过统一的 BlobStorageConnector 支持 S3、R2、Google Cloud Storage、OCI Storage 和 S3 兼容存储 common/data_source/blob_connector.py:52-64
  • 过滤:使用 BLOB_STORAGE_SIZE_THRESHOLD 实现基于文件大小的过滤 common/data_source/blob_connector.py:162-170
  • 认证:支持多种 S3 认证方式,包括 access_keyiam_roleassume_role common/data_source/blob_connector.py:97-115

配置常量

连接器受 common/data_source/config.py 中的多个环境变量和常量控制:

  • INDEX_BATCH_SIZE:每批处理的文档数量(默认值:2)common/data_source/config.py:109
  • REQUEST_TIMEOUT_SECONDS:外部 API 请求的全局超时时间(默认值:60 秒)common/data_source/config.py:17
  • CONTINUE_ON_CONNECTOR_FAILURE:如果为 true,单个文档失败不会停止整个同步任务 common/data_source/config.py:144-147
  • DOWNLOAD_CHUNK_SIZE:文件下载的 1MB 缓冲区 common/data_source/config.py:122

来源:rag/svr/sync_data_source.py:21-160api/db/services/connector_service.py:35-190common/data_source/models.py:77-102common/data_source/config.py:17-210common/constants.py:84-147web/src/pages/user-setting/data-source/constant/index.tsx:18-240common/data_source/confluence_connector.py:35-176common/data_source/blob_connector.py:52-170common/data_source/discord_connector.py:29-188common/data_source/rdbms_connector.py:38-179common/data_source/__init__.py:26-92