From ab1b22c09b82834840a372ace253bb29ae87d03b Mon Sep 17 00:00:00 2001 From: Ankit Goswami Date: Wed, 27 May 2026 12:14:40 -0700 Subject: [PATCH 1/2] fleetnode: harden enrollment transport + stage install layout - Scheme-aware gateway HTTP client: TLS-validating http2.Transport for https://, h2c for http://. NewGatewayClient and NewAuthenticatedGateway- Client return error so the scheme check has a place to surface. - Per-RPC handshake timeout (var so tests can shorten it). Register wrapped so a blackholed server cannot hang fleetnode enroll while the state lock is held. - State lock: LOCK_EX | LOCK_NB plus owner-PID write under the lock so a contending invocation gets an actionable error instead of blocking. - Shared testutil.NewH2CServer / NewH2CClient helpers; fake_gateway_test serves the agent client over h2c to match production behavior. - just build-fleetnode recipe stages server/.fleetnode/{fleetnode, nmap, plugins/}; .gitignore covers the artifact dir. - README skeleton: install/enroll surface only; run + discovery sections land in later stack PRs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + justfile | 70 ++++++++++---- server/cmd/fleetnode/README.md | 55 +++++++++++ server/cmd/fleetnode/fake_gateway_test.go | 6 +- server/cmd/fleetnode/run.go | 15 +-- server/cmd/fleetnode/run_test.go | 6 +- .../internal/fleetnodebootstrap/authclient.go | 10 +- .../fleetnodebootstrap/authclient_test.go | 18 ++-- server/internal/fleetnodebootstrap/client.go | 50 +++++++--- .../fleetnodebootstrap/client_test.go | 91 ++++++++++++++++--- .../internal/fleetnodebootstrap/enrollment.go | 15 ++- .../fleetnodebootstrap/enrollment_test.go | 35 +++++++ .../internal/fleetnodebootstrap/handshake.go | 25 +++-- .../fleetnodebootstrap/handshake_test.go | 14 +-- .../fleetnodebootstrap/locking_unix.go | 75 ++++++++++++--- server/internal/fleetnodebootstrap/refresh.go | 6 +- server/internal/testutil/h2c.go | 35 +++++++ 17 files changed, 431 insertions(+), 96 deletions(-) create mode 100644 server/cmd/fleetnode/README.md create mode 100644 server/internal/testutil/h2c.go diff --git a/.gitignore b/.gitignore index 89bdf6545..0f275ca25 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ lefthook-local.yml # Binary files server/cmd/fleetd/fleetd server/fleetnode +server/.fleetnode/ server/fake-antminer/fake-antminer server/fake-proto-rig/fake-proto-rig server/miner-debug-cli diff --git a/justfile b/justfile index a4c2462ff..2e2c71eae 100644 --- a/justfile +++ b/justfile @@ -172,10 +172,25 @@ update-go-deps: # --- Packaging --- -# build the fleetnode operator CLI (writes to server/fleetnode) -[working-directory: 'server'] -build-fleetnode: - go build -o ./fleetnode ./cmd/fleetnode +# Build the fleetnode operator CLI into server/.fleetnode/ along with native +# plugins and an nmap symlink so the binary-adjacent defaults in +# `fleetnode run` resolve without flags. Kept separate from server/plugins/ +# because `just dev` puts cross-compiled Linux/arm64 plugins there for the +# Docker server, and the native agent can't exec ELF binaries. +build-fleetnode: (_build-go-plugins-native "server/.fleetnode/plugins") (_asicrs-build "server/.fleetnode/plugins") + #!/usr/bin/env bash + set -euo pipefail + cd server + mkdir -p ./.fleetnode + go build -o ./.fleetnode/fleetnode ./cmd/fleetnode + if NMAP=$(command -v nmap 2>/dev/null); then + ln -sfn "$NMAP" ./.fleetnode/nmap + echo "linked server/.fleetnode/nmap -> $NMAP" + else + rm -f ./.fleetnode/nmap + echo "note: nmap not on PATH; install it (brew install nmap / apt-get install nmap) so the agent finds it at scan time" + fi + echo "agent staged at server/.fleetnode/fleetnode" # build Windows installer [working-directory: 'deployment-files/windows'] @@ -325,30 +340,47 @@ _build-go-plugins-multi-arch: _go-work-sync (cd plugin/antminer && GOOS=linux GOARCH=arm64 go build -o ../../deployment-files/server/antminer-plugin-arm64 .) chmod +x deployment-files/server/*-plugin-* -_asicrs-build: +_asicrs-build outdir="server/plugins": #!/usr/bin/env bash set -euo pipefail - BIN=server/plugins/asicrs-plugin - PLATFORM_MARKER=server/plugins/.asicrs-platform - WANT_PLATFORM="native" + BIN={{outdir}}/asicrs-plugin + PLATFORM_MARKER={{outdir}}/.asicrs-platform + HOST_OS="$(uname -s)" + # Docker on macOS produces a Linux ELF that the host can't exec. Use local + # cargo there; Linux hosts stay on the docker path so CI doesn't need Rust. + if [ "$HOST_OS" = "Darwin" ]; then + WANT_PLATFORM="darwin-native" + else + WANT_PLATFORM="native" + fi if [ -f "$BIN" ] \ && [ -f "$PLATFORM_MARKER" ] && [ "$(cat "$PLATFORM_MARKER")" = "$WANT_PLATFORM" ] \ && [ -z "$(find plugin/asicrs sdk/rust server/sdk/v1/pb -newer "$BIN" -type f 2>/dev/null | head -1)" ]; then echo "asicrs plugin up to date, skipping build." exit 0 fi - echo "Building asicrs plugin..." - mkdir -p server/plugins - CACHE_ARGS=() - if [ -n "${GITHUB_ACTIONS:-}" ]; then - CACHE_ARGS+=(--cache-from 'type=gha,scope=asicrs-native') - CACHE_ARGS+=(--cache-to 'type=gha,mode=max,scope=asicrs-native') + echo "Building asicrs plugin ($WANT_PLATFORM)..." + mkdir -p {{outdir}} + if [ "$HOST_OS" = "Darwin" ]; then + if ! command -v cargo >/dev/null 2>&1; then + echo "cargo not on PATH; install Rust (https://rustup.rs/) to build asicrs natively on macOS" >&2 + exit 1 + fi + (cd plugin/asicrs && cargo build --release) + cp plugin/asicrs/target/release/asicrs-plugin "$BIN" + cp plugin/asicrs/config.yaml {{outdir}}/asicrs-config.yaml + else + CACHE_ARGS=() + if [ -n "${GITHUB_ACTIONS:-}" ]; then + CACHE_ARGS+=(--cache-from 'type=gha,scope=asicrs-native') + CACHE_ARGS+=(--cache-to 'type=gha,mode=max,scope=asicrs-native') + fi + docker buildx build \ + ${CACHE_ARGS[@]+"${CACHE_ARGS[@]}"} \ + --file plugin/asicrs/Dockerfile.build \ + --output type=local,dest={{outdir}} \ + . fi - docker buildx build \ - ${CACHE_ARGS[@]+"${CACHE_ARGS[@]}"} \ - --file plugin/asicrs/Dockerfile.build \ - --output type=local,dest=server/plugins \ - . chmod +x "$BIN" # buildx --output type=local preserves the in-image mtime; touch so freshness checks see "now". touch "$BIN" diff --git a/server/cmd/fleetnode/README.md b/server/cmd/fleetnode/README.md new file mode 100644 index 000000000..6fd072bf5 --- /dev/null +++ b/server/cmd/fleetnode/README.md @@ -0,0 +1,55 @@ +# fleetnode + +`fleetnode` is the on-prem agent that enrolls with a fleet server, holds a session, and (in later stack PRs) opens a `ControlStream` to execute server-issued discovery commands against the operator's local network. + +## Subcommands + +| Command | Purpose | +|---------|---------| +| `fleetnode enroll` | Register with a fleet server using a one-time enrollment code. Persists keys and `api_key`. See [enroll.go](enroll.go). | +| `fleetnode status` | Print local state (server URL, fleet_node_id, fingerprint, session expiry). See [status.go](status.go). | +| `fleetnode refresh` | Renew the session token using the stored `api_key`. See [refresh.go](refresh.go). | + +`fleetnode run` lands in the run-heartbeat PR. + +## State directory and lock + +State lives in `state.yaml` under one of, in order: + +1. `--state-dir ` (override; primarily for tests) +2. `$XDG_STATE_HOME/fleetnode` +3. `~/.local/state/fleetnode` + +The file holds `server_url`, `fleet_node_id`, `identity_fingerprint`, both keypairs, the `api_key`, and the current `session_token`. It is created `0600` under a `0700` directory. See [state.go](../../internal/fleetnodebootstrap/state.go). + +A `state.lock` file in the same directory serializes commands. Only one process may hold it at a time (`LOCK_EX | LOCK_NB`). The PID of the holder is written under the lock; if another invocation hits contention, the error includes that PID. To clear a stale lock, identify the owner with `ps -p `; remove the lock file only if the process is gone. + +## Build + +```bash +just build-fleetnode # produces server/.fleetnode/{fleetnode, nmap, plugins/} +go build -o fleetnode ./server/cmd/fleetnode # fast iteration +``` + +The staged layout reserves a `plugins/` subdirectory and a `nmap` symlink for the discovery PR. The agent does not yet exec anything from those locations. + +## Enrollment flow + +1. Operator mints an enrollment code in the UI and shares it. +2. `fleetnode enroll --server-url=...` prompts for the code, registers, prints a fingerprint. +3. Operator verifies the fingerprint in the UI and clicks confirm; the UI displays the `api_key`. +4. Agent prompts for the `api_key`, completes the handshake, persists session. + +If anything is interrupted between Register and Complete, `fleetnode refresh` resumes from the persisted state. + +## Security model + +- **Transport.** `ValidateServerURL` requires `https://` for non-loopback servers; `--allow-insecure-transport` permits `http://` for testing only. The HTTP/2 transport is scheme-aware: `https://` goes through a TLS-validating `http2.Transport`, `http://` goes through h2c. See [client.go](../../internal/fleetnodebootstrap/client.go). +- **State file.** `state.yaml` is `0600` under a `0700` directory; the writer fsyncs the temp file, renames, then fsyncs the directory. Symlinks at the state dir leaf are refused. +- **Lock contention.** PID is written under the lock so contention reports are actionable. + +## Development + +```bash +go test ./server/internal/fleetnodebootstrap/... -race -count=1 +``` diff --git a/server/cmd/fleetnode/fake_gateway_test.go b/server/cmd/fleetnode/fake_gateway_test.go index 29e55e02e..b1b1ff98b 100644 --- a/server/cmd/fleetnode/fake_gateway_test.go +++ b/server/cmd/fleetnode/fake_gateway_test.go @@ -16,8 +16,8 @@ import ( pb "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1" "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1/fleetnodegatewayv1connect" - "github.com/block/proto-fleet/server/internal/fleetnodebootstrap" + "github.com/block/proto-fleet/server/internal/testutil" ) type fakeFleetNodeGateway struct { @@ -125,7 +125,5 @@ func newFakeServer(t *testing.T, fake *fakeFleetNodeGateway) *httptest.Server { mux := http.NewServeMux() path, h := fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(fake) mux.Handle(path, h) - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - return srv + return testutil.NewH2CServer(t, mux) } diff --git a/server/cmd/fleetnode/run.go b/server/cmd/fleetnode/run.go index ba8bbb1c3..a529c61ef 100644 --- a/server/cmd/fleetnode/run.go +++ b/server/cmd/fleetnode/run.go @@ -27,10 +27,10 @@ const ( type RunCmd struct { HeartbeatInterval time.Duration `name:"heartbeat-interval" default:"30s" help:"interval between UploadHeartbeat calls"` - now func() time.Time `kong:"-"` - clientFactory func(serverURL string, tokenSource func() string) gatewayClient `kong:"-"` - signals []os.Signal `kong:"-"` - parentCtx context.Context `kong:"-"` //nolint:containedctx // test seam for daemon shutdown without OS signals + now func() time.Time `kong:"-"` + clientFactory func(serverURL string, tokenSource func() string) (gatewayClient, error) `kong:"-"` + signals []os.Signal `kong:"-"` + parentCtx context.Context `kong:"-"` //nolint:containedctx // test seam for daemon shutdown without OS signals } type gatewayClient interface { @@ -49,7 +49,7 @@ func (r *RunCmd) run(c *Context, stderr io.Writer) error { r.now = func() time.Time { return time.Now().UTC() } } if r.clientFactory == nil { - r.clientFactory = func(url string, src func() string) gatewayClient { + r.clientFactory = func(url string, src func() string) (gatewayClient, error) { return fleetnodebootstrap.NewAuthenticatedGatewayClient(url, src) } } @@ -107,7 +107,10 @@ func (r *RunCmd) runLocked(c *Context, logger *slog.Logger) error { } } - client := r.clientFactory(st.ServerURL, func() string { return st.SessionToken }) + client, err := r.clientFactory(st.ServerURL, func() string { return st.SessionToken }) + if err != nil { + return err + } logger.Info("daemon started", "fleet_node_id", st.FleetNodeID, diff --git a/server/cmd/fleetnode/run_test.go b/server/cmd/fleetnode/run_test.go index bacc1fd5b..b3c86c5c6 100644 --- a/server/cmd/fleetnode/run_test.go +++ b/server/cmd/fleetnode/run_test.go @@ -96,7 +96,7 @@ func TestRunCmd_HappyPathThreeTicks(t *testing.T) { cmd := &RunCmd{ HeartbeatInterval: 5 * time.Millisecond, parentCtx: parent, - clientFactory: func(_ string, _ func() string) gatewayClient { return stub }, + clientFactory: func(_ string, _ func() string) (gatewayClient, error) { return stub, nil }, } done := make(chan error, 1) @@ -327,7 +327,7 @@ func TestRunCmd_ValidatesServerURLBeforeBuildingClient(t *testing.T) { stub := &stubGatewayClient{} cmd := &RunCmd{ HeartbeatInterval: time.Second, - clientFactory: func(_ string, _ func() string) gatewayClient { return stub }, + clientFactory: func(_ string, _ func() string) (gatewayClient, error) { return stub, nil }, } // Act @@ -353,7 +353,7 @@ func TestRunCmd_ExitsOnCodeNotFoundHeartbeat(t *testing.T) { } cmd := &RunCmd{ HeartbeatInterval: 5 * time.Millisecond, - clientFactory: func(_ string, _ func() string) gatewayClient { return stub }, + clientFactory: func(_ string, _ func() string) (gatewayClient, error) { return stub, nil }, } // Act diff --git a/server/internal/fleetnodebootstrap/authclient.go b/server/internal/fleetnodebootstrap/authclient.go index 73bda9c3b..4b9384cff 100644 --- a/server/internal/fleetnodebootstrap/authclient.go +++ b/server/internal/fleetnodebootstrap/authclient.go @@ -13,12 +13,16 @@ import ( // tokenSource is invoked per-call so a daemon that mutates its own // state.SessionToken via Refresh picks up the new value on the next // request without rebuilding the client. -func NewAuthenticatedGatewayClient(serverURL string, tokenSource func() string) fleetnodegatewayv1connect.FleetNodeGatewayServiceClient { +func NewAuthenticatedGatewayClient(serverURL string, tokenSource func() string) (fleetnodegatewayv1connect.FleetNodeGatewayServiceClient, error) { + httpClient, err := newGatewayHTTPClient(serverURL) + if err != nil { + return nil, err + } return fleetnodegatewayv1connect.NewFleetNodeGatewayServiceClient( - newGatewayHTTPClient(), + httpClient, serverURL, connect.WithInterceptors(bearerInterceptor(tokenSource)), - ) + ), nil } func bearerInterceptor(tokenSource func() string) connect.Interceptor { diff --git a/server/internal/fleetnodebootstrap/authclient_test.go b/server/internal/fleetnodebootstrap/authclient_test.go index 971e448ec..f39f07314 100644 --- a/server/internal/fleetnodebootstrap/authclient_test.go +++ b/server/internal/fleetnodebootstrap/authclient_test.go @@ -3,7 +3,6 @@ package fleetnodebootstrap import ( "context" "net/http" - "net/http/httptest" "sync" "testing" "time" @@ -15,6 +14,7 @@ import ( pb "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1" "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1/fleetnodegatewayv1connect" + "github.com/block/proto-fleet/server/internal/testutil" ) type captureGateway struct { @@ -47,15 +47,15 @@ func TestAuthenticatedClient_AttachesBearerHeaderPerCall(t *testing.T) { mux := http.NewServeMux() path, h := fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(fake) mux.Handle(path, h) - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) + srv := testutil.NewH2CServer(t, mux) var token string - client := NewAuthenticatedGatewayClient(srv.URL, func() string { return token }) + client, err := NewAuthenticatedGatewayClient(srv.URL, func() string { return token }) + require.NoError(t, err) // Act token = "t1" - _, err := client.UploadHeartbeat(context.Background(), connect.NewRequest(&pb.UploadHeartbeatRequest{SentAt: timestamppb.Now()})) + _, err = client.UploadHeartbeat(context.Background(), connect.NewRequest(&pb.UploadHeartbeatRequest{SentAt: timestamppb.Now()})) require.NoError(t, err) token = "t2" _, err = client.UploadHeartbeat(context.Background(), connect.NewRequest(&pb.UploadHeartbeatRequest{SentAt: timestamppb.Now()})) @@ -72,14 +72,14 @@ func TestAuthenticatedClient_RejectsEmptyToken(t *testing.T) { t.Parallel() // Arrange - srv := httptest.NewServer(http.NewServeMux()) - t.Cleanup(srv.Close) - client := NewAuthenticatedGatewayClient(srv.URL, func() string { return "" }) + srv := testutil.NewH2CServer(t, http.NewServeMux()) + client, err := NewAuthenticatedGatewayClient(srv.URL, func() string { return "" }) + require.NoError(t, err) // Act ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - _, err := client.UploadHeartbeat(ctx, connect.NewRequest(&pb.UploadHeartbeatRequest{SentAt: timestamppb.Now()})) + _, err = client.UploadHeartbeat(ctx, connect.NewRequest(&pb.UploadHeartbeatRequest{SentAt: timestamppb.Now()})) // Assert require.Error(t, err) diff --git a/server/internal/fleetnodebootstrap/client.go b/server/internal/fleetnodebootstrap/client.go index d65e2222c..4db33a28b 100644 --- a/server/internal/fleetnodebootstrap/client.go +++ b/server/internal/fleetnodebootstrap/client.go @@ -1,29 +1,57 @@ package fleetnodebootstrap import ( + "context" + "crypto/tls" "errors" + "fmt" + "net" "net/http" - "time" + "net/url" + + "golang.org/x/net/http2" "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1/fleetnodegatewayv1connect" ) -const httpClientTimeout = 30 * time.Second - -// Refusing every 30x stops a downgrade redirect from replaying the POST body -// (enrollment token, api_key, signature) to a plaintext target; Connect-RPC -// itself never expects redirects. +// Refusing 3xx stops a downgrade redirect from replaying the signed POST +// body (enrollment token, api_key, signature) to an attacker-chosen target. var errRedirectNotAllowed = errors.New("redirects are not allowed for connect-rpc calls") -func newGatewayHTTPClient() *http.Client { +// A shared AllowHTTP+DialTLSContext shim would silently downgrade https to +// plaintext, defeating ValidateServerURL's https-required policy. +func newGatewayHTTPClient(serverURL string) (*http.Client, error) { + u, err := url.Parse(serverURL) + if err != nil { + return nil, fmt.Errorf("parse server-url: %w", err) + } + var tr http.RoundTripper + switch u.Scheme { + case "http": + tr = &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, network, addr) + }, + } + case "https": + tr = &http2.Transport{} + default: + return nil, fmt.Errorf("server-url scheme must be http or https; got %q", u.Scheme) + } return &http.Client{ - Timeout: httpClientTimeout, CheckRedirect: func(_ *http.Request, _ []*http.Request) error { return errRedirectNotAllowed }, - } + Transport: tr, + }, nil } -func NewGatewayClient(serverURL string) fleetnodegatewayv1connect.FleetNodeGatewayServiceClient { - return fleetnodegatewayv1connect.NewFleetNodeGatewayServiceClient(newGatewayHTTPClient(), serverURL) +func NewGatewayClient(serverURL string) (fleetnodegatewayv1connect.FleetNodeGatewayServiceClient, error) { + httpClient, err := newGatewayHTTPClient(serverURL) + if err != nil { + return nil, err + } + return fleetnodegatewayv1connect.NewFleetNodeGatewayServiceClient(httpClient, serverURL), nil } diff --git a/server/internal/fleetnodebootstrap/client_test.go b/server/internal/fleetnodebootstrap/client_test.go index aad253c57..c9c4cec04 100644 --- a/server/internal/fleetnodebootstrap/client_test.go +++ b/server/internal/fleetnodebootstrap/client_test.go @@ -1,6 +1,8 @@ package fleetnodebootstrap import ( + "crypto/tls" + "crypto/x509" "net/http" "net/http/httptest" "strings" @@ -8,8 +10,81 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/http2" + + "github.com/block/proto-fleet/server/internal/testutil" ) +func newHTTP2TestServer(t *testing.T) *httptest.Server { + t.Helper() + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + srv.EnableHTTP2 = true + srv.StartTLS() + t.Cleanup(srv.Close) + return srv +} + +// Regression coverage for the prior DialTLSContext shim that quietly +// bypassed TLS for https:// URLs. +func TestGatewayHTTPClient_HTTPS_ValidatesCertificate(t *testing.T) { + t.Parallel() + + // Arrange + srv := newHTTP2TestServer(t) + trusted, err := newGatewayHTTPClient(srv.URL) + require.NoError(t, err) + tlsTransport, ok := trusted.Transport.(*http2.Transport) + require.True(t, ok, "https client must use http2.Transport") + pool := x509.NewCertPool() + pool.AddCert(srv.Certificate()) + tlsTransport.TLSClientConfig = &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12} + + // Act + resp, err := trusted.Get(srv.URL) + + // Assert + require.NoError(t, err) + require.NotNil(t, resp) + _ = resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestGatewayHTTPClient_HTTPS_RejectsUntrustedCertificate(t *testing.T) { + t.Parallel() + + // Arrange + srv := newHTTP2TestServer(t) + client, err := newGatewayHTTPClient(srv.URL) + require.NoError(t, err) + + // Act + resp, err := client.Get(srv.URL) //nolint:bodyclose // request fails; resp is nil + + // Assert + require.Error(t, err) + assert.Nil(t, resp) + assert.Contains(t, err.Error(), "x509", "expected x509 cert validation error, got %v", err) +} + +func TestNewGatewayClient_RejectsNonHTTPScheme(t *testing.T) { + t.Parallel() + + cases := []string{"ftp://example.com", "://no-scheme", "ws://example.com"} + for _, target := range cases { + t.Run(target, func(t *testing.T) { + t.Parallel() + + // Act + _, err := NewGatewayClient(target) + + // Assert + require.Error(t, err) + }) + } +} + func TestGatewayHTTPClient_RejectsRedirect(t *testing.T) { t.Parallel() @@ -25,12 +100,12 @@ func TestGatewayHTTPClient_RejectsRedirect(t *testing.T) { t.Parallel() // Arrange - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + srv := testutil.NewH2CServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Location", "http://attacker.example.com/") w.WriteHeader(code) })) - t.Cleanup(srv.Close) - client := newGatewayHTTPClient() + client, err := newGatewayHTTPClient(srv.URL) + require.NoError(t, err) // Act resp, err := client.Post(srv.URL, "application/proto", strings.NewReader("")) @@ -44,13 +119,3 @@ func TestGatewayHTTPClient_RejectsRedirect(t *testing.T) { }) } } - -func TestGatewayHTTPClient_HasTimeout(t *testing.T) { - t.Parallel() - - // Act - client := newGatewayHTTPClient() - - // Assert - assert.Equal(t, httpClientTimeout, client.Timeout) -} diff --git a/server/internal/fleetnodebootstrap/enrollment.go b/server/internal/fleetnodebootstrap/enrollment.go index 53f41166e..b21f7cf96 100644 --- a/server/internal/fleetnodebootstrap/enrollment.go +++ b/server/internal/fleetnodebootstrap/enrollment.go @@ -55,13 +55,18 @@ func Register(ctx context.Context, p RegisterParams) (*RegisterResult, error) { return nil, err } - client := NewGatewayClient(p.ServerURL) - resp, err := client.Register(ctx, connect.NewRequest(&pb.RegisterRequest{ + client, err := NewGatewayClient(p.ServerURL) + if err != nil { + return nil, err + } + callCtx, cancel := withHandshakeTimeout(ctx) + resp, err := client.Register(callCtx, connect.NewRequest(&pb.RegisterRequest{ EnrollmentToken: p.Code, Name: p.Name, IdentityPubkey: idPub, MinerSigningPubkey: mPub, })) + cancel() if err != nil { code := connect.CodeOf(err) if code == connect.CodeAlreadyExists || code == connect.CodeFailedPrecondition || code == connect.CodeUnauthenticated { @@ -107,7 +112,11 @@ func CompleteEnrollment(ctx context.Context, state *State, apiKey string) error attempt := *state attempt.APIKey = apiKey - if err := RunHandshake(ctx, NewGatewayClient(state.ServerURL), &attempt); err != nil { + client, err := NewGatewayClient(state.ServerURL) + if err != nil { + return err + } + if err := RunHandshake(ctx, client, &attempt); err != nil { return err } state.APIKey = attempt.APIKey diff --git a/server/internal/fleetnodebootstrap/enrollment_test.go b/server/internal/fleetnodebootstrap/enrollment_test.go index 67c4cc948..d477c0643 100644 --- a/server/internal/fleetnodebootstrap/enrollment_test.go +++ b/server/internal/fleetnodebootstrap/enrollment_test.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "encoding/hex" "errors" + "net" "testing" "time" @@ -244,6 +245,40 @@ func TestCompleteEnrollment_RejectsEmptyInputs(t *testing.T) { assert.Contains(t, errEmptyKey.Error(), "apiKey") } +func TestRegister_TimesOutAgainstBlackholeServer(t *testing.T) { + // Arrange + prev := handshakeStepTimeout + handshakeStepTimeout = 250 * time.Millisecond + t.Cleanup(func() { handshakeStepTimeout = prev }) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = ln.Close() }) + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + t.Cleanup(func() { _ = conn.Close() }) + } + }() + + // Act + start := time.Now() + _, err = Register(t.Context(), RegisterParams{ + ServerURL: "http://" + ln.Addr().String(), + Name: "stuck", + Code: "anything", + AllowInsecureTransport: true, + }) + elapsed := time.Since(start) + + // Assert + require.Error(t, err) + assert.Less(t, elapsed, 5*time.Second, "Register should not block past the per-call timeout") +} + func TestValidateServerURL(t *testing.T) { t.Parallel() diff --git a/server/internal/fleetnodebootstrap/handshake.go b/server/internal/fleetnodebootstrap/handshake.go index 7e9c7bcb0..2b68aad19 100644 --- a/server/internal/fleetnodebootstrap/handshake.go +++ b/server/internal/fleetnodebootstrap/handshake.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "time" "connectrpc.com/connect" @@ -13,13 +14,15 @@ import ( "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1/fleetnodegatewayv1connect" ) -// Wraps Unauthenticated from BeginAuthHandshake. The server returns it for -// api_key revocation, identity_pubkey mismatch, or any other auth failure -// on that call; the library cannot distinguish the cause. Distinct from -// CompleteAuthHandshake failures (expired challenge, bad signature). +var handshakeStepTimeout = 30 * time.Second + +// ErrBeginAuthRejected wraps Unauthenticated from BeginAuthHandshake, which +// the server returns for revoked api_key, identity_pubkey mismatch, or any +// auth failure on that call. Kept distinct from CompleteAuthHandshake errors +// (expired challenge, bad signature) so callers can branch on root cause. var ErrBeginAuthRejected = errors.New("BeginAuthHandshake rejected") -// Mutates s.SessionToken and s.SessionExpiresAt only on success. +// RunHandshake mutates s.SessionToken / s.SessionExpiresAt only on success. func RunHandshake(ctx context.Context, c fleetnodegatewayv1connect.FleetNodeGatewayServiceClient, s *State) error { if s == nil { return errors.New("state is required") @@ -42,10 +45,12 @@ func RunHandshake(ctx context.Context, c fleetnodegatewayv1connect.FleetNodeGate return errors.New("client is required") } - begin, err := c.BeginAuthHandshake(ctx, connect.NewRequest(&pb.BeginAuthHandshakeRequest{ + beginCtx, cancel := withHandshakeTimeout(ctx) + begin, err := c.BeginAuthHandshake(beginCtx, connect.NewRequest(&pb.BeginAuthHandshakeRequest{ ApiKey: s.APIKey, IdentityPubkey: pub, })) + cancel() if err != nil { if connect.CodeOf(err) == connect.CodeUnauthenticated { return fmt.Errorf("%w: %w", ErrBeginAuthRejected, err) @@ -55,10 +60,12 @@ func RunHandshake(ctx context.Context, c fleetnodegatewayv1connect.FleetNodeGate challenge := begin.Msg.GetChallenge() signature := ed25519.Sign(ed25519.PrivateKey(priv), challenge) - complete, err := c.CompleteAuthHandshake(ctx, connect.NewRequest(&pb.CompleteAuthHandshakeRequest{ + completeCtx, cancel := withHandshakeTimeout(ctx) + complete, err := c.CompleteAuthHandshake(completeCtx, connect.NewRequest(&pb.CompleteAuthHandshakeRequest{ Challenge: challenge, Signature: signature, })) + cancel() if err != nil { return fmt.Errorf("complete handshake: %w", err) } @@ -69,3 +76,7 @@ func RunHandshake(ctx context.Context, c fleetnodegatewayv1connect.FleetNodeGate } return nil } + +func withHandshakeTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(ctx, handshakeStepTimeout) +} diff --git a/server/internal/fleetnodebootstrap/handshake_test.go b/server/internal/fleetnodebootstrap/handshake_test.go index 62788d6c6..4d7f7cd85 100644 --- a/server/internal/fleetnodebootstrap/handshake_test.go +++ b/server/internal/fleetnodebootstrap/handshake_test.go @@ -18,6 +18,7 @@ import ( pb "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1" "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1/fleetnodegatewayv1connect" + "github.com/block/proto-fleet/server/internal/testutil" ) type fakeAgentGateway struct { @@ -81,9 +82,7 @@ func newFakeServer(t *testing.T, fake *fakeAgentGateway) *httptest.Server { mux := http.NewServeMux() path, h := fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(fake) mux.Handle(path, h) - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - return srv + return testutil.NewH2CServer(t, mux) } func TestRunHandshake_RejectsNilState(t *testing.T) { @@ -137,7 +136,8 @@ func TestRunHandshake_HappyPath(t *testing.T) { IdentityPrivateKeyHex: hex.EncodeToString(priv), IdentityPublicKeyHex: hex.EncodeToString(pub), } - client := NewGatewayClient(srv.URL) + client, err := NewGatewayClient(srv.URL) + require.NoError(t, err) // Act err = RunHandshake(t.Context(), client, state) @@ -166,7 +166,8 @@ func TestRunHandshake_WrongAPIKey(t *testing.T) { IdentityPrivateKeyHex: hex.EncodeToString(priv), IdentityPublicKeyHex: hex.EncodeToString(pub), } - client := NewGatewayClient(srv.URL) + client, err := NewGatewayClient(srv.URL) + require.NoError(t, err) // Act err = RunHandshake(t.Context(), client, state) @@ -250,7 +251,8 @@ func TestRunHandshake_BadSignature(t *testing.T) { IdentityPrivateKeyHex: hex.EncodeToString(otherPriv), IdentityPublicKeyHex: hex.EncodeToString(pub), } - client := NewGatewayClient(srv.URL) + client, err := NewGatewayClient(srv.URL) + require.NoError(t, err) // Act err = RunHandshake(t.Context(), client, state) diff --git a/server/internal/fleetnodebootstrap/locking_unix.go b/server/internal/fleetnodebootstrap/locking_unix.go index e067302e4..9d393ffae 100644 --- a/server/internal/fleetnodebootstrap/locking_unix.go +++ b/server/internal/fleetnodebootstrap/locking_unix.go @@ -3,16 +3,21 @@ package fleetnodebootstrap import ( + "errors" "fmt" + "io" "os" "path/filepath" + "strconv" + "strings" "syscall" ) -// Serializes concurrent refreshes via flock on /state.lock so a slower -// writer can't clobber a newer state.yaml. Refuses to follow a symlink at -// the dir leaf so the lock can't land in an attacker-chosen location and -// silently break serialization for the SaveState that follows. +// WithStateLock serializes refreshes via flock on /state.lock. Lstat +// rejects symlink leaves so an attacker can't redirect the lock to a path +// they control and break serialization of the following SaveState. LOCK_NB +// fails fast with a useful error (naming the recorded holder pid) instead +// of blocking forever. func WithStateLock(dir string, fn func() error) error { if info, err := os.Lstat(dir); err == nil && info.Mode()&os.ModeSymlink != 0 { return fmt.Errorf("state dir %s is a symlink; refusing to take a lock through it", dir) @@ -23,22 +28,70 @@ func WithStateLock(dir string, fn func() error) error { if err := tightenStateDirPerms(dir); err != nil { return err } - f, err := os.OpenFile(filepath.Join(dir, "state.lock"), os.O_CREATE|os.O_RDWR, 0o600) + lockPath := filepath.Join(dir, "state.lock") + f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o600) if err != nil { return fmt.Errorf("open state lock: %w", err) } - // Kernel releases flock on close; explicit LOCK_UN would race with the - // deferred Close. + // Closing f releases the flock; explicit LOCK_UN would race the defer. defer func() { _ = f.Close() }() - if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { + if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { + if errors.Is(err, syscall.EWOULDBLOCK) { + return contendedLockError(lockPath, f) + } return fmt.Errorf("acquire state lock: %w", err) } + _ = writeLockOwnerPID(f) // best-effort; failure only degrades the next contention error. return fn() } -// fsyncs a directory so a preceding os.Rename is durable across a power -// loss. POSIX only: Windows directory handles do not support -// FlushFileBuffers. +func writeLockOwnerPID(f *os.File) error { + if _, err := f.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek lock file: %w", err) + } + if err := f.Truncate(0); err != nil { + return fmt.Errorf("truncate lock file: %w", err) + } + if _, err := fmt.Fprintf(f, "%d\n", os.Getpid()); err != nil { + return fmt.Errorf("write lock owner pid: %w", err) + } + return nil +} + +func contendedLockError(lockPath string, f *os.File) error { + if pid, ok := readLockOwnerPID(f); ok { + if processAlive(pid) { + return fmt.Errorf("state lock %s held by fleetnode pid=%d; stop it (kill %d) or use a different --state-dir", lockPath, pid, pid) + } + // Dead owner + held lock means a subprocess inherited the FD. + return fmt.Errorf("state lock %s contended; recorded owner pid=%d is not running but the lock is still held (likely a subprocess inherited the FD); kill any lingering fleetnode children or use a different --state-dir", lockPath, pid) + } + return fmt.Errorf("state lock %s held by an unknown process; check `pgrep -lf fleetnode` and stop it, or use a different --state-dir", lockPath) +} + +func readLockOwnerPID(f *os.File) (int, bool) { + if _, err := f.Seek(0, io.SeekStart); err != nil { + return 0, false + } + data, err := io.ReadAll(io.LimitReader(f, 32)) + if err != nil { + return 0, false + } + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil || pid <= 0 { + return 0, false + } + return pid, true +} + +func processAlive(pid int) bool { + err := syscall.Kill(pid, 0) + // EPERM means alive but different uid -- still "alive" for our purposes. + return err == nil || errors.Is(err, syscall.EPERM) +} + +// syncDir fsyncs a directory so a preceding os.Rename survives a power loss. +// POSIX only; Windows directory handles don't support FlushFileBuffers. func syncDir(path string) error { f, err := os.Open(path) if err != nil { diff --git a/server/internal/fleetnodebootstrap/refresh.go b/server/internal/fleetnodebootstrap/refresh.go index 2adbdac53..206680cba 100644 --- a/server/internal/fleetnodebootstrap/refresh.go +++ b/server/internal/fleetnodebootstrap/refresh.go @@ -21,5 +21,9 @@ func Refresh(ctx context.Context, state *State) error { if err := ValidateServerURL(state.ServerURL, state.AllowInsecureTransport); err != nil { return err } - return RunHandshake(ctx, NewGatewayClient(state.ServerURL), state) + client, err := NewGatewayClient(state.ServerURL) + if err != nil { + return err + } + return RunHandshake(ctx, client, state) } diff --git a/server/internal/testutil/h2c.go b/server/internal/testutil/h2c.go new file mode 100644 index 000000000..019112b57 --- /dev/null +++ b/server/internal/testutil/h2c.go @@ -0,0 +1,35 @@ +package testutil + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "net/http/httptest" + "testing" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" +) + +// NewH2CServer is HTTP/2-over-cleartext; required for Connect-RPC bidi +// streams, which can't run over HTTP/1.1. +func NewH2CServer(t *testing.T, h http.Handler) *httptest.Server { + t.Helper() + srv := httptest.NewUnstartedServer(h2c.NewHandler(h, &http2.Server{})) + srv.Start() + t.Cleanup(srv.Close) + return srv +} + +func NewH2CClient() *http.Client { + return &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, network, addr) + }, + }, + } +} From 83ff5297ddbac311a333307f959aaeb81092ebeb Mon Sep 17 00:00:00 2001 From: Ankit Goswami Date: Wed, 27 May 2026 12:46:12 -0700 Subject: [PATCH 2/2] fleetnode: address PR 322 review - Replace bare http2.Transport with net/http Transport on the https branch so ALPN negotiates H2 when available and falls back to HTTP/1.1. Packaged nginx terminates TLS and proxies upstream over HTTP/1.1, which the H2-only transport refused (Codex P1). - Stop calling t.Cleanup from the blackhole accept-loop goroutine; collect conns under a mutex and close them via a single Cleanup registered on the test goroutine. testing.T.Cleanup is not goroutine-safe. - Replace the "fleetnode run lands in the run-heartbeat PR" placeholder with a real Subcommands row: run is already registered in main.go. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/cmd/fleetnode/README.md | 3 +-- server/internal/fleetnodebootstrap/client.go | 13 ++++++++++-- .../fleetnodebootstrap/client_test.go | 5 ++--- .../fleetnodebootstrap/enrollment_test.go | 21 +++++++++++++++++-- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/server/cmd/fleetnode/README.md b/server/cmd/fleetnode/README.md index 6fd072bf5..3840d5a73 100644 --- a/server/cmd/fleetnode/README.md +++ b/server/cmd/fleetnode/README.md @@ -9,8 +9,7 @@ | `fleetnode enroll` | Register with a fleet server using a one-time enrollment code. Persists keys and `api_key`. See [enroll.go](enroll.go). | | `fleetnode status` | Print local state (server URL, fleet_node_id, fingerprint, session expiry). See [status.go](status.go). | | `fleetnode refresh` | Renew the session token using the stored `api_key`. See [refresh.go](refresh.go). | - -`fleetnode run` lands in the run-heartbeat PR. +| `fleetnode run` | Long-running daemon: maintain session, post heartbeats, and (in the discovery PR) consume the control stream. See [run.go](run.go). | ## State directory and lock diff --git a/server/internal/fleetnodebootstrap/client.go b/server/internal/fleetnodebootstrap/client.go index 4db33a28b..47661935a 100644 --- a/server/internal/fleetnodebootstrap/client.go +++ b/server/internal/fleetnodebootstrap/client.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/url" + "time" "golang.org/x/net/http2" @@ -19,7 +20,10 @@ import ( var errRedirectNotAllowed = errors.New("redirects are not allowed for connect-rpc calls") // A shared AllowHTTP+DialTLSContext shim would silently downgrade https to -// plaintext, defeating ValidateServerURL's https-required policy. +// plaintext, defeating ValidateServerURL's https-required policy. The https +// branch uses net/http's Transport so ALPN can negotiate H2 when available +// and fall back to HTTP/1.1 — packaged nginx terminates TLS and proxies +// upstream over HTTP/1.1, so a bare http2.Transport refuses to talk to it. func newGatewayHTTPClient(serverURL string) (*http.Client, error) { u, err := url.Parse(serverURL) if err != nil { @@ -36,7 +40,12 @@ func newGatewayHTTPClient(serverURL string) (*http.Client, error) { }, } case "https": - tr = &http2.Transport{} + tr = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + ForceAttemptHTTP2: true, + } default: return nil, fmt.Errorf("server-url scheme must be http or https; got %q", u.Scheme) } diff --git a/server/internal/fleetnodebootstrap/client_test.go b/server/internal/fleetnodebootstrap/client_test.go index c9c4cec04..46da06d1a 100644 --- a/server/internal/fleetnodebootstrap/client_test.go +++ b/server/internal/fleetnodebootstrap/client_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/net/http2" "github.com/block/proto-fleet/server/internal/testutil" ) @@ -35,8 +34,8 @@ func TestGatewayHTTPClient_HTTPS_ValidatesCertificate(t *testing.T) { srv := newHTTP2TestServer(t) trusted, err := newGatewayHTTPClient(srv.URL) require.NoError(t, err) - tlsTransport, ok := trusted.Transport.(*http2.Transport) - require.True(t, ok, "https client must use http2.Transport") + tlsTransport, ok := trusted.Transport.(*http.Transport) + require.True(t, ok, "https client must use http.Transport") pool := x509.NewCertPool() pool.AddCert(srv.Certificate()) tlsTransport.TLSClientConfig = &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12} diff --git a/server/internal/fleetnodebootstrap/enrollment_test.go b/server/internal/fleetnodebootstrap/enrollment_test.go index d477c0643..ccc0cc854 100644 --- a/server/internal/fleetnodebootstrap/enrollment_test.go +++ b/server/internal/fleetnodebootstrap/enrollment_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "net" + "sync" "testing" "time" @@ -253,14 +254,30 @@ func TestRegister_TimesOutAgainstBlackholeServer(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - t.Cleanup(func() { _ = ln.Close() }) + + // testing.T.Cleanup is not safe to call from a goroutine, so the accept + // loop collects conns under a lock and a single Cleanup closes them all. + var ( + mu sync.Mutex + conns []net.Conn + ) + t.Cleanup(func() { + _ = ln.Close() + mu.Lock() + defer mu.Unlock() + for _, c := range conns { + _ = c.Close() + } + }) go func() { for { conn, err := ln.Accept() if err != nil { return } - t.Cleanup(func() { _ = conn.Close() }) + mu.Lock() + conns = append(conns, conn) + mu.Unlock() } }()