# Extracts required phrases from .RULE files and
# outputs a JSONL dataset for NER model training.
import html
import json
import re
import unicodedata
from pathlib import Path

import click

from licensedcode.models import Rule
from licensedcode.models import rules_data_dir as default_rules_data_dir
from licensedcode.required_phrases import get_required_phrase_verbatim
from licensedcode.tokenize import required_phrase_splitter


# XML/HTML-like tags such as <tag>. The negative lookahead keeps URLs that
# are wrapped in angle brackets (e.g. <https://example.com>) intact.
_TAG_RE = re.compile(r'<(?![a-zA-Z]+://)[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')

# Rule "is_*" flags checked by get_rule_type(), in priority order.
_RULE_TYPE_FLAGS = (
    'is_license_text',
    'is_license_notice',
    'is_license_reference',
    'is_license_tag',
    'is_license_intro',
    'is_license_clue',
    'is_false_positive',
)


def normalize_phrase(phrase):
    """
    Return a cleaned copy of the raw marker ``phrase`` string for training.

    HTML entities are decoded, XML-like tags and markdown backticks are
    removed, runs of whitespace are collapsed to a single space and
    leading/trailing ``.,;:<>`` punctuation and spaces are trimmed.
    """
    # decode html entities (&quot; &amp; &lt; &gt; ...)
    result = html.unescape(phrase)
    # strip xml tags like <tag>, but keep urls in angle brackets
    result = _TAG_RE.sub('', result)
    # remove markdown backticks
    result = result.replace('`', '')
    # collapse whitespace and trim
    result = _WHITESPACE_RE.sub(' ', result).strip()
    # strip leading/trailing punctuation that is not meaningful; the space is
    # included so that "foo ." does not end up with a trailing blank
    return result.strip(' .,;:<>')


def get_rule_type(rule):
    """
    Return the name of the first ``is_*`` flag that is set on ``rule``, or
    ``'unknown'`` if none of the known flags is set.
    """
    return next(
        (flag for flag in _RULE_TYPE_FLAGS if getattr(rule, flag, False)),
        'unknown',
    )


def _discard_open_span(labels, start):
    """Relabel in place every label from index ``start`` onwards as ``'O'``."""
    labels[start:] = ['O'] * (len(labels) - start)


def tag_tokens(text):
    """
    Return a ``(tokens, labels)`` tuple of two same-length lists: the word
    tokens of ``text`` and a BIOES label for each of them.

    Tokens enclosed in ``{{ }}`` markers are labelled ``B-REQ``, ``I-REQ`` and
    ``E-REQ`` (or ``S-REQ`` for a single-token phrase); every other token is
    labelled ``O``. The markers themselves are not returned as tokens.

    A ``{{`` that is never closed, or that is followed by another ``{{``
    before its ``}}``, is not a usable annotation: its tokens are labelled
    ``O`` so that the returned sequence is always valid BIOES.
    """
    tokens = []
    labels = []
    # index in ``labels`` of the first token of the currently open span, or
    # None when not inside a {{ }} span
    span_start = None

    for tok in required_phrase_splitter(text):
        if tok == '{{':
            if span_start is not None:
                # nested opener: the outer span was never closed
                _discard_open_span(labels, span_start)
            span_start = len(tokens)
            continue

        if tok == '}}':
            if span_start is not None:
                span_length = len(tokens) - span_start
                if span_length == 1:
                    labels[-1] = 'S-REQ'
                elif span_length > 1:
                    labels[-1] = 'E-REQ'
            span_start = None
            continue

        if span_start is None:
            labels.append('O')
        elif len(tokens) == span_start:
            labels.append('B-REQ')
        else:
            labels.append('I-REQ')
        tokens.append(tok)

    if span_start is not None:
        # text ended inside an unterminated span
        _discard_open_span(labels, span_start)

    return tokens, labels


def _build_record(rule):
    """
    Return a JSON-serializable dataset record mapping for ``rule``, or None if
    the rule has nothing to contribute (it is an ``is_required_phrase`` rule,
    has no text, or has no ``{{ }}`` required phrase).
    """
    # is_required_phrase rules don't need {{ }}: the flag covers them
    if getattr(rule, 'is_required_phrase', False):
        return None

    text = rule.text
    if not text:
        return None

    # normalize line endings and unicode
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    text = unicodedata.normalize('NFKC', text)

    phrases = list(get_required_phrase_verbatim(text))
    if not phrases:
        return None

    # word tokens + BIOES labels (must be computed before stripping markers)
    tokens, bioes_labels = tag_tokens(text)

    # strip out the {{ }} markers
    text = text.replace('{{', '').replace('}}', '')

    return {
        'identifier': rule.identifier,
        'license_expression': rule.license_expression or '',
        'rule_type': get_rule_type(rule),
        'text': text,
        'tokens': tokens,
        'bioes_labels': bioes_labels,
        'required_phrases': [
            {'phrase': p, 'phrase_normalized': normalize_phrase(p)}
            for p in phrases
        ],
    }


@click.command()
@click.option('--rules-dir', type=click.Path(exists=True), default=None,
              help='Path to rules directory (defaults to repo rules dir)')
@click.option('--output', default='dataset-output/required_phrases.jsonl',
              help='Output JSONL path')
def main(rules_dir, output):
    """Extract required phrases from rule files for NER training"""
    if not rules_dir:
        # prefer the rules of the checkout this script lives in, and fall back
        # to the rules dir of the installed licensedcode package
        repo_rules = Path(__file__).resolve().parents[3] / 'src' / 'licensedcode' / 'data' / 'rules'
        rules_dir = str(repo_rules) if repo_rules.is_dir() else default_rules_data_dir

    rules_path = Path(rules_dir)
    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    total_rules = 0
    skipped = 0
    total_phrases = 0
    results = []

    click.echo(f'scanning rules from: {rules_path}')
    for rule_file in sorted(rules_path.glob('*.RULE')):
        try:
            rule = Rule.from_file(rule_file=str(rule_file))
        except Exception as e:
            # best effort: one broken rule file must not abort the whole run,
            # but it should not go unnoticed either
            skipped += 1
            click.echo(f'warning: cannot load {rule_file.name}, skipping: {e!r}', err=True)
            continue
        total_rules += 1

        record = _build_record(rule)
        if record is None:
            continue

        total_phrases += len(record['required_phrases'])
        results.append(record)

    # write jsonl
    with open(out_path, 'w', encoding='utf-8') as f:
        for entry in results:
            f.write(json.dumps(entry, ensure_ascii=False) + '\n')

    click.echo('\ndone')
    click.echo(f' rules scanned: {total_rules}')
    if skipped:
        click.echo(f' rules skipped (load errors): {skipped}')
    click.echo(f' annotated: {len(results)}')
    click.echo(f' phrases extracted: {total_phrases}')
    click.echo(f' output: {out_path}')


# TODO (follow-up commits):
# train/val/test split by license expression


if __name__ == '__main__':
    main()