🍏

Snowpark Container Services のJobで動かすNextflowプラグイン「nf-snowflake」の紹介

に公開

はじめに

先日、Snowpark Container Services (SPCS) のJobサービスでnextflowを動かすことができるプラグインがあることを教えていただきました。
https://github.com/Snowflake-Labs/nf-snowflake

実際に動かしてみたのですが、なかなか動かなかったためこの記事はデバッグの奮闘日記になります。
完全版ガイドを書いてやろうと思っていたのですが、想像以上に苦戦しているためこれまで試して「こういうことだろう」という確信を得た内容に関して、自分の知識をセーブする意味合いも込めて記事に書き示しておきます・・・

(250721 追記)
先ほど、無事にnf-snowflakeを動かすことに成功いたしました!🎉成功したコマンドは後続に記述しておきます
https://github.com/Snowflake-Labs/nf-snowflake/issues/15

Nextflowとは?

Nextflowは、科学的データ解析におけるワークフロー管理システムとして設計された再現性を担保するためのプラットフォームです。データエンジニア界隈だとAirflowなどワークフローツールに馴染みがあると思いますが、同じようにステップやワークフローを定義できます。このツールは、データフロープログラミングモデルに基づいて構築されており、並列・分散パイプラインの記述を大幅に簡素化し、データの流れと計算に集中する設計となっています。特にバイオインフォマティクス領域において広く採用されており、解析プロセスの自動化と再現性の確保を主要な目的としています。

Henri E. Bal, Jennifer G. Steiner, and Andrew S. Tanenbaum. 1989. Programming languages for distributed computing systems. ACM Comput. Surv. 21, 3 (Sep. 1989), 261–322. https://doi.org/10.1145/72551.72552

Nextflowの最大の特徴は、パイプラインをシンプルで読みやすいGroovy構文で記述できる宣言的構文にあります。研究者は複雑な処理ロジックを考える必要がなく、データの流れを定義するだけでワークフローが自動実行されるデータフロー駆動型の設計となっています。

また、同じパイプラインをローカル環境、クラスター、クラウド環境で変更なく実行できるプラットフォーム非依存性も大きな魅力です。DockerやSingularityとの完全統合により、実行環境の差異を吸収し、どこでも同じ結果を得ることができます。

さらに、失敗したタスクから処理を再開できる復旧機能と、依存関係を自動解析して並列化する機能により、大規模なデータ処理でも効率的に実行することが可能です。

// シンプルなNextflowパイプラインの例
process sayHello {
    input:
    val name
    
    output:
    stdout
    
    script:
    """
    echo "Hello, ${name}!"
    """
}

workflow {
    names = Channel.of('World', 'Nextflow', 'Snowflake')
    sayHello(names) | view
}

科学技術研究はとにかく 研究結果の再現性を担保する ことが大切です。データエンジニアリングの文脈でいうと冪等性を担保することと似ています。論文化した後、論文を読んだ研究者が同様の手順を踏めば同じ結果が出てくるような研究になっていなければ、科学としての進捗は得られなかったと等しくなります。生命科学系においても再現性を担保する仕組みが必要で、Nextflowのようなワークフローツールを効果的に使う必要があります。

nf-coreの魅力

nf-coreは、Nextflowコミュニティが開発・維持する高品質なパイプラインコレクションです。現在70以上のパイプラインが公開されており、生命科学の様々な解析に対応しています。

https://nf-co.re/

nf-coreプロジェクトの最大の価値は、厳格な品質保証にあります。全てのパイプラインは統一された品質基準をクリアする必要があり、CI/CDパイプラインによる継続的な自動テストが実施されています。また、コミュニティによる厳格なピアレビュープロセスを経ることで、高い品質が維持されています。

標準化も重要な価値の一つです。全パイプラインで一貫したパラメータ体系を採用しており、設定ファイル生成やパラメータ検証などの共通ツールが提供されています。詳細な使用方法とトラブルシューティングガイドも整備されており、研究者は迷うことなく解析を開始できます。

現在nf-coreでは70以上の豊富なパイプラインが公開されており、生命科学の様々な解析分野をカバーしています。またNextflowで動かすpipelineの多くがクラウドで動作させているらしく、nf-coreのパイプラインをSnowflakeで動かすことができれば多くの生命科学系のパイプラインをSnowflake上で稼働させることが可能となります。

以下より、実際の使い方を解説していきます。

前提条件

