Dask on HPC

Recently I saw that Dask, a distributed Python library, created some really handy wrappers (the dask-jobqueue package) for running Dask projects on a high-performance computing (HPC) cluster.

Most people who use HPC are pretty well versed in technologies like MPI, and just generally abusing multiple compute nodes all at once, but I think technologies like Dask are really going to be game-changers in the way we all work. Because really, who wants to write MPI code or vectorize?

If you've never heard of Dask and its awesomeness before, I think the easiest way to get started is to look at their Embarrassingly Parallel Example, and don't listen to the haters who think speeding up for loops is lame. It's a superpower!
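To see why speeding up for loops is a superpower, here's the embarrassingly parallel pattern sketched with only the standard library. Dask's delayed gives you the same idea, plus lazy task graphs and execution across whole clusters; costly_simulation is a made-up stand-in for your own slow function:

```python
from concurrent.futures import ThreadPoolExecutor

def costly_simulation(x):
    # stand-in for a slow, independent piece of work
    return x * x

# the plain, serial for loop...
serial = [costly_simulation(i) for i in range(8)]

# ...and the same loop fanned out across workers
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(costly_simulation, range(8)))

assert serial == parallel  # same answers, computed concurrently
```

Because every iteration is independent, there's nothing clever to do: just hand the loop body to a pool of workers.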

Onward with examples!

Client and Scheduler

First, a disclaimer: these examples are all pretty much borrowed from the Dask Job Queues page. The workflow is simple: you write your Python code as usual, and then, when you need to scale across nodes, you leverage your HPC scheduler to get you some nodes.

In any distributed software you have something that creates jobs or tasks, normally a scheduler, and something that actually executes them. In Dask, the client is where you submit your tasks; the scheduler then farms them out to workers, which do the actual execution. The client is also where the fun configuration happens, like saying how many cores we can use/take advantage of.

When using Dask JobQueue, you drop in the configuration for your scheduler instead, as shown in Dask JobQueue - How this works.

# Instead of this (from the embarrassingly parallel example)
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)

# Use this - from the HPC example
from dask_jobqueue import PBSCluster

cluster = PBSCluster(              # <-- scheduler started here
    shebang='#!/usr/bin/env zsh',  # default is bash
    cores=2,                       # resources per job (required)
    memory='8GB',
)
client = Client(cluster)
Then, when it's time for your Dask task to execute, it is submitted as just a regular job script, with your usual #SBATCH directives (or #PBS, the PBS equivalent).
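For the curious, here's a rough sketch of the kind of worker job script that gets submitted. The directives and worker command below are my illustration of the shape, not dask-jobqueue's literal output; you can print the real thing from a running cluster with cluster.job_script():

```python
# Hypothetical sketch of a generated Slurm worker script -- directives and
# the worker command line are illustrative, not dask-jobqueue's exact output.
job_script = """#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=8G
python -m distributed.cli.dask_worker tcp://<scheduler-address>:8786 \\
    --nthreads 2 --memory-limit 8GB
"""
print(job_script)
```

The punchline: each Dask worker is just another batch job from the scheduler's point of view.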

Minimal Example

This is, once again, mostly taken directly from the docs, but I had a tough time finding it amongst all the awesomeness.  I removed a little, just for the sake of having a truly minimal example.

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g', cores=2)

client = Client(cluster)

def step_1(data):
    return "Step 1 done for: %s" % data

def step_2(data):
    return "Step 2 done for: %s" % data

stage_1 = [delayed(step_1)(i) for i in range(10)]
stage_2 = [delayed(step_2)(s1) for s1 in stage_1]

result_stage_2 = client.compute(stage_2)  # returns futures
print(client.gather(result_stage_2))      # block until the results are in
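If you don't have a cluster handy and want to see what that pipeline computes, here's the same two-stage dependency chain sketched with only the standard library: an eager version of the graph that delayed builds lazily (a thread pool standing in for Dask's workers):

```python
from concurrent.futures import ThreadPoolExecutor

def step_1(data):
    return "Step 1 done for: %s" % data

def step_2(data):
    return "Step 2 done for: %s" % data

with ThreadPoolExecutor(max_workers=4) as pool:
    stage_1 = list(pool.map(step_1, range(10)))  # fan out step 1
    stage_2 = list(pool.map(step_2, stage_1))    # each step 2 consumes a step 1 result

print(stage_2[0])  # -> Step 2 done for: Step 1 done for: 0
```

The difference with Dask is that delayed records the whole dependency graph first and only runs it when you call compute, so the scheduler can overlap stage 2 of early items with stage 1 of later ones.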

And that's it! There are still some things I'm digging into, such as the cluster.scale call for adding workers, and trying to see how I can get support for job arrays in Slurm going, but besides that I'm pretty happy! It's nice to see some of the newer tech still taking on HPC, since many of us are still (willingly or not!) working in an HPC environment.

In the future I'm going to write a Part 2, where I really dive into the nitty gritty details of this library.

Happy teching!
