Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 51 additions & 30 deletions cmd/gcs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,35 +388,24 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("failed to initialize new runc runtime")
}
mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
}

publisher := &bridge.Publisher{}
h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter)
// Initialize virtual pod support in the host
if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil {
logrus.WithError(err).Warn("Virtual pod support initialization failed")
}
b.AssignHandlers(mux, h)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
const commandPort uint32 = 0x40000000
bridgeCon, err := tport.Dial(commandPort)
if err != nil {
logrus.WithFields(logrus.Fields{
"port": commandPort,
logrus.ErrorKey: err,
}).Fatal("failed to dial host vsock connection")
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}
const commandPort uint32 = 0x40000000

// Reconnect loop: on each iteration we create a fresh bridge+mux, dial the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, an exponential backoff is the right answer. But in this case, the VM is frozen in time, and only wakes up when the host shim is ready. The connection should be immediately available. I think I'd rather see this as a very tight loop, personally.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — the VM is frozen and wakes up with the host ready, so the vsock should be available right away. I'll switch to a tight fixed-interval retry (e.g. 100ms) instead of exponential backoff.

// host, and serve until the connection drops. After a live migration the
// vsock connection breaks; we re-dial and continue.
//
// During live migration the VM is frozen and only wakes up when the host
// shim is ready, so the vsock port should be immediately available. We
// use a tight retry interval instead of exponential backoff.
const reconnectInterval = 100 * time.Millisecond

