From 74ec71cbf54996bedabbba49f54f7892ac802e75 Mon Sep 17 00:00:00 2001 From: James Scott Date: Fri, 26 Dec 2025 22:56:41 +0000 Subject: [PATCH] feat(event_producer): implement pubsub adapters Introduces the `gcppubsubadapters` package to connect the `EventProducer` domain logic to Google Cloud Pub/Sub. This includes: - `EventProducerSubscriberAdapter`: A high-level adapter that routes incoming Pub/Sub messages (RefreshSearch, BatchRefresh, ConfigurationChanged) to the appropriate `EventProducer` methods. - `EventProducerPublisherAdapter`: An adapter for publishing `FeatureDiffEvent` notifications back to Pub/Sub. - `RunGroup`: A concurrency utility for managing the lifecycle of multiple blocking subscribers. --- .../gcppubsubadapters/event_producer.go | 163 +++++++++ .../gcppubsubadapters/event_producer_test.go | 342 ++++++++++++++++++ lib/gcppubsub/gcppubsubadapters/utils.go | 58 +++ lib/gcppubsub/gcppubsubadapters/utils_test.go | 101 ++++++ 4 files changed, 664 insertions(+) create mode 100644 lib/gcppubsub/gcppubsubadapters/event_producer.go create mode 100644 lib/gcppubsub/gcppubsubadapters/event_producer_test.go create mode 100644 lib/gcppubsub/gcppubsubadapters/utils.go create mode 100644 lib/gcppubsub/gcppubsubadapters/utils_test.go diff --git a/lib/gcppubsub/gcppubsubadapters/event_producer.go b/lib/gcppubsub/gcppubsubadapters/event_producer.go new file mode 100644 index 000000000..33ed8db62 --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/event_producer.go @@ -0,0 +1,163 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "log/slog" + + "github.com/GoogleChrome/webstatus.dev/lib/event" + batchrefreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/batchrefreshtrigger/v1" + featurediffv1 "github.com/GoogleChrome/webstatus.dev/lib/event/featurediff/v1" + refreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/refreshsearchcommand/v1" + searchconfigv1 "github.com/GoogleChrome/webstatus.dev/lib/event/searchconfigurationchanged/v1" + "github.com/GoogleChrome/webstatus.dev/lib/workertypes" +) + +type EventProducerSearchMessageHandler interface { + ProcessSearch(ctx context.Context, searchID string, query string, + frequency workertypes.JobFrequency, triggerID string) error +} + +type EventProducerBatchUpdateHandler interface { + ProcessBatchUpdate(ctx context.Context, triggerID string, frequency workertypes.JobFrequency) error +} + +type EventSubscriber interface { + Subscribe(ctx context.Context, subID string, + handler func(ctx context.Context, msgID string, data []byte) error) error +} + +type SubscriberConfig struct { + SearchSubscriptionID string + BatchUpdateSubscriptionID string +} + +type EventProducerSubscriberAdapter struct { + searchEventHandler EventProducerSearchMessageHandler + batchUpdateHandler EventProducerBatchUpdateHandler + eventSubscriber EventSubscriber + config SubscriberConfig + searchEventRouter *event.Router + batchUpdateRouter *event.Router +} + +func NewEventProducerSubscriberAdapter( + searchMessageHandler EventProducerSearchMessageHandler, + batchUpdateHandler EventProducerBatchUpdateHandler, + eventSubscriber EventSubscriber, + config SubscriberConfig, +) *EventProducerSubscriberAdapter { + searchEventRouter := event.NewRouter() + + batchUpdateRouter := event.NewRouter() + + ret := &EventProducerSubscriberAdapter{ + searchEventHandler: searchMessageHandler, + batchUpdateHandler: batchUpdateHandler, + eventSubscriber: eventSubscriber, + config: config, + searchEventRouter: searchEventRouter, + batchUpdateRouter: batchUpdateRouter, + } + + event.Register(searchEventRouter, ret.processRefreshSearchCommand) + event.Register(searchEventRouter, ret.processSearchConfigurationChangedEvent) + + event.Register(batchUpdateRouter, ret.processBatchUpdateCommand) + + return ret +} + +func (a *EventProducerSubscriberAdapter) processRefreshSearchCommand(ctx context.Context, + eventID string, event refreshv1.RefreshSearchCommand) error { + slog.InfoContext(ctx, "received refresh search command", "eventID", eventID, "event", event) + + return a.searchEventHandler.ProcessSearch(ctx, event.SearchID, event.Query, + event.Frequency.ToWorkerTypeJobFrequency(), eventID) +} + +func (a *EventProducerSubscriberAdapter) processSearchConfigurationChangedEvent(ctx context.Context, + eventID string, event searchconfigv1.SearchConfigurationChangedEvent) error { + slog.InfoContext(ctx, "received search configuration changed event", "eventID", eventID, "event", event) + + return a.searchEventHandler.ProcessSearch(ctx, event.SearchID, event.Query, + event.Frequency.ToWorkerTypeJobFrequency(), eventID) +} + +func (a *EventProducerSubscriberAdapter) Subscribe(ctx context.Context) error { + return RunGroup(ctx, + // Handler 1: Search + func(ctx context.Context) error { + return a.eventSubscriber.Subscribe(ctx, a.config.SearchSubscriptionID, + func(ctx context.Context, msgID string, data []byte) error { + return a.searchEventRouter.HandleMessage(ctx, msgID, data) + }) + }, + // Handler 2: Batch Update + func(ctx context.Context) error { + return a.eventSubscriber.Subscribe(ctx, a.config.BatchUpdateSubscriptionID, + func(ctx context.Context, msgID string, data []byte) error { + return a.batchUpdateRouter.HandleMessage(ctx, msgID, data) + }) + }, + ) +} + +func (a *EventProducerSubscriberAdapter) processBatchUpdateCommand(ctx context.Context, + eventID string, event batchrefreshv1.BatchRefreshTrigger) error { + slog.InfoContext(ctx, "received batch update command", "eventID", eventID, "event", event) + + return a.batchUpdateHandler.ProcessBatchUpdate(ctx, eventID, + event.Frequency.ToWorkerTypeJobFrequency()) +} + +type EventPublisher interface { + Publish(ctx context.Context, topicID string, data []byte) (string, error) +} + +type EventProducerPublisherAdapter struct { + eventPublisher EventPublisher + topicID string +} + +func NewEventProducerPublisherAdapter(eventPublisher EventPublisher, topicID string) *EventProducerPublisherAdapter { + return &EventProducerPublisherAdapter{ + eventPublisher: eventPublisher, + topicID: topicID, + } +} + +func (a *EventProducerPublisherAdapter) Publish(ctx context.Context, + req workertypes.PublishEventRequest) (string, error) { + b, err := event.New(featurediffv1.FeatureDiffEvent{ + EventID: req.EventID, + SearchID: req.SearchID, + Query: req.Query, + Summary: req.Summary, + StateID: req.StateID, + StateBlobPath: req.StateBlobPath, + DiffID: req.DiffID, + DiffBlobPath: req.DiffBlobPath, + GeneratedAt: req.GeneratedAt, + Frequency: featurediffv1.ToJobFrequency(req.Frequency), + Reasons: featurediffv1.ToReasons(req.Reasons), + }) + if err != nil { + return "", err + } + + return a.eventPublisher.Publish(ctx, a.topicID, b) +} diff --git a/lib/gcppubsub/gcppubsubadapters/event_producer_test.go b/lib/gcppubsub/gcppubsubadapters/event_producer_test.go new file mode 100644 index 000000000..f3264d6e0 --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/event_producer_test.go @@ -0,0 +1,342 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "encoding/base64" + "encoding/json" + "sync" + "testing" + "time" + + batchrefreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/batchrefreshtrigger/v1" + refreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/refreshsearchcommand/v1" + "github.com/GoogleChrome/webstatus.dev/lib/workertypes" + "github.com/google/go-cmp/cmp" +) + +// --- Mocks --- + +type mockSearchHandler struct { + calls []searchCall + mu sync.Mutex + err error +} + +type searchCall struct { + SearchID string + Query string + Frequency workertypes.JobFrequency + TriggerID string +} + +func (m *mockSearchHandler) ProcessSearch(_ context.Context, searchID, query string, + freq workertypes.JobFrequency, triggerID string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.calls = append(m.calls, searchCall{searchID, query, freq, triggerID}) + + return m.err +} + +type mockBatchHandler struct { + calls []batchCall + mu sync.Mutex + err error +} + +type batchCall struct { + TriggerID string + Frequency workertypes.JobFrequency +} + +func (m *mockBatchHandler) ProcessBatchUpdate(_ context.Context, triggerID string, + freq workertypes.JobFrequency) error { + m.mu.Lock() + defer m.mu.Unlock() + m.calls = append(m.calls, batchCall{triggerID, freq}) + + return m.err +} + +type mockSubscriber struct { + handlers map[string]func(context.Context, string, []byte) error + mu sync.Mutex + // block allows us to simulate a long-running Subscribe call so RunGroup doesn't exit immediately + block chan struct{} +} + +func (m *mockSubscriber) Subscribe(ctx context.Context, subID string, + handler func(context.Context, string, []byte) error) error { + m.mu.Lock() + if m.handlers == nil { + m.handlers = make(map[string]func(context.Context, string, []byte) error) + } + m.handlers[subID] = handler + m.mu.Unlock() + + // Simulate blocking behavior of a real subscriber logic + if m.block != nil { + select { + case <-m.block: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +type mockPublisher struct { + publishedData []byte + publishedTopic string + err error +} + +func (m *mockPublisher) Publish(_ context.Context, topicID string, data []byte) (string, error) { + m.publishedData = data + m.publishedTopic = topicID + + return "msg-id", m.err +} + +// --- Tests --- + +type testEnv struct { + searchHandler *mockSearchHandler + batchHandler *mockBatchHandler + subscriber *mockSubscriber + adapter *EventProducerSubscriberAdapter + searchFn func(context.Context, string, []byte) error + batchFn func(context.Context, string, []byte) error + stop func() +} + +func setupTestAdapter(t *testing.T) *testEnv { + t.Helper() + searchHandler := new(mockSearchHandler) + batchHandler := new(mockBatchHandler) + subscriber := &mockSubscriber{block: make(chan struct{}), mu: sync.Mutex{}, handlers: nil} + config := SubscriberConfig{ + SearchSubscriptionID: "search-sub", + BatchUpdateSubscriptionID: "batch-sub", + } + + adapter := NewEventProducerSubscriberAdapter(searchHandler, batchHandler, subscriber, config) + + // Run Subscribe in a goroutine because it blocks + ctx, cancel := context.WithCancel(context.Background()) + + errChan := make(chan error) + go func() { + errChan <- adapter.Subscribe(ctx) + }() + + // Wait briefly for RunGroup to start and handlers to be registered + time.Sleep(50 * time.Millisecond) + + subscriber.mu.Lock() + searchFn := subscriber.handlers["search-sub"] + batchFn := subscriber.handlers["batch-sub"] + subscriber.mu.Unlock() + + if searchFn == nil || batchFn == nil { + cancel() + t.Fatal("Subscribe did not register handlers for both subscriptions") + } + + return &testEnv{ + searchHandler: searchHandler, + batchHandler: batchHandler, + subscriber: subscriber, + adapter: adapter, + searchFn: searchFn, + batchFn: batchFn, + stop: func() { + close(subscriber.block) // Unblock the subscriber + cancel() // Cancel the context + <-errChan // Wait for adapter.Subscribe to return + }, + } +} + +func TestSubscribe_RoutesRefreshSearchCommand(t *testing.T) { + env := setupTestAdapter(t) + defer env.stop() + + refreshCmd := refreshv1.RefreshSearchCommand{ + SearchID: "s1", + Query: "q1", + Frequency: "DAILY", + Timestamp: time.Time{}, + } + ceWrapper := map[string]interface{}{ + "apiVersion": "v1", + "kind": "RefreshSearchCommand", + "data": refreshCmd, + } + ceBytes, _ := json.Marshal(ceWrapper) + + if err := env.searchFn(context.Background(), "msg-1", ceBytes); err != nil { + t.Errorf("searchFn failed: %v", err) + } + + if len(env.searchHandler.calls) != 1 { + t.Fatalf("Expected 1 search call, got %d", len(env.searchHandler.calls)) + } + + expectedCall := searchCall{ + SearchID: "s1", + Query: "q1", + Frequency: workertypes.FrequencyDaily, + TriggerID: "msg-1", + } + + if diff := cmp.Diff(expectedCall, env.searchHandler.calls[0]); diff != "" { + t.Errorf("Search call mismatch (-want +got):\n%s", diff) + } +} + +func TestSubscribe_RoutesBatchUpdate(t *testing.T) { + env := setupTestAdapter(t) + defer env.stop() + + batchTrig := batchrefreshv1.BatchRefreshTrigger{ + Frequency: "WEEKLY", + } + ceWrapperBatch := map[string]interface{}{ + "apiVersion": "v1", + "kind": "BatchRefreshTrigger", + "data": batchTrig, + } + ceBytesBatch, _ := json.Marshal(ceWrapperBatch) + + if err := env.batchFn(context.Background(), "msg-2", ceBytesBatch); err != nil { + t.Errorf("batchFn failed: %v", err) + } + + if len(env.batchHandler.calls) != 1 { + t.Fatalf("Expected 1 batch call, got %d", len(env.batchHandler.calls)) + } + + expectedCall := batchCall{ + TriggerID: "msg-2", + Frequency: workertypes.FrequencyWeekly, + } + + if diff := cmp.Diff(expectedCall, env.batchHandler.calls[0]); diff != "" { + t.Errorf("Batch call mismatch (-want +got):\n%s", diff) + } +} + +func TestSubscribe_RoutesSearchConfigurationChanged(t *testing.T) { + env := setupTestAdapter(t) + defer env.stop() + + // We construct the payload manually for the test execution + configEventPayload := map[string]interface{}{ + "search_id": "s2", + "query": "q2", + "user_id": "user-1", + "timestamp": "0001-01-01T00:00:00Z", + "is_creation": false, + "frequency": "IMMEDIATE", + } + + ceWrapperConfig := map[string]interface{}{ + "apiVersion": "v1", + "kind": "SearchConfigurationChangedEvent", + "data": configEventPayload, + } + ceBytesConfig, _ := json.Marshal(ceWrapperConfig) + + if err := env.searchFn(context.Background(), "msg-3", ceBytesConfig); err != nil { + t.Errorf("searchFn (config event) failed: %v", err) + } + + if len(env.searchHandler.calls) != 1 { + t.Fatalf("Expected 1 search call, got %d", len(env.searchHandler.calls)) + } + + expectedCall := searchCall{ + SearchID: "s2", + Query: "q2", + Frequency: workertypes.FrequencyImmediate, + TriggerID: "msg-3", + } + + if diff := cmp.Diff(expectedCall, env.searchHandler.calls[0]); diff != "" { + t.Errorf("Search call mismatch (-want +got):\n%s", diff) + } +} + +func TestPublisher_Publish(t *testing.T) { + publisher := new(mockPublisher) + adapter := NewEventProducerPublisherAdapter(publisher, "topic-1") + now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + + req := workertypes.PublishEventRequest{ + EventID: "evt-1", + SearchID: "search-1", + Query: "query-1", + Frequency: "DAILY", + Reasons: []workertypes.Reason{workertypes.ReasonDataUpdated}, + Summary: []byte(`{"added": 1}`), + StateID: "state-id-1", + DiffID: "diff-id-1", + StateBlobPath: "gs://bucket/state-blob", + DiffBlobPath: "gs://bucket/diff-blob", + GeneratedAt: now, + } + + _, err := adapter.Publish(context.Background(), req) + if err != nil { + t.Fatalf("Publish failed: %v", err) + } + + if publisher.publishedTopic != "topic-1" { + t.Errorf("Topic mismatch: got %s, want topic-1", publisher.publishedTopic) + } + + var actualEnvelope map[string]interface{} + if err := json.Unmarshal(publisher.publishedData, &actualEnvelope); err != nil { + t.Fatalf("Failed to unmarshal published data: %v", err) + } + + expectedEnvelope := map[string]interface{}{ + "apiVersion": "v1", + "kind": "FeatureDiffEvent", + "data": map[string]interface{}{ + "event_id": "evt-1", + "search_id": "search-1", + "query": "query-1", + // go encodes/decodes []byte as base64 strings + "summary": base64.StdEncoding.EncodeToString([]byte(`{"added": 1}`)), + "state_id": "state-id-1", + "diff_id": "diff-id-1", + "state_blob_path": "gs://bucket/state-blob", + "diff_blob_path": "gs://bucket/diff-blob", + "reasons": []interface{}{"DATA_UPDATED"}, + "generated_at": now.Format(time.RFC3339), + "frequency": "DAILY", + }, + } + + if diff := cmp.Diff(expectedEnvelope, actualEnvelope); diff != "" { + t.Errorf("Payload mismatch (-want +got):\n%s", diff) + } +} diff --git a/lib/gcppubsub/gcppubsubadapters/utils.go b/lib/gcppubsub/gcppubsubadapters/utils.go new file mode 100644 index 000000000..8dfe2bbf6 --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/utils.go @@ -0,0 +1,58 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "sync" +) + +// RunGroup runs multiple blocking functions concurrently. +// - It returns the first error encountered. +// - If one function fails, it cancels the context for the others. +// - It waits for all functions to exit before returning. +func RunGroup(ctx context.Context, fns ...func(ctx context.Context) error) error { + // 1. Create a derived context so we can signal cancellation to all siblings + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + errChan := make(chan error, len(fns)) + + for _, fn := range fns { + wg.Add(1) + // Capture fn in the loop scope + go func(f func(context.Context) error) { + defer wg.Done() + + // Pass the cancellable context to the function + if err := f(ctx); err != nil { + // Try to push the error; if channel is full, we already have an error + select { + case errChan <- err: + // Signal other routines to stop + cancel() + default: + } + } + }(fn) + } + + wg.Wait() + close(errChan) + + // Return the first error (if any) + return <-errChan +} diff --git a/lib/gcppubsub/gcppubsubadapters/utils_test.go b/lib/gcppubsub/gcppubsubadapters/utils_test.go new file mode 100644 index 000000000..32a5e518b --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/utils_test.go @@ -0,0 +1,101 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestRunGroup(t *testing.T) { + tests := []struct { + name string + fns []func(context.Context) error + expectedErr error + }{ + { + name: "All functions succeed", + fns: []func(context.Context) error{ + func(_ context.Context) error { return nil }, + func(_ context.Context) error { return nil }, + }, + expectedErr: nil, + }, + { + name: "One function fails immediately", + fns: []func(context.Context) error{ + func(_ context.Context) error { return errors.New("fail") }, + func(ctx context.Context) error { + // Simulate work + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(100 * time.Millisecond): + return nil + } + }, + }, + expectedErr: errors.New("fail"), + }, + { + name: "Multiple failures return first error", + fns: []func(context.Context) error{ + func(_ context.Context) error { return errors.New("error 1") }, + func(_ context.Context) error { + time.Sleep(10 * time.Millisecond) // Ensure this happens slightly later + + return errors.New("error 2") + }, + }, + expectedErr: errors.New("error 1"), + }, + { + name: "Cancellation propagates", + fns: []func(context.Context) error{ + func(_ context.Context) error { + return errors.New("trigger cancel") + }, + func(ctx context.Context) error { + select { + case <-ctx.Done(): + // Correct behavior: context was cancelled + return nil + case <-time.After(1 * time.Second): + return errors.New("timeout: context was not cancelled") + } + }, + }, + expectedErr: errors.New("trigger cancel"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := RunGroup(context.Background(), tc.fns...) + + if tc.expectedErr == nil { + if err != nil { + t.Errorf("RunGroup() unexpected error: %v", err) + } + } else { + if err == nil || err.Error() != tc.expectedErr.Error() { + t.Errorf("RunGroup() error = %v, want %v", err, tc.expectedErr) + } + } + }) + } +}