diff --git a/.github/run-package-tests.sh b/.github/run-package-tests.sh index d637e8a8b345..63faf9dc095b 100644 --- a/.github/run-package-tests.sh +++ b/.github/run-package-tests.sh @@ -70,7 +70,8 @@ for DIR in ${DIRS}; do "PubSub,cloud-pubsub" "Storage,cloud-storage,2.100" "ShoppingCommonProtos,shopping-common-protos" - "GeoCommonProtos,geo-common-protos,0.1" + "GeoCommonProtos,geo-common-protos,0.1" + "Monitoring,cloud-monitoring" ) for i in "${PACKAGE_DEPENDENCIES[@]}"; do IFS="," read -r PKG_DIR PKG_NAME PKG_VERSION <<< "$i" diff --git a/Spanner/composer.json b/Spanner/composer.json index 3e6f677ce53a..023c8ad3b378 100644 --- a/Spanner/composer.json +++ b/Spanner/composer.json @@ -7,7 +7,9 @@ "php": "^8.1", "ext-grpc": "*", "google/cloud-core": "^1.68", - "google/gax": "^1.40.0" + "google/gax": "^1.40.0", + "google/cloud-monitoring": "^2.2", + "open-telemetry/sdk": "^1.13" }, "require-dev": { "phpunit/phpunit": "^9.6", @@ -23,6 +25,7 @@ }, "suggest": { "ext-protobuf": "Provides a significant increase in throughput over the pure PHP protobuf implementation. See https://cloud.google.com/php/grpc for installation instructions.", + "ext-opentelemetry": "Provides a significant increase in performance for OpenTelemetry metrics collection.", "brick/math": "Perform arithmetic on NUMERIC values." 
}, "extra": { diff --git a/Spanner/src/Middleware/MetricsAttemptMiddleware.php b/Spanner/src/Middleware/MetricsAttemptMiddleware.php new file mode 100644 index 000000000000..8eb5fe01635f --- /dev/null +++ b/Spanner/src/Middleware/MetricsAttemptMiddleware.php @@ -0,0 +1,334 @@ +nextHandler = $nextHandler; + $this->attemptLatencyHistogram = $meter->createHistogram( + 'attempt_latencies', + 'ms', + 'The latency of an RPC attempt' + ); + $this->attemptCountCounter = $meter->createCounter( + 'attempt_count', + '1', + 'The number of RPC attempts' + ); + $this->attemptGfeHistogram = $meter->createHistogram( + 'gfe_latencies', + 'ms', + 'Latency between Google\'s network receiving an RPC and reading back the first byte of the response' + ); + $this->gfeConnectivityErrorCounter = $meter->createCounter( + 'gfe_connectivity_error_count', + '1', + 'Number of RPC attempts that failed to reach the GFE or returned no GFE headers' + ); + $this->attemptAfeHistogram = $meter->createHistogram( + 'afe_latencies', + 'ms', + 'Latency between Spanner AFE receiving and returning a response.' + ); + $this->afeConnectivityErrorCounter = $meter->createCounter( + 'afe_connectivity_error_count', + '1', + 'Number of connectivity errors for Spanner AFE' + ); + $this->clientId = $clientId; + $this->projectId = $projectId; + $this->clientName = 'spanner-php/' . 
$clientName; + $this->location = $location; + } + + public function __invoke(Call $call, array $options) + { + $next = $this->nextHandler; + + $startTime = microtime(true); + $directPathUsed = false; + + /** @var MetricsContext $metricsContext */ + $metricsContext = Context::getCurrent()->get( + MetricsContext::contextKey() + ); + if ($metricsContext) { + $metricsContext->setAttemptInstruments( + $this->attemptCountCounter, + $this->attemptLatencyHistogram + ); + $baseLabels = $this->getMetricLabels($call->getMethod(), $options, Code::OK, $directPathUsed); + unset($baseLabels['status']); + $metricsContext->setBaseLabels($baseLabels); + + $metricsContext->incrementAttemptCount(); + $metricsContext->setLastAttemptStartTime($startTime); + } + + // In case that something else is using this callback, + // we take the original one and call it later. + $originalCallback = $options['metadataCallback'] ?? null; + + // This gets the metadata on an ok status meaning we can get the GFE latency header for unary calls + $options['metadataCallback'] = function ($metadata) use ($originalCallback, $call, $options, &$directPathUsed) { + $serverTiming = $metadata['server-timing'][0] ?? null; + if ($serverTiming && strpos($serverTiming, 'afe;') !== false) { + $directPathUsed = true; + } + $this->recordGfeAndAfeLatency($metadata, $call, $options, Code::OK); + if ($originalCallback) { + $originalCallback($metadata); + } + }; + + try { + $response = $next( + $call, + $options + ); + } catch (Exception $e) { + // In case that the call is not a unary call and it is a streaming call error. 
+ $this->recordAttempt($startTime, $e->getCode(), $call->getMethod(), $options, $directPathUsed); + $this->recordGfeAndAfeError($e, $call, $options); + throw $e; + } + + if ($response instanceof ServerStream) { + $metadata = $response->getServerStreamingCall()->getMetadata(); + $this->recordGfeAndAfeLatency($metadata, $call, $options, Code::OK); + // Attempt count and latency logging are bypassed (handled by generator wrapper) + } + + if ($response instanceof PromiseInterface) { + return $response->then( + function ($response) use ($startTime, $options, $call, &$directPathUsed) { + $this->recordAttempt($startTime, Code::OK, $call->getMethod(), $options, $directPathUsed); + return $response; + }, + function ($e) use ($startTime, $options, $call, &$directPathUsed) { + $this->recordAttempt($startTime, $e->getCode(), $call->getMethod(), $options, $directPathUsed); + $this->recordGfeAndAfeError($e, $call, $options); + throw $e; + } + ); + } + + // The response can be a stream + return $response; + } + + /** + * Records an Attempt + * + * @param array $options The options being used for the middleware layer to communicate amongst middlewares + * @param float $startTime The start time of the RPC attempt + * @param int $code The resulting code of the attempt + * @param string $method The RPC method name that is being called + * + * @return void + */ + private function recordAttempt( + float $startTime, + int $code, + string $method, + array $options, + bool $directPathUsed = false + ): void { + $endTime = microtime(true); + $duration = ($endTime - $startTime) * 1000; // Convert to MS + + $labels = $this->getMetricLabels($method, $options, $code, $directPathUsed); + + $this->attemptCountCounter->add(1, $labels); + $this->attemptLatencyHistogram->record($duration, $labels); + } + + /** + * Records the Gfe and Afe latency + * + * @param mixed $metadata + * @param Call $call + * @param array $options + * + * @return void + */ + private function 
recordGfeAndAfeLatency($metadata, Call $call, array $options, int $status): void + { + $serverTiming = $metadata['server-timing'][0] ?? null; + $gfeLatency = null; + $afeLatency = null; + + if ($serverTiming) { + if (preg_match('/gfet4t7;\s*dur=(\d+(\.\d+)?)/', $serverTiming, $matches)) { + $gfeLatency = (float) $matches[1]; + } + + if (preg_match('/afe;\s*dur=(\d+(\.\d+)?)/', $serverTiming, $matches)) { + $afeLatency = (float) $matches[1]; + } + } + + $directPathUsed = ($serverTiming && strpos($serverTiming, 'afe;') !== false); + $labels = $this->getMetricLabels($call->getMethod(), $options, $status, $directPathUsed); + + if ($serverTiming) { + if (!is_null($gfeLatency)) { + $this->attemptGfeHistogram->record($gfeLatency, $labels); + } + if (!is_null($afeLatency)) { + $this->attemptAfeHistogram->record($afeLatency, $labels); + } + } else { + // The server-timing header is completely missing, indicating a connectivity error. + $directPathEnabled = filter_var(getenv('GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS'), FILTER_VALIDATE_BOOLEAN); + + if ($directPathEnabled) { + $this->afeConnectivityErrorCounter->add(1, $labels); + } else { + $this->gfeConnectivityErrorCounter->add(1, $labels); + } + } + } + + /** + * Creates an array containing the labels for metrics. + * + * @param string $method + * @param array $options + * @param int $code + * + * @return array + */ + private function getMetricLabels(string $method, array $options, int $code, bool $directPathUsed = false): array + { + $codeName = Code::name($code); + + // Extract resource information from the GAX routing header. + $params = $options['headers']['x-goog-request-params'][0] ?? 
''; + $prefix = urldecode($params); + + if (preg_match('/instances\/([^\/]+)\/databases\/([^\/]+)/', $prefix, $matches)) { + $instanceId = $matches[1]; + $databaseId = $matches[2]; + } + + $directPathEnabled = filter_var(getenv('GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS'), FILTER_VALIDATE_BOOLEAN); + + return [ + 'method' => $method, + 'status' => $codeName, + 'instance_id' => $instanceId ?? '', + 'database' => $databaseId ?? '', + 'project_id' => $this->projectId, + 'client_uid' => $this->clientId, + 'client_name' => $this->clientName, + 'instance_config' => self::INSTANCE_CONFIG, + 'location' => $this->location, + 'directpath_enabled' => $directPathEnabled ? 'true' : 'false', + 'directpath_used' => $directPathUsed ? 'true' : 'false' + ]; + } + + /** + * Records an GFE and/or an AFE error + * + * @param Exception $e + * @param Call $call + * @param array $options + * + * @return void + */ + private function recordGfeAndAfeError(Exception $e, Call $call, array $options): void + { + if ($e instanceof ApiException) { + $this->recordGfeAndAfeLatency($e->getMetadata() ?? [], $call, $options, $e->getCode()); + } else { + $this->recordGfeAndAfeLatency([], $call, $options, Code::UNKNOWN); + } + } +} diff --git a/Spanner/src/Middleware/MetricsOperationMiddleware.php b/Spanner/src/Middleware/MetricsOperationMiddleware.php new file mode 100644 index 000000000000..fbc43b852f97 --- /dev/null +++ b/Spanner/src/Middleware/MetricsOperationMiddleware.php @@ -0,0 +1,215 @@ +nextHandler = $nextHandler; + $this->operationLatencyHistogram = $meter->createHistogram( + 'operation_latencies', + 'ms', + 'The latency of an RPC operations' + ); + $this->operationCountCounter = $meter->createCounter( + 'operation_count', + '1', + 'The number of RPC operations' + ); + $this->clientId = $clientId; + $this->projectId = $projectId; + $this->clientName = 'spanner-php/' . 
$clientName; + $this->location = $location; + } + + public function __invoke(Call $call, array $options) + { + $next = $this->nextHandler; + + $metricsContext = Context::getCurrent()->get( + MetricsContext::contextKey() + ); + + if ($metricsContext) { + $metricsContext->setOperationInstruments( + $this->operationCountCounter, + $this->operationLatencyHistogram + ); + } + + if ($metricsContext && $metricsContext->isResume()) { + return $next($call, $options); + } + + $startTime = microtime(true); + $directPathUsed = false; + + $originalCallback = $options['metadataCallback'] ?? null; + $options['metadataCallback'] = function ($metadata) use ($originalCallback, &$directPathUsed) { + $serverTiming = $metadata['server-timing'][0] ?? null; + if ($serverTiming && strpos($serverTiming, 'afe;') !== false) { + $directPathUsed = true; + } + if ($originalCallback) { + $originalCallback($metadata); + } + }; + + try { + $response = $next( + $call, + $options + ); + } catch (Exception $ex) { + $this->recordOperation($startTime, $ex->getCode(), $call->getMethod(), $options, $directPathUsed); + throw $ex; + } + + if ($response instanceof ServerStream) { + // Let the stream iterator (Result.php) log the final operation metrics! + } + + if ($response instanceof PromiseInterface) { + return $response->then( + function ($response) use ($startTime, $options, $call, &$directPathUsed) { + $this->recordOperation($startTime, Code::OK, $call->getMethod(), $options, $directPathUsed); + return $response; + }, + function ($e) use ($startTime, $options, $call, &$directPathUsed) { + $this->recordOperation($startTime, $e->getCode(), $call->getMethod(), $options, $directPathUsed); + throw $e; + } + ); + } + + // response can be a stream + return $response; + } + + /** + * Records a completed operation (failures are considered completions). 
+ * + * @param float $startTime The start time of the operation + * @param int $code The resulting code of the operation + * @param string $method The RPC name being called + * @param array $options The options used for middleware communication + * @param bool $directPathUsed Whether DirectPath was used + * + * @return void + */ + private function recordOperation( + float $startTime, + int $code, + string $method, + array $options, + bool $directPathUsed = false + ): void { + $endTime = microtime(true); + $duration = ($endTime - $startTime) * 1000; // Convert seconds to ms + $codeName = Code::name($code); + + // Extract resource information from the GAX routing header. + $params = $options['headers']['x-goog-request-params'][0] ?? ''; + $prefix = urldecode($params); + + if (preg_match('/instances\/([^\/]+)\/databases\/([^\/]+)/', $prefix, $matches)) { + $instanceId = $matches[1]; + $databaseId = $matches[2]; + } + + $directPathEnabled = filter_var(getenv('GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS'), FILTER_VALIDATE_BOOLEAN); + + $labels = [ + 'method' => $method, + 'status' => $codeName, + 'instance_id' => $instanceId ?? '', + 'database' => $databaseId ?? '', + 'project_id' => $this->projectId, + 'client_uid' => $this->clientId, + 'client_name' => $this->clientName, + 'instance_config' => self::INSTANCE_CONFIG, + 'location' => $this->location, + 'directpath_enabled' => $directPathEnabled ? 'true' : 'false', + 'directpath_used' => $directPathUsed ? 
'true' : 'false' + ]; + + $this->operationCountCounter->add(1, $labels); + $this->operationLatencyHistogram->record($duration, $labels); + } +} diff --git a/Spanner/src/OpenTelemetry/MetricsContext.php b/Spanner/src/OpenTelemetry/MetricsContext.php new file mode 100644 index 000000000000..03032be1cbc5 --- /dev/null +++ b/Spanner/src/OpenTelemetry/MetricsContext.php @@ -0,0 +1,210 @@ +operationId = uniqid('spanner-op-', true); + $this->operationStartTime = microtime(true); + $this->lastAttemptStartTime = $this->operationStartTime; + } + + /** + * Returns the unique ContextKey for OTel propagation. + */ + public static function contextKey(): ContextKeyInterface + { + if (self::$contextKey === null) { + self::$contextKey = Context::createKey('spanner-metrics-context'); + } + return self::$contextKey; + } + + /** + * Registers OTel attempt instruments. + */ + public function setAttemptInstruments(?CounterInterface $counter, ?HistogramInterface $histogram): void + { + $this->attemptCountCounter = $counter; + $this->attemptLatencyHistogram = $histogram; + } + + /** + * Returns the registered attempt counter. + */ + public function getAttemptCountCounter(): ?CounterInterface + { + return $this->attemptCountCounter; + } + + /** + * Returns the registered attempt latency histogram. + */ + public function getAttemptLatencyHistogram(): ?HistogramInterface + { + return $this->attemptLatencyHistogram; + } + + /** + * Registers OTel operation instruments. + */ + public function setOperationInstruments(?CounterInterface $counter, ?HistogramInterface $histogram): void + { + $this->operationCountCounter = $counter; + $this->operationLatencyHistogram = $histogram; + } + + /** + * Returns the registered operation counter. + */ + public function getOperationCountCounter(): ?CounterInterface + { + return $this->operationCountCounter; + } + + /** + * Returns the registered operation latency histogram. 
+ */ + public function getOperationLatencyHistogram(): ?HistogramInterface + { + return $this->operationLatencyHistogram; + } + + /** + * Sets the base metric labels. + */ + public function setBaseLabels(array $labels): void + { + $this->baseLabels = $labels; + } + + /** + * Returns the base metric labels. + */ + public function getBaseLabels(): array + { + return $this->baseLabels; + } + + /** + * Returns the unique logical operation ID. + */ + public function getOperationId(): string + { + return $this->operationId; + } + + /** + * Returns the operation start timestamp. + */ + public function getOperationStartTime(): float + { + return $this->operationStartTime; + } + + /** + * Sets the start timestamp of the last initiated attempt. + */ + public function setLastAttemptStartTime(float $timestamp): void + { + $this->lastAttemptStartTime = $timestamp; + } + + /** + * Gets the start timestamp of the last initiated attempt. + */ + public function getLastAttemptStartTime(): float + { + return $this->lastAttemptStartTime; + } + + /** + * Sets whether the next attempt is a stream resumption. + */ + public function setIsResume(bool $isResume): void + { + $this->isResume = $isResume; + } + + /** + * Returns whether the next attempt is a stream resumption. + */ + public function isResume(): bool + { + return $this->isResume; + } + + /** + * Increments and returns the attempt count. + */ + public function incrementAttemptCount(): int + { + $this->attemptCount++; + return $this->attemptCount; + } + + /** + * Returns the current attempt count. 
+ */ + public function getAttemptCount(): int + { + return $this->attemptCount; + } +} diff --git a/Spanner/src/OpenTelemetry/MetricsExporter.php b/Spanner/src/OpenTelemetry/MetricsExporter.php new file mode 100644 index 000000000000..249adcb10e47 --- /dev/null +++ b/Spanner/src/OpenTelemetry/MetricsExporter.php @@ -0,0 +1,322 @@ + true, + 'instance_id' => true, + 'instance_config' => true, + 'location' => true, + 'client_hash' => true, + ]; + + private MetricServiceClient $client; + private string $projectId; + private string $clientHash; + private int $timeoutMillis; + + /** + * @param MetricServiceClient $client The monitoring client. + * @param string $projectId The GCP project ID metrics will be written to. + * @param string $clientUid The unique client identifier. + * @param int $timeoutMillis The timeout defined for the metrics client during export. + */ + public function __construct(MetricServiceClient $client, string $projectId, string $clientUid, int $timeoutMillis) + { + $this->client = $client; + $this->projectId = $projectId; + $this->clientHash = $this->generateClientHash($clientUid); + $this->timeoutMillis = $timeoutMillis; + } + + /** + * Exports a batch of OTel metrics to Cloud Monitoring. 
+ * + * @param iterable $batch + * @return bool + */ + public function export(iterable $batch): bool + { + $timeSeriesList = []; + foreach ($batch as $otelMetric) { + $timeSeriesList = array_merge($timeSeriesList, $this->mapMetric($otelMetric)); + } + + if (empty($timeSeriesList)) { + return true; + } + + $projectName = MetricServiceClient::projectName($this->projectId); + $chunks = array_chunk($timeSeriesList, self::SEND_BATCH_SIZE); + + foreach ($chunks as $chunk) { + $request = new CreateTimeSeriesRequest(); + $request->setName($projectName); + $request->setTimeSeries($chunk); + + try { + $this->client->createServiceTimeSeries($request, [ + 'timeoutMillis' => $this->timeoutMillis + ]); + } catch (\Exception $e) { + // Fail silently during shutdown to avoid user-visible errors. + } + } + + return true; + } + + /** + * Implementation of the forceFlush method for PushMetricExporterInterface. + * + * @return true + */ + public function forceFlush(): bool + { + return true; + } + + /** + * Implementation of the shutdown method for PushMetricExporterInterface. + * + * @return true + */ + public function shutdown(): bool + { + $this->client->close(); + return true; + } + + /** + * Returns the aggregation temporality for the given metric. + * + * @param MetricMetadataInterface $metadata + * @return string + */ + public function temporality(MetricMetadataInterface $metadata): string + { + return Temporality::CUMULATIVE; + } + + /** + * Maps an OTel Metric object to one or more GCM TimeSeries objects. 
+ * + * @param OTelMetric $otelMetric + */ + private function mapMetric(OTelMetric $otelMetric): array + { + $timeSeriesList = []; + $metricType = $this->formatMetricName($otelMetric->name); + + $data = $otelMetric->data; + if ($data instanceof Sum || $data instanceof Histogram) { + foreach ($data->dataPoints as $point) { + $timeSeriesList[] = $this->createTimeSeries($metricType, $point, $otelMetric->unit, $data); + } + } + + return $timeSeriesList; + } + + /** + * Creates a single GCM TimeSeries from an OTel DataPoint. + * + * @param string $metricType + * @param NumberDataPoint|HistogramDataPoint $otelPoint + * @param string|null $unit + * @param DataInterface $otelData + * @return TimeSeries + */ + private function createTimeSeries( + string $metricType, + NumberDataPoint|HistogramDataPoint $otelPoint, + ?string $unit, + DataInterface $otelData + ): TimeSeries { + $ts = new TimeSeries(); + $unit = $unit ?? '1'; + + $metricLabels = []; + $resourceLabels = [ + 'client_hash' => $this->clientHash, + ]; + + // Distribute attributes between Resource and Metric labels + foreach ($otelPoint->attributes as $key => $value) { + $labelKey = str_replace('.', '_', $key); + if (isset(self::$MONITORED_RES_LABELS[$labelKey])) { + $resourceLabels[$labelKey] = (string) $value; + } else { + $metricLabels[$labelKey] = (string) $value; + } + } + + $metric = new Metric(); + $metric->setType($metricType); + $metric->setLabels($metricLabels); + $ts->setMetric($metric); + + $resource = new MonitoredResource(); + $resource->setType(self::SPANNER_RESOURCE_TYPE); + $resource->setLabels($resourceLabels); + $ts->setResource($resource); + + $ts->setUnit($unit); + + $point = new Point(); + $interval = new TimeInterval(); + + // Convert nanoseconds to Protobuf Timestamp + $interval->setStartTime($this->toTimestamp($otelPoint->startTimestamp)); + $interval->setEndTime($this->toTimestamp($otelPoint->timestamp)); + $point->setInterval($interval); + + $value = new TypedValue(); + if ($otelData 
instanceof Sum) { + $ts->setMetricKind($otelData->monotonic ? MetricKind::CUMULATIVE : MetricKind::GAUGE); + if (is_int($otelPoint->value)) { + $value->setInt64Value($otelPoint->value); + $ts->setValueType(ValueType::INT64); + } else { + $value->setDoubleValue((float) $otelPoint->value); + $ts->setValueType(ValueType::DOUBLE); + } + } elseif ($otelData instanceof Histogram) { + $ts->setMetricKind(MetricKind::CUMULATIVE); + $ts->setValueType(ValueType::DISTRIBUTION); + + $dist = new Distribution(); + $dist->setCount($otelPoint->count); + if ($otelPoint->count > 0) { + $dist->setMean($otelPoint->sum / $otelPoint->count); + } + $dist->setBucketCounts($otelPoint->bucketCounts); + + $bucketOptions = new BucketOptions(); + $explicit = new Explicit(); + $explicit->setBounds($otelPoint->explicitBounds); + $bucketOptions->setExplicitBuckets($explicit); + $dist->setBucketOptions($bucketOptions); + + $value->setDistributionValue($dist); + } + + $point->setValue($value); + $ts->setPoints([$point]); + + return $ts; + } + + /** + * Formats the metric name for Cloud Monitoring. + * Built-in metrics MUST use the specific internal namespace. + * + * @param string $name The OTel instrument name. + * @return string The fully qualified GCM metric type. + */ + private function formatMetricName(string $name): string + { + return self::NATIVE_METRICS_PREFIX . 
$name; + } + + /** + * Splits a nanosecond count into a Timestamp (whole seconds plus remaining nanos), using integer math only. + * + * @param int $nanos + * @return Timestamp + */ + private function toTimestamp(int $nanos): Timestamp + { + $timestamp = new Timestamp(); + $timestamp->setSeconds(intdiv($nanos, 1_000_000_000)); + $timestamp->setNanos((int) ($nanos % 1_000_000_000)); + return $timestamp; + } + + /** + * Returns a hash of the client UUID for the metrics + * + * @param string $clientUid + * @return string + */ + private function generateClientHash(string $clientUid): string + { + if ($clientUid === '') { + return '000000'; + } + + $hashHex = hash('fnv164', $clientUid); + $firstFour = substr($hashHex, 0, 4); + $intVal = hexdec($firstFour); + $tenBits = $intVal >> 6; + return sprintf('%06x', $tenBits); + } +} diff --git a/Spanner/src/Operation.php b/Spanner/src/Operation.php index 4740e82985d8..fc3c4f92bb60 100644 --- a/Spanner/src/Operation.php +++ b/Spanner/src/Operation.php @@ -28,6 +28,7 @@ use Google\Cloud\Core\RequestProcessorTrait; use Google\Cloud\Spanner\Batch\QueryPartition; use Google\Cloud\Spanner\Batch\ReadPartition; +use Google\Cloud\Spanner\OpenTelemetry\MetricsContext; use Google\Cloud\Spanner\Session\SessionCache; use Google\Cloud\Spanner\V1\BeginTransactionRequest; use Google\Cloud\Spanner\V1\Client\SpannerClient; @@ -50,6 +51,7 @@ use Google\Protobuf\RepeatedField; use Google\Rpc\Code; use InvalidArgumentException; +use OpenTelemetry\Context\Context; /** * Common interface for running operations against Cloud Spanner. This class is @@ -281,32 +283,94 @@ public function execute(SessionCache $session, string $sql, array $options = []) // @TODO potentially move to a `Spanner\CallOptions` $callOptions += $rtl; - // Initially with begin, transactionId will be null. - // Once transaction is generated, even in the case of stream failure, - // transaction will be passed to this callable by the Result class. 
- $call = function ($resumeToken = null, $transaction = null) use ( - $session, - $executeSqlRequest, - $callOptions - ) { - if ($transaction && !empty($transaction->id())) { - $executeSqlRequest->setTransaction(new TransactionSelector(['id' => $transaction->id()])); - } - if ($resumeToken) { - $executeSqlRequest->setResumeToken($resumeToken); - } + $metricsContext = new MetricsContext(); + $ctx = Context::getCurrent()->with( + MetricsContext::contextKey(), + $metricsContext + ); + $scope = $ctx->activate(); - if (!$this->routeToLeader) { - unset($callOptions['route-to-leader']); - } + try { + // Initially with begin, transactionId will be null. + // Once transaction is generated, even in the case of stream failure, + // transaction will be passed to this callable by the Result class. + $call = function ($resumeToken = null, $transaction = null) use ( + $session, + $executeSqlRequest, + $callOptions + ) { + if ($transaction && !empty($transaction->id())) { + $executeSqlRequest->setTransaction(new TransactionSelector(['id' => $transaction->id()])); + } + if ($resumeToken) { + $executeSqlRequest->setResumeToken($resumeToken); + } - $stream = $this->spannerClient->executeStreamingSql($executeSqlRequest, $callOptions + [ - 'resource-prefix' => $this->getDatabaseNameFromSession($session), - ]); + if (!$this->routeToLeader) { + unset($callOptions['route-to-leader']); + } - return $this->handleResultSetStream($stream, $transaction); - }; - return new Result($this, $session, $call, $miscOptions['transactionContext'] ?? 
null, $this->mapper); + $stream = $this->spannerClient->executeStreamingSql($executeSqlRequest, $callOptions + [ + 'resource-prefix' => $this->getDatabaseNameFromSession($session), + ]); + + return $this->handleResultSetStream($stream, $transaction); + }; + + $wrappedCall = function ($resumeToken = null, $transaction = null) use ($call, $metricsContext) { + $generator = $call($resumeToken, $transaction); + $delegator = function () use ($generator, $metricsContext) { + try { + foreach ($generator as $result) { + yield $result; + } + if ($metricsContext) { + $attemptCounter = $metricsContext->getAttemptCountCounter(); + $attemptHistogram = $metricsContext->getAttemptLatencyHistogram(); + $opCounter = $metricsContext->getOperationCountCounter(); + $opHistogram = $metricsContext->getOperationLatencyHistogram(); + + $labels = $metricsContext->getBaseLabels(); + $labels['status'] = Code::name(Code::OK); + + if ($attemptCounter && $attemptHistogram) { + $attemptCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getLastAttemptStartTime()) * 1000; + $attemptHistogram->record($duration, $labels); + } + + if ($opCounter && $opHistogram) { + $opCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getOperationStartTime()) * 1000; + $opHistogram->record($duration, $labels); + } + } + } catch (\Exception $ex) { + if ($metricsContext) { + $attemptCounter = $metricsContext->getAttemptCountCounter(); + $attemptHistogram = $metricsContext->getAttemptLatencyHistogram(); + + $labels = $metricsContext->getBaseLabels(); + $labels['status'] = \Google\Rpc\Code::name($ex->getCode()); + + if ($attemptCounter && $attemptHistogram) { + $attemptCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getLastAttemptStartTime()) * 1000; + $attemptHistogram->record($duration, $labels); + } + + $metricsContext->setIsResume(true); + } + throw $ex; + } + }; + return $delegator(); + }; + + return new Result($this, $session, 
$wrappedCall, $miscOptions['transactionContext'] ?? null, $this->mapper); + } finally { + $scope->detach(); + } } /** @@ -502,38 +566,99 @@ public function read( // Spanner allows "route-to-leader" as a call option {@see Middleware\SpannerMiddleware} $callOptions += $rtl; - $call = function ($resumeToken = null, $transaction = null) use ( - $table, - $session, - $columns, - $readRequest, - $callOptions - ) { - if ($transaction && !empty($transaction->id())) { - $readRequest->setTransaction(new TransactionSelector(['id' => $transaction->id()])); - } - if ($resumeToken) { - $readRequest->setResumeToken($resumeToken); - } - - $readRequest - ->setTable($table) - ->setSession($session->name()) - ->setColumns($columns); + $metricsContext = new MetricsContext(); + $ctx = Context::getCurrent()->with( + MetricsContext::contextKey(), + $metricsContext + ); + $scope = $ctx->activate(); - if (!$this->routeToLeader) { - unset($callOptions['route-to-leader']); - } + try { + $call = function ($resumeToken = null, $transaction = null) use ( + $table, + $session, + $columns, + $readRequest, + $callOptions + ) { + if ($transaction && !empty($transaction->id())) { + $readRequest->setTransaction(new TransactionSelector(['id' => $transaction->id()])); + } + if ($resumeToken) { + $readRequest->setResumeToken($resumeToken); + } - $stream = $this->spannerClient->streamingRead($readRequest, $callOptions + [ - 'resource-prefix' => $this->getDatabaseNameFromSession($session), - ]); + $readRequest + ->setTable($table) + ->setSession($session->name()) + ->setColumns($columns); - // return the generator - return $this->handleResultSetStream($stream, $transaction); - }; + if (!$this->routeToLeader) { + unset($callOptions['route-to-leader']); + } - return new Result($this, $session, $call, $context, $this->mapper); + $stream = $this->spannerClient->streamingRead($readRequest, $callOptions + [ + 'resource-prefix' => $this->getDatabaseNameFromSession($session), + ]); + + // return the generator + 
return $this->handleResultSetStream($stream, $transaction); + }; + + $wrappedCall = function ($resumeToken = null, $transaction = null) use ($call, $metricsContext) { + $generator = $call($resumeToken, $transaction); + $delegator = function () use ($generator, $metricsContext) { + try { + foreach ($generator as $result) { + yield $result; + } + if ($metricsContext) { + $attemptCounter = $metricsContext->getAttemptCountCounter(); + $attemptHistogram = $metricsContext->getAttemptLatencyHistogram(); + $opCounter = $metricsContext->getOperationCountCounter(); + $opHistogram = $metricsContext->getOperationLatencyHistogram(); + + $labels = $metricsContext->getBaseLabels(); + $labels['status'] = \Google\Rpc\Code::name(\Google\Rpc\Code::OK); + + if ($attemptCounter && $attemptHistogram) { + $attemptCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getLastAttemptStartTime()) * 1000; + $attemptHistogram->record($duration, $labels); + } + + if ($opCounter && $opHistogram) { + $opCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getOperationStartTime()) * 1000; + $opHistogram->record($duration, $labels); + } + } + } catch (\Exception $ex) { + if ($metricsContext) { + $attemptCounter = $metricsContext->getAttemptCountCounter(); + $attemptHistogram = $metricsContext->getAttemptLatencyHistogram(); + + $labels = $metricsContext->getBaseLabels(); + $labels['status'] = \Google\Rpc\Code::name($ex->getCode()); + + if ($attemptCounter && $attemptHistogram) { + $attemptCounter->add(1, $labels); + $duration = (microtime(true) - $metricsContext->getLastAttemptStartTime()) * 1000; + $attemptHistogram->record($duration, $labels); + } + + $metricsContext->setIsResume(true); + } + throw $ex; + } + }; + return $delegator(); + }; + + return new Result($this, $session, $wrappedCall, $context, $this->mapper); + } finally { + $scope->detach(); + } } /** diff --git a/Spanner/src/SpannerClient.php b/Spanner/src/SpannerClient.php index 
ab7f4136c91d..bc34da871bc2 100644 --- a/Spanner/src/SpannerClient.php +++ b/Spanner/src/SpannerClient.php @@ -17,11 +17,14 @@ namespace Google\Cloud\Spanner; +use Exception; use Google\ApiCore\ClientOptionsTrait; use Google\ApiCore\Middleware\MiddlewareInterface; use Google\ApiCore\Options\CallOptions; use Google\ApiCore\ValidationException; +use Google\Auth\Credentials\GCECredentials; use Google\Cloud\Core\ApiHelperTrait; +use Google\Cloud\Core\Compute\Metadata; use Google\Cloud\Core\DetectProjectIdTrait; use Google\Cloud\Core\EmulatorTrait; use Google\Cloud\Core\Exception\GoogleException; @@ -30,6 +33,7 @@ use Google\Cloud\Core\LongRunning\LongRunningClientConnection; use Google\Cloud\Core\LongRunning\LongRunningOperation; use Google\Cloud\Core\OptionsValidator; +use Google\Cloud\Monitoring\V3\Client\MetricServiceClient; use Google\Cloud\Spanner\Admin\Database\V1\Client\DatabaseAdminClient; use Google\Cloud\Spanner\Admin\Instance\V1\Client\InstanceAdminClient; use Google\Cloud\Spanner\Admin\Instance\V1\InstanceConfig; @@ -38,15 +42,24 @@ use Google\Cloud\Spanner\Admin\Instance\V1\ListInstancesRequest; use Google\Cloud\Spanner\Admin\Instance\V1\ReplicaInfo; use Google\Cloud\Spanner\Batch\BatchClient; +use Google\Cloud\Spanner\Middleware\MetricsAttemptMiddleware; +use Google\Cloud\Spanner\Middleware\MetricsOperationMiddleware; use Google\Cloud\Spanner\Middleware\RequestIdHeaderMiddleware; use Google\Cloud\Spanner\Middleware\SpannerMiddleware; +use Google\Cloud\Spanner\OpenTelemetry\MetricsExporter; use Google\Cloud\Spanner\V1\Client\SpannerClient as GapicSpannerClient; use Google\Cloud\Spanner\V1\TransactionOptions\IsolationLevel; use Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode; use Google\LongRunning\Operation as OperationProto; use Google\Protobuf\Duration; +use OpenTelemetry\API\Metrics\MeterInterface; +use OpenTelemetry\API\Metrics\MeterProviderInterface; +use OpenTelemetry\SDK\Common\Util\ShutdownHandler; +use 
OpenTelemetry\SDK\Metrics\MeterProvider; +use OpenTelemetry\SDK\Metrics\MetricReader\ExportingReader; use Psr\Cache\CacheItemPoolInterface; use Psr\Http\Message\StreamInterface; +use Ramsey\Uuid\Uuid as RUUID; /** * Cloud Spanner is a highly scalable, transactional, managed, NewSQL @@ -133,6 +146,8 @@ class SpannerClient private int $isolationLevel; private int $readLockMode; private CacheItemPoolInterface|null $cacheItemPool; + private MeterInterface $meter; + private MeterProviderInterface $meterProvider; private static array $activeChannels = []; private static int $totalActiveChannels = 0; @@ -187,6 +202,12 @@ class SpannerClient * @type int $isolationLevel The level of Isolation for the transactions executed by this Client's instance. * **Defaults to** IsolationLevel::ISOLATION_LEVEL_UNSPECIFIED * @type CacheItemPoolInterface $cacheItemPool + * @type bool $enableBuiltInMetrics If true, built-in metrics collection will be enabled. + * **Defaults to** false. + * @type int $metricsTimeoutMillis The timeout in milliseconds for the internal + * `MetricServiceClient` used to export metrics. **Defaults to** 100. + * @type MetricServiceClient $metricServiceClient An explicit instance of + * `MetricServiceClient` to use for exporting metrics. * } * @throws GoogleException If the gRPC extension is not enabled. 
*/ @@ -206,7 +227,9 @@ public function __construct(array $options = []) 'isolationLevel' => IsolationLevel::ISOLATION_LEVEL_UNSPECIFIED, 'readLockMode' => ReadLockMode::READ_LOCK_MODE_UNSPECIFIED, 'routeToLeader' => true, - 'cacheItemPool' => null + 'cacheItemPool' => null, + 'enableBuiltInMetrics' => false, + 'metricsTimeoutMillis' => 100, ]; $this->returnInt64AsObject = $options['returnInt64AsObject']; @@ -276,6 +299,8 @@ public function __construct(array $options = []) $this->instanceAdminClient->addMiddleware($middleware); $this->databaseAdminClient->addMiddleware($middleware); + $this->configureMetrics($options); + $this->projectName = InstanceAdminClient::projectName($this->projectId); $this->cacheItemPool = $options['cacheItemPool']; } @@ -1027,4 +1052,124 @@ private function configureKeepAlive(array $config): array return $config; } + + private function configureMetrics(array &$options): void + { + $metricsClient = $this->pluck('metricServiceClient', $options, false); + $timeoutMillis = $this->pluck('metricsTimeoutMillis', $options, false) ?? 100; + $location = $this->getLocation(); + + if (!$this->pluck('enableBuiltInMetrics', $options, false)) { + return; + } + + if (!$metricsClient) { + $metricsOptions = [ + 'projectId' => $this->projectId, + 'keyFile' => $options['keyFile'] ?? null, + 'keyFilePath' => $options['keyFilePath'] ?? null, + 'credentials' => $options['credentials'] ?? null, + 'credentialsConfig' => $options['credentialsConfig'] ?? null, + 'universeDomain' => $options['universeDomain'] ?? null, + 'transport' => $options['transport'] ?? null, + 'transportConfig' => $options['transportConfig'] ?? 
null + ]; + + try { + $metricsClient = new MetricServiceClient(array_filter($metricsOptions)); + } catch (ValidationException $e) { + // If we cannot instantiate the metrics client, we should not stop the execution + return; + } + } + + if (!$metricsClient instanceof MetricServiceClient) { + throw new ValidationException('The "metricServiceClient" option must be a MetricServiceClient instance.'); + } + + $metricsClientId = RUUID::uuid4()->toString() . '-' . getmypid(); + $exporter = new MetricsExporter($metricsClient, $this->projectId, $metricsClientId, $timeoutMillis); + $reader = new ExportingReader($exporter); + $this->meterProvider = MeterProvider::builder() + ->addReader($reader) + ->build(); + + $this->meter = $this->meterProvider->getMeter('google-cloud-spanner'); + ShutdownHandler::register([$this->meterProvider, 'shutdown']); + + $attemptMetricsMiddleware = function (MiddlewareInterface $handler) use ($metricsClientId, $location) { + return new MetricsAttemptMiddleware( + $handler, + $this->meter, + $metricsClientId, + $this->projectId, + $this->clientVersion(), + $location + ); + }; + + $operationMetricsMiddleware = function (MiddlewareInterface $handler) use ($metricsClientId, $location) { + return new MetricsOperationMiddleware( + $handler, + $this->meter, + $metricsClientId, + $this->projectId, + $this->clientVersion(), + $location + ); + }; + + $this->spannerClient->prependMiddleware($attemptMetricsMiddleware); + $this->spannerClient->addMiddleware($operationMetricsMiddleware); + } + + /** + * Returns the current client version. + * + * @return string + */ + private function clientVersion(): string + { + return trim(file_get_contents(__DIR__ . '/../VERSION')); + } + + /** + * Gets the current location for the client for GCP or 'global' as a fallback. 
+     *
+     * @return string
+     */
+    private function getLocation(): string
+    {
+        if (!GCECredentials::onGce()) {
+            return 'global';
+        }
+
+        // Probe from most to least specific. Each key gets its own try/catch so a failed
+        // lookup (e.g. a key that does not exist on this platform) cannot skip the fallbacks.
+        $paths = ['instance/attributes/cluster-location', 'instance/region', 'instance/zone'];
+        foreach ($paths as $path) {
+            try {
+                $value = (string) (new Metadata())->get($path);
+            } catch (Exception $e) {
+                // avoid crashing the client in case of a metadata error
+                continue;
+            }
+
+            if ($value === '') {
+                continue;
+            }
+            if ($path === 'instance/attributes/cluster-location') {
+                return $value;
+            }
+            // Region is "projects/[NUM]/regions/[REGION-NAME]" and zone is
+            // "projects/[NUM]/zones/[ZONE-NAME]": keep only the last path segment.
+            $name = substr(strrchr('/' . $value, '/'), 1);
+            if ($path === 'instance/zone' && ($hyphen = strrpos($name, '-')) !== false) {
+                $name = substr($name, 0, $hyphen); // "us-central1-a" => "us-central1"
+            }
+            return $name;
+        }
+
+        return 'global';
+    }
 }
diff --git a/Spanner/tests/Snippet/CommitTimestampTest.php b/Spanner/tests/Snippet/CommitTimestampTest.php
index 4b7d7a0ef09b..d6fd7ec71d5f 100644
--- a/Spanner/tests/Snippet/CommitTimestampTest.php
+++ b/Spanner/tests/Snippet/CommitTimestampTest.php
@@ -57,10 +57,13 @@ public function testClass()
     {
         $id = 'abc';
 
+        // SpannerMiddleware is always added; the metrics middleware only when metrics are enabled
         $this->spannerClient->addMiddleware(Argument::type('callable'))
-            ->shouldBeCalledOnce();
+            ->shouldBeCalled();
+
+        // The Spanner Header Id is always prepended; the metrics middleware only when enabled
         $this->spannerClient->prependMiddleware(Argument::type('callable'))
-            ->shouldBeCalledOnce();
+            ->shouldBeCalled();
 
         // ensure cache hit
         $cacheItem = $this->prophesize(CacheItemInterface::class);
diff --git a/Spanner/tests/System/QueryTest.php b/Spanner/tests/System/QueryTest.php
index 4aa5a2b45de8..7b15113bd37d 100644
--- a/Spanner/tests/System/QueryTest.php
+++ b/Spanner/tests/System/QueryTest.php
@@ -27,6 +27,7 @@
 use Google\Cloud\Spanner\Interval;
 use Google\Cloud\Spanner\Numeric;
 use 
Google\Cloud\Spanner\Result; +use Google\Cloud\Spanner\SpannerClient; use Google\Cloud\Spanner\StructType; use Google\Cloud\Spanner\StructValue; use Google\Cloud\Spanner\Timestamp; @@ -1251,4 +1252,34 @@ public function testBindStructInferredParameterTypesWithUnnamed() ] ], $res); } + + /** + * This test ensures that enabling built-in metrics does not interfere with + * normal client operations and that the OpenTelemetry pipeline is stable. + */ + public function testBuiltInMetrics() + { + if (self::isEmulatorUsed()) { + $this->markTestSkipped('Built-in metrics are not supported on Emulator.'); + } + + $keyFilePath = getenv('GOOGLE_CLOUD_PHP_TESTS_KEY_PATH'); + $client = new SpannerClient([ + 'keyFilePath' => $keyFilePath, + 'enableBuiltInMetrics' => true, + 'metricsTimeoutMillis' => 100000, + ]); + + $db = $client->connect(self::INSTANCE_NAME, self::$dbName); + + // Execute a query to trigger metrics (Attempt, Operation, GFE) + $res = $db->execute('SELECT 1'); + $row = $res->rows()->current(); + + $this->assertEquals(1, $row[0]); + + // Success here means the middlewares correctly handled the request/response + // and recorded metrics without throwing exceptions. + // The final export happens at PHP shutdown via ShutdownHandler. 
+ } } diff --git a/Spanner/tests/System/SystemTestCaseTrait.php b/Spanner/tests/System/SystemTestCaseTrait.php index ab0e8ac4a3ab..50c2259e092e 100644 --- a/Spanner/tests/System/SystemTestCaseTrait.php +++ b/Spanner/tests/System/SystemTestCaseTrait.php @@ -67,11 +67,10 @@ private static function getClient() ], ] ]; - $clientConfig = [ 'keyFilePath' => $keyFilePath, + 'enableBuiltInMetrics' => false, // Disabling the metrics for general tests ]; - $serviceAddress = getenv('SPANNER_SERVICE_ADDRESS'); if ($serviceAddress) { $gapicConfig = [ diff --git a/Spanner/tests/Unit/Middleware/BuiltInMetricsAttemptMiddlewareTest.php b/Spanner/tests/Unit/Middleware/BuiltInMetricsAttemptMiddlewareTest.php new file mode 100644 index 000000000000..7779210a6fd4 --- /dev/null +++ b/Spanner/tests/Unit/Middleware/BuiltInMetricsAttemptMiddlewareTest.php @@ -0,0 +1,266 @@ +attemptHistogram = $this->prophesize(HistogramInterface::class); + $this->attemptCounter = $this->prophesize(CounterInterface::class); + $this->gfeHistogram = $this->prophesize(HistogramInterface::class); + $this->gfeErrorCounter = $this->prophesize(CounterInterface::class); + $this->afeHistogram = $this->prophesize(HistogramInterface::class); + $this->afeErrorCounter = $this->prophesize(CounterInterface::class); + $this->meter = $this->prophesize(MeterInterface::class); + + $this->meter->createHistogram( + 'attempt_latencies', + 'ms', + Argument::any() + )->willReturn($this->attemptHistogram->reveal()); + + $this->meter->createCounter( + 'attempt_count', + '1', + Argument::any() + )->willReturn($this->attemptCounter->reveal()); + + $this->meter->createHistogram( + 'gfe_latencies', + 'ms', + Argument::any() + )->willReturn($this->gfeHistogram->reveal()); + + $this->meter->createCounter( + 'gfe_connectivity_error_count', + '1', + Argument::any() + )->willReturn($this->gfeErrorCounter->reveal()); + + $this->meter->createHistogram( + 'afe_latencies', + 'ms', + Argument::any() + )->willReturn($this->afeHistogram->reveal()); 
+ + $this->meter->createCounter( + 'afe_connectivity_error_count', + '1', + Argument::any() + )->willReturn($this->afeErrorCounter->reveal()); + + $this->nextHandler = function ($call, $options) { + if (isset($options['metadataCallback'])) { + $options['metadataCallback']([ + 'server-timing' => ['gfet4t7; dur=12.5, afe; dur=8.2'] + ]); + } + return new FulfilledPromise('ok'); + }; + } + + public function testRecordsAttemptMetrics() + { + $projectId = 'test-project'; + $clientId = 'test-client-id'; + $version = '2.5.1'; + $expectedClientName = 'spanner-php/2.5.1'; + $location = 'us-central1'; + + $middleware = new MetricsAttemptMiddleware( + $this->nextHandler, + $this->meter->reveal(), + $clientId, + $projectId, + $version, + $location + ); + + $call = $this->prophesize(Call::class); + $call->getMethod()->willReturn('Commit'); + + // GAX formats this as a URL-encoded string in a header + $options = [ + 'headers' => [ + 'x-goog-request-params' => ['database=projects%2Fp%2Finstances%2Fi%2Fdatabases%2Fd'] + ] + ]; + + // Verify Labels + $expectedLabels = [ + 'method' => 'Commit', + 'status' => 'OK', + 'instance_id' => 'i', + 'database' => 'd', + 'project_id' => $projectId, + 'client_uid' => $clientId, + 'client_name' => $expectedClientName, + 'instance_config' => 'unknown', + 'location' => $location, + 'directpath_enabled' => 'false', + 'directpath_used' => 'true' + ]; + + $this->attemptCounter->add(1, $expectedLabels)->shouldBeCalled(); + $this->attemptHistogram->record(Argument::type('float'), $expectedLabels)->shouldBeCalled(); + + // GFE and AFE metrics + $this->gfeHistogram->record(12.5, $expectedLabels)->shouldBeCalled(); + $this->afeHistogram->record(8.2, $expectedLabels)->shouldBeCalled(); + + $promise = $middleware($call->reveal(), $options); + $promise->wait(); + } + + public function testRecordsGfeMetricsOnStreamingResponse() + { + $callWrapper = $this->prophesize(ServerStreamingCallInterface::class); + 
$callWrapper->getMetadata()->willReturn(['server-timing' => ['gfet4t7; dur=45.0']]); + + $serverStream = new ServerStream($callWrapper->reveal()); + + $this->nextHandler = function ($call, $options) use ($serverStream) { + return $serverStream; + }; + + $middleware = new MetricsAttemptMiddleware( + $this->nextHandler, + $this->meter->reveal(), + 'client', + 'project', + 'name', + 'global' + ); + + $call = $this->prophesize(Call::class); + $call->getMethod()->willReturn('ExecuteStreamingSql'); + + $options = [ + 'headers' => [ + 'x-goog-request-params' => ['database=projects%2Fp%2Finstances%2Fi%2Fdatabases%2Fd'] + ] + ]; + + // Expect GFE latency recording from the stream metadata immediately + $this->gfeHistogram->record(45.0, Argument::any())->shouldBeCalled(); + $this->attemptCounter->add(1, Argument::any())->shouldNotBeCalled(); + + $response = $middleware($call->reveal(), $options); + $this->assertInstanceOf(ServerStream::class, $response); + } + + public function testRecordsGfeErrorOnMissingHeader() + { + $this->nextHandler = function ($call, $options) { + if (isset($options['metadataCallback'])) { + $options['metadataCallback']([]); // Missing header + } + return new FulfilledPromise('ok'); + }; + + $middleware = new MetricsAttemptMiddleware( + $this->nextHandler, + $this->meter->reveal(), + 'client', + 'project', + 'name', + 'global' + ); + + $call = $this->prophesize(Call::class); + $call->getMethod()->willReturn('Commit'); + + $options = [ + 'headers' => [ + 'x-goog-request-params' => ['database=projects%2Fp%2Finstances%2Fi%2Fdatabases%2Fd'] + ] + ]; + + $this->gfeErrorCounter->add(1, Argument::any())->shouldBeCalled(); + + $promise = $middleware($call->reveal(), $options); + $promise->wait(); + } + + public function testRecordsMetricsOnError() + { + $this->nextHandler = function ($call, $options) { + return new RejectedPromise(new \Exception('fail', 7)); + }; + + $middleware = new MetricsAttemptMiddleware( + $this->nextHandler, + $this->meter->reveal(), + 
'client', + 'project', + 'name', + 'global' + ); + + $call = $this->prophesize(Call::class); + $call->getMethod()->willReturn('Commit'); + + $options = [ + 'headers' => [ + 'x-goog-request-params' => ['database=projects%2Fp%2Finstances%2Fi%2Fdatabases%2Fd'] + ] + ]; + + // On error, we expect attempt count/latency AND GFE error count (since headers were missing) + $this->attemptCounter->add(1, Argument::any())->shouldBeCalled(); + $this->attemptHistogram->record(Argument::any(), Argument::any())->shouldBeCalled(); + $this->gfeErrorCounter->add(1, Argument::any())->shouldBeCalled(); + + $promise = $middleware($call->reveal(), $options); + + try { + $promise->wait(); + } catch (\Exception $e) { + $this->assertEquals(7, $e->getCode()); + } + } +} diff --git a/Spanner/tests/Unit/Middleware/BuiltInMetricsOperationMiddlewareTest.php b/Spanner/tests/Unit/Middleware/BuiltInMetricsOperationMiddlewareTest.php new file mode 100644 index 000000000000..853932fb9f9c --- /dev/null +++ b/Spanner/tests/Unit/Middleware/BuiltInMetricsOperationMiddlewareTest.php @@ -0,0 +1,112 @@ +histogram = $this->prophesize(HistogramInterface::class); + $this->counter = $this->prophesize(CounterInterface::class); + $this->meter = $this->prophesize(MeterInterface::class); + + $this->meter->createHistogram( + 'operation_latencies', + 'ms', + Argument::any() + )->willReturn($this->histogram->reveal()); + + $this->meter->createCounter( + 'operation_count', + '1', + Argument::any() + )->willReturn($this->counter->reveal()); + + $this->nextHandler = function ($call, $options) { + return new FulfilledPromise('ok'); + }; + } + + public function testRecordsOperationMetrics() + { + $projectId = 'test-project'; + $clientId = 'test-client-id'; + $version = '2.5.1'; + $expectedClientName = 'spanner-php/2.5.1'; + $location = 'us-central1'; + + $middleware = new MetricsOperationMiddleware( + $this->nextHandler, + $this->meter->reveal(), + $clientId, + $projectId, + $version, + $location + ); + + $call = 
$this->prophesize(Call::class); + $call->getMethod()->willReturn('ExecuteSql'); + + $options = [ + 'headers' => [ + 'x-goog-request-params' => ['database=projects%2Fp%2Finstances%2Fi%2Fdatabases%2Fd'] + ] + ]; + + // Verify Labels + $expectedLabels = [ + 'method' => 'ExecuteSql', + 'status' => 'OK', + 'instance_id' => 'i', + 'database' => 'd', + 'project_id' => $projectId, + 'client_uid' => $clientId, + 'client_name' => $expectedClientName, + 'instance_config' => 'unknown', + 'location' => $location, + 'directpath_enabled' => 'false', + 'directpath_used' => 'false' + ]; + + $this->counter->add(1, $expectedLabels)->shouldBeCalled(); + $this->histogram->record(Argument::type('float'), $expectedLabels)->shouldBeCalled(); + + $promise = $middleware($call->reveal(), $options); + $promise->wait(); + } +} diff --git a/Spanner/tests/Unit/OpenTelemetry/BuiltInMetricsExporterTest.php b/Spanner/tests/Unit/OpenTelemetry/BuiltInMetricsExporterTest.php new file mode 100644 index 000000000000..ed333a157564 --- /dev/null +++ b/Spanner/tests/Unit/OpenTelemetry/BuiltInMetricsExporterTest.php @@ -0,0 +1,162 @@ +prophesize(MetricServiceClient::class); + $exporter = new MetricsExporter($client->reveal(), self::PROJECT_ID, self::CLIENT_ID, self::DEFAULT_TIMEOUT); + + $reflection = new ReflectionClass(MetricsExporter::class); + $method = $reflection->getMethod('generateClientHash'); + $method->setAccessible(true); + + $result = $method->invoke($exporter, $clientUid); + $this->assertEquals($expected, $result); + } + + public function hashDataProvider() + { + return [ + ['exampleUID', '00006b'], + ['', '000000'], + ['!@#$%^&*()', '000389'], + ['aVeryLongUniqueIdentifierThatExceedsNormalLength', '000125'], + ['1234567890', '00003e'], + ]; + } + + public function testExport() + { + $client = $this->prophesize(MetricServiceClient::class); + $exporter = new MetricsExporter($client->reveal(), self::PROJECT_ID, self::CLIENT_ID, 100); + + $scope = new InstrumentationScope('google-cloud-spanner', 
'1.0.0', null, Attributes::create([])); + $resource = ResourceInfo::create(Attributes::create(['service.name' => 'spanner'])); + + $attributes = Attributes::create([ + 'method' => 'ExecuteSql', + 'status' => 'OK', + 'instance_id' => 'my-instance', + 'database' => 'my-db' + ]); + + $point = new NumberDataPoint( + 1, + $attributes, + 1711368000000000000, // nanoseconds + 1711368060000000000 + ); + + $sum = new Sum([$point], Temporality::CUMULATIVE, true); + $metric = new OTelMetric($scope, $resource, 'attempt_count', '1', 'desc', $sum); + + $client->createServiceTimeSeries(Argument::that(function ($request) { + if (!$request instanceof CreateTimeSeriesRequest) { + return false; + } + + $projectName = MetricServiceClient::projectName(self::PROJECT_ID); + if ($request->getName() !== $projectName) { + return false; + } + + $timeSeries = $request->getTimeSeries()[0]; + + // Verify Metric Type + $expectedMetric = 'spanner.googleapis.com/internal/client/attempt_count'; + if ($timeSeries->getMetric()->getType() !== $expectedMetric) { + return false; + } + + // Verify Labels + $labels = $timeSeries->getMetric()->getLabels(); + if ($labels['method'] !== 'ExecuteSql' || + $labels['status'] !== 'OK' || + $labels['database'] !== 'my-db') { + return false; + } + + // Verify Resource + $resLabels = $timeSeries->getResource()->getLabels(); + if ($resLabels['instance_id'] !== 'my-instance') { + return false; + } + + // Verify Client Hash + if ($resLabels['client_hash'] !== '000369') { + return false; + } + + return true; + }), Argument::withEntry('timeoutMillis', 100))->shouldBeCalled(); + + $this->assertTrue($exporter->export([$metric])); + } + + public function testExportCustomTimeout() + { + $client = $this->prophesize(MetricServiceClient::class); + $timeout = 500; + $exporter = new MetricsExporter($client->reveal(), self::PROJECT_ID, self::CLIENT_ID, $timeout); + + $scope = new InstrumentationScope('google-cloud-spanner', '1.0.0', null, Attributes::create([])); + $resource = 
ResourceInfo::create(Attributes::create(['service.name' => 'spanner'])); + + $attributes = Attributes::create([]); + $point = new NumberDataPoint(1, $attributes, 1711368000000000000, 1711368060000000000); + $sum = new Sum([$point], Temporality::CUMULATIVE, true); + $metric = new OTelMetric($scope, $resource, 'attempt_count', '1', 'desc', $sum); + + $client->createServiceTimeSeries( + Argument::type(CreateTimeSeriesRequest::class), + Argument::withEntry('timeoutMillis', $timeout) + )->shouldBeCalled(); + + $exporter->export([$metric]); + } +} diff --git a/Spanner/tests/Unit/SpannerClientTest.php b/Spanner/tests/Unit/SpannerClientTest.php index 384d894dc378..75dcbec33665 100644 --- a/Spanner/tests/Unit/SpannerClientTest.php +++ b/Spanner/tests/Unit/SpannerClientTest.php @@ -869,4 +869,35 @@ public function testConfigureKeepAlive() $newConfig['transportConfig']['grpc']['stubOpts']['grpc.keepalive_time_ms'] ); } + + public function testBuiltinMetricsDisabledByDefault() + { + $gapicSpannerClient = $this->prophesize(GapicSpannerClient::class); + $gapicSpannerClient->prependMiddleware(Argument::any()) + ->shouldBeCalledTimes(1); + $gapicSpannerClient->addMiddleware(Argument::any()) + ->shouldBeCalledTimes(1); + + new SpannerClient([ + 'projectId' => self::PROJECT, + 'credentials' => Fixtures::KEYFILE_STUB_FIXTURE(), + 'gapicSpannerClient' => $gapicSpannerClient->reveal(), + ]); + } + + public function testBuiltinMetricsCanBeEnabled() + { + $gapicSpannerClient = $this->prophesize(GapicSpannerClient::class); + $gapicSpannerClient->prependMiddleware(Argument::any()) + ->shouldBeCalledTimes(2); + $gapicSpannerClient->addMiddleware(Argument::any()) + ->shouldBeCalledTimes(2); + + new SpannerClient([ + 'projectId' => self::PROJECT, + 'credentials' => Fixtures::KEYFILE_STUB_FIXTURE(), + 'gapicSpannerClient' => $gapicSpannerClient->reveal(), + 'enableBuiltInMetrics' => true, + ]); + } } diff --git a/Spanner/tests/Unit/bootstrap.php b/Spanner/tests/Unit/bootstrap.php index 
f16f16a2c9c5..7973127ece79 100644 --- a/Spanner/tests/Unit/bootstrap.php +++ b/Spanner/tests/Unit/bootstrap.php @@ -7,6 +7,7 @@ '*/src/Admin/Database/V1/Client/*', '*/src/Admin/Instance/V1/Client/*', '*/src/V1/Client/*', + '*/vendor/google/cloud-monitoring/src/V3/Client/*' ]); BypassFinals::enable(); diff --git a/composer.json b/composer.json index a5fde45b6bb9..69db448ad20b 100644 --- a/composer.json +++ b/composer.json @@ -65,7 +65,8 @@ "ramsey/uuid": "^4.0", "google/common-protos": "^4.4", "google/gax": "^1.40.0", - "google/auth": "^1.42" + "google/auth": "^1.42", + "open-telemetry/sdk": "^1.13" }, "require-dev": { "phpunit/phpunit": "^9.6",