🧟‍♀️

Next.js x OpenSearch x Lambda(Python) で簡単な検索・蔵書アプリケーションを作ってみた

2025/02/22に公開

背景

今まで業務の中で検索機能の実装を担当した際、バックエンド側から取得したデータをフロントエンド側でフィルタリングして表示するようにしていました。

しかし、この実装だとデータ量が増加するにつれてパフォーマンスの低下が懸念されるのと、複雑な検索要件への対応が難しいかなと思ったので、何かいい方法はないかなーと探していたところ、AWSのマネージドサービスであるOpenSearchを見つけました。

ちょうどサーバーレスアーキテクチャについても勉強しようと思っていたので、いろいろ調べた結果、YoutubeでAWSのサーバレスアーキテクチャで実装した紹介例があったので、自分でもやってみようと思いました。

引用元: Youtube

https://dev.classmethod.jp/articles/connect-to-local-opensearch-with-cloudflared/

アウトプット

技術スタック

  • フロントエンド: TypeScript, Next.js(App router v15)
  • バックエンド: Python
  • インフラ: Lambda, API Gateway, DynamoDB, OpenSearch
  • 認証: Auth0(本記事では省略)
  • コンテナ: Docker(本記事では省略)
  • ローカル環境構築: Cloudflare tunnel

Architecture

フロントエンド層:

  • ブラウザを通してユーザーがアプリケーションにアクセス
  • Next.jsを使用したUIコンポーネントで実装
  • Auth0による認証サーバーでユーザー認証を処理

バックエンド層:

  • FastAPI(Pythonベース)のAPI Gatewayを設置
  • Lambdaでフロントエンドからのリクエストの処理を行う

データストレージ層:

  • DynamoDBをメインのデータストレージとして使用
  • OpenSearchを検索エンジンとして採用
  • DynamoDBストリームを使用してデータの同期や更新を処理

Backend

ローカル環境(Docker)でOpenSearchを構築

最初はDockerでOpenSearchを構築し、Cloudflare tunnelを使ってLambdaからアクセスできる環境を準備しました。AWSでいきなりOpenSearchを起動するとなると運用料金が高くついてしまう(月額$30~$40)ので、一旦ローカル環境のOpenSearchに対して疎通確認ができるかという検証をやりました。

Cloudflare tunnelは、インターネットへの安全な公開を可能にするCloudflareのサービスで、公開IPアドレスやポート開放が不要な暗号化トンネルを提供してくれます。

イメージは、環境については下記のdocker-compose.ymlで、OpenSearchとOpenSearch Dashboardを構築し、Cloudflare tunnelを経由してLambdaからアクセスできることを確かめたって感じです。

docker-compose.yml
docker-compose.yml
---
version: '3'
services:
  opensearch-node1:
    image: opensearchproject/opensearch:2.18.0
    container_name: opensearch-node1
    environment:
      - discovery.type=single-node
      - node.name=opensearch-node1
      - plugins.security.disabled=true
      - "_JAVA_OPTIONS=-XX:UseSVE=0"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.18.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - '5601'
    environment:
      # OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
      - OPENSEARCH_HOSTS=http://opensearch-node1:9200
      - DISABLE_SECURITY_DASHBOARDS_PLUGIN=true 
    networks:
      - opensearch-net

volumes:
  opensearch-data1:

networks:
  opensearch-net:


画像引用元

やり方についてはこちらの記事で紹介されていました
https://dev.classmethod.jp/articles/how-to-build-opensearch-with-docker/
https://dev.classmethod.jp/articles/connect-to-local-opensearch-with-cloudflared/

Serverless Frameworkでインフラを構築

インフラ全体の構造としては、検索および登録APIをAPI Gatewayに定義して処理をLambdaに、保存先としてDynamoDB、検索エンジンとしてOpenSearchを使用するようなアーキテクチャになっています。

Serverless Framework
serverless.yml
service: search-api

