🙆
PythonでLanceDBとAWS S3の統合を実装
結論
日本語でも英語でもLanceDBの実装記事があまり無いため、結局公式のドキュメントやコードを参考にした。
S3との接続方法について、公式ドキュメントのどこに書いてあるか分かりにくかった。
以下を参考にした。
環境変数をミスると、ローカルにs3:
というディレクトリが作成されてしまい、エラーが出ないため、注意する必要がある。
また、今回はS3のバケットタイプをS3 Express One Zoneで作成したので、"s3_express": "true"
としている。
コード
import asyncio
import lancedb
from src.settings import (
BUCKET_NAME,
AWS_REGION,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
)
uri = f"s3://{BUCKET_NAME}/lancedb"
storage_options = {
"aws_access_key_id": AWS_ACCESS_KEY_ID,
"aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
"s3_express": "true",
"aws_region": AWS_REGION,
}
# table operations
def create_table(table_name: str, schema, overwrite: bool = False):
print("🚀 create_table")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.create_table(
table_name,
schema=schema,
mode="overwrite" if overwrite else "create", # overwrite if table exists
)
print(f"created {table_name}: {tbl}\n")
return asyncio.run(test())
def list_tables():
print("🚀 list_tables")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl_names = await db.table_names()
print(f"list of tables: {tbl_names}\n")
return asyncio.run(test())
def get_table(table_name: str):
print("🚀 get_table")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.open_table(table_name)
print(f"get {table_name}: {tbl}\n")
return asyncio.run(test())
def delete_table(table_name: str):
print("🚀 delete_table")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
await db.drop_table(table_name)
print(f"deleted table {table_name}\n")
return asyncio.run(test())
# data operations
def insert_data(table_name: str, data):
print("🚀 insert_data")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.open_table(table_name)
await tbl.add([data])
print(f"inserted data into {table_name}: {data}\n")
return asyncio.run(test())
def search_data(table_name: str, vector):
print("🚀 search_data")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.open_table(table_name)
items = await tbl.vector_search(vector).limit(2).to_pandas()
print(f"{items}\n")
return asyncio.run(test())
def update_data(table_name: str, data):
print("🚀 update_data")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.open_table(table_name)
await tbl.update(data)
print(f"updated data into {table_name}: {data}\n")
return asyncio.run(test())
def delete_data(table_name: str, query: str):
print("🚀 delete_data")
async def test():
db = await lancedb.connect_async(uri, storage_options=storage_options)
tbl = await db.open_table(table_name)
await tbl.delete(query)
print(f"deleted data {table_name}\n")
return asyncio.run(test())
環境変数
.env
BUCKET_NAME=sample-backet-name--apne1-az4--x-s3
AWS_REGION=**********
AWS_ACCESS_KEY_ID=**********
AWS_SECRET_ACCESS_KEY=**********
settings.py
import os
from os.path import join, dirname
from dotenv import load_dotenv
load_dotenv(verbose=True)
dotenv_path = join(dirname(__file__), ".env")
load_dotenv(dotenv_path)
BUCKET_NAME = os.environ.get("BUCKET_NAME")
AWS_REGION = os.environ.get("AWS_REGION")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
その他
requirements.txt
python-dotenv==1.0.1
lancedb==0.13.0
pandas==2.2.3
参考にした記事
-
公式ドキュメント: https://lancedb.github.io/lancedb/
-
LanceDBのテストコード: https://github.com/lancedb/lancedb/blob/main/python/python/tests/test_s3.py
-
LanceDBのブログ: https://blog.lancedb.com/
-
組み込みデータベース(3):LanceDBとモジュラーデータスタック: https://thedataquarry.com/posts/embedded-db-3/
Discussion