diff --git a/hscontrol/mapper/batcher_test.go b/hscontrol/mapper/batcher_test.go index f621d0b6..13d18bb2 100644 --- a/hscontrol/mapper/batcher_test.go +++ b/hscontrol/mapper/batcher_test.go @@ -1107,102 +1107,6 @@ func TestBatcherWorkQueueBatching(t *testing.T) { } } -// TestBatcherChannelClosingRace tests the fix for the async channel closing -// race condition that previously caused panics and data races. -// -// Enhanced with real database test data, this test simulates rapid node -// reconnections using real registered nodes while processing actual updates. -// The test verifies that channels are closed synchronously and deterministically -// even when real node updates are being processed, ensuring no race conditions -// occur during channel replacement with actual workload. -func XTestBatcherChannelClosingRace(t *testing.T) { - t.Helper() - - for _, batcherFunc := range allBatcherFunctions { - t.Run(batcherFunc.name, func(t *testing.T) { - // Create test environment with real database and nodes - testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, 1, 1, 8) - defer cleanup() - - batcher := testData.Batcher - testNode := &testData.Nodes[0] - - var ( - channelIssues int - mutex sync.Mutex - ) - - // Run rapid connect/disconnect cycles with real updates to test channel closing - - for i := range 100 { - var wg sync.WaitGroup - - // First connection - ch1 := make(chan *tailcfg.MapResponse, 1) - - wg.Go(func() { - _ = batcher.AddNode(testNode.n.ID, ch1, tailcfg.CapabilityVersion(100), nil) - }) - - // Add real work during connection chaos - if i%10 == 0 { - batcher.AddWork(change.DERPMap()) - } - - // Rapid second connection - should replace ch1 - ch2 := make(chan *tailcfg.MapResponse, 1) - - wg.Go(func() { - runtime.Gosched() // Yield to introduce timing variability - - _ = batcher.AddNode(testNode.n.ID, ch2, tailcfg.CapabilityVersion(100), nil) - }) - - // Remove second connection - - wg.Go(func() { - runtime.Gosched() // Yield to introduce timing variability - runtime.Gosched() // Extra yield to offset from AddNode - batcher.RemoveNode(testNode.n.ID, ch2) - }) - - wg.Wait() - - // Verify ch1 behavior when replaced by ch2 - // The test is checking if ch1 gets closed/replaced properly - select { - case <-ch1: - // Channel received data or was closed, which is expected - case <-time.After(1 * time.Millisecond): - // If no data received, increment issues counter - mutex.Lock() - - channelIssues++ - - mutex.Unlock() - } - - // Clean up ch2 - select { - case <-ch2: - default: - } - } - - mutex.Lock() - defer mutex.Unlock() - - t.Logf("Channel closing issues: %d out of 100 iterations", channelIssues) - - // The main fix prevents panics and race conditions. Some timing variations - // are acceptable as long as there are no crashes or deadlocks. - if channelIssues > 50 { // Allow some timing variations - t.Errorf("Excessive channel closing issues: %d iterations", channelIssues) - } - }) - } -} - // TestBatcherWorkerChannelSafety tests that worker goroutines handle closed // channels safely without panicking when processing work items. // @@ -1616,521 +1520,6 @@ func TestBatcherConcurrentClients(t *testing.T) { } } -// TestBatcherHighLoadStability tests batcher behavior under high concurrent load -// scenarios with multiple nodes rapidly connecting and disconnecting while -// continuous updates are generated. -// -// This test creates a high-stress environment with many nodes connecting and -// disconnecting rapidly while various types of updates are generated continuously. -// It validates that the system remains stable with no deadlocks, panics, or -// missed updates under sustained high load. The test uses real node data to -// generate authentic update scenarios and tracks comprehensive statistics. -// -//nolint:gocyclo,thelper // complex scalability test scenario -func XTestBatcherScalability(t *testing.T) { - if testing.Short() { - t.Skip("Skipping scalability test in short mode") - } - - // Reduce verbose application logging for cleaner test output - originalLevel := zerolog.GlobalLevel() - defer zerolog.SetGlobalLevel(originalLevel) - - zerolog.SetGlobalLevel(zerolog.ErrorLevel) - - // Full test matrix for scalability testing - nodes := []int{25, 50, 100} // 250, 500, 1000, - - cycles := []int{10, 100} // 500 - bufferSizes := []int{1, 200, 1000} - chaosTypes := []string{"connection", "processing", "mixed"} - - type testCase struct { - name string - nodeCount int - cycles int - bufferSize int - chaosType string - expectBreak bool - description string - } - - testCases := make([]testCase, 0, len(chaosTypes)*len(bufferSizes)*len(cycles)*len(nodes)) - - // Generate all combinations of the test matrix - for _, nodeCount := range nodes { - for _, cycleCount := range cycles { - for _, bufferSize := range bufferSizes { - for _, chaosType := range chaosTypes { - expectBreak := false - // resourceIntensity := float64(nodeCount*cycleCount) / float64(bufferSize) - - // switch chaosType { - // case "processing": - // resourceIntensity *= 1.1 - // case "mixed": - // resourceIntensity *= 1.15 - // } - - // if resourceIntensity > 500000 { - // expectBreak = true - // } else if nodeCount >= 1000 && cycleCount >= 500 && bufferSize <= 1 { - // expectBreak = true - // } else if nodeCount >= 500 && cycleCount >= 500 && bufferSize <= 1 && chaosType == "mixed" { - // expectBreak = true - // } - - name := fmt.Sprintf( - "%s_%dn_%dc_%db", - chaosType, - nodeCount, - cycleCount, - bufferSize, - ) - description := fmt.Sprintf("%s chaos: %d nodes, %d cycles, %d buffers", - chaosType, nodeCount, cycleCount, bufferSize) - - testCases = append(testCases, testCase{ - name: name, - nodeCount: nodeCount, - cycles: cycleCount, - bufferSize: bufferSize, - chaosType: chaosType, - expectBreak: expectBreak, - description: description, - }) - } - } - } - } - - for _, batcherFunc := range allBatcherFunctions { - t.Run(batcherFunc.name, func(t *testing.T) { - for i, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Create comprehensive test environment with real data using the specific buffer size for this test case - // Need 1000 nodes for largest test case, all from same user so they can be peers - usersNeeded := max(1, tc.nodeCount/1000) // 1 user per 1000 nodes, minimum 1 - nodesPerUser := tc.nodeCount / usersNeeded - - testData, cleanup := setupBatcherWithTestData( - t, - batcherFunc.fn, - usersNeeded, - nodesPerUser, - tc.bufferSize, - ) - defer cleanup() - - batcher := testData.Batcher - allNodes := testData.Nodes - - t.Logf("[%d/%d] SCALABILITY TEST: %s", i+1, len(testCases), tc.description) - t.Logf( - " Cycles: %d, Buffer Size: %d, Chaos Type: %s", - tc.cycles, - tc.bufferSize, - tc.chaosType, - ) - - // Use provided nodes, limit to requested count - testNodes := allNodes[:min(len(allNodes), tc.nodeCount)] - - tracker := newUpdateTracker() - panicCount := int64(0) - deadlockDetected := false - - startTime := time.Now() - setupTime := time.Since(startTime) - t.Logf( - "Starting scalability test with %d nodes (setup took: %v)", - len(testNodes), - setupTime, - ) - - // Comprehensive stress test - done := make(chan struct{}) - - // Start update consumers for all nodes - for i := range testNodes { - testNodes[i].start() - } - - // Yield to allow tracking goroutines to start - runtime.Gosched() - - // Connect all nodes first so they can see each other as peers - connectedNodes := make(map[types.NodeID]bool) - - var connectedNodesMutex sync.RWMutex - - for i := range testNodes { - node := &testNodes[i] - _ = batcher.AddNode(node.n.ID, node.ch, tailcfg.CapabilityVersion(100), nil) - - connectedNodesMutex.Lock() - - connectedNodes[node.n.ID] = true - - connectedNodesMutex.Unlock() - } - - // Wait for all connections to be established - assert.EventuallyWithT(t, func(c *assert.CollectT) { - for i := range testNodes { - assert.True(c, batcher.IsConnected(testNodes[i].n.ID), "node should be connected") - } - }, 5*time.Second, 50*time.Millisecond, "waiting for nodes to connect") - - batcher.AddWork(change.FullUpdate()) - - // Wait for initial update to propagate - assert.EventuallyWithT(t, func(c *assert.CollectT) { - for i := range testNodes { - assert.GreaterOrEqual(c, atomic.LoadInt64(&testNodes[i].updateCount), int64(1), "should have received initial update") - } - }, 5*time.Second, 50*time.Millisecond, "waiting for initial update") - - go func() { - defer close(done) - - var wg sync.WaitGroup - - t.Logf( - "Starting load generation: %d cycles with %d nodes", - tc.cycles, - len(testNodes), - ) - - // Main load generation - varies by chaos type - for cycle := range tc.cycles { - if cycle%10 == 0 { - t.Logf("Cycle %d/%d completed", cycle, tc.cycles) - } - // Yield for mixed chaos to introduce timing variability - if tc.chaosType == "mixed" && cycle%10 == 0 { - runtime.Gosched() - } - - // For chaos testing, only disconnect/reconnect a subset of nodes - // This ensures some nodes stay connected to continue receiving updates - startIdx := cycle % len(testNodes) - - endIdx := min(startIdx+len(testNodes)/4, len(testNodes)) - - if startIdx >= endIdx { - startIdx = 0 - endIdx = min(len(testNodes)/4, len(testNodes)) - } - - chaosNodes := testNodes[startIdx:endIdx] - if len(chaosNodes) == 0 { - chaosNodes = testNodes[:min(1, len(testNodes))] // At least one node for chaos - } - - // Connection/disconnection cycles for subset of nodes - for i := range chaosNodes { - node := &chaosNodes[i] - // Only add work if this is connection chaos or mixed - if tc.chaosType == "connection" || tc.chaosType == "mixed" { - wg.Add(2) - - // Disconnection first - go func(nodeID types.NodeID, channel chan *tailcfg.MapResponse) { - defer func() { - if r := recover(); r != nil { - atomic.AddInt64(&panicCount, 1) - } - - wg.Done() - }() - - connectedNodesMutex.RLock() - - isConnected := connectedNodes[nodeID] - - connectedNodesMutex.RUnlock() - - if isConnected { - batcher.RemoveNode(nodeID, channel) - connectedNodesMutex.Lock() - - connectedNodes[nodeID] = false - - connectedNodesMutex.Unlock() - } - }( - node.n.ID, - node.ch, - ) - - // Then reconnection - go func(nodeID types.NodeID, channel chan *tailcfg.MapResponse, index int) { - defer func() { - if r := recover(); r != nil { - atomic.AddInt64(&panicCount, 1) - } - - wg.Done() - }() - - // Yield before reconnecting to introduce timing variability - for range index % 3 { - runtime.Gosched() - } - - _ = batcher.AddNode( - nodeID, - channel, - tailcfg.CapabilityVersion(100), - nil, - ) - - connectedNodesMutex.Lock() - - connectedNodes[nodeID] = true - - connectedNodesMutex.Unlock() - - // Add work to create load - if index%5 == 0 { - batcher.AddWork(change.FullUpdate()) - } - }( - node.n.ID, - node.ch, - i, - ) - } - } - - // Concurrent work generation - scales with load - updateCount := min(tc.nodeCount/5, 20) // Scale updates with node count - for i := range updateCount { - wg.Add(1) - - go func(index int) { - defer func() { - if r := recover(); r != nil { - atomic.AddInt64(&panicCount, 1) - } - - wg.Done() - }() - - // Generate different types of work to ensure updates are sent - switch index % 4 { - case 0: - batcher.AddWork(change.FullUpdate()) - case 1: - batcher.AddWork(change.PolicyChange()) - case 2: - batcher.AddWork(change.DERPMap()) - default: - // Pick a random node and generate a node change - if len(testNodes) > 0 { - nodeIdx := index % len(testNodes) - batcher.AddWork( - change.NodeAdded(testNodes[nodeIdx].n.ID), - ) - } else { - batcher.AddWork(change.FullUpdate()) - } - } - }(i) - } - } - - t.Logf("Waiting for all goroutines to complete") - wg.Wait() - t.Logf("All goroutines completed") - }() - - // Wait for completion with timeout and progress monitoring - progressTicker := time.NewTicker(10 * time.Second) - defer progressTicker.Stop() - - select { - case <-done: - t.Logf("Test completed successfully") - case <-time.After(testTimeout): - deadlockDetected = true - // Collect diagnostic information - allStats := tracker.getAllStats() - - totalUpdates := 0 - for _, stats := range allStats { - totalUpdates += stats.TotalUpdates - } - - interimPanics := atomic.LoadInt64(&panicCount) - - t.Logf("TIMEOUT DIAGNOSIS: Test timed out after %v", testTimeout) - t.Logf( - " Progress at timeout: %d total updates, %d panics", - totalUpdates, - interimPanics, - ) - t.Logf( - " Possible causes: deadlock, excessive load, or performance bottleneck", - ) - - // Try to detect if workers are still active - if totalUpdates > 0 { - t.Logf( - " System was processing updates - likely performance bottleneck", - ) - } else { - t.Logf(" No updates processed - likely deadlock or startup issue") - } - } - - // Wait for batcher workers to process all work and send updates - // before disconnecting nodes - assert.EventuallyWithT(t, func(c *assert.CollectT) { - // Check that at least some updates were processed - var totalUpdates int64 - for i := range testNodes { - totalUpdates += atomic.LoadInt64(&testNodes[i].updateCount) - } - - assert.Positive(c, totalUpdates, "should have processed some updates") - }, 5*time.Second, 50*time.Millisecond, "waiting for updates to be processed") - - // Now disconnect all nodes from batcher to stop new updates - for i := range testNodes { - node := &testNodes[i] - batcher.RemoveNode(node.n.ID, node.ch) - } - - // Wait for nodes to be disconnected - assert.EventuallyWithT(t, func(c *assert.CollectT) { - for i := range testNodes { - assert.False(c, batcher.IsConnected(testNodes[i].n.ID), "node should be disconnected") - } - }, 5*time.Second, 50*time.Millisecond, "waiting for nodes to disconnect") - - // Cleanup nodes and get their final stats - totalUpdates := int64(0) - totalPatches := int64(0) - totalFull := int64(0) - maxPeersGlobal := 0 - nodeStatsReport := make([]string, 0, len(testNodes)) - - for i := range testNodes { - node := &testNodes[i] - stats := node.cleanup() - totalUpdates += stats.TotalUpdates - totalPatches += stats.PatchUpdates - - totalFull += stats.FullUpdates - if stats.MaxPeersSeen > maxPeersGlobal { - maxPeersGlobal = stats.MaxPeersSeen - } - - if stats.TotalUpdates > 0 { - nodeStatsReport = append(nodeStatsReport, - fmt.Sprintf( - "Node %d: %d total (%d patch, %d full), max %d peers", - node.n.ID, - stats.TotalUpdates, - stats.PatchUpdates, - stats.FullUpdates, - stats.MaxPeersSeen, - )) - } - } - - // Comprehensive final summary - t.Logf( - "FINAL RESULTS: %d total updates (%d patch, %d full), max peers seen: %d", - totalUpdates, - totalPatches, - totalFull, - maxPeersGlobal, - ) - - if len(nodeStatsReport) <= 10 { // Only log details for smaller tests - for _, report := range nodeStatsReport { - t.Logf(" %s", report) - } - } else { - t.Logf(" (%d nodes had activity, details suppressed for large test)", len(nodeStatsReport)) - } - - // Legacy tracker comparison (optional) - allStats := tracker.getAllStats() - - legacyTotalUpdates := 0 - for _, stats := range allStats { - legacyTotalUpdates += stats.TotalUpdates - } - - if legacyTotalUpdates != int(totalUpdates) { - t.Logf( - "Note: Legacy tracker mismatch - legacy: %d, new: %d", - legacyTotalUpdates, - totalUpdates, - ) - } - - finalPanicCount := atomic.LoadInt64(&panicCount) - - // Validation based on expectation - testPassed := true - - if tc.expectBreak { - // For tests expected to break, we're mainly checking that we don't crash - if finalPanicCount > 0 { - t.Errorf( - "System crashed with %d panics (even breaking point tests shouldn't crash)", - finalPanicCount, - ) - - testPassed = false - } - // Timeout/deadlock is acceptable for breaking point tests - if deadlockDetected { - t.Logf( - "Expected breaking point reached: system overloaded at %d nodes", - len(testNodes), - ) - } - } else { - // For tests expected to pass, validate proper operation - if finalPanicCount > 0 { - t.Errorf("Scalability test failed with %d panics", finalPanicCount) - - testPassed = false - } - - if deadlockDetected { - t.Errorf("Deadlock detected at %d nodes (should handle this load)", len(testNodes)) - - testPassed = false - } - - if totalUpdates == 0 { - t.Error("No updates received - system may be completely stalled") - - testPassed = false - } - } - - // Clear success/failure indication - if testPassed { - t.Logf("PASS: %s | %d nodes, %d updates, 0 panics, no deadlock", - tc.name, len(testNodes), totalUpdates) - } else { - t.Logf("FAIL: %s | %d nodes, %d updates, %d panics, deadlock: %v", - tc.name, len(testNodes), totalUpdates, finalPanicCount, deadlockDetected) - } - }) - } - }) - } -} - // TestBatcherFullPeerUpdates verifies that when multiple nodes are connected // and we send a FullSet update, nodes receive the complete peer list. func TestBatcherFullPeerUpdates(t *testing.T) {