🙆

PythonでLanceDBとAWS S3の統合を実装

2024/10/09に公開

結論

日本語でも英語でもLanceDBの実装記事があまり無いため、結局公式のドキュメントやコードを参考にした。

S3との接続方法について、公式ドキュメントのどこに書いてあるか分かりにくかった。
以下を参考にした。
https://lancedb.github.io/lancedb/guides/storage/#general-configuration:~:text=192.168.1.0/24.-,AWS S3,-To configure credentials

環境変数をミスると、ローカルにs3:というディレクトリが作成されてしまい、エラーが出ないため、注意する必要がある。

また、今回はS3のバケットタイプをS3 Express One Zoneで作成したので、"s3_express": "true"としている。

コード

import asyncio
import lancedb
from src.settings import (
    BUCKET_NAME,
    AWS_REGION,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
)


uri = f"s3://{BUCKET_NAME}/lancedb"
storage_options = {
    "aws_access_key_id": AWS_ACCESS_KEY_ID,
    "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
    "s3_express": "true",
    "aws_region": AWS_REGION,
}


# table operations
def create_table(table_name: str, schema, overwrite: bool = False):
    print("🚀 create_table")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.create_table(
            table_name,
            schema=schema,
            mode="overwrite" if overwrite else "create",  # overwrite if table exists
        )
        print(f"created {table_name}: {tbl}\n")

    return asyncio.run(test())


def list_tables():
    print("🚀 list_tables")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl_names = await db.table_names()
        print(f"list of tables: {tbl_names}\n")

    return asyncio.run(test())


def get_table(table_name: str):
    print("🚀 get_table")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.open_table(table_name)
        print(f"get {table_name}: {tbl}\n")

    return asyncio.run(test())


def delete_table(table_name: str):
    print("🚀 delete_table")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        await db.drop_table(table_name)
        print(f"deleted table {table_name}\n")

    return asyncio.run(test())


# data operations
def insert_data(table_name: str, data):
    print("🚀 insert_data")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.open_table(table_name)
        await tbl.add([data])
        print(f"inserted data into {table_name}: {data}\n")

    return asyncio.run(test())


def search_data(table_name: str, vector):
    print("🚀 search_data")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.open_table(table_name)
        items = await tbl.vector_search(vector).limit(2).to_pandas()
        print(f"{items}\n")

    return asyncio.run(test())


def update_data(table_name: str, data):
    print("🚀 update_data")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.open_table(table_name)
        await tbl.update(data)
        print(f"updated data into {table_name}: {data}\n")

    return asyncio.run(test())


def delete_data(table_name: str, query: str):
    print("🚀 delete_data")

    async def test():
        db = await lancedb.connect_async(uri, storage_options=storage_options)
        tbl = await db.open_table(table_name)
        await tbl.delete(query)
        print(f"deleted data {table_name}\n")

    return asyncio.run(test())

環境変数

.env
BUCKET_NAME=sample-backet-name--apne1-az4--x-s3
AWS_REGION=**********
AWS_ACCESS_KEY_ID=**********
AWS_SECRET_ACCESS_KEY=**********
settings.py
import os
from os.path import join, dirname
from dotenv import load_dotenv

load_dotenv(verbose=True)

dotenv_path = join(dirname(__file__), ".env")
load_dotenv(dotenv_path)

BUCKET_NAME = os.environ.get("BUCKET_NAME")
AWS_REGION = os.environ.get("AWS_REGION")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

その他

requirements.txt
python-dotenv==1.0.1
lancedb==0.13.0
pandas==2.2.3

参考にした記事

Discussion