mirror of
https://github.com/juanfont/headscale.git
synced 2026-05-24 02:58:42 +09:00
state: defer HA failover when probe target reconnected mid-cycle
The HA prober dispatches a PingRequest, waits ProbeTimeout (5s), and marks the node unhealthy if no callback arrives. A node that bounced its poll session between probe cycles satisfies two conditions that conspire to fail TestHASubnetRouterFailover: a probe queued against the previous session is silently dropped when the worker writes to the closed connection (timeout always fires), and a probe sent immediately after reconnect lands while wgengine is still rebuilding magicsock state from the new netmap. Either path installs a spurious unhealthy bit, which sends the preserved-primary anti-flap the wrong way. Record the session observed at dispatch time and drop the timeout path if the node reconnected since. Require the session to survive a full probe cycle before a timeout can drive a failover.
This commit is contained in:
@@ -283,6 +283,77 @@ func TestHAHealthProbe_SetUnhealthyNoRoutesIsNoOp(t *testing.T) {
|
||||
"SetNodeUnhealthy on node with no approved routes should be a no-op")
|
||||
}
|
||||
|
||||
// TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy reproduces the
|
||||
// race that surfaced as a TestHASubnetRouterFailover flake: a probe
|
||||
// dispatched against the previous poll session sees the timeout fire
|
||||
// while the client is briefly disconnected. With the session guard in
|
||||
// [HAHealthProber.ProbeOnce], the timeout path observes the reconnect
|
||||
// and bails out instead of installing a spurious Unhealthy bit.
|
||||
//
|
||||
// Without the guard, the primary fails over to the standby and the
|
||||
// anti-flap election preserves that choice even after the original
|
||||
// primary is fully back online.
|
||||
func TestHAHealthProbe_ReconnectDuringProbeKeepsHealthy(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
srv := servertest.NewServer(t)
|
||||
user := srv.CreateUser(t, "ha-probe-reconnect")
|
||||
|
||||
route := netip.MustParsePrefix("10.102.0.0/24")
|
||||
|
||||
c1 := servertest.NewClient(t, srv, "ha-pr-r1", servertest.WithUser(user))
|
||||
c2 := servertest.NewClient(t, srv, "ha-pr-r2", servertest.WithUser(user))
|
||||
|
||||
c1.WaitForPeers(t, 1, 10*time.Second)
|
||||
c2.WaitForPeers(t, 1, 10*time.Second)
|
||||
|
||||
nodeID1 := advertiseAndApproveRoute(t, srv, c1, route)
|
||||
advertiseAndApproveRoute(t, srv, c2, route)
|
||||
|
||||
// Node 1 is primary (lowest ID, healthy).
|
||||
require.Contains(t,
|
||||
srv.State().GetNodePrimaryRoutes(nodeID1), route,
|
||||
"node 1 should be primary initially")
|
||||
|
||||
prober := state.NewHAHealthProber(
|
||||
srv.State(),
|
||||
types.HARouteConfig{
|
||||
ProbeInterval: 30 * time.Second,
|
||||
ProbeTimeout: 2 * time.Second,
|
||||
},
|
||||
srv.URL,
|
||||
srv.App.MapBatcher().IsConnected,
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// TestClient does not implement ping responses, so every probe
|
||||
// times out. We exploit that to observe the timeout path under a
|
||||
// reconnect race: kick a probe in a goroutine, bounce the
|
||||
// primary's poll session, and confirm the prober drops the stale
|
||||
// timeout instead of marking the node unhealthy.
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
|
||||
prober.ProbeOnce(ctx, srv.App.Change)
|
||||
}()
|
||||
|
||||
c1.Disconnect(t)
|
||||
c1.Reconnect(t)
|
||||
c1.WaitForPeers(t, 1, 10*time.Second)
|
||||
|
||||
<-done
|
||||
|
||||
assert.True(t, srv.State().IsNodeHealthy(nodeID1),
|
||||
"reconnect during probe must not flip node unhealthy")
|
||||
assert.Contains(t,
|
||||
srv.State().GetNodePrimaryRoutes(nodeID1), route,
|
||||
"node 1 should remain primary after stale-probe timeout")
|
||||
}
|
||||
|
||||
// TestHAHealthProbe_NoHARoutes verifies that the prober is a no-op
|
||||
// when no HA configuration exists.
|
||||
func TestHAHealthProbe_NoHARoutes(t *testing.T) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"github.com/juanfont/headscale/hscontrol/types/change"
|
||||
"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
|
||||
"github.com/puzpuzpuz/xsync/v4"
|
||||
"github.com/rs/zerolog/log"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/util/set"
|
||||
@@ -20,6 +21,11 @@ type HAHealthProber struct {
|
||||
cfg types.HARouteConfig
|
||||
serverURL string
|
||||
isConnected func(types.NodeID) bool
|
||||
|
||||
// lastStableSession defers a timeout-driven unhealthy decision
|
||||
// for sessions younger than one probe cycle, giving wgengine
|
||||
// time to apply the new netmap on a freshly reconnected node.
|
||||
lastStableSession *xsync.Map[types.NodeID, uint64]
|
||||
}
|
||||
|
||||
// NewHAHealthProber creates a prober that uses the given State for
|
||||
@@ -32,34 +38,68 @@ func NewHAHealthProber(
|
||||
isConnected func(types.NodeID) bool,
|
||||
) *HAHealthProber {
|
||||
return &HAHealthProber{
|
||||
state: s,
|
||||
cfg: cfg,
|
||||
serverURL: serverURL,
|
||||
isConnected: isConnected,
|
||||
state: s,
|
||||
cfg: cfg,
|
||||
serverURL: serverURL,
|
||||
isConnected: isConnected,
|
||||
lastStableSession: xsync.NewMap[types.NodeID, uint64](),
|
||||
}
|
||||
}
|
||||
|
||||
// ProbeOnce pings all HA subnet router nodes. PingNode changes are
|
||||
// dispatched immediately via dispatch so nodes can respond before the
|
||||
// timeout. Health-related policy changes are also dispatched inline.
|
||||
// markSessionStable records session and returns true iff the same
|
||||
// value was already present from a prior cycle.
|
||||
func (p *HAHealthProber) markSessionStable(id types.NodeID, session uint64) bool {
|
||||
prev, loaded := p.lastStableSession.LoadAndStore(id, session)
|
||||
return loaded && prev == session
|
||||
}
|
||||
|
||||
// forgetSession drops the recorded session so a node returning to
|
||||
// HA candidacy starts fresh.
|
||||
func (p *HAHealthProber) forgetSession(id types.NodeID) {
|
||||
p.lastStableSession.Delete(id)
|
||||
}
|
||||
|
||||
// ProbeOnce pings all HA subnet router nodes and dispatches health
|
||||
// changes inline. A timeout that fires after the node reconnected,
|
||||
// or against a session younger than one probe cycle, is dropped so
|
||||
// wgengine has time to apply the new netmap before a failover.
|
||||
func (p *HAHealthProber) ProbeOnce(
|
||||
ctx context.Context,
|
||||
dispatch func(...change.Change),
|
||||
) {
|
||||
haNodes := p.state.nodeStore.HANodes()
|
||||
|
||||
// Drop stable-session entries for nodes that are no longer HA
|
||||
// candidates so a future reappearance starts fresh.
|
||||
seen := make(set.Set[types.NodeID])
|
||||
|
||||
for _, nodes := range haNodes {
|
||||
for _, id := range nodes {
|
||||
seen.Add(id)
|
||||
}
|
||||
}
|
||||
|
||||
p.lastStableSession.Range(func(id types.NodeID, _ uint64) bool {
|
||||
if !seen.Contains(id) {
|
||||
p.lastStableSession.Delete(id)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
if len(haNodes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Deduplicate node IDs across prefixes.
|
||||
seen := make(set.Set[types.NodeID])
|
||||
|
||||
var nodeIDs []types.NodeID
|
||||
|
||||
dedup := make(set.Set[types.NodeID])
|
||||
|
||||
for _, nodes := range haNodes {
|
||||
for _, id := range nodes {
|
||||
if !seen.Contains(id) {
|
||||
seen.Add(id)
|
||||
if !dedup.Contains(id) {
|
||||
dedup.Add(id)
|
||||
nodeIDs = append(nodeIDs, id)
|
||||
}
|
||||
}
|
||||
@@ -77,9 +117,19 @@ func (p *HAHealthProber) ProbeOnce(
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("HA probe: skipping offline node")
|
||||
|
||||
p.forgetSession(id)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
nv, ok := p.state.GetNodeByID(id)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
probeSession := nv.SessionEpoch()
|
||||
stable := p.markSessionStable(id, probeSession)
|
||||
|
||||
pingID, responseCh := p.state.RegisterPing(id)
|
||||
callbackURL := p.serverURL + "/machine/ping-response?id=" + pingID
|
||||
|
||||
@@ -117,6 +167,30 @@ func (p *HAHealthProber) ProbeOnce(
|
||||
return
|
||||
}
|
||||
|
||||
curr, ok := p.state.GetNodeByID(id)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if curr.SessionEpoch() != probeSession {
|
||||
log.Debug().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Uint64("probe_session", probeSession).
|
||||
Uint64("current_session", curr.SessionEpoch()).
|
||||
Msg("HA probe: node reconnected during probe, skipping")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !stable {
|
||||
log.Debug().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Uint64("probe_session", probeSession).
|
||||
Msg("HA probe: probe of fresh session timed out, deferring to next cycle")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Warn().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Dur("timeout", p.cfg.ProbeTimeout).
|
||||
|
||||
Reference in New Issue
Block a user