Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@

package google.registry.reporting.billing;

import static com.google.common.base.Preconditions.checkState;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.START_INSTANT;
import static jakarta.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static jakarta.servlet.http.HttpServletResponse.SC_OK;
import static java.time.ZoneOffset.UTC;

import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
Expand All @@ -29,6 +34,7 @@
import com.google.common.net.MediaType;
import google.registry.batch.CloudTasksUtils;
import google.registry.config.RegistryConfig.Config;
import google.registry.model.common.Cursor;
import google.registry.persistence.PersistenceModule;
import google.registry.reporting.ReportingModule;
import google.registry.request.Action;
Expand All @@ -40,6 +46,7 @@
import jakarta.inject.Inject;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.YearMonth;

/**
Expand Down Expand Up @@ -107,6 +114,20 @@ public void run() {
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
logger.atInfo().log("Launching invoicing pipeline for %s.", yearMonth);
try {
Instant startOfNextMonth = yearMonth.plusMonths(1).atDay(1).atStartOfDay(UTC).toInstant();
Instant cursorTime =
tm().transact(
() ->
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_INSTANT))
.getCursorTime());
checkState(
!cursorTime.isBefore(startOfNextMonth),
"BillingRecurrence expansion cursor (%s) is before the start of the next month (%s). "
+ "Run ExpandBillingRecurrencesAction first.",
cursorTime,
startOfNextMonth);

LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("invoicing", clock))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package google.registry.reporting.billing;

import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
import static google.registry.testing.DatabaseHelper.persistResource;
import static jakarta.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static jakarta.servlet.http.HttpServletResponse.SC_OK;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -25,6 +28,7 @@
import com.google.common.net.MediaType;
import google.registry.batch.CloudTasksUtils;
import google.registry.beam.BeamActionTestBase;
import google.registry.model.common.Cursor;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.reporting.ReportingModule;
Expand All @@ -33,6 +37,7 @@
import google.registry.testing.FakeClock;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.YearMonth;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -50,8 +55,13 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
private GenerateInvoicesAction action;

private void setCursor(Instant cursorTime) {
persistResource(Cursor.createGlobal(RECURRING_BILLING, cursorTime));
}

@Test
void testLaunchTemplateJob_withPublish() throws Exception {
setCursor(Instant.parse("2017-11-01T00:00:00Z"));
action =
new GenerateInvoicesAction(
"test-project",
Expand Down Expand Up @@ -84,6 +94,7 @@ void testLaunchTemplateJob_withPublish() throws Exception {

@Test
void testLaunchTemplateJob_withoutPublish() throws Exception {
setCursor(Instant.parse("2017-11-01T00:00:00Z"));
action =
new GenerateInvoicesAction(
"test-project",
Expand All @@ -107,6 +118,7 @@ void testLaunchTemplateJob_withoutPublish() throws Exception {

@Test
void testCaughtIOException() throws IOException {
setCursor(Instant.parse("2017-11-01T00:00:00Z"));
when(launch.execute()).thenThrow(new IOException("Pipeline error"));
action =
new GenerateInvoicesAction(
Expand All @@ -128,4 +140,58 @@ void testCaughtIOException() throws IOException {
verify(emailUtils).sendAlertEmail("Pipeline Launch failed due to Pipeline error");
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}

@Test
void testFailure_cursorLagging() {
setCursor(Instant.parse("2017-10-31T23:59:59.999Z"));
action =
new GenerateInvoicesAction(
"test-project",
"test-region",
"staging_bucket",
"billing_bucket",
"REG-INV",
false,
YearMonth.of(2017, 10),
emailUtils,
cloudTasksUtils,
clock,
response,
dataflow);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getPayload()).contains("Pipeline launch failed");
assertThat(response.getPayload()).contains("BillingRecurrence expansion cursor");
verify(emailUtils)
.sendAlertEmail(
startsWith("Pipeline Launch failed due to BillingRecurrence expansion cursor"));
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}

@Test
void testFailure_cursorMissing() {
// Do not set cursor, should default to START_INSTANT (1970)
action =
new GenerateInvoicesAction(
"test-project",
"test-region",
"staging_bucket",
"billing_bucket",
"REG-INV",
false,
YearMonth.of(2017, 10),
emailUtils,
cloudTasksUtils,
clock,
response,
dataflow);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getPayload()).contains("Pipeline launch failed");
assertThat(response.getPayload()).contains("BillingRecurrence expansion cursor");
verify(emailUtils)
.sendAlertEmail(
startsWith("Pipeline Launch failed due to BillingRecurrence expansion cursor"));
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}
}
Loading