Tuesday, 19 October 2021

Airflow: Add a BashOperator to the DAG

This is a continuation of my previous post. In this post, I am going to add a BashOperator to the DAG. The BashOperator is used to execute bash commands in a bash shell.


For example, let's add a task that prints the contents of a CSV file.

BashOperator(task_id="print_emps_info", bash_command="cat /tmp/empsData.csv")


Update the first_dag.py file with the content below.

first_dag.py

# To create a DAG, we need to import the DAG class from the airflow module
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

from datetime import datetime

import fetching_emps_info


# A DAG needs a start date and a schedule interval to run. The owner property is used to filter DAGs
default_args = {
    "start_date": datetime(2020, 2, 2),
    "owner": "airflow"
}

# Get an instance of DAG
# @daily is a predefined cron preset in Airflow; it triggers this DAG once a day
with DAG(dag_id="first_dag", schedule_interval="@daily", default_args=default_args) as dag:
    waiting_for_emps_Info = FileSensor(task_id="Waiting_for_employees_info", fs_conn_id="emps_info",
        filepath="empsInfo.csv", poke_interval=10)

    emp_details = PythonOperator(task_id="fetch_employees_info", python_callable=fetching_emps_info.main)

    print_emps_info = BashOperator(task_id="print_emps_info", bash_command="cat /tmp/empsData.csv")
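Note that the DAG above does not declare any ordering between the three tasks, so Airflow schedules them independently. If the print step should wait for the file sensor and the fetch step, Airflow's bitshift operators declare that ordering, e.g. waiting_for_emps_Info >> emp_details >> the BashOperator task (assuming the BashOperator is assigned to a variable). The mechanism behind >> can be sketched with a toy Task class (a hypothetical stand-in, not Airflow's actual implementation):

```python
# Toy sketch of Airflow-style ">>" chaining (hypothetical Task class, not Airflow's)
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []          # tasks that run after this one

    def __rshift__(self, other):
        # "a >> b" records b as downstream of a and returns b, so chains compose
        self.downstream.append(other)
        return other

waiting = Task("Waiting_for_employees_info")
fetch = Task("fetch_employees_info")
print_task = Task("print_emps_info")

# Same shape as in an Airflow DAG file: waiting >> fetch >> print_task
waiting >> fetch >> print_task
print([t.task_id for t in waiting.downstream])   # prints ['fetch_employees_info']
```

In Airflow, __rshift__ is defined on operators themselves, which is why `task_a >> task_b >> task_c` reads as a left-to-right dependency chain.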


Test the 'print_emps_info' task by executing the command below.

$ airflow test first_dag print_emps_info 2020-02-02
[2020-10-06 11:41:49,363] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-06 11:41:49,363] {dagbag.py:417} INFO - Filling up the DagBag from /Users/krishna/airflow/dags
[2020-10-06 11:41:49,565] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.print_emps_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:41:49,570] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.print_emps_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:41:49,570] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2020-10-06 11:41:49,570] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-06 11:41:49,570] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2020-10-06 11:41:49,571] {taskinstance.py:901} INFO - Executing <Task(BashOperator): print_emps_info> on 2020-02-02T00:00:00+00:00
[2020-10-06 11:41:49,583] {bash_operator.py:113} INFO - Tmp dir root location: 
 /var/folders/tp/qybw2qy54t39ffn2l0grsdrc0000gp/T
[2020-10-06 11:41:49,584] {bash_operator.py:134} INFO - Temporary script location: /var/folders/tp/qybw2qy54t39ffn2l0grsdrc0000gp/T/airflowtmpdg2babg4/print_emps_infoka1g_hj8
[2020-10-06 11:41:49,584] {bash_operator.py:146} INFO - Running command: cat /tmp/empsData.csv
[2020-10-06 11:41:49,590] {bash_operator.py:153} INFO - Output:
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 1,Krishna,31
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 2,Ram,28
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 3,Sailu,30
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 4,Bala,42
[2020-10-06 11:41:49,597] {bash_operator.py:159} INFO - Command exited with return code 0
[2020-10-06 11:41:49,604] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=first_dag, task_id=print_emps_info, execution_date=20200202T000000, start_date=20201006T061149, end_date=20201006T061149
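As the log shows, the BashOperator writes the command to a temporary script and then executes it in a bash shell. That mechanism can be sketched in plain Python (a simplified sketch with made-up sample data, not Airflow's actual implementation):

```python
import os
import subprocess
import tempfile

# Sample CSV standing in for /tmp/empsData.csv (made-up data)
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as csv_file:
    csv_file.write("1,Krishna,31\n2,Ram,28\n")
    csv_path = csv_file.name

# BashOperator-style execution: dump the command into a temp script, run it with bash
with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as script:
    script.write("cat %s\n" % csv_path)
    script_path = script.name

result = subprocess.run(["bash", script_path], capture_output=True, text=True)
print(result.stdout)                      # the CSV rows, as in the task log
print("return code:", result.returncode)  # 0 on success, like the log's exit code

os.unlink(csv_path)
os.unlink(script_path)
```

A non-zero return code from the script makes the BashOperator task fail, which is why the log line "Command exited with return code 0" precedes the task being marked as SUCCESS.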
