Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/cmd/fleetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
fleetmanagementDomain "github.com/block/proto-fleet/server/internal/domain/fleetmanagement"
fleetnodeauth "github.com/block/proto-fleet/server/internal/domain/fleetnode/auth"
"github.com/block/proto-fleet/server/internal/domain/fleetnode/control"
fleetnodediscovery "github.com/block/proto-fleet/server/internal/domain/fleetnode/discovery"
"github.com/block/proto-fleet/server/internal/domain/fleetnode/enrollment"
fleetnodepairing "github.com/block/proto-fleet/server/internal/domain/fleetnode/pairing"
"github.com/block/proto-fleet/server/internal/domain/fleetoptions"
Expand Down Expand Up @@ -220,6 +221,7 @@ func start(config *Config) error {
fleetNodePairingStore := sqlstores.NewSQLFleetNodePairingStore(conn)
fleetNodePairingSvc := fleetnodepairing.NewService(fleetNodePairingStore, fleetNodeEnrollmentStore, transactor)
fleetNodeControlRegistry := control.NewRegistry()
fleetNodeDiscoverySvc := fleetnodediscovery.NewService(fleetNodeControlRegistry, fleetNodeEnrollmentSvc)
fleetNodeAuthStore := sqlstores.NewSQLFleetNodeAuthStore(conn)
fleetNodeAuthSvc := fleetnodeauth.NewService(fleetNodeAuthStore, fleetNodeEnrollmentStore, apiKeySvc)

Expand Down Expand Up @@ -522,7 +524,7 @@ func start(config *Config) error {

mux.Handle(authv1connect.NewAuthServiceHandler(auth.NewHandler(authSvc), li))
mux.Handle(onboardingv1connect.NewOnboardingServiceHandler(onboarding.NewHandler(authSvc, onboardingSvc), li))
mux.Handle(pairingv1connect.NewPairingServiceHandler(pairing.NewHandler(pairingSvc), li))
mux.Handle(pairingv1connect.NewPairingServiceHandler(pairing.NewHandler(pairingSvc, fleetNodeDiscoverySvc), li))
mux.Handle(networkinfov1connect.NewNetworkInfoServiceHandler(networkinfo.NewHandler(pairingSvc), li))
mux.Handle(fleetmanagementv1connect.NewFleetManagementServiceHandler(fleetmanagement.NewHandler(fleetMgmtSvc), li))
mux.Handle(minercommandv1connect.NewMinerCommandServiceHandler(command.NewHandler(commandSvc), li))
Expand All @@ -532,7 +534,7 @@ func start(config *Config) error {
mux.Handle(sitesv1connect.NewSiteServiceHandler(sitesHandler.NewHandler(sitesSvc), li))
mux.Handle(buildingsv1connect.NewBuildingServiceHandler(buildingsHandler.NewHandler(buildingsSvc), li))
mux.Handle(fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(gateway.NewHandler(fleetNodeEnrollmentSvc, fleetNodeAuthSvc, fleetNodePairingSvc, fleetNodeControlRegistry), li))
mux.Handle(fleetnodeadminv1connect.NewFleetNodeAdminServiceHandler(admin.NewHandler(fleetNodeEnrollmentSvc, fleetNodePairingSvc, fleetNodeControlRegistry), li))
mux.Handle(fleetnodeadminv1connect.NewFleetNodeAdminServiceHandler(admin.NewHandler(fleetNodeEnrollmentSvc, fleetNodePairingSvc, fleetNodeDiscoverySvc), li))
mux.Handle(collectionv1connect.NewDeviceCollectionServiceHandler(collectionHandler.NewHandler(collectionSvc), li))
mux.Handle(device_setv1connect.NewDeviceSetServiceHandler(devicesetHandler.NewHandler(collectionSvc), li))
mux.Handle(telemetryv1connect.NewTelemetryServiceHandler(telemetryHandler.NewHandler(telemetryService), li))
Expand Down
97 changes: 90 additions & 7 deletions server/cmd/fleetnode/nmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,68 @@ import (
pairingpb "github.com/block/proto-fleet/server/generated/grpc/pairing/v1"
"github.com/block/proto-fleet/server/internal/domain/netutil"
"github.com/block/proto-fleet/server/internal/domain/nmaptarget"
"github.com/block/proto-fleet/server/internal/infrastructure/networking"
)

// errNoLocalSubnet means the host has no usable IPv4 subnet to scan for a
// LocalSubnetTarget command. Surfaces as AGENT_INCAPABLE so a fan-out skips this
// node and tries the others.
var errNoLocalSubnet = errors.New("no local IPv4 subnet found")

// detectLocalSubnets returns the subnet(s) the agent scans for a local-subnet
// nmap command (the nmaptarget.LocalSubnetTarget sentinel).
//
// It reuses the same primary-interface detection the cloud Discover path uses
// (networking.GetLocalNetworkInfo). That is intentionally less robust than
// per-NIC private filtering — it picks one interface, doesn't skip
// virtual/container NICs, and returns the raw OS mask. The caller
// (buildNmapOptions) rejects non-private or over-broad results before scanning
// (see validateLocalSubnetTarget). The localSubnets seam lets tests inject
// canned CIDRs.
func (r *RunCmd) detectLocalSubnets() ([]string, error) {
if r.localSubnets != nil {
return r.localSubnets()
}
info, err := networking.GetLocalNetworkInfo()
if err != nil {
return nil, fmt.Errorf("get local network info: %w", err)
}
if info.Subnet == "" {
return nil, errNoLocalSubnet
}
return []string{info.Subnet}, nil
}

// validateNmapTarget enforces the shared nmap target grammar (see nmaptarget).
func validateNmapTarget(s string) error {
return nmaptarget.Validate(s)
}

// validateLocalSubnetTarget guards the local-subnet sentinel: a detected subnet
// must be a private (RFC1918) IPv4 prefix no broader than the shared scan-size
// cap before it is scanned. Primary-interface detection doesn't filter for
// RFC1918 and returns the raw OS interface mask, so a public NIC or an
// over-broad prefix (e.g. 10.0.0.0/16) would otherwise reach nmap. The breadth
// limit mirrors nmaptarget.Validate so the fan-out can't sweep more hosts per
// node than an operator-supplied target is allowed to.
func validateLocalSubnetTarget(s string) error {
prefix, err := netip.ParsePrefix(s)
if err != nil {
return fmt.Errorf("not a CIDR: %w", err)
}
addr := prefix.Addr()
if !addr.Is4() {
return errors.New("not IPv4")
}
if !addr.IsPrivate() {
return errors.New("not in an RFC1918 private range")
}
if prefix.Bits() < nmaptarget.MinIPv4PrefixBits {
return fmt.Errorf("prefix /%d is broader than the supported maximum /%d", prefix.Bits(), nmaptarget.MinIPv4PrefixBits)
}
return nil
}

const (
nmapHostTimeout = 10 * time.Second
nmapMinRTT = 100 * time.Millisecond
Expand Down Expand Up @@ -107,16 +162,48 @@ func validateNmapBinary(path string) (string, error) {

func (r *RunCmd) buildNmapOptions(ctx context.Context, req *pairingpb.NmapModeRequest, ports []string) ([]nmap.Option, error) {
target := strings.TrimSpace(req.GetTarget())

// The LocalSubnetTarget sentinel means the server couldn't know the node's
// network, so the agent enumerates its own private IPv4 subnet(s) and scans
// those (IPv4 only, same as the manual path's IPv6-CIDR rejection). Matched
// exactly, before any hostname resolution.
if target == nmaptarget.LocalSubnetTarget {
subnets, err := r.detectLocalSubnets()
if err != nil {
return nil, cmdErr(pb.AckCode_ACK_CODE_AGENT_INCAPABLE, "no connected private IPv4 subnet for local-subnet scan: %s", err)
}
// detectLocalSubnets reuses primary-interface detection that doesn't
// filter for RFC1918 or cap breadth, so a public NIC or an over-broad
// prefix would otherwise be scanned. Refuse such targets here so an
// automatic "Scan your network" can never probe a public network or sweep
// more hosts than an operator-supplied target may.
for _, s := range subnets {
if err := validateLocalSubnetTarget(s); err != nil {
return nil, cmdErr(pb.AckCode_ACK_CODE_AGENT_INCAPABLE, "local-subnet scan target %q is not scannable: %s", s, err)
}
}
return append(baseNmapOptions(r.nmapPath, ports), nmap.WithTargets(subnets...)), nil
Comment thread
ankitgoswami marked this conversation as resolved.
Comment thread
ankitgoswami marked this conversation as resolved.
Comment thread
ankitgoswami marked this conversation as resolved.
}

if err := validateNmapTarget(target); err != nil {
return nil, cmdErr(pb.AckCode_ACK_CODE_BAD_REQUEST, "%s", err)
}
resolved, useIPv6, err := resolveNmapTarget(ctx, target, r.lookupIPAddr())
if err != nil {
return nil, cmdErr(pb.AckCode_ACK_CODE_BAD_REQUEST, "%s", err)
}
opts := []nmap.Option{
nmap.WithBinaryPath(r.nmapPath),
nmap.WithTargets(resolved),
opts := append(baseNmapOptions(r.nmapPath, ports), nmap.WithTargets(resolved))
if useIPv6 {
opts = append(opts, nmap.WithIPv6Scanning())
}
return opts, nil
}

// baseNmapOptions are the timing/safety options shared by targeted and
// local-subnet scans; callers append the target(s) (and -6 if needed).
func baseNmapOptions(binaryPath string, ports []string) []nmap.Option {
return []nmap.Option{
nmap.WithBinaryPath(binaryPath),
nmap.WithPorts(strings.Join(ports, ",")),
nmap.WithUnique(),
nmap.WithDisabledDNSResolution(),
Expand All @@ -125,10 +212,6 @@ func (r *RunCmd) buildNmapOptions(ctx context.Context, req *pairingpb.NmapModeRe
nmap.WithHostTimeout(nmapHostTimeout),
nmap.WithMinRTTTimeout(nmapMinRTT),
}
if useIPv6 {
opts = append(opts, nmap.WithIPv6Scanning())
}
return opts, nil
}

// Mirrors pairing-service validateNmapTargets so agent and server feed
Expand Down
75 changes: 75 additions & 0 deletions server/cmd/fleetnode/nmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pb "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1"
pairingpb "github.com/block/proto-fleet/server/generated/grpc/pairing/v1"
"github.com/block/proto-fleet/server/internal/domain/nmaptarget"
)

func testLogger() *slog.Logger { return slog.New(slog.DiscardHandler) }
Expand Down Expand Up @@ -246,6 +248,79 @@ func TestBuildNmapOptions_AddsIPv6Scanning(t *testing.T) {
assert.True(t, slices.Contains(v6Scanner.Args(), "-6"), "IPv6 target must carry -6")
}

func TestBuildNmapOptions_LocalSubnetTarget_UsesDetectedCIDRs(t *testing.T) {
// Arrange: inject the detected subnets so the test doesn't depend on the host.
r := &RunCmd{
nmapPath: "/usr/bin/nmap",
discoverer: &stubDiscoverer{},
localSubnets: func() ([]string, error) { return []string{"192.168.1.0/24"}, nil },
}
req := &pairingpb.NmapModeRequest{Target: nmaptarget.LocalSubnetTarget, Ports: []string{"4028"}}

// Act
opts, err := r.buildNmapOptions(context.Background(), req, req.Ports)
require.NoError(t, err)
scanner, err := nmap.NewScanner(context.Background(), opts...)
require.NoError(t, err)

// Assert: the detected subnet reaches nmap and the scan stays IPv4-only.
args := scanner.Args()
assert.True(t, slices.Contains(args, "192.168.1.0/24"), "expected detected subnet in argv: %v", args)
assert.False(t, slices.Contains(args, "-6"), "local-subnet scan must be IPv4-only: %v", args)
assert.False(t, slices.Contains(args, nmaptarget.LocalSubnetTarget), "sentinel must not reach nmap as a literal target: %v", args)
}

func TestBuildNmapOptions_LocalSubnetTarget_RejectsUnscannableSubnet(t *testing.T) {
tests := []struct {
name string
subnet string
}{
// A node whose primary NIC is public must never have it probed.
{name: "public subnet", subnet: "8.8.8.0/24"},
// A private prefix broader than the /22 cap would sweep tens of
// thousands of hosts; reject it like an operator-supplied target.
{name: "over-broad private subnet", subnet: "10.0.0.0/16"},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Arrange
r := &RunCmd{
nmapPath: "/usr/bin/nmap",
discoverer: &stubDiscoverer{},
localSubnets: func() ([]string, error) { return []string{tc.subnet}, nil },
}
req := &pairingpb.NmapModeRequest{Target: nmaptarget.LocalSubnetTarget, Ports: []string{"4028"}}

// Act
_, err := r.buildNmapOptions(context.Background(), req, req.Ports)

// Assert: refused before any scan, mapped so a fan-out skips this node.
var ce *commandError
require.ErrorAs(t, err, &ce)
assert.Equal(t, pb.AckCode_ACK_CODE_AGENT_INCAPABLE, ce.code)
})
}
}

func TestBuildNmapOptions_LocalSubnetTarget_NoSubnetIsAgentIncapable(t *testing.T) {
// Arrange: detection finds no private subnet.
r := &RunCmd{
nmapPath: "/usr/bin/nmap",
discoverer: &stubDiscoverer{},
localSubnets: func() ([]string, error) { return nil, errNoLocalSubnet },
}
req := &pairingpb.NmapModeRequest{Target: nmaptarget.LocalSubnetTarget, Ports: []string{"4028"}}

// Act
_, err := r.buildNmapOptions(context.Background(), req, req.Ports)

// Assert: a fan-out should skip this node, so the ack maps to FailedPrecondition.
var ce *commandError
require.ErrorAs(t, err, &ce)
assert.Equal(t, pb.AckCode_ACK_CODE_AGENT_INCAPABLE, ce.code)
}

func TestDiscoverForCommand_NmapPathEmptyFailsClosed(t *testing.T) {
// Arrange
r := &RunCmd{nmapPath: "", discoverer: &stubDiscoverer{}}
Expand Down
1 change: 1 addition & 0 deletions server/cmd/fleetnode/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RunCmd struct {
discoverer discoverer `kong:"-"`
nmapPath string `kong:"-"`
resolver ipResolver `kong:"-"`
localSubnets func() ([]string, error) `kong:"-"` // test seam for local-subnet detection

stateMu sync.Mutex `kong:"-"` // guards st.SessionToken across refreshAndSave + tokenSource.
}
Expand Down
13 changes: 13 additions & 0 deletions server/internal/domain/fleetnode/control/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ func NewRegistry() *Registry {
return &Registry{conns: make(map[int64]*connection)}
}

// ConnectedFleetNodeIDs returns the fleet_node IDs with an active ControlStream
// right now. Used by fan-out discovery to target only nodes the server can reach;
// callers intersect this with the org's CONFIRMED nodes. Order is unspecified.
func (r *Registry) ConnectedFleetNodeIDs() []int64 {
r.mu.Lock()
defer r.mu.Unlock()
ids := make([]int64, 0, len(r.conns))
for id := range r.conns {
ids = append(ids, id)
}
return ids
}

// teardown closes connection.done and cmd.done (if any). Caller holds Registry.mu
// and must then remove/replace the conn so teardown can't run twice.
func teardown(conn *connection) {
Expand Down
18 changes: 18 additions & 0 deletions server/internal/domain/fleetnode/control/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,21 @@ func receive(t *testing.T, ch <-chan CommandEvent) CommandEvent {
return CommandEvent{}
}
}

func TestConnectedFleetNodeIDs_ReflectsRegisterAndUnregister(t *testing.T) {
// Arrange
r := NewRegistry()
s1 := r.Register(1)
s2 := r.Register(2)

// Act + Assert: both connected.
assert.ElementsMatch(t, []int64{1, 2}, r.ConnectedFleetNodeIDs())

// Act + Assert: unregistering one drops it.
s1.Unregister()
assert.ElementsMatch(t, []int64{2}, r.ConnectedFleetNodeIDs())

// Act + Assert: empty once all are gone.
s2.Unregister()
assert.Empty(t, r.ConnectedFleetNodeIDs())
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package admin
package discovery

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package admin
package discovery

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package admin
package discovery

import (
"net/netip"
Expand All @@ -25,8 +25,21 @@ func buildReportScope(req *pairingpb.DiscoverRequest) control.ReportScope {
return inPort(port) && inIP(ip)
}
case *pairingpb.DiscoverRequest_Nmap:
inTarget := nmapTargetMatcher(m.Nmap.GetTarget())
inPort := portMatcher(m.Nmap.GetPorts())
// The LocalSubnetTarget sentinel lets the agent pick its own subnet, so
// the server can't predict the IPs. Degrade the IP scope to the
// private-only invariant (RFC1918/RFC4193) that validateReport
// independently enforces; port scoping is still applied.
if m.Nmap.GetTarget() == nmaptarget.LocalSubnetTarget {
return func(ip, port string) bool {
if !inPort(port) {
return false
}
a, ok := parseScopeAddr(ip)
return ok && a.IsPrivate()
}
Comment thread
ankitgoswami marked this conversation as resolved.
}
inTarget := nmapTargetMatcher(m.Nmap.GetTarget())
return func(ip, port string) bool {
return inPort(port) && inTarget(ip)
}
Expand Down
Loading
Loading