What is Synchronous Execution?
In synchronous execution, tasks are executed sequentially, one after another, without any background processing. 'Task 2' waits for 'Task 1' to complete before it can start. In the example below, it took 4 seconds to complete 4 tasks.
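To make this concrete, here is a minimal sketch (separate from the download script) in which four one-second tasks run strictly one after another, so the total runtime is roughly four seconds:

import time

def task(n):
    time.sleep(1)  # simulate one second of work
    print(f"Task {n} done")

start = time.time()
for n in range(1, 5):  # each task waits for the previous one to finish
    task(n)
print(f"Total: {time.time() - start:.1f} seconds")  # ~4 seconds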
Synchronously Downloading Images using the Requests library
We'll explore a Python script that downloads images from a list of URLs stored in a urls.txt file. The script uses the requests library to send an HTTP request to each URL and then saves each image inside a local directory. We'll also go through how the script records the response code for each URL in a report and prints the total time taken to complete the download process.
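For reference, urls.txt is assumed to contain one image URL per line, for example (hypothetical URLs):

https://example.com/images/cat.jpg
https://example.com/images/dog.png
https://example.com/images/bird.jpg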
First, let's import all the necessary modules, open a REPORT.csv file in write mode, and write a header row with two columns: URL and Response code. Then read the contents of urls.txt and prepare a directory named downloaded-images: if it already exists, it is removed and recreated; otherwise, it is created.
import requests
import os
import shutil
import time

start_time = time.time()

REPORT = "REPORT.csv"
file = open(REPORT, "w")
file.write("URL, Response code\n")
file.close()

data = open("urls.txt", "r").read().split("\n")
total_urls = len(data)

download_image_path = "downloaded-images"
if os.path.exists(download_image_path):
    shutil.rmtree(download_image_path)
    os.makedirs(download_image_path)
else:
    if not os.path.exists(download_image_path):
        os.makedirs(download_image_path)
Then loop through each URL in the list and send an HTTP GET request using the requests library. For each URL, the script extracts the image name from the URL and saves the image content to a file in the downloaded-images directory using the open() function in binary write mode ('wb'). It then writes a row to the REPORT.csv file with the URL and the response code (200 for successful downloads, or the actual response code for failed image URLs). Finally, it prints the total time taken to complete the download process.
for index, url in enumerate(data):
    print(f"[{index}/{total_urls}] Processing {url}...")
    req = requests.get(url)
    img_name = url.split("/")[-1]
    img_content = req.content
    if req.status_code == 200:
        with open(os.path.join(download_image_path, img_name), 'wb') as f:
            f.write(img_content)
        file = open(REPORT, "a")
        file.write(f"{url}, 200\n")
        file.close()
    else:
        file = open(REPORT, "a")
        file.write(f"{url}, {str(req.status_code)}\n")
        file.close()

end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")
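After a run, REPORT.csv contains one row per URL in the format written above, for example (hypothetical URLs):

URL, Response code
https://example.com/images/cat.jpg, 200
https://example.com/images/dog.png, 404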
Here is the complete code:
import requests
import os
import shutil
import time

start_time = time.time()

REPORT = "REPORT.csv"
file = open(REPORT, "w")
file.write("URL, Response code\n")
file.close()

data = open("urls.txt", "r").read().split("\n")
total_urls = len(data)

download_image_path = "downloaded-images"
if os.path.exists(download_image_path):
    shutil.rmtree(download_image_path)
    os.makedirs(download_image_path)
else:
    if not os.path.exists(download_image_path):
        os.makedirs(download_image_path)

for index, url in enumerate(data):
    print(f"[{index}/{total_urls}] Processing {url}...")
    req = requests.get(url)
    img_name = url.split("/")[-1]
    img_content = req.content
    if req.status_code == 200:
        with open(os.path.join(download_image_path, img_name), 'wb') as f:
            f.write(img_content)
        file = open(REPORT, "a")
        file.write(f"{url}, 200\n")
        file.close()
    else:
        file = open(REPORT, "a")
        file.write(f"{url}, {str(req.status_code)}\n")
        file.close()

end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")
What is Asynchronous Execution?
In asynchronous execution, tasks are executed concurrently, almost in parallel, and the execution of one task isn't dependent on another. This means 'Task 1' and 'Task 2' run almost simultaneously. In the image below, the tasks are started at 500 ms intervals for a basic representation.
Before diving into the script, we need to understand the event loop and coroutines.
What is an Event Loop?
In Python, an event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in that thread. While a Task is running in the event loop, no other Tasks can run in the same thread. When a Task executes an await expression, the running Task gets suspended, and the event loop executes the next Task.
What are Coroutines?
Coroutines are the building blocks of asynchronous programming in Python: they allow multiple tasks to run concurrently, improving the performance and responsiveness of programs. A coroutine is a special type of function that can suspend its execution before reaching the end, and it can indirectly pass control to another coroutine for some time.
In Python, coroutines are implemented using the async
and await
keywords. The async keyword is used to define a coroutine function, and the await keyword is used to suspend the execution of the coroutine until the awaited task is complete.
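As a minimal illustration (separate from the download script), the sketch below defines two coroutines with async and await; while one coroutine is suspended on await asyncio.sleep(), the event loop switches to the other, so both finish in roughly one second instead of two:

import asyncio
import time

async def task(name):
    print(f"{name} started")
    await asyncio.sleep(1)  # suspends this coroutine; the event loop runs the other one
    print(f"{name} finished")

async def main():
    # both coroutines run concurrently on the same event loop
    await asyncio.gather(task("Task 1"), task("Task 2"))

start = time.time()
asyncio.run(main())
print(f"Total: {time.time() - start:.1f} seconds")  # ~1 second, not 2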
Asynchronously Downloading Images using asyncio and the aiohttp library
Import all the libraries and declare the configs; feel free to change the configs as per your needs. Check whether the Python script is executed directly (and not imported as a module), then start the coroutine by invoking asyncio.run() and passing it the coroutine object returned by the main() function, where all the tasks are executed in an event loop.
import asyncio
import aiohttp
from aiohttp import ClientResponseError, ClientPayloadError
import os
import shutil
import time

# configs
REPORT = "REPORT.csv"
DOWNLOAD_IMAGE_PATH = "downloaded-images"
MAX_RETRIES = 5
URLS_PER_BATCH = 1000
INPUT_URLS = "urls.txt"

report_file = open(REPORT, "w")
report_file.write("URL, Response code\n")
report_file.close()

counter = 1

if os.path.exists(DOWNLOAD_IMAGE_PATH):
    shutil.rmtree(DOWNLOAD_IMAGE_PATH)
    os.makedirs(DOWNLOAD_IMAGE_PATH)
else:
    if not os.path.exists(DOWNLOAD_IMAGE_PATH):
        os.makedirs(DOWNLOAD_IMAGE_PATH)

if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"Time taken: {end_time - start_time} seconds")
Now let's create an asynchronous main function, read all the URLs from urls.txt, and remove any empty values if present. Suppose there are 50,000 image URLs in urls.txt: without processing them in batches, all 50,000 GET requests would be sent at once, which may crash the system, hurt the script's performance, and put immense load on the server. To solve this, we'll do batch processing. A two-dimensional list, batch_url_arr, is created by splitting the larger list into sub-lists, each containing the number of URLs specified in the URLS_PER_BATCH config. The function then iterates over the batches in batch_url_arr and creates an asynchronous HTTP client session using aiohttp with a timeout of 3600 seconds (1 hour) for each batch. Then it builds a list of tasks that download the images for each URL in the batch using the download_image function, and with asyncio.gather the tasks are executed concurrently and awaited until completion.
async def main():
    image_urls = open("urls.txt", "r").read().split("\n")
    total_url_count = len(image_urls)
    image_urls = list(filter(None, image_urls))
    # Splitting the total number of urls to perform batch processing
    batch_url_arr = []
    split_count = len(image_urls) // URLS_PER_BATCH
    for i in range(0, split_count + 1):
        if i == split_count:
            batch_url_arr.append(image_urls[(i * URLS_PER_BATCH):])
        else:
            batch_url_arr.append(image_urls[(i * URLS_PER_BATCH):((i + 1) * URLS_PER_BATCH)])
    # Async HTTP client session is created for each batch
    for urls in batch_url_arr:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600)) as session:
            tasks = [download_image(session, url, total_url_count) for url in urls]
            await asyncio.gather(*tasks)
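As an aside, the same batching can be written more compactly with a slice-based list comprehension; this sketch produces the same batches, except that it never appends a trailing empty batch when the URL count is an exact multiple of URLS_PER_BATCH:

# equivalent batching using slicing with a step
batch_url_arr = [
    image_urls[i:i + URLS_PER_BATCH]
    for i in range(0, len(image_urls), URLS_PER_BATCH)
]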
When the async download_image function is called, the session sends a GET request to the image URL, retrying up to the number of times specified in MAX_RETRIES if the request fails. The response body is read in 8192-byte chunks and written to a local image file in binary write mode, REPORT.csv is updated, and the while loop is broken to avoid further retries. There are some scenarios where things can go south; these exceptions are handled so that the download is retried until MAX_RETRIES is reached:
- ClientResponseError - thrown when the response status code is 400 or higher.
- ClientPayloadError - thrown when the response object is closed before all of the data has been received, or on transfer-encoding-related errors such as malformed chunked encoding or broken compression data.
- Exception - any other exception that might be thrown.
async def download_image(session, path, total_url_count):
    global counter, MAX_RETRIES
    retries = 0
    while retries < MAX_RETRIES:
        try:
            url = path
            img_name = path.split("/")[-1]
            async with session.get(url) as response:
                if response.status == 200:
                    with open(os.path.join(DOWNLOAD_IMAGE_PATH, img_name), 'wb') as f:
                        while True:
                            chunk = await response.content.read(8192)
                            if not chunk:
                                break
                            f.write(chunk)
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, 200\n")
                    report_file.close()
                    print(f"[{counter}/{total_url_count}] {path}")
                    counter += 1
                    await asyncio.sleep(1)
                    break
                else:
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, {response.status}\n")
                    report_file.close()
                    print(f"[{counter}] {path}")
                    counter += 1
                    break
        except ClientResponseError as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except ClientPayloadError as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except Exception as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
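Since the three except blocks repeat the same retry logic, they could also be collapsed into a single handler. Below is a condensed sketch of the same idea (it omits the one-second pause after a successful download and uses one progress-line format for both outcomes), not a drop-in replacement for the original function:

async def download_image(session, path, total_url_count):
    global counter
    url = path
    img_name = path.split("/")[-1]
    for attempt in range(MAX_RETRIES):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    with open(os.path.join(DOWNLOAD_IMAGE_PATH, img_name), 'wb') as f:
                        while True:
                            chunk = await response.content.read(8192)
                            if not chunk:
                                break
                            f.write(chunk)
                status = str(response.status)
            with open(REPORT, "a") as report_file:
                report_file.write(f"{url}, {status}\n")
            print(f"[{counter}/{total_url_count}] {path}")
            counter += 1
            return
        except Exception as e:  # covers ClientResponseError, ClientPayloadError and anything else
            if attempt == MAX_RETRIES - 1:
                with open(REPORT, "a") as report_file:
                    report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                print(f"[{counter}] {path}")
                counter += 1
            else:
                await asyncio.sleep(2)  # wait 2 seconds before retrying
                print(f"[{counter}] {path} RETRYING...")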
Here is the complete code:
import asyncio
import aiohttp
from aiohttp import ClientResponseError, ClientPayloadError
import os
import shutil
import time

# configs
REPORT = "REPORT.csv"
DOWNLOAD_IMAGE_PATH = "downloaded-images"
MAX_RETRIES = 5
URLS_PER_BATCH = 500

report_file = open(REPORT, "w")
report_file.write("URL, Response code\n")
report_file.close()

counter = 1

if os.path.exists(DOWNLOAD_IMAGE_PATH):
    shutil.rmtree(DOWNLOAD_IMAGE_PATH)
    os.makedirs(DOWNLOAD_IMAGE_PATH)
else:
    if not os.path.exists(DOWNLOAD_IMAGE_PATH):
        os.makedirs(DOWNLOAD_IMAGE_PATH)

async def download_image(session, path, total_url_count):
    global counter, MAX_RETRIES
    retries = 0
    while retries < MAX_RETRIES:
        try:
            url = path
            img_name = path.split("/")[-1]
            async with session.get(url) as response:
                if response.status == 200:
                    with open(os.path.join(DOWNLOAD_IMAGE_PATH, img_name), 'wb') as f:
                        while True:
                            chunk = await response.content.read(8192)
                            if not chunk:
                                break
                            f.write(chunk)
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, 200\n")
                    report_file.close()
                    print(f"[{counter}/{total_url_count}] {path}")
                    counter += 1
                    await asyncio.sleep(1)
                    break
                else:
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, {response.status}\n")
                    report_file.close()
                    print(f"[{counter}] {path}")
                    counter += 1
                    break
        except ClientResponseError as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except ClientPayloadError as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except Exception as e:
            if retries == MAX_RETRIES - 1:
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                print(f"[{counter}] {path}")
                counter += 1
                break
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1

async def main():
    image_urls = open("urls.txt", "r").read().split("\n")
    total_url_count = len(image_urls)
    image_urls = list(filter(None, image_urls))
    # Splitting the total number of urls to perform batch processing
    batch_url_arr = []
    split_count = len(image_urls) // URLS_PER_BATCH
    for i in range(0, split_count + 1):
        if i == split_count:
            batch_url_arr.append(image_urls[(i * URLS_PER_BATCH):])
        else:
            batch_url_arr.append(image_urls[(i * URLS_PER_BATCH):((i + 1) * URLS_PER_BATCH)])
    # Async HTTP client session is created for each batch
    for urls in batch_url_arr:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600)) as session:
            tasks = [download_image(session, url, total_url_count) for url in urls]
            await asyncio.gather(*tasks)

if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"Time taken: {end_time - start_time} seconds")
Comparing the synchronous and asynchronous scripts for performance:
Note: The performance of the script may vary based on the network speed and system specification.
Synchronous Execution:
- Took 756 secs to download 1000 images with a total file size of 2.7 GB
Asynchronous Execution:
- Took 438 secs to download 1000 images at 1000 images per batch (1 batch)
- Took 420 secs to download 1000 images at 500 images per batch (2 batches)
- Took 456 secs to download 1000 images at 250 images per batch (4 batches)
- Took 510 secs to download 1000 images at 100 images per batch (10 batches)
From this we can conclude that the asynchronous approach is much faster, and that batch processing makes it possible to handle huge sets of URLs without overwhelming the system.
Farewell 👋!