Monday 11 October 2021

Setup Apache Airflow

Prerequisite

a. Install Python (the command below assumes macOS with Homebrew)

brew install python

 

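Step 1: Open a terminal and run the command below. The constraints file pins the dependency versions that are known to work with Airflow 1.10.12 on Python 3.7.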

pip3 install \
 apache-airflow==1.10.12 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"

(or, if your pip command already points to Python 3)

pip install \
 apache-airflow==1.10.12 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"
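
The constraints URL in the commands above contains both the Airflow version and the Python version (3.7). If your Python version is different, the URL has to change too. Below is a minimal sketch that builds the URL from the running interpreter. The helper names are my own, and it assumes the same URL pattern applies to your Python version, which is not guaranteed for every version.

```python
import sys

# URL pattern taken from the pip commands above.
CONSTRAINTS_TEMPLATE = (
    "https://raw.githubusercontent.com/apache/airflow/"
    "constraints-{airflow}/constraints-{python}.txt"
)


def constraints_url(airflow_version, python_version=None):
    """Build the constraints URL for the given Airflow and Python versions."""
    if python_version is None:
        # Fall back to the major.minor version of the running interpreter.
        python_version = "{}.{}".format(*sys.version_info[:2])
    return CONSTRAINTS_TEMPLATE.format(
        airflow=airflow_version, python=python_version
    )


def pip_install_args(airflow_version, python_version=None):
    """Return the pip arguments as a list (hypothetical helper, not part of Airflow)."""
    return [
        "install",
        "apache-airflow=={}".format(airflow_version),
        "--constraint",
        constraints_url(airflow_version, python_version),
    ]
```

For example, constraints_url("1.10.12", "3.7") gives the same URL that is used in Step 1.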

Step 2: Install Airflow with extra packages (for example Celery, RabbitMQ, Redis) if you need them.

pip3 install \
 "apache-airflow[celery, crypto, rabbitmq, redis]"==1.10.12 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"


Step 3: Run the command below to initialize Airflow.

airflow initdb


The above command initializes the metadata database for Airflow. Airflow requires this database to be initialized before you can run tasks. If you’re just experimenting and learning Airflow, you can stick with the default SQLite option.

 

After I executed the command, an ‘airflow’ folder was created in my home directory (/Users/{user_home}/airflow).

You can see that the .db and .cfg files are created in the airflow folder.

$ls
airflow.cfg	airflow.db	logs		unittests.cfg
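
If you want to confirm this from a script instead of ls, here is a small sketch. It assumes the default SQLite setup from Step 3, and that Airflow's home folder is ~/airflow unless the AIRFLOW_HOME environment variable points somewhere else. The function names are my own.

```python
import os
from pathlib import Path

# Files that 'airflow initdb' created in my setup (default SQLite backend).
EXPECTED_FILES = ("airflow.cfg", "airflow.db")


def airflow_home():
    """Return AIRFLOW_HOME if it is set, otherwise the default ~/airflow."""
    return Path(os.environ.get("AIRFLOW_HOME", str(Path.home() / "airflow")))


def missing_files(home):
    """List the expected files that are not present in the given folder."""
    home = Path(home)
    return [name for name in EXPECTED_FILES if not (home / name).is_file()]
```

Calling missing_files(airflow_home()) should return an empty list after a successful initdb.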


Step 4: Create the ‘dags’ folder.

 

Run the command below from the airflow folder to find out where the ‘dags’ folder must be created.

 

grep dags_folder airflow.cfg

$grep dags_folder airflow.cfg 
dags_folder = /Users/krishna/airflow/dags
# search for dags in dags_folder,
git_dags_folder_mount_point =


From the output, you can confirm where to create the dags folder. By default, ‘dags’ is expected in the same folder where the .cfg file and the logs folder are located.

$mkdir dags
$
$ls
airflow.cfg	airflow.db	dags		logs		unittests.cfg
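
The grep and mkdir steps above can also be done together in Python. This is only a sketch: it assumes dags_folder lives in the [core] section of airflow.cfg (as it does in my 1.10.12 file), and the function names are my own.

```python
import configparser
from pathlib import Path


def read_dags_folder(cfg_path):
    """Read the dags_folder setting from the [core] section of airflow.cfg."""
    # interpolation=None keeps '%' characters in other settings from
    # being treated as placeholders; strict=False tolerates duplicate keys.
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    with open(str(cfg_path)) as cfg_file:
        parser.read_file(cfg_file)
    return Path(parser.get("core", "dags_folder"))


def ensure_dags_folder(cfg_path):
    """Create the configured dags folder if it does not exist yet."""
    folder = read_dags_folder(cfg_path)
    folder.mkdir(parents=True, exist_ok=True)
    return folder
```

With my setup, ensure_dags_folder("/Users/krishna/airflow/airflow.cfg") would create /Users/krishna/airflow/dags.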


Step 5: Unload the example DAGs that ship with Airflow.

By default, all the example DAGs that come with Airflow are loaded. Let’s unload them by setting the property load_examples to False in the airflow.cfg file.

 

load_examples = False
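
If you prefer not to edit the file by hand, the same change can be sketched in Python. I use a regular expression on the raw text here (instead of rewriting the file with configparser) so that the comments in airflow.cfg are kept. The function name is my own, and it assumes the setting appears on a line of its own as shown above.

```python
import re

# Matches a whole 'load_examples = ...' line and keeps the left-hand side.
_LOAD_EXAMPLES = re.compile(r"^([ \t]*load_examples[ \t]*=[ \t]*).*$", re.MULTILINE)


def set_load_examples(cfg_text, enabled):
    """Return cfg_text with load_examples set to True or False."""
    new_text, count = _LOAD_EXAMPLES.subn(
        lambda match: match.group(1) + str(bool(enabled)), cfg_text
    )
    if count == 0:
        raise ValueError("load_examples setting not found")
    return new_text
```

To apply it, read airflow.cfg into a string, pass it through set_load_examples(text, False), and write the result back to the same file.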

 

Step 6: Reset the database by running the command below.

airflow resetdb

 

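The above command drops and rebuilds the metadata database, which removes the example DAGs that were already loaded. It also erases any other data stored there, so use it only on a test setup like this one.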

 

Step 7: Create a hello_world.py file in the dags folder.

 

hello_world.py

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]


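I copied the contents of hello_world.py from http://airflow.apache.org/docs/stable/tutorial.html. Note that the DAG id in the code is ‘tutorial’, so that is the name shown in the UI, not the file name.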
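
The ‘templated’ task uses Jinja placeholders: {{ ds }} is the execution date as a YYYY-MM-DD string, and macros.ds_add(ds, 7) shifts that date by 7 days. To show what the task ends up echoing, here is a plain Python stand-in. It does not use Airflow or Jinja; both functions are my own approximations of the template above.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Stand-in for the template macro: shift a YYYY-MM-DD string by `days` days."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def rendered_echo_lines(ds, my_param, repeats=5):
    """Return the values the templated command would echo for execution date `ds`."""
    lines = []
    for _ in range(repeats):  # mirrors {% for i in range(5) %}
        lines.append(ds)              # echo "{{ ds }}"
        lines.append(ds_add(ds, 7))   # echo "{{ macros.ds_add(ds, 7)}}"
        lines.append(my_param)        # echo "{{ params.my_param }}"
    return lines
```

For an execution date of 2021-10-11, each of the five loop iterations echoes 2021-10-11, then 2021-10-18, then the parameter text.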

 

 

Step 8: Start the scheduler by running the command below.


airflow scheduler

 

Wait until the scheduler starts. It keeps running in this terminal, so open a new terminal for the next step.

 

Step 9: Start the Airflow webserver by running the command below.


airflow webserver

 

The webserver starts on port 8080. Open the URL ‘http://localhost:8080/’ in a browser. You will see the DAGs user interface.
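
If the page does not load, it helps to check whether anything is answering on that port at all. Here is a minimal sketch using only the standard library. The function name is my own, and it only tells you that some HTTP server responds at the URL, not that it is Airflow.

```python
import urllib.error
import urllib.request


def webserver_is_up(url="http://localhost:8080/", timeout=3.0):
    """Return True if an HTTP server answers at `url`, False otherwise."""
    # The check targets localhost, so any configured proxy is skipped.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        # The server answered with an error status, so it is running.
        return True
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or name resolution failure.
        return False
```

Calling webserver_is_up() right after Step 9 should return True once the webserver has finished starting.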





As you can see, the DAG is in the off state by default. If you turn it on, the scheduler will trigger the DAG as per the defined schedule.
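
It is worth knowing which runs to expect when you turn the DAG on. In hello_world.py the start date is days_ago(2) and the schedule interval is one day, and Airflow triggers a run only after its interval has ended. The sketch below is a simplified model of that rule, not the scheduler's real code: it ignores settings such as catchup and time zones, and the function name is my own.

```python
from datetime import datetime, timedelta


def due_execution_dates(start_date, schedule_interval, now):
    """List execution dates whose schedule interval has fully elapsed by `now`."""
    due = []
    execution_date = start_date
    # A run for `execution_date` becomes due once the interval after it ends.
    while execution_date + schedule_interval <= now:
        due.append(execution_date)
        execution_date += schedule_interval
    return due
```

For example, with a start date of midnight 9 October 2021, a one-day interval, and the current time at noon on 11 October 2021, the function returns the execution dates 9 October and 10 October. The run for 11 October is not due until that day is over.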

 

The Schedule column shows how often the job runs. It is either a cron expression or, as in this DAG, a time interval (1 day).

The Owner column shows ‘airflow’, the owner set in default_args.

The Recent Tasks column shows the status of all the tasks of active DAG runs.

The Last Run column shows the execution time of the latest DAG run.

The DAG Runs column shows the status of all previous DAG runs.

 

Step 10: Let’s execute the DAG by toggling the state from OFF to ON.

Once you toggle the switch from OFF to ON and refresh the page, you will see that the ‘Recent Tasks’ and ‘DAG Runs’ columns are updated.





Click on the Graph View button to open the graph view of the DAG.




 

 

Graph view


 



Click on any node in the graph to see the task details.

 

 


That’s it for this session.

