Overview
Monzoh provides both synchronous and asynchronous clients for the Monzo API. TheAsyncMonzoClient
offers native async/await support, while the synchronous MonzoClient
can be integrated with async applications using thread execution patterns. This guide covers both approaches.
Why Async Integration?
Integrating Monzoh with async frameworks is useful for:- Web Applications: FastAPI, Starlette, and other async web frameworks
- Concurrent Processing: Handle multiple API calls simultaneously
- Non-blocking Operations: Keep your application responsive during API calls
- Background Tasks: Process Monzo data without blocking the main thread
Native Async Client
Using AsyncMonzoClient (Recommended)
TheAsyncMonzoClient
provides native async/await support:
Copy
import asyncio
from monzoh import AsyncMonzoClient
async def get_accounts_async():
"""Get accounts using native async client."""
async with AsyncMonzoClient() as client:
accounts = await client.accounts.list()
return accounts
async def get_account_balance_async(account_id: str):
"""Get account balance using native async client."""
async with AsyncMonzoClient() as client:
balance = await client.accounts.get_balance(account_id=account_id)
return balance
# Usage
async def main():
accounts = await get_accounts_async()
print(f"Found {len(accounts)} accounts")
if accounts:
balance = await get_account_balance_async(accounts[0].id)
print(f"Balance: £{balance.balance / 100:.2f}")
# Run the async function
asyncio.run(main())
Context Manager Usage
The async client supports async context managers for proper resource cleanup:Copy
import asyncio
from monzoh import AsyncMonzoClient
async def main():
async with AsyncMonzoClient() as client:
# Get basic info
whoami = await client.whoami()
print(f"Authenticated as: {whoami.user_id}")
# Get accounts
accounts = await client.accounts.list()
print(f"Found {len(accounts)} accounts")
# Get transactions for first account
if accounts:
transactions = await client.transactions.list(
account_id=accounts[0].id,
limit=10
)
print(f"Recent transactions: {len(transactions)}")
asyncio.run(main())
FastAPI Integration
Monzoh integrates seamlessly with FastAPI applications using the native async client:Copy
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from typing import List
from monzoh import AsyncMonzoClient, MonzoError
from monzoh.models import Account, Balance, Transaction
# Create a shared client for the application
@asynccontextmanager
async def lifespan(app: FastAPI):
# Create shared async client
app.state.monzo_client = AsyncMonzoClient()
yield
# Clean up
await app.state.monzo_client.__aexit__(None, None, None)
app = FastAPI(title="Monzo API Proxy", version="1.0.0", lifespan=lifespan)
def get_monzo_client() -> AsyncMonzoClient:
return app.state.monzo_client
@app.get("/accounts", response_model=List[Account])
async def get_accounts(client: AsyncMonzoClient = Depends(get_monzo_client)):
"""Get all accounts."""
try:
accounts = await client.accounts.list()
return accounts
except MonzoError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/accounts/{account_id}/balance", response_model=Balance)
async def get_account_balance(
account_id: str,
client: AsyncMonzoClient = Depends(get_monzo_client)
):
"""Get account balance."""
try:
balance = await client.accounts.get_balance(account_id=account_id)
return balance
except MonzoError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/accounts/{account_id}/transactions", response_model=List[Transaction])
async def get_transactions(
account_id: str,
limit: int = 100,
client: AsyncMonzoClient = Depends(get_monzo_client)
):
"""Get account transactions."""
try:
transactions = await client.transactions.list(
account_id=account_id,
limit=limit
)
return transactions
except MonzoError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/dashboard/{account_id}")
async def get_dashboard_data(
account_id: str,
client: AsyncMonzoClient = Depends(get_monzo_client)
):
"""Get dashboard data with concurrent API calls."""
try:
# Make multiple API calls concurrently using native async client
balance, transactions, pots = await asyncio.gather(
client.accounts.get_balance(account_id=account_id),
client.transactions.list(account_id=account_id, limit=20),
client.pots.list(),
return_exceptions=True
)
# Handle any exceptions
if isinstance(balance, Exception):
raise balance
if isinstance(transactions, Exception):
raise transactions
if isinstance(pots, Exception):
raise pots
return {
"balance": {
"current": balance.balance / 100,
"spend_today": balance.spend_today / 100,
"currency": balance.currency
},
"recent_transactions": [
{
"id": t.id,
"description": t.description,
"amount": t.amount / 100,
"created": t.created
}
for t in transactions[:10]
],
"pots_summary": {
"total_pots": len(pots),
"total_saved": sum(p.balance for p in pots) / 100
}
}
except MonzoError as e:
raise HTTPException(status_code=400, detail=str(e))
# Run with: uvicorn main:app --reload
Background Tasks with Celery
For background processing with Celery:Copy
from celery import Celery
from monzoh import MonzoClient, MonzoError
import logging
# Configure Celery
celery_app = Celery('monzo_tasks')
celery_app.config_from_object('celeryconfig')
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def sync_account_data(self, account_id: str):
"""Background task to sync account data."""
try:
client = MonzoClient()
# Get latest account data
balance = client.accounts.get_balance(account_id=account_id)
transactions = client.transactions.list(
account_id=account_id,
limit=100
)
# Process and store data (your business logic here)
result = {
"account_id": account_id,
"balance": balance.balance / 100,
"transaction_count": len(transactions),
"last_sync": "2023-01-01T00:00:00Z" # Current timestamp
}
logger.info(f"Synced data for account {account_id}")
return result
except MonzoError as e:
logger.error(f"Monzo API error for account {account_id}: {e}")
# Retry with exponential backoff
raise self.retry(countdown=60 * (2 ** self.request.retries))
@celery_app.task
def sync_all_accounts():
"""Sync data for all accounts."""
try:
client = MonzoClient()
accounts = client.accounts.list()
# Queue individual sync tasks
for account in accounts:
sync_account_data.delay(account.id)
return f"Queued sync for {len(accounts)} accounts"
except MonzoError as e:
logger.error(f"Failed to list accounts: {e}")
raise
# Schedule periodic sync (in celeryconfig.py)
# from celery.schedules import crontab
#
# beat_schedule = {
# 'sync-accounts-hourly': {
# 'task': 'tasks.sync_all_accounts',
# 'schedule': crontab(minute=0), # Every hour
# },
# }
Custom Async Context Managers
You can create custom async context managers for additional functionality:Copy
import asyncio
from contextlib import asynccontextmanager
from monzoh import AsyncMonzoClient, MonzoError
import httpx
@asynccontextmanager
async def monzo_client_with_custom_timeout(timeout: float = 60.0):
"""Async context manager for Monzo client with custom timeout."""
async with httpx.AsyncClient(timeout=timeout) as http_client:
async with AsyncMonzoClient(http_client=http_client) as client:
yield client
# Usage
async def get_account_summary():
async with monzo_client_with_custom_timeout(timeout=120.0) as client:
accounts = await client.accounts.list()
summaries = []
for account in accounts:
balance = await client.accounts.get_balance(account_id=account.id)
summaries.append({
"account": account.description,
"balance": balance.balance / 100
})
return summaries
Concurrent API Calls
Process multiple accounts or operations concurrently using the native async client:Copy
import asyncio
from monzoh import AsyncMonzoClient
from typing import List, Dict, Any
async def process_account_concurrently(
client: AsyncMonzoClient,
account_id: str
) -> Dict[str, Any]:
"""Process a single account's data concurrently."""
# Make multiple calls for this account concurrently
balance, transactions = await asyncio.gather(
client.accounts.get_balance(account_id=account_id),
client.transactions.list(account_id=account_id, limit=50)
)
# Calculate spending statistics
spending_today = balance.spend_today / 100
total_spent = sum(
abs(t.amount) for t in transactions
if t.amount < 0
) / 100
return {
"account_id": account_id,
"balance": balance.balance / 100,
"spending_today": spending_today,
"total_spent_recent": total_spent,
"transaction_count": len(transactions)
}
async def get_financial_overview():
"""Get overview of all accounts concurrently."""
async with AsyncMonzoClient() as client:
# Get all accounts first
accounts = await client.accounts.list()
# Process all accounts concurrently
tasks = [
process_account_concurrently(client, account.id)
for account in accounts
]
account_summaries = await asyncio.gather(*tasks)
# Aggregate data
total_balance = sum(summary["balance"] for summary in account_summaries)
total_spending = sum(summary["spending_today"] for summary in account_summaries)
return {
"overview": {
"total_balance": total_balance,
"total_spending_today": total_spending,
"account_count": len(accounts)
},
"accounts": account_summaries
}
# Usage
async def main():
overview = await get_financial_overview()
print(f"Total balance: £{overview['overview']['total_balance']:.2f}")
print(f"Spending today: £{overview['overview']['total_spending_today']:.2f}")
asyncio.run(main())
Error Handling in Async Context
Handle errors properly with the native async client:Copy
import asyncio
import logging
from monzoh import (
AsyncMonzoClient,
MonzoError,
MonzoRateLimitError,
MonzoNetworkError
)
logger = logging.getLogger(__name__)
async def robust_api_call(client_coro, max_retries=3):
"""Make a robust async API call with retries."""
for attempt in range(max_retries):
try:
result = await client_coro
return result
except MonzoRateLimitError as e:
if attempt < max_retries - 1:
delay = (2 ** attempt) * 60 # Exponential backoff in minutes
logger.warning(f"Rate limited, retrying in {delay}s: {e}")
await asyncio.sleep(delay)
else:
logger.error(f"Rate limit exceeded after {max_retries} attempts")
raise
except MonzoNetworkError as e:
if attempt < max_retries - 1:
delay = 2 ** attempt # Exponential backoff in seconds
logger.warning(f"Network error, retrying in {delay}s: {e}")
await asyncio.sleep(delay)
else:
logger.error(f"Network error after {max_retries} attempts")
raise
except MonzoError as e:
logger.error(f"Non-retryable Monzo error: {e}")
raise
async def safe_get_accounts():
"""Safely get accounts with error handling."""
try:
async with AsyncMonzoClient() as client:
accounts = await robust_api_call(client.accounts.list())
return accounts
except MonzoError as e:
logger.error(f"Failed to get accounts: {e}")
return []
async def main():
accounts = await safe_get_accounts()
if accounts:
print(f"Successfully retrieved {len(accounts)} accounts")
else:
print("Failed to retrieve accounts or no accounts found")
asyncio.run(main())
Performance Considerations
Connection Pooling
TheAsyncMonzoClient
automatically handles connection pooling with proper resource management:
Copy
import asyncio
import httpx
from monzoh import AsyncMonzoClient
async def create_optimized_client():
"""Create async client with optimized connection pooling."""
# Custom HTTP client with connection pooling
http_client = httpx.AsyncClient(
limits=httpx.Limits(
max_connections=20,
max_keepalive_connections=10
),
timeout=30.0
)
return AsyncMonzoClient(http_client=http_client)
async def batch_process_accounts():
"""Process accounts in batches for better performance."""
async with create_optimized_client() as client:
# Get accounts
accounts = await client.accounts.list()
# Process accounts in batches to avoid overwhelming the API
batch_size = 5
for i in range(0, len(accounts), batch_size):
batch = accounts[i:i + batch_size]
# Process batch concurrently
tasks = [
client.accounts.get_balance(account_id=account.id)
for account in batch
]
balances = await asyncio.gather(*tasks)
for account, balance in zip(batch, balances):
print(f"{account.description}: £{balance.balance / 100:.2f}")
# Optional: small delay between batches to be API-friendly
if i + batch_size < len(accounts):
await asyncio.sleep(0.1)
asyncio.run(batch_process_accounts())
Best Practices
- Use
AsyncMonzoClient
for new projects - it provides native async/await support - Use async context managers (
async with
) for proper resource cleanup - Limit concurrency to avoid overwhelming the API (batch requests when needed)
- Handle errors gracefully with proper retry logic for transient failures
- Use connection pooling by passing a custom
httpx.AsyncClient
when needed - Monitor performance and adjust concurrency levels based on your requirements
Migration from Sync to Async
To migrate from the synchronous client to the async client:Copy
# Before (sync)
from monzoh import MonzoClient
client = MonzoClient()
accounts = client.accounts.list()
balance = client.accounts.get_balance(account_id=accounts[0].id)
# After (async)
from monzoh import AsyncMonzoClient
async with AsyncMonzoClient() as client:
accounts = await client.accounts.list()
balance = await client.accounts.get_balance(account_id=accounts[0].id)