From 83deee67274d2d533ca8d3ebd9663f64d4333845 Mon Sep 17 00:00:00 2001 From: valerioda Date: Fri, 7 Oct 2022 10:36:09 +0200 Subject: [PATCH 01/10] get_super_detector -> get_logical_detector, dispatcher looping on physical detectors --- dispatcher/#MongoConnect.py# | 839 +++++++++++++++++++++++++++++++++++ dispatcher/MongoConnect.py | 84 ++-- dispatcher/dispatcher.py | 20 +- 3 files changed, 899 insertions(+), 44 deletions(-) create mode 100644 dispatcher/#MongoConnect.py# diff --git a/dispatcher/#MongoConnect.py# b/dispatcher/#MongoConnect.py# new file mode 100644 index 00000000..78f775bb --- /dev/null +++ b/dispatcher/#MongoConnect.py# @@ -0,0 +1,839 @@ +import datetime +from daqnt import DAQ_STATUS +import threading +import time +import pytz +import numpy as np + + +def encode_for_numpy(doc): + if '_id' in doc: + del doc['_id'] + mode_max_length = 32 + t = ( + doc['time'].isoformat().split('+')[0], # strip tz info + doc['detector'], + doc['number'], + (doc['mode'] or '')[:mode_max_length].lower(), + doc['rate'], + doc['buff'], + doc['status'], + ) + dtype = [ + ('time', 'datetime64[us]'), + ('detector', 'U16'), + ('number', np.int32), + ('mode', f'U{mode_max_length}'), + ('rate', np.float32), + ('buff', np.float32), + ('status', np.int8) + ] + return t, dtype + + +def _all(values, target): + return len(values) > 0 and all([v == target for v in values]) + +def now(): + return datetime.datetime.now(pytz.utc) + +# Communicate between various parts of dispatcher that no new run was determined +NO_NEW_RUN = -1 + +class MongoConnect(object): + """ + MongoDB Connectivity Class for XENONnT DAQ Dispatcher + D. Coderre, 12. Mar. 2019 + D. Masson, 2019-2021 + S. di Pede, 2020-2021 + V. D'Andrea, May o2022 + + Brief: This code handles the mongo connectivity for both the DAQ + databases (the ones used for system-wide communication) and the + runs database. 
+ + """ + + def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, testing=False): + + # Define DB connectivity. Log is separate to make it easier to split off if needed + dbn = config['ControlDatabaseName'] + rdbn = config['RunsDatabaseName'] + self.dax_db = control_mc[dbn] + self.runs_db = runs_mc[rdbn] + self.hypervisor = hypervisor + + self.latest_settings = {} + + self.loglevels = {"DEBUG": 0, "MESSAGE": 1, "WARNING": 2, "ERROR": 3, "FATAL": 4} + + # Each collection we actually interact with is stored here + self.collections = { + 'incoming_commands': self.dax_db['detector_control'], + 'node_status': self.dax_db['status'], + 'aggregate_status': self.dax_db['aggregate_status'], + 'outgoing_commands': self.dax_db['control'], + 'log': self.dax_db['log'], + 'options': self.dax_db['options'], + 'run': self.runs_db[config['RunsDatabaseCollection']], + } + + self.error_sent = {} + + # How often we should push certain types of errors (seconds) + self.error_timeouts = { + "ARM_TIMEOUT": 1, # 1=push all + "START_TIMEOUT": 1, + "STOP_TIMEOUT": 3600/4 # 15 minutes + } + # Timeout (in seconds). How long must a node not report to be considered timing out + self.timeout = int(config['ClientTimeout']) + + # How long a node can be timing out or missed an ack before it gets fixed (TPC only) + self.timeout_take_action = int(config['TimeoutActionThreshold']) + + # how long to give the CC to start the run. The +1 is so we check _after_ the CC should have acted + self.cc_start_wait = int(config['StartCmdDelay']) + 1 + + # Which control keys do we look for? + self.control_keys = config['ControlKeys'].split() + + # a place to buffer commands temporarily + self.command_queue = [] + self.q_mutex = threading.Lock() + + self.run_start_cache = {} + + # How often can we restart hosts? 
+ self.hypervisor_host_restart_timeout = int(config['HypervisorHostRestartTimeout']) + self.host_is_timeout = set() + + self.digi_type = 'V17' if not testing else 'f17' + self.cc_type = 'V2718' if not testing else 'f2718' + + # We will store the latest status from each reader here + # Format: + # { + # 'tpc': { + # 'status': {enum}, + # 'mode': {string} run mode if any, + # 'rate': {int} aggregate rate if any, + # 'readers': { + # 'reader_0_reader_0': { + # 'status': {enum}, + # 'rate': {float}, + # }, + # 'controller': {} + # } + # } + self.latest_status = {} + self.host_config = {} + self.dc = daq_config + self.hv_timeout_fix = {} + for detector in self.dc: + self.latest_status[detector] = {'readers': {}, 'controller': {}} + for reader in self.dc[detector]['readers']: + self.latest_status[detector]['readers'][reader] = {} + self.host_config[reader] = detector + self.hv_timeout_fix[reader] = now() + for controller in self.dc[detector]['controller']: + self.latest_status[detector]['controller'][controller] = {} + self.host_config[controller] = detector + self.hv_timeout_fix[controller] = now() + + self.should_backup_aggstat = True + self.logger = logger + self.run = True + self.event = threading.Event() + self.command_thread = threading.Thread(target=self.process_commands) + self.command_thread.start() + + def quit(self): + self.run = False + try: + self.event.set() + self.command_thread.join() + except: + pass + + def __del__(self): + self.quit() + + def backup_aggstat(self): + """ + Backs up the aggregate status collection by numpyizing it + """ + today = now() + coll = self.collections['aggregate_status'] + if today.day == 1 and self.should_backup_aggstat: + then = (today - datetime.timedelta(days=31)).replace(tzinfo=None) + self.logger.info(f'Backing up aggregated status older than {then.isoformat()}') + data = [] + dtype = None + for doc in coll.find({'time': {'$lt': then}}): + try: + row, dtype = encode_for_numpy(doc) + data.append(row) + except Exception as 
e: + self.logger.debug(f'Error encoding {doc}') + return + try: + data = np.array(data, dtype=dtype) + # TODO better place to store them? + with open(f'/daq_common2/logs/aggstat_{then.year}{then.month:02d}{then.day:02d}.npz', 'wb') as f: + np.savez_compressed(f, data=data) + except Exception as e: + self.logger.error(f'Caught a {type(e)} while numpyizing aggstat: {e}') + else: + ret = coll.delete_many({'time': {'$lt': then}}) + self.logger.debug(f'Backed up {ret.deleted_count} docs') + self.should_backup_aggstat = False + else: + self.should_backup_aggstat = today.day != 1 + + + def get_update(self, dc): + """ + Gets the latest documents from the database for + each node we know about + """ + try: + for detector in dc.keys(): + for host in dc[detector]['readers'].keys(): + doc = self.collections['node_status'].find_one({'host': host}, + sort=[('_id', -1)]) + dc[detector]['readers'][host] = doc + for host in dc[detector]['controller'].keys(): + doc = self.collections['node_status'].find_one({'host': host}, + sort=[('_id', -1)]) + dc[detector]['controller'][host] = doc + except Exception as e: + self.logger.error(f'Got error while getting update: {type(e)}: {e}') + return None + + self.latest_status = dc + + # Now compute aggregate status + return self.latest_status if self.aggregate_status() is None else None + + def clear_error_timeouts(self): + self.error_sent = {} + + def aggregate_status(self): + """ + Compute the total status of each "detector" based on the most recent + updates of its individual nodes. Here are some general rules: + - Usually all nodes have the same status (i.e. 'running') and this is + not very complicated + - During changes of state (i.e. starting a run) some nodes might + be faster than others. In this case the status can be 'unknown'. + The main program should interpret whether 'unknown' is a reasonable + thing, like was a command sent recently? If so then sure, a 'unknown' + status will happpen. 
+ - If any single node reports error then the whole thing is in error + - If any single node times out then the whole thing is in timeout + - Rates, buffer usage, and PLL counters only apply to the physical + detector, not the logical detector, while status and run number + apply to both + """ + now_time = time.time() + ret = None + aggstat = { + k:{ 'status': -1, + 'detector': k, + 'rate': 0, + 'time': now(), + 'buff': 0, + 'mode': None, + 'pll_unlocks': 0, + 'number': -1} + for k in self.dc} + phys_stat = {k: [] for k in self.dc} + for detector in self.latest_status.keys(): + # detector = logical + statuses = {} + status = None + modes = [] + run_nums = [] + for n, doc in self.latest_status[detector]['readers'].items(): + if doc is None: + self.logger.debug(f'{n} seems to have been offline for a few days') + continue + phys_det = self.host_config[doc['host']] + try: + aggstat[phys_det]['rate'] += doc['rate'] + aggstat[phys_det]['buff'] += doc['buffer_size'] + aggstat[phys_det]['pll_unlocks'] += doc.get('pll', 0) + except Exception as e: + # This is not really important but it's nice if we have it + self.logger.debug(f'Rate calculation ran into {type(e)}: {e}') + pass + + status = self.extract_status(doc, now_time) + statuses[doc['host']] = status + phys_stat[phys_det].append(status) + + for n, doc in self.latest_status[detector]['controller'].items(): + if doc is None: + self.logger.debug(f'{n} seems to have been offline for a few days') + modes.append('none') + run_nums.append(-1) + statuses['none'] = DAQ_STATUS.UNKNOWN + continue + phys_det = self.host_config[doc['host']] + status = self.extract_status(doc, now_time) + statuses[doc['host']] = status + doc['status'] = status + modes.append(doc.get('mode', 'none')) + run_nums.append(doc.get('number', None)) + aggstat[phys_det]['status'] = status + aggstat[phys_det]['mode'] = modes[-1] + aggstat[phys_det]['number'] = run_nums[-1] + phys_stat[phys_det].append(status) + + mode = modes[0] + run_num = run_nums[0] + if 
not _all(modes, mode) or not _all(run_nums, run_num): + self.logger.error(f'No quorum? {modes}, {run_nums}') + status_list = [DAQ_STATUS.UNKNOWN] + mode = 'none' + run_num = -1 + elif mode != 'none': # readout is "active": + a,b = self.get_hosts_for_mode(mode) + active = a + b + status_list = [v for k,v in statuses.items() if k in active] + else: + status_list = list(statuses.values()) + + # Now we aggregate the statuses + status = self.combine_statuses(status_list) + + self.latest_status[detector]['status'] = status + self.latest_status[detector]['number'] = run_num + self.latest_status[detector]['mode'] = mode + + try: + self.collections['aggregate_status'].insert_many(aggstat.values()) + except Exception as e: + self.logger.error(f'DB snafu? Couldn\'t update aggregate status. ' + f'{type(e)}, {e}') + + self.physical_status = phys_stat + return ret + + def combine_statuses(self, status_list): + # First, the "or" statuses + for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']: + if DAQ_STATUS[stat] in status_list: + return DAQ_STATUS[stat] + # then the "and" statuses + for stat in ['IDLE','ARMED','RUNNING']: + if _all(status_list, DAQ_STATUS[stat]): + return DAQ_STATUS[stat] + return DAQ_STATUS.UNKNOWN + + def extract_status(self, doc, now_time): + try: + return DAQ_STATUS.TIMEOUT if self.is_timeout(doc, now_time) else DAQ_STATUS(doc['status']) + except Exception as e: + self.logger.debug(f'Setting status to unknown for {doc.get("host", "unknown")} because of {type(e)}: {e}') + return DAQ_STATUS.UNKNOWN + + def is_timeout(self, doc, t): + """ + Checks to see if the specified status doc corresponds to a timeout situation + """ + host = doc['host'] + dt = t - int(str(doc['_id'])[:8], 16) + has_ackd = self.host_ackd_command(host) + ret = False + self.logger.debug(f'{host} last reported {int(dt)} sec ago') + if dt > self.timeout: + ret = ret or True + if has_ackd is not None and t - has_ackd > self.timeout_take_action: + if host not in self.host_is_timeout: + 
self.logger.critical(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') + self.host_is_timeout.add(host) + if self.host_config[host] == 'tpc': + dt = (now() - self.hv_timeout_fix[host]).total_seconds() + if dt > self.hypervisor_host_restart_timeout: + self.log_error(f'Hypervisor fixes timeout of {host}', "ERROR", "ERROR") + self.hypervisor.handle_timeout(host) + self.hv_timeout_fix[host] = now() + else: + self.logger.debug(f'Not restarting {host}, timeout at {int(dt)}') + ret = ret or True + if not ret and host in self.host_is_timeout: + self.host_is_timeout.discard(host) + return ret + + def get_wanted_state(self): + """ + Figure out what the system is supposed to be doing right now + """ + try: + latest_settings = {} + for detector in self.dc: + latest = None + latest_settings[detector] = {} + for key in self.control_keys: + doc = self.collections['incoming_commands'].find_one( + {'key': f'{detector}.{key}'}, sort=[('_id', -1)]) + if doc is None: + self.logger.error(f'No key {key} for {detector}???') + return None + latest_settings[detector][doc['field']] = doc['value'] + if latest is None or doc['time'] > latest: + latest = doc['time'] + latest_settings[detector]['user'] = doc['user'] + self.goal_state = latest_settings + return self.goal_state + except Exception as e: + self.logger.debug(f'get_wanted_state failed due to {type(e)} {e}') + return None + + def is_linked_mode(self): + """ + Are we in a linked configuration for this control iteration? + """ + # self.dc has the physical detectors, self.latest_status has the logical detectors + return len(self.dc.keys()) != len(self.latest_status.keys()) + + def is_linked(self, a, b): + """ + Check if the detectors are in a compatible linked configuration. 
+ """ + mode_a = self.goal_state[a]["mode"] + mode_b = self.goal_state[b]["mode"] + if mode_a != mode_b: + self.logger.debug(f'{a} and {b} are not linked ({mode_a}/{mode_b})') + # shortcut to lessen the load on the db + return False + + # we don't need to pull the whole combined document because the 'detector' field is at the top level + if (doc := self.collections['options'].find_one({'name': mode_a}, {'detector': 1})) is not None: + detectors = doc['detector'] + else: + detectors = None + + # Check if the linked detectors share the same run mode and + # if they are both present in the detectors list of that mode + # also no "tpc_muon_veto" bullshit, it must be ['tpc', 'muon_veto'] + if isinstance(detectors, list) and a in detectors and b in detectors: + self.logger.debug(f'{a} and {b} are linked ({mode_a}/{detectors})') + return True + else: + self.logger.debug(f'{a} and {b} aren\'t link?? How this happen?? {mode_a} {detectors}') + return False + + def get_logical_detector(self): + """ + Get the Logical Detector configuration + if the detectors are in a compatible linked mode. + - case A: tpc, mv and nv all linked + - case B: tpc, mv and nv all un-linked + - case C: tpc and mv linked, nv un-linked + - case D: tpc and nv linked, mv un-linked + - case E: tpc unlinked, mv and nv linked + We will check the compatibility of the linked mode for a pair of detectors per time. 
+ """ + + tpc = self.dc['tpc'] + mv = self.dc['muon_veto'] + nv = self.dc['neutron_veto'] + + is_tpc_mv = self.is_linked('tpc', 'muon_veto') + is_tpc_nv = self.is_linked('tpc', 'neutron_veto') + is_mv_nv = self.is_linked('muon_veto', 'neutron_veto') + + if is_tpc_mv and is_tpc_nv and is_mv_nv: + # case A + ret = {'all_linked': {'controller': tpc['controller'][:] + mv['controller'][:] + nv['controller'][:], + 'readers': tpc['readers'][:] + mv['readers'][:] + nv['readers'][:], + 'detectors': ['tpc','muon_veto','neutron_veto']}} + elif is_tpc_mv and not is_tpc_nv and not is_mv_nv: + # case C + ret = {'tpc_mv': {'controller': tpc['controller'][:] + mv['controller'][:], + 'readers': tpc['readers'][:] + mv['readers'][:], + 'detectors': ['tpc','muon_veto']}, + 'nv': {'controller': nv['controller'][:], + 'readers': nv['readers'][:], + 'detectors': ['neutron_veto']}} + elif is_tpc_nv and not is_tpc_mv and not is_mv_nv: + # case D + ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:], + 'readers': tpc['readers'][:] + nv['readers'][:], + 'detectors': ['tpc','neutron_veto']}, + 'mv' = {'controller': mv['controller'][:], + 'readers': mv['readers'][:], + 'detectors': ['muon_veto']}} + elif is_mv_nv and not is_tpc_mv and not is_tpc_nv: + # case E + ret = {'tpc': {'controller': tpc['controller'][:], + 'readers': tpc['readers'][:], + 'detectors': ['tpc']}, + 'mv_nv' = {'controller': mv['controller'][:] + nv['controller'][:], + 'readers': mv['readers'][:] + nv['readers'][:], + 'detectors': ['muon_veto','neutron_veto']}} + else: + # case B + ret = {'tpc': {'controller': tpc['controller'][:], + 'readers': tpc['readers'][:], + 'detectors': ['tpc']}, + 'mv' = {'controller': mv['controller'][:], + 'readers': mv['readers'][:], + 'detectors': ['muon_veto']}, + 'nv' = {'controller': nv['controller'][:], + 'readers': nv['readers'][:], + 'detectors': ['neutron_veto']}} + + # convert the host lists to dics for later + for det in list(ret.keys()): + ret[det]['controller'] 
= {c:{} for c in ret[det]['controller']} + ret[det]['readers'] = {c:{} for c in ret[det]['readers']} + return ret + + def get_run_mode(self, mode): + """ + Pull a run doc from the options collection and add all the includes + """ + if mode is None: + return None + base_doc = self.collections['options'].find_one({'name': mode}) + if base_doc is None: + self.log_error("Mode '%s' doesn't exist" % mode, "info", "info") + return None + if 'includes' not in base_doc or len(base_doc['includes']) == 0: + return base_doc + try: + if self.collections['options'].count_documents({'name': + {'$in': base_doc['includes']}}) != len(base_doc['includes']): + self.log_error("At least one subconfig for mode '%s' doesn't exist" % mode, "WARNING", "WARNING") + return None + return list(self.collections["options"].aggregate([ + {'$match': {'name': mode}}, + {'$lookup': {'from': 'options', 'localField': 'includes', + 'foreignField': 'name', 'as': 'subconfig'}}, + {'$addFields': {'subconfig': {'$concatArrays': ['$subconfig', ['$$ROOT']]}}}, + {'$unwind': '$subconfig'}, + {'$group': {'_id': None, 'config': {'$mergeObjects': '$subconfig'}}}, + {'$replaceWith': '$config'}, + {'$project': {'_id': 0, 'description': 0, 'includes': 0, 'subconfig': 0}}, + ]))[0] + except Exception as e: + self.logger.error("Got a %s exception in doc pulling: %s" % (type(e), e)) + return None + + def get_hosts_for_mode(self, mode, detector=None): + """ + Get the nodes we need from the run mode + """ + if mode is None or mode == 'none': + if detector is None: + self.logger.error('No mode, no detector? 
wtf?') + return [], [] + return (list(self.latest_status[detector]['readers'].keys()), + list(self.latest_status[detector]['controller'].keys())) + if (doc := self.get_run_mode(mode)) is None: + self.logger.error('How did this happen?') + return [], [] + cc = [] + hostlist = [] + for b in doc['boards']: + if self.digi_type in b['type'] and b['host'] not in hostlist: + hostlist.append(b['host']) + elif b['type'] == self.cc_type and b['host'] not in cc: + cc.append(b['host']) + return hostlist, cc + + def get_next_run_number(self): + try: + cursor = self.collections["run"].find({},{'number': 1}).sort("number", -1).limit(1) + except Exception as e: + self.logger.error(f'Database is having a moment? {type(e)}, {e}') + return NO_NEW_RUN + if cursor.count() == 0: + self.logger.info("wtf, first run?") + return 0 + return list(cursor)[0]['number']+1 + + def set_stop_time(self, number, detectors, force): + """ + Sets the 'end' field of the run doc to the time when the STOP command was ack'd + """ + self.logger.info(f"Updating run {number} with end time ({detectors})") + if number == -1: + return + try: + time.sleep(0.5) # this number depends on the CC command polling time + if (endtime := self.get_ack_time(detectors, 'stop') ) is None: + self.logger.debug(f'No end time found for run {number}') + endtime = now() -datetime.timedelta(seconds=1) + query = {"number": int(number), "end": None, 'detectors': detectors} + updates = {"$set": {"end": endtime}} + if force: + updates["$push"] = {"tags": {"name": "_messy", "user": "daq", + "date": now()}} + if self.collections['run'].update_one(query, updates).modified_count == 1: + self.logger.debug('Update successful') + rate = {} + for doc in self.collections['aggregate_status'].aggregate([ + {'$match': {'number': number}}, + {'$group': {'_id': '$detector', + 'avg': {'$avg': '$rate'}, + 'max': {'$max': '$rate'}}} + ]): + rate[doc['_id']] = {'avg': doc['avg'], 'max': doc['max']} + channels = set() + if 'tpc' in detectors: + # figure 
out which channels weren't running + readers = list(self.latest_status[detectors]['readers'].keys()) + for doc in self.collections['node_status'].find({'host': {'$in': readers}, 'number': int(number)}): + channels |= set(map(int, doc['channels'].keys())) + updates = {'rate': rate} + if len(channels): + updates['no_data_from'] = sorted(list(set(range(494)) - channels)) + self.collections['run'].update_one({'number': int(number)}, + {'$set': updates}) + if str(number) in self.run_start_cache: + del self.run_start_cache[str(number)] + else: + self.logger.debug('No run updated?') + except Exception as e: + self.logger.error(f"Database having a moment, hope this doesn't crash. {type(e)}, {e}") + return + + def get_ack_time(self, detector, command, recurse=True): + ''' + Finds the time when specified detector's crate controller ack'd the specified command + ''' + # the first cc is the "master", so its ack time is what counts + cc = list(self.latest_status[detector]['controller'].keys())[0] + query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command} + sort = [('_id', -1)] + doc = self.collections['outgoing_commands'].find_one(query, sort=sort) + dt = (now() - doc['acknowledged'][cc].replace(tzinfo=pytz.utc)).total_seconds() + if dt > 30: # TODO make this a config value + if recurse: + # No way we found the correct command here, maybe we're too soon + self.logger.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?') + time.sleep(2) # if in doubt + return self.get_ack_time(detector, command, False) + else: + # Welp + self.logger.debug(f'No recent ack time for {detector}-{command}') + return None + return doc['acknowledged'][cc] + + def send_command(self, command, hosts, user, detector, mode="", delay=0, force=False): + """ + Send this command to these hosts. 
If delay is set then wait that amount of time + """ + number = None + ls = self.latest_status[detector] + for host_list in hosts: + for h in host_list: + if h not in ls['readers'] and h not in ls['controller']: + self.logger.error(f'Trying to issue a {command} to {detector}/{h}?') + host_list.remove(h) + if command == 'stop' and not self.detector_ackd_command(detector, 'stop'): + self.logger.error(f"{detector} hasn't ack'd its last stop, let's not flog a dead horse") + if not force: + return 1 + try: + if command == 'arm': + number = self.get_next_run_number() + if number == NO_NEW_RUN: + return -1 + self.latest_status[detector]['number'] = number + doc_base = { + "command": command, + "user": user, + "detector": detector, + "mode": mode, + "createdAt": now() + } + if command == 'arm': + doc_base['options_override'] = {'number': number} + if delay == 0: + docs = doc_base + docs['host'] = hosts[0]+hosts[1] + docs['acknowledged'] = {h:0 for h in docs['host']} + docs = [docs] + else: + docs = [dict(doc_base.items()), dict(doc_base.items())] + docs[0]['host'], docs[1]['host'] = hosts + docs[0]['acknowledged'] = {h:0 for h in docs[0]['host']} + docs[1]['acknowledged'] = {h:0 for h in docs[1]['host']} + docs[1]['createdAt'] += datetime.timedelta(seconds=delay) + with self.q_mutex: + self.command_queue += docs + except Exception as e: + self.logger.debug(f'SendCommand ran into {type(e)}, {e})') + return -1 + else: + self.logger.debug(f'Queued {command} for {detector}') + self.event.set() + return 0 + + def process_commands(self): + """ + Process our internal command queue + """ + outgoing = self.collections['outgoing_commands'] + while self.run == True: + try: + with self.q_mutex: + if len(self.command_queue) > 1: + self.command_queue.sort(key=lambda d : d['createdAt'].timestamp()) + if len(self.command_queue) > 0: + next_cmd = self.command_queue[0] + dt = (next_cmd['createdAt'].replace(tzinfo=pytz.utc) - now()).total_seconds() + else: + dt = 10 + if dt < 0.01: + with 
self.q_mutex: + outgoing.insert_one(self.command_queue.pop(0)) + except Exception as e: + dt = 10 + self.logger.error(f"DB down? {type(e)}, {e}") + self.event.clear() + self.event.wait(dt) + + def host_ackd_command(self, host): + """ + Finds the timestamp of the oldest unacknowledged command send to the specified host + :param host: str, the process name to check + :returns: float, the timestamp of the last unack'd command, or None if none exist + """ + q = {'host': host, f'acknowledged.{host}': 0} + sort = [('_id', 1)] + if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: + self.logger.debug(f'No unack\'d commands for {host}') + return None + return doc['createdAt'].replace(tzinfo=pytz.utc).timestamp() + + def detector_ackd_command(self, detector, command): + """ + Finds when the specified/most recent command was ack'd + """ + q = {'detector': detector} + sort = [('_id', -1)] + if command is not None: + q['command'] = command + if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: + self.logger.error('No previous command found?') + return True + # we can't naively use everything in the hosts field, because we might be transitioning + # out of linked mode, and there might be "garbage" in the host list because someone + # didn't follow very clear instructions, and if a stop is issued to a host that doesn't + # exist, the dispatcher basically stops working + hosts_this_detector = set(self.latest_status[detector]['readers'].keys()) | set(self.latest_status[detector]['controller'].keys()) + hosts_in_doc = set(doc['host']) + hosts_ignored = hosts_in_doc - hosts_this_detector + if len(hosts_ignored): + self.logger.warning(f'Ignoring hosts: {hosts_ignored}') + # so we only loop over the intersection of this detector's hosts and the doc's hosts + for h in hosts_this_detector & hosts_in_doc: + if doc['acknowledged'][h] == 0: + return False + return True + + def log_error(self, message, priority, etype): + #Start by 
logging the error localy + self.logger.info(message) + # Note that etype allows you to define timeouts. + nowtime = now() + if ( (etype in self.error_sent and self.error_sent[etype] is not None) and + (etype in self.error_timeouts and self.error_timeouts[etype] is not None) and + (nowtime-self.error_sent[etype]).total_seconds() <= self.error_timeouts[etype]): + self.logger.debug("Could log error, but still in timeout for type %s"%etype) + return + self.error_sent[etype] = nowtime + try: + self.collections['log'].insert({ + "user": "dispatcher", + "message": message, + "priority": self.loglevels[priority] + }) + except Exception as e: + self.logger.error(f'Database error, can\'t issue error message: {type(e)}, {e}') + self.logger.info("Error message from dispatcher: %s" % (message)) + return + + def get_run_start(self, number): + """ + Returns the timezone-corrected run start time from the rundoc + """ + if str(number) in self.run_start_cache: + return self.run_start_cache[str(number)] + try: + doc = self.collections['run'].find_one({"number": number}, {"start": 1}) + except Exception as e: + self.logger.error(f'Database is having a moment: {type(e)}, {e}') + return None + if doc is not None and 'start' in doc: + self.run_start_cache[str(number)] = doc['start'].replace(tzinfo=pytz.utc) + return self.run_start_cache[str(number)] + return None + + def insert_run_doc(self, detector): + + if (number := self.get_next_run_number()) == NO_NEW_RUN: + self.logger.error("DB having a moment") + return -1 + # the rundoc gets the physical detectors, not the logical + detectors = self.latest_status[detector]['detectors'] + + run_doc = { + "number": number, + 'detectors': detectors, + 'user': self.goal_state[detector]['user'], + 'mode': self.goal_state[detector]['mode'], + 'bootstrax': {'state': None}, + 'end': None + } + + # If there's a source add the source. Also add the complete ini file. 
+ cfg = self.get_run_mode(self.goal_state[detector]['mode']) + if cfg is not None and 'source' in cfg.keys(): + run_doc['source'] = str(cfg['source']) + run_doc['daq_config'] = cfg + + # If the user started the run with a comment add that too + if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "": + run_doc['comments'] = [{ + "user": self.goal_state[detector]['user'], + "date": now(), + "comment": self.goal_state[detector]['comment'] + }] + + # Make a data entry so bootstrax can find the thing + if 'strax_output_path' in cfg: + run_doc['data'] = [{ + 'type': 'live', + 'host': 'daq', + 'location': cfg['strax_output_path'] + }] + + # The cc needs some time to get started + time.sleep(self.cc_start_wait) + try: + start_time = self.get_ack_time(detector, 'start') + except Exception as e: + self.logger.error('Couldn\'t find start time ack') + start_time = None + + if start_time is None: + start_time = now()-datetime.timedelta(seconds=2) + # if we miss the ack time, we don't really know when the run started + # so may as well tag it + run_doc['tags'] = [{'name': 'messy', 'user': 'daq', 'date': start_time}] + run_doc['start'] = start_time + + try: + self.collections['run'].insert_one(run_doc) + except Exception as e: + self.logger.error(f'Database having a moment: {type(e)}, {e}') + return -1 + return None diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 6b5589b1..adbee0c2 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -46,6 +46,7 @@ class MongoConnect(object): D. Coderre, 12. Mar. 2019 D. Masson, 2019-2021 S. di Pede, 2020-2021 + V. D'Andrea, May 2022 Brief: This code handles the mongo connectivity for both the DAQ databases (the ones used for system-wide communication) and the @@ -425,9 +426,9 @@ def is_linked(self, a, b): self.logger.debug(f'{a} and {b} aren\'t link?? How this happen?? 
{mode_a} {detectors}') return False - def get_super_detector(self): + def get_logical_detector(self): """ - Get the Super Detector configuration + Get the Logical Detector configuration if the detectors are in a compatible linked mode. - case A: tpc, mv and nv all linked - case B: tpc, mv and nv all un-linked @@ -436,43 +437,56 @@ def get_super_detector(self): - case E: tpc unlinked, mv and nv linked We will check the compatibility of the linked mode for a pair of detectors per time. """ - ret = {'tpc': {'controller': self.dc['tpc']['controller'][:], - 'readers': self.dc['tpc']['readers'][:], - 'detectors': ['tpc']}} + + tpc = self.dc['tpc'] mv = self.dc['muon_veto'] nv = self.dc['neutron_veto'] - - tpc_mv = self.is_linked('tpc', 'muon_veto') - tpc_nv = self.is_linked('tpc', 'neutron_veto') - mv_nv = self.is_linked('muon_veto', 'neutron_veto') - - # tpc and muon_veto linked mode - if tpc_mv: - # case A or C - ret['tpc']['controller'] += mv['controller'] - ret['tpc']['readers'] += mv['readers'] - ret['tpc']['detectors'] += ['muon_veto'] - else: - # case B or E - ret['muon_veto'] = {'controller': mv['controller'][:], - 'readers': mv['readers'][:], - 'detectors': ['muon_veto']} - if tpc_nv: - # case A or D - ret['tpc']['controller'] += nv['controller'][:] - ret['tpc']['readers'] += nv['readers'][:] - ret['tpc']['detectors'] += ['neutron_veto'] - elif mv_nv and not tpc_mv: + + is_tpc_mv = self.is_linked('tpc', 'muon_veto') + is_tpc_nv = self.is_linked('tpc', 'neutron_veto') + is_mv_nv = self.is_linked('muon_veto', 'neutron_veto') + + if is_tpc_mv and is_tpc_nv and is_mv_nv: + # case A + ret = {'all_linked': {'controller': tpc['controller'][:] + mv['controller'][:] + nv['controller'][:], + 'readers': tpc['readers'][:] + mv['readers'][:] + nv['readers'][:], + 'detectors': ['tpc','muon_veto','neutron_veto']}} + elif is_tpc_mv and not is_tpc_nv and not is_mv_nv: + # case C + ret = {'tpc_mv': {'controller': tpc['controller'][:] + mv['controller'][:], + 'readers': 
tpc['readers'][:] + mv['readers'][:],
+                          'detectors': ['tpc','muon_veto']},
+               'nv': {'controller': nv['controller'][:],
+                      'readers': nv['readers'][:],
+                      'detectors': ['neutron_veto']}}
+        elif is_tpc_nv and not is_tpc_mv and not is_mv_nv:
+            # case D
+            ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:],
+                              'readers': tpc['readers'][:] + nv['readers'][:],
+                              'detectors': ['tpc','neutron_veto']},
+               'mv': {'controller': mv['controller'][:],
+                      'readers': mv['readers'][:],
+                      'detectors': ['muon_veto']}}
+        elif is_mv_nv and not is_tpc_mv and not is_tpc_nv:
             # case E
