diff --git a/app/controllers/api/hackatime/v1/hackatime_controller.rb b/app/controllers/api/hackatime/v1/hackatime_controller.rb
index 41bb9da7b..a8c6480ed 100644
--- a/app/controllers/api/hackatime/v1/hackatime_controller.rb
+++ b/app/controllers/api/hackatime/v1/hackatime_controller.rb
@@ -235,104 +235,26 @@ def body_to_json
     end || {}
   end
 
-  LAST_LANGUAGE_SENTINEL = "<>"
-
   def handle_heartbeat(heartbeat_array)
-    results = []
-    last_language = nil
-    heartbeat_array.each do |heartbeat|
-      heartbeat = heartbeat.to_h.with_indifferent_access
-      source_type = :direct_entry
-
-      # Resolve <> sentinel to the most recently used language.
-      # Check within the current batch first, then fall back to the DB.
-      if heartbeat[:language] == LAST_LANGUAGE_SENTINEL
-        heartbeat[:language] = last_language || @user.heartbeats
-          .where.not(language: [ nil, "", LAST_LANGUAGE_SENTINEL ])
-          .order(time: :desc)
-          .pick(:language)
-      end
-
-      # Infer language from file extension when client sends blank or Unknown
-      if heartbeat[:language].blank? || heartbeat[:language] == "Unknown"
-        inferred = LanguageUtils.detect_from_extension(heartbeat[:entity])
-        heartbeat[:language] = inferred if inferred
-      end
-
-      # Track the last known language for subsequent heartbeats in this batch.
-      last_language = heartbeat[:language] if heartbeat[:language].present?
-
-      # Fallback to :plugin if :user_agent is not set
-      fallback_value = heartbeat.delete(:plugin)
-      heartbeat[:user_agent] ||= fallback_value
-
-      parsed_ua = WakatimeService.parse_user_agent(heartbeat[:user_agent])
-
-      # if category is not set, just default to coding
-      heartbeat[:category] ||= "coding"
-
-      # special case: if the entity is "test.txt", this is a test heartbeat
-      if heartbeat[:entity] == "test.txt"
-        source_type = :test_entry
-      end
-
-      heartbeat[:project] = heartbeat[:project]&.gsub(/[[:cntrl:]]/, "")&.strip
-
-      attrs = heartbeat.merge({
-        user_id: @user.id,
-        source_type: source_type,
+    result = HeartbeatIngest.call(
+      user: @user,
+      mode: :direct,
+      heartbeats: heartbeat_array,
+      request_context: {
         ip_address: request.headers["CF-Connecting-IP"] || request.remote_ip,
-        editor: parsed_ua[:editor],
-        operating_system: parsed_ua[:os],
         machine: request.headers["X-Machine-Name"]
-      }).slice(*Heartbeat.column_names.map(&:to_sym))
-      # ^^ They say safety laws are written in blood. Well, so is this line!
-      # Basically this filters out columns that aren't in our DB (the biggest one being raw_data)
-      temp = Heartbeat.new(attrs)
-      fields_hash = Heartbeat.generate_fields_hash(temp.attributes)
-      new_heartbeat = @user.heartbeats.find_by(fields_hash: fields_hash)
-
-      unless new_heartbeat
-        now = Time.current
-        insert_attrs = attrs.merge(
-          fields_hash: fields_hash,
-          created_at: now,
-          updated_at: now
-        )
-        result = Heartbeat.insert(
-          insert_attrs,
-          unique_by: :fields_hash,
-          returning: Heartbeat.column_names
-        )
-
-        if result.any?
-          new_heartbeat = Heartbeat.new(result.first)
-          # This uses insert for deduplication, so model validations/callbacks are skipped.
-          # Keep manual fields_hash and rollup scheduling in sync with Heartbeat callbacks.
-          DashboardRollupRefreshJob.schedule_for(@user.id)
-        else
-          new_heartbeat = @user.heartbeats.find_by!(fields_hash: fields_hash)
-        end
+      }
+    )
+
+    result.items.map do |item|
+      if item.status == :accepted
+        [ item.heartbeat.attributes, 201 ]
+      else
+        error = item.error
+        report_error(error, message: "Error creating heartbeat: #{error.class}: #{error.message}", extra: { backtrace: error.backtrace&.first(20) })
+        [ { error: error.message, type: error.class.name }, 422 ]
       end
