Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/dev_server/api/events/events_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
type sdkEventObserver struct {
ctx context.Context
debugSessionKey string
updateChan chan<- sdk.Message
updateChan chan<- []byte
}

func newSdkEventObserver(updateChan chan<- sdk.Message, ctx context.Context) sdkEventObserver {
func newSdkEventObserver(updateChan chan<- []byte, ctx context.Context) sdkEventObserver {
debugSessionKey := uuid.New().String()
db := model.EventStoreFromContext(ctx)
err := db.CreateDebugSession(ctx, debugSessionKey)
Expand Down Expand Up @@ -54,14 +54,14 @@ func (o sdkEventObserver) Handle(message interface{}) {
return
}

o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}
o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}.ToPayload()
}

func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) {
updateChan, errChan := sdk.OpenStream(
writer,
request.Context().Done(),
sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}},
sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}.ToPayload(),
)
defer close(updateChan)
observers := model.GetObserversFromContext(request.Context())
Expand Down
12 changes: 7 additions & 5 deletions internal/dev_server/model/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package model

// Event for individual flag overrides
type OverrideEvent struct {
FlagKey string
ProjectKey string
FlagState FlagState
FlagKey string
ProjectKey string
FlagState FlagState
PayloadVersion int
}

// Event for full project sync
type SyncEvent struct {
ProjectKey string
AllFlagsState FlagsState
ProjectKey string
AllFlagsState FlagsState
PayloadVersion int
}
18 changes: 10 additions & 8 deletions internal/dev_server/model/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval
return Override{}, err
}

_, err = store.IncrementProjectPayloadVersion(ctx, projectKey)
newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey)
if err != nil {
return Override{}, errors.Wrap(err, "unable to increment payload version")
}

GetObserversFromContext(ctx).Notify(OverrideEvent{
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: override.Apply(flagState),
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: override.Apply(flagState),
PayloadVersion: newPayloadVersion,
})
return override, nil
}
Expand All @@ -84,7 +85,7 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error {
return err
}

_, err = store.IncrementProjectPayloadVersion(ctx, projectKey)
newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey)
if err != nil {
return errors.Wrap(err, "unable to increment payload version")
}
Expand All @@ -97,9 +98,10 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error {
Version: version,
}
GetObserversFromContext(ctx).Notify(OverrideEvent{
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: override.Apply(flagState),
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: override.Apply(flagState),
PayloadVersion: newPayloadVersion,
})
return err
}
Expand Down
8 changes: 5 additions & 3 deletions internal/dev_server/model/override_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ func TestUpsertOverride(t *testing.T) {
observer.
EXPECT().
Handle(model.OverrideEvent{
FlagKey: flagKey,
ProjectKey: projKey,
FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true},
FlagKey: flagKey,
ProjectKey: projKey,
FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true},
PayloadVersion: 1,
})

o, err := model.UpsertOverride(ctx, projKey, flagKey, ldValue)
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestDeleteOverride(t *testing.T) {
Value: ldvalue.Bool(false),
Version: 3, // override version 2 + flag version 1
},
PayloadVersion: 1,
})

err := model.DeleteOverride(ctx, projKey, flagKey)
Expand Down
5 changes: 3 additions & 2 deletions internal/dev_server/model/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co
}

GetObserversFromContext(ctx).Notify(SyncEvent{
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
PayloadVersion: project.PayloadVersion,
})
return *project, nil
}
Expand Down
5 changes: 3 additions & 2 deletions internal/dev_server/model/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,9 @@ func TestUpdateProject(t *testing.T) {
observer.
EXPECT().
Handle(model.SyncEvent{
ProjectKey: proj.Key,
AllFlagsState: model.FromAllFlags(allFlagsState),
ProjectKey: proj.Key,
AllFlagsState: model.FromAllFlags(allFlagsState),
PayloadVersion: 2,
})

project, err := model.UpdateProject(ctx, proj.Key, nil, nil)
Expand Down
5 changes: 3 additions & 2 deletions internal/dev_server/model/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func RestoreDb(ctx context.Context, stream io.Reader) error {
return err
}
observers.Notify(SyncEvent{
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
PayloadVersion: project.PayloadVersion,
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/dev_server/model/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRestoreDb(t *testing.T) {
store.EXPECT().GetDevProject(gomock.Any(), projKey).Return(&proj, nil)
store.EXPECT().GetOverridesForProject(gomock.Any(), projKey).Return(model.Overrides{}, nil)
observer := mocks.NewMockObserver(mockController)
observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState})
observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState, PayloadVersion: proj.PayloadVersion})

observers.RegisterObserver(observer)

