Skip to content

Commit 896ee1a

Browse files
committed
Harden auto-standby startup and reconnects
1 parent 76caba7 commit 896ee1a

4 files changed

Lines changed: 84 additions & 3 deletions

File tree

lib/autostandby/conntrack_events_linux.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
type conntrackStream struct {
2020
fd int
2121
closeOnce sync.Once
22+
done chan struct{}
2223
events chan ConnectionEvent
2324
errs chan error
2425
}
@@ -52,6 +53,7 @@ func (s *ConntrackSource) OpenStream(ctx context.Context) (ConnectionStream, err
5253

5354
stream := &conntrackStream{
5455
fd: fd,
56+
done: make(chan struct{}),
5557
events: make(chan ConnectionEvent, 256),
5658
errs: make(chan error, 16),
5759
}
@@ -66,6 +68,7 @@ func (s *conntrackStream) Errors() <-chan error { return s.errs }
6668
func (s *conntrackStream) Close() error {
6769
var err error
6870
s.closeOnce.Do(func() {
71+
close(s.done)
6972
err = unix.Close(s.fd)
7073
})
7174
return err
@@ -76,8 +79,11 @@ func (s *conntrackStream) run(ctx context.Context) {
7679
defer close(s.errs)
7780

7881
go func() {
79-
<-ctx.Done()
80-
_ = s.Close()
82+
select {
83+
case <-ctx.Done():
84+
_ = s.Close()
85+
case <-s.done:
86+
}
8187
}()
8288

8389
buf := make([]byte, 1<<20)

lib/autostandby/conntrack_events_linux_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,28 @@ import (
1313
"golang.org/x/sys/unix"
1414
)
1515

16+
func TestConntrackStreamCloseSignalsDone(t *testing.T) {
17+
t.Parallel()
18+
19+
fd, err := unix.Socket(unix.AF_UNIX, unix.SOCK_DGRAM, 0)
20+
require.NoError(t, err)
21+
22+
stream := &conntrackStream{
23+
fd: fd,
24+
done: make(chan struct{}),
25+
events: make(chan ConnectionEvent),
26+
errs: make(chan error),
27+
}
28+
29+
require.NoError(t, stream.Close())
30+
31+
select {
32+
case <-stream.done:
33+
default:
34+
t.Fatal("expected Close to signal stream shutdown")
35+
}
36+
}
37+
1638
func TestConnectionEventFromNetlinkMessageParsesIPv4TCPEvent(t *testing.T) {
1739
t.Parallel()
1840

lib/autostandby/controller.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ func (c *Controller) Run(ctx context.Context) error {
186186
}
187187

188188
if err := c.startupResync(ctx); err != nil {
189-
return err
189+
c.recordControllerError("startup_resync")
190+
c.log.Warn("auto-standby startup resync failed", "error", err)
190191
}
191192

192193
instanceEvents, unsubscribe, err := c.store.SubscribeInstanceEvents()
@@ -399,6 +400,8 @@ func (c *Controller) startupResync(ctx context.Context) error {
399400
}
400401
conns, err := c.source.ListConnections(ctx)
401402
if err != nil {
403+
c.setObserverError(err)
404+
c.recordObserverError("startup_resync")
402405
c.recordStartupResync(start, "error")
403406
recordSpanError(span, err)
404407
return err

lib/autostandby/controller_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type fakeInstanceStore struct {
1717
persistedRuntime map[string]*Runtime
1818
events chan InstanceEvent
1919
standbyErr error
20+
listErr error
2021
}
2122

2223
func newFakeInstanceStore(instances []Instance) *fakeInstanceStore {
@@ -28,6 +29,9 @@ func newFakeInstanceStore(instances []Instance) *fakeInstanceStore {
2829
}
2930

3031
func (f *fakeInstanceStore) ListInstances(context.Context) ([]Instance, error) {
32+
if f.listErr != nil {
33+
return nil, f.listErr
34+
}
3135
out := make([]Instance, 0, len(f.instances))
3236
for _, inst := range f.instances {
3337
out = append(out, cloneInstance(inst))
@@ -56,13 +60,21 @@ func (f *fakeInstanceStore) SubscribeInstanceEvents() (<-chan InstanceEvent, fun
5660

5761
type fakeConnectionSource struct {
5862
connections []Connection
63+
listErr error
64+
openErr error
5965
}
6066

6167
func (f *fakeConnectionSource) ListConnections(context.Context) ([]Connection, error) {
68+
if f.listErr != nil {
69+
return nil, f.listErr
70+
}
6271
return append([]Connection(nil), f.connections...), nil
6372
}
6473

6574
func (f *fakeConnectionSource) OpenStream(context.Context) (ConnectionStream, error) {
75+
if f.openErr != nil {
76+
return nil, f.openErr
77+
}
6678
return &fakeConnectionStream{
6779
events: make(chan ConnectionEvent),
6880
errs: make(chan error),
@@ -405,6 +417,44 @@ func TestStatusReportsObserverError(t *testing.T) {
405417
require.Equal(t, ReasonObserverError, status.Reason)
406418
}
407419

420+
func TestRunDegradesWhenStartupResyncFails(t *testing.T) {
421+
t.Parallel()
422+
423+
store := newFakeInstanceStore([]Instance{{
424+
ID: "inst-err",
425+
Name: "inst-err",
426+
State: StateRunning,
427+
NetworkEnabled: true,
428+
IP: "192.168.100.60",
429+
AutoStandby: &Policy{Enabled: true, IdleTimeout: "1m"},
430+
}})
431+
source := &fakeConnectionSource{
432+
listErr: errors.New("conntrack permission denied"),
433+
}
434+
controller := NewController(store, source, ControllerOptions{})
435+
436+
ctx, cancel := context.WithCancel(context.Background())
437+
defer cancel()
438+
439+
done := make(chan error, 1)
440+
go func() {
441+
done <- controller.Run(ctx)
442+
}()
443+
444+
select {
445+
case err := <-done:
446+
require.NoError(t, err, "controller should wait for cancellation instead of exiting on startup resync failure")
447+
case <-time.After(50 * time.Millisecond):
448+
}
449+
450+
cancel()
451+
require.NoError(t, <-done)
452+
453+
status := controller.Describe(store.instances[0])
454+
require.Equal(t, StatusError, status.Status)
455+
require.Equal(t, ReasonObserverError, status.Reason)
456+
}
457+
408458
func mustAddr(raw string) netip.Addr {
409459
return netip.MustParseAddr(raw)
410460
}

0 commit comments

Comments
 (0)