Async - Extracting millions of API records with Python - ETL (local to BigQuery)
Photo by Chris Ried on Unsplash
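
The script below is the whole pipeline in one file: it signs in to a REST API, fans out paginated GET requests concurrently with `asyncio`/`aiohttp` (throttled by a semaphore), writes each page as newline-delimited JSON on disk, then ships the files to Google Cloud Storage with `gsutil` and loads them into BigQuery raw, stage and final tables with the `bq` CLI.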
```python
import os
import subprocess
import certifi
import asyncio
import aiofiles
import aiohttp
import json
import dotenv
import time
import ssl
import random
import uvloop
import argparse
from datetime import datetime
from google.cloud import bigquery

ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
tcpconn = aiohttp.TCPConnector(ssl=ssl_context)

dotenv.load_dotenv()
current_datetime = datetime.now()
formatted_date = current_datetime.strftime("%Y-%m-%d")
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")

root_dir = os.path.dirname(os.path.dirname(__file__))
data_dir = os.path.join(root_dir, "data")
sql_dir = os.path.join(root_dir, "bigquery")

os.environ['SSL_CERT_FILE'] = certifi.where()

host = os.environ["API__HOSTNAME"]
username = os.environ["API__USERNAME"]
password = os.environ["API__PASSWORD"]
host_url = f"https://{host}/api/v2"
signin_url = f"{host_url}/authentication/signin"
signin_data = json.dumps({"name": username, "password": password})
max_retry = 2

gcp_cloud_storage_dir = os.environ["GOOGLE_CLOUD_STORAGE"]
gcp_bigquery_dataset = os.environ["GOOGLE_CLOUD_DATASET"]
gcp_project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
```
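
All of the configuration above comes from a local `.env` file. A minimal sketch of what it needs to contain (the variable names are taken from the `os.environ` lookups; the values are placeholders):

```
API__HOSTNAME=api.example.com
API__USERNAME=etl-user
API__PASSWORD=change-me
GOOGLE_CLOUD_STORAGE=my-bucket/extracts
GOOGLE_CLOUD_DATASET=my_dataset
GOOGLE_CLOUD_PROJECT=my-gcp-project
```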
```python
def create_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def read_json_file(json_file):
    with open(json_file, 'r', encoding='utf-8') as f:
        return json.load(f)


def read_file(input_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        return f.read()


async def write_response_as_json(data, output_file):
    async with aiofiles.open(output_file, "w", encoding="utf-8") as f:
        await f.write(data)
        await asyncio.sleep(0)
```
```python
async def fetch(session, semaphore, fetch_url, params, retry=0):
    try:
        async with semaphore:
            async with session.get(fetch_url, params=params) as response:
                await asyncio.sleep(1)
                if response.status != 200:
                    print(f"{response.headers.get('DATE')}:{response.url} - {response.status}")
                if response.status > 399:
                    response.raise_for_status()
                data = await response.json()
                await asyncio.sleep(1)
                return data
    except aiohttp.client_exceptions.ClientOSError as e:
        if retry < max_retry:
            await asyncio.sleep(2 + random.randint(1, 10))
            data = await fetch(session, semaphore, fetch_url, params, retry=retry + 1)
            await asyncio.sleep(1)
            return data
        else:
            print(f'Retry failed! {fetch_url} - {retry=}')
            return {}
    except Exception as e:
        print(str(e))
        return {}


async def request_api(session, semaphore, fetch_url, params, output_file):
    data = await fetch(session, semaphore, fetch_url, params)
    await asyncio.sleep(1)

    if isinstance(data, dict) and len(data.get("content", [])):
        delimited_json = "\n".join(
            [json.dumps({'extracted_dt': formatted_datetime, **x}) for x in data["content"]])
        await write_response_as_json(delimited_json, output_file)
        await asyncio.sleep(1)

        # Follow pagination: request the next page and write it to its own file.
        if all(key in list(data.keys()) for key in ['pageNumber', 'totalPages']) and data.get('totalPages', 0) > 1:
            new_page_number = data['pageNumber'] + 1
            if new_page_number < data['totalPages']:
                new_output_file = output_file.replace(f'{params["page"]}.json', f'{new_page_number}.json')
                new_params = params.copy()
                new_params['page'] = new_page_number
                task = asyncio.create_task(request_api(session, semaphore, fetch_url, new_params, new_output_file))
                await task
                await asyncio.sleep(1)
    else:
        print(f'No data found! - {fetch_url} - {params} - {data}')
```
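
Each saved file is newline-delimited JSON, one record per line, with an `extracted_dt` timestamp stamped onto every record so the later `bq load --autodetect` step picks it up as an ordinary column. A hypothetical page and the line it would produce (the record fields other than the pagination keys are invented for illustration):

```python
# Hypothetical API page; only "content", "pageNumber" and "totalPages" are
# keys the script relies on - the record fields are made up.
page = {
    "pageNumber": 0,
    "totalPages": 3,
    "content": [{"deviceId": 123, "temperature": 21.4}],
}

# request_api() would write one line per record to <output_file>, e.g.:
# {"extracted_dt": "2024-01-02 03:04:05", "deviceId": 123, "temperature": 21.4}
```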
```python
async def main(kwargs_list):
    semaphore = asyncio.Semaphore(1000)

    async with aiohttp.ClientSession() as session:
        session.headers.update({"accept": "application/json", "content-type": "application/json"})

        # Sign in once and reuse the auth token for every request in this session.
        # Recent aiohttp versions take `ssl=` rather than the old `ssl_context=` keyword.
        async with session.post(signin_url, data=signin_data, ssl=ssl_context) as response:
            if response.status > 399:
                response.raise_for_status()
            data = await response.json()
            session.headers.update({"X-AUTH-TOKEN": data["token"]})

        # print(session.headers)
        coros = [request_api(session=session, semaphore=semaphore, **kwargs) for kwargs in kwargs_list]
        await asyncio.gather(*coros)
```
```python
def run_subprocess(cmd):
    print(" ".join(cmd))
    result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    print("Return code:", result.returncode)
    if result.stdout:
        print("Standard output:", result.stdout)
    # if result.stderr:
    #     print("Standard error:", result.stderr)
    return result.stdout


def run_bigquery_sql_to_df(query=''):
    if not query:
        return []
    client = bigquery.Client(project=gcp_project_id)
    return client.query(query).to_dataframe()
```
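
The `__main__` block below reads its per-endpoint configuration from a `mappings` dict that is not shown in this post. Based on the keys the code accesses, it would look roughly like this (the slugs, URLs and file names are placeholders, and the `'xxxx'` entry does not need `query` or `batch_size`):

```python
# Rough shape of the per-slug config the script expects; everything here is
# a placeholder except the key names, which the code below reads.
mappings = {
    'yyyy': {
        'query': 'SELECT device_id FROM `my_dataset.devices`',  # feeds run_bigquery_sql_to_df
        'batch_size': 1000,
        'fetch_url': f'{host_url}/devices/{{}}/readings',       # .format(device_id)
        'params': {'page': 0, 'size': 100},
        'output_file': 'yyyy_{}_0.json',                        # .format(device_id)
        'stage_table': 'stage_yyyy.sql',
        'merge_query': 'merge_yyyy.sql',
    },
    'xxxx': {
        'fetch_url': f'{host_url}/things',
        'params': {'page': 0, 'size': 100},
        'output_file': 'xxxx_0.json',
        'stage_table': 'stage_xxxx.sql',
        'merge_query': 'merge_xxxx.sql',
    },
}
```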
```python
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("slug", type=str, choices=['xxxx', 'yyyy'])
    args = parser.parse_args()
    slug = args.slug

    output_dir = os.path.join(data_dir, slug, formatted_date)
    create_dir(output_dir)

    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    if slug == 'yyyy':
        df = run_bigquery_sql_to_df(mappings[slug]['query'])
        batch_size = mappings[slug]['batch_size']
        df['device_id'] = df['device_id'].astype(int)
        device_ids = df['device_id'].tolist()
        print(f'Total device ids - {len(device_ids)}')

        # Loop through device ids in batches
        for i in range(0, len(device_ids), batch_size):
            _kwargs = [{'fetch_url': mappings[slug]['fetch_url'].format(_id),
                        'params': mappings[slug]['params'],
                        'output_file': os.path.join(output_dir, mappings[slug]['output_file'].format(_id))}
                       for _id in device_ids[i:i + batch_size]
                       if not os.path.isfile(os.path.join(output_dir, mappings[slug]['output_file'].format(_id)))]
            start = time.time()
            asyncio.get_event_loop().run_until_complete(main(_kwargs))
            pending_calls = len(device_ids) - (i + batch_size)
            print(f"# Time to complete '{slug}': {round(time.time() - start, 2)} seconds..")
            print(datetime.now(), "---------------->>>>>>>>>>>>>>>", i, i + batch_size, pending_calls)
    elif slug in ('xxxx',):
        _kwargs = [{'fetch_url': mappings[slug]['fetch_url'],
                    'params': mappings[slug]['params'],
                    'output_file': os.path.join(output_dir, mappings[slug]['output_file'])}]
        start = time.time()
        asyncio.get_event_loop().run_until_complete(main(_kwargs))
        print(f"# Time to complete '{slug}': {round(time.time() - start, 2)} seconds..")
        print("---------------->>>>>>>>>>>>>>>", datetime.now())
print("# Local to GCP Cloud Storage") run_subprocess(['gsutil', '-m', 'cp', '-R', output_dir, f'gs://{gcp_cloud_storage_dir}/{slug}/']) # run_subprocess(['gsutil', '-m', 'rsync', '-r', output_dir, f'gs://{gcp_cloud_storage_dir}/{slug}/'])
print('# Delete old bigquery raw table if exists already') run_subprocess(['bq', 'rm', '-f', '-t', f'{gcp_bigquery_dataset}.raw_{slug}_{formatted_date}'])
print('# Create a new bigquery raw table from the latest json files') run_subprocess(['bq', 'load', '--autodetect', '--source_format=NEWLINE_DELIMITED_JSON', f'{gcp_bigquery_dataset}.raw_{slug}_{formatted_date}', f"gs://{gcp_cloud_storage_dir}/{slug}/{formatted_date}/{slug}_*.json"])
print('# Delete old bigquery stage table if exists already') run_subprocess(['bq', 'rm', '-f', '-t', f'{gcp_bigquery_dataset}.stage_{slug}'])
    print('# Create stage table from raw table')
    stage_sql = read_file(os.path.join(sql_dir, mappings[slug]['stage_table']))
    stage_sql = stage_sql.format(formatted_date=formatted_date).replace('\n', ' ')
    run_subprocess(['bq', 'query', '--batch', '--use_legacy_sql=false', f'{stage_sql}'])

    print('# Run merge stage table with final table query')
    merge_query = read_file(os.path.join(sql_dir, mappings[slug]['merge_query']))
    run_subprocess(['bq', 'query', '--batch', '--use_legacy_sql=false', f'{merge_query}'])

    print('# Drop the raw table now that the merge is done')
    run_subprocess(['bq', 'rm', '-f', '-t', f'{gcp_bigquery_dataset}.raw_{slug}_{formatted_date}'])

    # print('# Delete local files')
    # ToDo
```
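
To run it end to end, save the script (I'll call it `extract_load.py`, the name is my own), make sure `gsutil` and `bq` are authenticated against the right project, and invoke it once per slug, e.g. `python extract_load.py yyyy`. Re-runs of the `yyyy` extract on the same day skip device files that already exist locally, so an interrupted run can simply be restarted.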