Zenn
🤖

Cloud RunにExternal Knowledge APIをデプロイしてDifyからRAGとして利用する方法

に公開

こんにちは。Acompany 入社12ヶ月目のハルカです。
昨日、東京から名古屋に出社して退勤後にフットサル部に参加したせいで全身が痛いのですが、かろうじて指は動くので記事を書きます。弊社では多くの部活動があり、フットサル部はその一つです。昨日は記念すべき第0回目でしたので2時間半かけて出勤して、約1時間参加してきました!このあたりの雰囲気は弊社のカルチャーデックをご覧ください。

はじめに

近頃、Dify を使ったAIワークフローが注目を集めています。弊社でも社内にDifyを導入し、業務効率化を図っています。
今回は、Difyのナレッジベース機能を拡張するために、外部のデータソースを取り込む方法を紹介します。具体的には、Difyの External Knowledge API を利用してRAG(Retrieval-Augmented Generation)APIをCloud Run上に構築し、標準のナレッジベースでは登録できないデータを取り込む方法を解説します。
https://github.com/langgenius/dify
https://docs.dify.ai/guides/knowledge-base/external-knowledge-api-documentation

なぜExternal Knowledge APIが必要か

Difyには標準でナレッジベース機能が備わっており、NotionやPDF、テキストファイルなどを登録してRAGとして利用できます。しかし、一部のNotionデータや社内DBに存在するデータなど、Difyの標準機能では直接取得できない場合があります。そのような場合、外部APIを用意してDifyに接続することで、様々なデータを取り込むことができます。

取得できないNotionデータの例

Difyのナレッジベース機能で取得できるのは、Notionページのタイトルと本文です。Notionのデータベース(DB)で プロパティのみ を使ってテーブルを作っている場合、本文が空のページとして扱われてしまいます。その結果、 ページ本文に情報がない=実質何も取り込めない状態 になってしまうのです。

Notion DBの例

ページが空の例

NotionのDBでプロパティだけを使ってテーブルを作成している場合、ページ本文が空になります。
Notion DBのページ。本文が空になっている

同様に、社内の他のDBやシステムで管理しているデータについても、Dify標準のコネクタでは取得できないケースがあります。こうした場合、自前でAPIを用意し、Difyに接続する方法が解決策となります。

External Knowledge APIの仕様

Difyの公式ドキュメント(External Knowledge API Documentation)によると、リクエストおよびレスポンス形式は以下のように定義されています。

リクエスト例

POST <your-endpoint>/retrieval HTTP/1.1
-- header
Content-Type: application/json
Authorization: Bearer your-api-key
-- data
{
    "knowledge_id": "your-knowledge-id",
    "query": "your question",
    "retrieval_setting":{
        "top_k": 2,
        "score_threshold": 0.5
    }
}

レスポンス例

HTTP/1.1 200
Content-type: application/json
{
    "records": [
        {
            "metadata": {
                "path": "s3://dify/knowledge.txt",
                "description": "dify knowledge document"
            },
            "score": 0.98,
            "title": "knowledge.txt",
            "content": "This is the document for external knowledge."
        },
        {
            "metadata": {
                "path": "s3://dify/introduce.txt",
                "description": "dify introduce"
            },
            "score": 0.66,
            "title": "introduce.txt",
            "content": "The Innovation Engine for GenAI Applications"
        }
    ]
}

つまり、Dify側からクエリが与えられたときに、それに合致する情報(複数ドキュメント)をrecordsという配列で返してあげればよい、という仕組みです。
このAPIをFastAPIで実装し、GCPのCloud Runにデプロイして外部公開する流れをみていきます。

ディレクトリ構成と環境構築

ディレクトリ構成

今回の実装例では、以下のようなディレクトリ構成を想定します。なお、ここでは最終的に Cloud Run にデプロイするためのファイルも含まれています。

.
├── Dockerfile
├── README.md
├── main.tf               # Terraform用
├── pyproject.toml
├── src
│   └── api
│       ├── __init__.py
│       ├── db
│       │   ├── __init__.py
│       │   ├── base_db.py
│       │   └── notion_db.py
│       └── server.py     # ここでFastAPIを実装
├── terraform.tfstate
├── terraform.tfvars
└── uv.lock

