Closed8
pythonでSQL ServerをAzure AI Searchのデータソースとして登録する
環境 Python3.10.10
ライブラリ
pyodbc==5.1.0
python-dotenv==1.0.1
pandas==2.2.2
openai==1.34.0
jupyter==1.0.0
requests==2.32.3
azure-core==1.30.2
azure-search-documents==11.6.0b4
手順:
- データソース作成
- インデックス作成
- スキルセット作成(今回はベクトル検索 + セマンティックサーチ)
- インデクサー作成・実行
前提:
- Azure SQL Serverを作成済み
- Azure AI Searchを作成済み
- Azure Open AIを作成済み(Embeddingに使用)
0. import
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.models import VectorizableTextQuery
from azure.search.documents.indexes.models import (
AzureOpenAIEmbeddingSkill,
AzureOpenAIParameters,
AzureOpenAIVectorizer,
HnswAlgorithmConfiguration,
InputFieldMappingEntry,
OutputFieldMappingEntry,
SemanticPrioritizedFields,
SearchField,
SearchFieldDataType,
SearchIndex,
SearchIndexer,
SearchIndexerDataContainer,
SearchIndexerDataSourceConnection,
SearchIndexerIndexProjectionSelector,
SearchIndexerIndexProjectionsParameters,
IndexProjectionMode,
SearchIndexerIndexProjections,
SearchIndexerSkillset,
SemanticConfiguration,
SemanticField,
SemanticSearch,
SplitSkill,
SqlIntegratedChangeTrackingPolicy,
VectorSearch,
VectorSearchAlgorithmKind,
VectorSearchProfile,
)
1. データソース作成
# SQL SERVERの設定
CONNECTION_STRING = "" # 接続文字列
TABLE_NAME = "" # テーブル名
# AZURE AI SEARCHの設定
AZURE_AI_SEARCH_ENDPOINT = "" # AI Search Endpoint
AZURE_AI_SEARCH_KEY = "" # AI Search Key
# インデックスの名前
INDEX_NAME = "table-index"
# AI SearchからSQL Serverへアクセスを承認する
cogsearch_credential = AzureKeyCredential(AZURE_AI_SEARCH_KEY)
ds_client = SearchIndexerClient(AZURE_AI_SEARCH_ENDPOINT, cogsearch_credential)
container = SearchIndexerDataContainer(name=TABLE_NAME)
# SQL Serverの対象テーブルが更新された場合にインデクサーを自動更新する処理
change_detection_policy = SqlIntegratedChangeTrackingPolicy()
#データソース元を設定しデータソース作成
data_source_connection = SearchIndexerDataSourceConnection(
name=f"{INDEX_NAME}-azuresql-connection",
type="azuresql",
connection_string=CONNECTION_STRING,
container=container,
data_change_detection_policy=change_detection_policy,
)
data_source = ds_client.create_or_update_data_source_connection(data_source_connection)
print(f"Data source '{data_source.name}' created or updated")
注意点
- データベースの変更にともなってIndexerを更新する場合はDatabase TableのトラッキングをONにする必要がある。
-- DatabaseのトラッキングをONにする
ALTER DATABASE データベース名
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
-- 対象テーブルのトラッキングをONにする
ALTER TABLE テーブル名
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON);
- インデクサー作成時のIDで複合キーを指定している場合は、別途単独キーを設定しないと変更を追跡できないため、トラッキングは必要がなければOFFにする
- AZURE SQL SERVERでAzure サービスおよびリソースにこのサーバーへのアクセスを許可するにチェックを入れる(プライベートリンクの接続や、共有プライベートアクセスを設定してもできなかった。Azure AI SearchのIPをそのままSQL Serverで許可すればできそう?)
2. インデックス作成
Embeddingに使用するAzure OpenAIの設定
# Open AI Embedding
import openai
openai.api_type = "azure"
openai.api_key = "" # Azure OpenAI API Key
openai.api_base = "" # Azure OpenAI API Endpoint
openai.api_version = 2
openai_deployment_embedding = "deployment-text-embedding-ada-002"
openai_model_embedding = "text-embedding-ada-002"
EMBEDDING_LENGTH = 1536
インデックス作成
# Create a search index
index_client = SearchIndexClient(
endpoint=AZURE_AI_SEARCH_ENDPOINT, credential=cogsearch_credential
)
# インデックスのフィールド
fields = [
# Properties of individual chunk
# Id 複合キーだと作れない?のでユニークのカラムを使う
SearchField(
name="Id",
type=SearchFieldDataType.String,
key=True,
sortable=True,
filterable=True,
facetable=True,
analyzer_name="keyword",
),
# 対象のカラムのテキストをチャンクした結果を格納するフィールド カラムの指定はSkillSetで使う
# ChatTitleを対象
SearchField(
name="chunk",
type=SearchFieldDataType.String,
sortable=False,
filterable=False,
facetable=False,
),
# チャンクのベクトル 指定したカラムでSkillSetでベクトルにする
SearchField(
name="vector",
type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
vector_search_dimensions=EMBEDDING_LENGTH,
vector_search_profile_name="my-vector-search-profile",
),
# チャンクしたカラムの親ID
SearchField(
name="parent_id",
type=SearchFieldDataType.String,
sortable=True,
filterable=True,
facetable=True,
),
# その他DBにある別カラム
SearchField(
name="sample1",
type=SearchFieldDataType.String,
sortable=True,
filterable=True,
facetable=True,
),
# その他DBにある別カラム
# サイズが大きくなるとエラーになる場合はsortable, filterable, facetableなど不要な要素はFalseにする
SearchField(
name="sample2",
type=SearchFieldDataType.String,
sortable=False,
filterable=False,
facetable=False,
),
]
# ベクトル検索のエンジンを指定
vector_search = VectorSearch(
algorithms=[
HnswAlgorithmConfiguration(
name="my-hnsw-config", kind=VectorSearchAlgorithmKind.HNSW
)
],
profiles=[
VectorSearchProfile(
name="my-vector-search-profile",
algorithm_configuration_name="my-hnsw-config",
vectorizer="my-openai",
)
],
# Azure AI SearchのEmbeddingはここ
vectorizers=[
AzureOpenAIVectorizer(
name="my-openai",
kind="azureOpenAI",
azure_open_ai_parameters=AzureOpenAIParameters(
resource_uri=openai.api_base,
deployment_id=openai_deployment_embedding,
api_key=openai.api_key,
model_name=openai_model_embedding,
),
)
],
)
# セマンティックサーチのコンフィグを設定
semantic_config = SemanticConfiguration(
name="my-semantic-config",
prioritized_fields=SemanticPrioritizedFields(
# セマンティックサーチの対象カラム
content_fields=[SemanticField(field_name="sample1")]
),
)
# セマンティックサーチの設定
semantic_search = SemanticSearch(configurations=[semantic_config])
# インデックス作成
index = SearchIndex(
name=INDEX_NAME,
fields=fields,
vector_search=vector_search,
semantic_search=semantic_search,
)
result = index_client.create_or_update_index(index)
print(f"{result.name} created")
注意点
- OpenAi embeddingはプライベートネットではなくパブリックじゃないと使えない(Azure AI SearchがS2レベルじゃないとプライベートネットワーク間で接続できない)
https://learn.microsoft.com/ja-jp/azure/search/search-sku-tier
3. スキルセット作成
テキストのチャンク + チャンク文字のEmbeddingを実行
# テキストのチャンク
split_skill = SplitSkill(
description="Split skill to chunk documents",
text_split_mode="pages", # page単位でチャンク
context="/document", # デフォルト
maximum_page_length=300, # チャンクサイズ
page_overlap_length=20, # 前後チャンクとのパディング
inputs=[
InputFieldMappingEntry(name="text", source="/document/sample1"), # チャンクするカラム
],
# チャンクしたテキストをpagesに入れる
outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")],
)
# ベクトル化
embedding_skill = AzureOpenAIEmbeddingSkill(
description="Skill to generate embeddings via Azure OpenAI",
context="/document/pages/*",
resource_uri=openai.api_base,
deployment_id=openai_deployment_embedding,
api_key=openai.api_key,
model_name=openai_model_embedding,
inputs=[
InputFieldMappingEntry(name="text", source="/document/pages/*"),
],
# ベクトル化した結果をvectorフィールドに入れる
outputs=[OutputFieldMappingEntry(name="embedding", target_name="vector")],
)
# スキルセットの結果をインデックスにマッピングする
index_projections = SearchIndexerIndexProjections(
selectors=[
SearchIndexerIndexProjectionSelector(
target_index_name=INDEX_NAME,
parent_key_field_name="parent_id", # チャンクしたカラムのID 複数チャンクされたら同じIDになる
source_context="/document/pages/*",
mappings=[
InputFieldMappingEntry(name="chunk", source="/document/pages/*"),
InputFieldMappingEntry(
name="vector", source="/document/pages/*/vector"
),
InputFieldMappingEntry(name="JobType", source="/document/sample1"),
InputFieldMappingEntry(name="UserName", source="/document/sample2"),
],
),
],
# チャンク元の親テキストをインデックスに追加しない場合は追加
# 追加しない場合はchunk, chunk_idが空の親テキストがインデックスに登録される
# Azure AI Searchのポータルから作る場合は下記が付属された状態のため基本的には必要だと思う
parameters=SearchIndexerIndexProjectionsParameters(
projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS
),
)
# スキルセット作成
skillset = SearchIndexerSkillset(
name=f"{INDEX_NAME}-skillset",
description="Skillset to chunk documents and generating embeddings",
skills=[split_skill, embedding_skill],
index_projections=index_projections,
)
client = SearchIndexerClient(AZURE_AI_SEARCH_ENDPOINT, cogsearch_credential)
client.create_or_update_skillset(skillset)
print(f" {skillset.name} created")
インデクサー作成
データソース・インデックス・スキルセットを統括するインデクサー作成
# Create an indexer
indexer_name = f"{INDEX_NAME}-indexer"
indexer = SearchIndexer(
name=indexer_name,
description="Indexer to chunk documents and generate embeddings",
skillset_name=skillset_name,
target_index_name=INDEX_NAME,
data_source_name=data_source.name,
)
indexer_client = SearchIndexerClient(AZURE_AI_SEARCH_ENDPOINT, cogsearch_credential)
# Indexerの作成
indexer_result = indexer_client.create_or_update_indexer(indexer)
# Indexerの実行(更新)
indexer_client.run_indexer(indexer_name)
print(f" {indexer_name} created")
このスクラップは4ヶ月前にクローズされました