cache: Support redis

Closes: https://todo.xenrox.net/~xenrox/ntfy-alertmanager/11
This commit is contained in:
Thorben Günther 2023-03-08 22:05:15 +01:00
parent 652d46d972
commit 054c163ffb
No known key found for this signature in database
GPG key ID: 415CD778D8C5AFED
9 changed files with 114 additions and 15 deletions

View file

@ -96,12 +96,16 @@ alertmanager {
# When the alert-mode is set to single, ntfy-alertmanager will cache each single alert
# to avoid sending recurrences.
cache {
# The type of cache that will be used (default is memory).
# The type of cache that will be used (either memory or redis; default is memory).
type memory
# How long messages stay in the cache for
duration 24h
# Memory cache settings
# Interval in which the cache is cleaned up
cleanup-interval 1h
# Redis cache settings
}
```

4
cache/cache.go vendored
View file

@ -3,7 +3,7 @@ package cache
// Cache is the interface that describes a cache for ntfy-alertmanager.
type Cache interface {
Set(fingerprint string, status string)
Contains(fingerprint string, status string) bool
Set(fingerprint string, status string) error
Contains(fingerprint string, status string) (bool, error)
Cleanup()
}

11
cache/memory.go vendored
View file

@ -27,7 +27,7 @@ func NewMemoryCache(d time.Duration) Cache {
}
// Set saves an alert in the cache.
func (c *MemoryCache) Set(fingerprint string, status string) {
func (c *MemoryCache) Set(fingerprint string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
alert := new(cachedAlert)
@ -35,19 +35,20 @@ func (c *MemoryCache) Set(fingerprint string, status string) {
alert.status = status
c.alerts[fingerprint] = alert
return nil
}
// Contains checks if an alert with a given fingerprint is in the cache
// and checks if the status matches.
func (c *MemoryCache) Contains(fingerprint string, status string) bool {
// and if the status matches.
func (c *MemoryCache) Contains(fingerprint string, status string) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
alert, ok := c.alerts[fingerprint]
if ok {
return alert.status == status
return alert.status == status, nil
}
return false
return false, nil
}
func (a *cachedAlert) expired() bool {

51
cache/redis.go vendored Normal file
View file

@ -0,0 +1,51 @@
package cache
import (
"context"
"time"
"github.com/redis/go-redis/v9"
)
// RedisCache is the redis cache.
type RedisCache struct {
client *redis.Client
duration time.Duration
}
// NewRedisCache creates a new redis cache/client.
func NewRedisCache(redisURL string, d time.Duration) (Cache, error) {
c := new(RedisCache)
ropts, err := redis.ParseURL(redisURL)
if err != nil {
return nil, err
}
rdb := redis.NewClient(ropts)
c.client = rdb
c.duration = d
return c, nil
}
// Set saves an alert in the cache.
func (c *RedisCache) Set(fingerprint string, status string) error {
return c.client.SetEx(context.Background(), fingerprint, status, c.duration).Err()
}
// Contains checks if an alert with a given fingerprint is in the cache
// and if the status matches.
func (c *RedisCache) Contains(fingerprint string, status string) (bool, error) {
val, err := c.client.Get(context.Background(), fingerprint).Result()
if err == redis.Nil {
return false, nil
} else if err != nil {
return false, err
}
return val == status, nil
}
// Cleanup is an empty function that is simply here to implement the interface.
// Redis does its own cleanup.
func (c *RedisCache) Cleanup() {}

View file

@ -20,6 +20,7 @@ type cacheType int
const (
memory cacheType = iota
redis
)
type config struct {
@ -256,6 +257,8 @@ func readConfig(path string) (*config, error) {
switch strings.ToLower(cacheType) {
case "memory":
config.cache.Type = memory
case "redis":
config.cache.Type = redis
default:
return nil, fmt.Errorf("cache: illegal type %q", cacheType)
}

View file

@ -54,6 +54,7 @@ alertmanager {
}
cache {
type redis
duration 48h
}
`
@ -73,7 +74,11 @@ cache {
"instance:example.com": {Tags: []string{"computer", "example"}},
},
},
cache: cacheConfig{CleanupInterval: time.Hour, Duration: 48 * time.Hour},
cache: cacheConfig{
Type: redis,
CleanupInterval: time.Hour,
Duration: 48 * time.Hour,
},
am: alertmanagerConfig{
SilenceDuration: time.Hour * 24,
User: "user",

7
go.mod
View file

@ -5,7 +5,12 @@ go 1.19
require (
git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74
github.com/redis/go-redis/v9 v9.0.2
golang.org/x/text v0.7.0
)
require github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
)

11
go.sum
View file

@ -2,9 +2,20 @@ git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99 h1:1s8n5uisqkR+Bz
git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99/go.mod h1:t+Ww6SR24yYnXzEWiNlOY0AFo5E9B73X++10lrSpp4U=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74 h1:t/52xLRU4IHd2O1nkb9fcUE6K95/KdBtdQYHT31szps=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74/go.mod h1:d98WFDHGpxaEThKue5CfGtr9OrWgbaApprt3GH+OM4s=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

29
main.go
View file

@ -59,8 +59,12 @@ type notification struct {
func (br *bridge) singleAlertNotifications(p *payload) []*notification {
var notifications []*notification
for _, alert := range p.Alerts {
if br.cache.Contains(alert.Fingerprint, alert.Status) {
br.logger.Debugf("Alert %s skipped: Still in cache", alert.Fingerprint)
contains, err := br.cache.Contains(alert.Fingerprint, alert.Status)
if err != nil {
br.logger.Errorf("Failed to lookup alert %q in cache: %v", alert.Fingerprint, err)
}
if contains {
br.logger.Debugf("Alert %q skipped: Still in cache", alert.Fingerprint)
continue
}
@ -324,7 +328,9 @@ func (br *bridge) handleWebhooks(w http.ResponseWriter, r *http.Request) {
if err != nil {
br.logger.Errorf("Failed to publish notification: %v", err)
} else {
br.cache.Set(n.fingerprint, n.status)
if err := br.cache.Set(n.fingerprint, n.status); err != nil {
br.logger.Errorf("Failed to set alert %q in cache: %v", n.fingerprint, err)
}
}
}
} else {
@ -395,7 +401,18 @@ func main() {
client := &httpClient{&http.Client{Timeout: time.Second * 3}}
c := cache.NewMemoryCache(cfg.cache.Duration)
var c cache.Cache
switch cfg.cache.Type {
case memory:
c = cache.NewMemoryCache(cfg.cache.Duration)
case redis:
var err error
// TODO: Read URL from config
c, err = cache.NewRedisCache("redis://localhost:6379", cfg.cache.Duration)
if err != nil {
logger.Fatalf("Failed to create redis cache: %v", err)
}
}
bridge := &bridge{cfg: cfg, logger: logger, cache: c, client: client}
logger.Infof("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)
@ -409,6 +426,8 @@ func main() {
http.HandleFunc("/silences", bridge.handleSilences)
}
go bridge.runCleanup()
if cfg.cache.Type == memory {
go bridge.runCleanup()
}
logger.Fatal(http.ListenAndServe(cfg.HTTPAddress, nil))
}