1313import learning_observer .constants as constants
1414import learning_observer .kvs
1515import learning_observer .settings
16+ from learning_observer .stream_analytics .fields import KeyField , KeyStateType , EventField
1617import learning_observer .stream_analytics .helpers
1718# import traceback
1819import learning_observer .util
1920
20- from learning_observer .log_event import debug_log
21-
2221pmss .register_field (
2322 name = 'use_nlp' ,
2423 description = 'Flag for loading in and using AWE Components. These are ' \
@@ -177,10 +176,11 @@ async def get_latest_student_documents(student_data):
177176
178177 # Compile a list of the active students.
179178 active_students = [s for s in student_data if 'writing_observer.writing_analysis.last_document' in s ]
179+
180180 # Now collect documents for all of the active students.
181181 document_keys = ([
182182 learning_observer .stream_analytics .helpers .make_key (
183- writing_observer .writing_analysis .reconstruct ,
183+ writing_observer .writing_analysis .gdoc_scope_reconstruct ,
184184 {
185185 KeyField .STUDENT : s [constants .USER_ID ],
186186 EventField ('doc_id' ): get_last_document_id (s )
@@ -190,7 +190,6 @@ async def get_latest_student_documents(student_data):
190190
191191 kvs_data = await kvs .multiget (keys = document_keys )
192192
193-
194193 # Return blank entries if no data, rather than None. This makes it possible
195194 # to use item.get with defaults sanely. For the sake of later alignment
196195 # we also zip up the items with the keys and users that they come from
@@ -209,9 +208,7 @@ async def get_latest_student_documents(student_data):
209208 # Now insert the student data and pass it along.
210209 doc ['student' ] = student
211210 writing_data .append (doc )
212-
213- print (writing_data )
214-
211+
215212 return writing_data
216213
217214
@@ -226,19 +223,16 @@ async def remove_extra_data(writing_data):
226223 return writing_data
227224
228225
229- # async def merge_with_student_data(writing_data, student_data):
230- # '''
231- # Add the student metadata to each text. Because we may have
232- # fewer entries in writing_data than student_data we iterate
233- # over the student_data locating writing data that matches it
234- # if any.
235- # '''
async def merge_with_student_data(writing_data, student_data):
    '''
    Attach per-student metadata to each document in `writing_data`.

    Documents and students are paired positionally; `zip` stops at the
    shorter list, so this assumes both lists are index-aligned and the
    same length — TODO confirm against callers (an earlier version of
    this code warned that `writing_data` may have fewer entries).

    Also drops the bulky 'edit_metadata' field from each document.
    Mutates `writing_data` in place and returns it.
    '''
    for doc, student_info in zip(writing_data, student_data):
        # pop with a default is a no-op when the key is absent
        doc.pop('edit_metadata', None)
        doc['student'] = student_info
    return writing_data
242236
243237
244238# TODO the use_nlp initialization code ought to live in a
@@ -294,7 +288,7 @@ async def fetch_doc_from_google(student, doc_id):
294288 if 'text' not in text :
295289 return None
296290 key = learning_observer .stream_analytics .helpers .make_key (
297- writing_observer .writing_analysis .reconstruct ,
291+ writing_observer .writing_analysis .gdoc_scope_reconstruct ,
298292 {
299293 KeyField .STUDENT : student ,
300294 EventField ('doc_id' ): doc_id
@@ -347,7 +341,7 @@ async def fetch_doc_from_google(student):
347341 text = await learning_observer .google .doctext (runtime , documentId = docId )
348342 # set reconstruction data to ground truth
349343 key = learning_observer .stream_analytics .helpers .make_key (
350- writing_observer .writing_analysis .reconstruct ,
344+ writing_observer .writing_analysis .gdoc_scope_reconstruct ,
351345 {
352346 KeyField .STUDENT : student [constants .USER_ID ],
353347 EventField ('doc_id' ): docId
@@ -383,7 +377,6 @@ async def latest_data(runtime, student_data, options=None):
383377 # single_doc.update(annotated_text)
384378 :return: The latest writing data.
385379 '''
386- debug_log ("WritingObserver latest_data students:" , student_data )
387380
388381 # HACK we have a cache downstream that relies on redis_ephemeral being setup
389382 # when that is resolved, we can remove the feature flag
@@ -394,18 +387,29 @@ async def latest_data(runtime, student_data, options=None):
394387 # Get the latest documents with the students appended.
395388 writing_data = await get_latest_student_documents (student_data )
396389
397- # Strip out the unnecessary edit_metadata from the merged
398- # student and writing data.
390+ # Strip out the unnecessary extra data.
399391 writing_data = await remove_extra_data (writing_data )
400392
401- # Now process the remaining data. Previously this called
402- # for merge_with_student_data however that is unnecessary
403- # as the steps above will already integrate the student
404- # information into the writing data.
393+ # print(">>> WRITE DATA-premerge: {}".format(writing_data))
394+
395+ # This is the error. Skipping now.
396+ writing_data_merge = await merge_with_student_data (writing_data , student_data )
397+ # print(">>> WRITE DATA-postmerge: {}".format(writing_data_merge))
398+
399+ # #print(">>>> PRINT WRITE DATA: Merge")
400+ # #print(writing_data)
401+
402+ # just_the_text = [w.get("text", "") for w in writing_data]
403+
404+ # annotated_texts = await writing_observer.awe_nlp.process_texts_parallel(just_the_text)
405+
406+ # for annotated_text, single_doc in zip(annotated_texts, writing_data):
407+ # if annotated_text != "Error":
408+ # single_doc.update(annotated_text)
409+
410+ writing_data = await merge_with_student_data (writing_data , student_data )
405411 writing_data = await processor (writing_data , options )
406412
407- debug_log ("WritingObserver latest_data result: " , writing_data )
408-
409413 return {'latest_writing_data' : writing_data }
410414
411415
0 commit comments