PEX — The secret sauce for the perfect deployment of PySpark workloads on AWS EMR

With ephemeral EMR we have to bootstrap our cluster with our application requirements before every single run of a PySpark application. This should be a simple pip install of our PySpark application package. But the reality couldn’t be further from simple.

  1. We need to install and update packages on EMR because the default installations lag many versions behind.
  2. We should create a virtual Python environment for our application, as global Python packages can interfere in weird ways. We’ve all been there.
  3. We need to install our PySpark application and its requirements with pip.
  4. We need to copy assets from S3, e.g. the main.py script for spark-submit.
  5. Finally, we call spark-submit with our application-specific Spark configurations to actually run our workload.

This approach to EMR has a few pain points:

  1. The bootstrap process consumes a lot of paid-for cluster time during which we do not run any real workload. This adds up quickly, in particular for big clusters!
  2. We have to maintain and download assets which are not part of our Python application package, e.g. the spark-submit main.py script.
  3. We have to maintain application-specific Spark configurations for our jobs independently of our Python application package and pass them as arguments to the spark-submit script. These usually end up scattered across Jenkins pipelines or Airflow DAGs, adding unnecessary complexity to maintaining PySpark applications.
  4. Packaging and installing a Python application with complex dependencies leads you straight into the notorious dependency hell.

Considering how popular Python is, its packaging toolchain is far from ideal: installing packages with complex dependency chains via pip often ends in the notorious dependency hell. Unfortunately, pip lacks a powerful dependency resolver.

xkcd 1987: Python Environment (CC BY-NC 2.5, https://xkcd.com/1987/)

In commercial data science teams there is usually a second layer of complexity: private package indexes for proprietary code. And I bet that your private package index is slow. Therefore, a common EMR bootstrap script for your PySpark application might look like this:

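A minimal sketch of such a bootstrap script, mirroring steps 1–4 above (spark-submit itself is issued later as an EMR step). The package name my_pyspark_application, the bucket path and the private index URL are illustrative assumptions:

#!/usr/bin/env bash
# Illustrative EMR bootstrap action: every node pays for this before any real work starts

# 1. Update the outdated default packaging tools
sudo python3 -m pip install --upgrade pip setuptools wheel

# 2. Create a dedicated virtual environment to isolate the application
python3 -m venv /home/hadoop/venv
source /home/hadoop/venv/bin/activate

# 3. Install the application and its requirements from a (slow) private index
pip install my_pyspark_application \
    --extra-index-url https://private.registry.dev/pypi/simple \
    --trusted-host private.registry.dev \
    --timeout 120

# 4. Copy assets which are not part of the package, e.g. the spark-submit entry point
aws s3 cp s3://my-bucket/artifacts/main.py /home/hadoop/main.py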

Wouldn’t it be great if running a PySpark application was as simple as just calling an executable? No pip install of requirements. No main.py. No spark-submit with Spark memory configurations cluttering up Jenkins or Airflow.

With the help of PEX running a PySpark application on EMR no longer requires any bootstrapping!

  • PEX greatly simplifies running PySpark applications and
  • saves money by utilising the cluster to run our actual application workload much earlier, without the need for any cluster bootstrapping.

To create a PEX archive you use the pex utility. You can simply install it with:

pip install pex

When you do this within a new Python virtual environment, you can use pex to package itself. The following command creates a pex file containing pex and requests, using the console script named “pex” as its entry point. Save the created executable to ~/bin/pex and you can use pex inside or outside of any virtualenv like any other executable on your PATH.

pex pex requests -c pex -o ~/bin/pex

There is one complication with PEX: the executable contains a self-contained Python virtual environment but not the Python interpreter itself, so the PEX executable is platform dependent. Currently, EMR runs Python 3.6.10 on Linux while you might develop on a Mac. Therefore, it is generally best practice to use Docker to create reproducible builds.

Build a Docker image compatible with EMR to create your PEX archives within a Docker container:

FROM python:3.6-slim-buster

RUN apt-get update && apt-get install -y --no-install-recommends \
        git \
    && rm -rf /var/lib/apt/lists/*

RUN pip3 install pex

RUN mkdir /app
WORKDIR /app

ENV PATH=/root/.local/bin:$PATH
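
With this image you can build the archive reproducibly inside a container, for example (the image tag pex-builder and the script name build-pex.sh are arbitrary examples; the build script itself is shown below):

# Build the EMR-compatible builder image from the Dockerfile above
docker build -t pex-builder .

# Run the packaging script (assumed to be saved as build-pex.sh) inside the
# container, mounting the project directory into the image's /app workdir
docker run --rm -v "$(pwd)":/app pex-builder ./build-pex.sh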

If your organisation uses a private package index, e.g. Artifactory, then PEX shows another weakness: at the time of writing it does not expose the requests library’s parameters via its CLI, which means we cannot set a custom network timeout when PEX resolves package dependencies directly. A workaround is to use a wheelhouse. The following script builds a pex archive using a wheelhouse and a private package index with a custom timeout:

#!/usr/bin/env bash

pip3 download -r requirements.txt \
    --dest ./build/wheelhouse \
    --extra-index-url https://private.registry.dev/pypi/simple \
    --trusted-host private.registry.dev \
    --timeout 120

pex . -r requirements.txt \
    -o ./dist/my_application.pex \
    --platform manylinux2014-x86_64-cp-36-m \
    --no-index \
    -f ./build/wheelhouse
Because the PEX archive is fully self-contained, the application-specific Spark configuration can live inside the application itself. In your entry point, point PYSPARK_PYTHON at the PEX file and configure the SparkSession directly:

import os
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Locate the PEX archive on the Python path and make it the executors' Python
    pex_file = os.path.basename([path for path in sys.path if path.endswith(".pex")][0])
    os.environ["PYSPARK_PYTHON"] = "./" + pex_file

    spark = (
        SparkSession.builder
        .master("yarn")
        .appName("my_spark_application")
        .config("spark.submit.deployMode", "cluster")
        .config("spark.yarn.dist.files", pex_file)
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.sql.shuffle.partitions", 4000)
        .config("spark.executor.memory", "1G")
        .enableHiveSupport()
        .getOrCreate()
    )

You can pass any option from your usual spark-submit to the SparkSession builder.

This allows you to execute your PySpark application inside a PEX executable, for example like this:

./my_application.pex -m my_application.main

On EMR we launch the application as a step via AWS’s script-runner.jar. The script-runner calls our thin wrapper, which pulls the PEX file from S3 and executes it with any environment variables and command line arguments we might need. The following script is that thin wrapper, called pex-executer.sh. Simply put it on S3 to make it available to your EMR cluster:

#!/bin/bash
# Author: Jan Teichmann
# Version: 2020-02-10
# Wrapper to execute a PEX archive via an EMR Step
# Step type: Custom JAR
# JAR location: s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar
# Arguments:
# s3://.../pex-executer.sh
# s3://.../some-etl-job.pex
# HADOOP_HOME=/usr/lib/hadoop
# SPARK_HOME=/usr/lib/spark
# ./some-etl-job.pex -m package.module -fromdate=2020-04-20 -todate=2020-04-22
aws s3 cp $1 .
chmod +x $(basename -- $1);
shift;
eval "$@"

Now you can submit an EMR step, e.g. via Airflow’s EmrAddStepsOperator:

EmrAddStepsOperator(
    task_id="my_application",
    job_flow_id="my_emr_cluster_id",
    steps=[
        {
            "ActionOnFailure": "CONTINUE",
            "Name": "Run my_application Step",
            "HadoopJarStep": {
                "Args": [
                    "s3://.../pex-executer.sh",
                    "s3://.../my_application.pex",
                    "HADOOP_HOME=/usr/lib/hadoop",
                    "SPARK_HOME=/usr/lib/spark",
                    "./my_application.pex",
                    "-m",
                    "my_pyspark_application.main",
                    "-parameter1",
                    "value1",
                ],
                "Jar": "s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar",
            },
        }
    ],
    aws_conn_id="aws_default",
    dag=dag,
)

As you can see above, we first pass two S3 paths. The first points to our thin wrapper pex-executer.sh, which is executed by AWS’s script-runner.jar. The wrapper in turn downloads the application’s PEX executable from the second path.

We also define two environment variables, HADOOP_HOME and SPARK_HOME, which are needed for EMR. You can add any additional environment variables you might want.

We then pass the name of the executable, followed by any CLI parameters for our Python application.


This greatly simplifies the use of ephemeral EMR clusters with PySpark and saves both time and money, as we no longer have to bootstrap the cluster.

Wrap the creation of your pex executables into a Jenkins pipeline and you have a powerful DevOps pattern to build packages and upload them to S3 for deployment.
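
For example, a CI step could rebuild the archive in the EMR-compatible image from above and upload it (the bucket path is a placeholder):

# Rebuild the PEX archive inside the builder image (see the Dockerfile above)
docker run --rm -v "$(pwd)":/app pex-builder ./build-pex.sh

# Publish the archive to S3, where the pex-executer.sh wrapper can fetch it
aws s3 cp ./dist/my_application.pex s3://my-artifact-bucket/artifacts/my_application.pex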

You can then schedule your PySpark applications, for example with Airflow. Because your PEX applications are fully self-contained, you can create extremely generic DAGs in Airflow without application logic and configuration ending up scattered across multiple platforms and repositories.

Simplicity is the ultimate sophistication