From 842fb179e909f47c6d39eef2cb2fdf93bb2618c0 Mon Sep 17 00:00:00 2001 From: Thamme Gowda Date: Sun, 27 Apr 2025 14:21:59 -0700 Subject: [PATCH 01/10] v0.4.4-dev; add eng-bho; Fixes #174 --- mtdata/__init__.py | 2 +- mtdata/index/other.py | 20 ++++++++++++++++++++ mtdata/resource/refs.bib | 7 +++++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/mtdata/__init__.py b/mtdata/__init__.py index 4e7cf93..1e9c09b 100644 --- a/mtdata/__init__.py +++ b/mtdata/__init__.py @@ -4,7 +4,7 @@ # Created: 4/4/20 -__version__ = '0.4.3' +__version__ = '0.4.4-dev' __description__ = 'mtdata is a tool to download datasets for machine translation' __author__ = 'Thamme Gowda' diff --git a/mtdata/index/other.py b/mtdata/index/other.py index 006906b..12042ce 100644 --- a/mtdata/index/other.py +++ b/mtdata/index/other.py @@ -144,3 +144,23 @@ def load_all(index: Index): l2_ext = l2.replace('_', "-") index += Entry(did=f"Microsoft-ntrex-128-{l1}-{l2}", url=_url, filename="NTREX-52b9c57c.tar.gz", in_ext='txt', in_paths=[f"*/NTREX-128/newstest2019-ref.{l1_ext}.txt", f"*/NTREX-128/newstest2019-ref.{l2_ext}.txt"]) + + ### English - Bhojpuri ### + url="https://github.com/shashwatup9k/BHLTR/archive/2d2550033222.zip" + filename = url.split('/')[-1] # this will force to share the file across the entries + cite = ("ojha2019english",) + # Parallel corpora + for split, splitname in [ # shortname, fullname, suffix + ("train", "training"), + ("dev", "development"), + ("test", "test.*")]: + f1 = f"*/parallel-corpora/eng--bho.{splitname}.eng" + f2 = f"*/parallel-corpora/eng--bho.{splitname}.bho" + index += Entry(did=DatasetId(group='BHLTR', name=split, version='1', langs=('eng', 'bho')), + url=url, filename=filename, ext='zip', in_ext='txt', in_paths=[f1, f2], cite=cite) + # monolingual corpora + for version, f1 in [ + ("1", "*/mono-bho-corpus/monolingual.bho"), + ("2", "*/mono-bho-corpus/monolingual-v0.2.bho")]: + index += Entry(did=DatasetId(group='BHLTR', name=f'mono', version=version, langs=('bho',)), + url=url, filename=filename, ext='zip', in_ext='txt', in_paths=[f1], cite=cite) diff --git a/mtdata/resource/refs.bib b/mtdata/resource/refs.bib index eed9dc7..58d4568 100644 --- a/mtdata/resource/refs.bib +++ b/mtdata/resource/refs.bib @@ -720,3 +720,10 @@ @misc{nagata2024japanesechinese archivePrefix={arXiv}, primaryClass={cs.CL}, } + +@article{ojha2019english, + title={English-Bhojpuri SMT System: Insights from the Karaka Model}, + author={Ojha, Atul Kr}, + journal={arXiv preprint arXiv:1905.02239}, + year={2019} +} \ No newline at end of file From 1997e09d0a09056c2d96a9748ac5ad2cf4182c49 Mon Sep 17 00:00:00 2001 From: Thamme Gowda Date: Wed, 17 Sep 2025 15:00:52 -0700 Subject: [PATCH 02/10] add mtdata-map CLI; add tests --- mtdata/map.py | 33 +++++++++++++--- pyproject.toml | 1 + tests/test_map_cli.py | 89 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 tests/test_map_cli.py diff --git a/mtdata/map.py b/mtdata/map.py index 3297a64..153e589 100644 --- a/mtdata/map.py +++ b/mtdata/map.py @@ -22,7 +22,8 @@ from mtdata.utils import IO -DELIM = '\t' +#DELIM = '\t' +DELIM = None SENTINEL = None @@ -188,7 +189,11 @@ def read_stream(cls, paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]: def read_input_paths(input, delim=DELIM): for line in input: - parts = line.rstrip("\n").split(delim) + parts = line.rstrip("\n") + if delim: + parts = parts.split(delim) + else: + parts = parts.split() # white spaces parts = [Path(p) for p in parts] yield parts @@ -219,14 +224,29 @@ def main(): stream = trim_stream(stream, skip=n_skip, limit=n_limit) mapper = SubprocMapper(cmdline=args['cmdline']) + out = None try: out_stream = mapper(stream) for rec in out_stream: - print(rec) + if isinstance(rec, dict): + if out is not None: + log.info(f"closing {out.name}") + out.close() + log.info(f"[[opening]] {rec['output']}") + out_path = Path(rec['output']) + if args['make_parents']: + out_path.parent.mkdir(parents=True, exist_ok=True) + out = out_path.open('w', encoding='utf-8', errors='replace') + else: + assert out is not None, f"Output file is not opened yet" + out.write(rec + '\n') except Exception as e: mapper.close() raise - + finally: + if out is not None: + log.info(f"((closing)) {out.name}") + out.close() def parse_args(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -234,11 +254,14 @@ def parse_args(): help="Mapper command that maps line-by-line, maintains 1:1 mapping and the input order. For example: 'cat'") parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Listing file containing file paths. Atleast two paths per line is expected first one is input and last one is output") - parser.add_argument('-d', '--delim', type=str, default=DELIM, help="delimiter for paths in input") + parser.add_argument('-d', '--delim', type=str, default=DELIM, help="delimiter for paths in input. default=None => split by all whitespaces (space, tab etc.)") parser.add_argument('-l', '--limit', type=int, default=0, help="Limit data stream to these many lines. Score: for debugging and testing") parser.add_argument('-s', '--skip', type=int, default=0, help="Skip the first n records. Scope: for debugging and testing") + parser.add_argument('-p', '--parents', action='store_true', dest='make_parents', + help="Create parent directories for output files if they do not exist") + # return the parsed arguments return parser.parse_args() if __name__ == '__main__': diff --git a/pyproject.toml b/pyproject.toml index 118fe87..f872014 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ test = [ "pytest", "pytest-cov[all]", "black", "isort", "mypy"] mtdata = "mtdata.__main__:main" mtdata-iso = "mtdata.iso.__main__:main" mtdata-bcp47 = "mtdata.iso.bcp47:main" +mtdata-map = "mtdata.map:main" [tool.setuptools.dynamic] diff --git a/tests/test_map_cli.py b/tests/test_map_cli.py new file mode 100644 index 0000000..1d87e6a --- /dev/null +++ b/tests/test_map_cli.py @@ -0,0 +1,89 @@ +import sys +import subprocess +from pathlib import Path + + +def run_map_cmd(cmd_args, cwd): + cmd = [sys.executable, '-m', 'mtdata.map', '-c', 'cat', '-i', str(cmd_args['list_file'])] + if cmd_args.get('make_parents'): + cmd.append('-p') + if cmd_args.get('skip'): + cmd.extend(['-s', str(cmd_args['skip'])]) + if cmd_args.get('limit'): + cmd.extend(['-l', str(cmd_args['limit'])]) + return subprocess.run(cmd, capture_output=True, text=True, cwd=cwd) + + +def test_map_cli_basic(tmp_path): + repo_root = Path(__file__).resolve().parents[1] + # create a simple input file + inp = tmp_path / 'in1.txt' + inp.write_text('line1\nline2\n') + + outp = tmp_path / 'out' / 'o1.txt' + listing = tmp_path / 'list.txt' + listing.write_text(f"{inp}\t{outp}\n") + + res = run_map_cmd({'list_file': listing, 'make_parents': True}, cwd=repo_root) + assert res.returncode == 0, f"STDERR:\n{res.stderr}" + + assert outp.exists() + assert outp.read_text().splitlines() == ['line1', 'line2'] + + +def test_map_cli_skip_limit(tmp_path): + repo_root = Path(__file__).resolve().parents[1] + inp = tmp_path / 'in2.txt' + inp.write_text('L1\nL2\nL3\n') + + outp = tmp_path / 'out2.txt' + listing = tmp_path / 'list2.txt' + listing.write_text(f"{inp}\t{outp}\n") + + # skip=1 should skip the first data line; limit=3 yields ctrl + two data lines + res = subprocess.run([sys.executable, '-m', 'mtdata.map', '-c', 'cat', '-i', str(listing), '-p', '-s', '1', '-l', '3'], + capture_output=True, text=True, cwd=repo_root) + assert res.returncode == 0, f"STDERR:\n{res.stderr}" + + assert outp.exists() + assert outp.read_text().splitlines() == ['L2', 'L3'] + + +def test_map_cli_multi_columns(tmp_path): + repo_root = Path(__file__).resolve().parents[1] + + # first group of three input files + in1a = tmp_path / 'g1_a.txt' + in1b = tmp_path / 'g1_b.txt' + in1c = tmp_path / 'g1_c.txt' + in1a.write_text('a1\na2\n') + in1b.write_text('b1\nb2\n') + in1c.write_text('c1\nc2\n') + + out1 = tmp_path / 'out_g1.txt' + + # second group of three input files + in2a = tmp_path / 'g2_a.txt' + in2b = tmp_path / 'g2_b.txt' + in2c = tmp_path / 'g2_c.txt' + in2a.write_text('x1\nx2\n') + in2b.write_text('y1\ny2\n') + in2c.write_text('z1\nz2\n') + + out2 = tmp_path / 'out_g2.txt' + + listing = tmp_path / 'list_multi.txt' + listing.write_text( + f"{in1a}\t{in1b}\t{in1c}\t{out1}\n" + f"{in2a}\t{in2b}\t{in2c}\t{out2}\n" + ) + + res = subprocess.run([sys.executable, '-m', 'mtdata.map', '-c', 'cat', '-i', str(listing), '-p'], + capture_output=True, text=True, cwd=repo_root) + assert res.returncode == 0, f"STDERR:\n{res.stderr}" + + assert out1.exists() + assert out1.read_text().splitlines() == ['a1\tb1\tc1', 'a2\tb2\tc2'] + + assert out2.exists() + assert out2.read_text().splitlines() == ['x1\ty1\tz1', 'x2\ty2\tz2'] From b66871ac9ebeaa07ae30464b42945cb826ed30bf Mon Sep 17 00:00:00 2001 From: Thamme Gowda Date: Thu, 18 Sep 2025 01:59:28 -0700 Subject: [PATCH 03/10] mtdata-map: fix: do not edit input line i.e., dont fix \t --- mtdata/map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mtdata/map.py b/mtdata/map.py index 153e589..5d4730d 100644 --- a/mtdata/map.py +++ b/mtdata/map.py @@ -50,7 +50,7 @@ def read_paths(paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]: for rec in zip_longest(*streams): if len(inps) > 1 and any(x is None for x in rec): raise ValueError(f"Unequal number of lines detected in {inps} @ count: {counter}") - rec = '\t'.join(x.strip().replace('\t', ' ') for x in rec) + rec = '\t'.join(x.strip() for x in rec) yield rec counter += 1 n_data += counter From 2f857e1c34e85d4e7561e53927102703f3ff3fba Mon Sep 17 00:00:00 2001 From: Thamme Gowda Date: Thu, 18 Sep 2025 02:41:09 -0700 Subject: [PATCH 04/10] mtdata-map: improve log messages readability --- mtdata/map.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mtdata/map.py b/mtdata/map.py index 5d4730d..25d5dc5 100644 --- a/mtdata/map.py +++ b/mtdata/map.py @@ -54,7 +54,7 @@ def read_paths(paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]: yield rec counter += 1 n_data += counter - log.info(f"Producer: end of {inps}; count: {counter}") + log.info(f"Producer: End of {','.join(str(x) for x in inps)}; count: {counter}") except Exception as e: log.exception(f"Producer: error in {inps}: {e}") log.info(f"Producer: finishing... n_ctrls: {n_ctrls}; n_data: {n_data:,}") @@ -230,7 +230,7 @@ def main(): for rec in out_stream: if isinstance(rec, dict): if out is not None: - log.info(f"closing {out.name}") + log.info(f"[[closing]] {out.name}") out.close() log.info(f"[[opening]] {rec['output']}") out_path = Path(rec['output']) From f4471eddcbb816c59e09e39a649446d1b6de06d4 Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 18:22:20 +0000 Subject: [PATCH 05/10] bump versions: drop py3.9, add py3.14, loosen deps version to allow minor upgrades --- .github/workflows/python-build-test.yml | 13 +++---------- pyproject.toml | 16 +++++++--------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/.github/workflows/python-build-test.yml b/.github/workflows/python-build-test.yml index 553d452..2c8cbdc 100644 --- a/.github/workflows/python-build-test.yml +++ b/.github/workflows/python-build-test.yml @@ -20,14 +20,8 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-latest] # windows-latest - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] - #exclude: - # - os: macos-latest - # python-version: '3.7' - # - os: ubuntu-latest - # python-version: '3.7' - # os x py versions here: https://raw.githubusercontent.com/actions/python-versions/main/versions-manifest.json + os: [ubuntu-latest, macos-latest] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} @@ -36,8 +30,7 @@ jobs: python-version: "${{ matrix.python-version }}" - name: Install dependencies run: | - pip install --upgrade pip - pip install setuptools==61.2 flake8 + pip install --upgrade pip setuptools python --version pip --version - name: Install module diff --git a/pyproject.toml b/pyproject.toml index f872014..2d8e5c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ readme = "README.md" #license = {text = "Apache-2.0"} #license-files = [ "LICENSE" ] # twine complaints about this; license-files = [] #TODO: revisit this after twine is fixed -requires-python = ">=3.9" +requires-python = ">=3.10" authors = [ { name = "Thamme Gowda", email = "tgowdan@gmail.com" } ] @@ -32,12 +32,11 @@ keywords = [ "computational linguistics" ] dependencies = [ - "requests>=2.32.0", - "enlighten==1.10.1", - "portalocker==2.3.0", - "pybtex==0.24.0", - "setuptools>=80.9.0; python_version >= '3.12'", # Needed until pybtex updates - "ruamel.yaml >= 0.17.10" + "requests~=2.32", + "enlighten~=1.14", + "portalocker~=3.2", + "pybtex~=0.25", + "ruamel.yaml~=0.18" ] [project.urls] @@ -52,9 +51,8 @@ exclude = ["tests*", "tmp*", "build*", "dist*", "crawler*", "*.egg-info*"] [project.optional-dependencies] -hf = ["datasets>=3.1.0"] +hf = ["datasets>=4.0.0"] test = [ "pytest", "pytest-cov[all]", "black", "isort", "mypy"] -# Note: hf datasets>=3.2 onwards doesnot support python 3.8 [project.scripts] From a880011c05f4b92004ed255ccf19d53887dbfd1e Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 18:32:10 +0000 Subject: [PATCH 06/10] refactor: replace multiprocessing.Queue with queue.Queue in SubprocMapper --- mtdata/index/statmt.py | 2 +- mtdata/map.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mtdata/index/statmt.py b/mtdata/index/statmt.py index d116e48..2a42f5f 100644 --- a/mtdata/index/statmt.py +++ b/mtdata/index/statmt.py @@ -384,7 +384,7 @@ def load_mono(index: Index): wmt22_cite = ('kocmi-etal-2022-findings',) # 1. News Crawl - """ + r""" base=https://data.statmt.org/news-crawl langs=$(curl $base/ | grep -o 'href="[a-z][a-z]\+/"' | cut -f2 -d \") for i in $langs; do diff --git a/mtdata/map.py b/mtdata/map.py index 25d5dc5..913b0ba 100644 --- a/mtdata/map.py +++ b/mtdata/map.py @@ -14,7 +14,7 @@ from itertools import zip_longest import subprocess as sp -import multiprocessing as mp +import queue import threading as mt import sys @@ -74,8 +74,8 @@ def __init__(self, cmdline: str, max_qsize=1024*1024, shell=True): def start(self): assert not self._started, f'Already started' - self.ctrl_queue = mp.Queue(maxsize=self.max_qsize) - self.data_queue = mp.Queue(maxsize=self.max_qsize) + self.ctrl_queue = queue.Queue(maxsize=self.max_qsize) + self.data_queue = queue.Queue(maxsize=self.max_qsize) log.info(f"RUN:\n\t{self.cmdline}") self.proc = sp.Popen(self.cmdline, stdin=sp.PIPE, stdout=sp.PIPE, text=True, **self._subproc_args) self._stop_event.clear() From 7e5090412addee560279cc0b21e41c749ca805dc Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 18:44:08 +0000 Subject: [PATCH 07/10] enable windows-latest and ubuntu-22.04 --- .github/workflows/python-build-test.yml | 6 +++--- mtdata/tmx.py | 18 +++++++++--------- tests/__init__.py | 2 +- tests/test_cli.py | 19 +++++++++---------- tests/test_huggingface.py | 4 ++-- tests/test_map_cli.py | 3 +++ 6 files changed, 27 insertions(+), 25 deletions(-) diff --git a/.github/workflows/python-build-test.yml b/.github/workflows/python-build-test.yml index 2c8cbdc..e9a9423 100644 --- a/.github/workflows/python-build-test.yml +++ b/.github/workflows/python-build-test.yml @@ -20,7 +20,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-latest] + os: [ubuntu-22.04, ubuntu-latest, macos-latest, windows-latest] python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: actions/checkout@v4 @@ -38,8 +38,8 @@ jobs: pip install .[hf,test] - name: Test with pytest run: | - python3 -m pytest + python -m pytest - name: Test CLI from other dir run: | cd iso-langs - python3 -m mtdata -ri list -id -l eng-kan + python -m mtdata -ri list -id -l eng-kan diff --git a/mtdata/tmx.py b/mtdata/tmx.py index a307e63..4aa8c13 100644 --- a/mtdata/tmx.py +++ b/mtdata/tmx.py @@ -7,6 +7,7 @@ from pathlib import Path from xml.etree import ElementTree as ET import argparse +import sys from mtdata import log from mtdata.utils import IO import time @@ -89,21 +90,20 @@ def read_tmx(path: Union[Path, str], langs=None): def main(inp, out, langs): recs = read_tmx(inp, langs=langs) - with IO.writer(out) as out: - count = 0 - for rec in recs: - rec = [l.replace('\t', ' ') for l in rec] - out.write('\t'.join(rec) + '\n') - count += 1 - log.warning(f"Wrote {count} lines to {out}") + count = 0 + for rec in recs: + rec = [l.replace('\t', ' ') for l in rec] + out.write('\t'.join(rec) + '\n') + count += 1 + log.warning(f"Wrote {count} lines to {out}") if __name__ == '__main__': p = argparse.ArgumentParser(description='A tool to convert TMX to TSV', formatter_class=argparse.ArgumentDefaultsHelpFormatter) p.add_argument('-i', '--inp', type=Path, required=True, help='Input file path') - p.add_argument('-o', '--out', type=Path, default=Path('/dev/stdout'), - help='Output file path') + p.add_argument('-o', '--out', type=argparse.FileType('w'), default=sys.stdout, + help='Output file path; defaults to stdout') p.add_argument('-l', '--langs', type=Langs, default=None, help='Languages from TMX. example: eng-fra or en-fr') diff --git a/tests/__init__.py b/tests/__init__.py index f0fefbe..0b39c20 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -4,4 +4,4 @@ # Created: 4/23/20 import sys -MTDATA_CMD = f"{sys.executable} -m mtdata" \ No newline at end of file +MTDATA_CMD = [sys.executable, '-m', 'mtdata'] \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index 0a259b2..a3907cc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,6 @@ import json import subprocess -from typing import List, Union +from typing import List from mtdata.index import INDEX as index from pathlib import Path from tempfile import TemporaryDirectory @@ -8,42 +8,41 @@ from mtdata.utils import IO from . import MTDATA_CMD -def shrun(cmd: Union[str, List[str]], capture_output=False): - p = subprocess.run(cmd, shell=isinstance(cmd, str), - capture_output=True) +def shrun(cmd: List[str], capture_output=False): + p = subprocess.run(cmd, capture_output=True) if capture_output: return p.returncode, p.stdout return p.returncode def test_cli_help(): - assert shrun(f'{MTDATA_CMD} --help') == 0 + assert shrun(MTDATA_CMD + ['--help']) == 0 def test_cli_list(): - code, out = shrun(f'{MTDATA_CMD} list --id', capture_output=True) + code, out = shrun(MTDATA_CMD + ['list', '--id'], capture_output=True) assert code == 0 assert len(out.splitlines()) >= len(index.entries) def test_cli_get(): with TemporaryDirectory() as out_dir: did = 'OPUS-gnome-v1-eng-kan' - assert shrun(f'{MTDATA_CMD} get -l eng-kan -tr {did} -o {out_dir}') == 0 + assert shrun(MTDATA_CMD + ['get', '-l', 'eng-kan', '-tr', did, '-o', out_dir]) == 0 assert (Path(out_dir) / 'mtdata.signature.txt').exists() def test_cache(): - code = shrun(f'{MTDATA_CMD} cache -ri tg01_2to1_test -j3', capture_output=False) + code = shrun(MTDATA_CMD + ['cache', '-ri', 'tg01_2to1_test', '-j3'], capture_output=False) assert code == 0 def test_get_recipe(): with TemporaryDirectory() as out_dir: - code = shrun(f'{MTDATA_CMD} get-recipe -ri tg01_2to1_test -o {out_dir}', capture_output=False) + code = shrun(MTDATA_CMD + ['get-recipe', '-ri', 'tg01_2to1_test', '-o', out_dir], capture_output=False) assert code == 0 def test_metadata(): with TemporaryDirectory() as out_dir: out_dir = Path(out_dir) did = "Statmt-europarl-10-slv-eng" - code = shrun(f'{MTDATA_CMD} get -l slv-eng -tr {did} -o {out_dir}', capture_output=False) + code = shrun(MTDATA_CMD + ['get', '-l', 'slv-eng', '-tr', did, '-o', str(out_dir)], capture_output=False) assert code == 0 assert (out_dir / 'mtdata.signature.txt').exists() meta_file = out_dir / "train-parts" / f"{did}.meta.jsonl.gz" diff --git a/tests/test_huggingface.py b/tests/test_huggingface.py index d54de9a..cf4816a 100644 --- a/tests/test_huggingface.py +++ b/tests/test_huggingface.py @@ -8,8 +8,8 @@ def test_hf_echo(): """Test the hf echo command.""" data_id = "Google-wmt24pp-1-eng-zho_TW" # an example dataset from HF expected_lines = 998 - cmd = f"{MTDATA_CMD} echo {data_id}" - result = sp.run(cmd, shell=True, capture_output=True, text=True) + cmd = MTDATA_CMD + ['echo', data_id] + result = sp.run(cmd, capture_output=True, text=True) assert result.returncode == 0, f"Command failed with error: {result.stderr}" out_lines = result.stdout.strip().splitlines() assert len(out_lines) == expected_lines, f"Expected {expected_lines} lines, but got {len(out_lines)} lines." \ No newline at end of file diff --git a/tests/test_map_cli.py b/tests/test_map_cli.py index 1d87e6a..befc1c0 100644 --- a/tests/test_map_cli.py +++ b/tests/test_map_cli.py @@ -1,6 +1,9 @@ import sys import subprocess from pathlib import Path +import pytest + +pytestmark = pytest.mark.skipif(sys.platform == 'win32', reason='map uses shell subprocesses not available on Windows') def run_map_cmd(cmd_args, cwd): From 3b98771a0472d609f196dbd4b52559124887fcd0 Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 18:55:18 +0000 Subject: [PATCH 08/10] reduce CI jobs -- remove windows and exclude odd python versions from latest --- .github/workflows/python-build-test.yml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-build-test.yml b/.github/workflows/python-build-test.yml index e9a9423..5423c6e 100644 --- a/.github/workflows/python-build-test.yml +++ b/.github/workflows/python-build-test.yml @@ -20,14 +20,29 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-22.04, ubuntu-latest, macos-latest, windows-latest] + os: [ubuntu-22.04, ubuntu-latest, macos-latest] python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] + exclude: #exclude some to reduce number of jobs + - os: ubuntu-latest + python-version: "3.11" + - os: ubuntu-latest + python-version: "3.13" + - os: macos-latest + python-version: "3.11" + - os: macos-latest + python-version: "3.13" steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: python-version: "${{ matrix.python-version }}" + cache: pip + - name: Cache mtdata datasets + uses: actions/cache@v4 + with: + path: ~/.mtdata + key: mtdata-cache-${{ matrix.os }} - name: Install dependencies run: | pip install --upgrade pip setuptools From cc53b8a1253af8bd146d68610f4d7769b223df33 Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 19:51:55 +0000 Subject: [PATCH 09/10] batter pbar: englighten to rich, --- mtdata/__init__.py | 11 +-- mtdata/data.py | 34 +++++---- mtdata/pbar.py | 173 +++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 2 +- 4 files changed, 203 insertions(+), 17 deletions(-) create mode 100644 mtdata/pbar.py diff --git a/mtdata/__init__.py b/mtdata/__init__.py index 1e9c09b..07bde03 100644 --- a/mtdata/__init__.py +++ b/mtdata/__init__.py @@ -11,18 +11,21 @@ import logging as log from pathlib import Path import os -import enlighten from ruamel.yaml import YAML yaml = YAML() debug_mode = False -_log_format = '%(asctime)s %(module)s.%(funcName)s:%(lineno)s %(levelname)s:: %(message)s' -log.basicConfig(level=log.INFO, datefmt='%Y-%m-%d %H:%M:%S', format=_log_format) +#_log_format = '%(module)s.%(funcName)s:%(lineno)s %(message)s' +from mtdata.pbar import get_log_handler # noqa: E402 +log.basicConfig(level=log.INFO, datefmt='%Y%m%d %H:%M:%S', + handlers=[get_log_handler()]) cache_dir = Path(os.environ.get('MTDATA', '~/.mtdata')).expanduser() recipes_dir = Path(os.getenv('MTDATA_RECIPES', '.')).resolve() cached_index_file = cache_dir / f'mtdata.index.{__version__}.pkl' resource_dir:Path = Path(__file__).parent / 'resource' -pbar_man = enlighten.get_manager() + +from mtdata.pbar import pbar_man # noqa: E402 + class MTDataException(Exception): pass diff --git a/mtdata/data.py b/mtdata/data.py index b70aab7..860584e 100644 --- a/mtdata/data.py +++ b/mtdata/data.py @@ -23,6 +23,12 @@ from mtdata.utils import IO DEF_COMPRESS = 'gz' + + +def _worker_init(progress_queue): + """Initialize worker process with a progress queue for remote updates.""" + from mtdata import pbar_man + pbar_man._queue = progress_queue DATA_FIELDS = ('train', 'dev', 'test', 'mono_train', 'mono_dev', 'mono_test') @@ -420,19 +426,23 @@ def add_parts(self, dir_path, entries, drop_noise=False, compress=False, desc=No tasks = [dict(dir_path=dir_path, entry=ent, drop_noise=drop_noise, compress=compress, fail_on_error=fail_on_error) for ent in entries] - with concurrent.futures.ProcessPoolExecutor(max_workers=self.n_jobs) as executor: - futures = [executor.submit(self.add_part_thread, task) for task in tasks] - with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc, + import multiprocessing as mp + progress_queue = mp.Queue() + with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc, autorefresh=True, min_delta=Defaults.PBAR_REFRESH_INTERVAL, position=3) as pbar: - for future in concurrent.futures.as_completed(futures): - try: - future.result() - except Exception as e: - log.error(f"Error in thread: {e}") - if fail_on_error: - raise e - finally: - pbar.update(force=True) + with pbar_man.consume_remote(progress_queue): + with concurrent.futures.ProcessPoolExecutor(max_workers=self.n_jobs, + initializer=_worker_init, initargs=(progress_queue,)) as executor: + futures = [executor.submit(self.add_part_thread, task) for task in tasks] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + log.error(f"Error in thread: {e}") + if fail_on_error: + raise e + finally: + pbar.update(force=True) def add_parts_sequential(self, dir_path, entries, drop_noise=False, compress=False, desc=None, fail_on_error=False): with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc, diff --git a/mtdata/pbar.py b/mtdata/pbar.py new file mode 100644 index 0000000..4f79591 --- /dev/null +++ b/mtdata/pbar.py @@ -0,0 +1,173 @@ +"""Progress bar manager using rich, with support for multi-process workers.""" + +import logging +import os +import threading +from contextlib import contextmanager + +from rich.progress import (Progress, TextColumn, BarColumn, TaskProgressColumn, + TimeElapsedColumn, TimeRemainingColumn, SpinnerColumn) +from rich.console import Console +from rich.logging import RichHandler + +console = Console(stderr=True) + + +def get_log_handler(): + """Return a RichHandler that coordinates with the progress bars.""" + return RichHandler(console=console, show_path=False, show_time=True, + omit_repeated_times=False) + + +class _PbarManager: + """A shared rich Progress that supports multiple concurrent tasks. + + Supports remote mode for worker processes: set _queue to a multiprocessing.Queue + and progress updates will be forwarded to the main process. + """ + def __init__(self): + self.enabled = True + self._lock = threading.Lock() + self._progress = None + self._active = 0 + self._queue = None # set in worker processes for remote mode + self._columns = [ + SpinnerColumn(), + TextColumn("[bold blue]{task.description}"), + BarColumn(bar_width=30), + TaskProgressColumn(), + TimeElapsedColumn(), + TextColumn("eta"), + TimeRemainingColumn(), + ] + + def _start(self): + with self._lock: + if self._progress is None: + self._progress = Progress(*self._columns, console=console) + self._progress.start() + self._active += 1 + + def _stop(self, task_id): + with self._lock: + self._progress.update(task_id, visible=False) + self._active -= 1 + if self._active <= 0: + self._progress.stop() + self._progress = None + self._active = 0 + + @contextmanager + def counter(self, desc='', total=None, unit='it', **kwargs): + if not self.enabled: + yield _NoopPbar() + return + if self._queue is not None: + pbar = _RemotePbar(self._queue) + self._queue.put(('start', pbar._id, desc, total)) + try: + yield pbar + finally: + pbar.flush() + self._queue.put(('stop', pbar._id)) + return + self._start() + task_id = self._progress.add_task(desc, total=total) + try: + yield _RichPbar(self._progress, task_id) + finally: + self._stop(task_id) + + @contextmanager + def consume_remote(self, queue): + """Consume progress events from worker processes and render them in the shared Progress.""" + import queue as _queue_mod + stop_event = threading.Event() + remote_tasks = {} + + def _consume(): + while not stop_event.is_set(): + try: + msg = queue.get(timeout=0.2) + except _queue_mod.Empty: + continue + kind = msg[0] + if kind == 'start': + _, tid, desc, total = msg + ptid = self._progress.add_task(desc, total=total) + remote_tasks[tid] = ptid + elif kind == 'update': + _, tid, incr = msg + if tid in remote_tasks: + self._progress.update(remote_tasks[tid], advance=incr) + elif kind == 'stop': + _, tid = msg + if tid in remote_tasks: + self._progress.update(remote_tasks[tid], visible=False) + del remote_tasks[tid] + # drain remaining messages + while not queue.empty(): + try: + msg = queue.get_nowait() + kind = msg[0] + if kind == 'update' and msg[1] in remote_tasks: + self._progress.update(remote_tasks[msg[1]], advance=msg[2]) + elif kind == 'stop' and msg[1] in remote_tasks: + self._progress.update(remote_tasks[msg[1]], visible=False) + del remote_tasks[msg[1]] + except _queue_mod.Empty: + break + + t = threading.Thread(target=_consume, daemon=True) + t.start() + try: + yield + finally: + stop_event.set() + t.join(timeout=5) + + +class _RichPbar: + def __init__(self, progress, task_id): + self._progress = progress + self._task_id = task_id + + def update(self, incr=1, **kwargs): + self._progress.update(self._task_id, advance=incr) + + +class _RemotePbar: + """Batches progress updates and sends them via queue at most once per FLUSH_INTERVAL.""" + _counter = 0 + _lock = threading.Lock() + FLUSH_INTERVAL = 0.5 # seconds + + def __init__(self, queue): + import time + with _RemotePbar._lock: + _RemotePbar._counter += 1 + self._id = (os.getpid(), _RemotePbar._counter) + self._queue = queue + self._pending = 0 + self._last_flush = time.monotonic() + self._time = time + + def update(self, incr=1, **kwargs): + self._pending += incr + now = self._time.monotonic() + if now - self._last_flush >= self.FLUSH_INTERVAL: + self.flush() + + def flush(self): + if self._pending > 0: + self._queue.put(('update', self._id, self._pending)) + self._pending = 0 + self._last_flush = self._time.monotonic() + + +class _NoopPbar: + def update(self, incr=1, **kwargs): + pass + + +pbar_man = _PbarManager() diff --git a/pyproject.toml b/pyproject.toml index 2d8e5c8..d528258 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ keywords = [ ] dependencies = [ "requests~=2.32", - "enlighten~=1.14", + "rich~=14.0", "portalocker~=3.2", "pybtex~=0.25", "ruamel.yaml~=0.18" From da0be80abed811ee5a7513c3486a67d9db8530f4 Mon Sep 17 00:00:00 2001 From: TG Gowda Date: Sun, 22 Mar 2026 20:17:29 +0000 Subject: [PATCH 10/10] refactor: replace queue with multiprocessing.Queue in SubprocMapper --- mtdata/map.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mtdata/map.py b/mtdata/map.py index 913b0ba..3d5ba82 100644 --- a/mtdata/map.py +++ b/mtdata/map.py @@ -14,13 +14,15 @@ from itertools import zip_longest import subprocess as sp -import queue +import multiprocessing as mp import threading as mt import sys from mtdata import log from mtdata.utils import IO +_MAX_QSIZE = 16384 if sys.platform == 'darwin' else 1024 * 1024 + #DELIM = '\t' DELIM = None @@ -62,7 +64,7 @@ def read_paths(paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]: class SubprocMapper: - def __init__(self, cmdline: str, max_qsize=1024*1024, shell=True): + def __init__(self, cmdline: str, max_qsize=_MAX_QSIZE, shell=True): self.cmdline = cmdline self._subproc_args = dict(shell=shell) self.ctrl_queue = None @@ -74,8 +76,8 @@ def __init__(self, cmdline: str, max_qsize=1024*1024, shell=True): def start(self): assert not self._started, f'Already started' - self.ctrl_queue = queue.Queue(maxsize=self.max_qsize) - self.data_queue = queue.Queue(maxsize=self.max_qsize) + self.ctrl_queue = mp.Queue(maxsize=self.max_qsize) + self.data_queue = mp.Queue(maxsize=self.max_qsize) log.info(f"RUN:\n\t{self.cmdline}") self.proc = sp.Popen(self.cmdline, stdin=sp.PIPE, stdout=sp.PIPE, text=True, **self._subproc_args) self._stop_event.clear()