How to create an Airflow DAG (scheduler) to execute a Redshift query?

Apache Airflow

Use case: We have a Redshift query (an insert SQL) that loads data from another table on a daily basis. It needs to be scheduled in Airflow and triggered daily. The Redshift credentials cannot be exposed directly, so we keep them in an S3 bucket in JSON format.

The steps that need to be included in the Airflow DAG are as follows:

1. Read the Redshift credentials from S3 using the boto3 Python library.
2. Connect to the Redshift cluster using the pg8000 Python library.
3. Execute the Redshift SQL.

Source Code

import json
import boto3
import pg8000
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
"""
S3 Data for your reference: This is stored in S3 location
s3 path : s3://freshers-dev-training-alpha/code-config/db_cred.json
json content:
{
"host": "dev-redshift-redshift.us-west-1.redshift.amazonaws.com",,
"port": 5439,
"user": "freshers_usr_dev",
"password": "***FD*ERD",
"ssl":True,
"database":"freshers-db"
}
"""
# S3 location of the Redshift credential file
s3_config_bucket = 'freshers-dev-training-alpha'
table_cred_file = 'code-config/db_cred.json'

# Daily insert that loads marketing records from the raw table
sql_query = """insert into dev.freshers_mkt_Tbl
select freshers_id, freshers_name, freshers_dept
from dev.freshers_raw_table
where freshers_dept = 'mkt'
and as_on_date = CURRENT_DATE"""

default_args = {
    'owner': 'freshers.in development',
    'depends_on_past': False,
}

def establish_redshift_connect(read_credential):
    """
    :param read_credential: credential dictionary read from S3
    :return: connection and cursor
    """
    try:
        conn = pg8000.connect(**read_credential)
        cur = conn.cursor()
        return conn, cur
    except Exception as e:
        print("Failed to connect to Redshift : ", e)
        raise  # re-raise so the Airflow task is marked as failed

def connect_and_execute_sql(read_credential, sql_query):
    """
    Connect with the given credentials and execute the SQL statement.
    :param read_credential: credential dictionary read from S3
    :param sql_query: SQL that needs to be executed
    :return: None
    """
    conn, cur = establish_redshift_connect(read_credential)
    try:
        cur.execute(sql_query)
        conn.commit()
    except Exception as e:
        print("Redshift query failed : ", e)
        raise
    finally:
        cur.close()
        conn.close()
    return None

def main_invoke_module():
    # Read the credential JSON from S3, then run the insert query on Redshift
    s3_object = boto3.resource('s3').Object(s3_config_bucket, table_cred_file)
    read_credential = json.loads(s3_object.get()['Body'].read().decode('utf-8'))
    connect_and_execute_sql(read_credential, sql_query)

dag = DAG(
    'executing_redshift_query_using_airflow',
    default_args=default_args,
    description='executing redshift query using airflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 3, 10),
    catchup=True,
    tags=['freshers'],
)

main_invoke_module_task = PythonOperator(
    task_id='main_invoke_module',
    python_callable=main_invoke_module,
    dag=dag,
)

main_invoke_module_task

Boto3 is the name of the Python SDK for AWS. It allows you to directly create, update, and delete AWS resources from your Python scripts.
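For reference, here is a minimal standalone sketch of how the credential file above can be read with boto3; the bucket and key are the same ones used in the DAG, so adjust them for your environment.

import json
import boto3

# Read the credential JSON object from S3 and parse it into a dict
s3_object = boto3.resource('s3').Object('freshers-dev-training-alpha', 'code-config/db_cred.json')
db_cred = json.loads(s3_object.get()['Body'].read().decode('utf-8'))
print(db_cred['host'], db_cred['port'])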

pg8000 is a Pure-Python interface to the PostgreSQL database engine.
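As a standalone illustration, a pg8000 connection looks roughly like the sketch below. The host, user, password, and database values are the placeholders from the sample credential file; the ssl option is omitted here since its exact form depends on the pg8000 version you have installed.

import pg8000

# Placeholder credentials for illustration only
conn = pg8000.connect(
    host='dev-redshift-redshift.us-west-1.redshift.amazonaws.com',
    port=5439,
    user='freshers_usr_dev',
    password='***FD*ERD',
    database='freshers-db',
)
cur = conn.cursor()
cur.execute('select current_date')  # simple query to verify the connection
print(cur.fetchone())
cur.close()
conn.close()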

Apache Airflow is an open-source workflow management platform for data engineering pipelines.

PythonOperator is used to execute Python callables.
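A minimal, self-contained example of wiring a Python callable into a DAG with PythonOperator is sketched below; the dag_id, task_id, and function name are illustrative only.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def say_hello():
    # The callable runs inside the Airflow worker when the task executes
    print("hello from Airflow")

with DAG(
    'python_operator_example',
    start_date=datetime(2022, 3, 10),
    schedule_interval=None,
    catchup=False,
) as dag:
    hello_task = PythonOperator(task_id='say_hello', python_callable=say_hello)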

