Skip to content

Commit ddb5c0e

Browse files
authored
better logging - debug to info (#306)
1 parent 3b624a1 commit ddb5c0e

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

backend/deepchecks_monitoring/bgtasks/tasks_queuer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def __init__(
8484
update(Task)
8585
.where(next_execution_time <= func.statement_timestamp())
8686
.values({Task.num_pushed: Task.num_pushed + 1})
87-
.returning(Task.id)
87+
.returning(Task.id, Task.bg_worker_task, Task.num_pushed)
8888
)
8989

9090
async def run(self):
@@ -120,7 +120,12 @@ async def move_tasks_to_queue(self, session) -> int:
120120
try:
121121
# Push to sorted set. if task id is already in set then do nothing.
122122
pushed_count = await self.redis.zadd(GLOBAL_TASK_QUEUE, task_ids, nx=True)
123-
return pushed_count
123+
for task in tasks:
124+
id = task['id']
125+
worker = task['bg_worker_task']
126+
num_pushed = task['num_pushed']
127+
self.logger.info(f'pushing task {id} for {worker} that was pushed {num_pushed}')
128+
return pushed_count
124129
except redis_exceptions.ConnectionError:
125130
# If redis failed, does not commit the update to the db
126131
await session.rollback()

backend/deepchecks_monitoring/bgtasks/tasks_runner.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,12 @@ async def run(self):
8080
raise
8181

8282
async def wait_for_task(self, timeout=120):
83+
self.logger.info('Checking for tasks')
8384
task_entry = await self.redis.bzpopmin(GLOBAL_TASK_QUEUE, timeout=timeout)
8485

8586
# If timeout is not 0 we might get return value of None
8687
if task_entry is None:
87-
self.logger.debug('Got from redis queue task_id none')
88+
self.logger.info('Got from redis queue task_id none')
8889
return
8990
else:
9091
# Return value from redis is (redis key, value, score)
@@ -101,16 +102,16 @@ async def run_single_task(self, task_id, session, queued_timestamp):
101102
lock = self.redis.lock(lock_name, blocking=False, timeout=60 * 5)
102103
lock_acquired = await lock.acquire()
103104
if not lock_acquired:
104-
self.logger.debug(f'Failed to acquire lock for task id: {task_id}')
105+
self.logger.info(f'Failed to acquire lock for task id: {task_id}')
105106
return
106107

107108
task = await session.scalar(select(Task).where(Task.id == task_id))
108109
# Making sure task wasn't deleted for some reason
109110
if task is not None:
110-
self.logger.debug(f'Running task id: {task_id}')
111+
self.logger.info(f'Running task id: {task_id}')
111112
await self._run_task(task, session, queued_timestamp, lock)
112113
else:
113-
self.logger.debug(f'Got already removed task id: {task_id}')
114+
self.logger.info(f'Got already removed task id: {task_id}')
114115

115116
try:
116117
await lock.release()
@@ -130,7 +131,7 @@ async def _run_task(self, task: Task, session, queued_timestamp, lock):
130131
delay = start.int_timestamp - queued_timestamp
131132
self.logger.info({'duration': duration, 'task': task.bg_worker_task, 'delay': delay})
132133
else:
133-
self.logger.error({'message': f'Unknown task type: {task.bg_worker_task}'})
134+
self.logger.info({'message': f'Unknown task type: {task.bg_worker_task}'})
134135
except Exception: # pylint: disable=broad-except
135136
await session.rollback()
136137
self.logger.exception({'message': 'Exception running task', 'task': task.bg_worker_task})

0 commit comments

Comments
 (0)