How to run a Spark job from Airflow hosted on a different server using BashOperator?

Airflow + Spark @ Freshers.in

In this article we will discuss how to trigger a PySpark job running on AWS EMR from Airflow hosted on a different instance. For the initial set up you need a permanent connection between the Airflow server and EMR (where your Spark jobs run). You can establish this by generating an SSH key pair on the Airflow server and placing the public key in the ~/.ssh/authorized_keys file on the EMR master node.

How to generate the Key

For 2048-bit RSA:
ssh-keygen -t rsa -b 2048 -C "<comment>"
For ED25519:
ssh-keygen -t ed25519 -C "<comment>"
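
Once the key pair is generated on the Airflow server, its public key has to be added on the EMR side. A minimal sketch, assuming the key file name and user@host used in the later examples, and assuming you already have some form of login to the EMR master so the key can be copied:

# Copy the public key into the EMR master's ~/.ssh/authorized_keys
ssh-copy-id -i ~/.ssh/keyfor_emr_rsa.pub user_freshers@101.21.135.17

# Or append it manually if ssh-copy-id is not available
cat ~/.ssh/keyfor_emr_rsa.pub | ssh user_freshers@101.21.135.17 "cat >> ~/.ssh/authorized_keys"

# Verify that key-based login now works without a password prompt
ssh -i ~/.ssh/keyfor_emr_rsa user_freshers@101.21.135.17 "hostname"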

Once the connection is successfully established you can use BashOperator to trigger the Spark job. The first question that comes to mind is whether we can get the Spark logs in Airflow. The answer is "Yes": BashOperator captures the output of the ssh command, so the spark-submit logs streamed back over ssh end up in the Airflow task log, using the method below.

To execute this we need:

  1. A connection between the Airflow server and EMR => established above
  2. The Spark code on EMR
  3. A shell script on the Airflow server that runs the Spark code over ssh
  4. An Airflow DAG (Python code) that runs the shell script

Spark code on the EMR server (file name: bash_freshers_sample_spark.py)

# Sample code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create the SparkSession (required when the script is run via spark-submit)
spark = SparkSession.builder.appName("bash_freshers_sample_spark").getOrCreate()

# Sample employee data with a nested name tuple
emp_data = [
    ("Finance", ("King", "James", "John"), 101),
    ("Technology", ("Dude", "David", "William"), 201),
    ("Human_Resource", ("Sam", "", "John"), 301),
    ("Administrative", ("Wilson", "Christopher", "Daniel"), 401),
    ("Contractors", ("Sirus", "Kenneth", "Kevin"), 501),
]

# Schema: department, nested name struct, department id
emp_data_schema = StructType([
    StructField("dept", StringType(), True),
    StructField("name", StructType([
        StructField("first_name", StringType(), True),
        StructField("middle_name", StringType(), True),
        StructField("last_name", StringType(), True)])),
    StructField("dept_id", IntegerType(), True),
])

emp_df_1 = spark.createDataFrame(data=emp_data, schema=emp_data_schema)
emp_df_1.printSchema()
emp_df_1.show(20, False)
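
Before wiring this into Airflow, it is worth confirming that the job runs on EMR itself. A quick check, run directly on the EMR master node (the --packages option below mirrors the ssh command used later and is only needed if your job actually uses spark-xml):

# Run the job directly on the EMR master to confirm it completes
spark-submit --packages com.databricks:spark-xml_2.12:0.12.0 /mnt/freshers/in/spark/analytics/bash_freshers_sample_spark.py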

Shell script on the Airflow server to run the Spark code over ssh (file name: freshers_in_spark.sh)

The ssh command below is logically a single command (everything from ssh through ..._spark.py); it is wrapped across three lines with backslashes only for readability.

ssh -i ~/.ssh/keyfor_emr_rsa -o StrictHostKeyChecking=no user_freshers@101.21.135.17 \
  "spark-submit --packages com.databricks:spark-xml_2.12:0.12.0 --conf spark.ui.port=5051 \
  --conf spark.port.maxRetries=20 /mnt/freshers/in/spark/analytics/bash_freshers_sample_spark.py"
 
# Fail the script (and hence the Airflow task) if the remote spark-submit returned a non-zero exit code
test $? -eq 0 || { printf "JOB_GET_FRESHERS_GS failed \n\n" && exit 1 ; }

printf "Executed Successfully JOB_GET_FRESHERS_GS \n"

Airflow DAG (Python code) on the Airflow server to run the shell script (file name: freshers_spark_job.py)

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

# Remote user and host of the EMR master node, passed to the shell scripts
exe_user = "user_freshers@101.21.135.17"
bash_freshers_gs = f"sh /mnt/freshers/in/jobs/freshers_in_spark.sh {exe_user} "
bash_freshers_fb = f"sh /mnt/freshers/in/jobs/job_get_freshers_fb_analytics.sh {exe_user} "

default_args = {
    'owner': 'freshers_in',
    'depends_on_past': False
}

dag = DAG(
    dag_id='Freshers_Spark_Job',
    default_args=default_args,
    description='Freshers.in sample Spark job from Airflow',
    schedule_interval='0 9 * * *',
    start_date=datetime(2022, 4, 3),
    catchup=True,
    tags=['Freshers', 'Spark'],
)

JOB_GET_FRESHERS_GS = BashOperator(
    task_id='JOB_GET_FRESHERS_GS',
    bash_command=bash_freshers_gs,
    dag=dag)

JOB_GET_FRESHERS_GS
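
Once the DAG file is placed in the Airflow dags folder, you can run the task once from the command line on the Airflow server to confirm that the ssh call works and that the spark-submit output shows up in the task log. The command below is for Airflow 2.x; on older 1.x installs the equivalent is airflow test <dag_id> <task_id> <date>:

# Run a single, unscheduled test of the task; logs print to the console
airflow tasks test Freshers_Spark_Job JOB_GET_FRESHERS_GS 2022-04-03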
