This post is a continuation of my previous one. Here we are going to execute a Python script using the PythonOperator. The task reads a CSV file, trims the header row, and writes the contents to the /tmp directory.
Step 1: First, we need to select a location to keep all our Python scripts and point PYTHONPATH at it.
Example
export PYTHONPATH=/Users/Shared/airflow/data-pipelines
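To confirm that Python picks up this location, you can check sys.path from a Python shell (a quick sanity check; the directory below is the one exported above):

import sys

# Directories listed in PYTHONPATH are added to sys.path
print('/Users/Shared/airflow/data-pipelines' in sys.path)  # expected: True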
Step 2: Create the ‘fetching_emps_info.py’ file in the PYTHONPATH directory.
fetching_emps_info.py
import pandas as pd

TEMP_DIR = '/tmp/'
CSV_FILE_PATH = '/Users/Shared/airflow/data/empsInfo.csv'

def main():
    # skiprows=[0] skips the original header line; pandas then treats the
    # first data row as the header of the resulting DataFrame
    empsInfo = pd.read_csv(CSV_FILE_PATH, encoding='utf-8', skiprows=[0])
    # Write the data to /tmp, dropping the pandas index column
    empsInfo.to_csv(TEMP_DIR + 'empsData.csv', index=False)
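For reference, an input file along the following lines produces the output shown in Step 5. The column names in the header row are my assumption; skiprows=[0] discards that line anyway, so pandas treats the first data row (1,Krishna,31) as the header when writing empsData.csv.

empsInfo.csv

id,name,age
1,Krishna,31
2,Ram,28
3,Sailu,30
4,Bala,42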
Step 3: Update ‘first_dag.py’ as shown below.
first_dag.py
# To create a DAG, we need to import the DAG class from the airflow module
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import fetching_emps_info
# A DAG needs a start date and a schedule interval to run. The owner property is used to filter DAGs
default_args = {
"start_date": datetime(2020, 2, 2),
"owner": "airflow"
}
# Get an instance of DAG
# @daily: It is a predefined cron identifier in airflow, trigger this dag everyday once
with DAG(dag_id="first_dag", schedule_interval="@daily", default_args=default_args) as dag:

    waiting_for_emps_Info = FileSensor(task_id="Waiting_for_employees_info", fs_conn_id="emps_info",
                                       filepath="empsInfo.csv", poke_interval=10)

    emp_details = PythonOperator(task_id="fetch_employees_info", python_callable=fetching_emps_info.main)
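Note that, as written, there is no dependency between the two tasks, so the PythonOperator does not wait for the FileSensor. If you want the fetch task to run only after the file shows up, chain them with the bitshift operator at the end of the with block (a one-line addition):

    # Run fetch_employees_info only after the sensor detects empsInfo.csv
    waiting_for_emps_Info >> emp_details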
Step 4: Execute the task. The airflow test command runs a single task instance for the given execution date without checking dependencies or recording state in the metadata database, which makes it handy for testing a task in isolation.

$ airflow test first_dag fetch_employees_info 2020-02-02
[2020-10-06 11:23:04,596] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-06 11:23:04,597] {dagbag.py:417} INFO - Filling up the DagBag from /Users/krishna/airflow/dags
[2020-10-06 11:23:04,793] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.fetch_employees_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:23:04,798] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: first_dag.fetch_employees_info 2020-02-02T00:00:00+00:00 [None]>
[2020-10-06 11:23:04,798] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2020-10-06 11:23:04,798] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-06 11:23:04,798] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2020-10-06 11:23:04,799] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): fetch_employees_info> on 2020-02-02T00:00:00+00:00
[2020-10-06 11:23:04,814] {python_operator.py:114} INFO - Done. Returned value was: None
[2020-10-06 11:23:04,816] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=first_dag, task_id=fetch_employees_info, execution_date=20200202T000000, start_date=20201006T055304, end_date=20201006T055304
Step 5: Print the contents of empsData.csv; you can see the employees' information.
$ cat /tmp/empsData.csv
1,Krishna,31
2,Ram,28
3,Sailu,30
4,Bala,42