Snowflakeでロード前に前処理を行う - ExcelファイルをSnowflakeにロードする方法 -
はじめに
Snowflakeでは、 Snowpark Python UDFとCOPY FILES コマンドを組み合わせて、外部ファイルをSnowflakeへロードする前に前処理をすることができます。
この記事では、ステージ上に配置した ExcelファイルをCSVファイルに変換し、テーブルへロードする手順を紹介します。
おそらく、一番のユースケースはAI・ML向けに非構造化ファイルを前処理するケースですが、テーブルへロードする際の前処理にも本記事の処理が利用できます。
この記事は、NTTデータ Snowflakeアドベントカレンダーの2日目です。
前処理の流れ
以下のような流れで、Snowpark Python UDFとCOPY FILESコマンドによりSnowflake内でExcelファイルをCSVファイルに変換する前処理を行います。
- 生データ用ステージ(
@rowdata_stage)へファイルをアップロードする。 - Snowpark Python UDF(
convert_to_csv_from_excel)で、生データ用ステージにあるファイルをスコープ付きURLで参照し、データを処理し、 処理済みファイルを参照するスコープ付きURL を出力する。 - COPY FILESコマンドにより、スコープ付きURLで参照したファイルをロード用ステージ(
@load_stage)へコピーする。
上記の処理の肝はスコープ付きURLをUDFの入出力に使うことであり、ファイル加工にのみ焦点をあてたUDFを作成することができます。
処理済みファイルを参照するスコープ付きURL
UDFが処理したファイルをUDF外部から参照させるため、処理済みファイルを参照するスコープ付きURLを返します。
実装については本記事の手順 4. Python UDFでExcelファイルをCSVへ変換する を参照してください。
今回はステージ上のファイルをCOPY INTOコマンドでロードするため、以下のドキュメントを参考に、COPY FILESコマンドで別のステージに書き込みます。
ファイルが UDF から返された後は、大文字と小文字に応じて、以下のストレージツールのいずれかを使ってアクセスできます:
- COPY FILES: ステージングされたファイルを別の場所にコピーします。ファイルがコピーされた後は、以下のツールを使うなどして、一般的なステージングされたファイルのように使用することができます:
- ディレクトリテーブル: WHERE 句を使用してステージ上のファイルリストをクエリし、必要に応じてフィルターをかけます。
- GET_PRESIGNED_URL: URL を@stage/file に生成します。
- 外部ステージ: クラウドプロバイダー APIs を通して Snowflake 外のファイルにアクセスします。
- UDF: 別のクエリでファイルを読み込みます。
手順
1. Excelファイルを用意する
まず、次のようなデータを含む Excelファイルを作成します。
A001 山田太郎 00100
A002 鈴木花子 00200
A003 佐藤一郎 00300
シンプルな例とするため、最初のシートの A1 セルに A001 が書かれている状態にしてください。
2. ステージとテーブルを作成する
内部ステージとロード先テーブルを準備します。
-- サンプルの実行に使うウェアハウス、データベース、スキーマを指定してください。
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA DB.PUBLIC;
-- 生データ用ステージ・ロード用ステージを作成
CREATE OR REPLACE STAGE rowdata_stage
DIRECTORY = ( ENABLE = TRUE );
CREATE OR REPLACE STAGE load_stage
DIRECTORY = ( ENABLE = TRUE );
-- ロード先テーブル
CREATE OR REPLACE TABLE target_tbl (
col1 STRING,
col2 STRING,
col3 NUMBER
);
3. Excelファイルを入力ステージへアップロードする
今回はサンプルのため、Snowsight の UI から Excelファイルをアップロードします。
- Snowsight を開く
- 左メニュー「Catalog」→「Database Explorer」をクリックする
- 対象の DB/Schema 配下の「Stages」→
ROWDATA_STAGEを選択する - 右上「Files」タブで、ローカルの Excelファイルをドラッグ&ドロップする
- 「Upload」をクリックし、ファイル一覧に表示されることを確認する
4. Python UDFでExcelファイルをCSVへ変換する
Snowpark for Python を使い、Excelファイルを CSVファイルに変換する UDF を作成します。
CREATE OR REPLACE FUNCTION convert_to_csv_from_excel(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python', 'pandas', 'openpyxl')
HANDLER = 'main'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
import pandas as pd
def main(filename):
with SnowflakeFile.open(filename, 'rb') as input_file:
df = pd.read_excel(input_file)
with SnowflakeFile.open_new_result("wb") as output_file:
df.to_csv(output_file, index=False)
return output_file
$$;
補足
- ファイル読み込み
-
SnowflakeFile.open(filename, 'rb')でfilenameにスコープ付きURLを指定することで、ステージ上のファイルを読み込みます。
-
-
Pandasによるファイル変換
-
pd.read_excel(input_file)により、Pandasのデフォルト設定でExcelファイルを1シート目1行目A列からヘッダー無しで読み込みます。
-
-
ファイル書き込み
-
SnowflakeFile.open_new_result("wb") as output_fileで書き込み先のファイルオブジェクトを作成することで、output_fileはスコープ付きURLを返します。 -
output_fileが返すスコープ付きURLはSnowflake社が管理する専用領域で、ファイル参照しかできないため、永続化やロード(COPY INTO)を行う場合、COPY FILESコマンドで別のステージへファイルをコピーする必要があります。
-
5. 変換を実行して出力ステージに配置する
次の SQL を実行し、UDF を呼び出して変換処理を行います。
-- スコープドURLを作ってUDFに渡し、結果ファイルを load_stage にコピー
COPY FILES INTO @load_stage FROM
( SELECT convert_to_csv_from_excel(BUILD_SCOPED_FILE_URL(@rowdata_stage, 'example.xlsx')),
'example.csv' );
処理ステップ
-
BUILD_SCOPED_FILE_URL(@rowdata_stage, 'example.xlsx')
@rowdata_stageにあるexample.xlsxファイルのスコープ付きURLを生成 -
convert_to_csv_from_excel
example.xlsxファイルのスコープ付きURLを参照し、CSVファイルへ変換し、CSVファイルのスコープ付きURLを生成 -
COPY FILES INTO @load_stage FROM ( ..., 'example.csv' )
CSVファイルのスコープ付きURLを参照し、@load_stage上にexample.csvという名前でファイルをコピー
6. テーブルへロードする(COPY INTO実行)
変換後のファイルをSnowflakeテーブルへロードします。
COPY INTO target_tbl
FROM @load_stage
;
ロード結果を確認します。
-- 確認
SELECT * FROM target_tbl;
まとめ
この記事では、Snowflakeでロード前に Python(Snowpark + Pandas)を用いて Excelファイルを CSV に変換し、その結果をテーブルへロードする手順を紹介しました。
Snowflakeの UDF を活用すれば、ロード前に軽量な変換処理をサーバーレスで実行できます。
Excel のほか、Snowflakeに対応していない文字コード(EBCDIC等)や固定長ファイル等も Pandas で扱える範囲なら同様の方法が利用可能です。
Snowflakeのロードパイプライン設計やデータ前処理フローの検討に役立ててください。
仲間募集
NTTデータ ソリューション事業本部 では、以下の職種を募集しています。
Snowflake、生成AIを活用したデータ基盤構築/活用支援(Snowflake Data Superheroesとの協働)
Databricks、生成AIを活用したデータ基盤構築/活用支援(Databricks Championとの協働)
プロジェクトマネージャー(データ分析プラットフォームソリューションの企画~開発~導入/生成AI活用)
クラウドを活用したデータ分析プラットフォームの開発(ITアーキテクト/PM/クラウドエンジニア)
ソリューション紹介
Trusted Data Foundationについて
~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。
TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について
~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~ TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。
NTTデータとSnowflakeについて
NTTデータとSnowflakeについて
NTTデータでは、Snowflake Inc.とソリューションパートナー契約を締結し、クラウド・データプラットフォーム「Snowflake」の導入・構築、および活用支援を開始しています。
NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。
NTTデータとDatabricksについて
NTTデータは、お客様企業のデジタル変革・DXの成功に向けて、「databricks」のソリューションの提供に加え、情報活用戦略の立案から、AI技術の活用も含めたアナリティクス、分析基盤構築・運用、分析業務のアウトソースまで、ワンストップの支援を提供いたします。
NTT DATA公式アカウントです。 技術を愛するNTT DATAの技術者が、気軽に楽しく発信していきます。 当社のサービスなどについてのお問い合わせは、 お問い合わせフォーム nttdata.com/jp/ja/contact-us/ へお願いします。