Detect recurring transactions (subscriptions, bills) using pattern-based time-series analysis.
Overview
The recurring transaction detection system automatically identifies:
- Fixed subscriptions: Netflix ($15.99/month), Spotify ($9.99/month), gym memberships
- Variable bills: Utilities ($45-$65/month), phone bills with overage charges
- Irregular/annual: Insurance premiums, annual subscriptions, quarterly memberships
Uses a 3-layer hybrid detection algorithm:
- Fixed amount (85% coverage): Subscriptions with consistent amounts (±2% or ±$0.50 tolerance)
- Variable amount (10% coverage): Bills with regular patterns but fluctuating amounts (10-30% variance)
- Irregular (5% coverage): Quarterly/annual patterns (min 2 occurrences)
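As an illustration of how the amount-based checks stack, here is a toy sketch (illustrative only — the real detector also verifies cadence and occurrence counts; `classify_amounts` is a hypothetical helper, not part of the library):

```python
from statistics import mean, pstdev

def classify_amounts(amounts, fixed_tol=0.02, variable_max=0.30):
    """Toy sketch of the sequential amount checks: try 'fixed' first,
    then 'variable', else give up. Cadence checks are omitted for brevity."""
    m = mean(amounts)
    spread = max(amounts) - min(amounts)
    # Layer 1: within ±2% of the mean, or ±$0.50, whichever is larger
    if spread <= max(fixed_tol * m, 0.50):
        return "fixed"
    # Layer 2: relative std dev under 30%
    if m > 0 and pstdev(amounts) / m <= variable_max:
        return "variable"
    return None

print(classify_amounts([15.99, 15.99, 15.99]))  # fixed
print(classify_amounts([45.0, 60.0, 70.0]))     # variable
```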
Quick Start
Basic Detection
from fin_infra.recurring import easy_recurring_detection
# Create detector with sensible defaults
detector = easy_recurring_detection()
# Detect patterns in transaction history
transactions = [
{"id": "1", "merchant": "Netflix", "amount": 15.99, "date": "2025-01-15"},
{"id": "2", "merchant": "Netflix", "amount": 15.99, "date": "2025-02-15"},
{"id": "3", "merchant": "Netflix", "amount": 15.99, "date": "2025-03-15"},
]
patterns = detector.detect_patterns(transactions)
for pattern in patterns:
    print(f"{pattern.merchant_name}: ${pattern.amount}/month")
    print(f" Confidence: {pattern.confidence:.0%}")
    print(f" Next charge: {pattern.next_expected_date}")
FastAPI Integration
from fastapi import FastAPI
from fin_infra.recurring import add_recurring_detection
app = FastAPI()
# One-line integration
detector = add_recurring_detection(app)
# Endpoints mounted at:
# - POST /recurring/detect
# - GET /recurring/subscriptions
# - GET /recurring/predictions
# - GET /recurring/stats
Custom Configuration
# Strict detection (fewer false positives)
detector = easy_recurring_detection(
min_occurrences=4, # Require 4+ transactions
amount_tolerance=0.01, # ±1% amount variance
date_tolerance_days=3 # ±3 days date variance
)
# Lenient detection (catch more patterns)
detector = easy_recurring_detection(
min_occurrences=2, # Minimum for annual subscriptions
amount_tolerance=0.05, # ±5% amount variance
date_tolerance_days=10 # ±10 days date variance
)
# Annual-only detection
detector = easy_recurring_detection(
min_occurrences=2, # Annual needs minimum 2 years
date_tolerance_days=14 # ±2 weeks for annual patterns
)
Architecture
3-Layer Detection Algorithm
The detector uses a sequential 3-layer approach to maximize coverage:
Layer 1: Fixed Amount Detection (85% coverage)
Detects subscriptions with consistent amounts:
Criteria:
- Amount variance within ±2% OR ±$0.50 (whichever is larger)
- Regular cadence: biweekly (13-15 days), monthly (28-32 days)
- Minimum 3 occurrences (configurable)
- Date consistency within ±7 days (configurable)
Examples:
- Netflix: $15.99/month → Fixed pattern, 0.95 confidence
- Spotify: $9.99/month → Fixed pattern, 0.92 confidence
- Gym: $45.00/month → Fixed pattern, 0.90 confidence
Output:
RecurringPattern(
merchant_name="Netflix",
normalized_merchant="netflix",
pattern_type=PatternType.FIXED,
cadence=CadenceType.MONTHLY,
amount=15.99,
occurrence_count=6,
confidence=0.95,
next_expected_date=datetime(2025, 7, 15),
reasoning="Detected 6 fixed charges of $15.99 with monthly cadence (30.0 days avg, 0.5 days std dev)"
)
Layer 2: Variable Amount Detection (10% coverage)
Detects bills with regular patterns but fluctuating amounts:
Criteria:
- Amount variance between 10-30% (mean ± 2 std dev)
- Regular cadence (monthly, biweekly)
- Minimum 3 occurrences
- Not fixed (failed Layer 1 check)
Examples:
- Electric bill: $45-$70/month → Variable pattern, 0.75 confidence
- Phone bill: $60-$85/month (with overages) → Variable pattern, 0.72 confidence
- Water bill: $25-$40/month → Variable pattern, 0.70 confidence
Output:
RecurringPattern(
merchant_name="PG&E Utilities",
normalized_merchant="pge",
pattern_type=PatternType.VARIABLE,
cadence=CadenceType.MONTHLY,
amount=None,
amount_range=(45.00, 70.00), # mean ± 2*std_dev
amount_variance_pct=0.18, # 18% variance
occurrence_count=6,
confidence=0.75,
reasoning="Detected 6 variable charges ranging $45-$70 with monthly cadence"
)
Layer 3: Irregular Detection (5% coverage)
Detects quarterly and annual patterns:
Criteria:
- Quarterly cadence: 85-95 days
- Annual cadence: 360-370 days
- Amount variance ±5% or ±$1.00
- Minimum 2 occurrences (annual needs 2 years data)
Examples:
- Amazon Prime: $139.00/year → Irregular/Annual, 0.68 confidence
- Car insurance: $450.00/quarter → Irregular/Quarterly, 0.65 confidence
- Professional membership: $299.00/year → Irregular/Annual, 0.62 confidence
Output:
RecurringPattern(
merchant_name="Amazon Prime Annual",
normalized_merchant="amazon",
pattern_type=PatternType.IRREGULAR,
cadence=CadenceType.ANNUAL,
amount=139.00,
occurrence_count=3,
confidence=0.68,
next_expected_date=datetime(2026, 11, 15),
reasoning="Detected 3 annual charges of $139.00 (365 days avg)"
)
Cadence Detection
Uses median day difference algorithm with tolerance windows:
| Cadence | Day Range | Typical Use Case |
|---|---|---|
| Biweekly | 13-15 days | Paycheck, rare subscriptions |
| Monthly | 28-32 days | Most subscriptions, utilities |
| Quarterly | 85-95 days | Quarterly subscriptions |
| Annual | 360-370 days | Annual memberships, insurance |
Algorithm:
from statistics import median, stdev

def detect_cadence(transactions):
    # Calculate days between consecutive transactions
    day_diffs = []
    for i in range(len(transactions) - 1):
        days = (transactions[i + 1].date - transactions[i].date).days
        day_diffs.append(days)
    # Use median (robust to outliers)
    median_days = median(day_diffs)
    std_dev = stdev(day_diffs) if len(day_diffs) > 1 else 0.0
    # Match to cadence type
    if 13 <= median_days <= 15:
        return CadenceType.BIWEEKLY, std_dev
    elif 28 <= median_days <= 32:
        return CadenceType.MONTHLY, std_dev
    elif 85 <= median_days <= 95:
        return CadenceType.QUARTERLY, std_dev
    elif 360 <= median_days <= 370:
        return CadenceType.ANNUAL, std_dev
    return None, 0.0
Merchant Normalization
Groups merchant name variants using a 6-step normalization pipeline plus fuzzy matching:
Normalization Pipeline
1. Lowercase: "NETFLIX.COM" → "netflix.com"
2. Remove domain suffixes: "netflix.com" → "netflix"
3. Remove special chars: "netflix*subscription" → "netflix subscription"
4. Remove store numbers: "starbucks #12345" → "starbucks"
5. Remove legal entities: "netflix inc" → "netflix"
6. Normalize whitespace: "  netflix  " → "netflix"
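The pipeline above can be approximated with a few regex passes (an illustrative sketch; the library's actual `normalize_merchant` may differ in edge cases):

```python
import re

def normalize_merchant_sketch(name: str) -> str:
    """Illustrative re-implementation of the 6-step pipeline above."""
    s = name.lower()                               # 1. lowercase
    s = re.sub(r"\.(com|net|org)\b", "", s)        # 2. remove domain suffixes
    s = re.sub(r"[^a-z0-9\s]", " ", s)             # 3. remove special chars
    s = re.sub(r"\b\d{3,}\b", "", s)               # 4. remove store numbers
    s = re.sub(r"\b(inc|llc|ltd|corp)\b", "", s)   # 5. remove legal entities
    return re.sub(r"\s+", " ", s).strip()          # 6. normalize whitespace

print(normalize_merchant_sketch("NETFLIX.COM"))       # netflix
print(normalize_merchant_sketch("starbucks #12345"))  # starbucks
```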
Fuzzy Matching
Uses RapidFuzz with 80% similarity threshold:
from fin_infra.recurring import FuzzyMatcher
matcher = FuzzyMatcher(similarity_threshold=80)
# Check if two merchants are the same
is_same = matcher.is_same_merchant("NETFLIX.COM", "Netflix Inc")
# → True (after normalization + fuzzy match)
# Find similar merchants
similar = matcher.find_similar("netflix", ["netflix", "hulu", "spotify"])
# → [("netflix", 100.0)]
# Group variants
groups = matcher.group_merchants([
"NETFLIX.COM",
"Netflix Inc",
"NFLX*SUBSCRIPTION"
])
# → {"NETFLIX.COM": ["NETFLIX.COM", "Netflix Inc", "NFLX*SUBSCRIPTION"]}
Pre-defined Merchant Groups
Common subscriptions with known variants:
KNOWN_MERCHANT_GROUPS = {
"netflix": ["netflix", "nflx", "nflx subscription", ...],
"spotify": ["spotify", "spotify usa", "spotify premium", ...],
"amazon": ["amazon", "amazon prime", "amzn mktp us", ...],
"starbucks": ["starbucks", "starbucks coffee", "sbux"],
"apple": ["apple", "apple bill", "apple itunes", ...],
"google": ["google", "google youtube", "google storage", ...],
"hulu": ["hulu", "hulu subscription", "hulu plus"],
"disney": ["disney", "disneyplus", "disney plus"],
"hbo": ["hbo", "hbo max", "hbomax", ...],
}
Confidence Scoring
Multi-factor confidence calculation:
def calculate_confidence(pattern):
    # Base confidence by pattern type
    base_confidence = {
        PatternType.FIXED: 0.90,
        PatternType.VARIABLE: 0.70,
        PatternType.IRREGULAR: 0.60,
    }[pattern.pattern_type]
    # Bonus for more occurrences (+0.05 each, max +0.15)
    occurrence_bonus = min((pattern.occurrence_count - 3) * 0.05, 0.15)
    # Bonus for date consistency (+0.05 if std_dev < 2 days)
    date_consistency_bonus = 0.05 if pattern.date_std_dev < 2.0 else 0.0
    # Bonus for amount consistency (+0.05 if variance < 1%)
    amount_bonus = 0.05 if pattern.amount_variance_pct < 0.01 else 0.0
    # Penalty for high amount variance (-0.10 if > 10%)
    variance_penalty = -0.10 if pattern.amount_variance_pct > 0.10 else 0.0
    # Penalty for generic merchant (-0.05)
    generic_penalty = -0.05 if is_generic_merchant(pattern.merchant_name) else 0.0
    confidence = (
        base_confidence
        + occurrence_bonus
        + date_consistency_bonus
        + amount_bonus
        + variance_penalty
        + generic_penalty
    )
    return max(0.0, min(1.0, confidence))  # Clamp to [0, 1]
False Positive Filtering
Filters out low-quality patterns:
Rejection criteria:
- Occurrence count < min_occurrences (default: 3)
- Amount variance > 30% (too inconsistent)
- Generic merchant names: "ATM", "Payment", "Purchase", "Transfer", etc.
- Date variance too high (std_dev > date_tolerance_days * 2)
- Confidence < 0.50 (after multi-factor scoring)
Target metrics:
- False positive rate < 5%
- Precision: 95%+
- Recall: 85%+
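The rejection criteria above can be sketched as a single predicate (illustrative only — field names mirror RecurringPattern, but this is not the library's actual filter):

```python
GENERIC = {"atm", "payment", "purchase", "transfer"}

def passes_filters(merchant, occurrences, amount_variance_pct, date_std_dev,
                   confidence, min_occurrences=3, date_tolerance_days=7):
    """Apply the rejection criteria listed above; True means keep the pattern."""
    if occurrences < min_occurrences:
        return False
    if amount_variance_pct > 0.30:                  # too inconsistent
        return False
    if merchant.lower() in GENERIC:                 # generic merchant name
        return False
    if date_std_dev > date_tolerance_days * 2:      # dates too scattered
        return False
    return confidence >= 0.50                       # multi-factor score floor

print(passes_filters("Netflix", 6, 0.00, 0.5, 0.95))  # True
print(passes_filters("ATM", 6, 0.00, 0.5, 0.95))      # False
```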
API Reference
FastAPI Integration
Setup
from fastapi import FastAPI
from fin_infra.recurring import add_recurring_detection
app = FastAPI()
# Add recurring detection with defaults
detector = add_recurring_detection(app)
# Custom configuration
detector = add_recurring_detection(
app,
prefix="/recurring", # API prefix
min_occurrences=3, # Detection threshold
amount_tolerance=0.02, # ±2% variance
date_tolerance_days=7, # ±7 days variance
include_in_schema=True # Show in OpenAPI docs
)
# Detector stored on app.state
assert app.state.recurring_detector == detector
Endpoints
POST /recurring/detect
Detect recurring patterns in transaction list.
Request:
DetectionRequest(
days=90, # Look back N days (30-730)
min_confidence=0.70, # Filter by confidence (0.0-1.0)
include_predictions=True # Include future bill predictions
)
Response:
DetectionResponse(
patterns=[
RecurringPattern(
merchant_name="Netflix",
pattern_type=PatternType.FIXED,
cadence=CadenceType.MONTHLY,
amount=15.99,
confidence=0.95,
...
),
...
],
count=5,
predictions=[
BillPrediction(
merchant_name="Netflix",
expected_date=datetime(2025, 7, 15),
expected_amount=15.99,
confidence=0.95,
cadence=CadenceType.MONTHLY
),
...
],
processing_time_ms=45
)
Example:
curl -X POST http://localhost:8000/recurring/detect \
-H "Content-Type: application/json" \
-d '{
"days": 90,
"min_confidence": 0.70,
"include_predictions": true
}'
GET /recurring/subscriptions
List detected subscriptions (cached results).
Query Parameters:
- min_confidence (float, optional): Filter by confidence threshold (0.0-1.0)
- days (int, optional): Historical window (30-730 days)
Response: list[RecurringPattern]
Example:
curl "http://localhost:8000/recurring/subscriptions?min_confidence=0.80&days=180"
GET /recurring/predictions
Predict upcoming bills.
Query Parameters:
- days_ahead (int, optional): Prediction window (1-90 days, default: 30)
- min_confidence (float, optional): Filter by confidence (default: 0.70)
Response: list[BillPrediction] (sorted by expected_date)
Example:
curl "http://localhost:8000/recurring/predictions?days_ahead=60&min_confidence=0.75"
GET /recurring/stats
Aggregate subscription statistics.
Response:
SubscriptionStats(
total_subscriptions=12,
monthly_total=247.89,
by_pattern_type={
"fixed": 10,
"variable": 2,
"irregular": 0
},
by_cadence={
"monthly": 10,
"biweekly": 1,
"quarterly": 0,
"annual": 1
},
top_merchants=[
{"merchant": "netflix", "amount": 15.99},
{"merchant": "spotify", "amount": 9.99},
{"merchant": "amazon prime", "amount": 14.99}
],
confidence_distribution={
"high (0.90+)": 8,
"medium (0.70-0.90)": 3,
"low (<0.70)": 1
}
)
Example:
curl http://localhost:8000/recurring/stats
Python API
easy_recurring_detection()
One-line builder for recurring detection.
Signature:
def easy_recurring_detection(
    min_occurrences: int = 3,
    amount_tolerance: float = 0.02,
    date_tolerance_days: int = 7,
    **config
) -> RecurringDetector:
    """
    Create configured recurring transaction detector.

    Args:
        min_occurrences: Minimum transactions to detect pattern (≥2, default: 3)
        amount_tolerance: Amount variance threshold (0.0-1.0, default: 0.02 = ±2%)
        date_tolerance_days: Date variance threshold (≥0, default: 7 days)
        **config: Reserved for future extensions (V2: enable_llm, llm_provider)

    Returns:
        RecurringDetector with configured PatternDetector

    Raises:
        ValueError: If parameters out of valid range
    """
Examples:
# Default (balanced detection)
detector = easy_recurring_detection()
# min_occurrences=3, amount_tolerance=0.02 (±2%), date_tolerance_days=7
# Strict (fewer false positives)
detector = easy_recurring_detection(
min_occurrences=4,
amount_tolerance=0.01, # ±1% variance
date_tolerance_days=3 # ±3 days variance
)
# Lenient (catch more patterns)
detector = easy_recurring_detection(
min_occurrences=2,
amount_tolerance=0.05, # ±5% variance
date_tolerance_days=10 # ±10 days variance
)
# Annual-only (for yearly subscriptions)
detector = easy_recurring_detection(
min_occurrences=2, # Need 2 years minimum
date_tolerance_days=14 # ±2 weeks for annual
)
RecurringDetector
Main detection engine.
Methods:
class RecurringDetector:
    def detect_patterns(
        self,
        transactions: list[dict]
    ) -> list[RecurringPattern]:
        """
        Detect recurring patterns in transaction history.

        Args:
            transactions: List of dicts with keys: id, merchant, amount, date

        Returns:
            List of detected patterns (sorted by confidence descending)
        """

    def get_stats(self) -> dict:
        """
        Get detection statistics.

        Returns:
            Dict with: total_detected, fixed_patterns, variable_patterns,
            irregular_patterns, false_positives_filtered
        """
Example:
detector = easy_recurring_detection()
transactions = [
{"id": "1", "merchant": "Netflix", "amount": 15.99, "date": "2025-01-15"},
{"id": "2", "merchant": "Netflix", "amount": 15.99, "date": "2025-02-15"},
{"id": "3", "merchant": "Netflix", "amount": 15.99, "date": "2025-03-15"},
]
patterns = detector.detect_patterns(transactions)
for pattern in patterns:
    print(f"{pattern.merchant_name} ({pattern.pattern_type.value}):")
    print(f" Amount: ${pattern.amount}")
    print(f" Cadence: {pattern.cadence.value}")
    print(f" Confidence: {pattern.confidence:.0%}")
    print(f" Next charge: {pattern.next_expected_date}")
    print(f" Reasoning: {pattern.reasoning}")
stats = detector.get_stats()
print(f"\nStats: {stats}")
Data Models
RecurringPattern
Detected recurring transaction pattern.
@dataclass
class RecurringPattern:
    merchant_name: str                        # Original merchant name
    normalized_merchant: str                  # Normalized for grouping
    pattern_type: PatternType                 # FIXED, VARIABLE, IRREGULAR
    cadence: CadenceType                      # MONTHLY, BIWEEKLY, QUARTERLY, ANNUAL
    amount: float | None                      # Fixed amount (or None for variable)
    amount_range: tuple[float, float] | None  # Range for variable (min, max)
    amount_variance_pct: float                # Amount variance (std dev / mean)
    occurrence_count: int                     # Number of transactions
    first_date: datetime                      # First transaction date
    last_date: datetime                       # Last transaction date
    next_expected_date: datetime              # Predicted next charge
    date_std_dev: float                       # Date consistency (lower = better)
    confidence: float                         # 0.0-1.0 (0.90+ = high confidence)
    reasoning: str | None                     # Human-readable explanation
BillPrediction
Future bill prediction.
@dataclass
class BillPrediction:
    merchant_name: str
    expected_date: datetime
    expected_amount: float | None
    expected_range: tuple[float, float] | None
    confidence: float
    cadence: CadenceType
Enums
class CadenceType(str, Enum):
    BIWEEKLY = "biweekly"    # 13-15 days
    MONTHLY = "monthly"      # 28-32 days
    QUARTERLY = "quarterly"  # 85-95 days
    ANNUAL = "annual"        # 360-370 days

class PatternType(str, Enum):
    FIXED = "fixed"          # Fixed amount subscriptions
    VARIABLE = "variable"    # Variable bills (utilities)
    IRREGULAR = "irregular"  # Quarterly/annual patterns
Configuration
Tuning Parameters
Adjust sensitivity based on use case:
| Parameter | Default | Strict | Lenient | Description |
|---|---|---|---|---|
| min_occurrences | 3 | 4 | 2 | Min transactions to detect pattern |
| amount_tolerance | 0.02 (±2%) | 0.01 (±1%) | 0.05 (±5%) | Amount variance threshold |
| date_tolerance_days | 7 | 3 | 10 | Date variance threshold (days) |
Use cases:
- Strict (fewer false positives):
  - Financial dashboards for accurate spending tracking
  - Subscription audit tools
  - Budget planning apps
- Default (balanced):
  - General-purpose recurring detection
  - Personal finance apps (Mint, Credit Karma)
  - Expense categorization
- Lenient (catch more patterns):
  - Exploratory analysis of spending habits
  - Detecting irregular/seasonal patterns
  - Annual subscription tracking (needs min 2 years data)
Environment Variables
(None required for V1 pattern-based detection)
V2 LLM enhancement will add:
- GOOGLE_API_KEY (for merchant normalization)
- OPENAI_API_KEY (alternative provider)
- LLM_PROVIDER (default: "google")
Integration with svc-infra
Job Scheduling (Daily Detection)
Use svc-infra jobs for automated daily detection runs:
from svc_infra.jobs.easy import easy_jobs
from fin_infra.recurring import easy_recurring_detection
from fin_infra.banking import easy_banking
# Setup jobs
queue, scheduler = easy_jobs(app, driver="redis", redis_url="redis://localhost")
# Setup detectors
recurring_detector = easy_recurring_detection()
banking = easy_banking(provider="plaid")
# Define detection task
async def detect_recurring_task():
    """Run daily at 2 AM."""
    # Fetch transactions from banking provider
    users = get_all_users()  # Your user retrieval logic
    for user in users:
        # Get last 90 days of transactions
        transactions = await banking.get_transactions(
            user.access_token,
            days=90
        )
        # Detect patterns
        patterns = recurring_detector.detect_patterns(transactions)
        # Store in database
        save_patterns(user.id, patterns)
        # Send alerts for new subscriptions
        new_patterns = [p for p in patterns if is_new(user.id, p)]
        if new_patterns:
            send_alert(user.email, new_patterns)

# Run once every 24 hours; start the scheduler at 2 AM (or use a cron-style
# trigger, if available) to pin the time of day
scheduler.add_task(
    name="recurring_detection_daily",
    interval_seconds=86400,  # 24 hours
    func=detect_recurring_task
)
Caching (Results + Merchant Normalization)
Use svc-infra cache for performance:
from svc_infra.cache import init_cache, cache_read, cache_write, resource

# Initialize cache
init_cache(url="redis://localhost", prefix="fin", version="v1")

# Cache merchant normalization (1 week TTL)
merchant_resource = resource("merchant", "merchant_name")

@merchant_resource.cache_read(suffix="normalized", ttl=604800)  # 7 days
def get_normalized_merchant(merchant_name: str) -> str:
    from fin_infra.recurring import normalize_merchant
    return normalize_merchant(merchant_name)

# Cache detected subscriptions (24h TTL)
@cache_read(key="recurring:subscriptions:{user_id}", ttl=86400)  # 1 day
def get_user_subscriptions(user_id: str) -> list[RecurringPattern]:
    # Detect or retrieve from DB
    pass

@cache_write(
    key="recurring:subscriptions:{user_id}",
    ttl=86400,
    tags=["recurring", "user:{user_id}"]
)
def save_user_subscriptions(user_id: str, patterns: list[RecurringPattern]):
    # Save to DB and cache
    pass
Cache hit rates:
- Merchant normalization: ~95% (after warm-up)
- User subscriptions: ~80% (with 24h TTL)
- Reduces database queries by 85%+
Webhooks (Subscription Change Alerts)
Use svc-infra webhooks for real-time notifications:
from svc_infra.webhooks.add import add_webhooks
# Setup webhooks
add_webhooks(
app,
signing_secret="your-webhook-secret",
event_types=[
"recurring.subscription_detected",
"recurring.subscription_changed",
"recurring.subscription_cancelled",
]
)
# Emit events when patterns change
from svc_infra.webhooks import emit_event
async def on_new_subscription(user_id: str, pattern: RecurringPattern):
    """Called when new recurring pattern detected."""
    await emit_event(
        event_type="recurring.subscription_detected",
        data={
            "user_id": user_id,
            "merchant": pattern.merchant_name,
            "amount": pattern.amount,
            "cadence": pattern.cadence.value,
            "confidence": pattern.confidence,
            "next_charge_date": pattern.next_expected_date.isoformat(),
        }
    )

async def on_subscription_changed(user_id: str, old: RecurringPattern, new: RecurringPattern):
    """Called when subscription amount/date changes."""
    await emit_event(
        event_type="recurring.subscription_changed",
        data={
            "user_id": user_id,
            "merchant": new.merchant_name,
            "old_amount": old.amount,
            "new_amount": new.amount,
            "confidence": new.confidence,
        }
    )
Webhook delivery:
- Automatic retries (exponential backoff)
- Signature verification (HMAC-SHA256)
- Delivery tracking (success/failure logs)
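The signature-verification step can be implemented on the receiving side with the standard library (a generic HMAC-SHA256 sketch — the exact header name and payload encoding used by svc-infra webhooks are not specified here; consult its docs):

```python
import hashlib
import hmac

def verify_signature(payload: bytes, signature_hex: str, secret: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw payload and compare it to the
    received signature using a constant-time comparison."""
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_hex)

# Simulate a delivery: the sender signs the body with the shared secret
body = b'{"event": "recurring.subscription_detected"}'
sig = hmac.new(b"your-webhook-secret", body, hashlib.sha256).hexdigest()
print(verify_signature(body, sig, "your-webhook-secret"))  # True
```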
Logging & Observability
Use svc-infra logging for structured logs:
from svc_infra.logging import setup_logging
import logging
# Setup logging
setup_logging(level="INFO", fmt="json")
logger = logging.getLogger(__name__)
# Log detection results
detector = easy_recurring_detection()
patterns = detector.detect_patterns(transactions)
logger.info(
"recurring_detection_complete",
extra={
"user_id": user_id,
"patterns_detected": len(patterns),
"fixed_count": sum(1 for p in patterns if p.pattern_type == PatternType.FIXED),
"variable_count": sum(1 for p in patterns if p.pattern_type == PatternType.VARIABLE),
"processing_time_ms": processing_time,
}
)
Metrics (via svc-infra observability):
- recurring_detections_total (counter): Total detections run
- recurring_patterns_detected (histogram): Patterns per user
- recurring_confidence_avg (gauge): Average confidence score
- recurring_processing_time_ms (histogram): Detection latency
Performance
Benchmarks
Tested on M1 Mac with 1000 transactions:
| Operation | Time | Throughput |
|---|---|---|
| Normalization (100 merchants) | 5ms | 20,000 merchants/sec |
| Fuzzy matching (100 pairs) | 15ms | 6,600 pairs/sec |
| Pattern detection (100 txns) | 25ms | 4,000 txns/sec |
| Pattern detection (1000 txns) | 180ms | 5,500 txns/sec |
Scaling:
- Linear complexity: O(n) where n = transaction count
- Merchant grouping: O(n log n) with fuzzy matching
- Parallelizable: Process users independently with svc-infra jobs
Optimization Tips
- Cache merchant normalization (svc-infra.cache, 7 day TTL)
  - Reduces repeated normalization calls by 95%
- Batch process users (svc-infra jobs daily at 2 AM)
  - Avoid real-time detection on every transaction
- Filter transactions (only include candidates)
  - Skip ATM withdrawals, transfers (generic merchants)
  - Only include merchants with 2+ transactions
- Use pre-defined merchant groups (KNOWN_MERCHANT_GROUPS)
  - Skips fuzzy matching for common subscriptions
  - Instant grouping for Netflix, Spotify, etc.
Testing
Unit Tests
# Run all recurring tests
poetry run pytest tests/unit/recurring/ -v
# Run specific test class
poetry run pytest tests/unit/recurring/test_recurring.py::TestFixedAmountDetection -v
# Run with coverage
poetry run pytest tests/unit/recurring/ --cov=fin_infra.recurring --cov-report=html
Test coverage: 37 tests covering:
- Merchant normalization (6 tests)
- Fixed amount detection (4 tests)
- Variable amount detection (2 tests)
- Irregular detection (2 tests)
- Date clustering (3 tests)
- False positive filtering (2 tests)
- Merchant grouping (2 tests)
- Easy builder (4 tests)
- Confidence scoring (2 tests)
Acceptance Tests
(Planned for real transaction datasets)
# Run with labeled test data
poetry run pytest tests/acceptance/test_recurring_accuracy.py -v
# Requires: tests/fixtures/recurring_labeled_data.json
# Contains: 150 labeled transaction histories
# Validates: 85%+ accuracy, <5% false positives
Troubleshooting
Pattern Not Detected
Symptom: Expected subscription not showing in results.
Possible causes:
- Insufficient occurrences (need min 3 for monthly, 2 for annual)
- Amount variance too high (>2% for fixed, >30% for variable)
- Date variance too high (>7 days from expected cadence)
- Generic merchant name filtered out ("Payment", "ATM", etc.)
Solutions:
# Try lenient detection
detector = easy_recurring_detection(
min_occurrences=2,
amount_tolerance=0.05,
date_tolerance_days=10
)
# Check merchant normalization
from fin_infra.recurring import normalize_merchant, is_generic_merchant
print(normalize_merchant("My Merchant")) # See normalized name
print(is_generic_merchant("My Merchant")) # Check if generic
# Inspect detection stats
patterns = detector.detect_patterns(transactions)
stats = detector.get_stats()
print(stats)  # See false_positives_filtered count
False Positives
Symptom: Random purchases detected as recurring.
Possible causes:
- Detection too lenient (amount_tolerance or date_tolerance too high)
- Coincidental purchases (e.g., monthly gas station visits)
- Merchant name too generic (not filtered by is_generic_merchant)
Solutions:
# Use strict detection
detector = easy_recurring_detection(
min_occurrences=4,
amount_tolerance=0.01,
date_tolerance_days=3
)
# Filter by confidence
patterns = detector.detect_patterns(transactions)
high_confidence = [p for p in patterns if p.confidence >= 0.85]
# Add custom generic merchant filter
from fin_infra.recurring.normalizer import is_generic_merchant
def my_is_generic(merchant: str) -> bool:
    return is_generic_merchant(merchant) or merchant in ["gas station", "grocery"]
Performance Issues
Symptom: Slow detection on large transaction histories.
Possible causes:
- Too many transactions (>1000 per merchant)
- Fuzzy matching on every merchant pair
- No caching of normalization results
Solutions:
# Filter transactions before detection
transactions = [
t for t in all_transactions
if not is_generic_merchant(t["merchant"])
and transaction_count(t["merchant"]) >= 2 # Skip one-off merchants
]
# Use caching (svc-infra.cache)
from svc_infra.cache import cache_read
@cache_read(key="recurring:normalized:{merchant}", ttl=604800)
def get_normalized(merchant: str) -> str:
    return normalize_merchant(merchant)

# Batch process with jobs (svc-infra.jobs)
# Run detection daily at 2 AM, not on every request
Roadmap
V1 (Current) ✅
- Pattern-based detection (3-layer hybrid)
- Merchant normalization (fuzzy matching)
- Confidence scoring (multi-factor)
- FastAPI integration (4 endpoints)
- Easy builder (easy_recurring_detection())
- Comprehensive tests (37 unit tests)
V2 (Planned) - LLM Enhancement
- LLM merchant normalization (few-shot with Google Gemini)
  - Handles edge cases: "SQ *COFFEE SHOP" → "Square Coffee Shop"
  - Accuracy: 90-95% (vs 80-85% with fuzzy matching)
  - Cost: ~$0.00003/merchant with 95% cache hit
- LLM variable detection (for ambiguous patterns)
  - Semantic understanding: "utility bill seasonal" vs "phone bill with overages"
  - Handles >20% variance cases (too complex for statistical methods)
  - Accuracy: 88%+ (vs 70% with statistical only)
- Natural language insights (GET /recurring/insights)
  - Example: "You have 5 streaming subscriptions totaling $64.95/month. Consider Disney+ bundle to save $30/month."
  - On-demand generation (not automatic, user-initiated)
  - Cache: 1-day TTL
- Multi-provider LLM support
  - Google Gemini (default, best cost/performance)
  - OpenAI GPT-4 (higher accuracy, 2x cost)
  - Anthropic Claude (best for long context)
- Cost optimization
  - Aggressive caching: 95% hit rate for merchant normalization
  - LLM only for edge cases: <10% of detections need LLM
  - Target: <$0.001/user/month with LLM enabled
Enable V2:
# Coming soon
detector = easy_recurring_detection(
enable_llm=True,
llm_provider="google", # or "openai", "anthropic"
)
Examples
Complete Integration Example
from fastapi import FastAPI
from fin_infra.recurring import add_recurring_detection
from fin_infra.banking import add_banking
from svc_infra.jobs.easy import easy_jobs
from svc_infra.cache import init_cache
from svc_infra.logging import setup_logging
# Setup app
app = FastAPI(title="Subscription Tracker")
setup_logging()
# Setup cache
init_cache(url="redis://localhost", prefix="sub", version="v1")
# Add banking integration
banking = add_banking(app, provider="plaid")
# Add recurring detection
recurring = add_recurring_detection(
app,
min_occurrences=3,
amount_tolerance=0.02,
date_tolerance_days=7
)
# Setup jobs for daily detection
queue, scheduler = easy_jobs(app, driver="redis")
async def daily_detection():
    """Run at 2 AM daily."""
    users = await get_all_users()
    for user in users:
        # Fetch transactions
        transactions = await banking.get_transactions(
            user.plaid_access_token,
            days=90
        )
        # Detect patterns
        patterns = recurring.detect_patterns(transactions)
        # Save to database
        await save_patterns(user.id, patterns)
        # Send alerts
        new_patterns = [p for p in patterns if is_new(user.id, p)]
        if new_patterns:
            await send_email(user.email, new_patterns)

scheduler.add_task(
    name="daily_recurring_detection",
    interval_seconds=86400,
    func=daily_detection
)

# API endpoints automatically mounted:
# - POST /recurring/detect
# - GET /recurring/subscriptions
# - GET /recurring/predictions
# - GET /recurring/stats
Custom Detection Logic
from fin_infra.recurring import easy_recurring_detection, PatternType, CadenceType
# Create detector
detector = easy_recurring_detection()
# Get transactions (from banking provider or database)
transactions = get_user_transactions(user_id, days=180)
# Detect patterns
patterns = detector.detect_patterns(transactions)
# Filter by type
subscriptions = [p for p in patterns if p.pattern_type == PatternType.FIXED]
bills = [p for p in patterns if p.pattern_type == PatternType.VARIABLE]
annual = [p for p in patterns if p.pattern_type == PatternType.IRREGULAR]
# Calculate total monthly spend
monthly_spend = sum(p.amount for p in subscriptions if p.cadence == CadenceType.MONTHLY)
print(f"Monthly subscriptions: ${monthly_spend:.2f}")
# Get upcoming bills (next 30 days)
from datetime import datetime, timedelta
cutoff = datetime.now() + timedelta(days=30)
upcoming = [
p for p in patterns
if p.next_expected_date <= cutoff
]
upcoming.sort(key=lambda p: p.next_expected_date)
print("Upcoming bills:")
for pattern in upcoming:
    print(f" {pattern.next_expected_date.strftime('%Y-%m-%d')}: {pattern.merchant_name} ${pattern.amount}")
Recurring Summary (Phase 2 Enhancement)
Generate aggregated insights about user's recurring spending patterns.
Summary Model
from fin_infra.recurring.summary import RecurringSummary, get_recurring_summary
# Generate summary from detected patterns
summary = get_recurring_summary(user_id="user_123", patterns=patterns)
print(summary.model_dump_json(indent=2))
Response Structure:
{
"user_id": "user_123",
"total_monthly_cost": 145.97,
"total_monthly_income": 4333.33,
"subscriptions": [
{
"merchant_name": "Netflix",
"amount": 15.99,
"cadence": "monthly",
"monthly_cost": 15.99,
"category": "entertainment",
"confidence": 0.95,
"is_subscription": true
},
{
"merchant_name": "Costco Membership",
"amount": 60.00,
"cadence": "quarterly",
"monthly_cost": 20.00,
"category": "shopping",
"confidence": 0.88,
"is_subscription": false
}
],
"recurring_income": [
{
"merchant_name": "Employer Direct Deposit",
"amount": 2000.00,
"cadence": "biweekly",
"monthly_cost": 4333.33,
"category": "income",
"confidence": 0.98,
"is_subscription": false
}
],
"by_category": {
"entertainment": 25.98,
"shopping": 20.00,
"food_and_drink": 45.99
},
"cancellation_opportunities": [
{
"merchant_name": "Hulu",
"category": "entertainment",
"monthly_savings": 7.99,
"reason": "You have 3 streaming services. Consider consolidating to save money."
}
],
"generated_at": "2025-01-23T10:30:00Z"
}
Cadence Normalization
All recurring patterns are normalized to monthly equivalents for easy comparison:
| Cadence | Formula | Example |
|---|---|---|
| Monthly | amount × 1 | $15.99/mo → $15.99/mo |
| Quarterly | amount × 4 / 12 | $60/qtr → $20/mo |
| Biweekly | amount × 26 / 12 | $2000 biweekly → $4333.33/mo |
| Weekly | amount × 52 / 12 | $100/week → $433.33/mo |
| Annual | amount / 12 | $120/year → $10/mo |
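The table above translates directly into a conversion function (a sketch using plain strings in place of CadenceType values):

```python
# Multiplier from each cadence to its monthly equivalent
MONTHLY_FACTOR = {
    "monthly": 1.0,
    "quarterly": 4 / 12,
    "biweekly": 26 / 12,
    "weekly": 52 / 12,
    "annual": 1 / 12,
}

def monthly_cost(amount: float, cadence: str) -> float:
    """Normalize a recurring amount to its monthly equivalent."""
    return round(amount * MONTHLY_FACTOR[cadence], 2)

print(monthly_cost(60.00, "quarterly"))   # 20.0
print(monthly_cost(2000.00, "biweekly"))  # 4333.33
```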
FastAPI Endpoint
# GET /recurring/summary?user_id=user_123
# Response includes:
# - Total monthly subscription cost
# - Total monthly recurring income
# - List of all subscriptions with monthly normalization
# - Spending breakdown by category
# - Cancellation opportunities (duplicate services, high-cost unused)
Use Cases
1. Budget Dashboard
summary = get_recurring_summary(user_id, patterns)
print(f"Monthly Subscriptions: ${summary.total_monthly_cost:.2f}")
print(f"Monthly Income: ${summary.total_monthly_income:.2f}")
print(f"Net Monthly: ${summary.total_monthly_income - summary.total_monthly_cost:.2f}")
# Show category breakdown
for category, cost in summary.by_category.items():
    print(f"  {category}: ${cost:.2f}")
2. Cancellation Recommendations
if summary.cancellation_opportunities:
    print("💡 Potential Savings:")
    for opp in summary.cancellation_opportunities:
        print(f"  Cancel {opp.merchant_name}: Save ${opp.monthly_savings:.2f}/month")
        print(f"  Reason: {opp.reason}")
3. Spending Alerts
# Alert if subscription costs exceed threshold
if summary.total_monthly_cost > 200:
    send_alert(
        user_id,
        title="High Subscription Costs",
        message=f"You're spending ${summary.total_monthly_cost:.2f}/month on subscriptions"
    )
# Alert if duplicate services detected
if len(summary.cancellation_opportunities) > 0:
    send_alert(
        user_id,
        title="Potential Savings",
        message=f"Cancel {len(summary.cancellation_opportunities)} services to save money"
    )
Cancellation Opportunity Detection
The summary automatically identifies potential savings:
Duplicate Streaming Services (>2 detected):
- Netflix + Hulu + Disney+ + HBO Max → Suggests canceling cheapest
High-Cost Subscriptions (>$50/month):
- Identifies subscriptions over $50/month for review
Unused Services (low transaction count):
- Detects subscriptions with <3 occurrences (might be unused)
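The three heuristics above can be sketched roughly as follows. This is not fin-infra's actual implementation; the function name, the `occurrences` field, and the dict shapes are assumptions for illustration:

```python
def find_cancellation_opportunities(patterns: list[dict]) -> list[dict]:
    """Apply the three savings heuristics to a list of detected patterns."""
    opportunities = []

    # Heuristic 1: more than 2 streaming/entertainment subscriptions ->
    # suggest canceling the cheapest one.
    streaming = [p for p in patterns if p["category"] == "entertainment"]
    if len(streaming) > 2:
        cheapest = min(streaming, key=lambda p: p["monthly_cost"])
        opportunities.append({
            "merchant_name": cheapest["merchant_name"],
            "monthly_savings": cheapest["monthly_cost"],
            "reason": f"You have {len(streaming)} streaming services. "
                      "Consider consolidating to save money.",
        })

    for p in patterns:
        # Heuristic 2: flag any subscription over $50/month for review.
        if p["monthly_cost"] > 50:
            opportunities.append({
                "merchant_name": p["merchant_name"],
                "monthly_savings": p["monthly_cost"],
                "reason": "Costs more than $50/month; review whether you still need it.",
            })
        # Heuristic 3: fewer than 3 observed charges may indicate an unused trial.
        if p.get("occurrences", 3) < 3:
            opportunities.append({
                "merchant_name": p["merchant_name"],
                "monthly_savings": p["monthly_cost"],
                "reason": "Fewer than 3 charges observed; this service might be unused.",
            })

    return opportunities
```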
Implementation Example
from fin_infra.recurring import easy_recurring_detection, add_recurring_detection
from fin_infra.recurring.summary import get_recurring_summary
from fastapi import FastAPI, Query
app = FastAPI()
# Add recurring detection endpoints
detector = add_recurring_detection(app, prefix="/recurring")
# Custom summary endpoint
@app.get("/recurring/summary")
async def get_summary(user_id: str = Query(...)):
    """Get recurring spending summary for user"""
    # Fetch transactions (from banking provider)
    transactions = await get_user_transactions(user_id, days=180)
    # Detect patterns
    patterns = detector.detect_patterns(transactions)
    # Generate summary
    summary = get_recurring_summary(user_id, patterns)
    return summary
Production Considerations
Caching: Cache summaries for 24 hours (daily refresh):
from svc_infra.cache import cache_read, cache_write, resource
recurring = resource("recurring_summary", "user_id")
@app.get("/recurring/summary")
@recurring.cache_read(ttl=86400)  # 24 hours
async def get_cached_summary(user_id: str):
    transactions = await get_user_transactions(user_id, days=180)
    patterns = detector.detect_patterns(transactions)
    return get_recurring_summary(user_id, patterns)
Background Processing: Generate summaries overnight for all users:
from svc_infra.jobs import easy_jobs
worker, scheduler = easy_jobs(app)
@scheduler.scheduled_job('cron', hour=3)  # 3 AM daily
async def generate_all_summaries():
    """Pre-generate summaries for all users"""
    for user in users:
        transactions = await get_user_transactions(user.id, days=180)
        patterns = detector.detect_patterns(transactions)
        summary = get_recurring_summary(user.id, patterns)
        # Cache for next day
        await cache_summary(user.id, summary)
LLM-Enhanced Features (Optional)
The core recurring detection system above is fully algorithmic. For additional accuracy and insights, fin-infra provides optional LLM-enhanced features that can be enabled when ai-infra is available.
Overview
V2 adds LLM-assisted capabilities:
- Merchant Normalization: Clean up messy merchant names ("NFLX*SUB" → "Netflix")
- Variable Amount Detection: Identify utility-style recurring charges with varying amounts
- Subscription Insights: AI-generated summaries and savings recommendations
These features are optional — the LLM wiring is behind enable_llm / enable_cache flags and falls back to deterministic logic when unavailable.
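The graceful-fallback pattern described above can be sketched as follows. This is an illustration, not fin-infra's implementation: `llm_normalize`, the `RULES` table, and `normalize_merchant` are all hypothetical names, and the real rule set is far richer:

```python
import re

# Minimal deterministic rule table standing in for the non-LLM path.
RULES = {
    r"^NFLX": "Netflix",
    r"^SPOTIFY": "Spotify",
}

def normalize_merchant(raw: str, enable_llm: bool = False) -> str:
    """Normalize a raw merchant descriptor, preferring the LLM when enabled."""
    if enable_llm:
        try:
            return llm_normalize(raw)  # hypothetical model call, not defined here
        except Exception:
            pass  # degrade gracefully to the deterministic rules below
    upper = raw.upper()
    for pattern, name in RULES.items():
        if re.match(pattern, upper):
            return name
    return raw.title()  # last resort: title-case the raw descriptor
```

With `enable_llm=False` (or when the model call fails), only the deterministic path runs, which is the fallback behavior the flags are meant to guarantee.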
Quick Start
from fin_infra.recurring.normalizers import MerchantNormalizer
from fin_infra.recurring.detectors_llm import VariableDetectorLLM
from fin_infra.recurring.insights import SubscriptionInsightsGenerator
# Normalize merchant names
norm = MerchantNormalizer(provider="google", enable_cache=False)
result = await norm.normalize("NFLX*SUB") # → "Netflix"
# Detect variable recurring patterns
det = VariableDetectorLLM(provider="google")
amounts = [45.5, 52.3, 48.75]
date_pattern = "Monthly (15th ±3 days)"
pattern = await det.detect("City Electric", amounts, date_pattern)
# Generate subscription insights
gen = SubscriptionInsightsGenerator(provider="google", enable_cache=False)
insights = await gen.generate([
    {"merchant": "Netflix", "amount": 15.99, "cadence": "monthly"}
])
Design Notes
- Merchant Normalization: Uses few-shot prompting with model-specific templates. Gracefully degrades to rule-based heuristics when LLM not available.
- Variable Detection: Expects a list of numeric amounts plus a date pattern string (e.g., "Monthly (15th ±3 days)"). Returns a `VariableRecurringPattern` Pydantic model with `expected_range: Optional[tuple[float, float]]`.
- Insights Generator: Accepts a list of subscriptions and returns `SubscriptionInsights` (summary, top_subscriptions, recommendations, total_monthly_cost, potential_savings).
LLM Testing
- Unit tests: `tests/unit/test_recurring_normalizers.py`, `tests/unit/test_recurring_detectors_llm.py`, `tests/unit/test_recurring_insights.py` (run with `pytest tests/unit -q`).
- Acceptance tests: `tests/acceptance/test_recurring_llm.py`, skipped unless `GOOGLE_API_KEY` (or another provider key) is set. Exercises real model calls.
Cost & Budgeting
Each LLM call increments in-memory budget counters (`_daily_cost`, `_monthly_cost`) to guard against cost overruns:
- Defaults: $0.10/day, $2.00/month
- Production: replace the in-memory tracking with Redis or another shared store
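A minimal sketch of such a budget guard, assuming the class name and method are illustrative (the real fin-infra counters may be wired differently):

```python
class LLMBudget:
    """Track per-day and per-month LLM spend against fixed limits."""

    def __init__(self, daily_limit: float = 0.10, monthly_limit: float = 2.00):
        self.daily_limit = daily_limit
        self.monthly_limit = monthly_limit
        self._daily_cost = 0.0
        self._monthly_cost = 0.0

    def charge(self, cost: float) -> bool:
        """Record a call's cost; return False if either budget would be exceeded."""
        if (self._daily_cost + cost > self.daily_limit
                or self._monthly_cost + cost > self.monthly_limit):
            return False  # caller should fall back to deterministic logic
        self._daily_cost += cost
        self._monthly_cost += cost
        return True
```

In production the two counters would live in Redis (or another shared store) so all workers draw from the same budget; in-memory counters reset on every process restart.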
Troubleshooting
- `AttributeError: ... does not have the attribute 'LLM'`: ensure `ai-infra` is installed or mocks are in place for tests
- Cache-related tests skipped: enable the `svc-infra` cache shims in your environment; run as integration/acceptance tests with Redis
Related Documentation
- Transaction Categorization - Categorize transactions by type
- Banking Integration - Fetch transactions from Plaid/Teller
- svc-infra Jobs - Scheduled task processing
- svc-infra Cache - Redis caching layer
- svc-infra Webhooks - Event notifications