Skip to main content

Overview

Monzoh provides both synchronous and asynchronous clients for the Monzo API. The AsyncMonzoClient offers native async/await support, while the synchronous MonzoClient can be integrated with async applications using thread execution patterns. This guide covers both approaches.

Why Async Integration?

Integrating Monzoh with async frameworks is useful for:
  • Web Applications: FastAPI, Starlette, and other async web frameworks
  • Concurrent Processing: Handle multiple API calls simultaneously
  • Non-blocking Operations: Keep your application responsive during API calls
  • Background Tasks: Process Monzo data without blocking the main thread

Native Async Client

The AsyncMonzoClient provides native async/await support:
import asyncio
from monzoh import AsyncMonzoClient

async def get_accounts_async():
    """Get accounts using native async client."""
    async with AsyncMonzoClient() as client:
        accounts = await client.accounts.list()
        return accounts

async def get_account_balance_async(account_id: str):
    """Get account balance using native async client."""
    async with AsyncMonzoClient() as client:
        balance = await client.accounts.get_balance(account_id=account_id)
        return balance

# Usage
async def main():
    accounts = await get_accounts_async()
    print(f"Found {len(accounts)} accounts")
    
    if accounts:
        balance = await get_account_balance_async(accounts[0].id)
        print(f"Balance: £{balance.balance / 100:.2f}")

# Run the async function
asyncio.run(main())

Context Manager Usage

The async client supports async context managers for proper resource cleanup:
import asyncio
from monzoh import AsyncMonzoClient

async def main():
    async with AsyncMonzoClient() as client:
        # Get basic info
        whoami = await client.whoami()
        print(f"Authenticated as: {whoami.user_id}")
        
        # Get accounts
        accounts = await client.accounts.list()
        print(f"Found {len(accounts)} accounts")
        
        # Get transactions for first account
        if accounts:
            transactions = await client.transactions.list(
                account_id=accounts[0].id,
                limit=10
            )
            print(f"Recent transactions: {len(transactions)}")

asyncio.run(main())

FastAPI Integration

Monzoh integrates seamlessly with FastAPI applications using the native async client:
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from typing import List
from monzoh import AsyncMonzoClient, MonzoError
from monzoh.models import Account, Balance, Transaction

# Create a shared client for the application
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create shared async client
    app.state.monzo_client = AsyncMonzoClient()
    yield
    # Clean up
    await app.state.monzo_client.__aexit__(None, None, None)

app = FastAPI(title="Monzo API Proxy", version="1.0.0", lifespan=lifespan)

def get_monzo_client() -> AsyncMonzoClient:
    return app.state.monzo_client

@app.get("/accounts", response_model=List[Account])
async def get_accounts(client: AsyncMonzoClient = Depends(get_monzo_client)):
    """Get all accounts."""
    try:
        accounts = await client.accounts.list()
        return accounts
    except MonzoError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/accounts/{account_id}/balance", response_model=Balance)
async def get_account_balance(
    account_id: str,
    client: AsyncMonzoClient = Depends(get_monzo_client)
):
    """Get account balance."""
    try:
        balance = await client.accounts.get_balance(account_id=account_id)
        return balance
    except MonzoError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/accounts/{account_id}/transactions", response_model=List[Transaction])
async def get_transactions(
    account_id: str,
    limit: int = 100,
    client: AsyncMonzoClient = Depends(get_monzo_client)
):
    """Get account transactions."""
    try:
        transactions = await client.transactions.list(
            account_id=account_id,
            limit=limit
        )
        return transactions
    except MonzoError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/dashboard/{account_id}")
async def get_dashboard_data(
    account_id: str,
    client: AsyncMonzoClient = Depends(get_monzo_client)
):
    """Get dashboard data with concurrent API calls."""
    try:
        # Make multiple API calls concurrently using native async client
        balance, transactions, pots = await asyncio.gather(
            client.accounts.get_balance(account_id=account_id),
            client.transactions.list(account_id=account_id, limit=20),
            client.pots.list(),
            return_exceptions=True
        )
        
        # Handle any exceptions
        if isinstance(balance, Exception):
            raise balance
        if isinstance(transactions, Exception):
            raise transactions
        if isinstance(pots, Exception):
            raise pots
        
        return {
            "balance": {
                "current": balance.balance / 100,
                "spend_today": balance.spend_today / 100,
                "currency": balance.currency
            },
            "recent_transactions": [
                {
                    "id": t.id,
                    "description": t.description,
                    "amount": t.amount / 100,
                    "created": t.created
                }
                for t in transactions[:10]
            ],
            "pots_summary": {
                "total_pots": len(pots),
                "total_saved": sum(p.balance for p in pots) / 100
            }
        }
        
    except MonzoError as e:
        raise HTTPException(status_code=400, detail=str(e))

