// A bridge between ntfy and Alertmanager package main import ( "context" "crypto/sha512" "crypto/subtle" "crypto/tls" "crypto/x509" _ "embed" "encoding/base64" "encoding/hex" "encoding/json" "errors" "flag" "fmt" "log/slog" "net/http" "os" "os/signal" "slices" "strings" "syscall" "time" "git.xenrox.net/~xenrox/go-utils/logging" "git.xenrox.net/~xenrox/ntfy-alertmanager/cache" "git.xenrox.net/~xenrox/ntfy-alertmanager/config" "golang.org/x/text/cases" "golang.org/x/text/language" ) var version = "dev" type bridge struct { cfg *config.Config logger *slog.Logger cache cache.Cache client *httpClient } type payload struct { Status string `json:"status"` Alerts []alert `json:"alerts"` GroupLabels map[string]string `json:"groupLabels"` CommonLabels map[string]string `json:"commonLabels"` CommonAnnotations map[string]string `json:"commonAnnotations"` ExternalURL string `json:"externalURL"` } type alert struct { Status string `json:"status"` Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Fingerprint string `json:"fingerprint"` } type notification struct { title string body string priority string tags string icon string emailAddress string call string silenceBody string fingerprint string status string } type ntfyError struct { Error string `json:"error"` } func (br *bridge) singleAlertNotifications(p *payload) []*notification { var notifications []*notification for _, alert := range p.Alerts { contains, err := br.cache.Contains(alert.Fingerprint, alert.Status) if err != nil { br.logger.Error("Failed to lookup alert in cache", slog.String("fingerprint", alert.Fingerprint), slog.String("error", err.Error())) } if contains { br.logger.Debug("Alert skipped: Still in cache", slog.String("fingerprint", alert.Fingerprint)) continue } n := new(notification) n.fingerprint = alert.Fingerprint n.status = alert.Status // create title n.title = fmt.Sprintf("[%s]", strings.ToUpper(alert.Status)) if name, ok := alert.Labels["alertname"]; ok { n.title = fmt.Sprintf("%s %s", n.title, name) } for _, value := range p.GroupLabels { n.title = fmt.Sprintf("%s %s", n.title, value) } // create body n.body = "Labels:\n" sortedLabelKeys := sortKeys(alert.Labels) for _, key := range sortedLabelKeys { n.body = fmt.Sprintf("%s%s = %s\n", n.body, key, alert.Labels[key]) } n.body += "\nAnnotations:\n" for key, value := range alert.Annotations { n.body = fmt.Sprintf("%s%s = %s\n", n.body, key, value) } var tags []string if alert.Status == "resolved" { tags = append(tags, br.cfg.Resolved.Tags...) n.icon = br.cfg.Resolved.Icon n.priority = br.cfg.Resolved.Priority } n.emailAddress = br.cfg.Ntfy.EmailAddress n.call = br.cfg.Ntfy.Call for _, labelName := range br.cfg.Labels.Order { val, ok := alert.Labels[labelName] if !ok { continue } labelConfig, ok := br.cfg.Labels.Label[fmt.Sprintf("%s:%s", labelName, val)] if !ok { continue } if n.priority == "" { n.priority = labelConfig.Priority } if n.icon == "" { n.icon = labelConfig.Icon } if n.emailAddress == "" { n.emailAddress = labelConfig.EmailAddress } if n.call == "" { n.call = labelConfig.Call } for _, val := range labelConfig.Tags { if !slices.Contains(tags, val) { tags = append(tags, val) } } } n.tags = strings.Join(tags, ",") if br.cfg.Am.SilenceDuration != 0 && alert.Status == "firing" { if br.cfg.BaseURL == "" { br.logger.Error("Failed to create silence action: No base-url set") } else { // I could not convince ntfy to accept an Action with a body which contains // a json with more than one key. Instead the json will be base64 encoded // and sent to the ntfy-alertmanager silences endpoint, that operates as // a proxy and will do the Alertmanager API request. s := &silenceBody{AlertManagerURL: p.ExternalURL, Labels: alert.Labels} b, err := json.Marshal(s) if err != nil { br.logger.Error("Failed to create silence action", slog.String("error", err.Error())) } n.silenceBody = base64.StdEncoding.EncodeToString(b) } } notifications = append(notifications, n) } return notifications } func (br *bridge) multiAlertNotification(p *payload) *notification { n := new(notification) // create title count := len(p.Alerts) title := fmt.Sprintf("[%s", strings.ToUpper(p.Status)) if p.Status == "firing" { title = fmt.Sprintf("%s:%d", title, count) } title += "]" for _, value := range p.GroupLabels { title = fmt.Sprintf("%s %s", title, value) } n.title = title // create body var body string c := cases.Title(language.English) for _, alert := range p.Alerts { alertBody := fmt.Sprintf("%s\nLabels:\n", c.String(alert.Status)) sortedLabelKeys := sortKeys(alert.Labels) for _, key := range sortedLabelKeys { alertBody = fmt.Sprintf("%s%s = %s\n", alertBody, key, alert.Labels[key]) } alertBody += "Annotations:\n" for key, value := range alert.Annotations { alertBody = fmt.Sprintf("%s%s = %s\n", alertBody, key, value) } alertBody += "\n" body += alertBody } n.body = body var tags []string if p.Status == "resolved" { tags = append(tags, br.cfg.Resolved.Tags...) n.icon = br.cfg.Resolved.Icon n.priority = br.cfg.Resolved.Priority } n.emailAddress = br.cfg.Ntfy.EmailAddress n.call = br.cfg.Ntfy.Call for _, labelName := range br.cfg.Labels.Order { val, ok := p.CommonLabels[labelName] if !ok { continue } labelConfig, ok := br.cfg.Labels.Label[fmt.Sprintf("%s:%s", labelName, val)] if !ok { continue } if n.priority == "" { n.priority = labelConfig.Priority } if n.icon == "" { n.icon = labelConfig.Icon } if n.emailAddress == "" { n.emailAddress = labelConfig.EmailAddress } if n.call == "" { n.call = labelConfig.Call } for _, val := range labelConfig.Tags { if !slices.Contains(tags, val) { tags = append(tags, val) } } } n.tags = strings.Join(tags, ",") if br.cfg.Am.SilenceDuration != 0 && p.Status == "firing" { if br.cfg.BaseURL == "" { br.logger.Error("Failed to create silence action: No base-url set") } else { s := &silenceBody{AlertManagerURL: p.ExternalURL, Labels: p.CommonLabels} b, err := json.Marshal(s) if err != nil { br.logger.Error("Failed to create silence action", slog.String("error", err.Error())) } n.silenceBody = base64.StdEncoding.EncodeToString(b) } } return n } func (br *bridge) publish(n *notification) error { req, err := http.NewRequest(http.MethodPost, br.cfg.Ntfy.Topic, strings.NewReader(n.body)) if err != nil { return err } // ntfy authentication if br.cfg.Ntfy.Password != "" && br.cfg.Ntfy.User != "" { req.SetBasicAuth(br.cfg.Ntfy.User, br.cfg.Ntfy.Password) } else if br.cfg.Ntfy.AccessToken != "" { req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", br.cfg.Ntfy.AccessToken)) } req.Header.Set("X-Title", n.title) if n.priority != "" { req.Header.Set("X-Priority", n.priority) } if n.icon != "" { req.Header.Set("X-Icon", n.icon) } if n.tags != "" { req.Header.Set("X-Tags", n.tags) } if n.emailAddress != "" { req.Header.Set("X-Email", n.emailAddress) } if n.call != "" { req.Header.Set("X-Call", n.call) } if n.silenceBody != "" { url := br.cfg.BaseURL + "/silences" var authString string if br.cfg.User != "" && br.cfg.Password != "" { auth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", br.cfg.User, br.cfg.Password))) authString = fmt.Sprintf(", headers.Authorization=Basic %s", auth) } req.Header.Set("Actions", fmt.Sprintf("http, Silence, %s, method=POST, body=%s%s", url, n.silenceBody, authString)) } configFingerprint := br.cfg.Ntfy.CertFingerprint if configFingerprint != "" { tlsCfg := &tls.Config{} tlsCfg.VerifyPeerCertificate = func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error { for _, rawCert := range rawCerts { hash := sha512.Sum512(rawCert) if hex.EncodeToString(hash[:]) == configFingerprint { return nil } } if len(rawCerts) == 0 { return errors.New("the ntfy server does not offer a certificate") } hash := sha512.Sum512(rawCerts[0]) var expectedFingerprint string for i, b := range hash { if i != 0 { expectedFingerprint += ":" } expectedFingerprint += fmt.Sprintf("%02X", b) } return fmt.Errorf("the ntfy certificate fingerprint (%s) is not set in the config", expectedFingerprint) } tlsCfg.InsecureSkipVerify = true br.client.Transport = &http.Transport{TLSClientConfig: tlsCfg} } resp, err := br.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { var ntfyError ntfyError if err := json.NewDecoder(resp.Body).Decode(&ntfyError); err != nil { br.logger.Debug("Publish: Failed to decode error", slog.String("error", err.Error())) return fmt.Errorf("ntfy: received status code %d", resp.StatusCode) } return fmt.Errorf("ntfy: %s (status code %d)", ntfyError.Error, resp.StatusCode) } return nil } func (br *bridge) handleWebhooks(w http.ResponseWriter, r *http.Request) { logger := br.logger.With(slog.String("handler", "/")) if r.Method != http.MethodPost { http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed) logger.Debug(fmt.Sprintf("Illegal HTTP method: expected %q, got %q", "POST", r.Method)) return } contentType := r.Header.Get("Content-Type") if contentType != "application/json" { http.Error(w, "Only application/json allowed", http.StatusUnsupportedMediaType) logger.Debug(fmt.Sprintf("Illegal content type: %s", contentType)) return } var event payload if err := json.NewDecoder(r.Body).Decode(&event); err != nil { http.Error(w, "Failed to parse payload", http.StatusInternalServerError) logger.Debug("Failed to decode payload", slog.String("error", err.Error())) return } logger.Debug("Received alert", slog.Any("payload", event)) if br.cfg.AlertMode == config.Single { notifications := br.singleAlertNotifications(&event) for _, n := range notifications { err := br.publish(n) if err != nil { logger.Error("Failed to publish notification", slog.String("error", err.Error())) } else { if err := br.cache.Set(n.fingerprint, n.status); err != nil { logger.Error("Failed to cache alert", slog.String("fingerprint", n.fingerprint), slog.String("error", err.Error())) } } } } else { notification := br.multiAlertNotification(&event) err := br.publish(notification) if err != nil { logger.Error("Failed to publish notification", slog.String("error", err.Error())) } } } func (br *bridge) corsMiddleware(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") handler.ServeHTTP(w, r) }) } func (br *bridge) authMiddleware(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logger := br.logger.With(slog.String("url", r.URL.String())) user, pass, ok := r.BasicAuth() if !ok { logger.Debug("basic auth failure") return } inputUserHash := sha512.Sum512([]byte(user)) inputPassHash := sha512.Sum512([]byte(pass)) configUserHash := sha512.Sum512([]byte(br.cfg.User)) configPassHash := sha512.Sum512([]byte(br.cfg.Password)) validUser := subtle.ConstantTimeCompare(inputUserHash[:], configUserHash[:]) validPass := subtle.ConstantTimeCompare(inputPassHash[:], configPassHash[:]) if validUser != 1 || validPass != 1 { http.Error(w, "Unauthorized", http.StatusUnauthorized) logger.Debug("basic auth: wrong user or password") return } handler.ServeHTTP(w, r) }) } func (br *bridge) runCleanup(ctx context.Context) { for { select { case <-time.After(br.cfg.Cache.CleanupInterval): br.logger.Info("Pruning cache") br.cache.Cleanup() case <-ctx.Done(): return } } } func main() { var configPath string flag.StringVar(&configPath, "config", "/etc/ntfy-alertmanager/config", "config file path") var showVersion bool flag.BoolVar(&showVersion, "version", false, "Show version and exit") flag.Parse() if showVersion { fmt.Println(version) os.Exit(0) } cfg, err := config.ReadConfig(configPath) if err != nil { slog.Error("Failed to read config", slog.String("error", err.Error())) os.Exit(1) } logger, err := logging.New(cfg.LogLevel, cfg.LogFormat, os.Stderr) if err != nil { slog.Error("Failed to create logger", slog.String("error", err.Error())) os.Exit(1) } ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() client := &httpClient{&http.Client{Timeout: time.Second * 3}} c, err := cache.NewCache(cfg.Cache) if err != nil { logger.Error("Failed to create cache", slog.String("error", err.Error())) os.Exit(1) } bridge := &bridge{cfg: cfg, logger: logger, cache: c, client: client} logger.Info(fmt.Sprintf("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)) mux := http.NewServeMux() mux.HandleFunc("/", bridge.handleWebhooks) mux.HandleFunc("/silences", bridge.handleSilences) httpServer := &http.Server{ Addr: cfg.HTTPAddress, Handler: mux, } if cfg.User != "" && cfg.Password != "" { logger.Info("Enabling HTTP Basic Authentication") httpServer.Handler = bridge.authMiddleware(mux) } httpServer.Handler = bridge.corsMiddleware(httpServer.Handler) if _, ok := c.(*cache.MemoryCache); ok { go bridge.runCleanup(ctx) } go func() { err = httpServer.ListenAndServe() if err != nil && err != http.ErrServerClosed { logger.Error("Failed to start HTTP server", slog.String("error", err.Error())) os.Exit(1) } }() <-ctx.Done() stop() httpShutdownContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = httpServer.Shutdown(httpShutdownContext) if err != nil { logger.Error("Failed to shutdown HTTP server", slog.String("error", err.Error())) } }