環境構築

また、上記構成から分かるように、Pythonのパッケージ管理にはuvを使用します。pyproject.tomlに必要なパッケージを記述し、uv syncコマンドで環境を構築します。

pyproject.toml
[project]
name = "rag-api"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12.3"
dependencies = [
    "fastapi>=0.115.11",
    "llama-index>=0.12.23",
    "llama-index-embeddings-huggingface>=0.5.2",
    "llama-index-readers-file>=0.4.6",
    "python-dotenv>=1.0.1",
    "requests>=2.32.3",
    "uvicorn>=0.34.0",
]

[dependency-groups]
dev = [
    "pytest>=8.3.5",
]

https://github.com/astral-sh/uv

必要なファイルとポイント解説

このプロジェクトで重要となるファイルやポイントを解説します。

1. FastAPIアプリケーション (server.py)

RAG APIを実装するメインのファイルです。

  • ここではFastAPIを使ってAPIサーバを起動します。
  • 環境変数からNotionのデータベースIDやAPIキーを取得し、必要に応じて初期化処理を行います。
  • /retrieval エンドポイントを実装し、Difyからのリクエストを受けた際に検索を行ってレスポンスを返します。
  • 検索部分は llama_index を使い、Notionから取得した情報をベクトル検索できるようにしています。
server.py
# src/api/server.py
import logging
import os
from contextlib import asynccontextmanager
from typing import Any, Dict, List

from dotenv import find_dotenv, load_dotenv
from fastapi import FastAPI, HTTPException, Request
from llama_index.core import Document, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pydantic import BaseModel

from .db.notion_db import NotionDB

logging.basicConfig(level=logging.INFO)

dotenv_path = find_dotenv()
if dotenv_path:
    logging.info(f"Loading environment variables from: {dotenv_path}")
    load_dotenv(dotenv_path)


@asynccontextmanager
async def lifespan(app: FastAPI):
    src_type = os.getenv("RAG_SRC_TYPE", "notion")  # データソースの種別
    try:
        app.state.rag_index = get_rag_db(src_type)
        logging.info(f"Knowledge index built successfully using source: {src_type}")
    except Exception as ex:
        logging.exception("Failed to build index")
        raise ex  # 初期化失敗時は例外を再送出する
    yield


app = FastAPI(lifespan=lifespan)


def get_rag_db(src_type: str) -> VectorStoreIndex:
    """
    src_type: データソースの種別
    """
    embed_model_name = os.getenv("EMBED_MODEL", "all-MiniLM-L6-v2")
    logging.info(f"Embedding model: {embed_model_name}")
    embedding_model = HuggingFaceEmbedding(
        model_name=embed_model_name, trust_remote_code=True
    )
    documents = []

    def process_documents(documents: List[Dict[str, Any]]) -> List[Document]:
        return [
            Document(text=doc.get("text", ""), metadata=doc.get("metadata", {}))
            for doc in documents
        ]

    if src_type.lower() == "notion":
        documents = NotionDB().get_db()
    # 他のデータソースを追加する場合は以下に処理を追加
    # elif src_type.lower() == "GoogleDrive":
    #     documents = get_google_drive_db()
    else:
        raise ValueError(f"Unsupported source type: {src_type}")

    if not documents:
        raise ValueError("No documents found in the source data.")

    return VectorStoreIndex.from_documents(
        process_documents(documents), embed_model=embedding_model
    )


# -------------------------------
# Pydantic モデル定義
# -------------------------------
class RetrievalSetting(BaseModel):
    top_k: int = 2
    score_threshold: float = 0.5


class RetrievalRequest(BaseModel):
    knowledge_id: str
    query: str
    retrieval_setting: RetrievalSetting