ローカル環境では、Nextflow 23.10以降のバージョンとlinux/amd64プラットフォーム対応のDocker環境が必要です。また、nf-snowflakeプラグインのバージョン0.6.3を使用することを前提としています。

また取り組むにあたりSPCSのJob機能の知識が必要になりますが、下記ハンズオンが大変わかりやすくキャッチアップできました。
https://docs.snowflake.com/en/developer-guide/snowpark-container-services/tutorials/tutorial-2

Snowflake側の準備

以下の SQL スクリプトを順に実行することで、Compute Pool、データベース・スキーマ、ステージ、Image Repository、ネットワークルール、外部アクセス統合をまとめて準備し、最後に Nextflow パイプラインを Job Service として起動できます。

Nextflowはデフォルトではプロセスは 1 つの CPU を要求します。CPU_X64_XS には vCPU が 1 つしかありません。そのため、CPU_X64_S以上でないと、スケジューラはコンピューティングプールに十分な CPU リソースがないと判断する可能性がありますのでご注意ください。

CREATE COMPUTE POOL NEXTFLOW_CP
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = CPU_X64_M;


CREATE OR REPLACE DATABASE tutorial_db;
USE DATABASE tutorial_db;

CREATE OR REPLACE SCHEMA data_schema;
CREATE IMAGE REPOSITORY IF NOT EXISTS tutorial_repository;

ステージを3つ用意します。

CREATE OR REPLACE STAGE NXF_INPUT ENCRYPTION=(TYPE='SNOWFLAKE_SSE');
CREATE OR REPLACE STAGE NXF_RESULTS  ENCRYPTION=(TYPE='SNOWFLAKE_SSE');
CREATE OR REPLACE STAGE NXF_WORKDIR ENCRYPTION=(TYPE='SNOWFLAKE_SSE');

それぞれ以下の役割があります。

ステージ 用途 マウントパス 詳細
NXF_INPUT 読み取り専用データ /mnt/input パイプラインが参照する元データ(FASTQ・VCF・設定ファイルなど)を配置。書き込みは行わない。
NXF_RESULTS 最終成果物 /mnt/output 完了後に生成されるレポートや BAM/VCF などを publishDir でコピー。後段解析や共有に利用。
NXF_WORKDIR 一時作業領域 /mnt/workdir 各プロセスの .command.* や中間ファイルを保持。-work-dir で指定され、-resume でも参照。実行時は cd /mnt/workdir/<prefix>/<hash> へ移動して処理を実行。

Nextflow の Job Service ではデフォルトで /mnt/input/mnt/output/mnt/workdir にマウントされ、プロセスは -work-dir でこれらを参照します。

外部のプラグインのロードを必要とするため、NETWORK RULEとEXTERNAL ACCESS INTEGRATIONを作成する必要があります。

CREATE NETWORK RULE nf_nextflow_rule
  TYPE = HOST_PORT
  MODE = EGRESS          -- 外向き通信
  VALUE_LIST = (
      'www.nextflow.io:443',          -- Nextflow 自身のチェック
      'github.com:443',               -- プラグイン INDEX
      'raw.githubusercontent.com:443',-- release メタ
      'objects.githubusercontent.com:443'
  );
  
CREATE EXTERNAL ACCESS INTEGRATION nf_nextflow_eai
  ALLOWED_NETWORK_RULES = (nf_nextflow_rule)
  ENABLED = TRUE;

Nextflowパイプラインの準備

ここからはNextflow側の準備をしていきます。
Nextflowファイルであるmain.nfを作成していきます。今回は試しにシンプルなHello Worldパイプラインを作成します。

process HELLO {
  container '/TUTORIAL_DB/DATA_SCHEMA/TUTORIAL_REPOSITORY/hello-worker:latest'
  output:
    stdout
  '''
  echo "🎉  Hello Snowflake & Nextflow!"
  '''
}

workflow { 
  HELLO() 
}

nf-snowflakeプラグインの設定をnextflow.configに追記していきます。

plugins {
  id 'nf-snowflake@0.6.3'
}

profiles {
  snowflake {
    process.executor = 'snowflake'
    snowflake.computePool = 'NEXTFLOW_CP'
    snowflake.stageMounts = 'NXF_RESULTS:/mnt/results,NXF_INPUT:/mnt/input'
    snowflake.workDirStage = 'NXF_WORKDIR'
  }
}

config キー解説
snowflake.computePool — 先ほど作成した Compute Pool 名。
snowflake.stageMountsstageName:mountPath のカンマ区切りリスト。inputとresults(output)のステージ名をマウントします。
snowflake.workDirStage.command.* や中間ファイルを置くワークディレクトリ用ステージ。-work-dir オプションで /mnt/workdir を指定する場所です。

