From de6be71a8630f00d84e78af056700ef83519132c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Thu, 14 May 2026 17:04:12 +0000 Subject: [PATCH] state: batch HA probe results so dual-disconnect cannot flap primary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit requirePrimaryStable in TestHASubnetRouterFailoverDockerDisconnect Phase 5a (simultaneous cable-pull of both routers) intermittently caught the primary flipping to the offline r1. Both probe goroutines mark their target unhealthy back-to-back; SetNodeUnhealthy publishes a fresh NodeStore snapshot each call, so the intermediate snapshot — r1 unhealthy, r2 still healthy — runs the election with one healthy candidate left and picks it. The next snapshot then enters the all-unhealthy preserve-prev path, which preserves the wrong choice. Collect probe results from the cycle and apply them through a new NodeStore.UpdateNodes batched op so the election only runs once, with the cycle's final health state. PolicyChange dispatch moves outside the wg.Go goroutines and fires once if the primary assignment actually changed. --- hscontrol/servertest/ha_dynamic_test.go | 2 +- hscontrol/servertest/ha_health_test.go | 16 ++--- hscontrol/state/ha_health.go | 63 ++++++++++++----- hscontrol/state/node_store.go | 93 ++++++++++++++----------- hscontrol/state/primaries_test.go | 4 +- hscontrol/state/state.go | 67 +++++++++++++----- 6 files changed, 157 insertions(+), 88 deletions(-) diff --git a/hscontrol/servertest/ha_dynamic_test.go b/hscontrol/servertest/ha_dynamic_test.go index 86a223a6..1ceb505c 100644 --- a/hscontrol/servertest/ha_dynamic_test.go +++ b/hscontrol/servertest/ha_dynamic_test.go @@ -72,7 +72,7 @@ func TestHAFailover_ViewerSeesPrimaryFlip(t *testing.T) { return hasPeerPrimaryRoute(nm, "dyn-flip-r1", route) }) - changed := srv.State().SetNodeUnhealthy(id1, true) + changed := srv.State().SetNodeHealth(id1, false) require.True(t, changed, "marking primary unhealthy should change primaries") srv.App.Change(change.PolicyChange()) diff --git a/hscontrol/servertest/ha_health_test.go b/hscontrol/servertest/ha_health_test.go index 5178db5a..c56eab63 100644 --- a/hscontrol/servertest/ha_health_test.go +++ b/hscontrol/servertest/ha_health_test.go @@ -111,7 +111,7 @@ func TestHAHealthProbe_UnhealthyFailover(t *testing.T) { require.Contains(t, primaries, route, "node 1 should be primary initially") // Mark node 1 unhealthy — should failover to node 2. - changed := srv.State().SetNodeUnhealthy(nodeID1, true) + changed := srv.State().SetNodeHealth(nodeID1, false) assert.True(t, changed, "marking primary unhealthy should change primaries") primaries2 := srv.State().GetNodePrimaryRoutes(nodeID2) @@ -141,12 +141,12 @@ func TestHAHealthProbe_RecoveryNoFlap(t *testing.T) { nodeID2 := advertiseAndApproveRoute(t, srv, c2, route) // Failover: node 1 → node 2. - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) primaries := srv.State().GetNodePrimaryRoutes(nodeID2) require.Contains(t, primaries, route, "node 2 should be primary") // Recovery: node 1 healthy again. Node 2 should STAY primary. - changed := srv.State().SetNodeUnhealthy(nodeID1, false) + changed := srv.State().SetNodeHealth(nodeID1, true) assert.False(t, changed, "recovery should not change primaries (no flap)") primaries = srv.State().GetNodePrimaryRoutes(nodeID2) @@ -173,7 +173,7 @@ func TestHAHealthProbe_ConnectClearsUnhealthy(t *testing.T) { advertiseAndApproveRoute(t, srv, c2, route) // Mark unhealthy. - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) assert.False(t, srv.State().IsNodeHealthy(nodeID1)) // Reconnect clears unhealthy via State.Connect → ClearUnhealthy. @@ -208,7 +208,7 @@ func TestHAHealthProbe_SetApprovedRoutesEmptyClearsUnhealthy(t *testing.T) { nodeID1 := advertiseAndApproveRoute(t, srv, c1, route) advertiseAndApproveRoute(t, srv, c2, route) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) require.False(t, srv.State().IsNodeHealthy(nodeID1)) _, _, err := srv.State().SetApprovedRoutes(nodeID1, nil) @@ -242,7 +242,7 @@ func TestHAHealthProbe_DisconnectClearsUnhealthy(t *testing.T) { nodeID1 := advertiseAndApproveRoute(t, srv, c1, route) advertiseAndApproveRoute(t, srv, c2, route) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) require.False(t, srv.State().IsNodeHealthy(nodeID1)) c1.Disconnect(t) @@ -277,10 +277,10 @@ func TestHAHealthProbe_SetUnhealthyNoRoutesIsNoOp(t *testing.T) { _, _, err := srv.State().SetApprovedRoutes(nodeID1, nil) require.NoError(t, err) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) assert.True(t, srv.State().IsNodeHealthy(nodeID1), - "SetNodeUnhealthy on node with no approved routes should be a no-op") + "SetNodeHealth(false) on node with no approved routes should be a no-op") } // TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy reproduces the diff --git a/hscontrol/state/ha_health.go b/hscontrol/state/ha_health.go index 19f9331f..6442902a 100644 --- a/hscontrol/state/ha_health.go +++ b/hscontrol/state/ha_health.go @@ -3,6 +3,7 @@ package state import ( "context" "sync" + "sync/atomic" "time" "github.com/juanfont/headscale/hscontrol/types" @@ -59,10 +60,14 @@ func (p *HAHealthProber) forgetSession(id types.NodeID) { p.lastStableSession.Delete(id) } -// ProbeOnce pings all HA subnet router nodes and dispatches health -// changes inline. A timeout that fires after the node reconnected, -// or against a session younger than one probe cycle, is dropped so -// wgengine has time to apply the new netmap before a failover. +// ProbeOnce pings every HA subnet router and applies the cycle's +// results in one batch so the election sees a single transition. +// Per-result snapshots could otherwise elect a node that the next +// snapshot demotes again, flipping primary onto an unreachable peer. +// A timeout that fires after the node reconnected, or against a +// session younger than one probe cycle, is dropped so wgengine has +// time to apply the new netmap before silence is read as +// unreachability. func (p *HAHealthProber) ProbeOnce( ctx context.Context, dispatch func(...change.Change), @@ -109,7 +114,11 @@ func (p *HAHealthProber) ProbeOnce( Int("haNodes", len(nodeIDs)). Msg("HA health prober starting probe cycle") - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + results = xsync.NewMap[types.NodeID, bool]() + deferred atomic.Bool + ) for _, id := range nodeIDs { if !p.isConnected(id) { @@ -148,13 +157,7 @@ func (p *HAHealthProber) ProbeOnce( Dur("latency", latency). Msg("HA probe: node responded") - if p.state.SetNodeUnhealthy(id, false) { - dispatch(change.PolicyChange()) - - log.Info(). - Uint64(zf.NodeID, id.Uint64()). - Msg("HA probe: node recovered, recalculating primaries") - } + results.Store(id, true) case <-timer.C: p.state.CancelPing(pingID) @@ -179,6 +182,8 @@ func (p *HAHealthProber) ProbeOnce( Uint64("current_session", curr.SessionEpoch()). Msg("HA probe: node reconnected during probe, skipping") + deferred.Store(true) + return } @@ -188,6 +193,8 @@ func (p *HAHealthProber) ProbeOnce( Uint64("probe_session", probeSession). Msg("HA probe: probe of fresh session timed out, deferring to next cycle") + deferred.Store(true) + return } @@ -196,13 +203,7 @@ func (p *HAHealthProber) ProbeOnce( Dur("timeout", p.cfg.ProbeTimeout). Msg("HA probe: node did not respond") - if p.state.SetNodeUnhealthy(id, true) { - dispatch(change.PolicyChange()) - - log.Info(). - Uint64(zf.NodeID, id.Uint64()). - Msg("HA probe: node unhealthy, triggering failover") - } + results.Store(id, false) case <-ctx.Done(): p.state.CancelPing(pingID) @@ -211,4 +212,28 @@ func (p *HAHealthProber) ProbeOnce( } wg.Wait() + + // When any probe in the cycle was deferred (fresh session or + // reconnected mid-probe), drop the whole cycle's results: a partial + // batch lets the election pick a node whose connectivity is still + // unknown. The next cycle will run with stable sessions for every + // candidate and can decide on a complete picture. + if deferred.Load() { + return + } + + healthByNode := make(map[types.NodeID]bool, results.Size()) + results.Range(func(id types.NodeID, healthy bool) bool { + healthByNode[id] = healthy + + return true + }) + + if p.state.BatchSetNodeHealth(healthByNode) { + dispatch(change.PolicyChange()) + + log.Info(). + Int("haNodes", len(healthByNode)). + Msg("HA probe: health changed, triggering failover/recovery") + } } diff --git a/hscontrol/state/node_store.go b/hscontrol/state/node_store.go index a2066f15..ec3a808e 100644 --- a/hscontrol/state/node_store.go +++ b/hscontrol/state/node_store.go @@ -35,9 +35,9 @@ var ( const ( put = 1 del = 2 - update = 3 rebuildPeerMaps = 4 setName = 5 + updateMulti = 6 ) const prometheusNamespace = "headscale" @@ -166,14 +166,18 @@ type work struct { op int nodeID types.NodeID node types.Node - updateFn UpdateNodeFunc result chan struct{} - nodeResult chan types.NodeView // Channel to return the resulting node after batch application + nodeResult chan types.NodeView // For rebuildPeerMaps operation rebuildResult chan struct{} // For setName operation (admin rename, reject-on-collision path). name string errResult chan error + // For updateMulti: per-node update functions applied as a single + // batch entry so callers that need an atomic election (e.g. the HA + // prober applying multiple probe results at once) cannot have a + // partial snapshot published between the updates. + multiUpdates map[types.NodeID]UpdateNodeFunc } // PutNode adds or updates a node in the store. @@ -210,45 +214,55 @@ func (s *NodeStore) PutNode(n types.Node) types.NodeView { // UpdateNodeFunc is a function type that takes a pointer to a Node and modifies it. type UpdateNodeFunc func(n *types.Node) -// UpdateNode applies a function to modify a specific node in the store. -// This is a blocking operation that waits for the write to complete. -// This is analogous to a database "transaction", or, the caller should -// rather collect all data they want to change, and then call this function. -// Fewer calls are better. -// Returns the resulting node after all modifications in the batch have been applied. +// UpdateNode applies a function to modify a specific node in the +// store. Single-node convenience wrapper around [NodeStore.UpdateNodes] +// — the writer goroutine signals completion only after the post-batch +// snapshot has been stored, so the follow-up GetNode read sees the +// applied update. Returns the resulting node and whether it exists. // -// TODO(kradalby): Technically we could have a version of this that modifies the node -// in the current snapshot if _we know_ that the change will not affect the peer relationships. -// This is because the main nodesByID map contains the struct, and every other map is using a -// pointer to the underlying struct. The gotcha with this is that we will need to introduce -// a lock around the nodesByID map to ensure that no other writes are happening -// while we are modifying the node. Which mean we would need to implement read-write locks -// on all read operations. -func (s *NodeStore) UpdateNode(nodeID types.NodeID, updateFn func(n *types.Node)) (types.NodeView, bool) { +// Callers that need to change several nodes atomically should call +// UpdateNodes directly; collecting changes into one batch keeps the +// election from running on a half-applied snapshot. +func (s *NodeStore) UpdateNode(nodeID types.NodeID, updateFn UpdateNodeFunc) (types.NodeView, bool) { timer := prometheus.NewTimer(nodeStoreOperationDuration.WithLabelValues("update")) defer timer.ObserveDuration() - work := work{ - op: update, - nodeID: nodeID, - updateFn: updateFn, - result: make(chan struct{}), - nodeResult: make(chan types.NodeView, 1), + s.UpdateNodes(map[types.NodeID]UpdateNodeFunc{nodeID: updateFn}) + + nodeStoreOperations.WithLabelValues("update").Inc() + + return s.GetNode(nodeID) +} + +// UpdateNodes applies per-node update functions in a single atomic +// batch. The election that recomputes primary routes runs once, after +// every update has landed, so callers cannot observe an intermediate +// snapshot where only some of the updates are visible. Use this when +// the order in which two writers' updates are individually published +// would change the election outcome — e.g. the HA prober applying +// concurrent probe-timeout results. +func (s *NodeStore) UpdateNodes(updates map[types.NodeID]UpdateNodeFunc) { + timer := prometheus.NewTimer(nodeStoreOperationDuration.WithLabelValues("update_multi")) + defer timer.ObserveDuration() + + if len(updates) == 0 { + return + } + + w := work{ + op: updateMulti, + multiUpdates: updates, + result: make(chan struct{}), } nodeStoreQueueDepth.Inc() - s.writeQueue <- work + s.writeQueue <- w - <-work.result + <-w.result nodeStoreQueueDepth.Dec() - resultNode := <-work.nodeResult - - nodeStoreOperations.WithLabelValues("update").Inc() - - // Return the node and whether it exists (is valid) - return resultNode, resultNode.Valid() + nodeStoreOperations.WithLabelValues("update_multi").Inc() } // DeleteNode removes a node from the store by its ID. @@ -405,20 +419,21 @@ func (s *NodeStore) applyBatch(batch []work) { if w.nodeResult != nil { nodeResultRequests[w.nodeID] = append(nodeResultRequests[w.nodeID], w) } - case update: - // Update the specific node identified by nodeID - if n, exists := nodes[w.nodeID]; exists { + case updateMulti: + for id, fn := range w.multiUpdates { + n, exists := nodes[id] + if !exists { + continue + } + oldGivenName := n.GivenName - w.updateFn(&n) + fn(&n) if n.GivenName != oldGivenName { n.GivenName = resolveGivenName(nodes, n.ID, n.GivenName) } - nodes[w.nodeID] = n - } - if w.nodeResult != nil { - nodeResultRequests[w.nodeID] = append(nodeResultRequests[w.nodeID], w) + nodes[id] = n } case del: delete(nodes, w.nodeID) diff --git a/hscontrol/state/primaries_test.go b/hscontrol/state/primaries_test.go index ab6116a3..c854f6da 100644 --- a/hscontrol/state/primaries_test.go +++ b/hscontrol/state/primaries_test.go @@ -72,7 +72,7 @@ func (f *primariesFixture) disconnect(id types.NodeID) { }) } -// unhealthy mirrors State.SetNodeUnhealthy(id, true). +// unhealthy mirrors State.SetNodeHealth(id, false). func (f *primariesFixture) unhealthy(id types.NodeID) { f.t.Helper() f.ns.UpdateNode(id, func(n *types.Node) { @@ -80,7 +80,7 @@ func (f *primariesFixture) unhealthy(id types.NodeID) { }) } -// healthy mirrors State.SetNodeUnhealthy(id, false). +// healthy mirrors State.SetNodeHealth(id, true). func (f *primariesFixture) healthy(id types.NodeID) { f.t.Helper() f.ns.UpdateNode(id, func(n *types.Node) { diff --git a/hscontrol/state/state.go b/hscontrol/state/state.go index 5ff95e2c..be824b36 100644 --- a/hscontrol/state/state.go +++ b/hscontrol/state/state.go @@ -1222,35 +1222,64 @@ func (s *State) IsNodeHealthy(id types.NodeID) bool { return s.nodeStore.IsNodeHealthy(id) } -// SetNodeUnhealthy flips the runtime Unhealthy bit and reports whether -// the resulting primary route assignment changed, so the HA prober can -// decide whether to fan out a PolicyChange. -// -// A request to mark a node Unhealthy is dropped if the node is no -// longer an HA candidate (offline or no approved routes). The prober -// reads HANodes() at the start of a probe cycle and writes back after -// the timeout fires; in that window the node may have left the -// candidate set, and the bit would just be stale. The check happens +// SetNodeHealth flips the runtime health bit for one node and reports +// whether the resulting primary-route assignment changed, so the HA +// prober can decide whether to fan out a PolicyChange. true means +// healthy; false means unhealthy. An unhealthy mark is dropped when +// the node is no longer an HA candidate (offline or no approved +// routes) — between probe dispatch and result the node may have left +// candidacy, and the bit would just be stale. The check happens // inside the writer goroutine so it serialises against the -// SetApprovedRoutes / Disconnect that removed candidacy. -func (s *State) SetNodeUnhealthy(id types.NodeID, unhealthy bool) bool { +// SetApprovedRoutes / Disconnect that removed candidacy. Single-node +// convenience wrapper around [State.BatchSetNodeHealth]. +func (s *State) SetNodeHealth(id types.NodeID, healthy bool) bool { + return s.BatchSetNodeHealth(map[types.NodeID]bool{id: healthy}) +} + +// BatchSetNodeHealth applies a set of health updates atomically: the +// election runs once after every flag has been flipped, so observers +// never see an intermediate snapshot. Returns true when the +// primary-route assignment differs from before the batch so callers +// can gate a single PolicyChange dispatch. Map value true = healthy; +// false = unhealthy (gated as in [State.SetNodeHealth]). +// +// Per-call publication would let a writer applying two flips +// back-to-back elect a node that the next snapshot demotes, +// momentarily pointing peers at the wrong primary; the batched form +// closes that window. +func (s *State) BatchSetNodeHealth(updates map[types.NodeID]bool) bool { + if len(updates) == 0 { + return false + } + prevRoutes := s.nodeStore.PrimaryRoutes() - _, ok := s.nodeStore.UpdateNode(id, func(n *types.Node) { - if unhealthy { + fns := make(map[types.NodeID]UpdateNodeFunc, len(updates)) + for id, healthy := range updates { + fns[id] = healthSetter(healthy) + } + + s.nodeStore.UpdateNodes(fns) + + return !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) +} + +// healthSetter returns an UpdateNodeFunc that flips n.Unhealthy to +// the inverse of healthy, with the same gate as [State.SetNodeHealth]: +// an unhealthy mark only sticks when the node is still online and +// still advertises approved routes, so a node that left HA candidacy +// between probe dispatch and result does not carry a stale bit. +func healthSetter(healthy bool) UpdateNodeFunc { + return func(n *types.Node) { + if !healthy { online := n.IsOnline != nil && *n.IsOnline if !online || len(n.AllApprovedRoutes()) == 0 { return } } - n.Unhealthy = unhealthy - }) - if !ok { - return false + n.Unhealthy = !healthy } - - return !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) } // ValidateAPIKey checks if an API key is valid and active.