🦔

Nextflowの勉強1

に公開

はじめに

https://www.nextflow.io/

Nextflowは、主にバイオインフォマティクス領域で使用される、解析パイプラインの記述・実行・管理を行うためのワークフローツールです。複数の解析ステップを自動化し、ヒューマンエラーを防ぎながら、効率的に解析を進めることを目的としています。

Nextflowの主な特徴は以下の通りです。

  • 複雑な解析ステップを、明確に定義されたフローに従って自動的に実行できる
  • 大量のデータやサンプルに対して並列処理を自動で行い、計算資源を有効活用できる
  • ローカルPC、クラウド、HPC環境など、幅広いプラットフォームで動作する
  • コードの再現性が高く、他者との共有や長期的な保守に適している

Nextflowでは、基本的に以下の要素でパイプラインを構築します。

  • process
    個々の解析ステップ(コマンド実行単位)を定義する単位です。

  • workflow
    定義した process をどのような順序・関係性で実行するかを記述するセクションです。

  • channel
    データ(ファイル名、パラメータなど)を process 間で受け渡すための仕組みです。

  • params
    各種設定値(データディレクトリ、オプションパラメータなど)を記録するための変数です。

これらを組み合わせることで、柔軟かつ拡張性の高い解析パイプラインを表現できます。

多機能すぎるNextflowについて、簡単にではありますが学んだ内容を、メモとしてここに記録しておきます。最終的にFastqファイルをFastpで処理して、結果のSummaryをMultiQCで取得することをゴールとしています。

Nextflowのインストール

Nextflowを最もシンプルに導入するには、以下の手順を実行します:

  1. Java 11 以上がインストールされていることを確認
  2. curl -s https://get.nextflow.io | bash を実行
# Javaのバージョンの確認
java --version

# Nextflowのインストール
curl -s https://get.nextflow.io | bash
N E X T F L O W
version 24.10.6 build 5937
created 23-04-2025 16:53 UTC (24-04-2025 01:53 JDT)
cite doi:10.1038/nbt.3820
http://nextflow.io

また、Biocondaを使って以下のようにインストールすることも可能です。

# Miniforge3のインストール
wget "https://github.com/conda-forge/miniforge/releases/download/24.11.3-2/Miniforge3-24.11.3-2-Linux-x86_64.sh"
bash Miniforge3-24.11.3-2-Linux-x86_64.sh -b -p $PWD/miniforge3

# 現在のシェル環境にconda/mambaの設定を反映
. ${PWD}/miniforge3/etc/profile.d/conda.sh
. ${PWD}/miniforge3/etc/profile.d/mamba.sh

# 仮想環境の有効化
mamba activate

# 仮想環境の作成とパッケージのインストール(例:nextflow, seqkit, fastp, multiqc, pigz)
mamba create -n nextflow_env -c bioconda nextflow seqkit fastp multiqc pigz

# 仮想環境の有効化
mamba activate nextflow_env
# バージョン確認
nextflow -v
# nextflow version 24.10.6.5937

また、バイナリファイルを直接ダウンロードして使用することもできます。

# バイナリのダウンロード
wget https://github.com/nextflow-io/nextflow/releases/download/v24.10.6/nextflow-24.10.6-dist

# 実行権限の付与
chmod +x nextflow-24.10.6-dist
# バージョン確認
./nextflow-24.10.6-dist -v
# nextflow version 24.10.6.5937

上記のいずれかの方法でNextflowをインストールできます。

nextflowでデータ処理

冒頭でも簡単に触れましたが、ここでNextflowの構成要素について改めて整理します。

Nextflowの構成要素(Process, Channel, Workflow)

  • process:

    • プロセスでは、実行するタスクを定義します。実行スクリプトは、bash、Perl、Ruby、Python、Rなど、Linux上で動作する任意のスクリプト言語で記述可能です。
    • 各入力セットに対してタスクが生成され、それぞれのタスクは独立して実行されます。他のタスクとの直接のやり取りはできません。
    • プロセス間でデータを受け渡すには、非同期キューであるChannelを使用します。
    • プロセスでは、タスクの入力・処理内容・出力を定義します。
  • Channel:

    • チャネルは、あるプロセスから次のプロセスへのデータの流れを定義します。
    • ファイルや値など、複数の要素を保持可能です。
    • チャネルに保持される要素の数が、それを入力とするプロセスの実行回数を決定します。
  • Workflow:

    • ワークフローは、プロセスとデータフロー(チャネルやオペレーター)を構成する機能です。

Channel

チャネルには、 value channelqueue channel の2種類があります。

value channel

値を1つだけ保持するチャネルで、以下のように定義します。変数1つに飲み割り当てられますが、プロセスやオペレーターによって何度も使用可能です。

