124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package snmp_lookup
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/alitto/pond"
|
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
|
|
|
"github.com/influxdata/telegraf/config"
|
|
)
|
|
|
|
// ErrNotYetAvailable is a sentinel error carrying the message
// "data not yet available". Compare against it with errors.Is.
// NOTE(review): no code in this part of the file returns it; presumably it is
// used by callers elsewhere in the package — confirm.
var ErrNotYetAvailable = errors.New("data not yet available")
|
|
|
|
type store struct {
|
|
cache *expirable.LRU[string, *tagMap]
|
|
pool *pond.WorkerPool
|
|
minUpdateInterval time.Duration
|
|
inflight sync.Map
|
|
deferredUpdates map[string]time.Time
|
|
deferredUpdatesTimer *time.Timer
|
|
notify func(string, *tagMap)
|
|
update func(string) *tagMap
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
func newStore(size int, ttl config.Duration, workers int, minUpdateInterval config.Duration) *store {
|
|
return &store{
|
|
cache: expirable.NewLRU[string, *tagMap](size, nil, time.Duration(ttl)),
|
|
pool: pond.New(workers, 0, pond.MinWorkers(workers/2+1)),
|
|
deferredUpdates: make(map[string]time.Time),
|
|
minUpdateInterval: time.Duration(minUpdateInterval),
|
|
}
|
|
}
|
|
|
|
func (s *store) addBacklog(agent string, earliest time.Time) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
t, found := s.deferredUpdates[agent]
|
|
if !found || t.After(earliest) {
|
|
s.deferredUpdates[agent] = earliest
|
|
s.refreshTimer()
|
|
}
|
|
}
|
|
|
|
func (s *store) removeBacklog(agent string) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
delete(s.deferredUpdates, agent)
|
|
s.refreshTimer()
|
|
}
|
|
|
|
func (s *store) refreshTimer() {
|
|
if s.deferredUpdatesTimer != nil {
|
|
s.deferredUpdatesTimer.Stop()
|
|
}
|
|
if len(s.deferredUpdates) == 0 {
|
|
return
|
|
}
|
|
var agent string
|
|
var earliest time.Time
|
|
for k, t := range s.deferredUpdates {
|
|
if agent == "" || t.Before(earliest) {
|
|
agent = k
|
|
earliest = t
|
|
}
|
|
}
|
|
s.deferredUpdatesTimer = time.AfterFunc(time.Until(earliest), func() { s.enqueue(agent) })
|
|
}
|
|
|
|
func (s *store) enqueue(agent string) {
|
|
if _, inflight := s.inflight.LoadOrStore(agent, true); inflight {
|
|
return
|
|
}
|
|
s.pool.Submit(func() {
|
|
entry := s.update(agent)
|
|
s.cache.Add(agent, entry)
|
|
s.removeBacklog(agent)
|
|
s.notify(agent, entry)
|
|
s.inflight.Delete(agent)
|
|
})
|
|
}
|
|
|
|
func (s *store) lookup(agent, index string) {
|
|
entry, cached := s.cache.Get(agent)
|
|
if !cached {
|
|
// There is no cache at all, so we need to enqueue an update.
|
|
s.enqueue(agent)
|
|
return
|
|
}
|
|
|
|
// In case the index does not exist, we need to update the agent as this
|
|
// new index might have been added in the meantime (e.g. after hot-plugging
|
|
// hardware). In any way, we release the metric unresolved to not block
|
|
// ordered operations for long time.
|
|
if _, found := entry.rows[index]; !found {
|
|
// Only update the agent if the user wants to
|
|
if s.minUpdateInterval > 0 {
|
|
if time.Since(entry.created) > s.minUpdateInterval {
|
|
// The minimum time between updates has passed so we are good to
|
|
// directly update the cache.
|
|
s.enqueue(agent)
|
|
return
|
|
}
|
|
// The minimum time between updates has not yet passed so we
|
|
// need to defer the agent update to later.
|
|
s.addBacklog(agent, entry.created.Add(s.minUpdateInterval))
|
|
}
|
|
}
|
|
|
|
s.notify(agent, entry)
|
|
}
|
|
|
|
func (s *store) destroy() {
|
|
s.pool.StopAndWait()
|
|
}
|
|
|
|
func (s *store) purge() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
s.cache.Purge()
|
|
}
|