From 5d4766879c9fd7ccf3b5e04b9f9b7ca5a8bfceee Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 10:54:06 +0200 Subject: [PATCH 1/8] add docker log stream fanout, so that we don't need to read log stream multiple times in each log-consuming function --- framework/docker.go | 280 +++++++++++++++++++++++++++----- framework/docker_fanout_test.go | 133 +++++++++++++++ framework/text.go | 7 + 3 files changed, 381 insertions(+), 39 deletions(-) create mode 100644 framework/docker_fanout_test.go create mode 100644 framework/text.go diff --git a/framework/docker.go b/framework/docker.go index 784b50d63..b6503fc9a 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -6,12 +6,16 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "io" + "maps" "os" "os/exec" "path/filepath" "regexp" + "slices" + "strconv" "strings" "sync" "testing" @@ -30,6 +34,20 @@ const ( DefaultCTFLogsDir = "logs/docker" ) +func CTFContainersListOpts() container.ListOptions { + return container.ListOptions{ + All: true, + Filters: dfilter.NewArgs(dfilter.KeyValuePair{ + Key: "label", + Value: "framework=ctf", + }), + } +} + +func CTFContainersLogsOpts() container.LogsOptions { + return container.LogsOptions{ShowStdout: true, ShowStderr: true} +} + func IsDockerRunning() bool { cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { @@ -286,6 +304,17 @@ func SaveAndCheckLogs(t *testing.T) error { // SaveContainerLogs writes all Docker container logs to some directory func SaveContainerLogs(dir string) ([]string, error) { + logStream, lErr := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + + if lErr != nil { + return nil, lErr + } + + return SaveContainerLogsFromStreams(dir, logStream) +} + +// SaveContainerLogsFromStreams writes all provided Docker log streams to files in some directory. +func SaveContainerLogsFromStreams(dir string, logStream map[string]io.ReadCloser) ([]string, error) { L.Info().Msg("Writing Docker containers logs") if _, err := os.Stat(dir); os.IsNotExist(err) { if err := os.MkdirAll(dir, 0755); err != nil { @@ -293,57 +322,32 @@ func SaveContainerLogs(dir string) ([]string, error) { } } - logStream, lErr := StreamContainerLogs(container.ListOptions{ - All: true, - Filters: dfilter.NewArgs(dfilter.KeyValuePair{ - Key: "label", - Value: "framework=ctf", - }), - }, container.LogsOptions{ShowStdout: true, ShowStderr: true}) - - if lErr != nil { - return nil, lErr - } - eg := &errgroup.Group{} logFilePaths := make([]string, 0) + var logFilePathsMu sync.Mutex for containerName, reader := range logStream { eg.Go(func() error { + defer func() { + _ = reader.Close() + }() + logFilePath := filepath.Join(dir, fmt.Sprintf("%s.log", containerName)) logFile, err := os.Create(logFilePath) if err != nil { L.Error().Err(err).Str("Container", containerName).Msg("failed to create container log file") return err } - logFilePaths = append(logFilePaths, logFilePath) - // Parse and write logs - header := make([]byte, 8) // Docker stream header is 8 bytes - for { - _, err := io.ReadFull(reader, header) - if err == io.EOF { - break - } - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to read log stream header") - break - } + defer func() { + _ = logFile.Close() + }() - // Extract log message size - msgSize := binary.BigEndian.Uint32(header[4:8]) - - // Read the log message - msg := make([]byte, msgSize) - _, err = io.ReadFull(reader, msg) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to read log message") - break - } + logFilePathsMu.Lock() + logFilePaths = append(logFilePaths, logFilePath) + logFilePathsMu.Unlock() - // Write the log message to the file - if _, err := logFile.Write(msg); err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to write log message to file") - break - } + if err := writeDockerLogPayload(logFile, reader); err != nil { + L.Error().Err(err).Str("Container", containerName).Msg("failed to write container logs") + return err } return nil }) @@ -354,6 +358,53 @@ func SaveContainerLogs(dir string) ([]string, error) { return logFilePaths, nil } +// PrintFailedContainerLogs writes exited/dead CTF containers' last log lines to stdout. +func PrintFailedContainerLogs(logLinesCount uint64) error { + logStream, lErr := StreamContainerLogs(ExitedCtfContainersListOpts, container.LogsOptions{ + ShowStderr: true, + Tail: strconv.FormatUint(logLinesCount, 10), + }) + if lErr != nil { + return lErr + } + + return PrintFailedContainerLogsFromStreams(logStream, logLinesCount) +} + +// PrintFailedContainerLogsFromStreams prints all provided container streams as red text. +func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, logLinesCount uint64) error { + if len(logStream) == 0 { + L.Info().Msg("No failed Docker containers found") + return nil + } + + L.Error().Msgf("Containers that exited with non-zero codes: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) + + eg := &errgroup.Group{} + for cName, ioReader := range logStream { + eg.Go(func() error { + defer func() { + _ = ioReader.Close() + }() + + var content strings.Builder + if err := writeDockerLogPayload(&content, ioReader); err != nil { + return fmt.Errorf("failed to read logs for container %s: %w", cName, err) + } + + trimmed := strings.TrimSpace(content.String()) + if len(trimmed) > 0 { + L.Info().Str("Container", cName).Msgf("Last %d lines of logs", logLinesCount) + fmt.Println(RedText("%s\n", trimmed)) + } + + return nil + }) + } + + return eg.Wait() +} + var ExitedCtfContainersListOpts = container.ListOptions{ All: true, Filters: dfilter.NewArgs(dfilter.KeyValuePair{ @@ -405,6 +456,157 @@ func StreamContainerLogs(listOptions container.ListOptions, logOptions container return logMap, nil } +// LogStreamConsumer represents a log stream consumer that receives one stream per container. +type LogStreamConsumer struct { + Name string + Consume func(map[string]io.ReadCloser) error +} + +// StreamContainerLogsFanout fetches container logs once and fans out streams to all consumers. +func StreamContainerLogsFanout(listOptions container.ListOptions, logOptions container.LogsOptions, consumers ...LogStreamConsumer) error { + logStream, err := StreamContainerLogs(listOptions, logOptions) + if err != nil { + return err + } + + return fanoutContainerLogs(logStream, consumers...) +} + +// StreamCTFContainerLogsFanout fetches CTF logs once and fans out streams to all consumers. +func StreamCTFContainerLogsFanout(consumers ...LogStreamConsumer) error { + return StreamContainerLogsFanout(CTFContainersListOpts(), CTFContainersLogsOpts(), consumers...) +} + +func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStreamConsumer) error { + if len(consumers) == 0 { + for _, reader := range logStream { + _ = reader.Close() + } + return nil + } + + consumerStreams := make([]map[string]io.ReadCloser, len(consumers)) + for i := range consumers { + consumerStreams[i] = make(map[string]io.ReadCloser, len(logStream)) + } + + pumpGroup := &errgroup.Group{} + for containerName, sourceReader := range logStream { + writers := make([]*io.PipeWriter, len(consumers)) + for i := range consumers { + reader, writer := io.Pipe() + consumerStreams[i][containerName] = reader + writers[i] = writer + } + + pumpGroup.Go(func() error { + defer func() { + _ = sourceReader.Close() + }() + + readBuf := make([]byte, 32*1024) + for { + n, readErr := sourceReader.Read(readBuf) + if n > 0 { + chunk := readBuf[:n] + for i, writer := range writers { + if writer == nil { + continue + } + if writeErr := writeAll(writer, chunk); writeErr != nil { + if errors.Is(writeErr, io.ErrClosedPipe) { + writers[i] = nil + continue + } + closeAllPipeWritersWithError(writers, fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr)) + return fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr) + } + } + } + + if readErr == io.EOF { + closeAllPipeWriters(writers) + return nil + } + if readErr != nil { + closeAllPipeWritersWithError(writers, readErr) + return fmt.Errorf("failed reading stream for container %s: %w", containerName, readErr) + } + } + }) + } + + consumerGroup := &errgroup.Group{} + for i, consumer := range consumers { + i := i + consumer := consumer + consumerGroup.Go(func() error { + if consumer.Consume == nil { + return fmt.Errorf("consumer %q has nil Consume function", consumer.Name) + } + if err := consumer.Consume(consumerStreams[i]); err != nil { + return fmt.Errorf("consumer %q failed: %w", consumer.Name, err) + } + return nil + }) + } + + pumpErr := pumpGroup.Wait() + consumerErr := consumerGroup.Wait() + return errors.Join(pumpErr, consumerErr) +} + +func writeDockerLogPayload(dst io.Writer, reader io.Reader) error { + header := make([]byte, 8) + for { + _, err := io.ReadFull(reader, header) + if err == io.EOF { + return nil + } + if err != nil { + return fmt.Errorf("failed to read log stream header: %w", err) + } + + msgSize := binary.BigEndian.Uint32(header[4:8]) + msg := make([]byte, msgSize) + if _, err = io.ReadFull(reader, msg); err != nil { + return fmt.Errorf("failed to read log message: %w", err) + } + if _, err = dst.Write(msg); err != nil { + return fmt.Errorf("failed to write log message: %w", err) + } + } +} + +func writeAll(writer io.Writer, data []byte) error { + for len(data) > 0 { + n, err := writer.Write(data) + if err != nil { + return err + } + data = data[n:] + } + return nil +} + +func closeAllPipeWriters(writers []*io.PipeWriter) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.Close() + } +} + +func closeAllPipeWritersWithError(writers []*io.PipeWriter, err error) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.CloseWithError(err) + } +} + func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs map[string]string) error { var err error once.Do(func() { diff --git a/framework/docker_fanout_test.go b/framework/docker_fanout_test.go new file mode 100644 index 000000000..9f803bbff --- /dev/null +++ b/framework/docker_fanout_test.go @@ -0,0 +1,133 @@ +package framework + +import ( + "bytes" + "encoding/binary" + "io" + "os" + "path/filepath" + "slices" + "strings" + "sync" + "testing" +) + +func TestFanoutContainerLogsReplicatesBytesToAllConsumers(t *testing.T) { + containerName := "container-a" + payload := dockerMuxPayload("line-1\n", "line-2\n") + logStream := map[string]io.ReadCloser{ + containerName: io.NopCloser(bytes.NewReader(payload)), + } + + type consumeResult struct { + name string + data []byte + } + var results []consumeResult + var resultsMu sync.Mutex + + consumers := []LogStreamConsumer{ + { + Name: "consumer-1", + Consume: func(streams map[string]io.ReadCloser) error { + data, err := io.ReadAll(streams[containerName]) + if err != nil { + return err + } + resultsMu.Lock() + results = append(results, consumeResult{name: "consumer-1", data: data}) + resultsMu.Unlock() + return nil + }, + }, + { + Name: "consumer-2", + Consume: func(streams map[string]io.ReadCloser) error { + data, err := io.ReadAll(streams[containerName]) + if err != nil { + return err + } + resultsMu.Lock() + results = append(results, consumeResult{name: "consumer-2", data: data}) + resultsMu.Unlock() + return nil + }, + }, + } + + err := fanoutContainerLogs(logStream, consumers...) + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if len(results) != 2 { + t.Fatalf("expected 2 consumer results, got: %d", len(results)) + } + + slices.SortFunc(results, func(a, b consumeResult) int { + if a.name < b.name { + return -1 + } + if a.name > b.name { + return 1 + } + return 0 + }) + for _, result := range results { + if !bytes.Equal(result.data, payload) { + t.Fatalf("consumer %s did not receive full payload", result.name) + } + } +} + +func TestSaveContainerLogsFromStreams(t *testing.T) { + tDir := t.TempDir() + logStreams := map[string]io.ReadCloser{ + "node-a": io.NopCloser(bytes.NewReader(dockerMuxPayload("a-1\n", "a-2\n"))), + "node-b": io.NopCloser(bytes.NewReader(dockerMuxPayload("b-1\n"))), + "node-c": io.NopCloser(bytes.NewReader(dockerMuxPayload("c-1\n", "c-2\n", "c-3\n"))), + } + + paths, err := SaveContainerLogsFromStreams(tDir, logStreams) + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if len(paths) != len(logStreams) { + t.Fatalf("expected %d files, got %d", len(logStreams), len(paths)) + } + + for _, p := range paths { + content, readErr := os.ReadFile(filepath.Clean(p)) + if readErr != nil { + t.Fatalf("failed to read log file %s: %v", p, readErr) + } + if strings.TrimSpace(string(content)) == "" { + t.Fatalf("expected non-empty log file at %s", p) + } + } +} + +func TestPrintFailedContainerLogsFromStreams(t *testing.T) { + logStreams := map[string]io.ReadCloser{ + "node-a": io.NopCloser(bytes.NewReader(dockerMuxPayload("error line\n"))), + "node-b": io.NopCloser(bytes.NewReader(dockerMuxPayload("warn line\n", "trace line\n"))), + } + + if err := PrintFailedContainerLogsFromStreams(logStreams, 30); err != nil { + t.Fatalf("expected no error, got: %v", err) + } +} + +func dockerMuxPayload(lines ...string) []byte { + var out bytes.Buffer + for _, line := range lines { + msg := []byte(line) + header := make([]byte, 8) + header[0] = 1 + binary.BigEndian.PutUint32(header[4:], uint32(len(msg))) + out.Write(header) + out.Write(msg) + } + return out.Bytes() +} diff --git a/framework/text.go b/framework/text.go new file mode 100644 index 000000000..9353e27f0 --- /dev/null +++ b/framework/text.go @@ -0,0 +1,7 @@ +package framework + +import "fmt" + +func RedText(text string, args ...any) string { + return fmt.Sprintf("\033[31m%s\033[0m", fmt.Sprintf(text, args...)) +} From c2e9f2ea2050ea7c0f6b662d4803136d61c7c7bf Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 11:08:44 +0200 Subject: [PATCH 2/8] add function for printing panic-related Docker logs --- framework/docker.go | 149 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/framework/docker.go b/framework/docker.go index b6503fc9a..0e3863782 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -405,6 +405,155 @@ func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, log return eg.Wait() } +// CheckContainersForPanicsFromStreams scans the provided stream (usually Docker container logs) for panic-related patterns. +// When a panic is detected, it displays the panic line and up to maxLinesAfterPanic lines following it. +// +// This is useful for debugging test failures where a Docker container may have panicked. +// The function searches for common panic patterns including: +// - "panic:" - Standard Go panic +// - "runtime error:" - Runtime errors (nil pointer, index out of bounds, etc.) +// - "fatal error:" - Fatal errors +// - "goroutine N [running]" - Stack trace indicators +// +// The function scans all containers in parallel and stops as soon as the first panic is found. +// +// Parameters: +// - logStream: Map of container names to their log streams (io.ReadCloser). +// - maxLinesAfterPanic: Maximum number of lines to show after the panic line (including stack trace). +// Recommended: 50-200 depending on how much context you want. +// +// Returns: +// - true if any panics were found in any container, false otherwise +func CheckContainersForPanicsFromStreams(logStream map[string]io.ReadCloser, maxLinesAfterPanic int) bool { + // Panic patterns to search for + panicPatterns := map[string]*regexp.Regexp{ + "panic": regexp.MustCompile(`(?i)panic:`), // Go panic + "runtime error": regexp.MustCompile(`(?i)runtime error:`), // Runtime errors + "fatal error": regexp.MustCompile(`(?i)fatal error:`), // Fatal errors + "stack trace": regexp.MustCompile(`goroutine \d+ \[running\]`), // Stack trace indicator + } + + // Create context for early cancellation when first panic is found + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Channel to receive panic results + panicFoundChan := make(chan bool, len(logStream)) + var wg sync.WaitGroup + + // Scan all containers in parallel + for containerName, reader := range logStream { + wg.Add(1) + go func(name string, r io.ReadCloser) { + defer wg.Done() + defer r.Close() + + panicFound := scanContainerForPanics(ctx, L, name, r, panicPatterns, maxLinesAfterPanic) + if panicFound { + panicFoundChan <- true + cancel() // Signal other goroutines to stop + } + }(containerName, reader) + } + + // Wait for all goroutines to finish in a separate goroutine + go func() { + wg.Wait() + close(panicFoundChan) + }() + + // Check if any panic was found + for range panicFoundChan { + return true // Return as soon as first panic is found + } + + L.Info().Msg("No panics detected in any container logs") + + return false +} + +// scanContainerForPanics scans a single container's log stream for panic patterns +// It checks the context for cancellation to enable early termination when another goroutine finds a panic +func scanContainerForPanics(ctx context.Context, logger zerolog.Logger, containerName string, reader io.Reader, patterns map[string]*regexp.Regexp, maxLinesAfter int) bool { + scanner := bufio.NewScanner(reader) + // Increase buffer size to handle large log lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + var logLines []string + lineNum := 0 + panicLineNum := -1 + patternNameFound := "" + + // Read all lines and detect panic + for scanner.Scan() { + // Check if context is cancelled (another goroutine found a panic) + select { + case <-ctx.Done(): + return false // Stop scanning, another container already found a panic + default: + } + + line := scanner.Text() + lineNum++ + + // If we found a panic, collect remaining lines up to maxLinesAfter + if panicLineNum >= 0 { + logLines = append(logLines, line) + // Stop reading once we have enough context after the panic + if lineNum >= panicLineNum+maxLinesAfter+1 { + break + } + continue + } + + // Still searching for panic - store all lines + logLines = append(logLines, line) + + // Check if this line contains a panic pattern + for patternName, pattern := range patterns { + if pattern.MatchString(line) { + patternNameFound = patternName + panicLineNum = lineNum - 1 // Store index (0-based) + break + } + } + } + + if err := scanner.Err(); err != nil { + logger.Error().Err(err).Str("Container", containerName).Msg("error reading container logs") + return false + } + + // If panic was found, display it with context + if panicLineNum >= 0 { + logger.Error(). + Str("Container", containerName). + Int("PanicLineNumber", panicLineNum+1). + Msgf("🔥 %s DETECTED in container logs", strings.ToUpper(patternNameFound)) + + // Calculate range to display + startLine := panicLineNum + endLine := min(len(logLines), panicLineNum+maxLinesAfter+1) + + // Build the output + var output strings.Builder + fmt.Fprintf(&output, "\n%s\n", strings.Repeat("=", 80)) + fmt.Fprintf(&output, "%s FOUND IN CONTAINER: %s (showing %d lines from panic)\n", strings.ToUpper(patternNameFound), containerName, endLine-startLine) + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + for i := startLine; i < endLine; i++ { + fmt.Fprintf(&output, "%s\n", logLines[i]) + } + + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + fmt.Println(RedText("%s\n", output.String())) + return true + } + + return false +} + var ExitedCtfContainersListOpts = container.ListOptions{ All: true, Filters: dfilter.NewArgs(dfilter.KeyValuePair{ From b7f55c374a08d8173cd1676854e725afe5433ae6 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 12:02:37 +0200 Subject: [PATCH 3/8] lints --- framework/docker.go | 2 +- framework/docker_fanout_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/docker.go b/framework/docker.go index 0e3863782..e7ce774f4 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -709,7 +709,7 @@ func writeDockerLogPayload(dst io.Writer, reader io.Reader) error { header := make([]byte, 8) for { _, err := io.ReadFull(reader, header) - if err == io.EOF { + if errors.Is(err, io.EOF) { return nil } if err != nil { diff --git a/framework/docker_fanout_test.go b/framework/docker_fanout_test.go index 9f803bbff..17820587d 100644 --- a/framework/docker_fanout_test.go +++ b/framework/docker_fanout_test.go @@ -125,7 +125,7 @@ func dockerMuxPayload(lines ...string) []byte { msg := []byte(line) header := make([]byte, 8) header[0] = 1 - binary.BigEndian.PutUint32(header[4:], uint32(len(msg))) + binary.BigEndian.PutUint32(header[4:], uint32(len(msg))) // #nosec: G115 out.Write(header) out.Write(msg) } From ee58a21c8393271f3fb386eb18805d0bb473b00c Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 12:03:45 +0200 Subject: [PATCH 4/8] add changeset --- framework/.changeset/v0.15.17.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 framework/.changeset/v0.15.17.md diff --git a/framework/.changeset/v0.15.17.md b/framework/.changeset/v0.15.17.md new file mode 100644 index 000000000..1411645d1 --- /dev/null +++ b/framework/.changeset/v0.15.17.md @@ -0,0 +1,2 @@ +- Add Docker logs stream fanout and convert existing logs-based functions to consume streams +- Add function for checking & printing traces of panics found in Docker logs \ No newline at end of file From a0edbb683b70a3e5b7cfeee1715d3ce8eddf7784 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 14:25:35 +0200 Subject: [PATCH 5/8] CR changes --- framework/docker.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/framework/docker.go b/framework/docker.go index e7ce774f4..e46f3d525 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -378,7 +378,7 @@ func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, log return nil } - L.Error().Msgf("Containers that exited with non-zero codes: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) + L.Error().Msgf("Exited/dead containers: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) eg := &errgroup.Group{} for cName, ioReader := range logStream { @@ -585,7 +585,7 @@ func StreamContainerLogs(listOptions container.ListOptions, logOptions container for _, containerInfo := range containers { eg.Go(func() error { - containerName := containerInfo.Names[0] + containerName := safeContainerName(containerInfo) L.Debug().Str("Container", containerName).Msg("Collecting logs") logs, err := provider.Client().ContainerLogs(context.Background(), containerInfo.ID, logOptions) if err != nil { @@ -657,6 +657,8 @@ func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStr for { n, readErr := sourceReader.Read(readBuf) if n > 0 { + // NOTE: io.Pipe is unbuffered, so a slow consumer can still backpressure others. + // Future improvement: decouple per-consumer delivery with bounded buffering. chunk := readBuf[:n] for i, writer := range writers { if writer == nil { @@ -687,9 +689,9 @@ func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStr consumerGroup := &errgroup.Group{} for i, consumer := range consumers { - i := i - consumer := consumer consumerGroup.Go(func() error { + defer closeAllPipeReaders(consumerStreams[i]) + if consumer.Consume == nil { return fmt.Errorf("consumer %q has nil Consume function", consumer.Name) } @@ -756,6 +758,15 @@ func closeAllPipeWritersWithError(writers []*io.PipeWriter, err error) { } } +func closeAllPipeReaders(readers map[string]io.ReadCloser) { + for _, reader := range readers { + if reader == nil { + continue + } + _ = reader.Close() + } +} + func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs map[string]string) error { var err error once.Do(func() { @@ -767,6 +778,22 @@ func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs m return err } +func safeContainerName(info container.Summary) string { + if len(info.Names) > 0 { + name := strings.TrimPrefix(info.Names[0], "/") + if name != "" { + // defensive: docker names normally don't include "/" beyond prefix, + // but this guarantees safe map keys and filenames. + return strings.ReplaceAll(name, "/", "_") + } + } + // fallback when Names is missing/unexpected + if len(info.ID) >= 12 { + return info.ID[:12] + } + return info.ID +} + func BuildImage(dctx, dfile, nameAndTag string, buildArgs map[string]string) error { dfilePath := filepath.Join(dctx, dfile) From 3c909c24db159d329b230a6dc4a00392fb2b9397 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 14:57:16 +0200 Subject: [PATCH 6/8] add a convenience wrapper --- framework/docker.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/framework/docker.go b/framework/docker.go index e46f3d525..372859b5e 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -405,6 +405,16 @@ func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, log return eg.Wait() } +func CheckContainersForPanics(maxLinesAfterPanic int) bool { + logstream, err := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + if err != nil { + L.Error().Err(err).Msg("failed to stream container logs for panic check") + return true + } + + return CheckContainersForPanicsFromStreams(logstream, maxLinesAfterPanic) +} + // CheckContainersForPanicsFromStreams scans the provided stream (usually Docker container logs) for panic-related patterns. // When a panic is detected, it displays the panic line and up to maxLinesAfterPanic lines following it. // From 7f829c499d68b85ed6897cebf043ab5d411606cb Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 16:24:37 +0200 Subject: [PATCH 7/8] update docs + example --- book/src/developing/asserting_logs.md | 56 +- framework/docker.go | 542 ---------------- .../examples/myproject/smoke_logs_test.go | 46 +- framework/logs.go | 579 ++++++++++++++++-- 4 files changed, 601 insertions(+), 622 deletions(-) diff --git a/book/src/developing/asserting_logs.md b/book/src/developing/asserting_logs.md index 5ba148421..df66fd676 100644 --- a/book/src/developing/asserting_logs.md +++ b/book/src/developing/asserting_logs.md @@ -1,35 +1,47 @@ # Asserting Container Logs -You can either assert that CL nodes have no errors like that, we check `(CRIT|PANIC|FATAL)` levels by default for all the nodes +Use built-in critical-level assertion (`CRIT|PANIC|FATAL`) for Chainlink node logs: ```golang - in, err := framework.Load[Cfg](t) +in, err := framework.Load[Cfg](t) +require.NoError(t, err) +t.Cleanup(func() { + err := framework.SaveAndCheckLogs(t) require.NoError(t, err) - t.Cleanup(func() { - err := framework.SaveAndCheckLogs(t) - require.NoError(t, err) - }) +}) ``` -or customize file assertions +For custom checks, assert logs directly from streams with `StreamCTFContainerLogsFanout`. ```golang - in, err := framework.Load[Cfg](t) +re := regexp.MustCompile(`name=HeadReporter version=\d+`) +t.Cleanup(func() { + err := framework.StreamCTFContainerLogsFanout( + framework.LogStreamConsumer{ + Name: "custom-regex-assert", + Consume: func(logStreams map[string]io.ReadCloser) error { + for name, stream := range logStreams { + scanner := bufio.NewScanner(stream) + found := false + for scanner.Scan() { + if re.MatchString(scanner.Text()) { + found = true + break + } + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scan %s: %w", name, err) + } + if !found { + return fmt.Errorf("missing HeadReporter log in %s", name) + } + } + return nil + }, + }, + ) require.NoError(t, err) - t.Cleanup(func() { - // save all the logs to default directory "logs/docker-$test_name" - logs, err := framework.SaveContainerLogs(fmt.Sprintf("%s-%s", framework.DefaultCTFLogsDir, t.Name())) - require.NoError(t, err) - // check that CL nodes has no errors (CRIT|PANIC|FATAL) levels - err = framework.CheckCLNodeContainerErrors() - require.NoError(t, err) - // do custom assertions - for _, l := range logs { - matches, err := framework.SearchLogFile(l, " name=HeadReporter version=\\d") - require.NoError(t, err) - _ = matches - } - }) +}) ``` Full [example](https://github.com/smartcontractkit/chainlink-testing-framework/blob/main/framework/examples/myproject/smoke_logs_test.go) \ No newline at end of file diff --git a/framework/docker.go b/framework/docker.go index 372859b5e..6c033c24f 100644 --- a/framework/docker.go +++ b/framework/docker.go @@ -2,52 +2,24 @@ package framework import ( "archive/tar" - "bufio" "bytes" "context" - "encoding/binary" - "errors" "fmt" "io" - "maps" "os" "os/exec" "path/filepath" - "regexp" - "slices" - "strconv" "strings" "sync" - "testing" "github.com/docker/docker/api/types/container" - dfilter "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/google/uuid" "github.com/rs/zerolog" tc "github.com/testcontainers/testcontainers-go" - "golang.org/x/sync/errgroup" ) -const ( - DefaultCTFLogsDir = "logs/docker" -) - -func CTFContainersListOpts() container.ListOptions { - return container.ListOptions{ - All: true, - Filters: dfilter.NewArgs(dfilter.KeyValuePair{ - Key: "label", - Value: "framework=ctf", - }), - } -} - -func CTFContainersLogsOpts() container.LogsOptions { - return container.LogsOptions{ShowStdout: true, ShowStderr: true} -} - func IsDockerRunning() bool { cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { @@ -263,520 +235,6 @@ func (dc *DockerClient) copyToContainer(containerID, sourceFile, targetPath stri return nil } -// SearchLogFile searches logfile using regex and return matches or error -func SearchLogFile(fp string, regex string) ([]string, error) { - file, err := os.Open(fp) - if err != nil { - return nil, err - } - defer file.Close() - scanner := bufio.NewScanner(file) - re, err := regexp.Compile(regex) - if err != nil { - return nil, err - } - matches := make([]string, 0) - for scanner.Scan() { - line := scanner.Text() - if re.MatchString(line) { - L.Info().Str("Regex", regex).Msg("Log match found") - matches = append(matches, line) - } - } - - if err := scanner.Err(); err != nil { - return matches, err - } - return matches, nil -} - -func SaveAndCheckLogs(t *testing.T) error { - _, err := SaveContainerLogs(fmt.Sprintf("%s-%s", DefaultCTFLogsDir, t.Name())) - if err != nil { - return err - } - err = CheckCLNodeContainerErrors() - if err != nil { - return err - } - return nil -} - -// SaveContainerLogs writes all Docker container logs to some directory -func SaveContainerLogs(dir string) ([]string, error) { - logStream, lErr := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) - - if lErr != nil { - return nil, lErr - } - - return SaveContainerLogsFromStreams(dir, logStream) -} - -// SaveContainerLogsFromStreams writes all provided Docker log streams to files in some directory. -func SaveContainerLogsFromStreams(dir string, logStream map[string]io.ReadCloser) ([]string, error) { - L.Info().Msg("Writing Docker containers logs") - if _, err := os.Stat(dir); os.IsNotExist(err) { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) - } - } - - eg := &errgroup.Group{} - logFilePaths := make([]string, 0) - var logFilePathsMu sync.Mutex - for containerName, reader := range logStream { - eg.Go(func() error { - defer func() { - _ = reader.Close() - }() - - logFilePath := filepath.Join(dir, fmt.Sprintf("%s.log", containerName)) - logFile, err := os.Create(logFilePath) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to create container log file") - return err - } - defer func() { - _ = logFile.Close() - }() - - logFilePathsMu.Lock() - logFilePaths = append(logFilePaths, logFilePath) - logFilePathsMu.Unlock() - - if err := writeDockerLogPayload(logFile, reader); err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to write container logs") - return err - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, err - } - return logFilePaths, nil -} - -// PrintFailedContainerLogs writes exited/dead CTF containers' last log lines to stdout. -func PrintFailedContainerLogs(logLinesCount uint64) error { - logStream, lErr := StreamContainerLogs(ExitedCtfContainersListOpts, container.LogsOptions{ - ShowStderr: true, - Tail: strconv.FormatUint(logLinesCount, 10), - }) - if lErr != nil { - return lErr - } - - return PrintFailedContainerLogsFromStreams(logStream, logLinesCount) -} - -// PrintFailedContainerLogsFromStreams prints all provided container streams as red text. -func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, logLinesCount uint64) error { - if len(logStream) == 0 { - L.Info().Msg("No failed Docker containers found") - return nil - } - - L.Error().Msgf("Exited/dead containers: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) - - eg := &errgroup.Group{} - for cName, ioReader := range logStream { - eg.Go(func() error { - defer func() { - _ = ioReader.Close() - }() - - var content strings.Builder - if err := writeDockerLogPayload(&content, ioReader); err != nil { - return fmt.Errorf("failed to read logs for container %s: %w", cName, err) - } - - trimmed := strings.TrimSpace(content.String()) - if len(trimmed) > 0 { - L.Info().Str("Container", cName).Msgf("Last %d lines of logs", logLinesCount) - fmt.Println(RedText("%s\n", trimmed)) - } - - return nil - }) - } - - return eg.Wait() -} - -func CheckContainersForPanics(maxLinesAfterPanic int) bool { - logstream, err := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) - if err != nil { - L.Error().Err(err).Msg("failed to stream container logs for panic check") - return true - } - - return CheckContainersForPanicsFromStreams(logstream, maxLinesAfterPanic) -} - -// CheckContainersForPanicsFromStreams scans the provided stream (usually Docker container logs) for panic-related patterns. -// When a panic is detected, it displays the panic line and up to maxLinesAfterPanic lines following it. -// -// This is useful for debugging test failures where a Docker container may have panicked. -// The function searches for common panic patterns including: -// - "panic:" - Standard Go panic -// - "runtime error:" - Runtime errors (nil pointer, index out of bounds, etc.) -// - "fatal error:" - Fatal errors -// - "goroutine N [running]" - Stack trace indicators -// -// The function scans all containers in parallel and stops as soon as the first panic is found. -// -// Parameters: -// - logStream: Map of container names to their log streams (io.ReadCloser). -// - maxLinesAfterPanic: Maximum number of lines to show after the panic line (including stack trace). -// Recommended: 50-200 depending on how much context you want. -// -// Returns: -// - true if any panics were found in any container, false otherwise -func CheckContainersForPanicsFromStreams(logStream map[string]io.ReadCloser, maxLinesAfterPanic int) bool { - // Panic patterns to search for - panicPatterns := map[string]*regexp.Regexp{ - "panic": regexp.MustCompile(`(?i)panic:`), // Go panic - "runtime error": regexp.MustCompile(`(?i)runtime error:`), // Runtime errors - "fatal error": regexp.MustCompile(`(?i)fatal error:`), // Fatal errors - "stack trace": regexp.MustCompile(`goroutine \d+ \[running\]`), // Stack trace indicator - } - - // Create context for early cancellation when first panic is found - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Channel to receive panic results - panicFoundChan := make(chan bool, len(logStream)) - var wg sync.WaitGroup - - // Scan all containers in parallel - for containerName, reader := range logStream { - wg.Add(1) - go func(name string, r io.ReadCloser) { - defer wg.Done() - defer r.Close() - - panicFound := scanContainerForPanics(ctx, L, name, r, panicPatterns, maxLinesAfterPanic) - if panicFound { - panicFoundChan <- true - cancel() // Signal other goroutines to stop - } - }(containerName, reader) - } - - // Wait for all goroutines to finish in a separate goroutine - go func() { - wg.Wait() - close(panicFoundChan) - }() - - // Check if any panic was found - for range panicFoundChan { - return true // Return as soon as first panic is found - } - - L.Info().Msg("No panics detected in any container logs") - - return false -} - -// scanContainerForPanics scans a single container's log stream for panic patterns -// It checks the context for cancellation to enable early termination when another goroutine finds a panic -func scanContainerForPanics(ctx context.Context, logger zerolog.Logger, containerName string, reader io.Reader, patterns map[string]*regexp.Regexp, maxLinesAfter int) bool { - scanner := bufio.NewScanner(reader) - // Increase buffer size to handle large log lines - scanner.Buffer(make([]byte, 64*1024), 1024*1024) - - var logLines []string - lineNum := 0 - panicLineNum := -1 - patternNameFound := "" - - // Read all lines and detect panic - for scanner.Scan() { - // Check if context is cancelled (another goroutine found a panic) - select { - case <-ctx.Done(): - return false // Stop scanning, another container already found a panic - default: - } - - line := scanner.Text() - lineNum++ - - // If we found a panic, collect remaining lines up to maxLinesAfter - if panicLineNum >= 0 { - logLines = append(logLines, line) - // Stop reading once we have enough context after the panic - if lineNum >= panicLineNum+maxLinesAfter+1 { - break - } - continue - } - - // Still searching for panic - store all lines - logLines = append(logLines, line) - - // Check if this line contains a panic pattern - for patternName, pattern := range patterns { - if pattern.MatchString(line) { - patternNameFound = patternName - panicLineNum = lineNum - 1 // Store index (0-based) - break - } - } - } - - if err := scanner.Err(); err != nil { - logger.Error().Err(err).Str("Container", containerName).Msg("error reading container logs") - return false - } - - // If panic was found, display it with context - if panicLineNum >= 0 { - logger.Error(). - Str("Container", containerName). - Int("PanicLineNumber", panicLineNum+1). - Msgf("🔥 %s DETECTED in container logs", strings.ToUpper(patternNameFound)) - - // Calculate range to display - startLine := panicLineNum - endLine := min(len(logLines), panicLineNum+maxLinesAfter+1) - - // Build the output - var output strings.Builder - fmt.Fprintf(&output, "\n%s\n", strings.Repeat("=", 80)) - fmt.Fprintf(&output, "%s FOUND IN CONTAINER: %s (showing %d lines from panic)\n", strings.ToUpper(patternNameFound), containerName, endLine-startLine) - fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) - - for i := startLine; i < endLine; i++ { - fmt.Fprintf(&output, "%s\n", logLines[i]) - } - - fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) - - fmt.Println(RedText("%s\n", output.String())) - return true - } - - return false -} - -var ExitedCtfContainersListOpts = container.ListOptions{ - All: true, - Filters: dfilter.NewArgs(dfilter.KeyValuePair{ - Key: "label", - Value: "framework=ctf", - }, - dfilter.KeyValuePair{ - Key: "status", - Value: "exited"}, - dfilter.KeyValuePair{ - Key: "status", - Value: "dead"}), -} - -func StreamContainerLogs(listOptions container.ListOptions, logOptions container.LogsOptions) (map[string]io.ReadCloser, error) { - L.Info().Msg("Streaming Docker containers logs") - provider, err := tc.NewDockerProvider() - if err != nil { - return nil, fmt.Errorf("failed to create Docker provider: %w", err) - } - containers, err := provider.Client().ContainerList(context.Background(), listOptions) - if err != nil { - return nil, fmt.Errorf("failed to list Docker containers: %w", err) - } - - eg := &errgroup.Group{} - logMap := make(map[string]io.ReadCloser) - var mutex sync.Mutex - - for _, containerInfo := range containers { - eg.Go(func() error { - containerName := safeContainerName(containerInfo) - L.Debug().Str("Container", containerName).Msg("Collecting logs") - logs, err := provider.Client().ContainerLogs(context.Background(), containerInfo.ID, logOptions) - if err != nil { - L.Error().Err(err).Str("Container", containerName).Msg("failed to fetch logs for container") - return err - } - mutex.Lock() - defer mutex.Unlock() - logMap[containerName] = logs - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, err - } - - return logMap, nil -} - -// LogStreamConsumer represents a log stream consumer that receives one stream per container. -type LogStreamConsumer struct { - Name string - Consume func(map[string]io.ReadCloser) error -} - -// StreamContainerLogsFanout fetches container logs once and fans out streams to all consumers. -func StreamContainerLogsFanout(listOptions container.ListOptions, logOptions container.LogsOptions, consumers ...LogStreamConsumer) error { - logStream, err := StreamContainerLogs(listOptions, logOptions) - if err != nil { - return err - } - - return fanoutContainerLogs(logStream, consumers...) -} - -// StreamCTFContainerLogsFanout fetches CTF logs once and fans out streams to all consumers. -func StreamCTFContainerLogsFanout(consumers ...LogStreamConsumer) error { - return StreamContainerLogsFanout(CTFContainersListOpts(), CTFContainersLogsOpts(), consumers...) -} - -func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStreamConsumer) error { - if len(consumers) == 0 { - for _, reader := range logStream { - _ = reader.Close() - } - return nil - } - - consumerStreams := make([]map[string]io.ReadCloser, len(consumers)) - for i := range consumers { - consumerStreams[i] = make(map[string]io.ReadCloser, len(logStream)) - } - - pumpGroup := &errgroup.Group{} - for containerName, sourceReader := range logStream { - writers := make([]*io.PipeWriter, len(consumers)) - for i := range consumers { - reader, writer := io.Pipe() - consumerStreams[i][containerName] = reader - writers[i] = writer - } - - pumpGroup.Go(func() error { - defer func() { - _ = sourceReader.Close() - }() - - readBuf := make([]byte, 32*1024) - for { - n, readErr := sourceReader.Read(readBuf) - if n > 0 { - // NOTE: io.Pipe is unbuffered, so a slow consumer can still backpressure others. - // Future improvement: decouple per-consumer delivery with bounded buffering. - chunk := readBuf[:n] - for i, writer := range writers { - if writer == nil { - continue - } - if writeErr := writeAll(writer, chunk); writeErr != nil { - if errors.Is(writeErr, io.ErrClosedPipe) { - writers[i] = nil - continue - } - closeAllPipeWritersWithError(writers, fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr)) - return fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr) - } - } - } - - if readErr == io.EOF { - closeAllPipeWriters(writers) - return nil - } - if readErr != nil { - closeAllPipeWritersWithError(writers, readErr) - return fmt.Errorf("failed reading stream for container %s: %w", containerName, readErr) - } - } - }) - } - - consumerGroup := &errgroup.Group{} - for i, consumer := range consumers { - consumerGroup.Go(func() error { - defer closeAllPipeReaders(consumerStreams[i]) - - if consumer.Consume == nil { - return fmt.Errorf("consumer %q has nil Consume function", consumer.Name) - } - if err := consumer.Consume(consumerStreams[i]); err != nil { - return fmt.Errorf("consumer %q failed: %w", consumer.Name, err) - } - return nil - }) - } - - pumpErr := pumpGroup.Wait() - consumerErr := consumerGroup.Wait() - return errors.Join(pumpErr, consumerErr) -} - -func writeDockerLogPayload(dst io.Writer, reader io.Reader) error { - header := make([]byte, 8) - for { - _, err := io.ReadFull(reader, header) - if errors.Is(err, io.EOF) { - return nil - } - if err != nil { - return fmt.Errorf("failed to read log stream header: %w", err) - } - - msgSize := binary.BigEndian.Uint32(header[4:8]) - msg := make([]byte, msgSize) - if _, err = io.ReadFull(reader, msg); err != nil { - return fmt.Errorf("failed to read log message: %w", err) - } - if _, err = dst.Write(msg); err != nil { - return fmt.Errorf("failed to write log message: %w", err) - } - } -} - -func writeAll(writer io.Writer, data []byte) error { - for len(data) > 0 { - n, err := writer.Write(data) - if err != nil { - return err - } - data = data[n:] - } - return nil -} - -func closeAllPipeWriters(writers []*io.PipeWriter) { - for _, writer := range writers { - if writer == nil { - continue - } - _ = writer.Close() - } -} - -func closeAllPipeWritersWithError(writers []*io.PipeWriter, err error) { - for _, writer := range writers { - if writer == nil { - continue - } - _ = writer.CloseWithError(err) - } -} - -func closeAllPipeReaders(readers map[string]io.ReadCloser) { - for _, reader := range readers { - if reader == nil { - continue - } - _ = reader.Close() - } -} - func BuildImageOnce(once *sync.Once, dctx, dfile, nameAndTag string, buildArgs map[string]string) error { var err error once.Do(func() { diff --git a/framework/examples/myproject/smoke_logs_test.go b/framework/examples/myproject/smoke_logs_test.go index 6c42f0a15..728eb4d74 100644 --- a/framework/examples/myproject/smoke_logs_test.go +++ b/framework/examples/myproject/smoke_logs_test.go @@ -1,7 +1,10 @@ package examples import ( + "bufio" "fmt" + "io" + "regexp" "testing" "github.com/stretchr/testify/require" @@ -22,23 +25,38 @@ func TestLogsSmoke(t *testing.T) { in, err := framework.Load[CfgLogs](t) require.NoError(t, err) // most simple checks, save all the logs and check (CRIT|PANIC|FATAL) log levels - //t.Cleanup(func() { - // err := framework.SaveAndCheckLogs(t) - // require.NoError(t, err) - //}) t.Cleanup(func() { - // save all the logs to default directory "logs/docker-$test_name" - logs, err := framework.SaveContainerLogs(fmt.Sprintf("%s-%s", framework.DefaultCTFLogsDir, t.Name())) + err := framework.SaveAndCheckLogs(t) require.NoError(t, err) - // check that CL nodes has no errors (CRIT|PANIC|FATAL) levels - err = framework.CheckCLNodeContainerErrors() + }) + + re := regexp.MustCompile(`name=HeadReporter version=\d+`) + t.Cleanup(func() { + err := framework.StreamCTFContainerLogsFanout( + framework.LogStreamConsumer{ + Name: "custom-regex-assert", + Consume: func(logStreams map[string]io.ReadCloser) error { + for name, stream := range logStreams { + scanner := bufio.NewScanner(stream) + found := false + for scanner.Scan() { + if re.MatchString(scanner.Text()) { + found = true + break + } + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scan %s: %w", name, err) + } + if !found { + return fmt.Errorf("missing HeadReporter log in %s", name) + } + } + return nil + }, + }, + ) require.NoError(t, err) - // do custom assertions - for _, l := range logs { - matches, err := framework.SearchLogFile(l, " name=HeadReporter version=\\d") - require.NoError(t, err) - _ = matches - } }) bc, err := blockchain.NewBlockchainNetwork(in.BlockchainA) diff --git a/framework/logs.go b/framework/logs.go index c63c6ea9e..13167f8d8 100644 --- a/framework/logs.go +++ b/framework/logs.go @@ -2,83 +2,574 @@ package framework import ( "bufio" + "context" + "encoding/binary" + "errors" "fmt" + "io" + "maps" "os" "path/filepath" "regexp" + "slices" + "strconv" "strings" + "sync" + "testing" + + "github.com/docker/docker/api/types/container" + dfilter "github.com/docker/docker/api/types/filters" + "github.com/rs/zerolog" + tc "github.com/testcontainers/testcontainers-go" + "golang.org/x/sync/errgroup" ) const ( EnvVarIgnoreCriticalLogs = "CTF_IGNORE_CRITICAL_LOGS" + DefaultCTFLogsDir = "logs/docker" ) -func getLogDirectories() ([]string, error) { - logDirs := make([]string, 0) - currentDir, err := os.Getwd() +var criticalLogLevelRegex = regexp.MustCompile(`(CRIT|PANIC|FATAL)`) + +func checkNodeLogStream(source string, r io.Reader) error { + scanner := bufio.NewScanner(r) + // safer for long log lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + lineNumber := 1 + for scanner.Scan() { + line := scanner.Text() + if criticalLogLevelRegex.MatchString(line) { + return fmt.Errorf("source %s contains matching log level at line %d: %s", source, lineNumber, line) + } + lineNumber++ + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading source %s: %w", source, err) + } + return nil +} + +// New stream-first API +func checkNodeLogErrorsFromStreams(streams map[string]io.ReadCloser) error { + if os.Getenv(EnvVarIgnoreCriticalLogs) == "true" { + L.Warn().Msg(`CTF_IGNORE_CRITICAL_LOGS is set to true, we ignore all CRIT|FATAL|PANIC errors in node logs!`) + return nil + } + for name, rc := range streams { + func() { + defer rc.Close() + }() + if err := checkNodeLogStream(name, rc); err != nil { + return err + } + } + return nil +} + +func StreamContainerLogs(listOptions container.ListOptions, logOptions container.LogsOptions) (map[string]io.ReadCloser, error) { + L.Info().Msg("Streaming Docker containers logs") + provider, err := tc.NewDockerProvider() if err != nil { - return nil, fmt.Errorf("failed to get current directory: %w", err) + return nil, fmt.Errorf("failed to create Docker provider: %w", err) } - entries, err := os.ReadDir(currentDir) + containers, err := provider.Client().ContainerList(context.Background(), listOptions) if err != nil { - return nil, fmt.Errorf("failed to read directory: %w", err) + return nil, fmt.Errorf("failed to list Docker containers: %w", err) } - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), "logs-") { - logDirs = append(logDirs, filepath.Join(currentDir, entry.Name())) - } + + eg := &errgroup.Group{} + logMap := make(map[string]io.ReadCloser) + var mutex sync.Mutex + + for _, containerInfo := range containers { + eg.Go(func() error { + containerName := safeContainerName(containerInfo) + L.Debug().Str("Container", containerName).Msg("Collecting logs") + logs, err := provider.Client().ContainerLogs(context.Background(), containerInfo.ID, logOptions) + if err != nil { + L.Error().Err(err).Str("Container", containerName).Msg("failed to fetch logs for container") + return err + } + mutex.Lock() + defer mutex.Unlock() + logMap[containerName] = logs + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + + return logMap, nil +} + +func CTFContainersListOpts() container.ListOptions { + return container.ListOptions{ + All: true, + Filters: dfilter.NewArgs(dfilter.KeyValuePair{ + Key: "label", + Value: "framework=ctf", + }), } - return logDirs, nil } -// CheckCLNodeContainerErrors check if any CL node container logs has errors -func CheckCLNodeContainerErrors() error { - dirs, err := getLogDirectories() +func CTFContainersLogsOpts() container.LogsOptions { + return container.LogsOptions{ShowStdout: true, ShowStderr: true} +} + +// LogStreamConsumer represents a log stream consumer that receives one stream per container. +type LogStreamConsumer struct { + Name string + Consume func(map[string]io.ReadCloser) error +} + +// StreamContainerLogsFanout fetches container logs once and fans out streams to all consumers. +func StreamContainerLogsFanout(listOptions container.ListOptions, logOptions container.LogsOptions, consumers ...LogStreamConsumer) error { + logStream, err := StreamContainerLogs(listOptions, logOptions) if err != nil { return err } - for _, dd := range dirs { - if err := checkNodeLogErrors(dd); err != nil { - return err + + return fanoutContainerLogs(logStream, consumers...) +} + +// StreamCTFContainerLogsFanout fetches CTF logs once and fans out streams to all consumers. +func StreamCTFContainerLogsFanout(consumers ...LogStreamConsumer) error { + return StreamContainerLogsFanout(CTFContainersListOpts(), CTFContainersLogsOpts(), consumers...) +} + +func fanoutContainerLogs(logStream map[string]io.ReadCloser, consumers ...LogStreamConsumer) error { + if len(consumers) == 0 { + for _, reader := range logStream { + _ = reader.Close() } + return nil } - return nil + + consumerStreams := make([]map[string]io.ReadCloser, len(consumers)) + for i := range consumers { + consumerStreams[i] = make(map[string]io.ReadCloser, len(logStream)) + } + + pumpGroup := &errgroup.Group{} + for containerName, sourceReader := range logStream { + writers := make([]*io.PipeWriter, len(consumers)) + for i := range consumers { + reader, writer := io.Pipe() + consumerStreams[i][containerName] = reader + writers[i] = writer + } + + pumpGroup.Go(func() error { + defer func() { + _ = sourceReader.Close() + }() + + readBuf := make([]byte, 32*1024) + for { + n, readErr := sourceReader.Read(readBuf) + if n > 0 { + // NOTE: io.Pipe is unbuffered, so a slow consumer can still backpressure others. + // Future improvement: decouple per-consumer delivery with bounded buffering. + chunk := readBuf[:n] + for i, writer := range writers { + if writer == nil { + continue + } + if writeErr := writeAll(writer, chunk); writeErr != nil { + if errors.Is(writeErr, io.ErrClosedPipe) { + writers[i] = nil + continue + } + closeAllPipeWritersWithError(writers, fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr)) + return fmt.Errorf("failed writing stream for container %s: %w", containerName, writeErr) + } + } + } + + if readErr == io.EOF { + closeAllPipeWriters(writers) + return nil + } + if readErr != nil { + closeAllPipeWritersWithError(writers, readErr) + return fmt.Errorf("failed reading stream for container %s: %w", containerName, readErr) + } + } + }) + } + + consumerGroup := &errgroup.Group{} + for i, consumer := range consumers { + consumerGroup.Go(func() error { + defer closeAllPipeReaders(consumerStreams[i]) + + if consumer.Consume == nil { + return fmt.Errorf("consumer %q has nil Consume function", consumer.Name) + } + if err := consumer.Consume(consumerStreams[i]); err != nil { + return fmt.Errorf("consumer %q failed: %w", consumer.Name, err) + } + return nil + }) + } + + pumpErr := pumpGroup.Wait() + consumerErr := consumerGroup.Wait() + return errors.Join(pumpErr, consumerErr) } -// checkNodeLogsErrors check Chainlink nodes logs for error levels -func checkNodeLogErrors(dir string) error { - if os.Getenv(EnvVarIgnoreCriticalLogs) == "true" { - L.Warn().Msg(`CTF_IGNORE_CRITICAL_LOGS is set to true, we ignore all CRIT|FATAL|PANIC errors in node logs!`) - return nil +func writeDockerLogPayload(dst io.Writer, reader io.Reader) error { + header := make([]byte, 8) + for { + _, err := io.ReadFull(reader, header) + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return fmt.Errorf("failed to read log stream header: %w", err) + } + + msgSize := binary.BigEndian.Uint32(header[4:8]) + msg := make([]byte, msgSize) + if _, err = io.ReadFull(reader, msg); err != nil { + return fmt.Errorf("failed to read log message: %w", err) + } + if _, err = dst.Write(msg); err != nil { + return fmt.Errorf("failed to write log message: %w", err) + } } - fileRegex := regexp.MustCompile(`^node.*\.log$`) - logLevelRegex := regexp.MustCompile(`(CRIT|PANIC|FATAL)`) +} - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { +func writeAll(writer io.Writer, data []byte) error { + for len(data) > 0 { + n, err := writer.Write(data) if err != nil { return err } - if !info.IsDir() && fileRegex.MatchString(info.Name()) { - file, err := os.Open(path) + data = data[n:] + } + return nil +} + +func closeAllPipeWriters(writers []*io.PipeWriter) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.Close() + } +} + +func closeAllPipeWritersWithError(writers []*io.PipeWriter, err error) { + for _, writer := range writers { + if writer == nil { + continue + } + _ = writer.CloseWithError(err) + } +} + +func closeAllPipeReaders(readers map[string]io.ReadCloser) { + for _, reader := range readers { + if reader == nil { + continue + } + _ = reader.Close() + } +} + +func SaveAndCheckLogs(t *testing.T) error { + return StreamCTFContainerLogsFanout( + LogStreamConsumer{ + Name: "save-container-logs", + Consume: func(logStreams map[string]io.ReadCloser) error { + _, logsErr := SaveContainerLogsFromStreams(fmt.Sprintf("%s-%s", DefaultCTFLogsDir, t.Name()), logStreams) + return logsErr + }, + }, + LogStreamConsumer{ + Name: "check-container-logs", + Consume: func(logStreams map[string]io.ReadCloser) error { + checkErr := checkNodeLogErrorsFromStreams(logStreams) + return checkErr + }, + }, + ) +} + +// SaveContainerLogs writes all Docker container logs to some directory +func SaveContainerLogs(dir string) ([]string, error) { + logStream, lErr := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + + if lErr != nil { + return nil, lErr + } + + return SaveContainerLogsFromStreams(dir, logStream) +} + +// SaveContainerLogsFromStreams writes all provided Docker log streams to files in some directory. +func SaveContainerLogsFromStreams(dir string, logStream map[string]io.ReadCloser) ([]string, error) { + L.Info().Msg("Writing Docker containers logs") + if _, err := os.Stat(dir); os.IsNotExist(err) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) + } + } + + eg := &errgroup.Group{} + logFilePaths := make([]string, 0) + var logFilePathsMu sync.Mutex + for containerName, reader := range logStream { + eg.Go(func() error { + defer func() { + _ = reader.Close() + }() + + logFilePath := filepath.Join(dir, fmt.Sprintf("%s.log", containerName)) + logFile, err := os.Create(logFilePath) if err != nil { - return fmt.Errorf("failed to open file %s: %w", path, err) + L.Error().Err(err).Str("Container", containerName).Msg("failed to create container log file") + return err } - defer file.Close() - - scanner := bufio.NewScanner(file) - lineNumber := 1 - for scanner.Scan() { - line := scanner.Text() - if logLevelRegex.MatchString(line) { - return fmt.Errorf("file %s contains a matching log level at line %d: %s", path, lineNumber, line) - } - lineNumber++ + defer func() { + _ = logFile.Close() + }() + + logFilePathsMu.Lock() + logFilePaths = append(logFilePaths, logFilePath) + logFilePathsMu.Unlock() + + if err := writeDockerLogPayload(logFile, reader); err != nil { + L.Error().Err(err).Str("Container", containerName).Msg("failed to write container logs") + return err + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + return logFilePaths, nil +} + +var ExitedCtfContainersListOpts = container.ListOptions{ + All: true, + Filters: dfilter.NewArgs(dfilter.KeyValuePair{ + Key: "label", + Value: "framework=ctf", + }, + dfilter.KeyValuePair{ + Key: "status", + Value: "exited"}, + dfilter.KeyValuePair{ + Key: "status", + Value: "dead"}), +} + +// PrintFailedContainerLogs writes exited/dead CTF containers' last log lines to stdout. +func PrintFailedContainerLogs(logLinesCount uint64) error { + logStream, lErr := StreamContainerLogs(ExitedCtfContainersListOpts, container.LogsOptions{ + ShowStderr: true, + Tail: strconv.FormatUint(logLinesCount, 10), + }) + if lErr != nil { + return lErr + } + + return PrintFailedContainerLogsFromStreams(logStream, logLinesCount) +} + +// PrintFailedContainerLogsFromStreams prints all provided container streams as red text. +func PrintFailedContainerLogsFromStreams(logStream map[string]io.ReadCloser, logLinesCount uint64) error { + if len(logStream) == 0 { + L.Info().Msg("No failed Docker containers found") + return nil + } + + L.Error().Msgf("Exited/dead containers: %s", strings.Join(slices.Collect(maps.Keys(logStream)), ", ")) + + eg := &errgroup.Group{} + for cName, ioReader := range logStream { + eg.Go(func() error { + defer func() { + _ = ioReader.Close() + }() + + var content strings.Builder + if err := writeDockerLogPayload(&content, ioReader); err != nil { + return fmt.Errorf("failed to read logs for container %s: %w", cName, err) + } + + trimmed := strings.TrimSpace(content.String()) + if len(trimmed) > 0 { + L.Info().Str("Container", cName).Msgf("Last %d lines of logs", logLinesCount) + fmt.Println(RedText("%s\n", trimmed)) + } + + return nil + }) + } + + return eg.Wait() +} + +func CheckContainersForPanics(maxLinesAfterPanic int) bool { + logstream, err := StreamContainerLogs(CTFContainersListOpts(), CTFContainersLogsOpts()) + if err != nil { + L.Error().Err(err).Msg("failed to stream container logs for panic check") + return true + } + + return CheckContainersForPanicsFromStreams(logstream, maxLinesAfterPanic) +} + +// CheckContainersForPanicsFromStreams scans the provided stream (usually Docker container logs) for panic-related patterns. +// When a panic is detected, it displays the panic line and up to maxLinesAfterPanic lines following it. +// +// This is useful for debugging test failures where a Docker container may have panicked. +// The function searches for common panic patterns including: +// - "panic:" - Standard Go panic +// - "runtime error:" - Runtime errors (nil pointer, index out of bounds, etc.) +// - "fatal error:" - Fatal errors +// - "goroutine N [running]" - Stack trace indicators +// +// The function scans all containers in parallel and stops as soon as the first panic is found. +// +// Parameters: +// - logStream: Map of container names to their log streams (io.ReadCloser). +// - maxLinesAfterPanic: Maximum number of lines to show after the panic line (including stack trace). +// Recommended: 50-200 depending on how much context you want. +// +// Returns: +// - true if any panics were found in any container, false otherwise +func CheckContainersForPanicsFromStreams(logStream map[string]io.ReadCloser, maxLinesAfterPanic int) bool { + // Panic patterns to search for + panicPatterns := map[string]*regexp.Regexp{ + "panic": regexp.MustCompile(`(?i)panic:`), // Go panic + "runtime error": regexp.MustCompile(`(?i)runtime error:`), // Runtime errors + "fatal error": regexp.MustCompile(`(?i)fatal error:`), // Fatal errors + "stack trace": regexp.MustCompile(`goroutine \d+ \[running\]`), // Stack trace indicator + } + + // Create context for early cancellation when first panic is found + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Channel to receive panic results + panicFoundChan := make(chan bool, len(logStream)) + var wg sync.WaitGroup + + // Scan all containers in parallel + for containerName, reader := range logStream { + wg.Add(1) + go func(name string, r io.ReadCloser) { + defer wg.Done() + defer r.Close() + + panicFound := scanContainerForPanics(ctx, L, name, r, panicPatterns, maxLinesAfterPanic) + if panicFound { + panicFoundChan <- true + cancel() // Signal other goroutines to stop } - if err := scanner.Err(); err != nil { - return fmt.Errorf("error reading file %s: %w", path, err) + }(containerName, reader) + } + + // Wait for all goroutines to finish in a separate goroutine + go func() { + wg.Wait() + close(panicFoundChan) + }() + + // Check if any panic was found + for range panicFoundChan { + return true // Return as soon as first panic is found + } + + L.Info().Msg("No panics detected in any container logs") + + return false +} + +// scanContainerForPanics scans a single container's log stream for panic patterns +// It checks the context for cancellation to enable early termination when another goroutine finds a panic +func scanContainerForPanics(ctx context.Context, logger zerolog.Logger, containerName string, reader io.Reader, patterns map[string]*regexp.Regexp, maxLinesAfter int) bool { + scanner := bufio.NewScanner(reader) + // Increase buffer size to handle large log lines + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + var logLines []string + lineNum := 0 + panicLineNum := -1 + patternNameFound := "" + + // Read all lines and detect panic + for scanner.Scan() { + // Check if context is cancelled (another goroutine found a panic) + select { + case <-ctx.Done(): + return false // Stop scanning, another container already found a panic + default: + } + + line := scanner.Text() + lineNum++ + + // If we found a panic, collect remaining lines up to maxLinesAfter + if panicLineNum >= 0 { + logLines = append(logLines, line) + // Stop reading once we have enough context after the panic + if lineNum >= panicLineNum+maxLinesAfter+1 { + break } + continue } - return nil - }) - return err + + // Still searching for panic - store all lines + logLines = append(logLines, line) + + // Check if this line contains a panic pattern + for patternName, pattern := range patterns { + if pattern.MatchString(line) { + patternNameFound = patternName + panicLineNum = lineNum - 1 // Store index (0-based) + break + } + } + } + + if err := scanner.Err(); err != nil { + logger.Error().Err(err).Str("Container", containerName).Msg("error reading container logs") + return false + } + + // If panic was found, display it with context + if panicLineNum >= 0 { + logger.Error(). + Str("Container", containerName). + Int("PanicLineNumber", panicLineNum+1). + Msgf("🔥 %s DETECTED in container logs", strings.ToUpper(patternNameFound)) + + // Calculate range to display + startLine := panicLineNum + endLine := min(len(logLines), panicLineNum+maxLinesAfter+1) + + // Build the output + var output strings.Builder + fmt.Fprintf(&output, "\n%s\n", strings.Repeat("=", 80)) + fmt.Fprintf(&output, "%s FOUND IN CONTAINER: %s (showing %d lines from panic)\n", strings.ToUpper(patternNameFound), containerName, endLine-startLine) + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + for i := startLine; i < endLine; i++ { + fmt.Fprintf(&output, "%s\n", logLines[i]) + } + + fmt.Fprintf(&output, "%s\n", strings.Repeat("=", 80)) + + fmt.Println(RedText("%s\n", output.String())) + return true + } + + return false } From a254209841e77c68f7bbfabb84684b72cf86d61e Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Wed, 15 Apr 2026 17:03:35 +0200 Subject: [PATCH 8/8] fix bug + test --- framework/logs.go | 5 ++--- framework/logs_test.go | 21 ++++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/framework/logs.go b/framework/logs.go index 13167f8d8..35efb6053 100644 --- a/framework/logs.go +++ b/framework/logs.go @@ -56,12 +56,11 @@ func checkNodeLogErrorsFromStreams(streams map[string]io.ReadCloser) error { return nil } for name, rc := range streams { - func() { - defer rc.Close() - }() if err := checkNodeLogStream(name, rc); err != nil { + _ = rc.Close() return err } + _ = rc.Close() } return nil } diff --git a/framework/logs_test.go b/framework/logs_test.go index 31a363933..80e75863f 100644 --- a/framework/logs_test.go +++ b/framework/logs_test.go @@ -1,43 +1,46 @@ package framework import ( - "path/filepath" + "io" + "strings" "testing" ) -// Table test for checkLogFilesForLevels -func TestCheckLogFilesForLevels(t *testing.T) { +func TestCheckNodeLogErrorsFromStreams(t *testing.T) { tests := []struct { name string - dir string content string expectError bool }{ { name: "Clean", - dir: "clean", + content: `{"level":"info","msg":"all good"}`, expectError: false, }, { name: "Contains CRIT", - dir: "crit", + content: `{"level":"error","msg":"CRIT happened"}`, expectError: true, }, { name: "Contains PANIC", - dir: "panic", + content: `PANIC: something bad`, expectError: true, }, { name: "Contains FATAL", - dir: "fatal", + content: `{"msg":"FATAL condition"}`, expectError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := checkNodeLogErrors(filepath.Join("testdata", tt.dir)) + streams := map[string]io.ReadCloser{ + tt.name: io.NopCloser(strings.NewReader(tt.content)), + } + + err := checkNodeLogErrorsFromStreams(streams) if tt.expectError && err == nil { t.Errorf("expected error but got none") } else if !tt.expectError && err != nil {