Dask on HPC

dask, distributed computing, hpc, parallel computing, python - Sep 26, 2019

Recently I saw that Dask, a distributed computing library for Python, has some really handy wrappers for running Dask projects on a High-Performance Computing (HPC) cluster.

Most people who use HPC are pretty well versed in technologies like MPI, and just generally abusing multiple compute nodes all at once, but I think technologies like Dask are really going to be game-changers in the way we all work. Because really, who wants to write MPI code or vectorize?

If you've never heard of Dask and its awesomeness before, I think the easiest way to get started is to look at their Embarrassingly Parallel Example, and don't listen to the haters who think speeding up for loops is lame. It's a superpower!
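To give a flavor of what that looks like, here's a minimal sketch of my own (the function name is just a placeholder, not something from the Dask docs): you wrap a plain for loop with dask.delayed and let Dask run the iterations in parallel.

from dask import delayed, compute

def slow_square(x):
    # stand-in for any slow, independent piece of work
    return x ** 2

# build a list of lazy tasks instead of running the loop eagerly
tasks = [delayed(slow_square)(i) for i in range(10)]

# execute them in parallel and collect the results
results = compute(*tasks)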

Onward with examples!

Client and Scheduler

Firstly, these examples are all pretty much borrowed from the Dask-Jobqueue docs. The idea is that you write your Python code as usual, and then, when you need to scale across nodes, you leverage your HPC scheduler to get you some nodes.

In any distributed system you have something that creates and coordinates jobs or tasks, normally called a scheduler, and something that actually executes them, the workers. Dask has both, and you talk to them through the client. The client handles the fun stuff too, like saying how many workers and cores we get to take advantage of.

When using Dask-Jobqueue, you drop in the configuration for your HPC scheduler instead, as shown in Dask-Jobqueue - How this works.

# Instead of this (from the embarrassingly parallel example)
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)

# Use this - from the HPC example
from dask_jobqueue import PBSCluster

cluster = PBSCluster(  # <-- scheduler started here
    cores=24,
    memory='100GB',
    shebang='#!/usr/bin/env zsh',  # default is bash
    processes=6,
    local_directory='$TMPDIR',
    resource_spec='select=1:ncpus=24:mem=100GB',
    queue='regular',
    project='my-project',
    walltime='02:00:00',
)
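From here the pattern is the same as the local case: point a Client at the cluster and ask for workers. This next bit is my own addition, following the same docs, so treat it as a sketch rather than gospel:

from dask.distributed import Client

client = Client(cluster)  # connect to the PBSCluster defined above
cluster.scale(12)         # ask PBS for enough jobs to provide 12 workers

With processes=6 in the config above, asking for 12 workers should translate into 2 PBS jobs.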

Then, when it's time for your Dask workers to spin up, they are submitted as just regular job scripts, with your usual #SBATCH directives (or #PBS, which is the PBS equivalent of that).
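If you want to see exactly what gets submitted, Dask-Jobqueue can show you the job script it generates for each block of workers (assuming the cluster object from above):

# print the generated submission script, #PBS directives and all
print(cluster.job_script())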

Minimal Example

This is, once again, mostly taken directly from the docs, but I had a tough time finding it amongst all the awesomeness.  I removed a little, just for the sake of having a truly minimal example.

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

# ask Slurm for workers with 8 GB of memory and 2 cores each
cluster = SLURMCluster(memory='8g', cores=2)

# launch 2 workers (newer dask_jobqueue releases use cluster.scale(2) instead)
cluster.start_workers(2)
client = Client(cluster)


def step_1(data):
    return "Step 1 done for: %s" % data


def step_2(data):
    return "Step 2 done for: %s" % data


# build the task graph: each step_2 call depends on the matching step_1 call
stage_1 = [delayed(step_1)(i) for i in range(10)]
stage_2 = [delayed(step_2)(s1) for s1 in stage_1]

# kick off the computation on the cluster; this returns a list of futures
result_stage_2 = client.compute(stage_2)
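One thing worth knowing: client.compute hands back futures, not the results themselves, so you'll want to gather them once they finish. A small sketch of my own, not from the docs:

# block until the futures finish and pull the results back locally
results = client.gather(result_stage_2)
print(results[0])  # e.g. "Step 2 done for: Step 1 done for: 0"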

And that's it! There are still some things I'm digging into, such as the cluster.start_workers call, and trying to see how I can get support for job arrays in Slurm going, but besides that I'm pretty happy! It's nice to see some of the newer techs still taking on HPC, since many of us are still (willingly or not!) working in an HPC environment.

In the future I'm going to write a Part 2, where I really dive into the nitty-gritty details of this library.

Happy teching!
