AirflowでFTPを使用してファイルを取得する
株式会社WEDでデータエンジニアをしているthimi0412こと清水です。
AirflowでFTPしたい
データを他企業からダウンロードする際にFTPを使用する必要があり、整形してBigQueryのテーブルにappendしていく必要があったのでAirflowでFTPを使用してファイルを取得しました。(FTPがセキュリティ的にリスクがある話は置いておいて)
やりたい処理
- FTPで対象のファイルを取得
- 取得したファイルをGCSに保存
- GCSに保存したファイルをBigQueryに入れる ← ここは今回は触れない
Airflowのprovidersを見てみる
AirflowにはPythonOperator
が存在して、その中でFTPでファイルを取得する処理を記述すれば動いてくれます。それでもいいですがProviders
にはBigQueryToGCSOperator
(BigQueryからGCSに転送)やGCSToS3Operator
(GCSからS3に転送)といった便利なOperatorが用意されていて、使い方も簡単なのでやりたいことが出来そうなOperatorを毎回探して開発をしています。
OperatorがなかったのでPythonOperatorで動かす
SFTPToGCSOperator
と言うSFTPを使うならOperatorがありましたがFTPはないようです。(分からなくもない)
幸いなことにFTPHook
が用意されているのでこれをPythonOperatorの中で使用してファイルを取得します。そしてGCSHook
を使用してFTPで取得したデータをGCSにアップロードします。
Hook
は各Providerに用意されていて外部プラットフォームのAPIを簡単に使用できるものです。
Connectionを準備する
AirflowのUIにあるAdmin/Connections
からConnectionを作成します。
入力する項目は以下
- Connection Id: FTPHookで指定する用
- Connection Type: FTPを指定
- Host: ホスト名
- Login: ログインID
- Password: パスワード
入力してSaveします。Testのボタンを押すと接続テストができます。
FTPHookを使う
FTPHook
の ftp_conn_id
に先ほど作成したConnectionのidを入れます。
ディレクトリの一覧とファイルのダウンロードを今回はおこなっていますが、ディレクトリの作成や削除などのメソッドも用意されています。
import logging
from typing import List
from airflow.providers.google.cloud.hooks.gcs import GCSHook
ftp_hook: FTPHook = FTPHook(ftp_conn_id="ftp_hogehuga") # Admin/Connectionsに記載
# ディレクトリの一覧を取得
file_list: List = ftp_hook.list_directory("./")
logging.info(file_list)
# 対象のファイルをカレントディレクトリにダウンロード
TARGET_FILE = "hogehuga.csv"
ftp_hook.retrieve_file(f"./{TARGET_FILE}", f"./{TARGET_FILE}")
FTPでファイルをダウンロードしてGCSにアップロード
GCSHook
の使い方やスケジュールの設定は割愛しますが比較的少ないコード&簡単に実装ができたので皆様もぜひProviders
を活用してください。
使用したコード
import logging
import os
from datetime import timedelta
from typing import List
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
TARGET_FILE = "hogehuga.csv"
def transfer_ftp_to_gcs(**context) -> None:
ftp_hook: FTPHook = FTPHook(ftp_conn_id="ftp_hogehuga") # Admin/Connectionsに記載
file_list: List = ftp_hook.list_directory("./")
logging.info(file_list)
ftp_hook.retrieve_file(f"./{TARGET_FILE}", f"./{TARGET_FILE}")
logging.info(os.listdir("./"))
gcs_hook: GCSHook = GCSHook()
bucket_name: str = "<BUCKET_NAME>"
bucket_path: str = "<UPLOAD_FILE_PATH>"
local_file_path: str = f"./{TARGET_FILE}"
gcs_hook.upload(bucket_name, bucket_path, filename=local_file_path)
os.remove(local_file_path)
with DAG(
"transfer_ftp_to_gcs",
default_args=default_args,
description="FTPでデータを取得してGCSにアップロードする",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=20),
catchup=False,
) as dag:
task_transfer_ftp_to_gcs = PythonOperator(
task_id="transfer_ftp_to_gcs",
python_callable=transfer_ftp_to_gcs,
provide_context=True,
dag=dag,
)
Discussion