mirror of
https://github.com/juanfont/headscale.git
synced 2026-05-23 10:42:30 +09:00
integration/dockertestutil: wait for libnetwork settle on reconnect
DisconnectContainerFromNetwork and ReconnectContainerToNetwork returned as soon as the docker API call completed, but libnetwork bridge reprogramming continued for several seconds after. The HA disconnect tests then raced and bounced between healthy and broken bridges. Poll until the container's endpoint is gone (on disconnect) or reconciled (on reconnect), and on the "conflicts with existing route" surface clear the stale subnet route from the netns and retry. Settle is now baked into the primitive so every caller benefits.
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
package dockertestutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v5"
|
||||
@@ -14,10 +16,19 @@ import (
|
||||
"github.com/ory/dockertest/v3/docker"
|
||||
)
|
||||
|
||||
var ErrContainerNotFound = errors.New("container not found")
|
||||
var (
|
||||
ErrContainerNotFound = errors.New("container not found")
|
||||
ErrConditionTimeout = errors.New("condition not met within timeout")
|
||||
)
|
||||
|
||||
// retryDockerOp absorbs eventual-consistency races in libnetwork endpoint cleanup.
|
||||
// Pulls its backoff bounds from retry.go so every helper that drives a
|
||||
// docker control-plane call uses the same budget.
|
||||
func retryDockerOp(ctx context.Context, op func() error) error {
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
bo.InitialInterval = DockerOpInitialInterval
|
||||
bo.MaxInterval = DockerOpMaxInterval
|
||||
|
||||
_, err := backoff.Retry(ctx, func() (struct{}, error) {
|
||||
err := op()
|
||||
if err != nil {
|
||||
@@ -25,7 +36,7 @@ func retryDockerOp(ctx context.Context, op func() error) error {
|
||||
}
|
||||
|
||||
return struct{}{}, nil
|
||||
}, backoff.WithBackOff(backoff.NewExponentialBackOff()), backoff.WithMaxElapsedTime(30*time.Second))
|
||||
}, backoff.WithBackOff(bo), backoff.WithMaxElapsedTime(DockerOpMaxElapsedTime))
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -105,46 +116,266 @@ func AddContainerToNetwork(
|
||||
})
|
||||
}
|
||||
|
||||
// DisconnectContainerFromNetwork removes the container from network at
|
||||
// the docker daemon level. Mirrors a physical cable pull: the
|
||||
// container's network interface for that network disappears and any
|
||||
// in-flight TCP connections are left half-open, exactly the failure
|
||||
// mode iptables-based simulations cannot reproduce.
|
||||
// DisconnectContainerFromNetwork detaches the container at the docker
|
||||
// daemon level (cable-pull semantics) and waits for libnetwork to drop
|
||||
// the endpoint before returning — re-attaching during the
|
||||
// reprogramming window otherwise fails with "network is unreachable".
|
||||
func DisconnectContainerFromNetwork(
|
||||
pool *dockertest.Pool,
|
||||
network *dockertest.Network,
|
||||
testContainer string,
|
||||
) error {
|
||||
containers, err := pool.Client.ListContainers(docker.ListContainersOptions{
|
||||
All: true,
|
||||
Filters: map[string][]string{
|
||||
"name": {testContainer},
|
||||
},
|
||||
containerID, err := lookupContainerID(pool, testContainer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = retryDockerOp(context.Background(), func() error {
|
||||
return pool.Client.DisconnectNetwork(network.Network.ID, docker.NetworkConnectionOptions{
|
||||
Container: containerID,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(containers) == 0 {
|
||||
return fmt.Errorf("%w: %s", ErrContainerNotFound, testContainer)
|
||||
err = waitNetworkContainerAbsent(pool, network, testContainer, DockerOpMaxElapsedTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return retryDockerOp(context.Background(), func() error {
|
||||
return pool.Client.DisconnectNetwork(network.Network.ID, docker.NetworkConnectionOptions{
|
||||
Container: containers[0].ID,
|
||||
})
|
||||
})
|
||||
// libnetwork drops the endpoint from its model before the kernel
|
||||
// netns has flushed the matching route. Re-attach with the sticky
|
||||
// IP otherwise fails with "conflicts with existing route".
|
||||
return waitContainerRouteAbsent(pool, containerID, network, DockerOpMaxElapsedTime)
|
||||
}
|
||||
|
||||
// ReconnectContainerToNetwork is the inverse of
|
||||
// DisconnectContainerFromNetwork — re-attaches the container to the
|
||||
// network so traffic can flow again.
|
||||
// ReconnectContainerToNetwork inverts DisconnectContainerFromNetwork
|
||||
// and waits until libnetwork has wired up a fresh IPv4 address.
|
||||
func ReconnectContainerToNetwork(
|
||||
pool *dockertest.Pool,
|
||||
network *dockertest.Network,
|
||||
testContainer string,
|
||||
) error {
|
||||
return AddContainerToNetwork(pool, network, testContainer)
|
||||
containerID, err := lookupContainerID(pool, testContainer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = retryDockerOp(context.Background(), func() error {
|
||||
connectErr := pool.Client.ConnectNetwork(network.Network.ID, docker.NetworkConnectionOptions{
|
||||
Container: containerID,
|
||||
})
|
||||
if connectErr != nil && isStaleRouteConflict(connectErr) {
|
||||
// Defensive cleanup: a route survived the netns flush
|
||||
// despite the wait above. Drop subnet routes that point
|
||||
// at the disconnected interface so libnetwork can
|
||||
// reprogram the sticky IP, then let the retry budget
|
||||
// try the ConnectNetwork call again.
|
||||
removeContainerSubnetRoutes(pool, containerID, network)
|
||||
}
|
||||
|
||||
return connectErr
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return waitNetworkContainerPresent(pool, network, testContainer, DockerOpMaxElapsedTime)
|
||||
}
|
||||
|
||||
// lookupContainerID resolves a container name to its docker ID.
|
||||
func lookupContainerID(pool *dockertest.Pool, testContainer string) (string, error) {
|
||||
containers, err := pool.Client.ListContainers(docker.ListContainersOptions{
|
||||
All: true,
|
||||
Filters: map[string][]string{"name": {testContainer}},
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(containers) == 0 {
|
||||
return "", fmt.Errorf("%w: %s", ErrContainerNotFound, testContainer)
|
||||
}
|
||||
|
||||
return containers[0].ID, nil
|
||||
}
|
||||
|
||||
// DisconnectAndReconnect calls Disconnect followed by Reconnect; both
|
||||
// primitives drive their own libnetwork settle waits.
|
||||
func DisconnectAndReconnect(
|
||||
pool *dockertest.Pool,
|
||||
network *dockertest.Network,
|
||||
testContainer string,
|
||||
) error {
|
||||
err := DisconnectContainerFromNetwork(pool, network, testContainer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("disconnecting %s from %s: %w", testContainer, network.Network.Name, err)
|
||||
}
|
||||
|
||||
err = ReconnectContainerToNetwork(pool, network, testContainer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reconnecting %s to %s: %w", testContainer, network.Network.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitNetworkContainerAbsent(
|
||||
pool *dockertest.Pool,
|
||||
network *dockertest.Network,
|
||||
testContainer string,
|
||||
timeout time.Duration,
|
||||
) error {
|
||||
return pollUntil(timeout, func() (bool, error) {
|
||||
net, err := pool.Client.NetworkInfo(network.Network.ID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("inspecting network %s: %w", network.Network.Name, err)
|
||||
}
|
||||
|
||||
for _, c := range net.Containers {
|
||||
if c.Name == testContainer || c.Name == "/"+testContainer {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
func waitNetworkContainerPresent(
|
||||
pool *dockertest.Pool,
|
||||
network *dockertest.Network,
|
||||
testContainer string,
|
||||
timeout time.Duration,
|
||||
) error {
|
||||
return pollUntil(timeout, func() (bool, error) {
|
||||
net, err := pool.Client.NetworkInfo(network.Network.ID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("inspecting network %s: %w", network.Network.Name, err)
|
||||
}
|
||||
|
||||
for _, c := range net.Containers {
|
||||
if (c.Name == testContainer || c.Name == "/"+testContainer) && c.IPv4Address != "" {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// waitContainerRouteAbsent polls the container's routing table until no
|
||||
// route remains for the network's IPAM subnet. libnetwork's docker-side
|
||||
// endpoint teardown is asynchronous from the kernel netns flush, and a
|
||||
// surviving route blocks a subsequent reconnect at sticky-IP assignment
|
||||
// with "conflicts with existing route".
|
||||
func waitContainerRouteAbsent(pool *dockertest.Pool, containerID string, network *dockertest.Network, timeout time.Duration) error {
|
||||
subnets := networkSubnets(network)
|
||||
if len(subnets) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return pollUntil(timeout, func() (bool, error) {
|
||||
stdout, err := execStdout(pool, containerID, []string{"ip", "-4", "route", "show"})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("inspecting routes in %s: %w", containerID, err)
|
||||
}
|
||||
|
||||
for _, subnet := range subnets {
|
||||
if strings.Contains(stdout, subnet+" ") || strings.HasSuffix(strings.TrimSpace(stdout), subnet) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
// removeContainerSubnetRoutes drops residue subnet routes in the
|
||||
// container's netns — the leftover that libnetwork's async endpoint
|
||||
// teardown can leave behind.
|
||||
func removeContainerSubnetRoutes(pool *dockertest.Pool, containerID string, network *dockertest.Network) {
|
||||
for _, subnet := range networkSubnets(network) {
|
||||
_, err := execStdout(pool, containerID, []string{"ip", "-4", "route", "del", subnet})
|
||||
if err != nil {
|
||||
log.Printf("removing stale route %s in %s: %v", subnet, containerID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isStaleRouteConflict matches the libnetwork 500 raised when a
|
||||
// surviving subnet route blocks sticky-IP reprogramming on reconnect.
|
||||
func isStaleRouteConflict(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return strings.Contains(err.Error(), "conflicts with existing route")
|
||||
}
|
||||
|
||||
// networkSubnets returns the IPAM-configured subnets for a docker
|
||||
// network. Empty when IPAM is left to docker defaults.
|
||||
func networkSubnets(network *dockertest.Network) []string {
|
||||
out := make([]string, 0, len(network.Network.IPAM.Config))
|
||||
for _, cfg := range network.Network.IPAM.Config {
|
||||
if cfg.Subnet != "" {
|
||||
out = append(out, cfg.Subnet)
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// execStdout runs a one-shot command in containerID and returns stdout.
|
||||
func execStdout(pool *dockertest.Pool, containerID string, cmd []string) (string, error) {
|
||||
exec, err := pool.Client.CreateExec(docker.CreateExecOptions{
|
||||
Container: containerID,
|
||||
Cmd: cmd,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create exec: %w", err)
|
||||
}
|
||||
|
||||
var stdout, stderr bytes.Buffer
|
||||
|
||||
err = pool.Client.StartExec(exec.ID, docker.StartExecOptions{
|
||||
OutputStream: &stdout,
|
||||
ErrorStream: &stderr,
|
||||
})
|
||||
if err != nil {
|
||||
return stdout.String(), fmt.Errorf("start exec: %w", err)
|
||||
}
|
||||
|
||||
return stdout.String(), nil
|
||||
}
|
||||
|
||||
// pollUntil ticks every DockerOpInitialInterval until check returns
|
||||
// done=true or timeout elapses. A non-nil check error aborts the loop.
|
||||
func pollUntil(timeout time.Duration, check func() (done bool, err error)) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
ticker := time.NewTicker(DockerOpInitialInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
done, err := check()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done {
|
||||
return nil
|
||||
}
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
return fmt.Errorf("%w: %s", ErrConditionTimeout, timeout)
|
||||
}
|
||||
|
||||
<-ticker.C
|
||||
}
|
||||
}
|
||||
|
||||
// RandomFreeHostPort asks the kernel for a free open port that is ready to use.
|
||||
|
||||
12
integration/dockertestutil/retry.go
Normal file
12
integration/dockertestutil/retry.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package dockertestutil
|
||||
|
||||
import "time"
|
||||
|
||||
// Docker control-plane retry policy. MaxElapsedTime sits above the
|
||||
// worst observed libnetwork bridge reprogramming time (~60 s on
|
||||
// contended GHA runners).
|
||||
const (
|
||||
DockerOpInitialInterval = 1 * time.Second
|
||||
DockerOpMaxInterval = 10 * time.Second
|
||||
DockerOpMaxElapsedTime = 90 * time.Second
|
||||
)
|
||||
Reference in New Issue
Block a user