ch1 = Channel.value( 'T2T-CHM13v2.0' )
ch2 = Channel.value( ['chromosome1', 'chromosome2', 'chromosome3'] )
ch3 = Channel.value( ['chromosome1': '248387328', 'chromosome2': '242696752', 'chromosome3': '201105948'] )
queue Channel

複数の値を保持できるチャネルで、以下のように様々なファクトリーメソッドで生成できます。

.of Channel
// Channel.of() ファクトリー + view演算子
Channel.of(1,2,3,4).view()
// 複数の型情報を持つことができる
Channel.of(1..22, 'X', 'Y').view()
.fromList Channel
main.nf
//fromList() ファクトリー + view演算子
tool_lists = ['bowtie2', 'bwa-mem2']
Channel.fromList(tool_lists).view()
// -> bowtie2
// -> bwa-mem2と出力
//.of()だと、['bowtie2', 'bwa-mem2']と出力
  • .fromListは要素を分割して出力しますが、.ofは1つのリストとして出力する点に注意が必要です。
.fromPath Channel

globパターンで一致するファイルパスをチャネルとして取得します。

main.nf
// refフォルダの*.faファイルのパスを取得
ref_ch = Channel.fromPath( 'ref/T2T-CHM13v2.0.fa' )
// globパターンマッチで複数ファイルの参照も可能
ref_ch = Channel.fromPath( 'ref/*.fa' )
// ref/T2T-CHM13v2.0.fa
// ref/GRCh38.fa
// ref/GRCm39.fa
// ref/mm10.fa

追加オプション:

