Overview
TheWebhooksAPI
provides methods to register, list, and manage webhook endpoints that receive real-time notifications about account events. All webhook operations are accessed through the client.webhooks
property.
Methods
list()
List all registered webhooks for your account.Copy
def list(self, account_id: str) -> list[Webhook]:
"""List webhooks for a specific account."""
account_id
: The unique account identifier
list[Webhook]
- List of webhook objects
Example:
Copy
from monzoh import MonzoClient
client = MonzoClient()
# Get accounts
accounts = client.accounts.list()
account_id = accounts[0].id
# List webhooks
webhooks = client.webhooks.list(account_id=account_id)
print(f"Found {len(webhooks)} webhooks")
for webhook in webhooks:
print(f"๐ก {webhook.url}")
print(f" ID: {webhook.id}")
print(f" Status: {webhook.status}")
print(f" Created: {webhook.created}")
create()
Register a new webhook endpoint.Copy
def create(
self,
account_id: str,
url: str
) -> Webhook:
"""Register a new webhook."""
account_id
: The account to monitorurl
: The webhook endpoint URL (must be HTTPS)
Webhook
- The created webhook object
Example:
Copy
# Register a webhook
webhook = client.webhooks.create(
account_id=account_id,
url="https://your-app.com/webhooks/monzo"
)
print(f"โ
Webhook registered: {webhook.id}")
print(f"๐ก URL: {webhook.url}")
print(f"๐ Account: {webhook.account_id}")
delete()
Remove a registered webhook.Copy
def delete(self, webhook_id: str) -> dict:
"""Delete a webhook."""
webhook_id
: The unique webhook identifier
dict
- Deletion confirmation
Example:
Copy
# Delete a webhook
result = client.webhooks.delete(webhook_id="webhook_123")
print("โ
Webhook deleted")
Webhook Processing Utilities
Monzoh provides built-in utilities to parse and validate incoming webhook payloads with type safety.Core Functions
parse_webhook_payload()
Parse and validate any incoming webhook payload:Copy
from monzoh.webhooks import parse_webhook_payload, WebhookParseError
def parse_webhook_payload(
body: str | bytes,
) -> TransactionWebhookPayload | WebhookPayload:
"""Parse and validate incoming webhook payload."""
body
: Raw request body (string or bytes)
WebhookParseError
: If payload parsing or validation fails
parse_transaction_webhook()
Convenience function for transaction-specific webhooks:Copy
from monzoh.webhooks import parse_transaction_webhook
def parse_transaction_webhook(
body: str | bytes,
) -> Transaction:
"""Parse transaction.created webhook and return Transaction object directly."""
Webhook Event Handling
Copy
from fastapi import FastAPI, Request, HTTPException
from monzoh.webhooks import parse_webhook_payload, TransactionWebhookPayload
app = FastAPI(title="Monzo Webhook Handler")
@app.post("/webhooks/monzo")
async def handle_monzo_webhook(request: Request):
"""Handle incoming Monzo webhook events."""
# Get raw request body
body = await request.body()
try:
# Parse webhook payload
payload = parse_webhook_payload(body=body)
# Handle different event types with type safety
if isinstance(payload, TransactionWebhookPayload):
transaction = payload.data
print(f"New transaction: {transaction.description}")
print(f"Amount: ยฃ{transaction.amount/100:.2f}")
print(f"Category: {transaction.category}")
# Process the transaction
await handle_transaction_created(transaction)
else:
print(f"Received {payload.type} event")
await handle_other_event(payload)
except Exception as e:
print(f"Error processing webhook: {e}")
raise HTTPException(status_code=400, detail=str(e))
return {"status": "received"}
Setting up a Complete Webhook Server
Hereโs a complete example using the webhook utilities:Copy
from fastapi import FastAPI, Request, HTTPException
from monzoh.webhooks import (
parse_webhook_payload,
TransactionWebhookPayload,
WebhookParseError
)
from datetime import datetime
app = FastAPI(title="Monzo Webhook Handler")
@app.post("/webhooks/monzo")
async def handle_monzo_webhook(request: Request):
"""Handle incoming Monzo webhook events."""
# Get raw request body
body = await request.body()
try:
# Parse webhook payload
payload = parse_webhook_payload(body=body)
# Handle different event types
if isinstance(payload, TransactionWebhookPayload):
await handle_transaction_created(payload.data)
else:
await handle_other_event(payload)
except WebhookParseError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"status": "received"}
async def handle_transaction_created(transaction_data):
"""Handle new transaction events."""
amount = transaction_data["amount"] / 100
description = transaction_data["description"]
account_id = transaction_data["account_id"]
print(f"๐ณ New Transaction Alert")
print(f" Description: {description}")
print(f" Amount: ยฃ{amount:.2f}")
print(f" Account: {account_id}")
print(f" Time: {datetime.now(tz=datetime.timezone.utc)}")
# Custom business logic
await process_transaction_event(transaction_data)
async def handle_balance_updated(balance_data):
"""Handle balance update events."""
balance = balance_data["balance"] / 100
account_id = balance_data["account_id"]
print(f"๐ฐ Balance Update Alert")
print(f" New Balance: ยฃ{balance:.2f}")
print(f" Account: {account_id}")
# Custom business logic
await process_balance_event(balance_data)
async def process_transaction_event(transaction_data):
"""Process transaction events with custom logic."""
amount = transaction_data["amount"] / 100
description = transaction_data["description"]
category = transaction_data.get("category", "unknown")
# Example: Large transaction alert
if abs(amount) > 100:
await send_large_transaction_alert(description, amount)
# Example: Coffee spending tracking
if "coffee" in description.lower():
await track_coffee_spending(amount)
# Example: Automatic categorization
if category == "unknown":
predicted_category = await predict_category(description)
if predicted_category:
print(f"๐ก Suggested category: {predicted_category}")
async def process_balance_event(balance_data):
"""Process balance events with custom logic."""
balance = balance_data["balance"] / 100
# Example: Low balance alert
if balance < 50:
await send_low_balance_alert(balance)
# Example: Savings opportunity
if balance > 1000:
await suggest_savings_transfer(balance)
# Helper functions for custom logic
async def send_large_transaction_alert(description, amount):
"""Send alert for large transactions."""
print(f"๐จ LARGE TRANSACTION ALERT")
print(f" {description}: ยฃ{amount:.2f}")
# Implement: Send email, SMS, push notification, etc.
async def track_coffee_spending(amount):
"""Track coffee spending."""
print(f"โ Coffee purchase: ยฃ{abs(amount):.2f}")
# Implement: Update coffee spending database, etc.
async def predict_category(description):
"""Predict transaction category."""
# Simple keyword matching (could use ML)
keywords = {
"groceries": ["tesco", "sainsbury", "asda", "grocery"],
"transport": ["uber", "bus", "train", "petrol"],
"eating_out": ["restaurant", "pizza", "mcdonald"]
}
description_lower = description.lower()
for category, words in keywords.items():
if any(word in description_lower for word in words):
return category
return None
async def send_low_balance_alert(balance):
"""Send low balance alert."""
print(f"โ ๏ธ LOW BALANCE ALERT: ยฃ{balance:.2f}")
# Implement: Send notification, trigger automatic top-up, etc.
async def suggest_savings_transfer(balance):
"""Suggest savings opportunity."""
surplus = balance - 500 # Keep ยฃ500 in current account
if surplus > 50: # Only suggest if surplus is significant
print(f"๐ก SAVINGS OPPORTUNITY: Consider saving ยฃ{surplus:.2f}")
# Implement: Send suggestion, automatic transfer, etc.
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Webhook Registration Helper
Create a helper script to register webhooks:Copy
from monzoh import MonzoClient, MonzoError
import os
class WebhookManager:
def __init__(self):
self.client = MonzoClient()
def setup_webhooks(self, webhook_url):
"""Set up webhooks for all accounts."""
try:
accounts = self.client.accounts.list()
for account in accounts:
if account.closed:
continue
print(f"Setting up webhook for {account.description}")
# Check existing webhooks
existing_webhooks = self.client.webhooks.list(
account_id=account.id
)
# Remove existing webhooks to avoid duplicates
for webhook in existing_webhooks:
if webhook.url == webhook_url:
print(f" Removing existing webhook: {webhook.id}")
self.client.webhooks.delete(webhook_id=webhook.id)
# Create new webhook
webhook = self.client.webhooks.create(
account_id=account.id,
url=webhook_url
)
print(f" โ
Webhook created: {webhook.id}")
except MonzoError as e:
print(f"โ Error setting up webhooks: {e}")
def list_all_webhooks(self):
"""List webhooks for all accounts."""
accounts = self.client.accounts.list()
print("๐ก Registered Webhooks")
print("=" * 40)
total_webhooks = 0
for account in accounts:
if account.closed:
continue
webhooks = self.client.webhooks.list(account_id=account.id)
print(f"\n๐ฆ {account.description}")
print(f" Account ID: {account.id}")
if webhooks:
for webhook in webhooks:
print(f" ๐ก {webhook.url}")
print(f" ID: {webhook.id}")
print(f" Status: {webhook.status}")
print(f" Created: {webhook.created}")
total_webhooks += 1
else:
print(" No webhooks registered")
print(f"\n๐ Total webhooks: {total_webhooks}")
def cleanup_webhooks(self):
"""Remove all registered webhooks."""
accounts = self.client.accounts.list()
removed_count = 0
for account in accounts:
if account.closed:
continue
webhooks = self.client.webhooks.list(account_id=account.id)
for webhook in webhooks:
try:
self.client.webhooks.delete(webhook_id=webhook.id)
print(f"โ
Removed webhook: {webhook.url}")
removed_count += 1
except MonzoError as e:
print(f"โ Failed to remove webhook {webhook.id}: {e}")
print(f"\n๐ Removed {removed_count} webhooks")
def main():
manager = WebhookManager()
# Your webhook URL (must be HTTPS and publicly accessible)
webhook_url = "https://your-app.com/webhooks/monzo"
print("๐ง Monzo Webhook Management")
print("=" * 30)
# List current webhooks
manager.list_all_webhooks()
# Setup new webhooks
print(f"\n๐ Setting up webhook: {webhook_url}")
manager.setup_webhooks(webhook_url)
# List updated webhooks
print(f"\n๐ Updated webhook list:")
manager.list_all_webhooks()
if __name__ == "__main__":
main()
Testing Webhooks
Test your webhook endpoint before registering:Copy
import requests
import json
from datetime import datetime
def test_webhook_endpoint(webhook_url):
"""Test webhook endpoint with sample data."""
# Sample transaction.created event
sample_transaction_event = {
"type": "transaction.created",
"data": {
"id": "tx_test123",
"account_id": "acc_test123",
"amount": -450, # ยฃ4.50 spending
"currency": "GBP",
"description": "Test Coffee Shop",
"category": "eating_out",
"created": datetime.now(tz=datetime.timezone.utc).isoformat() + "Z",
"merchant": {
"name": "Test Coffee Shop",
"category": "coffee"
}
}
}
# Sample balance.updated event
sample_balance_event = {
"type": "account_balance.updated",
"data": {
"account_id": "acc_test123",
"balance": 12450, # ยฃ124.50
"currency": "GBP"
}
}
# Test both events
events = [
("Transaction Created", sample_transaction_event),
("Balance Updated", sample_balance_event)
]
for event_name, event_data in events:
print(f"๐งช Testing {event_name} event")
try:
response = requests.post(
webhook_url,
json=event_data,
headers={"Content-Type": "application/json"},
timeout=10
)
if response.status_code == 200:
print(f" โ
Success: {response.status_code}")
else:
print(f" โ Failed: {response.status_code}")
print(f" Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f" โ Connection error: {e}")
print()
# Test your webhook
if __name__ == "__main__":
test_webhook_endpoint("http://localhost:8000/webhooks/monzo")
Webhook Security
Rate Limiting
Implement rate limiting to prevent abuse:Copy
from collections import defaultdict
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, max_requests=100, window_minutes=10):
self.max_requests = max_requests
self.window = timedelta(minutes=window_minutes)
self.requests = defaultdict(list)
def is_allowed(self, client_ip):
now = datetime.now(tz=datetime.timezone.utc)
# Clean old requests
self.requests[client_ip] = [
req_time for req_time in self.requests[client_ip]
if now - req_time < self.window
]
# Check if under limit
if len(self.requests[client_ip]) >= self.max_requests:
return False
# Record this request
self.requests[client_ip].append(now)
return True
rate_limiter = RateLimiter()
@app.post("/webhooks/monzo")
async def handle_webhook(request: Request):
client_ip = request.client.host
if not rate_limiter.is_allowed(client_ip):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Process webhook...
Best Practices
- Use HTTPS: Webhook URLs must use HTTPS in production
- Handle idempotency: Process duplicate events gracefully
- Respond quickly: Return 200 status code within 10 seconds
- Log events: Maintain logs for debugging and monitoring
- Implement retries: Handle temporary failures with retry logic
- Rate limiting: Protect your endpoint from excessive requests