diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go index e45fa55d03..dff55a6154 100644 --- a/cmd/gcs/main.go +++ b/cmd/gcs/main.go @@ -388,35 +388,24 @@ func main() { if err != nil { logrus.WithError(err).Fatal("failed to initialize new runc runtime") } - mux := bridge.NewBridgeMux() - b := bridge.Bridge{ - Handler: mux, - EnableV4: *v4, - } + + publisher := &bridge.Publisher{} h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter) // Initialize virtual pod support in the host if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil { logrus.WithError(err).Warn("Virtual pod support initialization failed") } - b.AssignHandlers(mux, h) - var bridgeIn io.ReadCloser - var bridgeOut io.WriteCloser - if *useInOutErr { - bridgeIn = os.Stdin - bridgeOut = os.Stdout - } else { - const commandPort uint32 = 0x40000000 - bridgeCon, err := tport.Dial(commandPort) - if err != nil { - logrus.WithFields(logrus.Fields{ - "port": commandPort, - logrus.ErrorKey: err, - }).Fatal("failed to dial host vsock connection") - } - bridgeIn = bridgeCon - bridgeOut = bridgeCon - } + const commandPort uint32 = 0x40000000 + + // Reconnect loop: on each iteration we create a fresh bridge+mux, dial the + // host, and serve until the connection drops. After a live migration the + // vsock connection breaks; we re-dial and continue. + // + // During live migration the VM is frozen and only wakes up when the host + // shim is ready, so the vsock port should be immediately available. We + // use a tight retry interval instead of exponential backoff. + const reconnectInterval = 100 * time.Millisecond event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false) gefd, err := gcsControl.RegisterMemoryEvent(event) @@ -433,7 +422,6 @@ func main() { oomFile := os.NewFile(oom, "cefd") defer oomFile.Close() - // Setup OOM monitoring for virtual-pods cgroup virtualPodsOom, err := virtualPodsControl.OOMEventFD() if err != nil { logrus.WithError(err).Fatal("failed to retrieve the virtual-pods cgroups oom eventfd") @@ -441,7 +429,6 @@ func main() { virtualPodsOomFile := os.NewFile(virtualPodsOom, "vp-oomfd") defer virtualPodsOomFile.Close() - // time synchronization service if !(*disableTimeSync) { if err = startTimeSyncService(); err != nil { logrus.WithError(err).Fatal("failed to start time synchronization service") @@ -451,10 +438,44 @@ func main() { go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl) go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl) go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl) - err = b.ListenAndServe(bridgeIn, bridgeOut) - if err != nil { - logrus.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - }).Fatal("failed to serve gcs service") + + for { + mux := bridge.NewBridgeMux() + b := bridge.Bridge{ + Handler: mux, + EnableV4: *v4, + Publisher: publisher, + } + b.AssignHandlers(mux, h) + publisher.SetBridge(&b) + + var bridgeIn io.ReadCloser + var bridgeOut io.WriteCloser + if *useInOutErr { + bridgeIn = os.Stdin + bridgeOut = os.Stdout + } else { + bridgeCon, dialErr := tport.Dial(commandPort) + if dialErr != nil { + logrus.WithError(dialErr).Warn("failed to dial host, retrying") + time.Sleep(reconnectInterval) + continue + } + bridgeIn = bridgeCon + bridgeOut = bridgeCon + } + + logrus.Info("bridge connected, serving") + + serveErr := b.ListenAndServe(bridgeIn, bridgeOut) + publisher.SetBridge(nil) + + if b.ShutdownRequested() { + logrus.Info("bridge shutdown requested, exiting reconnect loop") + break + } + + logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect") + time.Sleep(reconnectInterval) } } diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go index 4ea03ed104..f291093dee 100644 --- a/internal/guest/bridge/bridge.go +++ b/internal/guest/bridge/bridge.go @@ -189,6 +189,15 @@ type Bridge struct { hasQuitPending atomic.Bool protVer prot.ProtocolVersion + + // Publisher is a stable notification sink that survives bridge recreation + // during live migration. + Publisher *Publisher +} + +// ShutdownRequested returns true if the bridge has been asked to shut down. +func (b *Bridge) ShutdownRequested() bool { + return b.hasQuitPending.Load() } // AssignHandlers creates and assigns the appropriate bridge @@ -226,17 +235,20 @@ func (b *Bridge) AssignHandlers(mux *Mux, host *hcsv2.Host) { func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error { requestChan := make(chan *Request) requestErrChan := make(chan error) - b.responseChan = make(chan bridgeResponse) + b.responseChan = make(chan bridgeResponse, 16) responseErrChan := make(chan error) b.quitChan = make(chan bool) - defer close(b.quitChan) + // Close order matters: quitChan must close first so PublishNotification + // and in-flight handlers see it before responseChan becomes invalid. + // responseChan is never explicitly closed — the response writer exits + // when quitChan closes and no more sends are possible. + defer bridgeIn.Close() + defer close(requestErrChan) + defer close(requestChan) defer bridgeOut.Close() defer close(responseErrChan) - defer close(b.responseChan) - defer close(requestChan) - defer close(requestErrChan) - defer bridgeIn.Close() + defer close(b.quitChan) // Receive bridge requests and schedule them to be processed. go func() { @@ -440,7 +452,20 @@ func (b *Bridge) PublishNotification(n *prot.ContainerNotification) { }, response: n, } - b.responseChan <- resp + // Check quitChan first to avoid sending to a dead bridge. + select { + case <-b.quitChan: + logrus.WithField("containerID", n.ContainerID). + Warn("bridge quit, dropping notification") + return + default: + } + select { + case b.responseChan <- resp: + case <-b.quitChan: + logrus.WithField("containerID", n.ContainerID). + Warn("bridge quit, dropping notification") + } } // setErrorForResponseBase modifies the passed-in MessageResponseBase to diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go index 800094e549..9b9178745d 100644 --- a/internal/guest/bridge/bridge_v2.go +++ b/internal/guest/bridge/bridge_v2.go @@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) { Result: 0, ResultInfo: "", } - b.PublishNotification(notification) + b.Publisher.Publish(notification) }() return &prot.ContainerCreateResponse{}, nil diff --git a/internal/guest/bridge/publisher.go b/internal/guest/bridge/publisher.go new file mode 100644 index 0000000000..1db3db3b3b --- /dev/null +++ b/internal/guest/bridge/publisher.go @@ -0,0 +1,39 @@ +//go:build linux + +package bridge + +import ( + "sync" + + "github.com/Microsoft/hcsshim/internal/guest/prot" + "github.com/sirupsen/logrus" +) + +// Publisher provides a stable reference for container exit goroutines +// to publish notifications through. It survives bridge recreation +// during live migration — when the bridge is nil, notifications are dropped. +type Publisher struct { + mu sync.Mutex + b *Bridge +} + +// SetBridge attaches or detaches the current bridge. +// Pass nil to detach (notifications will be dropped until a new bridge is set). +func (p *Publisher) SetBridge(b *Bridge) { + p.mu.Lock() + defer p.mu.Unlock() + p.b = b +} + +// Publish sends a container notification to the current bridge. +// If no bridge is connected, the notification is dropped with a warning. +func (p *Publisher) Publish(n *prot.ContainerNotification) { + p.mu.Lock() + defer p.mu.Unlock() + if p.b == nil { + logrus.WithField("containerID", n.ContainerID). + Warn("bridge not connected, dropping container notification") + return + } + p.b.PublishNotification(n) +} diff --git a/internal/guest/bridge/publisher_test.go b/internal/guest/bridge/publisher_test.go new file mode 100644 index 0000000000..19a9a17ac1 --- /dev/null +++ b/internal/guest/bridge/publisher_test.go @@ -0,0 +1,26 @@ +//go:build linux + +package bridge + +import ( + "testing" + + "github.com/Microsoft/hcsshim/internal/guest/prot" +) + +func TestPublisher_NilBridge(t *testing.T) { + p := &Publisher{} + // Should not panic when bridge is nil + p.Publish(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "test"}, + }) +} + +func TestPublisher_SetBridgeNil(t *testing.T) { + p := &Publisher{} + p.SetBridge(nil) + // Should not panic + p.Publish(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "test"}, + }) +}