Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions etc/scripts/dataset_pipeline/build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# extracts required phrases from .RULE files
# outputs a JSONL dataset for NER model training
import json
import re
import unicodedata
from pathlib import Path
import click

from licensedcode.models import Rule
from licensedcode.models import rules_data_dir as default_rules_data_dir
from licensedcode.required_phrases import get_required_phrase_verbatim


def normalize_phrase(phrase):
    """Return a cleaned-up copy of a raw required phrase for training.

    The cleanup steps run in this order:

    1. decode the ``&quot;``, ``&lt;``, ``&gt;`` and ``&amp;`` HTML entities
    2. drop XML/HTML-style tags such as ``<name>`` or ``</license>``
    3. drop markdown backticks
    4. collapse every run of whitespace to a single space
    5. trim surrounding spaces and the ``. , ; : >`` characters

    The returned string is empty when nothing is left after the cleanup.
    """
    result = phrase

    # ``&amp;`` is decoded last so that an escaped entity such as ``&amp;lt;``
    # is decoded exactly once (to ``&lt;``) instead of twice (to ``<``).
    for entity, char in (
        ('&quot;', '"'),
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&amp;', '&'),
    ):
        result = result.replace(entity, char)

    # strip xml tags like <name>, </license>; this runs after entity decoding
    # so that tags written as &lt;name&gt; are removed too
    result = re.sub(r'<[^>]+>', '', result)

    # remove markdown backticks
    result = result.replace('`', '')

    # collapse whitespace: every whitespace run becomes one plain space
    result = re.sub(r'\s+', ' ', result)

    # Trim spaces and non-meaningful punctuation in a single pass: stripping
    # them separately would leave a dangling space for inputs like "foo ,".
    return result.strip(' .,;:>')


def get_rule_type(rule):
    """Return the name of the first truthy ``is_*`` kind flag found on ``rule``.

    Flags are checked in a fixed order; a flag that is missing from ``rule``
    counts as not set. Returns ``'unknown'`` when no flag is truthy.
    """
    kind_flags = (
        'is_license_text',
        'is_license_notice',
        'is_license_reference',
        'is_license_tag',
        'is_license_intro',
        'is_license_clue',
        'is_false_positive',
    )
    # first matching flag wins; fall back to 'unknown' when none matches
    matching = (name for name in kind_flags if getattr(rule, name, False))
    return next(matching, 'unknown')


def _build_entry(rule):
    """Return the JSONL record for one loaded rule, or None to skip the rule.

    A rule is skipped when its ``is_required_phrase`` flag is set, when it has
    no text, when ``get_required_phrase_verbatim`` yields nothing for its
    text, or when every yielded phrase normalizes to an empty string.
    """
    # is_required_phrase rules don't need {{ }}: the flag covers them
    if getattr(rule, 'is_required_phrase', False):
        return None

    text = rule.text or ''
    if not text:
        return None

    # normalize line endings and unicode before extracting the phrases
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    text = unicodedata.normalize('NFKC', text)

    phrases = list(get_required_phrase_verbatim(text))
    if not phrases:
        return None

    # keep both the phrase as extracted and its cleaned-up form, and drop
    # phrases that are empty once cleaned up
    valid_phrases = []
    for raw_phrase in phrases:
        normalized = normalize_phrase(raw_phrase)
        if normalized:
            valid_phrases.append({
                'phrase': raw_phrase,
                'phrase_normalized': normalized,
            })
    if not valid_phrases:
        return None

    return {
        'identifier': rule.identifier,
        'license_expression': rule.license_expression or '',
        'rule_type': get_rule_type(rule),
        # the stored text has the {{ }} markers stripped out
        'text': text.replace('{{', '').replace('}}', ''),
        'required_phrases': valid_phrases,
    }


@click.command()
@click.option('--rules-dir', type=click.Path(exists=True), default=None,
              help='Path to rules directory (defaults to repo rules dir)')
@click.option('--output', default='dataset-output/required_phrases.jsonl',
              help='Output JSONL path')
def main(rules_dir, output):
    """Extract required phrases from rule files for NER training"""
    # The docstring above is left as a one-liner on purpose: click displays
    # the command docstring as the --help text.
    #
    # rules_dir: directory scanned (non-recursively) for *.RULE files.
    # output: path of the JSONL file to write; its parent directories are
    #     created when missing and an existing file is overwritten.
    if not rules_dir:
        # Prefer the rules directory located relative to this script and fall
        # back to the one exported by licensedcode.models.
        repo_rules = Path(__file__).resolve().parents[3] / 'src' / 'licensedcode' / 'data' / 'rules'
        rules_dir = str(repo_rules) if repo_rules.is_dir() else default_rules_data_dir

    rules_path = Path(rules_dir)
    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    total_rules = 0
    failed_rules = 0
    annotated = 0
    total_phrases = 0
    results = []

    click.echo(f'scanning rules from: {rules_path}')
    for rule_file in sorted(rules_path.glob('*.RULE')):
        try:
            rule = Rule.from_file(rule_file=str(rule_file))
        except Exception as exc:
            # Best effort: an unloadable rule file is still skipped, but it is
            # reported on stderr instead of being dropped silently.
            failed_rules += 1
            click.echo(f'warning: skipping {rule_file.name}: {exc!r}', err=True)
            continue
        # only rules that loaded successfully count as scanned
        total_rules += 1

        entry = _build_entry(rule)
        if entry is None:
            continue
        annotated += 1
        total_phrases += len(entry['required_phrases'])
        results.append(entry)

    # write jsonl: one JSON object per line, non-ASCII characters kept as-is
    with open(out_path, 'w', encoding='utf-8') as f:
        f.writelines(
            json.dumps(entry, ensure_ascii=False) + '\n' for entry in results
        )

    click.echo('\ndone')
    click.echo(f' rules scanned: {total_rules}')
    click.echo(f' annotated: {annotated}')
    click.echo(f' phrases extracted: {total_phrases}')
    click.echo(f' output: {out_path}')
    if failed_rules:
        # kept on stderr so the stdout summary stays unchanged
        click.echo(f'warning: {failed_rules} rule file(s) could not be loaded', err=True)


# TODO (follow-up commits):
# - tokens + BIOES labels
# - train/val/test split by license expression


# Invoke the click command only when this file is run as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()