Azure AI Searchカスタムスキルの作成
前置き
前回の記事では、ノーコードでAzure上にRAG環境を構築する方法をご紹介しました。
今回は、ローコードでRAGの精度を改善する方法として、Azure AI Searchのスキルセットにカスタムスキルを追加する方法を解説します。
次の図は、前回の記事と今回の記事で構築するRAGパイプラインです。
スキル1〜3は前回の記事で、スキル4は今回の記事で作成できます。
スキルセット
Azure AI Searchのスキルセットは、データのインデックス作成プロセスにおいてAIを活用してデータを加工・強化(エンリッチメント)するための再利用可能なオブジェクトです。以下にその詳細を説明します。
概要
- スキルセットの構成: スキルセットは、1つ以上の「スキル」で構成されます。スキルは、データを変換または処理するための個別の操作を指します。これには、組み込みスキルやカスタムスキルが含まれます。
- エンリッチメントの目的: スキルセットは、外部データソースから取得したドキュメントを処理し、検索インデックスに格納する前にデータを強化します。これにより、検索精度やデータの有用性が向上します。
スキルの種類
-
組み込みスキル:
- Microsoftが提供する事前トレーニング済みのAIモデルを使用します。
- 例: 言語検出、キーフレーズ抽出、感情分析、翻訳、光学文字認識(OCR)など。
-
カスタムスキル:
- ユーザーが独自に開発したスキルを外部APIとして利用可能。
- 例: 特定の業務要件に応じたデータ処理や外部AIモデルの統合。
-
ユーティリティスキル:
- Azure AI Search内部で実行される軽量な処理。課金対象外のものが多い。
ユースケース
- テキスト分割: 長文を小さなセクションに分割し、検索精度を向上。
- 翻訳: 多言語データを統一的に処理。
- 画像処理: OCRを用いて画像からテキストを抽出。
- カスタム処理: 特定の業務要件に応じたデータ加工(例: 特定のエンティティ抽出)。
設定
スキルセットはJSON形式で定義され、以下の要素を含みます:
- スキル配列: 実行するスキルのリスト。
- inputs/outputs設定: データの流れを定義。
- Cognitive Services: Azure AIサービスを利用する場合の設定。
{
"skills": [
{
"@odata.type": "#Microsoft.Skills.Custom.WebApiSkill",
"name": "myCustomSkill",
"description": "This skill calls an Azure function, which in turn calls TA sentiment",
"uri": "https://indexer-e2e-webskill.azurewebsites.net/api/DateExtractor?language=en",
"context": "/document",
"httpHeaders": {
"DateExtractor-Api-Key": "foo"
},
"inputs": [
{
"name": "contractText",
"source": "/document/content"
}
],
"outputs": [
{
"name": "contractDate",
"targetName": "date"
}
]
}
]
}
カスタムスキル
Azure AI Searchのカスタムスキルは、Azure AI Searchのパイプラインにおいて、AI Searchの外部でホストされるカスタムコードを利用して特定の処理を実行する機能です。この機能により、組み込みスキルでは対応できない独自のデータ処理やエンリッチメントを実現できます。
概要
- 目的: Azure AI Searchのインデクサーが取得したデータを外部のWeb APIに送信し、カスタム処理を実行して結果を返す仕組みです。
-
利用例:
- 特定のデータの分類やパターンマッチング。
- Azure OpenAIを利用したベクトル化処理。
- OCR結果のカスタム処理やエンティティ抽出。
構成要素
-
スキルセット:
- スキルセットは、複数のスキル(組み込みスキルやカスタムスキル)を組み合わせた処理パイプラインです。
- 各スキルは、入力(データソース)と出力(処理結果)を持ち、スキルセット内で順次実行されます。
-
カスタムスキルの定義:
- カスタムスキルはJSON形式で定義され、以下の要素を含みます:
- URI: 外部Web APIのエンドポイント。
- 入力: スキルに渡すデータ(例: ドキュメントのテキスト)。
- 出力: スキルの処理結果(例: 抽出されたエンティティ)。
- 認証情報: Azure Functionsや他のサービスと連携する場合、認証トークンやマネージドIDを使用可能。
- カスタムスキルはJSON形式で定義され、以下の要素を含みます:
-
ホスティング:
- カスタムスキルのコードはAzure Functionsやコンテナでホストされることが一般的です。
作成手順
-
スキルの設計:
- 必要な処理内容を明確化し、入力データと出力データを定義します。
-
Web APIの開発:
- 必要な処理を実行するWeb APIを開発し、Azure Functionsなどにデプロイします。
-
スキルセットへの統合:
- AzureポータルやJSON定義を使用して、カスタムスキルをスキルセットに追加します。
-
デバッグとテスト:
- デバッグセッション機能を利用して、スキルの動作を確認し、必要に応じて修正します。
ユースケース
- データのベクトル化: Azure OpenAIを利用して、ドキュメントをベクトル形式に変換し、ベクトル検索を実現。
- OCR結果の処理: OCRで抽出したテキストに対してカスタム処理を実行し、特定のフォーマットに変換。
- エンティティ抽出: 特定のエンティティを抽出し、外部APIで詳細情報を取得。
注意点
- 認証とセキュリティ: Web APIのエンドポイントはHTTPSで保護され、認証にはAzureマネージドIDやAPIキーを使用します。
- パフォーマンス: スキルの並列実行数やタイムアウト設定(最大230秒)を適切に調整する必要があります。
- 出力形式: 出力形式は以下のJSON形式に則る必要があります。
{
"values": [
{
"recordId": "1",
"data": {
"outputFieldName": "outputValue"
},
"errors": null
}
]
}
スキルセット・カスタムスキルまとめ
Azure AI Searchでは、スキルセットがデータの加工・強化処理全体を統括する基盤として機能し、組み込み・ユーティリティ・カスタム各スキルを連携して実行されます。特にカスタムスキルは外部APIを介し独自処理を実現し、スキルセット内に組み込むことで業務要件に応じた柔軟なエンリッチメントを提供します。これにより検索精度が向上し、情報を効率的に活用できるようになります。
実装方法
ここでは、マイクロソフト公式がリポジトリを出しているので、そちらを参考にカスタムスキルを構築してみます。
Azureリソース事前準備
上記リポジトリでは、カスタムスキルをAzure Functions上にデプロイして呼び出しています。
また、
そのため、カスタムスキルを利用するためのリソースをAzure上で用意しておきます。
- Azure Functions
- Azure OpenAI
VSCode事前準備
他にもいくつかAzure Functionsへのデプロイ方法はありますが、今回は最も手軽なVS Code経由のデプロイをご紹介します。
- VSCodeのインストール
- Azure拡張機能のインストール
VSCode上でカスタムスキル構築
VSCode上でカスタムスキルを作成〜Azure Functionsにデプロイします。
簡単にまとめると、以下の手順です。
- Azure Functionsプロジェクトの作成
- カスタムスキル作成
- カスタムスキルをAzure Functionsにデプロイ
1. Azure Functionsのプロジェクトを作成します。
Azure拡張機能からAzure Functionsを選択
プロジェクトの新規作成を選択
任意のディレクトリを選択
言語を選択(今回はPython)
ModelV2を選択
トリガーを選択(カスタムスキルの場合はHTTPトリガーを選択)
任意の関数名を入力
認証レベルを選択(今回は閉域化環境で実施するためAnonymous)
2. カスタムスキルを作成します。
ここでは、Microsoft公式のリポジトリを参考にカスタムスキルを構築します。
今回は文書がどんなジャンルのデータなのか、というメタデータをチャンクごとに付与するカスタムスキルを作成してみました。
手順1で自動生成されたfunction_app.py
に下記サンプルコードをコピー。
その後、custom_prompts.json
を作成し、任意のプロンプトを記載してください。
サンプルコード
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="HealthCheck")
@app.route(route="health", auth_level=func.AuthLevel.ANONYMOUS)
async def health_check(req: func.HttpRequest) -> func.HttpResponse:
import json
"""Health check endpoint"""
response_body = {"status": "Healthy"}
return func.HttpResponse(json.dumps(response_body), mimetype="application/json")
@app.function_name(name="CustomSkill")
@app.route(route="custom_skill", auth_level=func.AuthLevel.ANONYMOUS)
async def custom_skill(req: func.HttpRequest) -> func.HttpResponse:
import json
import logging
import os
from typing import Dict, Any, List
from dataclasses import dataclass
from enum import Enum
import time
import requests
import asyncio
"""Main custom skill endpoint for metadata generation"""
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ScenarioType(Enum):
METADATA_GENERATION = "metadata-generation"
@dataclass
class ModelConfig:
temperature: float = 0.7
top_p: float = 0.95
max_tokens: int = 4096
timeout: int = 230 # seconds
class CustomSkillException(Exception):
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(self.message)
def load_custom_prompts() -> Dict[str, str]:
"""Load custom prompts from JSON file"""
try:
with open("custom_prompts.json", "r") as file:
return json.load(file)
except Exception as e:
logger.error(f"Failed to load custom prompts: {e}")
raise CustomSkillException("Failed to load custom prompts", 500)
def prepare_messages(
request_body: Dict[str, Any], scenario: str, custom_prompts: Dict[str, str]
) -> List[Dict[str, Any]]:
"""Prepare messages for metadata generation scenario"""
try:
if scenario == ScenarioType.METADATA_GENERATION.value:
text = request_body.get("data", {}).get("text", "")
if not text:
raise CustomSkillException(
"Missing text for metadata generation", 400
)
system_message = {
"role": "system",
"content": [
{
"type": "text",
"text": custom_prompts.get(
"metadata-generation-default-system-prompt"
),
}
],
}
user_message = {
"role": "user",
"content": [{"type": "text", "text": text}],
}
return [system_message, user_message]
else:
raise CustomSkillException(f"Unknown scenario: {scenario}", 400)
except CustomSkillException:
raise
except Exception as e:
logger.error(f"Error preparing messages: {e}")
raise CustomSkillException(f"Failed to prepare messages: {str(e)}", 500)
def format_response(
request_body: Dict[str, Any], response_text: str, scenario: str
) -> Dict[str, Any]:
"""Format response for metadata generation scenario"""
try:
response_body = {
"recordId": request_body.get("recordId"),
"warnings": None,
"errors": [],
"data": {},
}
if scenario == ScenarioType.METADATA_GENERATION.value:
try:
metadata = json.loads(response_text)
if not metadata: # メタデータが空の場合
metadata = {"raw_response": response_text}
except json.JSONDecodeError: # JSON解析エラーの場合
metadata = {"raw_response": response_text}
except Exception as e: # その他のエラーの場合
logger.error(f"Error processing metadata: {e}")
metadata = {"raw_response": response_text, "error": str(e)}
# メタデータが辞書型でない場合の処理
if not isinstance(metadata, dict):
metadata = {"raw_response": str(metadata)}
response_body["data"] = metadata # メタデータを直接dataフィールドに設定
return response_body
except Exception as e:
logger.error(f"Error formatting response: {e}")
return {
"recordId": request_body.get("recordId"),
"errors": [str(e)],
"warnings": None,
"data": {"error": str(e)}, # エラー時も直接dataフィールドに設定
}
try:
# Validate request
request_json = req.get_json()
scenario = req.headers.get("scenario")
if not scenario:
raise CustomSkillException("Missing scenario in headers", 400)
input_values = request_json.get("values", [])
if not input_values:
raise CustomSkillException("Missing 'values' in request body", 400)
# Load prompts and create model configuration
custom_prompts = load_custom_prompts()
config = ModelConfig()
response_values = []
api_key = os.getenv("AZURE_INFERENCE_CREDENTIAL")
endpoint = os.getenv("AZURE_CHAT_COMPLETION_ENDPOINT")
headers = {
"Content-Type": "application/json",
"api-key": api_key,
"Authorization": f"Bearer {api_key}",
}
for request_body in input_values:
try:
messages = prepare_messages(request_body, scenario, custom_prompts)
request_payload = {
"messages": messages,
"temperature": config.temperature,
"top_p": config.top_p,
"max_tokens": config.max_tokens,
"response_format": {"type": "json_object"},
}
# Call model with timeout
async with asyncio.timeout(config.timeout):
vanilla_response = requests.post(
endpoint, headers=headers, json=request_payload
)
vanilla_response.raise_for_status()
vanilla_response_json = vanilla_response.json()
response_text = (
vanilla_response_json["choices"][0]["message"]["content"]
)
# Format response
response_values.append(
format_response(request_body, response_text, scenario)
)
except asyncio.TimeoutError:
logger.error(
f"Timeout processing record {request_body.get('recordId')}"
)
response_values.append(
{
"recordId": request_body.get("recordId"),
"errors": ["Request timeout"],
"warnings": None,
"data": None,
}
)
except Exception as e:
logger.error(
f"Error processing record {request_body.get('recordId')}: {e}"
)
response_values.append(
{
"recordId": request_body.get("recordId"),
"errors": [str(e)],
"warnings": None,
"data": None,
}
)
return func.HttpResponse(
json.dumps({"values": response_values}), mimetype="application/json"
)
except CustomSkillException as e:
logger.error(f"Custom skill error: {e}")
return func.HttpResponse(str(e), status_code=e.status_code)
except json.JSONDecodeError:
logger.error("Invalid JSON in request body")
return func.HttpResponse("Invalid JSON in request body", status_code=400)
except Exception as e:
logger.error(f"Unexpected error: {e}")
return func.HttpResponse("Internal server error", status_code=500)
3. Azure Functionsにデプロイします。
Deploy to Azureを選択し、任意のリソースを指定するとデプロイできます
カスタムスキル追加
AI Searchのスキルセットにカスタムスキルを追加します。
スキルセットのJSONファイルに下記サンプルコードを追加します。
既存のスキルセットに追記する場合は、"name": "#3"
ブロックの後に追加し、outputs
の column_name{n}
は 手順2の custom_prompts.json
で定義した出力名と一致させてください。
サンプルコード
{
"@odata.type": "#Microsoft.Skills.Custom.WebApiSkill",
"name": "#4",
"description": "The skill which calls metadata genaration",
"context": "/document/markdownDocument/*/pages/*",
"uri": "https://<your-azure-functions-resource-name>.azurewebsites.net/api/custom_skill",
"httpMethod": "POST",
"timeout": "PT230S",
"batchSize": 1,
"degreeOfParallelism": 1,
"inputs": [
{
"name": "text",
"source": "/document/markdownDocument/*/pages/*",
"inputs": []
}
],
"outputs": [
{
"name": "column_name1",
"targetName": "column_name1"
},
{
"name": "column_name2",
"targetName": "column_name2"
}
],
"httpHeaders": {
"scenario": "metadata-generation"
}
}
全体まとめ
今回は、Azure AI SearchのRAGパイプラインにカスタムスキルを追加する方法をご紹介しました。
カスタムスキルの入出力JSON形式が正しければ、Langchainなどの外部ライブラリも使用できるため、中間処理の幅が広がります。
Azure Functionsをデプロイ先に使用する場合、各処理に230秒のタイムアウト制限があるため、重たい処理は避けてください。
最後に
Givery AI Labが独自保有するフリーランス・副業の高単価AI案件や、随時開催しているセミナーやパーティなどのイベントにご興味ございましたら、ぜひTrack Worksのアカウント登録いただき、最新情報を受け取ってください!
「Track Works」とは?
Givery AI Labの運営会社である株式会社ギブリーが提供する、AI時代のフリーランスエンジニアとして「スキル」と「実績」を強化できる実践的なAI案件を、ご経歴やスキルに合わせてご紹介するフリーラナスエンジニア案件マッチングサービスです。Givery AI Labが独自保有するフリーランス・副業案件を紹介したり、AI技術やキャリアに関するイベントを随時開催しています。
参考
Discussion