The airflow.hooks.base module provides the foundational building blocks for creating these hooks. In this article, we take a close look at how hooks work in Airflow and finish with a detailed, practical example to cement your understanding.
Understanding Airflow Hooks
Hooks are a pivotal part of the Airflow ecosystem, allowing for easy interaction with external data sources and services. They act as the glue between Airflow and the external system, abstracting the complexities involved in the connection and interaction process.
The BaseHook class
The BaseHook class is the superclass for all hooks, encapsulating the logic for creating and managing connections to external services. It provides methods for retrieving connection metadata from the Airflow backend and utility functions for establishing connections.
Implementing a custom hook
To demonstrate the power and flexibility of Airflow hooks, let’s create a custom hook to interact with a fictional external REST API service that provides real-time stock market data.
Define the custom hook
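```python
import requests
from airflow.hooks.base import BaseHook


class StockMarketHook(BaseHook):
    """Hook for a fictional REST API that serves real-time stock market data."""

    def __init__(self, conn_id):
        super().__init__()
        self.conn_id = conn_id
        # Fetch the connection (host, extras, ...) stored in Airflow's backend
        self.connection = self.get_connection(conn_id)
        # extra_dejson parses the connection's JSON Extra field into a dict
        self.extras = self.connection.extra_dejson

    def get_stock_data(self, symbol):
        endpoint = f"{self.connection.host}/api/stocks/{symbol}"
        api_key = self.extras.get("api_key")
        response = requests.get(
            endpoint,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,  # fail instead of hanging if the API stops responding
        )
        # Raise an HTTPError on 4xx/5xx responses so the Airflow task fails
        response.raise_for_status()
        return response.json()
```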
In the above example, the StockMarketHook class inherits from BaseHook. Its constructor calls the inherited get_connection method to retrieve the connection information stored in Airflow's backend, and extra_dejson turns the connection's Extra field into a dictionary. The get_stock_data method sends an authenticated GET request to the external API and returns the JSON response.
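One detail worth guarding against: self.extras.get('api_key') returns None when the Extra field has no api_key. In that case the request is sent with the header "Bearer None" and fails with a less obvious error from the API. The sketch below is a minimal helper that fails fast instead. The name build_auth_headers is hypothetical and not part of Airflow.

```python
def build_auth_headers(extras):
    """Build the Authorization header from a connection's parsed extras.

    Hypothetical helper (not an Airflow API): raises early if the key is missing.
    """
    api_key = extras.get("api_key")
    if not api_key:
        # Fail fast with a clear message instead of sending "Bearer None"
        raise ValueError("Connection Extra must contain a non-empty 'api_key'")
    return {"Authorization": f"Bearer {api_key}"}
```

Inside get_stock_data, the request could then pass headers=build_auth_headers(self.extras). Because the exception is raised inside the task, the Airflow task fails and the missing-key message appears in its log.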
Store connection information
Before using the hook, store the connection information in Airflow:
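- Go to Admin -> Connections in the Airflow UI.
- Add a new connection with conn_id (shown as Connection Id) set to stock_market_api, and pick a Connection Type (HTTP works for this example).
- Enter the API's base URL in the Host field and the API key in the Extra field as JSON: {"api_key": "YOUR_API_KEY"}.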
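As an alternative to the UI, Airflow can read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, with the conn_id in upper case. From Airflow 2.3 onward, the variable's value may be a JSON document. The sketch below builds such a variable for stock_market_api, assuming that JSON format. The function name connection_env_var and the host https://stocks.example.com are illustrative placeholders.

```python
import json
from typing import Tuple


def connection_env_var(conn_id: str, host: str, api_key: str,
                       conn_type: str = "http") -> Tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_* environment variable.

    Uses the JSON connection format (Airflow 2.3+); illustrative helper only.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": conn_type,
        "host": host,  # full base URL, scheme included
        "extra": {"api_key": api_key},  # read back via connection.extra_dejson
    })
    return name, value


if __name__ == "__main__":
    name, value = connection_env_var(
        "stock_market_api", "https://stocks.example.com", "YOUR_API_KEY"
    )
    # Shell line to add to the scheduler's and workers' environment
    print(f"export {name}='{value}'")
```

The variable must be set wherever tasks run, not only on the machine serving the UI. Connections defined this way do not appear under Admin -> Connections.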
Use the Hook in a DAG
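```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumes the hook above lives in custom_hooks/stock_market_hook.py on the Python path
from custom_hooks.stock_market_hook import StockMarketHook

default_args = {
    'start_date': datetime(2023, 1, 1),
}


def pull_stock_data(ti, symbol):
    # Airflow injects the TaskInstance as `ti`; `symbol` comes from op_kwargs
    hook = StockMarketHook(conn_id="stock_market_api")
    stock_data = hook.get_stock_data(symbol)
    # Share the payload with downstream tasks via XCom
    ti.xcom_push(key='stock_data', value=stock_data)


with DAG(
    'stock_market_dag',
    default_args=default_args,
    schedule='@daily',  # Airflow < 2.4: use schedule_interval='@daily'
    catchup=False,  # don't backfill one run per day since start_date
) as dag:
    pull_data = PythonOperator(
        task_id='pull_stock_data',
        python_callable=pull_stock_data,
        op_kwargs={'symbol': 'AAPL'},
    )
```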
In the DAG definition, the PythonOperator runs pull_stock_data, which uses the custom hook to pull data for the Apple Inc. stock (AAPL) and pushes it to XCom for other tasks to use.
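Another task can read that payload back with ti.xcom_pull, using the same task_id and key. The sketch below is a minimal downstream callable. The function name summarize_stock_data is hypothetical, and because the API is fictional, the "price" field is an assumed response key.

```python
def summarize_stock_data(ti):
    """Read the payload pushed by pull_stock_data and build a one-line summary."""
    stock_data = ti.xcom_pull(task_ids="pull_stock_data", key="stock_data")
    if stock_data is None:
        # Nothing was pushed under this key for the current DAG run
        raise ValueError("No 'stock_data' XCom found for task 'pull_stock_data'")
    # "price" is an assumed field of the fictional API's JSON response
    summary = f"Latest price: {stock_data.get('price')}"
    print(summary)
    # A PythonOperator pushes the return value to XCom as "return_value"
    return summary
```

Inside the with DAG(...) block, it could be registered as summarize = PythonOperator(task_id='summarize_stock_data', python_callable=summarize_stock_data). Ordering it after the first task with pull_data >> summarize ensures it only runs once the data has been pushed.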