Nextflowの勉強1
はじめに
Nextflowは、主にバイオインフォマティクス領域で使用される、解析パイプラインの記述・実行・管理を行うためのワークフローツールです。複数の解析ステップを自動化し、ヒューマンエラーを防ぎながら、効率的に解析を進めることを目的としています。
Nextflowの主な特徴は以下の通りです。
- 複雑な解析ステップを、明確に定義されたフローに従って自動的に実行できる
- 大量のデータやサンプルに対して並列処理を自動で行い、計算資源を有効活用できる
- ローカルPC、クラウド、HPC環境など、幅広いプラットフォームで動作する
- コードの再現性が高く、他者との共有や長期的な保守に適している
Nextflowでは、基本的に以下の要素でパイプラインを構築します。
-
process
個々の解析ステップ(コマンド実行単位)を定義する単位です。 -
workflow
定義した process をどのような順序・関係性で実行するかを記述するセクションです。 -
channel
データ(ファイル名、パラメータなど)を process 間で受け渡すための仕組みです。 -
params
各種設定値(データディレクトリ、オプションパラメータなど)を記録するための変数です。
これらを組み合わせることで、柔軟かつ拡張性の高い解析パイプラインを表現できます。
多機能すぎるNextflowについて、簡単にではありますが学んだ内容を、メモとしてここに記録しておきます。最終的にFastqファイルをFastpで処理して、結果のSummaryをMultiQCで取得することをゴールとしています。
Nextflowのインストール
Nextflowを最もシンプルに導入するには、以下の手順を実行します:
- Java 11 以上がインストールされていることを確認
-
curl -s https://get.nextflow.io | bash
を実行
# Javaのバージョンの確認
java --version
# Nextflowのインストール
curl -s https://get.nextflow.io | bash
N E X T F L O W
version 24.10.6 build 5937
created 23-04-2025 16:53 UTC (24-04-2025 01:53 JDT)
cite doi:10.1038/nbt.3820
http://nextflow.io
また、Biocondaを使って以下のようにインストールすることも可能です。
# Miniforge3のインストール
wget "https://github.com/conda-forge/miniforge/releases/download/24.11.3-2/Miniforge3-24.11.3-2-Linux-x86_64.sh"
bash Miniforge3-24.11.3-2-Linux-x86_64.sh -b -p $PWD/miniforge3
# 現在のシェル環境にconda/mambaの設定を反映
. ${PWD}/miniforge3/etc/profile.d/conda.sh
. ${PWD}/miniforge3/etc/profile.d/mamba.sh
# 仮想環境の有効化
mamba activate
# 仮想環境の作成とパッケージのインストール(例:nextflow, seqkit, fastp, multiqc, pigz)
mamba create -n nextflow_env -c bioconda nextflow seqkit fastp multiqc pigz
# 仮想環境の有効化
mamba activate nextflow_env
# バージョン確認
nextflow -v
# nextflow version 24.10.6.5937
また、バイナリファイルを直接ダウンロードして使用することもできます。
# バイナリのダウンロード
wget https://github.com/nextflow-io/nextflow/releases/download/v24.10.6/nextflow-24.10.6-dist
# 実行権限の付与
chmod +x nextflow-24.10.6-dist
# バージョン確認
./nextflow-24.10.6-dist -v
# nextflow version 24.10.6.5937
上記のいずれかの方法でNextflowをインストールできます。
nextflowでデータ処理
冒頭でも簡単に触れましたが、ここでNextflowの構成要素について改めて整理します。
Nextflowの構成要素(Process, Channel, Workflow)
-
process
:- プロセスでは、実行するタスクを定義します。実行スクリプトは、bash、Perl、Ruby、Python、Rなど、Linux上で動作する任意のスクリプト言語で記述可能です。
- 各入力セットに対してタスクが生成され、それぞれのタスクは独立して実行されます。他のタスクとの直接のやり取りはできません。
- プロセス間でデータを受け渡すには、非同期キューである
Channel
を使用します。 - プロセスでは、タスクの入力・処理内容・出力を定義します。
-
Channel
:- チャネルは、あるプロセスから次のプロセスへのデータの流れを定義します。
- ファイルや値など、複数の要素を保持可能です。
- チャネルに保持される要素の数が、それを入力とするプロセスの実行回数を決定します。
-
Workflow
:- ワークフローは、プロセスとデータフロー(チャネルやオペレーター)を構成する機能です。
Channel
チャネルには、 value channel と queue channel の2種類があります。
value channel
値を1つだけ保持するチャネルで、以下のように定義します。変数1つに飲み割り当てられますが、プロセスやオペレーターによって何度も使用可能です。
ch1 = Channel.value( 'T2T-CHM13v2.0' )
ch2 = Channel.value( ['chromosome1', 'chromosome2', 'chromosome3'] )
ch3 = Channel.value( ['chromosome1': '248387328', 'chromosome2': '242696752', 'chromosome3': '201105948'] )
queue Channel
複数の値を保持できるチャネルで、以下のように様々なファクトリーメソッドで生成できます。
.of
Channel
// Channel.of() ファクトリー + view演算子
Channel.of(1,2,3,4).view()
// 複数の型情報を持つことができる
Channel.of(1..22, 'X', 'Y').view()
.fromList
Channel
//fromList() ファクトリー + view演算子
tool_lists = ['bowtie2', 'bwa-mem2']
Channel.fromList(tool_lists).view()
// -> bowtie2
// -> bwa-mem2と出力
//.of()だと、['bowtie2', 'bwa-mem2']と出力
-
.fromList
は要素を分割して出力しますが、.of
は1つのリストとして出力する点に注意が必要です。
.fromPath
Channel
globパターンで一致するファイルパスをチャネルとして取得します。
// refフォルダの*.faファイルのパスを取得
ref_ch = Channel.fromPath( 'ref/T2T-CHM13v2.0.fa' )
// globパターンマッチで複数ファイルの参照も可能
ref_ch = Channel.fromPath( 'ref/*.fa' )
// ref/T2T-CHM13v2.0.fa
// ref/GRCh38.fa
// ref/GRCm39.fa
// ref/mm10.fa
追加オプション:
オプション名 | 説明 |
---|---|
glob |
true の場合、* 、? 、[] 、{} の文字がグロブワイルドカードとして解釈されます。false の場合は文字通りに扱われます(デフォルト: true ) |
type | マッチ対象: file (ファイル)、dir (ディレクトリ)、any (いずれか)。デフォルト: file
|
hidden | 隠しファイルの含有可否(デフォルト: false) |
maxDepth | 検索するディレクトリの最大深さ(デフォルト: 制限なし) |
followLinks | シンボリックリンクの追跡可否(デフォルト: true) |
relative | 相対パスでの出力可否 (デフォルト: false) |
checkIfExists | 存在しないファイルに対するエラーチェック(デフォルト: false) |
例:
// Pathの取得とファイルの存在を同時に実行
ref_ch = Channel.fromPath( 'ref/*.fasta', checkIfExists: true )
// refフォルダに.fastaファイルがない場合は、エラーが発生
.fromFilePairs
Channel
ショートリードデータのペア (R1/R2) を扱う場合に便利です。
Ref | R1 | R2 |
---|---|---|
ref1 | sequence/ref1_R1.fastq.gz | sequence/ref1_R2.fastq.gz |
ref2 | sequence/ref2_R1.fastq.gz | sequence/ref2_R2.fastq.gz |
ref3 | sequence/ref3_R1.fastq.gz | sequence/ref3_R2.fastq.gz |
SRR18585215 | sequence/SRR18585215_R1.fastq.gz | sequence/SRR18585215_R2.fastq.gz |
SRR18585218 | sequence/SRR18585218_R1.fastq.gz | sequence/SRR18585218_R2.fastq.gz |
SRR18585216 | sequence/SRR18585216_R1.fastq.gz | sequence/SRR18585216_R2.fastq.gz |
sequenceフォルダが以下の構造である場合、
// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_R{1,2}.fastq.gz' )
プレフィックスの一致でフルパスのタプルが作成されます。
[ref1, [<Path to sequence dir>/sequence/ref1_R1.fastq.gz, <Path to sequence dir>/sequence/ref1_R2.fastq.gz]]
[ref2, [<Path to sequence dir>/sequence/ref2_R1.fastq.gz, <Path to sequence dir>/sequence/ref2_R2.fastq.gz]]
[ref3, [<Path to sequence dir>/sequence/ref3_R1.fastq.gz, <Path to sequence dir>/sequence/ref3_R2.fastq.gz]]
[SRR18585215, [<Path to sequence dir>/sequence/SRR18585215_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585215_R2.fastq.gz]]
[SRR18585218, [<Path to sequence dir>/sequence/SRR18585218_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585218_R2.fastq.gz]]
[SRR18585216, [<Path to sequence dir>/sequence/SRR18585216_R1.fastq.gz, <Path to sequence dir>/sequence/SRR18585216_R2.fastq.gz]]
プレフィックスでまとめたい場合:
read_pair_ch = Channel.fromFilePairs( 'sequence/ref{1,2,3}*.fastq.gz', size:6 )
read_pair_ch.view()
出力例:
[ref, [<Path to sequence dir>/ref1_R1.fastq.gz, <Path to sequence dir>/ref1_R2.fastq.gz, <Path to sequence dir>/ref2_R1.fastq.gz, <Path to sequence dir>/ref2_R2.fastq.gz, <Path to sequence dir>/ref3_R1.fastq.gz, <Path to sequence dir>/ref3_R2.fastq.gz]]
.fromSRA
Channel
NCBIのSRAデータベースから直接FASTQファイルを取得するチャネルを生成できます。
// SRR18585215を含むプロジェクトACC IDを指定
sra_ch = Channel.fromSRA('SRP367264')
sra_ch.view()
出力例:
[SRR22221926, [/vol1/fastq/SRR222/026/SRR22221926/SRR22221926_1.fastq.gz, /vol1/fastq/SRR222/026/SRR22221926/SRR22221926_2.fastq.gz]]
[SRR22221927, [/vol1/fastq/SRR222/027/SRR22221927/SRR22221927_1.fastq.gz, /vol1/fastq/SRR222/027/SRR22221927/SRR22221927_2.fastq.gz]]
...
または複数のSRR IDを指定することも可能です。
ids = ['SRR18585215', 'SRR18585216' ,'SRR18585218']
sra_ch = Channel.fromSRA(ids)
sra_ch.view()
出力例:
[SRR18585215, [/vol1/fastq/SRR185/015/SRR18585215/SRR18585215_1.fastq.gz, /vol1/fastq/SRR185/015/SRR18585215/SRR18585215_2.fastq.gz]]
[SRR18585216, [/vol1/fastq/SRR185/016/SRR18585216/SRR18585216_1.fastq.gz, /vol1/fastq/SRR185/016/SRR18585216/SRR18585216_2.fastq.gz]]
[SRR18585218, [/vol1/fastq/SRR185/018/SRR18585218/SRR18585218_1.fastq.gz, /vol1/fastq/SRR185/018/SRR18585218/SRR18585218_2.fastq.gz]]
Process
Process定義のうちデータ処理には以下のブロックが使用されます:
- 実行する処理を記述する
script
ブロック - プロセスの入力データを定義する
input
ブロック - プロセスの出力データを定義する
output
ブロック - オプション設定を記述する
directives
ブロック
また、stab
ブロックはコマンドライン引数と併用することでプロトタイプ実行を可能にします。
scriptブロック
処理を記述します。スクリプトの先頭にShebangを指定することで、bash以外のスクリプト言語も使用可能です。
例えば、アユのリファレンスゲノム配列をNCBIのFTPサイトからダウンロードし、Fastaヘッダの数をカウントする処理は以下のように記述できます。
nextflow.enable.dsl=2
process Paa_COUNT {
script:
"""
wget -O GCA_036571765.1_ASM3657176v1_genomic.fna.gz \
https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz
zgrep -c '^>' GCA_036571765.1_ASM3657176v1_genomic.fna.gz
"""
}
workflow {
Paa_COUNT()
}
実行例:
nextflow run main.nf -process.echo
出力:
Launching `main.nf` [spontaneous_keller] DSL2 - revision: 49fab666b8
executor > local (1)
[ac/a6d232] process > Paa_COUNT [100%] 1 of 1 ✔
4119
Pythonを用いた例も次のように記述可能です:
nextflow.enable.dsl=2
process Paa_COUNT {
script:
"""
#!/usr/bin/env python
import gzip
import requests
url='https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz'
filename='GCA_036571765.1_ASM3657176v1_genomic.fna.gz'
urlData = requests.get(url).content
with open(filename, mode = 'wb') as f:
f.write(urlData)
header_count = 0
with gzip.open(filename, mode = 'rt') as f:
for line in f :
if line.startswith('>'):
header_count += 1
print(f'{header_count}')
"""
}
workflow {
Paa_COUNT()
}
実行:
nextflow run main.nf -process.echo
出力:
Launching `main.nf` [high_einstein] DSL2 - revision: 0ba4a9e999
executor > local (1)
[14/5c7540] process > Paa_COUNT [100%] 1 of 1 ✔
4119
また、if文を用いた条件分岐も可能です:
nextflow.enable.dsl=2
params.species = paa
process SEQ_COUNT {
script:
if ( params.species == 'paa' ) {
"""
wget -O GCA_036571765.1_ASM3657176v1_genomic.fna.gz \
https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/036/571/765/GCA_036571765.1_ASM3657176v1/GCA_036571765.1_ASM3657176v1_genomic.fna.gz
zgrep -c '^>' GCA_036571765.1_ASM3657176v1_genomic.fna.gz
"""
}
else if ( params.species == 'zebra' ) {
"""
wget -O GRCz11.fna.gz \
https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/002/035/GCF_000002035.6_GRCz11/GCF_000002035.6_GRCz11_genomic.fna.gz
zgrep -c '^>' GRCz11.fna.gz
"""
}
else {
"""
echo Unknown spceies $params.species
"""
}
}
workflow {
SEQ_COUNT()
}
実行:
nextflow run main.nf -process.echo --species zebra
出力:
Launching `main.nf` [silly_avogadro] DSL2 - revision: 56b967513e
executor > local (1)
[1b/f496b8] process > SEQ_COUNT [100%] 1 of 1 ✔
Execute zebra genome sequence data
1923
Completed at: 05-May-2025 14:57:12
Duration : 1m 49s
CPU hours : (a few seconds)
Succeeded : 1
input・outputブロック
プロセスは互いに独立していますが、チャネルを介してinput
およびoutput
ブロックで情報の受け渡しが可能です。
input
ブロックは、プロセスがどのチャネルから情報を受け取るかを定義します。チャネル内の要素数により、プロセスの依存関係および実行回数が決まります。
input
ブロックはプロセス内で1つだけ記述する必要があります。
input:
<識別子> <変数名>
識別子の種類と説明:
識別子 | 説明 |
---|---|
val |
値を変数として受け取り、スクリプト内で名称によって参照可能にします |
env |
受け取った値を環境変数として設定します |
path |
ファイルをプロセスにコピーし、ファイルとして使用可能にします |
stdin |
値を標準入力として渡します |
tuple |
val, env, path, stdinのいずれかの識別子を組み合わせた入力のグループを定義します |
each |
各値に対してプロセスを繰り返し実行します |
よく使われるのは val, path, tuple です。
以下は、val識別子で値を受け取る例です:
nextflow.enable.dsl=2
process basicExample {
input:
val x
script:
"""
echo process job $x
"""
}
workflow {
Channel.of(1,2,3) | basicExample
}
実行:
nextflow run main.nf -process.echo
channel.ofで指定した値が$x
に代入されて順不同で出力されます。
executor > local (3)
[de/6f5d29] process > basicExample (3) [100%] 3 of 3 ✔
process job 2
process job 1
process job 3
path
識別子を用いると、ファイルをプロセスのワークディレクトリにコピーして使用可能になります。
nextflow.enable.dsl=2
process stats_ref {
input:
path ref_file
output:
val
script:
"""
seqkit stats -a -T -b ${ref_file}
"""
}
workflow {
Channel.fromPath('ref/*.fna.gz') | stats_ref
}
実行:
nextflow run main.nf -process.echo
出力:
Launching `main.nf` [cheesy_heisenberg] DSL2 - revision: 62d86431e7
executor > local (2)
executor > local (2)
[41/fc83ca] process > stats_ref (1) [100%] 2 of 2 ✔
executor > local (2)
[41/fc83ca] process > stats_ref (1) [100%] 2 of 2 ✔
file format type991 num_seqs sum_len min_len avg_len max_len Q1 Q2 Q3 sum_gap N50 N50_num Q20(%) Q30(%) AvgQual GC(%) sum_n
GCA_036571765.1_ASM3657176v1_genomic.fna.gz FASTA DNA 4119 448375639 200 108855.5 22408856 1070.5 1619.0 3524.0 0 17010411 12 0.00 0.00 0.00e 45.89t 9105991
file18 format type num_seqs sum_len min_len avg_len max_len Q1 Q2 Q3 sum_gap N50 N50_num Q20(%) Q30(%) AvgQual GC(%) sum_n
GRCz11.fna.gz FASTA DNA 1923 1679203469 650 873220.7 78093715 17142.0 146921.0 313473.0 0 52186027 14 0.00 0.00 0.00 36.504693618
続いてoutput
ブロックでは、結果をファイルや変数として出力するために使用します:
output:
<識別子> <変数名>
例:*.tsv形式で出力されたファイルを確認するプロセス
seqkitの結果ファイルをNextflowの作業ディレクトリ内に(リファレンス名)_stats.tsv
の名称で作成します。outputブロックで*.tsv
ファイルを宣言しているためstats_ref
チャネルを介して出力されたファイルが期待する*.tsvであるかを精査し、一致しない場合エラーを返します。
nextflow.enable.dsl=2
process stats_ref {
input:
path ref_file
output:
path '*.tsv'
script:
"""
prefix=\$(basename $ref_file .fna.gz)
seqkit stats -a -T -b ${ref_file} > \${prefix}_stats.tsv
"""
}
workflow {
Channel.fromPath('ref/*.fna.gz') | stats_ref
// use the view operator to display contents of the channel
stats_ref.out.view()
}
実行:
nextflow run main.nf -process.echo
出力例:
N E X T F L O W ~ version 24.10.6
Launching `main.nf` [deadly_yalow] DSL2 - revision: 56d56e763b
executor > local (2)
[f6/cb8c47] process > stats_ref (1) [100%] 2 of 2 ✔
<Path to work dir>/work/30/e153723625b44954620fa4747fa8c7/GCA_036571765.1_ASM3657176v1_genomic_stats.tsv
<Path to work dir>/work/f6/cb8c47a15eb6996b4f7dee56ba988a/GRCz11_stats.tsv
tuple
識別子は複数の値をグループとして受け取ることができます。
[group_key,[file1,file2,...]]
タプルを含むチャネルを使用する場合、.filesFromPairs
を使用し、対応するprocessのinputにtuple識別子を使用するとともに、タプル内の各項目を定義します。
以下はその使用例です。ファイル名称を_1.fastq.gz -> _R1.fq.gz、_2.fastq.gz -> _R2.fq.gzに変更するような処理をしています。
nextflow.enable.dsl=2
process TUPPLE_FASTQ {
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*.fq.gz")
script:
"""
cat *_1.fastq.gz | pigz > ${sample_id}_R1.fq.gz
cat *_2.fastq.gz | pigz > ${sample_id}_R2.fq.gz
"""
}
// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )
workflow {
TUPPLE_FASTQ(read_pair_ch)
TUPPLE_FASTQ.out.view()
}
実行:
nextflow run main.nf -process.echo
出力:
[SRR18585216, [<Path to work dir>/test_nextflow/work/b0/a5e66b7d5336e02dae880ff148d0aa/SRR18585216_R1.fq.gz, <Path to work dir>/test_nextflow/work/b0/a5e66b7d5336e02dae880ff148d0aa/SRR18585216_R2.fq.gz]]
[SRR18585213, [<Path to work dir>/test_nextflow/work/dc/0e6e9fc29a9189f91ffa03343a610a/SRR18585213_R1.fq.gz, <Path to work dir>/test_nextflow/work/dc/0e6e9fc29a9189f91ffa03343a610a/SRR18585213_R2.fq.gz]]
[SRR18585217, [<Path to work dir>/test_nextflow/work/b3/d9cd73ee68377a7888898ca6ef33e8/SRR18585217_R1.fq.gz, <Path to work dir>/test_nextflow/work/b3/d9cd73ee68377a7888898ca6ef33e8/SRR18585217_R2.fq.gz]]
...
directivesブロック
directives
ブロックでは、各プロセスに対する実行環境の設定を行います。これはNextflowの実行エンジン(ローカル、クラスタ、クラウドなど)に対して、各ジョブのリソース要件や制約を指定するために使用します。
使用可能な主なディレクティブの一覧は以下のとおりです。
ディレクティブ | 説明 |
---|---|
cpus | プロセスで使用するCPUコア数を指定します。 |
memory | プロセスで使用するメモリ量を指定します。例: 4.GB、500.MB |
time | プロセスの最大実行時間を指定します。例: '2h', '30min' |
queue | ジョブスケジューラ(例: SGE, SLURM)で使用するキュー名を指定します。 |
label | プロセスにラベルを付与します。構成ファイルと連携して柔軟な制御が可能です。 |
container | 使用するDocker/Singularityコンテナを指定します。 |
conda | Conda 環境を指定します。 .yml ファイルやパッケージ名で指定可能です。 |
label | プロセスにラベルをつけて分類します。nextflow.config でwithLabel を用いて一括設定が可能です。 |
tag | 実行ログや出力ディレクトリ名に表示されるジョブ名の一部を定義します。ジェブの識別やデバック時に便利です。 |
publishDir | プロセスの出力ファイルを指定ディレクトリにコピー(または移動、Link)します。引数にオプションを指定することで上書きやリンクの方法などを制御できます。 |
以下は、seqkit を用いて FastQ ファイルの統計情報(stats)を取得するプロセスの例です。
tag
にinput
で指定した$sample_id
を設定することで、標準出力上に処理対象のサンプル名を表示でき、ログの可読性が向上します。また、cpus
に4を指定し、スクリプト内で、$task.cpus
を使うことで、seqkit
のスレッド数に4
が指定されるようになります。
nextflow.enable.dsl=2
process STATS_FASTQ {
tag "$sample_id"
cpus 4
input:
tuple val(sample_id), path(reads)
output:
path('*.tsv')
script:
"""
seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
"""
}
// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )
workflow {
STATS_FASTQ(read_pair_ch)
}
実行:
nextflow run main.nf
実行中の標準出力では、プロセス名の横にtag
で指定したサンプルIDが表示され、進行中の処理を視覚的に把握しやすくなります。
Launching `main.nf` [condescending_knuth] DSL2 - revision: 8b2fbc3393
executor > local (12)
[62/ee87e7] process > STATS_FASTQ (SRR18585215) [100%] 12 of 12 ✔
また、tag
には単一の値だけではなく、複数の変数を組み合わせた文字列を指定することが可能です。
nextflow.enable.dsl=2
process STATS_FASTQ {
tag "${sample_id} (${reads.size()} reads)"
cpus 4
publishDir "Results/SRR185851", pattern: "SRR185851*stats.tsv", mode: "copy"
publishDir "Results/SRR185852", pattern: "SRR185852*stats.tsv", mode: "copy"
input:
tuple val(sample_id), path(reads)
output:
path('*.tsv')
script:
"""
seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
"""
}
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )
workflow {
STATS_FASTQ(read_pair_ch)
}
実行:
nextflow run main.nf
出力:
Launching `main.nf` [nostalgic_lamarck] DSL2 - revision: 16e6869253
executor > local (9)
[7b/478551] process > STATS_FASTQ (SRR18585215 (2 reads)) [100%] 9 of 9 ✔
ただ、このままでは出力ファイルの保存先が明示されておらず、解析結果はwork/
以下に保存されます。
このディレクトリは Nextflow が一時的に使用する場所であるため、後の処理や整理のためにはpublishDir
を使って任意のディレクトリに出力をコピー、リンク、または移動するのが一般的です。
publishDir <directory>, parameter: value, parameter2: value ...
たとえば、先ほど生成した*.tsv
ファイルをResults
フォルダに出力するには、次のように指定します。
以下は、Resultsフォルダにファイルを コピー する例です。
nextflow.enable.dsl=2
process STATS_FASTQ {
tag "$sample_id"
cpus 4
publishDir "Results", mode: "copy"
input:
tuple val(sample_id), path(reads)
output:
path('*.tsv')
script:
"""
seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
"""
}
// sequenceフォルダのfastq.gzファイルのタプル取得
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )
workflow {
STATS_FASTQ(read_pair_ch)
}
実行:
nextflow run main.nf
publishDir
で指定したフォルダ(ここでは Results)は自動で作成されます。現状、再実行時はオプションを明示的に指定しない限り、ファイルは上書きされます。
確認:
tree Results
Results
├── SRR18585102_1_stats.tsv
├── SRR18585102_2_stats.tsv
├── SRR18585213_1_stats.tsv
...
└── SRR18585220_2_stats.tsv
サンプル ID のパターンに基づいて出力先を分けたい場合は、pattern
オプションを使ってpublishDir
を複数指定できます。
nextflow.enable.dsl=2
process STATS_FASTQ {
tag "$sample_id"
cpus 4
publishDir "Results/SRR185851", pattern: "SRR185851*stats.tsv", mode: "copy"
publishDir "Results/SRR185852", pattern: "SRR185852*stats.tsv", mode: "copy"
input:
tuple val(sample_id), path(reads)
output:
path('*.tsv')
script:
"""
seqkit stats -a -b -T -j $task.cpus ${sample_id}_1.fastq.gz ${sample_id}_2.fastq.gz > '${sample_id}_stats.tsv'
"""
}
read_pair_ch = Channel.fromFilePairs( 'sequence/*_{1,2}.fastq.gz' )
workflow {
STATS_FASTQ(read_pair_ch)
}
実行:
nextflow run main.nf
確認:
tree Results
Results
├── SRR185851
│ ├── SRR18585102_1_stats.tsv
│ └── SRR18585102_2_stats.tsv
└── SRR185852
├── SRR18585213_1_stats.tsv
├── SRR18585213_2_stats.tsv
├── ...
└── SRR18585220_2_stats.tsv
lebel
ディレクティブを使用すると、複数のプロセスに共通のラベルをつけ、nextflow.config
出側で実行環境(CPU数、メモリ量、queueなど)をまとめて指定できます。これによりプロセスごとに毎回リソースを設定せずにすみ保守性が向上します。
process STATS_FASTQ {
label 'standard'
...
}
このように書くと、nextflow.config
側で以下のような設定が有効になります。
process {
withLabel: standard {
cpus = 4
memory = '8.GB'
time = '2h'
queue = 'grid01'
}
}
label
を使うことで、実行環境の切り替えを外部から柔軟に制御可能になります。
Operators
NextflowではChannel
を使ってデータの流れを制御しますが、その中でオペレーターと呼ばれる機能を使うことでチャネルに流れるデータの変換・フィルタリング・結合を柔軟に扱うことができます。これは、Unixのパイプ処理のような考え方で、データを逐次処理するイメージです。
channel_obj.<operator>()
map
: データの変換
map
は、チャネル内の各要素に対して処理を行い、変換結果を新しいチャネルに流します。getSinpleName
もオペレーターで/some/path/file.tar.gz -> file
の変換をします。
例: ファイル名のプレフィックスを抽出
nextflow.enable.dsl=2
workflow {
read_ch = Channel.fromPath('sequence/*.fastq.gz')
prefix_ch = read_ch.map { name ->
name.getSimpleName().replaceAll(/_[12]$/, '')
}
prefix_ch.view()
}
実行:
nextflow run main.nf
出力:
Launching `main.nf` [astonishing_feynman] DSL2 - revision: 84ad2eb946
SRR18585218
SRR18585214
SRR18585220
SRR18585219
SRR18585102
SRR18585213
SRR18585213
SRR18585216
SRR18585220
SRR18585217
SRR18585216
SRR18585217
SRR18585219
SRR18585218
SRR18585102
SRR18585215
SRR18585214
SRR18585215
R1とR2それぞれに対して置換処理をして出力しているので、ACC IDが重複しています。
filter
: 条件によるフィルタリング
filter
は、チャネル内のデータのうち、条件を満たすものだけを次に渡します。
例: R1ファイルだけを取り出してファイル名のプレフィックスを抽出
nextflow.enable.dsl=2
workflow {
read_ch = Channel.fromPath('sequence/*.fastq.gz')
r1_ch = read_ch.filter { r1 ->
r1.name.contains('_1.fastq.gz')
}
prefix_ch = r1_ch.map { name ->
file.getSimpleName().replaceAll(/_[1]$/, '')
}
prefix_ch.view()
}
実行:
nextflow run main.nf
出力:
Launching `main.nf` [jolly_volta] DSL2 - revision: a11bf4c0a9
SRR18585214
SRR18585219
SRR18585213
SRR18585220
SRR18585217
SRR18585216
SRR18585218
SRR18585102
SRR18585215
また、ペアファイル確定であれば.fromFilePairs()でサンプルIDとペアファイルのtupleを作成して、Sample IDの要素にアクセスすることで同様の目的が達成できます。
Channel.fromFilePairs("sequence/*_{1,2}.fastq.gz")
.map { name -> name[0] }
.view()
Channel内のアイテム分割
Nextflowにはいくつかのファイルフォーマットに対応した分割処理用のOperatorが用意されています。大きなファイルを小さな単位で処理したい時や外部ファイルから処理サンプルを指定するときなどに便利です。
-
splitCsv:
- csvまたはtsvファイルを1行ずつ読み込み。サンプルシートやメタデータの処理、ヘッダーを認識してのカラム処理に使用
-
splitFasta:
- Fasta形式のファイルを1レコードごとに分割。各シーケンスごとの個別処理や巨大なFastaファイルの分割処理に使用
-
splitFastq:
- 1リードごと(4行単位)に分割して読み込み。1リード単位でのフィルタリングや変換、分割処理や特定配列のパターン処理に使用
-
splitText:
- テキストファイルを1行づつ分割読み込み。ログファイルやプレーンなtsv/csvファイルなどの行処理やキーワードで行フィルタリングする場合に使用
splitCsv
複数ある分割処理系のOperatorの中で、一番使用頻度が高くて使い勝手の良いだと思います。解析対象サンプルを記載したサンプルシートを使用する時splitCsvを使用します。
例: サンプルシートに記載されたサンプル名と対応するfastq.gzのペアファイルの情報を抽出
sample_id | read1 | read2 |
---|---|---|
SRR18585214 | sequence/SRR18585214_1.fastq.gz | sequence/SRR18585214_2.fastq.gz |
SRR18585215 | sequence/SRR18585215_1.fastq.gz | sequence/SRR18585215_2.fastq.gz |
SRR18585216 | sequence/SRR18585216_1.fastq.gz | sequence/SRR18585216_2.fastq.gz |
SRR18585217 | sequence/SRR18585217_1.fastq.gz | sequence/SRR18585217_2.fastq.gz |
csv形式のテキストに対して、レコードまたは指定されたサイズのレコードグループに分割します。header lessのcsvの場合はit[0]
のように位置インデックスでアクセス可能です。ヘッダーが存在する場合、header:true
を指定することで列名で値にアクセスすることが可能です。
sample.csvをコマンドで作成する場合こちらを実行
(
echo "sample_id,read1,read2"
for sid in SRR18585214 SRR18585215 SRR18585216 SRR18585217; do
echo "${sid},sequence/${sid}_1.fastq.gz,sequence/${sid}_2.fastq.gz"
done
) > sample.csv
.fromPath
でサンプルシートファイルの情報を取得して、.splitCsv()
で分割処理しています。
nextflow.enable.dsl=2
Channel
.fromPath( 'sample.csv' )
.splitCsv()
.view()
実行:
nextflow run main.nf
Launching `main.nf` [evil_payne] DSL2 - revision: 65529d2859
[sample_id, read1, read2]
[SRR18585214, sequence/SRR18585214_1.fastq.gz, sequence/SRR18585214_2.fastq.gz]
[SRR18585215, sequence/SRR18585215_1.fastq.gz, sequence/SRR18585215_2.fastq.gz]
[SRR18585216, sequence/SRR18585216_1.fastq.gz, sequence/SRR18585216_2.fastq.gz]
[SRR18585217, sequence/SRR18585217_1.fastq.gz, sequence/SRR18585217_2.fastq.gz]
workflow
ここまでは個別の処理方法について確認してきました。ただ、実際には多くの処理がデータの受け渡しによって繋がっています。Channel
を用いることで、個々のタスクを定義したprocess
にデータを渡すことができると分かったので、ここからは、それらの処理をworkflow
でパイプライン化していきます。
以下は、FastpによるデータQCを実行し、出力されたレポートファイルをMultiQCで解析した後、結果をpublishDir
に指定したフォルダへ出力する一連の処理を示します。プロセス間のデータのやり取りにはemit
を使用しています。emit
を使うことで、プロセスの出力に任意の名前をつけることができ、以降の処理で、プロセス名.out.名前
という形式で参照することが可能になります。
加えて、csv形式のサンプルシート(sample.csv)を用いて解析対象サンプルを制御します。
nextflow.enable.dsl=2
process FASTP {
tag "$sample_id"
cpus 16
publishDir "Results/fastp/fastq", pattern: "*_R{1,2}.fastq.gz", mode: "copy"
publishDir "Results/fastp/summary", pattern: "*.{html,json}", mode: "copy"
input:
tuple val(sample_id), path(read1), path(read2)
output:
tuple(path("${sample_id}_R{1,2}.fastq.gz")), emit: 'fastp_fastq'
path("${sample_id}_report.{html,json}"), emit: 'fastp_summary'
script:
"""
fastp \
--in1 ${read1} \
--in2 ${read2} \
--out1 ${sample_id}_R1.fastq.gz \
--out2 ${sample_id}_R2.fastq.gz \
--html ${sample_id}_report.html \
--json ${sample_id}_report.json \
--qualified_quality_phred 30 \
--length_required 50 \
--detect_adapter_for_pe \
--trim_poly_g \
--cut_front \
--thread ${task.cpus}
"""
}
process MULTIQC {
publishDir "Results/Multiqc", mode: 'copy'
input:
path fastp_summary
output:
path "*"
script:
"""
multiqc .
"""
}
workflow{
read_ch=Channel
.fromPath('sample.csv')
.splitCsv(header:true)
.map {
row -> tuple(row.sample_id, file(row.read1), file(row.read2))
}
FASTP(read_ch)
MULTIQC(FASTP.out.fastp_summary.collect())
}
実行:
nextflow run main.nf
出力:
WARN: Output `tuple` must define at least two elements -- Check process `FASTP`
executor > local (5)
[83/b7f097] process > FASTP (SRR18585215) [100%] 4 of 4 ✔
[48/83d016] process > MULTIQC [100%] 1 of 1 ✔
Completed at: 06-May-2025 19:33:40
Duration : 1m 32s
CPU hours : 0.6
Succeeded : 5
確認:
tree Results
Results/
├── fastp
│ ├── fastq
│ │ ├── SRR18585214_R1.fastq.gz
│ │ ├── SRR18585214_R2.fastq.gz
│ ├── [省略]
│ └── summary
│ ├── SRR18585214_report.html
│ ├── SRR18585214_report.json
│ ├── [省略]
└── Multiqc
├── multiqc_data
│ ├── fastp_filtered_reads_plot.txt
│ ├── [省略]
└── multiqc_report.html
これでQC 済みの Fastq ファイル
と、QC 結果をまとめた multiqc_report.html
を取得することができました。
感想
Nextflow については、最初はワークフロー内でのデータの受け渡しが「出力ファイル」ではなく「プロセス間のデータフロー」で完結する点が理解しづらく苦労しました。
今後は nf-core が提供する既存のワークフローを利用するだけでなく、自身の解析にも積極的にNextflowを活用していきたいですね。
出典・参考
- nextflow
- Introduction to Bioinformatics workflows with Nextflow and nf-core
Discussion