feat(telemetry): add caching, alerts, migration & dashboard improvements
- Add Redis/in-memory caching layer (cache.go)
- Add SMTP alerting for high failure rates (alerts.go)
- Add data migration script from old API (migrate.go)
- Add docker-compose.yml for easy deployment
- Move dashboard to / with redirect from /dashboard
- Add dark/light mode toggle
- Add error analysis and failed apps statistics
- Add PVE version and LXC/VM type stats
- Add /metrics Prometheus endpoint
- Add /api/records pagination endpoint
- Add CSV export functionality
- Enhance healthcheck with PB connection status

New ENV vars:
- Cache: ENABLE_CACHE, CACHE_TTL_SECONDS, ENABLE_REDIS, REDIS_URL
- Alerts: ALERT_ENABLED, SMTP_*, ALERT_FAILURE_THRESHOLD, etc.
- Migration: RUN_MIGRATION, MIGRATION_REQUIRED, MIGRATION_SOURCE_URL
This commit is contained in:
@@ -37,6 +37,25 @@ type Config struct {
|
||||
RateKeyHeader string // e.g. "X-Telemetry-Key"
|
||||
RequestTimeout time.Duration // upstream timeout
|
||||
EnableReqLogging bool // default false (GDPR-friendly)
|
||||
|
||||
// Cache
|
||||
RedisURL string
|
||||
EnableRedis bool
|
||||
CacheTTL time.Duration
|
||||
CacheEnabled bool
|
||||
|
||||
// Alerts (SMTP)
|
||||
AlertEnabled bool
|
||||
SMTPHost string
|
||||
SMTPPort int
|
||||
SMTPUser string
|
||||
SMTPPassword string
|
||||
SMTPFrom string
|
||||
SMTPTo []string
|
||||
SMTPUseTLS bool
|
||||
AlertFailureThreshold float64
|
||||
AlertCheckInterval time.Duration
|
||||
AlertCooldown time.Duration
|
||||
}
|
||||
|
||||
// TelemetryIn matches payload from api.func (bash client)
|
||||
@@ -238,6 +257,59 @@ func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, u
|
||||
return nil
|
||||
}
|
||||
|
||||
// FetchRecordsPaginated retrieves records with pagination and optional filters
|
||||
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType string) ([]TelemetryRecord, int, error) {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Build filter
|
||||
var filters []string
|
||||
if status != "" {
|
||||
filters = append(filters, fmt.Sprintf("status='%s'", status))
|
||||
}
|
||||
if app != "" {
|
||||
filters = append(filters, fmt.Sprintf("nsapp~'%s'", app))
|
||||
}
|
||||
if osType != "" {
|
||||
filters = append(filters, fmt.Sprintf("os_type='%s'", osType))
|
||||
}
|
||||
|
||||
filterStr := ""
|
||||
if len(filters) > 0 {
|
||||
filterStr = "&filter=" + strings.Join(filters, "&&")
|
||||
}
|
||||
|
||||
reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=-created&page=%d&perPage=%d%s",
|
||||
p.baseURL, p.targetColl, page, limit, filterStr)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
|
||||
resp, err := p.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, 0, fmt.Errorf("pocketbase fetch failed: %s", resp.Status)
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Items []TelemetryRecord `json:"items"`
|
||||
TotalItems int `json:"totalItems"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return result.Items, result.TotalItems, nil
|
||||
}
|
||||
|
||||
// UpsertTelemetry handles both creation and updates intelligently
|
||||
// - status="installing": Always creates a new record
|
||||
// - status!="installing": Updates existing record (found by random_id) with status/error/exit_code only
|
||||
@@ -536,6 +608,25 @@ func main() {
|
||||
RateKeyHeader: env("RATE_KEY_HEADER", "X-Telemetry-Key"),
|
||||
RequestTimeout: time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond,
|
||||
EnableReqLogging: envBool("ENABLE_REQUEST_LOGGING", false),
|
||||
|
||||
// Cache config
|
||||
RedisURL: env("REDIS_URL", ""),
|
||||
EnableRedis: envBool("ENABLE_REDIS", false),
|
||||
CacheTTL: time.Duration(envInt("CACHE_TTL_SECONDS", 60)) * time.Second,
|
||||
CacheEnabled: envBool("ENABLE_CACHE", true),
|
||||
|
||||
// Alert config
|
||||
AlertEnabled: envBool("ALERT_ENABLED", false),
|
||||
SMTPHost: env("SMTP_HOST", ""),
|
||||
SMTPPort: envInt("SMTP_PORT", 587),
|
||||
SMTPUser: env("SMTP_USER", ""),
|
||||
SMTPPassword: env("SMTP_PASSWORD", ""),
|
||||
SMTPFrom: env("SMTP_FROM", "telemetry@proxmoxved.local"),
|
||||
SMTPTo: splitCSV(env("SMTP_TO", "")),
|
||||
SMTPUseTLS: envBool("SMTP_USE_TLS", false),
|
||||
AlertFailureThreshold: envFloat("ALERT_FAILURE_THRESHOLD", 20.0),
|
||||
AlertCheckInterval: time.Duration(envInt("ALERT_CHECK_INTERVAL_MIN", 15)) * time.Minute,
|
||||
AlertCooldown: time.Duration(envInt("ALERT_COOLDOWN_MIN", 60)) * time.Minute,
|
||||
}
|
||||
|
||||
var pt *ProxyTrust
|
||||
@@ -550,20 +641,100 @@ func main() {
|
||||
pb := NewPBClient(cfg)
|
||||
rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst)
|
||||
|
||||
// Initialize cache
|
||||
cache := NewCache(CacheConfig{
|
||||
RedisURL: cfg.RedisURL,
|
||||
EnableRedis: cfg.EnableRedis,
|
||||
DefaultTTL: cfg.CacheTTL,
|
||||
})
|
||||
|
||||
// Initialize alerter
|
||||
alerter := NewAlerter(AlertConfig{
|
||||
Enabled: cfg.AlertEnabled,
|
||||
SMTPHost: cfg.SMTPHost,
|
||||
SMTPPort: cfg.SMTPPort,
|
||||
SMTPUser: cfg.SMTPUser,
|
||||
SMTPPassword: cfg.SMTPPassword,
|
||||
SMTPFrom: cfg.SMTPFrom,
|
||||
SMTPTo: cfg.SMTPTo,
|
||||
UseTLS: cfg.SMTPUseTLS,
|
||||
FailureThreshold: cfg.AlertFailureThreshold,
|
||||
CheckInterval: cfg.AlertCheckInterval,
|
||||
Cooldown: cfg.AlertCooldown,
|
||||
}, pb)
|
||||
alerter.Start()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
// Check PocketBase connectivity
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
status := map[string]interface{}{
|
||||
"status": "ok",
|
||||
"time": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if err := pb.ensureAuth(ctx); err != nil {
|
||||
status["status"] = "degraded"
|
||||
status["pocketbase"] = "disconnected"
|
||||
w.WriteHeader(503)
|
||||
} else {
|
||||
status["pocketbase"] = "connected"
|
||||
w.WriteHeader(200)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(status)
|
||||
})
|
||||
|
||||
// Dashboard HTML page
|
||||
mux.HandleFunc("/dashboard", func(w http.ResponseWriter, r *http.Request) {
|
||||
// Dashboard HTML page - serve on root
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||||
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
||||
_, _ = w.Write([]byte(DashboardHTML()))
|
||||
})
|
||||
|
||||
// Dashboard API endpoint
|
||||
// Redirect /dashboard to / for backwards compatibility
|
||||
mux.HandleFunc("/dashboard", func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "/", http.StatusMovedPermanently)
|
||||
})
|
||||
|
||||
// Prometheus-style metrics endpoint
|
||||
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := pb.FetchDashboardData(ctx, 1) // Last 24h only for metrics
|
||||
if err != nil {
|
||||
http.Error(w, "failed to fetch metrics", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
|
||||
fmt.Fprintf(w, "# HELP telemetry_installs_total Total number of installations\n")
|
||||
fmt.Fprintf(w, "# TYPE telemetry_installs_total counter\n")
|
||||
fmt.Fprintf(w, "telemetry_installs_total %d\n\n", data.TotalInstalls)
|
||||
fmt.Fprintf(w, "# HELP telemetry_installs_success_total Successful installations\n")
|
||||
fmt.Fprintf(w, "# TYPE telemetry_installs_success_total counter\n")
|
||||
fmt.Fprintf(w, "telemetry_installs_success_total %d\n\n", data.SuccessCount)
|
||||
fmt.Fprintf(w, "# HELP telemetry_installs_failed_total Failed installations\n")
|
||||
fmt.Fprintf(w, "# TYPE telemetry_installs_failed_total counter\n")
|
||||
fmt.Fprintf(w, "telemetry_installs_failed_total %d\n\n", data.FailedCount)
|
||||
fmt.Fprintf(w, "# HELP telemetry_installs_pending Current installing count\n")
|
||||
fmt.Fprintf(w, "# TYPE telemetry_installs_pending gauge\n")
|
||||
fmt.Fprintf(w, "telemetry_installs_pending %d\n\n", data.InstallingCount)
|
||||
fmt.Fprintf(w, "# HELP telemetry_success_rate Success rate percentage\n")
|
||||
fmt.Fprintf(w, "# TYPE telemetry_success_rate gauge\n")
|
||||
fmt.Fprintf(w, "telemetry_success_rate %.2f\n", data.SuccessRate)
|
||||
})
|
||||
|
||||
// Dashboard API endpoint (with caching)
|
||||
mux.HandleFunc("/api/dashboard", func(w http.ResponseWriter, r *http.Request) {
|
||||
days := 30
|
||||
if d := r.URL.Query().Get("days"); d != "" {
|
||||
@@ -579,6 +750,16 @@ func main() {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Try cache first
|
||||
cacheKey := fmt.Sprintf("dashboard:%d", days)
|
||||
var data *DashboardData
|
||||
if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("X-Cache", "HIT")
|
||||
json.NewEncoder(w).Encode(data)
|
||||
return
|
||||
}
|
||||
|
||||
data, err := pb.FetchDashboardData(ctx, days)
|
||||
if err != nil {
|
||||
log.Printf("dashboard fetch failed: %v", err)
|
||||
@@ -586,10 +767,86 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
// Cache the result
|
||||
if cfg.CacheEnabled {
|
||||
_ = cache.Set(ctx, cacheKey, data, cfg.CacheTTL)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("X-Cache", "MISS")
|
||||
json.NewEncoder(w).Encode(data)
|
||||
})
|
||||
|
||||
// Paginated records API
|
||||
mux.HandleFunc("/api/records", func(w http.ResponseWriter, r *http.Request) {
|
||||
page := 1
|
||||
limit := 50
|
||||
status := r.URL.Query().Get("status")
|
||||
app := r.URL.Query().Get("app")
|
||||
osType := r.URL.Query().Get("os")
|
||||
|
||||
if p := r.URL.Query().Get("page"); p != "" {
|
||||
fmt.Sscanf(p, "%d", &page)
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
}
|
||||
if l := r.URL.Query().Get("limit"); l != "" {
|
||||
fmt.Sscanf(l, "%d", &limit)
|
||||
if limit < 1 {
|
||||
limit = 1
|
||||
}
|
||||
if limit > 100 {
|
||||
limit = 100
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType)
|
||||
if err != nil {
|
||||
log.Printf("records fetch failed: %v", err)
|
||||
http.Error(w, "failed to fetch records", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"records": records,
|
||||
"page": page,
|
||||
"limit": limit,
|
||||
"total": total,
|
||||
"total_pages": (total + limit - 1) / limit,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// Alert history and test endpoints
|
||||
mux.HandleFunc("/api/alerts", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"enabled": cfg.AlertEnabled,
|
||||
"history": alerter.GetAlertHistory(),
|
||||
})
|
||||
})
|
||||
|
||||
mux.HandleFunc("/api/alerts/test", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
if err := alerter.TestAlert(); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("test alert sent"))
|
||||
})
|
||||
|
||||
mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
@@ -741,6 +998,18 @@ func envBool(k string, def bool) bool {
|
||||
}
|
||||
return v == "1" || v == "true" || v == "yes" || v == "on"
|
||||
}
|
||||
// envFloat reads the environment variable k as a float64.
//
// It returns def when the variable is unset or empty, or when its value does
// not begin with a parseable number. A value that parses to zero (for example
// "0" or "0.0") is returned as 0 rather than being replaced by def.
func envFloat(k string, def float64) float64 {
	v := os.Getenv(k)
	if v == "" {
		return def
	}

	var f float64
	// Use Sscanf's own result to detect failure. Inferring failure from
	// f == 0 would misread legitimate zero spellings such as "0.0".
	if n, err := fmt.Sscanf(v, "%f", &f); err != nil || n != 1 {
		return def
	}
	return f
}
|
||||
func splitCSV(s string) []string {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
|
||||
Reference in New Issue
Block a user