Real-time Token Usage Monitoring
Working with LLMs through Bedrock is great until you need to track token usage across different models, users, and applications. Sure, you can check CloudWatch metrics or comb through AWS Cost Explorer, but by then it's too late. You need real-time visibility into token consumption patterns.
The Solution
We’ll leverage LangChain’s callback system to intercept every Bedrock call, extract token metrics, and stream them to SQS. From there, you can build whatever monitoring stack makes sense for your use case.
Here’s what we’re tracking:
{
  "userId": "dev_test_123",                               # Who's using it
  "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",   # Which model
  "inputTokens": 156,                                     # Tokens in
  "outputTokens": 423,                                    # Tokens out
  "applicationId": "code-review-bot",                     # Which app
  "timestamp": "2024-01-23T14:30:00Z"                     # When
}
First, let’s set up our callback handler:
from langchain.callbacks.base import BaseCallbackHandler
from langchain_aws import ChatBedrock
import boto3
import json
from datetime import datetime
from typing import Any


class TokenUsageHandler(BaseCallbackHandler):
    """Captures token usage from Bedrock calls and ships it to SQS"""

    def __init__(self, user_id: str, application_id: str):
        self.user_id = user_id
        self.application_id = application_id
        # In prod, you'll want to init this once and reuse
        self.sqs = boto3.client('sqs')
        self.queue_url = 'YOUR_SQS_QUEUE_URL'

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """Fires after each successful LLM call"""
        try:
            # Extract usage from the response
            usage = response.llm_output.get('usage', {})
            model_id = response.llm_output.get('model_id')

            if not usage or not model_id:
                print("Warning: Missing usage data in response")
                return

            # Structure the data
            usage_data = {
                "timestamp": datetime.utcnow().isoformat(),
                "userId": self.user_id,
                "applicationId": self.application_id,
                "inputTokens": usage.get('prompt_tokens'),
                "outputTokens": usage.get('completion_tokens'),
                "modelId": model_id
            }

            # Ship it to SQS
            self.sqs.send_message(
                QueueUrl=self.queue_url,
                MessageBody=json.dumps(usage_data)
            )
        except Exception as e:
            # Log and continue - don't break the main application flow
            print(f"Failed to process token usage: {str(e)}")
Using it in your application:
# Initialize your Bedrock client
bedrock_client = boto3.client('bedrock-runtime')

# Set up the chat model
chat = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    client=bedrock_client,
    model_kwargs={
        "temperature": 0.7,
        "max_tokens": 4096  # Claude 3 Sonnet caps output at 4096 tokens
    }
)

# Create your usage handler
handler = TokenUsageHandler(
    user_id="dev_test_123",           # However you track users
    application_id="code-review-bot"  # Your app identifier
)

# Make your API call, passing the messages as the input
# and the handler via the config
response = chat.invoke(
    [{"role": "user", "content": "Review this code..."}],
    config={"callbacks": [handler]}
)
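To sanity-check that records are arriving, you can poll the queue directly. Here's a quick sketch, assuming the same placeholder queue URL as the handler:

import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'YOUR_SQS_QUEUE_URL'  # same placeholder as in the handler

# Pull a batch of usage records shipped by the handler
resp = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=5  # long polling so empty responses are less likely
)

for msg in resp.get('Messages', []):
    print(json.dumps(json.loads(msg['Body']), indent=2))
    # Remove the message so it isn't redelivered to this check
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])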
Production Considerations
Before you deploy this:
- Error Handling: The current implementation is basic. You might want to:
  - Add retries for SQS failures
  - Implement proper logging
  - Consider what happens if usage data is missing
- Performance: The handler creates a new SQS client for each instance. In production:
  - Make the SQS client a singleton
  - Consider batching messages
  - Monitor for latency impact
- Configuration: Don't hardcode the queue URL (the sketch after this list pulls these changes together):

      self.queue_url = os.environ['SQS_QUEUE_URL']
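A rough sketch combining these changes might look like the following; the retry settings, logger name, and environment variable are assumptions to adapt to your setup:

import os
import json
import logging

import boto3
from botocore.config import Config

logger = logging.getLogger("token_usage")  # placeholder logger name

# One module-level client shared by every handler instance,
# with botocore's standard retry mode enabled for transient SQS errors
_sqs = boto3.client(
    'sqs',
    config=Config(retries={'max_attempts': 3, 'mode': 'standard'})
)
_QUEUE_URL = os.environ['SQS_QUEUE_URL']  # fail fast if the config is missing

def ship_usage(usage_data: dict) -> None:
    """Send one usage record to SQS, logging (not raising) on failure."""
    try:
        _sqs.send_message(QueueUrl=_QUEUE_URL, MessageBody=json.dumps(usage_data))
    except Exception:
        logger.exception("Failed to ship token usage to SQS")

The handler's on_llm_end would then call ship_usage(usage_data) instead of creating its own client and talking to SQS directly.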
What Next?
Once you have token usage data flowing into SQS, you can:
- Process it with Lambda (a minimal consumer sketch follows this list)
- Store it in a persistent data store
- Set up alerts and build dashboards to analyze usage patterns
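As one example of the Lambda path, a minimal consumer might look like this; the DynamoDB table name and key schema are hypothetical:

import json
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('token-usage')  # hypothetical table name

def lambda_handler(event, context):
    """Triggered by SQS: write each usage record to DynamoDB."""
    for record in event['Records']:
        usage = json.loads(record['body'])
        table.put_item(Item={
            'userId': usage['userId'],        # partition key (hypothetical schema)
            'timestamp': usage['timestamp'],  # sort key
            'modelId': usage['modelId'],
            'applicationId': usage['applicationId'],
            'inputTokens': usage['inputTokens'],
            'outputTokens': usage['outputTokens'],
        })
    return {'statusCode': 200}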
This implementation gives you real-time visibility into token usage with minimal overhead. It’s a starting point you can adapt based on your monitoring needs.