diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 09:29:59 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 09:29:59 +0200 |
| commit | d8f88d455990636bb797643dee7d39a95bbbd62c (patch) | |
| tree | 8c8447fc975ec6deebe48218d27e3defa1b3dcce /internal/mapr/client/session_state.go | |
| parent | 7a79d0a8bf58b05dfbae331d00275739530b9584 (diff) | |
task 4abe7505: reset dmap generation state
Diffstat (limited to 'internal/mapr/client/session_state.go')
| -rw-r--r-- | internal/mapr/client/session_state.go | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/internal/mapr/client/session_state.go b/internal/mapr/client/session_state.go new file mode 100644 index 0000000..1983644 --- /dev/null +++ b/internal/mapr/client/session_state.go @@ -0,0 +1,95 @@ +package client + +import ( + "fmt" + "sync" + + "github.com/mimecast/dtail/internal/mapr" +) + +// SessionSnapshot captures the current client-side mapreduce session state. +type SessionSnapshot struct { + Generation uint64 + Query *mapr.Query + GlobalGroup *mapr.GlobalGroupSet + LastResult string +} + +// SessionState keeps the mutable mapreduce query state shared by the client +// reporter and per-server handlers. +type SessionState struct { + mu sync.RWMutex + generation uint64 + query *mapr.Query + global *mapr.GlobalGroupSet + lastResult string + changedCh chan struct{} +} + +// NewSessionState returns a new shared mapreduce session state. +func NewSessionState(query *mapr.Query) *SessionState { + return &SessionState{ + query: query, + global: mapr.NewGlobalGroupSet(), + changedCh: make(chan struct{}, 1), + } +} + +// Snapshot returns a point-in-time copy of the shared mapreduce state. +func (s *SessionState) Snapshot() SessionSnapshot { + s.mu.RLock() + defer s.mu.RUnlock() + + return SessionSnapshot{ + Generation: s.generation, + Query: s.query, + GlobalGroup: s.global, + LastResult: s.lastResult, + } +} + +// Changes returns a channel that is signaled whenever a new generation is committed. +func (s *SessionState) Changes() <-chan struct{} { + return s.changedCh +} + +// CommitQuery resets the shared aggregation state for a newly accepted query generation. +func (s *SessionState) CommitQuery(rawQuery string, generation uint64) (*mapr.Query, error) { + query, err := mapr.NewQuery(rawQuery) + if err != nil { + return nil, fmt.Errorf("parse session query: %w", err) + } + + s.mu.Lock() + s.generation = generation + s.query = query + s.global = mapr.NewGlobalGroupSet() + s.lastResult = "" + s.mu.Unlock() + + s.notifyChange() + return query, nil +} + +// CommitRenderedResult stores the last rendered result for the active generation. +func (s *SessionState) CommitRenderedResult(generation uint64, result string) (changed bool, ok bool) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.generation != generation { + return false, false + } + if s.lastResult == result { + return false, true + } + + s.lastResult = result + return true, true +} + +func (s *SessionState) notifyChange() { + select { + case s.changedCh <- struct{}{}: + default: + } +} |
