From bbce5adf9414807c8bba7acd335c8e47af98c9bd Mon Sep 17 00:00:00 2001 From: Gus Brodman Date: Mon, 29 Jun 2026 14:54:13 -0400 Subject: [PATCH] Verify billing recurrence expansion is caught up before invoicing --- .../billing/GenerateInvoicesAction.java | 21 ++++++ .../billing/GenerateInvoicesActionTest.java | 66 +++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java index bb5f44de5e2..e067c1fa34c 100644 --- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java @@ -14,10 +14,15 @@ package google.registry.reporting.billing; +import static com.google.common.base.Preconditions.checkState; import static google.registry.beam.BeamUtils.createJobName; +import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.request.Action.Method.POST; +import static google.registry.util.DateTimeUtils.START_INSTANT; import static jakarta.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static jakarta.servlet.http.HttpServletResponse.SC_OK; +import static java.time.ZoneOffset.UTC; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; @@ -29,6 +34,7 @@ import com.google.common.net.MediaType; import google.registry.batch.CloudTasksUtils; import google.registry.config.RegistryConfig.Config; +import google.registry.model.common.Cursor; import google.registry.persistence.PersistenceModule; import google.registry.reporting.ReportingModule; import google.registry.request.Action; @@ -40,6 +46,7 @@ import jakarta.inject.Inject; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.time.YearMonth; /** @@ -107,6 +114,20 @@ public void run() { response.setContentType(MediaType.PLAIN_TEXT_UTF_8); logger.atInfo().log("Launching invoicing pipeline for %s.", yearMonth); try { + Instant startOfNextMonth = yearMonth.plusMonths(1).atDay(1).atStartOfDay(UTC).toInstant(); + Instant cursorTime = + tm().transact( + () -> + tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING)) + .orElse(Cursor.createGlobal(RECURRING_BILLING, START_INSTANT)) + .getCursorTime()); + checkState( + !cursorTime.isBefore(startOfNextMonth), + "BillingRecurrence expansion cursor (%s) is before the start of the next month (%s). " + + "Run ExpandBillingRecurrencesAction first.", + cursorTime, + startOfNextMonth); + LaunchFlexTemplateParameter parameter = new LaunchFlexTemplateParameter() .setJobName(createJobName("invoicing", clock)) diff --git a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java index a77cc7d616d..4b96effd143 100644 --- a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java +++ b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java @@ -15,8 +15,11 @@ package google.registry.reporting.billing; import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING; +import static google.registry.testing.DatabaseHelper.persistResource; import static jakarta.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static jakarta.servlet.http.HttpServletResponse.SC_OK; +import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,6 +28,7 @@ import com.google.common.net.MediaType; import google.registry.batch.CloudTasksUtils; import google.registry.beam.BeamActionTestBase; +import google.registry.model.common.Cursor; import google.registry.persistence.transaction.JpaTestExtensions; import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; import google.registry.reporting.ReportingModule; @@ -33,6 +37,7 @@ import google.registry.testing.FakeClock; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.time.YearMonth; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -50,8 +55,13 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); private GenerateInvoicesAction action; + private void setCursor(Instant cursorTime) { + persistResource(Cursor.createGlobal(RECURRING_BILLING, cursorTime)); + } + @Test void testLaunchTemplateJob_withPublish() throws Exception { + setCursor(Instant.parse("2017-11-01T00:00:00Z")); action = new GenerateInvoicesAction( "test-project", @@ -84,6 +94,7 @@ void testLaunchTemplateJob_withPublish() throws Exception { @Test void testLaunchTemplateJob_withoutPublish() throws Exception { + setCursor(Instant.parse("2017-11-01T00:00:00Z")); action = new GenerateInvoicesAction( "test-project", @@ -107,6 +118,7 @@ void testLaunchTemplateJob_withoutPublish() throws Exception { @Test void testCaughtIOException() throws IOException { + setCursor(Instant.parse("2017-11-01T00:00:00Z")); when(launch.execute()).thenThrow(new IOException("Pipeline error")); action = new GenerateInvoicesAction( @@ -128,4 +140,58 @@ void testCaughtIOException() throws IOException { verify(emailUtils).sendAlertEmail("Pipeline Launch failed due to Pipeline error"); cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); } + + @Test + void testFailure_cursorLagging() { + setCursor(Instant.parse("2017-10-31T23:59:59.999Z")); + action = + new GenerateInvoicesAction( + "test-project", + "test-region", + "staging_bucket", + "billing_bucket", + "REG-INV", + false, + YearMonth.of(2017, 10), + emailUtils, + cloudTasksUtils, + clock, + response, + dataflow); + action.run(); + assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); + assertThat(response.getPayload()).contains("Pipeline launch failed"); + assertThat(response.getPayload()).contains("BillingRecurrence expansion cursor"); + verify(emailUtils) + .sendAlertEmail( + startsWith("Pipeline Launch failed due to BillingRecurrence expansion cursor")); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); + } + + @Test + void testFailure_cursorMissing() { + // Do not set cursor, should default to START_INSTANT (1970) + action = + new GenerateInvoicesAction( + "test-project", + "test-region", + "staging_bucket", + "billing_bucket", + "REG-INV", + false, + YearMonth.of(2017, 10), + emailUtils, + cloudTasksUtils, + clock, + response, + dataflow); + action.run(); + assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); + assertThat(response.getPayload()).contains("Pipeline launch failed"); + assertThat(response.getPayload()).contains("BillingRecurrence expansion cursor"); + verify(emailUtils) + .sendAlertEmail( + startsWith("Pipeline Launch failed due to BillingRecurrence expansion cursor")); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); + } }