-
-      queue_project_mapping(heartbeat[:project])
-      results << [ new_heartbeat.attributes, 201 ]
-    rescue => e
-      report_error(e, message: "Error creating heartbeat: #{e.class}: #{e.message}", extra: { backtrace: e.backtrace&.first(20) })
-      results << [ { error: e.message, type: e.class.name }, 422 ]
-    end
-
-    results
-  end
-
-  def queue_project_mapping(project_name)
-    # only queue the job once per hour
-    Rails.cache.fetch("attempt_project_repo_mapping_job_#{@user.id}_#{project_name}", expires_in: 1.hour) do
-      AttemptProjectRepoMappingJob.perform_later(@user.id, project_name)
     end
-  rescue => e
-    # never raise an error here because it will break the heartbeat flow
-    report_error(e, message: "Error queuing project mapping")
   end
 
   def check_lockout
diff --git a/app/services/heartbeat_import_runner.rb b/app/services/heartbeat_import_runner.rb
index a29131a62..c295c96b7 100644
--- a/app/services/heartbeat_import_runner.rb
+++ b/app/services/heartbeat_import_runner.rb
@@ -290,7 +290,6 @@ def self.complete_run!(run, result:)
       finished_at: Time.current
     )
 
-    DashboardRollupRefreshJob.schedule_for(run.user_id)
     run.clear_sensitive_fields!
     send_completion_email(run)
   end
diff --git a/app/services/heartbeat_import_service.rb b/app/services/heartbeat_import_service.rb
index 8aafda11f..f67012a9a 100644
--- a/app/services/heartbeat_import_service.rb
+++ b/app/services/heartbeat_import_service.rb
@@ -3,52 +3,24 @@ class HeartbeatImportService
   def self.import_from_file(file_content, user, on_progress: nil, progress_interval: 250, user_agents_by_id: {})
     start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
 
-    user_id = user.id
     imported_count = 0
     total_count = 0
     errors = []
-    seen_hashes = {}
+    heartbeat_batch = {}
 
     handler = HeartbeatSaxHandler.new do |hb|
       total_count += 1
       on_progress&.call(total_count) if progress_interval.positive? && (total_count % progress_interval).zero?
 
       begin
-        time_value = hb["time"].is_a?(String) ? Time.parse(hb["time"]).to_f : hb["time"].to_f
-        normalized = normalize_imported_heartbeat(hb, user_agents_by_id:)
-
-        attrs = {
-          user_id: user_id,
-          time: time_value,
-          entity: hb["entity"],
-          type: hb["type"],
-          category: hb["category"] || "coding",
-          project: hb["project"],
-          language: normalized[:language],
-          editor: normalized[:editor],
-          operating_system: normalized[:operating_system],
-          machine: normalized[:machine],
-          branch: hb["branch"],
-          user_agent: normalized[:user_agent],
-          is_write: hb["is_write"] || false,
-          line_additions: hb["line_additions"],
-          line_deletions: hb["line_deletions"],
-          lineno: hb["lineno"],
-          lines: hb["lines"],
-          cursorpos: hb["cursorpos"],
-          dependencies: hb["dependencies"] || [],
-          project_root_count: hb["project_root_count"],
-          source_type: Heartbeat.source_types.fetch("wakapi_import")
-        }
-
-        attrs[:fields_hash] = Heartbeat.generate_fields_hash(attrs)
-
-        existing = seen_hashes[attrs[:fields_hash]]
-        seen_hashes[attrs[:fields_hash]] = attrs if existing.nil? || attrs[:time] > existing[:time]
-
-        if seen_hashes.size >= BATCH_SIZE
-          imported_count += flush_batch(seen_hashes)
-          seen_hashes.clear
+        attrs = HeartbeatIngest.normalize_imported_heartbeat(user:, heartbeat: hb, user_agents_by_id:)
+        heartbeat_batch[attrs[:fields_hash]] = hb
+
+        if heartbeat_batch.size >= BATCH_SIZE
+          result = HeartbeatIngest.call(user:, mode: :import, heartbeats: heartbeat_batch.values, user_agents_by_id:, schedule_rollup_refresh: false)
+          imported_count += result.persisted_count
+          errors.concat(result.errors)
+          heartbeat_batch.clear
         end
       rescue => e
         errors << { heartbeat: hb, error: e.message }
