🚀

【AWS】 FastAPI + Streamlit + DocumentDB構成で、簡易的な顧客情報入力フォームを作る(2)

2022/05/20に公開


はじめに

ご覧いただきありがとうございます。阿河です。

前回に引き続き 「FastAPI + Streamlit + DocumentDB構成」で簡単な顧客情報入力フォームを作り、DocumentDBとのデータ連携を試していきたいと思います。

対象者

  • AWSを運用中
  • DynamoDBやDocumentDBなどデータベースの挙動を確かめてみたいが、コーディングスキルがなく検証できないとお悩みの方
  • スキルの幅は広げたいが、フロントエンドまで手を出せない方
  • MongoDB互換のDocumentDBを触ってみたい方

なお当方コーディング経験が浅いため、開発経験が長い方からすると効率の悪いコードの書き方をしている箇所があると思います。ご了承いただけると幸いです。

概要

(★)がついているセクションは、今回手を動かして頂く項目です。

  1. 今回のハンズオン構成
  2. EC2にFastAPI,Streamlitをインストール
  3. FastAPIの簡易テスト: GETリクエストを投げる
  4. Streamlitで入力フォーム作成
  5. DocumentDBの構築~接続テスト~MongoDBの挙動確認
  6. 入力フォーム ⇒ DocumentDB(★)
  7. Streamlitで顧客情報ページを作成(★)
  8. DocumentDB ⇒ 顧客情報ページ(★)
  9. 挙動確認(★)

各種公式ドキュメントを参照しながら進めていきます。
前回構築したFastAPI/Streamlit側とDocumentDB間を連携させていきます。

6. 入力フォーム ⇒ DocumentDB(★)

PyMongoのインストール

まずPyMonngoをインストールしましょう。
PythonからMongoDBを操作できるようにします。

$ pip3 install pymongo
$ pip3 list

pymongoがインストールされていることを確認します。

PyMongoからDocumentDBへの接続/書き込み処理の実施

新規ファイルを作成しましょう。

$ cd fast-app
$ vi database.py

database.pyでは、PyMongoを利用してDocumentDBへのCRUD操作を実行します。

公式ドキュメントで、Python~DocumentDB間の接続方法を確認しましょう。

・MongoDBクライアントを作成し、DocumentDBへの接続を開始します。
・使用するデータベースを指定
・使用するコレクションの指定
・ドキュメントの挿入(Create)
・ドキュメントの情報取得
・取得したドキュメント情報の表示
・接続を終了

上記ドキュメントやPyMongoのドキュメントのコード例を参考に、コレクション内に3つのドキュメントを挿入するpythonファイルを作成します。

database.py

#インポート
import pymongo
import sys

#クライアントの作成とDocumentDBとの接続
client = pymongo.MongoClient('mongodb://[※EC2のIPv4アドレス]:[※パスワード]@[※DBホスト].cluster-cserfiucxgkt.ap-northeast-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')

#データベースの指定
db = client.customer_db
#コレクションの指定
col = db.customer

#ドキュメントの挿入
db.col.insert_one({"ID":"1", "Name":"test taro" })
db.col.insert_one({"ID":"2", "Name":"test jiro" })
db.col.insert_one({"ID":"3", "Name":"test saburo" })

#コレクション内の全ドキュメントの検索
result = db.col.find()

#接続の終了
client.close()

