|
7 | 7 | from typing import Tuple |
8 | 8 | from typing import Union |
9 | 9 |
|
| 10 | +from prometheus_client import Gauge |
10 | 11 | from sqlalchemy import create_engine |
11 | 12 | from sqlalchemy import event |
12 | 13 | from sqlalchemy.engine import Connection |
@@ -125,7 +126,7 @@ def parse( |
125 | 126 | engine = engine_from_config( |
126 | 127 | raw_config, secrets=self.secrets, prefix=f"{key_path}.", **self.kwargs |
127 | 128 | ) |
128 | | - return SQLAlchemySessionContextFactory(engine) |
| 129 | + return SQLAlchemySessionContextFactory(engine, key_path) |
129 | 130 |
|
130 | 131 |
|
131 | 132 | Parameters = Optional[Union[Dict[str, Any], Sequence[Any]]] |
@@ -155,12 +156,46 @@ class SQLAlchemyEngineContextFactory(ContextFactory): |
155 | 156 |
|
156 | 157 | """ |
157 | 158 |
|
158 | | - def __init__(self, engine: Engine): |
| 159 | + PROM_PREFIX = "bp_sqlalchemy_pool" |
| 160 | + PROM_LABELS = ["pool"] |
| 161 | + |
| 162 | + max_connections_gauge = Gauge( |
| 163 | + f"{PROM_PREFIX}_max_size", |
| 164 | + "Maximum number of connections allowed in this pool", |
| 165 | + PROM_LABELS, |
| 166 | + ) |
| 167 | + |
| 168 | + checked_in_connections_gauge = Gauge( |
| 169 | + f"{PROM_PREFIX}_idle_connections", |
| 170 | + "Number of available, checked in, connections in this pool", |
| 171 | + PROM_LABELS, |
| 172 | + ) |
| 173 | + |
| 174 | + checked_out_connections_gauge = Gauge( |
| 175 | + f"{PROM_PREFIX}_active_connections", |
| 176 | + "Number of connections in use, or checked out, in this pool", |
| 177 | + PROM_LABELS, |
| 178 | + ) |
| 179 | + |
| 180 | + overflow_connections_gauge = Gauge( |
| 181 | + f"{PROM_PREFIX}_overflow_connections", |
| 182 | + "Number of connections over the desired size of this pool", |
| 183 | + PROM_LABELS, |
| 184 | + ) |
| 185 | + |
| 186 | + def __init__(self, engine: Engine, name: str = "sqlalchemy"): |
159 | 187 | self.engine = engine.execution_options() |
160 | 188 | event.listen(self.engine, "before_cursor_execute", self.on_before_execute, retval=True) |
161 | 189 | event.listen(self.engine, "after_cursor_execute", self.on_after_execute) |
162 | 190 | event.listen(self.engine, "handle_error", self.on_error) |
163 | 191 |
|
| 192 | + pool = self.engine.pool |
| 193 | + if isinstance(pool, QueuePool): |
| 194 | + self.max_connections_gauge.labels(name).set_function(pool.size) |
| 195 | + self.checked_in_connections_gauge.labels(name).set_function(pool.checkedin) |
| 196 | + self.checked_out_connections_gauge.labels(name).set_function(pool.checkedout) |
| 197 | + self.overflow_connections_gauge.labels(name).set_function(pool.overflow) |
| 198 | + |
164 | 199 | def report_runtime_metrics(self, batch: metrics.Client) -> None: |
165 | 200 | pool = self.engine.pool |
166 | 201 | if not isinstance(pool, QueuePool): |
|
0 commit comments