"""Data Freshness Monitoring.

Monitor data pipeline health and alert on stale data.
"""
from datetime import datetime, timezone, timedelta
# Expected maximum update interval per monitored dataset; a dataset older
# than twice this interval is considered stale.
MONITORED_DATASETS = dict(
    ercot_fuel_mix=timedelta(minutes=10),
    ercot_load=timedelta(minutes=10),
    ercot_lmp_by_settlement_point=timedelta(minutes=10),
    caiso_lmp_real_time_5_min=timedelta(minutes=10),
    pjm_load=timedelta(minutes=15),
)
def check_all_datasets():
    """Check freshness of every dataset in MONITORED_DATASETS.

    Fetches each dataset's metadata through the module-level ``client``,
    parses its latest-available timestamp, and flags the dataset as STALE
    when its age exceeds twice the expected update interval (the 2x factor
    tolerates a single missed update before alerting).

    Returns:
        list[dict]: one entry per stale dataset, with keys 'dataset',
        'age', and 'expected' (ages formatted as whole minutes).
    """
    check_time = datetime.now(timezone.utc)
    stale = []
    for name, expected_interval in MONITORED_DATASETS.items():
        meta = client.get(
            f"{client.host}/datasets/{name}",
            return_raw_response_json=True
        )
        # Normalize a trailing 'Z' so fromisoformat() accepts the timestamp
        # on Python versions older than 3.11.
        last_update = datetime.fromisoformat(
            meta['latest_available_time_utc'].replace('Z', '+00:00')
        )
        age = check_time - last_update
        minutes_old = f"{age.total_seconds() / 60:.0f} min"
        is_stale = age > expected_interval * 2
        if is_stale:
            stale.append({
                'dataset': name,
                'age': minutes_old,
                'expected': f"{expected_interval.total_seconds() / 60:.0f} min"
            })
        print(f"[{'STALE' if is_stale else 'OK'}] {name}: {minutes_old} old")
    return stale
print("=== Data Freshness Check ===")
print(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")
issues = check_all_datasets()
if issues:
print(f"\n*** ALERT: {len(issues)} datasets are stale ***")
for issue in issues:
print(f" {issue['dataset']}: {issue['age']} old (expected: {issue['expected']})")
else:
print("\nAll datasets are fresh.")Last updated