@@ -61,7 +33,12 @@ def self.import_from_file(file_content, user, on_progress: nil, progress_interva
     if total_count.zero?
       raise StandardError, "Expected a heartbeat export JSON file."
     end
-    imported_count += flush_batch(seen_hashes) if seen_hashes.any?
+    if heartbeat_batch.any?
+      result = HeartbeatIngest.call(user:, mode: :import, heartbeats: heartbeat_batch.values, user_agents_by_id:, schedule_rollup_refresh: false)
+      imported_count += result.persisted_count
+      errors.concat(result.errors)
+    end
+    HeartbeatIngest.schedule_rollup_refresh(user:) if imported_count.positive?
 
     elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
 
@@ -98,49 +75,6 @@ def self.count_heartbeats(file_content)
     total_count
   end
 
-  def self.normalize_imported_heartbeat(heartbeat, user_agents_by_id: {})
-    hb = heartbeat.respond_to?(:with_indifferent_access) ? heartbeat.with_indifferent_access : heartbeat.to_h.with_indifferent_access
-    user_agent_id = hb[:user_agent_id].to_s
-    user_agent_info = (user_agents_by_id[user_agent_id] || {}).with_indifferent_access
-    resolved_user_agent = hb[:user_agent].presence || user_agent_info[:value].presence || hb[:user_agent_id].presence
-    parsed_user_agent = parse_user_agent(resolved_user_agent)
-
-    {
-      language: LanguageUtils.fill_missing_language(hb[:language], entity: hb[:entity]),
-      editor: hb[:editor].presence || user_agent_info[:editor].presence || parsed_user_agent[:editor].presence,
-      operating_system: hb[:operating_system].presence || user_agent_info[:os].presence || parsed_user_agent[:os].presence,
-      machine: hb[:machine].presence || hb[:machine_name_id].presence,
-      user_agent: resolved_user_agent
-    }
-  end
-
-  def self.flush_batch(seen_hashes)
-    return 0 if seen_hashes.empty?
-
-    records = seen_hashes.values
-    records.each do |r|
-      timestamp = Time.current
-      r[:created_at] = timestamp
-      r[:updated_at] = timestamp
-    end
-
-    ActiveRecord::Base.logger.silence do
-      result = Heartbeat.upsert_all(records, unique_by: [ :fields_hash ])
-      result.length
-    end
-  end
-
-  def self.parse_user_agent(user_agent)
-    return { editor: nil, os: nil } if user_agent.blank?
-
-    parsed = WakatimeService.parse_user_agent(user_agent)
-    {
-      editor: parsed[:editor].presence,
-      os: parsed[:os].presence
-    }
-  end
-  private_class_method :parse_user_agent
-
   class HeartbeatSaxHandler < Oj::Saj
     def initialize(&block)
       @block = block
diff --git a/app/services/heartbeat_ingest.rb b/app/services/heartbeat_ingest.rb
new file mode 100644
index 000000000..2ab672464
--- /dev/null
+++ b/app/services/heartbeat_ingest.rb
@@ -0,0 +1,274 @@
+# Shared pipeline that turns raw heartbeat payloads into Heartbeat rows.
+#
+# Modes:
+#   :direct - heartbeats posted one request at a time; each heartbeat is
+#             normalized and inserted individually and reported as an Item.
+#   :import - heartbeats read from an export; the batch is de-duplicated by
+#             fields_hash and bulk-inserted, and Result#items stays empty.
+class HeartbeatIngest
+  # Language value meaning "reuse the most recently seen language".
+  # NOTE(review): confirm this literal matches what clients actually send.
+  LAST_LANGUAGE_SENTINEL = "<>"
+
+  # Outcome of one ingest run. `errors` holds hashes with :heartbeat, :error
+  # and :type keys.
+  Result = Data.define(:total_count, :persisted_count, :duplicate_count, :failed_count, :errors, :items)
+  # Outcome of one heartbeat in :direct mode. `status` is :accepted or :failed;
+  # `heartbeat` is nil on failure and `error` is nil on success.
+  Item = Data.define(:heartbeat, :status, :error)
+
+  # Builds an instance from the given arguments and runs it; returns a Result.
+  def self.call(...)
+    new(...).call
+  end
+
+  # Schedules a dashboard rollup refresh for the user.
+  def self.schedule_rollup_refresh(user:)
+    DashboardRollupRefreshJob.schedule_for(user.id)
+  end
+
+  # Returns the attribute hash (including :fields_hash) an imported heartbeat
+  # would be stored with, without persisting anything.
+  def self.normalize_imported_heartbeat(user:, heartbeat:, user_agents_by_id: {})
+    new(user:, mode: :import, heartbeats: [], user_agents_by_id:).send(:normalize_imported_heartbeat, heartbeat)
+  end
+
+  # user                    - owner of the heartbeats.
+  # mode                    - :direct or :import.
+  # heartbeats              - enumerable of hash-like heartbeat payloads.
+  # request_context         - :ip_address / :machine applied to :direct heartbeats.
+  # user_agents_by_id       - lookup used to resolve user_agent_id on :import heartbeats.
+  # schedule_rollup_refresh - pass false to suppress the rollup refresh job.
+  def initialize(user:, mode:, heartbeats:, request_context: {}, user_agents_by_id: {}, schedule_rollup_refresh: true)
+    @user = user
+    @mode = mode
+    @heartbeats = heartbeats
+    @request_context = request_context.with_indifferent_access
+    @user_agents_by_id = user_agents_by_id
+    @schedule_rollup_refresh = schedule_rollup_refresh
+  end
+
+  # Runs the ingest for the configured mode and returns a Result.
+  # Raises ArgumentError for an unknown mode.
+  def call
+    case @mode
+    when :direct
+      ingest_direct
+    when :import
+      ingest_import
+    else
+      raise ArgumentError, "Unsupported heartbeat ingest mode: #{@mode.inspect}"
+    end
+  end
+
+  private
+
+  # Normalizes and persists each heartbeat on its own. A failure is captured as
+  # a :failed Item (and an errors entry) and does not stop the rest of the batch.
+  def ingest_direct
+    items = []
+    persisted_count = 0
+    duplicate_count = 0
+    errors = []
+    last_language = nil
+
+    @heartbeats.each do |heartbeat|
+      attrs = normalize_direct_heartbeat(heartbeat, last_language:)
+      # Record the language before persisting so a heartbeat that fails to save
+      # still feeds sentinel resolution for the heartbeats that follow it.
+      last_language = attrs[:language] if attrs[:language].present?
+
+      persisted, duplicate = persist_direct_heartbeat(attrs)
+      if duplicate
+        duplicate_count += 1
+      else
+        persisted_count += 1
+      end
+      items << Item.new(heartbeat: persisted, status: :accepted, error: nil)
+      queue_project_mapping(attrs[:project])
+    rescue => e
+      errors << { heartbeat: heartbeat, error: e.message, type: e.class.name }
+      items << Item.new(heartbeat: nil, status: :failed, error: e)
+    end
+
+    Result.new(
+      total_count: @heartbeats.length,
+      persisted_count: persisted_count,
+      duplicate_count: duplicate_count,
+      failed_count: errors.length,
+      errors: errors,
+      items: items
+    )
+  end
+
+  # Converts a raw plugin payload into Heartbeat column attributes.
+  # `last_language` is the language of the previous heartbeat in this batch, if any.
+  def normalize_direct_heartbeat(heartbeat, last_language:)
+    attrs = heartbeat.to_h.with_indifferent_access
+    # An entity of "test.txt" marks a test heartbeat.
+    source_type = attrs[:entity] == "test.txt" ? :test_entry : :direct_entry
+
+    # Resolve the sentinel from the current batch first, then from the user's
+    # most recent stored heartbeat that has a real language.
+    if attrs[:language] == LAST_LANGUAGE_SENTINEL
+      attrs[:language] = last_language || @user.heartbeats
+        .where.not(language: [ nil, "", LAST_LANGUAGE_SENTINEL ])
+        .order(time: :desc)
+        .pick(:language)
+    end
+
+    # Infer the language from the file extension when it is blank or "Unknown".
+    if attrs[:language].blank? || attrs[:language] == "Unknown"
+      inferred = LanguageUtils.detect_from_extension(attrs[:entity])
+      attrs[:language] = inferred if inferred
+    end
+
+    # Fall back to :plugin when :user_agent is not set.
+    fallback_value = attrs.delete(:plugin)
+    attrs[:user_agent] ||= fallback_value
+    parsed_ua = WakatimeService.parse_user_agent(attrs[:user_agent])
+    attrs[:category] ||= "coding"
+    attrs[:project] = attrs[:project]&.gsub(/[[:cntrl:]]/, "")&.strip
+
+    # Slice down to real columns so unknown payload keys never reach the insert.
+    attrs.merge(
+      user_id: @user.id,
+      source_type: source_type,
+      ip_address: @request_context[:ip_address],
+      editor: parsed_ua[:editor],
+      operating_system: parsed_ua[:os],
+      machine: @request_context[:machine]
+    ).slice(*Heartbeat.column_names.map(&:to_sym))
+  end
+
+  # Inserts the heartbeat unless one with the same fields_hash already exists.
+  # Returns [ heartbeat, duplicate ], where duplicate is true when no row was inserted.
+  def persist_direct_heartbeat(attrs)
+    temp = Heartbeat.new(attrs)
+    fields_hash = Heartbeat.generate_fields_hash(temp.attributes)
+    existing = @user.heartbeats.find_by(fields_hash: fields_hash)
+    return [ existing, true ] if existing
+
+    # Heartbeat.insert skips model validations and callbacks, so fields_hash and
+    # rollup scheduling are handled here by hand.
+    now = Time.current
+    result = Heartbeat.insert(
+      attrs.merge(fields_hash: fields_hash, created_at: now, updated_at: now),
+      unique_by: :fields_hash,
+      returning: Heartbeat.column_names
+    )
+
+    # An empty result means nothing was inserted; load the already-stored row.
+    inserted = result.any?
+    persisted = if inserted
+      Heartbeat.new(result.first)
+    else
+      @user.heartbeats.find_by!(fields_hash: fields_hash)
+    end
+
+    self.class.schedule_rollup_refresh(user: @user) if inserted && @schedule_rollup_refresh
+    [ persisted, !inserted ]
+  end
+
+  # De-duplicates the batch by fields_hash, bulk-inserts it, and returns a Result
+  # whose items are empty.
+  def ingest_import
+    seen_hashes = {}
+    total_count = 0
+    errors = []
+
+    @heartbeats.each do |heartbeat|
+      total_count += 1
+      attrs = normalize_imported_heartbeat(heartbeat)
+      existing = seen_hashes[attrs[:fields_hash]]
+      seen_hashes[attrs[:fields_hash]] = attrs if existing.nil? || attrs[:time] > existing[:time]
+    rescue => e
+      errors << { heartbeat: heartbeat, error: e.message, type: e.class.name }
+    end
+
+    persisted_count = flush_import_batch(seen_hashes)
+    self.class.schedule_rollup_refresh(user: @user) if persisted_count.positive? && @schedule_rollup_refresh
+
+    Result.new(
+      total_count: total_count,
+      persisted_count: persisted_count,
+      duplicate_count: total_count - persisted_count - errors.length,
+      failed_count: errors.length,
+      errors: errors,
+      items: []
+    )
+  end
+
+  # Builds the attribute hash (including :fields_hash) for an imported heartbeat.
+  def normalize_imported_heartbeat(heartbeat)
+    hb = heartbeat.respond_to?(:with_indifferent_access) ? heartbeat.with_indifferent_access : heartbeat.to_h.with_indifferent_access
+    user_agent_id = hb[:user_agent_id].to_s
+    user_agent_info = (@user_agents_by_id[user_agent_id] || {}).with_indifferent_access
+    resolved_user_agent = hb[:user_agent].presence || user_agent_info[:value].presence || hb[:user_agent_id].presence
+    parsed_user_agent = parse_user_agent(resolved_user_agent)
+
+    attrs = {
+      user_id: @user.id,
+      time: hb[:time].is_a?(String) ? Time.parse(hb[:time]).to_f : hb[:time].to_f,
+      entity: hb[:entity],
+      type: hb[:type],
+      category: hb[:category] || "coding",
+      project: hb[:project],
+      language: LanguageUtils.fill_missing_language(hb[:language], entity: hb[:entity]),
+      editor: hb[:editor].presence || user_agent_info[:editor].presence || parsed_user_agent[:editor].presence,
+      operating_system: hb[:operating_system].presence || user_agent_info[:os].presence || parsed_user_agent[:os].presence,
+      machine: hb[:machine].presence || hb[:machine_name_id].presence,
+      branch: hb[:branch],
+      user_agent: resolved_user_agent,
+      is_write: hb[:is_write] || false,
+      line_additions: hb[:line_additions],
+      line_deletions: hb[:line_deletions],
+      lineno: hb[:lineno],
+      lines: hb[:lines],
+      cursorpos: hb[:cursorpos],
+      dependencies: hb[:dependencies] || [],
+      project_root_count: hb[:project_root_count],
+      source_type: Heartbeat.source_types.fetch("wakapi_import")
+    }
+
+    attrs[:fields_hash] = Heartbeat.generate_fields_hash(attrs)
+    attrs
+  end
+
+  # Bulk-inserts the de-duplicated records; returns the length of the insert result.
+  def flush_import_batch(seen_hashes)
+    return 0 if seen_hashes.empty?
+
+    timestamp = Time.current
+    records = seen_hashes.values.map do |record|
+      record.merge(created_at: timestamp, updated_at: timestamp)
+    end
+
+    ActiveRecord::Base.logger.silence do
+      Heartbeat.insert_all(records, unique_by: [ :fields_hash ]).length
+    end
+  end
+
+  # Parses a user agent string into { editor:, os: }; blank values become nil.
+  def parse_user_agent(user_agent)
+    return { editor: nil, os: nil } if user_agent.blank?
+
+    parsed = WakatimeService.parse_user_agent(user_agent)
+    {
+      editor: parsed[:editor].presence,
+      os: parsed[:os].presence
+    }
+  end
+
+  # Enqueues the project/repo mapping job through a one-hour cache entry keyed
+  # by user and project. Errors are reported and never raised.
+  def queue_project_mapping(project_name)
+    return if project_name.blank?
+
+    Rails.cache.fetch("attempt_project_repo_mapping_job_#{@user.id}_#{project_name}", expires_in: 1.hour) do
+      AttemptProjectRepoMappingJob.perform_later(@user.id, project_name)
+    end
+  rescue => e
+    Rails.error.report(e, handled: true, context: { message: "Error queuing project mapping" })
+  end
+end
diff --git a/test/services/heartbeat_ingest_test.rb b/test/services/heartbeat_ingest_test.rb
new file mode 100644
index 000000000..4e55726a4
--- /dev/null
+++ b/test/services/heartbeat_ingest_test.rb
@@ -0,0 +1,167 @@
+require "test_helper"
+
+class HeartbeatIngestTest < ActiveSupport::TestCase
+  include ActiveJob::TestHelper
+
+  setup do
+    Rails.cache.clear
+    clear_enqueued_jobs
+    @original_queue_adapter = ActiveJob::Base.queue_adapter
+    ActiveJob::Base.queue_adapter = :test
+  end
+
+  teardown do
+    Rails.cache.clear
+    clear_enqueued_jobs
+    ActiveJob::Base.queue_adapter = @original_queue_adapter
+  end
+
+  test "direct heartbeat ingest persists normalized heartbeats and schedules dashboard rollup refresh" do
+    user = User.create!(timezone: "UTC")
+
+    assert_difference("user.heartbeats.count", 1) do
+      assert_enqueued_with(job: DashboardRollupRefreshJob, args: [ user.id ]) do
+        assert_enqueued_with(job: AttemptProjectRepoMappingJob, args: [ user.id, "hackatime" ]) do
+          result = HeartbeatIngest.call(
+            user: user,
+            mode: :direct,
+            heartbeats: [ {
+              entity: "src/main.rb",
+              plugin: "vscode/1.0.0",
+              project: "hackatime",
+              time: Time.current.to_f,
+              type: "file"
+            } ],
+            request_context: {
+              ip_address: "203.0.113.10",
+              machine: "laptop"
+            }
+          )
+
+          assert_equal 1, result.total_count
+          assert_equal 1, result.persisted_count
+          assert_equal 0, result.duplicate_count
+          assert_equal 0, result.failed_count
+          assert_equal 1, result.items.length
+          assert_equal :accepted, result.items.first.status
+        end
+      end
+    end
+
+    heartbeat = user.heartbeats.order(:id).last
+    assert_equal "vscode/1.0.0", heartbeat.user_agent
+    assert_equal "coding", heartbeat.category
+    assert_equal "laptop", heartbeat.machine
+    assert_equal "203.0.113.10", heartbeat.ip_address.to_s
+    assert_equal "direct_entry", heartbeat.source_type
+  end
+
+  test "direct heartbeat ingest returns existing heartbeat for duplicate input" do
+    user = User.create!(timezone: "UTC")
+    payload = {
+      entity: "src/main.rb",
+      plugin: "vscode/1.0.0",
+      project: "hackatime",
+      time: Time.current.to_f,
+      type: "file"
+    }
+
+    first_result = HeartbeatIngest.call(
+      user: user,
+      mode: :direct,
+      heartbeats: [ payload ],
+      request_context: { ip_address: "203.0.113.10" }
+    )
+    first_heartbeat = first_result.items.first.heartbeat
+
+    clear_enqueued_jobs
+
+    assert_no_difference("user.heartbeats.count") do
+      result = HeartbeatIngest.call(
+        user: user,
+        mode: :direct,
+        heartbeats: [ payload ],
+        request_context: { ip_address: "203.0.113.20" }
+      )
+
+      assert_equal 1, result.total_count
+      assert_equal 0, result.persisted_count
+      assert_equal 1, result.duplicate_count
+      assert_equal 0, result.failed_count
+      assert_equal first_heartbeat.id, result.items.first.heartbeat.id
+    end
+
+    assert_no_enqueued_jobs only: DashboardRollupRefreshJob
+  end
+
+  test "direct heartbeat ingest resolves last language within the batch" do
+    user = User.create!(timezone: "UTC")
+    now = Time.current.to_f
+
+    result = HeartbeatIngest.call(
+      user: user,
+      mode: :direct,
+      heartbeats: [
+        {
+          entity: "src/first.py",
+          plugin: "vscode/1.0.0",
+          project: "hackatime",
+          time: now - 1,
+          type: "file",
+          language: "Python"
+        },
+        {
+          entity: "src/second.rb",
+          plugin: "vscode/1.0.0",
+          project: "hackatime",
+          time: now,
+          type: "file",
+          language: "<>"
+        }
+      ]
+    )
+
+    assert_equal 2, result.persisted_count
+    heartbeats = user.heartbeats.order(:time)
+    assert_equal [ "Python", "Python" ], heartbeats.pluck(:language)
+  end
+
+  test "import heartbeat ingest deduplicates imported heartbeats and schedules dashboard rollup refresh" do
+    user = User.create!(timezone: "UTC")
+
+    assert_difference("user.heartbeats.count", 1) do
+      assert_enqueued_with(job: DashboardRollupRefreshJob, args: [ user.id ]) do
+        result = HeartbeatIngest.call(
+          user: user,
+          mode: :import,
+          heartbeats: [
+            {
+              entity: "/tmp/test.rb",
+              type: "file",
+              time: 1_700_000_000.0,
+              project: "hackatime",
+              language: "Ruby",
+              is_write: true
+            },
+            {
+              entity: "/tmp/test.rb",
+              type: "file",
+              time: 1_700_000_000.0,
+              project: "hackatime",
+              language: "Ruby",
+              is_write: true
+            }
+          ]
+        )
+
+        assert_equal 2, result.total_count
+        assert_equal 1, result.persisted_count
+        assert_equal 1, result.duplicate_count
+        assert_equal 0, result.failed_count
+      end
+    end
+
+    heartbeat = user.heartbeats.order(:id).last
+    assert_equal "wakapi_import", heartbeat.source_type
+  end
+end