Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/localstack/awsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func getBootstrap(args []string) (interop.Bootstrap, string) {
return NewSimpleBootstrap(bootstrapLookupCmd, currentWorkingDir), handler
}

func PrintEndReports(invokeId string, initDuration string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) {
func PrintEndReports(invokeId string, initDuration string, status string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) {
// Calculate invoke duration
invokeDuration := math.Min(float64(time.Now().Sub(invokeStart).Nanoseconds()),
float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond)
Expand All @@ -102,11 +102,12 @@ func PrintEndReports(invokeId string, initDuration string, memorySize string, in
// not a clean way to get this information from rapidcore
_, _ = fmt.Fprintf(w,
"REPORT RequestId: %s\t"+
initDuration+
"Duration: %.2f ms\t"+
"Billed Duration: %.f ms\t"+
"Memory Size: %s MB\t"+
"Max Memory Used: %s MB\t\n",
"Max Memory Used: %s MB\t"+
initDuration+
status+"\n",
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize)
}

Expand Down
207 changes: 185 additions & 22 deletions cmd/localstack/custom_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/core/statejson"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)

Expand All @@ -28,6 +31,33 @@ type CustomInteropServer struct {
localStackAdapter *LocalStackAdapter
port string
upstreamEndpoint string
// logCollector accumulates the runtime's stdout/stderr plus the synthetic START/REPORT/
// INIT_REPORT lines that are flushed to LocalStack with each invocation's logs.
logCollector *LogCollector
// initStart is set once in Init() and warmStart is flipped on the first invoke.
// Both are accessed only from the single sequential init -> invoke flow (the RIE
// processes one invocation at a time), so they need no additional synchronization.
initStart time.Time
warmStart bool
// initTimedOut is set by ReportInitTimeout when the init phase exceeds its timeout. It is
// written from the init-await flow and read from the invoke flow, so it uses atomic access.
// When set, the first invocation's REPORT omits Init Duration (init was already reported as
// timed out and is re-run as a suppressed init during that invocation).
initTimedOut atomic.Bool
// initErrorForwarded is set once the runtime's own /init/error has been forwarded to
// LocalStack via SendInitErrorResponse, so the crash-path fallback (SendInitError) does
// not send a duplicate error status for the same failed initialization.
initErrorForwarded atomic.Bool
// initErrorType holds rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown) when init
// failed, used to render the INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines for
// the on-demand folded-into-invoke path. Stores a string; empty/unset means init did not fail.
initErrorType atomic.Value
// onDemand is true for on-demand functions, where AWS folds a failed cold-start init into
// the first invocation (suppressed init). For these we do NOT report init failures via
// /status/error; instead we signal ready and let the first invoke surface the error with
// the full INIT_REPORT/START/END/REPORT envelope. Provisioned concurrency, SnapStart, and
// Managed Instances keep the provisioning-time /status/error model.
onDemand bool
}

type LocalStackAdapter struct {
Expand All @@ -44,10 +74,11 @@ const (

func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte) error {
statusUrl := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status)
_, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload))
resp, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

Expand All @@ -57,8 +88,12 @@ func (l *LocalStackAdapter) SendLogs(invokeId string, logs lsapi.LogResponse) er
if err != nil {
return err
}
_, err = http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized))
return err
resp, err := http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

// SendResult posts the invocation result body to LocalStack.
Expand All @@ -78,19 +113,22 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo
} else {
log.Infoln("Sending to /response")
}
_, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body))
return err
resp, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
server = &CustomInteropServer{
delegate: delegate.(*rapidcore.Server),
port: lsOpts.InteropPort,
upstreamEndpoint: lsOpts.RuntimeEndpoint,
localStackAdapter: &LocalStackAdapter{
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
RuntimeId: lsOpts.RuntimeId,
},
delegate: delegate.(*rapidcore.Server),
port: lsOpts.InteropPort,
upstreamEndpoint: lsOpts.RuntimeEndpoint,
localStackAdapter: adapter,
logCollector: logCollector,
onDemand: GetenvWithDefault("AWS_LAMBDA_INITIALIZATION_TYPE", "on-demand") == "on-demand",
}

// TODO: extract this
Expand All @@ -110,8 +148,26 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
}

invokeResp := &standalone.ResponseWriterProxy{}
functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST
_, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion)
// The synthetic START line is emitted via LocalStackEventsAPI.SendInvokeStart so it
// lands after any inline (suppressed) init, matching AWS — see events.go.

initErrType, _ := server.initErrorType.Load().(string)

