Wednesday 13 October 2021

Airflow: hello world DAG

Step 1: Create the first_dag.py file and place it in the dags folder.

 

first_dag.py

# To create a DAG, we need to import the DAG class from the airflow module
from airflow import DAG

# Get an instance of DAG
# @daily: a predefined cron preset in Airflow; it triggers this DAG once every day
with DAG(dag_id="first_dag", schedule_interval="@daily") as dag:
    pass
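
Note that this DAG has no start_date, so the scheduler will not know from when to begin creating runs. That is fine for this hello world, but a more realistic version would typically pass one explicitly; a minimal sketch is below (the date used is just an illustration, not something the original post specifies).

from datetime import datetime

from airflow import DAG

# Same hello world DAG, but with an explicit start_date so the
# scheduler knows from when to begin creating daily runs.
with DAG(dag_id="first_dag",
         schedule_interval="@daily",
         start_date=datetime(2021, 10, 1)) as dag:
    pass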

 

from airflow import DAG

To create a DAG, we need to import the DAG class from the airflow module.

 

with DAG(dag_id="first_dag", schedule_interval="@daily") as dag:

The above statement gets an instance of DAG and assigns it to the variable dag.

 

@daily

It is a predefined cron preset in Airflow; it triggers this DAG once every day.
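
Besides @daily, Airflow ships with a few other cron presets. The sketch below lists the common ones along with their raw cron equivalents; the preset string and the cron string are interchangeable as the schedule_interval.

from airflow import DAG

# Common schedule_interval presets and their cron equivalents:
#   @once    - run a single time, as soon as possible
#   @hourly  - "0 * * * *" (at the start of every hour)
#   @daily   - "0 0 * * *" (once a day, at midnight)
#   @weekly  - "0 0 * * 0" (once a week, Sunday at midnight)
#   @monthly - "0 0 1 * *" (first day of every month, at midnight)
#   @yearly  - "0 0 1 1 *" (January 1st, at midnight)
# So the following is equivalent to schedule_interval="@daily":
with DAG(dag_id="first_dag", schedule_interval="0 0 * * *") as dag:
    pass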

 

Step 2: Start the Airflow scheduler by executing the below command.

airflow scheduler

$airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-04 10:22:47,605] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-04 10:22:47,610] {scheduler_job.py:1367} INFO - Starting the scheduler
[2020-10-04 10:22:47,610] {scheduler_job.py:1375} INFO - Running execute loop for -1 seconds
[2020-10-04 10:22:47,610] {scheduler_job.py:1376} INFO - Processing each file at most -1 times
[2020-10-04 10:22:47,610] {scheduler_job.py:1379} INFO - Searching for files in /Users/krishna/airflow/dags
[2020-10-04 10:22:47,617] {scheduler_job.py:1381} INFO - There are 2 files in /Users/krishna/airflow/dags
[2020-10-04 10:22:47,617] {scheduler_job.py:1438} INFO - Resetting orphaned tasks for active dag runs
[2020-10-04 10:22:47,648] {dag_processing.py:562} INFO - Launched DagFileProcessorManager with pid: 6462
[2020-10-04 10:22:48,875] {settings.py:55} INFO - Configured default timezone <Timezone [UTC]>
[2020-10-04 10:22:48,884] {dag_processing.py:774} WARNING - Because we cannot use more than 1 thread (max_threads = 2) when using sqlite. So we set parallelism to 1.
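
Once the scheduler is up, you can verify that first_dag was picked up from the dags folder. On Airflow 1.10.x (the version apparently used here, judging by the logs) the command is list_dags; on Airflow 2.x the equivalent is 'airflow dags list'.

$airflow list_dags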

 

Step 3: Start the webserver by executing the below command.

airflow webserver

$airflow webserver
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-04 10:23:34,785] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-04 10:23:34,786] {dagbag.py:417} INFO - Filling up the DagBag from /Users/krishna/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================            
[2020-10-04 10:23:35 +0530] [6523] [INFO] Starting gunicorn 20.0.4
[2020-10-04 10:23:35 +0530] [6523] [INFO] Listening at: http://0.0.0.0:8080 (6523)
[2020-10-04 10:23:35 +0530] [6523] [INFO] Using worker: sync
[2020-10-04 10:23:35 +0530] [6526] [INFO] Booting worker with pid: 6526
[2020-10-04 10:23:35 +0530] [6527] [INFO] Booting worker with pid: 6527
[2020-10-04 10:23:36 +0530] [6528] [INFO] Booting worker with pid: 6528
[2020-10-04 10:23:36 +0530] [6529] [INFO] Booting worker with pid: 6529
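
The webserver listens on port 8080 by default. If that port is already in use on your machine, you can start it on a different one with the -p option (the port number below is just an example).

$airflow webserver -p 8081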

Step 4: Open the URL http://localhost:8080 in a browser to see the dag first_dag.

Go to ‘Graph View’ by clicking on the Graph View button.

You will be navigated to the Graph View of the dag.

You can observe that there is a warning message ‘No tasks found’.

Let’s define a task for first_dag in the next post.
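
As a quick preview of what that will look like, a task is added to a DAG through an operator. Below is a minimal sketch using DummyOperator, which does nothing but is enough to clear the ‘No tasks found’ warning; the operator choice, task_id, and start_date here are illustrative, not necessarily what the next post uses.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id="first_dag",
         schedule_interval="@daily",
         start_date=datetime(2021, 10, 1)) as dag:
    # A task is an instance of an operator bound to this DAG.
    hello_task = DummyOperator(task_id="hello_task")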


