Introduction
Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. Workflows are authored as Directed Acyclic Graphs (DAGs) of tasks, which define an automated pipeline in which tasks execute one after the other.
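As a purely illustrative example (not one of this tutorial's files), here is a minimal Airflow 2.x DAG that chains two bash tasks:

# Minimal illustrative DAG: two bash tasks run one after the other.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hello_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # only runs when triggered manually or backfilled
    catchup=False,
) as dag:
    say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
    say_done = BashOperator(task_id="say_done", bash_command="echo done")
    say_hello >> say_done  # ">>" sets the execution order (an edge of the DAG)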
Here is a quick step by step guide on how to use Airflow alongside Spark to automatically run a workflow on Qarnot.
Versions
Software | Release year | Version |
---|---|---|
Airflow | 2021 | 2.1.3 |
Hadoop | 2021 | 3.2 |
Spark | 2021 | 3.1.1 |
If you are interested in another version, please send us an email at qlab@qarnot.com.
Prerequisites
Before starting a calculation with the Python SDK, a few steps are required: create a Qarnot account, retrieve your secret authentication token, and install the Python SDK.
Note: in addition to the Python SDK, Qarnot provides C# and Node.js SDKs and a Command Line Interface.
Test case
This tutorial will showcase how to run an Airflow workflow on Qarnot from your computer. The workflow is as follows:
- Start a Spark cluster
- Submit a first Spark app to the cluster: it counts the number of words in the Iliad
- Submit a second Spark app to the cluster: it counts the number of words in the Iliad concatenated 100 times
- Fetch the output of both Spark apps
- Stop the Spark cluster
All these steps will be run in succession without any manual intervention from the user.
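To give a feel for how this maps onto an Airflow DAG, here is a hedged sketch of the chaining; the real operators are defined in custom_operators/qarnot_operators.py and used in dags/dag_spark.py, so the placeholder operators and task names below are assumptions, not the actual implementation:

# Hedged sketch only: dags/dag_spark.py uses Qarnot-specific operators from
# custom_operators/qarnot_operators.py; DummyOperator and the task names here are stand-ins.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="my_first_dag", start_date=datetime(2000, 1, 1), schedule_interval=None) as dag:
    start_cluster = DummyOperator(task_id="start_spark_cluster")
    count_iliad = DummyOperator(task_id="count_words_iliad")
    count_iliad100 = DummyOperator(task_id="count_words_iliad100")
    fetch_outputs = DummyOperator(task_id="fetch_outputs")
    stop_cluster = DummyOperator(task_id="stop_spark_cluster")
    # The five steps of the test case, run one after the other
    start_cluster >> count_iliad >> count_iliad100 >> fetch_outputs >> stop_cluster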
Before moving forward, you should set up your working environment to contain the following files, which can be downloaded here:
- config: contains the Qarnot and logging config files
  - python_local_logging.conf
  - qarnot.conf (see below)
- custom_operators: scripts developed by Qarnot and needed for our Airflow workflow
  - qarnot_utils
  - qarnot_operators.py
- dags: contains the script that defines the DAG that we will run
  - dag_spark.py (see below)
- logs: directory where the Python and Airflow logs will be stored
- spark-resources: text input files of the Iliad and the Iliad concatenated 100 times
  - apps
  - iliad.txt
  - iliad100.txt
Launching the test case
Once you have downloaded all the necessary files, follow the steps below to set everything up and launch the workflow.
Activate your Python virtual environment and make sure that the Qarnot SDK is installed in it. If you are unsure as to how to do that, you can check the SDK installation documentation for simple steps to follow.
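You can quickly confirm that the SDK is importable from the active environment, for example:

# Optional sanity check: the Qarnot SDK should be importable from the active virtual environment
import qarnot
print(qarnot.__file__)  # should point inside your virtual environment's site-packages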
Then install Airflow and its dependencies by running the following command.
pip install "apache-airflow[cncf.kubernetes]"
Add your SSH public key in dags/dag_spark.py, replacing <<<MY PUBLIC SSH KEY>>>.
Add your secret token in config/qarnot.conf, replacing <<<MY_SECRET_TOKEN>>>.
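As a hypothetical alternative to pasting the key by hand, a DAG script could read it from disk; the variable name below is an assumption and may not match what dags/dag_spark.py actually expects:

# Hypothetical variant: read the public key from disk instead of pasting it manually.
# The variable name is an assumption; adapt it to the actual dags/dag_spark.py.
from pathlib import Path
ssh_public_key = Path.home().joinpath(".ssh", "id_rsa.pub").read_text().strip()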
Move custom_operators/ to your virtual environment's site-packages. Make sure to replace <<<VENV>>> with your virtual environment's name and the X in pythonX with your Python version. Note that it is recommended to work inside Python virtual environments to guarantee reproducibility and keep working environments clean.
mv custom_operators/ <<<VENV>>>/lib/pythonX/site-packages/
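To check that the move worked, you can verify (with the virtual environment activated) that the package now resolves from site-packages; this assumes the package layout listed above:

# Sanity check: the custom operators should now be importable from site-packages
import custom_operators.qarnot_operators as qarnot_operators
print(qarnot_operators.__file__)  # should point inside <<<VENV>>>/lib/pythonX/site-packages/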
Set your Airflow home as your current directory
export AIRFLOW_HOME="$PWD"
Initialize Airflow
airflow db init
Build your dag named my_first_dag
python3 dags/dag_spark.py
Launch the Airflow workflow on Qarnot with the start date you want
airflow dags backfill my_first_dag -s 2000-01-01
A few notes to keep in mind:
- If you want to launch Airflow a second time, you can add the --reset-dagruns flag to bypass some conflicts related to the previous run.
- It is also possible to download input files from a GCP bucket and upload your results to it. This has been excluded from this tutorial for simplicity's sake. If you are interested in trying it, please contact qlab@qarnot.com.
Results
At any given time, you can monitor the status of your tasks on Tasq as well as from your local terminal.
You can view the outputs in your results bucket airflow-spark-out, where you will find the number of words contained in both the Iliad and the version concatenated 100 times, as well as various execution logs.
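If you would rather fetch the results programmatically, a minimal sketch with the Python SDK could look like the following; it assumes the SDK's bucket helpers (retrieve_or_create_bucket, get_all_files), so check the SDK documentation for the exact names:

# Minimal sketch (bucket helper names are assumptions, check the SDK documentation):
# download everything from the results bucket to a local directory.
import qarnot
conn = qarnot.Connection("config/qarnot.conf")  # reads your secret token from the config file
out_bucket = conn.retrieve_or_create_bucket("airflow-spark-out")
out_bucket.get_all_files("airflow-spark-out-local")  # local destination directory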
Wrapping up
That’s it! If you have any questions, please contact qlab@qarnot.com and we will help you with pleasure!