initDuration := ""
if !server.warmStart && !server.initTimedOut.Load() && initErrType == "" {
initTimeMS := float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond)
initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)
}
server.warmStart = true

// On-demand init failure folded into this invocation (AWS suppressed init): emit
// the INIT_REPORT(phase=invoke) line before START (emitted during Invoke below).
if initErrType != "" {
initTimeMS := float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond)
_, _ = fmt.Fprintf(logCollector,
"INIT_REPORT Init Duration: %.2f ms\tPhase: invoke\tStatus: error\tError Type: %s\n",
initTimeMS, initErrType)
}

invokeStart := time.Now()
err = server.Invoke(invokeResp, &interop.Invoke{
Expand All @@ -134,15 +190,17 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
})
timeout := int(server.delegate.GetInvokeTimeout().Seconds())
isErr := false
status := ""
if err != nil {
switch {
case errors.Is(err, rapidcore.ErrInvokeTimeout):
log.Debugf("Got invoke timeout")
isErr = true
status = "Status: timeout"
errorResponse := lsapi.ErrorResponse{
ErrorType: "Sandbox.Timedout",
ErrorMessage: fmt.Sprintf(
"%s %s Task timed out after %d.00 seconds",
time.Now().Format("2006-01-02T15:04:05Z"),
"RequestId: %s Error: Task timed out after %d.00 seconds",
invokeR.InvokeId,
timeout,
),
Expand All @@ -161,6 +219,12 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
log.Fatalln(err)
}
}
// On-demand init failure folded into this invocation: the REPORT carries the
// failure status and rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown).
if initErrType != "" {
isErr = true
status = "Status: error\tError Type: " + initErrType
}
// optional sleep. can be used for debugging purposes
if lsOpts.PostInvokeWaitMS != "" {
waitMS, err := strconv.Atoi(lsOpts.PostInvokeWaitMS)
Expand All @@ -171,7 +235,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
}
timeoutDuration := time.Duration(timeout) * time.Second
memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector)
PrintEndReports(invokeR.InvokeId, initDuration, status, memorySize, invokeStart, timeoutDuration, logCollector)

if err2 := server.localStackAdapter.SendLogs(invokeR.InvokeId, logCollector.getLogs()); err2 != nil {
log.Error("failed to send logs to LocalStack: ", err2)
Expand Down Expand Up @@ -204,13 +268,101 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
return c.delegate.SendErrorResponse(invokeID, resp)
}

// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
// SendInitErrorResponse forwards the init error reported by the runtime (via /init/error) to
// LocalStack and then propagates it to the delegate. It marks initErrorForwarded so the
// crash-path fallback in main.go (SendInitError) does not send a duplicate error status for
// the same failed initialization.
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
log.Traceln("SendInitErrorResponse called")
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.")
// Mark synchronously, before sending: this runs in the init flow before
// AwaitInitializedWithDetails unblocks in main.go, so the fallback observes the flag.
c.initErrorForwarded.Store(true)
// Record rapidcore's scrubbed fatal error type so the folded-into-invoke path can render the
// INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines (on-demand).
c.initErrorType.Store(string(resp.FunctionError.Type))

// Always cache the structured error in the delegate so the first invoke can surface it.
defer c.delegate.SendInitErrorResponse(resp)

// On-demand folds the failed init into the first invocation, which carries the error and
// logs; reporting it here via /status/error too would race the invoke and fail the env
// startup before the invoke runs. PC/SnapStart/MI report at provisioning time below.
if c.onDemand {
return nil
}

// Forward the runtime's structured payload as-is and only inject the requestId. Decoding
// into a map rather than a typed struct preserves fields exactly as the runtime emitted
// them — in particular an empty but present "stackTrace": [] (e.g. Runtime.HandlerNotFound),
// which a typed struct with omitempty would drop on re-marshal.
var payload map[string]any
if err := json.Unmarshal(resp.Payload, &payload); err != nil {
log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
Error("Failed to send init error to LocalStack")
}
return nil
}

// No invocation is active during the init phase, so this is typically blank; AWS still
// includes a (blank) requestId in the init error payload.
payload["requestId"] = c.delegate.GetCurrentInvokeID()

body, err := json.Marshal(payload)
if err != nil {
log.WithError(err).Error("Failed to marshal adapted init error response")
body = resp.Payload
}

if err := c.localStackAdapter.SendStatus(Error, body); err != nil {
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
Error("Failed to send init error to LocalStack")
}
return nil
}

