Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions proto/fleetnodeadmin/v1/fleetnodeadmin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ message DiscoverOnFleetNodeResponse {
message ListFleetNodeDiscoveredDevicesRequest {
// 0 = all fleet nodes in org; > 0 restricts to that fleet node.
int64 fleet_node_id = 1 [(buf.validate.field).int64.gte = 0];
// Max devices to return; 0 = no limit. A node can discover thousands of
// devices, so operators should page.
// Max devices to return; 0 = server default page size (1024, also the cap).
// A node can discover thousands of devices, so operators page via next_cursor.
int32 limit = 2 [(buf.validate.field).int32 = {gte: 0, lte: 1024}];
// Forward cursor: pass the previous response's next_cursor; 0 = first page.
int64 cursor = 3 [(buf.validate.field).int64.gte = 0];
Expand Down
26 changes: 25 additions & 1 deletion proto/fleetnodegateway/v1/fleetnodegateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,37 @@ enum PairOutcome {
message FleetNodePairResult {
// Identifier of the device this result describes; must be non-empty.
string device_identifier = 1 [(buf.validate.field).string = {min_len: 1, max_len: 255}];
// How the pairing attempt for this device ended.
PairOutcome outcome = 2;
// Identity learned during pairing; populated on PAIRED. These identity fields
// hold no auth material; credentials travel only in used_credentials below.
string serial_number = 3 [(buf.validate.field).string.max_len = 255];
string mac_address = 4 [(buf.validate.field).string.max_len = 64];
string model = 5 [(buf.validate.field).string.max_len = 255];
string manufacturer = 6 [(buf.validate.field).string.max_len = 255];
string firmware_version = 7 [(buf.validate.field).string.max_len = 255];
// Human-readable failure detail, e.g. set by the node together with
// PAIR_OUTCOME_ERROR when it refuses to pair.
string error_message = 8 [(buf.validate.field).string.max_len = 4096];

// Flat scalars couldn't distinguish a basic-auth credential with an empty
// username/password from an absent one, so a successful empty-credential
// pairing was persisted with no auth material. Replaced by used_credentials,
// whose message presence carries that distinction.
reserved 9, 10;
reserved "used_username", "used_password";

// The credentials the node authenticated with. Set for basic-auth drivers
// (operator-supplied OR plugin defaults) and absent for asymmetric-auth
// drivers, which pair with the node's signing key and carry no credentials.
// Presence is meaningful: a present message -- even with an empty username or
// password -- means the cloud must persist it as the device's auth material;
// absent means store nothing.
UsedCredentials used_credentials = 11;
}

// UsedCredentials carries the basic-auth credentials a node authenticated with,
// echoed so the cloud can persist working auth material for the device. Capped to
// bound the ReportPairedDevices payload. The node refuses to pair (reports ERROR)
// rather than authenticate with a credential it cannot report within these caps.
message UsedCredentials {
// Username the node authenticated with; validation caps it at max_len 255.
// May legitimately be empty: it is the presence of this message, not of the
// field, that tells the cloud to persist the credential.
string username = 1 [(buf.validate.field).string.max_len = 255];
// Password the node authenticated with; validation caps it at max_len 1024.
// May legitimately be empty for the same reason as username.
string password = 2 [(buf.validate.field).string.max_len = 1024];
}

message ReportPairedDevicesRequest {
Expand Down
2 changes: 2 additions & 0 deletions server/cmd/fleetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ func start(config *Config) error {
discoverer := pluginService.CreateDiscoverer()
discoveredDeviceStore := sqlstores.NewSQLDiscoveredDeviceStore(conn)

fleetNodePairingSvc.WithProvisioning(deviceStore, discoveredDeviceStore, encryptSvc, fleetNodeControlRegistry)

timescaledbService, err := timescaledb.NewTelemetryStore(conn, config.TimescaleDB)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion server/cmd/fleetnode/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ type controlFakeBehavior struct {
rejectWithCode connect.Code
reportErr error
pairReportErr error
pairRejected int64
}

type pendingCommand struct {
Expand Down Expand Up @@ -535,11 +536,15 @@ func (f *controlFakeGateway) ReportPairedDevices(_ context.Context, req *connect
f.mu.Lock()
f.pairReports = append(f.pairReports, req.Msg)
reportErr := f.behavior.pairReportErr
rejected := f.behavior.pairRejected
f.mu.Unlock()
if reportErr != nil {
return nil, reportErr
}
return connect.NewResponse(&pb.ReportPairedDevicesResponse{AcceptedCount: int64(len(req.Msg.GetResults()))}), nil
return connect.NewResponse(&pb.ReportPairedDevicesResponse{
AcceptedCount: int64(len(req.Msg.GetResults())) - rejected,
RejectedCount: rejected,
}), nil
}

func (f *controlFakeGateway) pairReportsCopy() []*pb.ReportPairedDevicesRequest {
Expand Down
87 changes: 71 additions & 16 deletions server/cmd/fleetnode/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,17 @@ const pairConcurrency = 16
// Size caps, in bytes, for values the node reports back in a pair result.
const (
// Cap applied to the reported username by credentialsReportable.
maxPairIdentityBytes = 255
// NOTE(review): presumably the cap for the reported MAC address (64 matches
// the mac_address max_len in FleetNodePairResult) -- confirm at the use site.
maxPairMACBytes = 64
// Cap applied to the reported password by credentialsReportable.
maxUsedPasswordBytes = 1024
)

// credentialsReportable tells the caller whether a username/password pair is
// small enough to be echoed back in a pair result: the username may be at most
// maxPairIdentityBytes bytes long and the password at most maxUsedPasswordBytes.
// An oversized credential is refused up front instead of being used to pair,
// because the node could authenticate with it while the cloud could not
// persist it, which would leave the device PAIRED but unusable.
func credentialsReportable(username, password string) bool {
	// Reject on the username first; the password is only checked once the
	// username is known to fit.
	if len(username) > maxPairIdentityBytes {
		return false
	}
	return len(password) <= maxUsedPasswordBytes
}

// perPairTimeout bounds one device's auth handshake. var so tests can shrink it.
// fanOutPairs also passes twice this value to waitSupervisor when it waits for
// its pair workers.
var perPairTimeout = 60 * time.Second

Expand Down Expand Up @@ -88,19 +97,30 @@ func (p *pluginPairer) Pair(ctx context.Context, target *pairingpb.FleetNodePair
// Asymmetric-auth drivers (Proto) pair with the node's own miner-signing key;
// operator-supplied username/password covers basic-auth drivers.
if bundle, ok := secretBundleFor(plugin.Caps, p.minerSigningPubKey, creds); ok {
basicAuth := !plugin.Caps[sdk.CapabilityAsymmetricAuth]
if basicAuth && !credentialsReportable(creds.GetUsername(), creds.GetPassword()) {
res.Outcome = pb.PairOutcome_PAIR_OUTCOME_ERROR
res.ErrorMessage = "supplied credentials exceed the maximum reportable size"
return res
}
updated, pairErr := plugin.Driver.PairDevice(ctx, deviceInfo, bundle)
if pairErr != nil {
classifyNodePairError(pairErr, res)
return res
}
setPaired(res, updated)
if basicAuth {
res.UsedCredentials = &pb.UsedCredentials{Username: creds.GetUsername(), Password: creds.GetPassword()}
}
return res
}

// No credentials supplied: try plugin-provided defaults.
if provider, ok := plugin.Driver.(sdk.DefaultCredentialsProvider); ok {
defaults := provider.GetDefaultCredentials(ctx, target.GetManufacturer(), target.GetFirmwareVersion())
for _, c := range defaults {
if !credentialsReportable(c.Username, c.Password) {
continue
}
bundle := sdk.SecretBundle{Version: "v1", Kind: sdk.UsernamePassword{Username: c.Username, Password: c.Password}}
updated, pairErr := plugin.Driver.PairDevice(ctx, deviceInfo, bundle)
if pairErr != nil {
Expand All @@ -111,6 +131,7 @@ func (p *pluginPairer) Pair(ctx context.Context, target *pairingpb.FleetNodePair
return res
}
setPaired(res, updated)
res.UsedCredentials = &pb.UsedCredentials{Username: c.Username, Password: c.Password}
return res
}
}
Expand Down Expand Up @@ -205,14 +226,27 @@ func (r *RunCmd) handlePairCommand(ctx context.Context, client gatewayClient, st
targets := req.GetTargets()
logger.Info("pair command received", "command_id", commandID, "targets", len(targets))

// One pair command at a time, held until every worker has exited: a truncated
// batch abandons ctx-ignoring workers that may still be mutating miners, and a
// second command must not race them. BUSY maps to a retryable operator error.
if !r.pairMu.TryLock() {
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_BUSY, "a pair command is still running on this node; retry shortly", logger)
return
}

cmdCtx, cancel := context.WithTimeout(ctx, commandTimeout)
defer cancel()

results, truncated := fanOutPairs(cmdCtx, targets, req.GetCredentials(), pairConcurrency, r.pairer.Pair, logger)
results, truncated, workersDone := fanOutPairs(cmdCtx, targets, req.GetCredentials(), pairConcurrency, r.pairer.Pair, logger)
go func() {
<-workersDone
r.pairMu.Unlock()
}()

// Stream on the parent ctx, not cmdCtx: a deadline-hit cmdCtx must not
// suppress upload of the results already collected.
if err := r.streamPairResults(ctx, client, commandID, results, logger); err != nil {
rejected, err := r.streamPairResults(ctx, client, commandID, results, logger)
if err != nil {
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_REPORT_FAILED, err.Error(), logger)
return
}
Expand All @@ -224,45 +258,65 @@ func (r *RunCmd) handlePairCommand(ctx context.Context, client gatewayClient, st
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_PARTIAL, fmt.Sprintf("pair supervisor budget exceeded; %d of %d result(s) uploaded", len(results), len(targets)), logger)
Comment thread
ankitgoswami marked this conversation as resolved.
return
}
// RejectedCount > 0 means the cloud didn't store a miner the node paired, so ack
// PARTIAL (not OK) and let the operator re-list and re-issue the remainder.
if rejected > 0 {
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_PARTIAL, fmt.Sprintf("cloud did not persist %d of %d reported result(s); re-list and retry", rejected, len(results)), logger)
return
}
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_OK, "", logger)
}

func (r *RunCmd) streamPairResults(ctx context.Context, client gatewayClient, commandID string, results []*pb.FleetNodePairResult, logger *slog.Logger) error {
// streamPairResults uploads results in chunks and returns how many the gateway
// failed to persist, so the caller can ack PARTIAL instead of claiming full success.
func (r *RunCmd) streamPairResults(ctx context.Context, client gatewayClient, commandID string, results []*pb.FleetNodePairResult, logger *slog.Logger) (int64, error) {
var rejected int64
for chunk := range slices.Chunk(results, maxDevicesPerReport) {
callCtx, cancel := context.WithTimeout(ctx, discoveryReportTimeout)
_, err := client.ReportPairedDevices(callCtx, connect.NewRequest(&pb.ReportPairedDevicesRequest{
resp, err := client.ReportPairedDevices(callCtx, connect.NewRequest(&pb.ReportPairedDevicesRequest{
CommandId: commandID,
Results: chunk,
}))
cancel()
if err != nil {
logger.Error("pair report failed", "command_id", commandID, "err", err)
return fmt.Errorf("report paired devices: %w", err)
return rejected, fmt.Errorf("report paired devices: %w", err)
}
logger.Info("pair report accepted", "command_id", commandID, "batch_size", len(chunk))
rejected += resp.Msg.GetRejectedCount()
logger.Info("pair report accepted", "command_id", commandID, "batch_size", len(chunk), "rejected", resp.Msg.GetRejectedCount())
}
return nil
return rejected, nil
}

// fanOutPairs pairs targets with bounded concurrency, returning collected
// results and whether the batch was truncated (a hung plugin or a cancelled
// parent ctx left some targets unattempted; the operator re-lists and retries).
func fanOutPairs(ctx context.Context, targets []*pairingpb.FleetNodePairTarget, creds *pairingpb.Credentials, concurrency int, pair func(context.Context, *pairingpb.FleetNodePairTarget, *pairingpb.Credentials) *pb.FleetNodePairResult, logger *slog.Logger) ([]*pb.FleetNodePairResult, bool) {
if len(targets) == 0 {
return nil, false
}
// results, whether the batch was truncated (a hung plugin or a cancelled parent
// ctx left some targets unattempted; the operator re-lists and retries), and a
// channel closed once every started worker has exited. A truncated batch abandons
// ctx-ignoring workers that may still be mutating miners; the caller must not
// admit another pair command until that channel closes.
func fanOutPairs(ctx context.Context, targets []*pairingpb.FleetNodePairTarget, creds *pairingpb.Credentials, concurrency int, pair func(context.Context, *pairingpb.FleetNodePairTarget, *pairingpb.Credentials) *pb.FleetNodePairResult, logger *slog.Logger) ([]*pb.FleetNodePairResult, bool, <-chan struct{}) {
var (
mu sync.Mutex
results []*pb.FleetNodePairResult
wg sync.WaitGroup
)
// Called only after the spawn loop stops, so a transient wg zero-crossing
// mid-spawn can't close the channel while workers are still being added.
workersDone := func() <-chan struct{} {
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
return done
}
if len(targets) == 0 {
return nil, false, workersDone()
}
sem := make(chan struct{}, concurrency)
for _, t := range targets {
select {
case sem <- struct{}{}:
case <-ctx.Done():
out, _ := waitSupervisor(&wg, &mu, &results, perPairTimeout*2, "pair", logger)
return out, true
return out, true, workersDone()
}
wg.Add(1)
go func(target *pairingpb.FleetNodePairTarget) {
Expand All @@ -276,7 +330,8 @@ func fanOutPairs(ctx context.Context, targets []*pairingpb.FleetNodePairTarget,
mu.Unlock()
}(t)
}
return waitSupervisor(&wg, &mu, &results, perPairTimeout*2, "pair", logger)
out, truncated := waitSupervisor(&wg, &mu, &results, perPairTimeout*2, "pair", logger)
return out, truncated, workersDone()
}

// truncateUTF8 trims s to at most maxLen bytes on a rune boundary so it stays valid
Expand Down
Loading
Loading