Active Graph KG Operations Guide¶
Last Updated: 2025-11-12 | Target Audience: SREs, DevOps, Production Support
This guide covers operational procedures for the Active Graph KG connector infrastructure, including monitoring, troubleshooting, and maintenance tasks.
Table of Contents¶
- Architecture Overview
- Monitoring & Alerts
- Webhook Troubleshooting
- Worker Troubleshooting
- Ingestion Troubleshooting
- Purger Operations
- Cache Subscriber
- Key Rotation
- Common Operations
- Incident Response
Architecture Overview¶
The Active Graph KG connector system consists of:
- API Server: Hosts webhook endpoints (/_webhooks/s3, /_webhooks/gcs)
- Queue (Redis): Stores connector events per tenant/provider (connector:{provider}:{tenant}:queue)
- Worker: Background process that polls queues and processes changes
- Scheduler: APScheduler-based cron tasks (purger runs daily at 02:00 UTC)
- Config Store: Encrypted connector configurations in PostgreSQL
Data Flow¶
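In outline (simplified): a change in cloud storage (GCS/S3) triggers a notification to the API Server webhook (/_webhooks/{provider}); the event is enqueued in Redis (connector:{provider}:{tenant}:queue); the Worker polls the queue and ingests the change into PostgreSQL; the Scheduler runs the daily purger against the same store.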
Key Metrics¶
| Metric | Description | Alert Threshold |
|---|---|---|
| webhook_pubsub_verify_total{result} | Webhook signature verifications | Failure rate >10% |
| connector_worker_queue_depth{provider,tenant} | Queue backlog per tenant | >1000 items for 10m |
| connector_ingest_total | Successful ingestions | 0 for 30m (stalled) |
| connector_ingest_errors_total | Ingestion failures | Error rate >1% |
| connector_purger_total{result} | Purger executions | Any errors |
| connector_rotation_total{result} | Key rotation results | Any errors |
Monitoring & Alerts¶
Prometheus Setup¶
- Add alert rules to Prometheus (a sample rules file is sketched after this list).
- Configure Alertmanager routes for the connector alerts.
- Add the API server's /metrics endpoint as a scrape target.
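As a starting point, a hypothetical rules file and scrape target, using the metric names and thresholds from the Key Metrics table; group names, severities, job names, and ports are assumptions.

```yaml
# alert rules file (referenced from prometheus.yml rule_files)
groups:
  - name: activekg-connectors
    rules:
      - alert: ConnectorQueueDepthHigh
        expr: max by (provider, tenant) (connector_worker_queue_depth) > 1000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Connector queue backlog for {{ $labels.provider }}/{{ $labels.tenant }}"
      - alert: IngestErrorRateHigh
        expr: >
          (sum(rate(connector_ingest_errors_total[5m]))
            / sum(rate(connector_ingest_total[5m]))) * 100 > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Connector ingestion error rate above 1%"
```

```yaml
# prometheus.yml scrape target; job name, host, and port are assumptions
scrape_configs:
  - job_name: activekg-api
    metrics_path: /metrics
    static_configs:
      - targets: ["activekg-api:8000"]
```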
Grafana Dashboards¶
Import observability/dashboards/connector_overview.json for:
- Ingestion rate/errors by provider
- Queue depth heatmap
- Webhook verification success rate
- Purger execution history
- P50/P95/P99 latency
Quick metrics queries:

```promql
# Ingestion rate by provider
sum(rate(connector_ingest_total[5m])) by (provider)

# Error rate percentage
(sum(rate(connector_ingest_errors_total[5m])) / sum(rate(connector_ingest_total[5m]))) * 100

# Queue depth (current)
connector_worker_queue_depth{provider="gcs",tenant="default"}

# Webhook verify failures
sum(rate(webhook_pubsub_verify_total{result!~"secret_ok|oidc_ok|skipped"}[5m]))
```
Webhook Troubleshooting¶
Problem: High Webhook Verification Failures¶
Alert: WebhookVerificationFailuresHigh
Symptoms:
- >10% of webhook requests failing signature verification
- webhook_pubsub_verify_total{result="signature_invalid"} increasing
Diagnosis (example commands are sketched below):
1. Check webhook logs.
2. Verify environment variables (PUBSUB_VERIFY_SECRET, PUBSUB_VERIFY_OIDC, PUBSUB_OIDC_AUDIENCE).
3. Test a manual webhook request.
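Hypothetical commands for the three steps, assuming the API runs as deployment activekg-api; how the push endpoint expects authentication and payloads depends on your subscription config, so the last request is illustrative only.

```bash
# 1. Webhook logs from the API server
kubectl logs -l app=activekg-api --since=1h | grep -i webhook

# 2. Verification-related environment variables on the API pod
kubectl exec deploy/activekg-api -- env | grep -E 'PUBSUB_VERIFY_SECRET|PUBSUB_VERIFY_OIDC|PUBSUB_OIDC_AUDIENCE'

# 3. Manual webhook request with an illustrative Pub/Sub-style push envelope
curl -v -X POST "https://$HOST/_webhooks/gcs" \
  -H "Content-Type: application/json" \
  -d '{"message": {"messageId": "1", "data": "PAYLOAD_BASE64"}, "subscription": "projects/my-proj/subscriptions/activekg-gcs"}'
```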
Resolution:
- Secret mismatch: Rotate subscription token and update PUBSUB_VERIFY_SECRET
- OIDC audience mismatch: Update PUBSUB_OIDC_AUDIENCE to match subscription config
- Missing headers: Ensure Pub/Sub subscription configured with correct auth method
Prevention:
- Store secrets in a secure vault (AWS Secrets Manager, GCP Secret Manager)
- Automate secret rotation (90-day cycle)
- Add integration tests for webhook auth
Problem: Webhook Topic Rejected¶
Alert: WebhookTopicRejected
Symptoms:
- webhook_topic_rejected_total > 0
- Logs show "topic not in allowlist"
Diagnosis:
1. Check the tenant's topic allowlist:

```sql
SELECT tenant_id, provider, config->>'topic_allowlist'
FROM connector_configs
WHERE tenant_id = 'default' AND provider = 'gcs';
```

2. Compare with the incoming topic (see the log search below).
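To see the topic a rejected request actually carried, a hypothetical log search (the message text comes from the symptoms above; the pod label is an assumption):

```bash
# Find recent rejections and the topic names they reported
kubectl logs -l app=activekg-api --since=1h | grep "topic not in allowlist"
```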
Resolution:

```bash
# Update allowlist via admin API
curl -X PATCH https://$HOST/_admin/connectors/gcs/config \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "tenant_id": "default",
    "config": {
      "topic_allowlist": ["projects/my-proj/topics/activekg-gcs-prod"]
    }
  }'
```
Prevention:
- Document topic naming conventions
- Validate topic on connector registration
- Alert on topic rejections (already configured)
Worker Troubleshooting¶
Problem: High Queue Depth¶
Alert: ConnectorQueueDepthHigh
Symptoms:
- connector_worker_queue_depth{provider,tenant} > 1000 for >10m
- Ingestion lag increasing
Diagnosis (example commands are sketched below):
1. Check queue depth directly.
2. Verify the worker is running.
3. Check worker logs for errors.
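Hypothetical commands for the three steps; the queue key follows the format in the Architecture Overview, and the worker deployment/label names are assumptions.

```bash
# 1. Queue depth straight from Redis
redis-cli LLEN "connector:gcs:default:queue"

# 2. Is the worker running?
kubectl get pods -l app=activekg-worker
# or, outside Kubernetes:
pgrep -af "activekg.connectors.worker"

# 3. Recent worker errors
kubectl logs deploy/activekg-worker --since=1h | grep -i error
```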
Common Causes:
- Worker not running (crashed, not deployed)
- Worker misconfigured (wrong REDIS_URL, missing credentials)
- Downstream bottleneck (database connection pool exhausted)
- High error rate (items failing and not being retried)
Resolution:

Scenario 1: Worker not running

```bash
# Restart worker
python -m activekg.connectors.worker
# or
kubectl rollout restart deployment/activekg-worker
```

Scenario 2: Database bottleneck

```bash
# Check active connections
psql $ACTIVEKG_DSN -c "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'activekg';"

# Increase connection pool if needed
export ACTIVEKG_POOL_SIZE=20  # Default: 10
```

Scenario 3: Scaling

```bash
# Scale worker horizontally (multiple workers poll the same queue)
kubectl scale deployment/activekg-worker --replicas=3
```

Scenario 4: Purge stale items

```bash
# Check oldest item in queue (LINDEX -1 gets the last item)
redis-cli --raw LINDEX "connector:gcs:default:queue" -1 | jq '.modified_at'

# If >7 days old, consider manual purge (DANGER: only after investigation)
redis-cli LTRIM "connector:gcs:default:queue" 0 0  # Keeps only 1 item (effectively clears)
```
Prevention:
- Set up worker autoscaling based on queue depth
- Monitor worker health endpoints
- Add dead-letter queue for permanently failed items
Problem: Worker Processing Errors¶
Symptoms:
- connector_worker_errors_total increasing
- Queue depth not decreasing despite worker running
Diagnosis:
1. Check error types. Common error types:
   - no_config: Connector config not found/disabled
   - parse: Invalid queue item JSON
   - processing: Ingestion failure (network, permissions, etc.)
   - unsupported_provider: Unknown provider
2. Examine specific errors (a sketch is shown below).
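A sketch for step 2; whether connector_worker_errors_total carries an error-type label, and what that label is called, are assumptions (check the /metrics output).

```bash
# Error breakdown (PromQL, run in the Prometheus/Grafana query UI):
#   sum(rate(connector_worker_errors_total[15m])) by (error)

# Recent error lines from the worker, filtered by the known error types
kubectl logs deploy/activekg-worker --since=1h \
  | grep -E "no_config|parse|processing|unsupported_provider"
```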
Resolution by Error Type:
| Error Type | Cause | Fix |
|---|---|---|
| no_config | Config deleted/disabled | Re-register connector |
| parse | Corrupted queue item | Clear queue, investigate webhook format changes |
| processing | Network/permissions | Check GCS/S3 permissions, retry manually |
| unsupported_provider | Code bug or typo | Fix provider name in config |
Ingestion Troubleshooting¶
Problem: High Ingestion Error Rate¶
Alert: IngestErrorRateHigh
Symptoms:
- Error rate >1% for >10m
- connector_ingest_errors_total increasing
Diagnosis:
1. Check error distribution (see the sketch after the code below).
2. Sample failed items from the queue.
3. Test a single item manually:
```python
from activekg.connectors.providers.gcs import GCSConnector
from activekg.connectors.ingest import IngestionProcessor

config = {...}  # From DB
connector = GCSConnector(tenant_id="default", config=config)
processor = IngestionProcessor(connector=connector, repo=repo, redis_client=redis)

# Test single URI
changes = [ChangeItem(uri="gs://bucket/docs/sample.pdf", operation="upsert")]
result = processor.process_changes(changes)
```
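For steps 1 and 2, a hypothetical sketch; the label set on the metric is an assumption, and the Redis key follows the queue format from the Architecture Overview.

```bash
# 1. Error distribution (PromQL, run in Prometheus/Grafana):
#   sum(rate(connector_ingest_errors_total[15m])) by (provider)

# 2. Sample pending items in the tenant queue
redis-cli --raw LRANGE "connector:gcs:default:queue" 0 4 | jq .
```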
Common Issues:
- Permissions: GCS/S3 bucket not accessible (403/404 errors)
- Format: Unsupported file type or corrupted file
- Size: File too large (timeout, OOM)
- Embedding: Embedding service unavailable
Resolution (example checks are sketched below):
1. Permission errors: verify the service account can list and read the bucket/prefix.
2. Format errors: confirm the file type is supported and the object is not corrupted.
3. Size errors: check object sizes; very large files can cause timeouts or OOM in the worker.
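Hypothetical spot checks for the three cases, using the bucket and prefix from the registration example later in this guide as placeholders.

```bash
# 1. Permissions: can the service account list and read the prefix?
gsutil ls "gs://acme-docs/knowledge-base/" | head
gsutil cp "gs://acme-docs/knowledge-base/sample.pdf" /tmp/sample.pdf

# 2. Format: inspect content type and the actual file contents
gsutil stat "gs://acme-docs/knowledge-base/sample.pdf"
file /tmp/sample.pdf

# 3. Size: list object sizes under the prefix, largest first
gsutil du "gs://acme-docs/knowledge-base/" | sort -rn | head
```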
Problem: Ingestion Stalled¶
Alert: IngestStalled
Symptoms:
- connector_ingest_total rate = 0 for >30m
- No activity despite webhook events arriving
Diagnosis (example commands are sketched below):
1. Verify webhooks are arriving.
2. Check worker status.
3. Check database connectivity.
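A minimal sketch of the three checks, assuming the pod labels and queue key used elsewhere in this guide.

```bash
# 1. Are webhook events arriving and queuing?
kubectl logs -l app=activekg-api --since=30m | grep -i webhook | tail
redis-cli LLEN "connector:gcs:default:queue"

# 2. Is the worker alive and polling?
kubectl get pods -l app=activekg-worker
kubectl logs deploy/activekg-worker --since=30m | tail

# 3. Can we reach PostgreSQL?
psql "$ACTIVEKG_DSN" -c "SELECT 1;"
```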
Resolution:
- If queue empty but expecting traffic → check webhook configuration
- If queue full but worker idle → restart worker
- If database down → fix database connectivity first
- If all healthy but still stalled → check for deadlock/blocking queries:

```sql
-- Check for blocking queries
SELECT pid, wait_event_type, wait_event, state, query
FROM pg_stat_activity
WHERE application_name = 'activekg' AND state != 'idle';
```
Purger Operations¶
Daily Purge Schedule¶
Purger runs daily at 02:00 UTC via APScheduler cron job.
Configuration:

```bash
export RUN_SCHEDULER=true          # Enable scheduler
export PURGER_BATCH_SIZE=500       # Items per batch (default: 500)
export PURGER_RETENTION_DAYS=30    # Grace period before hard delete (default: 30)
```
Problem: Purger Errors¶
Alert: PurgerErrors
Symptoms:
- connector_purger_total{result="error"} > 0
Diagnosis:

```bash
# Check scheduler logs
kubectl logs -l app=activekg-api | grep "purge cycle"

# Manual purge dry-run
curl -X POST http://$HOST/_admin/connectors/purge_deleted \
  -H "Content-Type: application/json" \
  -d '{"dry_run": true, "tenant_id": "default"}'
```
Common Errors:
- Database connection lost during purge
- Transaction timeout (purging too many items)
- Permission denied on connector_configs table
Resolution (a sketch follows):
1. Transaction timeout: reduce PURGER_BATCH_SIZE so each batch commits faster.
2. Manual purge (if the scheduler is broken): run the purge via the admin API (see Manual Purge under Common Operations).
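A hypothetical recovery sequence; the batch size is illustrative, and the purge call matches the Manual Purge example under Common Operations.

```bash
# 1. Smaller batches to stay under the transaction timeout, then restart the scheduler process
export PURGER_BATCH_SIZE=100

# 2. Run the purge by hand via the admin API
curl -X POST http://$HOST/_admin/connectors/purge_deleted \
  -H "Content-Type: application/json" \
  -d '{"dry_run": false, "tenant_id": "default", "batch_size": 100}'
```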
Cache Subscriber¶
The GCS Pub/Sub subscriber maintains a long-lived connection to receive real-time notifications.
Problem: High Reconnect Rate¶
Alert: PubSubReconnectsHigh
Symptoms:
- connector_pubsub_reconnect_total > 5 in 15m
- Intermittent webhook delivery delays
Diagnosis:

```bash
# Check subscriber logs
kubectl logs -l app=activekg-api | grep "subscriber"

# Check network connectivity to Pub/Sub
curl -I https://pubsub.googleapis.com/v1/projects/$PROJECT/topics
```
Common Causes:
- Network instability (firewall, NAT timeout)
- Pub/Sub service disruption
- Client credential expiration
- Resource limits (file descriptors, memory)
Resolution (example checks are sketched below):
1. Network issues: check firewall rules and NAT idle timeouts between the API pods and Pub/Sub.
2. Credential issues: verify the service-account credentials are present and have not expired.
3. Resource limits: check file-descriptor and memory limits on the subscriber process.
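Hypothetical checks for each cause; the deployment name and the credentials path (taken from the registration example) are assumptions.

```bash
# 1. Network: basic reachability to Pub/Sub from inside the pod
kubectl exec deploy/activekg-api -- curl -sI https://pubsub.googleapis.com | head -1

# 2. Credentials: is the service-account key mounted and referenced?
kubectl exec deploy/activekg-api -- env | grep GOOGLE_APPLICATION_CREDENTIALS
kubectl exec deploy/activekg-api -- ls -l /secrets/gcs-key.json

# 3. Resource limits: file descriptors and memory usage
kubectl exec deploy/activekg-api -- sh -c 'ulimit -n'
kubectl top pod -l app=activekg-api
```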
Key Rotation¶
KEK (Key Encryption Key) Rotation¶
Frequency: Every 90 days (recommended)
Procedure (a command-by-command sketch follows this list):
1. Generate a new KEK.
2. Add it to the environment (non-breaking).
3. Rotate configs (re-encrypt with the new key).
4. Monitor rotation.
5. Verify all configs are rotated.
6. Remove the old KEK (only after all configs are rotated).
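A command-by-command sketch of the procedure. The KEK generation method, the rotate_keys payload, and the version numbers are assumptions; confirm against your key-management setup before running.

```bash
# 1. Generate a new KEK (assumes a 32-byte base64 value is acceptable)
export CONNECTOR_KEK_V2="$(openssl rand -base64 32)"

# 2. Add CONNECTOR_KEK_V2 to the API/worker environment (secret store) and redeploy;
#    do not change the active version yet (non-breaking)

# 3. Switch the active version and re-encrypt configs via the admin API
export CONNECTOR_KEK_ACTIVE_VERSION=2
curl -X POST http://$HOST/_admin/connectors/rotate_keys \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -d '{"dry_run": false}'

# 4. Monitor connector_rotation_total{result} in Prometheus for errors

# 5. Verify every row uses the new key version
psql "$ACTIVEKG_DSN" -c "SELECT DISTINCT key_version FROM connector_configs;"

# 6. Only then remove CONNECTOR_KEK_V1 from the environment/secret store
```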
Problem: Rotation Errors¶
Alert: RotationErrors
Diagnosis:

```bash
# Check rotation logs
kubectl logs -l app=activekg-api | grep "rotation"

# Check failed rows
curl -X POST http://$HOST/_admin/connectors/rotate_keys \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -d '{"dry_run": true}'
```
Resolution:

```bash
# Retry failed rows only
curl -X POST http://$HOST/_admin/connectors/rotate_keys \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -d '{"tenant_id": "default", "batch_size": 10}'
```
Problem: Config Decryption Failures¶
Alert: ConfigDecryptFailures
Symptoms:
- Worker can't load connector config
- connector_config_decrypt_failures_total > 0
Diagnosis:

```bash
# Check key versions
echo $CONNECTOR_KEK_ACTIVE_VERSION
echo $CONNECTOR_KEK_V1
echo $CONNECTOR_KEK_V2

# Check DB key versions
psql $ACTIVEKG_DSN -c "SELECT DISTINCT key_version FROM connector_configs;"
```
Resolution (a sketch follows):
1. Missing KEK: re-add the old KEK temporarily so existing rows can be decrypted, then rotate.
2. Corrupted config: delete and re-register the connector (see Register New Connector below).
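A hypothetical recovery sketch; the old key value must come from your secret store.

```bash
# 1. Missing KEK: restore the old key alongside the active one, restart, then re-run rotation
export CONNECTOR_KEK_V1="<old key value from secret store>"
curl -X POST http://$HOST/_admin/connectors/rotate_keys \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -d '{"dry_run": false}'

# 2. Corrupted config: re-register the connector for the affected tenant
#    (payload shown under Common Operations -> Register New Connector)
```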
Common Operations¶
Register New Connector¶
GCS Example:

```bash
curl -X POST http://$HOST/_admin/connectors/gcs/register \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "tenant_id": "acme-corp",
    "config": {
      "bucket": "acme-docs",
      "prefix": "knowledge-base/",
      "project": "acme-prod",
      "credentials_path": "/secrets/gcs-key.json",
      "enabled": true,
      "topic_allowlist": ["projects/acme-prod/topics/activekg-gcs"]
    }
  }'
```
Backfill Historical Data¶
```bash
curl -X POST http://$HOST/_admin/connectors/gcs/backfill \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "tenant_id": "acme-corp",
    "limit": 500,
    "prefix": "knowledge-base/q4-reports/"
  }'
```
Manual Purge¶
```bash
# Dry-run first
curl -X POST http://$HOST/_admin/connectors/purge_deleted \
  -H "Content-Type: application/json" \
  -d '{"dry_run": true, "tenant_id": "acme-corp"}'

# Execute
curl -X POST http://$HOST/_admin/connectors/purge_deleted \
  -H "Content-Type: application/json" \
  -d '{"dry_run": false, "tenant_id": "acme-corp", "batch_size": 500}'
```
Check Health¶
```bash
# API health
curl http://$HOST/health

# Webhook health
curl http://$HOST/_webhooks/gcs/health

# Metrics
curl http://$HOST/metrics
```
Incident Response¶
Severity Levels¶
| Severity | Response Time | Example |
|---|---|---|
| Critical | 15 min | Ingestion completely down, data loss risk |
| Warning | 1 hour | High error rate, queue backlog |
| Info | Next business day | Single tenant issue, minor config problem |
Incident Checklist¶
- Acknowledge alert in PagerDuty/OpsGenie
- Check dashboards for context (Grafana connector overview)
- Review recent changes (deployments, config updates, traffic spikes)
- Gather logs (see the sketch after this checklist)
- Identify root cause using runbooks above
- Mitigate (restart service, scale up, disable problematic tenant)
- Monitor recovery (metrics return to normal)
- Document incident (postmortem, runbook updates)
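For the "Gather logs" step, a minimal sketch assuming the pod labels used elsewhere in this guide.

```bash
kubectl logs -l app=activekg-api --since=2h > /tmp/incident-api.log
kubectl logs -l app=activekg-worker --since=2h > /tmp/incident-worker.log
kubectl get events --sort-by=.lastTimestamp | tail -50 > /tmp/incident-events.log
```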
Emergency Contacts¶
- On-Call Engineer: Check PagerDuty schedule
- Connector Team Lead: [Contact info]
- Database Team: [Contact info] (for PostgreSQL issues)
- Cloud Provider Support: [Escalation path for GCS/S3 issues]
Appendix¶
Environment Variables Reference¶
| Variable | Description | Default |
|---|---|---|
| REDIS_URL | Redis connection string | redis://localhost:6379/0 |
| ACTIVEKG_DSN | PostgreSQL connection | Required |
| RUN_SCHEDULER | Enable APScheduler | true |
| PUBSUB_VERIFY_SECRET | Pub/Sub push endpoint token | None |
| PUBSUB_VERIFY_OIDC | Enable OIDC verification | false |
| CONNECTOR_KEK_V1 | Key Encryption Key v1 | Required |
| CONNECTOR_KEK_ACTIVE_VERSION | Active KEK version | 1 |
| CONNECTOR_WORKER_BATCH_SIZE | Worker batch size | 10 |
| CONNECTOR_WORKER_POLL_INTERVAL | Worker poll interval (seconds) | 1.0 |
| PURGER_BATCH_SIZE | Purger batch size | 500 |
| PURGER_RETENTION_DAYS | Soft-delete retention (days) | 30 |
Useful Queries¶
```sql
-- Check connector configs
SELECT tenant_id, provider, config->>'bucket', enabled, created_at, updated_at
FROM connector_configs
ORDER BY updated_at DESC;

-- Check ingestion stats
SELECT
  DATE_TRUNC('hour', created_at) AS hour,
  COUNT(*) AS nodes_created
FROM nodes
WHERE entity_type = 'Document'
  AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour DESC;

-- Check soft-deleted nodes
SELECT COUNT(*), entity_type
FROM nodes
WHERE deleted_at IS NOT NULL
  AND deleted_at < NOW() - INTERVAL '30 days'
GROUP BY entity_type;
```
Document Version: 1.0 Last Reviewed: 2025-11-12 Next Review: 2025-12-12