Data Freshness Monitoring

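Monitor data pipeline health and alert on stale data.

The script below reads the latest available timestamp for each monitored dataset and flags any dataset whose newest data is more than twice its expected update interval old.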

from datetime import datetime, timezone, timedelta

# Expected update interval for each monitored dataset, keyed by dataset id.
# Insertion order is the order in which datasets are checked and reported.
MONITORED_DATASETS = dict(
    ercot_fuel_mix=timedelta(minutes=10),
    ercot_load=timedelta(minutes=10),
    ercot_lmp_by_settlement_point=timedelta(minutes=10),
    caiso_lmp_real_time_5_min=timedelta(minutes=10),
    pjm_load=timedelta(minutes=15),
)

def _parse_utc_timestamp(value):
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    Returns None when ``value`` is missing or empty. A trailing 'Z' is
    rewritten to '+00:00' because ``datetime.fromisoformat`` only accepts
    'Z' from Python 3.11 onward. A timestamp without an offset is assumed
    to be UTC, so it can be subtracted from an aware "now" without raising
    TypeError.
    """
    if not value:
        return None
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def check_all_datasets(datasets=None, *, now=None, tolerance=2, fetch_metadata=None):
    """Check freshness of all monitored datasets.

    For each dataset, fetch its metadata, read ``latest_available_time_utc``
    and compare the age of that timestamp with the dataset's expected update
    interval. A dataset is STALE when its age exceeds ``tolerance`` times the
    expected interval, or when its metadata carries no latest timestamp.
    One status line per dataset is printed.

    Args:
        datasets: Mapping of dataset id -> expected update interval
            (``timedelta``). Defaults to ``MONITORED_DATASETS``.
        now: Timezone-aware reference time. Defaults to the current UTC time.
        tolerance: Multiple of the expected interval allowed before a dataset
            counts as stale. Defaults to 2.
        fetch_metadata: Callable taking a dataset id and returning its
            metadata mapping. Defaults to ``client.get`` on
            ``{client.host}/datasets/{dataset_id}`` using the module-level
            ``client``.

    Returns:
        A list with one dict per stale dataset, with keys ``'dataset'``,
        ``'age'`` and ``'expected'``. ``'age'`` and ``'expected'`` are
        human-readable minute strings; ``'age'`` is ``"unknown"`` when no
        timestamp was available.
    """
    if datasets is None:
        datasets = MONITORED_DATASETS
    if now is None:
        now = datetime.now(timezone.utc)
    if fetch_metadata is None:
        def fetch_metadata(dataset_id):
            # Resolve the module-level `client` lazily, only when no fetcher
            # was injected.
            return client.get(
                f"{client.host}/datasets/{dataset_id}",
                return_raw_response_json=True,
            )

    issues = []
    for dataset_id, max_age in datasets.items():
        # Reported interval is the nominal one; the threshold below applies
        # `tolerance` on top of it.
        expected_str = f"{max_age.total_seconds() / 60:.0f} min"
        # NOTE(review): assumes the metadata is a dict-like JSON object.
        metadata = fetch_metadata(dataset_id)
        latest = _parse_utc_timestamp(metadata.get('latest_available_time_utc'))

        if latest is None:
            # Without a timestamp freshness cannot be established; flag it
            # rather than aborting the whole sweep.
            age_str = "unknown"
            is_stale = True
        else:
            age = now - latest
            age_str = f"{age.total_seconds() / 60:.0f} min"
            is_stale = age > max_age * tolerance

        status = "STALE" if is_stale else "OK"
        if is_stale:
            issues.append({
                'dataset': dataset_id,
                'age': age_str,
                'expected': expected_str,
            })

        print(f"[{status}] {dataset_id}: {age_str} old")

    return issues

# Run one freshness sweep and print a summary. Guarded so that importing this
# module (e.g. to reuse check_all_datasets) does not trigger API calls.
if __name__ == "__main__":
    print("=== Data Freshness Check ===")
    print(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")

    issues = check_all_datasets()

    if issues:
        # Pluralize correctly for a single stale dataset.
        noun = "dataset is" if len(issues) == 1 else "datasets are"
        print(f"\n*** ALERT: {len(issues)} {noun} stale ***")
        for issue in issues:
            print(f"  {issue['dataset']}: {issue['age']} old (expected: {issue['expected']})")
    else:
        print("\nAll datasets are fresh.")

Last updated

Was this helpful?