provider:
  name: aws
  runtime: python3.9
  region: ap-northeast-1
  environment: ${file(env.yml)}
  iam:
    role:
      statements:
        - Effect: Allow
          Action: dynamodb:*
          Resource: !GetAtt BooksTable.Arn
        - Effect: Allow
          Action: es:ESHttp*
          Resource: 
            - !Sub arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${self:provider.environment.OPENSEARCH_DOMAIN_NAME}/*

package:
  individually: true 
  patterns:
    - '!**'
    - 'requirements.txt'

functions:
  api:
    handler: src.main.handler
    package:
      include:
        - src/**/*.py
    events:
      - httpApi:
          path: /docs
          method: get
      - httpApi:
          path: /openapi.json
          method: get
      - httpApi:
          path: /search
          method: get
      - httpApi:
          path: /books
          method: ANY
    layers:
      - Ref: PythonRequirementsLambdaLayer

  syncToOpensearch:
    handler: src.dynamodb_stream.handler
    package:
      include:
        - src/**/*.py
    events:
      - stream:
          type: dynamodb
          arn: !GetAtt BooksTable.StreamArn
    layers:
      - Ref: PythonRequirementsLambdaLayer

resources:
  Resources:
    BooksTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: Books
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES

    OpenSearchDomain:
      Type: AWS::OpenSearchService::Domain
      Properties:
        DomainName: ${self:provider.environment.OPENSEARCH_DOMAIN_NAME}
        ClusterConfig:
          InstanceCount: 1
          InstanceType: t3.small.search
        EBSOptions:
          EBSEnabled: true
          VolumeSize: 10
          VolumeType: gp2
        EncryptionAtRestOptions:
          Enabled: true
        NodeToNodeEncryptionOptions:
          Enabled: true
        DomainEndpointOptions:
          EnforceHTTPS: true
        AdvancedSecurityOptions:
          Enabled: true
          InternalUserDatabaseEnabled: true
          MasterUserOptions:
            MasterUserName: ${self:provider.environment.OPENSEARCH_USERNAME}
            MasterUserPassword: ${self:provider.environment.OPENSEARCH_PASSWORD}

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    layer: true

特徴

  • Python 3.9ランタイム
  • 依存関係はserverless-python-requirementsプラグインで管理
  • シングルノード構成(t3.small)のOpenSearch
  • FastAPIのフレームワークを使用(/docsエンドポイントの存在から)検索、書籍データのCRUD操作を提供
  • DyonamoDBの変更をOpenSearchに同期するため、DynamoDBのストリームを有効化し、Lambdaが実行される
  • syncToOpensearch: DynamoDBからOpenSearchへデータを同期するための関数
  • 書籍データを保存したり、検索したりするDynamoDBテーブル(BooksTable)

検索API

開発途中でSwaggerUIを用いてAPIのテストをしたかったので、PythonのFastAPIを使用して、OpenAPI(Swagger)ドキュメントの自動生成を行えるようにしました。