# -------------------------------
# /retrieval エンドポイント
# -------------------------------
@app.post("/retrieval")
async def retrieval_endpoint(retrieval_request: RetrievalRequest, request: Request):
    expected_knowledge_id = os.getenv("KNOWLEDGE_ID", "default")
    if retrieval_request.knowledge_id != expected_knowledge_id:
        raise HTTPException(status_code=400, detail="knowledge_id が不正です。")
    if (
        not hasattr(request.app.state, "rag_index")
        or request.app.state.rag_index is None
    ):
        raise HTTPException(
            status_code=500, detail="知識ベースのインデックスが構築されていません。"
        )
    rag_index = request.app.state.rag_index
    try:
        top_k = retrieval_request.retrieval_setting.top_k
        # VectorStoreIndex から retriever を作成して、クエリを実行
        retriever = rag_index.as_retriever(similarity_top_k=top_k)
        source_nodes = retriever.retrieve(retrieval_request.query)
        records = []
        for node in source_nodes:
            score = getattr(node, "score", 0.0)
            if score < retrieval_request.retrieval_setting.score_threshold:
                continue
            doc = node.node
            record = {
                "metadata": doc.metadata,
                "score": score,
                "title": doc.metadata.get("title", ""),
                "content": doc.text,
            }
            records.append(record)
        return {"records": records}
    except Exception:
        logging.exception("Error occurred in retrieval endpoint")
        raise HTTPException(status_code=500, detail="内部エラーが発生しました。")

2. データ取得 (base_db.py / notion_db.py)

NotionのDBからデータを取得する部分のクラスを定義しています。
base_db.pyは抽象クラス (BaseDB) と、各ドキュメントのデータ構造 (BaseItem) を定義して、上述のレスポンス形式に必要な情報を取得できるようにしています。
notion_db.pyでは、BaseDBを継承してNotionのDBからデータを取得するクラス (NotionDB) を実装しています。
また、Notion DBのプロパティをパースするための関数 (parse_properties) も定義しています。

base_db.py
# src/api/db/base_db.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseItem:
    def __init__(self, text: str, metadata: Dict[str, Any]) -> None:
        self.text = text
        self.metadata = metadata

    def to_dict(self) -> dict:
        return {"text": self.text, "metadata": self.metadata}


class BaseDB(ABC):
    @abstractmethod
    def get_db(self) -> List[Dict[str, Any]]:
        """
        サブクラスで実装してください。
        """
        raise NotImplementedError("サブクラスで実装してください")
notion_db.py

この例では、プロパティにID質問項目回答例を持つNotionのDBを想定しています。このプロパティを含まないDBだとエラーになるので、適宜変更してください。

import os
from typing import Any, Dict, List

import requests
from fastapi import HTTPException

from .base_db import BaseDB, BaseItem


def parse_properties(
    properties: Dict[str, Any], valid_keys: List[str]
) -> Dict[str, str]:
    """
    Notionのレコードプロパティを解析し、有効なキーに対応する特定の値を抽出します。

    パラメータ:
        properties (Dict[str, Any]): Notionレコードのプロパティを表す辞書です。
            各キーはプロパティ名であり、各値はそのプロパティのデータを含む辞書です。
        valid_keys (List[str]): 処理対象となるプロパティ名のリストです。

    戻り値:
        Dict[str, str]: 各処理済みキーとその対応する文字列値をマッピングした辞書です。

    動作:
        - キーが 'ID' の場合: 'unique_id' というサブ辞書が存在する場合、'prefix' と 'number'
          の値を組み合わせます(両方が存在する場合)または 'number' のみを使用します。
        - その他のキー(例: 'title' や 'rich_text'):これらのフィールドを確認し、
          各アイテムの 'plain_text' 値を改行で連結します。
    """
    parsed_props: Dict[str, str] = {}
    for key, value in properties.items():
        if key in valid_keys:
            if key == "ID":
                if "unique_id" in value:
                    prefix = value["unique_id"].get("prefix", "")
                    number = value["unique_id"].get("number", "")
                    parsed_props[key] = (
                        f"{prefix}-{number}" if prefix and number else f"{number}"
                    )
            else:
                for field in ["title", "rich_text"]:
                    if field in value and value[field]:
                        parsed_props[key] = "\n".join(
                            item.get("plain_text", "") for item in value[field]
                        )
                        break
    return parsed_props


