For the complete documentation index, see llms.txt. This page is also available as Markdown.

Data Freshness Monitoring

Monitor data pipeline health and alert on stale data

Monitor data pipeline health and alert on stale data.

from datetime import datetime, timezone, timedelta

# Datasets to monitor with expected update frequencies
MONITORED_DATASETS = {
    "ercot_fuel_mix": timedelta(minutes=10),
    "ercot_load": timedelta(minutes=10),
    "ercot_lmp_by_settlement_point": timedelta(minutes=10),
    "caiso_lmp_real_time_5_min": timedelta(minutes=10),
    "pjm_load": timedelta(minutes=15),
}

def check_all_datasets():
    """Check freshness of all monitored datasets."""
    now = datetime.now(timezone.utc)
    issues = []

    for dataset_id, max_age in MONITORED_DATASETS.items():
        metadata = client.get(
                f"{client.host}/datasets/{dataset_id}",
                return_raw_response_json=True
        )

        latest = datetime.fromisoformat(
            metadata['latest_available_time_utc'].replace('Z', '+00:00')
        )
        age = now - latest

        status = "OK" if age <= max_age * 2 else "STALE"
        age_str = f"{age.total_seconds() / 60:.0f} min"

        if status == "STALE":
            issues.append({
                'dataset': dataset_id,
                'age': age_str,
                'expected': f"{max_age.total_seconds() / 60:.0f} min"
            })

        print(f"[{status}] {dataset_id}: {age_str} old")

    return issues

print("=== Data Freshness Check ===")
print(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")

issues = check_all_datasets()

if issues:
    print(f"\n*** ALERT: {len(issues)} datasets are stale ***")
    for issue in issues:
        print(f"  {issue['dataset']}: {issue['age']} old (expected: {issue['expected']})")
else:
    print("\nAll datasets are fresh.")

Last updated

Was this helpful?