基于Nextflow的宏基因组有参分析-III 质控+去宿主DNA

Shell脚本

质控和去宿主主要由以下工具完成:

  1. fastp: 去接头,修剪低质量序列
  2. BWA + samtools:去宿主

参照这篇教程,我们可以设计如下简单的流程:

#!/bin/bash

## requirement:
## fastp, BWA, samtools >= 1.7

ref=$1
reads1=$2
reads2=$3
prefix=$4
threads=${5:-4}

fastp -i $reads1 -I $reads2 --stdout -j ${prefix}.json -h ${prefix}.html | 
    bwa mem -p -t $threads $ref - | 
    samtools fastq -f12 -F256  -1  ${prefix}_fastpdecont_1.fastq.gz -2 ${prefix}_fastpdecont_2.fastq.gz -

此处尽可能地使用管道来避免硬盘读写和不必要的数据存储。

Nextflow模块

我们可以把以上脚本写成简单的nextflow模块decont.nf:

params.index = 'hg19.fa'
params.outdir = './'

process DECONT {
    tag "${prefix}"
    cpus 8
    publishDir params.outdir, mode: 'copy'

    input:
    file index_path
    tuple prefix, file(reads1), file(reads2)

    output:
    tuple prefix, file("${prefix}*1.fastq.gz"), file("${prefix}*2.fastq.gz")
    tuple file("${prefix}.html"), file("${prefix}.json")

    script:
    """
    fastp -i $reads1 -I $reads2 --stdout -j ${prefix}.json -h ${prefix}.html | \\
    bwa mem -p -t $task.cpus ${index_path}/${params.index} - | \\
    samtools fastq -f12 -F256  -1  ${prefix}_fastpdecont_1.fastq.gz -2 ${prefix}_fastpdecont_2.fastq.gz -
    """
}

逐行解析

模块参数:

params.index = 'hg19.fa'
params.outdir = './'

和上一篇介绍的流程参数一致,定义了在该模块的参数(我推测这些参数的scope是local,即仅限于该模块,有待确定)。

过程定义:

process DECONT {
    input:
    ...
    output:
    ...
    script:
    ...
}

以上为Nextflow过程(process)的骨架结构,主要由输入(input),输出(output),脚本(script)来构成。

过程指令:

tag "${prefix}"
cpus 8
publishDir params.outdir, mode: 'copy'
  • tag:给每一个过程执行命名,方便在执行日志中查看
  • cpus:此过程运行时的CPU数量
  • publishDir:结果发布路径,运行完成后将最终的结果(由output定义)拷贝('copy')到该路径

更多的指令建议参考Nextflow的官方文档

输入、输出:

input:
    file index_path
    tuple prefix, file(reads1), file(reads2)

output:
    tuple prefix, file("${prefix}*1.fastq.gz"), file("${prefix}*2.fastq.gz")
    tuple file("${prefix}.html"), file("${prefix}.json")

由关键字input:output:(注意冒号)构成,后面接内容(全都为Channel类型)。内容可包含:

  • 值(val):可省略
  • 文件(file)
  • 列表(tuple):形式为[element1, element2, elemnet3, ...]

注:似乎在Nextflow文档中tuplelist等同。

脚本

script:
    """
    fastp -i $reads1 -I $reads2 --stdout -j ${prefix}.json -h ${prefix}.html | \\
    bwa mem -p -t $task.cpus ${index_path}/${params.index} - | \\
    samtools fastq -f12 -F256  -1  ${prefix}_fastpdecont_1.fastq.gz -2 ${prefix}_fastpdecont_2.fastq.gz -
    """

Nextflow过程的脚本部分为Shell脚本,注意Shell中变量要使用\$开头。脚本部分涉及的文件(或路径)必须是Channel。

在主脚本中调用模块

延续上一篇中的主脚本(main.nf),我们可以调用DECONT过程:

#!/usr/bin/env nextflow

// DSL 2 syntax
nextflow.preview.dsl=2

// parameters
params.help = false
params.read_path  = "${workflow.projectDir}/data"

// parameters decont                                                                       // ***
params.decont_refpath = '/data/nucleotide/'                                                // ***
params.decont_index   = 'hg19.fa'                                                          // ***
params.decont_outdir  = './pipeline_output/decont_out'                                     // ***
ch_bwa_idx = file(params.decont_refpath)                                                   // ***
include './decont' params(index: "$params.decont_index", outdir: "$params.decont_outdir")  // ***


// help message
def helpMessage() {
    log.info"""
    =================================================================
    Usage: ${workflow.projectDir}/main.nf --read_path PATH/OF/READS
    =================================================================
    """.stripIndent()
}
if (params.help){
    helpMessage()
    exit 0
}

// Create channel for reads
ch_reads = Channel
    .fromFilePairs(params.read_path + '/**{1,2}.f*q*', flat: true)

workflow{                                                                                 // ***
    DECONT(ch_bwa_idx, ch_reads)                                                          // ***
}

加入// ***的部分为添加的内容

逐行解析

配置DECONT过程参数

// parameters decont                                                                       // ***
params.decont_refpath = '/data/nucleotide/'                                                // ***
params.decont_index   = 'hg19.fa'                                                          // ***
params.decont_outdir  = './pipeline_output/decont_out'                                     // ***
ch_bwa_idx = file(params.decont_refpath)                                                   // ***

我们知道DECONT部分需要四个参数:

  1. BWA索引路径
  2. BWA索引的前缀
  3. DECONT输出路径
  4. 输入fastq文件:[prefix, r1.fastq, r2.fastq] (tuple prefix, file(reads1), file(reads2))

注意这里BWA索引路径需要以Channel的形式传入

引用模块

include './decont' params(index: "$params.decont_index", outdir: "$params.decont_outdir")  // ***

非文件或路径的参数通过params(param1:, param2:)传入。

流程定义

workflow{                                                                                 // ***
    DECONT(ch_bwa_idx, ch_reads)                                                          // ***
}

执行流程,process所需的输入文件通过上面的方式传入。

执行

$ ls data
SRR1950772_1.fastq.gz  SRR1950772_2.fastq.gz  SRR1950773_1.fastq.gz  SRR1950773_2.fastq.gz
$ ./main.nf
N E X T F L O W  ~  version 19.09.0-edge
Launching `./main.nf` [desperate_mcnulty] - revision: f74174c1c6
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
executor >  local (2)
[f7/14223b] process > DECONT (SRR1950773) [100%] 2 of 2 ✔

查看结果:

$ ls pipeline_output/decont_out/
SRR1950772_fastpdecont_1.fastq.gz  SRR1950772.html  SRR1950773_fastpdecont_1.fastq.gz  SRR1950773.html
SRR1950772_fastpdecont_2.fastq.gz  SRR1950772.json  SRR1950773_fastpdecont_2.fastq.gz  SRR1950773.json

参考: https://www.nextflow.io/docs/latest/

(未完)

Related

comments powered by Disqus