Closed3
AirflowでのWebHDFS操作
概要
- AirflowからWebHDFSでHDFS操作を行いたい
- apache-airflow-providers-apache-hdfsのパッケージを入れれば導入できる
- ただ、他のairflow providerパッケージと違い、WebHDFSにはOperatorがない
- Operatorがない場合は、Hookを使うのが定番だが、WebHDFSHookに用意されている関数が少ない
-
check_for_path
、load_file
、read_file
くらいしかない - たとえばファイルの削除やコピーを行いたい場合、それに対応する関数がWebHDFSHookにはない
-
ファイル削除やコピーをどのように実現するか
-
WebHDFSHookのドキュメントを読むと、
This class is a wrapper around the hdfscli library.
と書いてある- hdfscliはWebHDFS操作を行うpythonのライブラリ
- また、
load_file
の実装を見てみるとself.get_conn()
でhdfscliのクライアントを取得した後、hdfscliのuploadを実行している- hdfscliクライアントの初期化は
self.get_conn()
内で上手いことやってくれていそう
- hdfscliクライアントの初期化は
- なので、実行したいHDFS操作に対応する関数がhdfscliにあれば、比較的簡単にWebHDFSHookに実装されていないHDFS操作を実現できそう
ファイル削除を実装するならこういう形
hdfscliにdeleteがあるので、これを使ってファイル削除を行うOperatorを実装してみる
Hookが良い場合は、WebHDFSHookを継承して関数を追加すれば良さそう
from airflow.models.baseoperator import BaseOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
class DeleteHdfsFileOperator(BaseOperator):
template_fields: Sequence[str] = ("hdfs_path",)
def __init__(
self,
webhdfs_conn_id: str,
hdfs_path: str,
recursive: bool = False,
skip_trash: bool = True,
**kwargs
) -> None:
super().__init__(**kwargs)
self.webhdfs_conn_id = webhdfs_conn_id
self.hdfs_path = hdfs_path
self.recursive = recursive
self.skip_trash = skip_trash
def execute(self, context):
hook = WebHDFSHook(self.webhdfs_conn_id)
conn = hook.get_conn()
conn.delete(self.hdfs_path, self.recursive, self.skip_trash)
ログやエラー処理まで考慮できていないので、そこは実際に動かしながら対応する
このスクラップは3ヶ月前にクローズされました