From 600451777162d8384b92c5d11a865d28c2301ace Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 24 Nov 2025 13:46:52 -0300 Subject: [PATCH 1/4] feat(pdp): add idempotency to create and add handlers Introduces a `IdempotencyKey` to JSON paylods of these handlers, which deboounces requests and if the request with the same key completed, redirects to the created resource. Users should use UUIDs or ULIDs as their IdempotencyKey-s Signed-off-by: Jakub Sztandera --- .../sql/20251124-pdp-idempotency.sql | 9 + pdp/handlers.go | 1 + pdp/handlers_add.go | 35 ++- pdp/handlers_create.go | 75 ++++++- pdp/idempotency.go | 201 ++++++++++++++++++ 5 files changed, 310 insertions(+), 11 deletions(-) create mode 100644 harmony/harmonydb/sql/20251124-pdp-idempotency.sql create mode 100644 pdp/idempotency.go diff --git a/harmony/harmonydb/sql/20251124-pdp-idempotency.sql b/harmony/harmonydb/sql/20251124-pdp-idempotency.sql new file mode 100644 index 000000000..79fa3c83e --- /dev/null +++ b/harmony/harmonydb/sql/20251124-pdp-idempotency.sql @@ -0,0 +1,9 @@ +-- Idempotency tracking for PDP operations +CREATE TABLE pdp_idempotency ( + idempotency_key TEXT PRIMARY KEY, + tx_hash TEXT, -- NULL initially, set after successful transaction + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +-- Index for cleanup operations +CREATE INDEX pdp_idempotency_created_at_idx ON pdp_idempotency(created_at); diff --git a/pdp/handlers.go b/pdp/handlers.go index 2341e0e8e..d80f279bf 100644 --- a/pdp/handlers.go +++ b/pdp/handlers.go @@ -64,6 +64,7 @@ func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore, } go p.cleanup(ctx) + go p.startIdempotencyCleanup(ctx) return p } diff --git a/pdp/handlers_add.go b/pdp/handlers_add.go index 7d964b381..6d6313ac0 100644 --- a/pdp/handlers_add.go +++ b/pdp/handlers_add.go @@ -305,8 +305,9 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ // AddPiecesPayload defines the structure for the entire add pieces request payload type AddPiecesPayload struct { - Pieces []AddPieceRequest `json:"pieces"` - ExtraData *string `json:"extraData,omitempty"` + IdempotencyKey string `json:"idempotencyKey,omitempty"` + Pieces []AddPieceRequest `json:"pieces"` + ExtraData *string `json:"extraData,omitempty"` } var payload AddPiecesPayload @@ -319,6 +320,24 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ _ = r.Body.Close() }() + // Step 4: Validate idempotency key + if err := validateIdempotencyKey(payload.IdempotencyKey); err != nil { + http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) + return + } + + // Step 5: Check or reserve idempotency key + idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, payload.IdempotencyKey) + if err != nil { + http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) + return + } + + if idempotencyResult.Exists { + p.handleAddIdempotencyResponse(w, &idempotencyResult, dataSetIdStr) + return + } + if len(payload.Pieces) == 0 { http.Error(w, "At least one piece must be provided", http.StatusBadRequest) return @@ -386,6 +405,12 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ reason := "pdp-addpieces" txHash, err := p.sender.Send(ctx, fromAddress, txEth, reason) if err != nil { + // Clean up reserved idempotency key on failure + if payload.IdempotencyKey != "" { + if cleanupErr := p.cleanupReservedIdempotencyKey(ctx, payload.IdempotencyKey); cleanupErr != nil { + logAdd.Errorw("Failed to cleanup idempotency key", "error", cleanupErr) + } + } http.Error(w, "Failed to send transaction: "+err.Error(), http.StatusInternalServerError) logAdd.Errorf("Failed to send transaction: %+v", err) return @@ -399,6 +424,12 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ "pieceCount", len(payload.Pieces)) _, err = p.db.BeginTransaction(ctx, func(txdb *harmonydb.Tx) (bool, error) { + // Update idempotency key with transaction hash + if payload.IdempotencyKey != "" { + if err := p.updateIdempotencyKey(txdb, payload.IdempotencyKey, txHashLower); err != nil { + logAdd.Errorw("Failed to update idempotency key", "error", err) + } + } // Insert into message_waits_eth logAdd.Debugw("Inserting AddPieces into message_waits_eth", "txHash", txHashLower, diff --git a/pdp/handlers_create.go b/pdp/handlers_create.go index f7200d45e..d49bc7f95 100644 --- a/pdp/handlers_create.go +++ b/pdp/handlers_create.go @@ -30,18 +30,37 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h return } - type RequestBody struct { - RecordKeeper string `json:"recordKeeper"` - Pieces []AddPieceRequest `json:"pieces"` - ExtraData *string `json:"extraData,omitempty"` + type CreateAddRequestBody struct { + Pieces []AddPieceRequest `json:"pieces"` + IdempotencyKey string `json:"idempotencyKey,omitempty"` + RecordKeeper string `json:"recordKeeper"` + ExtraData *string `json:"extraData,omitempty"` } - var reqBody RequestBody + var reqBody CreateAddRequestBody if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { http.Error(w, "Invalid JSON in request body: "+err.Error(), http.StatusBadRequest) return } + // Step 3: Validate idempotency key + if err := validateIdempotencyKey(reqBody.IdempotencyKey); err != nil { + http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) + return + } + + // Step 4: Check or reserve idempotency key + idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) + if err != nil { + http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) + return + } + + if idempotencyResult.Exists { + p.handleCreateIdempotencyResponse(w, &idempotencyResult) + return + } + if reqBody.RecordKeeper == "" { http.Error(w, "recordKeeper address is required", http.StatusBadRequest) return @@ -112,6 +131,10 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h reason := "pdp-create-and-add" txHash, err := p.sender.Send(ctx, fromAddress, tx, reason) if err != nil { + // Clean up reserved idempotency key on transaction failure + if reqBody.IdempotencyKey != "" { + _ = p.cleanupReservedIdempotencyKey(ctx, reqBody.IdempotencyKey) + } http.Error(w, "Failed to send transaction: "+err.Error(), http.StatusInternalServerError) logCreate.Errorf("Failed to send transaction: %+v", err) return @@ -124,6 +147,14 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h "recordKeeper", recordKeeperAddr.Hex()) // Begin a database transaction _, err = p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Update idempotency key with transaction hash + if reqBody.IdempotencyKey != "" { + if err := p.updateIdempotencyKey(tx, reqBody.IdempotencyKey, txHashLower); err != nil { + logCreate.Warnw("Failed to update idempotency key", "error", err) + // Don't fail the transaction for idempotency issues + } + } + err := p.insertMessageWaitsAndDataSetCreate(tx, txHashLower, serviceLabel) if err != nil { return false, err @@ -181,9 +212,10 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) } // Step 2: Parse the request body to get the 'recordKeeper' address and extraData - type RequestBody struct { - RecordKeeper string `json:"recordKeeper"` - ExtraData *string `json:"extraData,omitempty"` + type CreateDataSetRequestBody struct { + IdempotencyKey string `json:"idempotencyKey,omitempty"` + RecordKeeper string `json:"recordKeeper"` + ExtraData *string `json:"extraData,omitempty"` } body, err := io.ReadAll(r.Body) @@ -195,12 +227,30 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) _ = r.Body.Close() }() - var reqBody RequestBody + var reqBody CreateDataSetRequestBody if err := json.Unmarshal(body, &reqBody); err != nil { http.Error(w, "Invalid JSON in request body: "+err.Error(), http.StatusBadRequest) return } + // Step 3: Validate idempotency key + if err := validateIdempotencyKey(reqBody.IdempotencyKey); err != nil { + http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) + return + } + + // Step 4: Check or reserve idempotency key + idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) + if err != nil { + http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) + return + } + + if idempotencyResult.Exists { + p.handleCreateIdempotencyResponse(w, &idempotencyResult) + return + } + if reqBody.RecordKeeper == "" { http.Error(w, "recordKeeper address is required", http.StatusBadRequest) return @@ -275,6 +325,13 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) // Begin a database transaction _, err = p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Update idempotency key with transaction hash + if reqBody.IdempotencyKey != "" { + if err := p.updateIdempotencyKey(tx, reqBody.IdempotencyKey, txHashLower); err != nil { + logCreate.Warnw("Failed to update idempotency key", "error", err) + // Don't fail the transaction for idempotency issues + } + } err := p.insertMessageWaitsAndDataSetCreate(tx, txHashLower, serviceLabel) if err != nil { return false, err diff --git a/pdp/idempotency.go b/pdp/idempotency.go new file mode 100644 index 000000000..04e51564e --- /dev/null +++ b/pdp/idempotency.go @@ -0,0 +1,201 @@ +package pdp + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/http" + "path" + "regexp" + "time" + + logger "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/curio/harmony/harmonydb" +) + +var logIdempotency = logger.Logger("pdp/idempotency") + +// Ensure harmonydb import is used +var _ = (*harmonydb.DB)(nil) + +// IdempotencyResult represents the result of an idempotency check +type IdempotencyResult struct { + Exists bool + TxHash *string + IsReserved bool // true if key exists but tx_hash is NULL +} + +// validateIdempotencyKey validates the format of client-provided idempotency keys +func validateIdempotencyKey(key string) error { + if key == "" { + return nil // Optional field + } + + if len(key) > 255 { + return errors.New("idempotency key must be 255 characters or less") + } + + // Allow UUID v4, v7, ULID, and similar formats + // Pattern matches: 550e8400-e29b-41d4-a716-446655440000 (UUID) + // Pattern matches: 018f4b8c-9c7b-7f3b-8b3c-4d3e5f6a7b8c (UUID v7) + // Pattern matches: 01H8XKZ9N8J8R8KZ9N8J8R8K (ULID) + // Pattern matches: custom formats with alphanumerics, hyphens, underscores + matched, _ := regexp.MatchString(`^[a-zA-Z0-9\-_]+$`, key) + if !matched { + return errors.New("idempotency key can only contain letters, numbers, hyphens, and underscores") + } + + return nil +} + +// checkOrReserveIdempotencyKey atomically checks for existing key or reserves it +func (p *PDPService) checkOrReserveIdempotencyKey(ctx context.Context, idempotencyKey string) (IdempotencyResult, error) { + if idempotencyKey == "" { + return IdempotencyResult{Exists: false}, nil + } + + var txHash sql.NullString + err := p.db.QueryRow(ctx, ` + INSERT INTO pdp_idempotency (idempotency_key, tx_hash) + VALUES ($1, NULL) + ON CONFLICT (idempotency_key) + DO UPDATE SET tx_hash = EXCLUDED.tx_hash + RETURNING tx_hash + `, idempotencyKey).Scan(&txHash) + + if err != nil { + return IdempotencyResult{}, fmt.Errorf("failed to check/reserve idempotency key: %w", err) + } + + result := IdempotencyResult{ + Exists: true, // Key exists (either just inserted or already existed) + TxHash: nil, + IsReserved: true, // tx_hash is NULL means we just reserved it + } + + if txHash.Valid { + txHashStr := txHash.String + result.TxHash = &txHashStr + result.IsReserved = false // Key exists with tx_hash, operation completed + } + + return result, nil +} + +// updateIdempotencyKey updates a reserved key with actual transaction hash +func (p *PDPService) updateIdempotencyKey(tx *harmonydb.Tx, idempotencyKey, txHash string) error { + if idempotencyKey == "" { + return nil + } + + _, err := tx.Exec(` + UPDATE pdp_idempotency + SET tx_hash = $1 + WHERE idempotency_key = $2 AND tx_hash IS NULL + `, txHash, idempotencyKey) + if err != nil { + return fmt.Errorf("failed to update idempotency key: %w", err) + } + + return nil +} + +// cleanupReservedIdempotencyKey removes a reserved key on operation failure +func (p *PDPService) cleanupReservedIdempotencyKey(ctx context.Context, idempotencyKey string) error { + if idempotencyKey == "" { + return nil + } + + _, err := p.db.Exec(ctx, ` + DELETE FROM pdp_idempotency + WHERE idempotency_key = $1 AND tx_hash IS NULL + `, idempotencyKey) + + if err != nil { + return fmt.Errorf("failed to cleanup idempotency key: %w", err) + } + + return nil +} + +// handleCreateIdempotencyResponse handles HTTP response for create operations +func (p *PDPService) handleCreateIdempotencyResponse(w http.ResponseWriter, result *IdempotencyResult) { + if result.IsReserved { + // Another request is processing this operation + w.WriteHeader(http.StatusAccepted) // 202 - Processing + return + } + + if result.TxHash != nil && *result.TxHash != "" { + // Operation already completed + location := path.Join("/pdp/data-sets/created", *result.TxHash) + w.Header().Set("Location", location) + w.WriteHeader(http.StatusCreated) // 201 - Operation completed + return + } + + http.Error(w, "Invalid idempotency state", http.StatusInternalServerError) +} + +// handleAddIdempotencyResponse handles HTTP response for add operations +func (p *PDPService) handleAddIdempotencyResponse(w http.ResponseWriter, result *IdempotencyResult, dataSetIdStr string) { + if result.IsReserved { + // Another request is processing this operation + w.WriteHeader(http.StatusAccepted) // 202 - Processing + return + } + + if result.TxHash != nil && *result.TxHash != "" { + // Operation already completed + location := path.Join("/pdp/data-sets", dataSetIdStr, "pieces/added", *result.TxHash) + w.Header().Set("Location", location) + w.WriteHeader(http.StatusCreated) // 201 - Operation completed + return + } + + http.Error(w, "Invalid idempotency state", http.StatusInternalServerError) +} + +// startIdempotencyCleanup starts background cleanup of old records +func (p *PDPService) startIdempotencyCleanup(ctx context.Context) { + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + p.cleanupOldIdempotencyRecords(ctx) + case <-ctx.Done(): + return + } + } +} + +// cleanupOldIdempotencyRecords removes old records +func (p *PDPService) cleanupOldIdempotencyRecords(ctx context.Context) { + // Clean up stuck reserved records (NULL tx_hash for > 1 hour) + count, err := p.db.Exec(ctx, ` + DELETE FROM pdp_idempotency + WHERE tx_hash IS NULL + AND created_at < NOW() - INTERVAL '1 hour' + `) + if err != nil { + logIdempotency.Errorw("Failed to cleanup old reserved idempotency records", "error", err) + } else if count > 0 { + logIdempotency.Infow("Cleaned up old reserved idempotency records", "count", count) + } + + // Clean up old completed records (older than 24 hours) + count, err = p.db.Exec(ctx, ` + DELETE FROM pdp_idempotency + WHERE tx_hash IS NOT NULL + AND created_at < NOW() - INTERVAL '24 hours' + `) + if err != nil { + logIdempotency.Errorw("Failed to cleanup old completed idempotency records", "error", err) + } else if count > 0 { + logIdempotency.Infow("Cleaned up old completed idempotency records", "count", count) + } +} From 27d944e9b5347b8902f750830c781944d4c3828a Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 25 Nov 2025 11:32:11 -0300 Subject: [PATCH 2/4] feat(pdp): move idempotency validation to JSON unmarshaling Replace manual validation with custom IdempotencyKey type that validates during JSON parsing using pre-compiled regex for better performance. Signed-off-by: Jakub Sztandera --- pdp/handlers_add.go | 10 +-- pdp/handlers_create.go | 24 ++----- pdp/idempotency.go | 66 +++++++++++------- pdp/idempotency_test.go | 146 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 50 deletions(-) create mode 100644 pdp/idempotency_test.go diff --git a/pdp/handlers_add.go b/pdp/handlers_add.go index 6d6313ac0..262735c65 100644 --- a/pdp/handlers_add.go +++ b/pdp/handlers_add.go @@ -305,7 +305,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ // AddPiecesPayload defines the structure for the entire add pieces request payload type AddPiecesPayload struct { - IdempotencyKey string `json:"idempotencyKey,omitempty"` + IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"` Pieces []AddPieceRequest `json:"pieces"` ExtraData *string `json:"extraData,omitempty"` } @@ -320,13 +320,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ _ = r.Body.Close() }() - // Step 4: Validate idempotency key - if err := validateIdempotencyKey(payload.IdempotencyKey); err != nil { - http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) - return - } - - // Step 5: Check or reserve idempotency key + // Step 4: Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, payload.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) diff --git a/pdp/handlers_create.go b/pdp/handlers_create.go index d49bc7f95..c12517ac3 100644 --- a/pdp/handlers_create.go +++ b/pdp/handlers_create.go @@ -32,7 +32,7 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h type CreateAddRequestBody struct { Pieces []AddPieceRequest `json:"pieces"` - IdempotencyKey string `json:"idempotencyKey,omitempty"` + IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"` RecordKeeper string `json:"recordKeeper"` ExtraData *string `json:"extraData,omitempty"` } @@ -43,13 +43,7 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h return } - // Step 3: Validate idempotency key - if err := validateIdempotencyKey(reqBody.IdempotencyKey); err != nil { - http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) - return - } - - // Step 4: Check or reserve idempotency key + // Step 3: Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) @@ -213,9 +207,9 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) // Step 2: Parse the request body to get the 'recordKeeper' address and extraData type CreateDataSetRequestBody struct { - IdempotencyKey string `json:"idempotencyKey,omitempty"` - RecordKeeper string `json:"recordKeeper"` - ExtraData *string `json:"extraData,omitempty"` + IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"` + RecordKeeper string `json:"recordKeeper"` + ExtraData *string `json:"extraData,omitempty"` } body, err := io.ReadAll(r.Body) @@ -233,13 +227,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } - // Step 3: Validate idempotency key - if err := validateIdempotencyKey(reqBody.IdempotencyKey); err != nil { - http.Error(w, "Invalid idempotency key: "+err.Error(), http.StatusBadRequest) - return - } - - // Step 4: Check or reserve idempotency key + // Step 3: Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) diff --git a/pdp/idempotency.go b/pdp/idempotency.go index 04e51564e..4292402ae 100644 --- a/pdp/idempotency.go +++ b/pdp/idempotency.go @@ -3,6 +3,7 @@ package pdp import ( "context" "database/sql" + "encoding/json" "errors" "fmt" "net/http" @@ -20,38 +21,55 @@ var logIdempotency = logger.Logger("pdp/idempotency") // Ensure harmonydb import is used var _ = (*harmonydb.DB)(nil) -// IdempotencyResult represents the result of an idempotency check -type IdempotencyResult struct { - Exists bool - TxHash *string - IsReserved bool // true if key exists but tx_hash is NULL -} +// Pre-compiled regex for idempotency key validation +var idempotencyKeyRegex = regexp.MustCompile(`^[a-zA-Z0-9\-_]+$`) + +// IdempotencyKey represents a validated idempotency key +type IdempotencyKey string + +// UnmarshalJSON implements custom JSON unmarshaling with validation +func (ik *IdempotencyKey) UnmarshalJSON(data []byte) error { + // Handle null case + if string(data) == "null" { + *ik = "" + return nil + } -// validateIdempotencyKey validates the format of client-provided idempotency keys -func validateIdempotencyKey(key string) error { - if key == "" { - return nil // Optional field + // Unmarshal raw string + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err } - if len(key) > 255 { + // Empty string is allowed (optional field) + if str == "" { + *ik = "" + return nil + } + + // Validate length + if len(str) > 255 { return errors.New("idempotency key must be 255 characters or less") } - // Allow UUID v4, v7, ULID, and similar formats - // Pattern matches: 550e8400-e29b-41d4-a716-446655440000 (UUID) - // Pattern matches: 018f4b8c-9c7b-7f3b-8b3c-4d3e5f6a7b8c (UUID v7) - // Pattern matches: 01H8XKZ9N8J8R8KZ9N8J8R8K (ULID) - // Pattern matches: custom formats with alphanumerics, hyphens, underscores - matched, _ := regexp.MatchString(`^[a-zA-Z0-9\-_]+$`, key) - if !matched { + // Validate format using pre-compiled regex + if !idempotencyKeyRegex.MatchString(str) { return errors.New("idempotency key can only contain letters, numbers, hyphens, and underscores") } + *ik = IdempotencyKey(str) return nil } +// IdempotencyResult represents the result of an idempotency check +type IdempotencyResult struct { + Exists bool + TxHash *string + IsReserved bool // true if key exists but tx_hash is NULL +} + // checkOrReserveIdempotencyKey atomically checks for existing key or reserves it -func (p *PDPService) checkOrReserveIdempotencyKey(ctx context.Context, idempotencyKey string) (IdempotencyResult, error) { +func (p *PDPService) checkOrReserveIdempotencyKey(ctx context.Context, idempotencyKey IdempotencyKey) (IdempotencyResult, error) { if idempotencyKey == "" { return IdempotencyResult{Exists: false}, nil } @@ -63,7 +81,7 @@ func (p *PDPService) checkOrReserveIdempotencyKey(ctx context.Context, idempoten ON CONFLICT (idempotency_key) DO UPDATE SET tx_hash = EXCLUDED.tx_hash RETURNING tx_hash - `, idempotencyKey).Scan(&txHash) + `, string(idempotencyKey)).Scan(&txHash) if err != nil { return IdempotencyResult{}, fmt.Errorf("failed to check/reserve idempotency key: %w", err) @@ -85,7 +103,7 @@ func (p *PDPService) checkOrReserveIdempotencyKey(ctx context.Context, idempoten } // updateIdempotencyKey updates a reserved key with actual transaction hash -func (p *PDPService) updateIdempotencyKey(tx *harmonydb.Tx, idempotencyKey, txHash string) error { +func (p *PDPService) updateIdempotencyKey(tx *harmonydb.Tx, idempotencyKey IdempotencyKey, txHash string) error { if idempotencyKey == "" { return nil } @@ -94,7 +112,7 @@ func (p *PDPService) updateIdempotencyKey(tx *harmonydb.Tx, idempotencyKey, txHa UPDATE pdp_idempotency SET tx_hash = $1 WHERE idempotency_key = $2 AND tx_hash IS NULL - `, txHash, idempotencyKey) + `, txHash, string(idempotencyKey)) if err != nil { return fmt.Errorf("failed to update idempotency key: %w", err) } @@ -103,7 +121,7 @@ func (p *PDPService) updateIdempotencyKey(tx *harmonydb.Tx, idempotencyKey, txHa } // cleanupReservedIdempotencyKey removes a reserved key on operation failure -func (p *PDPService) cleanupReservedIdempotencyKey(ctx context.Context, idempotencyKey string) error { +func (p *PDPService) cleanupReservedIdempotencyKey(ctx context.Context, idempotencyKey IdempotencyKey) error { if idempotencyKey == "" { return nil } @@ -111,7 +129,7 @@ func (p *PDPService) cleanupReservedIdempotencyKey(ctx context.Context, idempote _, err := p.db.Exec(ctx, ` DELETE FROM pdp_idempotency WHERE idempotency_key = $1 AND tx_hash IS NULL - `, idempotencyKey) + `, string(idempotencyKey)) if err != nil { return fmt.Errorf("failed to cleanup idempotency key: %w", err) diff --git a/pdp/idempotency_test.go b/pdp/idempotency_test.go new file mode 100644 index 000000000..1d0f104ff --- /dev/null +++ b/pdp/idempotency_test.go @@ -0,0 +1,146 @@ +package pdp + +import ( + "encoding/json" + "strings" + "testing" +) + +func TestIdempotencyKeyUnmarshalJSON(t *testing.T) { + tests := []struct { + name string + json string + expectError bool + errorMsg string + expected string + }{ + { + name: "valid UUID", + json: `"550e8400-e29b-41d4-a716-446655440000"`, + expected: "550e8400-e29b-41d4-a716-446655440000", + }, + { + name: "valid ULID", + json: `"01H8XKZ9N8J8R8KZ9N8J8R8K"`, + expected: "01H8XKZ9N8J8R8KZ9N8J8R8K", + }, + { + name: "valid custom format", + json: `"custom-key_123"`, + expected: "custom-key_123", + }, + { + name: "empty string", + json: `""`, + expected: "", + }, + { + name: "null value", + json: `null`, + expected: "", + }, + { + name: "invalid characters", + json: `"invalid@key"`, + expectError: true, + errorMsg: "idempotency key can only contain letters, numbers, hyphens, and underscores", + }, + { + name: "too long key", + json: `"` + strings.Repeat("a", 256) + `"`, + expectError: true, + errorMsg: "idempotency key must be 255 characters or less", + }, + { + name: "invalid JSON", + json: `"unclosed`, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ik IdempotencyKey + err := json.Unmarshal([]byte(tt.json), &ik) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + return + } + if tt.errorMsg != "" && err.Error() != tt.errorMsg { + t.Errorf("expected error message %q, got %q", tt.errorMsg, err.Error()) + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + if string(ik) != tt.expected { + t.Errorf("expected %q, got %q", tt.expected, string(ik)) + } + } + }) + } +} + +func TestIdempotencyKeyInStruct(t *testing.T) { + type TestStruct struct { + IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"` + OtherField string `json:"otherField"` + } + + tests := []struct { + name string + json string + expectError bool + expectedKey string + }{ + { + name: "valid key in struct", + json: `{"idempotencyKey":"test-key-123","otherField":"value"}`, + expectedKey: "test-key-123", + }, + { + name: "omitted key", + json: `{"otherField":"value"}`, + expectedKey: "", + }, + { + name: "null key", + json: `{"idempotencyKey":null,"otherField":"value"}`, + expectedKey: "", + }, + { + name: "empty key", + json: `{"idempotencyKey":"","otherField":"value"}`, + expectedKey: "", + }, + { + name: "invalid key in struct", + json: `{"idempotencyKey":"invalid@key","otherField":"value"}`, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ts TestStruct + err := json.Unmarshal([]byte(tt.json), &ts) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + if string(ts.IdempotencyKey) != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, string(ts.IdempotencyKey)) + } + } + }) + } +} From e7e0d7d2345741f23fdffa4f7b0e8e7da496f2f4 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 25 Nov 2025 11:39:58 -0300 Subject: [PATCH 3/4] misc: remove step numbers from comments Signed-off-by: Jakub Sztandera --- pdp/handlers_add.go | 26 +++++++++++++------------- pdp/handlers_create.go | 20 ++++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pdp/handlers_add.go b/pdp/handlers_add.go index 262735c65..a1ce53a2c 100644 --- a/pdp/handlers_add.go +++ b/pdp/handlers_add.go @@ -94,7 +94,7 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel // Start a DB transaction _, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { - // Step 4: Get pdp_piecerefs matching all subPiece cids + make sure those refs belong to serviceLabel + // Get pdp_piecerefs matching all subPiece cids + make sure those refs belong to serviceLabel rows, err := tx.Query(` SELECT ppr.piece_cid, ppr.id AS pdp_pieceref_id, ppr.piece_ref, pp.piece_padded_size, pp.piece_raw_size @@ -255,14 +255,14 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Step 1: Verify that the request is authorized using ECDSA JWT + // Verify that the request is authorized using ECDSA JWT serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return } - // Step 2: Extract dataSetId from the URL + // Extract dataSetId from the URL dataSetIdStr := chi.URLParam(r, "dataSetId") if dataSetIdStr == "" { http.Error(w, "Missing data set ID in URL", http.StatusBadRequest) @@ -301,7 +301,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ // Convert dataSetId to *big.Int dataSetId := new(big.Int).SetUint64(dataSetIdUint64) - // Step 3: Parse the request body + // Parse the request body // AddPiecesPayload defines the structure for the entire add pieces request payload type AddPiecesPayload struct { @@ -320,7 +320,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ _ = r.Body.Close() }() - // Step 4: Check or reserve idempotency key + // Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, payload.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) @@ -343,14 +343,14 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ return } - // Step 4: Prepare piece information + // Prepare piece information pieceDataArray, subPieceInfoMap, subPieceCidList, err := p.transformAddPiecesRequest(ctx, serviceLabel, payload.Pieces) if err != nil { logAdd.Warnf("Failed to process AddPieces request data: %+v", err) http.Error(w, "Failed to process request: "+err.Error(), http.StatusBadRequest) } - // Step 5: Prepare the Ethereum transaction data outside the DB transaction + // Prepare the Ethereum transaction data outside the DB transaction // Obtain the ABI of the PDPVerifier contract abiData, err := contract.PDPVerifierMetaData.GetAbi() if err != nil { @@ -358,7 +358,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ return } - // Step 6: Prepare the Ethereum transaction + // Prepare the Ethereum transaction // Pack the method call data // The extraDataBytes variable is now correctly populated above data, err := abiData.Pack("addPieces", dataSetId, common.Address{}, pieceDataArray, extraDataBytes) @@ -367,7 +367,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ return } - // Step 7: Get the sender address from 'eth_keys' table where role = 'pdp' limit 1 + // Get the sender address from 'eth_keys' table where role = 'pdp' limit 1 fromAddress, err := p.getSenderAddress(ctx) if err != nil { http.Error(w, "Failed to get sender address: "+err.Error(), http.StatusInternalServerError) @@ -384,7 +384,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ data, ) - // Step 8: Check for indexing requirements + // Check for indexing requirements mustIndex, err := CheckIfIndexingNeeded(p.ethClient, dataSetIdUint64) if err != nil { logAdd.Errorw("Failed to check indexing requirements", "error", err, "dataSetId", dataSetId) @@ -395,7 +395,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ logAdd.Infow("Data set has withIPFSIndexing enabled, pieces will be indexed", "dataSetId", dataSetId) } - // Step 9: Send the transaction + // Send the transaction reason := "pdp-addpieces" txHash, err := p.sender.Send(ctx, fromAddress, txEth, reason) if err != nil { @@ -410,7 +410,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ return } - // Step 10: Insert database tracking records + // Insert database tracking records txHashLower := strings.ToLower(txHash.Hex()) logAdd.Infow("PDP AddPieces: Inserting transaction tracking", "txHash", txHashLower, @@ -460,7 +460,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ return } - // Step 10: Respond with 201 Created + // Respond with 201 Created w.Header().Set("Location", path.Join("/pdp/data-sets", dataSetIdStr, "pieces/added", txHashLower)) w.WriteHeader(http.StatusCreated) } diff --git a/pdp/handlers_create.go b/pdp/handlers_create.go index c12517ac3..95d7020c8 100644 --- a/pdp/handlers_create.go +++ b/pdp/handlers_create.go @@ -23,7 +23,7 @@ var logCreate = logger.Logger("pdp/create") func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Step 1: Verify that the request is authorized using ECDSA JWT + // Verify that the request is authorized using ECDSA JWT serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) @@ -43,7 +43,7 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h return } - // Step 3: Check or reserve idempotency key + // Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) @@ -198,14 +198,14 @@ func decodeExtraData(extraDataString *string) ([]byte, error) { func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Step 1: Verify that the request is authorized using ECDSA JWT + // Verify that the request is authorized using ECDSA JWT serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return } - // Step 2: Parse the request body to get the 'recordKeeper' address and extraData + // Parse the request body to get the 'recordKeeper' address and extraData type CreateDataSetRequestBody struct { IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"` RecordKeeper string `json:"recordKeeper"` @@ -227,7 +227,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } - // Step 3: Check or reserve idempotency key + // Check or reserve idempotency key idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey) if err != nil { http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError) @@ -263,14 +263,14 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } - // Step 3: Get the sender address from 'eth_keys' table where role = 'pdp' limit 1 + // Get the sender address from 'eth_keys' table where role = 'pdp' limit 1 fromAddress, err := p.getSenderAddress(ctx) if err != nil { http.Error(w, "Failed to get sender address: "+err.Error(), http.StatusInternalServerError) return } - // Step 4: Manually create the transaction without requiring a Signer + // Manually create the transaction without requiring a Signer // Obtain the ABI of the PDPVerifier contract abiData, err := contract.PDPVerifierMetaData.GetAbi() if err != nil { @@ -295,7 +295,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) data, ) - // Step 5: Send the transaction using SenderETH + // Send the transaction using SenderETH reason := "pdp-mkdataset" txHash, err := p.sender.Send(ctx, fromAddress, tx, reason) if err != nil { @@ -304,7 +304,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } - // Step 6: Insert into message_waits_eth and pdp_data_set_creates + // Insert into message_waits_eth and pdp_data_set_creates txHashLower := strings.ToLower(txHash.Hex()) logCreate.Infow("PDP CreateDataSet: Inserting transaction tracking", "txHash", txHashLower, @@ -333,7 +333,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } - // Step 7: Respond with 201 Created and Location header + // Respond with 201 Created and Location header w.Header().Set("Location", path.Join("/pdp/data-sets/created", txHashLower)) w.WriteHeader(http.StatusCreated) } From fd8ff43b5d60b6dd01b747e3a39a2b24480403b1 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 25 Nov 2025 11:59:19 -0300 Subject: [PATCH 4/4] chore: use default logger Signed-off-by: Jakub Sztandera --- pdp/idempotency.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pdp/idempotency.go b/pdp/idempotency.go index 4292402ae..44de612b9 100644 --- a/pdp/idempotency.go +++ b/pdp/idempotency.go @@ -11,13 +11,9 @@ import ( "regexp" "time" - logger "github.com/ipfs/go-log/v2" - "github.com/filecoin-project/curio/harmony/harmonydb" ) -var logIdempotency = logger.Logger("pdp/idempotency") - // Ensure harmonydb import is used var _ = (*harmonydb.DB)(nil) @@ -200,9 +196,9 @@ func (p *PDPService) cleanupOldIdempotencyRecords(ctx context.Context) { AND created_at < NOW() - INTERVAL '1 hour' `) if err != nil { - logIdempotency.Errorw("Failed to cleanup old reserved idempotency records", "error", err) + log.Errorw("Failed to cleanup old reserved idempotency records", "error", err) } else if count > 0 { - logIdempotency.Infow("Cleaned up old reserved idempotency records", "count", count) + log.Infow("Cleaned up old reserved idempotency records", "count", count) } // Clean up old completed records (older than 24 hours) @@ -212,8 +208,8 @@ func (p *PDPService) cleanupOldIdempotencyRecords(ctx context.Context) { AND created_at < NOW() - INTERVAL '24 hours' `) if err != nil { - logIdempotency.Errorw("Failed to cleanup old completed idempotency records", "error", err) + log.Errorw("Failed to cleanup old completed idempotency records", "error", err) } else if count > 0 { - logIdempotency.Infow("Cleaned up old completed idempotency records", "count", count) + log.Infow("Cleaned up old completed idempotency records", "count", count) } }