This is a continuation of my previous post. In this post, I am going to add a BashOperator to the DAG. The BashOperator is used to execute bash commands in a bash shell.
For example, let's add a task that prints the contents of a CSV file.
BashOperator(task_id="print_emps_info", bash_command="cat /tmp/empsData.csv")
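As a side note, bash_command is a templated field, so Airflow renders Jinja macros such as {{ ds }} (the execution date) before handing the command to bash. A minimal sketch of a hypothetical task (the task_id here is made up for illustration; it would sit inside the with DAG block shown below) that also prints the run date:
BashOperator(task_id="print_emps_info_with_date",
             bash_command="echo 'Run date: {{ ds }}' && cat /tmp/empsData.csv")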
Update the first_dag.py file with the content below.
first_dag.py
# To create a DAG, we need to import the DAG class from the airflow module
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

import fetching_emps_info

# A DAG needs a start date and a schedule interval to run. The owner property is used to filter the DAGs.
default_args = {
    "start_date": datetime(2020, 2, 2),
    "owner": "airflow"
}

# Get an instance of DAG
# @daily: a predefined cron preset in Airflow that triggers this DAG once a day
with DAG(dag_id="first_dag", schedule_interval="@daily", default_args=default_args) as dag:

    # Wait for the employees file configured via the "emps_info" connection
    waiting_for_emps_Info = FileSensor(task_id="Waiting_for_employees_info",
                                       fs_conn_id="emps_info",
                                       filepath="empsInfo.csv",
                                       poke_interval=10)

    # Fetch the employees' information using a Python callable
    emp_details = PythonOperator(task_id="fetch_employees_info",
                                 python_callable=fetching_emps_info.main)

    # Print the contents of the CSV file
    print_emps_info = BashOperator(task_id="print_emps_info",
                                   bash_command="cat /tmp/empsData.csv")
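The tasks above are not chained, so Airflow treats them as independent of each other. If you want them to run in order (sensor, then fetch, then print), one common pattern is the bit-shift dependency syntax; for example, inside the same with DAG block:
waiting_for_emps_Info >> emp_details >> print_emps_info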
Test the 'print_emps_info' task by executing the command below.
$ airflow test first_dag print_emps_info 2020-02-02
[2020-10-06 11:41:49,363] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-06 11:41:49,363] {dagbag.py:417} INFO - Filling up the DagBag from /Users/krishna/airflow/dags
[2020-10-06 11:41:49,565] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.print_emps_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:41:49,570] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.print_emps_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:41:49,570] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2020-10-06 11:41:49,570] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-06 11:41:49,570] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2020-10-06 11:41:49,571] {taskinstance.py:901} INFO - Executing <Task(BashOperator): print_emps_info> on 2020-02-02T00:00:00+00:00
[2020-10-06 11:41:49,583] {bash_operator.py:113} INFO - Tmp dir root location:
/var/folders/tp/qybw2qy54t39ffn2l0grsdrc0000gp/T
[2020-10-06 11:41:49,584] {bash_operator.py:134} INFO - Temporary script location: /var/folders/tp/qybw2qy54t39ffn2l0grsdrc0000gp/T/airflowtmpdg2babg4/print_emps_infoka1g_hj8
[2020-10-06 11:41:49,584] {bash_operator.py:146} INFO - Running command: cat /tmp/empsData.csv
[2020-10-06 11:41:49,590] {bash_operator.py:153} INFO - Output:
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 1,Krishna,31
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 2,Ram,28
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 3,Sailu,30
[2020-10-06 11:41:49,597] {bash_operator.py:157} INFO - 4,Bala,42
[2020-10-06 11:41:49,597] {bash_operator.py:159} INFO - Command exited with return code 0
[2020-10-06 11:41:49,604] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=first_dag, task_id=print_emps_info, execution_date=20200202T000000, start_date=20201006T061149, end_date=20201006T061149