pluginsのidは、最新のリリースタグを使ってください。
https://github.com/Snowflake-Labs/nf-snowflake/releases

Dockerイメージの準備

Nextflowのjob serviceは二種類のDocker imageを用います。Docker イメージの役割は以下です。

Worker イメージ
解析ツール(salmon / fastqc / multiqc など)を含むコンテナで、Nextflow の各 process が実際の計算を行う際に起動されます。プロセスごとに複数インスタンスがスケーリングされるため、依存ライブラリをまとめてビルドしつつもイメージサイズを抑えることが重要です。本記事では Micromamba を用いて Conda パッケージを層の少ない形で導入しています。

Main イメージ
Nextflow ランタイムと nf-snowflake プラグインを含むコンテナで、パイプライン全体をオーケストレーションする “指揮者” 的役割を担います。nextflow run を実行し、Snowflake 上に Worker ジョブ(Job Service)をサブミットします。Main は 1 インスタンスのみ起動されるため、サイズよりも互換性と安定動作を優先し、公式の ghcr.io/snowflake-labs/nf-snowflake イメージをベースにしています。

Worker用Dockerfile

# Dockerfile.worker
FROM mambaorg/micromamba

RUN \
   micromamba install -y -n base -c defaults -c bioconda -c conda-forge \
      salmon=1.10.2 \
      fastqc=0.12.1 \
      multiqc=1.17 \
      python=3.11 \
      typing_extensions \
      importlib_metadata \
      procps-ng \
   && micromamba clean -a -y

ENV PATH="$MAMBA_ROOT_PREFIX/bin:$PATH"
USER root

Main用Dockerfile

# Dockerfile.main
FROM ghcr.io/snowflake-labs/nf-snowflake:latest

COPY . /pipeline
WORKDIR /pipeline 
SPCS用のイメージのビルドとイメージレジストリへのプッシュの手順はこちら
  1. Snowflakeレジストリにログイン
docker login <your-account>.registry.snowflakecomputing.com \
  -u <SNOWFLAKE_USER> -p '<PASSWORD>'
  1. Worker イメージのビルド・プッシュ
# Worker イメージ
docker build --no-cache --rm --platform linux/amd64 \
  -f Dockerfile.worker \
  -t <your-account>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/hello-worker:latest .

docker push <your-account>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/hello-worker:latest
  1. Main イメージのビルド・プッシュ
# Main イメージ
docker build --no-cache --rm --platform linux/amd64 \
  -f Dockerfile.main \
  -t <your-account>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/nxf-main:latest .

docker push <your-account>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/nxf-main:latest

このあたりの操作の説明は割愛します。下記記事に書きました。
https://zenn.dev/dataheroes/articles/3437ca8c0eddfc

ジョブの実行

いよいよジョブを実行していきます。EXECUTE JOB SERVICE コマンドにて実行できます。

EXECUTE JOB SERVICE
  IN COMPUTE POOL NEXTFLOW_CP
  NAME = HELLO_NXF
  EXTERNAL_ACCESS_INTEGRATIONS = (nf_nextflow_eai)
  FROM SPECIFICATION $$
  spec:
      container:
      - name: main
        image: /TUTORIAL_DB/DATA_SCHEMA/TUTORIAL_REPOSITORY/nxf-main:latest
        volumeMounts:
        - name: input
          mountPath: /mnt/input
        - name: output
          mountPath: /mnt/output
        - name: workdir
          mountPath: /mnt/workdir
        command:
        - nextflow
        - run
        - .
        - -profile
        - snowflake
        - -work-dir
        - /mnt/workdir
      volumes:
      - name: input
        source: "@NXF_INPUT"
      - name: output
        source: "@NXF_RESULTS"
      - name: workdir
        source: "@NXF_WORKDIR"
  $$;

それぞれの句が何をしているかを上から順に解説します。

  1. EXECUTE JOB SERVICE 句
EXECUTE JOB SERVICE
  IN COMPUTE POOL NEXTFLOW_CP
  NAME = HELLO_NXF
  EXTERNAL_ACCESS_INTEGRATIONS = (nf_nextflow_eai)
