What is Synchronous Execution?

In synchronous execution, tasks are executed sequentially, one after another, with no background processing: ‘Task 2’ waits for ‘Task 1’ to complete before it can start. In the example below, it takes 4 seconds to complete 4 tasks.

Image of Synchronous Execution
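
To make the idea concrete, here is a minimal sketch (not part of the download script) where each of 4 tasks is simulated with a one-second time.sleep. Because each task must finish before the next one starts, the total time is roughly the sum of the individual task times:

import time

def task(n):
    # time.sleep stands in for a blocking, I/O-bound operation
    time.sleep(1)
    print(f"Task {n} done")

start = time.time()
for n in range(1, 5):
    task(n)  # each task runs only after the previous one finishes
print(f"Total: {time.time() - start:.1f} seconds")  # roughly 4 seconds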

Synchronously Downloading Images using the Requests library

We'll explore a Python script that downloads images from a list of URLs stored in a urls.txt file. The script uses the requests library to send an HTTP request to each URL and saves each image to a local directory. We'll also go through how the script records the response code for each URL in a report and prints the total time taken to complete the download process.
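
For reference, urls.txt is expected to contain one image URL per line. The URLs below are placeholders, only to illustrate the expected format:

https://example.com/images/photo-001.jpg
https://example.com/images/photo-002.jpg
https://example.com/images/photo-003.png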

First, let's import all the modules needed for the script, open a REPORT.csv file in write mode, and write a header row with two columns: URL and Response code. Then read the contents of urls.txt (skipping any blank lines), and create a directory named downloaded-images; if the directory already exists, it is removed and recreated so each run starts with a clean folder.

import requests
import os
import shutil
import time

start_time = time.time()

REPORT = "REPORT.csv"
file = open(REPORT, "w")
file.write("URL, Response code\n")
file.close()

data = open("urls.txt", "r").read().split("\n")
data = list(filter(None, data))  # drop any blank lines (e.g. a trailing newline)
total_urls = len(data)
download_image_path = "downloaded-images"

if os.path.exists(download_image_path):
    shutil.rmtree(download_image_path)
os.makedirs(download_image_path)

Then loop through each URL in the list and send an HTTP GET request using the requests library. For each URL, the script extracts the image name from the URL and saves the image content to a file in the downloaded-images directory using the open() function in binary write mode ('wb'). It then appends a row to the REPORT.csv file with the URL and response code (200 for successful downloads, or the actual response code for failed ones). Finally, it prints the total time taken to complete the download process.

for index, url in enumerate(data, start=1):
    print(f"[{index}/{total_urls}] Processing {url}...")
    req = requests.get(url)
    img_name = url.split("/")[-1]
    img_content = req.content

    if req.status_code == 200:
        with open(os.path.join(download_image_path, img_name), 'wb') as f:
            f.write(img_content)
        file = open(REPORT, "a")
        file.write(f"{url}, 200\n")
        file.close()
    else:
        file = open(REPORT, "a")
        file.write(f"{url}, {str(req.status_code)}\n")
        file.close()
    
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

Here is the complete code,

import requests
import os
import shutil
import time

start_time = time.time()

REPORT = "REPORT.csv"
file = open(REPORT, "w")
file.write("URL, Response code\n")
file.close()

data = open("urls.txt", "r").read().split("\n")
data = list(filter(None, data))  # drop any blank lines (e.g. a trailing newline)
total_urls = len(data)
download_image_path = "downloaded-images"

if os.path.exists(download_image_path):
    shutil.rmtree(download_image_path)
os.makedirs(download_image_path)

for index, url in enumerate(data, start=1):
    print(f"[{index}/{total_urls}] Processing {url}...")
    req = requests.get(url)
    img_name = url.split("/")[-1]
    img_content = req.content

    if req.status_code == 200:
        with open(os.path.join(download_image_path, img_name), 'wb') as f:
            f.write(img_content)
        file = open(REPORT, "a")
        file.write(f"{url}, 200\n")
        file.close()
    else:
        file = open(REPORT, "a")
        file.write(f"{url}, {str(req.status_code)}\n")
        file.close()
    
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

What is Asynchronous Execution?

In asynchronous execution, tasks run concurrently: while one task is waiting (for example, on network I/O), other tasks can make progress, so the execution of one task isn't blocked by another. In effect, ‘Task 1’ and ‘Task 2’ run almost simultaneously. In the image below, the tasks are started at 500 ms intervals purely for illustration.

Image of Asynchronous Execution
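
As a minimal illustration (separate from the download script), the sketch below starts four coroutines with asyncio.gather; each one awaits asyncio.sleep(1) to simulate I/O, so the total runtime is roughly 1 second instead of 4:

import asyncio
import time

async def task(n):
    # asyncio.sleep stands in for a non-blocking, I/O-bound operation
    await asyncio.sleep(1)
    print(f"Task {n} done")

async def main():
    # All four tasks run concurrently on the event loop
    await asyncio.gather(*(task(n) for n in range(1, 5)))

start = time.time()
asyncio.run(main())
print(f"Total: {time.time() - start:.1f} seconds")  # roughly 1 second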

Before diving into the script, we need to understand the event loop and coroutines.

What is an Event Loop?

In Python, an event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in its thread. While a Task is running in the event loop, no other Tasks can run in the same thread. When a Task executes an await expression, the running Task gets suspended, and the event loop executes the next Task.
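
The hypothetical sketch below makes this switching visible: each time a worker hits an await expression it is suspended, and the event loop runs the other worker, so the printed output of the two Tasks interleaves:

import asyncio

async def worker(name):
    for i in range(3):
        print(f"{name}: step {i}")
        # awaiting suspends this Task; the event loop switches to the other Task
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
# Prints: A: step 0, B: step 0, A: step 1, B: step 1, A: step 2, B: step 2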

What are Coroutines?

Coroutines are the building blocks of asynchronous programming in Python, allowing multiple tasks to run concurrently and improving the performance and responsiveness of programs. A coroutine is a special type of function that can suspend its execution before reaching the end, and it can indirectly pass control to another coroutine for some time. In Python, coroutines are implemented using the async and await keywords. The async keyword is used to define a coroutine function, and the await keyword is used to suspend the execution of the coroutine until the awaited task is complete.
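
Here is a minimal, hypothetical sketch of the syntax: fetch_size is a coroutine that awaits another coroutine, calling main() only creates a coroutine object, and asyncio.run() hands that object to the event loop to actually execute it:

import asyncio

async def fetch_size(name):
    # Hypothetical coroutine: pretend to fetch something over the network
    await asyncio.sleep(0.5)   # execution is suspended here until the sleep completes
    return len(name)

async def main():
    size = await fetch_size("image.jpg")  # await suspends main() until fetch_size finishes
    print(size)

coro = main()      # calling a coroutine function returns a coroutine object; nothing runs yet
asyncio.run(coro)  # the event loop drives the coroutine to completion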

Asynchronously Downloading Images using Asynchronous I/O (asyncio) and the AIOHTTP library

Import all the libraries and declare the configs; feel free to change the configs as per your needs. Check whether the Python script is executed directly (not imported as a module), then start execution by invoking asyncio.run() with the coroutine object returned by main(), where all the tasks are executed in an event loop.

import asyncio
import aiohttp
from aiohttp import ClientResponseError, ClientPayloadError
import os
import shutil
import time

# configs
REPORT = "REPORT.csv"
DOWNLOAD_IMAGE_PATH = "downloaded-images"
MAX_RETRIES = 5
URLS_PER_BATCH = 1000
INPUT_URLS = "urls.txt"

report_file = open(REPORT, "w")
report_file.write("URL, Response code\n")
report_file.close()

counter = 1

if os.path.exists(DOWNLOAD_IMAGE_PATH):
    shutil.rmtree(DOWNLOAD_IMAGE_PATH)
os.makedirs(DOWNLOAD_IMAGE_PATH)

if __name__ == '__main__':
    start_time = time.time()

    asyncio.run(main())

    end_time = time.time()
    print(f"Time taken: {end_time - start_time} seconds")

Now let's create an asynchronous main function, read all the URLs from urls.txt, and remove any empty values if present. Suppose there are 50,000 image URLs in urls.txt: without processing them in batches, all 50,000 GET requests would be sent at once, which may crash the system, hurt the script's performance, and put immense load on the server. To solve this we do batch processing. A 2-dimensional list batch_url_arr is created by splitting the larger list into sub-lists, each holding at most the number of URLs set in the URLS_PER_BATCH config (for example, with 2,500 URLs and URLS_PER_BATCH = 1000 you get three batches of 1000, 1000, and 500). The function then iterates over the batches in batch_url_arr, creating an asynchronous HTTP client session using aiohttp with a timeout of 3600 seconds (1 hour) for each batch. For every URL in the batch it builds a list of tasks that call the download_image function, and asyncio.gather runs these tasks concurrently and waits for their completion.

async def main():
    image_urls = open("urls.txt", "r").read().split("\n")
    image_urls = list(filter(None, image_urls))  # drop any blank lines
    total_url_count = len(image_urls)

    # Split the URL list into batches of at most URLS_PER_BATCH URLs each
    batch_url_arr = [image_urls[i:i + URLS_PER_BATCH]
                     for i in range(0, len(image_urls), URLS_PER_BATCH)]

    # Async HTTP client session is created for each batch
    for urls in batch_url_arr:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600)) as session:
            tasks = [download_image(session, url, total_url_count) for url in urls]
            await asyncio.gather(*tasks)

Once the async download_image function is called, the session sends a GET request to the image URL, retrying up to the number of times set in MAX_RETRIES if the request fails. The response body is read in 8192-byte chunks and written to a local image file in binary write mode ('wb'), REPORT.csv is updated, and the while loop is broken so no further retries happen. There are a few scenarios where the request can fail; the following exceptions are handled so that the request is retried until MAX_RETRIES is reached.

  1. ClientResponseError - raised by aiohttp for response-level errors; for example, when raise_for_status is enabled, a response code of 400 or higher raises this exception.
  2. ClientPayloadError - raised if the response object is closed before all of the response data has been received, or on transfer-encoding-related problems such as malformed chunked encoding or broken compression data.
  3. Exception - any other exception that may be thrown.

async def download_image(session, path, total_url_count):
    global counter
    retries = 0
    while retries < MAX_RETRIES:
        try:
            url = path
            img_name = path.split("/")[-1]
            
            async with session.get(url) as response:
                if response.status == 200:
                    with open(os.path.join(DOWNLOAD_IMAGE_PATH, img_name), 'wb') as f:
                        while True:
                            chunk = await response.content.read(8192)
                            if not chunk:
                                break
                            f.write(chunk)
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, 200\n")
                    report_file.close()
                    
                    print(f"[{counter}/{total_url_count}] {path}")
                    counter += 1

                    await asyncio.sleep(1)
                    break

                else:
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, {response.status}\n")
                    report_file.close()

                    print(f"[{counter}] {path}")
                    counter += 1
                    break
        except ClientResponseError as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except ClientPayloadError as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1

        except Exception as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1

Here is the complete code,

import asyncio
import aiohttp
from aiohttp import ClientResponseError, ClientPayloadError
import os
import shutil
import time

# configs
REPORT = "REPORT.csv"
DOWNLOAD_IMAGE_PATH = "downloaded-images"
MAX_RETRIES = 5
URLS_PER_BATCH = 500

report_file = open(REPORT, "w")
report_file.write("URL, Response code\n")
report_file.close()

counter = 1

if os.path.exists(DOWNLOAD_IMAGE_PATH):
    shutil.rmtree(DOWNLOAD_IMAGE_PATH)
os.makedirs(DOWNLOAD_IMAGE_PATH)

async def download_image(session, path, total_url_count):
    global counter
    retries = 0
    while retries < MAX_RETRIES:
        try:
            url = path
            img_name = path.split("/")[-1]
            
            async with session.get(url) as response:
                if response.status == 200:
                    with open(os.path.join(DOWNLOAD_IMAGE_PATH, img_name), 'wb') as f:
                        while True:
                            chunk = await response.content.read(8192)
                            if not chunk:
                                break
                            f.write(chunk)
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, 200\n")
                    report_file.close()
                    
                    print(f"[{counter}/{total_url_count}] {path}")
                    counter += 1

                    await asyncio.sleep(1)
                    break

                else:
                    report_file = open(REPORT, "a")
                    report_file.write(f"{url}, {response.status}\n")
                    report_file.close()

                    print(f"[{counter}] {path}")
                    counter += 1
                    break
        except ClientResponseError as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1
        except ClientPayloadError as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1

        except Exception as e:
            if(retries == MAX_RETRIES-1):
                report_file = open(REPORT, "a")
                report_file.write(f"{url}, FAILED {type(e).__name__}\n")
                report_file.close()
                
                print(f"[{counter}] {path}")
                counter += 1
                break
            
            # Retry after 2 seconds
            await asyncio.sleep(2)
            print(f"[{counter}] {path} RETRYING...")
            retries += 1


async def main():
    image_urls = open("urls.txt", "r").read().split("\n")
    image_urls = list(filter(None, image_urls))  # drop any blank lines
    total_url_count = len(image_urls)

    # Split the URL list into batches of at most URLS_PER_BATCH URLs each
    batch_url_arr = [image_urls[i:i + URLS_PER_BATCH]
                     for i in range(0, len(image_urls), URLS_PER_BATCH)]

    # Async HTTP client session is created for each batch
    for urls in batch_url_arr:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600)) as session:
            tasks = [download_image(session, url, total_url_count) for url in urls]
            await asyncio.gather(*tasks)

if __name__ == '__main__':
    start_time = time.time()

    asyncio.run(main())

    end_time = time.time()
    print(f"Time taken: {end_time - start_time} seconds")

Comparing the synchronous and asynchronous scripts for performance:

Note: The performance of the scripts may vary based on network speed and system specifications.

Synchronous Execution:

  • Took 756 seconds to download 1,000 images with a total file size of 2.7 GB

Asynchronous Execution:

  • Took 438 seconds to download 1,000 images with 1,000 images per batch (1 batch)
  • Took 420 seconds to download 1,000 images with 500 images per batch (2 batches)
  • Took 456 seconds to download 1,000 images with 250 images per batch (4 batches)
  • Took 510 seconds to download 1,000 images with 100 images per batch (10 batches)

From this we can conclude that the asynchronous approach is significantly faster, and that batch processing makes it practical to handle very large sets of URLs.

Farewell 👋!
