diff --git a/composer.json b/composer.json index 3d97c12d..640a2bf7 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,8 @@ ], "require": { "php": ">=5.6.3", - "guzzlehttp/guzzle": "^6.2" + "guzzlehttp/guzzle": "^6.2", + "predis/predis": "v1.1" }, "require-dev": { "phpunit/phpunit": "4.1.0" diff --git a/examples/flush_adapter.php b/examples/flush_adapter.php index 0c3862d5..4fe370d2 100644 --- a/examples/flush_adapter.php +++ b/examples/flush_adapter.php @@ -1,6 +1,9 @@ flushMemory(); -} \ No newline at end of file +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $redisClusterAdapter = new Prometheus\Storage\RedisCluster(); + $redisClusterAdapter->flushRedisCluster(); +} diff --git a/examples/metrics.php b/examples/metrics.php index fa89247f..444a66c7 100644 --- a/examples/metrics.php +++ b/examples/metrics.php @@ -5,6 +5,7 @@ use Prometheus\CollectorRegistry; use Prometheus\RenderTextFormat; use Prometheus\Storage\Redis; +use Prometheus\Storage\RedisCluster; $adapter = $_GET['adapter']; @@ -15,6 +16,19 @@ $adapter = new Prometheus\Storage\APC(); } elseif ($adapter === 'in-memory') { $adapter = new Prometheus\Storage\InMemory(); +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $adapter = new Prometheus\Storage\RedisCluster(); } $registry = new CollectorRegistry($adapter); $renderer = new RenderTextFormat(); diff --git a/examples/pushgateway.php b/examples/pushgateway.php index 984035d5..dc700987 100644 --- a/examples/pushgateway.php +++ b/examples/pushgateway.php @@ -3,6 +3,7 @@ use Prometheus\Storage\Redis; use Prometheus\CollectorRegistry; +use Prometheus\Storage\RedisCluster; $adapter = $_GET['adapter']; @@ -13,6 +14,19 @@ $adapter = new Prometheus\Storage\APC(); } elseif ($adapter === 'in-memory') { $adapter = new Prometheus\Storage\InMemory(); +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $adapter = new Prometheus\Storage\RedisCluster(); } $registry = new CollectorRegistry($adapter); @@ -21,4 +35,4 @@ $counter->incBy(6, ['blue']); $pushGateway = new \Prometheus\PushGateway('192.168.59.100:9091'); -$pushGateway->push($registry, 'my_job', array('instance'=>'foo')); +$pushGateway->push($registry, 'my_job', array('instance' => 'foo')); diff --git a/examples/some_counter.php b/examples/some_counter.php index 823bfac6..06eb410e 100644 --- a/examples/some_counter.php +++ b/examples/some_counter.php @@ -4,6 +4,7 @@ use Prometheus\CollectorRegistry; use Prometheus\Storage\Redis; +use Prometheus\Storage\RedisCluster; $adapter = $_GET['adapter']; @@ -14,6 +15,19 @@ $adapter = new Prometheus\Storage\APC(); } elseif ($adapter === 'in-memory') { $adapter = new Prometheus\Storage\InMemory(); +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $adapter = new Prometheus\Storage\RedisCluster(); } $registry = new CollectorRegistry($adapter); diff --git a/examples/some_gauge.php b/examples/some_gauge.php index b3e2382a..a6067189 100644 --- a/examples/some_gauge.php +++ b/examples/some_gauge.php @@ -4,9 +4,10 @@ use Prometheus\CollectorRegistry; use Prometheus\Storage\Redis; +use Prometheus\Storage\RedisCluster; -error_log('c='. $_GET['c']); +error_log('c=' . $_GET['c']); $adapter = $_GET['adapter']; @@ -17,6 +18,19 @@ $adapter = new Prometheus\Storage\APC(); } elseif ($adapter === 'in-memory') { $adapter = new Prometheus\Storage\InMemory(); +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $adapter = new Prometheus\Storage\RedisCluster(); } $registry = new CollectorRegistry($adapter); diff --git a/examples/some_histogram.php b/examples/some_histogram.php index 6b34809f..c368e80c 100644 --- a/examples/some_histogram.php +++ b/examples/some_histogram.php @@ -4,8 +4,9 @@ use Prometheus\CollectorRegistry; use Prometheus\Storage\Redis; +use Prometheus\Storage\RedisCluster; -error_log('c='. $_GET['c']); +error_log('c=' . $_GET['c']); $adapter = $_GET['adapter']; @@ -16,6 +17,19 @@ $adapter = new Prometheus\Storage\APC(); } elseif ($adapter === 'in-memory') { $adapter = new Prometheus\Storage\InMemory(); +} elseif ($adapter === 'redis-cluster') { + $instanceID = !empty($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : '127.0.0.1'; + RedisCluster::setDefaultOptions(array( + 'redis_list' => ['tcp://127.0.0.1:7001', 'tcp://127.0.0.1:7002', 'tcp://127.0.0.1:7003'], + 'cluster' => 'redis', + 'password' => null, + 'timeout' => 0.1, + 'read_timeout' => 10, + 'persistent' => false + )); + RedisCluster::setPrefix('TEST_PROMETHEUS:' . $instanceID); + RedisCluster::setHashTag('TEST_PROMETHEUS'); + $adapter = new Prometheus\Storage\RedisCluster(); } $registry = new CollectorRegistry($adapter); diff --git a/src/Prometheus/Storage/RedisCluster.php b/src/Prometheus/Storage/RedisCluster.php new file mode 100644 index 00000000..9ea8f2ba --- /dev/null +++ b/src/Prometheus/Storage/RedisCluster.php @@ -0,0 +1,402 @@ +options = array_merge(self::$defaultOptions, $options); + + $this->client = new \Predis\Client( + $this->options['redis_list'], + [ + 'cluster' => $this->options['cluster'], + 'parameters' => [ + 'password' => $this->options['password'], + 'timeout' => $this->options['timeout'], + 'read_write_timeout' => $this->options['read_write_timeout'], + 'persistent' => $this->options['persistent'], + ] + ] + ); + } + + /** + * @param array $options + */ + public static function setDefaultOptions(array $options) + { + self::$defaultOptions = array_merge(self::$defaultOptions, $options); + } + + public static function setPrefix($prefix) + { + self::$prefix = $prefix; + } + + /** + * set hash tag + * @param string $hashTag hash tag content (not contain {}) + */ + public static function setHashTag($hashTag) + { + self::$hashTag = $hashTag; + } + + + public function flushRedisCluster() + { + $deletedCount = 0; + + $metricKeysKeys = [ + $this->addHashTag($this->toMetricKeyKey(Histogram::TYPE)), + $this->addHashTag($this->toMetricKeyKey(Gauge::TYPE)), + $this->addHashTag($this->toMetricKeyKey(Counter::TYPE)), + ]; + + foreach ($metricKeysKeys as $metricKeysKey) { + $metricKeys = $this->client->smembers($metricKeysKey); + + if (!empty($metricKeys)) { + $count = $this->client->del($metricKeys); + $deletedCount += $count; + + $this->client->del([$metricKeysKey]); + } + } + + return $deletedCount; + } + + /** + * @return MetricFamilySamples[] + * @throws StorageException + */ + public function collect() + { + $metrics = $this->collectHistograms(); + $metrics = array_merge($metrics, $this->collectGauges()); + $metrics = array_merge($metrics, $this->collectCounters()); + return array_map( + function (array $metric) { + return new MetricFamilySamples($metric); + }, + $metrics + ); + } + + public function updateHistogram(array $data) + { + $bucketToIncrease = '+Inf'; + foreach ($data['buckets'] as $bucket) { + if ($data['value'] <= $bucket) { + $bucketToIncrease = $bucket; + break; + } + } + $metaData = $data; + unset($metaData['value']); + unset($metaData['labelValues']); + + // add hash tag, ensure all keys in the same slot + $metricKey = $this->addHashTag($this->toMetricKey($data)); + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Histogram::TYPE)); + + $sumKey = json_encode(array('b' => 'sum', 'labelValues' => $data['labelValues'])); + $bucketKey = json_encode(array('b' => $bucketToIncrease, 'labelValues' => $data['labelValues'])); + + // use pipeline to execute all commands + $pipe = $this->client->pipeline(); + + // add sum + $pipe->hincrbyfloat($metricKey, $sumKey, (float)$data['value']); + + // add bucket count + $pipe->hincrby($metricKey, $bucketKey, 1); + + // set meta key (hsetnx: only when it does not exist) + $pipe->hsetnx($metricKey, '__meta', json_encode($metaData)); + + // add to set + $pipe->sadd($metricKeysKey, $metricKey); + + // execute + $pipe->execute(); + } + + public function updateGauge(array $data) + { + $metaData = $data; + unset($metaData['value']); + unset($metaData['labelValues']); + unset($metaData['command']); + + $metricKey = $this->addHashTag($this->toMetricKey($data)); + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Gauge::TYPE)); + $labelKey = json_encode($data['labelValues']); + + // use pipeline to execute all commands + $pipe = $this->client->pipeline(); + + // get redis command + switch ($data['command']) { + case Adapter::COMMAND_INCREMENT_INTEGER: + $pipe->hincrby($metricKey, $labelKey, (int)$data['value']); + break; + case Adapter::COMMAND_INCREMENT_FLOAT: + $pipe->hincrbyfloat($metricKey, $labelKey, (float)$data['value']); + break; + case Adapter::COMMAND_SET: + $pipe->hset($metricKey, $labelKey, $data['value']); + break; + } + + // set meta key (hsetnx: only when it does not exist) + $pipe->hsetnx($metricKey, '__meta', json_encode($metaData)); + + // set meta key (hsetnx: only when it does not exist) + $pipe->sadd($metricKeysKey, $metricKey); + + $pipe->execute(); + } + + public function updateCounter(array $data) + { + $metaData = $data; + unset($metaData['value']); + unset($metaData['labelValues']); + unset($metaData['command']); + + $metricKey = $this->addHashTag($this->toMetricKey($data)); + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Counter::TYPE)); + $labelKey = json_encode($data['labelValues']); + + // use pipeline to execute all commands + $pipe = $this->client->pipeline(); + + // get redis command + switch ($data['command']) { + case Adapter::COMMAND_INCREMENT_INTEGER: + $pipe->hincrby($metricKey, $labelKey, (int)$data['value']); + break; + case Adapter::COMMAND_INCREMENT_FLOAT: + $pipe->hincrbyfloat($metricKey, $labelKey, (float)$data['value']); + break; + } + + // set meta key (hsetnx: only when it does not exist) + $pipe->hsetnx($metricKey, '__meta', json_encode($metaData)); + + // set meta key (hsetnx: only when it does not exist) + $pipe->sadd($metricKeysKey, $metricKey); + + $results = $pipe->execute(); + + return isset($results[0]) ? $results[0] : 0; + } + + private function collectHistograms() + { + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Histogram::TYPE)); + $keys = $this->client->sMembers($metricKeysKey); + + sort($keys); + $histograms = array(); + foreach ($keys as $key) { + $raw = $this->client->hGetAll($key); + $histogram = json_decode($raw['__meta'], true); + unset($raw['__meta']); + $histogram['samples'] = array(); + + // Add the Inf bucket so we can compute it later on + $histogram['buckets'][] = '+Inf'; + + $allLabelValues = array(); + foreach (array_keys($raw) as $k) { + $d = json_decode($k, true); + if ($d['b'] == 'sum') { + continue; + } + $allLabelValues[] = $d['labelValues']; + } + + // We need set semantics. + // This is the equivalent of array_unique but for arrays of arrays. + $allLabelValues = array_map("unserialize", array_unique(array_map("serialize", $allLabelValues))); + sort($allLabelValues); + + foreach ($allLabelValues as $labelValues) { + // Fill up all buckets. + // If the bucket doesn't exist fill in values from + // the previous one. + $acc = 0; + foreach ($histogram['buckets'] as $bucket) { + $bucketKey = json_encode(array('b' => $bucket, 'labelValues' => $labelValues)); + if (!isset($raw[$bucketKey])) { + $histogram['samples'][] = array( + 'name' => $histogram['name'] . '_bucket', + 'labelNames' => array('le'), + 'labelValues' => array_merge($labelValues, array($bucket)), + 'value' => $acc + ); + } else { + $acc += $raw[$bucketKey]; + $histogram['samples'][] = array( + 'name' => $histogram['name'] . '_bucket', + 'labelNames' => array('le'), + 'labelValues' => array_merge($labelValues, array($bucket)), + 'value' => $acc + ); + } + } + + // Add the count + $histogram['samples'][] = array( + 'name' => $histogram['name'] . '_count', + 'labelNames' => array(), + 'labelValues' => $labelValues, + 'value' => $acc + ); + + // Add the sum + $histogram['samples'][] = array( + 'name' => $histogram['name'] . '_sum', + 'labelNames' => array(), + 'labelValues' => $labelValues, + 'value' => $raw[json_encode(array('b' => 'sum', 'labelValues' => $labelValues))] + ); + } + $histograms[] = $histogram; + } + return $histograms; + } + + private function collectGauges() + { + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Gauge::TYPE)); + $keys = $this->client->sMembers($metricKeysKey); + + sort($keys); + $gauges = array(); + foreach ($keys as $key) { + $raw = $this->client->hGetAll($key); + $gauge = json_decode($raw['__meta'], true); + unset($raw['__meta']); + $gauge['samples'] = array(); + foreach ($raw as $k => $value) { + $gauge['samples'][] = array( + 'name' => $gauge['name'], + 'labelNames' => array(), + 'labelValues' => json_decode($k, true), + 'value' => $value + ); + } + usort($gauge['samples'], function ($a, $b) { + return strcmp(implode("", $a['labelValues']), implode("", $b['labelValues'])); + }); + $gauges[] = $gauge; + } + return $gauges; + } + + private function collectCounters() + { + $metricKeysKey = $this->addHashTag($this->toMetricKeyKey(Counter::TYPE)); + $keys = $this->client->sMembers($metricKeysKey); + + sort($keys); + $counters = array(); + foreach ($keys as $key) { + $raw = $this->client->hGetAll($key); + $counter = json_decode($raw['__meta'], true); + unset($raw['__meta']); + $counter['samples'] = array(); + foreach ($raw as $k => $value) { + $counter['samples'][] = array( + 'name' => $counter['name'], + 'labelNames' => array(), + 'labelValues' => json_decode($k, true), + 'value' => $value + ); + } + usort($counter['samples'], function ($a, $b) { + return strcmp(implode("", $a['labelValues']), implode("", $b['labelValues'])); + }); + $counters[] = $counter; + } + return $counters; + } + + /** + * @param array $data + * @return string + */ + private function toMetricKey(array $data) + { + return implode(':', array(self::$prefix, $data['type'], $data['name'])); + } + + private function toMetricKeyKey($metricsType) + { + return self::$prefix . ':' . $metricsType . self::PROMETHEUS_METRIC_KEYS_SUFFIX; + } + + /** + * add hash tag, ensure all keys in the same slot + * @param $key + */ + private function addHashTag($key) + { + // 如果key已经包含哈希标签,直接返回 + if (preg_match('/\{[^}]+\}/', $key)) { + return $key; + } + + $hashTagContent = self::$hashTag ?: 'PROMETHEUS'; + + if (strpos($key, $hashTagContent) !== false) { + return str_replace($hashTagContent, '{' . $hashTagContent . '}', $key); + } + + return '{' . $hashTagContent . '}' . $key; + } +}