# Run with: uvicorn main:app --reload

Background Tasks with Celery

For background processing with Celery:
from celery import Celery
from monzoh import MonzoClient, MonzoError
import logging

# Configure Celery
celery_app = Celery('monzo_tasks')
celery_app.config_from_object('celeryconfig')

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=3)
def sync_account_data(self, account_id: str):
    """Background task to sync account data."""
    try:
        client = MonzoClient()
        
        # Get latest account data
        balance = client.accounts.get_balance(account_id=account_id)
        transactions = client.transactions.list(
            account_id=account_id,
            limit=100
        )
        
        # Process and store data (your business logic here)
        result = {
            "account_id": account_id,
            "balance": balance.balance / 100,
            "transaction_count": len(transactions),
            "last_sync": "2023-01-01T00:00:00Z"  # Current timestamp
        }
        
        logger.info(f"Synced data for account {account_id}")
        return result
        
    except MonzoError as e:
        logger.error(f"Monzo API error for account {account_id}: {e}")
        # Retry with exponential backoff
        raise self.retry(countdown=60 * (2 ** self.request.retries))

@celery_app.task
def sync_all_accounts():
    """Sync data for all accounts."""
    try:
        client = MonzoClient()
        accounts = client.accounts.list()
        
        # Queue individual sync tasks
        for account in accounts:
            sync_account_data.delay(account.id)
            
        return f"Queued sync for {len(accounts)} accounts"
        
    except MonzoError as e:
        logger.error(f"Failed to list accounts: {e}")
        raise

# Schedule periodic sync (in celeryconfig.py)
# from celery.schedules import crontab
# 
# beat_schedule = {
#     'sync-accounts-hourly': {
#         'task': 'tasks.sync_all_accounts',
#         'schedule': crontab(minute=0),  # Every hour
#     },
# }

Custom Async Context Managers

You can create custom async context managers for additional functionality:
import asyncio
from contextlib import asynccontextmanager
from monzoh import AsyncMonzoClient, MonzoError
import httpx

@asynccontextmanager
async def monzo_client_with_custom_timeout(timeout: float = 60.0):
    """Async context manager for Monzo client with custom timeout."""
    async with httpx.AsyncClient(timeout=timeout) as http_client:
        async with AsyncMonzoClient(http_client=http_client) as client:
            yield client

# Usage
async def get_account_summary():
    async with monzo_client_with_custom_timeout(timeout=120.0) as client:
        accounts = await client.accounts.list()
        
        summaries = []
        for account in accounts:
            balance = await client.accounts.get_balance(account_id=account.id)
            summaries.append({
                "account": account.description,
                "balance": balance.balance / 100
            })
        
        return summaries

Concurrent API Calls

Process multiple accounts or operations concurrently using the native async client:
import asyncio
from monzoh import AsyncMonzoClient
from typing import List, Dict, Any

async def process_account_concurrently(
    client: AsyncMonzoClient,
    account_id: str
) -> Dict[str, Any]:
    """Process a single account's data concurrently."""
    
    # Make multiple calls for this account concurrently
    balance, transactions = await asyncio.gather(
        client.accounts.get_balance(account_id=account_id),
        client.transactions.list(account_id=account_id, limit=50)
    )
    
    # Calculate spending statistics
    spending_today = balance.spend_today / 100
    total_spent = sum(
        abs(t.amount) for t in transactions 
        if t.amount < 0
    ) / 100
    
    return {
        "account_id": account_id,
        "balance": balance.balance / 100,
        "spending_today": spending_today,
        "total_spent_recent": total_spent,
        "transaction_count": len(transactions)
    }

async def get_financial_overview():
    """Get overview of all accounts concurrently."""
    async with AsyncMonzoClient() as client:
        # Get all accounts first
        accounts = await client.accounts.list()
        
        # Process all accounts concurrently
        tasks = [
            process_account_concurrently(client, account.id)
            for account in accounts
        ]
        
        account_summaries = await asyncio.gather(*tasks)
        
        # Aggregate data
        total_balance = sum(summary["balance"] for summary in account_summaries)
        total_spending = sum(summary["spending_today"] for summary in account_summaries)
        
        return {
            "overview": {
                "total_balance": total_balance,
                "total_spending_today": total_spending,
                "account_count": len(accounts)
            },
            "accounts": account_summaries
        }