// SendInitError reports a structured init failure to LocalStack when the runtime failed to
// initialize WITHOUT calling /init/error itself (e.g. it crashed, called sys.exit, or had an
// invalid entrypoint). The init failure is detected by the existing rapidcore machinery
// (watchEvents -> InitFailure -> AwaitInitializedWithDetails) and surfaced to main.go.
// It is a no-op if SendInitErrorResponse already forwarded the runtime's own structured error.
func (c *CustomInteropServer) SendInitError(errType fatalerror.ErrorType, errMsg error) {
if c.initErrorForwarded.Load() {
log.Debug("Init error already forwarded to LocalStack; skipping duplicate")
return
}

if errType == "" {
errType = fatalerror.RuntimeExit
}

message := "Runtime exited during initialization"
if errMsg != nil {
message = errMsg.Error()
}

// Match AWS's fault message format "RequestId: <id> Error: <msg>". No invocation is active
// during the init phase (LocalStack only dispatches an invoke after the runtime reports
// ready), so synthesize a request ID, preferring the current invoke ID if one exists.
requestID := c.delegate.GetCurrentInvokeID()
if requestID == "" {
requestID = uuid.NewString()
}

payload, err := json.Marshal(lsapi.ErrorResponse{
ErrorType: string(errType),
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, message),
})
if err != nil {
log.WithError(err).Error("Failed to marshal init error response")
return
}

if err := c.localStackAdapter.SendStatus(Error, payload); err != nil {
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
Error("Failed to send init error to LocalStack")
}
return c.delegate.SendInitErrorResponse(resp)
}

func (c *CustomInteropServer) GetCurrentInvokeID() string {
Expand All @@ -225,9 +377,20 @@ func (c *CustomInteropServer) SendRuntimeReady() error {

func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error {
log.Traceln("Init called")
c.initStart = time.Now()
return c.delegate.Init(i, invokeTimeoutMs)
}

// ReportInitTimeout emits an AWS-style INIT_REPORT timeout line into the log collector and
// marks the init as timed out. The init is then re-run as a suppressed init during the first
// invocation (under the function timeout), and that invocation's REPORT omits Init Duration.
func (c *CustomInteropServer) ReportInitTimeout() {
c.initTimedOut.Store(true)
initTimeMS := float64(time.Since(c.initStart).Nanoseconds()) / float64(time.Millisecond)
_, _ = fmt.Fprintf(c.logCollector,
"INIT_REPORT Init Duration: %.2f ms\tPhase: init\tStatus: timeout\n", initTimeMS)
}

func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
log.Traceln("Invoke called")
return c.delegate.Invoke(responseWriter, invoke)
Expand Down
30 changes: 30 additions & 0 deletions cmd/localstack/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"fmt"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry"
)

// LocalStackEventsAPI rides rapidcore's invoke lifecycle events to emit the synthetic START
// log line at the AWS-faithful point. rapidcore calls SendInvokeStart after any inline
// (suppressed) init and before the runtime handles the invocation (see doInvoke in
// internal/lambda/rapid/handlers.go), so emitting START here — rather than eagerly when
// LocalStack dispatches /invoke — places it after a re-run init's logs, matching AWS.
type LocalStackEventsAPI struct {
*telemetry.StandaloneEventsAPI
logCollector *LogCollector
}

func NewLocalStackEventsAPI(logCollector *LogCollector) *LocalStackEventsAPI {
return &LocalStackEventsAPI{
StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI),
logCollector: logCollector,
}
}

func (e *LocalStackEventsAPI) SendInvokeStart(data interop.InvokeStartData) error {
_, _ = fmt.Fprintf(e.logCollector, "START RequestId: %s Version: %s\n", data.RequestID, data.Version)
return e.StandaloneEventsAPI.SendInvokeStart(data)
}
5 changes: 5 additions & 0 deletions cmd/localstack/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (lc *LogCollector) reset() {
func (lc *LogCollector) getLogs() lsapi.LogResponse {
lc.mutex.Lock()
defer lc.mutex.Unlock()
// Emit the captured runtime output verbatim. Do NOT rewrite bare carriage returns to line
// feeds: AWS keeps a bare CR inside a single CloudWatch log event (it splits records on LF
// only), so a user `print("a\rb")` must stay the one event "a\rb". LocalStack's log ingestion
// likewise splits on "\n" (see services/lambda_/.../logs.py), so converting CR to LF here
// would wrongly split such records — see TestCloudwatchLogs::test_multi_line_prints.
response := lsapi.LogResponse{
Logs: strings.Join(lc.RuntimeLogs, ""),
}
Expand Down
Loading