summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-08 19:10:50 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit16dc57e1e1c28e9d762424e596223a980770e059 (patch)
treeea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/clients
parentc83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff)
mapreduce tables are in colors now too
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go1
-rw-r--r--internal/clients/handlers/basehandler.go29
-rw-r--r--internal/clients/handlers/healthhandler.go11
-rw-r--r--internal/clients/handlers/maprhandler.go42
-rw-r--r--internal/clients/maprclient.go31
5 files changed, 70 insertions, 44 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index f20156f..de0c101 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -71,6 +71,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
go c.hostKeyCallback.PromptAddHosts(ctx)
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
+
// Keep count of active connections
active := make(chan struct{}, len(c.connections))
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 63ceaac..51f33c1 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"encoding/base64"
"fmt"
"io"
@@ -17,7 +18,7 @@ type baseHandler struct {
server string
shellStarted bool
commands chan string
- receiveBuf []byte
+ receiveBuf bytes.Buffer
status int
}
@@ -56,14 +57,23 @@ func (h *baseHandler) SendMessage(command string) error {
// Read data from the dtail server via Writer interface.
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- if b == protocol.MessageDelimiter || b == '\n' {
- if len(h.receiveBuf) == 0 {
+ switch b {
+ /*
+ // TODO: Next DTail version make it so that '\n' gets ignored. For now
+ // leave it for compatibility with older DTail server + ability to display
+ // the protocol mismatch warn message.
+ case '\n' {
+ continue
+ */
+ case '\n', protocol.MessageDelimiter:
+ message := h.receiveBuf.String()
+ if len(message) == 0 {
continue
}
- message := string(h.receiveBuf)
h.handleMessageType(message)
- } else {
- h.receiveBuf = append(h.receiveBuf, b)
+ h.receiveBuf.Reset()
+ default:
+ h.receiveBuf.WriteByte(b)
}
}
@@ -78,25 +88,22 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
-
return
}
// Handle various message types.
func (h *baseHandler) handleMessageType(message string) {
- if len(h.receiveBuf) == 0 {
+ if len(message) == 0 {
return
}
// Hidden server commands starti with a dot "."
- if h.receiveBuf[0] == '.' {
+ if message[0] == '.' {
h.handleHiddenMessage(message)
- h.receiveBuf = h.receiveBuf[:0]
return
}
logger.Raw(message)
- h.receiveBuf = h.receiveBuf[:0]
}
// Handle messages received from server which are not meant to be displayed
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 213748c..eca0348 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"errors"
"fmt"
"time"
@@ -13,7 +14,7 @@ import (
type HealthHandler struct {
done *internal.Done
// Buffer of incoming data from server.
- receiveBuf []byte
+ receiveBuf bytes.Buffer
// To send commands to the server.
commands chan string
// To receive messages from the server.
@@ -72,10 +73,10 @@ func (h *HealthHandler) SendMessage(command string) error {
// Server writes byte stream to client.
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == protocol.MessageDelimiter { // '\n' {
- h.receive <- string(h.receiveBuf)
- h.receiveBuf = h.receiveBuf[:0]
+ h.receiveBuf.WriteByte(b)
+ if b == protocol.MessageDelimiter {
+ h.receive <- h.receiveBuf.String()
+ h.receiveBuf.Reset()
}
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index afad507..65b1454 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -15,7 +15,6 @@ type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
- count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
@@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == protocol.MessageDelimiter { // '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ logger.Debug(message)
+ if message[0] == 'A' {
+ h.handleAggregateMessage(message)
+ } else {
+ h.baseHandler.handleMessageType(message)
}
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
- }
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
@@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
// Handle a message received from server including mapr aggregation
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, protocol.AggregateDelimiter)
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count, parts)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
+ parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
+ if len(parts) != 3 {
+ logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
+ return
+ }
+ if err := h.aggregate.Aggregate(parts[2]); err != nil {
+ logger.Error("Unable to aggregate data", h.server, message, err)
+ }
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 77b674b..cab9a6c 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -9,6 +9,8 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
@@ -37,6 +39,8 @@ type MaprClient struct {
query *mapr.Query
// Additative result or new result every interval run?
cumulative bool
+ // The last result string received
+ lastResult string
}
// NewMaprClient returns a new mapreduce client.
@@ -154,24 +158,37 @@ func (c *MaprClient) reportResults() {
func (c *MaprClient) printResults() {
var result string
var err error
- var numLines int
+ var numRows int
if c.cumulative {
- result, numLines, err = c.globalGroup.Result(c.query)
+ result, numRows, err = c.globalGroup.Result(c.query)
} else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ result, numRows, err = c.globalGroup.SwapOut().Result(c.query)
}
+
if err != nil {
logger.FatalExit(err)
}
- if numLines == 0 {
- logger.Warn("Empty result set this time...")
+ if result == c.lastResult {
+ logger.Debug("Result hasn't changed compared to last time...")
return
}
+ c.lastResult = result
- //logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(c.query.RawQuery)
+ if numRows == 0 {
+ logger.Debug("Empty result set this time...")
+ return
+ }
+
+ rawQuery := c.query.RawQuery
+ if config.Client.TermColorsEnable {
+ rawQuery = color.PaintStrWithAttr(rawQuery,
+ config.Client.TermColors.MaprTable.RawQueryFg,
+ config.Client.TermColors.MaprTable.RawQueryBg,
+ config.Client.TermColors.MaprTable.RawQueryAttr)
+ }
+ logger.Raw(rawQuery)
logger.Raw(result)
}