Python Concurrency Patterns

Understanding async and parallel programming patterns through web crawler implementations

Overview

This guide explores Python concurrency patterns through web crawler implementations. Each pattern solves the same problem - crawling a website to discover all linked pages - but uses different concurrency strategies with unique trade-offs.

The Challenge

Web crawling is I/O-bound: most time is spent waiting for network responses. This makes it ideal for concurrent programming, where we can overlap waiting periods with useful work.

Core Components

from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

# Basic URL fetching function
def get_url(url):
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname
    port = parsed_url.port or 443
    path = parsed_url.path
    
    if parsed_url.hostname != "www.anniehu.com" or parsed_url.scheme == "mailto":
        return ""
    
    # Either use requests or raw sockets
    if USE_REAL_HTTP:
        r = requests.get(url)
        if not r.headers["Content-Type"].startswith("text/html"):
            return ""
        return r.content.decode()
    else:
        # Raw socket implementation...
        ...

# Async version
async def get_url_async(url):
    # Similar but with async I/O
    ...

# Link extraction
def find_links(path, content):
    links = []
    soup = BeautifulSoup(content, 'html.parser')
    
    for a_tag in soup.find_all('a'):
        links.append(a_tag.get('href'))
    for img_tag in soup.find_all('img'):
        links.append(img_tag.get('src'))
    
    links = [urljoin(path, link) for link in links]
    return links
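
get_url_async is only stubbed out above; a minimal sketch of how it might look using httpx (the async HTTP client listed in the API reference), mirroring the checks in get_url:

import httpx
from urllib.parse import urlparse

async def get_url_async(url):
    parsed_url = urlparse(url)
    # Same filtering as the synchronous get_url
    if parsed_url.hostname != "www.anniehu.com" or parsed_url.scheme == "mailto":
        return ""
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        if not r.headers.get("content-type", "").startswith("text/html"):
            return ""
        return r.text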

Sequential Baseline

The simplest implementation processes URLs one at a time, providing a reference point for performance comparisons.

Implementation

def crawl_web(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    start = time.perf_counter()
    while links_to_explore:
        link = links_to_explore.pop()
        link_content = get_url(link)
        new_links = find_links(link, link_content)
        new_links = set(new_links) - all_links
        links_to_explore.update(new_links)
        all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

Generic Pattern

frontier = {initial_work}
visited = set()

while frontier:
    work = frontier.pop()
    result = process(work)
    new_work = extract_new_work(result)
    new_work = filter_unvisited(new_work, visited)
    frontier.update(new_work)
    visited.add(work)

Pros

  • Simple to understand and debug
  • No concurrency issues
  • Predictable behavior

Cons

  • Slow - no parallelism
  • Wastes time waiting for I/O
  • Poor resource utilization

MapReduce Pattern

Process work items in batches, waiting for all items in a batch to complete before starting the next batch.

1. ThreadPoolExecutor with map()

def crawl_web_thread_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    def get_link_content(link):
        return link, get_url(link)
    
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        while links_to_explore:
            # Process all links in parallel
            link_content_list = executor.map(get_link_content, links_to_explore)
            links_to_explore.clear()
            
            for link, link_content in link_content_list:
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

2. ProcessPoolExecutor

def get_link_content_proc(link):
    return link, get_url(link)

def crawl_web_process_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
        while links_to_explore:
            link_content_list = executor.map(get_link_content_proc, links_to_explore)
            links_to_explore.clear()
            for link, link_content in link_content_list:
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

3. Multiprocessing Pool with imap_unordered

def crawl_web_mp_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    start = time.perf_counter()
    with mp.Pool(16) as pool:
        while links_to_explore:
            new_links_to_explore = set()
            # imap_unordered returns results as they complete
            for link, link_content in pool.imap_unordered(get_link_content_proc,
                                                          links_to_explore):
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                new_links_to_explore.update(new_links)
                all_links.add(link)
            links_to_explore = new_links_to_explore
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

Generic MapReduce Pattern

frontier = {initial_work}
visited = set()

with Pool(workers) as pool:
    while frontier:
        # Map phase: distribute work
        results = pool.map(process_work, frontier)
        frontier.clear()
        
        # Reduce phase: collect results
        for result in results:
            new_work = extract_new_work(result)
            new_work = filter_unvisited(new_work, visited)
            frontier.update(new_work)
            visited.add(result.source)

Performance Note

The map() approach waits for every URL in a batch to complete before the next batch can start, so workers sit idle whenever some URLs are slower than others. Multiprocessing is especially slow here: each batch contains only a few work items relative to the number of processes, which makes the chunk assignment across workers inefficient.
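
A small illustration with hypothetical delays: executor.map() yields results in input order, so one slow fetch at the front of a batch holds everything behind it, and the loop cannot build the next batch until the current one is fully consumed.

import concurrent.futures
import time

def fake_fetch(delay):
    time.sleep(delay)  # stand-in for a network request
    return delay

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    start = time.perf_counter()
    # map() yields in input order, so the results for the three fast items
    # only appear after the 2-second item finishes
    for result in executor.map(fake_fetch, [2.0, 0.1, 0.1, 0.1]):
        print(f"{result} done after {time.perf_counter() - start:.1f}s")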

Producer-Consumer Pattern

Uses separate queues for input and output, with worker threads consuming from one queue and producing to another.

Threading with Queues

def crawl_web_threads(starting_link):
    input_q, output_q = queue.Queue(), queue.Queue()
    output_q.put(starting_link)
    all_links = set()  # Empty initially, unlike other implementations
    
    def worker():
        while True:
            link = input_q.get()
            if link is None:    # Shutdown signal
                break
            link_content = get_url(link)
            new_urls = find_links(link, link_content)
            for url in new_urls:
                output_q.put(url)
            input_q.task_done()
    
    # Create worker threads
    threads = []
    for i in range(16):
        thread = threading.Thread(target=worker)
        thread.start()
        threads.append(thread)
    
    start = time.perf_counter()
    while True:
        links = []
        # Drain output queue
        try:
            while True:
                link = output_q.get(block=False)
                links.append(link)
        except queue.Empty:
            pass
        
        links = set(links) - all_links
        if not links:
            break
        
        all_links.update(links)
        for link in links:
            input_q.put(link)
        input_q.join()  # Wait for all tasks to complete
    end = time.perf_counter()
    print(f"{end - start}")
    
    # Cleanup
    for i in range(len(threads)):
        input_q.put(None)
    
    for thread in threads:
        thread.join()
    
    return all_links

Generic Producer-Consumer Pattern

input_queue = Queue()
output_queue = Queue()
output_queue.put(initial_work)

# Start workers
def worker():
    while True:
        work = input_queue.get()
        if work is None:  # Shutdown signal
            break
        result = process(work)
        output_queue.put(result)
        input_queue.task_done()

# Main coordinator loop
while True:
    # Collect all results
    results = drain_queue(output_queue)
    new_work = filter_unprocessed(results)
    
    if not new_work:
        break
    
    # Submit new work
    for work in new_work:
        input_queue.put(work)
    input_queue.join()  # Wait for completion

Key Pattern: Double Queue

Workers consume from the input queue and produce to the output queue; the main thread coordinates by moving URLs between them. The explicit None sentinels and thread.join() calls matter: daemon threads would instead be killed abruptly, without cleanup, when the program exits.
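
drain_queue is left undefined in the generic pattern above; a minimal version, matching the non-blocking drain used in crawl_web_threads, could look like this:

import queue

def drain_queue(q):
    # Pull everything currently in the queue without blocking
    items = []
    try:
        while True:
            items.append(q.get(block=False))
    except queue.Empty:
        pass
    return items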

Gather/Batch Pattern

Run multiple async operations concurrently and wait for all to complete.

1. asyncio.gather()

async def crawl_web_async_gather(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    async def get_link_content(link):
        return link, await get_url_async(link)
    
    start = time.perf_counter()
    while links_to_explore:
        # Run all fetches concurrently
        link_content_list = await asyncio.gather(*(
            get_link_content(link) for link in links_to_explore
        ))
        links_to_explore.clear()
        
        for link, link_content in link_content_list:
            new_links = find_links(link, link_content)
            new_links = set(new_links) - all_links
            links_to_explore.update(new_links)
            all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

2. TaskGroup (Python 3.11+)

async def crawl_web_async_task(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    async def get_link_content(link):
        return link, await get_url_async(link)
    
    start = time.perf_counter()
    async with asyncio.TaskGroup() as tg:
        while links_to_explore:
            link_content_list = []
            # Create tasks within the group
            for link in links_to_explore:
                link_content_list.append(tg.create_task(get_link_content(link)))
            links_to_explore.clear()
            
            # Await each task
            for task in link_content_list:
                link, link_content = await task
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

Generic Gather Pattern

frontier = {initial_work}
visited = set()

while frontier:
    # Create concurrent tasks
    tasks = [async_process(work) for work in frontier]
    frontier.clear()
    
    # Wait for all to complete
    results = await gather_all(tasks)
    
    # Process results
    for result in results:
        new_work = extract_new_work(result)
        new_work = filter_unvisited(new_work, visited)
        frontier.update(new_work)
        visited.add(result.source)

Dynamic Task Creation Pattern

Tasks can create new tasks dynamically as they discover work, avoiding batch synchronization.

1. TaskGroup with Callbacks

async def crawl_web_async_task_callback(starting_link):
    all_links = {starting_link}
    
    async def get_link_content(link):
        link_content = await get_url_async(link)
        new_links = find_links(link, link_content)
        new_links = set(new_links) - all_links
        all_links.update(new_links)
        # Dynamically create new tasks
        for new_link in new_links:
            tg.create_task(get_link_content(new_link))
    
    start = time.perf_counter()
    # When you exit the task group context, it awaits every task created
    # inside the context. Tasks can create more tasks, which are also awaited.
    async with asyncio.TaskGroup() as tg:
        # Initial task spawns all others
        tg.create_task(get_link_content(starting_link))
        debug_print("before exit")
    debug_print("after exit")
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

2. Futures with Callbacks

def explore_link(link):
    # Assumed helper: fetch a page and return the links it contains
    return find_links(link, get_url(link))

def crawl_web_concurrent_futures(starting_link):
    all_links = {starting_link}
    
    lock = threading.RLock()  # Or use careful ordering
    
    def future_done(f):
        new_links = f.result()
        new_links = set(new_links) - all_links
        all_links.update(new_links)
        for link in new_links:
            # Submit returns immediately
            future = executor.submit(explore_link, link)
            links_to_explore.add(future)
            # Callback runs when future completes
            future.add_done_callback(future_done)
        links_to_explore.remove(f)
    
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=16) as executor:
        future = executor.submit(explore_link, starting_link)
        links_to_explore = {future}            # Add to the set BEFORE
        future.add_done_callback(future_done)  # registering the callback
        
        # Wait for all futures to complete
        while links_to_explore:
            time.sleep(0.01)
            # Alternative: use as_completed
            #completed_future = next(concurrent.futures.as_completed(links_to_explore))
            #completed_future.result()
    
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

Three Locking Strategies

The code demonstrates three approaches to handling concurrent modifications from callbacks:

  1. No Lock (Careful Ordering): Add to set before callback registration to avoid race conditions
  2. Regular Lock: Lock around modifications, callbacks added after to avoid deadlock
  3. Reentrant Lock (RLock): Allows the same thread to acquire the lock multiple times, avoiding deadlock when submit() triggers an immediate callback

# Race condition without careful ordering:
# 1. submit() returns future
# 2. Callback might run immediately if task completes fast
# 3. Callback tries to remove future from set
# 4. But we haven't added it to the set yet!
# 5. KeyError!

# Solution: Add to set BEFORE registering callback
future = executor.submit(explore_link, link)
links_to_explore.add(future)  # First
future.add_done_callback(future_done)  # Then
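
A sketch of strategy 3, reusing the names from the futures example above (executor, all_links, links_to_explore, explore_link): a reentrant lock lets the callback run safely even when submit() finishes so fast that add_done_callback() invokes it immediately on the submitting thread.

import threading

lock = threading.RLock()

def future_done(f):
    # Reentrant: this still works if the callback fires on the same thread
    # that already holds the lock (e.g. the future was done at registration)
    with lock:
        new_links = set(f.result()) - all_links
        all_links.update(new_links)
        for link in new_links:
            future = executor.submit(explore_link, link)
            links_to_explore.add(future)           # add before registering
            future.add_done_callback(future_done)
        links_to_explore.remove(f)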

Generic Dynamic Task Pattern

visited = set()
active_tasks = set()

def task_complete_callback(result):
    new_work = extract_new_work(result)
    new_work = filter_unvisited(new_work, visited)
    visited.update(new_work)
    
    for work in new_work:
        task = create_task(process, work)
        active_tasks.add(task)
        task.on_complete(task_complete_callback)
    
    active_tasks.remove(current_task)

# Start with initial task
initial_task = create_task(process, initial_work)
active_tasks.add(initial_task)
initial_task.on_complete(task_complete_callback)

# Wait for all tasks to complete
wait_until(active_tasks.empty())

Hybrid Patterns

Combine different concurrency approaches for specific use cases.

Async with Threading Fallback

async def crawl_web_async_thread(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    
    async def get_link_content(link):
        # Run synchronous function in thread pool
        return link, await asyncio.to_thread(get_url, link)
    
    start = time.perf_counter()
    while links_to_explore:
        link_content_list = await asyncio.gather(*(
            get_link_content(link) for link in links_to_explore
        ))
        links_to_explore.clear()
        for link, link_content in link_content_list:
            new_links = find_links(link, link_content)
            new_links = set(new_links) - all_links
            links_to_explore.update(new_links)
            all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    
    return all_links

Generic Hybrid Pattern

# When you have async code but need to call sync functions
async def hybrid_pattern():
    async def process_with_fallback(work):
        if has_async_implementation(work):
            return await async_process(work)
        else:
            # Run sync code in thread pool
            return await run_in_thread(sync_process, work)
    
    # Use with any async pattern
    results = await gather_all([
        process_with_fallback(work) for work in frontier
    ])

When to Use

Use this pattern when you have an async architecture but need to integrate synchronous libraries that don't support async I/O. The asyncio.to_thread() function runs synchronous code in a thread pool without blocking the event loop.
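
asyncio.to_thread() requires Python 3.9 or newer; on older versions the same effect can be achieved with run_in_executor(). A minimal sketch, reusing get_url from the examples above:

import asyncio

async def get_link_content(link):
    loop = asyncio.get_running_loop()
    # Run the synchronous get_url in the default thread pool
    return link, await loop.run_in_executor(None, get_url, link)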

Concurrency Challenges

The comments in the implementations highlight three fundamental challenges in concurrent web crawling:

1. How Will You Know When You're Done?

The Problem

With parallel processing, the frontier might be temporarily empty while workers are still fetching pages.

Timing Issue

  1. Pop the last link and submit it
  2. links_to_explore is now empty
  3. Your while loop exits
  4. That worker hasn't finished yet; it might still discover 10 new links

Solution

submit() returns a Future object; track every pending future and only treat the crawl as finished when each one is done or has been waited on, not merely when the frontier is empty. A minimal check is sketched below.
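
A minimal sketch of that check, assuming pending_futures holds every Future returned by submit():

def crawl_finished(links_to_explore, pending_futures):
    # An empty frontier alone is not enough; every in-flight fetch must be done
    return not links_to_explore and all(f.done() for f in pending_futures)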

2. When Should You Submit New Work?

The Question

If you submit 100 URLs, do you want to wait for all 100 to complete before submitting new work?

Better Approach

Submit new work as soon as it is discovered. concurrent.futures.as_completed() yields futures as they finish, so the main loop can wait for any one future to complete, process it, and submit whatever it discovered (see the sketch after the flow below).

Flow

  1. Start with some futures in pending_futures
  2. Wait for ANY one of them to complete
  3. Move it from pending to done
  4. Process its result (maybe create new futures)
  5. Add any new futures to pending_futures
  6. Repeat until pending_futures is empty
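
A minimal sketch of this flow, assuming the explore_link helper from the futures example (fetch a page, return its links):

import concurrent.futures

def crawl_incremental(starting_link):
    all_links = {starting_link}
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        pending = {executor.submit(explore_link, starting_link)}
        while pending:
            # Block until ANY pending future completes (steps 2-3)
            completed = next(concurrent.futures.as_completed(pending))
            pending.remove(completed)
            # Process its result and submit new work immediately (steps 4-5)
            new_links = set(completed.result()) - all_links
            all_links.update(new_links)
            for link in new_links:
                pending.add(executor.submit(explore_link, link))
    return all_links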

3. When Should We Mark URLs as Visited?

The Problem

  • If page A links to page B, and page B links back to page A, what happens?
  • What if the same link appears on multiple pages?

Visit Options

  1. When you discover it (i.e., when you submit it for processing)
  2. When its results come back

Solution

Add when you submit the work, not when you get the results, to prevent submitting the same URL multiple times.

Example Race Condition

  1. Page A links to pages B and C
  2. Page B links to page C
  3. You're processing A and B concurrently

Timeline

  1. Thread 1 processing A discovers C
  2. Thread 2 processing B also discovers C
  3. Both threads reach the main loop with C in their results

In the Main Loop

  1. Thread 1's future completes first
  2. Main loop checks: C is not yet in all_links
  3. Adds C to all_links and submits it
  4. Thread 2's future completes
  5. Main loop checks: C is already in all_links
  6. Doesn't submit C again
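
The same check written as a helper (explore_link as before; all submissions happen in the single main-loop thread, so no lock is needed):

def maybe_submit(link, all_links, pending, executor):
    # Mark visited at submit time: the second discovery of C fails this
    # check and is never submitted again
    if link not in all_links:
        all_links.add(link)
        pending.add(executor.submit(explore_link, link))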

Performance Comparison

Pattern            | Implementation       | Concurrency Type    | Best For                   | Key Limitation
Sequential         | Basic loop           | None                | Simple scripts, debugging  | No parallelism
MapReduce          | executor.map()       | Threads/Processes   | Batch processing           | Waits for slowest in batch
Producer-Consumer  | Queue + Workers      | Threads             | Continuous processing      | Queue coordination overhead
Gather/Batch       | asyncio.gather       | Coroutines          | Known work sets            | All-or-nothing completion
Dynamic Tasks      | Callbacks/TaskGroup  | Threads/Coroutines  | Unknown work size          | Complex state management
Hybrid             | async + threads      | Mixed               | Legacy integration         | Overhead of both models

Key Insights

  • Web crawling is I/O-bound, so threads and coroutines deliver most of the speedup; multiprocessing mainly adds overhead here.
  • Batch patterns (map, gather) are simple but gated by the slowest item in each batch; submitting new work as soon as it is discovered keeps workers busy.
  • Mark URLs as visited when you submit them, not when their results come back, or the same URL can be crawled more than once.
  • An empty frontier does not mean the crawl is done; track in-flight futures or tasks before declaring completion.

API Reference

Threading APIs

  • threading.Thread
  • threading.RLock
  • queue.Queue
  • concurrent.futures.ThreadPoolExecutor

Multiprocessing APIs

  • multiprocessing.Pool
  • Pool.imap_unordered()
  • concurrent.futures.ProcessPoolExecutor

AsyncIO APIs

  • asyncio.gather()
  • asyncio.TaskGroup
  • asyncio.to_thread()
  • asyncio.open_connection()

Futures APIs

  • concurrent.futures.Future
  • Future.add_done_callback()
  • concurrent.futures.as_completed()

Supporting Libraries

  • httpx (async HTTP client)
  • requests (sync HTTP client)
  • BeautifulSoup (HTML parsing)