diff --git a/book/src/developing/asserting_logs.md b/book/src/developing/asserting_logs.md index 5ba148421..df66fd676 100644 --- a/book/src/developing/asserting_logs.md +++ b/book/src/developing/asserting_logs.md @@ -1,35 +1,47 @@ # Asserting Container Logs -You can either assert that CL nodes have no errors like that, we check `(CRIT|PANIC|FATAL)` levels by default for all the nodes +Use built-in critical-level assertion (`CRIT|PANIC|FATAL`) for Chainlink node logs: ```golang - in, err := framework.Load[Cfg](t) +in, err := framework.Load[Cfg](t) +require.NoError(t, err) +t.Cleanup(func() { + err := framework.SaveAndCheckLogs(t) require.NoError(t, err) - t.Cleanup(func() { - err := framework.SaveAndCheckLogs(t) - require.NoError(t, err) - }) +}) ``` -or customize file assertions +For custom checks, assert logs directly from streams with `StreamCTFContainerLogsFanout`. ```golang - in, err := framework.Load[Cfg](t) +re := regexp.MustCompile(`name=HeadReporter version=\d+`) +t.Cleanup(func() { + err := framework.StreamCTFContainerLogsFanout( + framework.LogStreamConsumer{ + Name: "custom-regex-assert", + Consume: func(logStreams map[string]io.ReadCloser) error { + for name, stream := range logStreams { + scanner := bufio.NewScanner(stream) + found := false + for scanner.Scan() { + if re.MatchString(scanner.Text()) { + found = true + break + } + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scan %s: %w", name, err) + } + if !found { + return fmt.Errorf("missing HeadReporter log in %s", name) + } + } + return nil + }, + }, + ) require.NoError(t, err) - t.Cleanup(func() { - // save all the logs to default directory "logs/docker-$test_name" - logs, err := framework.SaveContainerLogs(fmt.Sprintf("%s-%s", framework.DefaultCTFLogsDir, t.Name())) - require.NoError(t, err) - // check that CL nodes has no errors (CRIT|PANIC|FATAL) levels - err = framework.CheckCLNodeContainerErrors() - require.NoError(t, err) - // do custom assertions - for _, l := range logs { - matches, err := framework.SearchLogFile(l, " name=HeadReporter version=\\d") - require.NoError(t, err) - _ = matches - } - }) +}) ``` Full [example](https://github.com/smartcontractkit/chainlink-testing-framework/blob/main/framework/examples/myproject/smoke_logs_test.go) \ No newline at end of file diff --git a/framework/.changeset/v0.15.17.md b/framework/.changeset/v0.15.17.md new file mode 100644 index 000000000..1411645d1 --- /dev/null +++ b/framework/.changeset/v0.15.17.md @@ -0,0 +1,2 @@ +- Add Docker logs stream fanout and convert existing logs-based functions to consume streams +- Add function for checking & printing traces of panics found in Docker logs \ No newline at end of file diff --git a/framework/docker.go b/framework/docker.go index 784b50d63..6c033c24f 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -2,32 +2,22 @@ package framework import ( "archive/tar" - "bufio" "bytes" "context" - "encoding/binary" "fmt" "io" "os" "os/exec" "path/filepath" - "regexp" "strings" "sync" - "testing" "github.com/docker/docker/api/types/container" - dfilter "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/google/uuid" "github.com/rs/zerolog" tc "github.com/testcontainers/testcontainers-go" - "golang.org/x/sync/errgroup" -) - -const ( - DefaultCTFLogsDir = "logs/docker" ) func IsDockerRunning() bool { @@ -245,166 +235,6 @@ func (dc *DockerClient) copyToContainer(containerID, sourceFile, targetPath stri return nil } -// SearchLogFile searches logfile using regex and return matches or error -func SearchLogFile(fp string, regex string) ([]string, error) { - file, err := os.Open(fp) - if err != nil { - return nil, err - } - defer file.Close() - scanner := bufio.NewScanner(file) - re, err := regexp.Compile(regex) - if err != nil { - return nil, err - } - matches := make([]string, 0) - for scanner.Scan() { - line := scanner.Text() - if re.MatchString(line) { - L.Info().Str("Regex", regex).Msg("Log match found") - matches = append(matches, line) - } - } - - if err := scanner.Err(); err != nil { - return matches, err - } - return matches, nil -} - -func SaveAndCheckLogs(t *testing.T) error { - _, err := SaveContainerLogs(fmt.Sprintf("%s-%s", DefaultCTFLogsDir, t.Name())) - if err != nil { - return err - } - err = CheckCLNodeContainerErrors() - if err != nil { - return err - } - return nil -} - -// SaveContainerLogs writes all Docker container logs to some directory -func SaveContainerLogs(dir string) ([]string, error) { - L.Info().Msg("Writing Docker containers logs") - if _, err := os.Stat(dir); os.IsNotExist(err) { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) - } - } - - logStream, lErr := StreamContainerLogs(container.ListOptions{ - All: true, - Filters: dfilter.NewArgs(dfilter.KeyValuePair{ - Key: "label", - Value: "framework=ctf", - }), - }, container.LogsOptions{ShowStdout: true, ShowStderr: true}) - - if lErr != nil { - return nil, lErr - } - - eg := &errgroup.Group{} - logFilePaths := make([]string, 0) - for containerName, reader := range logStream { - eg.Go(func() error { - logFilePath := filepath.Join(dir, fmt.Sprintf("%s.log", containerName)) - logFile, err := os.Create(logFilePath) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to create container log file") - return err - } - logFilePaths = append(logFilePaths, logFilePath) - // Parse and write logs - header := make([]byte, 8) // Docker stream header is 8 bytes - for { - _, err := io.ReadFull(reader, header) - if err == io.EOF { - break - } - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to read log stream header") - break - } - - // Extract log message size - msgSize := binary.BigEndian.Uint32(header[4:8]) - - // Read the log message - msg := make([]byte, msgSize) - _, err = io.ReadFull(reader, msg) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to read log message") - break - } - - // Write the log message to the file - if _, err := logFile.Write(msg); err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to write log message to file") - break - } - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, err - } - return logFilePaths, nil -} - -var ExitedCtfContainersListOpts = container.ListOptions{ - All: true, - Filters: dfilter.NewArgs(dfilter.KeyValuePair{ - Key: "label", - Value: "framework=ctf", - }, - dfilter.KeyValuePair{ - Key: "status", - Value: "exited"}, - dfilter.KeyValuePair{ - Key: "status", - Value: "dead"}), -} - -func StreamContainerLogs(listOptions container.ListOptions, logOptions container.LogsOptions) (map[string]io.ReadCloser, error) { - L.Info().Msg("Streaming Docker containers logs") - provider, err := tc.NewDockerProvider() - if err != nil { - return nil, fmt.Errorf("failed to create Docker provider: %w", err) - } - containers, err := provider.Client().ContainerList(context.Background(), listOptions) - if err != nil { - return nil, fmt.Errorf("failed to list Docker containers: %w", err) - } - - eg := &errgroup.Group{} - logMap := make(map[string]io.ReadCloser) - var mutex sync.Mutex - - for _, containerInfo := range containers { - eg.Go(func() error { - containerName := containerInfo.Names[0] - L.Debug().Str("Container", containerName).Msg("Collecting logs") - logs, err := provider.Client().ContainerLogs(context.Background(), containerInfo.ID, logOptions) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to fetch logs for container") - return err - } - mutex.Lock() - defer mutex.Unlock() - logMap[containerName] = logs - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, err - } - - return logMap, nil -} - func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs map[string]string) error { var err error once.Do(func() { @@ -416,6 +246,22 @@ func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs m return err } +func safeContainerName(info container.Summary) string { + if len(info.Names) > 0 { + name := strings.TrimPrefix(info.Names[0], "/") + if name != "" { + // defensive: docker names normally don't include "/" beyond prefix, + // but this guarantees safe map keys and filenames. + return strings.ReplaceAll(name, "/", "_") + } + } + // fallback when Names is missing/unexpected + if len(info.ID) >= 12 { + return info.ID[:12] + } + return info.ID +} + func BuildImage(dctx, dfile, nameAndTag string, buildArgs map[string]string) error { dfilePath := filepath.Join(dctx, dfile) diff --git a/framework/docker_fanout_test.go b/framework/docker_fanout_test.go new file mode 100644 index 000000000..17820587d --- /dev/null +++ b/framework/docker_fanout_test.go @@ -0,0 +1,133 @@ +package framework + +import ( + "bytes" + "encoding/binary" + "io" + "os" + "path/filepath" + "slices" + "strings" + "sync" + "testing" +) + +func TestFanoutContainerLogsReplicatesBytesToAllConsumers(t *testing.T) { + containerName := "container-a" + payload := dockerMuxPayload("line-1\n", "line-2\n") + logStream := map[string]io.ReadCloser{ + containerName: io.NopCloser(bytes.NewReader(payload)), + } + + type consumeResult struct { + name string + data []byte + } + var results []consumeResult + var resultsMu sync.Mutex + + consumers := []LogStreamConsumer{ + { + Name: "consumer-1", + Consume: func(streams map[string]io.ReadCloser) error { + data, err := io.ReadAll(streams[containerName]) + if err != nil { + return err + } + resultsMu.Lock() + results = append(results, consumeResult{name: "consumer-1", data: data}) + resultsMu.Unlock() + return nil + }, + }, + { + Name: "consumer-2", + Consume: func(streams map[string]io.ReadCloser) error { + data, err := io.ReadAll(streams[containerName]) + if err != nil { + return err + } + resultsMu.Lock() + results = append(results, consumeResult{name: "consumer-2", data: data}) + resultsMu.Unlock() + return nil + }, + }, + } + + err := fanoutContainerLogs(logStream, consumers...) + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if len(results) != 2 { + t.Fatalf("expected 2 consumer results, got: %d", len(results)) + } + + slices.SortFunc(results, func(a, b consumeResult) int { + if a.name < b.name { + return -1 + } + if a.name > b.name { + return 1 + } + return 0 + }) + for _, result := range results { + if !bytes.Equal(result.data, payload) { + t.Fatalf("consumer %s did not receive full payload", result.name) + } + } +} + +func TestSaveContainerLogsFromStreams(t *testing.T) { + tDir := t.TempDir() + logStreams := map[string]io.ReadCloser{ + "node-a": io.NopCloser(bytes.NewReader(dockerMuxPayload("a-1\n", "a-2\n"))), + "node-b": io.NopCloser(bytes.NewReader(dockerMuxPayload("b-1\n"))), + "node-c": io.NopCloser(bytes.NewReader(dockerMuxPayload("c-1\n", "c-2\n", "c-3\n"))), + } + + paths, err := SaveContainerLogsFromStreams(tDir, logStreams) + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if len(paths) != len(logStreams) { + t.Fatalf("expected %d files, got %d", len(logStreams), len(paths)) + } + + for _, p := range paths { + content, readErr := os.ReadFile(filepath.Clean(p)) + if readErr != nil { + t.Fatalf("failed to read log file %s: %v", p, readErr) + } + if strings.TrimSpace(string(content)) == "" { + t.Fatalf("expected non-empty log file at %s", p) + } + } +} + +func TestPrintFailedContainerLogsFromStreams(t *testing.T) { + logStreams := map[string]io.ReadCloser{ + "node-a": io.NopCloser(bytes.NewReader(dockerMuxPayload("error line\n"))), + "node-b": io.NopCloser(bytes.NewReader(dockerMuxPayload("warn line\n", "trace line\n"))), + } + + if err := PrintFailedContainerLogsFromStreams(logStreams, 30); err != nil { + t.Fatalf("expected no error, got: %v", err) + } +} + +func dockerMuxPayload(lines ...string) []byte { + var out bytes.Buffer + for _, line := range lines { + msg := []byte(line) + header := make([]byte, 8) + header[0] = 1 + binary.BigEndian.PutUint32(header[4:], uint32(len(msg))) // #nosec: G115 + out.Write(header) + out.Write(msg) + } + return out.Bytes() +} diff --git a/framework/examples/myproject/smoke_logs_test.go b/framework/examples/myproject/smoke_logs_test.go index 6c42f0a15..728eb4d74 100644 --- a/framework/examples/myproject/smoke_logs_test.go +++ b/framework/examples/myproject/smoke_logs_test.go @@ -1,7 +1,10 @@ package examples import ( + "bufio" "fmt" + "io" + "regexp" "testing" "github.com/stretchr/testify/require" @@ -22,23 +25,38 @@ func TestLogsSmoke(t *testing.T) { in, err := framework.Load[CfgLogs](t) require.NoError(t, err) // most simple checks, save all the logs and check (CRIT|PANIC|FATAL) log levels - //t.Cleanup(func() { - // err := framework.SaveAndCheckLogs(t) - // require.NoError(t, err) - //}) t.Cleanup(func() { - // save all the logs to default directory "logs/docker-$test_name" - logs, err := framework.SaveContainerLogs(fmt.Sprintf("%s-%s", framework.DefaultCTFLogsDir, t.Name())) + err := framework.SaveAndCheckLogs(t) require.NoError(t, err) - // check that CL nodes has no errors (CRIT|PANIC|FATAL) levels - err = framework.CheckCLNodeContainerErrors() + }) + + re := regexp.MustCompile(`name=HeadReporter version=\d+`) + t.Cleanup(func() { + err := framework.StreamCTFContainerLogsFanout( + framework.LogStreamConsumer{ + Name: "custom-regex-assert", + Consume: func(logStreams map[string]io.ReadCloser) error { + for name, stream := range logStreams { + scanner := bufio.NewScanner(stream) + found := false + for scanner.Scan() { + if re.MatchString(scanner.Text()) { + found = true + break + } + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scan %s: %w", name, err) + } + if !found { + return fmt.Errorf("missing HeadReporter log in %s", name) + } + } + return nil + }, + }, + ) require.NoError(t, err) - // do custom assertions - for _, l := range logs { - matches, err := framework.SearchLogFile(l, " name=HeadReporter version=\\d") - require.NoError(t, err) - _ = matches - } }) bc, err := blockchain.NewBlockchainNetwork(in.BlockchainA) diff --git a/framework/logs.go b/framework/logs.go index c63c6ea9e..35efb6053 100644 --- a/framework/logs.go +++ b/framework/logs.go @@ -2,83 +2,573 @@ package framework import ( "bufio" + "context" + "encoding/binary" + "errors" "fmt" + "io" + "maps" "os" "path/filepath" "regexp" + "slices" + "strconv" "strings" + "sync" + "testing" + + "github.com/docker/docker/api/types/container" + dfilter "github.com/docker/docker/api/types/filters" + "github.com/rs/zerolog" + tc "github.com/testcontainers/testcontainers-go" + "golang.org/x/sync/errgroup" ) const ( EnvVarIgnoreCriticalLogs = "CTF_IGNORE_CRITICAL_LOGS" + DefaultCTFLogsDir = "logs/docker" ) -func getLogDirectories() ([]string, error) { - logDirs := make([]string, 0) - currentDir, err := os.Getwd() +var criticalLogLevelRegex = regexp.MustCompile(`(CRIT|PANIC|FATAL)`) + +func checkNodeLogStream(source string, r io.Reader) error { + scanner := bufio.NewScanner(r) + // safer for long log lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + lineNumber := 1 + for scanner.Scan() { + line := scanner.Text() + if criticalLogLevelRegex.MatchString(line) { + return fmt.Errorf("source %s contains matching log level at line %d: %s", source, lineNumber, line) + } + lineNumber++ + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading source %s: %w", source, err) + } + return nil +} + +// New stream-first API +func checkNodeLogErrorsFromStreams(streams map[string]io.ReadCloser) error { + if os.Getenv(EnvVarIgnoreCriticalLogs) == "true" { + L.Warn().Msg(`CTF_IGNORE_CRITICAL_LOGS is set to true, we ignore all CRIT|FATAL|PANIC errors in node logs!`) + return nil + } + for name, rc := range streams { + if err := checkNodeLogStream(name, rc); err != nil { + _ = rc.Close() + return err + } + _ = rc.Close() + } + return nil +} + +func StreamContainerLogs(listOptions container.ListOptions, logOptions container.LogsOptions) (map[string]io.ReadCloser, error) { + L.Info().Msg("Streaming Docker containers logs") + provider, err := tc.NewDockerProvider() if err != nil { - return nil, fmt.Errorf("failed to get current directory: %w", err) + return nil, fmt.Errorf("failed to create Docker provider: %w", err) } - entries, err := os.ReadDir(currentDir) + containers, err := provider.Client().ContainerList(context.Background(), listOptions) if err != nil { - return nil, fmt.Errorf("failed to read directory: %w", err) + return nil, fmt.Errorf("failed to list Docker containers: %w", err) } - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), "logs-") { - logDirs = append(logDirs, filepath.Join(currentDir, entry.Name())) - } + + eg := &errgroup.Group{} + logMap := make(map[string]io.ReadCloser) + var mutex sync.Mutex + + for _, containerInfo := range containers { + eg.Go(func() error { + containerName := safeContainerName(containerInfo) + L.Debug().Str("Container", containerName).Msg("Collecting logs") + logs, err := provider.Client().ContainerLogs(context.Background(), containerInfo.ID, logOptions) + if err != nil { + L.Error().Err(err).Str("Container", containerName).Msg("failed to fetch logs for container") + return err + } + mutex.Lock() + defer mutex.Unlock() + logMap[containerName] = logs + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + + return logMap, nil +} + +func CTFContainersListOpts() container.ListOptions { + return container.ListOptions{ + All: true, + Filters: dfilter.NewArgs(dfilter.KeyValuePair{ + Key: "label", + Value: "framework=ctf", + }), } - return logDirs, nil } -// CheckCLNodeContainerErrors check if any CL node container logs has errors -func CheckCLNodeContainerErrors() error { - dirs, err := getLogDirectories() +func CTFContainersLogsOpts() container.LogsOptions { + return container.LogsOptions{ShowStdout: true, ShowStderr: true} +} + +// LogStreamConsumer represents a log stream consumer that receives one stream per container. +type LogStreamConsumer struct { + Name string + Consume func(map[string]io.ReadCloser) error +} + +// StreamContainerLogsFanout fetches container logs once and fans out streams to all consumers. +func StreamContainerLogsFanout(listOptions container.ListOptions, logOptions container.LogsOptions, consumers ...LogStreamConsumer) error { + logStream, err := StreamContainerLogs(listOptions, logOptions) if err != nil { return err } - for _, dd := range dirs { - if err := checkNodeLogErrors(dd); err != nil { - return err + + return fanoutContainerLogs(logStream, consumers...) +} + +// StreamCTFContainerLogsFanout fetches CTF logs once and fans out streams to all consumers. +func StreamCTFContainerLogsFanout(consumers ...LogStreamConsumer) error { + return StreamContainerLogsFanout(CTFContainersListOpts(), CTFContainersLogsOpts(), consumers...) +} + +func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStreamConsumer) error { + if len(consumers) == 0 { + for _, reader := range logStream { + _ = reader.Close() } + return nil } - return nil + + consumerStreams := make([]map[string]io.ReadCloser, len(consumers)) + for i := range consumers { + consumerStreams[i] = make(map[string]io.ReadCloser, len(logStream)) + } + + pumpGroup := &errgroup.Group{} + for containerName, sourceReader := range logStream { + writers := make([]*io.PipeWriter, len(consumers)) + for i := range consumers { + reader, writer := io.Pipe() + consumerStreams[i][containerName] = reader + writers[i] = writer + } + + pumpGroup.Go(func() error { + defer func() { + _ = sourceReader.Close() + }() + + readBuf := make([]byte, 32*1024) + for { + n, readErr := sourceReader.Read(readBuf) + if n > 0 { + // NOTE: io.Pipe is unbuffered, so a slow consumer can still backpressure others. + // Future improvement: decouple per-consumer delivery with bounded buffering. + chunk := readBuf[:n] + for i, writer := range writers { + if writer == nil { + continue + } + if writeErr := writeAll(writer, chunk); writeErr != nil { + if errors.Is(writeErr, io.ErrClosedPipe) { + writers[i] = nil + continue + } + closeAllPipeWritersWithError(writers, fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr)) + return fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr) + } + } + } + + if readErr == io.EOF { + closeAllPipeWriters(writers) + return nil + } + if readErr != nil { + closeAllPipeWritersWithError(writers, readErr) + return fmt.Errorf("failed reading stream for container %s: %w", containerName, readErr) + } + } + }) + } + + consumerGroup := &errgroup.Group{} + for i, consumer := range consumers { + consumerGroup.Go(func() error { + defer closeAllPipeReaders(consumerStreams[i]) + + if consumer.Consume == nil { + return fmt.Errorf("consumer %q has nil Consume function", consumer.Name) + } + if err := consumer.Consume(consumerStreams[i]); err != nil { + return fmt.Errorf("consumer %q failed: %w", consumer.Name, err) + } + return nil + }) + } + + pumpErr := pumpGroup.Wait() + consumerErr := consumerGroup.Wait() + return errors.Join(pumpErr, consumerErr) } -// checkNodeLogsErrors check Chainlink nodes logs for error levels -func checkNodeLogErrors(dir string) error { - if os.Getenv(EnvVarIgnoreCriticalLogs) == "true" { - L.Warn().Msg(`CTF_IGNORE_CRITICAL_LOGS is set to true, we ignore all CRIT|FATAL|PANIC errors in node logs!`) - return nil +func writeDockerLogPayload(dst io.Writer, reader io.Reader) error { + header := make([]byte, 8) + for { + _, err := io.ReadFull(reader, header) + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return fmt.Errorf("failed to read log stream header: %w", err) + } + + msgSize := binary.BigEndian.Uint32(header[4:8]) + msg := make([]byte, msgSize) + if _, err = io.ReadFull(reader, msg); err != nil { + return fmt.Errorf("failed to read log message: %w", err) + } + if _, err = dst.Write(msg); err != nil { + return fmt.Errorf("failed to write log message: %w", err) + } } - fileRegex := regexp.MustCompile(`^node.*\.log$`) - logLevelRegex := regexp.MustCompile(`(CRIT|PANIC|FATAL)`) +} - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { +func writeAll(writer io.Writer, data []byte) error { + for len(data) > 0 { + n, err := writer.Write(data) if err != nil { return err } - if !info.IsDir() && fileRegex.MatchString(info.Name()) { - file, err := os.Open(path) + data = data[n:] + } + return nil +} + +func closeAllPipeWriters(writers []*io.PipeWriter) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.Close() + } +} + +func closeAllPipeWritersWithError(writers []*io.PipeWriter, err error) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.CloseWithError(err) + } +} + +func closeAllPipeReaders(readers map[string]io.ReadCloser) { + for _, reader := range readers { + if reader == nil { + continue + } + _ = reader.Close() + } +} + +func SaveAndCheckLogs(t *testing.T) error { + return StreamCTFContainerLogsFanout( + LogStreamConsumer{ + Name: "save-container-logs", + Consume: func(logStreams map[string]io.ReadCloser) error { + _, logsErr := SaveContainerLogsFromStreams(fmt.Sprintf("%s-%s", DefaultCTFLogsDir, t.Name()), logStreams) + return logsErr + }, + }, + LogStreamConsumer{ + Name: "check-container-logs", + Consume: func(logStreams map[string]io.ReadCloser) error { + checkErr := checkNodeLogErrorsFromStreams(logStreams) + return checkErr + }, + }, + ) +} + +// SaveContainerLogs writes all Docker container logs to some directory +func SaveContainerLogs(dir string) ([]string, error) { + logStream, lErr := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + + if lErr != nil { + return nil, lErr + } + + return SaveContainerLogsFromStreams(dir, logStream) +} + +// SaveContainerLogsFromStreams writes all provided Docker log streams to files in some directory. +func SaveContainerLogsFromStreams(dir string, logStream map[string]io.ReadCloser) ([]string, error) { + L.Info().Msg("Writing Docker containers logs") + if _, err := os.Stat(dir); os.IsNotExist(err) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) + } + } + + eg := &errgroup.Group{} + logFilePaths := make([]string, 0) + var logFilePathsMu sync.Mutex + for containerName, reader := range logStream { + eg.Go(func() error { + defer func() { + _ = reader.Close() + }() + + logFilePath := filepath.Join(dir, fmt.Sprintf("%s.log", containerName)) + logFile, err := os.Create(logFilePath) if err != nil { - return fmt.Errorf("failed to open file %s: %w", path, err) + L.Error().Err(err).Str("Container", containerName).Msg("failed to create container log file") + return err } - defer file.Close() - - scanner := bufio.NewScanner(file) - lineNumber := 1 - for scanner.Scan() { - line := scanner.Text() - if logLevelRegex.MatchString(line) { - return fmt.Errorf("file %s contains a matching log level at line %d: %s", path, lineNumber, line) - } - lineNumber++ + defer func() { + _ = logFile.Close() + }() + + logFilePathsMu.Lock() + logFilePaths = append(logFilePaths, logFilePath) + logFilePathsMu.Unlock() + + if err := writeDockerLogPayload(logFile, reader); err != nil { + L.Error().Err(err).Str("Container", containerName).Msg("failed to write container logs") + return err + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + return logFilePaths, nil +} + +var ExitedCtfContainersListOpts = container.ListOptions{ + All: true, + Filters: dfilter.NewArgs(dfilter.KeyValuePair{ + Key: "label", + Value: "framework=ctf", + }, + dfilter.KeyValuePair{ + Key: "status", + Value: "exited"}, + dfilter.KeyValuePair{ + Key: "status", + Value: "dead"}), +} + +// PrintFailedContainerLogs writes exited/dead CTF containers' last log lines to stdout. +func PrintFailedContainerLogs(logLinesCount uint64) error { + logStream, lErr := StreamContainerLogs(ExitedCtfContainersListOpts, container.LogsOptions{ + ShowStderr: true, + Tail: strconv.FormatUint(logLinesCount, 10), + }) + if lErr != nil { + return lErr + } + + return PrintFailedContainerLogsFromStreams(logStream, logLinesCount) +} + +// PrintFailedContainerLogsFromStreams prints all provided container streams as red text. +func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, logLinesCount uint64) error { + if len(logStream) == 0 { + L.Info().Msg("No failed Docker containers found") + return nil + } + + L.Error().Msgf("Exited/dead containers: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) + + eg := &errgroup.Group{} + for cName, ioReader := range logStream { + eg.Go(func() error { + defer func() { + _ = ioReader.Close() + }() + + var content strings.Builder + if err := writeDockerLogPayload(&content, ioReader); err != nil { + return fmt.Errorf("failed to read logs for container %s: %w", cName, err) + } + + trimmed := strings.TrimSpace(content.String()) + if len(trimmed) > 0 { + L.Info().Str("Container", cName).Msgf("Last %d lines of logs", logLinesCount) + fmt.Println(RedText("%s\n", trimmed)) + } + + return nil + }) + } + + return eg.Wait() +} + +func CheckContainersForPanics(maxLinesAfterPanic int) bool { + logstream, err := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + if err != nil { + L.Error().Err(err).Msg("failed to stream container logs for panic check") + return true + } + + return CheckContainersForPanicsFromStreams(logstream, maxLinesAfterPanic) +} + +// CheckContainersForPanicsFromStreams scans the provided stream (usually Docker container logs) for panic-related patterns. +// When a panic is detected, it displays the panic line and up to maxLinesAfterPanic lines following it. +// +// This is useful for debugging test failures where a Docker container may have panicked. +// The function searches for common panic patterns including: +// - "panic:" - Standard Go panic +// - "runtime error:" - Runtime errors (nil pointer, index out of bounds, etc.) +// - "fatal error:" - Fatal errors +// - "goroutine N [running]" - Stack trace indicators +// +// The function scans all containers in parallel and stops as soon as the first panic is found. +// +// Parameters: +// - logStream: Map of container names to their log streams (io.ReadCloser). +// - maxLinesAfterPanic: Maximum number of lines to show after the panic line (including stack trace). +// Recommended: 50-200 depending on how much context you want. +// +// Returns: +// - true if any panics were found in any container, false otherwise +func CheckContainersForPanicsFromStreams(logStream map[string]io.ReadCloser, maxLinesAfterPanic int) bool { + // Panic patterns to search for + panicPatterns := map[string]*regexp.Regexp{ + "panic": regexp.MustCompile(`(?i)panic:`), // Go panic + "runtime error": regexp.MustCompile(`(?i)runtime error:`), // Runtime errors + "fatal error": regexp.MustCompile(`(?i)fatal error:`), // Fatal errors + "stack trace": regexp.MustCompile(`goroutine \d+ \[running\]`), // Stack trace indicator + } + + // Create context for early cancellation when first panic is found + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Channel to receive panic results + panicFoundChan := make(chan bool, len(logStream)) + var wg sync.WaitGroup + + // Scan all containers in parallel + for containerName, reader := range logStream { + wg.Add(1) + go func(name string, r io.ReadCloser) { + defer wg.Done() + defer r.Close() + + panicFound := scanContainerForPanics(ctx, L, name, r, panicPatterns, maxLinesAfterPanic) + if panicFound { + panicFoundChan <- true + cancel() // Signal other goroutines to stop } - if err := scanner.Err(); err != nil { - return fmt.Errorf("error reading file %s: %w", path, err) + }(containerName, reader) + } + + // Wait for all goroutines to finish in a separate goroutine + go func() { + wg.Wait() + close(panicFoundChan) + }() + + // Check if any panic was found + for range panicFoundChan { + return true // Return as soon as first panic is found + } + + L.Info().Msg("No panics detected in any container logs") + + return false +} + +// scanContainerForPanics scans a single container's log stream for panic patterns +// It checks the context for cancellation to enable early termination when another goroutine finds a panic +func scanContainerForPanics(ctx context.Context, logger zerolog.Logger, containerName string, reader io.Reader, patterns map[string]*regexp.Regexp, maxLinesAfter int) bool { + scanner := bufio.NewScanner(reader) + // Increase buffer size to handle large log lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + var logLines []string + lineNum := 0 + panicLineNum := -1 + patternNameFound := "" + + // Read all lines and detect panic + for scanner.Scan() { + // Check if context is cancelled (another goroutine found a panic) + select { + case <-ctx.Done(): + return false // Stop scanning, another container already found a panic + default: + } + + line := scanner.Text() + lineNum++ + + // If we found a panic, collect remaining lines up to maxLinesAfter + if panicLineNum >= 0 { + logLines = append(logLines, line) + // Stop reading once we have enough context after the panic + if lineNum >= panicLineNum+maxLinesAfter+1 { + break } + continue } - return nil - }) - return err + + // Still searching for panic - store all lines + logLines = append(logLines, line) + + // Check if this line contains a panic pattern + for patternName, pattern := range patterns { + if pattern.MatchString(line) { + patternNameFound = patternName + panicLineNum = lineNum - 1 // Store index (0-based) + break + } + } + } + + if err := scanner.Err(); err != nil { + logger.Error().Err(err).Str("Container", containerName).Msg("error reading container logs") + return false + } + + // If panic was found, display it with context + if panicLineNum >= 0 { + logger.Error(). + Str("Container", containerName). + Int("PanicLineNumber", panicLineNum+1). + Msgf("🔥 %s DETECTED in container logs", strings.ToUpper(patternNameFound)) + + // Calculate range to display + startLine := panicLineNum + endLine := min(len(logLines), panicLineNum+maxLinesAfter+1) + + // Build the output + var output strings.Builder + fmt.Fprintf(&output, "\n%s\n", strings.Repeat("=", 80)) + fmt.Fprintf(&output, "%s FOUND IN CONTAINER: %s (showing %d lines from panic)\n", strings.ToUpper(patternNameFound), containerName, endLine-startLine) + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + for i := startLine; i < endLine; i++ { + fmt.Fprintf(&output, "%s\n", logLines[i]) + } + + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + fmt.Println(RedText("%s\n", output.String())) + return true + } + + return false } diff --git a/framework/logs_test.go b/framework/logs_test.go index 31a363933..80e75863f 100644 --- a/framework/logs_test.go +++ b/framework/logs_test.go @@ -1,43 +1,46 @@ package framework import ( - "path/filepath" + "io" + "strings" "testing" ) -// Table test for checkLogFilesForLevels -func TestCheckLogFilesForLevels(t *testing.T) { +func TestCheckNodeLogErrorsFromStreams(t *testing.T) { tests := []struct { name string - dir string content string expectError bool }{ { name: "Clean", - dir: "clean", + content: `{"level":"info","msg":"all good"}`, expectError: false, }, { name: "Contains CRIT", - dir: "crit", + content: `{"level":"error","msg":"CRIT happened"}`, expectError: true, }, { name: "Contains PANIC", - dir: "panic", + content: `PANIC: something bad`, expectError: true, }, { name: "Contains FATAL", - dir: "fatal", + content: `{"msg":"FATAL condition"}`, expectError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := checkNodeLogErrors(filepath.Join("testdata", tt.dir)) + streams := map[string]io.ReadCloser{ + tt.name: io.NopCloser(strings.NewReader(tt.content)), + } + + err := checkNodeLogErrorsFromStreams(streams) if tt.expectError && err == nil { t.Errorf("expected error but got none") } else if !tt.expectError && err != nil { diff --git a/framework/text.go b/framework/text.go new file mode 100644 index 000000000..9353e27f0 --- /dev/null +++ b/framework/text.go @@ -0,0 +1,7 @@ +package framework + +import "fmt" + +func RedText(text string, args ...any) string { + return fmt.Sprintf("\033[31m%s\033[0m", fmt.Sprintf(text, args...)) +}