class NotionItem(BaseItem):
    def __init__(self, title: str, content: str, notion_data: dict) -> None:
        text = f"質問項目:{title}\n\n回答:{content}"
        metadata = {
            "id": notion_data.get("id", ""),
            "url": notion_data.get("url", ""),
            "path": notion_data.get("url", ""),
            "description": content,
            "title": title,
        }
        super().__init__(text, metadata)


class NotionDB(BaseDB):
    def get_db(self) -> List[Dict[str, Any]]:
        """
        Notion DB からデータを取得し、リストで返す。
        """
        DATABASE_ID = os.getenv("NOTION_DATABASE_ID")
        KEY = os.getenv("NOTION_INTEGRATION_KEY")
        property_map = {
            "id": os.getenv("NOTION_ID_FIELD", "ID"),
            "title": os.getenv("NOTION_TITLE_FIELD", "質問項目"),
            "content": os.getenv("NOTION_CONTENT_FIELD", "回答例"),
        }

        if not DATABASE_ID or not KEY:
            raise ValueError(
                "NOTION_DATABASE_ID または NOTION_INTEGRATION_KEY が設定されていません。"
            )

        url = f"https://api.notion.com/v1/databases/{DATABASE_ID}/query"
        headers = {
            "Accept": "application/json",
            "Notion-Version": "2022-06-28",
            "Authorization": f"Bearer {KEY}",
        }

        try:
            with requests.Session() as session:
                response = session.post(url, headers=headers, json={})
                response.raise_for_status()
                res_dict = response.json()
        except requests.HTTPError as http_err:
            status_code = (
                http_err.response.status_code if http_err.response is not None else 500
            )
            raise HTTPException(
                status_code=status_code,
                detail=f"Notion APIへのリクエストに失敗しました: {http_err}",
            ) from http_err
        except requests.RequestException as req_err:
            raise HTTPException(
                status_code=500,
                detail=f"Notion APIへのリクエスト中にエラーが発生しました: {req_err}",
            ) from req_err

        results = []
        # keys_list は property_map で指定した Notion 側のプロパティ名リスト
        keys_list = list(property_map.values())
        for r in res_dict.get("results", []):
            refs = parse_properties(r["properties"], keys_list)
            if len(refs) == len(property_map):
                title_val = refs.get(property_map["title"], "")
                content_val = refs.get(property_map["content"], "")
                item = NotionItem(title_val, content_val, r)
                results.append(item.to_dict())
        return results


if __name__ == "__main__":
    notion_db = NotionDB()
    try:
        db_items = notion_db.get_db()
        print("取得したデータ:")
        for item in db_items:
            print(item.to_dict())
    except Exception as e:
        print("エラーが発生しました:", e)

Notionのインテグレーションの準備

Notion DBを使ったAPIを構築するためには、Notionのインテグレーションキー(NOTION_INTEGRATION_KEY)とデータベースID(NOTION_DATABASE_ID)が必要です。
以下のNotionのページから、インテグレーションを作成し、キーを取得してください。
データベースIDは、NotionのデータベースのURLの一部です。
例:https://www.notion.so/name/xxx...xxx?v=yyy...yyy の場合、xxx...xxx がデータベースIDになります。
注意する点として、データベースIDはDBが表示されている「ページ」のIDではなく、DBを「フルページで表示」したときのURLに含まれるIDです。
https://www.notion.so/profile/integrations

3. Dockerfile

FastAPIアプリをコンテナ化するためのDockerfileです。uvを使ってPython環境を構築し、uvicornでサーバを起動します。今回はこのDockerfileをビルドしてCloud Runへデプロイします。

Dockerfile
FROM ghcr.io/astral-sh/uv:debian-slim

# The installer requires curl (and certificates) to download the release archive
RUN apt-get update && apt-get install -y --no-install-recommends git curl vim ca-certificates libcairo2

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /opt
COPY ./pyproject.toml ./uv.lock ./README.md ./
COPY src/ src/

RUN uv sync --frozen --no-cache

EXPOSE 8080

ENTRYPOINT ["uv", "run",  "uvicorn", "src.api.server:app", "--reload", "--port", "8080", "--host", "0.0.0.0"]