-            ret['muon_veto']['controller'] += nv['controller'][:]
-            ret['muon_veto']['readers'] += nv['readers'][:]
-            ret['muon_veto']['detectors'] += ['neutron_veto']
+            ret = {'tpc': {'controller': tpc['controller'][:],
+                           'readers': tpc['readers'][:],
+                           'detectors': ['tpc']},
+               'mv_nv': {'controller': mv['controller'][:] + nv['controller'][:],
+                         'readers': mv['readers'][:] + nv['readers'][:],
+                         'detectors': ['muon_veto','neutron_veto']}}
         else:
-            # case B or C
-            ret['neutron_veto'] = {'controller': nv['controller'][:],
-                                   'readers': nv['readers'][:],
-                                   'detectors': ['neutron_veto']}
-
+            # case B
+            ret = {'tpc': {'controller': tpc['controller'][:],
+                           'readers': tpc['readers'][:],
+                           'detectors': ['tpc']},
+               'mv': {'controller': mv['controller'][:],
+                      'readers': mv['readers'][:],
+                      'detectors': ['muon_veto']},
+               'nv': {'controller': nv['controller'][:],
+                      'readers': nv['readers'][:],
+                      'detectors': ['neutron_veto']}}
+
         # convert the host lists to dics for later
         for det in list(ret.keys()):
             ret[det]['controller'] = {c:{} for c in ret[det]['controller']}
diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py
index b106a1c2..e8e4eaf9 100755
--- a/dispatcher/dispatcher.py
+++ b/dispatcher/dispatcher.py
@@ -90,20 +90,22 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc,
         # Get most recent goal state from database. Users will update this from the website.
if (goal_state := mc.get_wanted_state()) is None: continue - # Get the Super-Detector configuration - current_config = mc.get_super_detector() + # Get the Logical Detector configuration + current_config = mc.get_logical_detector() # Get most recent check-in from all connected hosts if (latest_status := mc.get_update(current_config)) is None: continue # Print an update - for detector in latest_status.keys(): - state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE' - msg = (f'The {detector} should be {state} and is ' - f'{latest_status[detector]["status"].name}') - if latest_status[detector]['number'] != -1: - msg += f' ({latest_status[detector]["number"]})' - logger.debug(msg) + for log_det in latest_status.keys(): + for detector in latest_status[log_det]['detectors'].keys(): + state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE' + msg = (f'Logical detector {log_det}: ' + f'The {detector} should be {state} and is ' + f'{latest_status[detector]["status"].name}') + if latest_status[log_det]['number'] != -1: + msg += f' ({latest_status[log_det]["number"]})' + logger.debug(msg) msg = (f"Linking: tpc-mv: {mc.is_linked('tpc', 'muon_veto')}, " f"tpc-nv: {mc.is_linked('tpc', 'neutron_veto')}, " f"mv-nv: {mc.is_linked('muon_veto', 'neutron_veto')}") From 951e3c9b354f4a27ff763986269b490e4cc8f08c Mon Sep 17 00:00:00 2001 From: valerioda Date: Fri, 7 Oct 2022 10:39:02 +0200 Subject: [PATCH 02/10] clean --- .gitignore | 1 + dispatcher/#MongoConnect.py# | 839 ----------------------------------- 2 files changed, 1 insertion(+), 839 deletions(-) delete mode 100644 dispatcher/#MongoConnect.py# diff --git a/.gitignore b/.gitignore index 013a5df4..deb549d2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ # temporary files *.swp *~ +*# # executable redax diff --git a/dispatcher/#MongoConnect.py# b/dispatcher/#MongoConnect.py# deleted file mode 100644 index 78f775bb..00000000 --- a/dispatcher/#MongoConnect.py# +++ /dev/null @@ -1,839 
+0,0 @@ -import datetime -from daqnt import DAQ_STATUS -import threading -import time -import pytz -import numpy as np - - -def encode_for_numpy(doc): - if '_id' in doc: - del doc['_id'] - mode_max_length = 32 - t = ( - doc['time'].isoformat().split('+')[0], # strip tz info - doc['detector'], - doc['number'], - (doc['mode'] or '')[:mode_max_length].lower(), - doc['rate'], - doc['buff'], - doc['status'], - ) - dtype = [ - ('time', 'datetime64[us]'), - ('detector', 'U16'), - ('number', np.int32), - ('mode', f'U{mode_max_length}'), - ('rate', np.float32), - ('buff', np.float32), - ('status', np.int8) - ] - return t, dtype - - -def _all(values, target): - return len(values) > 0 and all([v == target for v in values]) - -def now(): - return datetime.datetime.now(pytz.utc) - -# Communicate between various parts of dispatcher that no new run was determined -NO_NEW_RUN = -1 - -class MongoConnect(object): - """ - MongoDB Connectivity Class for XENONnT DAQ Dispatcher - D. Coderre, 12. Mar. 2019 - D. Masson, 2019-2021 - S. di Pede, 2020-2021 - V. D'Andrea, May o2022 - - Brief: This code handles the mongo connectivity for both the DAQ - databases (the ones used for system-wide communication) and the - runs database. - - """ - - def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, testing=False): - - # Define DB connectivity. 
Log is separate to make it easier to split off if needed - dbn = config['ControlDatabaseName'] - rdbn = config['RunsDatabaseName'] - self.dax_db = control_mc[dbn] - self.runs_db = runs_mc[rdbn] - self.hypervisor = hypervisor - - self.latest_settings = {} - - self.loglevels = {"DEBUG": 0, "MESSAGE": 1, "WARNING": 2, "ERROR": 3, "FATAL": 4} - - # Each collection we actually interact with is stored here - self.collections = { - 'incoming_commands': self.dax_db['detector_control'], - 'node_status': self.dax_db['status'], - 'aggregate_status': self.dax_db['aggregate_status'], - 'outgoing_commands': self.dax_db['control'], - 'log': self.dax_db['log'], - 'options': self.dax_db['options'], - 'run': self.runs_db[config['RunsDatabaseCollection']], - } - - self.error_sent = {} - - # How often we should push certain types of errors (seconds) - self.error_timeouts = { - "ARM_TIMEOUT": 1, # 1=push all - "START_TIMEOUT": 1, - "STOP_TIMEOUT": 3600/4 # 15 minutes - } - # Timeout (in seconds). How long must a node not report to be considered timing out - self.timeout = int(config['ClientTimeout']) - - # How long a node can be timing out or missed an ack before it gets fixed (TPC only) - self.timeout_take_action = int(config['TimeoutActionThreshold']) - - # how long to give the CC to start the run. The +1 is so we check _after_ the CC should have acted - self.cc_start_wait = int(config['StartCmdDelay']) + 1 - - # Which control keys do we look for? - self.control_keys = config['ControlKeys'].split() - - # a place to buffer commands temporarily - self.command_queue = [] - self.q_mutex = threading.Lock() - - self.run_start_cache = {} - - # How often can we restart hosts? 
- self.hypervisor_host_restart_timeout = int(config['HypervisorHostRestartTimeout']) - self.host_is_timeout = set() - - self.digi_type = 'V17' if not testing else 'f17' - self.cc_type = 'V2718' if not testing else 'f2718' - - # We will store the latest status from each reader here - # Format: - # { - # 'tpc': { - # 'status': {enum}, - # 'mode': {string} run mode if any, - # 'rate': {int} aggregate rate if any, - # 'readers': { - # 'reader_0_reader_0': { - # 'status': {enum}, - # 'rate': {float}, - # }, - # 'controller': {} - # } - # } - self.latest_status = {} - self.host_config = {} - self.dc = daq_config - self.hv_timeout_fix = {} - for detector in self.dc: - self.latest_status[detector] = {'readers': {}, 'controller': {}} - for reader in self.dc[detector]['readers']: - self.latest_status[detector]['readers'][reader] = {} - self.host_config[reader] = detector - self.hv_timeout_fix[reader] = now() - for controller in self.dc[detector]['controller']: - self.latest_status[detector]['controller'][controller] = {} - self.host_config[controller] = detector - self.hv_timeout_fix[controller] = now() - - self.should_backup_aggstat = True - self.logger = logger - self.run = True - self.event = threading.Event() - self.command_thread = threading.Thread(target=self.process_commands) - self.command_thread.start() - - def quit(self): - self.run = False - try: - self.event.set() - self.command_thread.join() - except: - pass - - def __del__(self): - self.quit() - - def backup_aggstat(self): - """ - Backs up the aggregate status collection by numpyizing it - """ - today = now() - coll = self.collections['aggregate_status'] - if today.day == 1 and self.should_backup_aggstat: - then = (today - datetime.timedelta(days=31)).replace(tzinfo=None) - self.logger.info(f'Backing up aggregated status older than {then.isoformat()}') - data = [] - dtype = None - for doc in coll.find({'time': {'$lt': then}}): - try: - row, dtype = encode_for_numpy(doc) - data.append(row) - except Exception as 
e: - self.logger.debug(f'Error encoding {doc}') - return - try: - data = np.array(data, dtype=dtype) - # TODO better place to store them? - with open(f'/daq_common2/logs/aggstat_{then.year}{then.month:02d}{then.day:02d}.npz', 'wb') as f: - np.savez_compressed(f, data=data) - except Exception as e: - self.logger.error(f'Caught a {type(e)} while numpyizing aggstat: {e}') - else: - ret = coll.delete_many({'time': {'$lt': then}}) - self.logger.debug(f'Backed up {ret.deleted_count} docs') - self.should_backup_aggstat = False - else: - self.should_backup_aggstat = today.day != 1 - - - def get_update(self, dc): - """ - Gets the latest documents from the database for - each node we know about - """ - try: - for detector in dc.keys(): - for host in dc[detector]['readers'].keys(): - doc = self.collections['node_status'].find_one({'host': host}, - sort=[('_id', -1)]) - dc[detector]['readers'][host] = doc - for host in dc[detector]['controller'].keys(): - doc = self.collections['node_status'].find_one({'host': host}, - sort=[('_id', -1)]) - dc[detector]['controller'][host] = doc - except Exception as e: - self.logger.error(f'Got error while getting update: {type(e)}: {e}') - return None - - self.latest_status = dc - - # Now compute aggregate status - return self.latest_status if self.aggregate_status() is None else None - - def clear_error_timeouts(self): - self.error_sent = {} - - def aggregate_status(self): - """ - Compute the total status of each "detector" based on the most recent - updates of its individual nodes. Here are some general rules: - - Usually all nodes have the same status (i.e. 'running') and this is - not very complicated - - During changes of state (i.e. starting a run) some nodes might - be faster than others. In this case the status can be 'unknown'. - The main program should interpret whether 'unknown' is a reasonable - thing, like was a command sent recently? If so then sure, a 'unknown' - status will happpen. 
- - If any single node reports error then the whole thing is in error - - If any single node times out then the whole thing is in timeout - - Rates, buffer usage, and PLL counters only apply to the physical - detector, not the logical detector, while status and run number - apply to both - """ - now_time = time.time() - ret = None - aggstat = { - k:{ 'status': -1, - 'detector': k, - 'rate': 0, - 'time': now(), - 'buff': 0, - 'mode': None, - 'pll_unlocks': 0, - 'number': -1} - for k in self.dc} - phys_stat = {k: [] for k in self.dc} - for detector in self.latest_status.keys(): - # detector = logical - statuses = {} - status = None - modes = [] - run_nums = [] - for n, doc in self.latest_status[detector]['readers'].items(): - if doc is None: - self.logger.debug(f'{n} seems to have been offline for a few days') - continue - phys_det = self.host_config[doc['host']] - try: - aggstat[phys_det]['rate'] += doc['rate'] - aggstat[phys_det]['buff'] += doc['buffer_size'] - aggstat[phys_det]['pll_unlocks'] += doc.get('pll', 0) - except Exception as e: - # This is not really important but it's nice if we have it - self.logger.debug(f'Rate calculation ran into {type(e)}: {e}') - pass - - status = self.extract_status(doc, now_time) - statuses[doc['host']] = status - phys_stat[phys_det].append(status) - - for n, doc in self.latest_status[detector]['controller'].items(): - if doc is None: - self.logger.debug(f'{n} seems to have been offline for a few days') - modes.append('none') - run_nums.append(-1) - statuses['none'] = DAQ_STATUS.UNKNOWN - continue - phys_det = self.host_config[doc['host']] - status = self.extract_status(doc, now_time) - statuses[doc['host']] = status - doc['status'] = status - modes.append(doc.get('mode', 'none')) - run_nums.append(doc.get('number', None)) - aggstat[phys_det]['status'] = status - aggstat[phys_det]['mode'] = modes[-1] - aggstat[phys_det]['number'] = run_nums[-1] - phys_stat[phys_det].append(status) - - mode = modes[0] - run_num = run_nums[0] - if 
not _all(modes, mode) or not _all(run_nums, run_num): - self.logger.error(f'No quorum? {modes}, {run_nums}') - status_list = [DAQ_STATUS.UNKNOWN] - mode = 'none' - run_num = -1 - elif mode != 'none': # readout is "active": - a,b = self.get_hosts_for_mode(mode) - active = a + b - status_list = [v for k,v in statuses.items() if k in active] - else: - status_list = list(statuses.values()) - - # Now we aggregate the statuses - status = self.combine_statuses(status_list) - - self.latest_status[detector]['status'] = status - self.latest_status[detector]['number'] = run_num - self.latest_status[detector]['mode'] = mode - - try: - self.collections['aggregate_status'].insert_many(aggstat.values()) - except Exception as e: - self.logger.error(f'DB snafu? Couldn\'t update aggregate status. ' - f'{type(e)}, {e}') - - self.physical_status = phys_stat - return ret - - def combine_statuses(self, status_list): - # First, the "or" statuses - for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']: - if DAQ_STATUS[stat] in status_list: - return DAQ_STATUS[stat] - # then the "and" statuses - for stat in ['IDLE','ARMED','RUNNING']: - if _all(status_list, DAQ_STATUS[stat]): - return DAQ_STATUS[stat] - return DAQ_STATUS.UNKNOWN - - def extract_status(self, doc, now_time): - try: - return DAQ_STATUS.TIMEOUT if self.is_timeout(doc, now_time) else DAQ_STATUS(doc['status']) - except Exception as e: - self.logger.debug(f'Setting status to unknown for {doc.get("host", "unknown")} because of {type(e)}: {e}') - return DAQ_STATUS.UNKNOWN - - def is_timeout(self, doc, t): - """ - Checks to see if the specified status doc corresponds to a timeout situation - """ - host = doc['host'] - dt = t - int(str(doc['_id'])[:8], 16) - has_ackd = self.host_ackd_command(host) - ret = False - self.logger.debug(f'{host} last reported {int(dt)} sec ago') - if dt > self.timeout: - ret = ret or True - if has_ackd is not None and t - has_ackd > self.timeout_take_action: - if host not in self.host_is_timeout: - 
self.logger.critical(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') - self.host_is_timeout.add(host) - if self.host_config[host] == 'tpc': - dt = (now() - self.hv_timeout_fix[host]).total_seconds() - if dt > self.hypervisor_host_restart_timeout: - self.log_error(f'Hypervisor fixes timeout of {host}', "ERROR", "ERROR") - self.hypervisor.handle_timeout(host) - self.hv_timeout_fix[host] = now() - else: - self.logger.debug(f'Not restarting {host}, timeout at {int(dt)}') - ret = ret or True - if not ret and host in self.host_is_timeout: - self.host_is_timeout.discard(host) - return ret - - def get_wanted_state(self): - """ - Figure out what the system is supposed to be doing right now - """ - try: - latest_settings = {} - for detector in self.dc: - latest = None - latest_settings[detector] = {} - for key in self.control_keys: - doc = self.collections['incoming_commands'].find_one( - {'key': f'{detector}.{key}'}, sort=[('_id', -1)]) - if doc is None: - self.logger.error(f'No key {key} for {detector}???') - return None - latest_settings[detector][doc['field']] = doc['value'] - if latest is None or doc['time'] > latest: - latest = doc['time'] - latest_settings[detector]['user'] = doc['user'] - self.goal_state = latest_settings - return self.goal_state - except Exception as e: - self.logger.debug(f'get_wanted_state failed due to {type(e)} {e}') - return None - - def is_linked_mode(self): - """ - Are we in a linked configuration for this control iteration? - """ - # self.dc has the physical detectors, self.latest_status has the logical detectors - return len(self.dc.keys()) != len(self.latest_status.keys()) - - def is_linked(self, a, b): - """ - Check if the detectors are in a compatible linked configuration. 
- """ - mode_a = self.goal_state[a]["mode"] - mode_b = self.goal_state[b]["mode"] - if mode_a != mode_b: - self.logger.debug(f'{a} and {b} are not linked ({mode_a}/{mode_b})') - # shortcut to lessen the load on the db - return False - - # we don't need to pull the whole combined document because the 'detector' field is at the top level - if (doc := self.collections['options'].find_one({'name': mode_a}, {'detector': 1})) is not None: - detectors = doc['detector'] - else: - detectors = None - - # Check if the linked detectors share the same run mode and - # if they are both present in the detectors list of that mode - # also no "tpc_muon_veto" bullshit, it must be ['tpc', 'muon_veto'] - if isinstance(detectors, list) and a in detectors and b in detectors: - self.logger.debug(f'{a} and {b} are linked ({mode_a}/{detectors})') - return True - else: - self.logger.debug(f'{a} and {b} aren\'t link?? How this happen?? {mode_a} {detectors}') - return False - - def get_logical_detector(self): - """ - Get the Logical Detector configuration - if the detectors are in a compatible linked mode. - - case A: tpc, mv and nv all linked - - case B: tpc, mv and nv all un-linked - - case C: tpc and mv linked, nv un-linked - - case D: tpc and nv linked, mv un-linked - - case E: tpc unlinked, mv and nv linked - We will check the compatibility of the linked mode for a pair of detectors per time. 
- """ - - tpc = self.dc['tpc'] - mv = self.dc['muon_veto'] - nv = self.dc['neutron_veto'] - - is_tpc_mv = self.is_linked('tpc', 'muon_veto') - is_tpc_nv = self.is_linked('tpc', 'neutron_veto') - is_mv_nv = self.is_linked('muon_veto', 'neutron_veto') - - if is_tpc_mv and is_tpc_nv and is_mv_nv: - # case A - ret = {'all_linked': {'controller': tpc['controller'][:] + mv['controller'][:] + nv['controller'][:], - 'readers': tpc['readers'][:] + mv['readers'][:] + nv['readers'][:], - 'detectors': ['tpc','muon_veto','neutron_veto']}} - elif is_tpc_mv and not is_tpc_nv and not is_mv_nv: - # case C - ret = {'tpc_mv': {'controller': tpc['controller'][:] + mv['controller'][:], - 'readers': tpc['readers'][:] + mv['readers'][:], - 'detectors': ['tpc','muon_veto']}, - 'nv': {'controller': nv['controller'][:], - 'readers': nv['readers'][:], - 'detectors': ['neutron_veto']}} - elif is_tpc_nv and not is_tpc_mv and not is_mv_nv: - # case D - ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:], - 'readers': tpc['readers'][:] + nv['readers'][:], - 'detectors': ['tpc','neutron_veto']}, - 'mv' = {'controller': mv['controller'][:], - 'readers': mv['readers'][:], - 'detectors': ['muon_veto']}} - elif is_mv_nv and not is_tpc_mv and not is_tpc_nv: - # case E - ret = {'tpc': {'controller': tpc['controller'][:], - 'readers': tpc['readers'][:], - 'detectors': ['tpc']}, - 'mv_nv' = {'controller': mv['controller'][:] + nv['controller'][:], - 'readers': mv['readers'][:] + nv['readers'][:], - 'detectors': ['muon_veto','neutron_veto']}} - else: - # case B - ret = {'tpc': {'controller': tpc['controller'][:], - 'readers': tpc['readers'][:], - 'detectors': ['tpc']}, - 'mv' = {'controller': mv['controller'][:], - 'readers': mv['readers'][:], - 'detectors': ['muon_veto']}, - 'nv' = {'controller': nv['controller'][:], - 'readers': nv['readers'][:], - 'detectors': ['neutron_veto']}} - - # convert the host lists to dics for later - for det in list(ret.keys()): - ret[det]['controller'] 
= {c:{} for c in ret[det]['controller']} - ret[det]['readers'] = {c:{} for c in ret[det]['readers']} - return ret - - def get_run_mode(self, mode): - """ - Pull a run doc from the options collection and add all the includes - """ - if mode is None: - return None - base_doc = self.collections['options'].find_one({'name': mode}) - if base_doc is None: - self.log_error("Mode '%s' doesn't exist" % mode, "info", "info") - return None - if 'includes' not in base_doc or len(base_doc['includes']) == 0: - return base_doc - try: - if self.collections['options'].count_documents({'name': - {'$in': base_doc['includes']}}) != len(base_doc['includes']): - self.log_error("At least one subconfig for mode '%s' doesn't exist" % mode, "WARNING", "WARNING") - return None - return list(self.collections["options"].aggregate([ - {'$match': {'name': mode}}, - {'$lookup': {'from': 'options', 'localField': 'includes', - 'foreignField': 'name', 'as': 'subconfig'}}, - {'$addFields': {'subconfig': {'$concatArrays': ['$subconfig', ['$$ROOT']]}}}, - {'$unwind': '$subconfig'}, - {'$group': {'_id': None, 'config': {'$mergeObjects': '$subconfig'}}}, - {'$replaceWith': '$config'}, - {'$project': {'_id': 0, 'description': 0, 'includes': 0, 'subconfig': 0}}, - ]))[0] - except Exception as e: - self.logger.error("Got a %s exception in doc pulling: %s" % (type(e), e)) - return None - - def get_hosts_for_mode(self, mode, detector=None): - """ - Get the nodes we need from the run mode - """ - if mode is None or mode == 'none': - if detector is None: - self.logger.error('No mode, no detector? 
wtf?') - return [], [] - return (list(self.latest_status[detector]['readers'].keys()), - list(self.latest_status[detector]['controller'].keys())) - if (doc := self.get_run_mode(mode)) is None: - self.logger.error('How did this happen?') - return [], [] - cc = [] - hostlist = [] - for b in doc['boards']: - if self.digi_type in b['type'] and b['host'] not in hostlist: - hostlist.append(b['host']) - elif b['type'] == self.cc_type and b['host'] not in cc: - cc.append(b['host']) - return hostlist, cc - - def get_next_run_number(self): - try: - cursor = self.collections["run"].find({},{'number': 1}).sort("number", -1).limit(1) - except Exception as e: - self.logger.error(f'Database is having a moment? {type(e)}, {e}') - return NO_NEW_RUN - if cursor.count() == 0: - self.logger.info("wtf, first run?") - return 0 - return list(cursor)[0]['number']+1 - - def set_stop_time(self, number, detectors, force): - """ - Sets the 'end' field of the run doc to the time when the STOP command was ack'd - """ - self.logger.info(f"Updating run {number} with end time ({detectors})") - if number == -1: - return - try: - time.sleep(0.5) # this number depends on the CC command polling time - if (endtime := self.get_ack_time(detectors, 'stop') ) is None: - self.logger.debug(f'No end time found for run {number}') - endtime = now() -datetime.timedelta(seconds=1) - query = {"number": int(number), "end": None, 'detectors': detectors} - updates = {"$set": {"end": endtime}} - if force: - updates["$push"] = {"tags": {"name": "_messy", "user": "daq", - "date": now()}} - if self.collections['run'].update_one(query, updates).modified_count == 1: - self.logger.debug('Update successful') - rate = {} - for doc in self.collections['aggregate_status'].aggregate([ - {'$match': {'number': number}}, - {'$group': {'_id': '$detector', - 'avg': {'$avg': '$rate'}, - 'max': {'$max': '$rate'}}} - ]): - rate[doc['_id']] = {'avg': doc['avg'], 'max': doc['max']} - channels = set() - if 'tpc' in detectors: - # figure 
out which channels weren't running - readers = list(self.latest_status[detectors]['readers'].keys()) - for doc in self.collections['node_status'].find({'host': {'$in': readers}, 'number': int(number)}): - channels |= set(map(int, doc['channels'].keys())) - updates = {'rate': rate} - if len(channels): - updates['no_data_from'] = sorted(list(set(range(494)) - channels)) - self.collections['run'].update_one({'number': int(number)}, - {'$set': updates}) - if str(number) in self.run_start_cache: - del self.run_start_cache[str(number)] - else: - self.logger.debug('No run updated?') - except Exception as e: - self.logger.error(f"Database having a moment, hope this doesn't crash. {type(e)}, {e}") - return - - def get_ack_time(self, detector, command, recurse=True): - ''' - Finds the time when specified detector's crate controller ack'd the specified command - ''' - # the first cc is the "master", so its ack time is what counts - cc = list(self.latest_status[detector]['controller'].keys())[0] - query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command} - sort = [('_id', -1)] - doc = self.collections['outgoing_commands'].find_one(query, sort=sort) - dt = (now() - doc['acknowledged'][cc].replace(tzinfo=pytz.utc)).total_seconds() - if dt > 30: # TODO make this a config value - if recurse: - # No way we found the correct command here, maybe we're too soon - self.logger.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?') - time.sleep(2) # if in doubt - return self.get_ack_time(detector, command, False) - else: - # Welp - self.logger.debug(f'No recent ack time for {detector}-{command}') - return None - return doc['acknowledged'][cc] - - def send_command(self, command, hosts, user, detector, mode="", delay=0, force=False): - """ - Send this command to these hosts. 
If delay is set then wait that amount of time - """ - number = None - ls = self.latest_status[detector] - for host_list in hosts: - for h in host_list: - if h not in ls['readers'] and h not in ls['controller']: - self.logger.error(f'Trying to issue a {command} to {detector}/{h}?') - host_list.remove(h) - if command == 'stop' and not self.detector_ackd_command(detector, 'stop'): - self.logger.error(f"{detector} hasn't ack'd its last stop, let's not flog a dead horse") - if not force: - return 1 - try: - if command == 'arm': - number = self.get_next_run_number() - if number == NO_NEW_RUN: - return -1 - self.latest_status[detector]['number'] = number - doc_base = { - "command": command, - "user": user, - "detector": detector, - "mode": mode, - "createdAt": now() - } - if command == 'arm': - doc_base['options_override'] = {'number': number} - if delay == 0: - docs = doc_base - docs['host'] = hosts[0]+hosts[1] - docs['acknowledged'] = {h:0 for h in docs['host']} - docs = [docs] - else: - docs = [dict(doc_base.items()), dict(doc_base.items())] - docs[0]['host'], docs[1]['host'] = hosts - docs[0]['acknowledged'] = {h:0 for h in docs[0]['host']} - docs[1]['acknowledged'] = {h:0 for h in docs[1]['host']} - docs[1]['createdAt'] += datetime.timedelta(seconds=delay) - with self.q_mutex: - self.command_queue += docs - except Exception as e: - self.logger.debug(f'SendCommand ran into {type(e)}, {e})') - return -1 - else: - self.logger.debug(f'Queued {command} for {detector}') - self.event.set() - return 0 - - def process_commands(self): - """ - Process our internal command queue - """ - outgoing = self.collections['outgoing_commands'] - while self.run == True: - try: - with self.q_mutex: - if len(self.command_queue) > 1: - self.command_queue.sort(key=lambda d : d['createdAt'].timestamp()) - if len(self.command_queue) > 0: - next_cmd = self.command_queue[0] - dt = (next_cmd['createdAt'].replace(tzinfo=pytz.utc) - now()).total_seconds() - else: - dt = 10 - if dt < 0.01: - with 
self.q_mutex: - outgoing.insert_one(self.command_queue.pop(0)) - except Exception as e: - dt = 10 - self.logger.error(f"DB down? {type(e)}, {e}") - self.event.clear() - self.event.wait(dt) - - def host_ackd_command(self, host): - """ - Finds the timestamp of the oldest unacknowledged command send to the specified host - :param host: str, the process name to check - :returns: float, the timestamp of the last unack'd command, or None if none exist - """ - q = {'host': host, f'acknowledged.{host}': 0} - sort = [('_id', 1)] - if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: - self.logger.debug(f'No unack\'d commands for {host}') - return None - return doc['createdAt'].replace(tzinfo=pytz.utc).timestamp() - - def detector_ackd_command(self, detector, command): - """ - Finds when the specified/most recent command was ack'd - """ - q = {'detector': detector} - sort = [('_id', -1)] - if command is not None: - q['command'] = command - if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: - self.logger.error('No previous command found?') - return True - # we can't naively use everything in the hosts field, because we might be transitioning - # out of linked mode, and there might be "garbage" in the host list because someone - # didn't follow very clear instructions, and if a stop is issued to a host that doesn't - # exist, the dispatcher basically stops working - hosts_this_detector = set(self.latest_status[detector]['readers'].keys()) | set(self.latest_status[detector]['controller'].keys()) - hosts_in_doc = set(doc['host']) - hosts_ignored = hosts_in_doc - hosts_this_detector - if len(hosts_ignored): - self.logger.warning(f'Ignoring hosts: {hosts_ignored}') - # so we only loop over the intersection of this detector's hosts and the doc's hosts - for h in hosts_this_detector & hosts_in_doc: - if doc['acknowledged'][h] == 0: - return False - return True - - def log_error(self, message, priority, etype): - #Start by 
logging the error localy - self.logger.info(message) - # Note that etype allows you to define timeouts. - nowtime = now() - if ( (etype in self.error_sent and self.error_sent[etype] is not None) and - (etype in self.error_timeouts and self.error_timeouts[etype] is not None) and - (nowtime-self.error_sent[etype]).total_seconds() <= self.error_timeouts[etype]): - self.logger.debug("Could log error, but still in timeout for type %s"%etype) - return - self.error_sent[etype] = nowtime - try: - self.collections['log'].insert({ - "user": "dispatcher", - "message": message, - "priority": self.loglevels[priority] - }) - except Exception as e: - self.logger.error(f'Database error, can\'t issue error message: {type(e)}, {e}') - self.logger.info("Error message from dispatcher: %s" % (message)) - return - - def get_run_start(self, number): - """ - Returns the timezone-corrected run start time from the rundoc - """ - if str(number) in self.run_start_cache: - return self.run_start_cache[str(number)] - try: - doc = self.collections['run'].find_one({"number": number}, {"start": 1}) - except Exception as e: - self.logger.error(f'Database is having a moment: {type(e)}, {e}') - return None - if doc is not None and 'start' in doc: - self.run_start_cache[str(number)] = doc['start'].replace(tzinfo=pytz.utc) - return self.run_start_cache[str(number)] - return None - - def insert_run_doc(self, detector): - - if (number := self.get_next_run_number()) == NO_NEW_RUN: - self.logger.error("DB having a moment") - return -1 - # the rundoc gets the physical detectors, not the logical - detectors = self.latest_status[detector]['detectors'] - - run_doc = { - "number": number, - 'detectors': detectors, - 'user': self.goal_state[detector]['user'], - 'mode': self.goal_state[detector]['mode'], - 'bootstrax': {'state': None}, - 'end': None - } - - # If there's a source add the source. Also add the complete ini file. 
- cfg = self.get_run_mode(self.goal_state[detector]['mode']) - if cfg is not None and 'source' in cfg.keys(): - run_doc['source'] = str(cfg['source']) - run_doc['daq_config'] = cfg - - # If the user started the run with a comment add that too - if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "": - run_doc['comments'] = [{ - "user": self.goal_state[detector]['user'], - "date": now(), - "comment": self.goal_state[detector]['comment'] - }] - - # Make a data entry so bootstrax can find the thing - if 'strax_output_path' in cfg: - run_doc['data'] = [{ - 'type': 'live', - 'host': 'daq', - 'location': cfg['strax_output_path'] - }] - - # The cc needs some time to get started - time.sleep(self.cc_start_wait) - try: - start_time = self.get_ack_time(detector, 'start') - except Exception as e: - self.logger.error('Couldn\'t find start time ack') - start_time = None - - if start_time is None: - start_time = now()-datetime.timedelta(seconds=2) - # if we miss the ack time, we don't really know when the run started - # so may as well tag it - run_doc['tags'] = [{'name': 'messy', 'user': 'daq', 'date': start_time}] - run_doc['start'] = start_time - - try: - self.collections['run'].insert_one(run_doc) - except Exception as e: - self.logger.error(f'Database having a moment: {type(e)}, {e}') - return -1 - return None From 8475cc797a4459f916d34c66b87ccadd3ed94f06 Mon Sep 17 00:00:00 2001 From: valerioda Date: Mon, 10 Oct 2022 10:55:03 +0200 Subject: [PATCH 03/10] Calculate aggregate status for physical detectors --- dispatcher/.#dispatcher.py | 1 + dispatcher/MongoConnect.py | 20 ++++++++++++++------ dispatcher/config.ini | 4 ++-- dispatcher/dispatcher.py | 1 - 4 files changed, 17 insertions(+), 9 deletions(-) create mode 120000 dispatcher/.#dispatcher.py diff --git a/dispatcher/.#dispatcher.py b/dispatcher/.#dispatcher.py new file mode 120000 index 00000000..e632cf8b --- /dev/null +++ b/dispatcher/.#dispatcher.py @@ -0,0 +1 @@ 
+dandrea@LAPTOP-TN8QBC6F.22 \ No newline at end of file diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index adbee0c2..45a8a56d 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -46,7 +46,7 @@ class MongoConnect(object): D. Coderre, 12. Mar. 2019 D. Masson, 2019-2021 S. di Pede, 2020-2021 - V. D'Andrea, May 2022 + V. D'Andrea, Oct 2022 Brief: This code handles the mongo connectivity for both the DAQ databases (the ones used for system-wide communication) and the @@ -305,7 +305,7 @@ def aggregate_status(self): else: status_list = list(statuses.values()) - # Now we aggregate the statuses + # Now we aggregate the statuses for the logical detectors status = self.combine_statuses(status_list) self.latest_status[detector]['status'] = status @@ -318,7 +318,15 @@ def aggregate_status(self): self.logger.error(f'DB snafu? Couldn\'t update aggregate status. ' f'{type(e)}, {e}') + # Aggregate status for the physical detectors + phys_agg_status = {k: {} for k in self.dc} + for detector in phys_agg_status.keys(): + status = self.combine_statuses(phys_stat[detector]) + phys_agg_status[detector]['status'] = status + self.physical_status = phys_stat + self.physical_agg_status = phys_agg_status + return ret def combine_statuses(self, status_list): @@ -464,7 +472,7 @@ def get_logical_detector(self): ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:], 'readers': tpc['readers'][:] + nv['readers'][:], 'detectors': ['tpc','neutron_veto']}, - 'mv' = {'controller': mv['controller'][:], + 'mv': {'controller': mv['controller'][:], 'readers': mv['readers'][:], 'detectors': ['muon_veto']}} elif is_mv_nv and not is_tpc_mv and not is_tpc_nv: @@ -472,7 +480,7 @@ def get_logical_detector(self): ret = {'tpc': {'controller': tpc['controller'][:], 'readers': tpc['readers'][:], 'detectors': ['tpc']}, - 'mv_nv' = {'controller': mv['controller'][:] + nv['controller'][:], + 'mv_nv': {'controller': mv['controller'][:] + 
nv['controller'][:], 'readers': mv['readers'][:] + nv['readers'][:], 'detectors': ['muon_veto','neutron_veto']}} else: @@ -480,10 +488,10 @@ def get_logical_detector(self): ret = {'tpc': {'controller': tpc['controller'][:], 'readers': tpc['readers'][:], 'detectors': ['tpc']}, - 'mv' = {'controller': mv['controller'][:], + 'mv': {'controller': mv['controller'][:], 'readers': mv['readers'][:], 'detectors': ['muon_veto']}, - 'nv' = {'controller': nv['controller'][:], + 'nv': {'controller': nv['controller'][:], 'readers': nv['readers'][:], 'detectors': ['neutron_veto']}} diff --git a/dispatcher/config.ini b/dispatcher/config.ini index 6471db91..85e6d356 100644 --- a/dispatcher/config.ini +++ b/dispatcher/config.ini @@ -68,8 +68,8 @@ MasterDAQConfig = { "neutron_veto": { "controller": ["reader6_controller_0"], "readers": ["reader6_reader_0", "reader6_reader_1"] - } - } + } + } # Addresses for the VME crates VMEConfig = { diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index e8e4eaf9..976bfba0 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -110,7 +110,6 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, f"tpc-nv: {mc.is_linked('tpc', 'neutron_veto')}, " f"mv-nv: {mc.is_linked('muon_veto', 'neutron_veto')}") logger.debug(msg) - # Decision time. Are we actually in our goal state? If not what should we do? 
dc.solve_problem(latest_status, goal_state) From f60db3f58156140e0ff130eb5b90419ef6abd761 Mon Sep 17 00:00:00 2001 From: valerioda Date: Mon, 10 Oct 2022 11:58:33 +0200 Subject: [PATCH 04/10] dispatcher checking status of both logical and physical detectors --- dispatcher/.#dispatcher.py | 1 - dispatcher/MongoConnect.py | 34 ++++++++++++++++------------------ dispatcher/dispatcher.py | 15 ++++++++------- 3 files changed, 24 insertions(+), 26 deletions(-) delete mode 120000 dispatcher/.#dispatcher.py diff --git a/dispatcher/.#dispatcher.py b/dispatcher/.#dispatcher.py deleted file mode 120000 index e632cf8b..00000000 --- a/dispatcher/.#dispatcher.py +++ /dev/null @@ -1 +0,0 @@ -dandrea@LAPTOP-TN8QBC6F.22 \ No newline at end of file diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 45a8a56d..09ca3700 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -214,8 +214,8 @@ def get_update(self, dc): self.latest_status = dc # Now compute aggregate status - return self.latest_status if self.aggregate_status() is None else None - + return self.aggregate_status() + def clear_error_timeouts(self): self.error_sent = {} @@ -237,17 +237,16 @@ def aggregate_status(self): apply to both """ now_time = time.time() - ret = None aggstat = { - k:{ 'status': -1, - 'detector': k, - 'rate': 0, - 'time': now(), - 'buff': 0, - 'mode': None, - 'pll_unlocks': 0, - 'number': -1} - for k in self.dc} + k:{ 'status': -1, + 'detector': k, + 'rate': 0, + 'time': now(), + 'buff': 0, + 'mode': None, + 'pll_unlocks': 0, + 'number': -1} + for k in self.dc} phys_stat = {k: [] for k in self.dc} for detector in self.latest_status.keys(): # detector = logical @@ -317,18 +316,17 @@ def aggregate_status(self): except Exception as e: self.logger.error(f'DB snafu? Couldn\'t update aggregate status. 
' f'{type(e)}, {e}') - + return None + self.physical_status = phys_stat + # Aggregate status for the physical detectors phys_agg_status = {k: {} for k in self.dc} for detector in phys_agg_status.keys(): status = self.combine_statuses(phys_stat[detector]) phys_agg_status[detector]['status'] = status - - self.physical_status = phys_stat self.physical_agg_status = phys_agg_status - - return ret - + return True + def combine_statuses(self, status_list): # First, the "or" statuses for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']: diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 976bfba0..60b8eb8e 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -93,16 +93,17 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, # Get the Logical Detector configuration current_config = mc.get_logical_detector() # Get most recent check-in from all connected hosts - if (latest_status := mc.get_update(current_config)) is None: + if mc.get_update(current_config) is None: continue - + latest_status = mc.latest_status + latest_physical_status = mc.physical_agg_status # Print an update for log_det in latest_status.keys(): - for detector in latest_status[log_det]['detectors'].keys(): - state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE' - msg = (f'Logical detector {log_det}: ' - f'The {detector} should be {state} and is ' - f'{latest_status[detector]["status"].name}') + for det in latest_status[log_det]['detectors'].keys(): + state = 'ACTIVE' if goal_state[det]['active'] == 'true' else 'INACTIVE' + msg = (f'{log_det} {det} should be {state}: ' + f'logical detector is {latest_status[log_det]["status"]}, ' + f'physical detector is {latest_physical_status[det]["status"]}') if latest_status[log_det]['number'] != -1: msg += f' ({latest_status[log_det]["number"]})' logger.debug(msg) From 1436b74fe9fd38b8675b8a4ea31bda86875798b8 Mon Sep 17 00:00:00 2001 From: valerioda Date: Mon, 10 Oct 2022 14:25:48 
+0200 Subject: [PATCH 05/10] physical detectors status in latest status --- dispatcher/MongoConnect.py | 13 ++++++------- dispatcher/dispatcher.py | 6 ++---- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 09ca3700..d9687ff4 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -214,7 +214,7 @@ def get_update(self, dc): self.latest_status = dc # Now compute aggregate status - return self.aggregate_status() + return self.latest_status if self.aggregate_status() else None def clear_error_timeouts(self): self.error_sent = {} @@ -318,13 +318,12 @@ def aggregate_status(self): f'{type(e)}, {e}') return None self.physical_status = phys_stat - + # Aggregate status for the physical detectors - phys_agg_status = {k: {} for k in self.dc} - for detector in phys_agg_status.keys(): - status = self.combine_statuses(phys_stat[detector]) - phys_agg_status[detector]['status'] = status - self.physical_agg_status = phys_agg_status + for log_det in self.latest_status.keys(): + for det in self.latest_status[log_det]['detectors'].keys(): + status = self.combine_statuses(phys_stat[det]) + self.latest_status[log_det]['detectors'][det]['status'] = status return True def combine_statuses(self, status_list): diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 60b8eb8e..854ff722 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -93,17 +93,15 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, # Get the Logical Detector configuration current_config = mc.get_logical_detector() # Get most recent check-in from all connected hosts - if mc.get_update(current_config) is None: + if (latest_status := mc.get_update(current_config)) is None: continue - latest_status = mc.latest_status - latest_physical_status = mc.physical_agg_status # Print an update for log_det in latest_status.keys(): for det in 
latest_status[log_det]['detectors'].keys(): state = 'ACTIVE' if goal_state[det]['active'] == 'true' else 'INACTIVE' msg = (f'{log_det} {det} should be {state}: ' f'logical detector is {latest_status[log_det]["status"]}, ' - f'physical detector is {latest_physical_status[det]["status"]}') + f'physical detector is {latest_status[log_det]["detectors"][det]["status"]}') if latest_status[log_det]['number'] != -1: msg += f' ({latest_status[log_det]["number"]})' logger.debug(msg) From 37db848aca4417e7284e6b02538943712cfa2db5 Mon Sep 17 00:00:00 2001 From: valerioda Date: Mon, 10 Oct 2022 18:32:54 +0200 Subject: [PATCH 06/10] solve_problem and control_detector working with logical detector --- dispatcher/DAQController.py | 131 +++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 63 deletions(-) diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index a43ee2b4..51d40efe 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -14,6 +14,7 @@ class DAQController(): D. Coderre, 12. Mar. 2019 D. Masson, 06 Apr 2020 S. di Pede, 17 Mar 2021 + V. D'Andrea, Oct 2022 Brief: This code handles the logic of what the dispatcher does when. 
It takes in aggregated status updates and commands from the mongo connector and decides if @@ -87,37 +88,41 @@ def solve_problem(self, latest_status, goal_state): self.latest_status = latest_status self.one_detector_arming = False - for det in latest_status.keys(): - if latest_status[det]['status'] == DAQ_STATUS.IDLE: + for logical in latest_status.keys(): + # TO MODIFY IN ORDER TO TAKE ACTION ALSO ON OTHER DETECTORS + det = list(latest_status[logical]['detectors'].keys())[0] + if latest_status[logical]['status'] == DAQ_STATUS.IDLE: self.can_force_stop[det] = True self.error_stop_count[det] = 0 - if latest_status[det]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: + if latest_status[logical]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: self.one_detector_arming = True active_states = [DAQ_STATUS.RUNNING, DAQ_STATUS.ARMED, DAQ_STATUS.ARMING, DAQ_STATUS.UNKNOWN] - for det in latest_status.keys(): + for logical in latest_status.keys(): + # Take the first physical detector (only one is needed to retrieve the goal status) + det = list(latest_status[logical]['detectors'].keys())[0] # The detector should be INACTIVE if goal_state[det]['active'] == 'false': # The detector is not in IDLE, ERROR or TIMEOUT: it needs to be stopped - if latest_status[det]['status'] in active_states: + if latest_status[logical]['status'] in active_states: # Check before if the status is UNKNOWN and it is maybe timing out - if latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: - self.logger.info(f"The status of {det} is unknown, check timeouts") - self.check_timeouts(detector=det) + if latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: + self.logger.info(f"The status of {logical} is unknown, check timeouts") + self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL # Otherwise stop the detector else: - self.logger.info(f"Sending stop command to {det}") - self.stop_detector_gently(detector=det) + self.logger.info(f"Sending stop command to {logical}") + 
self.stop_detector_gently(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed - elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: - self.logger.info(f"The {det} is in timeout, check timeouts") + elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: + self.logger.info(f"The {logical} is in timeout, check timeouts") # TODO update - self.handle_timeout(detector=det) + self.handle_timeout(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL - elif latest_status[det]['status'] == DAQ_STATUS.ERROR: - self.logger.info(f"The {det} has error, sending stop command") - self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) + elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: + self.logger.info(f"The {logical} has error, sending stop command") + self.control_detector(logical, 'stop', force=self.can_force_stop[det]) self.can_force_stop[det]=False else: # the only remaining option is 'idle', which is fine @@ -125,41 +130,39 @@ def solve_problem(self, latest_status, goal_state): # The detector should be ACTIVE (RUNNING) else: #goal_state[det]['active'] == 'true': - if latest_status[det]['status'] == DAQ_STATUS.RUNNING: - self.logger.info(f"The {det} is running") + if latest_status[logical]['status'] == DAQ_STATUS.RUNNING: + self.logger.info(f"The {logical} is running") self.check_run_turnover(detector=det) # TODO does this work properly? 
- if latest_status[det]['mode'] != goal_state[det]['mode']: - self.control_detector(command='stop', detector=det) + if latest_status[logical]['mode'] != goal_state[det]['mode']: + self.control_detector(logical, 'stop') # ARMED, start the run - elif latest_status[det]['status'] == DAQ_STATUS.ARMED: - self.logger.info(f"The {det} is armed, sending start command") - self.control_detector(command='start', detector=det) + elif latest_status[logical]['status'] == DAQ_STATUS.ARMED: + self.logger.info(f"The {logical} is armed, sending start command") + self.control_detector(logical,'start') # ARMING, check if it is timing out - elif latest_status[det]['status'] == DAQ_STATUS.ARMING: - self.logger.info(f"The {det} is arming, check timeouts") - self.logger.debug(f"Checking the {det} timeouts") - self.check_timeouts(detector=det, command='arm') + elif latest_status[logical]['status'] == DAQ_STATUS.ARMING: + self.logger.info(f"The {logical} is arming, check timeouts") + self.logger.debug(f"Checking the {logical} timeouts") + self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL # UNKNOWN, check if it is timing out - elif latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: - self.logger.info(f"The status of {det} is unknown, check timeouts") - self.logger.debug(f"Checking the {det} timeouts") - self.check_timeouts(detector=det) - + elif latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: + self.logger.info(f"The status of {logical} is unknown, check timeouts") + self.logger.debug(f"Checking the {logical} timeouts") + self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL # Maybe the detector is IDLE, we should arm a run - elif latest_status[det]['status'] == DAQ_STATUS.IDLE: - self.logger.info(f"The {det} is idle, sending arm command") - self.control_detector(command='arm', detector=det) - + elif latest_status[logical]['status'] == DAQ_STATUS.IDLE: + self.logger.info(f"The {logical} is idle, sending arm command") + self.control_detector(logical, 
'arm') # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed - elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: - self.logger.info(f"The {det} is in timeout, check timeouts") - self.logger.debug("Checking %s timeouts", det) - self.handle_timeout(detector=det) - - elif latest_status[det]['status'] == DAQ_STATUS.ERROR: - self.logger.info(f"The {det} has error, sending stop command") - self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) + elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: + self.logger.info(f"The {logical} is in timeout, check timeouts") + self.logger.debug("Checking %s timeouts", logical) + self.handle_timeout(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + + elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: + self.logger.info(f"The {logical} has error, sending stop command") + self.control_detector(logical,'stop', force=self.can_force_stop[det]) self.can_force_stop[det]=False else: # shouldn't be able to get here @@ -190,22 +193,23 @@ def stop_detector_gently(self, detector): else: self.control_detector(detector=detector, command='stop') - def control_detector(self, command, detector, force=False): + def control_detector(self, logical, command, force=False): """ Issues the command to the detector if allowed by the timeout. 
Returns 0 if a command was issued and 1 otherwise """ time_now = now() + det = list(self.latest_status[logical]['detectors'].keys())[0] try: - dt = (time_now - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][det]).total_seconds() except (KeyError, TypeError): dt = 2*self.timeouts[command] # make sure we don't rush things if command == 'start': - dt_last = (time_now - self.last_command['arm'][detector]).total_seconds() + dt_last = (time_now - self.last_command['arm'][det]).total_seconds() elif command == 'arm': - dt_last = (time_now - self.last_command['stop'][detector]).total_seconds() + dt_last = (time_now - self.last_command['stop'][det]).total_seconds() else: dt_last = self.time_between_commands*2 @@ -214,43 +218,44 @@ def control_detector(self, command, detector, force=False): gs = self.goal_state if command == 'arm': if self.one_detector_arming: - self.logger.info('Another detector already arming, can\'t arm %s' % detector) + self.logger.info('Another detector already arming, can\'t arm %s' % logical) # this leads to run number overlaps return 1 - readers, cc = self.mongo.get_hosts_for_mode(gs[detector]['mode']) + readers, cc = self.mongo.get_hosts_for_mode(gs[det]['mode']) hosts = (cc, readers) delay = 0 self.one_detector_arming = True elif command == 'start': - readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode']) + readers, cc = self.mongo.get_hosts_for_mode(ls[logical]['mode']) hosts = (readers, cc) delay = self.start_cmd_delay - #Reset arming timeout counter - self.missed_arm_cycles[detector]=0 + #Reset arming timeout counter + for dd in self.latest_status[logical]['detectors'].keys(): + self.missed_arm_cycles[dd] = 0 else: # stop - readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode'], detector) + readers, cc = self.mongo.get_hosts_for_mode(ls[logical]['mode'], detector) hosts = (cc, readers) - if force or ls[detector]['status'] not in [DAQ_STATUS.RUNNING]: + if force or 
ls[logical]['status'] not in [DAQ_STATUS.RUNNING]: delay = 0 else: delay = self.stop_cmd_delay - self.logger.debug(f'Sending {command.upper()} to {detector}') - if self.mongo.send_command(command, hosts, gs[detector]['user'], - detector, gs[detector]['mode'], delay, force): + self.logger.debug(f'Sending {command.upper()} to {logical}') + if self.mongo.send_command(command, hosts, gs[det]['user'], + logical, gs[det]['mode'], delay, force): # failed return 1 - self.last_command[command][detector] = time_now - if command == 'start' and self.mongo.insert_run_doc(detector): + self.last_command[command][det] = time_now + if command == 'start' and self.mongo.insert_run_doc(logical): # db having a moment return 0 - if (command == 'stop' and ls[detector]['number'] != -1 and - self.mongo.set_stop_time(ls[detector]['number'], detector, force)): + if (command == 'stop' and ls[logical]['number'] != -1 and + self.mongo.set_stop_time(ls[logical]['number'], logical, force)): # db having a moment return 0 else: self.logger.debug('Can\'t send %s to %s, timeout at %i/%i' % ( - command, detector, dt, self.timeouts[command])) + command, logical, dt, self.timeouts[command])) return 1 return 0 From eb600f9f177d2622dabf2591e158bb9a9d34e984 Mon Sep 17 00:00:00 2001 From: valerioda Date: Tue, 11 Oct 2022 12:14:23 +0200 Subject: [PATCH 07/10] check logical and physical detectors --- dispatcher/DAQController.py | 59 ++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index 51d40efe..b5268d52 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -29,10 +29,9 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.goal_state = {} self.latest_status = {} - # Timeouts. There are a few things that we want to wait for that might take time. # The keys for these dicts will be detector identifiers. 
- detectors = list(daq_config.keys()) + detectors = list(daq_config.keys()) # physical detectors self.last_command = {} for k in ['arm', 'start', 'stop']: self.last_command[k] = {} @@ -40,7 +39,7 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.last_command[k][d] = now() self.error_stop_count = {d : 0 for d in detectors} self.max_arm_cycles = int(config['MaxArmCycles']) - self.missed_arm_cycles={k:0 for k in detectors} + self.missed_arm_cycles = {k:0 for k in detectors} # Timeout properties come from config self.timeouts = { @@ -53,7 +52,7 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.logger = logger self.time_between_commands = int(config['TimeBetweenCommands']) - self.can_force_stop={k:True for k in detectors} + self.can_force_stop = {k:True for k in detectors} self.one_detector_arming = False self.start_cmd_delay = float(config['StartCmdDelay']) @@ -89,7 +88,6 @@ def solve_problem(self, latest_status, goal_state): self.one_detector_arming = False for logical in latest_status.keys(): - # TO MODIFY IN ORDER TO TAKE ACTION ALSO ON OTHER DETECTORS det = list(latest_status[logical]['detectors'].keys())[0] if latest_status[logical]['status'] == DAQ_STATUS.IDLE: self.can_force_stop[det] = True @@ -98,9 +96,9 @@ def solve_problem(self, latest_status, goal_state): self.one_detector_arming = True active_states = [DAQ_STATUS.RUNNING, DAQ_STATUS.ARMED, DAQ_STATUS.ARMING, DAQ_STATUS.UNKNOWN] - + for logical in latest_status.keys(): - # Take the first physical detector (only one is needed to retrieve the goal status) + # Take the first physical detector, only one is needed to retrieve the goal status det = list(latest_status[logical]['detectors'].keys())[0] # The detector should be INACTIVE if goal_state[det]['active'] == 'false': @@ -117,13 +115,12 @@ def solve_problem(self, latest_status, goal_state): # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed elif 
latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: self.logger.info(f"The {logical} is in timeout, check timeouts") - # TODO update - self.handle_timeout(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.handle_timeout(logical) elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: self.logger.info(f"The {logical} has error, sending stop command") self.control_detector(logical, 'stop', force=self.can_force_stop[det]) - self.can_force_stop[det]=False + self.can_force_stop[det] = False else: # the only remaining option is 'idle', which is fine pass @@ -132,7 +129,7 @@ def solve_problem(self, latest_status, goal_state): else: #goal_state[det]['active'] == 'true': if latest_status[logical]['status'] == DAQ_STATUS.RUNNING: self.logger.info(f"The {logical} is running") - self.check_run_turnover(detector=det) + self.check_run_turnover(logical) # TODO does this work properly? if latest_status[logical]['mode'] != goal_state[det]['mode']: self.control_detector(logical, 'stop') @@ -158,42 +155,44 @@ def solve_problem(self, latest_status, goal_state): elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: self.logger.info(f"The {logical} is in timeout, check timeouts") self.logger.debug("Checking %s timeouts", logical) - self.handle_timeout(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.handle_timeout(logical) elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: self.logger.info(f"The {logical} has error, sending stop command") self.control_detector(logical,'stop', force=self.can_force_stop[det]) - self.can_force_stop[det]=False + self.can_force_stop[det] = False else: # shouldn't be able to get here pass return - def handle_timeout(self, detector): + def handle_timeout(self, logical): """ Detector already in the TIMEOUT status are directly stopped. 
""" - self.control_detector(command='stop', detector=detector, force=self.can_force_stop[detector]) - self.can_force_stop[detector]=False - self.check_timeouts(detector) + det = list(latest_status[logical]['detectors'].keys())[0] + self.control_detector(logical, 'stop', force = self.can_force_stop[det]) + self.can_force_stop[det] = False + self.check_timeouts(detector) # TODO: CHECK LOGICAL OR PHYSICAL return - def stop_detector_gently(self, detector): + def stop_detector_gently(self, logical): """ Stops the detector, unless we're told to wait for the current run to end """ + det = list(latest_status[logical]['detectors'].keys())[0] if ( # Running normally (not arming, error, timeout, etc) - self.latest_status[detector]['status'] == DAQ_STATUS.RUNNING and + self.latest_status[logical]['status'] == DAQ_STATUS.RUNNING and # We were asked to wait for the current run to stop - self.goal_state[detector].get('softstop', 'false') == 'true'): - self.check_run_turnover(detector) + self.goal_state[det].get('softstop', 'false') == 'true'): + self.check_run_turnover(logical) else: - self.control_detector(detector=detector, command='stop') + self.control_detector(logical,'stop') - def control_detector(self, logical, command, force=False): + def control_detector(self, logical, command, force = False): """ Issues the command to the detector if allowed by the timeout. Returns 0 if a command was issued and 1 otherwise @@ -348,23 +347,23 @@ def throw_error(self): 'ERROR', "GENERAL_ERROR") - def check_run_turnover(self, detector): + def check_run_turnover(self, logical): """ During normal operation we want to run for a certain number of minutes, then automatically stop and restart the run. No biggie. We check the time here to see if it's something we have to do. 
""" - - number = self.latest_status[detector]['number'] + det = list(latest_status[logical]['detectors'].keys())[0] + number = self.latest_status[logical]['number'] start_time = self.mongo.get_run_start(number) if start_time is None: self.logger.debug(f'No start time for {number}?') return time_now = now() - run_length = int(self.goal_state[detector]['stop_after'])*60 + run_length = int(self.goal_state[det]['stop_after'])*60 run_duration = (time_now - start_time).total_seconds() - self.logger.debug('Checking run turnover for %s: %i/%i' % (detector, run_duration, run_length)) + self.logger.debug('Checking run turnover for %s: %i/%i' % (logical, run_duration, run_length)) if run_duration > run_length: - self.logger.info('Stopping run for %s' % detector) - self.control_detector(detector=detector, command='stop') + self.logger.info('Stopping run for %s' % logical) + self.control_detector(logical, 'stop') From 4d88fe120ef74cfcbebc4472a1f0382e4d6ca784 Mon Sep 17 00:00:00 2001 From: valerioda Date: Tue, 11 Oct 2022 15:53:05 +0200 Subject: [PATCH 08/10] dc.check_timeouts with logical detector input --- dispatcher/DAQController.py | 58 ++++++++++++++++++------------------- dispatcher/MongoConnect.py | 52 +++++++++++++++++---------------- 2 files changed, 56 insertions(+), 54 deletions(-) diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index b5268d52..8274628c 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -258,7 +258,7 @@ def control_detector(self, logical, command, force = False): return 1 return 0 - def check_timeouts(self, detector, command=None): + def check_timeouts(self, logical, command=None): """ This one is invoked if we think we need to change states. Either a stop command needs to be sent, or we've detected an anomaly and want to decide what to do. 
@@ -269,13 +269,13 @@ def check_timeouts(self, detector, command=None): """ time_now = now() - - #First check how often we have been timing out, if it happened to often - # something bad happened and we start from scratch again - if self.missed_arm_cycles[detector]>self.max_arm_cycles and detector=='tpc': + det = list(self.latest_status[logical]['detectors'].keys())[0] + #How often we have been timing out? + if self.missed_arm_cycles[det] > self.max_arm_cycles and det == 'tpc': if (dt := (now()-self.last_nuke).total_seconds()) > self.hv_nuclear_timeout: self.logger.critical('There\'s only one way to be sure') - self.control_detector(detector='tpc', command='stop', force=True) + #self.control_detector(detector='tpc', command='stop', force=True) + self.control_detector(logical,'stop', force=True) if self.hypervisor.tactical_nuclear_option(self.mongo.is_linked_mode()): self.last_nuke = now() else: @@ -283,24 +283,21 @@ def check_timeouts(self, detector, command=None): self.logger.debug(f'Nuclear timeout at {int(dt)}/{self.hv_nuclear_timeout}') if command is None: # not specified, we figure out it here - command_times = [(cmd,doc[detector]) for cmd,doc in self.last_command.items()] + command_times = [(cmd,doc[det]) for cmd,doc in self.last_command.items()] command = sorted(command_times, key=lambda x : x[1])[-1][0] - self.logger.debug(f'Most recent command for {detector} is {command}') + self.logger.debug(f'Most recent command for {logical} is {command}') else: - self.logger.debug(f'Checking {command} timeout for {detector}') + self.logger.debug(f'Checking {command} timeout for {logical}') - dt = (time_now - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][det]).total_seconds() local_timeouts = dict(self.timeouts.items()) - local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[detector]+1) + local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[det]+1) - if dt < local_timeouts[command]: 
- self.logger.debug('%i is within the %i second timeout for a %s command' % - (dt, local_timeouts[command], command)) - else: + if dt > local_timeouts[command]: # timing out, maybe send stop? if command == 'stop': - if self.error_stop_count[detector] >= self.stop_retries: + if self.error_stop_count[det] >= self.stop_retries: # failed too many times, issue error self.mongo.log_error( ("Dispatcher control loop detects a timeout that STOP " + @@ -308,34 +305,37 @@ def check_timeouts(self, detector, command=None): 'ERROR', "STOP_TIMEOUT") # also invoke the nuclear option - if detector == 'tpc': + if det == 'tpc': if (dt := (now()-self.last_nuke).total_seconds()) > self.hv_nuclear_timeout: - self.control_detector(detector='tpc', command='stop', force=True) + self.control_detector(logical,'stop', force=True) self.logger.critical('There\'s only one way to be sure') if self.hypervisor.tactical_nuclear_option(self.mongo.is_linked_mode()): self.last_nuke = now() else: - self.control_detector(detector=detector, command='stop') + self.control_detector(logical, 'stop') self.logger.debug(f'Nuclear timeout at {int(dt)}/{self.hv_nuclear_timeout}') - self.error_stop_count[detector] = 0 + self.error_stop_count[det] = 0 else: - self.control_detector(detector=detector, command='stop') - self.logger.debug(f'Working on a stop counter for {detector}') - self.error_stop_count[detector] += 1 + self.control_detector(logical, 'stop') + self.logger.debug(f'Working on a stop counter for {logical}') + self.error_stop_count[det] += 1 else: self.mongo.log_error( ('%s took more than %i seconds to %s, indicating a possible timeout or error' % - (detector, self.timeouts[command], command)), + (logical, self.timeouts[command], command)), 'ERROR', '%s_TIMEOUT' % command.upper()) #Keep track of how often the arming sequence times out - if self.control_detector(detector=detector, command='stop') == 0: + if self.control_detector(logical, 'stop') == 0: # only increment the counter if we actually issued a 
STOP - self.missed_arm_cycles[detector] += 1 - self.logger.info(f'{detector} missed {self.missed_arm_cycles[detector]} arm cycles') + self.missed_arm_cycles[det] += 1 + self.logger.info(f'{logical} missed {self.missed_arm_cycles[det]} arm cycles') else: - self.logger.debug(f'{detector} didn\'t actually get a command, no arm cycler increment') - + self.logger.debug(f'{logical} didn\'t actually get a command, no arm cycler increment') + else: + self.logger.debug('%i is within the %i second timeout for a %s command' % + (dt, local_timeouts[command], command)) + return def throw_error(self): diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index d9687ff4..03117185 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -320,10 +320,10 @@ def aggregate_status(self): self.physical_status = phys_stat # Aggregate status for the physical detectors - for log_det in self.latest_status.keys(): - for det in self.latest_status[log_det]['detectors'].keys(): + for logical in self.latest_status.keys(): + for det in self.latest_status[logical]['detectors'].keys(): status = self.combine_statuses(phys_stat[det]) - self.latest_status[log_det]['detectors'][det]['status'] = status + self.latest_status[logical]['detectors'][det]['status'] = status return True def combine_statuses(self, status_list): @@ -562,19 +562,20 @@ def get_next_run_number(self): return 0 return list(cursor)[0]['number']+1 - def set_stop_time(self, number, detectors, force): + def set_stop_time(self, number, logical, force): """ Sets the 'end' field of the run doc to the time when the STOP command was ack'd """ - self.logger.info(f"Updating run {number} with end time ({detectors})") + self.logger.info(f"Updating run {number} with end time ({logical})") if number == -1: return try: time.sleep(0.5) # this number depends on the CC command polling time - if (endtime := self.get_ack_time(detectors, 'stop') ) is None: + if (endtime := self.get_ack_time(logical, 'stop') ) is None: 
self.logger.debug(f'No end time found for run {number}')
-            endtime = now() -datetime.timedelta(seconds=1)
-        query = {"number": int(number), "end": None, 'detectors': detectors}
+            endtime = now() - datetime.timedelta(seconds=1)
+        det = list(self.latest_status[logical]['detectors'].keys())[0]
+        query = {"number": int(number), "end": None, 'detectors': det}
         updates = {"$set": {"end": endtime}}
         if force:
             updates["$push"] = {"tags": {"name": "_messy", "user": "daq",
@@ -590,10 +591,11 @@ def set_stop_time(self, number, logical, force):
             ]):
                 rate[doc['_id']] = {'avg': doc['avg'], 'max': doc['max']}
             channels = set()