event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false)
gefd, err := gcsControl.RegisterMemoryEvent(event)
Expand All @@ -433,15 +422,13 @@ func main() {
oomFile := os.NewFile(oom, "cefd")
defer oomFile.Close()

// Setup OOM monitoring for virtual-pods cgroup
virtualPodsOom, err := virtualPodsControl.OOMEventFD()
if err != nil {
logrus.WithError(err).Fatal("failed to retrieve the virtual-pods cgroups oom eventfd")
}
virtualPodsOomFile := os.NewFile(virtualPodsOom, "vp-oomfd")
defer virtualPodsOomFile.Close()

// time synchronization service
if !(*disableTimeSync) {
if err = startTimeSyncService(); err != nil {
logrus.WithError(err).Fatal("failed to start time synchronization service")
Expand All @@ -451,10 +438,44 @@ func main() {
go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl)
err = b.ListenAndServe(bridgeIn, bridgeOut)
if err != nil {
logrus.WithFields(logrus.Fields{
logrus.ErrorKey: err,
}).Fatal("failed to serve gcs service")

for {
mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
Publisher: publisher,
}
b.AssignHandlers(mux, h)
publisher.SetBridge(&b)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
bridgeCon, dialErr := tport.Dial(commandPort)
if dialErr != nil {
logrus.WithError(dialErr).Warn("failed to dial host, retrying")
time.Sleep(reconnectInterval)
continue
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}

logrus.Info("bridge connected, serving")

serveErr := b.ListenAndServe(bridgeIn, bridgeOut)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you just reset the isQuitPending and call ListenAndServe again? Wouldn't that "just work"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It almost works, but there's a subtle issue with handler goroutines. The handler dispatch at line 356 spawns go func(r *Request) { ... b.responseChan <- br }(req) — this goroutine captures b and sends to b.responseChan, which is a struct field. If a handler is still in-flight when ListenAndServe returns (say a slow CreateContainer or ExecProcess), and we call ListenAndServe again on the same bridge, the new call overwrites b.responseChan = make(chan ...) while the old handler is about to send to it. That's a data race on the struct field — the old goroutine reads b.responseChan concurrently with the new ListenAndServe writing it.

In practice this window is very small (handlers finish fast), so it wouldn't show up in normal LM testing. But under load — say a CreateContainer request arrives right as the vsock drops during migration — the handler goroutine could be mid-flight when we re-enter ListenAndServe.

Recreating Bridge means the old handlers hold a reference to the old (now-dead) bridge with its own channels, and the new bridge has completely separate state. No shared mutable field.

That said, if you think the simplicity of reuse outweighs this edge case, we could make it work by not closing responseChan in the defers and adding a short drain period before re-entering. Happy to go either way.

publisher.SetBridge(nil)

if b.ShutdownRequested() {
logrus.Info("bridge shutdown requested, exiting reconnect loop")
break
}

logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
time.Sleep(reconnectInterval)
}
}
39 changes: 32 additions & 7 deletions internal/guest/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ type Bridge struct {
hasQuitPending atomic.Bool

protVer prot.ProtocolVersion

// Publisher is a stable notification sink that survives bridge recreation
// during live migration.
Publisher *Publisher
}

// ShutdownRequested reports whether the bridge has been asked to shut
// down, allowing the reconnect loop to distinguish a deliberate exit
// from a dropped connection.
func (b *Bridge) ShutdownRequested() bool {
	pending := b.hasQuitPending.Load()
	return pending
}

// AssignHandlers creates and assigns the appropriate bridge
Expand Down Expand Up @@ -226,17 +235,20 @@ func (b *Bridge) AssignHandlers(mux *Mux, host *hcsv2.Host) {
func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error {
requestChan := make(chan *Request)
requestErrChan := make(chan error)
b.responseChan = make(chan bridgeResponse)
b.responseChan = make(chan bridgeResponse, 16)
responseErrChan := make(chan error)
b.quitChan = make(chan bool)

defer close(b.quitChan)
// Close order matters: quitChan must close first so PublishNotification
// and in-flight handlers see it before responseChan becomes invalid.
// responseChan is never explicitly closed — the response writer exits
// when quitChan closes and no more sends are possible.
defer bridgeIn.Close()
defer close(requestErrChan)
defer close(requestChan)
defer bridgeOut.Close()
defer close(responseErrChan)
defer close(b.responseChan)
defer close(requestChan)
defer close(requestErrChan)
defer bridgeIn.Close()
defer close(b.quitChan)

// Receive bridge requests and schedule them to be processed.
go func() {
Expand Down Expand Up @@ -440,7 +452,20 @@ func (b *Bridge) PublishNotification(n *prot.ContainerNotification) {
},
response: n,
}
b.responseChan <- resp
// Check quitChan first to avoid sending to a dead bridge.
select {
case <-b.quitChan:
logrus.WithField("containerID", n.ContainerID).
Warn("bridge quit, dropping notification")
return
default:
}
select {
case b.responseChan <- resp:
case <-b.quitChan:
logrus.WithField("containerID", n.ContainerID).
Warn("bridge quit, dropping notification")
}
}

// setErrorForResponseBase modifies the passed-in MessageResponseBase to
Expand Down
2 changes: 1 addition & 1 deletion internal/guest/bridge/bridge_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) {
Result: 0,
ResultInfo: "",
}
b.PublishNotification(notification)
b.Publisher.Publish(notification)
}()

return &prot.ContainerCreateResponse{}, nil
Expand Down
39 changes: 39 additions & 0 deletions internal/guest/bridge/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build linux

package bridge

import (
"sync"

"github.com/Microsoft/hcsshim/internal/guest/prot"
"github.com/sirupsen/logrus"
)

// Publisher provides a stable reference for container exit goroutines
// to publish notifications through. Unlike the Bridge, which is torn
// down and recreated on every reconnect during live migration, a single
// Publisher lives for the lifetime of the process. While no bridge is
// attached (b is nil), published notifications are dropped.
type Publisher struct {
	mu sync.Mutex // guards b
	b  *Bridge    // currently attached bridge, or nil when detached
}

// SetBridge attaches b as the current notification target. Passing nil
// detaches the current bridge; while detached, notifications published
// through this Publisher are dropped until a new bridge is set.
func (p *Publisher) SetBridge(b *Bridge) {
	p.mu.Lock()
	p.b = b
	p.mu.Unlock()
}

// Publish sends a container notification to the currently attached
// bridge. If no bridge is connected, the notification is dropped with a
// warning — lossy by design while the connection is down during live
// migration.
//
// The bridge pointer is snapshotted under the lock and the lock is
// released before publishing: PublishNotification can block on the
// bridge's response channel, and holding p.mu across that send would
// stall SetBridge — and with it the main reconnect loop — until the
// notification drained.
func (p *Publisher) Publish(n *prot.ContainerNotification) {
	p.mu.Lock()
	b := p.b
	p.mu.Unlock()

	if b == nil {
		logrus.WithField("containerID", n.ContainerID).
			Warn("bridge not connected, dropping container notification")
		return
	}
	b.PublishNotification(n)
}
26 changes: 26 additions & 0 deletions internal/guest/bridge/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build linux

package bridge

import (
"testing"

"github.com/Microsoft/hcsshim/internal/guest/prot"
)

// TestPublisher_NilBridge verifies that publishing through a zero-value
// Publisher (no bridge ever attached) does not panic.
func TestPublisher_NilBridge(t *testing.T) {
	var p Publisher
	n := &prot.ContainerNotification{
		MessageBase: prot.MessageBase{ContainerID: "test"},
	}
	p.Publish(n)
}

// TestPublisher_SetBridgeNil verifies that explicitly detaching the
// bridge via SetBridge(nil) leaves the Publisher safe to publish through.
func TestPublisher_SetBridgeNil(t *testing.T) {
	var p Publisher
	p.SetBridge(nil)
	n := &prot.ContainerNotification{
		MessageBase: prot.MessageBase{ContainerID: "test"},
	}
	p.Publish(n)
}
Loading