From 681f05a89dcd5c583fc0a57fd247cc8b104d8701 Mon Sep 17 00:00:00 2001 From: Emidio Cruciani Date: Fri, 16 Jan 2026 15:02:15 +0000 Subject: [PATCH 1/3] MDI240 | remove TTL references --- docs/database.md | 3 --- internal/outbox/outbox.go | 10 +++------- internal/pipeline/callback.go | 4 ++-- internal/pipeline/intake.go | 12 ++++++------ internal/pipeline/intake_test.go | 9 --------- internal/pipeline/interface.go | 6 +++--- internal/pipeline/sender.go | 14 +++++++------- internal/testutils/mocks/outbox.go | 6 +++--- 8 files changed, 24 insertions(+), 40 deletions(-) diff --git a/docs/database.md b/docs/database.md index 306dae7..5782cd7 100644 --- a/docs/database.md +++ b/docs/database.md @@ -9,7 +9,6 @@ type Email struct { PayloadFilePath string UpdatedAt string Reason string - TTL *int64 // Attualmente non usato (nil) Version int // Versione per optimistic locking } ``` @@ -40,8 +39,6 @@ CREATE TABLE IF NOT EXISTS emails ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` -**Nota**: MySQL non supporta TTL nativo. La pulizia dei record obsoleti deve essere gestita esternamente. - ### Tabella `email_statuses` Tabella per lo storico dei cambi di stato (history). diff --git a/internal/outbox/outbox.go b/internal/outbox/outbox.go index 1baefc7..dad68c1 100644 --- a/internal/outbox/outbox.go +++ b/internal/outbox/outbox.go @@ -48,7 +48,6 @@ type Email struct { PayloadFilePath string UpdatedAt string Reason string - TTL *int64 Version int } @@ -185,8 +184,7 @@ func (o *Outbox) Query(ctx context.Context, status string, limit int) ([]Email, // Update changes the status of an email using optimistic locking based on version. // It determines the expected "from" status based on the target "to" status. // The operation is executed within a transaction with retry logic for transient errors. -// Note: ttl parameter is ignored for MySQL. -func (o *Outbox) Update(ctx context.Context, id string, status string, errorReason string, _ *int64) error { +func (o *Outbox) Update(ctx context.Context, id string, status string, errorReason string) error { fromStatus := getExpectedFromStatus(status) updateQuery := ` @@ -239,8 +237,7 @@ func (o *Outbox) Update(ctx context.Context, id string, status string, errorReas // Requeue updates the email from PROCESSING back to READY and removes the PROCESSING history row. // The operation is executed within a transaction with retry logic for transient errors. -// Note: ttl parameter is ignored for MySQL. -func (o *Outbox) Requeue(ctx context.Context, id string, _ *int64) error { +func (o *Outbox) Requeue(ctx context.Context, id string) error { updateQuery := ` UPDATE emails SET status = ?, reason = ?, version = version + 1 @@ -294,8 +291,7 @@ func (o *Outbox) Requeue(ctx context.Context, id string, _ *int64) error { // Ready updates the email to READY status with the eml file path. // Expected from status is INTAKING. // The operation is executed within a transaction with retry logic for transient errors. -// Note: ttl parameter is ignored for MySQL. -func (o *Outbox) Ready(ctx context.Context, id string, emlFilePath string, _ *int64) error { +func (o *Outbox) Ready(ctx context.Context, id string, emlFilePath string) error { updateQuery := ` UPDATE emails SET status = ?, eml_file_path = ?, version = version + 1 diff --git a/internal/pipeline/callback.go b/internal/pipeline/callback.go index 46e79bd..f9226c8 100644 --- a/internal/pipeline/callback.go +++ b/internal/pipeline/callback.go @@ -48,7 +48,7 @@ func (p *CallbackPipeline) Process(ctx context.Context) { p.logger.Info(fmt.Sprintf("processing email %v", email.Id)) subLogger := p.logger.With("email", email.Id) - if err = p.outbox.Update(ctx, email.Id, p.processingStatus, email.Reason, email.TTL); err != nil { + if err = p.outbox.Update(ctx, email.Id, p.processingStatus, email.Reason); err != nil { subLogger.Warn(fmt.Sprintf("failed to acquire processing lock, error: %v", err)) return } @@ -144,7 +144,7 @@ func (p *CallbackPipeline) Process(ctx context.Context) { subLogger.Info("callback successfully processed") } - if err = p.outbox.Update(context.Background(), email.Id, p.acknowledgedStatus, email.Reason, email.TTL); err != nil { + if err = p.outbox.Update(context.Background(), email.Id, p.acknowledgedStatus, email.Reason); err != nil { subLogger.Error(fmt.Sprintf("error while updating status after callback, error: %v", err)) } }(e) diff --git a/internal/pipeline/intake.go b/internal/pipeline/intake.go index 2759c14..2060200 100644 --- a/internal/pipeline/intake.go +++ b/internal/pipeline/intake.go @@ -38,20 +38,20 @@ func (p *IntakePipeline) Process(ctx context.Context) { p.logger.Info(fmt.Sprintf("processing outbox %v", email.Id)) subLogger := p.logger.With("outbox", email.Id) - if err = p.outbox.Update(ctx, email.Id, outbox.StatusIntaking, "", email.TTL); err != nil { + if err = p.outbox.Update(ctx, email.Id, outbox.StatusIntaking, ""); err != nil { subLogger.Warn(fmt.Sprintf("failed to acquire processing lock, error: %v", err)) return } if err := p.validatePayload(email); err != nil { subLogger.Error(fmt.Sprintf("failed to validate payload, error: %v", err)) - p.handle(context.Background(), subLogger, email.Id, outbox.StatusInvalid, err.Error(), email.TTL) + p.handle(context.Background(), subLogger, email.Id, outbox.StatusInvalid, err.Error()) return } - if err := p.outbox.Ready(context.Background(), email.Id, "", email.TTL); err != nil { + if err := p.outbox.Ready(context.Background(), email.Id, ""); err != nil { subLogger.Error(fmt.Sprintf("failed to update status to READY: %v", err)) - p.handle(context.Background(), subLogger, email.Id, outbox.StatusInvalid, err.Error(), email.TTL) + p.handle(context.Background(), subLogger, email.Id, outbox.StatusInvalid, err.Error()) } else { subLogger.Info("successfully intaken") } @@ -69,8 +69,8 @@ func (p *IntakePipeline) validatePayload(e outbox.Email) error { return nil } -func (p *IntakePipeline) handle(ctx context.Context, logger *slog.Logger, emailId string, status string, errorReason string, ttl *int64) { - if err := p.outbox.Update(ctx, emailId, status, errorReason, ttl); err != nil { +func (p *IntakePipeline) handle(ctx context.Context, logger *slog.Logger, emailId string, status string, errorReason string) { + if err := p.outbox.Update(ctx, emailId, status, errorReason); err != nil { msg := fmt.Sprintf("error updating status to %v, error: %v", status, err) logger.Error(msg) } diff --git a/internal/pipeline/intake_test.go b/internal/pipeline/intake_test.go index ab3df02..abd6321 100644 --- a/internal/pipeline/intake_test.go +++ b/internal/pipeline/intake_test.go @@ -40,10 +40,6 @@ func createTestPayloadFile(t *testing.T, payload email.Payload) string { return tmpFile.Name() } -func int64Ptr(value int64) *int64 { - return &value -} - func TestSuccessfulIntake(t *testing.T) { payload := email.Payload{ Id: "550e8400-e29b-41d4-a716-446655440000", @@ -62,7 +58,6 @@ func TestSuccessfulIntake(t *testing.T) { Id: "1", Status: outbox.StatusAccepted, PayloadFilePath: payloadFile, - TTL: int64Ptr(1234567890), }), ) @@ -106,7 +101,6 @@ func TestIntakeUpdateError(t *testing.T) { Id: "1", Status: outbox.StatusAccepted, PayloadFilePath: payloadFile, - TTL: int64Ptr(1234567890), }), mocks.UpdateMethodError(errors.New("some update error")), ) @@ -128,7 +122,6 @@ func TestIntakeInvalidPayloadFile(t *testing.T) { Id: "1", Status: outbox.StatusAccepted, PayloadFilePath: "/nonexistent/file.json", - TTL: int64Ptr(1234567890), }), ) @@ -157,7 +150,6 @@ func TestIntakeInvalidJSON(t *testing.T) { Id: "1", Status: outbox.StatusAccepted, PayloadFilePath: tmpFile.Name(), - TTL: int64Ptr(1234567890), }), ) @@ -186,7 +178,6 @@ func TestIntakeValidationError(t *testing.T) { Id: "1", Status: outbox.StatusAccepted, PayloadFilePath: payloadFile, - TTL: int64Ptr(1234567890), }), ) diff --git a/internal/pipeline/interface.go b/internal/pipeline/interface.go index 3baa95f..97836e7 100644 --- a/internal/pipeline/interface.go +++ b/internal/pipeline/interface.go @@ -7,7 +7,7 @@ import ( type outboxService interface { Query(ctx context.Context, status string, limit int) ([]outbox.Email, error) - Update(ctx context.Context, id string, status string, errorReason string, ttl *int64) error - Ready(ctx context.Context, id string, emlFilePath string, ttl *int64) error - Requeue(ctx context.Context, id string, ttl *int64) error + Update(ctx context.Context, id string, status string, errorReason string) error + Ready(ctx context.Context, id string, emlFilePath string) error + Requeue(ctx context.Context, id string) error } diff --git a/internal/pipeline/sender.go b/internal/pipeline/sender.go index 7f0740b..46c40f8 100644 --- a/internal/pipeline/sender.go +++ b/internal/pipeline/sender.go @@ -48,7 +48,7 @@ func (p *MainSenderPipeline) Process(ctx context.Context) { p.logger.Info(fmt.Sprintf("processing outbox %v", outboxEmail.Id)) logger := p.logger.With("outbox", outboxEmail.Id) - if err = p.outbox.Update(ctx, outboxEmail.Id, outbox.StatusProcessing, "", outboxEmail.TTL); err != nil { + if err = p.outbox.Update(ctx, outboxEmail.Id, outbox.StatusProcessing, ""); err != nil { logger.Warn(fmt.Sprintf("failed to acquire processing lock, error: %v", err)) return } @@ -56,23 +56,23 @@ func (p *MainSenderPipeline) Process(ctx context.Context) { payload, payloadErr := email.LoadPayload(outboxEmail.PayloadFilePath) if payloadErr != nil { logger.Error(fmt.Sprintf("failed to load payload, error: %v", payloadErr)) - p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, payloadErr.Error(), outboxEmail.TTL) + p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, payloadErr.Error()) return } if err = p.client.Send(payload, p.attachmentsBasePath); err != nil { if isSMTPThrottling(err) { logger.Warn(fmt.Sprintf("smtp throttling, requeueing: %v", err)) - if requeueErr := p.outbox.Requeue(context.Background(), outboxEmail.Id, outboxEmail.TTL); requeueErr != nil { + if requeueErr := p.outbox.Requeue(context.Background(), outboxEmail.Id); requeueErr != nil { logger.Error(fmt.Sprintf("error requeueing email, error: %v", requeueErr)) } } else { logger.Error(fmt.Sprintf("failed to send, error: %v", err)) - p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, err.Error(), outboxEmail.TTL) + p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusFailed, err.Error()) } } else { logger.Info("successfully sent") - p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusSent, "", outboxEmail.TTL) + p.handle(context.Background(), logger, outboxEmail.Id, outbox.StatusSent, "") } }(e) } @@ -80,8 +80,8 @@ func (p *MainSenderPipeline) Process(ctx context.Context) { wg.Wait() } -func (p *MainSenderPipeline) handle(ctx context.Context, logger *slog.Logger, emailId string, status string, errorReason string, ttl *int64) { - if err := p.outbox.Update(ctx, emailId, status, errorReason, ttl); err != nil { +func (p *MainSenderPipeline) handle(ctx context.Context, logger *slog.Logger, emailId string, status string, errorReason string) { + if err := p.outbox.Update(ctx, emailId, status, errorReason); err != nil { msg := fmt.Sprintf("error updating status to %v, error: %v", status, err) logger.Error(msg) } diff --git a/internal/testutils/mocks/outbox.go b/internal/testutils/mocks/outbox.go index 076f86d..42c6ff5 100644 --- a/internal/testutils/mocks/outbox.go +++ b/internal/testutils/mocks/outbox.go @@ -78,7 +78,7 @@ func (m *OutboxMock) Query(ctx context.Context, status string, limit int) ([]out return []outbox.Email{m.email}, m.queryMethodError } -func (m *OutboxMock) Update(ctx context.Context, id string, status string, errorReason string, ttl *int64) error { +func (m *OutboxMock) Update(ctx context.Context, id string, status string, errorReason string) error { m.lastMethod = "update" m.updateMethodCall++ if m.updateMethodCall == m.updateMethodFailsCall { @@ -87,7 +87,7 @@ func (m *OutboxMock) Update(ctx context.Context, id string, status string, error return nil } -func (m *OutboxMock) Ready(ctx context.Context, id string, emlFilePath string, ttl *int64) error { +func (m *OutboxMock) Ready(ctx context.Context, id string, emlFilePath string) error { m.lastMethod = "ready" m.updateMethodCall++ if m.updateMethodCall == m.updateMethodFailsCall { @@ -96,7 +96,7 @@ func (m *OutboxMock) Ready(ctx context.Context, id string, emlFilePath string, t return nil } -func (m *OutboxMock) Requeue(ctx context.Context, id string, ttl *int64) error { +func (m *OutboxMock) Requeue(ctx context.Context, id string) error { m.lastMethod = "requeue" m.requeueMethodCall++ if m.requeueMethodCall == m.requeueMethodFailsCall { From 0aba9743c058f0c16c202033c308d04bf554a7a1 Mon Sep 17 00:00:00 2001 From: Emidio Cruciani Date: Fri, 16 Jan 2026 15:07:46 +0000 Subject: [PATCH 2/3] MDI240 | remove eml_file_path column --- .../mysql/migrations/001_create_emails_table.sql | 1 - docs/database.md | 4 +--- internal/outbox/outbox.go | 15 ++++++--------- internal/outbox/outbox_component_test.go | 9 ++++----- internal/outbox/outbox_test.go | 15 +++++++-------- internal/pipeline/intake.go | 2 +- internal/pipeline/interface.go | 2 +- internal/testutils/mocks/outbox.go | 2 +- 8 files changed, 21 insertions(+), 29 deletions(-) diff --git a/docker/mysql/migrations/001_create_emails_table.sql b/docker/mysql/migrations/001_create_emails_table.sql index 2186760..4956293 100644 --- a/docker/mysql/migrations/001_create_emails_table.sql +++ b/docker/mysql/migrations/001_create_emails_table.sql @@ -6,7 +6,6 @@ CREATE TABLE IF NOT EXISTS emails ( 'CALLING-SENT-CALLBACK','CALLING-FAILED-CALLBACK', 'SENT-ACKNOWLEDGED','FAILED-ACKNOWLEDGED' ) NOT NULL, - eml_file_path VARCHAR(500), payload_file_path VARCHAR(500), reason TEXT, version INT NOT NULL DEFAULT 1, diff --git a/docs/database.md b/docs/database.md index 5782cd7..c8675f3 100644 --- a/docs/database.md +++ b/docs/database.md @@ -5,7 +5,6 @@ type Email struct { Id string Status string - EmlFilePath string // Campo mantenuto per compatibilità, non usato PayloadFilePath string UpdatedAt string Reason string @@ -27,7 +26,6 @@ CREATE TABLE IF NOT EXISTS emails ( 'CALLING-SENT-CALLBACK','CALLING-FAILED-CALLBACK', 'SENT-ACKNOWLEDGED','FAILED-ACKNOWLEDGED' ) NOT NULL, - eml_file_path VARCHAR(500), payload_file_path VARCHAR(500), reason TEXT, version INT NOT NULL DEFAULT 1, @@ -75,7 +73,7 @@ Le query di lettura utilizzano `FOR UPDATE SKIP LOCKED` per: - Migliorare il throughput in scenari con più processori concorrenti ```sql -SELECT id, status, eml_file_path, payload_file_path, reason, version, updated_at +SELECT id, status, payload_file_path, reason, version, updated_at FROM emails WHERE status = ? ORDER BY updated_at ASC diff --git a/internal/outbox/outbox.go b/internal/outbox/outbox.go index dad68c1..f80e42d 100644 --- a/internal/outbox/outbox.go +++ b/internal/outbox/outbox.go @@ -44,7 +44,6 @@ var retryableErrNos = map[uint16]bool{ type Email struct { Id string Status string - EmlFilePath string PayloadFilePath string UpdatedAt string Reason string @@ -133,7 +132,7 @@ func (o *Outbox) Query(ctx context.Context, status string, limit int) ([]Email, // - Rows currently locked by other transactions are skipped // - Reduces contention when multiple workers poll simultaneously query := ` - SELECT id, status, eml_file_path, payload_file_path, reason, version, updated_at + SELECT id, status, payload_file_path, reason, version, updated_at FROM emails WHERE status = ? ORDER BY updated_at ASC @@ -150,13 +149,12 @@ func (o *Outbox) Query(ctx context.Context, status string, limit int) ([]Email, var emails []Email for rows.Next() { var e Email - var emlFilePath, payloadFilePath, reason sql.NullString + var payloadFilePath, reason sql.NullString var updatedAt time.Time err := rows.Scan( &e.Id, &e.Status, - &emlFilePath, &payloadFilePath, &reason, &e.Version, @@ -166,7 +164,6 @@ func (o *Outbox) Query(ctx context.Context, status string, limit int) ([]Email, return []Email{}, err } - e.EmlFilePath = emlFilePath.String e.PayloadFilePath = payloadFilePath.String e.Reason = reason.String e.UpdatedAt = updatedAt.Format(time.RFC3339) @@ -288,13 +285,13 @@ func (o *Outbox) Requeue(ctx context.Context, id string) error { return err } -// Ready updates the email to READY status with the eml file path. +// Ready updates the email to READY status. // Expected from status is INTAKING. // The operation is executed within a transaction with retry logic for transient errors. -func (o *Outbox) Ready(ctx context.Context, id string, emlFilePath string) error { +func (o *Outbox) Ready(ctx context.Context, id string) error { updateQuery := ` UPDATE emails - SET status = ?, eml_file_path = ?, version = version + 1 + SET status = ?, version = version + 1 WHERE id = ? AND status = ? ` historyQuery := ` @@ -305,7 +302,7 @@ func (o *Outbox) Ready(ctx context.Context, id string, emlFilePath string) error var err error for attempt := range maxAttempts { err = o.executeInTransaction(ctx, func(tx *sql.Tx) error { - result, execErr := tx.ExecContext(ctx, updateQuery, StatusReady, emlFilePath, id, StatusIntaking) + result, execErr := tx.ExecContext(ctx, updateQuery, StatusReady, id, StatusIntaking) if execErr != nil { return execErr } diff --git a/internal/outbox/outbox_component_test.go b/internal/outbox/outbox_component_test.go index baf90f5..c767129 100644 --- a/internal/outbox/outbox_component_test.go +++ b/internal/outbox/outbox_component_test.go @@ -114,8 +114,8 @@ func TestMySQLOutboxReadyWorkflow(t *testing.T) { require.NoError(t, err) fixtures = append(fixtures, id) - // update to READY without eml file path - err = sut.Ready(context.TODO(), id, "", nil) + // update to READY + err = sut.Ready(context.TODO(), id) require.NoError(t, err) // verify status changed to READY @@ -124,10 +124,9 @@ func TestMySQLOutboxReadyWorkflow(t *testing.T) { require.Len(t, res, 1) assert.Equal(t, id, res[0].Id) assert.Equal(t, StatusReady, res[0].Status) - assert.Empty(t, res[0].EmlFilePath) // trying to call Ready again should fail (status is now READY, not INTAKING) - err = sut.Ready(context.TODO(), id, "", nil) + err = sut.Ready(context.TODO(), id) assert.Error(t, err) assert.ErrorIs(t, err, ErrLockNotAcquired) } @@ -194,7 +193,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { assert.Equal(t, StatusIntaking, status) // INTAKING -> READY (using Ready method) - err = sut.Ready(context.TODO(), id, "", nil) + err = sut.Ready(context.TODO(), id) require.NoError(t, err) status, err = facade.GetEmailStatus(context.TODO(), id) diff --git a/internal/outbox/outbox_test.go b/internal/outbox/outbox_test.go index 375a7db..e33fdd3 100644 --- a/internal/outbox/outbox_test.go +++ b/internal/outbox/outbox_test.go @@ -22,11 +22,11 @@ func TestQuery_WhenDatabaseHasRecords_ShouldReturnEmails(t *testing.T) { now := time.Now() - rows := sqlmock.NewRows([]string{"id", "status", "eml_file_path", "payload_file_path", "reason", "version", "updated_at"}). - AddRow("test-id-1", "READY", "", "/path/to/payload", "", 1, now). - AddRow("test-id-2", "READY", "", "/path/to/payload2", "some reason", 2, now) + rows := sqlmock.NewRows([]string{"id", "status", "payload_file_path", "reason", "version", "updated_at"}). + AddRow("test-id-1", "READY", "/path/to/payload", "", 1, now). + AddRow("test-id-2", "READY", "/path/to/payload2", "some reason", 2, now) - mock.ExpectQuery("SELECT id, status, eml_file_path, payload_file_path, reason, version, updated_at FROM emails"). + mock.ExpectQuery("SELECT id, status, payload_file_path, reason, version, updated_at FROM emails"). WithArgs("READY", 25). WillReturnRows(rows) @@ -38,7 +38,6 @@ func TestQuery_WhenDatabaseHasRecords_ShouldReturnEmails(t *testing.T) { require.Len(t, emails, 2) assert.Equal(t, "test-id-1", emails[0].Id) assert.Equal(t, "READY", emails[0].Status) - assert.Equal(t, "", emails[0].EmlFilePath) assert.Equal(t, "test-id-2", emails[1].Id) assert.NoError(t, mock.ExpectationsWereMet()) @@ -70,7 +69,7 @@ func TestQuery_WhenDatabaseHasNoRecords_ShouldReturnEmptySlice(t *testing.T) { require.NoError(t, err) defer db.Close() - rows := sqlmock.NewRows([]string{"id", "status", "eml_file_path", "payload_file_path", "reason", "version", "updated_at"}) + rows := sqlmock.NewRows([]string{"id", "status", "payload_file_path", "reason", "version", "updated_at"}) mock.ExpectQuery("SELECT"). WithArgs("READY", 10). @@ -172,7 +171,7 @@ func TestReady_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { sut := NewOutboxWithDB(db) - err = sut.Ready(context.TODO(), "test-id", "", nil) + err = sut.Ready(context.TODO(), "test-id") assert.NoError(t, err) assert.NoError(t, mock.ExpectationsWereMet()) @@ -193,7 +192,7 @@ func TestReady_WhenNoRowsAffected_ShouldReturnLockError(t *testing.T) { sut := NewOutboxWithDB(db) - err = sut.Ready(context.TODO(), "test-id", "", nil) + err = sut.Ready(context.TODO(), "test-id") assert.Error(t, err) assert.ErrorIs(t, err, ErrLockNotAcquired) diff --git a/internal/pipeline/intake.go b/internal/pipeline/intake.go index 2060200..8708f5b 100644 --- a/internal/pipeline/intake.go +++ b/internal/pipeline/intake.go @@ -49,7 +49,7 @@ func (p *IntakePipeline) Process(ctx context.Context) { return } - if err := p.outbox.Ready(context.Background(), email.Id, ""); err != nil { + if err := p.outbox.Ready(context.Background(), email.Id); err != nil { subLogger.Error(fmt.Sprintf("failed to update status to READY: %v", err)) p.handle(context.Background(), subLogger, email.Id, outbox.StatusInvalid, err.Error()) } else { diff --git a/internal/pipeline/interface.go b/internal/pipeline/interface.go index 97836e7..76302b2 100644 --- a/internal/pipeline/interface.go +++ b/internal/pipeline/interface.go @@ -8,6 +8,6 @@ import ( type outboxService interface { Query(ctx context.Context, status string, limit int) ([]outbox.Email, error) Update(ctx context.Context, id string, status string, errorReason string) error - Ready(ctx context.Context, id string, emlFilePath string) error + Ready(ctx context.Context, id string) error Requeue(ctx context.Context, id string) error } diff --git a/internal/testutils/mocks/outbox.go b/internal/testutils/mocks/outbox.go index 42c6ff5..0fbbd51 100644 --- a/internal/testutils/mocks/outbox.go +++ b/internal/testutils/mocks/outbox.go @@ -87,7 +87,7 @@ func (m *OutboxMock) Update(ctx context.Context, id string, status string, error return nil } -func (m *OutboxMock) Ready(ctx context.Context, id string, emlFilePath string) error { +func (m *OutboxMock) Ready(ctx context.Context, id string) error { m.lastMethod = "ready" m.updateMethodCall++ if m.updateMethodCall == m.updateMethodFailsCall { From fa6d9ac0c62387a93a59abbd09b27ce3d22726cf Mon Sep 17 00:00:00 2001 From: Emidio Cruciani Date: Tue, 20 Jan 2026 14:08:10 +0000 Subject: [PATCH 3/3] added restore pipelines --- cmd/main/config/config.yaml | 3 + docs/pipeline.md | 27 ++++++- internal/app/app.go | 37 ++++++--- internal/app/app_test.go | 22 +++++- internal/config/config.go | 16 +++- ...alid-with-envvar-in-outbox-table-name.yaml | 3 + internal/config/testdata/valid.yaml | 3 + internal/outbox/outbox.go | 79 ++++++++++++++++--- internal/outbox/outbox_component_test.go | 14 ++-- internal/outbox/outbox_test.go | 59 ++++++++++++-- internal/pipeline/interface.go | 5 +- internal/pipeline/restore.go | 73 +++++++++++++++++ internal/pipeline/sender.go | 6 +- internal/pipeline/sender_test.go | 6 +- internal/testutils/mocks/outbox.go | 45 +++++++---- 15 files changed, 338 insertions(+), 60 deletions(-) create mode 100644 internal/pipeline/restore.go diff --git a/cmd/main/config/config.yaml b/cmd/main/config/config.yaml index 01a1373..552abb1 100644 --- a/cmd/main/config/config.yaml +++ b/cmd/main/config/config.yaml @@ -19,6 +19,9 @@ mysql: pipeline: interval: ${PIPELINE_INTERVAL} + restore: + interval: 10 + timeout_minutes: 30 smtp: host: "${SMTP_HOST}" diff --git a/docs/pipeline.md b/docs/pipeline.md index f186bf0..6ce0d60 100644 --- a/docs/pipeline.md +++ b/docs/pipeline.md @@ -1,7 +1,7 @@ # Pipeline Parallele del Mailculator Processor ## Panoramica -Il sistema esegue quattro pipeline parallele che elaborano gli email attraverso diversi stati del ciclo di vita, utilizzando MySQL come storage e un client SMTP per l'invio diretto. +Il sistema esegue otto pipeline parallele che elaborano gli email attraverso diversi stati del ciclo di vita, utilizzando MySQL come storage e un client SMTP per l'invio diretto. ## Stati degli Email - **ACCEPTED**: Email accettato, in attesa di intake @@ -97,5 +97,28 @@ Questa pipeline elabora gli email dallo stato FAILED. - In caso di successo HTTP 200: aggiorna stato a "FAILED-ACKNOWLEDGED" 3. **Ciclo**: Si ripete ogni intervallo configurato +## Pipeline 5-8: RestorePipeline (Ripristino Email Bloccate) +Quattro pipeline di restore riportano gli email in uno stato precedente quando restano bloccati troppo a lungo nello stato di lavorazione: + +1. **INTAKING → ACCEPTED**: se l’email è in INTAKING da più di `timeout_minutes` +2. **PROCESSING → READY**: se l’email è in PROCESSING da più di `timeout_minutes` +3. **CALLING-SENT-CALLBACK → SENT**: se la callback sent è in corso da più di `timeout_minutes` +4. **CALLING-FAILED-CALLBACK → FAILED**: se la callback failed è in corso da più di `timeout_minutes` + +Per ogni pipeline: +- **Query**: Recupera tutte le email con stato specifico e `updated_at` più vecchio della soglia +- **Elaborazione parallela**: Aggiorna lo stato allo step precedente +- **Ciclo**: Si ripete ogni intervallo configurato + ## Esecuzione Parallela -Le quattro pipeline vengono eseguite contemporaneamente in goroutine separate, ciascuna con il proprio ciclo di polling che si attiva ogni N secondi (configurabile). Un health check server rimane attivo per monitorare lo stato del sistema. +Le pipeline vengono eseguite contemporaneamente in goroutine separate, ciascuna con il proprio ciclo di polling che si attiva ogni N secondi (configurabile). Un health check server rimane attivo per monitorare lo stato del sistema. + +## Configurazione Restore +Nel file di configurazione: +```yaml +pipeline: + interval: 3 + restore: + interval: 10 + timeout_minutes: 30 +``` diff --git a/internal/app/app.go b/internal/app/app.go index 5b5d086..be9f3db 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -20,9 +20,13 @@ type pipelineProcessor interface { Process(ctx context.Context) } +type pipelineEntry struct { + proc pipelineProcessor + interval int +} + type App struct { - pipes []pipelineProcessor - interval int + pipes []pipelineEntry healthCheckServer *healthcheck.Server mysqlDB *sql.DB // Keep reference for cleanup } @@ -30,6 +34,8 @@ type App struct { type configProvider interface { GetHealthCheckServerPort() int GetPipelineInterval() int + GetRestorePipelineInterval() int + GetRestorePipelineMaxAge() time.Duration GetCallbackConfig() pipeline.CallbackConfig GetSmtpConfig() smtp.Config GetAttachmentsBasePath() string @@ -47,7 +53,7 @@ func NewWithMySQLOpener(cp configProvider, opener mysqlOpener) (*App, error) { callbackConfig := cp.GetCallbackConfig() healthCheckServer := healthcheck.NewServer(cp.GetHealthCheckServerPort()) - var pipes []pipelineProcessor + var pipes []pipelineEntry var mysqlDB *sql.DB dsn := cp.GetMySQLDSN() @@ -74,19 +80,26 @@ func NewWithMySQLOpener(cp configProvider, opener mysqlOpener) (*App, error) { mysqlOutbox := outbox.NewOutbox(mysqlDB) + mainInterval := cp.GetPipelineInterval() + restoreInterval := cp.GetRestorePipelineInterval() + restoreMaxAge := cp.GetRestorePipelineMaxAge() + pipes = append(pipes, - pipeline.NewIntakePipeline(mysqlOutbox), - pipeline.NewMainSenderPipeline(mysqlOutbox, client, cp.GetAttachmentsBasePath()), - pipeline.NewSentCallbackPipeline(mysqlOutbox, callbackConfig), - pipeline.NewFailedCallbackPipeline(mysqlOutbox, callbackConfig), + pipelineEntry{proc: pipeline.NewIntakePipeline(mysqlOutbox), interval: mainInterval}, + pipelineEntry{proc: pipeline.NewMainSenderPipeline(mysqlOutbox, client, cp.GetAttachmentsBasePath()), interval: mainInterval}, + pipelineEntry{proc: pipeline.NewSentCallbackPipeline(mysqlOutbox, callbackConfig), interval: mainInterval}, + pipelineEntry{proc: pipeline.NewFailedCallbackPipeline(mysqlOutbox, callbackConfig), interval: mainInterval}, + pipelineEntry{proc: pipeline.NewRestoreIntakingPipeline(mysqlOutbox, restoreMaxAge), interval: restoreInterval}, + pipelineEntry{proc: pipeline.NewRestoreProcessingPipeline(mysqlOutbox, restoreMaxAge), interval: restoreInterval}, + pipelineEntry{proc: pipeline.NewRestoreCallingSentPipeline(mysqlOutbox, restoreMaxAge), interval: restoreInterval}, + pipelineEntry{proc: pipeline.NewRestoreCallingFailedPipeline(mysqlOutbox, restoreMaxAge), interval: restoreInterval}, ) - slog.Info("MySQL pipelines initialized", "count", 4) + slog.Info("MySQL pipelines initialized", "count", len(pipes)) slog.Info("App initialized", "total_pipelines", len(pipes)) return &App{ pipes: pipes, - interval: cp.GetPipelineInterval(), healthCheckServer: healthCheckServer, mysqlDB: mysqlDB, }, nil @@ -107,11 +120,13 @@ func (a *App) runPipelineUntilContextIsDone(ctx context.Context, proc pipelinePr func (a *App) Run(ctx context.Context) { var wg sync.WaitGroup - for _, proc := range a.pipes { + for _, entry := range a.pipes { + proc := entry.proc + interval := entry.interval wg.Add(1) go func() { defer wg.Done() - a.runPipelineUntilContextIsDone(ctx, proc, a.interval) + a.runPipelineUntilContextIsDone(ctx, proc, interval) }() } diff --git a/internal/app/app_test.go b/internal/app/app_test.go index a984ebc..2d9cb49 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -27,6 +27,14 @@ func (cp *configProviderMock) GetPipelineInterval() int { return 1 } +func (cp *configProviderMock) GetRestorePipelineInterval() int { + return 5 +} + +func (cp *configProviderMock) GetRestorePipelineMaxAge() time.Duration { + return 30 * time.Minute +} + func (cp *configProviderMock) GetCallbackConfig() pipeline.CallbackConfig { return pipeline.CallbackConfig{Url: "dummy-domain.com", RetryInterval: 2, @@ -71,11 +79,15 @@ func TestAppInstance(t *testing.T) { app, errNew := NewWithMySQLOpener(newConfigProviderMock(), opener) require.NoError(t, errNew) - require.Equal(t, 4, len(app.pipes)) + require.Equal(t, 8, len(app.pipes)) assert.NotZero(t, app.pipes[0]) assert.NotZero(t, app.pipes[1]) assert.NotZero(t, app.pipes[2]) assert.NotZero(t, app.pipes[3]) + assert.NotZero(t, app.pipes[4]) + assert.NotZero(t, app.pipes[5]) + assert.NotZero(t, app.pipes[6]) + assert.NotZero(t, app.pipes[7]) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -99,7 +111,13 @@ func TestRunFunction(t *testing.T) { proc1 := newProcessorMock(200) proc2 := newProcessorMock(200) healthCheckServer := healthcheck.NewServer(8080) - app := &App{pipes: []pipelineProcessor{proc1, proc2}, healthCheckServer: healthCheckServer} + app := &App{ + pipes: []pipelineEntry{ + {proc: proc1, interval: 1}, + {proc: proc2, interval: 1}, + }, + healthCheckServer: healthCheckServer, + } ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) defer cancel() diff --git a/internal/config/config.go b/internal/config/config.go index de541fb..b77e848 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,7 +30,13 @@ type HealthCheckConfig struct { } type PipelineConfig struct { - Interval int `yaml:"interval" validate:"required"` + Interval int `yaml:"interval" validate:"required"` + Restore RestorePipelineConfig `yaml:"restore,flow" validate:"required"` +} + +type RestorePipelineConfig struct { + Interval int `yaml:"interval" validate:"required"` + TimeoutMinutes int `yaml:"timeout_minutes" validate:"required"` } type SmtpConfig struct { @@ -112,6 +118,14 @@ func (c *Config) GetPipelineInterval() int { return c.Pipeline.Interval } +func (c *Config) GetRestorePipelineInterval() int { + return c.Pipeline.Restore.Interval +} + +func (c *Config) GetRestorePipelineMaxAge() time.Duration { + return time.Duration(c.Pipeline.Restore.TimeoutMinutes) * time.Minute +} + func (c *Config) GetSmtpConfig() smtp.Config { return smtp.Config{ Host: c.Smtp.Host, diff --git a/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml b/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml index aac3107..2b2408d 100644 --- a/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml +++ b/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml @@ -19,6 +19,9 @@ mysql: pipeline: interval: 3 + restore: + interval: 10 + timeout_minutes: 30 smtp: host: dummy-host diff --git a/internal/config/testdata/valid.yaml b/internal/config/testdata/valid.yaml index 1280a52..f30a6c8 100644 --- a/internal/config/testdata/valid.yaml +++ b/internal/config/testdata/valid.yaml @@ -19,6 +19,9 @@ mysql: pipeline: interval: 3 + restore: + interval: 10 + timeout_minutes: 30 smtp: host: dummy-host diff --git a/internal/outbox/outbox.go b/internal/outbox/outbox.go index f80e42d..7774c53 100644 --- a/internal/outbox/outbox.go +++ b/internal/outbox/outbox.go @@ -178,6 +178,67 @@ func (o *Outbox) Query(ctx context.Context, status string, limit int) ([]Email, return emails, nil } +// QueryStale returns emails by status that are older than the provided duration. +func (o *Outbox) QueryStale(ctx context.Context, status string, olderThan time.Duration, limit int) ([]Email, error) { + query := ` + SELECT id, status, payload_file_path, reason, version, updated_at + FROM emails + WHERE status = ? AND updated_at < ? + ORDER BY updated_at ASC + ` + if limit > 0 { + query += ` + LIMIT ? + ` + } + query += ` + FOR UPDATE SKIP LOCKED + ` + cutoff := time.Now().Add(-olderThan) + + args := []any{status, cutoff} + if limit > 0 { + args = append(args, limit) + } + + rows, err := o.db.QueryContext(ctx, query, args...) + if err != nil { + return []Email{}, err + } + defer rows.Close() + + var emails []Email + for rows.Next() { + var e Email + var payloadFilePath, reason sql.NullString + var updatedAt time.Time + + err := rows.Scan( + &e.Id, + &e.Status, + &payloadFilePath, + &reason, + &e.Version, + &updatedAt, + ) + if err != nil { + return []Email{}, err + } + + e.PayloadFilePath = payloadFilePath.String + e.Reason = reason.String + e.UpdatedAt = updatedAt.Format(time.RFC3339) + + emails = append(emails, e) + } + + if err = rows.Err(); err != nil { + return []Email{}, err + } + + return emails, nil +} + // Update changes the status of an email using optimistic locking based on version. // It determines the expected "from" status based on the target "to" status. // The operation is executed within a transaction with retry logic for transient errors. @@ -232,25 +293,23 @@ func (o *Outbox) Update(ctx context.Context, id string, status string, errorReas return err } -// Requeue updates the email from PROCESSING back to READY and removes the PROCESSING history row. +// UpdateFrom changes status using an explicit fromStatus (used for restore). // The operation is executed within a transaction with retry logic for transient errors. -func (o *Outbox) Requeue(ctx context.Context, id string) error { +func (o *Outbox) UpdateFrom(ctx context.Context, id string, fromStatus string, toStatus string, errorReason string) error { updateQuery := ` UPDATE emails SET status = ?, reason = ?, version = version + 1 WHERE id = ? AND status = ? ` - deleteProcessingQuery := ` - DELETE FROM email_statuses - WHERE email_id = ? AND status = ? - ORDER BY id DESC - LIMIT 1 + historyQuery := ` + INSERT INTO email_statuses (email_id, status, reason) + VALUES (?, ?, ?) ` var err error for attempt := range maxAttempts { err = o.executeInTransaction(ctx, func(tx *sql.Tx) error { - result, execErr := tx.ExecContext(ctx, updateQuery, StatusReady, "", id, StatusProcessing) + result, execErr := tx.ExecContext(ctx, updateQuery, toStatus, errorReason, id, fromStatus) if execErr != nil { return execErr } @@ -264,8 +323,8 @@ func (o *Outbox) Requeue(ctx context.Context, id string) error { return ErrLockNotAcquired } - _, delErr := tx.ExecContext(ctx, deleteProcessingQuery, id, StatusProcessing) - return delErr + _, histErr := tx.ExecContext(ctx, historyQuery, id, toStatus, errorReason) + return histErr }) if err == nil || !o.shouldRetryMySQL(err) { diff --git a/internal/outbox/outbox_component_test.go b/internal/outbox/outbox_component_test.go index c767129..11ce22b 100644 --- a/internal/outbox/outbox_component_test.go +++ b/internal/outbox/outbox_component_test.go @@ -74,7 +74,7 @@ func TestMySQLOutboxComponentWorkflow(t *testing.T) { require.Len(t, res, 0) // update fixture to status PROCESSING - err = sut.Update(context.TODO(), id, StatusProcessing, "", nil) + err = sut.Update(context.TODO(), id, StatusProcessing, "") require.NoError(t, err) // filtering by status READY should return 1 record at this point @@ -90,7 +90,7 @@ func TestMySQLOutboxComponentWorkflow(t *testing.T) { assert.Equal(t, StatusProcessing, res[0].Status) // item already is in status PROCESSING, trying to update from READY should fail - err = sut.Update(context.TODO(), id, StatusProcessing, "", nil) + err = sut.Update(context.TODO(), id, StatusProcessing, "") assert.Error(t, err) assert.ErrorIs(t, err, ErrLockNotAcquired) } @@ -185,7 +185,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { require.NoError(t, err) // ACCEPTED -> INTAKING - err = sut.Update(context.TODO(), id, StatusIntaking, "", nil) + err = sut.Update(context.TODO(), id, StatusIntaking, "") require.NoError(t, err) status, err := facade.GetEmailStatus(context.TODO(), id) @@ -201,7 +201,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { assert.Equal(t, StatusReady, status) // READY -> PROCESSING - err = sut.Update(context.TODO(), id, StatusProcessing, "", nil) + err = sut.Update(context.TODO(), id, StatusProcessing, "") require.NoError(t, err) status, err = facade.GetEmailStatus(context.TODO(), id) @@ -209,7 +209,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { assert.Equal(t, StatusProcessing, status) // PROCESSING -> SENT - err = sut.Update(context.TODO(), id, StatusSent, "", nil) + err = sut.Update(context.TODO(), id, StatusSent, "") require.NoError(t, err) status, err = facade.GetEmailStatus(context.TODO(), id) @@ -217,7 +217,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { assert.Equal(t, StatusSent, status) // SENT -> CALLING-SENT-CALLBACK - err = sut.Update(context.TODO(), id, StatusCallingSentCallback, "", nil) + err = sut.Update(context.TODO(), id, StatusCallingSentCallback, "") require.NoError(t, err) status, err = facade.GetEmailStatus(context.TODO(), id) @@ -225,7 +225,7 @@ func TestMySQLOutboxStateTransitions(t *testing.T) { assert.Equal(t, StatusCallingSentCallback, status) // CALLING-SENT-CALLBACK -> SENT-ACKNOWLEDGED - err = sut.Update(context.TODO(), id, StatusSentAcknowledged, "", nil) + err = sut.Update(context.TODO(), id, StatusSentAcknowledged, "") require.NoError(t, err) status, err = facade.GetEmailStatus(context.TODO(), id) diff --git a/internal/outbox/outbox_test.go b/internal/outbox/outbox_test.go index e33fdd3..9bc4c9c 100644 --- a/internal/outbox/outbox_test.go +++ b/internal/outbox/outbox_test.go @@ -84,6 +84,31 @@ func TestQuery_WhenDatabaseHasNoRecords_ShouldReturnEmptySlice(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } +func TestQueryStale_WhenDatabaseHasRecords_ShouldReturnEmails(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + now := time.Now() + rows := sqlmock.NewRows([]string{"id", "status", "payload_file_path", "reason", "version", "updated_at"}). + AddRow("test-id-1", "READY", "/path/to/payload", "", 1, now) + + mock.ExpectQuery("SELECT id, status, payload_file_path, reason, version, updated_at FROM emails"). + WithArgs("READY", sqlmock.AnyArg(), 25). + WillReturnRows(rows) + + sut := NewOutboxWithDB(db) + + emails, err := sut.QueryStale(context.TODO(), StatusReady, 30*time.Minute, 25) + + assert.NoError(t, err) + require.Len(t, emails, 1) + assert.Equal(t, "test-id-1", emails[0].Id) + assert.NoError(t, mock.ExpectationsWereMet()) +} + func TestUpdate_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { t.Parallel() @@ -102,7 +127,31 @@ func TestUpdate_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { sut := NewOutboxWithDB(db) - err = sut.Update(context.TODO(), "test-id", StatusProcessing, "", nil) + err = sut.Update(context.TODO(), "test-id", StatusProcessing, "") + + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdateFrom_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("READY", "", "test-id", "PROCESSING"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO email_statuses"). + WithArgs("test-id", "READY", ""). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + sut := NewOutboxWithDB(db) + + err = sut.UpdateFrom(context.TODO(), "test-id", StatusProcessing, StatusReady, "") assert.NoError(t, err) assert.NoError(t, mock.ExpectationsWereMet()) @@ -123,7 +172,7 @@ func TestUpdate_WhenNoRowsAffected_ShouldReturnLockError(t *testing.T) { sut := NewOutboxWithDB(db) - err = sut.Update(context.TODO(), "test-id", StatusProcessing, "", nil) + err = sut.Update(context.TODO(), "test-id", StatusProcessing, "") assert.Error(t, err) assert.ErrorIs(t, err, ErrLockNotAcquired) @@ -146,7 +195,7 @@ func TestUpdate_WhenDatabaseReturnsError_ShouldReturnError(t *testing.T) { sut := NewOutboxWithDB(db) - err = sut.Update(context.TODO(), "test-id", StatusProcessing, "", nil) + err = sut.Update(context.TODO(), "test-id", StatusProcessing, "") assert.Error(t, err) assert.Equal(t, expectedError, err) @@ -162,7 +211,7 @@ func TestReady_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("UPDATE emails"). - WithArgs("READY", "", "test-id", "INTAKING"). + WithArgs("READY", "test-id", "INTAKING"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectExec("INSERT INTO email_statuses"). WithArgs("test-id", "READY", ""). @@ -186,7 +235,7 @@ func TestReady_WhenNoRowsAffected_ShouldReturnLockError(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("UPDATE emails"). - WithArgs("READY", "", "test-id", "INTAKING"). + WithArgs("READY", "test-id", "INTAKING"). WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectRollback() diff --git a/internal/pipeline/interface.go b/internal/pipeline/interface.go index 76302b2..d719d77 100644 --- a/internal/pipeline/interface.go +++ b/internal/pipeline/interface.go @@ -2,12 +2,15 @@ package pipeline import ( "context" + "time" + "mailculator-processor/internal/outbox" ) type outboxService interface { Query(ctx context.Context, status string, limit int) ([]outbox.Email, error) + QueryStale(ctx context.Context, status string, olderThan time.Duration, limit int) ([]outbox.Email, error) Update(ctx context.Context, id string, status string, errorReason string) error + UpdateFrom(ctx context.Context, id string, fromStatus string, toStatus string, errorReason string) error Ready(ctx context.Context, id string) error - Requeue(ctx context.Context, id string) error } diff --git a/internal/pipeline/restore.go b/internal/pipeline/restore.go new file mode 100644 index 0000000..908ab31 --- /dev/null +++ b/internal/pipeline/restore.go @@ -0,0 +1,73 @@ +package pipeline + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "mailculator-processor/internal/outbox" +) + +type RestorePipeline struct { + outbox outboxService + logger *slog.Logger + startStatus string + restoreStatus string + maxAge time.Duration +} + +func newRestorePipeline(outbox outboxService, name string, startStatus string, restoreStatus string, maxAge time.Duration) *RestorePipeline { + return &RestorePipeline{ + outbox: outbox, + logger: slog.With("pipe", name), + startStatus: startStatus, + restoreStatus: restoreStatus, + maxAge: maxAge, + } +} + +func (p *RestorePipeline) Process(ctx context.Context) { + staleList, err := p.outbox.QueryStale(ctx, p.startStatus, p.maxAge, 0) + if err != nil { + p.logger.Error(fmt.Sprintf("error while querying stale emails to restore: %v", err)) + return + } + + var wg sync.WaitGroup + + for _, e := range staleList { + wg.Add(1) + go func(email outbox.Email) { + defer wg.Done() + p.logger.Info(fmt.Sprintf("restoring email %v", email.Id)) + subLogger := p.logger.With("email", email.Id) + + if err = p.outbox.UpdateFrom(ctx, email.Id, p.startStatus, p.restoreStatus, ""); err != nil { + subLogger.Warn(fmt.Sprintf("failed to restore email status, error: %v", err)) + return + } + + subLogger.Info("successfully restored email status") + }(e) + } + + wg.Wait() +} + +func NewRestoreIntakingPipeline(ob outboxService, maxAge time.Duration) *RestorePipeline { + return newRestorePipeline(ob, "restore-intaking", outbox.StatusIntaking, outbox.StatusAccepted, maxAge) +} + +func NewRestoreProcessingPipeline(ob outboxService, maxAge time.Duration) *RestorePipeline { + return newRestorePipeline(ob, "restore-processing", outbox.StatusProcessing, outbox.StatusReady, maxAge) +} + +func NewRestoreCallingSentPipeline(ob outboxService, maxAge time.Duration) *RestorePipeline { + return newRestorePipeline(ob, "restore-calling-sent", outbox.StatusCallingSentCallback, outbox.StatusSent, maxAge) +} + +func NewRestoreCallingFailedPipeline(ob outboxService, maxAge time.Duration) *RestorePipeline { + return newRestorePipeline(ob, "restore-calling-failed", outbox.StatusCallingFailedCallback, outbox.StatusFailed, maxAge) +} diff --git a/internal/pipeline/sender.go b/internal/pipeline/sender.go index 46c40f8..13a2e9c 100644 --- a/internal/pipeline/sender.go +++ b/internal/pipeline/sender.go @@ -62,9 +62,9 @@ func (p *MainSenderPipeline) Process(ctx context.Context) { if err = p.client.Send(payload, p.attachmentsBasePath); err != nil { if isSMTPThrottling(err) { - logger.Warn(fmt.Sprintf("smtp throttling, requeueing: %v", err)) - if requeueErr := p.outbox.Requeue(context.Background(), outboxEmail.Id); requeueErr != nil { - logger.Error(fmt.Sprintf("error requeueing email, error: %v", requeueErr)) + logger.Warn(fmt.Sprintf("smtp throttling, restoring to READY: %v", err)) + if restoreErr := p.outbox.UpdateFrom(context.Background(), outboxEmail.Id, outbox.StatusProcessing, outbox.StatusReady, ""); restoreErr != nil { + logger.Error(fmt.Sprintf("error restoring email to READY, error: %v", restoreErr)) } } else { logger.Error(fmt.Sprintf("failed to send, error: %v", err)) diff --git a/internal/pipeline/sender_test.go b/internal/pipeline/sender_test.go index cd0ebdf..9a3ca92 100644 --- a/internal/pipeline/sender_test.go +++ b/internal/pipeline/sender_test.go @@ -124,7 +124,7 @@ func TestSendEmailError(t *testing.T) { ) } -func TestSendEmailThrottlingRequeue(t *testing.T) { +func TestSendEmailThrottlingRestore(t *testing.T) { payloadFile := createPayloadFile(t) buf, logger := mocks.NewLoggerMock() outboxServiceMock := mocks.NewOutboxMock( @@ -136,9 +136,9 @@ func TestSendEmailThrottlingRequeue(t *testing.T) { sender.Process(context.TODO()) assert.Equal(t, 0, senderServiceMock.sendMethodCounter) - assert.Equal(t, "requeue", outboxServiceMock.LastMethod()) + assert.Equal(t, "updateFrom", outboxServiceMock.LastMethod()) assert.Equal(t, - "level=INFO msg=\"processing outbox 1\"\nlevel=WARN msg=\"smtp throttling, requeueing: 454 Throttling failure\" outbox=1", + "level=INFO msg=\"processing outbox 1\"\nlevel=WARN msg=\"smtp throttling, restoring to READY: 454 Throttling failure\" outbox=1", strings.TrimSpace(buf.String()), ) } diff --git a/internal/testutils/mocks/outbox.go b/internal/testutils/mocks/outbox.go index 0fbbd51..5374e5a 100644 --- a/internal/testutils/mocks/outbox.go +++ b/internal/testutils/mocks/outbox.go @@ -2,6 +2,8 @@ package mocks import ( "context" + "time" + "mailculator-processor/internal/outbox" ) @@ -10,9 +12,10 @@ type OutboxMock struct { updateMethodError error updateMethodCall int updateMethodFailsCall int - requeueMethodError error - requeueMethodCall int - requeueMethodFailsCall int + queryStaleMethodError error + updateFromMethodError error + updateFromMethodCall int + updateFromFailsCall int email outbox.Email lastMethod string } @@ -37,15 +40,21 @@ func UpdateMethodFailsCall(updateMethodFailsCall int) OutboxMockOptions { } } -func RequeueMethodError(requeueMethodError error) OutboxMockOptions { +func QueryStaleMethodError(queryStaleMethodError error) OutboxMockOptions { return func(o *OutboxMock) { - o.requeueMethodError = requeueMethodError + o.queryStaleMethodError = queryStaleMethodError } } -func RequeueMethodFailsCall(requeueMethodFailsCall int) OutboxMockOptions { +func UpdateFromMethodError(updateFromMethodError error) OutboxMockOptions { return func(o *OutboxMock) { - o.requeueMethodFailsCall = requeueMethodFailsCall + o.updateFromMethodError = updateFromMethodError + } +} + +func UpdateFromMethodFailsCall(updateFromFailsCall int) OutboxMockOptions { + return func(o *OutboxMock) { + o.updateFromFailsCall = updateFromFailsCall } } @@ -61,9 +70,10 @@ func NewOutboxMock(opts ...OutboxMockOptions) *OutboxMock { updateMethodError: nil, updateMethodCall: 0, updateMethodFailsCall: 1, - requeueMethodError: nil, - requeueMethodCall: 0, - requeueMethodFailsCall: 1, + queryStaleMethodError: nil, + updateFromMethodError: nil, + updateFromMethodCall: 0, + updateFromFailsCall: 1, email: outbox.Email{}, lastMethod: "", } @@ -78,6 +88,11 @@ func (m *OutboxMock) Query(ctx context.Context, status string, limit int) ([]out return []outbox.Email{m.email}, m.queryMethodError } +func (m *OutboxMock) QueryStale(ctx context.Context, status string, olderThan time.Duration, limit int) ([]outbox.Email, error) { + m.lastMethod = "queryStale" + return []outbox.Email{m.email}, m.queryStaleMethodError +} + func (m *OutboxMock) Update(ctx context.Context, id string, status string, errorReason string) error { m.lastMethod = "update" m.updateMethodCall++ @@ -96,11 +111,11 @@ func (m *OutboxMock) Ready(ctx context.Context, id string) error { return nil } -func (m *OutboxMock) Requeue(ctx context.Context, id string) error { - m.lastMethod = "requeue" - m.requeueMethodCall++ - if m.requeueMethodCall == m.requeueMethodFailsCall { - return m.requeueMethodError +func (m *OutboxMock) UpdateFrom(ctx context.Context, id string, fromStatus string, toStatus string, errorReason string) error { + m.lastMethod = "updateFrom" + m.updateFromMethodCall++ + if m.updateFromMethodCall == m.updateFromFailsCall { + return m.updateFromMethodError } return nil }