要素 役割
EXECUTE JOB SERVICE Job Service(長時間実行・再起動可能なサービス)を起動する DDL。
IN COMPUTE POOL NEXTFLOW_CP ジョブを動かす Compute Poolを指定。
NAME = HELLO_NXF この Job Service の論理名。SHOW JOB SERVICES などで識別する際に使う。
EXTERNAL_ACCESS_INTEGRATIONS = (nf_nextflow_eai) コンテナが外部ネットワーク(PyPI、docker hub など)へアクセスする必要がある場合に許可する External Access Integration。先ほど作成した nf_nextflow_eai を使います。
  1. FROM SPECIFICATION 

Snowflake SQL 内に YAML 形式で Spec を埋め込む部分です。

2.1 container セクション

container:
  - name: main
    image: /TUTORIAL_DB/DATA_SCHEMA/TUTORIAL_REPOSITORY/nxf-main:latest

/TUTORIAL_DB/DATA_SCHEMA/TUTORIAL_REPOSITORY/nxf-main:latest
Snowflake 内Image Repository から取得する OCI イメージ。Snowflake ステージと同様に「<db>/<schema>/<repo>/<image>:<tag>」パスで指定します。
ここには Nextflow 実行環境(Java, nf-snowflake plugin など)が焼き込まれています。

2.2 volumeMounts / volumes

    volumeMounts:
    - name: input
      mountPath: /mnt/input
    - name: output
      mountPath: /mnt/output
    - name: workdir
      mountPath: /mnt/workdir
volumes:
  - name: input
    source: "@NXF_INPUT"
  - name: output
    source: "@NXF_RESULTS"
  - name: workdir
    source: "@NXF_WORKDIR"
名称 役割
input @NXF_INPUT という ステージ(またはマテリアライズド ステージ)がバインドされ、コンテナ内では /mnt/input に見える。パイプラインの入力ファイルが自動アップロードされる。
output /mnt/output@NXF_RESULTS ステージへ。Nextflow 実行後の成果物(結果ファイル)をこのステージに保存。
workdir Nextflow のワークディレクトリ /mnt/workdir と Snowflake ステージ @NXF_WORKDIR を接続。リスタート時にチェックポイントや .nextflow メタ情報を保持する。

2.3 command

command:
  - nextflow
  - run
  - .
  - -profile
  - snowflake
  - -work-dir
  - /mnt/workdir
  • nextflow run . : イメージ内の カレントディレクトリ(.)にある main.nf を実行。
  • -profile snowflake : nf-snowflake plugin が提供する Snowflake 用プロファイルを有効化。これにより
    • executor = snowflake
    • staging, workdir, results のマップ
    • 認証用シークレット自動注入
      などが設定されます。
  • -work-dir /mnt/workdir : ワークディレクトリを先ほどマウントした /mnt/workdir(Snowflake ステージ)に固定。

実行が終わると、workDirに指定したステージにNextflowのタスクが出来上がります。

Nextflow実行結果は、下記コマンドでログを確認できます。

SELECT SYSTEM$GET_SERVICE_LOGS('HELLO_NXF', 0, 'main');

エラーがでた

SELECT SYSTEM$GET_SERVICE_LOGS('HELLO_NXF', 0, 'main'); の実行結果として、Hello,worldは出ておらずエラーがでました。

Nextflow 25.04.6 is available - Please consider updating your version to it

 N E X T F L O W   ~  version 24.04.1

Launching `./main.nf` [pedantic_brazil] DSL2 - revision: 0afd7a9fca

[-        ] HELLO -

[-        ] HELLO | 0 of 1

executor >  snowflake (1)
[ee/9b9537] HELLO | 0 of 1

executor >  snowflake (1)
[ee/9b9537] HELLO | 0 of 1

executor >  snowflake (1)
[ee/9b9537] HELLO | 0 of 1

executor >  snowflake (1)
[ee/9b9537] HELLO | 0 of 1
ERROR ~ Error executing process > 'HELLO'

Caused by:
  Process `HELLO` terminated with an error exit status (1)


Command executed:

  echo "🎉  Hello Snowflake & Nextflow!"

Command exit status:
  1

Command output:
  /bin/bash: .command.run: No such file or directory

Command error:
  Job PEDANTIC_BRAZIL_HELLO failed to complete. Exited with status: FAILED.

Work dir:
  /mnt/workdir/ee/9b95376c8cd0c9297517c17edbe06c

Tip: when you have fixed the problem you can continue the execution adding the option `-resume` to the run command line

 -- Check '.nextflow.log' file for details

executor >  snowflake (1)
[ee/9b9537] HELLO | 1 of 1, failed: 1 ✘
ERROR ~ Error executing process > 'HELLO'

Caused by:
  Process `HELLO` terminated with an error exit status (1)


Command executed:

  echo "🎉  Hello Snowflake & Nextflow!"

Command exit status:
  1

Command output:
  /bin/bash: .command.run: No such file or directory

Command error:
  Job PEDANTIC_BRAZIL_HELLO failed to complete. Exited with status: FAILED.

Work dir:
  /mnt/workdir/ee/9b95376c8cd0c9297517c17edbe06c

Tip: when you have fixed the problem you can continue the execution adding the option `-resume` to the run command line

 -- Check '.nextflow.log' file for details

特に下記部分ですが、nf-snowflakeのコードを読んでみたところ、どうやら各Nextflowセッションごとに生成される一意の識別子であるRunNameがSpec生成の際に含まれておらず、作業ディレクトリパスが一致しないことが原因として考えられそうです。

Command output:
  /bin/bash: .command.run: No such file or directory

こちらのPRでだされたSpec生成ロジックがうまく起動していないようです。実装者はPrivateで使っているCLIからnf-snowflakeを実行しているらしく、この問題に当たらないようです(早くCLIが公開されることを願うばかりです。)
https://github.com/Snowflake-Labs/nf-snowflake/pull/9/files#diff-700436d38c12bd9638f3957528e4e3df7b6d25c0e99a73fbeb53f17475185cd0R13

成功したので追記

開発者の方に、nextflow実行コマンドは、-nameオプションで実行名を上書きできることを教えてもらいました。

一意の識別子RunNameを固定したところ無事にnf-snowflakeを動かすことに成功いたしました!

EXECUTE JOB SERVICE
  IN COMPUTE POOL NEXTFLOW_CP
  NAME = HELLO_NXF
  EXTERNAL_ACCESS_INTEGRATIONS = (nf_nextflow_eai)
  FROM SPECIFICATION $$
  spec:
      container:
      - name: main
        image: /TUTORIAL_DB/DATA_SCHEMA/TUTORIAL_REPOSITORY/nxf-main:latest
        volumeMounts:
        - name: input
          mountPath: /mnt/input
        - name: output
          mountPath: /mnt/output
        - name: workdir
          mountPath: /mnt/workdir
        command:
        - nextflow
        - run
        - .
        - -name
        - runABC
        - -profile
        - snowflake
        - -work-dir
        - /mnt/workdir
      volumes:
      - name: input
        source: "@NXF_INPUT"
      - name: output
        source: "@NXF_RESULTS"
      - name: workdir
        source: "@NXF_WORKDIR/runABC/"
  $$;

commandをnextflow run . -name runABC -profile snowflakeに-nameオプションをつけ、valumesのworkdirのsourceに"@NXF_WORKDIR/runABC/"を指定しました。

Nextflow 25.04.6 is available - Please consider updating your version to it

 N E X T F L O W   ~  version 24.04.1

Launching `./main.nf` [runABC] DSL2 - revision: 3517c20af0

[-        ] HELLO -

[-        ] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 0 of 1

executor >  snowflake (1)
[68/6b716c] HELLO | 1 of 1 ✔

executor >  snowflake (1)
[68/6b716c] HELLO | 1 of 1 ✔
Completed at: 21-Jul-2025 03:51:19
Duration    : 1m 20s
CPU hours   : (a few seconds)
Succeeded   : 1

作成されたサービスの結果を見てみます。

SELECT SYSTEM$GET_SERVICE_LOGS('runABC_HELLO', 0, 'main');

main.tfで定義した、🎉 Hello Snowflake & Nextflow!が実行されていますね!

失敗原因

問題は、nf-snowflake によって作成されたサービスを削除していなかったことのようです。

Command error:
  SQL compilation error:
  Object 'RUNABC_HELLO' already exists.

最初に DROP SERVICE RUNABC_HELLO;を実行した後、ジョブは正常に完了しました。

まとめ

いろいろと苦労しましたが、無事にnf-snowflakeを動かすことができました。
Snowpark Container Servicesを活用することで、Snowflake上でNextflowパイプラインを実行することが可能になります。この組み合わせは、需要に応じてリソースを自動スケールするスケーラビリティと、使用した分だけの課金というコスト効率性を実現します。また、Snowflakeの強固なセキュリティ機能を活用でき、Snowflake内のデータと直接連携できるという統合性も大きな魅力です。実際にNextflowがSnowflakeで動くようになったらfastqファイルの処理やバリアントコールなどをやってみて記事にしていこうと思います。

参考資料

GitHubで編集を提案
Snowflake Data Heroes

Discussion