diff options
Diffstat (limited to 'fs')
| -rw-r--r-- | fs/catfile.go | 27 | ||||
| -rw-r--r-- | fs/filereader.go | 9 | ||||
| -rw-r--r-- | fs/lineread.go | 28 | ||||
| -rw-r--r-- | fs/permissions/permission.go | 14 | ||||
| -rw-r--r-- | fs/permissions/permission_linux.c | 395 | ||||
| -rw-r--r-- | fs/permissions/permission_linux.go | 33 | ||||
| -rw-r--r-- | fs/permissions/permission_linux.h | 60 | ||||
| -rw-r--r-- | fs/permissions/permission_test.go | 112 | ||||
| -rw-r--r-- | fs/readfile.go | 318 | ||||
| -rw-r--r-- | fs/stats.go | 69 | ||||
| -rw-r--r-- | fs/tailfile.go | 27 |
11 files changed, 1092 insertions, 0 deletions
diff --git a/fs/catfile.go b/fs/catfile.go new file mode 100644 index 0000000..99f521f --- /dev/null +++ b/fs/catfile.go @@ -0,0 +1,27 @@ +package fs + +import "sync" + +// CatFile is for reading a whole file. +type CatFile struct { + readFile +} + +// NewCatFile returns a new file catter. +func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { + var mutex sync.Mutex + + return CatFile{ + readFile: readFile{ + filePath: filePath, + stop: make(chan struct{}), + globID: globID, + serverMessages: serverMessages, + retry: false, + canSkipLines: false, + seekEOF: false, + limiter: limiter, + mutex: &mutex, + }, + } +} diff --git a/fs/filereader.go b/fs/filereader.go new file mode 100644 index 0000000..5a08e27 --- /dev/null +++ b/fs/filereader.go @@ -0,0 +1,9 @@ +package fs + +// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. +type FileReader interface { + Start(lines chan<- LineRead, regex string) error + FilePath() string + Retry() bool + Stop() +} diff --git a/fs/lineread.go b/fs/lineread.go new file mode 100644 index 0000000..7ee558e --- /dev/null +++ b/fs/lineread.go @@ -0,0 +1,28 @@ +package fs + +import ( + "fmt" +) + +// LineRead represents a read log line. +type LineRead struct { + // The content of the log line. + Content []byte + // Until now, how many log lines were processed? + Count uint64 + // Sometimes we produce too many log lines so that the client + // is too slow to process all of them. The server will drop log + // lines if that happens but it will signal to the client how + // many log lines in % could be transmitted to the client. + TransmittedPerc int + GlobID *string +} + +// Return a human readable representation of the followed line. +func (l LineRead) String() string { + return fmt.Sprintf("LineRead(Content:%s,TransmittedPerc:%v,Count:%v,GlobID:%s)", + string(l.Content), + l.TransmittedPerc, + l.Count, + *l.GlobID) +} diff --git a/fs/permissions/permission.go b/fs/permissions/permission.go new file mode 100644 index 0000000..7d242f1 --- /dev/null +++ b/fs/permissions/permission.go @@ -0,0 +1,14 @@ +// +build !linux + +package permissions + +import ( + "dtail/logger" +) + +// ToRead is to check whether user has read permissions to a given file. +func ToRead(user, filePath string) (bool, error) { + // Only implemented for Linux, always expect true + logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + return true, nil +} diff --git a/fs/permissions/permission_linux.c b/fs/permissions/permission_linux.c new file mode 100644 index 0000000..cd10525 --- /dev/null +++ b/fs/permissions/permission_linux.c @@ -0,0 +1,395 @@ +#include "permission_linux.h" + +#ifdef DEBUG +void debug_print_checker(struct permission_checker *pc) { + fprintf(stderr, "DEBUG: user_name:%s (%d)\n", + pc->user_name, pc->uid); + + fprintf(stderr, "DEBUG: ngids:%d\n", pc->ngids); + int j; + for (j = 0; j < pc->ngids; j++) { + fprintf(stderr, "DEBUG: %d", pc->gids[j]); + struct group *gr = getgrgid(pc->gids[j]); + if (gr != NULL) + fprintf(stderr, " (%s)", gr->gr_name); + fprintf(stderr, "\n"); + } + + fprintf(stderr, "DEBUG: file_path:%s (%d:%d)\n", + pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid); +} +#endif // DEBUG + +int stat_file(struct permission_checker *pc) { + if (stat(pc->file_path, &pc->file_stat) != 0) + return -1; + +#ifdef DEBUG + fprintf(stderr, "DEBUG: File'%s' is owned by '%d:%d'\n", + pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid); +#endif + + return 0; +} + +int get_user_uid(struct permission_checker *pc) { + struct passwd *result = NULL; + + size_t bufsize = sysconf(_SC_GETPW_R_SIZE_MAX); + if (bufsize == -1) + bufsize = 16384; + + char *buf = malloc(bufsize); + if (buf == NULL) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unabel to allocate bufer while retrieving user '%s'\n", pc->user_name); +#endif + return -1; + } + + int rc = getpwnam_r(pc->user_name, &pc->pw, buf, bufsize, &result); + + if (result == NULL) { +#ifdef DEBUG + if (rc == 0) { + fprintf(stderr, "DEBUG: No user '%s' found\n", pc->user_name); + } else { + fprintf(stderr, "DEBUG: Unknown error while retrieving user '%s'\n", pc->user_name); + } +#endif + + free(buf); + return -1; + } + + pc->uid = pc->pw.pw_uid; + + free(buf); + return 0; +} + +int get_user_groups(struct permission_checker *pc) { + // First assume we are in 10 groups max + pc->ngids = 10; + pc->gids = malloc(pc->ngids * sizeof(gid_t)); + + if (pc->gids == NULL) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unable to allocate space for gids."); +#endif + return -1; + } + + // Try so many times to load group list until it fits into group array. + while (getgrouplist(pc->user_name, pc->pw.pw_gid, pc->gids, &pc->ngids) == -1) { + // Too many groups, enlarge group array and try again + int newngids = pc->ngids + 100; + size_t newsize = newngids * sizeof(gid_t); + + if (SIZE_MAX / newngids < sizeof(gid_t)) { + // Overflow +#ifdef DEBUG + fprintf(stderr, "DEBUG: Overflow detected."); +#endif + return -1; + } + + gid_t *newgids = realloc(pc->gids, newsize); + if (newgids == NULL) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unable to allocate space for gids."); +#endif + free(pc->gids); + return -1; + } + + pc->gids = newgids; + pc->ngids = newngids; + } + + return 0; +} + +int is_member_of_group(struct permission_checker *pc, gid_t gid) { + int j; + for (j = 0; j < pc->ngids; j++) + if (pc->gids[j] == gid) + return 1; + return 0; +} + +int check_acl_uid_matches(uid_t uid, acl_entry_t entry) { + int ret = -1; + uid_t *acl_uid = acl_get_qualifier(entry); + if (acl_uid == NULL) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry"); +#endif + return -1; + } + + ret = *acl_uid == uid ? 0 : -1; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL user match?: %d <=> %d: %d\n", *acl_uid, uid, ret); +#endif + acl_free(acl_uid); + return ret; +} + +int check_acl_gid_matches(gid_t *gids, int ngids, acl_entry_t entry) { + int ret = -1; + gid_t *acl_gid = acl_get_qualifier(entry); + if (acl_gid == NULL) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry"); +#endif + return -1; + } + + int j; + for (j = 0; j < ngids; j++) { + if (*acl_gid == gids[j]) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: User is in group %d", *acl_gid); +#endif + ret = 0; + break; + } + } + +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL group match?: %d <=> ...: %d\n", *acl_gid, ret); +#endif + acl_free(acl_gid); + return ret; +} + +int check_acl(struct permission_checker *pc, const int flag) { + // By default user has no read perm. + int has_read_perm = 0; + + // By default mask tells that there are read perm. However in order to have + // read permissions both, has_read_perm and mask_allows_read_access must be 1! + int mask_allows_read_access = 1; + + acl_type_t type = ACL_TYPE_ACCESS; + acl_t acl = acl_get_file(pc->file_path, type); + + if (acl == NULL) + // Unable to retrieve ACL. + return -1; + + // Walk through each entry of this ACL. + int id; + for (id = ACL_FIRST_ENTRY; ; id = ACL_NEXT_ENTRY) { + acl_entry_t entry; + if (acl_get_entry(acl, id, &entry) != 1) + // No more ACL entries. + break; + + acl_tag_t tag; + if (acl_get_tag_type(entry, &tag) == -1) + // Unable to retrieve ACL tag. + return -1; + + switch (tag) { + case ACL_USER_OBJ: + if (flag == GROUP_CHECK) + continue; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_USER_OBJ\n"); +#endif + // Ignore this ACL entry if user is not owner of file. + if (pc->uid != pc->file_stat.st_uid) + continue; + break; + case ACL_USER: + if (flag == GROUP_CHECK) + continue; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_USER\n"); +#endif + // Ignore this ACL entry if uid does not match. + if (check_acl_uid_matches(pc->uid, entry) != 0) + continue; + break; + case ACL_GROUP_OBJ: + if (flag == USER_CHECK) + continue; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_GROUP_OBJ\n"); +#endif + // Ignore ACL entry if user is not in group of file. + if (!is_member_of_group(pc, pc->file_stat.st_gid)) + continue; + break; + case ACL_GROUP: + if (flag == USER_CHECK) + continue; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_GROUP\n"); +#endif + // Ignore ACL entry if user is not in group of entry. + if (check_acl_gid_matches(pc->gids, pc->ngids, entry) != 0) + continue; + break; + case ACL_OTHER: + if (flag == GROUP_CHECK) + continue; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_OTHER\n"); +#endif + break; + case ACL_MASK: +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL_MASK\n"); +#endif + break; + default: +#ifdef DEBUG + fprintf(stderr, "DEBUG: Unknown ACL tag\n"); +#endif + return -1; + } + +#ifdef DEBUG + fprintf(stderr, "DEBUG: Retrieving permset\n"); +#endif + acl_permset_t permset; + int permission; + if (acl_get_permset(entry, &permset) == -1) + // Unable to retrieve permset. + return -1; + + if ((permission = acl_get_perm(permset, ACL_READ)) == -1) + // Unable to retrieve permset value. + return -1; + + if (permission == 1 && tag != ACL_MASK) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL says user has permission to read file.\n"); +#endif + has_read_perm = 1; + } else if (permission == 0 && tag == ACL_MASK) { + // Mask says that there are no permissions to read. + mask_allows_read_access = 0; +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL mask says no permission to read file.\n"); +#endif + } + } + + if (has_read_perm && mask_allows_read_access) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL end result: User has permission to read file.\n"); +#endif + return 1; + } + +#ifdef DEBUG + fprintf(stderr, "DEBUG: ACL end result: User has no permission to read file.\n"); +#endif + return 0; +} + +int check_traditional(struct permission_checker *pc, const int flag) { + mode_t mode = pc->file_stat.st_mode; + uid_t uid = pc->file_stat.st_uid; + gid_t gid = pc->file_stat.st_gid; + + if (flag == USER_CHECK && (mode & S_IROTH)) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: Others can read file '%s'\n", + pc->file_path); +#endif + return 1; + + } else if (flag == USER_CHECK && (mode & S_IRUSR) && uid == pc->uid) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: User '%s' can read file '%s'\n", + pc->user_name, pc->file_path); +#endif + return 1; + + } else if (flag == GROUP_CHECK && (mode & S_IRGRP) && is_member_of_group(pc, gid)) { +#ifdef DEBUG + fprintf(stderr, "DEBUG: User's '%s' group can read file '%s'\n", + pc->user_name, pc->file_path); +#endif + return 1; + } + + return 0; +} + +int permission_to_read(char* user_name, char *file_path) { + int rc = -1; + +#ifdef DEBUG + fprintf(stderr, "DEBUG: User check '%s' for file '%s'\n", user_name, file_path); +#endif + struct permission_checker pc = { + .user_name = user_name, + .gids = NULL, + .ngids = 0, + .file_path = file_path, + }; + + // Gather user's UID. + if ((rc = get_user_uid(&pc)) == -1) + // Could not retrieve UID. + goto cleanup; + + // Gather file owner (user and group). + if ((rc = stat_file(&pc)) == -1) + // Could not stat file. + goto cleanup; + + // Check whether there is an ACL entry which would allow the user + // to read the file. Don't check for any groups yet. The issue with + // groups is that it can be very slow to retrieve the list of groups + // of a specific user when done via a remote LDAP server! + if ((rc = check_acl(&pc, USER_CHECK)) == 1) + // Yes, has permissions. + goto cleanup; + + // Check whether ACLs of file could be retrieved. + if (rc == -1) { + if (errno != ENOTSUP) + // Unknown error. + goto cleanup; + + // File system does not support ACLs. + // Fallback to traditional permissions. + if ((rc = check_traditional(&pc, USER_CHECK)) == 1) + // Yes, has traditional permissions. + goto cleanup; + + if ((rc = get_user_groups(&pc)) == -1) + // Can not retrieve user's groups. + goto cleanup; + + rc = check_traditional(&pc, GROUP_CHECK); + goto cleanup; + } + + if ((rc = get_user_groups(&pc)) == -1) + // Can not retrieve use'r groups. + goto cleanup; + + // Check whether there is an ACL entry which would allow any of the + // user's groups to read the file. + rc = check_acl(&pc, GROUP_CHECK); + +cleanup: +#ifdef DEBUG + debug_print_checker(&pc); +#endif + + if (pc.ngids) + free(pc.gids); + + return rc; +} + +// vim: set tabstop=8 softtabstop=0 expandtab shiftwidth=4 smarttab diff --git a/fs/permissions/permission_linux.go b/fs/permissions/permission_linux.go new file mode 100644 index 0000000..feae729 --- /dev/null +++ b/fs/permissions/permission_linux.go @@ -0,0 +1,33 @@ +package permissions + +/* +#include "permission_linux.h" +#cgo LDFLAGS: -L. -lacl +*/ +import "C" + +import ( + "errors" + "unsafe" +) + +// To check whether user has Linux file system permissions to read a given file. +func ToRead(user, filePath string) (bool, error) { + cUser := C.CString(user) + cFilePath := C.CString(filePath) + + defer C.free(unsafe.Pointer(cUser)) + defer C.free(unsafe.Pointer(cFilePath)) + + cOk, err := C.permission_to_read(cUser, cFilePath) + if cOk == 1 { + return true, nil + } + + if err != nil { + // err contains errno message + return false, err + } + + return false, errors.New("User without permission to read file") +} diff --git a/fs/permissions/permission_linux.h b/fs/permissions/permission_linux.h new file mode 100644 index 0000000..a2c266e --- /dev/null +++ b/fs/permissions/permission_linux.h @@ -0,0 +1,60 @@ +#ifndef PERMISSION_LINUX_H +#define PERMISSION_LINUX_H + +#include <acl/libacl.h> +#include <errno.h> +#include <grp.h> +#include <pwd.h> +#include <stdio.h> +#include <stdint.h> +#include <stdlib.h> +#include <sys/acl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +//#define DEBUG +#define USER_CHECK 0 +#define GROUP_CHECK 1 + +struct permission_checker { + char *user_name; + uid_t uid; + gid_t *gids; + int ngids; + char *file_path; + struct stat file_stat; + struct passwd pw; +}; + + +#ifdef DEBUG +// Print out permission_checker struct. +void debug_print_checker(struct permission_checker *pc); +#endif + +// Stat a given file to retrieve traditional UNIX permissions. +int stat_file(struct permission_checker *pc); + +// Retrieve UID of user. +int get_user_uid(struct permission_checker *pc); + +// Retrieve all groups of the user. +int get_user_groups(struct permission_checker *pc); + +// Check whether user is member of a group or not. +int is_member_of_group(struct permission_checker *pc, gid_t gid); + +// Check whether user can read file according Linux ACLs. +// As flag use either USER_CHECK or GROUP_CHECK. +int check_acl(struct permission_checker *pc, const int flag); + +// Check whether user has permissions to read file according traditional +// UNIX permissions. As flag use either USER_CHECK or GROUP_CHECK. +int check_traditional(struct permission_checker *pc, const int flag); + +// Returns 1 if user has permission to read file. +// Returns <0 on error and returns 0 if no permissions. +int permission_to_read(char* user, char *file_path); + +#endif // PERMISSION_LINUX_H diff --git a/fs/permissions/permission_test.go b/fs/permissions/permission_test.go new file mode 100644 index 0000000..d415ac2 --- /dev/null +++ b/fs/permissions/permission_test.go @@ -0,0 +1,112 @@ +// +build linux + +package permissions + +import ( + "os" + "os/exec" + "os/user" + "strings" + "testing" +) + +const ( + setfacl string = "/usr/bin/setfacl" + file string = "/tmp/acltest" +) + +func TestLinuxACL(t *testing.T) { + setfacl := "/usr/bin/setfacl" + file := "/tmp/acltest" + + // Delete file if it exists. + if _, err := os.Stat(file); err == nil { + os.Remove(file) + } + + f, err := os.Create(file) + if err != nil { + t.Errorf("%v", err) + } + defer func() { + f.Close() + //os.Remove(file) + }() + + user, err := user.Current() + if err != nil { + t.Errorf("Unable to retrieve current user: %v", err) + } + + // Test 1: Remove all permissions and perform a permission check + cmd := exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, _ := ToRead(user.Username, file); ok { + t.Errorf("Didn't expect permissions to read file!") + } + + // Test 2: Add read permission to file owner + cmd = exec.Command(setfacl, "-b", "-m", "u::r--,g::---,o::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, err := ToRead(user.Username, file); !ok { + t.Errorf("Expected permissions to read file: %v", err) + } + + // Test 3: Add read permission to file group + cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::r--,o::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, err := ToRead(user.Username, file); !ok { + t.Errorf("Expected permissions to read file: %v", err) + } + + // Test 4: Add read permission to others + cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::r--", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + + if ok, err := ToRead(user.Username, file); !ok { + t.Errorf("Expected permissions to read file: %v", err) + } + + // Test 5: Remove read permission from mask + cmd = exec.Command(setfacl, "-m", "m::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, _ := ToRead(user.Username, file); ok { + t.Errorf("Didn't expect permissions to read file!") + } + cmd = exec.Command(setfacl, "-m", "m::r--", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + + // Test 6: Add read permission to specific group + cmd = exec.Command(setfacl, "-b", "-m", "u::---,g:"+user.Username+":r--,o::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, err := ToRead(user.Username, file); !ok { + t.Errorf("Expected permissions to read file for user %v: %v", user.Username, err) + } + + // Test 7: Remove all permissions but mask + cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + cmd = exec.Command(setfacl, "-m", "m::r--", file) + if err := cmd.Run(); err != nil { + t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) + } + if ok, _ := ToRead(user.Username, file); ok { + t.Errorf("Didn't expect permissions to read file!") + } +} diff --git a/fs/readfile.go b/fs/readfile.go new file mode 100644 index 0000000..375378b --- /dev/null +++ b/fs/readfile.go @@ -0,0 +1,318 @@ +package fs + +import ( + "bufio" + "compress/gzip" + "dtail/logger" + "errors" + "io" + "os" + "regexp" + "strings" + "sync" + "time" + + "github.com/DataDog/zstd" +) + +// Used to tail and filter a local log file. +type readFile struct { + // Various statistics (e.g. regex hit percentage, transfer percentage). + stats + // Path of log file to tail. + filePath string + // Only consider all log lines matching this regular expression. + re *regexp.Regexp + // The glob identifier of the file. + globID string + // Channel to send a server message to the dtail client + serverMessages chan<- string + // Signals to stop tailing the log file. + stop chan struct{} + // Periodically retry reading file. + retry bool + // Can I skip messages when there are too many? + canSkipLines bool + // Seek to the EOF before processing file? + seekEOF bool + // Mutex to control the stopping of the file + mutex *sync.Mutex + limiter chan struct{} +} + +// FilePath returns the full file path. +func (f readFile) FilePath() string { + return f.filePath +} + +// Retry reading the file on error? +func (f readFile) Retry() bool { + return f.retry +} + +// Start tailing a log file. +func (f readFile) Start(lines chan<- LineRead, regex string) error { + defer func() { + select { + case <-f.limiter: + default: + } + }() + + select { + case f.limiter <- struct{}{}: + default: + select { + case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case <-f.stop: + return nil + } + f.limiter <- struct{}{} + } + + fd, err := os.Open(f.filePath) + if err != nil { + return err + } + defer fd.Close() + + if f.seekEOF { + fd.Seek(0, io.SeekEnd) + } + + rawLines := make(chan []byte, 100) + truncate := make(chan struct{}) + + var wg sync.WaitGroup + wg.Add(1) + + go f.periodicTruncateCheck(truncate) + go f.filter(&wg, rawLines, lines, regex) + + err = f.read(fd, rawLines, truncate) + close(rawLines) + wg.Wait() + + return err +} + +func (f readFile) periodicTruncateCheck(truncate chan struct{}) { + for { + select { + case <-time.After(time.Second * 3): + select { + case truncate <- struct{}{}: + case <-f.stop: + } + case <-f.stop: + return + } + } +} + +// Stop reading file. +func (f readFile) Stop() { + f.mutex.Lock() + defer f.mutex.Unlock() + + select { + case <-f.stop: + return + default: + } + + close(f.stop) +} + +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + +func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { + reader, err := f.makeReader(fd) + if err != nil { + return err + } + rawLine := make([]byte, 0, 512) + var offset uint64 + + lineLengthThreshold := 1024 * 1024 // 1mb + longLineWarning := false + + for { + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + logger.Info(f.filePath, "Current offset", offset) + + case <-f.stop: + return nil + default: + } + + // Read some bytes (max 4k at once as of go 1.12). isPrefix will + // be set if line does not fit into 4k buffer. + bytes, isPrefix, err := reader.ReadLine() + + if err != nil { + // If EOF, sleep a couple of ms and return with nil error. + // If other error, return with non-nil error. + if err != io.EOF { + return err + } + if !f.seekEOF { + logger.Debug(f.FilePath(), "End of file reached") + return nil + } + time.Sleep(time.Millisecond * 100) + continue + } + + rawLine = append(rawLine, bytes...) + offset += uint64(len(bytes)) + + if !isPrefix { + // last LineRead call returned contend until end of line. + rawLine = append(rawLine, '\n') + select { + case rawLines <- rawLine: + case <-f.stop: + return nil + } + rawLine = make([]byte, 0, 512) + if longLineWarning { + longLineWarning = false + } + continue + } + + // Last LineRead call could not read content until end of line, buffer + // was too small. Determine whether we exceed the max line length we + // want dtail to send to the client at once. Possibly split up log line + // into multiple log lines. + if len(rawLine) >= lineLengthThreshold { + if !longLineWarning { + f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") + // Only print out one warning per long log line. + longLineWarning = true + } + rawLine = append(rawLine, '\n') + select { + case rawLines <- rawLine: + case <-f.stop: + return nil + } + rawLine = make([]byte, 0, 512) + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) { + defer wg.Done() + + if regex == "" { + regex = "." + } + + re, err := regexp.Compile(regex) + if err != nil { + logger.Error(regex, "Can't compile regex, using '.' instead", err) + re = regexp.MustCompile(".") + } + f.re = re + + for { + select { + case line, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok { + select { + case lines <- filteredLine: + case <-f.stop: + return + } + } + } + } +} + +func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) { + var read LineRead + + if !f.re.Match(line) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + return read, false + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + return read, false + } + f.updateLineTransmitted() + + read = LineRead{ + Content: line, + GlobID: &f.globID, + Count: f.totalLineCount(), + TransmittedPerc: f.transmittedPerc(), + } + + return read, true +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + logger.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + + if curPos > pathPos { + return true, errors.New("File got truncated") + } + + return false, nil +} diff --git a/fs/stats.go b/fs/stats.go new file mode 100644 index 0000000..4121ff7 --- /dev/null +++ b/fs/stats.go @@ -0,0 +1,69 @@ +package fs + +// Used to calculate how many log lines matched the regular expression +// and how many log files could be transmitted from the server to the client. +// Hit and transmit percentage takes only the last 100 log lines into calculation. +type stats struct { + pos int + lineCount uint64 + matched [100]bool + matchCount uint64 + transmitted [100]bool + transmitCount int +} + +// Return the total line count. +func (f *stats) totalLineCount() uint64 { + return f.lineCount +} + +// Calculate the percentage of log lines transmitted to the client. +func (f *stats) transmittedPerc() int { + return int(percentOf(float64(f.matchCount), float64(f.transmitCount))) +} + +// Update bucket position. We only take into consideration the last 100 +// lines for stats. +func (f *stats) updatePosition() { + f.pos = (f.pos + 1) % 100 + f.lineCount++ +} + +// Increment match counter. +func (f *stats) updateLineMatched() { + if !f.matched[f.pos] { + f.matchCount++ + f.matched[f.pos] = true + } +} + +// Increment transmitted counter. +func (f *stats) updateLineTransmitted() { + if !f.transmitted[f.pos] { + f.transmitCount++ + f.transmitted[f.pos] = true + } +} + +// Decrement match counter. +func (f *stats) updateLineNotMatched() { + if f.matched[f.pos] { + f.matchCount-- + f.matched[f.pos] = false + } +} + +// Decrement transmitted counter. +func (f *stats) updateLineNotTransmitted() { + if f.transmitted[f.pos] { + f.transmitCount-- + f.transmitted[f.pos] = false + } +} + +func percentOf(total float64, value float64) float64 { + if total == 0 || total == value { + return 100 + } + return value / (total / 100.0) +} diff --git a/fs/tailfile.go b/fs/tailfile.go new file mode 100644 index 0000000..a19d4e6 --- /dev/null +++ b/fs/tailfile.go @@ -0,0 +1,27 @@ +package fs + +import "sync" + +// TailFile is to tail and filter a log file. +type TailFile struct { + readFile +} + +// NewTailFile returns a new file tailer. +func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { + var mutex sync.Mutex + + return TailFile{ + readFile: readFile{ + filePath: filePath, + stop: make(chan struct{}), + globID: globID, + serverMessages: serverMessages, + retry: true, + canSkipLines: true, + seekEOF: true, + limiter: limiter, + mutex: &mutex, + }, + } +} |
