Skip to content

Commit 809bc32

Browse files
authored
better worker logs (#307)
1 parent ddb5c0e commit 809bc32

11 files changed

+146
-87
lines changed

backend/deepchecks_monitoring/bgtasks/alert_task.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from deepchecks_monitoring.api.v1.alert import AlertCreationSchema
2323
from deepchecks_monitoring.logic.check_logic import SingleCheckRunOptions, reduce_check_window, run_check_window
24-
from deepchecks_monitoring.monitoring_utils import DataFilterList, make_oparator_func
24+
from deepchecks_monitoring.monitoring_utils import DataFilterList, configure_logger, make_oparator_func
2525
from deepchecks_monitoring.public_models import Organization
2626
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
2727
from deepchecks_monitoring.resources import ResourcesProvider
@@ -32,28 +32,31 @@
3232
from deepchecks_monitoring.utils import database
3333
from deepchecks_monitoring.utils.mixpanel import AlertTriggeredEvent
3434

35-
__all__ = ["AlertsTask"]
35+
__all__ = ['AlertsTask']
3636

3737

3838
class AlertsTask(BackgroundWorker):
3939
"""Worker to calculate alerts"""
4040

4141
def __init__(self):
4242
super().__init__()
43-
self._logger = logging.getLogger(__name__)
43+
self.logger = configure_logger(self.__class__.__name__)
4444

4545
@classmethod
4646
def queue_name(cls) -> str:
47-
return "alerts"
47+
return 'alerts'
4848

4949
@classmethod
5050
def delay_seconds(cls) -> int:
5151
return 0
5252

5353
async def run(self, task: Task, session: AsyncSession, resources_provider: ResourcesProvider, lock):
54-
organization_id = task.params["organization_id"]
55-
monitor_id = task.params["monitor_id"]
56-
timestamp = task.params["timestamp"]
54+
organization_id = task.params['organization_id']
55+
monitor_id = task.params['monitor_id']
56+
timestamp = task.params['timestamp']
57+
58+
self.logger.info({'message': 'starting job', 'worker name': str(type(self)),
59+
'task': task.id, 'monitor_id': monitor_id, 'org_id': organization_id})
5760

5861
organization_schema = (await session.execute(
5962
select(Organization.schema_name).where(Organization.id == organization_id)
@@ -63,14 +66,14 @@ async def run(self, task: Task, session: AsyncSession, resources_provider: Resou
6366
if organization_schema is not None:
6467
await database.attach_schema_switcher_listener(
6568
session=session,
66-
schema_search_path=[organization_schema, "public"]
69+
schema_search_path=[organization_schema, 'public']
6770
)
6871
alerts = await execute_monitor(
6972
session=session,
7073
resources_provider=resources_provider,
7174
monitor_id=monitor_id,
7275
timestamp=timestamp,
73-
logger=self._logger,
76+
logger=self.logger,
7477
organization_id=organization_id,
7578
)
7679
else:
@@ -87,7 +90,7 @@ async def run(self, task: Task, session: AsyncSession, resources_provider: Resou
8790
organization_id=organization_id,
8891
session=session,
8992
resources_provider=resources_provider,
90-
logger=self._logger.getChild("alert-notificator")
93+
logger=self.logger.getChild('alert-notificator')
9194
)
9295
await resources_provider.report_mixpanel_event(
9396
AlertTriggeredEvent.create_event,
@@ -96,6 +99,9 @@ async def run(self, task: Task, session: AsyncSession, resources_provider: Resou
9699
)
97100
await notificator.notify()
98101

102+
self.logger.info({'message': 'finished job', 'worker name': str(type(self)),
103+
'task': task.id, 'monitor_id': monitor_id, 'org_id': organization_id})
104+
99105

100106
async def execute_monitor(
101107
monitor_id: int,
@@ -106,8 +112,8 @@ async def execute_monitor(
106112
logger: t.Optional[logging.Logger] = None,
107113
) -> t.List[Alert]:
108114
"""Execute monitor alert rules."""
109-
logger = logger or logging.getLogger("monitor-executor")
110-
logger.info("Execution of Monitor(id:%s) for timestamp %s", monitor_id, timestamp)
115+
logger = logger or logging.getLogger('monitor-executor')
116+
logger.info('Execution of Monitor(id:%s) for timestamp %s', monitor_id, timestamp)
111117

112118
monitor = t.cast(Monitor, await session.scalar(
113119
sa.select(Monitor)
@@ -119,13 +125,13 @@ async def execute_monitor(
119125
))
120126

121127
if monitor is None:
122-
raise ValueError(f"Did not find monitor with the id {monitor_id}")
128+
raise ValueError(f'Did not find monitor with the id {monitor_id}')
123129

124130
check = monitor.check
125131
alert_rules = monitor.alert_rules
126132

127133
if len(alert_rules) == 0:
128-
logger.info("Monitor(id:%s) does not have alert rules", monitor_id)
134+
logger.info('Monitor(id:%s) does not have alert rules', monitor_id)
129135
return []
130136

131137
monitor_frequency = t.cast(Frequency, monitor.frequency).to_pendulum_duration()
@@ -141,7 +147,7 @@ async def execute_monitor(
141147
)).all())
142148

143149
if not model_versions:
144-
logger.info("Model(id:%s) is empty (does not have versions)", check.model_id)
150+
logger.info('Model(id:%s) is empty (does not have versions)', check.model_id)
145151
return []
146152

147153
# First looking for results in cache if already calculated
@@ -154,7 +160,7 @@ async def execute_monitor(
154160
cache_results[model_version] = cache_result.value
155161
else:
156162
model_versions_without_cache.append(model_version)
157-
logger.debug("Cache result: %s", cache_results)
163+
logger.debug('Cache result: %s', cache_results)
158164

159165
# For model versions without result in cache running calculation
160166
if model_versions_without_cache:
@@ -178,7 +184,7 @@ async def execute_monitor(
178184
resources_provider.cache_functions.set_monitor_cache(
179185
organization_id, version.id, monitor_id, start_time, end_time, result)
180186

181-
logger.debug("Check execution result: %s", result_per_version)
187+
logger.debug('Check execution result: %s', result_per_version)
182188
else:
183189
result_per_version = {}
184190

@@ -192,20 +198,20 @@ async def execute_monitor(
192198
await session.execute(update(AlertRule).where(AlertRule.id == alert_rule.id)
193199
.values({AlertRule.start_time: func.least(AlertRule.start_time, end_time)}))
194200
if not alert_rule.is_active:
195-
logger.info("AlertRule(id:%s) is not active, skipping it", alert_rule.id)
201+
logger.info('AlertRule(id:%s) is not active, skipping it', alert_rule.id)
196202
elif alert := assert_check_results(alert_rule, check_results):
197203
alert.start_time = start_time
198204
alert.end_time = end_time
199205
AlertCreationSchema.validate(alert)
200206
session.add(alert)
201-
logger.info("Alert(id:%s) instance created for monitor(id:%s)", alert.id, monitor.id)
207+
logger.info('Alert(id:%s) instance created for monitor(id:%s)', alert.id, monitor.id)
202208
alerts.append(alert)
203209

204210
if (n_of_alerts := len(alerts)) > 0:
205-
logger.info("%s alerts raised for Monitor(id:%s)", n_of_alerts, monitor.id)
211+
logger.info('%s alerts raised for Monitor(id:%s)', n_of_alerts, monitor.id)
206212
return alerts
207213

208-
logger.info("No alerts were raised for Monitor(id:%s)", monitor.id)
214+
logger.info('No alerts were raised for Monitor(id:%s)', monitor.id)
209215
return []
210216

211217

backend/deepchecks_monitoring/bgtasks/delete_db_table_task.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from sqlalchemy.dialects.postgresql import insert
1616
from sqlalchemy.ext.asyncio import AsyncSession
1717

18+
from deepchecks_monitoring.monitoring_utils import configure_logger
1819
from deepchecks_monitoring.public_models.task import UNIQUE_NAME_TASK_CONSTRAINT, BackgroundWorker, Task
1920

2021
__all__ = ['DeleteDbTableTask', 'insert_delete_db_table_task']
@@ -23,9 +24,11 @@
2324

2425

2526
class DeleteDbTableTask(BackgroundWorker):
26-
"""Worker to delete any database tables.
27+
"""Worker to delete any database tables."""
2728

28-
"""
29+
def __init__(self):
30+
super().__init__()
31+
self.logger = configure_logger(self.__class__.__name__)
2932

3033
@classmethod
3134
def queue_name(cls) -> str:
@@ -36,10 +39,12 @@ def delay_seconds(cls) -> int:
3639
return 0
3740

3841
async def run(self, task: 'Task', session: AsyncSession, resources_provider, lock):
42+
self.logger.info({'message': 'started job', 'worker name': str(type(self))})
3943
for table in task.params['full_table_paths']:
4044
await session.execute(text(f'DROP TABLE IF EXISTS {table}'))
4145
# Deleting the task
4246
await session.execute(delete(Task).where(Task.id == task.id))
47+
self.logger.info({'message': 'finished job', 'worker name': str(type(self))})
4348

4449

4550
async def insert_delete_db_table_task(session: AsyncSession, full_table_paths: t.List[str]):

backend/deepchecks_monitoring/bgtasks/mixpanel_system_state_event.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,26 @@
1414
from sqlalchemy.dialects.postgresql import insert as pginsert
1515
from sqlalchemy.ext.asyncio import AsyncSession
1616

17+
from deepchecks_monitoring.monitoring_utils import configure_logger
1718
from deepchecks_monitoring.public_models.organization import Organization
1819
from deepchecks_monitoring.public_models.task import UNIQUE_NAME_TASK_CONSTRAINT, BackgroundWorker, Task
1920
from deepchecks_monitoring.resources import ResourcesProvider
2021
from deepchecks_monitoring.utils import database, mixpanel
2122

22-
__all__ = ["MixpanelSystemStateEvent"]
23+
__all__ = ['MixpanelSystemStateEvent']
2324

2425

25-
QUEUE_NAME = "mixpanel system state event"
26+
QUEUE_NAME = 'mixpanel system state event'
2627
DELAY = 0
2728

2829

2930
class MixpanelSystemStateEvent(BackgroundWorker):
3031
"""Worker that sends a system state event to the mixpanel."""
3132

33+
def __init__(self):
34+
super().__init__()
35+
self.logger = configure_logger(self.__class__.__name__)
36+
3237
@classmethod
3338
def queue_name(cls) -> str:
3439
"""REturn queue name."""
@@ -46,38 +51,40 @@ def retry_seconds(cls) -> int:
4651

4752
async def run(
4853
self,
49-
task: "Task",
54+
task: 'Task',
5055
session: AsyncSession,
5156
resources_provider: ResourcesProvider,
5257
lock: Lock
5358
):
5459
"""Run task."""
60+
5561
if not resources_provider.is_analytics_enabled:
5662
return
5763
if not resources_provider.settings.is_on_prem or resources_provider.settings.is_cloud:
5864
return
59-
65+
self.logger.info({'message': 'started job', 'worker name': str(type(self))})
6066
organizations = (await session.scalars(
6167
sa.select(Organization))
6268
).all()
6369

6470
for org in organizations:
6571
async with database.attach_schema_switcher(
6672
session=session,
67-
schema_search_path=[org.schema_name, "public"]
73+
schema_search_path=[org.schema_name, 'public']
6874
):
6975
await resources_provider.report_mixpanel_event(
7076
mixpanel.HealthcheckEvent.create_event,
7177
organization=org
7278
)
79+
self.logger.info({'message': 'finished job', 'worker name': str(type(self))})
7380

7481
@classmethod
7582
async def enqueue_task(cls, session: AsyncSession):
7683
"""Enqueue task."""
7784
values = {
78-
"name": "system-state",
79-
"bg_worker_task": cls.queue_name(),
80-
"params": {}
85+
'name': 'system-state',
86+
'bg_worker_task': cls.queue_name(),
87+
'params': {}
8188
}
8289
await session.execute(
8390
pginsert(Task)

backend/deepchecks_monitoring/bgtasks/model_data_ingestion_alerter.py

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from sqlalchemy.ext.asyncio import AsyncSession
1515
from sqlalchemy.orm import joinedload
1616

17-
from deepchecks_monitoring.monitoring_utils import make_oparator_func
17+
from deepchecks_monitoring.monitoring_utils import configure_logger, make_oparator_func
1818
from deepchecks_monitoring.public_models.organization import Organization
1919
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
2020
from deepchecks_monitoring.resources import ResourcesProvider
@@ -26,16 +26,20 @@
2626
from deepchecks_monitoring.utils import database
2727
from deepchecks_monitoring.utils.alerts import Condition
2828

29-
__all__ = ["ModelDataIngestionAlerter"]
29+
__all__ = ['ModelDataIngestionAlerter']
3030

3131

32-
QUEUE_NAME = "model data ingestion alerter"
32+
QUEUE_NAME = 'model data ingestion alerter'
3333
DELAY = 60
3434

3535

3636
class ModelDataIngestionAlerter(BackgroundWorker):
3737
"""Worker that alerts about data ingestion stats in relation to a model."""
3838

39+
def __init__(self):
40+
super().__init__()
41+
self.logger = configure_logger(self.__class__.__name__)
42+
3943
@classmethod
4044
def queue_name(cls) -> str:
4145
return QUEUE_NAME
@@ -44,14 +48,17 @@ def queue_name(cls) -> str:
4448
def delay_seconds(cls) -> int:
4549
return DELAY
4650

47-
async def run(self, task: "Task",
51+
async def run(self, task: 'Task',
4852
session: AsyncSession, # pylint: disable=unused-argument
4953
resources_provider: ResourcesProvider,
5054
lock):
51-
alert_rule_id = task.params["alert_rule_id"]
52-
org_id = task.params["organization_id"]
53-
end_time = task.params["end_time"]
54-
start_time = task.params["start_time"]
55+
alert_rule_id = task.params['alert_rule_id']
56+
org_id = task.params['organization_id']
57+
end_time = task.params['end_time']
58+
start_time = task.params['start_time']
59+
60+
self.logger.info({'message': 'entered job', 'worker name': str(type(self)),
61+
'task': task.id, 'alert_rule_id': alert_rule_id, 'org_id': org_id})
5562

5663
organization_schema = (await session.execute(
5764
sa.select(Organization.schema_name).where(
@@ -66,7 +73,7 @@ async def run(self, task: "Task",
6673

6774
await database.attach_schema_switcher_listener(
6875
session=session,
69-
schema_search_path=[organization_schema, "public"]
76+
schema_search_path=[organization_schema, 'public']
7077
)
7178

7279
alert_rule: DataIngestionAlertRule = (
@@ -86,8 +93,8 @@ async def run(self, task: "Task",
8693
pdl_start_time = as_pendulum_datetime(start_time)
8794
pdl_end_time = as_pendulum_datetime(end_time)
8895

89-
def truncate_date(col, agg_time_unit: str = "day"):
90-
return sa.func.cast(sa.func.extract("epoch", sa.func.date_trunc(agg_time_unit, col)), sa.Integer)
96+
def truncate_date(col, agg_time_unit: str = 'day'):
97+
return sa.func.cast(sa.func.extract('epoch', sa.func.date_trunc(agg_time_unit, col)), sa.Integer)
9198

9299
def sample_id(columns):
93100
return getattr(columns, SAMPLE_ID_COL)
@@ -103,33 +110,36 @@ def sample_label(columns):
103110
if not tables:
104111
return
105112

113+
self.logger.info({'message': 'starting job', 'worker name': str(type(self)),
114+
'task': task.id, 'alert_rule_id': alert_rule_id, 'org_id': org_id})
115+
106116
labels_table = model.get_sample_labels_table(session)
107117
# Get all samples within time window from all the versions
108118
data_query = sa.union_all(*(
109119
sa.select(
110-
sample_id(table.c).label("sample_id"),
120+
sample_id(table.c).label('sample_id'),
111121
truncate_date(sample_timestamp(table.c),
112-
freq.value.lower()).label("timestamp")
122+
freq.value.lower()).label('timestamp')
113123
).where(
114124
sample_timestamp(table.c) <= pdl_end_time,
115125
sample_timestamp(table.c) > pdl_start_time
116126
).distinct()
117127
for table in tables)
118128
)
119-
joined_query = sa.select(sa.literal(model.id).label("model_id"),
129+
joined_query = sa.select(sa.literal(model.id).label('model_id'),
120130
data_query.c.sample_id,
121131
data_query.c.timestamp,
122-
sa.func.cast(sample_label(labels_table.c), sa.String).label("label")) \
132+
sa.func.cast(sample_label(labels_table.c), sa.String).label('label')) \
123133
.join(labels_table, onclause=data_query.c.sample_id == sample_id(labels_table.c), isouter=True)
124134

125135
rows = (await session.execute(
126136
sa.select(
127137
joined_query.c.model_id,
128138
joined_query.c.timestamp,
129-
sa.func.count(joined_query.c.sample_id).label("count"),
130-
sa.func.count(sa.func.cast(joined_query.c.label, sa.String)).label("label_count"))
139+
sa.func.count(joined_query.c.sample_id).label('count'),
140+
sa.func.count(sa.func.cast(joined_query.c.label, sa.String)).label('label_count'))
131141
.group_by(joined_query.c.model_id, joined_query.c.timestamp)
132-
.order_by(joined_query.c.model_id, joined_query.c.timestamp, "count"),
142+
.order_by(joined_query.c.model_id, joined_query.c.timestamp, 'count'),
133143
)).fetchall()
134144

135145
pendulum_freq = freq.to_pendulum_duration()
@@ -156,3 +166,6 @@ def sample_label(columns):
156166

157167
await session.execute(sa.delete(Task).where(Task.id == task.id))
158168
await session.commit()
169+
170+
self.logger.info({'message': 'finished job', 'worker name': str(type(self)),
171+
'task': task.id, 'alert_rule_id': alert_rule_id, 'org_id': org_id})

0 commit comments

Comments
 (0)