-            if 'tpc' in detectors:
+
+            if 'tpc' in self.latest_status[logical]['detectors'].keys():
                 # figure out which channels weren't running
-                readers = list(self.latest_status[detectors]['readers'].keys())
-                for doc in self.collections['node_status'].find({'host': {'$in': readers}, 'number': int(number)}):
+                readers = list(self.latest_status[logical]['readers'].keys())
+                for doc in self.collections['node_status'].find({'host': {'$in': readers},'number': int(number)}):
                     channels |= set(map(int, doc['channels'].keys()))
             updates = {'rate': rate}
             if len(channels):
@@ -608,12 +610,12 @@ def set_stop_time(self, number, logical, force):
             self.logger.error(f"Database having a moment, hope this doesn't crash. 
{type(e)}, {e}") return - def get_ack_time(self, detector, command, recurse=True): + def get_ack_time(self, logical, command, recurse=True): ''' Finds the time when specified detector's crate controller ack'd the specified command ''' # the first cc is the "master", so its ack time is what counts - cc = list(self.latest_status[detector]['controller'].keys())[0] + cc = list(self.latest_status[logical]['controller'].keys())[0] query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command} sort = [('_id', -1)] doc = self.collections['outgoing_commands'].find_one(query, sort=sort) @@ -621,12 +623,12 @@ def get_ack_time(self, detector, command, recurse=True): if dt > 30: # TODO make this a config value if recurse: # No way we found the correct command here, maybe we're too soon - self.logger.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?') + self.logger.debug(f'Most recent ack for {logical}-{command} is {dt:.1f}?') time.sleep(2) # if in doubt - return self.get_ack_time(detector, command, False) + return self.get_ack_time(logical, command, False) else: # Welp - self.logger.debug(f'No recent ack time for {detector}-{command}') + self.logger.debug(f'No recent ack time for {logical}-{command}') return None return doc['acknowledged'][cc] @@ -782,35 +784,35 @@ def get_run_start(self, number): return self.run_start_cache[str(number)] return None - def insert_run_doc(self, detector): + def insert_run_doc(self, logical): if (number := self.get_next_run_number()) == NO_NEW_RUN: self.logger.error("DB having a moment") return -1 # the rundoc gets the physical detectors, not the logical detectors = self.latest_status[detector]['detectors'] - + det = list(detectors.keys())[0]: run_doc = { "number": number, 'detectors': detectors, - 'user': self.goal_state[detector]['user'], - 'mode': self.goal_state[detector]['mode'], + 'user': self.goal_state[det]['user'], + 'mode': self.goal_state[det]['mode'], 'bootstrax': {'state': None}, 'end': None } # If there's a 
source add the source. Also add the complete ini file. - cfg = self.get_run_mode(self.goal_state[detector]['mode']) + cfg = self.get_run_mode(self.goal_state[det]['mode']) if cfg is not None and 'source' in cfg.keys(): run_doc['source'] = str(cfg['source']) run_doc['daq_config'] = cfg # If the user started the run with a comment add that too - if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "": + if "comment" in self.goal_state[det] and self.goal_state[det]['comment'] != "": run_doc['comments'] = [{ - "user": self.goal_state[detector]['user'], + "user": self.goal_state[det]['user'], "date": now(), - "comment": self.goal_state[detector]['comment'] + "comment": self.goal_state[det]['comment'] }] # Make a data entry so bootstrax can find the thing @@ -824,7 +826,7 @@ def insert_run_doc(self, detector): # The cc needs some time to get started time.sleep(self.cc_start_wait) try: - start_time = self.get_ack_time(detector, 'start') + start_time = self.get_ack_time(logical, 'start') except Exception as e: self.logger.error('Couldn\'t find start time ack') start_time = None From 5dd338b5785c8f4cbc89dc0b0f3c7606aa029ae5 Mon Sep 17 00:00:00 2001 From: valerioda Date: Wed, 12 Oct 2022 18:07:05 +0200 Subject: [PATCH 09/10] minor changes --- dispatcher/DAQController.py | 18 ++++++++---------- dispatcher/MongoConnect.py | 6 +++--- dispatcher/dispatcher.py | 14 +++++++------- dispatcher/hypervisor.py | 33 ++++++++++++++++++++------------- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index 8274628c..ac20a170 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -37,9 +37,9 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.last_command[k] = {} for d in detectors: self.last_command[k][d] = now() - self.error_stop_count = {d : 0 for d in detectors} + self.error_stop_count = {d: 0 for d in detectors} 
self.max_arm_cycles = int(config['MaxArmCycles']) - self.missed_arm_cycles = {k:0 for k in detectors} + self.missed_arm_cycles = {k: 0 for k in detectors} # Timeout properties come from config self.timeouts = { @@ -107,11 +107,11 @@ def solve_problem(self, latest_status, goal_state): # Check before if the status is UNKNOWN and it is maybe timing out if latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: self.logger.info(f"The status of {logical} is unknown, check timeouts") - self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.check_timeouts(logical) # Otherwise stop the detector else: self.logger.info(f"Sending stop command to {logical}") - self.stop_detector_gently(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.stop_detector_gently(logical) # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: self.logger.info(f"The {logical} is in timeout, check timeouts") @@ -141,12 +141,12 @@ def solve_problem(self, latest_status, goal_state): elif latest_status[logical]['status'] == DAQ_STATUS.ARMING: self.logger.info(f"The {logical} is arming, check timeouts") self.logger.debug(f"Checking the {logical} timeouts") - self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.check_timeouts(logical) # UNKNOWN, check if it is timing out elif latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: self.logger.info(f"The status of {logical} is unknown, check timeouts") self.logger.debug(f"Checking the {logical} timeouts") - self.check_timeouts(detector=det) # TODO: CHECK LOGICAL OR PHYSICAL + self.check_timeouts(logical) # Maybe the detector is IDLE, we should arm a run elif latest_status[logical]['status'] == DAQ_STATUS.IDLE: self.logger.info(f"The {logical} is idle, sending arm command") @@ -174,7 +174,7 @@ def handle_timeout(self, logical): det = list(latest_status[logical]['detectors'].keys())[0] self.control_detector(logical, 
'stop', force = self.can_force_stop[det]) self.can_force_stop[det] = False - self.check_timeouts(detector) # TODO: CHECK LOGICAL OR PHYSICAL + self.check_timeouts(logical) return def stop_detector_gently(self, logical): @@ -229,8 +229,7 @@ def control_detector(self, logical, command, force = False): hosts = (readers, cc) delay = self.start_cmd_delay #Reset arming timeout counter - for dd in self.latest_status[logical]['detectors'].keys(): - self.missed_arm_cycles[dd] = 0 + self.missed_arm_cycles[det] = 0 else: # stop readers, cc = self.mongo.get_hosts_for_mode(ls[logical]['mode'], detector) hosts = (cc, readers) @@ -274,7 +273,6 @@ def check_timeouts(self, logical, command=None): if self.missed_arm_cycles[det] > self.max_arm_cycles and det == 'tpc': if (dt := (now()-self.last_nuke).total_seconds()) > self.hv_nuclear_timeout: self.logger.critical('There\'s only one way to be sure') - #self.control_detector(detector='tpc', command='stop', force=True) self.control_detector(logical,'stop', force=True) if self.hypervisor.tactical_nuclear_option(self.mongo.is_linked_mode()): self.last_nuke = now() diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 03117185..20eeec15 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -131,13 +131,13 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, self.dc = daq_config self.hv_timeout_fix = {} for detector in self.dc: - self.latest_status[detector] = {'readers': {}, 'controller': {}} + #self.latest_status[detector] = {'readers': {}, 'controller': {}} for reader in self.dc[detector]['readers']: - self.latest_status[detector]['readers'][reader] = {} + #self.latest_status[detector]['readers'][reader] = {} self.host_config[reader] = detector self.hv_timeout_fix[reader] = now() for controller in self.dc[detector]['controller']: - self.latest_status[detector]['controller'][controller] = {} + #self.latest_status[detector]['controller'][controller] = {} 
self.host_config[controller] = detector self.hv_timeout_fix[controller] = now() diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 854ff722..266968f6 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -96,14 +96,14 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, if (latest_status := mc.get_update(current_config)) is None: continue # Print an update - for log_det in latest_status.keys(): - for det in latest_status[log_det]['detectors'].keys(): + for logical in latest_status.keys(): + for det in latest_status[logical]['detectors'].keys(): state = 'ACTIVE' if goal_state[det]['active'] == 'true' else 'INACTIVE' - msg = (f'{log_det} {det} should be {state}: ' - f'logical detector is {latest_status[log_det]["status"]}, ' - f'physical detector is {latest_status[log_det]["detectors"][det]["status"]}') - if latest_status[log_det]['number'] != -1: - msg += f' ({latest_status[log_det]["number"]})' + msg = (f'{logical} {det} should be {state}: ' + f'logical detector is {latest_status[logical]["status"]}, ' + f'physical detector is {latest_status[logical]["detectors"][det]["status"]}') + if latest_status[logical]['number'] != -1: + msg += f' ({latest_status[logical]["number"]})' logger.debug(msg) msg = (f"Linking: tpc-mv: {mc.is_linked('tpc', 'muon_veto')}, " f"tpc-nv: {mc.is_linked('tpc', 'neutron_veto')}, " diff --git a/dispatcher/hypervisor.py b/dispatcher/hypervisor.py index a3df0a53..3fb7222b 100644 --- a/dispatcher/hypervisor.py +++ b/dispatcher/hypervisor.py @@ -405,19 +405,26 @@ def linked_nuclear_option(self): if not hasattr(self, 'mongo_connect'): self.logger.error('Darryl hasn\'t made this work in testing yet') raise ValueError('This only works in prod') - ok, not_ok = [], [] + + timeout_list, running_list = [], [] physical_status = self.mongo_connect.physical_status - for phys_det, statuses in physical_status.items(): - if self.mongo_connect.combine_statuses(statuses) in 
[daqnt.DAQ_STATUS.TIMEOUT]: - not_ok.append(phys_det) - else: - ok.append(phys_det) - self.logger.debug(f'These detectors are ok: {ok}, these aren\'t: {not_ok}') - if self.detector in not_ok: - # welp, looks like we're part of the problem + latest_status = self.mongo_connect.latest_status + for logical in latest_status.keys(): + log_status = latest_status[logical]["status"] + for det in latest_status[logical]['detectors'].keys(): + phy_status = physical_status[det]['status'] + logger.debug(f'{logical} is {log_status}, {det} is {phy_status}') + if phy_status in [daqnt.DAQ_STATUS.TIMEOUT]: + timeout_list.append(phys_det) + else: + running_list.append(phys_det) + self.logger.debug(f'These detectors are not in timeout: {running_list}, ' + f'these are in timeout: {ntimeout_list}') + # check if the TPC is part of the problem + if 'tpc' in timeout_list: return False - if len(ok) == len(physical_status): + if len(timeout_list) == len(physical_status): self.logger.error('Uh, how did you get here???') self.slackbot.send_message('This happened again, you should really' ' get someone to fix this', tags='ALL') @@ -428,12 +435,12 @@ def linked_nuclear_option(self): add_tags='ALL') # ok, we aren't the problem, let's see about unlinking - if len(ok) == 1: + if len(running_list) == 1: # everyone else has died, it's just us left logical_detectors = physical_status.keys() - elif self.mongo_connect.is_linked(ok[0], ok[1]): + elif self.mongo_connect.is_linked(running_list[0], running_list[1]): # the detector we are linked with is fine, the other one crashed - logical_detectors = [ok, not_ok] + logical_detectors = [running_list, timeout_list] else: # the detector we linked with crashed, and the other one is fine logical_detectors = physical_status.keys() From 7f56de9feb4e860691d93790500ba9cb8a150cde Mon Sep 17 00:00:00 2001 From: valerioda Date: Fri, 14 Oct 2022 12:02:15 +0200 Subject: [PATCH 10/10] aggregate physical detector status in linked nuclear option --- 
dispatcher/MongoConnect.py |  2 +-
 dispatcher/hypervisor.py   | 39 +++++++++++++++++++++++++++++++-------
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py
index 20eeec15..cae2e3b3 100644
--- a/dispatcher/MongoConnect.py
+++ b/dispatcher/MongoConnect.py
@@ -318,7 +318,7 @@ def aggregate_status(self):
                     f'{type(e)}, {e}')
             return None
         self.physical_status = phys_stat
