forked from OpenBudget/open-budget-data
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.py
More file actions
142 lines (132 loc) · 5.51 KB
/
process.py
File metadata and controls
142 lines (132 loc) · 5.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sys
import os
import glob
import yaml
import logging
import time
# Canonical pipeline order.  Each processor name is mapped to its rank
# (position in the pipeline) so that the relevant processors found on a
# pass can be sorted and executed in pipeline order.
processor_order = {
    name: rank
    for rank, name in enumerate([
        'spreadsheet_to_jsons',
        'download_pending_changes',
        'new_budget_csv',
        'rar_to_zip',
        'combine_budget_jsons',
        'prepare_budget_changes',
        'csv_to_jsons',
        'item_connections',
        'aggregate_jsons_by_key',
        'analyze_budgets',
        'extract_txt_from_docs',
        'extract_change_groups',
        'concat',
        'consolidate_change_dates',
        'fix_changeline_budget_titles',
        'fix_support_budget_titles',
        'make_search_prefixes',
        'dump_to_db',
        'join',
        'upload',
        'rss',
        'extract_for_partition_layout',
        'prepare_compare_record',
    ])
}
def collect_processors():
    """Walk the working tree for ``*yaml`` rule files and yield one rule dict each.

    Every rule dict parsed from a yaml file's ``rules`` key is augmented
    with bookkeeping entries before being yielded:

      _basepath  -- directory containing the yaml file
      _filename  -- name of the yaml file
      _modtime   -- newest mtime of the yaml file or of the processor's
                    ``processors/<name>.py`` module, so editing either one
                    marks the rule as changed
    """
    current_path = "."  # os.path.abspath(".")
    for dirpath, dirnames, filenames in os.walk(current_path):
        for processor in filenames:
            if not processor.endswith("yaml"):
                continue
            processor_fname = os.path.join(dirpath, processor)
            # safe_load: rule files are plain data, so avoid full yaml.load
            # (which can instantiate arbitrary Python objects); the context
            # manager also closes the handle that the old bare
            # file(...).read() leaked.
            with open(processor_fname) as f:
                parsed = yaml.safe_load(f.read())['rules']
            for p in parsed:
                p['_basepath'] = dirpath
                p['_filename'] = processor
                files = [processor_fname,
                         os.path.join('processors', p['processor']) + '.py']
                p['_modtime'] = max(os.path.getmtime(x) for x in files)
                #logging.info("PROCESSOR %r" % p)
                yield p
def is_relevant_processor(processor):
    """Decide whether a processor rule has stale or missing output.

    Side effect: stores the work list on processor['_tuples'] as a list of
    (input, output) pairs for run_processor() to execute.
    Returns True when at least one output is missing, or older than its
    input/rule (beyond the optional 'delay' grace period in seconds).
    """
    basepath = processor['_basepath']
    modtime = processor['_modtime']
    input = processor.get('input')
    # Normalize 'input' to be relative to the rule file's directory:
    # a string may be a glob pattern, a list is joined element-wise,
    # and a missing input becomes an empty list.
    if type(input) == str:
        input = os.path.join(basepath,input)
        if '*' in input:
            input = glob.glob(input)
    elif type(input) == list:
        input = [ os.path.join(basepath,x) for x in input ]
    elif input is None:
        input = []
    output = processor['output']
    delay = processor.get('delay',0)
    if output.startswith("/"):
        # Output of the form "/src/dst": derive one output path per input
        # by substituting 'src' with 'dst' in each input path.
        # NOTE(review): str.replace substitutes the first/all occurrences
        # anywhere in the path, not only a directory component — presumably
        # the rule authors pick unambiguous src strings; verify.
        src,dst = output.split('/')[1:3]
        output = [ x.replace(src,dst) for x in input ]
        # Keep only pairs whose input exists and whose output is either
        # missing or older (after 'delay') than the input or the rule itself.
        tuples = zip(input,output)
        tuples = [ (i,o) for i,o in tuples if
            os.path.exists(i) and
            ((os.path.exists(o) and max(modtime,os.path.getmtime(i)) > (delay+os.path.getmtime(o))) or
             (not os.path.exists(o))) ]
        ret = len(tuples)>0
    else:
        # Single-output rule: all inputs must exist, and the output must be
        # missing or older than the newest input (or the rule's own mtime).
        list_input = input
        if type(input) != list:
            list_input = [ list_input ]
        ret = all([os.path.exists(i) for i in list_input])
        if len(input) > 0:
            modified_times = [ os.path.getmtime(i) for i in list_input if os.path.exists(i) ]
            modified_times.append(modtime)
        else:
            # No inputs at all: use "now" so the rule fires whenever the
            # output is missing or older than delay seconds.
            modified_times = [ time.time() ]
        output = os.path.join(basepath,output)
        ret = ret and ((not os.path.exists(output)) or (len(modified_times)>0 and max(modified_times) > (delay+os.path.getmtime(output))))
        tuples = [ (input, output) ]
    #logging.info("PROCESSOR %sRELEVANT%r" % ("" if ret else "NOT ", p))
    processor['_tuples'] = tuples
    return ret
