Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sqapi/conf/sqapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ meta_store:

data_store:
type: 'disk'

execution:
post_processing: True
23 changes: 23 additions & 0 deletions sqapi/core/post_processor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging

from sqapi.core.message import Message
from sqapi.util import detector

log = logging.getLogger(__name__)


class PostProcessor:
def __init__(self, config):
self.executor = detector.detect_post_processor(config.get('post_processing'))

def post_plugin(self, messsage: Message, plugin, success):
try:
self.executor.plugin(messsage, plugin, success)
except Exception as e:
log.warning('Could not execute post plugin: {}'.format(str(e)))

def post_execution(self, message: Message, success):
try:
self.executor.execution(message, success)
except Exception as e:
log.warning('Could not execute post execution: {}'.format(str(e)))
9 changes: 9 additions & 0 deletions sqapi/core/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from sqapi.core.message import Message
from sqapi.core.plugin_manager import PluginManager
from sqapi.core.post_processor import PostProcessor
from sqapi.query import data as q_data, metadata as q_meta
from sqapi.util import detector
from sqapi.util.cfg_util import Config
Expand All @@ -29,6 +30,7 @@ def __init__(self, config: Config, plugin_manager: PluginManager):
self.database.initialize_message_table()

self.listener = detector.detect_listener(self.config.msg_broker, self.process_message)
self.post_processor = PostProcessor(config.execution)

def start_subscribing(self):
log.info('Starting message subscription')
Expand Down Expand Up @@ -59,9 +61,12 @@ def process_message(self, message: Message):
self.database.update_message(message, STATUS_DONE)
log.info('Processing completed')

self.post_processor.post_execution(message, True)

except LookupError as e:
self.database.update_message(message, STATUS_RETRY, str(e))
log.warning('Could not fetch data and/or metadata at this point: {}'.format(str(e)))
self.post_processor.post_execution(message, False)

except Exception as e:
try:
Expand All @@ -75,6 +80,8 @@ def process_message(self, message: Message):
log.debug(message)
log.debug(e)

self.post_processor.post_execution(message, False)

def check_mime_type(self, message: Message):
log.debug('Validating data type')
log.debug('Message data type: {}'.format(message.type))
Expand Down Expand Up @@ -120,8 +127,10 @@ def plugin_execution(self, plugin, message, metadata, data_path):
open(data_path, 'rb')
)
except Exception as e:
self.post_processor.post_plugin(message, plugin.name, False)
log.warning('{} failed processing {}: {}'.format(plugin.name, message.uuid, str(e)))
else:
self.post_processor.post_plugin(message, plugin.name, True)
run_time = (time.time() - start) * 1000.0
log.info('{} used {} (milliseconds) processing {}'.format(plugin.name, run_time, message.uuid))

Expand Down
2 changes: 2 additions & 0 deletions sqapi/util/cfg_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, config_file):
self.api = cfg.get('api') or {}
self.custom = cfg.get('custom') or {}
self.packages = cfg.get('packages') or {}
self.execution = cfg.get('execution') or {}

def merge_config(self, override):
self.plugin.update(override.plugin)
Expand All @@ -34,6 +35,7 @@ def merge_config(self, override):
self.api.update(override.api)
self.custom.update(override.custom)
self.packages.update(override.packages)
self.execution.update(override.execution)


def load_config(config_file):
Expand Down
16 changes: 16 additions & 0 deletions sqapi/util/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ def detect_meta_connectors(config):
raise AttributeError(err)


def detect_post_processor(config):
log.debug('Detecting post processors')

if not config.get('active'):
log.debug('Post Processing is not activated in configuration')
return

# TODO: Decide; Detect from post_processor package, or specific file?
# TODO: Decide; Multiple post processors vs. singular

# TODO: Example; post_processors in packages and multiple configurable options
# TODO: Enable specific post_processors, like "send event", "log execution", etc.?

return None


def import_module(target_module, directory):
module_dict = detect_modules(directory)

Expand Down