Recently I saw that Dask, a distributed computing library for Python, has some really handy wrappers for running Dask projects on a High-Performance Computing (HPC) cluster.
Most people who use HPC are pretty well versed in technologies like MPI, and just generally abusing multiple compute nodes all at once, but I think technologies like Dask are really going to be game-changers in the way we all work. Because really, who wants to write MPI code or vectorize?
If you've never heard of Dask and its awesomeness before, I think the easiest way to get started is to look at their Embarrassingly Parallel Example, and don't listen to the haters who think speeding up for loops is lame. It's a superpower!
First off, these examples are pretty much borrowed from the Dask Job Queues page. The idea is that you write your Python code as usual. Then, when you need to scale across nodes, you leverage your HPC scheduler to get you some nodes.
In any distributed software you have something that creates and hands out jobs or tasks, normally a scheduler, and something that actually executes them. Dask calls the part that executes the tasks the workers, and the part that coordinates them the scheduler. Your code talks to both through the client. In the local examples the client does the fun stuff too, like saying how many cores we can take advantage of.
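To make those roles concrete without needing a cluster, here is a minimal sketch that uses Python's standard-library `concurrent.futures` as a stand-in. This is not Dask, and the function name `slow_square` is made up for illustration. The calling code acts like a client, and the executor acts like a scheduler with a pool of workers.

```python
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical stand-in for an expensive, independent unit of work."""
    return x * x


# The executor plays scheduler plus workers: it queues submitted tasks
# and hands them to a pool of worker threads.
with ThreadPoolExecutor(max_workers=4) as pool:
    # This calling code plays the client: it submits tasks and
    # later collects the results.
    futures = [pool.submit(slow_square, i) for i in range(10)]
    results = [f.result() for f in futures]

print(results)
```

Dask follows the same submit-then-collect shape, except the workers can live on other machines.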
When using Dask JobQueue, you drop in a cluster configuration for your HPC scheduler instead, as shown in Dask JobQueue - How this works.
    # Instead of this (from the embarrassingly parallel example):
    from dask.distributed import Client, progress
    client = Client(threads_per_worker=4, n_workers=1)

    # Use this (from the HPC example):
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(  # <-- scheduler started here
        cores=24,
        memory='100GB',
        shebang='#!/usr/bin/env zsh',  # default is bash
        processes=6,
        local_directory='$TMPDIR',
        resource_spec='select=1:ncpus=24:mem=100GB',
        queue='regular',
        project='my-project',  # later dask-jobqueue releases name this `account`
        walltime='02:00:00',
    )
    client = Client(cluster)  # point the client at the HPC-backed cluster
Then, when it's time for your Dask tasks to execute, the workers are submitted as just regular job scripts, with your usual #SBATCH directives, or #PBS if you are on PBS.
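To get a feel for what "a regular job script" means here, the sketch below builds the kind of `#SBATCH` header a worker job might carry. The helper `render_slurm_header` is hypothetical and is not the template dask-jobqueue actually uses. To see the real script, the dask-jobqueue docs print `cluster.job_script()`.

```python
def render_slurm_header(job_name, cores, memory, walltime, queue=None):
    """Hypothetical helper: build #SBATCH directive lines for one worker job."""
    lines = [
        "#!/usr/bin/env bash",
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --cpus-per-task={cores}",
        f"#SBATCH --mem={memory}",
        f"#SBATCH --time={walltime}",
    ]
    # Only request a partition when the caller names one.
    if queue is not None:
        lines.append(f"#SBATCH --partition={queue}")
    return "\n".join(lines)


header = render_slurm_header(
    "dask-worker", cores=2, memory="8G", walltime="02:00:00"
)
print(header)
```

The point is only that the resource arguments you pass to the cluster object end up as ordinary scheduler directives, so your usual queue limits and accounting still apply.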
The minimal example below is, once again, mostly taken directly from the docs, but I had a tough time finding it amongst all the awesomeness. I removed a little, just for the sake of having a truly minimal example.
    from dask_jobqueue import SLURMCluster
    from distributed import Client
    from dask import delayed

    cluster = SLURMCluster(memory='8g', cores=2)
    # Ask SLURM for two workers; later dask-jobqueue releases use cluster.scale(2)
    cluster.start_workers(2)
    client = Client(cluster)

    def step_1(data):
        return "Step 1 done for: %s" % data

    def step_2(data):
        return "Step 2 done for: %s" % data

    # Build the task graph lazily: each stage 2 task depends on one stage 1 task
    stage_1 = [delayed(step_1)(i) for i in range(10)]
    stage_2 = [delayed(step_2)(s1) for s1 in stage_1]

    # Submit the graph to the cluster; this returns futures, not results
    result_stage_2 = client.compute(stage_2)
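If you want to check the pipeline logic before asking for nodes, the same two stages can run on your own machine. This is a minimal sketch, assuming only that `dask` is installed. It swaps the SLURM cluster for Dask's default local scheduler via `dask.compute` and shrinks the range to 3 to keep the output short.

```python
import dask
from dask import delayed


def step_1(data):
    return "Step 1 done for: %s" % data


def step_2(data):
    return "Step 2 done for: %s" % data


# Same lazy graph as the cluster version, just smaller.
stage_1 = [delayed(step_1)(i) for i in range(3)]
stage_2 = [delayed(step_2)(s1) for s1 in stage_1]

# No cluster or client here: dask.compute runs the graph locally
# and returns a tuple with one result per stage 2 task.
results = dask.compute(*stage_2)
print(results[0])
```

Once this behaves, moving to the cluster is a matter of creating the cluster and client and submitting the same graph.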
And that's it! There are still some things I'm digging into, such as the cluster.start_workers call, and trying to see how I can get support for job arrays in Slurm going, but besides that I'm pretty happy! It's nice to see some of the newer tech still taking on HPC, since many of us are still (willingly or not!) working in an HPC environment.
In the future I'm going to write a Part 2, where I really dive into the nitty gritty details of this library.