Problem Definition

A typical Airflow scenario is to run a workflow on a regular schedule, where the output of one iteration becomes the input of the next.

One way to do that is to keep the output in a local file or a database table, and to read and update it in every iteration. However, with those solutions you have to handle database connections manually, which is not always convenient.
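For contrast, here is a minimal sketch of what the manual database approach can look like. It uses SQLite from the standard library purely for illustration; the table name `state`, the key `counter`, and the function name `run_iteration_with_sqlite` are assumptions made up for this example, not part of any Airflow API.

```python
import sqlite3
from contextlib import closing


def run_iteration_with_sqlite(db_path):
    """Read the previous output, compute the next one, and store it back."""
    # We have to open and close the connection ourselves.
    with closing(sqlite3.connect(db_path)) as conn:
        # "with conn" commits on success and rolls back on error.
        with conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS state "
                "(key TEXT PRIMARY KEY, value TEXT)"
            )
            row = conn.execute(
                "SELECT value FROM state WHERE key = ?", ("counter",)
            ).fetchone()
            # First iteration: nothing stored yet, so start from 0.
            previous = int(row[0]) if row else 0
            current = previous + 1
            conn.execute(
                "INSERT OR REPLACE INTO state (key, value) VALUES (?, ?)",
                ("counter", str(current)),
            )
    return current
```

Even in this small case, the connection, the schema, and the transaction handling are all code you have to write and maintain yourself.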

If you are instead looking to pass values between different tasks within a DAG, have a look at Airflow XComs, which may solve your problem.
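As a rough sketch of the XCom idea, the two callables below push a value in one task and pull it in another through the task instance's `xcom_push` and `xcom_pull` methods. The task id `produce`, the key `row_count`, and the `FakeTaskInstance` class are assumptions for illustration; the fake only exists so the snippet can be tried without a running Airflow, where the real task instance would be passed in from the task context instead.

```python
def produce(ti, **_):
    # Push a small value so a downstream task can read it.
    ti.xcom_push(key="row_count", value=42)


def consume(ti, **_):
    # Pull the value pushed by the task whose task_id is "produce".
    row_count = ti.xcom_pull(task_ids="produce", key="row_count")
    return row_count * 2


class FakeTaskInstance:
    """Hypothetical in-memory stand-in for Airflow's task instance."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))


shared = {}
produce(FakeTaskInstance("produce", shared))
result = consume(FakeTaskInstance("consume", shared))
```

XComs are tied to a particular task run, which is why they suit passing values between tasks rather than keeping state from one scheduled iteration to the next.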

Solution with Airflow

Airflow provides a feature called Variables to solve this problem. Under the hood, Variables are just a key-value store, and the data is kept in Airflow's metadata database (PostgreSQL, for example, if that is the backend you configured).

Airflow handles the database connection automatically, so you can use Variables directly in your DAG without worrying about opening connections, releasing connection pool resources, and so on.
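To connect this back to the iteration scenario, here is a minimal sketch of a callable that reads the previous state, updates it, and writes it back. It takes the store as a parameter so that, inside a DAG, you could pass `airflow.models.Variable`, whose `get` accepts `default_var` and `deserialize_json` and whose `set` accepts `serialize_json`. The key `iteration_state`, the `run_count` field, and the `InMemoryVariable` class are assumptions for illustration; the in-memory class is a simplified stand-in so the snippet runs without Airflow.

```python
import json


def run_iteration(variable_cls):
    """Load the last state, advance it, and persist it for the next run."""
    # On the very first run nothing is stored yet, so fall back to a default.
    state = variable_cls.get(
        "iteration_state",
        default_var={"run_count": 0},
        deserialize_json=True,
    )
    new_state = {"run_count": state["run_count"] + 1}
    # Values are stored as text, so a dict is serialized to JSON.
    variable_cls.set("iteration_state", new_state, serialize_json=True)
    return new_state


class InMemoryVariable:
    """Hypothetical stand-in that mimics the get/set calls used above."""

    _data = {}

    @classmethod
    def get(cls, key, default_var=None, deserialize_json=False):
        if key not in cls._data:
            # Simplification: return the default as-is when the key is missing.
            return default_var
        raw = cls._data[key]
        return json.loads(raw) if deserialize_json else raw

    @classmethod
    def set(cls, key, value, serialize_json=False):
        cls._data[key] = json.dumps(value) if serialize_json else str(value)


first = run_iteration(InMemoryVariable)
second = run_iteration(InMemoryVariable)
```

Passing the store in as an argument is only a convenience for trying the logic locally; in a real DAG the callable could just as well call `Variable.get` and `Variable.set` directly.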

Example

An example DAG script looks like this:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# __author__ = "eyang#outlook.co.nz"
#
from pathlib import Path
from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "Eric Yang",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(2),
    "email": ["eyang#outlook.co.nz"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5)
}

dag_id = Path(__file__).stem

dag = DAG(
    dag_id=dag_id,
    default_args=args,
    description="Airflow Variables Test",
    schedule_interval=None)


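def set_variable(**kwargs):
    # kwargs holds the task context, passed in because provide_context=True.
    print(kwargs, type(kwargs))
    # Store the key "foo" with the value "bar" as an Airflow Variable.
    Variable.set("foo", "bar")

    # Read the value back; Variable.get returns it as a string.
    bar_value = Variable.get("foo")
    print(bar_value, type(bar_value))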


set_variable_opt = PythonOperator(
    task_id="set_variable",
    python_callable=set_variable,
    provide_context=True,
    dag=dag)

The code above defines a PythonOperator called set_variable_opt, which calls the Python function set_variable. That function sets a Variable called foo to the value bar, then reads it back with Variable.get and prints the value and its type.