Demo: ETL from GCS to BigQuery using Composer (Airflow)



This post walks through a daily ETL operation that loads daily Avro files from Cloud Storage into a BigQuery raw table.

  1. DAG Creation: Developed two DAGs, one to monitor the GCS landing path and one to perform the ETL load (the hand-off between them is sketched just after this list).

  2. Validation Implementations:

    • Implemented logic to identify partition gaps.
    • Added data count validation to ensure data integrity.
    • Included schema mismatch validation to detect any discrepancies in the data structure.
  3. Slack Alerts: Created a Slack alert task to send instant notifications for any issues or updates during the ETL process.
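
The heart of the design is the hand-off between the two DAGs: the monitor DAG finds newly landed partition folders in GCS and triggers one pipeline run per partition, passing the partition key through the DAG run configuration. Below is a minimal, illustrative sketch of that hand-off (the trigger_dag_id is a placeholder; the full DAG code follows later in the post).

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def trigger_pipeline_runs(**kwargs):
    """Trigger one pipeline DAG run per new GCS partition, passing the key via conf."""
    ti = kwargs["ti"]
    new_partitions = ti.xcom_pull(task_ids="check_new_partition", key="new_storage_partition")
    for partition_key in new_partitions:
        TriggerDagRunOperator(
            # task_id may only contain alphanumerics, dashes, dots and underscores
            task_id=f"pipeline_dag__{partition_key.translate(str.maketrans(' :-', '___'))}",
            trigger_dag_id="dev__demo__feed__daily__pipeline",  # placeholder pipeline DAG id
            conf={"partition_key": partition_key},  # consumed by the pipeline DAG via params
        ).execute(context=kwargs)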


Diagram:

gcs_bigquery_airlfow

gcs_bigquery_airlfow_1


Setting up Slack in Composer (Airflow)

  1. In the Cloud Console, open your Composer environment and add apache-airflow-providers-slack under the PyPI packages tab to install the Slack dependencies in the Airflow cluster.
  2. Create a slack_api_default connection under the Admin -> Connections tab in the Composer Airflow UI and paste the Slack token into the password field, as in the snippet below.
from airflow.models.connection import Connection

conn = Connection(
    conn_id="slack_api_default",
    conn_type="slack",
    password="",  # paste the Slack API token (xoxb-...) here
    extra={
        # Specify extra parameters here
        "timeout": "42",
    },
)

# Generate the environment variable name
env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}"
print(f"{env_key}='{conn.get_uri()}'")
# AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx@/?timeout=42'
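
Once the connection and the provider package are in place, any task can post to Slack with SlackAPIPostOperator. A minimal sketch of a test task, assuming the connection above and a placeholder channel id:

from airflow.providers.slack.operators.slack import SlackAPIPostOperator

# Minimal test task; the channel id and message text are placeholders.
notify_slack = SlackAPIPostOperator(
    task_id="notify_slack",
    slack_conn_id="slack_api_default",  # the connection created above
    channel="CXXXXXX",                  # Slack channel id
    username="Demo_ETL",
    text="Hello from Cloud Composer :wave:",
)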

Daily Monitor.py

import os
from datetime import timedelta, datetime
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")  # map ":" and "-" to a space each
environment = "dev"
data_frequency = "daily"
gcs_file_format = "AVRO"
gcs_success_file = "_SUCCESS"
project_name = "PROJECT_NAME"
team_name = "PROJECT_NAME"
location = "europe-west2"
project_id = "PROJECT_ID"
dataset_id = "DATASET_ID"
gcs_bucket = "BUCKET_NAME"
feed_name = "FEED_NAME"
slack_channel_id = "CXXXXXX" # develop
slack_conn_id = "slack_api_default" # Created via Google Cloud Composer Airflow UI
slack_username = "Demo_ETL"
dag_id = f"{os.path.basename(__file__).replace('.py', '')}"
gcs_prefix = f"{feed_name}/"
table_slug = feed_name.translate(translate_table)
raw_table_id = f"raw__{table_slug}__{data_frequency}"
labels = {"env": environment, "project": project_name, "team": team_name}
pipeline_dag_id = (
    f"{environment}__{project_name}__{table_slug}__{data_frequency}__pipeline"
)

# SQL query to fetch the existing BigQuery table partitions
get_raw_table_partition_query = f"""
SELECT
    partition_id
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
    table_name = '{raw_table_id}';"""

# SQL query to identify partition gaps in the raw table
identify_missing_partition_query = f"""
WITH
    raw AS (
        SELECT
            partition_id
        FROM
            `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE
            table_name = '{raw_table_id}'
            AND partition_id <> '__NULL__'
    ),
    expected AS (
        SELECT
            FORMAT_DATE("%Y%m%d", date) AS partition_id
        FROM
            UNNEST(
                GENERATE_DATE_ARRAY(
                    '2024-01-01',
                    DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY), -- TIMEDELTA
                    INTERVAL 1 DAY
                )
            ) AS date
    )
SELECT
    expected.partition_id
FROM
    expected
    LEFT JOIN raw ON raw.partition_id = expected.partition_id
WHERE
    raw.partition_id IS NULL
    AND expected.partition_id >= (
        SELECT
            COALESCE(MIN(partition_id), FORMAT_DATE("%Y%m%d", CURRENT_DATE()))
        FROM
            `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE
            table_name = '{raw_table_id}'
            AND partition_id <> '__NULL__'
    );
"""
# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------
default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=f"A DAG to monitor the {project_name.capitalize()} file landing path in GCS and initiate the ETL DAG pipeline.",
    default_args=default_args,
    schedule=timedelta(hours=1),
    catchup=False,
    max_active_runs=1,
    tags=[environment, project_name, "etl", table_slug, data_frequency],
)
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def get_gcs_partition(**kwargs):
    """Fetch the GCS object list from the `scan_gcs_bucket` task and keep only the successfully uploaded partition folders."""
    ti = kwargs["ti"]
    folders = [
        os.path.dirname(object)
        for object in ti.xcom_pull(task_ids="scan_gcs_bucket")
        if object.endswith(gcs_success_file)
    ]
    storage_partition = [item.split("/")[1] for item in folders]
    print(f"\t{storage_partition=}")
    ti.xcom_push(key="storage_partition", value=storage_partition)


def check_partition_exists(job_id, **kwargs):
    """Compare the GCS partitions with the BigQuery table partitions to get the list of new partitions."""
    print(f"\t BigQuery Job ID: {job_id}")
    ti = kwargs["ti"]
    hook = BigQueryHook()
    client = hook.get_client(project_id=project_id, location=location)
    query_results = client.get_job(job_id).result()
    bq_partition = [
        datetime.strptime(row.get("partition_id"), "%Y%m%d%H").strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        for row in query_results
        if row.get("partition_id", "__NULL__") not in ["__NULL__"]
    ]
    bq_partition.sort()
    print(f"\t{bq_partition=}")
    storage_partition = ti.xcom_pull(
        task_ids="get_gcs_partition", key="storage_partition"
    )
    storage_partition.sort()
    print(f"\t{storage_partition=}")
    # Identify the new partitions
    new_storage_partition = list(set(storage_partition) - set(bq_partition))
    new_storage_partition.sort()
    print(f"\t{new_storage_partition=}")
    ti.xcom_push(key="new_storage_partition", value=new_storage_partition)


def trigger_dags(**kwargs):
    """Trigger the ETL pipeline DAG once per new partition_key."""
    ti = kwargs["ti"]
    partition_keys = ti.xcom_pull(
        task_ids="check_new_partition", key="new_storage_partition"
    )
    for partition_key in partition_keys:
        print({"partition_key": partition_key})
        TriggerDagRunOperator(
            task_id=f"pipeline_dag__{partition_key.translate(translate_table)}",
            trigger_dag_id=pipeline_dag_id,
            conf={"partition_key": partition_key},
            dag=dag,
        ).execute(context=kwargs)


def identify_partition_gap(**kwargs):
    """Check for partition gaps and branch to the Slack alert task if any gap is found."""
    ti = kwargs["ti"]
    print(identify_missing_partition_query)
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    df = hook.get_pandas_df(identify_missing_partition_query)
    missing_partitions = df["partition_id"].tolist()
    print(f"\t{missing_partitions}")
    if len(missing_partitions):
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | Partition gap identified | Total - {len(missing_partitions)} - {missing_partitions}"""
        ti.xcom_push(key="slack_message", value=message)
        return "send_alert"
    return "ignore_alert"
# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------
scan_gcs_bucket_task = GCSListObjectsOperator(
    task_id="scan_gcs_bucket",
    bucket=gcs_bucket,
    prefix=gcs_prefix,
    match_glob=f"**/*/{gcs_success_file}",
    dag=dag,
)

get_gcs_partition_task = PythonOperator(
    task_id="get_gcs_partition",
    python_callable=get_gcs_partition,
    provide_context=True,
    dag=dag,
)

get_bq_partition_task = BigQueryInsertJobOperator(
    task_id="get_bq_partition",
    configuration={
        "query": {
            "query": get_raw_table_partition_query,
            "useLegacySql": False,
        }
    },
    location=location,
    project_id=project_id,
    dag=dag,
)

check_new_partition_task = PythonOperator(
    task_id="check_new_partition",
    python_callable=check_partition_exists,
    op_args=["{{ task_instance.xcom_pull(task_ids='get_bq_partition') }}"],
    provide_context=True,
    dag=dag,
)

trigger_dag_task_loop = PythonOperator(
    task_id="trigger_dag_loop",
    python_callable=trigger_dags,
    provide_context=True,
    dag=dag,
)

identify_partition_gap_task = BranchPythonOperator(
    task_id="identify_partition_gap",
    python_callable=identify_partition_gap,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    dag=dag,
)

ignore_alert_task = DummyOperator(
    task_id="ignore_alert",
    dag=dag,
)
# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------
(
    scan_gcs_bucket_task
    >> get_gcs_partition_task
    >> get_bq_partition_task
    >> check_new_partition_task
    >> trigger_dag_task_loop
    >> identify_partition_gap_task
    >> [send_alert_task, ignore_alert_task]
)

Daily Pipeline.py

Note:

To access the configuration passed by the trigger, use {{ dag_run.conf }} in your DAG. Because core.dag_run_conf_overrides_params is set to True, any configuration passed at trigger time overrides the task params, which can then be accessed via {{ params }}.
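
For example, a task callable can read the partition key from either place. A minimal sketch (illustrative only, separate from the full DAG listed next; the real logic lives in set_xcom_variables below):

def read_partition_key(**kwargs):
    """Read the partition key passed by the monitor DAG's TriggerDagRunOperator."""
    # Because core.dag_run_conf_overrides_params is True, the conf value also
    # surfaces in params, which is what set_xcom_variables relies on.
    from_params = kwargs["params"].get("partition_key")
    from_conf = (kwargs["dag_run"].conf or {}).get("partition_key")
    print(f"{from_params=} {from_conf=}")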

import os
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryDeleteTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")  # map ":" and "-" to a space each
environment = "dev"
data_frequency = "daily"
gcs_file_format = "AVRO"
project_name = "PROJECT_NAME"
team_name = "TEAM_NAME"
location = "europe-west2"
project_id = "PROJECT_ID"
dataset_id = "DATASET_ID"
feed_name = "FEED_NAME"
gcs_bucket = "BUCKET_NAME"
slack_channel_id = "CXXXXXXX" # develop
slack_conn_id = "slack_api_default" # Created via Google Cloud Composer Airflow UI
slack_username = "Demo_ETL"
dag_id = f"{os.path.basename(__file__).replace('.py', '')}"
labels = {"env": environment, "project": project_name, "team": team_name}
# SQL templates; the placeholders are filled in `set_xcom_variables` via str.format
delete_partition_from_table_query = """DELETE `{project_id}.{dataset_id}.{raw_table_id}` WHERE partition_key = DATE_TRUNC("{partition_key}", DAY);"""

insert_into_table_query = """
INSERT INTO
    `{project_id}.{dataset_id}.{raw_table_id}` (
        view_id,
        view_name,
        file_id,
        file_name,
        dt,
        partition_key
    )
SELECT
    view_id,
    view_name,
    file_id,
    file_name,
    dt,
    DATE_TRUNC("{partition_key}", DAY) AS partition_key
FROM
    `{temp_table_uri}`;
"""

raw_table_partition_count_query = """
SELECT
    total_rows AS total
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
    table_name = '{raw_table_id}'
    AND partition_id = '{bq_parition_key}';
"""

temp_table_count_query = """
SELECT
    COUNT(*) AS total
FROM
    `{temp_table_uri}`
"""
# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------
default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=f"A DAG to perform ETL from GCS bucket({data_frequency}) to BigQuery",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    render_template_as_native_obj=True,
    tags=[
        environment,
        project_name,
        "etl",
        feed_name.translate(translate_table),
        data_frequency,
    ],
)
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def set_xcom_variables(**kwargs):
    """Read partition_key from params and set the XCom variables used by downstream tasks."""
    ti = kwargs["ti"]
    params = kwargs["params"]
    print(f"\t{params=}")
    partition_key = params.get("partition_key", None)
    if not partition_key:
        message = f"""{dag_id} | {project_id} | {dataset_id} | Partition Key Missing"""
        print(message)
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(f"Partition key missing in params. {params}")
    print(partition_key)
    gcs_prefix = f"{feed_name}/{partition_key}/*.avro"
    table_slug = feed_name.translate(translate_table)
    bq_parition_key = partition_key.translate(translate_partition).replace(" ", "")[:10]
    raw_table_id = f"raw__{table_slug}__{data_frequency}"
    raw_flat_table_id = f"raw__{table_slug}__flat__{data_frequency}"
    temp_table_id = f"tmp__{table_slug}__{data_frequency}__{partition_key.translate(translate_table)}"
    temp_table_uri = f"{project_id}.{dataset_id}.{temp_table_id}"
    kv = dict(
        project_id=project_id,
        dataset_id=dataset_id,
        raw_table_id=raw_table_id,
        raw_flat_table_id=raw_flat_table_id,
        partition_key=partition_key,
        bq_parition_key=bq_parition_key,
        temp_table_uri=temp_table_uri,
        temp_table_id=temp_table_id,
    )
    ti.xcom_push(key="raw_table_id", value=raw_table_id)
    ti.xcom_push(key="raw_flat_table_id", value=raw_flat_table_id)
    ti.xcom_push(key="temp_table_id", value=temp_table_id)
    ti.xcom_push(key="partition_key", value=partition_key)
    ti.xcom_push(key="gcs_prefix", value=gcs_prefix)
    ti.xcom_push(key="temp_table_uri", value=temp_table_uri)
    ti.xcom_push(
        key="delete_partition_from_table_query",
        value=delete_partition_from_table_query.format(**kv),
    )
    ti.xcom_push(
        key="insert_into_table_query", value=insert_into_table_query.format(**kv)
    )
    ti.xcom_push(
        key="raw_table_partition_count_query",
        value=raw_table_partition_count_query.format(**kv),
    )
    ti.xcom_push(
        key="temp_table_count_query", value=temp_table_count_query.format(**kv)
    )


def validate_schema(**kwargs):
    """Check that the temp table schema matches the raw table schema."""
    ti = kwargs["ti"]
    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    temp_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=temp_table_id, project_id=project_id
    )
    raw_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=raw_table_id, project_id=project_id
    )
    print(f"{type(temp_table_schema)} {temp_table_schema=}")
    print(f"{type(raw_table_schema)} {raw_table_schema=}")
    # Remove the 'partition_key' column from the raw schema before comparing
    raw_table_schema["fields"] = [
        _d
        for _d in raw_table_schema["fields"]
        if _d.get("name", "partition_key") != "partition_key"
    ]
    if temp_table_schema == raw_table_schema:
        print("Schema Matches")
        return "insert_into_raw_table"
    else:
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | {temp_table_id} | Schema Mismatch identified"""
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)


def validate_data_count(**kwargs):
    """Compare the row count of the temp table with the loaded raw table partition."""
    ti = kwargs["ti"]
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    raw_table_partition_count_query = ti.xcom_pull(
        key="raw_table_partition_count_query"
    )
    temp_table_count_query = ti.xcom_pull(key="temp_table_count_query")
    raw_result_df = hook.get_pandas_df(raw_table_partition_count_query)
    raw_table_count = raw_result_df["total"].tolist()
    print(raw_table_count)
    temp_result_df = hook.get_pandas_df(temp_table_count_query)
    temp_table_count = temp_result_df["total"].tolist()
    print(temp_table_count)
    if raw_result_df.equals(temp_result_df):
        print("\tCount Matches in both Raw and Temp tables")
        return "delete_temp_table"
    else:
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | {temp_table_id} | Data count mismatch identified | {raw_table_count=} | {temp_table_count=}"""
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)
# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------
set_xcom_variables_task = PythonOperator(
    task_id="set_xcom_variables",
    python_callable=set_xcom_variables,
    provide_context=True,
    dag=dag,
)

create_temp_table_task = GCSToBigQueryOperator(
    task_id="create_temp_table",
    bucket=gcs_bucket,
    source_objects=['{{ ti.xcom_pull(key="gcs_prefix") }}'],
    destination_project_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    source_format=gcs_file_format,
    write_disposition="WRITE_TRUNCATE",
    encoding="UTF-8",
    external_table=True,
    autodetect=True,
    project_id=project_id,
    labels=labels,
    dag=dag,
)

delete_partition_if_exists_task = BigQueryInsertJobOperator(
    task_id="delete_partition_if_exists",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="delete_partition_from_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

insert_into_raw_table_task = BigQueryInsertJobOperator(
    task_id="insert_into_raw_table",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="insert_into_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

delete_temp_table_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=False,
    dag=dag,
)

delete_temp_table_if_exists_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table_if_exists",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=True,
    dag=dag,
)

validate_schema_task = BranchPythonOperator(
    task_id="validate_schema",
    python_callable=validate_schema,
    provide_context=True,
    dag=dag,
)

validate_data_count_task = BranchPythonOperator(
    task_id="validate_data_count",
    python_callable=validate_data_count,
    provide_context=True,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    trigger_rule="one_failed",
    dag=dag,
)
# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------
(
    set_xcom_variables_task
    >> delete_temp_table_if_exists_task
    >> delete_partition_if_exists_task
    >> create_temp_table_task
    >> validate_schema_task
    >> [insert_into_raw_table_task, send_alert_task]
)
(
    insert_into_raw_table_task
    >> validate_data_count_task
    >> [delete_temp_table_task, send_alert_task]
)
send_alert_task

The same code can be tweaked to handle hourly partitioned data as well.
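
For an hourly feed the changes are mostly parameters: the frequency label, the partition id format used when comparing BigQuery partitions, and the expected-partition generator in the gap query. A rough, untested sketch of the knobs involved:

# Rough sketch of the hourly variant (untested assumptions).
data_frequency = "hourly"

# INFORMATION_SCHEMA.PARTITIONS returns YYYYMMDDHH partition ids for hourly
# partitioned tables (YYYYMMDD for daily), so the monitor's parsing format
# would be "%Y%m%d%H".
bq_partition_id_format = "%Y%m%d%H"

# The gap query would generate expected hourly ids, e.g. with
# GENERATE_TIMESTAMP_ARRAY(..., INTERVAL 1 HOUR) and FORMAT_TIMESTAMP("%Y%m%d%H", ts),
# and the monitor DAG's schedule could stay at timedelta(hours=1).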

Reference:

  1. https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/connections/slack.html
  2. https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/index.html

Please find the code repo here.