Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 15 additions & 93 deletions app/controllers/api/hackatime/v1/hackatime_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,104 +235,26 @@ def body_to_json
end || {}
end

# Value a client can send in the language field to ask for the most recently
# used language; handle_heartbeat swaps it for a real language name.
LAST_LANGUAGE_SENTINEL = "<<LAST_LANGUAGE>>"

# Processes a batch of heartbeats for @user and produces one [payload, status]
# pair per heartbeat: 201 with the heartbeat attributes, or 422 with the error
# message and error class name.
#
# NOTE(review): this span looks like a diff view captured without +/- markers.
# It appears to interleave two versions of the method: a per-heartbeat loop that
# inserts rows directly, and a version that hands the batch to
# HeartbeatIngest.call. It is not runnable as shown; confirm against the real file.
def handle_heartbeat(heartbeat_array)
results = []
last_language = nil
heartbeat_array.each do |heartbeat|
heartbeat = heartbeat.to_h.with_indifferent_access
source_type = :direct_entry

# Resolve <<LAST_LANGUAGE>> sentinel to the most recently used language.
# Check within the current batch first, then fall back to the DB.
if heartbeat[:language] == LAST_LANGUAGE_SENTINEL
heartbeat[:language] = last_language || @user.heartbeats
.where.not(language: [ nil, "", LAST_LANGUAGE_SENTINEL ])
.order(time: :desc)
.pick(:language)
end

# Infer language from file extension when client sends blank or Unknown
if heartbeat[:language].blank? || heartbeat[:language] == "Unknown"
inferred = LanguageUtils.detect_from_extension(heartbeat[:entity])
heartbeat[:language] = inferred if inferred
end

# Track the last known language for subsequent heartbeats in this batch.
last_language = heartbeat[:language] if heartbeat[:language].present?

# Fallback to :plugin if :user_agent is not set
fallback_value = heartbeat.delete(:plugin)
heartbeat[:user_agent] ||= fallback_value

parsed_ua = WakatimeService.parse_user_agent(heartbeat[:user_agent])

# if category is not set, just default to coding
heartbeat[:category] ||= "coding"

# special case: if the entity is "test.txt", this is a test heartbeat
if heartbeat[:entity] == "test.txt"
source_type = :test_entry
end

heartbeat[:project] = heartbeat[:project]&.gsub(/[[:cntrl:]]/, "")&.strip

attrs = heartbeat.merge({
user_id: @user.id,
source_type: source_type,
# NOTE(review): presumably the replacement body starts here, passing the whole
# batch plus request context to HeartbeatIngest.call -- confirm against the real file.
result = HeartbeatIngest.call(
user: @user,
mode: :direct,
heartbeats: heartbeat_array,
request_context: {
ip_address: request.headers["CF-Connecting-IP"] || request.remote_ip,
editor: parsed_ua[:editor],
operating_system: parsed_ua[:os],
machine: request.headers["X-Machine-Name"]
}).slice(*Heartbeat.column_names.map(&:to_sym))
# ^^ They say safety laws are written in blood. Well, so is this line!
# Basically this filters out columns that aren't in our DB (the biggest one being raw_data)
temp = Heartbeat.new(attrs)
fields_hash = Heartbeat.generate_fields_hash(temp.attributes)
new_heartbeat = @user.heartbeats.find_by(fields_hash: fields_hash)

unless new_heartbeat
now = Time.current
insert_attrs = attrs.merge(
fields_hash: fields_hash,
created_at: now,
updated_at: now
)
result = Heartbeat.insert(
insert_attrs,
unique_by: :fields_hash,
returning: Heartbeat.column_names
)

if result.any?
new_heartbeat = Heartbeat.new(result.first)
# This uses insert for deduplication, so model validations/callbacks are skipped.
# Keep manual fields_hash and rollup scheduling in sync with Heartbeat callbacks.
DashboardRollupRefreshJob.schedule_for(@user.id)
else
new_heartbeat = @user.heartbeats.find_by!(fields_hash: fields_hash)
end
}
)

result.items.map do |item|
if item.status == :accepted
[ item.heartbeat.attributes, 201 ]
else
error = item.error
report_error(error, message: "Error creating heartbeat: #{error.class}: #{error.message}", extra: { backtrace: error.backtrace&.first(20) })
[ { error: error.message, type: error.class.name }, 422 ]
end

