import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery
from math import floor, log

client = bigquery.Client()
# Dry-run config: estimates the bytes a query would scan without executing it or incurring cost.
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
# Note: the columns here are TIMESTAMPs, so TIMESTAMP_DIFF/TIMESTAMP_SUB are required;
# DATE_DIFF/DATE_ADD only accept DATE arguments in BigQuery.
query_event_min_max_time = '''
SELECT
    '{dataset}' AS dataset,
    '{table_name}' AS table_name,
    '{layer_level}' AS layer_level,
    '{table}' AS source,
    MAX({event_dt_col}) AS last_dt,
    TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX({event_dt_col}), DAY) AS day,
    TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX({event_dt_col}), HOUR) AS hour,
    TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX({event_dt_col}), MINUTE) AS minute,
    CASE
        WHEN COALESCE(TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX({event_dt_col}), DAY), 2) > 1 THEN 'check'
        ELSE 'ignore'
    END AS action,
    CURRENT_TIMESTAMP() AS processed_dt
FROM `{table}`
WHERE
    -- Restrict the scan to yesterday's and today's partitions to keep bytes processed low.
    TIMESTAMP_TRUNC({partition_col}, DAY)
        BETWEEN TIMESTAMP_SUB(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY), INTERVAL 1 DAY)
        AND TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)
LIMIT 1;
'''
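# `list_of_queries` (used below) is assumed to be built from this template.
# A minimal sketch with hypothetical table metadata; replace with your own
# project, datasets, and column names:
tables_to_check = [
    {
        'dataset': 'analytics',                   # hypothetical
        'table_name': 'events',                   # hypothetical
        'layer_level': 'raw',                     # hypothetical
        'table': 'my-project.analytics.events',   # hypothetical
        'event_dt_col': 'event_ts',               # hypothetical
        'partition_col': 'event_ts',              # hypothetical
    },
]
list_of_queries = [query_event_min_max_time.format(**t) for t in tables_to_check]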
def format_bytes(size):
    """Render a byte count as a human-readable string, e.g. 1536 -> '1.5 KB'."""
    # Cap the exponent at 4 so sizes above 1024 TB still map to 'TB' instead of raising IndexError.
    power = 0 if size <= 0 else min(floor(log(size, 1024)), 4)
    return f"{round(size / 1024 ** power, 2)} {['B', 'KB', 'MB', 'GB', 'TB'][int(power)]}"
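# Example: format_bytes(3_221_225_472) returns '3.0 GB'.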
def run_bigquery(query=''):
    """Dry-run the query first; execute it only if the estimated scan is small."""
    if not query:
        return []
    # Dry run: BigQuery validates the query and reports total_bytes_processed
    # without executing it, so this call is free.
    query_job = client.query(query, job_config=job_config)
    bytes_scanned = format_bytes(query_job.total_bytes_processed)
    if 'GB' in bytes_scanned or 'TB' in bytes_scanned:
        print(query)
        print(f'Query data scan cost is too high: {bytes_scanned}')
        return []
    # The estimate is acceptable: run the query for real and collect its rows.
    df = client.query(query).to_dataframe()
    df['bytes_scanned'] = bytes_scanned
    return df.to_dict('records')
result_list = []
with ThreadPoolExecutor(max_workers=32) as executor:
    futures = []
    for item in list_of_queries:  # Each item is a fully formatted SQL string.
        future = executor.submit(run_bigquery, item)
        futures.append(future)
    for f in futures:
        result_list.extend(f.result())
df = pd.DataFrame(result_list)
df.sort_values(['dataset', 'table_name', 'layer_level'],
               ascending=[True, True, True], inplace=True)
text = df.to_markdown(index=False)  # to_markdown requires the 'tabulate' package.
print(text)