client = pymongo.MongoClient('mongodb:// ~)の接続先は、「DocumentDB⇒※クラスターの指定⇒接続とセキュリティタブ⇒アプリケーションでこのクラスターに接続する」に記載のあるエンドポイントを指定しました。

実行してみましょう。

エラーが起こります。
オプションの指定に問題がありそうです。

公式ドキュメントの「TLSが有効な場合の接続」に記載あるコードを再度確認します。

client = pymongo.MongoClient('mongodb://<sample-user>:<password>@sample-cluster.node.us-east-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')
#先程実行したコード
?ssl=true&ssl_ca_certs

#ドキュメントに記載あるコード
?tls=true&tlsCAFile

TLSの設定に問題があるかもしれません。
デフォルトでは、TLSを使用して、アプリケーションとAmazonDocumentDBクラスター間の接続の暗号化が有効化になっているようです。


パラメータグループを確認すると、確かにTLSが有効化されています。

接続先の指定で、SSLをTLSに書き換えます。
※結論としては公式ドキュメントの「TLSが有効な場合の接続」に記載のあるコードに従います。

database.py

#インポート
import pymongo
import sys

#クライアントの作成とDocumentDBとの接続
client = pymongo.MongoClient('mongodb://[※EC2のIPv4アドレス]:[※パスワード]@[※DBホスト].cluster-cserfiucxgkt.ap-northeast-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')

#データベースの指定
db = client.customer_db
#コレクションの指定
col = db.customer

#ドキュメントの挿入
db.col.insert_one({"ID":"1", "Name":"test taro" })
db.col.insert_one({"ID":"2", "Name":"test jiro" })
db.col.insert_one({"ID":"3", "Name":"test saburo" })

#コレクション内の全ドキュメントの検索
result = db.col.find()
print(result)

#接続の終了
client.close()

実行すると、特にエラーは起こらず完了したようです。

試しにmongoコマンドからDocumentDBに接続して、正常にデータベースへの書き込みが行われているかを確認しましょう。

接続方法については前回解説していますが、DocumentDB⇒※クラスターの指定⇒接続とセキュリティタブ⇒mongo シェルでこのクラスターに接続する」に記載のあるコマンドを実行ください。

DocumentDBにログインしたら、データベースの中身を確認します。

#データベースの表示
> show dbs
customer_db  0.000GB

#コレクションの表示
> use customer_db
switched to db customer_db
> show collections
col

#コレクション内の全ドキュメント検索
> db.col.find();
{ "_id" : ObjectId("627fba600090af734d94c430"), "ID" : "1", "Name" : "test taro" }
{ "_id" : ObjectId("627fba600090af734d94c431"), "ID" : "2", "Name" : "test jiro" }
{ "_id" : ObjectId("627fba600090af734d94c432"), "ID" : "3", "Name" : "test saburo" }

新規でデータベース/コレクションが作成され、コレクション内に3つのドキュメントが書き込まれています。

main.pyにPOSTリクエスト処理を作成

データベースの書き込みができたので、次にやることは「Streamlitに書き込まれたデータを、DocumentDBに書き込むこと」です。

Streamlitの処理に関わるapp.pyを確認しましょう。
うちデータベースへの書き込みに関係ありそうなコードは下記です。

app.py

customer_information = {
            'customer_name': customer_name,
            'customer_age' : customer_age,
            'customer_gender': customer_gender,
            'customer_address': customer_address,
            'customer_mail': customer_mail

        }
        submit_button = st.form_submit_button(label='Send')
        
    if submit_button:
        
        update_page()

ユーザーが入力フォームに書き込んだデータを、customer_informationとしてまとめます。

submit_buttonが押されたら、update_page(Streamlitの画面上の表示に関わる処理)とは別に、データーベースにcutomer_informationを流し込む処理を追加すればよさそうです。

Streamlit(app.pyの処理)でFastAPI(main.pyの処理)にPOSTリクエストを投げて、POSTリクエストを受け取ったらDocumentDBにデータを流す(main.pyでdatabase.pyの処理を呼び出す)ようにします。

まずはmain.py側で受け入れ体制を作ります。

main.py

from fastapi import FastAPI
import uvicorn
import random
import database
from pydantic import BaseModel #BaseModelのインポート

#データモデルの定義
class Customer_Info(BaseModel):
    customer_name: str
    customer_age: int 
    customer_gender: str
    customer_address: str
    customer_mail: str

app = FastAPI()

@app.get("/test")
async def test():
    return {
            "id": random.randint(1,10),
            "name": "name",
            "address": "Tokyo",
            "age": 30
            }

#POSTリクエスト処理
@app.post("/customer_info")
async def create_info(response: Customer_Info):
    return response

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Customer_Info型のデータ構造定義と、POSTリクエスト処理を追加しました。
http://EC2のパブリックIPアドレス:8000/customer_Info に対してPOSTリクエストがあった場合、create_infoメソッドを実行します。
リクエストボディについては、こちらを参照ください。

POSTリクエストに対して、レスポンスを返すかを検証しましょう。
検証のためのファイルを新規作成します。

test.py

import requests
import json

def main():

    url = 'http://127.0.0.1:8000/customer_info/'
    
    #POSTリクエストに投げるボディの定義
    body = {
            "customer_name": "Test-Taro",
            "customer_age": 30,
            "customer_gender": "MEN",
            "customer_address": "Kanto",
            "customer_mail": "test@example.com"
           }
  
   #POSTリクエストを投げる
    res = requests.post(url, json.dumps(body))
    print(res.json())

if __name__ == "__main__":

    main()

/customer_infoに対して、POSTリクエストを投げます。
requests.postについては、こちらを参照ください。

結果はどうでしょうか。

$ python3 test.py
{'customer_name': 'Test-Taro', 'customer_age': 30, 'customer_gender': 'MEN', 'customer_address': 'Kanto', 'customer_mail': 'test@example.com'}

POSTリクエストの結果がresに入り、そのresをprintで表示しました。

POSTリクエスト処理をデータベース側に反映

次にPOSTリクエスト結果をデータベースに反映します。

database.py

import pymongo
import sys
import json

#書き込み処理をメソッド化する
#「main.py」からデータをもらう
def create_info(data):

    #クライアント作成と接続先の設定
    client = pymongo.MongoClient('mongodb://[※EC2のIPアドレス]:[※パスワード]@[※DBホスト].cluster-cserfiucxgkt.ap-northeast-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')
  
    #データベースとコレクションの指定
    db = client.customer
    cus_info = db.customer_info
  
   #ドキュメントの挿入
    insert_data = db.col.insert_one(
            {
	      #「main.py」から引数としてもらったdataから、各値を抜き出す
                "customer_name" :  data['customer_name'],
                "customer_age" : data['customer_age'],
                "customer_gender" : data['customer_gender'],
                "customer_address" : data['customer_address'],
                "customer_mail" : data['customer_mail']
                }
            )

  #接続の終了
    client.close() 

MongoDBへの接続/書き込みを行うdatabase.pyは、「main.pyからデータを辞書型でもらう⇒辞書からデータを引き出して、MongoDBに渡す」処理を行います。

またdatabase.pyは、main.pyから呼びだす形にするのでメソッド化しておきます。

main.py

from fastapi import FastAPI
import uvicorn
import random
import database
from pydantic import BaseModel

#データ構造の定義
class Customer_Info(BaseModel):
    customer_name: str
    customer_age: int 
    customer_gender: str
    customer_address: str
    customer_mail: str

#インスタンス化
app = FastAPI()

#GETリクエスト
@app.get("/test")
async def test():
    return {
            "id": random.randint(1,10),
            "name": "name",
            "address": "Tokyo",
            "age": 30
            }

#POSTリクエスト
#/customer_infoにPOSTリクエストが届いたら、create_infoメソッドを実行する
@app.post("/customer_info")
async def create_info(response: Customer_Info):
    database.create_info(dict(response))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
    

http://EC2のアドレス:8000/customer_info にPOSTリクエストがあったら、database.pyのcreate_infoメソッドを呼び出します。

メソッドが実行されると、POSTリクエストで受け取ったCustomer_Info型のresponseを、dict型に変換してdatabase.pyに渡します。

最後にPOSTリクエストを送るtest.pyを微修正します。

test.py

import requests
import json

def main():

    url = 'http://[※EC2のIPv4アドレス]:8000/customer_info/'
    body = {
            "customer_name": "Test-Taro",
            "customer_age": 30,
            "customer_gender": "MEN",
            "customer_address": "Kanto",
            "customer_mail": "test@example.com"
           }
    requests.post(url, json.dumps(body)
    
if __name__ == "__main__":
    main()

必要ないコードを削除しました。
POSTリクエストを投げることができれば十分です。

ではtest.pyを実行してみましょう。
エラーが発生しなければ、DocumentDBにログインして、データが書き込まれているかを確認します。

> db.col.find();
{ "_id" : ObjectId("62811df73b3f6f74c6c5130c"), "customer_name" : "Test-Taro", "customer_age" : 30, "customer_gender" : "MEN", "customer_address
" : "Kanto", "customer_mail" : "test@example.com" }

無事データが書き込まれました。

StreamlitからのPUTリクエストをmain.py側で検知する

ここまで出来たら、Streamlit側の調整をしましょう。
今まではtest.pyからPOSTリクエストを投げてましたが、Streamlitの処理を担っているapp.pyからリクエストを投げるようにします。

app.py

import streamlit as st
import requests
import json


url = 'http://[※EC2のIPv4アドレス]:8000/customer_info'
page = st.sidebar.selectbox('Choose your page', ['INPUT FORM', 'RESULT'])

def update_page():
    st.balloons()
    st.markdown('# Thank you for information')
    st.json(customer_information)

※POSTリクエストを投げる
def create_info():
    requests.post(url,json.dumps(customer_information))


if page == 'INPUT FORM':
    st.title('INPUT FORM')

    with st.form(key='customer'):
        customer_name: str = st.text_input('NAME', max_chars=15)
        customer_age: int = st.text_input('AGE', max_chars=3)
        customer_gender = st.radio("GENDER",('MEN', 'Women'))
        customer_address = st.selectbox('COUNTRY',
                ('Hokkaido', 'Tohoku', 'Kanto', 'Chubu', 'Kinki', 'Kansai', 'Chugoku', 'Shikoku', 'Kyusyu', 'Okinawa'))
        customer_mail: str = st.text_input('Mail Address', max_chars = 30) 


        customer_information = {
            'customer_name': customer_name,
            'customer_age' : customer_age,
            'customer_gender': customer_gender,
            'customer_address': customer_address,
            'customer_mail': customer_mail

        }
        submit_button = st.form_submit_button(label='Send')
        
    if submit_button:
        
        update_page()
	※サブミットボタンが押されたら、POSTリクエストを投げる
        create_info()

POSTリクエストを投げるタイミングは、Streamlit上で入力処理が終わり、サブミットボタンが押されたタイミングです。

押したらどうなりますか?
DocumentDBにログインしてみます。

> db.col.find();
{ "_id" : ObjectId("62811ff8dab22a68c352aaf9"), "customer_name" : "Test-Taro", "customer_age" : 30, "customer_gender" : "MEN", "customer_address
" : "Kanto", "customer_mail" : "test@example.com" }
{ "_id" : ObjectId("62814328f178a09e19c471dd"), "customer_name" : "test-shirou", "customer_age" : 36, "customer_gender" : "MEN", "customer_addre
ss" : "Hokkaido", "customer_mail" : "test@example.com" } 

Streamlitの入力情報が反映されました。

ひとまずCreate処理の実装は完了です。

7. Streamlitで顧客情報ページを作成

データベースから顧客情報を読み込んで、Streamlit上の画面に表示します。
登録されている顧客名をセレクトボックスで選択することができ、選択をすると その顧客の情報をJSON形式で画面に表示します。

app.py

import streamlit as st
import requests
import json

url = 'http://[※EC2のIPv4アドレス]/customer_info'
page = st.sidebar.selectbox('Choose your page', ['INPUT FORM', 'RESULT'])

・・・・・・
#セレクトボックスでINPUT FORMを選択した場合
if page == 'INPUT FORM':
    st.title('INPUT FORM')

    with st.form(key='customer'):
        customer_name: str = st.text_input('NAME', max_chars=15)
        customer_age: int = st.text_input('AGE', max_chars=3)

・・・・・・
#セレクトボックスでRESULTを選択した場合
#CUSTOMER_INFORMATIONページの作成
elif page == 'RESULT':
    st.title('CUSTOMER_INFORMATION')

    customer_information = {
            'customer_name': "test taro",
            'customer_age' : 30,
            'customer_gender': "MAN",
            'customer_address': "Kanto",
            'customer_mail': "test@example.com"
        }
    st.json(customer_information)

elif 以下の処理で、新しいページの表示について実装を行います。

Streamlit画面⇒左のセレクトボックスでRESULTを選択⇒DocumentDBに保存したユーザーの情報が表示されるようにしていきます。

現状では左のセレクトボックスとページを構成できたので、次のセクションで仕上げていきます。

8. DocumentDB ⇒ 顧客情報ページ

最後のセクションとして、データベースに保存しているデータを取り出して、Streamlitに情報を反映します。

database.py側で、必要なメソッドを用意する

まずdatabase.pyを編集して、DocumentDBからデータを引き出せるかを確認します。

コレクション内のドキュメントをすべて取得するメソッドを作成します。

database.py

import pymongo                    
import sys
import json            
from bson.json_util import dumps   #dumpsのインポート                               


#ドキュメントの挿入
def create_info(data):                                                                                            
    client = pymongo.MongoClient('mongodb://[※EC2のIPアドレス]:[※パスワード]@[※DBホスト].cluster-cserfiucxgkt.ap-northeast-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')                                                                                      
    db = client.customer                
    cus_info = db.customer_info
    insert_data = db.col.insert_one(                                                                              
            {                                                                      
                "customer_name" :  data['customer_name'],                           
                "customer_age" : data['customer_age'],                               
                "customer_gender" : data['customer_gender'],                         
                "customer_address" : data['customer_address'],                       
                "customer_mail" : data['customer_mail']                             
                }                                                               
            )                                                                                         
    client.close()                                                                                     

#コレクション内のすべてのドキュメントを取得(idフィールドの情報は除外)
def Read_all_info():                                                                                              
    client = pymongo.MongoClient('mongodb://[※EC2のIPアドレス]:[※パスワード]@[※DBホスト].cluster-cserfiucxgkt.ap-northeast-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')                                                                                      
    db = client.customer                                                   
    cus_info = db.customer_info                                                    
    result = db.col.find({},{'_id': 0}) #idフィールドを0(非表示)として指定
    return dumps(result) #str型に変換して返す                                                       
    client.close()

find()を使用するとコレクション内のすべてのドキュメントを取得できますが、パラメータの指定によって取得対象を制限できます。
queryパラメータおよびprojectionパラメータを参照ください。

database.pyが狙い通り挙動するか検証するために、test.pyを編集します。

test.py

import database
import json


def main():
  
   #メソッドの返り値を格納
    all_info = database.Read_all_info()
    
    #返り値のデータ型を確認
    print(type(all_info)) 
 
    #str型からlist型に変換
    re_all_info = json.loads(all_info)

    #データ型とそれぞれのデータの確認
    print(type(re_all_info)) 
    print(re_all_info) 


if __name__ == "__main__":

    main()

Read_all_info(すべてのドキュメントを取得)の結果を、list型に変換しています。

test.pyを実行してみましょう。

$ python3 test.py

#データ型の確認
<class 'str'>

#②json.loads後のデータ型の確認
<class 'list'>

#json.loads後のデータの中身
[{'customer_name': 'Test-Taro', 'customer_age': 30, 'customer_gender': 'MEN', 'customer_address': 'Kanto', 'customer_mail': 'test@example.com'}, {'customer_name': 'test-shirou', 'customer_age': 36, 'customer_gender': 'MEN', 'customer_address': 'Hokkaido', 'customer_mail': 'test@example.com'}]

Read_all_infoで返ってきたstr型のデータを、json.loadでlist型に変換しました。

現在コレクション内には「Test-Taro」さんと「test-shirou」さんのドキュメントが入っているので、上記の取得結果で正解です。

Streamlit側でdatabase.pyのメソッドが利用できるか確認する

database.pyのRead_all_infoを利用して、Streamlitの顧客管理ページにDocumentDBに格納されているユーザー情報を表示します。

app.py

import streamlit as st
import requests
import json
import database


url = 'http://[EC2のIPアドレス]:8000/customer_info'
page = st.sidebar.selectbox('Choose your page', ['INPUT FORM', 'RESULT'])

def update_page():
    st.balloons()
    st.markdown('# Thank you for information')
    st.json(customer_information)

def create_info():
    requests.post(url,json.dumps(customer_information))

#database.pyのRead_all_info()を実行⇒listに変換して返す
def read_all_info():
    all_info = database.Read_all_info()
    re_all_info = json.loads(all_info)
    return re_all_info

if page == 'INPUT FORM':
    st.title('INPUT FORM')

    with st.form(key='customer'):
        customer_name: str = st.text_input('NAME', max_chars=15)
        customer_age: int = st.text_input('AGE', max_chars=3)

・・・・

elif page == 'RESULT':
    st.title('CUSTOMER_INFORMATION')

  #read_all_infoを実行して、jsonで表示
    customer_information = read_all_info()
    st.json(customer_information)

Streamlitを起動させて、左のセレクトボックスから「RESULT」を選択します。

ページ読み込み時に、自動的にDocumentDBから情報を取ってきています。
この後INPUT FORMページから新しいユーザー情報を登録したとしても、RESULTページに切り替える際に再度DocumentDBからの読み込み処理が走る想定です。

9.挙動確認

では最後に挙動確認を行います。

StreamlitとFastAPIを両方とも起動させます。

RESULTのページを開くと、現在登録されている2つのドキュメントの情報が表示されています。
INPUT_FORMで新規ユーザーを追加して、RESULTに情報が追加されるかを確認します。

INPUT FORMでユーザー情報を登録すると、DocumentDBに書き込みが行われます。
そしてRESULTページに切り替えると、DocumentDBの読み込みが走るので、すぐ登録情報が反映されました。

さいごに

以上でハンズオンは終わりです。
改善する箇所や追加実装が必要な場所は山ほどありますが、今回作ったものをベースに色々遊べると思います。

私のように仕事でコーディングを行わない人間でも、FastAPIやStreamlitなどの便利なフレームワークを使えば実験ができることが分かりました。

今回は分量が大きくなってしまいましたが、お付き合い頂きありがとうございました。
お疲れ様でした!!

MEGAZONE株式会社 Tech Blog

Discussion