4. Terraformファイル (main.tf / terraform.tfvars)

Cloud RunにデプロイするためのTerraform設定です。

  • google_cloud_run_service リソースを利用してCloud Runサービスを作成します。
  • google_cloud_run_service_iam_member を使うことで、誰でもアクセスできるようallUsersinvokerに付与します。
  • tfvarsファイルにはプロジェクトIDやNotionのDB ID、Embeddingモデル名などを記述しています。
main.tf
provider "google" {
    project = var.project_id
    region  = var.region
}

variable "project_id" {
    description = "GCPプロジェクトID"
    type        = string
}

variable "region" {
    description = "Cloud Runをデプロイするリージョン"
    type        = string
    default     = "asia-northeast1"
}

variable "notion_database_id" {
    description = "NotionのデータベースID"
    type        = string
    default = "pkshatech/RoSEtta-base-ja"
}

variable "notion_integration_key" {
    description = "Notionのインテグレーションキー"
    type        = string
    sensitive   = true
}

variable "embed_model_name" {
    description = "Embeddingモデルの名前"
    type        = string
}

resource "google_cloud_run_service" "default" {
  name     = "rag-api-service"
  location = var.region

  template {
    spec {
      containers {
        image = "asia-northeast1-docker.pkg.dev/autoprivacy-dev/product-discovery/rag-api:latest"
        ports {
          container_port = 8080
        }
        env {
          name  = "NOTION_DATABASE_ID"
          value = var.notion_database_id
        }
        env {
          name  = "NOTION_INTEGRATION_KEY"
          value = var.notion_integration_key
        }
        env {
          name  = "EMBED_MODEL"
          value = var.embed_model_name
        }
        resources {
          limits = {
            memory = "4Gi"
            cpu    = "2"
          }
        }
      }
    }
  }

  traffic {
    percent         = 100
    latest_revision = true
  }
}

resource "google_cloud_run_service_iam_member" "noauth" {
  location = google_cloud_run_service.default.location
  project  = var.project_id
  service  = google_cloud_run_service.default.name

  role   = "roles/run.invoker"
  member = "allUsers"
}
terraform.tfvars

以下のように、terraform.tfvarsに環境変数を記述します。
EMBED_MODEL (embed_model_name) は、HuggingFaceのモデル名を指定します。ここでは、all-MiniLM-L6-v2ではなく、日本語に強いpkshatech/RoSEtta-base-jaを指定しています。

project_id      = "your-project-id"
region          = "asia-northeast1"
notion_database_id      = "your-notion-db-id"
notion_integration_key  = "your-notion-integration-key"
embed_model_name =  "pkshatech/RoSEtta-base-ja"

場合によっては、VPCやIAMポリシーなどの設定も追加するとよいでしょう。

ローカルでの動作確認

デプロイ前に、ローカルで一度起動してテストしておくことをおすすめします。以下のような手順で確認できます。

  1. 環境変数ファイル.envを準備
NOTION_DATABASE_ID="YOUR_NOTION_DB_ID"
NOTION_INTEGRATION_KEY="YOUR_NOTION_INTEGRATION_KEY"
KNOWLEDGE_ID="default"
# EMBED_MODEL="all-MiniLM-L6-v2"
RAG_SRC_TYPE="notion"
  1. Dockerビルド&起動 (ローカルで実行する場合)
docker build -t local-rag-api:latest .
docker run --rm -p 8080:8080 --env-file .env local-rag-api:latest
  1. APIリクエストを投げて確認:
curl -X POST http://localhost:8080/retrieval \
     -H "Content-Type: application/json" \
     -H "Authorization: Bearer your-api-key" \
     -d '{
         "knowledge_id": "default",
         "query": "Notionデータについて",
         "retrieval_setting": {
             "top_k": 2,
             "score_threshold": 0.5
         }
     }'

レスポンスにrecords配列が含まれていればOKです。

レスポンス例
{"records":[{"metadata":{"id":"xxxxxx....","url":"https://www.notion.so/xxx...","path":"https://www.notion.so/xxx...","description":"フォード マスタング","title":"好きな車は?"},"score":1.0,"title":"好きな車は?","content":"質問項目:好きな車は?\n\n回答:フォード マスタング"},...]}%

