Syncing Between GCS Buckets in Google Cloud using Composer (Airflow)


When working with Google Cloud, synchronizing data between Google Cloud Storage (GCS) buckets is a common requirement. There are multiple ways to achieve this, each with its own benefits and limitations. My goal was to identify a solution suitable for Google Cloud Composer (Airflow) that meets the following criteria:

  • Sync only when there are new or updated files (based on hash changes).
  • Remove unmatched files from the destination bucket.
  • Avoid unnecessary file rewrites.
  • Ensure reliability while working in an Airflow DAG.

Here are the options I explored:


1. BashOperator with gcloud storage rsync

The gcloud storage rsync command is a powerful and flexible way to synchronize buckets. By combining it with Airflow's BashOperator, you can achieve a solution that meets all the criteria.

Prerequisites:

  • Google Cloud Composer includes the gcloud CLI by default. For self-managed Airflow deployments, you must install the CLI on the workers yourself.

Use Cases:

  • Prioritize specific file types (e.g., .avro files) during synchronization.
  • Sync based on patterns and control the order of file transfers.
  • Preserve POSIX metadata when needed.

Pros:

  • Detects and syncs only new or modified files.
  • Supports recursive syncing of subfolders.
  • Deletes unmatched files in the destination bucket.
  • Allows pattern-based file selection for customized workflows.
  • Minimizes unnecessary file rewrites.

Cons:

  • Relies on Airflow worker resources such as /tmp and the gcloud CLI's local SQLite credential cache, which can trigger "database is locked" warnings and failures under heavy parallel load (see the workaround sketch after the error log below).
  • Consumes worker node resources, which may impact parallel task performance.
  • Does not move objects; it only copies them.

Reference:

Official gcloud storage rsync Documentation

Sample Code:

# Sync everything except the _SUCCESS marker (data files first)
gcloud storage rsync gs://src gs://dest --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --include-managed-folders --skip-unsupported --exclude=".*\_SUCCESS$"

# Sync the remaining files (including the _SUCCESS marker) once the data is in place
gcloud storage rsync gs://src gs://dest --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --include-managed-folders --skip-unsupported --exclude=".*\.avro$"
# Task 1: Sync .avro files first from source to destination bucket (daily)
sync_avro_daily_files = BashOperator(
    task_id="sync_avro_daily_files",
    bash_command=f"""gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} gs://{gcs_landing_bucket_daily} --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --skip-unsupported --exclude=".*\_SUCCESS$" --impersonate-service-account="{impersonate_account}" """,
    dag=dag,
)

# Task 2: Sync the _SUCCESS file after the .avro files have been synced (daily)
sync_success_daily_file = BashOperator(
    task_id="sync_success_daily_file",
    bash_command=f"""gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} gs://{gcs_landing_bucket_daily} --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --skip-unsupported --exclude=".*\.avro$" --impersonate-service-account="{impersonate_account}" """,
    dag=dag,
)
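
Because the first task excludes the _SUCCESS marker and the second excludes the .avro data files, chaining the two tasks ensures downstream consumers only see _SUCCESS once the data has fully landed:

# Publish the _SUCCESS marker only after the data files have been synced
sync_avro_daily_files >> sync_success_daily_file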

Error Log:

[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - ..WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - ...WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - .WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO - ....
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO -
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO - Average throughput: 621.3MiB/s
[2024-11-21, 16:15:31 UTC] {subprocess.py:97} INFO - Command exited with return code 1
[2024-11-21, 16:15:31 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/bash.py", line 210, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.
[2024-11-21, 16:15:31 UTC] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=1__bash_operator_rsync_tool, task_id=sync_avro_daily_files, execution_date=20241121T161246, start_date=20241121T161507, end_date=20241121T161531
[2024-11-21, 16:15:31 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 16002616 for task sync_avro_daily_files (Bash command failed. The command returned a non-zero exit code 1.; 2838153)
[2024-11-21, 16:15:32 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-11-21, 16:15:32 UTC] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check
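
The "database is locked" warnings appear to come from concurrent gcloud invocations sharing the same configuration directory (and its SQLite access-token cache) on the worker. One possible workaround, sketched below and not hardened for production, is to point each task at its own throwaway config directory via the CLOUDSDK_CONFIG environment variable (only a subset of the rsync flags is shown):

# Sketch: isolate the gcloud config (and its SQLite token cache) per task so
# parallel BashOperator tasks do not contend on the same database file.
sync_avro_daily_files = BashOperator(
    task_id="sync_avro_daily_files",
    bash_command=(
        "export CLOUDSDK_CONFIG=$(mktemp -d) && "
        f"gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} gs://{gcs_landing_bucket_daily} "
        "--checksums-only --recursive --delete-unmatched-destination-objects "
        f'--impersonate-service-account="{impersonate_account}"'
    ),
    dag=dag,
)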

2. GCSSynchronizeBucketsOperator

Synchronizes the contents of two buckets or bucket directories in Google Cloud Storage.

Prerequisites:

  • Install the Airflow provider: pip install apache-airflow-providers-google.

Use Cases:

  • Synchronizing entire directories or buckets.
  • Keeping multiple buckets (e.g., dev, test, staging) in sync with a single source (see the sketch after the sample code below).
  • Maintaining identical copies of buckets.

Pros:

  • Handles recursive syncing efficiently.
  • Deletes unmatched files in the destination bucket.
  • Includes options like allow_overwrite for fine-grained control.
  • Safer and more reliable than BashOperator.

Cons:

  • Operates at a directory level; individual file synchronization isn’t supported.
  • Doesn’t move files; source files remain until explicitly deleted.

Reference:

GCSSynchronizeBucketsOperator Documentation

Sample Code:

class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)

sync_daily_files = GCSSynchronizeBucketsOperator(
    task_id="sync_daily_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    recursive=True,
    delete_extra_files=True,
    allow_overwrite=True,
    impersonation_chain=impersonate_account,
    dag=dag,
)
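
For the multi-environment use case mentioned above, a minimal sketch (the destination bucket names and naming scheme are hypothetical) could fan the same source out to one sync task per environment:

# One synchronization task per target environment, all from the same source.
for env in ("dev", "test", "staging"):
    GCSSynchronizeBucketsOperator(
        task_id=f"sync_to_{env}",
        source_bucket=gcs_3rdparty_landing_bucket_daily,
        destination_bucket=f"landing-bucket-{env}",  # hypothetical naming scheme
        recursive=True,
        delete_extra_files=True,
        allow_overwrite=True,
        impersonation_chain=impersonate_account,
        dag=dag,
    )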

3. GCSToGCSOperator

A straightforward operator for copying objects between buckets.

Prerequisites:

  • Install the Airflow provider: pip install apache-airflow-providers-google.

Use Cases:

  • Incremental copies without hash validation.
  • Preserving historical copies in the destination bucket.

Pros:

  • Efficient for incremental copy tasks.
  • Supports file patterns and recursive operations (see the match_glob sketch after the sample code below).

Cons:

  • Lacks hash validation, leading to redundant rewrites.
  • Doesn’t delete unmatched files in the destination bucket.

Reference:

GCSToGCSOperator Documentation

Sample Code:

class airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSToGCSOperator(*, source_bucket, source_object=None, source_objects=None, destination_bucket=None, destination_object=None, delimiter=None, move_object=False, replace=True, gcp_conn_id='google_cloud_default', last_modified_time=None, maximum_modified_time=None, is_older_than=None, impersonation_chain=None, source_object_required=False, exact_match=False, match_glob=None, **kwargs)

sync_daily_files = GCSToGCSOperator(
    task_id="sync_daily_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    move_object=False,
    replace=True,
    last_modified_time=None,
    maximum_modified_time=None,
    is_older_than=None,
    source_object_required=False,
    exact_match=False,
    match_glob=None,
    impersonation_chain=impersonate_account,
    dag=dag,
)
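
For pattern-based selection, the operator's match_glob parameter (visible in the signature above) can restrict the copy to matching objects. A sketch along the lines of the provider's documented glob example, reusing the same buckets:

# Copy only .avro files under acc/; setting move_object=True would turn the
# copy into a move by deleting the source objects after transfer.
copy_avro_files = GCSToGCSOperator(
    task_id="copy_avro_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    match_glob="**/*.avro",
    move_object=False,
    impersonation_chain=impersonate_account,
    dag=dag,
)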

4. CloudDataTransferServiceGCSToGCSOperator

This operator utilizes Google’s Cloud Storage Transfer Service to manage bucket-to-bucket transfers.

Prerequisites:

  • Install the Airflow provider: pip install apache-airflow-providers-google (not needed on Google-managed Composer, where it is pre-installed).

Pros:

  • Highly efficient for large datasets and enterprise-level use cases.
  • Offloads processing to Google’s backend infrastructure.
  • Supports incremental and scheduled transfers.

Cons:

  • Non-idempotent: Creates new transfer jobs for each run, requiring manual cleanup.
  • Configuration is more complex than other operators.
  • Can experience timeout issues for very large transfers (a possible mitigation is sketched after the error log below).

Reference:

CloudDataTransferServiceGCSToGCSOperator Documentation

Sample Code:

class airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceGCSToGCSOperator(*, source_bucket, destination_bucket, source_path=None, destination_path=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', description=None, schedule=None, object_conditions=None, transfer_options=None, wait=True, timeout=None, google_impersonation_chain=None, delete_job_after_completion=False, **kwargs)

sync_daily_files = CloudDataTransferServiceGCSToGCSOperator(
    task_id="sync_daily_files",
    project_id=project_id,
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_path="acc/",
    destination_path="acc/",
    description="copies 3rdparty landing bucket to application project bucket (daily)",
    wait=True,
    timeout=None,
    delete_job_after_completion=True,  # If run history is not required.
    schedule=None,
    object_conditions=None,
    transfer_options={
        "overwriteObjectsAlreadyExistingInSink": True,
        "deleteObjectsUniqueInSink": False,
        "deleteObjectsFromSourceAfterTransfer": False,
        "overwriteWhen": "DIFFERENT",
        "metadataOptions": {},
    },
    google_impersonation_chain=impersonate_account,
    dag=dag,
)

Error Log:

2024-11-26, 11:23:15 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-11-26, 11:23:15 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-11-26, 11:23:17 UTC] {cloud_storage_transfer_service.py:217} INFO - Created job transferJobs/17484812810658397503
[2024-11-26, 11:23:28 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:23:39 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:23:50 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:01 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:11 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:21 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py", line 1061, in execute
hook.wait_for_transfer_job(job, timeout=self.timeout)
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 458, in wait_for_transfer_job
raise AirflowException("Timeout. The operation could not be completed within the allotted time.")
airflow.exceptions.AirflowException: Timeout. The operation could not be completed within the allotted time.
[2024-11-26, 11:24:21 UTC] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=4__cloud_transfer_gcs_to_gcs, task_id=sync_hourly_files, execution_date=20241126T111756, start_date=20241126T112315, end_date=20241126T112421
[2024-11-26, 11:24:21 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 16363172 for task sync_hourly_files (Timeout. The operation could not be completed within the allotted time.; 2963689)
[2024-11-26, 11:24:21 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-11-26, 11:24:21 UTC] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check
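
The failure above is the operator giving up on waiting rather than the transfer itself failing: with timeout=None, the hook appears to fall back to a short default wait (roughly 60 seconds in the provider version I tested). One mitigation, sketched below with an assumed upper bound, is to pass an explicit, generous timeout (a plain number of seconds also works):

from datetime import timedelta

# Wait up to two hours for the transfer operation instead of the short default.
sync_daily_files = CloudDataTransferServiceGCSToGCSOperator(
    task_id="sync_daily_files",
    project_id=project_id,
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_path="acc/",
    destination_path="acc/",
    wait=True,
    timeout=timedelta(hours=2),  # assumed upper bound; tune to your data volume
    google_impersonation_chain=impersonate_account,
    dag=dag,
)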

5. CloudDataTransferServiceRunJobOperator

This operator runs a pre-existing Cloud Storage Transfer Service job (for example, one provisioned with Terraform, as shown below), so the copy itself is handled entirely by Google's backend and every run is recorded by the service.

Prerequisites:

  • Install the Airflow provider: pip install apache-airflow-providers-google (not needed on Google-managed Composer, where it is pre-installed).
  • The google-cloud-storage-transfer PyPI package.

Use Cases:

  • Tracking each copy run from the source to the destination bucket via the Transfer Service's run history (see the sensor sketch after the sample code below).
  • A fully managed, 100% cloud-side solution.

Pros:

  • Highly efficient for large datasets and enterprise-level use cases.
  • Offloads processing to Google’s backend infrastructure.
  • Supports incremental and scheduled transfers.
  • Can delete unmatched files in the sink (destination) bucket.

Cons:

  • Does not support wildcard-pattern copies in object conditions (only prefix-based filtering).
  • Integrity checks are not always guaranteed when transferring from other providers.
  • Configuration is more complex than other operators.
  • delete_objects_unique_in_sink and delete_objects_from_source_after_transfer are mutually exclusive: you can either delete objects at the source (a move) or delete unmatched objects at the destination, but not both in the same job.

Reference:

CloudDataTransferServiceRunJobOperator Documentation

Sample Code:

resource "google_storage_transfer_job" "gcs_landing_bucket_sts_daily" {
provider = google.impersonated
project = local.project
name = "transferJobs/acc_sts_daily"
description = "Moves the object from acc 3rd party landing bucket (daily)."
transfer_spec {
# Doesnot support wildcard pattern.
object_conditions {
include_prefixes = [
"acc/",
]
}
transfer_options {
# Deletes the unmatched objects at desintain
delete_objects_unique_in_sink = true # ToDo false in Prod
delete_objects_from_source_after_transfer = false # ToDo true in Prod
# ABOVE OPTION: delete_objects_unique_in_sink and delete_objects_from_source_after_transfer are mutually exclusive.
overwrite_objects_already_existing_in_sink = true
overwrite_when = "DIFFERENT"
}
gcs_data_source {
bucket_name = "poc-logs"
path = "accc/"
}
gcs_data_sink {
bucket_name = data.terraform_remote_state.initialise.outputs.gcs_landing_bucket_daily
path = "acc/"
}
}
}
sync_daily_files = CloudDataTransferServiceRunJobOperator(
    task_id="sync_daily_files",
    job_name="transferJobs/acc_sts_daily",
    project_id=project_id,
    google_impersonation_chain=impersonate_account,
    dag=dag,
)
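
As far as I can tell, CloudDataTransferServiceRunJobOperator only triggers the job and does not wait for it to finish. To track the run inside the DAG (the first use case above), the provider also ships a job-status sensor; a sketch, assuming the sensor is available in your provider version:

from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
    CloudDataTransferServiceJobStatusSensor,
)

# Block downstream tasks until the latest run of the transfer job succeeds.
wait_for_daily_sync = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_daily_sync",
    job_name="transferJobs/acc_sts_daily",
    project_id=project_id,
    expected_statuses={"SUCCESS"},
    dag=dag,
)

sync_daily_files >> wait_for_daily_sync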

Conclusion

Each option has its strengths and weaknesses, making the choice dependent on your specific use case:

  • For simplicity and minimal setup: Use GCSSynchronizeBucketsOperator.
  • For selective, pattern-based transfers: Use GCSToGCSOperator; set move_object=True if the source copies are no longer needed.
  • For large-scale enterprise jobs: Leverage CloudDataTransferServiceGCSToGCSOperator.
  • For full control and flexibility: Use BashOperator with gcloud storage rsync.

In my setup, I found GCSSynchronizeBucketsOperator to be the most reliable and straightforward solution for bucket synchronization in Airflow, thanks to its built-in options for deleting unmatched files and syncing recursively. For highly selective file operations, however, GCSToGCSOperator is a great alternative.