オプション名 説明
glob true の場合、*?[]{} の文字がグロブワイルドカードとして解釈されます。false の場合は文字通りに扱われます(デフォルト: true
type マッチ対象: file(ファイル)、dir(ディレクトリ)、any(いずれか)。デフォルト: file
hidden 隠しファイルの含有可否(デフォルト: false)
maxDepth 検索するディレクトリの最大深さ(デフォルト: 制限なし)
followLinks シンボリックリンクの追跡可否(デフォルト: true)
relative 相対パスでの出力可否 (デフォルト: false)
checkIfExists 存在しないファイルに対するエラーチェック(デフォルト: false)

例:

// Pathの取得とファイルの存在を同時に実行
ref_ch = Channel.fromPath( 'ref/*.fasta', checkIfExists: true )
// refフォルダに.fastaファイルがない場合は、エラーが発生
.fromFilePairs Channel

ショートリードデータのペア (R1/R2) を扱う場合に便利です。

Ref R1 R2
ref1 sequence/ref1_R1.fastq.gz sequence/ref1_R2.fastq.gz
ref2 sequence/ref2_R1.fastq.gz sequence/ref2_R2.fastq.gz
ref3 sequence/ref3_R1.fastq.gz sequence/ref3_R2.fastq.gz
SRR18585215 sequence/SRR18585215_R1.fastq.gz sequence/SRR18585215_R2.fastq.gz
SRR18585218 sequence/SRR18585218_R1.fastq.gz sequence/SRR18585218_R2.fastq.gz
SRR18585216 sequence/SRR18585216_R1.fastq.gz sequence/SRR18585216_R2.fastq.gz

sequenceフォルダが以下の構造である場合、

// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_R{1,2}.fastq.gz' )

プレフィックスの一致でフルパスのタプルが作成されます。

[ref1, [<Path to sequence dir>/sequence/ref1_R1.fastq.gz, <Path to sequence dir>/sequence/ref1_R2.fastq.gz]]
[ref2, [<Path to sequence dir>/sequence/ref2_R1.fastq.gz, <Path to sequence dir>/sequence/ref2_R2.fastq.gz]]
[ref3, [<Path to sequence dir>/sequence/ref3_R1.fastq.gz, <Path to sequence dir>/sequence/ref3_R2.fastq.gz]]
[SRR18585215, [<Path to sequence dir>/sequence/SRR18585215_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585215_R2.fastq.gz]]
[SRR18585218, [<Path to sequence dir>/sequence/SRR18585218_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585218_R2.fastq.gz]]
[SRR18585216, [<Path to sequence dir>/sequence/SRR18585216_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585216_R2.fastq.gz]]

プレフィックスでまとめたい場合:

read_pair_ch = Channel.fromFilePairs( 'sequence/ref{1,2,3}*.fastq.gz', size:6 )
read_pair_ch.view()

出力例:

[ref, [<Path to sequence dir>/ref1_R1.fastq.gz, <Path to sequence dir>/ref1_R2.fastq.gz, <Path to sequence dir>/ref2_R1.fastq.gz, <Path to sequence dir>/ref2_R2.fastq.gz, <Path to sequence dir>/ref3_R1.fastq.gz, <Path to sequence dir>/ref3_R2.fastq.gz]]
.fromSRA Channel

NCBIのSRAデータベースから直接FASTQファイルを取得するチャネルを生成できます。

// SRR18585215を含むプロジェクトACC IDを指定
sra_ch = Channel.fromSRA('SRP367264')
sra_ch.view()

出力例:

[SRR22221926, [/vol1/fastq/SRR222/026/SRR22221926/SRR22221926_1.fastq.gz, /vol1/fastq/SRR222/026/SRR22221926/SRR22221926_2.fastq.gz]]
[SRR22221927, [/vol1/fastq/SRR222/027/SRR22221927/SRR22221927_1.fastq.gz, /vol1/fastq/SRR222/027/SRR22221927/SRR22221927_2.fastq.gz]]
...

または複数のSRR IDを指定することも可能です。

ids = ['SRR18585215', 'SRR18585216' ,'SRR18585218']
sra_ch = Channel.fromSRA(ids)
sra_ch.view()

出力例:

[SRR18585215, [/vol1/fastq/SRR185/015/SRR18585215/SRR18585215_1.fastq.gz, /vol1/fastq/SRR185/015/SRR18585215/SRR18585215_2.fastq.gz]]
[SRR18585216, [/vol1/fastq/SRR185/016/SRR18585216/SRR18585216_1.fastq.gz, /vol1/fastq/SRR185/016/SRR18585216/SRR18585216_2.fastq.gz]]
[SRR18585218, [/vol1/fastq/SRR185/018/SRR18585218/SRR18585218_1.fastq.gz, /vol1/fastq/SRR185/018/SRR18585218/SRR18585218_2.fastq.gz]]

Process

Process定義のうちデータ処理には以下のブロックが使用されます:

  • 実行する処理を記述するscriptブロック
  • プロセスの入力データを定義するinputブロック
  • プロセスの出力データを定義するoutputブロック
  • オプション設定を記述するdirectivesブロック

また、stabブロックはコマンドライン引数と併用することでプロトタイプ実行を可能にします。

scriptブロック

処理を記述します。スクリプトの先頭にShebangを指定することで、bash以外のスクリプト言語も使用可能です。

例えば、アユのリファレンスゲノム配列をNCBIのFTPサイトからダウンロードし、Fastaヘッダの数をカウントする処理は以下のように記述できます。

main.nf
nextflow.enable.dsl=2

process Paa_COUNT {
    script:
    """
    wget -O GCA_036571765.1_ASM3657176v1_genomic.fna.gz \
    https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz
    zgrep -c '^>' GCA_036571765.1_ASM3657176v1_genomic.fna.gz
    """
}

workflow {
    Paa_COUNT()
}

実行例:

nextflow run main.nf -process.echo

出力:

Launching `main.nf` [spontaneous_keller] DSL2 - revision: 49fab666b8

executor >  local (1)
[ac/a6d232] process > Paa_COUNT [100%] 1 of 1 ✔
4119

Pythonを用いた例も次のように記述可能です:

main.nf
nextflow.enable.dsl=2

process Paa_COUNT {
    script:
    """
    #!/usr/bin/env python
    import gzip
    import requests

    url='https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz'
    filename='GCA_036571765.1_ASM3657176v1_genomic.fna.gz'

    urlData = requests.get(url).content
    with open(filename, mode = 'wb') as f:
        f.write(urlData)
    
    header_count = 0
    with gzip.open(filename, mode = 'rt') as f:
        for line in f :
            if line.startswith('>'):
                header_count += 1

    print(f'{header_count}')
    """
}

workflow {
    Paa_COUNT()
}

実行:

nextflow run main.nf -process.echo

出力:

Launching `main.nf` [high_einstein] DSL2 - revision: 0ba4a9e999

executor >  local (1)
[14/5c7540] process > Paa_COUNT [100%] 1 of 1 ✔
4119

また、if文を用いた条件分岐も可能です:

main.nf
nextflow.enable.dsl=2

params.species = paa

process SEQ_COUNT {
    script:
    if ( params.species == 'paa' ) {
        """
        wget -O GCA_036571765.1_ASM3657176v1_genomic.fna.gz \
        https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz
        zgrep -c '^>' GCA_036571765.1_ASM3657176v1_genomic.fna.gz
        """
    }
    else if ( params.species == 'zebra' ) {
        """
        wget -O GRCz11.fna.gz \
        https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/002/035/GCF_000002035.6_GRCz11/GCF_000002035.6_GRCz11_genomic.fna.gz
        zgrep -c '^>' GRCz11.fna.gz
        """
    }
    else {
        """
        echo Unknown spceies $params.species
        """
    }

}

workflow {
    SEQ_COUNT()
}

実行:

nextflow run main.nf -process.echo --species zebra

出力:

Launching `main.nf` [silly_avogadro] DSL2 - revision: 56b967513e

executor >  local (1)
[1b/f496b8] process > SEQ_COUNT [100%] 1 of 1 ✔
Execute zebra genome sequence data
1923

Completed at: 05-May-2025 14:57:12
Duration    : 1m 49s
CPU hours   : (a few seconds)
Succeeded   : 1
input・outputブロック

プロセスは互いに独立していますが、チャネルを介してinputおよびoutputブロックで情報の受け渡しが可能です。

inputブロックは、プロセスがどのチャネルから情報を受け取るかを定義します。チャネル内の要素数により、プロセスの依存関係および実行回数が決まります。

inputブロックはプロセス内で1つだけ記述する必要があります。

input:
    <識別子> <変数名>

識別子の種類と説明:

識別子 説明
val 値を変数として受け取り、スクリプト内で名称によって参照可能にします
env 受け取った値を環境変数として設定します
path ファイルをプロセスにコピーし、ファイルとして使用可能にします
stdin 値を標準入力として渡します
tuple val, env, path, stdinのいずれかの識別子を組み合わせた入力のグループを定義します
each 各値に対してプロセスを繰り返し実行します

よく使われるのは val, path, tuple です。

以下は、val識別子で値を受け取る例です:

main.nf
nextflow.enable.dsl=2

process basicExample {
    input:
        val x
    script:
        """
        echo process job $x
        """
}

workflow {
    Channel.of(1,2,3) | basicExample
}

実行:

nextflow run main.nf -process.echo

channel.ofで指定した値が$xに代入されて順不同で出力されます。

executor >  local (3)
[de/6f5d29] process > basicExample (3) [100%] 3 of 3 ✔
process job 2

process job 1

process job 3

path識別子を用いると、ファイルをプロセスのワークディレクトリにコピーして使用可能になります。

main.nf
nextflow.enable.dsl=2

process stats_ref {
    input:
        path ref_file
    
    output:
        val 

    script:
        """
        seqkit stats -a -T -b ${ref_file} 
        """
}

workflow {
    Channel.fromPath('ref/*.fna.gz') | stats_ref
}

実行:

nextflow run main.nf -process.echo

出力:

Launching `main.nf` [cheesy_heisenberg] DSL2 - revision: 62d86431e7

executor >  local (2)
executor >  local (2)
[41/fc83ca] process > stats_ref (1) [100%] 2 of 2 ✔
executor >  local (2)
[41/fc83ca] process > stats_ref (1) [100%] 2 of 2 ✔
file    format  type991 num_seqs        sum_len min_len avg_len max_len Q1      Q2      Q3      sum_gap N50     N50_num Q20(%)  Q30(%)  AvgQual GC(%)   sum_n
GCA_036571765.1_ASM3657176v1_genomic.fna.gz     FASTA   DNA     4119    448375639       200     108855.5        22408856        1070.5  1619.0  3524.0  0       17010411        12      0.00    0.00 0.00e    45.89t  9105991

file18  format  type    num_seqs        sum_len min_len avg_len max_len Q1      Q2      Q3      sum_gap N50     N50_num Q20(%)  Q30(%)  AvgQual GC(%)   sum_n
GRCz11.fna.gz   FASTA   DNA     1923    1679203469      650     873220.7        78093715        17142.0 146921.0        313473.0        0       52186027        14      0.00    0.00    0.00    36.504693618

続いてoutputブロックでは、結果をファイルや変数として出力するために使用します:

output:
    <識別子> <変数名>

例:*.tsv形式で出力されたファイルを確認するプロセス

seqkitの結果ファイルをNextflowの作業ディレクトリ内に(リファレンス名)_stats.tsvの名称で作成します。outputブロックで*.tsvファイルを宣言しているためstats_refチャネルを介して出力されたファイルが期待する*.tsvであるかを精査し、一致しない場合エラーを返します。

main.nf
nextflow.enable.dsl=2

process stats_ref {
    input:
        path ref_file
    
    output:
        path '*.tsv'

    script:
        """
        prefix=\$(basename $ref_file .fna.gz)
        seqkit stats -a -T -b ${ref_file} > \${prefix}_stats.tsv
        """
}

workflow {
    Channel.fromPath('ref/*.fna.gz') | stats_ref
    // use the view operator to display contents of the channel
    stats_ref.out.view()
}

実行:

nextflow run main.nf -process.echo

出力例:

 N E X T F L O W   ~  version 24.10.6

Launching `main.nf` [deadly_yalow] DSL2 - revision: 56d56e763b

executor >  local (2)
[f6/cb8c47] process > stats_ref (1) [100%] 2 of 2 ✔
<Path to work dir>/work/30/e153723625b44954620fa4747fa8c7/GCA_036571765.1_ASM3657176v1_genomic_stats.tsv
<Path to work dir>/work/f6/cb8c47a15eb6996b4f7dee56ba988a/GRCz11_stats.tsv

tuple識別子は複数の値をグループとして受け取ることができます。

[group_key,[file1,file2,...]]

タプルを含むチャネルを使用する場合、.filesFromPairsを使用し、対応するprocessのinputにtuple識別子を使用するとともに、タプル内の各項目を定義します。

以下はその使用例です。ファイル名称を_1.fastq.gz -> _R1.fq.gz、_2.fastq.gz -> _R2.fq.gzに変更するような処理をしています。

main.nf
nextflow.enable.dsl=2

process TUPPLE_FASTQ {
    input:
        tuple val(sample_id), path(reads)
    
    output:
        tuple val(sample_id), path("*.fq.gz")

    script:
        """
        cat *_1.fastq.gz | pigz > ${sample_id}_R1.fq.gz
        cat *_2.fastq.gz | pigz > ${sample_id}_R2.fq.gz
        """
}

// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )

workflow {
    TUPPLE_FASTQ(read_pair_ch)
    TUPPLE_FASTQ.out.view()
}

実行:

nextflow run main.nf -process.echo

出力:

[SRR18585216, [<Path to work dir>/test_nextflow/work/b0/a5e66b7d5336e02dae880ff148d0aa/SRR18585216_R1.fq.gz, <Path to work dir>/test_nextflow/work/b0/a5e66b7d5336e02dae880ff148d0aa/SRR18585216_R2.fq.gz]]
[SRR18585213, [<Path to work dir>/test_nextflow/work/dc/0e6e9fc29a9189f91ffa03343a610a/SRR18585213_R1.fq.gz, <Path to work dir>/test_nextflow/work/dc/0e6e9fc29a9189f91ffa03343a610a/SRR18585213_R2.fq.gz]]
[SRR18585217, [<Path to work dir>/test_nextflow/work/b3/d9cd73ee68377a7888898ca6ef33e8/SRR18585217_R1.fq.gz, <Path to work dir>/test_nextflow/work/b3/d9cd73ee68377a7888898ca6ef33e8/SRR18585217_R2.fq.gz]]
...
directivesブロック

directivesブロックでは、各プロセスに対する実行環境の設定を行います。これはNextflowの実行エンジン(ローカル、クラスタ、クラウドなど)に対して、各ジョブのリソース要件や制約を指定するために使用します。

使用可能な主なディレクティブの一覧は以下のとおりです。

ディレクティブ 説明
cpus プロセスで使用するCPUコア数を指定します。
memory プロセスで使用するメモリ量を指定します。例: 4.GB、500.MB
time プロセスの最大実行時間を指定します。例: '2h', '30min'
queue ジョブスケジューラ(例: SGE, SLURM)で使用するキュー名を指定します。
label プロセスにラベルを付与します。構成ファイルと連携して柔軟な制御が可能です。
container 使用するDocker/Singularityコンテナを指定します。
conda Conda 環境を指定します。 .yml ファイルやパッケージ名で指定可能です。
label プロセスにラベルをつけて分類します。nextflow.configwithLabelを用いて一括設定が可能です。
tag 実行ログや出力ディレクトリ名に表示されるジョブ名の一部を定義します。ジェブの識別やデバック時に便利です。
publishDir プロセスの出力ファイルを指定ディレクトリにコピー(または移動、Link)します。引数にオプションを指定することで上書きやリンクの方法などを制御できます。

以下は、seqkit を用いて FastQ ファイルの統計情報(stats)を取得するプロセスの例です。

taginputで指定した$sample_idを設定することで、標準出力上に処理対象のサンプル名を表示でき、ログの可読性が向上します。また、cpusに4を指定し、スクリプト内で、$task.cpusを使うことで、seqkitのスレッド数に4が指定されるようになります。

nextflow.enable.dsl=2

process STATS_FASTQ {
    tag "$sample_id"
    cpus 4

    input:
        tuple val(sample_id), path(reads)
    
    output:
        path('*.tsv')
    
    script:
        """
        seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
        """
}

// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )

workflow {
    STATS_FASTQ(read_pair_ch)
}

実行:

nextflow run main.nf

実行中の標準出力では、プロセス名の横にtagで指定したサンプルIDが表示され、進行中の処理を視覚的に把握しやすくなります。

Launching `main.nf` [condescending_knuth] DSL2 - revision: 8b2fbc3393

executor >  local (12)
[62/ee87e7] process > STATS_FASTQ (SRR18585215) [100%] 12 of 12 ✔

また、tagには単一の値だけではなく、複数の変数を組み合わせた文字列を指定することが可能です。

nextflow.enable.dsl=2

process STATS_FASTQ {
    tag "${sample_id} (${reads.size()} reads)"
    cpus 4
    publishDir "Results/SRR185851", pattern: "SRR185851*stats.tsv", mode: "copy"
    publishDir "Results/SRR185852", pattern: "SRR185852*stats.tsv", mode: "copy"

    input:
        tuple val(sample_id), path(reads)
    
    output:
        path('*.tsv')
    
    script:
        """
        seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
        """
}

read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )

workflow {
    STATS_FASTQ(read_pair_ch)
}

実行:

nextflow run main.nf

出力:

Launching `main.nf` [nostalgic_lamarck] DSL2 - revision: 16e6869253

executor >  local (9)
[7b/478551] process > STATS_FASTQ (SRR18585215 (2 reads)) [100%] 9 of 9 ✔

ただ、このままでは出力ファイルの保存先が明示されておらず、解析結果はwork/以下に保存されます。

このディレクトリは Nextflow が一時的に使用する場所であるため、後の処理や整理のためにはpublishDirを使って任意のディレクトリに出力をコピー、リンク、または移動するのが一般的です。

publishDir <directory>, parameter: value, parameter2: value ...

たとえば、先ほど生成した*.tsvファイルをResultsフォルダに出力するには、次のように指定します。

以下は、Resultsフォルダにファイルを コピー する例です。

main.nf
nextflow.enable.dsl=2

process STATS_FASTQ {
    tag "$sample_id"
    cpus 4
    publishDir "Results", mode: "copy"

    input:
        tuple val(sample_id), path(reads)
    
    output:
        path('*.tsv')
    
    script:
        """
        seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
        """
}

// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )

workflow {
    STATS_FASTQ(read_pair_ch)
}

実行:

nextflow run main.nf

