diff --git a/src/harness_sdk/instrumentation/grpc/__init__.py b/src/harness_sdk/instrumentation/grpc/__init__.py index 4ca4008..728f8a3 100644 --- a/src/harness_sdk/instrumentation/grpc/__init__.py +++ b/src/harness_sdk/instrumentation/grpc/__init__.py @@ -164,7 +164,10 @@ def telemetry_interceptor(request_or_iterator, context) -> None: '''Process request for hypertrace.''' logger.debug( 'Entering OpenTelemetryServerInterceptorWrapper.telemetry_interceptor().') - # handle streaming responses specially + # Client/bidi streaming RPCs (e.g. reflection) pass an iterator. + if request_streaming: + return behavior(request_or_iterator, context) + # handle server-streaming responses specially if response_streaming: return self._intercept_server_stream( behavior, @@ -221,11 +224,42 @@ def _intercept_server_stream( behavior, handler_call_details, request_or_iterator, - context) -> None: - '''Setup interceptor helper for streaming requests.''' + context): + '''Setup interceptor helper for server-streaming responses.''' logger.debug( 'Entering OpenTelemetryServerInterceptorWrapper.intercept_server_stream().') - # COME_BACK -- need to implement this + with self._set_remote_context(context): + with self._start_span( + handler_call_details, + context, + set_status_on_exception=False, + ) as span: + context = _OpenTelemetryWrapperServicerContext(context, span) + invocation_metadata = dict(context.invocation_metadata()) + req_dict = MessageToDict(request_or_iterator) + self._gisw.generic_rpc_request_handler( + invocation_metadata, json.dumps(req_dict), span) + try: + control_result = get_control_registry().evaluate( + span, + '', + invocation_metadata, + request_or_iterator, + True, + ) + if control_result.block: + logger.debug('should block evaluated to true, aborting with 403') + context.abort(grpc.StatusCode.PERMISSION_DENIED, 'Permission Denied') + + for response in behavior(request_or_iterator, context): + response_dict = MessageToDict(response) + self._gisw.generic_rpc_response_handler( + {}, json.dumps(response_dict), span) + yield response + except Exception as error: # pylint: disable=W0703 + if type(error) != Exception: # noqa: E721 + span.record_exception(error) + raise error # Wrapper around client-side interceptor class OpenTelemetryClientInterceptorWrapper(_client.OpenTelemetryClientInterceptor):