-        
+
         # Aggregate status for the physical detectors
         for logical in self.latest_status.keys():
             for det in self.latest_status[logical]['detectors'].keys():
diff --git a/dispatcher/hypervisor.py b/dispatcher/hypervisor.py
index 3fb7222b..625fd62f 100644
--- a/dispatcher/hypervisor.py
+++ b/dispatcher/hypervisor.py
@@ -14,7 +14,7 @@ def date_now():
 
 class Hypervisor(object):
 
-    __version__ = '4.0.3'
+    __version__ = '4.0.4'
 
     def __init__(self, db, logger,
@@ -409,17 +409,42 @@ def linked_nuclear_option(self):
 
         timeout_list, running_list = [], []
         physical_status = self.mongo_connect.physical_status
         latest_status = self.mongo_connect.latest_status
+        host_config = self.mongo_connect.host_config
+        phys_stat = {k: [] for k in set(host_config.values())}
+        now_time = time.time()
         for logical in latest_status.keys():
             log_status = latest_status[logical]["status"]
+            for n, doc in latest_status[logical]['readers'].items():
+                if doc is None:
+                    self.logger.debug(f'{n} seems to have been offline for a few days')
+                    continue
+                det = host_config[doc['host']]
+                status = self.mongo_connect.extract_status(doc, now_time)
+                doc['status'] = status
+                phys_stat[det].append(status)
+                self.logger.debug(f'{det}: {doc["host"]} is in {status}')
+            for n, doc in latest_status[logical]['controller'].items():
+                if doc is None:
+                    self.logger.debug(f'{n} seems to have been offline for a few days')
+                    continue
+                det = host_config[doc['host']]
+                status = self.mongo_connect.extract_status(doc, now_time)
+                doc['status'] = status
+                phys_stat[det].append(status)
+                self.logger.debug(f'{det}: {doc["host"]} is 
in {status}')
             for det in latest_status[logical]['detectors'].keys():
-                phy_status = physical_status[det]['status']
-                logger.debug(f'{logical} is {log_status}, {det} is {phy_status}')
-                if phy_status in [daqnt.DAQ_STATUS.TIMEOUT]:
-                    timeout_list.append(phys_det)
+                phy_status = latest_status[logical]['detectors'][det]['status']
+                agg_status = self.mongo_connect.combine_statuses(phys_stat[det])
+                self.logger.debug(f'{logical} is {log_status}, {det} is {phy_status} -> {agg_status}')
+                #if phy_status in [daqnt.DAQ_STATUS.TIMEOUT]: timeout_list.append(phys_det)
+                #else: running_list.append(phys_det)
+                if (phy_status != agg_status) or (phy_status in ['ERROR','TIMEOUT','UNKNOWN']):
+                    timeout_list.append(det)
                 else:
-                    running_list.append(phys_det)
+                    running_list.append(det)
+
         self.logger.debug(f'These detectors are not in timeout: {running_list}, '
-                          f'these are in timeout: {ntimeout_list}')
+                          f'these are in timeout: {timeout_list}')
         # check if the TPC is part of the problem
         if 'tpc' in timeout_list:
             return False