publishDirで指定したフォルダ(ここでは Results)は自動で作成されます。現状、再実行時はオプションを明示的に指定しない限り、ファイルは上書きされます。

確認:

tree Results
Results
├── SRR18585102_1_stats.tsv
├── SRR18585102_2_stats.tsv
├── SRR18585213_1_stats.tsv
...
└── SRR18585220_2_stats.tsv

サンプル ID のパターンに基づいて出力先を分けたい場合は、patternオプションを使ってpublishDirを複数指定できます。

main.nf
nextflow.enable.dsl=2

process STATS_FASTQ {
    tag "$sample_id"
    cpus 4
    publishDir "Results/SRR185851", pattern: "SRR185851*stats.tsv", mode: "copy"
    publishDir "Results/SRR185852", pattern: "SRR185852*stats.tsv", mode: "copy"

    input:
        tuple val(sample_id), path(reads)
    
    output:
        path('*.tsv')
    
    script:
        """
        seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
        """
}

read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )

workflow {
    STATS_FASTQ(read_pair_ch)
}

実行:

nextflow run main.nf

確認:

tree Results
Results
├── SRR185851
│   ├── SRR18585102_1_stats.tsv
│   └── SRR18585102_2_stats.tsv
└── SRR185852
    ├── SRR18585213_1_stats.tsv
    ├── SRR18585213_2_stats.tsv
    ├── ...
    └── SRR18585220_2_stats.tsv

