Re: Pydanticから始める異世界Data Platform
はじめに
こんにちは。Acompanyの田中(@tkrk_p)です。本記事はAcompany5周年アドベントカレンダー24日目の記事となります。
今月Pydantic V2のベータ版がリリースされ、Pydantic V1から約17倍高速化されたとのニュースがありました。FastAPIも即座に追従し、先日Pydantic V2をサポートしたv0.100.0-betaがリリースされました。
その内容については以前、記事にしたので気になる方はこちら↓
Pydanticの大型アップデートに沸いているFastAPI界隈ですが、実はその陰で6月中旬にPydanticから Help us build our roadmap なる興味位深いリリースが出ていたので今回はこちらを紹介していきます。
Pydantic Services Inc.の設立
2023年2月にリリースがあった通り、Pydantic作者のSamuel Colvin氏はPydantic Services Inc.(以下Pydantic社)を立ち上げ、Sequoia Capitalがリードを務めたシードラウンドを2月にクローズ、累計470万ドルを調達していました。
Sequoia CapitalはApple、Google、Zoomといった企業をはじめ、MongoDB、Confluent、dbt Labsなどのオープンソース基盤を持つ新興企業へ初期段階から投資をしてきた著名なVCであり注目の高さが伺えます。
TechCrunchの記事によると、
「私たちはアプリケーションの開発とクラウドへのデプロイをより簡単、安全、より速く、そして最終的には開発者にとってより楽しいものにしていきます。まずは小規模なアプリケーションや機能を扱うエンジニアを支援することから始めますが、長期的な目標は、すべての開発者にとって力を倍増させる存在となり、すべての人の世界を改善できるツールを開発者に提供することです。」
…
したがって、おそらくここで私たちが話しているのは、少なくとも長期的には、Salesforce が所有する Heroku と同様の種類のサービスとしてのプラットフォーム (PaaS) に似たものです。
とあり、今後Pydantic社はPydanticを基軸にPydantic Data Platformと呼ばれるPaaSの展開を構想していることが語られていました。
Pydantic Data Platformとは?
Pydanticの使用の中心となるのは常にデータです。Pydantic はデータにスキーマと健全性をもたらします。一方で、たとえ Pydanticを使ってゴリゴリスキーマ定義して検証しストレスフリーに開発を行えていたとしても、Python から離れたときにデータを操作するのは依然として面倒なことが多いという問題があります。
例えば、FastAPIでWebAPIを開発する際、データベーススキーマやHTTP Request/Responseをただの文字列/数値という分類でなく、Email/Date/URLなどのフィールドを持つPydantic Modelにより定義することで、データをあらゆる表現に変換する際に自動でパース・検証を行えるようになります。一方で、DBに蓄積されたトランザクションデータを分析したい場合はどうでしょうか?
アプリケーション上で収集する際はPydanticによりスキーマ定義していたにもかかわらず、一度Pythonから離れてDBに格納してしまうとPydanticで定義した詳細なスキーマ定義との紐付きは消え、そこには文字列や数値といった型としての情報しか残りません。同じデータを扱っているのに、再度利用する際にはまた別の言語・別のツールでスキーマ定義することになってしまいます。gRPCのProtobufの様な定義言語を使えば多言語向けコンパイルも可能なので、1カ所で定義したスキーマ定義を使いまわせたりします。それでもやはり、記述表現の自由度も下がりますしDBとの連続性は失われてしまいます。
この問題を解決できる”最強”のData PlatformがPydantic Data Platformです。
サービスロードマップ
Pydantic社が考えた”最強”のData Platform、Pydantic Data Platformとは具体的にどういったものなのでしょうか。公開されたロードマップHelp us build our roadmapをもとに構想を見ていきましょう。
1. Python の分析/可観測性
Pythonを用いたアプリケーションのメトリクス・ログ監視、オブザーバビリティ、ビジネスアナリティクスプラットフォームについて、2つの問題があります。
まず1つ目は、「ログ」(何が起こったか)と「メトリクス」(何回起こったか)が別々に管理されていることです。Pythonを含む他の言語でも、これらの情報を同時に記録し、表示する既存のツールにあまり満足いくものはありませんでした。
そして2つ目は、開発者サイドによる「オブザーバビリティ」強化とビジネスサイドによる「ビジネスアナリティクス」が同じデータを扱っているのにもかかわらず、その設定や構築作業が完全に分離されていることです。
Prometheusもメトリクス収集に特化しているためログ収集する際はGrafana Lokiを使ったりElasticsearch・Fluentd・Kibana(EFK)スタックを組み合わせたりとやや構築が面倒ですよね。DatadogやNewRelicといった高機能な監視SaaSツールもありますが、なかなかお高 高機能すぎて小規模開発だと持て余すケースも多いと思います。ビジネスアナリティクスのためのデータ分析基盤を構築する際も、さまざまなデータが複数の管理者によってサイロ的に管理されており統合的に集約して分析するのは一苦労です。
どうせアプリケーションが吐き出すデータが全ての起点になるのだから、これら4つのシーンで使用する全てのツールを一口にまとめたいよね。というのがPydantic社の取り組む課題です。
この課題に対し、Pydantic Data Platformは次の 3 つのツールを提供します。
- Pythonアプリケーションでデータを収集するためのSDK
- 総合的なビューを提供するダッシュボード
- データを自由にクエリするための軽量なPython ORM
このツールは、既に広く使用されているFastAPIを使用するウェブアプリケーションやAPIから、OpenAIやLangChain、HuggingFace、Prefectなどが既に使用しているPydanticパッケージを介して、機械学習の準備やLLMの検証など、さまざまなドメインで利用されることが想定されています。
最終的に、50行程度のスクリプトでコアアプリケーションの監視ダッシュボードやビジネスインテリジェンスレポートが丸っと作成できる環境が整うイメージです。
具体的な使い方を見てみましょう。まずは、データ収集についてです。
from pydantic_analytics import log # 名前は未定
async def transaction(payment: PaymentObject):
...
log("transaction-complete amount={payment.amount}", payment)
上記のコードでは、pydantic_analytics
という名前のSDKを使用してデータを収集しています。 PaymentObject
はPydantic Modelです。 transaction-complete
はこのイベントを一意に識別し、amount
はイベントの概要に表示され、payment
は詳細に表示されます。
これにより、トランザクションの詳細を表示するだけでなく、金額ごとに集計することも可能になります。
次に、Pydanticとの統合について見てみましょう。
from datetime import datetime
from pydantic import BaseModel, EmailStr, SecretStr
class Signup(BaseModel, analytics="record-all"):
email: EmailStr
name: str
password: SecretStr
signup_ts: datetime
@app.post("/signup/")
def signup(signup_form: Signup):
# サインアップは検証時に自動的に記録されます
...
上記のコードでは、Signup
というPydanticモデルが定義されています。このモデルにはanalytics="record-all"
というアナリティクスの設定が追加されています。
この設定により、コードの変更なしにアプリケーションを監視することができます。たとえば、「すべてのバリデーションを記録する」といった指定や、「記録するモデルをホワイトリストにする」といった設定が可能です。さらに、Secret
で定義された型のフィールドは、監視や分析上では自動的に非表示化されます。
Pydantic Data Platformでは、さまざまなエンティティを定義し、新しいログメッセージを送信するのと同じようにこれらのエンティティを更新することができます。例えば、顧客のエンティティには、last_login
というフィールドがあり、customer-login user_id=xxx
というログメッセージが受け取られるたびに更新されます。
また、既存のユニークなIDを使用してエンティティ間をリンクすることも可能です。これにより、Pydantic Data Platformは、ログやBIツールとしてだけでなく、アプリケーションデータの管理ビューとしても使用することができます。
さらに、他のソースからのログデータを収集することも可能です。Opentelemetryを使用することで、さまざまな技術からデータを受け取ることができます。また、直接の統合や他の言語のSDKの開発を容易にするために、優れたAPI(OpenAPIドキュメント、エラーメッセージなど)も提供されます。
Pydantic学習しておけばDB連携も監視もビジネス分析もできる。個人開発者から膨大なデータを持つ大企業まで色々なシチュエーションで活用できそうですよね。
2. オブジェクト ストア用のデータ ゲートウェイ
AWSのS3はデフォルトでキーバリューストアとして機能し、特定のパス以下のすべてのBLOBが特定のスキーマがあることや、すべてのオブジェクトがJSONファイルであることを強制することはできません。S3コンソールでは、キーをデリミタに基づいたフォルダ構造で整理していますが、フォルダに移動してもデータについてはサンプルを開いたり、それらを生成したソースコードを確認しない限り何も分かりません。
この課題に対し、Pydantic Data PlatformはクライアントとS3などのデータストレージの間にデータバリデーション用リバースプロキシやゲートウェイとして機能するスケーラブルでパフォーマンスの高いサービスを提供します。
このサービスでは、Pydanticモデルを使用してスキーマを定義しますが、Pydantic単体では得られないいくつかの機能を提供します。
- JSON、CSV、Excel、Parquet、Avro、msgpackなど、さまざまな入力形式をサポートし、自動的にストレージ形式に変換します。
- JSONとParquetをはじめとする複数のストレージ形式をサポートします。Delta LakeテーブルやIcebergテーブルも将来的に追加される可能性があります。
- HTTP API、S3準拠のAPI(aws s3 cpなどの利用も可能)、Python SDK/ORMを介してデータをアップロードおよびダウンロードするための複数のインターフェースを提供します。
- 画像、動画、ドキュメントなどの任意のバイナリデータ形式をサポートし、形式のチェックや画像や動画の解像度の確認などのバリデーションを行います。
- その他、スキーマの知識を活用してS3のコストとパフォーマンス、および開発者のエクスペリエンスを改善するための機能を順次追加していく予定です。
- (オプションで)成功したアップロードと失敗したアップロードをログ/分析サービスに記録することも可能です。
このソリューションでは、バリデーションと変換はRustで実装され、各プロセスが多くの異なるスキーマのバリデーションを実行できるため、Pydanticを使用したPythonサービスを実行するよりも低コストで提供することができます。
例えば、特定のプレフィックスににユーザーからの問い合わせデータをアップロードし、JSONとして保存したい場合、以下のような方法が考えられます。
from datetime import datetime
from pydantic import BaseModel, EmailStr
from pydantic_gateway import JsonDataset # 名前は未定
class CustomerInquiry(BaseModel):
email: EmailStr
name: str
inquiry_ts: datetime
dataset = JsonDataset("my-bucket/inquiries/", CustomerInquiry)
dataset.register()
upload = dataset.create_upload_url(expiration=3_600)
print(upload.url)
#> https://gateway.pydantic.dev/<account-id>/<uuid>/?key=e3ca0f89etc
# バリデーションは直接Pythonから行います
# (注:バリデーションはローカルではなく、サーバー上で実行されます)
dataset.upload({'email': 'jane.doe@example.com', ...})
問い合わせに関するデータは、別のサービスを介して以下のように追加することができます。
$ curl -X POST \
-d '{"email": "joe.bloggs@example.com"}' \
'https://gateway.pydantic.dev/<account-id>/<uuid>/?key=e3ca0f89etc'
または、awscliを使用してCSVファイルから複数の問い合わせをアップロードすることもできます。
$ aws s3 cp \
--endpoint-url https://<account-id>.s3.pydantic.dev \
inquiries.csv s3://<uuid>
その後、Pythonからデータを読み取ることができます。
print(dataset.read(limit=100))
#> [CustomerInquiry(email='jane.doe@example.com', ...), ...]
ここでの利点は、顧客の問い合わせを送信するサービスがスキーマエラーを起こした場合、データの受け入れが拒否されるか、後で確認するための隔離スペースに保存されるようにできることです。
S3が提供する最も強力なツールの1つは、pre-signed URLです。
pre-signed URLのサポートを提供し、データバリデーションが保持されたまま、データセット全体のアップロードエンドポイントの作成にも対応する予定です。つまり、データバリデーションが保持されたpre-authorized URLを生成できるようになります。
from pydantic_gateway import FileDataset, image_schema
profile_picture = image_schema(
output_width=100,
output_height=100,
output_max_size=1_000,
output_formats="png",
)
dataset = FileDataset(
"my-bucket/profile-pics/",
profile_picture
)
file_upload = dataset.create_file_upload_url(
"john-doe.jpg",
expiration=60
)
print(file_upload.url) # クライアントへのpre-signed URLの返却
#> https://gateway.pydantic.dev/<account-id>/<uuid>/upload?path=/users/1.jpg&key=e3ca0f89etc
# ユーザーの現在の写真を更新する前に、クライアントがデータをアップロードするのを待つ
await file_upload.completed()
file_contents = await file_upload.download()
...
これにより、データバリデーションが保持されたまま、事前承認されたURLを生成できるようになります。
3. データ ウェアハウス用のデータ ゲートウェイ
前述のコンポーネントは、S3のようなオブジェクトストアに限らず、データウェアハウス(SnowflakeやBigQueryなど)でも有用です。データウェアハウス向けにも同様のサービスを提供します。
基本的に、Pydanticモデルとテーブル識別子を提供すると、データをバリデーションした後、行としてテーブルに挿入するためのエンドポイントを提供します。
from pydantic_gateway import BigQueryTable
dataset = BigQueryTable(
"my-project.my-dataset.inquiries",
CustomerInquiry
)
upload = dataset.create_upload_url(
expiration=3_600,
)
print(upload.url)
#> https://gateway.pydantic.dev/<account-id>/<uuid>/upload?key=e3ca0f89etc
上記の様に、データウェアハウスの前面で検証したいというニーズがある一方で、データウェアハウスとパイプラインの構成と運用方法について多くのチームにヒアリングした結果、これは課題の1 つにすぎないことがわかっています。長期的には、データウェアハウス内でデータが移動する際に適用される変換を定義するための宣言的な方法を提供することに大きな価値があると考えています。
4. スキーマカタログ(個人的イチ押し機能)
最も信頼性の高いスキーマとしてPydanticモデルが随所で利用されている一方で、これらのモデルがしばしば複数のリポジトリに分散してサイロ化してしまうことがあります。これらのスキーマを簡単に見つけて使用するのは困難であり、変更の追跡やこれらの変更が組織内の異なる部分にどのような影響を与えるかを把握することも難しいです。
そのため、次の情報を含むスキーマを一元的に表示するPydanticスキーマカタログを提供します:
- Pydanticモデルのコード
- JSONスキーマに基づくSwagger/Redocのドキュメント
- このスキーマを使用するデータソース
- 外部キーなどを介した他のスキーマとの関連付け
- スキーマの変更履歴(後方互換性があるかどうか、どのようなマイグレーションロジックが必要かなど)
上記のコンポーネント(ObservabilityおよびData Gateway)と組み合わせることで、スキーマから直接そのスキーマで保存されたデータにアクセスすることができます。
スキーマカタログでは、スキーマを表示および管理するためのWebインターフェースと、スキーマカタログと対話するためのCLIも提供します。
スキーマカタログの例:
from datetime import datetime
from pydantic import BaseModel, EmailStr
class CustomerInquiry(BaseModel):
email: EmailStr
name: str
inquiry_ts: datetime
import pydantic_schema_catalogue # 名前は未定
pydantic_schema_catalogue.register(CustomerInquiry)
別のプロジェクトで次のように使用できます
$ pydantic-schema-catalogue list
... # スキーマ `User` を Pydanticモデルとしてダウンロード
$ pydantic-schema-catalogue get --format pydantic User > user_model.py
# スキーマ `User` を PostgresのSQLとしてダウンロードしてテーブルを作成
$ pydantic-schema-catalogue get --format postgres User >> tables.sql
スキーマカタログは、前述の監視ツールやORM、BIなどの他のコンポーネントで共通して利用可能です。
- ログに記録されたモデルのスキーマは、自動的にスキーマカタログに登録される
- スキーマカタログのスキーマにより、1, 2回のクリック、もしくはCLIコマンドを使用してサクッとバリデーションエンドポイントが作成可能になる
データにはしばしばスキーマがなく、包括的なスキーマをリバースエンジニアリングすることは手作業で苦痛なプロセスです。
Pydanticスキーマカタログは、データセットからスキーマを推論する方法を提供し、データのサンプルから新しいスキーマを初期化できる機能を提供します。
$ pydantic-schema-catalogue infer --name 'Customer Inquiry' inquiries.csv
5. Pydantic モデルを活用したダッシュボードと UI
データ収集の主な目的は、収集したデータからインサイトを得て、データドリブンで意思決定を行えるようにすることです。しかし、しばしばインサイトや意思決定の責任者は、非エンジニアチームのメンバーだったり、場合によっては組織の外部にいます。そのため、データウェアハウスにデータを収集して終了ではなく、組織の他のメンバーがデータを視覚化し、データと対話できるようにBIツールなどを用意する必要があります。
一方で、この構築作業に1週間や1か月もの時間をかけたくないし、リソースを張れる人がいないので保守や拡張はできるだけ避けたい…と思われるケースは多いかと思います。
そんな課題に対して、PydanticモデルとPythonコードで動作するUIを数分で構築できるPydantic ダッシュボードというサービスを提供します。
Pydantic ダッシュボードでは、以下の様な機能を提供します。
- ホスティング、スケーリング、メンテナンスの管理、および認証の強制などの機能をマネージド提供
- UIコンポーネントとして一般的なもの(テーブル、ページネーション、フォーム、チャート)が標準で使える
- 上記のコンポーネントの上に構築したORMを使用してデータと対話するためのシンプルでパワフルなインタフェースを提供
上記で定義した Customer Inquiry
を例に、以下の様にコードを書けば上記の全ての機能を利用可能になるイメージです。
from datetime import datetime
from typing import Literal
from pydantic_dash import app, components
from pydantic_dash.auth import GoogleAuth
from pydantic_dash.responses import Redirect
from pydantic_db import DatabaseModel, DayAgg
from pydantic import EmailStr
app.auth(GoogleAuth(domain='my-company.com'))
class CustomerInquiry(DatabaseModel):
email: EmailStr
name: str
inquiry_ts: datetime
source: Literal['website', 'email', 'phone']
@app.view('/', buttons={
'/new/': 'Create New',
'/list/': 'View All'
})
def inquiries_analytics():
# データベースから最近の問い合わせをクエリする
recent_inquiries = (
CustomerInquiry.db_all()
.order_by('inquiry_ts', desc=True)
.limit(1_000)
)
# パイチャートとバーチャートの2つのコンポーネントを返す
return components.Row(
# チャートはORMと一緒に設計されており、
# 例えば、ここではチャートが`groupby`で暗黙的に`.count()`を推測します
components.PieChart(
recent_inquiries.groupby('source'),
title='Inquiry Sources',
),
components.BarChart(
recent_inquiries.groupby(DayAgg('inquiry_ts')),
x_label='Date',
y_label='Inquiry Count',
),
),
# list_viewを使用することで、クエリ結果が自動的にテーブルとして表示され、ページネーションされます
@app.list_view('/list/', buttons={'/new/': 'Create New'})
def inquiries_list():
return CustomerInquiry.db_all().order_by('inquiry_ts', desc=True)
# form_viewはGETとPOSTのフォームエンドポイントを提供します
# GETビューでは、`CustomerInquiry`モデルに基づいたHTMLフォームがレンダリングされます
# POSTビューでは、リクエストを`CustomerInquiry`モデルで検証し、関数を呼び出します
@app.form_view('/new/', CustomerInquiry)
def new_inquiry(inquiry: CustomerInquiry):
inq_id = inquiry.db_save()
return Redirect(f'/{inq_id}/')
既存の高機能なBIツールを置き換える用途というより、サクッとダッシュボードやシンプルなアプリケーションを構築したいというシンプルなユースケースにて真価を発揮しそうですね。
まとめ
本記事ではPydanticを軸に最高の開発者体験を提供するData Platform、その名もPydantic Data Cloudのロードマップを紹介しました。
Pydanticでは現在、今回紹介した機能の優先順位をつけるためにユーザアンケートを募集しています。
今回答するとクローズドベータ版のテスターに招待してもらえるそうなので、気に入った機能があれば是非この機会に回答してみてはいかがでしょうか。
Discussion