# Performance Guide
This guide covers performance characteristics, caching behavior, and optimization strategies for py-identity-model.
## Overview
py-identity-model is designed for high performance with built-in caching and support for both synchronous and asynchronous I/O. Understanding these performance characteristics will help you optimize your application.
## Caching

### Discovery Document Caching
Discovery documents are cached automatically to avoid repeated HTTP requests:
**Sync Implementation:**

```python
from functools import lru_cache


@lru_cache(maxsize=128)
def _get_disco_response(disco_doc_address: str) -> DiscoveryDocumentResponse:
    """Cached discovery document fetching."""
    request = DiscoveryDocumentRequest(address=disco_doc_address)
    return get_discovery_document(request)
```

**Async Implementation:**

```python
from async_lru import alru_cache


@alru_cache(maxsize=128)
async def _get_disco_response(disco_doc_address: str) -> DiscoveryDocumentResponse:
    """Cached async discovery document fetching."""
    request = DiscoveryDocumentRequest(address=disco_doc_address)
    return await get_discovery_document(request)
```
### JWKS Caching
JSON Web Key Sets are also cached:
- Cache Size: 128 entries (both sync and async)
- Cache Key: JWKS URL (from discovery document)
- Cache Duration: Lifetime of the process
- Thread Safety: Both implementations are thread-safe
### Cache Behavior Differences

| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Library | `functools.lru_cache` | `async_lru.alru_cache` |
| Max Entries | 128 | 128 |
| Thread Safety | ✅ Yes | ✅ Yes |
| Cache Sharing | Sync calls only | Async calls only |
| Cache Clearing | `function.cache_clear()` | `function.cache_clear()` |
| Performance | Fast | Fast |
**Important:** Sync and async caches are separate. They do not share cached data.
### Example: Cache Separation

```python
from py_identity_model import get_discovery_document as sync_disco
from py_identity_model.aio import get_discovery_document as async_disco
from py_identity_model import DiscoveryDocumentRequest

# First call - fetches from the network (sync cache)
disco = sync_disco(DiscoveryDocumentRequest(address="https://..."))

# Second call - uses the sync cache (fast)
disco = sync_disco(DiscoveryDocumentRequest(address="https://..."))

# Async call (inside a coroutine) - fetches from the network again (separate cache!)
disco = await async_disco(DiscoveryDocumentRequest(address="https://..."))

# Second async call - uses the async cache (fast)
disco = await async_disco(DiscoveryDocumentRequest(address="https://..."))
```
### Clearing Caches

You can manually clear caches if needed:

```python
from py_identity_model.sync.token_validation import _get_disco_response, _get_jwks_response

# Clear the discovery document cache
_get_disco_response.cache_clear()

# Clear the JWKS cache
_get_jwks_response.cache_clear()
```
For async:
```python
from py_identity_model.aio.token_validation import _get_disco_response, _get_jwks_response

# Clear the async discovery document cache
_get_disco_response.cache_clear()

# Clear the async JWKS cache
_get_jwks_response.cache_clear()
```
### Cache Statistics

You can inspect cache performance:

```python
from py_identity_model.sync.token_validation import _get_disco_response

# Get cache statistics
info = _get_disco_response.cache_info()
print(f"Hits: {info.hits}")
print(f"Misses: {info.misses}")
print(f"Size: {info.currsize}")
print(f"Max Size: {info.maxsize}")
print(f"Hit Rate: {info.hits / (info.hits + info.misses) * 100:.2f}%")
```
## Performance Benchmarks

### Token Validation Performance
Typical token validation times (with caching):
| Operation | First Call | Cached Call | Notes |
|---|---|---|---|
| Discovery Document | ~500-1000ms | <1ms | Network latency dependent |
| JWKS Fetch | ~500-1000ms | <1ms | Network latency dependent |
| JWT Decode & Verify | ~1-5ms | ~1-5ms | No caching (always validates) |
| Total (First) | ~1-2s | - | First request is slow |
| Total (Cached) | - | ~1-5ms | Subsequent requests are fast |
Key Insight: The first token validation is slow due to network requests. Subsequent validations are very fast due to caching.
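The first-call vs. cached-call gap is easy to reproduce in isolation; in this sketch a `time.sleep` stands in for the network round trip, so no real endpoint is involved:

```python
import time
from functools import lru_cache


@lru_cache(maxsize=128)
def slow_fetch(url: str) -> str:
    time.sleep(0.05)  # simulated network latency
    return "discovery-doc"


t0 = time.perf_counter()
slow_fetch("https://idp.example.com")
first = time.perf_counter() - t0

t0 = time.perf_counter()
slow_fetch("https://idp.example.com")
cached = time.perf_counter() - t0

assert first >= 0.05   # the first call pays the simulated round trip
assert cached < first  # the cached call skips it entirely
```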
### Sync vs Async Performance

#### Single Operation
For a single token validation, sync and async have similar performance:
```python
import time

# Sync (second request, cached)
start = time.time()
claims = validate_token(token, config, disco_address)
print(f"Sync: {(time.time() - start) * 1000:.2f}ms")
# Output: Sync: 1.5ms

# Async (second request, cached; run inside a coroutine)
start = time.time()
claims = await validate_token(token, config, disco_address)
print(f"Async: {(time.time() - start) * 1000:.2f}ms")
# Output: Async: 1.8ms
```
Conclusion: For single operations, sync and async have nearly identical performance.
#### Concurrent Operations
For multiple concurrent operations, async can be significantly faster:
```python
import asyncio
import time

tokens = [token1, token2, token3, token4, token5]

# Sequential sync validation
start = time.time()
for token in tokens:
    claims = validate_token(token, config, disco_address)
elapsed_sync = time.time() - start
print(f"Sync Sequential: {elapsed_sync * 1000:.0f}ms")
# Output: Sync Sequential: 7.5ms (5 tokens × 1.5ms each)

# Concurrent async validation
start = time.time()
results = await asyncio.gather(
    *[validate_token(token, config, disco_address) for token in tokens]
)
elapsed_async = time.time() - start
print(f"Async Concurrent: {elapsed_async * 1000:.0f}ms")
# Output: Async Concurrent: 2.0ms (overhead + max of all validations)

print(f"Speedup: {elapsed_sync / elapsed_async:.1f}x")
# Output: Speedup: 3.7x
```
Conclusion: Async provides significant performance benefits when processing multiple operations concurrently.
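The same effect can be demonstrated without a real identity provider; here `asyncio.sleep` stands in for the awaitable network I/O:

```python
import asyncio
import time


async def fake_validate(token: str) -> str:
    await asyncio.sleep(0.02)  # stand-in for awaitable network I/O
    return token


async def main() -> tuple[float, float]:
    tokens = [f"t{i}" for i in range(5)]

    t0 = time.perf_counter()
    for token in tokens:  # sequential: latencies add up
        await fake_validate(token)
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    # concurrent: latencies overlap
    await asyncio.gather(*[fake_validate(t) for t in tokens])
    concurrent = time.perf_counter() - t0

    return sequential, concurrent


sequential, concurrent = asyncio.run(main())
assert concurrent < sequential  # five overlapping sleeps finish in roughly one
```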
## Optimization Strategies

### 1. Minimize Discovery Calls
Discovery documents rarely change. Call once and cache:
**Bad:**

```python
def validate_every_request(token: str):
    # This fetches the discovery doc every time!
    disco_request = DiscoveryDocumentRequest(address=DISCO_ADDRESS)
    disco_response = get_discovery_document(disco_request)
    # Then validates the token...
```

**Good:**

```python
# Discovery is cached automatically when using perform_disco=True
def validate_every_request(token: str):
    config = TokenValidationConfig(
        perform_disco=True,  # Uses cached discovery
        audience="api",
    )
    claims = validate_token(token, config, DISCO_ADDRESS)
    return claims
```

**Best:**

```python
# For ultimate performance, create the config once and use validate_token
# with perform_disco=True; this handles all caching automatically
config = TokenValidationConfig(perform_disco=True, audience="api")


def validate_request(token: str):
    return validate_token(token, config, DISCO_ADDRESS)
```
### 2. Use Async for Concurrent Operations
If you need to validate multiple tokens or make multiple requests:
**Slow (Sequential):**

```python
results = []
for token in tokens:
    claims = validate_token(token, config, disco_address)
    results.append(claims)
```

**Fast (Concurrent):**

```python
results = await asyncio.gather(
    *[validate_token(token, config, disco_address) for token in tokens],
    return_exceptions=True,  # Don't fail the entire batch on a single error
)
```
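With `return_exceptions=True`, failed validations come back as exception objects interleaved with the successful results, so partition them afterwards. A minimal sketch with a hypothetical stand-in validator:

```python
import asyncio


async def check(token: str) -> dict:
    """Hypothetical validator stand-in: rejects an obviously bad token."""
    if token == "bad":
        raise ValueError("invalid token")
    return {"sub": token}


async def main() -> tuple[list[dict], list[BaseException]]:
    results = await asyncio.gather(
        *[check(t) for t in ["alice", "bad", "bob"]],
        return_exceptions=True,
    )
    # Partition successes from failures instead of failing the whole batch
    ok = [r for r in results if not isinstance(r, BaseException)]
    errs = [r for r in results if isinstance(r, BaseException)]
    return ok, errs


ok, errs = asyncio.run(main())
assert [c["sub"] for c in ok] == ["alice", "bob"]
assert len(errs) == 1 and isinstance(errs[0], ValueError)
```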
### 3. HTTP Client Management & Connection Pooling
The library uses different HTTP client strategies for sync and async to optimize performance and thread safety.
#### Synchronous API: Thread-Local Clients
Each thread gets its own HTTP client using threading.local():
```python
import threading

import httpx

# Thread-local storage for the sync HTTP client
_thread_local = threading.local()


def get_http_client() -> httpx.Client:
    """Get or create the HTTP client for the current thread."""
    if not hasattr(_thread_local, "client") or _thread_local.client is None:
        _thread_local.client = httpx.Client(
            verify=get_ssl_verify(),  # module-level SSL setting
            timeout=timeout,          # module-level timeout (default 30s)
            follow_redirects=True,
        )
    return _thread_local.client
```
Benefits:

- ✅ No global state: Eliminates race conditions
- ✅ Thread isolation: Each thread has its own connection pool
- ✅ No locks needed: Thread-local access is lock-free
- ✅ Automatic cleanup: Each thread manages its own client lifecycle

Connection pool per thread:

- Max Connections: 100 (httpx default)
- Max Keepalive: 20 connections (httpx default)
- Timeout: 30 seconds (py-identity-model default)

Memory trade-off:

- 10 threads = 10 clients (one per thread)
- Each client has its own connection pool
- Acceptable trade-off for thread safety
#### Asynchronous API: Singleton Client
All async operations share a single HTTP client per process:
```python
import threading

import httpx

_async_http_client: httpx.AsyncClient | None = None
_async_client_lock = threading.Lock()


async def get_async_http_client() -> httpx.AsyncClient:
    """Get or create the singleton async HTTP client."""
    global _async_http_client
    if _async_http_client is None:
        with _async_client_lock:  # Thread-safe initialization
            if _async_http_client is None:
                _async_http_client = httpx.AsyncClient(...)
    return _async_http_client
```
Benefits:

- ✅ Shared connection pool: All async operations share connections
- ✅ Memory efficient: A single client serves all async operations
- ✅ No I/O locks: The lock is only used during initialization
- ✅ Optimal for async: Matches the async/await concurrency model

Shared connection pool:

- Max Connections: 100 (shared across all async operations)
- Max Keepalive: 20 connections (shared)
- Timeout: 30 seconds
#### Performance Comparison
| Aspect | Sync (Thread-Local) | Async (Singleton) |
|---|---|---|
| Clients Created | One per thread | One per process |
| Connection Pool | Per-thread | Shared process-wide |
| Memory Usage | Higher (multiple clients) | Lower (single client) |
| Lock Contention | None (thread-local) | None (during I/O) |
| Best For | Multi-threaded apps | Async/await apps |
#### Advanced: Custom Connection Limits
For high-throughput applications, you may want to customize connection limits.
**Note:** The library creates its HTTP clients internally, so customizing connection limits requires forking or using environment variables for timeout configuration.

**Workaround for Custom Limits:**
```python
# Option 1: Use the HTTP_TIMEOUT environment variable
import os

os.environ["HTTP_TIMEOUT"] = "60.0"  # Increase the timeout to 60 seconds

# Option 2: Create your own client wrapper (advanced)
import httpx

from py_identity_model.core.discovery_logic import process_discovery_response


async def custom_discovery_fetch(url: str):
    """Custom discovery fetch with a tuned connection pool."""
    limits = httpx.Limits(
        max_connections=200,
        max_keepalive_connections=50,
    )
    async with httpx.AsyncClient(limits=limits, timeout=60.0) as client:
        response = await client.get(url)
        return process_discovery_response(response)
```
### 4. Batch Token Validations
For batch processing, use async with controlled concurrency:
```python
import asyncio


async def validate_tokens_batched(tokens: list[str], batch_size: int = 50):
    """Validate tokens in batches to avoid overwhelming the system."""
    results = []
    # Process in batches
    for i in range(0, len(tokens), batch_size):
        batch = tokens[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[validate_token(token, config, disco_address) for token in batch],
            return_exceptions=True,
        )
        results.extend(batch_results)
        # Optional: add a delay between batches
        if i + batch_size < len(tokens):
            await asyncio.sleep(0.1)
    return results
```
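An alternative to fixed batches is an `asyncio.Semaphore`, which keeps a rolling window of at most `limit` validations in flight rather than draining one batch before starting the next. The validator below is a stand-in:

```python
import asyncio


async def fake_validate(token: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for an async validation call
    return {"sub": token}


async def validate_with_limit(tokens: list[str], limit: int = 50) -> list[dict]:
    # At most `limit` coroutines get past the semaphore at any moment
    sem = asyncio.Semaphore(limit)

    async def bounded(token: str) -> dict:
        async with sem:
            return await fake_validate(token)

    return await asyncio.gather(*[bounded(t) for t in tokens])


results = asyncio.run(validate_with_limit([f"t{i}" for i in range(5)], limit=2))
assert [r["sub"] for r in results] == ["t0", "t1", "t2", "t3", "t4"]
```

`asyncio.gather` preserves input order regardless of completion order, so the results line up with the tokens.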
### 5. Warm Up the Cache
For production applications, warm up caches on startup:
```python
async def warmup_cache():
    """Warm up the discovery document and JWKS caches on startup."""
    from py_identity_model.aio import get_discovery_document
    from py_identity_model import DiscoveryDocumentRequest

    disco_response = await get_discovery_document(
        DiscoveryDocumentRequest(address=DISCO_ADDRESS)
    )
    if disco_response.is_successful:
        # JWKS will be cached on the first token validation
        print("✓ Cache warmed up")
    else:
        print("✗ Cache warmup failed")


# In FastAPI
@app.on_event("startup")
async def startup_event():
    await warmup_cache()
```
## Production Recommendations

### FastAPI / Async Frameworks
```python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from py_identity_model import TokenValidationConfig
from py_identity_model.aio import validate_token

DISCO_ADDRESS = "https://..."  # your provider's discovery address

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Create the config once
TOKEN_CONFIG = TokenValidationConfig(
    perform_disco=True,
    audience="api",
    options={
        "verify_signature": True,
        "verify_aud": True,
        "verify_exp": True,
        "verify_iss": True,
    },
)


# Warm up the cache on startup
@app.on_event("startup")
async def startup():
    from py_identity_model.aio import get_discovery_document
    from py_identity_model import DiscoveryDocumentRequest

    disco = await get_discovery_document(
        DiscoveryDocumentRequest(address=DISCO_ADDRESS)
    )
    if disco.is_successful:
        print("✓ Discovery cache warmed")


# Use in a dependency
async def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        claims = await validate_token(
            jwt=token,
            token_validation_config=TOKEN_CONFIG,
            disco_doc_address=DISCO_ADDRESS,
        )
        return claims
    except Exception as e:
        raise HTTPException(status_code=401, detail=str(e))


@app.get("/api/data")
async def protected_route(claims: dict = Depends(verify_token)):
    return {"data": "protected", "user": claims["sub"]}
```
### Flask / Sync Frameworks
```python
from flask import Flask, request, jsonify
from py_identity_model import TokenValidationConfig, validate_token

DISCO_ADDRESS = "https://..."  # your provider's discovery address

app = Flask(__name__)

# Create the config once
TOKEN_CONFIG = TokenValidationConfig(
    perform_disco=True,
    audience="api",
)


# Warm up the cache on the first request
# (before_first_request was removed in Flask 2.3; warm up at import time there)
@app.before_first_request
def warmup():
    from py_identity_model import get_discovery_document, DiscoveryDocumentRequest

    disco = get_discovery_document(
        DiscoveryDocumentRequest(address=DISCO_ADDRESS)
    )
    if disco.is_successful:
        print("✓ Discovery cache warmed")


def verify_token():
    auth_header = request.headers.get("Authorization", "")
    token = auth_header.replace("Bearer ", "")
    try:
        claims = validate_token(
            jwt=token,
            token_validation_config=TOKEN_CONFIG,
            disco_doc_address=DISCO_ADDRESS,
        )
        return claims
    except Exception:
        return None


@app.route("/api/data")
def protected_route():
    claims = verify_token()
    if not claims:
        return jsonify({"error": "Unauthorized"}), 401
    return jsonify({"data": "protected", "user": claims["sub"]})
```
## Monitoring and Metrics

### Cache Hit Rate
Monitor cache performance in production:
```python
import logging

from py_identity_model.sync.token_validation import _get_disco_response

logger = logging.getLogger(__name__)


def log_cache_stats():
    """Log cache statistics periodically."""
    disco_info = _get_disco_response.cache_info()
    total = disco_info.hits + disco_info.misses
    hit_rate = (disco_info.hits / total * 100) if total > 0 else 0
    logger.info(
        f"Discovery cache: {disco_info.hits} hits, "
        f"{disco_info.misses} misses, "
        f"{hit_rate:.1f}% hit rate, "
        f"{disco_info.currsize}/{disco_info.maxsize} entries"
    )


# Call every 1000 requests or on a periodic schedule
```
### Performance Metrics
Track key metrics:
- Token validation time (p50, p95, p99)
- Discovery cache hit rate
- JWKS cache hit rate
- Validation failures (expired, invalid signature, etc.)
- Network errors (discovery/JWKS fetch failures)
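For the latency percentiles, the standard library's `statistics.quantiles` is sufficient if you keep a rolling sample of observed durations; the latencies below are synthetic:

```python
import statistics

# Synthetic validation latencies in milliseconds (1.0 .. 100.0)
samples = [float(i) for i in range(1, 101)]

# quantiles(n=100) returns the 1st..99th percentiles; index is percentile - 1
pcts = statistics.quantiles(samples, n=100)
p50, p95, p99 = pcts[49], pcts[94], pcts[98]

assert abs(p50 - 50.5) < 1e-9
assert p50 < p95 < p99
```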
**Example with Prometheus:**

```python
import time

from prometheus_client import Counter, Histogram

validation_duration = Histogram(
    "token_validation_duration_seconds",
    "Token validation duration",
)
validation_total = Counter(
    "token_validation_total",
    "Total token validations",
    ["result"],  # success, expired, invalid, etc.
)


async def validate_with_metrics(token: str):
    start = time.time()
    try:
        claims = await validate_token(token, config, disco_address)
        validation_total.labels(result="success").inc()
        return claims
    except TokenExpiredException:
        validation_total.labels(result="expired").inc()
        raise
    except Exception:
        validation_total.labels(result="error").inc()
        raise
    finally:
        validation_duration.observe(time.time() - start)
```
## Summary

**Best Practices:**

- ✅ Use `perform_disco=True` to enable automatic caching
- ✅ Use the async API for FastAPI and high-concurrency applications
- ✅ Warm up caches on application startup
- ✅ Monitor cache hit rates in production
- ✅ Use connection pooling (automatic with httpx)
- ✅ Batch concurrent operations with controlled concurrency
- ✅ Track validation metrics (duration, failures, etc.)

**Performance Expectations:**

- First validation: ~1-2 seconds (network requests)
- Cached validations: ~1-5ms (very fast)
- Async concurrency: Up to 5-10x faster for batch operations
- Cache hit rate: Should be >95% in steady state
When to Use Async:
- FastAPI, Starlette, or other async frameworks
- High concurrency requirements (100+ req/s)
- Batch token processing
- Already using asyncio
When Sync Is Fine:
- Flask, Django, or other sync frameworks
- Low concurrency (<100 req/s)
- Simple CLI tools or scripts
- Single token validations