diff --git a/hscontrol/servertest/ha_dynamic_test.go b/hscontrol/servertest/ha_dynamic_test.go index 86a223a6..1ceb505c 100644 --- a/hscontrol/servertest/ha_dynamic_test.go +++ b/hscontrol/servertest/ha_dynamic_test.go @@ -72,7 +72,7 @@ func TestHAFailover_ViewerSeesPrimaryFlip(t *testing.T) { return hasPeerPrimaryRoute(nm, "dyn-flip-r1", route) }) - changed := srv.State().SetNodeUnhealthy(id1, true) + changed := srv.State().SetNodeHealth(id1, false) require.True(t, changed, "marking primary unhealthy should change primaries") srv.App.Change(change.PolicyChange()) diff --git a/hscontrol/servertest/ha_health_test.go b/hscontrol/servertest/ha_health_test.go index 5178db5a..c56eab63 100644 --- a/hscontrol/servertest/ha_health_test.go +++ b/hscontrol/servertest/ha_health_test.go @@ -111,7 +111,7 @@ func TestHAHealthProbe_UnhealthyFailover(t *testing.T) { require.Contains(t, primaries, route, "node 1 should be primary initially") // Mark node 1 unhealthy — should failover to node 2. - changed := srv.State().SetNodeUnhealthy(nodeID1, true) + changed := srv.State().SetNodeHealth(nodeID1, false) assert.True(t, changed, "marking primary unhealthy should change primaries") primaries2 := srv.State().GetNodePrimaryRoutes(nodeID2) @@ -141,12 +141,12 @@ func TestHAHealthProbe_RecoveryNoFlap(t *testing.T) { nodeID2 := advertiseAndApproveRoute(t, srv, c2, route) // Failover: node 1 → node 2. - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) primaries := srv.State().GetNodePrimaryRoutes(nodeID2) require.Contains(t, primaries, route, "node 2 should be primary") // Recovery: node 1 healthy again. Node 2 should STAY primary. - changed := srv.State().SetNodeUnhealthy(nodeID1, false) + changed := srv.State().SetNodeHealth(nodeID1, true) assert.False(t, changed, "recovery should not change primaries (no flap)") primaries = srv.State().GetNodePrimaryRoutes(nodeID2) @@ -173,7 +173,7 @@ func TestHAHealthProbe_ConnectClearsUnhealthy(t *testing.T) { advertiseAndApproveRoute(t, srv, c2, route) // Mark unhealthy. - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) assert.False(t, srv.State().IsNodeHealthy(nodeID1)) // Reconnect clears unhealthy via State.Connect → ClearUnhealthy. @@ -208,7 +208,7 @@ func TestHAHealthProbe_SetApprovedRoutesEmptyClearsUnhealthy(t *testing.T) { nodeID1 := advertiseAndApproveRoute(t, srv, c1, route) advertiseAndApproveRoute(t, srv, c2, route) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) require.False(t, srv.State().IsNodeHealthy(nodeID1)) _, _, err := srv.State().SetApprovedRoutes(nodeID1, nil) @@ -242,7 +242,7 @@ func TestHAHealthProbe_DisconnectClearsUnhealthy(t *testing.T) { nodeID1 := advertiseAndApproveRoute(t, srv, c1, route) advertiseAndApproveRoute(t, srv, c2, route) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) require.False(t, srv.State().IsNodeHealthy(nodeID1)) c1.Disconnect(t) @@ -277,10 +277,10 @@ func TestHAHealthProbe_SetUnhealthyNoRoutesIsNoOp(t *testing.T) { _, _, err := srv.State().SetApprovedRoutes(nodeID1, nil) require.NoError(t, err) - srv.State().SetNodeUnhealthy(nodeID1, true) + srv.State().SetNodeHealth(nodeID1, false) assert.True(t, srv.State().IsNodeHealthy(nodeID1), - "SetNodeUnhealthy on node with no approved routes should be a no-op") + "SetNodeHealth(false) on node with no approved routes should be a no-op") } // TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy reproduces the diff --git a/hscontrol/state/ha_health.go b/hscontrol/state/ha_health.go index 19f9331f..6442902a 100644 --- a/hscontrol/state/ha_health.go +++ b/hscontrol/state/ha_health.go @@ -3,6 +3,7 @@ package state import ( "context" "sync" + "sync/atomic" "time" "github.com/juanfont/headscale/hscontrol/types" @@ -59,10 +60,14 @@ func (p *HAHealthProber) forgetSession(id types.NodeID) { p.lastStableSession.Delete(id) } -// ProbeOnce pings all HA subnet router nodes and dispatches health -// changes inline. A timeout that fires after the node reconnected, -// or against a session younger than one probe cycle, is dropped so -// wgengine has time to apply the new netmap before a failover. +// ProbeOnce pings every HA subnet router and applies the cycle's +// results in one batch so the election sees a single transition. +// Per-result snapshots could otherwise elect a node that the next +// snapshot demotes again, flipping primary onto an unreachable peer. +// A timeout that fires after the node reconnected, or against a +// session younger than one probe cycle, is dropped so wgengine has +// time to apply the new netmap before silence is read as +// unreachability. func (p *HAHealthProber) ProbeOnce( ctx context.Context, dispatch func(...change.Change), @@ -109,7 +114,11 @@ func (p *HAHealthProber) ProbeOnce( Int("haNodes", len(nodeIDs)). Msg("HA health prober starting probe cycle") - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + results = xsync.NewMap[types.NodeID, bool]() + deferred atomic.Bool + ) for _, id := range nodeIDs { if !p.isConnected(id) { @@ -148,13 +157,7 @@ func (p *HAHealthProber) ProbeOnce( Dur("latency", latency). Msg("HA probe: node responded") - if p.state.SetNodeUnhealthy(id, false) { - dispatch(change.PolicyChange()) - - log.Info(). - Uint64(zf.NodeID, id.Uint64()). - Msg("HA probe: node recovered, recalculating primaries") - } + results.Store(id, true) case <-timer.C: p.state.CancelPing(pingID) @@ -179,6 +182,8 @@ func (p *HAHealthProber) ProbeOnce( Uint64("current_session", curr.SessionEpoch()). Msg("HA probe: node reconnected during probe, skipping") + deferred.Store(true) + return } @@ -188,6 +193,8 @@ func (p *HAHealthProber) ProbeOnce( Uint64("probe_session", probeSession). Msg("HA probe: probe of fresh session timed out, deferring to next cycle") + deferred.Store(true) + return } @@ -196,13 +203,7 @@ func (p *HAHealthProber) ProbeOnce( Dur("timeout", p.cfg.ProbeTimeout). Msg("HA probe: node did not respond") - if p.state.SetNodeUnhealthy(id, true) { - dispatch(change.PolicyChange()) - - log.Info(). - Uint64(zf.NodeID, id.Uint64()). - Msg("HA probe: node unhealthy, triggering failover") - } + results.Store(id, false) case <-ctx.Done(): p.state.CancelPing(pingID) @@ -211,4 +212,28 @@ func (p *HAHealthProber) ProbeOnce( } wg.Wait() + + // When any probe in the cycle was deferred (fresh session or + // reconnected mid-probe), drop the whole cycle's results: a partial + // batch lets the election pick a node whose connectivity is still + // unknown. The next cycle will run with stable sessions for every + // candidate and can decide on a complete picture. + if deferred.Load() { + return + } + + healthByNode := make(map[types.NodeID]bool, results.Size()) + results.Range(func(id types.NodeID, healthy bool) bool { + healthByNode[id] = healthy + + return true + }) + + if p.state.BatchSetNodeHealth(healthByNode) { + dispatch(change.PolicyChange()) + + log.Info(). + Int("haNodes", len(healthByNode)). + Msg("HA probe: health changed, triggering failover/recovery") + } } diff --git a/hscontrol/state/node_store.go b/hscontrol/state/node_store.go index a2066f15..ec3a808e 100644 --- a/hscontrol/state/node_store.go +++ b/hscontrol/state/node_store.go @@ -35,9 +35,9 @@ var ( const ( put = 1 del = 2 - update = 3 rebuildPeerMaps = 4 setName = 5 + updateMulti = 6 ) const prometheusNamespace = "headscale" @@ -166,14 +166,18 @@ type work struct { op int nodeID types.NodeID node types.Node - updateFn UpdateNodeFunc result chan struct{} - nodeResult chan types.NodeView // Channel to return the resulting node after batch application + nodeResult chan types.NodeView // For rebuildPeerMaps operation rebuildResult chan struct{} // For setName operation (admin rename, reject-on-collision path). name string errResult chan error + // For updateMulti: per-node update functions applied as a single + // batch entry so callers that need an atomic election (e.g. the HA + // prober applying multiple probe results at once) cannot have a + // partial snapshot published between the updates. + multiUpdates map[types.NodeID]UpdateNodeFunc } // PutNode adds or updates a node in the store. @@ -210,45 +214,55 @@ func (s *NodeStore) PutNode(n types.Node) types.NodeView { // UpdateNodeFunc is a function type that takes a pointer to a Node and modifies it. type UpdateNodeFunc func(n *types.Node) -// UpdateNode applies a function to modify a specific node in the store. -// This is a blocking operation that waits for the write to complete. -// This is analogous to a database "transaction", or, the caller should -// rather collect all data they want to change, and then call this function. -// Fewer calls are better. -// Returns the resulting node after all modifications in the batch have been applied. +// UpdateNode applies a function to modify a specific node in the +// store. Single-node convenience wrapper around [NodeStore.UpdateNodes] +// — the writer goroutine signals completion only after the post-batch +// snapshot has been stored, so the follow-up GetNode read sees the +// applied update. Returns the resulting node and whether it exists. // -// TODO(kradalby): Technically we could have a version of this that modifies the node -// in the current snapshot if _we know_ that the change will not affect the peer relationships. -// This is because the main nodesByID map contains the struct, and every other map is using a -// pointer to the underlying struct. The gotcha with this is that we will need to introduce -// a lock around the nodesByID map to ensure that no other writes are happening -// while we are modifying the node. Which mean we would need to implement read-write locks -// on all read operations. -func (s *NodeStore) UpdateNode(nodeID types.NodeID, updateFn func(n *types.Node)) (types.NodeView, bool) { +// Callers that need to change several nodes atomically should call +// UpdateNodes directly; collecting changes into one batch keeps the +// election from running on a half-applied snapshot. +func (s *NodeStore) UpdateNode(nodeID types.NodeID, updateFn UpdateNodeFunc) (types.NodeView, bool) { timer := prometheus.NewTimer(nodeStoreOperationDuration.WithLabelValues("update")) defer timer.ObserveDuration() - work := work{ - op: update, - nodeID: nodeID, - updateFn: updateFn, - result: make(chan struct{}), - nodeResult: make(chan types.NodeView, 1), + s.UpdateNodes(map[types.NodeID]UpdateNodeFunc{nodeID: updateFn}) + + nodeStoreOperations.WithLabelValues("update").Inc() + + return s.GetNode(nodeID) +} + +// UpdateNodes applies per-node update functions in a single atomic +// batch. The election that recomputes primary routes runs once, after +// every update has landed, so callers cannot observe an intermediate +// snapshot where only some of the updates are visible. Use this when +// the order in which two writers' updates are individually published +// would change the election outcome — e.g. the HA prober applying +// concurrent probe-timeout results. +func (s *NodeStore) UpdateNodes(updates map[types.NodeID]UpdateNodeFunc) { + timer := prometheus.NewTimer(nodeStoreOperationDuration.WithLabelValues("update_multi")) + defer timer.ObserveDuration() + + if len(updates) == 0 { + return + } + + w := work{ + op: updateMulti, + multiUpdates: updates, + result: make(chan struct{}), } nodeStoreQueueDepth.Inc() - s.writeQueue <- work + s.writeQueue <- w - <-work.result + <-w.result nodeStoreQueueDepth.Dec() - resultNode := <-work.nodeResult - - nodeStoreOperations.WithLabelValues("update").Inc() - - // Return the node and whether it exists (is valid) - return resultNode, resultNode.Valid() + nodeStoreOperations.WithLabelValues("update_multi").Inc() } // DeleteNode removes a node from the store by its ID. @@ -405,20 +419,21 @@ func (s *NodeStore) applyBatch(batch []work) { if w.nodeResult != nil { nodeResultRequests[w.nodeID] = append(nodeResultRequests[w.nodeID], w) } - case update: - // Update the specific node identified by nodeID - if n, exists := nodes[w.nodeID]; exists { + case updateMulti: + for id, fn := range w.multiUpdates { + n, exists := nodes[id] + if !exists { + continue + } + oldGivenName := n.GivenName - w.updateFn(&n) + fn(&n) if n.GivenName != oldGivenName { n.GivenName = resolveGivenName(nodes, n.ID, n.GivenName) } - nodes[w.nodeID] = n - } - if w.nodeResult != nil { - nodeResultRequests[w.nodeID] = append(nodeResultRequests[w.nodeID], w) + nodes[id] = n } case del: delete(nodes, w.nodeID) diff --git a/hscontrol/state/primaries_test.go b/hscontrol/state/primaries_test.go index ab6116a3..c854f6da 100644 --- a/hscontrol/state/primaries_test.go +++ b/hscontrol/state/primaries_test.go @@ -72,7 +72,7 @@ func (f *primariesFixture) disconnect(id types.NodeID) { }) } -// unhealthy mirrors State.SetNodeUnhealthy(id, true). +// unhealthy mirrors State.SetNodeHealth(id, false). func (f *primariesFixture) unhealthy(id types.NodeID) { f.t.Helper() f.ns.UpdateNode(id, func(n *types.Node) { @@ -80,7 +80,7 @@ func (f *primariesFixture) unhealthy(id types.NodeID) { }) } -// healthy mirrors State.SetNodeUnhealthy(id, false). +// healthy mirrors State.SetNodeHealth(id, true). func (f *primariesFixture) healthy(id types.NodeID) { f.t.Helper() f.ns.UpdateNode(id, func(n *types.Node) { diff --git a/hscontrol/state/state.go b/hscontrol/state/state.go index 5ff95e2c..be824b36 100644 --- a/hscontrol/state/state.go +++ b/hscontrol/state/state.go @@ -1222,35 +1222,64 @@ func (s *State) IsNodeHealthy(id types.NodeID) bool { return s.nodeStore.IsNodeHealthy(id) } -// SetNodeUnhealthy flips the runtime Unhealthy bit and reports whether -// the resulting primary route assignment changed, so the HA prober can -// decide whether to fan out a PolicyChange. -// -// A request to mark a node Unhealthy is dropped if the node is no -// longer an HA candidate (offline or no approved routes). The prober -// reads HANodes() at the start of a probe cycle and writes back after -// the timeout fires; in that window the node may have left the -// candidate set, and the bit would just be stale. The check happens +// SetNodeHealth flips the runtime health bit for one node and reports +// whether the resulting primary-route assignment changed, so the HA +// prober can decide whether to fan out a PolicyChange. true means +// healthy; false means unhealthy. An unhealthy mark is dropped when +// the node is no longer an HA candidate (offline or no approved +// routes) — between probe dispatch and result the node may have left +// candidacy, and the bit would just be stale. The check happens // inside the writer goroutine so it serialises against the -// SetApprovedRoutes / Disconnect that removed candidacy. -func (s *State) SetNodeUnhealthy(id types.NodeID, unhealthy bool) bool { +// SetApprovedRoutes / Disconnect that removed candidacy. Single-node +// convenience wrapper around [State.BatchSetNodeHealth]. +func (s *State) SetNodeHealth(id types.NodeID, healthy bool) bool { + return s.BatchSetNodeHealth(map[types.NodeID]bool{id: healthy}) +} + +// BatchSetNodeHealth applies a set of health updates atomically: the +// election runs once after every flag has been flipped, so observers +// never see an intermediate snapshot. Returns true when the +// primary-route assignment differs from before the batch so callers +// can gate a single PolicyChange dispatch. Map value true = healthy; +// false = unhealthy (gated as in [State.SetNodeHealth]). +// +// Per-call publication would let a writer applying two flips +// back-to-back elect a node that the next snapshot demotes, +// momentarily pointing peers at the wrong primary; the batched form +// closes that window. +func (s *State) BatchSetNodeHealth(updates map[types.NodeID]bool) bool { + if len(updates) == 0 { + return false + } + prevRoutes := s.nodeStore.PrimaryRoutes() - _, ok := s.nodeStore.UpdateNode(id, func(n *types.Node) { - if unhealthy { + fns := make(map[types.NodeID]UpdateNodeFunc, len(updates)) + for id, healthy := range updates { + fns[id] = healthSetter(healthy) + } + + s.nodeStore.UpdateNodes(fns) + + return !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) +} + +// healthSetter returns an UpdateNodeFunc that flips n.Unhealthy to +// the inverse of healthy, with the same gate as [State.SetNodeHealth]: +// an unhealthy mark only sticks when the node is still online and +// still advertises approved routes, so a node that left HA candidacy +// between probe dispatch and result does not carry a stale bit. +func healthSetter(healthy bool) UpdateNodeFunc { + return func(n *types.Node) { + if !healthy { online := n.IsOnline != nil && *n.IsOnline if !online || len(n.AllApprovedRoutes()) == 0 { return } } - n.Unhealthy = unhealthy - }) - if !ok { - return false + n.Unhealthy = !healthy } - - return !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) } // ValidateAPIKey checks if an API key is valid and active.