特徴

  • タイトルと本文の両方を検索対象にしています
  • OpenSearchクライアントとの接続処理においては、IAMベースの認証とHTTPS通信の強制しています
  • DynamoDBとOpenSearchを使い分けています(書き込みと検索の分離)※登録APIの記述は省略しています
  • hits → ヒット数を取得 (result["hits"]["total"]["value"])
  • ``results→ 検索結果リストをSearchResponse` 型のオブジェクトに変換
main.py
table = get_dynamodb_table()

app = FastAPI(
  title="Search API",
  description="Search API for OpenSearch",
  version="1.0.0"
)

@app.get("/books",response_model=SearchResult, summary="Search for a character", description="Search for a character by name")
async def search(keyword: str=None):
  try:
    client = get_opensearch_client()
    query = build_opensearch_query(keyword)
    result = client.search(index="books", body=query)
    hits = result["hits"]["total"]["value"]
    results = [
      SearchResponse(
        id=str(hit['_source']['id']),
        title=hit['_source']['title'],
        story=hit['_source']['story'],
        attributes=hit['_source']['attributes'],
        created_at=hit['_source']['created_at'],
        updated_at=hit['_source']['updated_at']
      )
      for hit in result["hits"]["hits"]
    ]

    return SearchResult(hits=hits, results=results)
  except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))
  
handler = Mangum(app)

schemaとutilsについては下記

schema.py
schema.py
from pydantic import BaseModel
from typing import List

class SearchResponse(BaseModel):
    id: str
    title: str
    story: str
    attributes: List[str]
    created_at: str
    updated_at: str

class SearchResult(BaseModel):
    hits: int
    results: List[SearchResponse]
    
class CreateBook(BaseModel):
    title: str

class Book(BaseModel):
    id: str
    title: str
    story: str
    attributes: List[str]
    created_at: str
    updated_at: str

utils.py
utils.py

from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import os
import boto3

def get_opensearch_client():
    endpoint = os.environ.get('OPENSEARCH_ENDPOINT', '')
    host = endpoint.replace('https://', '').replace('http://', '')
    
    credentials = boto3.Session().get_credentials()
    region = os.environ.get('AWS_REGION', 'ap-northeast-1')
    
    auth = AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        region,
        'es',
        session_token=credentials.token
    )
    
    return OpenSearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=False,
        connection_class=RequestsHttpConnection
    )

def get_dynamodb_table():
  return boto3.resource('dynamodb').Table('Books')

OPENSEARCH_INDEX = 'books'

OPENSEARCH_MAPPING = {
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "title": {"type": "text"},
      "story": {"type": "text"},
      "attributes": {"type": "keyword"},
      "created_at": {"type": "date"},
      "updated_at": {"type": "date"}
    }
  }
}

def build_opensearch_query(keyword):
  return {
    "query": {
      "multi_match": {
        "query": keyword,
        "fields": ["title", "story"]
      }
    }
  }

Frontend

ServerActionsとServiceで関心の分離

以下のようにServerActionsとServiceについて、関心の分離をすることで、コードを見やすくし、リクエストを投げた時に、スキーマによるバリデーションでエラーが起きるのか、またはリクエストの処理の段階、もしくはレスポンスでエラーになっているのかが明確になります。

ServerActions(next-safe-action):

  • zodによる入力値のスキーマバリデーション
  • クライアントからの呼び出しインターフェース
  • 型安全性
  • エラーハンドリング(アプリケーションレベルのエラー処理)

Service:

  • ビジネスロジックの実装に集中できる
  • APIコールの実装
  • レスポンス処理
  • エラーハンドリング(API通信やビジネスロジックに関するエラー処理)
search-books.ts(ServerActions)
'use server'

import { actionClient } from '@/lib/actions/safe-action'
import { searchBooks } from '@/lib/services/books/search-books'
import { searchQuerySchema } from '@/lib/zod/schemas/books'

export const searchBooksAction = actionClient
  .schema(searchQuerySchema)
  .action(async input => {
    const books = await searchBooks({
      params: { keyword: input.parsedInput.keyword }
    })
    return books
  })

search-books.ts(Service)
import 'server-only'
import { path, handleFailed, handleSucceed } from '@/lib/services/utils'
import { SearchBooksRequest, SearchBooksResponse } from '@/lib/types/books'

export const searchBooks = async ({
  params: { keyword }
}: SearchBooksRequest): Promise<SearchBooksResponse> => {
  const url = path(`/books?keyword=${encodeURIComponent(keyword)}`)
  return fetch(url, {
    headers: {
      'Content-Type': 'application/json'
    },
    method: 'GET',
    cache: 'no-store'
  })
    .then(handleSucceed)
    .catch(handleFailed)
}

モダンなUIコンポーネントを提供してくれる shadcn-ui

今回実装では commandのコンポーネントを使用しました。
https://ui.shadcn.com/docs/components/command

shadcn/uiを使うと、実装の省力(検索UI特有の複雑な実装であるドロップダウンやフォーカス管理など)ができて、検索機能のビジネスロジックに集中できます。
結果的にUIの実装にかかる時間を大幅に削減できるのでおすすめです。

Biomeによる高速なLinter & Formatter

今回TypeScriptを使っているのでESLintやPrettierの代わりにBiomeを使用しました。

そもそもBiomeについてですが、以下のような特徴があります。

  • Rustで書かれた高速なコードリンター&フォーマッター
  • ESLintとPrettierの機能を1つのツールに統合したもの
  • パフォーマンスを重視した設計になっていて、高速に動作します(めっちゃ早い)
  • セットアップが簡単なのですぐに導入できる(VSCodeだとBiomeの拡張機能入れて biome.jsonをカスタマイズするだけで完了)

例えば下記のようなMakefileを下記のように書いておくだけで、ターミナル上でmake checkで型チェックとリンターとフォーマットチェック
また、make formatで自身でカスタマイズしたbiomeのルールに沿ってフォーマットが高速に実行されます。

biome-check:
	npx biome check ./src

biome-write:
	npx biome check --write ./src

tsc-check:
	npx tsc --noEmit

check:
	make biome-check
	make tsc-check

format:
	make biome-write

https://biomejs.dev/ja/formatter/

最後に

今後もし機会があれば、日本語テキストの検索精度を上げるために、KuromojiSudachiのような日本語形態素解析ツールの導入したいなーと思っています。

内容が盛りだくさんなので、わかりづらい部分や半ば省略したところがあるかと思いますが、誰かのお役に立てればと思って書きました!🙇
説明

GitHubで編集を提案

Discussion