Demo: ETL from GCS to BigQuery using Composer (Airflow)
Photo by Chris Ried on Unsplash
This post walks through a daily ETL operation that loads a daily Avro file from Cloud Storage into a BigQuery raw table.
- DAG Creation: Developed two DAGs to monitor and perform ETL operations efficiently.
- Validation Implementations:
  - Implemented logic to identify partition gaps.
  - Added data count validation to ensure data integrity.
  - Included schema mismatch validation to detect any discrepancies in the data structure.
- Slack Alerts: Created a Slack alert task to send instant notifications for any issues or updates during the ETL process.
Diagram:


Setting up Slack in Composer (Airflow)
- In the Cloud Console, open your Composer environment and add apache-airflow-providers-slack under the PyPI packages tab to install the Slack dependencies in the Airflow cluster.
- Create a slack_api_default connection under the Admin -> Connections tab in the Airflow UI (opened from Composer) and paste the Slack token there.
from airflow.models.connection import Connection

conn = Connection(
    conn_id="slack_api_default",
    conn_type="slack",
    password="",
    extra={
        # Specify extra parameters here
        "timeout": "42",
    },
)

# Generate Environment Variable Name
env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}"

print(f"{env_key}='{conn.get_uri()}'")
# AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx@/?timeout=42'

Daily Monitor.py
import os
from datetime import timedelta, datetime

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")
environment = "dev"
data_frequency = "daily"
gcs_file_format = "AVRO"
gcs_success_file = "_SUCCESS"
project_name = "PROJECT_NAME"
team_name = "TEAM_NAME"
location = "europe-west2"
project_id = "PROJECT_ID"
dataset_id = "DATASET_ID"
gcs_bucket = "BUCKET_NAME"
feed_name = "FEED_NAME"

slack_channel_id = "CXXXXXX"  # develop
slack_conn_id = "slack_api_default"  # Created via Google Cloud Composer Airflow UI
slack_username = "Demo_ETL"

dag_id = f"{os.path.basename(__file__).replace('.py', '')}"
gcs_prefix = f"{feed_name}/"
table_slug = feed_name.translate(translate_table)
raw_table_id = f"raw__{table_slug}__{data_frequency}"
labels = {"env": environment, "project": project_name, "team": team_name}
pipeline_dag_id = (
    f"{environment}__{project_name}__{table_slug}__{data_frequency}__pipeline"
)

# SQL Query to get the bigquery table partitions
get_raw_table_partition_query = f"""
SELECT
    partition_id
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
    table_name = '{raw_table_id}';
"""

# SQL Query for identifying the partition gap
identify_missing_partition_query = f"""
WITH
    raw AS (
        SELECT
            partition_id
        FROM
            `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE
            table_name = '{raw_table_id}'
            AND partition_id <> '__NULL__'
    ),
    expected AS (
        SELECT
            FORMAT_DATE("%Y%m%d", date) AS partition_id
        FROM
            UNNEST(
                GENERATE_DATE_ARRAY(
                    '2024-01-01',
                    DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY), -- TIMEDELTA
                    INTERVAL 1 DAY
                )
            ) AS date
    )
SELECT
    expected.partition_id
FROM
    expected
    LEFT JOIN raw ON raw.partition_id = expected.partition_id
WHERE
    raw.partition_id IS NULL
    AND expected.partition_id >= (
        SELECT
            COALESCE(MIN(partition_id), FORMAT_DATE("%Y%m%d", CURRENT_DATE()))
        FROM
            `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE
            table_name = '{raw_table_id}'
            AND partition_id <> '__NULL__'
    );
"""

# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------
default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=f"A DAG to monitor the {project_name.capitalize()} file landing path in GCS and initiate the ETL DAG pipeline.",
    default_args=default_args,
    schedule=timedelta(hours=1),
    catchup=False,
    max_active_runs=1,
    tags=[environment, project_name, "etl", table_slug, data_frequency],
)

# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def get_gcs_partition(**kwargs):
    """Fetches the GCS object list from the `scan_gcs_bucket` task and filters the successfully uploaded partition folders."""
    ti = kwargs["ti"]
    folders = [
        os.path.dirname(obj)
        for obj in ti.xcom_pull(task_ids="scan_gcs_bucket")
        if obj.endswith(gcs_success_file)
    ]
    storage_partition = [item.split("/")[1] for item in folders]
    print(f"\t{storage_partition=}")
    ti.xcom_push(key="storage_partition", value=storage_partition)
def check_partition_exists(job_id, **kwargs):
    """Compares the GCS partitions with the BigQuery table partitions to get the latest/new partition list."""
    print(f"\t BigQuery Job ID: {job_id}")
    ti = kwargs["ti"]
    hook = BigQueryHook()
    client = hook.get_client(project_id=project_id, location=location)
    query_results = client.get_job(job_id).result()

    bq_partition = [
        datetime.strptime(row.get("partition_id"), "%Y%m%d%H").strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        for row in query_results
        if row.get("partition_id", "__NULL__") not in ["__NULL__"]
    ]
    bq_partition.sort()
    print(f"\t{bq_partition=}")

    storage_partition = ti.xcom_pull(
        task_ids="get_gcs_partition", key="storage_partition"
    )
    storage_partition.sort()
    print(f"\t{storage_partition=}")

    # Identify the new partitions
    new_storage_partition = list(set(storage_partition) - set(bq_partition))
    new_storage_partition.sort()
    print(f"\t{new_storage_partition=}")
    ti.xcom_push(key="new_storage_partition", value=new_storage_partition)


def trigger_dags(**kwargs):
    """triggers the ETL pipeline DAG with partition_key"""
    ti = kwargs["ti"]
    partition_keys = ti.xcom_pull(
        task_ids="check_new_partition", key="new_storage_partition"
    )
    for partition_key in partition_keys:
        print({"partition_key": partition_key})
        TriggerDagRunOperator(
            task_id=f"pipeline_dag__{partition_key.translate(translate_table)}",
            trigger_dag_id=pipeline_dag_id,
            conf={"partition_key": partition_key},
            dag=dag,
        ).execute(context=kwargs)
def identify_partition_gap(**kwargs):
    """Checks for the partition gap and notifies in slack channel if any gap is found."""
    ti = kwargs["ti"]
    print(identify_missing_partition_query)
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    df = hook.get_pandas_df(identify_missing_partition_query)
    missing_partitions = df["partition_id"].tolist()
    print(f"\t{missing_partitions}")
    if len(missing_partitions):
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | Partition gap identified | Total - {len(missing_partitions)} - {missing_partitions}"""
        ti.xcom_push(key="slack_message", value=message)
        return "send_alert"
    return "ignore_alert"


# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------

scan_gcs_bucket_task = GCSListObjectsOperator(
    task_id="scan_gcs_bucket",
    bucket=gcs_bucket,
    prefix=gcs_prefix,
    match_glob=f"**/*/{gcs_success_file}",
    dag=dag,
)

get_gcs_partition_task = PythonOperator(
    task_id="get_gcs_partition",
    python_callable=get_gcs_partition,
    provide_context=True,
    dag=dag,
)

get_bq_partition_task = BigQueryInsertJobOperator(
    task_id="get_bq_partition",
    configuration={
        "query": {
            "query": get_raw_table_partition_query,
            "useLegacySql": False,
        }
    },
    location=location,
    project_id=project_id,
    dag=dag,
)

check_new_partition_task = PythonOperator(
    task_id="check_new_partition",
    python_callable=check_partition_exists,
    op_args=["{{ task_instance.xcom_pull(task_ids='get_bq_partition') }}"],
    provide_context=True,
    dag=dag,
)

trigger_dag_task_loop = PythonOperator(
    task_id="trigger_dag_loop",
    python_callable=trigger_dags,
    provide_context=True,
    dag=dag,
)

identify_partition_gap_task = BranchPythonOperator(
    task_id="identify_partition_gap",
    python_callable=identify_partition_gap,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    dag=dag,
)

ignore_alert_task = DummyOperator(
    task_id="ignore_alert",
    dag=dag,
)

# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------

(
    scan_gcs_bucket_task
    >> get_gcs_partition_task
    >> get_bq_partition_task
    >> check_new_partition_task
    >> trigger_dag_task_loop
    >> identify_partition_gap_task
    >> [send_alert_task, ignore_alert_task]
)
Daily Pipeline.py
Note:
To access the configuration in your DAG, use {{ dag_run.conf }}. Since core.dag_run_conf_overrides_params is set to True, any configuration passed here will also override the task params, which can be accessed via {{ params }}.
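For example, a hypothetical helper (a minimal sketch for illustration only, not part of the DAG code below) could read the partition_key passed by the monitor DAG's TriggerDagRunOperator through either route:

def print_partition_key(**kwargs):
    # Value passed via TriggerDagRunOperator(conf={"partition_key": ...})
    from_conf = kwargs["dag_run"].conf.get("partition_key")

    # The same value via params, because core.dag_run_conf_overrides_params=True
    # lets the run conf override any DAG-level params default
    from_params = kwargs["params"].get("partition_key")

    print(from_conf, from_params)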
import os
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryDeleteTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")

environment = "dev"
data_frequency = "daily"
gcs_file_format = "AVRO"
project_name = "PROJECT_NAME"
team_name = "TEAM_NAME"
location = "europe-west2"
project_id = "PROJECT_ID"
dataset_id = "DATASET_ID"
feed_name = "FEED_NAME"
gcs_bucket = "BUCKET_NAME"

slack_channel_id = "CXXXXXXX"  # develop
slack_conn_id = "slack_api_default"  # Created via Google Cloud Composer Airflow UI
slack_username = "Demo_ETL"

dag_id = f"{os.path.basename(__file__).replace('.py', '')}"
labels = {"env": environment, "project": project_name, "team": team_name}

delete_partition_from_table_query = """
DELETE `{project_id}.{dataset_id}.{raw_table_id}`
WHERE
    partition_key = DATE_TRUNC("{partition_key}", DAY);
"""

insert_into_table_query = """
INSERT INTO
    `{project_id}.{dataset_id}.{raw_table_id}` (
        view_id,
        view_name,
        file_id,
        file_name,
        dt,
        partition_key
    )
SELECT
    view_id,
    view_name,
    file_id,
    file_name,
    dt,
    DATE_TRUNC("{partition_key}", DAY) AS partition_key
FROM
    `{temp_table_uri}`;
"""
raw_table_partition_count_query = """
SELECT
    total_rows AS total
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
    table_name = '{raw_table_id}'
    AND partition_id = '{bq_partition_key}';
"""
temp_table_count_query = """
SELECT
    count(*) as total
FROM
    `{temp_table_uri}`
"""

# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------

default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=f"A DAG to perform ETL from GCS bucket ({data_frequency}) to BigQuery",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    render_template_as_native_obj=True,
    tags=[
        environment,
        project_name,
        "etl",
        feed_name.translate(translate_table),
        data_frequency,
    ],
)

# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def set_xcom_variables(**kwargs):
    """Reads the partition_key from params and sets the XCom variables."""
    ti = kwargs["ti"]
    print(kwargs)
    print(f"\t{kwargs['params']}")
    params = kwargs["params"]
    print(params)
    partition_key = params.get("partition_key", None)
    if not partition_key:
        message = f"""{dag_id} | {project_id} | {dataset_id} | Partition Key Missing"""
        print(message)
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(f"Partition key missing in params. {kwargs['params']}")

    print(params.get("partition_key", None))
    gcs_prefix = f"{feed_name}/{partition_key}/*.avro"
    table_slug = feed_name.translate(translate_table)
    bq_partition_key = partition_key.translate(translate_partition).replace(" ", "")[:10]
    raw_table_id = f"raw__{table_slug}__{data_frequency}"
    raw_flat_table_id = f"raw__{table_slug}__flat__{data_frequency}"
    temp_table_id = f"tmp__{table_slug}__{data_frequency}__{partition_key.translate(translate_table)}"
    temp_table_uri = f"{project_id}.{dataset_id}.{temp_table_id}"

    kv = dict(
        project_id=project_id,
        dataset_id=dataset_id,
        raw_table_id=raw_table_id,
        raw_flat_table_id=raw_flat_table_id,
        partition_key=partition_key,
        bq_partition_key=bq_partition_key,
        temp_table_uri=temp_table_uri,
        temp_table_id=temp_table_id,
    )

    ti.xcom_push(key="raw_table_id", value=raw_table_id)
    ti.xcom_push(key="raw_flat_table_id", value=raw_flat_table_id)
    ti.xcom_push(key="temp_table_id", value=temp_table_id)
    ti.xcom_push(key="partition_key", value=partition_key)
    ti.xcom_push(key="gcs_prefix", value=gcs_prefix)
    ti.xcom_push(key="temp_table_uri", value=temp_table_uri)
    ti.xcom_push(
        key="delete_partition_from_table_query",
        value=delete_partition_from_table_query.format(**kv),
    )
    ti.xcom_push(
        key="insert_into_table_query", value=insert_into_table_query.format(**kv)
    )
    ti.xcom_push(
        key="raw_table_partition_count_query",
        value=raw_table_partition_count_query.format(**kv),
    )
    ti.xcom_push(
        key="temp_table_count_query", value=temp_table_count_query.format(**kv)
    )
def validate_schema(**kwargs):
    """Checks for Schema consistency"""
    ti = kwargs["ti"]
    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    temp_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=temp_table_id, project_id=project_id
    )
    raw_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=raw_table_id, project_id=project_id
    )

    print(f"{type(temp_table_schema)} {temp_table_schema=}")
    print(f"{type(raw_table_schema)} {raw_table_schema=}")

    # remove the 'partition_key' in the raw schema
    raw_table_schema["fields"] = [
        _d
        for _d in raw_table_schema["fields"]
        if _d.get("name", "partition_key") != "partition_key"
    ]

    if temp_table_schema == raw_table_schema:
        print("Schema Matches")
        return "insert_into_raw_table"
    else:
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | {temp_table_id} | Schema Mismatch identified"""
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)
def validate_data_count(**kwargs):
    """Checks the data count in temp table and raw table (partition)"""
    ti = kwargs["ti"]
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)

    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    raw_table_partition_count_query = ti.xcom_pull(
        key="raw_table_partition_count_query"
    )
    temp_table_count_query = ti.xcom_pull(key="temp_table_count_query")

    raw_result_df = hook.get_pandas_df(raw_table_partition_count_query)
    raw_table_count = raw_result_df["total"].tolist()
    print(raw_table_count)

    temp_result_df = hook.get_pandas_df(temp_table_count_query)
    temp_table_count = temp_result_df["total"].tolist()
    print(temp_table_count)

    if raw_result_df.equals(temp_result_df):
        print("\tCount Matches in both Raw and Temp tables")
        return "delete_temp_table"
    else:
        message = f"""{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | {temp_table_id} | Data count mismatch identified | {raw_table_count=} | {temp_table_count=}"""
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)


# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------
set_xcom_variables_task = PythonOperator(
    task_id="set_xcom_variables",
    python_callable=set_xcom_variables,
    provide_context=True,
    dag=dag,
)

create_temp_table_task = GCSToBigQueryOperator(
    task_id="create_temp_table",
    bucket=gcs_bucket,
    source_objects=['{{ ti.xcom_pull(key="gcs_prefix") }}'],
    destination_project_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    source_format=gcs_file_format,
    write_disposition="WRITE_TRUNCATE",
    encoding="UTF-8",
    external_table=True,
    autodetect=True,
    project_id=project_id,
    labels=labels,
    dag=dag,
)

delete_partition_if_exists_task = BigQueryInsertJobOperator(
    task_id="delete_partition_if_exists",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="delete_partition_from_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

insert_into_raw_table_task = BigQueryInsertJobOperator(
    task_id="insert_into_raw_table",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="insert_into_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

delete_temp_table_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=False,
    dag=dag,
)

delete_temp_table_if_exists_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table_if_exists",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=True,
    dag=dag,
)

validate_schema_task = BranchPythonOperator(
    task_id="validate_schema",
    python_callable=validate_schema,
    provide_context=True,
    dag=dag,
)

validate_data_count_task = BranchPythonOperator(
    task_id="validate_data_count",
    python_callable=validate_data_count,
    provide_context=True,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    trigger_rule="one_failed",
    dag=dag,
)

# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------
(
    set_xcom_variables_task
    >> delete_temp_table_if_exists_task
    >> delete_partition_if_exists_task
    >> create_temp_table_task
    >> validate_schema_task
    >> [insert_into_raw_table_task, send_alert_task]
)

(
    insert_into_raw_table_task
    >> validate_data_count_task
    >> [delete_temp_table_task, send_alert_task]
)
The same code can be tweaked for hourly partitioned data as well; a rough outline of the changes is sketched below.
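As an untested sketch (the values are illustrative and not taken from the code above), the hourly variant mostly comes down to the frequency label, the partition_id format used when comparing GCS and BigQuery partitions, and generating the expected partitions per hour in the gap query:

from datetime import datetime

# Hypothetical hourly settings (illustrative only)
data_frequency = "hourly"

# BigQuery exposes partition_id as YYYYMMDD for daily-partitioned tables and
# YYYYMMDDHH for hourly-partitioned ones, so the parsing format must match:
partition_id = "2024010109"
partition_key = datetime.strptime(partition_id, "%Y%m%d%H").strftime("%Y-%m-%d %H:%M:%S")
print(partition_key)  # 2024-01-01 09:00:00

# The partition-gap query would generate expected partitions per hour, e.g. with
# GENERATE_TIMESTAMP_ARRAY(..., INTERVAL 1 HOUR) and FORMAT_TIMESTAMP("%Y%m%d%H", ts),
# and the DELETE/INSERT statements would truncate with TIMESTAMP_TRUNC(..., HOUR)
# instead of DATE_TRUNC(..., DAY).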
Reference: