Asynchronous Instructor
Asynchronous usage pattern for Instructor
In this example, I was extracting a list of topics from 1000 JSON objects loaded as Python dicts.
I used the Anthropic Haiku model.
Using the instructor library with the asynchronous Anthropic client makes it much faster to issue a large number of calls to the Anthropic API.
(Is there a batch option instead, and does it work with instructor?)
Setup
First, we set up the Anthropic async client with Instructor. We also set up the Pydantic schema we'll be using for structured extraction.
```python
import instructor
from anthropic import AsyncAnthropic
from dotenv import load_dotenv
from pydantic import BaseModel, Field

# I had ANTHROPIC_API_KEY in a .env file
load_dotenv()

# set up the async client
aclient = instructor.from_anthropic(AsyncAnthropic())

# set up pydantic schema
class Topics(BaseModel):
    candidates: list = Field(
        description=(
            "List of topics that might be considered among the main topics of the talk."
            " Should include (1) high level categories like 'data science' or 'ai'; (2)"
            " key technologies/tools like 'spark' or 'LLMs', and (3) subtopics like"
            " 'structured streaming' or 'retrieval-augmented generation'"
        )
    )
    topics: list = Field(
        description=(
            "List of the top main topics of the talk, from the list of candidates."
            " Select at least one but no more than three main topics."
        )
    )
```
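Before firing off a thousand concurrent calls, it can be worth sanity-checking the schema with a single request. A minimal sketch, assuming the `aclient` and `Topics` defined above (the abstract text here is a made-up placeholder, not real data):

```python
import asyncio

async def check_one() -> None:
    # One async call against the Topics schema; instructor returns a
    # validated Topics instance rather than raw text.
    resp = await aclient.messages.create(
        model="claude-3-haiku-20240307",
        max_tokens=1024,
        messages=[{"role": "user", "content": "A talk on structured streaming in Spark."}],
        response_model=Topics,
    )
    print(resp.candidates)
    print(resp.topics)

asyncio.run(check_one())  # in a notebook, just `await check_one()`
```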
Iterate through the objects
```python
import asyncio
from typing import List

import anthropic
from tqdm import tqdm

# Create a semaphore with a limit of 10 concurrent tasks
sem = asyncio.Semaphore(10)

topics_list: List[str] = []
new_event_data = event_data.copy()  # event_data: the list of dicts loaded earlier

async def process_event(event: dict, sem: asyncio.Semaphore, progress_bar: tqdm) -> None:
    retries = 3
    delay = 1  # Initial delay in seconds
    while retries > 0:
        try:
            # Acquire the semaphore to limit concurrency
            async with sem:
                # Make an asynchronous API call to create a message
                resp = await aclient.messages.create(
                    model="claude-3-haiku-20240307",
                    max_tokens=1024,
                    messages=[
                        {
                            "role": "user",
                            "content": event["abstract"],
                        }
                    ],
                    response_model=Topics,
                )
                # Store the topics in the event dictionary
                event["topics"] = resp.topics
                topics_list.extend(resp.topics)
                progress_bar.update(1)
                return
        except Exception as e:
            if isinstance(e, anthropic.RateLimitError):
                # Handle rate limit error
                retries -= 1
                if retries > 0:
                    print(f"Rate limit reached, retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
                    delay *= 2  # Exponential backoff
                else:
                    print("Maximum retries reached, skipping event.")
            else:
                # Handle other errors
                print(f"Error processing event: {e}")
                retries -= 1
                if retries > 0:
                    await asyncio.sleep(delay)
                    delay *= 2
                else:
                    print("Maximum retries reached, skipping event.")

async def run_in_notebook() -> None:
    tasks = []
    with tqdm(total=len(new_event_data), unit="event") as progress_bar:
        for event in new_event_data:
            # Create a task for each event and add it to the tasks list
            tasks.append(process_event(event, sem, progress_bar))
        # Wait for all tasks to complete concurrently
        await asyncio.gather(*tasks)

# Enable asyncio support in Jupyter Notebook
import nest_asyncio
nest_asyncio.apply()

# Run the async function in the notebook
asyncio.get_event_loop().run_until_complete(run_in_notebook())
```
In this code, we:
- Define a semaphore, which limits the number of concurrent tasks (and thus concurrent API calls). This is important because the per-minute (and, unfortunately, the per-day) rate limits for Haiku are quite low. The sketch after this list shows the pattern in isolation.
- Rate limit errors trigger a retry with exponential backoff. Other exceptions also trigger retries.
- `await asyncio.gather(*tasks)` waits until all the tasks have finished.
- The `nest_asyncio` library is used to run the async methods in a notebook.
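To make the semaphore/backoff/gather interplay concrete, here is a minimal, self-contained sketch with toy tasks. Everything in it is made up for illustration (`flaky_call` is a stand-in for the Anthropic call, not the real client), but the control flow mirrors the code above: at most three workers hold the semaphore at once, failures back off exponentially, and `gather` drives all ten tasks.

```python
import asyncio
import random

async def flaky_call(i: int) -> str:
    # Stand-in for an API call that sometimes rate-limits
    if random.random() < 0.3:
        raise RuntimeError("rate limited")
    await asyncio.sleep(1)
    return f"result {i}"

async def worker(i: int, sem: asyncio.Semaphore) -> None:
    delay = 1
    for _attempt in range(3):
        try:
            async with sem:  # at most 3 workers inside at once
                print(await flaky_call(i))
            return
        except RuntimeError:
            print(f"task {i}: retrying in {delay}s")
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff

async def main() -> None:
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(worker(i, sem) for i in range(10)))

asyncio.run(main())
```

Running it as a script prints results out of order, which is the point: the tasks overlap rather than run one at a time.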