# Usage
async def main():
    overview = await get_financial_overview()
    print(f"Total balance: £{overview['overview']['total_balance']:.2f}")
    print(f"Spending today: £{overview['overview']['total_spending_today']:.2f}")

asyncio.run(main())

Error Handling in Async Context

Handle errors properly with the native async client:
import asyncio
import logging
from monzoh import (
    AsyncMonzoClient,
    MonzoError,
    MonzoRateLimitError,
    MonzoNetworkError
)

logger = logging.getLogger(__name__)

async def robust_api_call(client_coro, max_retries=3):
    """Make a robust async API call with retries."""
    
    for attempt in range(max_retries):
        try:
            result = await client_coro
            return result
            
        except MonzoRateLimitError as e:
            if attempt < max_retries - 1:
                delay = (2 ** attempt) * 60  # Exponential backoff in minutes
                logger.warning(f"Rate limited, retrying in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                logger.error(f"Rate limit exceeded after {max_retries} attempts")
                raise
                
        except MonzoNetworkError as e:
            if attempt < max_retries - 1:
                delay = 2 ** attempt  # Exponential backoff in seconds
                logger.warning(f"Network error, retrying in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                logger.error(f"Network error after {max_retries} attempts")
                raise
                
        except MonzoError as e:
            logger.error(f"Non-retryable Monzo error: {e}")
            raise

async def safe_get_accounts():
    """Safely get accounts with error handling."""
    try:
        async with AsyncMonzoClient() as client:
            accounts = await robust_api_call(client.accounts.list())
            return accounts
    except MonzoError as e:
        logger.error(f"Failed to get accounts: {e}")
        return []

async def main():
    accounts = await safe_get_accounts()
    if accounts:
        print(f"Successfully retrieved {len(accounts)} accounts")
    else:
        print("Failed to retrieve accounts or no accounts found")

asyncio.run(main())

Performance Considerations

Connection Pooling

The AsyncMonzoClient automatically handles connection pooling with proper resource management:
import asyncio
import httpx
from monzoh import AsyncMonzoClient

async def create_optimized_client():
    """Create async client with optimized connection pooling."""
    
    # Custom HTTP client with connection pooling
    http_client = httpx.AsyncClient(
        limits=httpx.Limits(
            max_connections=20,
            max_keepalive_connections=10
        ),
        timeout=30.0
    )
    
    return AsyncMonzoClient(http_client=http_client)

async def batch_process_accounts():
    """Process accounts in batches for better performance."""
    async with create_optimized_client() as client:
        # Get accounts
        accounts = await client.accounts.list()
        
        # Process accounts in batches to avoid overwhelming the API
        batch_size = 5
        for i in range(0, len(accounts), batch_size):
            batch = accounts[i:i + batch_size]
            
            # Process batch concurrently
            tasks = [
                client.accounts.get_balance(account_id=account.id)
                for account in batch
            ]
            
            balances = await asyncio.gather(*tasks)
            
            for account, balance in zip(batch, balances):
                print(f"{account.description}: £{balance.balance / 100:.2f}")
                
            # Optional: small delay between batches to be API-friendly
            if i + batch_size < len(accounts):
                await asyncio.sleep(0.1)

asyncio.run(batch_process_accounts())

Best Practices

  1. Use AsyncMonzoClient for new projects - it provides native async/await support
  2. Use async context managers (async with) for proper resource cleanup
  3. Limit concurrency to avoid overwhelming the API (batch requests when needed)
  4. Handle errors gracefully with proper retry logic for transient failures
  5. Use connection pooling by passing a custom httpx.AsyncClient when needed
  6. Monitor performance and adjust concurrency levels based on your requirements

Migration from Sync to Async

To migrate from the synchronous client to the async client:
# Before (sync)
from monzoh import MonzoClient

client = MonzoClient()
accounts = client.accounts.list()
balance = client.accounts.get_balance(account_id=accounts[0].id)

# After (async)
from monzoh import AsyncMonzoClient

async with AsyncMonzoClient() as client:
    accounts = await client.accounts.list()
    balance = await client.accounts.get_balance(account_id=accounts[0].id)
The async client provides all the same functionality as the sync client with native async/await support, better resource management, and improved performance for concurrent operations.
I