diff --git a/lib/ldclient-rb/data_system.rb b/lib/ldclient-rb/data_system.rb index 22d2004a..d613e30d 100644 --- a/lib/ldclient-rb/data_system.rb +++ b/lib/ldclient-rb/data_system.rb @@ -2,6 +2,7 @@ require 'ldclient-rb/interfaces/data_system' require 'ldclient-rb/config' +require 'ldclient-rb/impl/data_system/polling' module LaunchDarkly # @@ -97,23 +98,39 @@ def build end end - # @private + # + # Returns a builder proc for creating a polling data source. + # This is a building block that can be used with {ConfigBuilder#initializers} + # or {ConfigBuilder#synchronizers} to create custom data system configurations. + # + # @return [Proc] A proc that takes (sdk_key, config) and returns a polling data source + # def self.polling_ds_builder - # TODO(fdv2): Implement polling data source builder - lambda do |_sdk_key, _config| - raise NotImplementedError, "Polling data source not yet implemented for FDv2" + lambda do |sdk_key, config| + LaunchDarkly::Impl::DataSystem::PollingDataSourceBuilder.new(sdk_key, config).build end end - # @private + # + # Returns a builder proc for creating an FDv1 fallback polling data source. + # This is a building block that can be used with {ConfigBuilder#fdv1_compatible_synchronizer} + # to provide FDv1 compatibility in custom data system configurations. + # + # @return [Proc] A proc that takes (sdk_key, config) and returns an FDv1 polling data source + # def self.fdv1_fallback_ds_builder - # TODO(fdv2): Implement FDv1 fallback polling data source builder - lambda do |_sdk_key, _config| - raise NotImplementedError, "FDv1 fallback data source not yet implemented for FDv2" + lambda do |sdk_key, config| + LaunchDarkly::Impl::DataSystem::FDv1PollingDataSourceBuilder.new(sdk_key, config).build end end - # @private + # + # Returns a builder proc for creating a streaming data source. + # This is a building block that can be used with {ConfigBuilder#synchronizers} + # to create custom data system configurations. + # + # @return [Proc] A proc that takes (sdk_key, config) and returns a streaming data source + # def self.streaming_ds_builder # TODO(fdv2): Implement streaming data source builder lambda do |_sdk_key, _config| diff --git a/lib/ldclient-rb/impl/data_system/polling.rb b/lib/ldclient-rb/impl/data_system/polling.rb new file mode 100644 index 00000000..15ff1ff2 --- /dev/null +++ b/lib/ldclient-rb/impl/data_system/polling.rb @@ -0,0 +1,546 @@ +# frozen_string_literal: true + +require "ldclient-rb/interfaces" +require "ldclient-rb/interfaces/data_system" +require "ldclient-rb/impl/data_system" +require "ldclient-rb/impl/data_system/protocolv2" +require "ldclient-rb/impl/data_source/requestor" +require "ldclient-rb/impl/util" +require "concurrent" +require "json" +require "uri" +require "http" + +module LaunchDarkly + module Impl + module DataSystem + FDV2_POLLING_ENDPOINT = "/sdk/poll" + FDV1_POLLING_ENDPOINT = "/sdk/latest-all" + + LD_ENVID_HEADER = "x-launchdarkly-env-id" + LD_FD_FALLBACK_HEADER = "x-launchdarkly-fd-fallback" + + # + # Requester protocol for polling data source + # + module Requester + # + # Fetches the data for the given selector. + # Returns a Result containing a tuple of [ChangeSet, headers], + # or an error if the data could not be retrieved. + # + # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil] + # @return [Result] + # + def fetch(selector) + raise NotImplementedError + end + end + + # + # PollingDataSource is a data source that can retrieve information from + # LaunchDarkly either as an Initializer or as a Synchronizer. + # + class PollingDataSource + include LaunchDarkly::Interfaces::DataSystem::Initializer + include LaunchDarkly::Interfaces::DataSystem::Synchronizer + + attr_reader :name + + # + # @param poll_interval [Float] Polling interval in seconds + # @param requester [Requester] The requester to use for fetching data + # @param logger [Logger] The logger + # + def initialize(poll_interval, requester, logger) + @requester = requester + @poll_interval = poll_interval + @logger = logger + @interrupt_event = Concurrent::Event.new + @stop = Concurrent::Event.new + @name = "PollingDataSourceV2" + end + + # + # Fetch returns a Basis, or an error if the Basis could not be retrieved. + # + # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore] + # @return [LaunchDarkly::Interfaces::DataSystem::Basis, nil] + # + def fetch(ss) + poll(ss) + end + + # + # sync begins the synchronization process for the data source, yielding + # Update objects until the connection is closed or an unrecoverable error + # occurs. + # + # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore] + # @yieldparam update [LaunchDarkly::Interfaces::DataSystem::Update] + # + def sync(ss) + @logger.info { "[LDClient] Starting PollingDataSourceV2 synchronizer" } + @stop.reset + @interrupt_event.reset + + until @stop.set? + result = @requester.fetch(ss.selector) + + if !result.success? + fallback = false + envid = nil + + if result.headers + fallback = result.headers[LD_FD_FALLBACK_HEADER] == 'true' + envid = result.headers[LD_ENVID_HEADER] + end + + if result.exception.is_a?(LaunchDarkly::Impl::DataSource::UnexpectedResponseError) + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, + result.exception.status, + Impl::Util.http_error_message( + result.exception.status, "polling request", "will retry" + ), + Time.now + ) + + status_code = result.exception.status + if Impl::Util.http_error_recoverable?(status_code) + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: error_info, + environment_id: envid, + revert_to_fdv1: fallback + ) + # Stop polling if fallback is set; caller will handle shutdown + break if fallback + @interrupt_event.wait(@poll_interval) + next + end + + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::OFF, + error: error_info, + environment_id: envid, + revert_to_fdv1: fallback + ) + break + end + + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, + 0, + result.error, + Time.now + ) + + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: error_info, + environment_id: envid, + revert_to_fdv1: fallback + ) + else + change_set, headers = result.value + fallback = headers[LD_FD_FALLBACK_HEADER] == 'true' + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::VALID, + change_set: change_set, + environment_id: headers[LD_ENVID_HEADER], + revert_to_fdv1: fallback + ) + end + + break if fallback + break if @interrupt_event.wait(@poll_interval) + end + end + + # + # Stops the synchronizer. + # + def stop + @logger.info { "[LDClient] Stopping PollingDataSourceV2 synchronizer" } + @interrupt_event.set + @stop.set + end + + # + # @param ss [LaunchDarkly::Interfaces::DataSystem::SelectorStore] + # @return [LaunchDarkly::Result] + # + private def poll(ss) + result = @requester.fetch(ss.selector) + + unless result.success? + if result.exception.is_a?(LaunchDarkly::Impl::DataSource::UnexpectedResponseError) + status_code = result.exception.status + http_error_message_result = Impl::Util.http_error_message( + status_code, "polling request", "will retry" + ) + @logger.warn { "[LDClient] #{http_error_message_result}" } if Impl::Util.http_error_recoverable?(status_code) + return LaunchDarkly::Result.fail(http_error_message_result, result.exception) + end + + return LaunchDarkly::Result.fail(result.error || 'Failed to request payload', result.exception) + end + + change_set, headers = result.value + + env_id = headers[LD_ENVID_HEADER] + env_id = nil unless env_id.is_a?(String) + + basis = LaunchDarkly::Interfaces::DataSystem::Basis.new( + change_set: change_set, + persist: change_set.selector.defined?, + environment_id: env_id + ) + + LaunchDarkly::Result.success(basis) + rescue => e + msg = "Error: Exception encountered when updating flags. #{e}" + @logger.error { "[LDClient] #{msg}" } + @logger.debug { "[LDClient] Exception trace: #{e.backtrace}" } + LaunchDarkly::Result.fail(msg, e) + end + end + + # + # HTTPPollingRequester is a Requester that uses HTTP to make + # requests to the FDv2 polling endpoint. + # + class HTTPPollingRequester + include Requester + + # + # @param sdk_key [String] + # @param config [LaunchDarkly::Config] + # + def initialize(sdk_key, config) + @etag = nil + @config = config + @sdk_key = sdk_key + @poll_uri = config.base_uri + FDV2_POLLING_ENDPOINT + @http_client = Impl::Util.new_http_client(config.base_uri, config) + .use(:auto_inflate) + .headers("Accept-Encoding" => "gzip") + end + + # + # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil] + # @return [Result] + # + def fetch(selector) + query_params = [] + query_params << ["filter", @config.payload_filter_key] unless @config.payload_filter_key.nil? + + if selector && selector.defined? + query_params << ["selector", selector.state] + end + + uri = @poll_uri + if query_params.any? + filter_query = URI.encode_www_form(query_params) + uri = "#{uri}?#{filter_query}" + end + + headers = {} + Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| headers[k] = v } + headers["If-None-Match"] = @etag unless @etag.nil? + + begin + response = @http_client.request("GET", uri, headers: headers) + status = response.status.code + response_headers = response.headers.to_h.transform_keys(&:downcase) + + if status >= 400 + return LaunchDarkly::Result.fail( + "HTTP error #{status}", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(status), + response_headers + ) + end + + if status == 304 + return LaunchDarkly::Result.success([LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes, response_headers]) + end + + body = response.to_s + data = JSON.parse(body, symbolize_names: true) + etag = response_headers["etag"] + @etag = etag unless etag.nil? + + @config.logger.debug { "[LDClient] #{uri} response status:[#{status}] ETag:[#{etag}]" } + + changeset_result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data) + if changeset_result.success? + LaunchDarkly::Result.success([changeset_result.value, response_headers]) + else + LaunchDarkly::Result.fail(changeset_result.error, changeset_result.exception, response_headers) + end + rescue JSON::ParserError => e + LaunchDarkly::Result.fail("Failed to parse JSON: #{e.message}", e, response_headers) + rescue => e + LaunchDarkly::Result.fail("Network error: #{e.message}", e) + end + end + end + + # + # HTTPFDv1PollingRequester is a Requester that uses HTTP to make + # requests to the FDv1 polling endpoint. + # + class HTTPFDv1PollingRequester + include Requester + + # + # @param sdk_key [String] + # @param config [LaunchDarkly::Config] + # + def initialize(sdk_key, config) + @etag = nil + @config = config + @sdk_key = sdk_key + @poll_uri = config.base_uri + FDV1_POLLING_ENDPOINT + @http_client = Impl::Util.new_http_client(config.base_uri, config) + .use(:auto_inflate) + .headers("Accept-Encoding" => "gzip") + end + + # + # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil] + # @return [Result] + # + def fetch(selector) + query_params = [] + query_params << ["filter", @config.payload_filter_key] unless @config.payload_filter_key.nil? + + uri = @poll_uri + if query_params.any? + filter_query = URI.encode_www_form(query_params) + uri = "#{uri}?#{filter_query}" + end + + headers = {} + Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| headers[k] = v } + headers["If-None-Match"] = @etag unless @etag.nil? + + begin + response = @http_client.request("GET", uri, headers: headers) + status = response.status.code + response_headers = response.headers.to_h.transform_keys(&:downcase) + + if status >= 400 + return LaunchDarkly::Result.fail( + "HTTP error #{status}", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(status), + response_headers + ) + end + + if status == 304 + return LaunchDarkly::Result.success([LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes, response_headers]) + end + + body = response.to_s + data = JSON.parse(body, symbolize_names: true) + etag = response_headers["etag"] + @etag = etag unless etag.nil? + + @config.logger.debug { "[LDClient] #{uri} response status:[#{status}] ETag:[#{etag}]" } + + changeset_result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + if changeset_result.success? + LaunchDarkly::Result.success([changeset_result.value, response_headers]) + else + LaunchDarkly::Result.fail(changeset_result.error, changeset_result.exception, response_headers) + end + rescue JSON::ParserError => e + LaunchDarkly::Result.fail("Failed to parse JSON: #{e.message}", e, response_headers) + rescue => e + LaunchDarkly::Result.fail("Network error: #{e.message}", e) + end + end + end + + # + # Converts a polling payload into a ChangeSet. + # + # @param data [Hash] The polling payload + # @return [LaunchDarkly::Result] Result containing ChangeSet on success, or error message on failure + # + def self.polling_payload_to_changeset(data) + unless data[:events].is_a?(Array) + return LaunchDarkly::Result.fail("Invalid payload: 'events' key is missing or not a list") + end + + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + + data[:events].each do |event| + unless event.is_a?(Hash) + return LaunchDarkly::Result.fail("Invalid payload: 'events' must be a list of objects") + end + + next unless event[:event] + + case event[:event] + when LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT + begin + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.from_h(event[:data]) + rescue ArgumentError => e + return LaunchDarkly::Result.fail("Invalid JSON in server intent", e) + end + + if server_intent.payload.code == LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE + return LaunchDarkly::Result.success(LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes) + end + + builder.start(server_intent.payload.code) + + when LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT + begin + put = LaunchDarkly::Impl::DataSystem::ProtocolV2::PutObject.from_h(event[:data]) + rescue ArgumentError => e + return LaunchDarkly::Result.fail("Invalid JSON in put object", e) + end + + builder.add_put(put.kind, put.key, put.version, put.object) + + when LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT + begin + delete_object = LaunchDarkly::Impl::DataSystem::ProtocolV2::DeleteObject.from_h(event[:data]) + rescue ArgumentError => e + return LaunchDarkly::Result.fail("Invalid JSON in delete object", e) + end + + builder.add_delete(delete_object.kind, delete_object.key, delete_object.version) + + when LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED + begin + selector = LaunchDarkly::Interfaces::DataSystem::Selector.from_h(event[:data]) + changeset = builder.finish(selector) + return LaunchDarkly::Result.success(changeset) + rescue ArgumentError, RuntimeError => e + return LaunchDarkly::Result.fail("Invalid JSON in payload transferred object", e) + end + end + end + + LaunchDarkly::Result.fail("didn't receive any known protocol events in polling payload") + end + + # + # Converts an FDv1 polling payload into a ChangeSet. + # + # @param data [Hash] The FDv1 polling payload + # @return [LaunchDarkly::Result] Result containing ChangeSet on success, or error message on failure + # + def self.fdv1_polling_payload_to_changeset(data) + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + + kind_mappings = [ + [LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, :flags], + [LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT, :segments], + ] + + kind_mappings.each do |kind, fdv1_key| + kind_data = data[fdv1_key] + next if kind_data.nil? + + unless kind_data.is_a?(Hash) + return LaunchDarkly::Result.fail("Invalid format: #{fdv1_key} is not an object") + end + + kind_data.each do |key, flag_or_segment| + unless flag_or_segment.is_a?(Hash) + return LaunchDarkly::Result.fail("Invalid format: #{key} is not an object") + end + + version = flag_or_segment[:version] + return LaunchDarkly::Result.fail("Invalid format: #{key} does not have a version set") if version.nil? + + builder.add_put(kind, key.to_s, version, flag_or_segment) + end + end + + LaunchDarkly::Result.success(builder.finish(selector)) + end + + # + # Builder for a PollingDataSource. + # + class PollingDataSourceBuilder + # + # @param sdk_key [String] + # @param config [LaunchDarkly::Config] + # + def initialize(sdk_key, config) + @sdk_key = sdk_key + @config = config + @requester = nil + end + + # + # Sets a custom Requester for the PollingDataSource. + # + # @param requester [Requester] + # @return [PollingDataSourceBuilder] + # + def requester(requester) + @requester = requester + self + end + + # + # Builds the PollingDataSource with the configured parameters. + # + # @return [PollingDataSource] + # + def build + requester = @requester || HTTPPollingRequester.new(@sdk_key, @config) + PollingDataSource.new(@config.poll_interval, requester, @config.logger) + end + end + + # + # Builder for an FDv1 PollingDataSource. + # + class FDv1PollingDataSourceBuilder + # + # @param sdk_key [String] + # @param config [LaunchDarkly::Config] + # + def initialize(sdk_key, config) + @sdk_key = sdk_key + @config = config + @requester = nil + end + + # + # Sets a custom Requester for the PollingDataSource. + # + # @param requester [Requester] + # @return [FDv1PollingDataSourceBuilder] + # + def requester(requester) + @requester = requester + self + end + + # + # Builds the PollingDataSource with the configured parameters. + # + # @return [PollingDataSource] + # + def build + requester = @requester || HTTPFDv1PollingRequester.new(@sdk_key, @config) + PollingDataSource.new(@config.poll_interval, requester, @config.logger) + end + end + end + end +end diff --git a/lib/ldclient-rb/interfaces/data_system.rb b/lib/ldclient-rb/interfaces/data_system.rb index 00fdcc23..00ccba25 100644 --- a/lib/ldclient-rb/interfaces/data_system.rb +++ b/lib/ldclient-rb/interfaces/data_system.rb @@ -464,7 +464,7 @@ def initialize def self.no_changes ChangeSet.new( intent_code: IntentCode::TRANSFER_NONE, - selector: nil, + selector: Selector.no_selector, changes: [] ) end diff --git a/lib/ldclient-rb/util.rb b/lib/ldclient-rb/util.rb index 242e5447..206e4788 100644 --- a/lib/ldclient-rb/util.rb +++ b/lib/ldclient-rb/util.rb @@ -27,10 +27,11 @@ def self.success(value) # # @param error [String] # @param exception [Exception, nil] + # @param headers [Hash, nil] # @return [Result] # - def self.fail(error, exception = nil) - Result.new(nil, error, exception) + def self.fail(error, exception = nil, headers = nil) + Result.new(nil, error, exception, headers) end # @@ -57,10 +58,16 @@ def success? # attr_reader :exception - private def initialize(value, error = nil, exception = nil) + # + # @return [Hash] Optional headers associated with the result + # + attr_reader :headers + + private def initialize(value, error = nil, exception = nil, headers = nil) @value = value @error = error @exception = exception + @headers = headers || {} end end end diff --git a/spec/impl/data_system/polling_initializer_spec.rb b/spec/impl/data_system/polling_initializer_spec.rb new file mode 100644 index 00000000..c94d9fdb --- /dev/null +++ b/spec/impl/data_system/polling_initializer_spec.rb @@ -0,0 +1,157 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ldclient-rb/impl/data_system/polling" +require "ldclient-rb/interfaces" + +module LaunchDarkly + module Impl + module DataSystem + RSpec.describe PollingDataSource do + let(:logger) { double("Logger", info: nil, warn: nil, error: nil, debug: nil) } + + class MockExceptionThrowingPollingRequester + include Requester + + def fetch(selector) + raise "This is a mock exception for testing purposes." + end + end + + class MockPollingRequester + include Requester + + def initialize(result) + @result = result + end + + def fetch(selector) + @result + end + end + + class MockSelectorStore + include LaunchDarkly::Interfaces::DataSystem::SelectorStore + + def initialize(selector) + @selector = selector + end + + def selector + @selector + end + end + + describe "#fetch" do + it "polling has a name" do + mock_requester = MockPollingRequester.new(LaunchDarkly::Result.fail("failure message")) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + expect(ds.name).to eq("PollingDataSourceV2") + end + + it "error is returned on failure" do + mock_requester = MockPollingRequester.new(LaunchDarkly::Result.fail("failure message")) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(false) + expect(result.error).to eq("failure message") + end + + it "error is recoverable" do + mock_requester = MockPollingRequester.new( + LaunchDarkly::Result.fail( + "failure message", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(408) + ) + ) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(false) + end + + it "error is unrecoverable" do + mock_requester = MockPollingRequester.new( + LaunchDarkly::Result.fail( + "failure message", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(401) + ) + ) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(false) + end + + it "handles transfer none" do + mock_requester = MockPollingRequester.new( + LaunchDarkly::Result.success([LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes, {}]) + ) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(true) + expect(result.value.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE) + expect(result.value.change_set.changes).to eq([]) + expect(result.value.persist).to eq(false) + end + + it "handles uncaught exception" do + mock_requester = MockExceptionThrowingPollingRequester.new + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(false) + expect(result.error).to include("Exception encountered when updating flags") + end + + it "handles transfer full" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + change_set_result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(change_set_result.success?).to eq(true) + + mock_requester = MockPollingRequester.new(LaunchDarkly::Result.success([change_set_result.value, {}])) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(true) + expect(result.value.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.change_set.changes.length).to eq(1) + expect(result.value.persist).to eq(true) + end + + it "handles transfer changes" do + payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"xfer-changes","reason":"stale"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":462,"object":{"key":"sample-feature","on":true,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":113,"deleted":false}}},{"event": "payload-transferred","data": {"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:462)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":462}}]}' # rubocop:disable Layout/LineLength + change_set_result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(change_set_result.success?).to eq(true) + + mock_requester = MockPollingRequester.new(LaunchDarkly::Result.success([change_set_result.value, {}])) + ds = PollingDataSource.new(1.0, mock_requester, logger) + + result = ds.fetch(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) + + expect(result).not_to be_nil + expect(result.success?).to eq(true) + expect(result.value.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES) + expect(result.value.change_set.changes.length).to eq(1) + expect(result.value.persist).to eq(true) + end + end + end + end + end +end diff --git a/spec/impl/data_system/polling_payload_parsing_spec.rb b/spec/impl/data_system/polling_payload_parsing_spec.rb new file mode 100644 index 00000000..93688f5b --- /dev/null +++ b/spec/impl/data_system/polling_payload_parsing_spec.rb @@ -0,0 +1,318 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ldclient-rb/impl/data_system/polling" +require "ldclient-rb/interfaces" +require "json" + +module LaunchDarkly + module Impl + module DataSystem + RSpec.describe ".polling_payload_to_changeset" do + it "payload is missing events key" do + data = {} + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "payload events value is invalid" do + data = { events: "not a list" } + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "payload event is invalid" do + data = { events: ["this should be a dictionary"] } + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "missing protocol events" do + data = { events: [] } + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "transfer none" do + payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"none","reason":"up-to-date"}]}}]}' + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + + expect(result).not_to be_nil + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE) + expect(result.value.changes.length).to eq(0) + expect(result.value.selector).not_to be_nil + expect(result.value.selector.defined?).to eq(false) + end + + it "transfer full with empty payload" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + + expect(result).not_to be_nil + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.changes.length).to eq(0) + expect(result.value.selector).not_to be_nil + expect(result.value.selector.state).to eq("(p:5A46PZ79FQ9D08YYKT79DECDNV:461)") + expect(result.value.selector.version).to eq(461) + end + + it "server intent decoding fails" do + payload_str = '{"events":[ {"event":"server-intent","data":{}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + + it "processes put object" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result).not_to be_nil + + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.changes.length).to eq(1) + + expect(result.value.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT) + expect(result.value.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(result.value.changes[0].key).to eq("sample-feature") + expect(result.value.changes[0].version).to eq(461) + expect(result.value.changes[0].object).to be_a(Hash) + + expect(result.value.selector).not_to be_nil + expect(result.value.selector.state).to eq("(p:5A46PZ79FQ9D08YYKT79DECDNV:461)") + expect(result.value.selector.version).to eq(461) + end + + it "processes delete object" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "delete-object","data": {"key":"sample-feature","kind":"flag","version":461}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result).not_to be_nil + + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.changes.length).to eq(1) + + expect(result.value.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + expect(result.value.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(result.value.changes[0].key).to eq("sample-feature") + expect(result.value.changes[0].version).to eq(461) + expect(result.value.changes[0].object).to be_nil + + expect(result.value.selector).not_to be_nil + expect(result.value.selector.state).to eq("(p:5A46PZ79FQ9D08YYKT79DECDNV:461)") + expect(result.value.selector.version).to eq(461) + end + + it "handles invalid put object" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + + it "handles invalid delete object" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "delete-object","data": {}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + + it "handles invalid payload transferred" do + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event":"payload-transferred","data":{}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + + it "fails if starts with transferred" do + payload_str = '{"events":[ {"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}},{"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + + it "fails if starts with put" do + payload_str = '{"events":[ {"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}},{"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}}]}' # rubocop:disable Layout/LineLength + result = LaunchDarkly::Impl::DataSystem.polling_payload_to_changeset(JSON.parse(payload_str, symbolize_names: true)) + expect(result.success?).to eq(false) + end + end + + RSpec.describe ".fdv1_polling_payload_to_changeset" do + it "handles empty flags and segments" do + data = { + flags: {}, + segments: {}, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.changes.length).to eq(0) + expect(result.value.selector).not_to be_nil + expect(result.value.selector.defined?).to eq(false) + end + + it "handles single flag" do + data = { + flags: { + "test-flag" => { + key: "test-flag", + version: 1, + on: true, + variations: [true, false], + }, + }, + segments: {}, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(result.value.changes.length).to eq(1) + + change = result.value.changes[0] + expect(change.action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT) + expect(change.kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(change.key).to eq("test-flag") + expect(change.version).to eq(1) + end + + it "handles multiple flags" do + data = { + flags: { + "flag-1" => { key: "flag-1", version: 1, on: true }, + "flag-2" => { key: "flag-2", version: 2, on: false }, + "flag-3" => { key: "flag-3", version: 3, on: true }, + }, + segments: {}, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.changes.length).to eq(3) + flag_keys = result.value.changes.map(&:key).to_set + expect(flag_keys).to eq(Set["flag-1", "flag-2", "flag-3"]) + end + + it "handles single segment" do + data = { + flags: {}, + segments: { + "test-segment" => { + key: "test-segment", + version: 5, + included: ["user1", "user2"], + }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.changes.length).to eq(1) + change = result.value.changes[0] + expect(change.action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT) + expect(change.kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT) + expect(change.key).to eq("test-segment") + expect(change.version).to eq(5) + end + + it "handles flags and segments" do + data = { + flags: { + "flag-1" => { key: "flag-1", version: 1, on: true }, + "flag-2" => { key: "flag-2", version: 2, on: false }, + }, + segments: { + "segment-1" => { key: "segment-1", version: 10 }, + "segment-2" => { key: "segment-2", version: 20 }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.changes.length).to eq(4) + + flag_changes = result.value.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG } + segment_changes = result.value.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT } + + expect(flag_changes.length).to eq(2) + expect(segment_changes.length).to eq(2) + end + + it "fails when flags is not dict" do + data = { + flags: "not a dict", + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "fails when segments is not dict" do + data = { + flags: {}, + segments: "not a dict", + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "fails when flag value is not dict" do + data = { + flags: { + "bad-flag" => "not a dict", + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "fails when flag missing version" do + data = { + flags: { + "no-version-flag" => { + key: "no-version-flag", + on: true, + }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "fails when segment missing version" do + data = { + flags: {}, + segments: { + "no-version-segment" => { + key: "no-version-segment", + included: [], + }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result.success?).to eq(false) + end + + it "works with only flags, no segments key" do + data = { + flags: { + "test-flag" => { key: "test-flag", version: 1, on: true }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.changes.length).to eq(1) + expect(result.value.changes[0].key).to eq("test-flag") + end + + it "works with only segments, no flags key" do + data = { + segments: { + "test-segment" => { key: "test-segment", version: 1 }, + }, + } + result = LaunchDarkly::Impl::DataSystem.fdv1_polling_payload_to_changeset(data) + expect(result).not_to be_nil + + expect(result.value.changes.length).to eq(1) + expect(result.value.changes[0].key).to eq("test-segment") + end + end + end + end +end diff --git a/spec/impl/data_system/polling_synchronizer_spec.rb b/spec/impl/data_system/polling_synchronizer_spec.rb new file mode 100644 index 00000000..dcf551d6 --- /dev/null +++ b/spec/impl/data_system/polling_synchronizer_spec.rb @@ -0,0 +1,668 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ldclient-rb/impl/data_system/polling" +require "ldclient-rb/interfaces" + +module LaunchDarkly + module Impl + module DataSystem + RSpec.describe PollingDataSource do + let(:logger) { double("Logger", info: nil, warn: nil, error: nil, debug: nil) } + + class ListBasedRequester + include Requester + + def initialize(results) + @results = results + @index = 0 + end + + def fetch(selector) + @results[@index].tap { @index += 1 } + end + end + + class MockSelectorStore + include LaunchDarkly::Interfaces::DataSystem::SelectorStore + + def initialize(selector) + @selector = selector + end + + def selector + @selector + end + end + + describe "#sync" do + it "handles no changes" do + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to be_nil + expect(valid.change_set).not_to be_nil + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE) + expect(valid.change_set.changes.length).to eq(0) + end + + it "handles empty changeset" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to be_nil + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(0) + expect(valid.change_set.selector).not_to be_nil + expect(valid.change_set.selector.version).to eq(300) + expect(valid.change_set.selector.state).to eq("p:SOMETHING:300") + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + end + + it "handles put objects" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_put( + LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + "flag-key", + 100, + { key: "flag-key" } + ) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to be_nil + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(1) + expect(valid.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT) + expect(valid.change_set.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(valid.change_set.changes[0].key).to eq("flag-key") + expect(valid.change_set.changes[0].object).to eq({ key: "flag-key" }) + expect(valid.change_set.changes[0].version).to eq(100) + expect(valid.change_set.selector).not_to be_nil + expect(valid.change_set.selector.version).to eq(300) + expect(valid.change_set.selector.state).to eq("p:SOMETHING:300") + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + end + + it "handles delete objects" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_delete(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, "flag-key", 101) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to be_nil + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(1) + expect(valid.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + expect(valid.change_set.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(valid.change_set.changes[0].key).to eq("flag-key") + expect(valid.change_set.changes[0].version).to eq(101) + expect(valid.change_set.selector).not_to be_nil + expect(valid.change_set.selector.version).to eq(300) + expect(valid.change_set.selector.state).to eq("p:SOMETHING:300") + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + end + + it "generic error interrupts and recovers" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_delete(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, "flag-key", 101) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([ + LaunchDarkly::Result.fail("error for test"), + polling_result, + ]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break if updates.length >= 2 + end + end + + thread.join(2) + synchronizer.stop + + expect(updates.length).to eq(2) + interrupted = updates[0] + valid = updates[1] + + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.error).not_to be_nil + expect(interrupted.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR) + expect(interrupted.error.status_code).to eq(0) + expect(interrupted.error.message).to eq("error for test") + expect(interrupted.revert_to_fdv1).to eq(false) + expect(interrupted.environment_id).to be_nil + + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(1) + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(valid.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + end + + it "recoverable error continues" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_delete(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, "flag-key", 101) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = {} + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + failure = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(408) + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure, polling_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break if updates.length >= 2 + end + end + + thread.join(2) + synchronizer.stop + + expect(updates.length).to eq(2) + interrupted = updates[0] + valid = updates[1] + + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.error).not_to be_nil + expect(interrupted.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE) + expect(interrupted.error.status_code).to eq(408) + expect(interrupted.revert_to_fdv1).to eq(false) + expect(interrupted.environment_id).to be_nil + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to be_nil + + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(1) + expect(valid.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + expect(valid.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + end + + it "unrecoverable error shuts down" do + failure = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(401) + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + off = updates[0] + expect(off.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::OFF) + expect(off.error).not_to be_nil + expect(off.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE) + expect(off.error.status_code).to eq(401) + expect(off.revert_to_fdv1).to eq(false) + expect(off.environment_id).to be_nil + expect(off.change_set).to be_nil + end + + it "captures envid from success headers" do + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.no_changes + headers = { LD_ENVID_HEADER => 'test-env-polling-123' } + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.error).to be_nil + expect(valid.revert_to_fdv1).to eq(false) + expect(valid.environment_id).to eq('test-env-polling-123') + end + + it "captures envid and fallback from success with changeset" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_put( + LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + "flag-key", + 100, + { key: "flag-key" } + ) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers = { + LD_ENVID_HEADER => 'test-env-456', + LD_FD_FALLBACK_HEADER => 'true', + } + polling_result = LaunchDarkly::Result.success([change_set, headers]) + + synchronizer = PollingDataSource.new(0.01, ListBasedRequester.new([polling_result]), logger) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + valid = updates[0] + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.environment_id).to eq('test-env-456') + expect(valid.revert_to_fdv1).to eq(true) + expect(valid.change_set).not_to be_nil + expect(valid.change_set.changes.length).to eq(1) + end + + it "captures envid from error headers recoverable" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + builder.add_delete(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, "flag-key", 101) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers_success = { LD_ENVID_HEADER => 'test-env-success' } + polling_result = LaunchDarkly::Result.success([change_set, headers_success]) + + headers_error = { LD_ENVID_HEADER => 'test-env-408' } + failure = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(408), + headers_error + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure, polling_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break if updates.length >= 2 + end + end + + thread.join(2) + synchronizer.stop + + expect(updates.length).to eq(2) + interrupted = updates[0] + valid = updates[1] + + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.environment_id).to eq('test-env-408') + expect(interrupted.error).not_to be_nil + expect(interrupted.error.status_code).to eq(408) + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.environment_id).to eq('test-env-success') + end + + it "captures envid from error headers unrecoverable" do + headers_error = { LD_ENVID_HEADER => 'test-env-401' } + failure = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(401), + headers_error + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + off = updates[0] + + expect(off.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::OFF) + expect(off.environment_id).to eq('test-env-401') + expect(off.error).not_to be_nil + expect(off.error.status_code).to eq(401) + end + + it "captures envid and fallback from error with fallback" do + headers_error = { + LD_ENVID_HEADER => 'test-env-503', + LD_FD_FALLBACK_HEADER => 'true', + } + failure = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(503), + headers_error + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + interrupted = updates[0] + + # 503 is recoverable, so status is INTERRUPTED with fallback flag + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.revert_to_fdv1).to eq(true) + expect(interrupted.environment_id).to eq('test-env-503') + end + + it "captures envid from generic error with headers" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + headers_success = {} + polling_result = LaunchDarkly::Result.success([change_set, headers_success]) + + headers_error = { LD_ENVID_HEADER => 'test-env-generic' } + failure = LaunchDarkly::Result.fail("generic error for test", nil, headers_error) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([failure, polling_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break if updates.length >= 2 + end + end + + thread.join(2) + synchronizer.stop + + expect(updates.length).to eq(2) + interrupted = updates[0] + valid = updates[1] + + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.environment_id).to eq('test-env-generic') + expect(interrupted.error).not_to be_nil + expect(interrupted.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR) + + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + end + + it "preserves fallback header on JSON parse error" do + headers_with_fallback = { + LD_ENVID_HEADER => 'test-env-parse-error', + LD_FD_FALLBACK_HEADER => 'true', + } + # Simulate a JSON parse error with fallback header + parse_error_result = LaunchDarkly::Result.fail( + "Failed to parse JSON: unexpected token", + JSON::ParserError.new("unexpected token"), + headers_with_fallback + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([parse_error_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break # Break after first update to avoid polling again + end + end + + thread.join(1) + synchronizer.stop + + expect(updates.length).to eq(1) + interrupted = updates[0] + + # Verify the update signals INTERRUPTED state with fallback flag + # Caller (FDv2) will handle shutdown based on revert_to_fdv1 flag + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.revert_to_fdv1).to eq(true) + expect(interrupted.environment_id).to eq('test-env-parse-error') + expect(interrupted.error).not_to be_nil + expect(interrupted.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR) + end + + it "signals fallback on recoverable HTTP error with fallback header" do + headers_with_fallback = { + LD_ENVID_HEADER => 'test-env-408', + LD_FD_FALLBACK_HEADER => 'true', + } + # 408 is a recoverable error + error_result = LaunchDarkly::Result.fail( + "error for test", + LaunchDarkly::Impl::DataSource::UnexpectedResponseError.new(408), + headers_with_fallback + ) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([error_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + break # Break after first update to avoid polling again + end + end + + sleep 0.1 # Give thread time to process first update + synchronizer.stop + thread.join(1) + + expect(updates.length).to eq(1) + interrupted = updates[0] + + # Should be INTERRUPTED (recoverable) with fallback flag set + # Caller will handle shutdown based on revert_to_fdv1 flag + expect(interrupted.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(interrupted.revert_to_fdv1).to eq(true) + expect(interrupted.environment_id).to eq('test-env-408') + expect(interrupted.error).not_to be_nil + expect(interrupted.error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE) + expect(interrupted.error.status_code).to eq(408) + end + + it "uses data but signals fallback on successful response with fallback header" do + builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new + builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300)) + + headers_with_fallback = { + LD_ENVID_HEADER => 'test-env-success-fallback', + LD_FD_FALLBACK_HEADER => 'true', + } + + # Server sends successful response with valid data but also signals fallback + success_result = LaunchDarkly::Result.success([change_set, headers_with_fallback]) + + synchronizer = PollingDataSource.new( + 0.01, + ListBasedRequester.new([success_result]), + logger + ) + updates = [] + + thread = Thread.new do + synchronizer.sync(MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)) do |update| + updates << update + end + end + + sleep 0.1 # Give thread time to process first update + synchronizer.stop + thread.join(1) + + expect(updates.length).to eq(1) + valid = updates[0] + + # Should use the data (VALID state) but signal future fallback + expect(valid.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(valid.revert_to_fdv1).to eq(true) + expect(valid.environment_id).to eq('test-env-success-fallback') + expect(valid.error).to be_nil + expect(valid.change_set).not_to be_nil # Data is provided + end + end + end + end + end +end