Next.js x OpenSearch x Lambda(Python) で簡単な検索・蔵書アプリケーションを作ってみた
背景
今まで業務の中で検索機能の実装を担当した際、バックエンド側から取得したデータをフロントエンド側でフィルタリングして表示するようにしていました。
しかし、この実装だとデータ量が増加するにつれてパフォーマンスの低下が懸念されるのと、複雑な検索要件への対応が難しいかなと思ったので、何かいい方法はないかなーと探していたところ、AWSのマネージドサービスであるOpenSearchを見つけました。
ちょうどサーバーレスアーキテクチャについても勉強しようと思っていたので、いろいろ調べた結果、YoutubeでAWSのサーバレスアーキテクチャで実装した紹介例があったので、自分でもやってみようと思いました。
引用元: Youtube
アウトプット
技術スタック
- フロントエンド: TypeScript, Next.js(App router v15)
- バックエンド: Python
- インフラ: Lambda, API Gateway, DynamoDB, OpenSearch
- 認証: Auth0(本記事では省略)
- コンテナ: Docker(本記事では省略)
- ローカル環境構築: Cloudflare tunnel
Architecture
フロントエンド層:
- ブラウザを通してユーザーがアプリケーションにアクセス
- Next.jsを使用したUIコンポーネントで実装
- Auth0による認証サーバーでユーザー認証を処理
バックエンド層:
- FastAPI(Pythonベース)のAPI Gatewayを設置
- Lambdaでフロントエンドからのリクエストの処理を行う
データストレージ層:
- DynamoDBをメインのデータストレージとして使用
- OpenSearchを検索エンジンとして採用
- DynamoDBストリームを使用してデータの同期や更新を処理
Backend
ローカル環境(Docker)でOpenSearchを構築
最初はDockerでOpenSearchを構築し、Cloudflare tunnelを使ってLambdaからアクセスできる環境を準備しました。AWSでいきなりOpenSearchを起動するとなると運用料金が高くついてしまう(月額$30~$40)ので、一旦ローカル環境のOpenSearchに対して疎通確認ができるかという検証をやりました。
Cloudflare tunnelは、インターネットへの安全な公開を可能にするCloudflareのサービスで、公開IPアドレスやポート開放が不要な暗号化トンネルを提供してくれます。
イメージは、環境については下記のdocker-compose.yml
で、OpenSearchとOpenSearch Dashboardを構築し、Cloudflare tunnelを経由してLambdaからアクセスできることを確かめたって感じです。
docker-compose.yml
---
version: '3'
services:
opensearch-node1:
image: opensearchproject/opensearch:2.18.0
container_name: opensearch-node1
environment:
- discovery.type=single-node
- node.name=opensearch-node1
- plugins.security.disabled=true
- "_JAVA_OPTIONS=-XX:UseSVE=0"
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
- http.host=0.0.0.0
- transport.host=127.0.0.1
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- opensearch-data1:/usr/share/opensearch/data
ports:
- 9200:9200
- 9600:9600
networks:
- opensearch-net
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2.18.0
container_name: opensearch-dashboards
ports:
- 5601:5601
expose:
- '5601'
environment:
# OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
- OPENSEARCH_HOSTS=http://opensearch-node1:9200
- DISABLE_SECURITY_DASHBOARDS_PLUGIN=true
networks:
- opensearch-net
volumes:
opensearch-data1:
networks:
opensearch-net:
やり方についてはこちらの記事で紹介されていました
Serverless Frameworkでインフラを構築
インフラ全体の構造としては、検索および登録APIをAPI Gatewayに定義して処理をLambdaに、保存先としてDynamoDB、検索エンジンとしてOpenSearchを使用するようなアーキテクチャになっています。
Serverless Framework
service: search-api
provider:
name: aws
runtime: python3.9
region: ap-northeast-1
environment: ${file(env.yml)}
iam:
role:
statements:
- Effect: Allow
Action: dynamodb:*
Resource: !GetAtt BooksTable.Arn
- Effect: Allow
Action: es:ESHttp*
Resource:
- !Sub arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${self:provider.environment.OPENSEARCH_DOMAIN_NAME}/*
package:
individually: true
patterns:
- '!**'
- 'requirements.txt'
functions:
api:
handler: src.main.handler
package:
include:
- src/**/*.py
events:
- httpApi:
path: /docs
method: get
- httpApi:
path: /openapi.json
method: get
- httpApi:
path: /search
method: get
- httpApi:
path: /books
method: ANY
layers:
- Ref: PythonRequirementsLambdaLayer
syncToOpensearch:
handler: src.dynamodb_stream.handler
package:
include:
- src/**/*.py
events:
- stream:
type: dynamodb
arn: !GetAtt BooksTable.StreamArn
layers:
- Ref: PythonRequirementsLambdaLayer
resources:
Resources:
BooksTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: Books
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
OpenSearchDomain:
Type: AWS::OpenSearchService::Domain
Properties:
DomainName: ${self:provider.environment.OPENSEARCH_DOMAIN_NAME}
ClusterConfig:
InstanceCount: 1
InstanceType: t3.small.search
EBSOptions:
EBSEnabled: true
VolumeSize: 10
VolumeType: gp2
EncryptionAtRestOptions:
Enabled: true
NodeToNodeEncryptionOptions:
Enabled: true
DomainEndpointOptions:
EnforceHTTPS: true
AdvancedSecurityOptions:
Enabled: true
InternalUserDatabaseEnabled: true
MasterUserOptions:
MasterUserName: ${self:provider.environment.OPENSEARCH_USERNAME}
MasterUserPassword: ${self:provider.environment.OPENSEARCH_PASSWORD}
plugins:
- serverless-python-requirements
custom:
pythonRequirements:
layer: true
特徴
- Python 3.9ランタイム
- 依存関係はserverless-python-requirementsプラグインで管理
- シングルノード構成(t3.small)のOpenSearch
- FastAPIのフレームワークを使用(/docsエンドポイントの存在から)検索、書籍データのCRUD操作を提供
- DyonamoDBの変更をOpenSearchに同期するため、DynamoDBのストリームを有効化し、Lambdaが実行される
-
syncToOpensearch
: DynamoDBからOpenSearchへデータを同期するための関数 - 書籍データを保存したり、検索したりするDynamoDBテーブル(BooksTable)
検索API
開発途中でSwaggerUIを用いてAPIのテストをしたかったので、PythonのFastAPIを使用して、OpenAPI(Swagger)ドキュメントの自動生成を行えるようにしました。
特徴
- タイトルと本文の両方を検索対象にしています
- OpenSearchクライアントとの接続処理においては、IAMベースの認証とHTTPS通信の強制しています
- DynamoDBとOpenSearchを使い分けています(書き込みと検索の分離)※登録APIの記述は省略しています
-
hits
→ ヒット数を取得 (result["hits"]["total"]["value"]
) - ``results
→ 検索結果リストを
SearchResponse` 型のオブジェクトに変換
table = get_dynamodb_table()
app = FastAPI(
title="Search API",
description="Search API for OpenSearch",
version="1.0.0"
)
@app.get("/books",response_model=SearchResult, summary="Search for a character", description="Search for a character by name")
async def search(keyword: str=None):
try:
client = get_opensearch_client()
query = build_opensearch_query(keyword)
result = client.search(index="books", body=query)
hits = result["hits"]["total"]["value"]
results = [
SearchResponse(
id=str(hit['_source']['id']),
title=hit['_source']['title'],
story=hit['_source']['story'],
attributes=hit['_source']['attributes'],
created_at=hit['_source']['created_at'],
updated_at=hit['_source']['updated_at']
)
for hit in result["hits"]["hits"]
]
return SearchResult(hits=hits, results=results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
handler = Mangum(app)
schemaとutilsについては下記
schema.py
from pydantic import BaseModel
from typing import List
class SearchResponse(BaseModel):
id: str
title: str
story: str
attributes: List[str]
created_at: str
updated_at: str
class SearchResult(BaseModel):
hits: int
results: List[SearchResponse]
class CreateBook(BaseModel):
title: str
class Book(BaseModel):
id: str
title: str
story: str
attributes: List[str]
created_at: str
updated_at: str
utils.py
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import os
import boto3
def get_opensearch_client():
endpoint = os.environ.get('OPENSEARCH_ENDPOINT', '')
host = endpoint.replace('https://', '').replace('http://', '')
credentials = boto3.Session().get_credentials()
region = os.environ.get('AWS_REGION', 'ap-northeast-1')
auth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region,
'es',
session_token=credentials.token
)
return OpenSearch(
hosts=[{'host': host, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=False,
connection_class=RequestsHttpConnection
)
def get_dynamodb_table():
return boto3.resource('dynamodb').Table('Books')
OPENSEARCH_INDEX = 'books'
OPENSEARCH_MAPPING = {
"mappings": {
"properties": {
"id": {"type": "keyword"},
"title": {"type": "text"},
"story": {"type": "text"},
"attributes": {"type": "keyword"},
"created_at": {"type": "date"},
"updated_at": {"type": "date"}
}
}
}
def build_opensearch_query(keyword):
return {
"query": {
"multi_match": {
"query": keyword,
"fields": ["title", "story"]
}
}
}
Frontend
ServerActionsとServiceで関心の分離
以下のようにServerActionsとServiceについて、関心の分離をすることで、コードを見やすくし、リクエストを投げた時に、スキーマによるバリデーションでエラーが起きるのか、またはリクエストの処理の段階、もしくはレスポンスでエラーになっているのかが明確になります。
ServerActions(next-safe-action):
- zodによる入力値のスキーマバリデーション
- クライアントからの呼び出しインターフェース
- 型安全性
- エラーハンドリング(アプリケーションレベルのエラー処理)
Service:
- ビジネスロジックの実装に集中できる
- APIコールの実装
- レスポンス処理
- エラーハンドリング(API通信やビジネスロジックに関するエラー処理)
'use server'
import { actionClient } from '@/lib/actions/safe-action'
import { searchBooks } from '@/lib/services/books/search-books'
import { searchQuerySchema } from '@/lib/zod/schemas/books'
export const searchBooksAction = actionClient
.schema(searchQuerySchema)
.action(async input => {
const books = await searchBooks({
params: { keyword: input.parsedInput.keyword }
})
return books
})
import 'server-only'
import { path, handleFailed, handleSucceed } from '@/lib/services/utils'
import { SearchBooksRequest, SearchBooksResponse } from '@/lib/types/books'
export const searchBooks = async ({
params: { keyword }
}: SearchBooksRequest): Promise<SearchBooksResponse> => {
const url = path(`/books?keyword=${encodeURIComponent(keyword)}`)
return fetch(url, {
headers: {
'Content-Type': 'application/json'
},
method: 'GET',
cache: 'no-store'
})
.then(handleSucceed)
.catch(handleFailed)
}
モダンなUIコンポーネントを提供してくれる shadcn-ui
今回実装では command
のコンポーネントを使用しました。
shadcn/uiを使うと、実装の省力(検索UI特有の複雑な実装であるドロップダウンやフォーカス管理など)ができて、検索機能のビジネスロジックに集中できます。
結果的にUIの実装にかかる時間を大幅に削減できるのでおすすめです。
Biomeによる高速なLinter & Formatter
今回TypeScriptを使っているのでESLintやPrettierの代わりにBiomeを使用しました。
そもそもBiomeについてですが、以下のような特徴があります。
- Rustで書かれた高速なコードリンター&フォーマッター
- ESLintとPrettierの機能を1つのツールに統合したもの
- パフォーマンスを重視した設計になっていて、高速に動作します(めっちゃ早い)
- セットアップが簡単なのですぐに導入できる(VSCodeだとBiomeの拡張機能入れて biome.jsonをカスタマイズするだけで完了)
例えば下記のようなMakefileを下記のように書いておくだけで、ターミナル上でmake check
で型チェックとリンターとフォーマットチェック
また、make format
で自身でカスタマイズしたbiomeのルールに沿ってフォーマットが高速に実行されます。
biome-check:
npx biome check ./src
biome-write:
npx biome check --write ./src
tsc-check:
npx tsc --noEmit
check:
make biome-check
make tsc-check
format:
make biome-write
最後に
今後もし機会があれば、日本語テキストの検索精度を上げるために、Kuromoji
やSudachi
のような日本語形態素解析ツールの導入したいなーと思っています。
内容が盛りだくさんなので、わかりづらい部分や半ば省略したところがあるかと思いますが、誰かのお役に立てればと思って書きました!🙇
説明
Discussion