Manage High Content Screening CellProfiler Pipelines with Apache Airflow

If you are running a High Content Screening pipeline you probably have a lot of moving pieces. As a non-exhaustive list, you need to:

  • Trigger CellProfiler Analyses, either from a LIMS system, by watching a filesystem, or some other process.
  • Keep track of dependencies of CellProfiler Analyses - first run an illumination correction and then your analysis.
  • If you have a large dataset and you want to get it analyzed sometime this century you need to split your analysis, run, and then gather the results.
  • Once you have results you need to decide on a method of organization. You need to put your data in a database and set up in depth analysis pipelines.

These tasks are much easier to accomplish when you have a system or framework that is built for scientific workflows.

If you prefer to watch I have a video where I go through all the steps in this tutorial.

Enter Apache Airflow

In the project's own words:

"Airflow is a platform created by the community to programmatically author, schedule and monitor workflows."

There are a ton of great introductory resources out there on Apache Airflow, but I will very briefly go over it here.

Apache Airflow gives you a framework to organize your analyses into DAGs, or Directed Acyclic Graphs. If you aren't familiar with this term it's really just a way of saying Step3 depends upon Step2 which depends upon Step1, or Step1 -> Step2 -> Step3.
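
To make the ordering idea concrete, here is a minimal sketch in plain Python, with no Airflow required. It uses the standard library's graphlib module (Python 3.9+) to work out a valid run order for three steps. The step names are hypothetical placeholders that loosely mirror the workflow built later in this post; they are not Airflow task ids.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it depends on.
# These names are illustrative placeholders only.
dependencies = {
    "illumination": set(),
    "analysis": {"illumination"},
    "gather_results": {"analysis"},
}

# static_order() yields the steps so that every dependency comes first.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Airflow does this kind of ordering for you, and adds scheduling, retries, and logging on top.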

Apache Airflow uses DAGs, which are the bucket you throw your analysis in. Your DAG is composed of Operators and Sensors. Operators are an abstraction over the kind of task you are completing. These will often be Bash, Python, or SSH, but can also be even cooler things like Docker, Kubernetes, AWS Batch, AWS ECS, database operations, file pushers, and more. Then there are Sensors, which are nice and shiny ways of waiting for various events, whether that is a file appearing, a record appearing in a database, or another task completing.

Out of the box you get lots of niceness, including a nice web interface with a visual browser of your tasks, a scheduler, configurable parallelism, logging, watchers, and any number of executors. As all of your configuration is written in code, it is also extremely flexible. It can integrate with existing systems or stand on its own.

There are any number of scientific workflow managers out there, and by the time I finish this article a few more will have popped into existence. Apache Airflow is my favorite, but you should shop around to see what clicks with you!

Computational Backends

I briefly touched on this earlier, but one of the perks that initially drew me to Apache Airflow is just how completely agnostic it is to your compute environment. You could have a laptop, a single server, an HPC cluster, or execute on AWS or GCP. Airflow itself does not care. All you need to do is map out your logic, make sure the data is available, and use whichever operator is appropriate.

Example CellProfiler Analysis Workflow

In this post I'm going to discuss the BBBC021 dataset and how I would organize and batch the analysis.

I decided to go for a simple setup, which is to use Apache Airflow with docker-compose and use the Docker operator to execute the CellProfiler analysis. Once you have your logic and workflow mapped out you could use any operator for any compute infrastructure, whether that is AWS ECS or an HPC cluster. My favorite lately has been Kubernetes, because it is not tied to any platform and can be used on AWS, GCP, or in-house. You can use Kubernetes to deploy your data visualization applications such as RShiny, Dash or Redash, and if you are using networked storage or S3 all your applications can access the same data!

Project setup

Let's set up our project directory structure!

mkdir CellProfiler-Apache-Airflow
cd CellProfiler-Apache-Airflow
# The docker-compose.yml below binds ./dags, so the dags folder lives in the project root
mkdir -p dags
mkdir -p data/BBBC021/Week1
cd data/BBBC021/Week1
wget https://data.broadinstitute.org/bbbc/BBBC021/BBBC021_v1_images_Week1_22123.zip
find $(pwd) -name "*zip" | xargs -I {} unzip {}
# Clean up the zips, we don't need them anymore
find $(pwd) -name "*zip" | xargs -I {} rm -rf {} 
cd ../
# Run pwd to check where you are. You should be in CellProfiler-Apache-Airflow/data/BBBC021
wget https://data.broadinstitute.org/bbbc/BBBC021/BBBC021_v1_image.csv
wget https://data.broadinstitute.org/bbbc/BBBC021/BBBC021_v1_compound.csv
wget https://data.broadinstitute.org/bbbc/BBBC021/BBBC021_v1_moa.csv
wget https://data.broadinstitute.org/bbbc/BBBC021/analysis.cppipe
wget https://data.broadinstitute.org/bbbc/BBBC021/illum.cppipe

# Let's create a data file ONLY for the week1 images, the first dataset
head -n 1 BBBC021_v1_image.csv > images_week1.csv
cat BBBC021_v1_image.csv | grep Week1_22123 >> images_week1.csv

The Docker Compose configuration used in this post is mostly the Apache Airflow configuration from Bitnami. Bitnami is awesome and I use their configurations and images all the time. I made a few modifications to this one to bind our analysis DAGs, and also made a quick change so we could use the Docker operator.

Dockerfile

We're going to use a custom CellProfiler image to run our pipelines. Create a Dockerfile with this:

FROM cellprofiler/cellprofiler:3.1.9

RUN apt-get update -y && apt-get install -y unzip imagemagick

ENV TINI_VERSION v0.16.1
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /usr/bin/tini
RUN chmod +x /usr/bin/tini

ENTRYPOINT [ "/usr/bin/tini", "--" ]
CMD [ "/bin/bash" ]

Now we'll build our new CellProfiler image!

docker build -t cellprofiler .

Grab the Docker Compose Configuration

Grab this file and save in your project root as docker-compose.yml.

version: '2'

services:
  postgresql:
    image: 'bitnami/postgresql:10'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_DATABASE=bitnami_airflow
      - POSTGRESQL_USERNAME=bn_airflow
      - POSTGRESQL_PASSWORD=bitnami1
      - ALLOW_EMPTY_PASSWORD=yes
  redis:
    image: bitnami/redis:5.0
    volumes:
      - 'redis_data:/bitnami'
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
  airflow-scheduler:
    image: bitnami/airflow-scheduler:1
    environment:
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_EXECUTOR=CeleryExecutor
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./data:/data
      - ./dags:/opt/bitnami/airflow/dags
      - airflow_scheduler_data:/bitnami
  airflow-worker:
    image: bitnami/airflow-worker:1
    environment:
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_EXECUTOR=CeleryExecutor
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./data:/data
      - ./dags:/opt/bitnami/airflow/dags
      - airflow_worker_data:/bitnami
  airflow:
    image: bitnami/airflow:1
    environment:
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_EXECUTOR=CeleryExecutor
    ports:
      - '8080:8080'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./data:/data
      - ./dags:/opt/bitnami/airflow/dags
      - airflow_data:/bitnami
volumes:
  airflow_scheduler_data:
    driver: local
  airflow_worker_data:
    driver: local
  airflow_data:
    driver: local
  postgresql_data:
    driver: local
  redis_data:
    driver: local

If you're not used to containers this can get a little tricky. One thing to note is that paths on the host are not necessarily the same as paths in the Docker container. For example, our data directory could be anywhere on our host system, but it is bound as /data in the container.

Put the docker-compose.yml file in your project directory and bring it up with docker-compose up. It may take some time to initialize. This is when I go make tea. ;-)

Once it's up you'll be able to access your Airflow instance at localhost:8080 with the default configuration.

AIRFLOW_USERNAME: Airflow application username. Default: user
AIRFLOW_PASSWORD: Airflow application password. Default: bitnami
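
If you'd rather not keep refreshing the browser while the stack initializes, a small polling helper can tell you when something is listening on the web port. This is only a sketch: wait_for_port is a hypothetical helper, not part of Airflow or Bitnami, and an open port only means that something accepted the connection, not that Airflow has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, attempts=60, delay=5.0):
    """Return True once a TCP connection to host:port succeeds, else False."""
    for _ in range(attempts):
        try:
            # create_connection raises OSError while nothing is listening.
            with socket.create_connection((host, port), timeout=5):
                return True
        except OSError:
            time.sleep(delay)
    return False
```

With the compose file above you would call wait_for_port("localhost", 8080) and then open the web interface once it returns True.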

There won't be anything interesting here yet, because we don't have our analysis in place.

Grab the CellProfiler Analysis DAGs

First grab the illum DAG and place it in your dags folder. The file can be named anything, because what Airflow references is the dag_id, but I'll refer to it as cellprofiler-illum-dag.py.

# dags/cellprofiler-illum-dag.py
from airflow import DAG
from datetime import datetime, timedelta
import string
import random
from airflow.utils import timezone
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
# Depending on which version of airflow you are on you will use either operators.docker_operator or providers.docker
from airflow.operators.docker_operator import DockerOperator
# from airflow.providers.docker.operators.docker import DockerOperator
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
import time
import os
from pprint import pprint
from airflow.utils.state import State

this_env = os.environ.copy()

this_dir = os.path.dirname(os.path.realpath(__file__))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('cellprofiler_illumination', default_args=default_args, schedule_interval=None)

EXAMPLE_TRIGGER = """
{
    "illum_pipeline" : "/data/BBBC021/illum.cppipe",
    "analysis_pipeline" : "/data/BBBC021/analysis.cppipe",
    "pipeline" : "/data/BBBC021/illum.cppipe",
    "output": "/data/BBBC021/Week1/Week1_22123",
    "input": "/data/BBBC021/Week1/Week1_22123",
    "data_file": "/data/BBBC021/images_week1.csv"
}
"""

# Volumes are from the HOST MACHINE
illum = DockerOperator(
    dag=dag,
    task_id='illum',
    retries=1,
    volumes=[
        # UPDATE THIS to your path! 
        '/path-to-project-on-HOST/data:/data'
    ],
    working_dir='/data/BBBC021',
    tty=True,
    image='cellprofiler',
    command=[
        "bash", "-c",
        """cellprofiler --run --run-headless \
            -p {{ dag_run.conf['illum_pipeline'] }}  \
            -o {{ dag_run.conf['output'] }}  \
            -i {{ dag_run.conf['input'] }}  \
            --data-file {{ dag_run.conf['data_file'] }} \
            -c -r"""
    ]
)


def get_number_of_tasks(data_file):
    """
    Parse the file to get the number of lines
    The number of lines, minus 1 for the header
    is the number of groups
    :param data_file:
    :return:
    """
    # Count lines with a context manager so the file is always closed
    with open(data_file, "r") as file:
        number_of_lines = sum(1 for _ in file)
    return number_of_lines - 1


def watch_task(triggers):
    """
    This is only here for demonstration purposes
    to show how you could dynamically watch the cellprofiler analysis DAG
    :param triggers:
    :return:
    """
    print('-------------------------------------------')
    print('Checking up on our dag...')
    check_dag = check_and_get_dag(dag_id='cellprofiler_analysis')
    dag_run = check_and_get_dagrun(check_dag, triggers[0].execution_date)
    state = dag_run.get_state()
    finished = State.finished()
    unfinished = State.unfinished()

    while state in unfinished:
        time.sleep(10)
        state = dag_run.get_state()

    print('-------------------------------------------')
    print('Dag run finished or dead')
    pprint(dag_run.get_state())


def trigger_analysis(ds, **kwargs):
    """
    Trigger the cellprofiler analysis DAG
    We want one DAG run per row in the datafile, or -f / -l combo
    :param ds:
    :param kwargs:
    :return:
    """
    print('-------------------------------------------')
    print("Here's the conf!")
    pprint(kwargs['dag_run'].conf)
    output = kwargs['dag_run'].conf['output']
    data_file = kwargs['dag_run'].conf['data_file']
    no_tasks = get_number_of_tasks(str(data_file))
    triggers = []
    print('-------------------------------------------')
    print('Triggering our dag...')
    for value in range(1, no_tasks + 1):
        trigger = trigger_dag(
            dag_id="cellprofiler_analysis",
            replace_microseconds=False,
            run_id="trig__{}__f_{}__l_{}".format(
                timezone.utcnow().isoformat(),
                value,
                value
            ),
            conf={
                "pipeline": kwargs['dag_run'].conf['analysis_pipeline'],
                "output": "{}/f-{}__l-{}".format(output, value, value),
                "input": kwargs['dag_run'].conf['input'],
                "data_file": data_file,
                "first": value,
                "last": value,
            }
        )
        triggers.append(trigger)


trigger_analysis_task = PythonOperator(
    dag=dag,
    task_id='trigger_analysis',
    provide_context=True,
    python_callable=trigger_analysis
)

trigger_analysis_task.set_upstream(illum)
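
One thing to watch in the DAG above: get_number_of_tasks counts raw lines, so a stray blank line at the end of the data file would trigger one extra analysis run. As a hedged alternative, here is a sketch that counts CSV records with the standard csv module instead. The name count_data_rows is my own and is not used anywhere in the DAG.

```python
import csv


def count_data_rows(data_file):
    """Count CSV records after the header, skipping completely blank lines."""
    with open(data_file, newline="") as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header row, if there is one
        # csv.reader yields an empty list for a blank line, which is falsy.
        return sum(1 for row in reader if row)
```

It could be swapped in for get_number_of_tasks without changing the rest of trigger_analysis, assuming your data file is an ordinary comma-separated file.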

And now our cellprofiler-analysis-dag.py.

# dags/cellprofiler-analysis-dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.docker_operator import DockerOperator
import os
from pprint import pprint

this_env = os.environ.copy()

this_dir = os.path.dirname(os.path.realpath(__file__))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('cellprofiler_analysis', default_args=default_args, schedule_interval=None)

analysis = DockerOperator(
    dag=dag,
    task_id='analysis',
    retries=1,
    volumes=[
        # Volumes are from the HOST MACHINE
        # UPDATE THIS to your path! 
        '/path-on-HOST/data:/data'
    ],
    tty=True,
    image='cellprofiler',
    command=[
        "bash", "-c",
        """cellprofiler --run --run-headless \
            -p {{ dag_run.conf['pipeline'] }}  \
            -o {{ dag_run.conf['output'] }}  \
            -i {{ dag_run.conf['input'] }} \
            --data-file {{ dag_run.conf['data_file'] }} \
            -c -r -f {{ dag_run.conf['first'] }} -l {{ dag_run.conf['last'] }}"""
    ]
)


def gather_results(ds, **kwargs):
    """
    Once we have the Cellprofiler results let's do something with them!
    :param ds:
    :param kwargs:
    :return:
    """
    pass


gather_results_task = PythonOperator(
    dag=dag,
    task_id='gather_results_task',
    provide_context=True,
    python_callable=gather_results
)

gather_results_task.set_upstream(analysis)

Make sure that you update the DockerOperator volumes to match your local filesystem! Otherwise your analysis will not work!

Analysis Organizational Overview

What we have here are two separate DAGs, one for each CellProfiler pipeline: illumination correction and analysis.

The steps are:

  • Process Illumination pipeline for ALL images
  • Grab the datafile to see how many images we have
  • Dynamically generate one CellProfiler submission per image.
  • (Placeholder) Do something with our results!

You will notice that we are dynamically splitting the CellProfiler analysis. This will get our analysis done faster, which is increasingly important when you buy fancy robotic microscopes that generate oodles of data. Airflow takes care of the job queueing under the hood, so all we need to do is to figure out the logic of how we split the analysis. If you'd like to know more about how parallelism is handled in Airflow this is a great article.
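
The DAG in this post launches one run per row, so the -f and -l values are always equal. If that produces more runs than you want to schedule, one option is to hand each run a range of rows. The sketch below shows the arithmetic only; make_batches and its batch_size parameter are hypothetical and are not part of the DAGs above.

```python
def make_batches(total, batch_size=1):
    """Return inclusive (first, last) pairs that cover rows 1..total."""
    if total < 0 or batch_size < 1:
        raise ValueError("total must be >= 0 and batch_size must be >= 1")
    return [
        (first, min(first + batch_size - 1, total))
        for first in range(1, total + 1, batch_size)
    ]


print(make_batches(10, 4))  # [(1, 4), (5, 8), (9, 10)]
```

Each pair could then be passed as the "first" and "last" values in the conf of a triggered run. With the default batch_size of 1 this reproduces the one-run-per-row behavior.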

Ok, that last step is just a placeholder. I would imagine that you would want to do something with your results once you have them, such as put them in a database, fire off one or more analyses based on certain criteria, or do some post-processing, but for now this is blank so you can wonder about the possibilities.
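
As one possibility, here is a rough sketch of a gather step that stitches the per-run CSV files back into a single file. It assumes each run wrote a file with the same name into its own subdirectory of the output folder (the DAG above creates subdirectories like f-1__l-1). The file name itself depends on how your pipeline exports data, so treat "Image.csv" in the usage note as an assumption, and combine_csvs as a hypothetical helper.

```python
import csv
import glob
import os


def combine_csvs(output_root, file_name, combined_path):
    """Concatenate <output_root>/*/<file_name> into one CSV with one header."""
    paths = sorted(glob.glob(os.path.join(output_root, "*", file_name)))
    header = None
    rows_written = 0
    with open(combined_path, "w", newline="") as out:
        writer = csv.writer(out, lineterminator="\n")
        for path in paths:
            with open(path, newline="") as handle:
                reader = csv.reader(handle)
                this_header = next(reader, None)
                if this_header is None:
                    continue  # skip empty files
                if header is None:
                    header = this_header
                    writer.writerow(header)
                elif this_header != header:
                    raise ValueError("Header mismatch in {}".format(path))
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

A call might look like combine_csvs("/data/BBBC021/Week1/Week1_22123", "Image.csv", "/data/BBBC021/combined_image.csv"). Files are read in lexicographic path order, so f-10 comes before f-2; sort the combined rows afterwards if order matters to you.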

Passing arguments to our Analysis

This was not obvious to me when I started using Airflow, and now you get to hear all about it! ;-)

You pass arguments to an Airflow DAG run using the conf object or argument. Depending on the operator type this might look slightly different. If you are using the Python operator you access it as a dict, kwargs['dag_run'].conf, and if you are using Bash, or in this case Docker, you access it as a templated variable, {{ dag_run.conf['variable'] }}.
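
If you want to try out a Python callable without starting Airflow, you can hand it a stand-in for the dag_run object. This is only a sketch: the SimpleNamespace below imitates the one attribute the callable reads, and describe_run is a hypothetical function, not something from the DAGs in this post.

```python
from types import SimpleNamespace


def describe_run(**kwargs):
    """Read conf values the way a PythonOperator callable would."""
    conf = kwargs["dag_run"].conf
    return "{} -> {}".format(conf["input"], conf["output"])


# Stand-in for the DagRun object that Airflow would normally supply.
fake_dag_run = SimpleNamespace(conf={"input": "/data/in", "output": "/data/out"})
print(describe_run(dag_run=fake_dag_run))
```

Inside Airflow the same callable receives the real dag_run through its keyword arguments, so the body of the function does not need to change.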

I will take you through how you use the Airflow web interface to trigger your DAG and pass in variables, but you can also trigger your DAGs using a REST API, through the Airflow CLI, or programmatically using Python code. No matter how you trigger your DAG, you pass the configuration variables in as a JSON string.
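
As a rough illustration of the CLI route, the sketch below builds the JSON string and prints a command line you could run wherever the airflow CLI is available, for example inside one of the Airflow containers. It assumes the Airflow 1.x syntax (airflow trigger_dag --conf), which matches the images used in this post; on Airflow 2 the equivalent subcommand is airflow dags trigger. The script only prints the command, it does not run it.

```python
import json
import shlex

# Same values as the example trigger configuration used in this post.
conf = {
    "illum_pipeline": "/data/BBBC021/illum.cppipe",
    "analysis_pipeline": "/data/BBBC021/analysis.cppipe",
    "pipeline": "/data/BBBC021/illum.cppipe",
    "output": "/data/BBBC021/Week1/Week1_22123",
    "input": "/data/BBBC021/Week1/Week1_22123",
    "data_file": "/data/BBBC021/images_week1.csv",
}

# The conf is always handed to Airflow as a JSON string.
conf_json = json.dumps(conf)

command = ["airflow", "trigger_dag", "--conf", conf_json, "cellprofiler_illumination"]

# shlex.quote keeps the JSON intact when the line is pasted into a shell.
print(" ".join(shlex.quote(part) for part in command))
```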

A note on Docker Volumes

This is a little weird because we are using docker-compose to run Airflow, and then using the Docker Operator. Just keep in mind that when you use the Docker Operator you map your volumes using the paths on your HOST machine, not in the docker-compose containers. Your host machine is the machine you ran docker-compose up on.

For example:

analysis = DockerOperator(
    ...
    volumes=[
        # Volumes are from the HOST MACHINE
        '/path-on-HOST/data:/data'
        # NOT
        # '/data:/data'
        # Even though in our docker-compose instance the volume is bound as /data
    ],
    ...
)
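
To avoid hard-coding the host path in two DAG files, you could build the volume string in one place. The helper below is a sketch under my own assumptions: data_volume is a hypothetical function, and you would have to supply the absolute host path yourself, for example through an environment variable that you define. It insists on an absolute path because a relative one would be resolved inside the Airflow container, not on the host.

```python
import posixpath


def data_volume(host_project_dir):
    """Build the 'host_path:container_path' bind string for the data folder."""
    if not posixpath.isabs(host_project_dir):
        raise ValueError("host_project_dir must be an absolute path on the HOST")
    return "{}:/data".format(posixpath.join(host_project_dir, "data"))


print(data_volume("/home/me/CellProfiler-Apache-Airflow"))
```

The result can be passed as the single entry in the DockerOperator volumes list.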

Analyze!

We're ready! Now that you have your DAGs all set up (you do need to have your DAGs in place), we can start to analyze data!

Log in at localhost:8080 with the username user and the password bitnami. You should see a screen that looks like this:

CellProfiler DAG Overview

By default your dags will be off. Make sure to turn them on before proceeding to the next step!

Trigger your Analysis

Head to your main page at localhost:8080 and trigger the cellprofiler_illumination DAG.

CellProfiler Trigger Illumination Dag

You will be prompted to add in some JSON configuration variables.

{
    "illum_pipeline" : "/data/BBBC021/illum.cppipe",
    "analysis_pipeline" : "/data/BBBC021/analysis.cppipe",
    "pipeline" : "/data/BBBC021/illum.cppipe",
    "output": "/data/BBBC021/Week1/Week1_22123",
    "input": "/data/BBBC021/Week1/Week1_22123",
    "data_file": "/data/BBBC021/images_week1.csv"
}
CellProfiler Illumination DAG JSON Configuration

You'll be brought back to the main page and should see that your CellProfiler illumination analysis is running!

CellProfiler DAG Runs

Here's a quick (~1 minute) video showing how you can navigate the Airflow interface to investigate your analysis.

Wrap Up

That's it! Let Airflow run and you will have your illumination and analysis pipelines running, all split nicely, with something other than you babysitting the job queue, logging, and success/failure rate. You can also integrate Airflow with any other system, such as a LIMS, reporting database, or secondary analysis workflow, by using the REST API, the CLI, or code.

If you'd like to know more check out my website, or reach out directly at jillian@dabbleofdevops.com.

Acknowledgments

Special thanks to the Broad BioImage Repository for hosting the dataset used, along with Dr. Anne Carpenter, Beth Cimini, and Becki Ledford for extremely valuable feedback and editing!

Citations

https://data.broadinstitute.org/bbbc/BBBC021/

"We used image set BBBC021v1 [Caie et al., Molecular Cancer Therapeutics, 2010], available from the Broad Bioimage Benchmark Collection [Ljosa et al., Nature Methods, 2012]."

Work Together

Like what you see here? Check out my services page or email me directly at jillian@dabbleofdevops.com to see how we can work together.
