Tuesday, 19 October 2021

Airflow: Add a Python task to the DAG

This is a continuation of my previous post. We are going to execute a Python script using the PythonOperator. The task reads a CSV file, drops the header row, and writes the remaining contents to a file in the /tmp directory.

 

Step 1: Choose a directory to keep all the Python scripts in, and add it to PYTHONPATH so that Airflow can import them.

 

Example

export PYTHONPATH=/Users/Shared/airflow/data-pipelines
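To confirm that the export took effect, you can ask Python whether it can locate a module. This is only a sanity check and not part of the pipeline; the helper name is_importable is made up for illustration, and the check assumes the interpreter is started from the same shell where PYTHONPATH was exported.

```python
import importlib.util
import os


def is_importable(module_name):
    """Return True if Python can locate a top-level module on sys.path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Directories listed in PYTHONPATH are added to sys.path at interpreter startup.
    print("PYTHONPATH =", os.environ.get("PYTHONPATH", "<not set>"))
    # 'fetching_emps_info' is the module created in Step 2; it is found only
    # once that file exists in a directory on sys.path.
    print("fetching_emps_info importable:", is_importable("fetching_emps_info"))
```

If the second line prints False after the script from Step 2 is in place, the directory is most likely missing from PYTHONPATH in the shell that launches Airflow.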

 

Step 2: Create the file ‘fetching_emps_info.py’ in the PYTHONPATH directory.

fetching_emps_info.py

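import pandas as pd

TEMP_DIR = '/tmp/'
CSV_FILE_PATH = '/Users/Shared/airflow/data/empsInfo.csv'

def main():
    # Skip the header line and read every remaining line as data
    # (header=None stops pandas from treating the first data row as column names)
    empsInfo = pd.read_csv(CSV_FILE_PATH, encoding='utf-8', skiprows=[0], header=None)

    # Write the rows without the index column and without a header line
    empsInfo.to_csv(TEMP_DIR + 'empsData.csv', index=False, header=False)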
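To see the header-trimming step in isolation, here is a minimal sketch that uses only the standard library csv module instead of pandas. It is a stand-in for illustration, not the script above: the function name trim_header, the temporary paths, and the sample column names are all assumptions.

```python
import csv
import os
import tempfile


def trim_header(src_path, dst_path):
    """Copy a CSV file to dst_path without its first (header) row.

    Returns the number of data rows written.
    """
    with open(src_path, newline="", encoding="utf-8") as src, \
         open(dst_path, "w", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst, lineterminator="\n")
        next(reader, None)  # skip the header row; no error on an empty file
        count = 0
        for row in reader:
            writer.writerow(row)
            count += 1
    return count


if __name__ == "__main__":
    # Sample data made up for this sketch; the column names are an assumption.
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "empsInfo.csv")
        dst = os.path.join(tmp, "empsData.csv")
        with open(src, "w", newline="", encoding="utf-8") as f:
            f.write("id,name,age\n1,Krishna,31\n2,Ram,28\n")
        print(trim_header(src, dst))
        with open(dst, encoding="utf-8") as f:
            print(f.read(), end="")
```

Taking the source and destination paths as arguments, rather than module-level constants, makes the function easy to try on a throwaway file before pointing it at real data.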

 

Step 3: Update ‘first_dag.py’ as shown below.

 

first_dag.py

 

# To create a DAG, we need to import the class DAG from the module airflow
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

import fetching_emps_info


# A DAG needs a start date and a schedule interval to run. The owner property is used to filter the DAGs
default_args = {
    "start_date": datetime(2020, 2, 2),
    "owner": "airflow"
}

# Get an instance of DAG
# @daily is a predefined cron preset in Airflow; it triggers this DAG once a day
with DAG(dag_id="first_dag", schedule_interval="@daily", default_args=default_args) as dag:
    waiting_for_emps_Info = FileSensor(task_id="Waiting_for_employees_info", fs_conn_id="emps_info",
        filepath="empsInfo.csv", poke_interval=10)

    emp_details = PythonOperator(task_id="fetch_employees_info", python_callable=fetching_emps_info.main)
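The start date and the @daily interval together decide which runs the scheduler creates. The sketch below only imitates that arithmetic with plain datetime objects and does not call Airflow; the function name daily_execution_dates is made up, and it assumes a midnight start date with catchup enabled, so every completed day since the start date gets a run.

```python
from datetime import datetime, timedelta


def daily_execution_dates(start_date, now):
    """List the execution dates of an @daily schedule whose interval has ended by `now`."""
    dates = []
    current = start_date
    # A run for day D covers [D, D + 1 day) and is triggered only after that interval closes.
    while current + timedelta(days=1) <= now:
        dates.append(current)
        current += timedelta(days=1)
    return dates


if __name__ == "__main__":
    for d in daily_execution_dates(datetime(2020, 2, 2), datetime(2020, 2, 5, 12, 0)):
        print(d.isoformat())
```

This is why the execution date of a run is the start of the day it covers and not the moment the task actually executes.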

 

Step 4: Execute the task on its own using the ‘airflow test’ command.

airflow test first_dag fetch_employees_info 2020-02-02

$ airflow test first_dag fetch_employees_info 2020-02-02
[2020-10-06 11:23:04,596] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-06 11:23:04,597] {dagbag.py:417} INFO - Filling up the DagBag from /Users/krishna/airflow/dags
[2020-10-06 11:23:04,793] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.fetch_employees_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:23:04,798] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.fetch_employees_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:23:04,798] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2020-10-06 11:23:04,798] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-06 11:23:04,798] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2020-10-06 11:23:04,799] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): fetch_employees_info> on 2020-02-02T00:00:00+00:00
[2020-10-06 11:23:04,814] {python_operator.py:114} INFO - Done. Returned value was: None
[2020-10-06 11:23:04,816] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=first_dag, task_id=fetch_employees_info, execution_date=20200202T000000, start_date=20201006T055304, end_date=20201006T055304

Step 5: Open the file /tmp/empsData.csv. It contains the employee records without the header row.

$ cat /tmp/empsData.csv 
1,Krishna,31
2,Ram,28
3,Sailu,30
4,Bala,42


 
