Closed3

AirflowでのWebHDFS操作

KogaKoga

概要

  • AirflowからWebHDFSでHDFS操作を行いたい
  • apache-airflow-providers-apache-hdfsのパッケージを入れれば導入できる
  • ただ、他のairflow providerパッケージと違い、WebHDFSにはOperatorがない
  • Operatorがない場合は、Hookを使うのが定番だが、WebHDFSHookに用意されている関数が少ない
    • check_for_pathload_fileread_fileくらいしかない
    • たとえばファイルの削除やコピーを行いたい場合、それに対応する関数がWebHDFSHookにはない
KogaKoga

ファイル削除やコピーをどのように実現するか

  • WebHDFSHookのドキュメントを読むと、This class is a wrapper around the hdfscli library.と書いてある
    • hdfscliはWebHDFS操作を行うpythonのライブラリ
  • また、load_fileの実装を見てみるとself.get_conn()でhdfscliのクライアントを取得した後、hdfscliのuploadを実行している
    • hdfscliクライアントの初期化は self.get_conn()内で上手いことやってくれていそう

https://github.com/apache/airflow/blob/providers-apache-hdfs/4.5.0/airflow/providers/apache/hdfs/hooks/webhdfs.py#L157-L161

  • なので、実行したいHDFS操作に対応する関数がhdfscliにあれば、比較的簡単にWebHDFSHookに実装されていないHDFS操作を実現できそう
KogaKoga

ファイル削除を実装するならこういう形

hdfscliにdeleteがあるので、これを使ってファイル削除を行うOperatorを実装してみる
Hookが良い場合は、WebHDFSHookを継承して関数を追加すれば良さそう

from airflow.models.baseoperator import BaseOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

class DeleteHdfsFileOperator(BaseOperator):
    template_fields: Sequence[str] = ("hdfs_path",)

    def __init__(
        self,
        webhdfs_conn_id: str,
        hdfs_path: str,
        recursive: bool = False,
        skip_trash: bool = True,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.webhdfs_conn_id = webhdfs_conn_id
        self.hdfs_path = hdfs_path
        self.recursive = recursive
        self.skip_trash = skip_trash

    def execute(self, context):
        hook = WebHDFSHook(self.webhdfs_conn_id)
        conn = hook.get_conn()
        conn.delete(self.hdfs_path, self.recursive, self.skip_trash)

ログやエラー処理まで考慮できていないので、そこは実際に動かしながら対応する

このスクラップは3ヶ月前にクローズされました