Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/dmt/src/dmt.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
dmt_core,
dmt_object,
opentelemetry_api,
opentelemetry,
opentelemetry_exporter,
opentelemetry
opentelemetry_experimental
]},
{env, []},
{modules, []},
Expand Down
3 changes: 1 addition & 2 deletions apps/dmt/src/dmt_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@ start(_StartType, _StartArgs) ->
dmt_sup:start_link().

stop(_State) ->
ok = dmt_sup:flush_otel_logs(),
ok.

%% internal functions
65 changes: 65 additions & 0 deletions apps/dmt/src/dmt_otel_log_filter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
%%% @doc
%%% Logger filter для otel_logs handler: конвертирует otel_trace_id и otel_span_id
%%% из hex-формата (32/16 символов) в raw bytes (16/8 байт), как требует OTLP LogRecord.
%%% opentelemetry hex_span_ctx возвращает hex, collector ожидает bytes.
%%% @end
-module(dmt_otel_log_filter).

-export([filter/2]).
-export([format_otp_report_utf8/1]).

-spec filter(logger:log_event(), term()) -> logger:filter_return().
filter(#{meta := Meta} = LogEvent, _FilterConfig) ->
case convert_otel_ids(Meta) of
Meta ->
LogEvent;
Meta1 ->
LogEvent#{meta => Meta1}
end.

%% Конвертируем hex -> raw bytes только если формат hex (32/16 символов).
%% OTLP LogRecord: trace_id=16 bytes, span_id=8 bytes.
convert_otel_ids(#{otel_trace_id := TraceIdHex, otel_span_id := SpanIdHex} = Meta) ->
case {hex_to_trace_id_bytes(TraceIdHex), hex_to_span_id_bytes(SpanIdHex)} of
{TraceIdBytes, SpanIdBytes} when TraceIdBytes =/= undefined, SpanIdBytes =/= undefined ->
Meta#{otel_trace_id => TraceIdBytes, otel_span_id => SpanIdBytes};
_ ->
%% Некорректный формат — убираем, чтобы otel_otlp_logs не отправил в OTLP
maps:without([otel_trace_id, otel_span_id, otel_trace_flags], Meta)
end;
convert_otel_ids(Meta) ->
Meta.

%% logger:format_otp_report/1 возвращает chardata (часто list()),
%% из-за чего downstream JSON может сериализовать body как массив байт.
%% Явно приводим к UTF-8 binary(), чтобы body в OTel/Loki был строкой.
-spec format_otp_report_utf8(logger:report()) -> {unicode:chardata(), list()}.
format_otp_report_utf8(Report) ->
Bin =
try logger:format_otp_report(Report) of
{Format, Args} ->
unicode:characters_to_binary(io_lib:format(Format, Args))
catch
_:_ ->
%% Не даём report_cb падать: fallback в печатное представление отчёта.
unicode:characters_to_binary(io_lib:format("~tp", [Report]))
end,
{"~ts", [Bin]}.

hex_to_trace_id_bytes(Hex) when is_binary(Hex), byte_size(Hex) =:= 32 ->
try
<<(binary_to_integer(Hex, 16)):128>>
catch
_:_ -> undefined
end;
hex_to_trace_id_bytes(_) ->
undefined.

hex_to_span_id_bytes(Hex) when is_binary(Hex), byte_size(Hex) =:= 16 ->
try
<<(binary_to_integer(Hex, 16)):64>>
catch
_:_ -> undefined
end;
hex_to_span_id_bytes(_) ->
undefined.
48 changes: 48 additions & 0 deletions apps/dmt/src/dmt_otel_log_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-module(dmt_otel_log_handler).

-export([log/2]).
-export([adding_handler/1]).
-export([removing_handler/1]).
-export([changing_config/3]).
-export([filter_config/1]).

-spec log(logger:log_event(), map()) -> ok.
log(LogEvent, Config) ->
otel_log_handler:log(LogEvent, Config).

-spec adding_handler(map()) -> {ok, map()} | {error, term()}.
adding_handler(Config) ->
otel_log_handler:adding_handler(merge_module_config(Config)).

-spec removing_handler(map()) -> ok.
removing_handler(Config) ->
otel_log_handler:removing_handler(Config).

-spec changing_config(set | update, map(), map()) -> {ok, map()} | {error, term()}.
changing_config(SetOrUpdate, OldConfig, NewConfig) ->
otel_log_handler:changing_config(
SetOrUpdate,
merge_module_config(OldConfig),
merge_module_config(NewConfig)
).

-spec filter_config(map()) -> map().
filter_config(Config) ->
otel_log_handler:filter_config(Config).

%% Переносим ключи из вложенного config в корневой map для otel_log_handler.
%% Только ключи, которые ожидает otel_log_handler — чтобы случайно не перезаписать
%% верхнеуровневые настройки logger (level, filters, filter_default и т.д.).
-define(OTEL_LOG_HANDLER_KEYS, [
exporter,
report_cb,
max_queue_size,
scheduled_delay_ms,
exporting_timeout_ms
]).

merge_module_config(#{config := ModuleConfig} = Config) when is_map(ModuleConfig) ->
OtelConfig = maps:with(?OTEL_LOG_HANDLER_KEYS, ModuleConfig),
maps:merge(Config, OtelConfig);
merge_module_config(Config) ->
Config.
54 changes: 54 additions & 0 deletions apps/dmt/src/dmt_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

-export([get_service/1]).
-export([get_damsel_version/0]).
-export([flush_otel_logs/0]).

-define(APP, dmt).
-define(DEFAULT_DB, default_db).
Expand All @@ -30,6 +31,7 @@ start_link() ->
%% type => worker(), % optional
%% modules => modules()} % optional
init(_) ->
ok = ensure_otel_log_handler(),
ok = dbinit(),
ok = setup_kafka(dmt_kafka_publisher:is_kafka_enabled()),
ok = setup_damsel_version(),
Expand Down Expand Up @@ -226,6 +228,58 @@ application_get_env(App, Key, Default) ->
undefined -> Default
end.

ensure_otel_log_handler() ->
MaxQueue = application:get_env(?APP, otel_log_max_queue_size, 2048),
DelayMs = application:get_env(?APP, otel_log_scheduled_delay_ms, 1000),
TimeoutMs = application:get_env(?APP, otel_log_exporting_timeout_ms, 300000),
LogLevel = application:get_env(?APP, otel_log_level, info),
HandlerConfig = #{
report_cb => fun dmt_otel_log_filter:format_otp_report_utf8/1,
exporter =>
{otel_exporter_logs_otlp, #{
protocol => http_protobuf,
ssl_options => []
}},
max_queue_size => MaxQueue,
scheduled_delay_ms => DelayMs,
exporting_timeout_ms => TimeoutMs
},
LoggerHandlerConfig = #{
level => LogLevel,
filter_default => log,
filters => [{dmt_otel_trace_id_bytes, {fun dmt_otel_log_filter:filter/2, undefined}}],
config => HandlerConfig
},
case logger:add_handler(otel_logs, dmt_otel_log_handler, LoggerHandlerConfig) of
ok ->
ok;
{error, {already_exist, _}} ->
ok;
{error, Reason} ->
throw({otel_log_handler_failed, Reason})
end.

%% @doc Ждём отправки буферизованных логов перед остановкой.
-define(FLUSH_EXPORT_OVERHEAD_MS, 700).
-define(FLUSH_MAX_WAIT_MS, 5000).

-spec flush_otel_logs() -> ok.
flush_otel_logs() ->
case logger:get_handler_config(otel_logs) of
{ok, HandlerCfg} ->
Config = maps:get(config, HandlerCfg, #{}),
DelayMs = maps:get(
scheduled_delay_ms,
Config,
maps:get(scheduled_delay_ms, HandlerCfg, 1000)
),
_ = logger:info("otel_log_handler_flush"),
timer:sleep(erlang:min(?FLUSH_MAX_WAIT_MS, DelayMs + ?FLUSH_EXPORT_OVERHEAD_MS)),
ok;
_ ->
ok
end.

setup_kafka(false) ->
ok;
setup_kafka(_) ->
Expand Down
27 changes: 27 additions & 0 deletions apps/dmt/test/dmt_ct_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
-export([create_kafka_topics/0]).
-export([delete_kafka_topics/0]).

-export([trace_testcase/3]).
-export([maybe_end_trace/1]).

-export_type([config/0]).
-export_type([test_case_name/0]).
-export_type([group_name/0]).
Expand Down Expand Up @@ -201,3 +204,27 @@ create_kafka_topics() ->

delete_kafka_topics() ->
_ = brod:delete_topics(?BROKERS, [?TEST_TOPIC], 5000).

%% OTEL trace span per test case (for Grafana/Tempo tracing)
-spec trace_testcase(module(), atom(), config()) -> config().
trace_testcase(Module, TestCaseName, Config) ->
SpanName = iolist_to_binary([
atom_to_binary(Module, utf8),
":",
atom_to_binary(TestCaseName, utf8),
"/1"
]),
Tracer = opentelemetry:get_application_tracer(Module),
SpanCtx = otel_tracer:start_span(Tracer, SpanName, #{kind => internal}),
_ = otel_tracer:set_current_span(SpanCtx),
[{span_ctx, SpanCtx} | Config].

-spec maybe_end_trace(config()) -> ok.
maybe_end_trace(Config) ->
case lists:keyfind(span_ctx, 1, Config) of
{span_ctx, SpanCtx} ->
_ = otel_span:end_span(SpanCtx),
ok;
_ ->
ok
end.
7 changes: 6 additions & 1 deletion apps/dmt/test/dmt_integration_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
end_per_suite/1,
init_per_group/2,
end_per_group/2,
init_per_testcase/2,
end_per_testcase/2,
all/0,
groups/0
Expand Down Expand Up @@ -131,7 +132,11 @@ init_per_group(_, C) ->
end_per_group(_, _C) ->
ok.

end_per_testcase(_, _C) ->
init_per_testcase(Name, C) ->
dmt_ct_helper:trace_testcase(?MODULE, Name, C).

end_per_testcase(_Name, C) ->
ok = dmt_ct_helper:maybe_end_trace(C),
dmt_ct_helper:cleanup_db(),
ok.

Expand Down
7 changes: 4 additions & 3 deletions apps/dmt/test/dmt_kafka_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ end_per_suite(_Config) ->

ok.

init_per_testcase(_TestCase, Config) ->
Config.
init_per_testcase(TestCase, Config) ->
dmt_ct_helper:trace_testcase(?MODULE, TestCase, Config).

end_per_testcase(_TestCase, _Config) ->
end_per_testcase(_TestCase, Config) ->
ok = dmt_ct_helper:maybe_end_trace(Config),
ok.

%%====================================================================
Expand Down
10 changes: 6 additions & 4 deletions apps/dmt/test/dmt_repository_client_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ init_per_group(_, C) ->
end_per_group(_, _C) ->
ok.

init_per_testcase(_, C) ->
AuthorID = create_author(<<"checkout_objects_test@tests">>, dmt_ct_helper:cfg(client, C)),
[{author_id, AuthorID} | C].
init_per_testcase(Name, C) ->
C1 = dmt_ct_helper:trace_testcase(?MODULE, Name, C),
AuthorID = create_author(<<"checkout_objects_test@tests">>, dmt_ct_helper:cfg(client, C1)),
[{author_id, AuthorID} | C1].

end_per_testcase(_, _C) ->
end_per_testcase(_Name, C) ->
ok = dmt_ct_helper:maybe_end_trace(C),
dmt_ct_helper:cleanup_db(),
ok.

Expand Down
7 changes: 6 additions & 1 deletion apps/dmt/test/dmt_search_objects_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
end_per_suite/1,
init_per_group/2,
end_per_group/2,
init_per_testcase/2,
end_per_testcase/2,
all/0,
groups/0
Expand Down Expand Up @@ -79,7 +80,11 @@ init_per_group(_, C) ->
end_per_group(_, _C) ->
ok.

end_per_testcase(_, _C) ->
init_per_testcase(Name, C) ->
dmt_ct_helper:trace_testcase(?MODULE, Name, C).

end_per_testcase(_Name, C) ->
ok = dmt_ct_helper:maybe_end_trace(C),
dmt_ct_helper:cleanup_db(),
ok.

Expand Down
Loading
Loading