Understanding async and parallel programming patterns through web crawler implementations
This guide explores Python concurrency patterns through web crawler implementations. Each pattern solves the same problem, crawling a website to discover all linked pages, but uses a different concurrency strategy with its own trade-offs.
Web crawling is I/O-bound: most time is spent waiting for network responses. This makes it ideal for concurrent programming, where we can overlap waiting periods with useful work.
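A quick way to see the effect is to simulate the network wait with asyncio.sleep. The sketch below is not part of the crawler; the half-second delay and example.com URLs are made up for illustration.

```python
import asyncio
import time

async def fake_fetch(url):
    await asyncio.sleep(0.5)  # stand-in for waiting on the network
    return url

async def main():
    urls = [f"https://example.com/page{i}" for i in range(10)]

    start = time.perf_counter()
    for url in urls:                                      # one at a time: ~5 s
        await fake_fetch(url)
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    await asyncio.gather(*(fake_fetch(u) for u in urls))  # overlapped: ~0.5 s
    print(f"concurrent: {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```

The concurrent version finishes in roughly the time of a single request, because the waits overlap instead of adding up.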
```python
from urllib.parse import urlparse

import requests

USE_REAL_HTTP = True  # set False to use the raw-socket implementation instead of requests

# Basic URL fetching function
def get_url(url):
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname
    port = parsed_url.port or 443
    path = parsed_url.path
    # Stay on one site and skip non-HTTP schemes such as mailto:
    if parsed_url.hostname != "www.anniehu.com" or parsed_url.scheme == "mailto":
        return ""
    # Either use requests or raw sockets
    if USE_REAL_HTTP:
        r = requests.get(url)
        if not r.headers["Content-Type"].startswith("text/html"):
            return ""
        return r.content.decode()
    else:
        # Raw socket implementation using hostname, port, and path...
        ...
```
```python
# Async version
async def get_url_async(url):
    # Similar, but using async I/O for the network round trip
    ...
```
```python
from urllib.parse import urljoin

from bs4 import BeautifulSoup

# Link extraction
def find_links(path, content):
    links = []
    soup = BeautifulSoup(content, 'html.parser')
    for a_tag in soup.find_all('a'):
        links.append(a_tag.get('href'))
    for img_tag in soup.find_all('img'):
        links.append(img_tag.get('src'))
    # Resolve relative URLs and drop tags that have no href/src attribute
    links = [urljoin(path, link) for link in links if link]
    return links
```
The simplest implementation processes URLs one at a time, providing a reference point for performance comparisons.
```python
import time

def crawl_web(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    start = time.perf_counter()
    while links_to_explore:
        link = links_to_explore.pop()
        link_content = get_url(link)
        new_links = find_links(link, link_content)
        new_links = set(new_links) - all_links
        links_to_explore.update(new_links)
        all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
Every implementation in this guide is a variation on the same frontier/visited loop:

```python
frontier = {initial_work}
visited = set()
while frontier:
    work = frontier.pop()
    result = process(work)
    new_work = extract_new_work(result)
    new_work = filter_unvisited(new_work, visited)
    frontier.update(new_work)
    visited.add(work)
```
Process work items in batches, waiting for all items in a batch to complete before starting the next batch.
```python
import concurrent.futures

def crawl_web_thread_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}

    def get_link_content(link):
        return link, get_url(link)

    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        while links_to_explore:
            # Process all links in parallel
            link_content_list = executor.map(get_link_content, links_to_explore)
            links_to_explore.clear()
            for link, link_content in link_content_list:
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
```python
def get_link_content_proc(link):
    # Module-level function so it can be pickled and sent to worker processes
    return link, get_url(link)

def crawl_web_process_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
        while links_to_explore:
            link_content_list = executor.map(get_link_content_proc, links_to_explore)
            links_to_explore.clear()
            for link, link_content in link_content_list:
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
```python
import multiprocessing as mp

def crawl_web_mp_map(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}
    start = time.perf_counter()
    with mp.Pool(16) as pool:
        while links_to_explore:
            new_links_to_explore = set()
            # imap_unordered returns results as they complete
            for link, link_content in pool.imap_unordered(get_link_content_proc,
                                                          links_to_explore):
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                new_links_to_explore.update(new_links)
                all_links.add(link)
            links_to_explore = new_links_to_explore
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
In the abstract, all three map-based crawlers share the same shape:

```python
frontier = {initial_work}
visited = set()
with Pool(workers) as pool:
    while frontier:
        # Map phase: distribute work
        results = pool.map(process_work, frontier)
        frontier.clear()
        # Reduce phase: collect results
        for result in results:
            new_work = extract_new_work(result)
            new_work = filter_unvisited(new_work, visited)
            frontier.update(new_work)
            visited.add(result.source)
```
The map() approach waits for every URL in a batch to complete before starting the next batch, so one slow URL can leave the other workers idle. As the implementation comments note, multiprocessing is especially slow here because each batch contains only a few work items relative to the number of processes, which leads to inefficient chunk assignment.
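The straggler effect is easy to reproduce. In the sketch below, time.sleep stands in for fast and slow URLs; the delays are made up.

```python
import concurrent.futures
import time

def fetch(delay):
    time.sleep(delay)   # stand-in for a fast or slow URL
    return delay

batch = [0.1, 0.1, 0.1, 2.0]   # one slow "URL" in the batch

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch, batch))
# ~2.0 s: the three fast items finished long ago, but the next batch
# cannot start until the slowest item in this one is done.
print(f"batch took {time.perf_counter() - start:.2f}s for {results}")
```

ProcessPoolExecutor.map() and multiprocessing.Pool.map() also accept a chunksize argument, which controls how many items each process receives at once; with only a handful of URLs per batch, some processes may receive no work at all.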
Uses separate queues for input and output, with worker threads consuming from one queue and producing to another.
```python
import queue
import threading

def crawl_web_threads(starting_link):
    input_q, output_q = queue.Queue(), queue.Queue()
    output_q.put(starting_link)
    all_links = set()  # Empty initially, unlike other implementations

    def worker():
        while True:
            link = input_q.get()
            if link is None:  # Shutdown signal
                break
            link_content = get_url(link)
            new_urls = find_links(link, link_content)
            for url in new_urls:
                output_q.put(url)
            input_q.task_done()

    # Create worker threads
    threads = []
    for i in range(16):
        thread = threading.Thread(target=worker)
        thread.start()
        threads.append(thread)

    start = time.perf_counter()
    while True:
        links = []
        # Drain output queue
        try:
            while True:
                link = output_q.get(block=False)
                links.append(link)
        except queue.Empty:
            pass
        links = set(links) - all_links
        if not links:
            break
        all_links.update(links)
        for link in links:
            input_q.put(link)
        input_q.join()  # Wait for all tasks to complete
    end = time.perf_counter()
    print(f"{end - start}")

    # Cleanup
    for i in range(len(threads)):
        input_q.put(None)
    for thread in threads:
        thread.join()
    return all_links
```
In the abstract, the producer-consumer pattern looks like this:

```python
input_queue = Queue()
output_queue = Queue()
output_queue.put(initial_work)

# Start workers
def worker():
    while True:
        work = input_queue.get()
        if work is None:  # Shutdown signal
            break
        result = process(work)
        output_queue.put(result)
        input_queue.task_done()

# Main coordinator loop
while True:
    # Collect all results
    results = drain_queue(output_queue)
    new_work = filter_unprocessed(results)
    if not new_work:
        break
    # Submit new work
    for work in new_work:
        input_queue.put(work)
    input_queue.join()  # Wait for completion
```
Workers consume from the input queue and produce to the output queue; the main thread coordinates by moving URLs between the two and deciding when the crawl is done. As the implementation comments note, daemon threads would simply be killed when the program exits without running any cleanup, which is why this version sends explicit None sentinels and joins each thread.
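For contrast, here is a minimal sketch of the daemon-thread variant hinted at above (the worker body is elided): daemon workers need no sentinel values or thread joins, but they are killed abruptly when the process exits, so nothing after the kill point ever runs.

```python
import queue
import threading

input_q = queue.Queue()

def worker():
    while True:
        link = input_q.get()
        # ... fetch the link and enqueue whatever it produces ...
        input_q.task_done()

for _ in range(16):
    threading.Thread(target=worker, daemon=True).start()

# ... put work on input_q and coordinate as before ...
input_q.join()   # wait for the queued work to drain
# No sentinels, no thread.join(): daemon workers die with the process.
```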
Run multiple async operations concurrently and wait for all to complete.
```python
import asyncio

async def crawl_web_async_gather(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}

    async def get_link_content(link):
        return link, await get_url_async(link)

    start = time.perf_counter()
    while links_to_explore:
        # Run all fetches concurrently
        link_content_list = await asyncio.gather(*(
            get_link_content(link) for link in links_to_explore
        ))
        links_to_explore.clear()
        for link, link_content in link_content_list:
            new_links = find_links(link, link_content)
            new_links = set(new_links) - all_links
            links_to_explore.update(new_links)
            all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
```python
async def crawl_web_async_task(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}

    async def get_link_content(link):
        return link, await get_url_async(link)

    start = time.perf_counter()
    async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
        while links_to_explore:
            link_content_list = []
            # Create tasks within the group
            for link in links_to_explore:
                link_content_list.append(tg.create_task(get_link_content(link)))
            links_to_explore.clear()
            # Await each task
            for task in link_content_list:
                link, link_content = await task
                new_links = find_links(link, link_content)
                new_links = set(new_links) - all_links
                links_to_explore.update(new_links)
                all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
In the abstract, both batch-of-tasks crawlers follow the same shape:

```python
frontier = {initial_work}
visited = set()
while frontier:
    # Create concurrent tasks
    tasks = [async_process(work) for work in frontier]
    frontier.clear()
    # Wait for all to complete
    results = await gather_all(tasks)
    # Process results
    for result in results:
        new_work = extract_new_work(result)
        new_work = filter_unvisited(new_work, visited)
        frontier.update(new_work)
        visited.add(result.source)
```
Tasks can create new tasks dynamically as they discover work, avoiding batch synchronization.
```python
async def crawl_web_async_task_callback(starting_link):
    all_links = {starting_link}

    async def get_link_content(link):
        link_content = await get_url_async(link)
        new_links = find_links(link, link_content)
        new_links = set(new_links) - all_links
        all_links.update(new_links)
        # Dynamically create new tasks
        for new_link in new_links:
            tg.create_task(get_link_content(new_link))

    start = time.perf_counter()
    # When you exit the task group context, it awaits every task created
    # inside the context. Tasks can create more tasks, which are also awaited.
    async with asyncio.TaskGroup() as tg:
        # Initial task spawns all others
        tg.create_task(get_link_content(starting_link))
        debug_print("before exit")
    debug_print("after exit")
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
```python
from concurrent.futures import ThreadPoolExecutor

def explore_link(link):
    # Fetch a page and return the links it contains
    return find_links(link, get_url(link))

def crawl_web_concurrent_futures(starting_link):
    all_links = {starting_link}
    lock = threading.RLock()  # Or use careful ordering (as below; the lock goes unused)

    def future_done(f):
        new_links = f.result()
        new_links = set(new_links) - all_links
        all_links.update(new_links)
        for link in new_links:
            # Submit returns immediately
            future = executor.submit(explore_link, link)
            links_to_explore.add(future)
            # Callback runs when future completes
            future.add_done_callback(future_done)
        links_to_explore.remove(f)

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=16) as executor:
        links_to_explore = set()
        future = executor.submit(explore_link, starting_link)
        links_to_explore.add(future)           # add to the set first...
        future.add_done_callback(future_done)  # ...then register the callback
        # Wait for all futures to complete
        while links_to_explore:
            time.sleep(0.01)
            # Alternative: use as_completed
            #completed_future = next(concurrent.futures.as_completed(links_to_explore))
            #completed_future.result()
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
The code hints at three ways to handle callbacks that modify shared state: take a lock, order the operations so the race cannot occur, or skip callbacks entirely and poll with as_completed(). The ordering problem looks like this:
```python
# Race condition without careful ordering:
# 1. submit() returns a future
# 2. The callback might run immediately if the task completes quickly
# 3. The callback tries to remove the future from the set
# 4. But we haven't added it to the set yet!
# 5. KeyError!

# Solution: add to the set BEFORE registering the callback
future = executor.submit(explore_link, link)
links_to_explore.add(future)           # First
future.add_done_callback(future_done)  # Then
```
In pseudocode (create_task, on_complete, and wait_until stand in for whichever task or future API is in use), the dynamic-task pattern looks like this:

```python
visited = set()
active_tasks = set()

def task_complete_callback(result):
    new_work = extract_new_work(result)
    new_work = filter_unvisited(new_work, visited)
    visited.update(new_work)
    for work in new_work:
        task = create_task(process, work)
        active_tasks.add(task)
        task.on_complete(task_complete_callback)
    active_tasks.remove(current_task)

# Start with initial task
initial_task = create_task(process, initial_work)
active_tasks.add(initial_task)
initial_task.on_complete(task_complete_callback)

# Wait for all tasks to complete
wait_until(lambda: not active_tasks)
```
Combine different concurrency approaches for specific use cases.
```python
async def crawl_web_async_thread(starting_link):
    all_links = {starting_link}
    links_to_explore = {starting_link}

    async def get_link_content(link):
        # Run the synchronous fetcher in a thread pool
        return link, await asyncio.to_thread(get_url, link)

    start = time.perf_counter()
    while links_to_explore:
        link_content_list = await asyncio.gather(*(
            get_link_content(link) for link in links_to_explore
        ))
        links_to_explore.clear()
        for link, link_content in link_content_list:
            new_links = find_links(link, link_content)
            new_links = set(new_links) - all_links
            links_to_explore.update(new_links)
            all_links.add(link)
    end = time.perf_counter()
    print(f"{end - start}")
    return all_links
```
```python
# When you have async code but need to call sync functions
async def hybrid_pattern():
    async def process_with_fallback(work):
        if has_async_implementation(work):
            return await async_process(work)
        else:
            # Run sync code in a thread pool
            return await run_in_thread(sync_process, work)

    # Use with any async pattern
    results = await gather_all([
        process_with_fallback(work) for work in frontier
    ])
```
Use this pattern when you have an async architecture but need to integrate synchronous libraries that don't support async I/O. The asyncio.to_thread() function runs synchronous code in a thread pool without blocking the event loop.
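If you need control over which pool runs the blocking call (a larger thread pool, say, or a process pool for CPU-heavy parsing), the lower-level loop.run_in_executor() does the same job. The sketch below assumes the get_url defined earlier; the pool size and URL are illustrative.

```python
import asyncio
import concurrent.futures

async def fetch_in_pool(url, executor=None):
    loop = asyncio.get_running_loop()
    # executor=None falls back to asyncio's default thread pool,
    # which is what asyncio.to_thread() uses under the hood
    return await loop.run_in_executor(executor, get_url, url)

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
        content = await fetch_in_pool("https://www.anniehu.com/", pool)
        print(len(content))

asyncio.run(main())
```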
The implementation comments reveal three fundamental challenges in concurrent web crawling:

- Knowing when to stop: with parallel processing, the frontier might be temporarily empty while workers are still fetching pages. Since submit() returns a Future object, check whether it is done, or wait for it to complete, before considering the crawl finished.
- Batching vs. streaming: if you submit 100 URLs, do you want to wait for all 100 to complete before submitting new work? It is better to submit new work as soon as it is discovered: concurrent.futures.as_completed() gives you futures as they finish, so the main loop can wait for any one future to complete and then process it (see the sketch after this list).
- Deduplication timing: add a URL to the visited set when you submit the work, not when you get the results, to prevent submitting the same URL multiple times.
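The sketch below ties these points together: a streaming crawl loop built on concurrent.futures.wait() with FIRST_COMPLETED, which plays the same role as as_completed() when the set of futures keeps growing. It assumes the get_url and find_links helpers defined earlier; the function name and worker count are illustrative.

```python
import concurrent.futures

def crawl_web_streaming(starting_link, max_workers=16):
    all_links = {starting_link}                 # mark seen at submission time
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(get_url, starting_link): starting_link}
        while pending:                          # no pending futures means the crawl is done
            # Wait for any future to finish, then process just those
            done, _ = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                link = pending.pop(future)
                for new_link in find_links(link, future.result()):
                    if new_link not in all_links:
                        all_links.add(new_link)              # before submitting
                        pending[executor.submit(get_url, new_link)] = new_link
    return all_links
```

Marking URLs as seen at submission time and ending only when no futures remain pending addresses the deduplication and termination challenges directly, and work is submitted the moment it is discovered rather than in batches.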
| Pattern | Implementation | Concurrency Type | Best For | Key Limitation |
|---|---|---|---|---|
| Sequential | Basic loop | None | Simple scripts, debugging | No parallelism |
| MapReduce | executor.map() | Threads/Processes | Batch processing | Waits for slowest in batch |
| Producer-Consumer | Queue + Workers | Threads | Continuous processing | Queue coordination overhead |
| Gather/Batch | asyncio.gather | Coroutines | Known work sets | All-or-nothing completion |
| Dynamic Tasks | Callbacks/TaskGroup | Threads/Coroutines | Unknown work size | Complex state management |
| Hybrid | async + threads | Mixed | Legacy integration | Overhead of both models |