def run_processor(processor,apikey):
    """Instantiate the rule's processor class and run it on each work pair.

    Iterates the (inputs, output) pairs that is_relevant_processor() stored
    on processor['_tuples'].  For the 'upload' processor, apikey is injected
    as the APIKEY parameter.  On failure the (possibly partial) output file
    is deleted so the next pass retries this rule, and the exception is
    re-raised to abort the run.
    """
    # The class to run and its params are loop-invariant: resolve them once
    # instead of re-importing the module for every (inputs, output) pair.
    processor_classname = processor['processor']
    processor_module = "processors."+processor_classname
    # NOTE(review): level=-1 ("try relative, then absolute") exists only in
    # Python 2; this file is Python 2 throughout (see file() usage elsewhere).
    processor_class = __import__(processor_module, globals(), locals(), [processor_classname], -1).__dict__.get(processor_classname)
    params = processor.get('params',{})
    if processor_classname == "upload":
        params['APIKEY'] = apikey
    for inputs,output in processor['_tuples']:
        processor_obj = processor_class()
        logging.info("%s(%s) %s << %s" % (processor_classname,
            ", ".join("%s=%s" % i for i in params.items()), output,
            " + ".join(inputs) if type(inputs)==list else inputs))
        try:
            processor_obj.process(inputs,output,**params)
        except Exception as e:
            logging.error("%s(%s) %s, deleting %s" % (processor_classname,
                ", ".join("%s=%s" % i for i in params.items()), e, output))
            # Remove the partial output so staleness detection re-runs us.
            if os.path.exists(output):
                os.unlink(output)
            raise
def setup_logging():
    """Attach a DEBUG-level handler writing to process.log to the root logger.

    Uses logging.FileHandler (which owns and closes its stream) instead of
    wrapping a manually-opened file in a StreamHandler — the old code used
    the Python-2-only file() builtin and never closed the handle.
    """
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # 'w' truncates: each run starts with a fresh log, as before.
    handler = logging.FileHandler('process.log', 'w')
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(message)s'))
    root.addHandler(handler)
if __name__ == "__main__":
    import singleton
    # Guard: only one instance of the pipeline may run at a time.
    me = singleton.SingleInstance()
    APIKEY = None
    if len(sys.argv) > 1:
        APIKEY = sys.argv[1]
    has_logging = False
    # (removed dead `priorities = list` — it bound the builtin type to an
    # unused name and was never read.)
    processors = list( collect_processors() )
    # Repeatedly run the highest-priority stale processor until a full pass
    # finds nothing stale: each run can make downstream rules relevant.
    while True:
        relevant = [ p for p in processors if is_relevant_processor(p) ]
        relevant.sort( key=lambda p: processor_order[p['processor']] )
        if len(relevant) == 0:
            break
        if not has_logging:
            # Defer creating process.log until there is actual work.
            setup_logging()
            has_logging = True
        logging.debug("relevant processors: %r" % [ r['processor'] for r in relevant ])
        run_processor(relevant[0],APIKEY)