summaryrefslogtreecommitdiff
path: root/internal/mapr/client/session_state.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:29:59 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:29:59 +0200
commitd8f88d455990636bb797643dee7d39a95bbbd62c (patch)
tree8c8447fc975ec6deebe48218d27e3defa1b3dcce /internal/mapr/client/session_state.go
parent7a79d0a8bf58b05dfbae331d00275739530b9584 (diff)
task 4abe7505: reset dmap generation state
Diffstat (limited to 'internal/mapr/client/session_state.go')
-rw-r--r--internal/mapr/client/session_state.go95
1 files changed, 95 insertions, 0 deletions
diff --git a/internal/mapr/client/session_state.go b/internal/mapr/client/session_state.go
new file mode 100644
index 0000000..1983644
--- /dev/null
+++ b/internal/mapr/client/session_state.go
@@ -0,0 +1,95 @@
+package client
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/mimecast/dtail/internal/mapr"
+)
+
+// SessionSnapshot captures the current client-side mapreduce session state.
+type SessionSnapshot struct {
+ Generation uint64
+ Query *mapr.Query
+ GlobalGroup *mapr.GlobalGroupSet
+ LastResult string
+}
+
+// SessionState keeps the mutable mapreduce query state shared by the client
+// reporter and per-server handlers.
+type SessionState struct {
+ mu sync.RWMutex
+ generation uint64
+ query *mapr.Query
+ global *mapr.GlobalGroupSet
+ lastResult string
+ changedCh chan struct{}
+}
+
+// NewSessionState returns a new shared mapreduce session state.
+func NewSessionState(query *mapr.Query) *SessionState {
+ return &SessionState{
+ query: query,
+ global: mapr.NewGlobalGroupSet(),
+ changedCh: make(chan struct{}, 1),
+ }
+}
+
+// Snapshot returns a point-in-time copy of the shared mapreduce state.
+func (s *SessionState) Snapshot() SessionSnapshot {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ return SessionSnapshot{
+ Generation: s.generation,
+ Query: s.query,
+ GlobalGroup: s.global,
+ LastResult: s.lastResult,
+ }
+}
+
+// Changes returns a channel that is signaled whenever a new generation is committed.
+func (s *SessionState) Changes() <-chan struct{} {
+ return s.changedCh
+}
+
+// CommitQuery resets the shared aggregation state for a newly accepted query generation.
+func (s *SessionState) CommitQuery(rawQuery string, generation uint64) (*mapr.Query, error) {
+ query, err := mapr.NewQuery(rawQuery)
+ if err != nil {
+ return nil, fmt.Errorf("parse session query: %w", err)
+ }
+
+ s.mu.Lock()
+ s.generation = generation
+ s.query = query
+ s.global = mapr.NewGlobalGroupSet()
+ s.lastResult = ""
+ s.mu.Unlock()
+
+ s.notifyChange()
+ return query, nil
+}
+
+// CommitRenderedResult stores the last rendered result for the active generation.
+func (s *SessionState) CommitRenderedResult(generation uint64, result string) (changed bool, ok bool) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.generation != generation {
+ return false, false
+ }
+ if s.lastResult == result {
+ return false, true
+ }
+
+ s.lastResult = result
+ return true, true
+}
+
+func (s *SessionState) notifyChange() {
+ select {
+ case s.changedCh <- struct{}{}:
+ default:
+ }
+}