cache: Move to own interface package
With this interface it will be easy to support multiple cache types.
This commit is contained in:
parent
5a8a85f0f9
commit
d35b77fbc9
4 changed files with 85 additions and 70 deletions
63
cache.go
63
cache.go
|
@ -1,63 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type fingerprint string
|
|
||||||
type status string
|
|
||||||
|
|
||||||
type cachedAlert struct {
|
|
||||||
expires time.Time
|
|
||||||
status status
|
|
||||||
}
|
|
||||||
|
|
||||||
type cache struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
duration time.Duration
|
|
||||||
alerts map[fingerprint]*cachedAlert
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *cachedAlert) expired() bool {
|
|
||||||
return a.expires.Before(time.Now())
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCache(d time.Duration) *cache {
|
|
||||||
c := new(cache)
|
|
||||||
c.duration = d
|
|
||||||
c.alerts = make(map[fingerprint]*cachedAlert)
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) cleanup() {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
for key, value := range c.alerts {
|
|
||||||
if value.expired() {
|
|
||||||
delete(c.alerts, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) set(f fingerprint, s status) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
alert := new(cachedAlert)
|
|
||||||
alert.expires = time.Now().Add(c.duration)
|
|
||||||
alert.status = s
|
|
||||||
|
|
||||||
c.alerts[f] = alert
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) contains(f fingerprint, s status) bool {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
alert, ok := c.alerts[f]
|
|
||||||
if ok {
|
|
||||||
return alert.status == s
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
9
cache/cache.go
vendored
Normal file
9
cache/cache.go
vendored
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
// Package cache includes a memory cache for ntfy-alertmanager.
|
||||||
|
package cache
|
||||||
|
|
||||||
|
// Cache is the interface that describes a cache for ntfy-alertmanager.
|
||||||
|
type Cache interface {
|
||||||
|
Set(fingerprint string, status string)
|
||||||
|
Contains(fingerprint string, status string) bool
|
||||||
|
Cleanup()
|
||||||
|
}
|
67
cache/memory.go
vendored
Normal file
67
cache/memory.go
vendored
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MemoryCache is the in-memory cache.
|
||||||
|
type MemoryCache struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
duration time.Duration
|
||||||
|
alerts map[string]*cachedAlert
|
||||||
|
}
|
||||||
|
|
||||||
|
type cachedAlert struct {
|
||||||
|
expires time.Time
|
||||||
|
status string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMemoryCache creates a in-memory cache.
|
||||||
|
func NewMemoryCache(d time.Duration) Cache {
|
||||||
|
c := new(MemoryCache)
|
||||||
|
c.duration = d
|
||||||
|
c.alerts = make(map[string]*cachedAlert)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set saves an alert in the cache.
|
||||||
|
func (c *MemoryCache) Set(fingerprint string, status string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
alert := new(cachedAlert)
|
||||||
|
alert.expires = time.Now().Add(c.duration)
|
||||||
|
alert.status = status
|
||||||
|
|
||||||
|
c.alerts[fingerprint] = alert
|
||||||
|
}
|
||||||
|
|
||||||
|
// Contains checks if an alert with a given fingerprint is in the cache
|
||||||
|
// and checks if the status matches.
|
||||||
|
func (c *MemoryCache) Contains(fingerprint string, status string) bool {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
alert, ok := c.alerts[fingerprint]
|
||||||
|
if ok {
|
||||||
|
return alert.status == status
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *cachedAlert) expired() bool {
|
||||||
|
return a.expires.Before(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup iterates over all alerts in the cache and removes them, if they
|
||||||
|
// are expired.
|
||||||
|
func (c *MemoryCache) Cleanup() {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
for key, value := range c.alerts {
|
||||||
|
if value.expired() {
|
||||||
|
delete(c.alerts, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
16
main.go
16
main.go
|
@ -15,6 +15,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.xenrox.net/~xenrox/go-log"
|
"git.xenrox.net/~xenrox/go-log"
|
||||||
|
"git.xenrox.net/~xenrox/ntfy-alertmanager/cache"
|
||||||
"golang.org/x/text/cases"
|
"golang.org/x/text/cases"
|
||||||
"golang.org/x/text/language"
|
"golang.org/x/text/language"
|
||||||
)
|
)
|
||||||
|
@ -24,7 +25,7 @@ var version = "dev"
|
||||||
type bridge struct {
|
type bridge struct {
|
||||||
cfg *config
|
cfg *config
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
cache *cache
|
cache cache.Cache
|
||||||
client *httpClient
|
client *httpClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +42,7 @@ type alert struct {
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
Labels map[string]string `json:"labels"`
|
Labels map[string]string `json:"labels"`
|
||||||
Annotations map[string]string `json:"annotations"`
|
Annotations map[string]string `json:"annotations"`
|
||||||
Fingerprint fingerprint `json:"fingerprint"`
|
Fingerprint string `json:"fingerprint"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type notification struct {
|
type notification struct {
|
||||||
|
@ -51,14 +52,14 @@ type notification struct {
|
||||||
tags string
|
tags string
|
||||||
icon string
|
icon string
|
||||||
silenceBody string
|
silenceBody string
|
||||||
fingerprint fingerprint
|
fingerprint string
|
||||||
status string
|
status string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *bridge) singleAlertNotifications(p *payload) []*notification {
|
func (br *bridge) singleAlertNotifications(p *payload) []*notification {
|
||||||
var notifications []*notification
|
var notifications []*notification
|
||||||
for _, alert := range p.Alerts {
|
for _, alert := range p.Alerts {
|
||||||
if br.cache.contains(alert.Fingerprint, status(alert.Status)) {
|
if br.cache.Contains(alert.Fingerprint, alert.Status) {
|
||||||
br.logger.Debugf("Alert %s skipped: Still in cache", alert.Fingerprint)
|
br.logger.Debugf("Alert %s skipped: Still in cache", alert.Fingerprint)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -323,7 +324,7 @@ func (br *bridge) handleWebhooks(w http.ResponseWriter, r *http.Request) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
br.logger.Errorf("Failed to publish notification: %v", err)
|
br.logger.Errorf("Failed to publish notification: %v", err)
|
||||||
} else {
|
} else {
|
||||||
br.cache.set(n.fingerprint, status(n.status))
|
br.cache.Set(n.fingerprint, n.status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -365,7 +366,7 @@ func (br *bridge) runCleanup() {
|
||||||
for {
|
for {
|
||||||
time.Sleep(br.cfg.cache.CleanupInterval)
|
time.Sleep(br.cfg.cache.CleanupInterval)
|
||||||
br.logger.Info("Pruning cache")
|
br.logger.Info("Pruning cache")
|
||||||
br.cache.cleanup()
|
br.cache.Cleanup()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +395,8 @@ func main() {
|
||||||
|
|
||||||
client := &httpClient{&http.Client{Timeout: time.Second * 3}}
|
client := &httpClient{&http.Client{Timeout: time.Second * 3}}
|
||||||
|
|
||||||
bridge := &bridge{cfg: cfg, logger: logger, cache: newCache(cfg.cache.Duration), client: client}
|
c := cache.NewMemoryCache(cfg.cache.Duration)
|
||||||
|
bridge := &bridge{cfg: cfg, logger: logger, cache: c, client: client}
|
||||||
|
|
||||||
logger.Infof("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)
|
logger.Infof("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue