Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions src/harness_sdk/instrumentation/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ def telemetry_interceptor(request_or_iterator, context) -> None:
'''Process request for hypertrace.'''
logger.debug(
'Entering OpenTelemetryServerInterceptorWrapper.telemetry_interceptor().')
# handle streaming responses specially
# Client/bidi streaming RPCs (e.g. reflection) pass an iterator.
if request_streaming:
return behavior(request_or_iterator, context)
# handle server-streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
Expand Down Expand Up @@ -221,11 +224,42 @@ def _intercept_server_stream(
behavior,
handler_call_details,
request_or_iterator,
context) -> None:
'''Setup interceptor helper for streaming requests.'''
context):
'''Setup interceptor helper for server-streaming responses.'''
logger.debug(
'Entering OpenTelemetryServerInterceptorWrapper.intercept_server_stream().')
# COME_BACK -- need to implement this
with self._set_remote_context(context):
with self._start_span(
handler_call_details,
context,
set_status_on_exception=False,
) as span:
context = _OpenTelemetryWrapperServicerContext(context, span)
invocation_metadata = dict(context.invocation_metadata())
req_dict = MessageToDict(request_or_iterator)
self._gisw.generic_rpc_request_handler(
invocation_metadata, json.dumps(req_dict), span)
try:
control_result = get_control_registry().evaluate(
span,
'',
invocation_metadata,
request_or_iterator,
True,
)
if control_result.block:
logger.debug('should block evaluated to true, aborting with 403')
context.abort(grpc.StatusCode.PERMISSION_DENIED, 'Permission Denied')

for response in behavior(request_or_iterator, context):
response_dict = MessageToDict(response)
self._gisw.generic_rpc_response_handler(
{}, json.dumps(response_dict), span)
yield response
except Exception as error: # pylint: disable=W0703
if type(error) != Exception: # noqa: E721
span.record_exception(error)
raise error

# Wrapper around client-side interceptor
class OpenTelemetryClientInterceptorWrapper(_client.OpenTelemetryClientInterceptor):
Expand Down
Loading