Cloud Runへのデプロイ

本番運用などでパブリックに公開したい場合は、Terraformやgcloud CLIを使ってCloud Runへデプロイします。ここではTerraformの流れを簡単に記します。

1. Container RegistryまたはArtifact RegistryにDockerイメージをpush

例: Artifact Registryにpushする場合

gcloud auth configure-docker asia-northeast1-docker.pkg.dev
docker build -t asia-northeast1-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/rag-api:latest .
docker push asia-northeast1-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/rag-api:latest

2. Terraform実行

main.tfやterraform.tfvarsに適切な値を設定したあと、以下を実行します。

terraform init
terraform apply
# (設定内容を確認後)
# yes

3. Cloud RunのURLを確認

  • デプロイが成功すると、Terraformの出力やCloud Consoleの画面でエンドポイントがわかります。
  • https://rag-api-service-xxxxxxx-uc.a.run.appのようなURLが割り当てられます。

4. 動作確認

  • ローカルテスト同様にcurlなどで確認します。https://rag-api-service-xxxxxxx-uc.a.run.app/retrievalにPOSTリクエストを送信して、正常にレスポンスが返ってくることを確認します。
  • DifyにこのURLをExternal Knowledge APIとして登録すれば、外部のNotion DB等の情報をDifyに問い合わせできるようになります。

Difyへの連携設定

Difyのダッシュボードで、ナレッジ -> 外部ナレッジ連携API -> 外部ナレッジ連携APIを追加 から、先ほどデプロイしたAPIを登録します。

  1. Name: 任意の名前を入力します。
  2. API Endpoint: 先ほど取得したCloud Runのエンドポイント(https://rag-api-service-xxxxxxx-uc.a.run.app/retrieval)を入力します。
  3. API Key: 今回のコードでは設定していないので適当な文字列を入力します。必要に応じて追加で実装してください。

DifyのExternal Knowledge設定画面
DifyのExternal Knowledge設定画面

次に、ナレッジ -> ナレッジを作成 -> 外部ナレッジベースと連携 から、先ほど登録したAPIを使ってナレッジを作成します。外部ナレッジIDはserver.pyの実装に合わせてdefaultを指定します。
Difyのナレッジ作成画面
登録後、新しい質問を投げてみて、Notionのレコードが参照されることを確認してください。以下の右側のようなレスポンスが返ってくれば成功です。Cloud Runのインスタンスを0にしているので、初回はレスポンスが遅いかもしれません。
Difyのナレッジ設定画面

最後に、これをRAGとして用いてチャットができるかを確認してみましょう。
Difyのスタジオから適当にチャットボットを作成し、コンテキストに先程作成したナレッジを設定します。右側のデバッグとプレビューでチャットを投げると、ナレッジのデータを参照して回答が返ってくることが確認できます。
Difyのスタジオ画面

まとめ

  • FastAPI + llama_index 等を組み合わせ、RAG APIを自前で作成し、Cloud Run 上にデプロイして公開しました。
  • Dify の標準ナレッジベース機能で取れないデータは、External Knowledge API を利用することで解決できました。

これによって、Notion DBの特殊なテーブル構造や社内独自DBなども柔軟にDifyへ取り込むことができます。社内向けのFAQ自動回答やドキュメント検索などの用途に活用できるはずです。
ぜひ試してみてください!

以上が、DifyのExternal Knowledge API用RAG APIをCloud Runへデプロイする流れの解説でした。コードや設定値はあくまで一例ですので、自社の運用形態にあわせてカスタマイズしてみてください。何か参考になれば幸いです。

おわりに

Acompany では、我々と一緒にプライバシーテックの領域で世界を目指してくれるメンバーを募集しています。まずは、カジュアル面談で Acompany という会社のことを知ってもらいたいです。
https://recruit.acompany.tech/#84ba9c895d464495a63fd437bac75431

また、弊社の他のブログは以下エンジニアブログハブから見られます!
https://engineering.acompany.tech/

Discussion

ログインするとコメントできます