queue_project_mapping(heartbeat[:project])
results << [ new_heartbeat.attributes, 201 ]
rescue => e
report_error(e, message: "Error creating heartbeat: #{e.class}: #{e.message}", extra: { backtrace: e.backtrace&.first(20) })
results << [ { error: e.message, type: e.class.name }, 422 ]
end

results
end

# Enqueues AttemptProjectRepoMappingJob for the current user and the given project.
#
# The enqueue runs inside a Rails.cache.fetch block whose entry (keyed by user id
# and project name) expires after one hour, so the job is only queued when that
# entry is missing. Any StandardError is passed to report_error instead of being
# raised, so the heartbeat flow that calls this is never interrupted.
#
# @param project_name [String, nil] project name taken from the heartbeat
def queue_project_mapping(project_name)
  begin
    throttle_key = "attempt_project_repo_mapping_job_#{@user.id}_#{project_name}"
    Rails.cache.fetch(throttle_key, expires_in: 1.hour) { AttemptProjectRepoMappingJob.perform_later(@user.id, project_name) }
  rescue => error
    report_error(error, message: "Error queuing project mapping")
  end
end

def check_lockout
Expand Down
1 change: 0 additions & 1 deletion app/services/heartbeat_import_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def self.complete_run!(run, result:)
finished_at: Time.current
)

DashboardRollupRefreshJob.schedule_for(run.user_id)
run.clear_sensitive_fields!
send_completion_email(run)
end
Expand Down
96 changes: 15 additions & 81 deletions app/services/heartbeat_import_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,24 @@ class HeartbeatImportService

def self.import_from_file(file_content, user, on_progress: nil, progress_interval: 250, user_agents_by_id: {})
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
user_id = user.id
imported_count = 0
total_count = 0
errors = []
seen_hashes = {}
heartbeat_batch = {}

handler = HeartbeatSaxHandler.new do |hb|
total_count += 1
on_progress&.call(total_count) if progress_interval.positive? && (total_count % progress_interval).zero?

begin
time_value = hb["time"].is_a?(String) ? Time.parse(hb["time"]).to_f : hb["time"].to_f
normalized = normalize_imported_heartbeat(hb, user_agents_by_id:)

attrs = {
user_id: user_id,
time: time_value,
entity: hb["entity"],
type: hb["type"],
category: hb["category"] || "coding",
project: hb["project"],
language: normalized[:language],
editor: normalized[:editor],
operating_system: normalized[:operating_system],
machine: normalized[:machine],
branch: hb["branch"],
user_agent: normalized[:user_agent],
is_write: hb["is_write"] || false,
line_additions: hb["line_additions"],
line_deletions: hb["line_deletions"],
lineno: hb["lineno"],
lines: hb["lines"],
cursorpos: hb["cursorpos"],
dependencies: hb["dependencies"] || [],
project_root_count: hb["project_root_count"],
source_type: Heartbeat.source_types.fetch("wakapi_import")
}

attrs[:fields_hash] = Heartbeat.generate_fields_hash(attrs)

existing = seen_hashes[attrs[:fields_hash]]
seen_hashes[attrs[:fields_hash]] = attrs if existing.nil? || attrs[:time] > existing[:time]

if seen_hashes.size >= BATCH_SIZE
imported_count += flush_batch(seen_hashes)
seen_hashes.clear
attrs = HeartbeatIngest.normalize_imported_heartbeat(user:, heartbeat: hb, user_agents_by_id:)
heartbeat_batch[attrs[:fields_hash]] = hb

if heartbeat_batch.size >= BATCH_SIZE
result = HeartbeatIngest.call(user:, mode: :import, heartbeats: heartbeat_batch.values, user_agents_by_id:, schedule_rollup_refresh: false)
imported_count += result.persisted_count
Comment thread
skyfallwastaken marked this conversation as resolved.
errors.concat(result.errors)
heartbeat_batch.clear
end
rescue => e
errors << { heartbeat: hb, error: e.message }
Expand All @@ -61,7 +33,12 @@ def self.import_from_file(file_content, user, on_progress: nil, progress_interva
if total_count.zero?
Comment thread
skyfallwastaken marked this conversation as resolved.
raise StandardError, "Expected a heartbeat export JSON file."
end
imported_count += flush_batch(seen_hashes) if seen_hashes.any?
if heartbeat_batch.any?
result = HeartbeatIngest.call(user:, mode: :import, heartbeats: heartbeat_batch.values, user_agents_by_id:, schedule_rollup_refresh: false)
imported_count += result.persisted_count
errors.concat(result.errors)
end
HeartbeatIngest.schedule_rollup_refresh(user:) if imported_count.positive?
Comment thread
skyfallwastaken marked this conversation as resolved.

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time

Expand Down Expand Up @@ -98,49 +75,6 @@ def self.count_heartbeats(file_content)
total_count
end

# Builds the normalized language/editor/OS/machine/user-agent fields for one
# imported heartbeat.
#
# Each field takes the first present candidate, in order: the value on the
# heartbeat itself, then the matching entry in user_agents_by_id (looked up by
# the stringified user_agent_id), then the parsed user-agent string. The machine
# falls back to machine_name_id, and the user agent falls back to the raw
# user_agent_id.
#
# @param heartbeat [Hash, #to_h] raw imported heartbeat
# @param user_agents_by_id [Hash] user-agent records keyed by id string; the
#   :value, :editor and :os entries of a record are read
# @return [Hash] with keys :language, :editor, :operating_system, :machine, :user_agent
def self.normalize_imported_heartbeat(heartbeat, user_agents_by_id: {})
  source = heartbeat.respond_to?(:with_indifferent_access) ? heartbeat : heartbeat.to_h
  fields = source.with_indifferent_access

  agent_record = user_agents_by_id[fields[:user_agent_id].to_s] || {}
  agent_record = agent_record.with_indifferent_access

  user_agent = fields[:user_agent].presence
  user_agent ||= agent_record[:value].presence
  user_agent ||= fields[:user_agent_id].presence

  parsed = parse_user_agent(user_agent)

  language = LanguageUtils.fill_missing_language(fields[:language], entity: fields[:entity])

  editor = fields[:editor].presence
  editor ||= agent_record[:editor].presence
  editor ||= parsed[:editor].presence

  operating_system = fields[:operating_system].presence
  operating_system ||= agent_record[:os].presence
  operating_system ||= parsed[:os].presence

  machine = fields[:machine].presence
  machine ||= fields[:machine_name_id].presence

  {
    language: language,
    editor: editor,
    operating_system: operating_system,
    machine: machine,
    user_agent: user_agent
  }
end

# Upserts one batch of heartbeat attribute hashes and reports how many rows the
# upsert returned.
#
# The attribute hashes are stamped in place with :created_at / :updated_at
# before the write. A single timestamp is taken for the whole batch, so every
# row written by one upsert carries the same values rather than a slightly
# different Time.current per record.
#
# @param seen_hashes [Hash] attribute hashes to write; only the values are used here
# @return [Integer] 0 for an empty batch, otherwise the length of the upsert_all result
def self.flush_batch(seen_hashes)
  return 0 if seen_hashes.empty?

  batch_timestamp = Time.current
  records = seen_hashes.values
  records.each do |record|
    record[:created_at] = batch_timestamp
    record[:updated_at] = batch_timestamp
  end

  # Silence the ActiveRecord logger around the bulk write.
  ActiveRecord::Base.logger.silence do
    Heartbeat.upsert_all(records, unique_by: [ :fields_hash ]).length
  end
end

# Extracts editor and OS from a user-agent string via WakatimeService.
#
# @param user_agent [String, nil] raw user-agent value
# @return [Hash] { editor:, os: }; both nil when user_agent is blank, and any
#   blank parsed value is returned as nil
def self.parse_user_agent(user_agent)
  if user_agent.blank?
    { editor: nil, os: nil }
  else
    details = WakatimeService.parse_user_agent(user_agent)
    %i[editor os].to_h { |field| [ field, details[field].presence ] }
  end
end
private_class_method :parse_user_agent

class HeartbeatSaxHandler < Oj::Saj
def initialize(&block)
@block = block
Expand Down
Loading