From fb8eecae2530aebdfcc25003cf8fdc819d479afa Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 13 May 2026 13:20:22 +0000 Subject: [PATCH] state: defer HA failover when probe target reconnected mid-cycle The HA prober dispatches a PingRequest, waits ProbeTimeout (5s), and marks the node unhealthy if no callback arrives. A node that bounced its poll session between probe cycles satisfies two conditions that conspire to fail TestHASubnetRouterFailover: a probe queued against the previous session is silently dropped when the worker writes to the closed connection (timeout always fires), and a probe sent immediately after reconnect lands while wgengine is still rebuilding magicsock state from the new netmap. Either path installs a spurious unhealthy bit, which sends the preserved-primary anti-flap the wrong way. Record the session observed at dispatch time and drop the timeout path if the node reconnected since. Require the session to survive a full probe cycle before a timeout can drive a failover. --- hscontrol/servertest/ha_health_test.go | 71 +++++++++++++++++++ hscontrol/state/ha_health.go | 96 +++++++++++++++++++++++--- 2 files changed, 156 insertions(+), 11 deletions(-) diff --git a/hscontrol/servertest/ha_health_test.go b/hscontrol/servertest/ha_health_test.go index b09ed7d3..5178db5a 100644 --- a/hscontrol/servertest/ha_health_test.go +++ b/hscontrol/servertest/ha_health_test.go @@ -283,6 +283,77 @@ func TestHAHealthProbe_SetUnhealthyNoRoutesIsNoOp(t *testing.T) { "SetNodeUnhealthy on node with no approved routes should be a no-op") } +// TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy reproduces the +// race that surfaced as a TestHASubnetRouterFailover flake: a probe +// dispatched against the previous poll session sees the timeout fire +// while the client is briefly disconnected. With the session guard in +// [HAHealthProber.ProbeOnce], the timeout path observes the reconnect +// and bails out instead of installing a spurious Unhealthy bit. +// +// Without the guard, the primary fails over to the standby and the +// anti-flap election preserves that choice even after the original +// primary is fully back online. +func TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy(t *testing.T) { + t.Parallel() + + srv := servertest.NewServer(t) + user := srv.CreateUser(t, "ha-probe-reconnect") + + route := netip.MustParsePrefix("10.102.0.0/24") + + c1 := servertest.NewClient(t, srv, "ha-pr-r1", servertest.WithUser(user)) + c2 := servertest.NewClient(t, srv, "ha-pr-r2", servertest.WithUser(user)) + + c1.WaitForPeers(t, 1, 10*time.Second) + c2.WaitForPeers(t, 1, 10*time.Second) + + nodeID1 := advertiseAndApproveRoute(t, srv, c1, route) + advertiseAndApproveRoute(t, srv, c2, route) + + // Node 1 is primary (lowest ID, healthy). + require.Contains(t, + srv.State().GetNodePrimaryRoutes(nodeID1), route, + "node 1 should be primary initially") + + prober := state.NewHAHealthProber( + srv.State(), + types.HARouteConfig{ + ProbeInterval: 30 * time.Second, + ProbeTimeout: 2 * time.Second, + }, + srv.URL, + srv.App.MapBatcher().IsConnected, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // TestClient does not implement ping responses, so every probe + // times out. We exploit that to observe the timeout path under a + // reconnect race: kick a probe in a goroutine, bounce the + // primary's poll session, and confirm the prober drops the stale + // timeout instead of marking the node unhealthy. + done := make(chan struct{}) + + go func() { + defer close(done) + + prober.ProbeOnce(ctx, srv.App.Change) + }() + + c1.Disconnect(t) + c1.Reconnect(t) + c1.WaitForPeers(t, 1, 10*time.Second) + + <-done + + assert.True(t, srv.State().IsNodeHealthy(nodeID1), + "reconnect during probe must not flip node unhealthy") + assert.Contains(t, + srv.State().GetNodePrimaryRoutes(nodeID1), route, + "node 1 should remain primary after stale-probe timeout") +} + // TestHAHealthProbe_NoHARoutes verifies that the prober is a no-op // when no HA configuration exists. func TestHAHealthProbe_NoHARoutes(t *testing.T) { diff --git a/hscontrol/state/ha_health.go b/hscontrol/state/ha_health.go index 2b3e247f..19f9331f 100644 --- a/hscontrol/state/ha_health.go +++ b/hscontrol/state/ha_health.go @@ -8,6 +8,7 @@ import ( "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/types/change" "github.com/juanfont/headscale/hscontrol/util/zlog/zf" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" "tailscale.com/tailcfg" "tailscale.com/util/set" @@ -20,6 +21,11 @@ type HAHealthProber struct { cfg types.HARouteConfig serverURL string isConnected func(types.NodeID) bool + + // lastStableSession defers a timeout-driven unhealthy decision + // for sessions younger than one probe cycle, giving wgengine + // time to apply the new netmap on a freshly reconnected node. + lastStableSession *xsync.Map[types.NodeID, uint64] } // NewHAHealthProber creates a prober that uses the given State for @@ -32,34 +38,68 @@ func NewHAHealthProber( isConnected func(types.NodeID) bool, ) *HAHealthProber { return &HAHealthProber{ - state: s, - cfg: cfg, - serverURL: serverURL, - isConnected: isConnected, + state: s, + cfg: cfg, + serverURL: serverURL, + isConnected: isConnected, + lastStableSession: xsync.NewMap[types.NodeID, uint64](), } } -// ProbeOnce pings all HA subnet router nodes. PingNode changes are -// dispatched immediately via dispatch so nodes can respond before the -// timeout. Health-related policy changes are also dispatched inline. +// markSessionStable records session and returns true iff the same +// value was already present from a prior cycle. +func (p *HAHealthProber) markSessionStable(id types.NodeID, session uint64) bool { + prev, loaded := p.lastStableSession.LoadAndStore(id, session) + return loaded && prev == session +} + +// forgetSession drops the recorded session so a node returning to +// HA candidacy starts fresh. +func (p *HAHealthProber) forgetSession(id types.NodeID) { + p.lastStableSession.Delete(id) +} + +// ProbeOnce pings all HA subnet router nodes and dispatches health +// changes inline. A timeout that fires after the node reconnected, +// or against a session younger than one probe cycle, is dropped so +// wgengine has time to apply the new netmap before a failover. func (p *HAHealthProber) ProbeOnce( ctx context.Context, dispatch func(...change.Change), ) { haNodes := p.state.nodeStore.HANodes() + + // Drop stable-session entries for nodes that are no longer HA + // candidates so a future reappearance starts fresh. + seen := make(set.Set[types.NodeID]) + + for _, nodes := range haNodes { + for _, id := range nodes { + seen.Add(id) + } + } + + p.lastStableSession.Range(func(id types.NodeID, _ uint64) bool { + if !seen.Contains(id) { + p.lastStableSession.Delete(id) + } + + return true + }) + if len(haNodes) == 0 { return } // Deduplicate node IDs across prefixes. - seen := make(set.Set[types.NodeID]) - var nodeIDs []types.NodeID + dedup := make(set.Set[types.NodeID]) + for _, nodes := range haNodes { for _, id := range nodes { - if !seen.Contains(id) { - seen.Add(id) + if !dedup.Contains(id) { + dedup.Add(id) nodeIDs = append(nodeIDs, id) } } @@ -77,9 +117,19 @@ func (p *HAHealthProber) ProbeOnce( Uint64(zf.NodeID, id.Uint64()). Msg("HA probe: skipping offline node") + p.forgetSession(id) + continue } + nv, ok := p.state.GetNodeByID(id) + if !ok { + continue + } + + probeSession := nv.SessionEpoch() + stable := p.markSessionStable(id, probeSession) + pingID, responseCh := p.state.RegisterPing(id) callbackURL := p.serverURL + "/machine/ping-response?id=" + pingID @@ -117,6 +167,30 @@ func (p *HAHealthProber) ProbeOnce( return } + curr, ok := p.state.GetNodeByID(id) + if !ok { + return + } + + if curr.SessionEpoch() != probeSession { + log.Debug(). + Uint64(zf.NodeID, id.Uint64()). + Uint64("probe_session", probeSession). + Uint64("current_session", curr.SessionEpoch()). + Msg("HA probe: node reconnected during probe, skipping") + + return + } + + if !stable { + log.Debug(). + Uint64(zf.NodeID, id.Uint64()). + Uint64("probe_session", probeSession). + Msg("HA probe: probe of fresh session timed out, deferring to next cycle") + + return + } + log.Warn(). Uint64(zf.NodeID, id.Uint64()). Dur("timeout", p.cfg.ProbeTimeout).