Syncing Between GCS Buckets in Google Cloud using Composer (Airflow)
Syncing Between GCS Buckets in Google Cloud
When working with Google Cloud, synchronizing data between Google Cloud Storage (GCS) buckets is a common requirement. There are multiple ways to achieve this, each with its own benefits and limitations. My goal was to identify a solution suitable for Google Cloud Composer (Airflow) that meets the following criteria:
- Sync only when there are new or updated files (based on hash changes).
- Remove unmatched files from the destination bucket.
- Avoid unnecessary file rewrites.
- Ensure reliability while working in an Airflow DAG.
Here are the options I explored:
Option 1: Using BashOperator with gcloud storage rsync
The gcloud storage rsync command is a powerful and flexible way to synchronize buckets. By combining it with Airflow's BashOperator, you can achieve a solution that meets all the criteria.
Prerequisites:
- Google Cloud Composer includes the gcloud CLI by default. For Airflow environments outside Composer, you must install the CLI manually.
Use Cases:
- Prioritize specific file types (e.g., .avro files) during synchronization.
- Sync based on patterns and control the order of file transfers.
- Preserve POSIX metadata when needed.
Pros:
- Detects and syncs only new or modified files.
- Supports recursive syncing of subfolders.
- Deletes unmatched files in the destination bucket.
- Allows pattern-based file selection for customized workflows.
- Minimizes unnecessary file rewrites.
Cons:
- Relies on Airflow worker resources like /tmp and local SQLite databases, potentially causing crashes under heavy load.
- Consumes worker node resources, which may impact parallel task performance.
- Does not move objects; it only copies them.
Reference:
Official gcloud storage rsync Documentation
Sample Code:
gcloud storage rsync gs://src gs://dest --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --include-managed-folders --skip-unsupported --exclude=".*_SUCCESS$"
gcloud storage rsync gs://src gs://dest --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --include-managed-folders --skip-unsupported --exclude=".*\.avro$"
from airflow.operators.bash import BashOperator

# Task 1: Sync .avro files first from source to destination bucket (daily)
sync_avro_daily_files = BashOperator(
    task_id="sync_avro_daily_files",
    bash_command=f"""gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} gs://{gcs_landing_bucket_daily} --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --skip-unsupported --exclude=".*_SUCCESS$" --impersonate-service-account="{impersonate_account}" """,
    dag=dag,
)

# Task 2: Sync the _SUCCESS file after the .avro files have been synced (daily)
sync_success_daily_file = BashOperator(
    task_id="sync_success_daily_file",
    bash_command=f"""gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} gs://{gcs_landing_bucket_daily} --checksums-only --continue-on-error --delete-unmatched-destination-objects --recursive --quiet --verbosity=info --preserve-posix --no-ignore-symlinks --user-output-enabled --skip-unsupported --exclude=".*\.avro$" --impersonate-service-account="{impersonate_account}" """,
    dag=dag,
)
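Because the _SUCCESS marker should only land in the destination after the .avro files, the two tasks can be chained. The ordering below is an assumption that simply mirrors the task comments above:

# Run the .avro sync before publishing the _SUCCESS marker (assumed ordering).
sync_avro_daily_files >> sync_success_daily_file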
Error log:
[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - ..WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:29 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - ...WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - .WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:30 UTC] {subprocess.py:93} INFO - WARNING: Could not store access token in cache: database is locked
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO - ....
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO -
[2024-11-21, 16:15:31 UTC] {subprocess.py:93} INFO - Average throughput: 621.3MiB/s
[2024-11-21, 16:15:31 UTC] {subprocess.py:97} INFO - Command exited with return code 1
[2024-11-21, 16:15:31 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/bash.py", line 210, in execute
    raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.
[2024-11-21, 16:15:31 UTC] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=1__bash_operator_rsync_tool, task_id=sync_avro_daily_files, execution_date=20241121T161246, start_date=20241121T161507, end_date=20241121T161531
[2024-11-21, 16:15:31 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 16002616 for task sync_avro_daily_files (Bash command failed. The command returned a non-zero exit code 1.; 2838153)
[2024-11-21, 16:15:32 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-11-21, 16:15:32 UTC] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check
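The "database is locked" warnings come from gcloud's local credential cache, a SQLite file shared by every task on the worker. One possible mitigation (a sketch, not part of the original DAG) is to give each run its own gcloud configuration directory via the CLOUDSDK_CONFIG environment variable, so concurrent rsync tasks stop contending for the same cache file:

from airflow.operators.bash import BashOperator

# Hypothetical variant of Task 1: isolate gcloud's config/credential cache per run
# by pointing CLOUDSDK_CONFIG at a fresh temporary directory before running rsync.
sync_avro_daily_files_isolated = BashOperator(
    task_id="sync_avro_daily_files_isolated",
    bash_command=(
        "export CLOUDSDK_CONFIG=$(mktemp -d) && "  # per-run gcloud config dir
        f"gcloud storage rsync gs://{gcs_3rdparty_landing_bucket_daily} "
        f"gs://{gcs_landing_bucket_daily} "
        "--checksums-only --continue-on-error --delete-unmatched-destination-objects "
        "--recursive --quiet --exclude='.*_SUCCESS$' "
        f"--impersonate-service-account={impersonate_account}"
    ),
    dag=dag,
)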
Option 2: GCSSynchronizeBucketsOperator
Synchronizes the contents of buckets or bucket directories in Google Cloud Storage.
Prerequisites:
- Install the Airflow provider: pip install apache-airflow-providers-google.
Use Cases:
- Synchronizing entire directories or buckets.
- Keeping multiple buckets (e.g., dev, test, staging) in sync with a single source (see the fan-out sketch after the sample code below).
- Maintaining identical copies of buckets.
Pros:
- Handles recursive syncing efficiently.
- Deletes unmatched files in the destination bucket.
- Includes options like allow_overwrite for fine-grained control.
- Safer and more reliable than BashOperator.
Cons:
- Operates at a directory level; individual file synchronization isn’t supported.
- Doesn’t move files; source files remain until explicitly deleted.
Reference:
GCSSynchronizeBucketsOperator Documentation
Sample Code:
class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)

from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator

sync_daily_files = GCSSynchronizeBucketsOperator(
    task_id="sync_daily_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    recursive=True,
    delete_extra_files=True,
    allow_overwrite=True,
    impersonation_chain=impersonate_account,
    dag=dag,
)
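For the dev/test/staging use case mentioned above, the same operator can be fanned out over several destination buckets. This is a minimal sketch with placeholder bucket names; the "my-project-landing-{env}" naming scheme is invented for illustration:

# Hypothetical fan-out: keep several environment buckets identical to one source bucket.
for env in ["dev", "test", "staging"]:
    GCSSynchronizeBucketsOperator(
        task_id=f"sync_{env}_bucket",
        source_bucket=gcs_3rdparty_landing_bucket_daily,
        destination_bucket=f"my-project-landing-{env}",  # placeholder bucket name
        recursive=True,
        delete_extra_files=True,
        allow_overwrite=True,
        impersonation_chain=impersonate_account,
        dag=dag,
    )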
Option 3: GCSToGCSOperator
A straightforward operator for copying objects between buckets.
Prerequisites:
- Install the Airflow provider: pip install apache-airflow-providers-google.
Use Cases:
- Incremental copies without hash validation.
- Preserving historical copies in the destination bucket.
Pros:
- Efficient for incremental copy tasks.
- Supports file patterns and recursive operations.
Cons:
- Lacks hash validation, leading to redundant rewrites.
- Doesn’t delete unmatched files in the destination bucket.
Reference:
GCSToGCSOperator Documentation
Sample Code:
class airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSToGCSOperator(*, source_bucket, source_object=None, source_objects=None, destination_bucket=None, destination_object=None, delimiter=None, move_object=False, replace=True, gcp_conn_id='google_cloud_default', last_modified_time=None, maximum_modified_time=None, is_older_than=None, impersonation_chain=None, source_object_required=False, exact_match=False, match_glob=None, **kwargs)

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

sync_daily_files = GCSToGCSOperator(
    task_id="sync_daily_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    move_object=False,
    replace=True,
    last_modified_time=None,
    maximum_modified_time=None,
    is_older_than=None,
    source_object_required=False,
    exact_match=False,
    match_glob=None,
    impersonation_chain=impersonate_account,
    dag=dag,
)
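For the selective, pattern-based transfers mentioned later in the conclusion, match_glob combined with move_object can restrict the copy to matching objects and remove them from the source afterwards. A sketch, assuming a recent apache-airflow-providers-google release that supports match_glob (the import above is reused):

# Hypothetical selective move: only .avro files under acc/ are copied, then deleted at the source.
move_avro_daily_files = GCSToGCSOperator(
    task_id="move_avro_daily_files",
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_object="acc/",
    destination_object="acc/",
    match_glob="**/*.avro",  # glob filter applied to objects under the source prefix
    move_object=True,        # delete each source object after a successful copy
    impersonation_chain=impersonate_account,
    dag=dag,
)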
Option 4: CloudDataTransferServiceGCSToGCSOperator
This operator uses Google's Storage Transfer Service to manage bucket-to-bucket transfers.
Prerequisites:
- Install the Airflow provider (pip install apache-airflow-providers-google) if you are not on Google-managed Composer.
Pros:
- Highly efficient for large datasets and enterprise-level use cases.
- Offloads processing to Google’s backend infrastructure.
- Supports incremental and scheduled transfers.
Cons:
- Non-idempotent: Creates new transfer jobs for each run, requiring manual cleanup.
- Configuration is more complex than other operators.
- Can experience timeout issues for very large transfers.
Reference:
CloudDataTransferServiceGCSToGCSOperator Documentation
Sample Code:
class airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceGCSToGCSOperator(*, source_bucket, destination_bucket, source_path=None, destination_path=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', description=None, schedule=None, object_conditions=None, transfer_options=None, wait=True, timeout=None, google_impersonation_chain=None, delete_job_after_completion=False, **kwargs)

from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceGCSToGCSOperator,
)

sync_daily_files = CloudDataTransferServiceGCSToGCSOperator(
    task_id="sync_daily_files",
    project_id=project_id,
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_path="acc/",
    destination_path="acc/",
    description="copies 3rdparty landing bucket to application project bucket (daily)",
    wait=True,
    timeout=None,
    delete_job_after_completion=True,  # If run history is not required.
    schedule=None,
    object_conditions=None,
    transfer_options={
        "overwriteObjectsAlreadyExistingInSink": True,
        "deleteObjectsUniqueInSink": False,
        "deleteObjectsFromSourceAfterTransfer": False,
        "overwriteWhen": "DIFFERENT",
        "metadataOptions": {},
    },
    google_impersonation_chain=impersonate_account,
    dag=dag,
)
Error log:
[2024-11-26, 11:23:15 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-11-26, 11:23:15 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-11-26, 11:23:17 UTC] {cloud_storage_transfer_service.py:217} INFO - Created job transferJobs/17484812810658397503
[2024-11-26, 11:23:28 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:23:39 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:23:50 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:01 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:11 UTC] {cloud_storage_transfer_service.py:453} INFO - Progress for operation transferOperations/transferJobs-17484812810658397503-1117135540339681960: {'objectsFoundFromSource': '3079', 'bytesFoundFromSource': '7487997999', 'objectsFromSourceSkippedBySync': '5433', 'bytesFromSourceSkippedBySync': '19451774659'}
[2024-11-26, 11:24:21 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py", line 1061, in execute
    hook.wait_for_transfer_job(job, timeout=self.timeout)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 458, in wait_for_transfer_job
    raise AirflowException("Timeout. The operation could not be completed within the allotted time.")
airflow.exceptions.AirflowException: Timeout. The operation could not be completed within the allotted time.
[2024-11-26, 11:24:21 UTC] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=4__cloud_transfer_gcs_to_gcs, task_id=sync_hourly_files, execution_date=20241126T111756, start_date=20241126T112315, end_date=20241126T112421
[2024-11-26, 11:24:21 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 16363172 for task sync_hourly_files (Timeout. The operation could not be completed within the allotted time.; 2963689)
[2024-11-26, 11:24:21 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-11-26, 11:24:21 UTC] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check
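My reading of this failure is that the hook applies a short default wait window when timeout=None, so large transfers trip the timeout even though the transfer job itself keeps running; verify this against your provider version. One possible mitigation (a sketch, not the original task) is to pass an explicit timeout, or to set wait=False and track the job separately:

# Hypothetical variant: allow up to one hour for the transfer operation to complete.
sync_daily_files = CloudDataTransferServiceGCSToGCSOperator(
    task_id="sync_daily_files",
    project_id=project_id,
    source_bucket=gcs_3rdparty_landing_bucket_daily,
    destination_bucket=gcs_landing_bucket_daily,
    source_path="acc/",
    destination_path="acc/",
    wait=True,
    timeout=60 * 60,  # explicit wait window in seconds instead of the default
    delete_job_after_completion=True,
    transfer_options={
        "overwriteObjectsAlreadyExistingInSink": True,
        "overwriteWhen": "DIFFERENT",
    },
    google_impersonation_chain=impersonate_account,
    dag=dag,
)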
Option 5: CloudDataTransferServiceRunJobOperator
This operator runs an existing Storage Transfer Service job (for example, one provisioned with Terraform, as shown below) instead of creating a new job on each run.
Prerequisites:
- Install the Airflow provider (pip install apache-airflow-providers-google) if you are not on Google-managed Composer.
- The google-cloud-storage-transfer PyPI package.
Use Cases:
- You need to track the copy from the source to the destination bucket.
- You want a fully cloud-managed (100% cloud-side) solution.
Pros:
- Highly efficient for large datasets and enterprise-level use cases.
- Offloads processing to Google’s backend infrastructure.
- Supports incremental and scheduled transfers.
- Can delete unmatched files at the sink.
Cons:
- Does not support wildcard-pattern-based copies in the object conditions.
- Integrity checks are not always guaranteed for other providers.
- Configuration is more complex than other operators.
- delete_objects_unique_in_sink and delete_objects_from_source_after_transfer are mutually exclusive: you can either delete objects at the source (a move) or delete unmatched objects at the destination, but not both at the same time.
Reference:
CloudDataTransferServiceRunJobOperator Documentation
Sample Code:
resource "google_storage_transfer_job" "gcs_landing_bucket_sts_daily" { provider = google.impersonated project = local.project name = "transferJobs/acc_sts_daily" description = "Moves the object from acc 3rd party landing bucket (daily)."
transfer_spec { # Doesnot support wildcard pattern. object_conditions { include_prefixes = [ "acc/", ] }
transfer_options { # Deletes the unmatched objects at desintain delete_objects_unique_in_sink = true # ToDo false in Prod delete_objects_from_source_after_transfer = false # ToDo true in Prod # ABOVE OPTION: delete_objects_unique_in_sink and delete_objects_from_source_after_transfer are mutually exclusive.
overwrite_objects_already_existing_in_sink = true overwrite_when = "DIFFERENT" }
gcs_data_source { bucket_name = "poc-logs" path = "accc/"
}
gcs_data_sink { bucket_name = data.terraform_remote_state.initialise.outputs.gcs_landing_bucket_daily path = "acc/" }
}}
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceRunJobOperator,
)

sync_daily_files = CloudDataTransferServiceRunJobOperator(
    task_id="sync_daily_files",
    job_name="transferJobs/acc_sts_daily",
    project_id=project_id,
    google_impersonation_chain=impersonate_account,
    dag=dag,
)
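To satisfy the tracking use case, the run can be paired with the provider's transfer-job status sensor so downstream tasks only start once the operation reports success. A sketch, assuming the sensor is available in your provider version:

from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    GcpTransferOperationStatus,
)
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
    CloudDataTransferServiceJobStatusSensor,
)

# Hypothetical follow-up task: poll the Storage Transfer Service until the job's
# latest operation reaches SUCCESS before any downstream processing begins.
wait_for_daily_sync = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_daily_sync",
    job_name="transferJobs/acc_sts_daily",
    project_id=project_id,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    impersonation_chain=impersonate_account,
    dag=dag,
)

sync_daily_files >> wait_for_daily_sync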
Conclusion
Each option has its strengths and weaknesses, making the choice dependent on your specific use case:
- For simplicity and minimal setup: Use GCSSynchronizeBucketsOperator.
- For selective, pattern-based transfers: Use GCSToGCSOperator with move_object=True for efficiency.
- For large-scale enterprise jobs: Leverage CloudDataTransferServiceGCSToGCSOperator.
- For full control and flexibility: Use BashOperator with gcloud storage rsync.
In my setup, I found the GCSSynchronizeBucketsOperator to be the most reliable and straightforward solution for bucket synchronization in Airflow, with its built-in options to handle mismatched files and recursive transfers efficiently. However, for highly selective file operations, GCSToGCSOperator is a great alternative.