lebelディレクティブを使用すると、複数のプロセスに共通のラベルをつけ、nextflow.config出側で実行環境(CPU数、メモリ量、queueなど)をまとめて指定できます。これによりプロセスごとに毎回リソースを設定せずにすみ保守性が向上します。

process STATS_FASTQ {
    label 'standard'
    ...
}

このように書くと、nextflow.config 側で以下のような設定が有効になります。

process {
    withLabel: standard {
        cpus = 4
        memory = '8.GB'
        time = '2h'
        queue = 'grid01'
    }
}

labelを使うことで、実行環境の切り替えを外部から柔軟に制御可能になります。

Operators

NextflowではChannelを使ってデータの流れを制御しますが、その中でオペレーターと呼ばれる機能を使うことでチャネルに流れるデータの変換・フィルタリング・結合を柔軟に扱うことができます。これは、Unixのパイプ処理のような考え方で、データを逐次処理するイメージです。

channel_obj.<operator>()

map: データの変換

mapは、チャネル内の各要素に対して処理を行い、変換結果を新しいチャネルに流します。getSinpleNameもオペレーターで/some/path/file.tar.gz -> fileの変換をします。

例: ファイル名のプレフィックスを抽出

main.nf
nextflow.enable.dsl=2

workflow {
    read_ch = Channel.fromPath('sequence/*.fastq.gz')

    prefix_ch = read_ch.map { name -> 
        name.getSimpleName().replaceAll(/_[12]$/, '')
    }

    prefix_ch.view()
}