Expand Down
27 changes: 23 additions & 4 deletions internal/dev_server/sdk/fdv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
fdv2ReasonUpToDate = "up-to-date"
fdv2ReasonCantCatchup = "cant-catchup"
fdv2ReasonPayloadMissing = "payload-missing"
fdv2ReasonUpdate = "update"
)

// parseBasis extracts the payload ID and version from a basis state string of the
Expand All @@ -39,7 +40,7 @@ func parseBasis(basis string) (string, int) {
return inner[:lastColon], version
}

// buildPollResponse constructs the FDv2 polling response.
// buildInitialResponse constructs the FDv2 initial response for both polling and streaming.
//
// payloadID is the stable identifier for this payload (the project key).
// currentVersion is the project's current PayloadVersion.
Expand All @@ -48,7 +49,7 @@ func parseBasis(basis string) (string, int) {
//
// Delta transfers are not supported: stale clients always receive a full payload.
// Tracking the change history required for deltas is overkill for a local dev server.
func buildPollResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) {
func buildInitialResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) {
basisPayloadID, basisVersion := parseBasis(basis)
switch {
case basisVersion == 0:
Expand Down Expand Up @@ -122,11 +123,29 @@ func makePutObjectEvent(version int, key string, flagState model.FlagState) (sub
return subsystems.RawEvent{Name: subsystems.EventPutObject, Data: data}, nil
}

// buildFlagChangeEvents builds the events sequence for a single flag update pushed over a stream:
// server-intent(xfer-changes) + put-object(changed flag) + payload-transferred.
func buildFlagChangeEvents(payloadID string, version int, flagKey string, flagState model.FlagState) ([]subsystems.RawEvent, error) {
intentEvent, err := makeServerIntentEvent(payloadID, version, subsystems.IntentTransferChanges, fdv2ReasonUpdate)
if err != nil {
return nil, err
}
putEvent, err := makePutObjectEvent(version, flagKey, flagState)
if err != nil {
return nil, err
}
transferredEvent, err := makePayloadTransferredEvent(payloadID, version)
if err != nil {
return nil, err
}
return []subsystems.RawEvent{intentEvent, putEvent, transferredEvent}, nil
}

func makePayloadTransferredEvent(payloadID string, version int) (subsystems.RawEvent, error) {
// The selector state is synthetic and dev-server-specific: the dev server uses the
// project key as the payload ID rather than a server-assigned opaque value. The SDK
// echoes this selector back as ?basis on subsequent polls, where parseBasisVersion
// extracts the version from it.
// echoes this selector back as ?basis on subsequent polls, where parseBasis
// extracts the payload ID and version from it.
selector := subsystems.NewSelector(fmt.Sprintf("(p:%s:%d)", payloadID, version), version)
data, err := json.Marshal(selector)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/dev_server/sdk/fdv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBuildPollResponse(t *testing.T) {
}

t.Run("no basis sends xfer-full with payload-missing", func(t *testing.T) {
resp, err := buildPollResponse(payloadID, currentVersion, flags, "")
resp, err := buildInitialResponse(payloadID, currentVersion, flags, "")
require.NoError(t, err)

require.GreaterOrEqual(t, len(resp.Events), 3) // server-intent + put-objects + payload-transferred
Expand All @@ -64,7 +64,7 @@ func TestBuildPollResponse(t *testing.T) {

t.Run("up-to-date basis sends none with up-to-date", func(t *testing.T) {
basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion)
resp, err := buildPollResponse(payloadID, currentVersion, flags, basis)
resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis)
require.NoError(t, err)

require.Len(t, resp.Events, 1)
Expand All @@ -73,7 +73,7 @@ func TestBuildPollResponse(t *testing.T) {

t.Run("basis ahead of current version sends full transfer (e.g. project recreated)", func(t *testing.T) {
basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion+10)
resp, err := buildPollResponse(payloadID, currentVersion, flags, basis)
resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis)
require.NoError(t, err)

require.GreaterOrEqual(t, len(resp.Events), 3)
Expand All @@ -83,7 +83,7 @@ func TestBuildPollResponse(t *testing.T) {

t.Run("stale basis sends xfer-full with cant-catchup", func(t *testing.T) {
basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion-1)
resp, err := buildPollResponse(payloadID, currentVersion, flags, basis)
resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis)
require.NoError(t, err)

require.GreaterOrEqual(t, len(resp.Events), 3)
Expand All @@ -93,7 +93,7 @@ func TestBuildPollResponse(t *testing.T) {

t.Run("basis with wrong payload ID sends xfer-full", func(t *testing.T) {
basis := fmt.Sprintf("(p:%s:%d)", "other-project", currentVersion)
resp, err := buildPollResponse(payloadID, currentVersion, flags, basis)
resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis)
require.NoError(t, err)

require.GreaterOrEqual(t, len(resp.Events), 3)
Expand All @@ -106,7 +106,7 @@ func TestBuildPollResponse(t *testing.T) {
"flag-a": model.FlagState{Value: ldvalue.Bool(true), Version: 1},
"flag-b": model.FlagState{Value: ldvalue.String("hello"), Version: 2},
}
resp, err := buildPollResponse(payloadID, currentVersion, multiFlags, "")
resp, err := buildInitialResponse(payloadID, currentVersion, multiFlags, "")
require.NoError(t, err)

// server-intent + 2 put-objects + payload-transferred
Expand Down
2 changes: 1 addition & 1 deletion internal/dev_server/sdk/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func PollV2(w http.ResponseWriter, r *http.Request) {
return
}

response, err := buildPollResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis"))
response, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis"))
if err != nil {
WriteError(ctx, w, errors.Wrap(err, "failed to build poll response"))
return
Expand Down
1 change: 1 addition & 0 deletions internal/dev_server/sdk/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func BindRoutes(router *mux.Router) {
router.Handle("/all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamServerAllPayload)))
router.Handle("/sdk/latest-all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(LatestAll)))
router.Handle("/sdk/poll", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(PollV2)))
router.Handle("/sdk/stream", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamV2)))

router.PathPrefix("/sdk/flags/{flagKey}").
Methods(http.MethodGet).
Expand Down
4 changes: 2 additions & 2 deletions internal/dev_server/sdk/stream_client_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
updateChan, doneChan := OpenStream(
w,
r.Context().Done(),
Message{Event: TYPE_PUT, Data: jsonBody},
Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(),
)
defer close(updateChan)
projectKey := GetProjectKeyFromContext(ctx)
Expand All @@ -46,7 +46,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
}

type clientFlagsObserver struct {
updateChan chan<- Message
updateChan chan<- []byte
projectKey string
}

Expand Down
89 changes: 89 additions & 0 deletions internal/dev_server/sdk/stream_server_fdv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package sdk

import (
"fmt"
"log"
"net/http"

"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/ldcli/internal/dev_server/model"
"github.com/pkg/errors"
)

func StreamV2(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
projectKey := GetProjectKeyFromContext(ctx)
store := model.StoreFromContext(ctx)

project, err := store.GetDevProject(ctx, projectKey)
if err != nil {
WriteError(ctx, w, errors.Wrap(err, "failed to get project"))
return
}

allFlags, err := project.GetFlagStateWithOverridesForProject(ctx)
if err != nil {
WriteError(ctx, w, errors.Wrap(err, "failed to get flag state"))
return
}

initialPayload, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis"))
if err != nil {
WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload"))
return
}

updateChan, doneChan := OpenStream(w, r.Context().Done(), fdv2SSEPayload(initialPayload.Events))
defer close(updateChan)

observer := fdv2StreamObserver{updateChan: updateChan, projectKey: projectKey}
observerID := model.GetObserversFromContext(ctx).RegisterObserver(observer)
defer func() {
if ok := model.GetObserversFromContext(ctx).DeregisterObserver(observerID); !ok {
log.Printf("unable to deregister fdv2 stream observer")
}
}()

err = <-doneChan
if err != nil {
WriteError(ctx, w, errors.Wrap(err, "stream failure"))
}
}

// fdv2SSEPayload formats a slice of FDv2 events as raw SSE bytes.
// Each event becomes an individual SSE event in the output.
func fdv2SSEPayload(events []subsystems.RawEvent) []byte {
var buf []byte
for _, e := range events {
buf = append(buf, fmt.Sprintf("event:%s\ndata:%s\n\n", e.Name, e.Data)...)
}
return buf
}

type fdv2StreamObserver struct {
updateChan chan<- []byte
projectKey string
}

func (o fdv2StreamObserver) Handle(event interface{}) {
switch event := event.(type) {
case model.OverrideEvent:
if event.ProjectKey != o.projectKey {
return
}
events, err := buildFlagChangeEvents(o.projectKey, event.PayloadVersion, event.FlagKey, event.FlagState)
if err != nil {
panic(errors.Wrap(err, "failed to build flag change events in fdv2 stream observer"))
}
o.updateChan <- fdv2SSEPayload(events)
case model.SyncEvent:
if event.ProjectKey != o.projectKey {
return
}
payload, err := buildFullTransferResponse(o.projectKey, event.PayloadVersion, event.AllFlagsState, fdv2ReasonCantCatchup)
if err != nil {
panic(errors.Wrap(err, "failed to build full transfer in fdv2 stream observer"))
}
o.updateChan <- fdv2SSEPayload(payload.Events)
}
}
Loading
Loading