Demo: ETL from GCS to BigQuery using Composer (Airflow)
This post walks through a daily ETL operation that loads a daily Avro file from Cloud Storage into a BigQuery raw table.
- DAG Creation: Developed two DAGs to monitor and perform ETL operations efficiently.
- Validation Implementations:
  - Implemented logic to identify partition gaps.
  - Added data count validation to ensure data integrity.
  - Included schema mismatch validation to detect any discrepancies in the data structure.
- Slack Alerts: Created a Slack alert task to send instant notifications for any issues or updates during the ETL process.
Diagram:
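At a high level, the two DAGs chain their tasks as follows (a textual sketch that mirrors the dependency definitions later in the post):

Daily Monitor:  scan_gcs_bucket >> get_gcs_partition >> get_bq_partition >> check_new_partition
                >> trigger_dag_loop >> identify_partition_gap >> [send_alert | ignore_alert]

Daily Pipeline: set_xcom_variables >> delete_temp_table_if_exists >> delete_partition_if_exists
                >> create_temp_table >> validate_schema >> insert_into_raw_table
                >> validate_data_count >> delete_temp_table (send_alert on any failure)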
Setting up Slack in Composer (Airflow)
- In the Cloud Console, open the Composer environment and add apache-airflow-providers-slack under the PyPI Packages tab to install the Slack dependencies in the Airflow cluster.
- Create a slack_api_default connection under the Admin -> Connections tab in the Composer Airflow UI and paste the Slack token there.
- Alternatively, the connection can be supplied as an environment variable; the snippet below builds the AIRFLOW_CONN_SLACK_API_DEFAULT value from a Connection object.
from airflow.models.connection import Connection
conn = Connection(
    conn_id="slack_api_default",
    conn_type="slack",
    password="",
    extra={
        # Specify extra parameters here
        "timeout": "42",
    },
)

# Generate Environment Variable Name
env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}"
print(f"{env_key}='{conn.get_uri()}'")
# AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx@/?timeout=42'
Daily Monitor.py
import os
from datetime import timedelta, datetime

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")
environment = "dev"data_frequency = "daily"gcs_file_format = "AVRO"gcs_success_file = "_SUCCESS"project_name = "PROJECT_NAME"team_name = "PROJECT_NAME"location = "europe-west2"
project_id = "PROJECT_ID"dataset_id = "DATASET_ID"gcs_bucket = "BUCKET_NAME"feed_name = "FEED_NAME"
slack_channel_id = "CXXXXXX" # developslack_conn_id = "slack_api_default" # Created via Google Cloud Composer Airflow UIslack_username = "Demo_ETL"
dag_id = f"{os.path.basename(__file__).replace('.py', '')}"gcs_prefix = f"{feed_name}/"table_slug = feed_name.translate(translate_table)raw_table_id = f"raw__{table_slug}__{data_frequency}"labels = {"env": environment, "project": project_name, "team": team_name}pipeline_dag_id = ( f"{environment}__{project_name}__{table_slug}__{data_frequency}__pipeline")
# SQL query to get the BigQuery table partitions
get_raw_table_partition_query = f"""
SELECT partition_id
FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE table_name = '{raw_table_id}';
"""

# SQL query for identifying the partition gap
identify_missing_partition_query = f"""
WITH raw AS (
    SELECT partition_id
    FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
    WHERE table_name = '{raw_table_id}'
        AND partition_id <> '__NULL__'
),
expected AS (
    SELECT FORMAT_DATE("%Y%m%d", date) AS partition_id
    FROM UNNEST(
        GENERATE_DATE_ARRAY(
            '2024-01-01',
            DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY),  -- TIMEDELTA
            INTERVAL 1 DAY
        )
    ) AS date
)
SELECT expected.partition_id
FROM expected
LEFT JOIN raw ON raw.partition_id = expected.partition_id
WHERE raw.partition_id IS NULL
    AND expected.partition_id >= (
        SELECT COALESCE(MIN(partition_id), FORMAT_DATE("%Y%m%d", CURRENT_DATE()))
        FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE table_name = '{raw_table_id}'
            AND partition_id <> '__NULL__'
    );
"""

# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------
default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=(
        f"A DAG to monitor the {project_name.capitalize()} file landing path in GCS "
        "and initiate the ETL DAG pipeline."
    ),
    default_args=default_args,
    schedule=timedelta(hours=1),
    catchup=False,
    max_active_runs=1,
    tags=[environment, project_name, "etl", table_slug, data_frequency],
)
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def get_gcs_partition(**kwargs):
    """Fetches the GCS object list from the `scan_gcs_bucket` task and filters
    the successfully uploaded partition folders."""
    ti = kwargs["ti"]
    folders = [
        os.path.dirname(object)
        for object in ti.xcom_pull(task_ids="scan_gcs_bucket")
        if object.endswith(gcs_success_file)
    ]
    storage_partition = [item.split("/")[1] for item in folders]
    print(f"\t{storage_partition=}")
    ti.xcom_push(key="storage_partition", value=storage_partition)
def check_partition_exists(job_id, **kwargs):
    """Compares the GCS partitions with the BigQuery table partitions to get the
    latest/new partition list."""
    print(f"\t BigQuery Job ID: {job_id}")

    ti = kwargs["ti"]
    hook = BigQueryHook()
    client = hook.get_client(project_id=project_id, location=location)
    query_results = client.get_job(job_id).result()

    bq_partition = [
        datetime.strptime(row.get("partition_id"), "%Y%m%d%H").strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        for row in query_results
        if row.get("partition_id", "__NULL__") not in ["__NULL__"]
    ]
    bq_partition.sort()
    print(f"\t{bq_partition=}")

    storage_partition = ti.xcom_pull(
        task_ids="get_gcs_partition", key="storage_partition"
    )
    storage_partition.sort()
    print(f"\t{storage_partition=}")

    # Identify the new partitions
    new_storage_partition = list(set(storage_partition) - set(bq_partition))
    new_storage_partition.sort()
    print(f"\t{new_storage_partition=}")
    ti.xcom_push(key="new_storage_partition", value=new_storage_partition)
def trigger_dags(**kwargs):
    """Triggers the ETL pipeline DAG with the partition_key."""
    ti = kwargs["ti"]
    partition_keys = ti.xcom_pull(
        task_ids="check_new_partition", key="new_storage_partition"
    )

    for partition_key in partition_keys:
        print({"partition_key": partition_key})
        TriggerDagRunOperator(
            task_id=f"pipeline_dag__{partition_key.translate(translate_table)}",
            trigger_dag_id=pipeline_dag_id,
            conf={"partition_key": partition_key},
            dag=dag,
        ).execute(context=kwargs)
def identify_partition_gap(**kwargs):
    """Checks for partition gaps and notifies the Slack channel if any gap is found."""
    ti = kwargs["ti"]
    print(identify_missing_partition_query)
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    df = hook.get_pandas_df(identify_missing_partition_query)
    missing_partitions = df["partition_id"].tolist()
    print(f"\t{missing_partitions}")
    if len(missing_partitions):
        message = (
            f"{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | "
            f"Partition gap identified | Total - {len(missing_partitions)} - {missing_partitions}"
        )
        ti.xcom_push(key="slack_message", value=message)
        return "send_alert"
    return "ignore_alert"
# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------
scan_gcs_bucket_task = GCSListObjectsOperator(
    task_id="scan_gcs_bucket",
    bucket=gcs_bucket,
    prefix=gcs_prefix,
    match_glob=f"**/*/{gcs_success_file}",
    dag=dag,
)

get_gcs_partition_task = PythonOperator(
    task_id="get_gcs_partition",
    python_callable=get_gcs_partition,
    provide_context=True,
    dag=dag,
)

get_bq_partition_task = BigQueryInsertJobOperator(
    task_id="get_bq_partition",
    configuration={
        "query": {
            "query": get_raw_table_partition_query,
            "useLegacySql": False,
        }
    },
    location=location,
    project_id=project_id,
    dag=dag,
)

check_new_partition_task = PythonOperator(
    task_id="check_new_partition",
    python_callable=check_partition_exists,
    op_args=["{{ task_instance.xcom_pull(task_ids='get_bq_partition') }}"],
    provide_context=True,
    dag=dag,
)

trigger_dag_task_loop = PythonOperator(
    task_id="trigger_dag_loop",
    python_callable=trigger_dags,
    provide_context=True,
    dag=dag,
)
identify_partition_gap_task = BranchPythonOperator(
    task_id="identify_partition_gap",
    python_callable=identify_partition_gap,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    dag=dag,
)

ignore_alert_task = DummyOperator(
    task_id="ignore_alert",
    dag=dag,
)

# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------

(
    scan_gcs_bucket_task
    >> get_gcs_partition_task
    >> get_bq_partition_task
    >> check_new_partition_task
    >> trigger_dag_task_loop
    >> identify_partition_gap_task
    >> [send_alert_task, ignore_alert_task]
)
Daily Pipeline.py
Note: To access the configuration in your DAG, use {{ dag_run.conf }}. Since core.dag_run_conf_overrides_params is set to True, any configuration passed here also overrides the task params, which can be accessed via {{ params }}.
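For instance, a minimal sketch of both access paths inside a task of this pipeline (the function name and the trigger command value are illustrative; the partition_key key matches the conf sent by the monitor DAG's TriggerDagRunOperator):

# Manual trigger with a conf payload (the value shown is illustrative):
#   airflow dags trigger <pipeline_dag_id> --conf '{"partition_key": "2024-01-01 00:00:00"}'

def read_partition_key(**kwargs):
    # Read straight from the DagRun conf ...
    print(kwargs["dag_run"].conf.get("partition_key"))
    # ... or via params, which the conf overrides because
    # core.dag_run_conf_overrides_params is set to True.
    print(kwargs["params"].get("partition_key"))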
import os
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryDeleteTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
# -----------------------------------------------------------------------------
# Parameters
# -----------------------------------------------------------------------------
translate_table = str.maketrans(" :-", "___")
translate_partition = str.maketrans(":-", "  ")
environment = "dev"data_frequency = "daily"gcs_file_format = "AVRO"project_name = "PROJECT_NAME"team_name = "TEAM_NAME"location = "europe-west2"project_id = "PROJECT_ID"dataset_id = "DATASET_ID"feed_name = "FEED_NAME"gcs_bucket = "BUCKET_NAME"
slack_channel_id = "CXXXXXXX" # developslack_conn_id = "slack_api_default" # Created via Google Cloud Composer Airflow UIslack_username = "Demo_ETL"
dag_id = f"{os.path.basename(__file__).replace('.py', '')}"labels = {"env": environment, "project": project_name, "team": team_name}
delete_partition_from_table_query = """
DELETE `{project_id}.{dataset_id}.{raw_table_id}`
WHERE partition_key = DATE_TRUNC("{partition_key}", DAY);
"""

insert_into_table_query = """
INSERT INTO `{project_id}.{dataset_id}.{raw_table_id}` (
    view_id,
    view_name,
    file_id,
    file_name,
    dt,
    partition_key
)
SELECT
    view_id,
    view_name,
    file_id,
    file_name,
    dt,
    DATE_TRUNC("{partition_key}", DAY) AS partition_key
FROM `{temp_table_uri}`;
"""
raw_table_partition_count_query = """
SELECT total_rows AS total
FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
WHERE table_name = '{raw_table_id}'
    AND partition_id = '{bq_partition_key}';
"""
temp_table_count_query = """
SELECT COUNT(*) AS total
FROM `{temp_table_uri}`;
"""
# -----------------------------------------------------------------------------
# DAG Init
# -----------------------------------------------------------------------------

default_args = {
    "owner": f"{team_name.upper()}",
    "start_date": days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
    "depends_on_past": False,
}

dag = DAG(
    dag_id=dag_id,
    description=f"A DAG to perform ETL from GCS bucket ({data_frequency}) to BigQuery",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    render_template_as_native_obj=True,
    tags=[
        environment,
        project_name,
        "etl",
        feed_name.translate(translate_table),
        data_frequency,
    ],
)

# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def set_xcom_variables(**kwargs):
    """Reads partition_key from params and sets the XCom variables."""
    ti = kwargs["ti"]
    print(kwargs)
    print(f"\t{kwargs['params']}")
    params = kwargs["params"]
    print(params)
    partition_key = params.get("partition_key", None)
    if not partition_key:
        message = f"{dag_id} | {project_id} | {dataset_id} | Partition Key Missing"
        print(message)
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(f"Partition key missing in params. {kwargs['params']}")

    print(params.get("partition_key", None))
    gcs_prefix = f"{feed_name}/{partition_key}/*.avro"
    table_slug = feed_name.translate(translate_table)
    bq_partition_key = partition_key.translate(translate_partition).replace(" ", "")[:10]
    raw_table_id = f"raw__{table_slug}__{data_frequency}"
    raw_flat_table_id = f"raw__{table_slug}__flat__{data_frequency}"
    temp_table_id = (
        f"tmp__{table_slug}__{data_frequency}__{partition_key.translate(translate_table)}"
    )
    temp_table_uri = f"{project_id}.{dataset_id}.{temp_table_id}"

    kv = dict(
        project_id=project_id,
        dataset_id=dataset_id,
        raw_table_id=raw_table_id,
        raw_flat_table_id=raw_flat_table_id,
        partition_key=partition_key,
        bq_partition_key=bq_partition_key,
        temp_table_uri=temp_table_uri,
        temp_table_id=temp_table_id,
    )

    ti.xcom_push(key="raw_table_id", value=raw_table_id)
    ti.xcom_push(key="raw_flat_table_id", value=raw_flat_table_id)
    ti.xcom_push(key="temp_table_id", value=temp_table_id)
    ti.xcom_push(key="partition_key", value=partition_key)
    ti.xcom_push(key="gcs_prefix", value=gcs_prefix)
    ti.xcom_push(key="temp_table_uri", value=temp_table_uri)
    ti.xcom_push(
        key="delete_partition_from_table_query",
        value=delete_partition_from_table_query.format(**kv),
    )
    ti.xcom_push(
        key="insert_into_table_query", value=insert_into_table_query.format(**kv)
    )
    ti.xcom_push(
        key="raw_table_partition_count_query",
        value=raw_table_partition_count_query.format(**kv),
    )
    ti.xcom_push(
        key="temp_table_count_query", value=temp_table_count_query.format(**kv)
    )
def validate_schema(**kwargs):
    """Checks for schema consistency between the temp table and the raw table."""
    ti = kwargs["ti"]
    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)
    temp_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=temp_table_id, project_id=project_id
    )
    raw_table_schema = hook.get_schema(
        dataset_id=dataset_id, table_id=raw_table_id, project_id=project_id
    )

    print(f"{type(temp_table_schema)} {temp_table_schema=}")
    print(f"{type(raw_table_schema)} {raw_table_schema=}")

    # remove the 'partition_key' field from the raw table schema before comparing
    raw_table_schema["fields"] = [
        _d
        for _d in raw_table_schema["fields"]
        if _d.get("name", "partition_key") != "partition_key"
    ]

    if temp_table_schema == raw_table_schema:
        print("Schema Matches")
        return "insert_into_raw_table"
    else:
        message = (
            f"{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | "
            f"{temp_table_id} | Schema Mismatch identified"
        )
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)
def validate_data_count(**kwargs):
    """Checks that the data counts in the temp table and the raw table partition match."""
    ti = kwargs["ti"]
    hook = BigQueryHook(location=location, use_legacy_sql=False, labels=labels)

    temp_table_id = ti.xcom_pull(key="temp_table_id")
    raw_table_id = ti.xcom_pull(key="raw_table_id")
    raw_table_partition_count_query = ti.xcom_pull(
        key="raw_table_partition_count_query"
    )
    temp_table_count_query = ti.xcom_pull(key="temp_table_count_query")

    raw_result_df = hook.get_pandas_df(raw_table_partition_count_query)
    raw_table_count = raw_result_df["total"].tolist()
    print(raw_table_count)

    temp_result_df = hook.get_pandas_df(temp_table_count_query)
    temp_table_count = temp_result_df["total"].tolist()
    print(temp_table_count)

    if raw_result_df.equals(temp_result_df):
        print("\tCount Matches in both Raw and Temp tables")
        return "delete_temp_table"
    else:
        message = (
            f"{dag_id} | {project_id} | {dataset_id} | {raw_table_id} | {temp_table_id} | "
            f"Data count mismatch identified | {raw_table_count=} | {temp_table_count=}"
        )
        ti.xcom_push(key="slack_message", value=message)
        raise AirflowException(message)
# -----------------------------------------------------------------------------
# DAG Tasks
# -----------------------------------------------------------------------------

set_xcom_variables_task = PythonOperator(
    task_id="set_xcom_variables",
    python_callable=set_xcom_variables,
    provide_context=True,
    dag=dag,
)
create_temp_table_task = GCSToBigQueryOperator(
    task_id="create_temp_table",
    bucket=gcs_bucket,
    source_objects=['{{ ti.xcom_pull(key="gcs_prefix") }}'],
    destination_project_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    source_format=gcs_file_format,
    write_disposition="WRITE_TRUNCATE",
    encoding="UTF-8",
    external_table=True,
    autodetect=True,
    project_id=project_id,
    labels=labels,
    dag=dag,
)

delete_partition_if_exists_task = BigQueryInsertJobOperator(
    task_id="delete_partition_if_exists",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="delete_partition_from_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

insert_into_raw_table_task = BigQueryInsertJobOperator(
    task_id="insert_into_raw_table",
    configuration={
        "query": {
            "query": '{{ ti.xcom_pull(key="insert_into_table_query") }}',
            "useLegacySql": False,
        }
    },
    dag=dag,
)

delete_temp_table_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=False,
    dag=dag,
)

delete_temp_table_if_exists_task = BigQueryDeleteTableOperator(
    task_id="delete_temp_table_if_exists",
    deletion_dataset_table='{{ ti.xcom_pull(key="temp_table_uri") }}',
    ignore_if_missing=True,
    dag=dag,
)
validate_schema_task = BranchPythonOperator(
    task_id="validate_schema",
    python_callable=validate_schema,
    provide_context=True,
    dag=dag,
)

validate_data_count_task = BranchPythonOperator(
    task_id="validate_data_count",
    python_callable=validate_data_count,
    provide_context=True,
    dag=dag,
)

send_alert_task = SlackAPIPostOperator(
    task_id="send_alert",
    text="{{ task_instance.xcom_pull(key='slack_message') }}",
    channel=slack_channel_id,
    username=slack_username,
    trigger_rule="one_failed",
    dag=dag,
)

# -----------------------------------------------------------------------------
# DAG's Tasks Dependencies (ie execution order)
# -----------------------------------------------------------------------------
(
    set_xcom_variables_task
    >> delete_temp_table_if_exists_task
    >> delete_partition_if_exists_task
    >> create_temp_table_task
    >> validate_schema_task
    >> [insert_into_raw_table_task, send_alert_task]
)
(
    insert_into_raw_table_task
    >> validate_data_count_task
    >> [delete_temp_table_task, send_alert_task]
)
The same code can be tweaked for hourly partitioned data as well; a rough sketch of the changes follows.
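A minimal sketch of what would change for an hourly feed (these values are illustrative assumptions, not part of the pipeline above):

# Hypothetical hourly variant (illustrative assumptions only)
data_frequency = "hourly"  # flows into the DAG ids and table names

# Hourly BigQuery partition_ids use the "%Y%m%d%H" format, so the partition-gap
# query would generate hourly candidates instead of GENERATE_DATE_ARRAY, e.g.:
#   SELECT FORMAT_TIMESTAMP("%Y%m%d%H", ts) AS partition_id
#   FROM UNNEST(GENERATE_TIMESTAMP_ARRAY(
#       TIMESTAMP("2024-01-01"),
#       TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR),
#       INTERVAL 1 HOUR
#   )) AS ts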