実行:

nextflow run main.nf

出力:

Launching `main.nf` [astonishing_feynman] DSL2 - revision: 84ad2eb946

SRR18585218
SRR18585214
SRR18585220
SRR18585219
SRR18585102
SRR18585213
SRR18585213
SRR18585216
SRR18585220
SRR18585217
SRR18585216
SRR18585217
SRR18585219
SRR18585218
SRR18585102
SRR18585215
SRR18585214
SRR18585215

R1とR2それぞれに対して置換処理をして出力しているので、ACC IDが重複しています。

filter: 条件によるフィルタリング

filterは、チャネル内のデータのうち、条件を満たすものだけを次に渡します。

例: R1ファイルだけを取り出してファイル名のプレフィックスを抽出

main.nf
nextflow.enable.dsl=2

workflow {
    read_ch = Channel.fromPath('sequence/*.fastq.gz')

    r1_ch = read_ch.filter { r1 ->
        r1.name.contains('_1.fastq.gz')
    }
    prefix_ch = r1_ch.map { name -> 
        file.getSimpleName().replaceAll(/_[1]$/, '')
    }

    prefix_ch.view()
}

実行:

nextflow run main.nf

出力:

Launching `main.nf` [jolly_volta] DSL2 - revision: a11bf4c0a9

SRR18585214
SRR18585219
SRR18585213
SRR18585220
SRR18585217
SRR18585216
SRR18585218
SRR18585102
SRR18585215

また、ペアファイル確定であれば.fromFilePairs()でサンプルIDとペアファイルのtupleを作成して、Sample IDの要素にアクセスすることで同様の目的が達成できます。

main.nf
Channel.fromFilePairs("sequence/*_{1,2}.fastq.gz")
    .map { name -> name[0] }
    .view()

Channel内のアイテム分割

Nextflowにはいくつかのファイルフォーマットに対応した分割処理用のOperatorが用意されています。大きなファイルを小さな単位で処理したい時や外部ファイルから処理サンプルを指定するときなどに便利です。

  • splitCsv:
    • csvまたはtsvファイルを1行ずつ読み込み。サンプルシートやメタデータの処理、ヘッダーを認識してのカラム処理に使用
  • splitFasta:
    • Fasta形式のファイルを1レコードごとに分割。各シーケンスごとの個別処理や巨大なFastaファイルの分割処理に使用
  • splitFastq:
    • 1リードごと(4行単位)に分割して読み込み。1リード単位でのフィルタリングや変換、分割処理や特定配列のパターン処理に使用
  • splitText:
    • テキストファイルを1行づつ分割読み込み。ログファイルやプレーンなtsv/csvファイルなどの行処理やキーワードで行フィルタリングする場合に使用
splitCsv

複数ある分割処理系のOperatorの中で、一番使用頻度が高くて使い勝手の良いだと思います。解析対象サンプルを記載したサンプルシートを使用する時splitCsvを使用します。

例: サンプルシートに記載されたサンプル名と対応するfastq.gzのペアファイルの情報を抽出

sample_id read1 read2
SRR18585214 sequence/SRR18585214_1.fastq.gz sequence/SRR18585214_2.fastq.gz
SRR18585215 sequence/SRR18585215_1.fastq.gz sequence/SRR18585215_2.fastq.gz
SRR18585216 sequence/SRR18585216_1.fastq.gz sequence/SRR18585216_2.fastq.gz
SRR18585217 sequence/SRR18585217_1.fastq.gz sequence/SRR18585217_2.fastq.gz

csv形式のテキストに対して、レコードまたは指定されたサイズのレコードグループに分割します。header lessのcsvの場合はit[0]のように位置インデックスでアクセス可能です。ヘッダーが存在する場合、header:trueを指定することで列名で値にアクセスすることが可能です。

sample.csvをコマンドで作成する場合こちらを実行
(
  echo "sample_id,read1,read2"
  for sid in SRR18585214 SRR18585215 SRR18585216 SRR18585217; do
    echo "${sid},sequence/${sid}_1.fastq.gz,sequence/${sid}_2.fastq.gz"
  done
) > sample.csv

.fromPathでサンプルシートファイルの情報を取得して、.splitCsv()で分割処理しています。

nextflow.enable.dsl=2

Channel
    .fromPath( 'sample.csv' )
    .splitCsv()
    .view()

実行:

nextflow run main.nf
Launching `main.nf` [evil_payne] DSL2 - revision: 65529d2859

[sample_id, read1, read2]
[SRR18585214, sequence/SRR18585214_1.fastq.gz, sequence/SRR18585214_2.fastq.gz]
[SRR18585215, sequence/SRR18585215_1.fastq.gz, sequence/SRR18585215_2.fastq.gz]
[SRR18585216, sequence/SRR18585216_1.fastq.gz, sequence/SRR18585216_2.fastq.gz]
[SRR18585217, sequence/SRR18585217_1.fastq.gz, sequence/SRR18585217_2.fastq.gz]

workflow

ここまでは個別の処理方法について確認してきました。ただ、実際には多くの処理がデータの受け渡しによって繋がっています。Channelを用いることで、個々のタスクを定義したprocessにデータを渡すことができると分かったので、ここからは、それらの処理をworkflowでパイプライン化していきます。

以下は、FastpによるデータQCを実行し、出力されたレポートファイルをMultiQCで解析した後、結果をpublishDirに指定したフォルダへ出力する一連の処理を示します。プロセス間のデータのやり取りにはemitを使用しています。emitを使うことで、プロセスの出力に任意の名前をつけることができ、以降の処理で、プロセス名.out.名前という形式で参照することが可能になります。

加えて、csv形式のサンプルシート(sample.csv)を用いて解析対象サンプルを制御します。

https://github.com/NaokiShibata/Demodata/blob/main/nextflow-demo/sample.csv

main.nf
nextflow.enable.dsl=2

process FASTP {
    tag "$sample_id"
    cpus 16
    publishDir "Results/fastp/fastq", pattern: "*_R{1,2}.fastq.gz", mode: "copy"
    publishDir "Results/fastp/summary", pattern: "*.{html,json}", mode: "copy"

    input:
        tuple val(sample_id), path(read1), path(read2)
    output:
        tuple(path("${sample_id}_R{1,2}.fastq.gz")), emit: 'fastp_fastq'
        path("${sample_id}_report.{html,json}"), emit: 'fastp_summary'

    script:
        """
        fastp \
        --in1 ${read1} \
        --in2 ${read2} \
        --out1 ${sample_id}_R1.fastq.gz \
        --out2 ${sample_id}_R2.fastq.gz \
        --html ${sample_id}_report.html \
        --json ${sample_id}_report.json \
        --qualified_quality_phred 30 \
        --length_required 50 \
        --detect_adapter_for_pe \
        --trim_poly_g \
        --cut_front \
        --thread ${task.cpus}
        """
}

process MULTIQC {
    publishDir "Results/Multiqc", mode: 'copy'
    input:
        path fastp_summary
    output:
        path "*"
    script:
        """
        multiqc .
        """
}

workflow{
    read_ch=Channel
        .fromPath('sample.csv')
        .splitCsv(header:true)
        .map {
            row -> tuple(row.sample_id, file(row.read1), file(row.read2))
        }
    FASTP(read_ch)
    MULTIQC(FASTP.out.fastp_summary.collect())
}

実行:

nextflow run main.nf

出力:

WARN: Output `tuple` must define at least two elements -- Check process `FASTP`
executor >  local (5)
[83/b7f097] process > FASTP (SRR18585215) [100%] 4 of 4 ✔
[48/83d016] process > MULTIQC             [100%] 1 of 1 ✔
Completed at: 06-May-2025 19:33:40
Duration    : 1m 32s
CPU hours   : 0.6
Succeeded   : 5

確認:

tree Results
Results/
├── fastp
│   ├── fastq
│   │   ├── SRR18585214_R1.fastq.gz
│   │   ├── SRR18585214_R2.fastq.gz
│       ├── [省略]
│   └── summary
│       ├── SRR18585214_report.html
│       ├── SRR18585214_report.json
│       ├── [省略]
└── Multiqc
    ├── multiqc_data
    │   ├── fastp_filtered_reads_plot.txt
    │   ├── [省略]
    └── multiqc_report.html

これでQC 済みの Fastq ファイル と、QC 結果をまとめた multiqc_report.html を取得することができました。

感想

Nextflow については、最初はワークフロー内でのデータの受け渡しが「出力ファイル」ではなく「プロセス間のデータフロー」で完結する点が理解しづらく苦労しました。

今後は nf-core が提供する既存のワークフローを利用するだけでなく、自身の解析にも積極的にNextflowを活用していきたいですね。

出典・参考

Discussion