🌀

AirflowでFTPを使用してファイルを取得する

2022/12/28に公開

株式会社WEDでデータエンジニアをしているthimi0412こと清水です。

AirflowでFTPしたい

データを他企業からダウンロードする際にFTPを使用する必要があり、整形してBigQueryのテーブルにappendしていく必要があったのでAirflowでFTPを使用してファイルを取得しました。(FTPがセキュリティ的にリスクがある話は置いておいて)

やりたい処理

  1. FTPで対象のファイルを取得
  2. 取得したファイルをGCSに保存
  3. GCSに保存したファイルをBigQueryに入れる ← ここは今回は触れない

Airflowのprovidersを見てみる

AirflowにはPythonOperatorが存在して、その中でFTPでファイルを取得する処理を記述すれば動いてくれます。それでもいいですがProvidersにはBigQueryToGCSOperator(BigQueryからGCSに転送)やGCSToS3Operator(GCSからS3に転送)といった便利なOperatorが用意されていて、使い方も簡単なのでやりたいことが出来そうなOperatorを毎回探して開発をしています。

OperatorがなかったのでPythonOperatorで動かす

SFTPToGCSOperatorと言うSFTPを使うならOperatorがありましたがFTPはないようです。(分からなくもない)

幸いなことにFTPHookが用意されているのでこれをPythonOperatorの中で使用してファイルを取得します。そしてGCSHookを使用してFTPで取得したデータをGCSにアップロードします。

Hookは各Providerに用意されていて外部プラットフォームのAPIを簡単に使用できるものです。

Connectionを準備する

AirflowのUIにあるAdmin/ConnectionsからConnectionを作成します。

入力する項目は以下

  • Connection Id: FTPHookで指定する用
  • Connection Type: FTPを指定
  • Host: ホスト名
  • Login: ログインID
  • Password: パスワード

入力してSaveします。Testのボタンを押すと接続テストができます。

FTPHookを使う

FTPHookftp_conn_id に先ほど作成したConnectionのidを入れます。

ディレクトリの一覧とファイルのダウンロードを今回はおこなっていますが、ディレクトリの作成や削除などのメソッドも用意されています。

import logging
from typing import List

from airflow.providers.google.cloud.hooks.gcs import GCSHook

ftp_hook: FTPHook = FTPHook(ftp_conn_id="ftp_hogehuga")  # Admin/Connectionsに記載
# ディレクトリの一覧を取得
file_list: List = ftp_hook.list_directory("./")
logging.info(file_list)

# 対象のファイルをカレントディレクトリにダウンロード
TARGET_FILE = "hogehuga.csv"
ftp_hook.retrieve_file(f"./{TARGET_FILE}", f"./{TARGET_FILE}")

FTPでファイルをダウンロードしてGCSにアップロード

GCSHookの使い方やスケジュールの設定は割愛しますが比較的少ないコード&簡単に実装ができたので皆様もぜひProvidersを活用してください。

使用したコード

import logging
import os
from datetime import timedelta
from typing import List

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

TARGET_FILE = "hogehuga.csv"


def transfer_ftp_to_gcs(**context) -> None:
    ftp_hook: FTPHook = FTPHook(ftp_conn_id="ftp_hogehuga")  # Admin/Connectionsに記載
    file_list: List = ftp_hook.list_directory("./")
    logging.info(file_list)

    ftp_hook.retrieve_file(f"./{TARGET_FILE}", f"./{TARGET_FILE}")
    logging.info(os.listdir("./"))

    gcs_hook: GCSHook = GCSHook()

    bucket_name: str = "<BUCKET_NAME>"
    bucket_path: str = "<UPLOAD_FILE_PATH>"
    local_file_path: str = f"./{TARGET_FILE}"

    gcs_hook.upload(bucket_name, bucket_path, filename=local_file_path)
    os.remove(local_file_path)


with DAG(
    "transfer_ftp_to_gcs",
    default_args=default_args,
    description="FTPでデータを取得してGCSにアップロードする",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=20),
    catchup=False,
) as dag:
    task_transfer_ftp_to_gcs = PythonOperator(
        task_id="transfer_ftp_to_gcs",
        python_callable=transfer_ftp_to_gcs,
        provide_context=True,
        dag=dag,
    )
WED Engineering Blog

Discussion