This lesson is in the early stages of development (Alpha version)

Developing parallel workflows

Overview

Teaching: 15 min
Exercises: 10 min
Questions
  • How to scale up and run thousands of jobs?

  • What is a DAG?

  • What is a Scatter-Gather paradigm?

  • How to run Yadage workflows on REANA?

Objectives
  • Learn about Directed Acyclic Graphs (DAG)

  • Understand Yadage workflow language

  • Practice running and inspecting parallel workflows

Overview

We now know how to develop reproducible analyses on a small scale using serial workflows.

In this lesson we shall learn how to scale up for real-life work, which requires using parallel workflows.

Workflows as Directed Acyclic Graphs (DAG)

Computational analyses can be expressed as a set of steps, where some steps depend on the outputs of other steps before they can begin their computations. In other words, the computational steps form a Directed Acyclic Graph (DAG).
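For example, here is a schematic sketch (with hypothetical step names) in which the two histogramming steps can run in parallel as soon as the skimming finishes, while the final fit has to wait for both:

        +--> histogram-signal -------+
skim ---+                            +--> fit
        +--> histogram-background ---+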

The REANA platform supports several DAG workflow specification languages, such as CWL and Yadage.

In this lesson we shall use the Yadage workflow specification language.

Yadage

Yadage enables us to describe complex computational workflows. Let us start by having a look at the Yadage specification for the RooFit example we have used in the earlier episodes:

stages:
  - name: gendata
    dependencies: [init]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        events: {step: init, output: events}
        gendata: {step: init, output: gendata}
        outfilename: '{workdir}/data.root'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: root -b -q '{gendata}({events},"{outfilename}")'
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            data: outfilename
        environment:
          environment_type: 'docker-encapsulated'
          image: 'reanahub/reana-env-root6'
          imagetag: '6.18.04'
  - name: fitdata
    dependencies: [gendata]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        fitdata: {step: init, output: fitdata}
        data: {step: gendata, output: data}
        outfile: '{workdir}/plot.png'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: root -b -q '{fitdata}("{data}","{outfile}")'
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            plot: outfile
        environment:
          environment_type: 'docker-encapsulated'
          image: 'reanahub/reana-env-root6'
          imagetag: '6.18.04'

We can see that the workflow consists of two steps: gendata does not depend on anything besides the workflow inputs ([init]), while fitdata depends on gendata. This is how linear workflows are expressed in the Yadage language.

Running Yadage workflows

Let us write the above workflow as workflow.yaml in the RooFit example directory.
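Note that you can also test the workflow locally before submitting it to REANA, provided you have Docker and the yadage command-line tool installed (pip install yadage). The exact command-line options may differ between yadage versions, but an invocation along these lines should work:

$ yadage-run workdir workflow.yaml -p events=20000 -p gendata=gendata.C -p fitdata=fitdata.C -d initdir=$PWD/code

where workdir is a fresh directory in which yadage will store the working directories of the individual steps.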

How can we run the example on the REANA platform? We have to instruct REANA that we are going to use Yadage as our workflow engine. We can do that by writing a reana-yadage.yaml file specifying:

version: 0.6.0
inputs:
  parameters:
    events: 20000
    gendata: code/gendata.C
    fitdata: code/fitdata.C
workflow:
  type: yadage
  file: workflow.yaml
outputs:
  files:
    - fitdata/plot.png
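Before launching anything, we can check the specification file for errors. Assuming it is saved as reana-yadage.yaml, as used below:

$ reana-client validate -f reana-yadage.yaml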

We can run the example on REANA in the usual way:

$ reana-client run -w roofityadage -f reana-yadage.yaml

Exercise

Run the RooFit example using the Yadage workflow engine on the REANA cloud. Upload the code, run the workflow, inspect its status, check the logs, and download the final plot.

Solution

Nothing changes in the usual user interaction with the REANA platform:

$ reana-client create -w roofityadage -f ./reana-yadage.yaml
$ reana-client upload ./code -w roofityadage
$ reana-client start -w roofityadage
$ reana-client status -w roofityadage
$ reana-client logs -w roofityadage
$ reana-client ls -w roofityadage
$ reana-client download fitdata/plot.png -w roofityadage

Physics code vs orchestration code

Note that it wasn’t necessary to change anything in our code: we simply modified the workflow definition and we could run the RooFit code “as is” using another workflow engine. This is a simple demonstration of the separation of concerns between “physics code” and “orchestration code”.

Parallelism via step dependencies

We have seen how serial workflows can also be expressed in Yadage syntax using step dependencies. Note that if the dependency graph permits it, workflow steps that do not depend on each other or on the results of previous computations can be executed in parallel by the workflow engine: the physicist only has to supply the knowledge about which steps depend on which other steps, and the workflow engine takes care of starting and scheduling the tasks efficiently.
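To make this concrete, here is a minimal sketch of a diamond-shaped workflow, with hypothetical stage names and scripts, reusing the ROOT image from the example above purely for demonstration. Since path_a and path_b both depend only on [init], the engine is free to execute them simultaneously, while combine starts only once both have finished:

stages:
  # path_a and path_b depend only on the workflow inputs ([init]),
  # so the workflow engine may execute them at the same time
  - name: path_a
    dependencies: [init]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        outfilename: '{workdir}/a.txt'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: echo "result A" > {outfilename}
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            result_a: outfilename
        environment:
          environment_type: 'docker-encapsulated'
          image: 'reanahub/reana-env-root6'
          imagetag: '6.18.04'
  - name: path_b
    dependencies: [init]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        outfilename: '{workdir}/b.txt'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: echo "result B" > {outfilename}
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            result_b: outfilename
        environment:
          environment_type: 'docker-encapsulated'
          image: 'reanahub/reana-env-root6'
          imagetag: '6.18.04'
  # combine consumes the outputs of both paths, so it starts
  # only after path_a and path_b have finished
  - name: combine
    dependencies: [path_a, path_b]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        file_a: {step: path_a, output: result_a}
        file_b: {step: path_b, output: result_b}
        outfilename: '{workdir}/combined.txt'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: cat {file_a} {file_b} > {outfilename}
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            combined: outfilename
        environment:
          environment_type: 'docker-encapsulated'
          image: 'reanahub/reana-env-root6'
          imagetag: '6.18.04'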

HiggsToTauTau analysis: simple version

Let us demonstrate how to write a simple Yadage workflow for the HiggsToTauTau analysis using plain step dependencies.

The workflow looks like:

stages:
- name: skim
  dependencies: [init]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      input_dir: {step: init, output: input_dir}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/skim'}

- name: histogram
  dependencies: [skim]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      input_dir: {step: skim, output: skimmed_dir}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/histogram'}

- name: fit
  dependencies: [histogram]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      histogram_file: {step: histogram, output: histogram_file}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/fit'}

- name: plot
  dependencies: [histogram]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      histogram_file: {step: histogram, output: histogram_file}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/plot'}

where the steps, referenced above via JSON $ref pointers, are expressed in steps.yaml as:

skim:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash skim.sh {input_dir} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      skimmed_dir: '{output_dir}'

histogram:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash histograms_with_custom_output_location.sh {input_dir} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      histogram_file: '{output_dir}/histograms.root'

plot:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash plot.sh {histogram_file} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      datamc_plots: '{output_dir}'

fit:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash fit.sh {histogram_file} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-statistics-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      fitting_plot: '{output_dir}/fit.png'

The workflow definition is similar to that of the Serial workflow and, as we can see, it already allows for some parallelism, because the fitting step and the plotting step can run simultaneously once the histograms are produced.

The graphical representation of the workflow, reconstructed here in text form from the stage dependencies, is:
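init --> skim --> histogram --+--> fit
                              +--> plot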

Let us try to run it on the REANA cloud.

Exercise

Run the HiggsToTauTau analysis example using the simple Yadage workflow version. Take the workflow definition listed above, write the corresponding reana.yaml, and run the example on the REANA cloud.

Solution

$ vim workflow.yaml # take contents above and store it as workflow.yaml
$ vim steps.yaml    # take contents above and store it as steps.yaml
$ vim reana.yaml    # this was the task
$ cat reana.yaml
version: 0.6.0
inputs:
  parameters:
    input_dir: root://eospublic.cern.ch//eos/root-eos/HiggsTauTauReduced
workflow:
  type: yadage
  file: workflow.yaml
outputs:
  files:
    - fit/output/fit.png
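With workflow.yaml, steps.yaml and reana.yaml in place, the workflow can be run in the usual way; the workflow name below is arbitrary. Note that there is no upload step here: the analysis code is baked into the container images and the input data is read remotely via XRootD.

$ reana-client create -w higgstautau-simple -f ./reana.yaml
$ reana-client start -w higgstautau-simple
$ reana-client status -w higgstautau-simple
$ reana-client download fit/output/fit.png -w higgstautau-simple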

Parallelism via scatter-gather paradigm

A useful paradigm of workflow languages is the “scatter-gather” behaviour, where we instruct the workflow engine to run a certain step over a certain input array in parallel, as if each element of the input were a single-item input (the “scatter” operation). The partial results processed in parallel are then assembled together (the “gather” operation). The scatter-gather paradigm makes it possible to express “map-reduce” operations with a minimum of syntax, without having to duplicate workflow code or statements.

Here is an example of the scatter-gather paradigm expressed in the Yadage language (abridged: the step definitions and the repeated scatter block are elided):

stages:
  - name: map
    dependencies: [init]
    scheduler:
      scheduler_type: multistep-stage
      parameters:
        input: {stages: init, output: input, unwrap: true}
      batchsize: 3
      scatter:
        method: zip
        parameters: [input]
  - name: map2
    dependencies: [map]
    scheduler:
      scheduler_type: multistep-stage
      parameters:
        input: {stages: map, output: outputA, unwrap: true}
      batchsize: 2
      scatter: ...
  - name: reduce
    dependencies: [map2]
    scheduler:
      scheduler_type: singlestep-stage
      parameters:
        input: {stages: 'map2', output: outputA}

Note the “scatter” happening over the input parameter with the desired batch size.
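To make the expansion concrete, suppose the workflow received a hypothetical seven-element list as its input parameter:

input: ['f1.root', 'f2.root', 'f3.root', 'f4.root', 'f5.root', 'f6.root', 'f7.root']

With batchsize: 3, the map stage would then be expanded into three parallel jobs receiving three, three and one elements respectively, and the subsequent stage would gather the list of all their outputs once they have finished.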

In the next episode we shall see how the scatter-gather paradigm can be used to speed up the HiggsToTauTau workflow with more parallelism.

Key Points

  • Computational analysis is a graph of inter-dependent steps

  • Fully declare inputs and outputs for each step

  • Use Scatter/Gather or Map/Reduce to avoid copy-paste coding