基于Nextflow的宏基因组有参分析-IV Kraken2+Bracken

Shell脚本

和fastp+decont的步骤一样,我们可以首先将流程写成一个shell脚本:

#!/bin/bash
set -e -o pipefail
source activate metagenomics

fq1=$1
fq2=$2
prefix=$3
KRAKEN_DB=/data/minikraken2_v2_8GB 

### run it twice...
kraken2 \
    --db $KRAKEN_DB \
    --paired \
    --threads 8 \
    --output ${prefix}.out \
    --report ${prefix}.kraken2.tsv \
    $fq1 $fq2 \
    --use-mpa-style

### run again for bracken
kraken2 \
    --db $KRAKEN_DB \
    --paired \
    --threads 8 \
    --report ${prefix}.kraken2 \
    $fq1 $fq2 > /dev/null


for tax in s g;
do
bracken -d ${KRAKEN_DB} \
    -i ${prefix}.kraken2 \
    -o ${prefix}.bracken.${tax} \
    -l ${tax^^}

sed 's/ /_/g' ${prefix}.bracken.${tax} | \
    tail -n+2 | \
    cut -f 1,7 > ${prefix}.${tax}
done

在上面的脚本里面,为了获取mpa(MetaPhlAn)格式的输出,我运行了两次Kraken2,第一次生成mpa格式的分类谱,第二次生成Bracken所需要的report格式。Kraken2的参考数据库(含Bracken参考)可从官方FTP下载。

Nextflow模块

将上面的脚本写成一个nextflow模块kraken2.nf:

params.outdir = './'

process KRAKEN2 {
    tag "${prefix}"
    cpus 8
    publishDir params.outdir, mode: 'copy'

    input:
    file index_path
    tuple prefix, file(reads1), file(reads2)

    output:
    tuple prefix, file("${prefix}.kraken2.report")
    tuple prefix, file("${prefix}.kraken2.tax")
    file "${prefix}.kraken2.out"

    script:
    """
    kraken2 \\
    --db $index_path \\
    --paired \\
    --threads $task.cpus \\
    --output ${prefix}.kraken2.out \\
    --report ${prefix}.kraken2.tax \\
    $reads1 $reads2 \\
    --use-mpa-style \\

    ### run again for bracken
    kraken2 \\
    --db $index_path \\
    --paired \\
    --threads $task.cpu \\
    --report ${prefix}.kraken2.report \\
    $reads1 $reads2 \\
    --output -
    """
}

process BRACKEN {
    tag "${prefix}"
    publishDir params.outdir, mode: 'copy'

    input:
    file index_path  // This must have the bracken database
    tuple prefix, file(kraken2_report)
    each tax

    output:
    file "${prefix}*.tsv"

    script:
    """
    TAX=$tax; \\

    bracken -d $index_path \\
    -i $kraken2_report \\
    -o ${prefix}.bracken.${tax} \\
    -l \${TAX^^}; \\

    sed 's/ /_/g' ${prefix}.bracken.${tax} | \\
    tail -n+2 | \\
    cut -f 1,7 > ${prefix}.bracken.${tax}.tsv
    """
}

其中each相当于上面脚本的for循环,对于不同的分类s(species),g(genus)分别运行bracken。

主脚本中调用模块

接上一篇中的主脚本(main.nf),我们可以直接使用DECONT过程的结果作为Kraken2+Bracken的输入:

#!/usr/bin/env nextflow

// DSL 2 syntax
nextflow.preview.dsl=2

// parameters
params.help = false
params.read_path  = "${workflow.projectDir}/data"

// parameters decont
params.decont_refpath = '/data/nucleotide/'
params.decont_index   = 'hg19.fa'
params.decont_outdir  = './pipeline_output/decont_out'
ch_bwa_idx = file(params.decont_refpath)
// parameters kraken2                                              // ***
params.kraken2_refpath = '/data/minikraken2_v2_8GB_201904_UPDATE/' // ***
params.kraken2_outdir = './pipeline_output/kraken2_out'            // ***
ch_kraken_idx = file(params.kraken2_refpath)                       // ***

include './decont' params(index: "$params.decont_index", outdir: "$params.decont_outdir")
include './kraken2' params(outdir: "$params.kraken2_outdir")       // ***


// help message
def helpMessage() {
    log.info"""
    =================================================================
    Usage: ${workflow.projectDir}/main.nf --read_path PATH/OF/READS
    =================================================================
    """.stripIndent()
}
if (params.help){
    helpMessage()
    exit 0
}

// Create channel for reads
ch_reads = Channel
    .fromFilePairs(params.read_path + '/**{1,2}.f*q*', flat: true)

workflow{
    DECONT(ch_bwa_idx, ch_reads)
    KRAKEN2(ch_kraken_idx, DECONT.out[0])                          // ***
    BRACKEN(ch_kraken_idx, KRAKEN2.out[0], Channel.from('s', 'g')) // ***
}

加入// ***的部分为添加的内容

逐行解析

配置参数

// parameters kraken2                                              // ***
params.kraken2_refpath = '/data/minikraken2_v2_8GB_201904_UPDATE/' // ***
params.kraken2_outdir = './pipeline_output/kraken2_out'            // ***
ch_kraken_idx = file(params.kraken2_refpath)                       // ***

此处于DECONT过程的配置类似,这里假设Bracken的索引和Kraken2的索引在同一个目录下。

引用模块

include './kraken2' params(outdir: "$params.kraken2_outdir")      // ***

此处与DECONT过程的配置类似

流程定义

workflow{
    DECONT(ch_bwa_idx, ch_reads)
    KRAKEN2(ch_kraken_idx, DECONT.out[0])                          // ***
    BRACKEN(ch_kraken_idx, KRAKEN2.out[0], Channel.from('s', 'g')) // ***
}

在上一篇博文,DECONT的输出被定义为两个tuple:

output:
    tuple prefix, file("${prefix}*1.fastq.gz"), file("${prefix}*2.fastq.gz")
    tuple file("${prefix}.html"), file("${prefix}.json")

第一个tuple为去宿主DNA之后的fastq文件,第二个为fastp的质控文件,运行KRAKEN过程的时候,我们以DECONT.out[0]来使用DECONT过程结果的第一个tuple。同样的,在BRACKEN过程调用时,我们以KRAKEN2.out[0]来使用KRAKEN2过程结果的第一个tuple

执行

$ ls data
$ ./main.nf
N E X T F L O W  ~  version 19.09.0-edge
Launching `./main.nf` [jolly_heyrovsky] - revision: b913fda572
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor >  local (8)
[6f/7c66ef] process > DECONT (SRR1950773)  [100%] 2 of 2 ✔
[51/9f4f92] process > KRAKEN2 (SRR1950773) [100%] 2 of 2 ✔
[c7/603e5e] process > BRACKEN (SRR1950772) [100%] 4 of 4 ✔
Completed at: 17-Oct-2019 12:57:14
Duration    : 3m 50s
CPU hours   : 0.5
Succeeded   : 8

查看结果:

$ ls pipeline_output/kraken2_out/
SRR1950772.bracken.g.tsv  SRR1950772.kraken2.out     SRR1950772.kraken2.tax    SRR1950773.bracken.s.tsv  SRR1950773.kraken2.report
SRR1950772.bracken.s.tsv  SRR1950772.kraken2.report  SRR1950773.bracken.g.tsv  SRR1950773.kraken2.out    SRR1950773.kraken2.tax

参考: https://www.nextflow.io/docs/latest/

(未完)

Related

comments powered by Disqus