Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions .github/workflows/python-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,41 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest] # windows-latest
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
#exclude:
# - os: macos-latest
# python-version: '3.7'
# - os: ubuntu-latest
# python-version: '3.7'
# os x py versions here: https://raw.githubusercontent.com/actions/python-versions/main/versions-manifest.json
os: [ubuntu-22.04, ubuntu-latest, macos-latest]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
exclude: #exclude some to reduce number of jobs
- os: ubuntu-latest
python-version: "3.11"
- os: ubuntu-latest
python-version: "3.13"
- os: macos-latest
python-version: "3.11"
- os: macos-latest
python-version: "3.13"
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: "${{ matrix.python-version }}"
cache: pip
- name: Cache mtdata datasets
uses: actions/cache@v4
with:
path: ~/.mtdata
key: mtdata-cache-${{ matrix.os }}
- name: Install dependencies
run: |
pip install --upgrade pip
pip install setuptools==61.2 flake8
pip install --upgrade pip setuptools
python --version
pip --version
- name: Install module
run: |
pip install .[hf,test]
- name: Test with pytest
run: |
python3 -m pytest
python -m pytest
- name: Test CLI from other dir
run: |
cd iso-langs
python3 -m mtdata -ri list -id -l eng-kan
python -m mtdata -ri list -id -l eng-kan
13 changes: 8 additions & 5 deletions mtdata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,28 @@
# Created: 4/4/20


__version__ = '0.4.3'
__version__ = '0.4.4-dev'
__description__ = 'mtdata is a tool to download datasets for machine translation'
__author__ = 'Thamme Gowda'

import logging as log
from pathlib import Path
import os
import enlighten
from ruamel.yaml import YAML

yaml = YAML()
debug_mode = False
_log_format = '%(asctime)s %(module)s.%(funcName)s:%(lineno)s %(levelname)s:: %(message)s'
log.basicConfig(level=log.INFO, datefmt='%Y-%m-%d %H:%M:%S', format=_log_format)
#_log_format = '%(module)s.%(funcName)s:%(lineno)s %(message)s'
from mtdata.pbar import get_log_handler # noqa: E402
log.basicConfig(level=log.INFO, datefmt='%Y%m%d %H:%M:%S',
handlers=[get_log_handler()])
cache_dir = Path(os.environ.get('MTDATA', '~/.mtdata')).expanduser()
recipes_dir = Path(os.getenv('MTDATA_RECIPES', '.')).resolve()
cached_index_file = cache_dir / f'mtdata.index.{__version__}.pkl'
resource_dir:Path = Path(__file__).parent / 'resource'
pbar_man = enlighten.get_manager()

from mtdata.pbar import pbar_man # noqa: E402


class MTDataException(Exception):
    """Base exception type for errors raised by the mtdata package."""
    pass
Expand Down
34 changes: 22 additions & 12 deletions mtdata/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
from mtdata.utils import IO

DEF_COMPRESS = 'gz'


def _worker_init(progress_queue):
    """Initialize worker process with a progress queue for remote updates.

    Intended as the ``initializer`` of a ``ProcessPoolExecutor`` (see
    ``add_parts``), so each spawned worker can forward progress-bar updates
    to the parent process through ``progress_queue``.
    """
    # Import inside the function: this runs in the freshly spawned worker
    # process, after the pool has forked/spawned it.
    from mtdata import pbar_man
    # NOTE(review): sets a private attribute on the shared progress-bar
    # manager; presumably pbar_man reads `_queue` when emitting updates and
    # the parent drains it via `pbar_man.consume_remote(...)` — confirm in
    # mtdata.pbar.
    pbar_man._queue = progress_queue
DATA_FIELDS = ('train', 'dev', 'test', 'mono_train', 'mono_dev', 'mono_test')


Expand Down Expand Up @@ -420,19 +426,23 @@ def add_parts(self, dir_path, entries, drop_noise=False, compress=False, desc=No

tasks = [dict(dir_path=dir_path, entry=ent, drop_noise=drop_noise, compress=compress,
fail_on_error=fail_on_error) for ent in entries]
with concurrent.futures.ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
futures = [executor.submit(self.add_part_thread, task) for task in tasks]
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
import multiprocessing as mp
progress_queue = mp.Queue()
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
autorefresh=True, min_delta=Defaults.PBAR_REFRESH_INTERVAL, position=3) as pbar:
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
log.error(f"Error in thread: {e}")
if fail_on_error:
raise e
finally:
pbar.update(force=True)
with pbar_man.consume_remote(progress_queue):
with concurrent.futures.ProcessPoolExecutor(max_workers=self.n_jobs,
initializer=_worker_init, initargs=(progress_queue,)) as executor:
futures = [executor.submit(self.add_part_thread, task) for task in tasks]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
log.error(f"Error in thread: {e}")
if fail_on_error:
raise e
finally:
pbar.update(force=True)

def add_parts_sequential(self, dir_path, entries, drop_noise=False, compress=False, desc=None, fail_on_error=False):
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
Expand Down
20 changes: 20 additions & 0 deletions mtdata/index/other.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,23 @@ def load_all(index: Index):
l2_ext = l2.replace('_', "-")
index += Entry(did=f"Microsoft-ntrex-128-{l1}-{l2}", url=_url, filename="NTREX-52b9c57c.tar.gz",
in_ext='txt', in_paths=[f"*/NTREX-128/newstest2019-ref.{l1_ext}.txt", f"*/NTREX-128/newstest2019-ref.{l2_ext}.txt"])

### English - Bhojpuri ###
url="https://github.com/shashwatup9k/BHLTR/archive/2d2550033222.zip"
filename = url.split('/')[-1] # this will force to share the file across the entries
cite = ("ojha2019english",)
# Parallel corpora
for split, splitname in [ # shortname, fullname, suffix
("train", "training"),
("dev", "development"),
("test", "test.*")]:
f1 = f"*/parallel-corpora/eng--bho.{splitname}.eng"
f2 = f"*/parallel-corpora/eng--bho.{splitname}.bho"
index += Entry(did=DatasetId(group='BHLTR', name=split, version='1', langs=('eng', 'bho')),
url=url, filename=filename, ext='zip', in_ext='txt', in_paths=[f1, f2], cite=cite)
# monolingual corpora
for version, f1 in [
("1", "*/mono-bho-corpus/monolingual.bho"),
("2", "*/mono-bho-corpus/monolingual-v0.2.bho")]:
index += Entry(did=DatasetId(group='BHLTR', name=f'mono', version=version, langs=('bho',)),
url=url, filename=filename, ext='zip', in_ext='txt', in_paths=[f1], cite=cite)
2 changes: 1 addition & 1 deletion mtdata/index/statmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def load_mono(index: Index):

wmt22_cite = ('kocmi-etal-2022-findings',)
# 1. News Crawl
"""
r"""
base=https://data.statmt.org/news-crawl
langs=$(curl $base/ | grep -o 'href="[a-z][a-z]\+/"' | cut -f2 -d \")
for i in $langs; do
Expand Down
41 changes: 33 additions & 8 deletions mtdata/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
from mtdata import log
from mtdata.utils import IO

_MAX_QSIZE = 16384 if sys.platform == 'darwin' else 1024 * 1024

DELIM = '\t'

#DELIM = '\t'
DELIM = None
SENTINEL = None


Expand All @@ -49,19 +52,19 @@ def read_paths(paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]:
for rec in zip_longest(*streams):
if len(inps) > 1 and any(x is None for x in rec):
raise ValueError(f"Unequal number of lines detected in {inps} @ count: {counter}")
rec = '\t'.join(x.strip().replace('\t', ' ') for x in rec)
rec = '\t'.join(x.strip() for x in rec)
yield rec
counter += 1
n_data += counter
log.info(f"Producer: end of {inps}; count: {counter}")
log.info(f"Producer: End of {','.join(str(x) for x in inps)}; count: {counter}")
except Exception as e:
log.exception(f"Producer: error in {inps}: {e}")
log.info(f"Producer: finishing... n_ctrls: {n_ctrls}; n_data: {n_data:,}")


class SubprocMapper:

def __init__(self, cmdline: str, max_qsize=1024*1024, shell=True):
def __init__(self, cmdline: str, max_qsize=_MAX_QSIZE, shell=True):
self.cmdline = cmdline
self._subproc_args = dict(shell=shell)
self.ctrl_queue = None
Expand Down Expand Up @@ -188,7 +191,11 @@ def read_stream(cls, paths: Iterator[List[Path]]) -> Iterator[Union[dict,list]]:

def read_input_paths(input, delim=DELIM):
    """Parse a stream of listing lines into lists of ``Path`` objects.

    Each line of *input* contains one or more file paths. When *delim* is
    falsy (the default), a line is split on any run of whitespace; otherwise
    it is split on *delim*. One ``list[Path]`` is yielded per line.
    """
    for raw_line in input:
        stripped = raw_line.rstrip("\n")
        if delim:
            fields = stripped.split(delim)
        else:
            # No explicit delimiter: split on arbitrary whitespace runs
            fields = stripped.split()
        yield [Path(field) for field in fields]

Expand Down Expand Up @@ -219,26 +226,44 @@ def main():
stream = trim_stream(stream, skip=n_skip, limit=n_limit)

mapper = SubprocMapper(cmdline=args['cmdline'])
out = None
try:
out_stream = mapper(stream)
for rec in out_stream:
print(rec)
if isinstance(rec, dict):
if out is not None:
log.info(f"[[closing]] {out.name}")
out.close()
log.info(f"[[opening]] {rec['output']}")
out_path = Path(rec['output'])
if args['make_parents']:
out_path.parent.mkdir(parents=True, exist_ok=True)
out = out_path.open('w', encoding='utf-8', errors='replace')
else:
assert out is not None, f"Output file is not opened yet"
out.write(rec + '\n')
except Exception as e:
mapper.close()
raise

finally:
if out is not None:
log.info(f"((closing)) {out.name}")
out.close()

def parse_args():
    """Parse command-line arguments for the subprocess mapper CLI.

    Returns:
        argparse.Namespace with fields: cmdline, input, delim, limit, skip,
        make_parents.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--cmd', dest='cmdline', type=str, required=True,
                        help="Mapper command that maps line-by-line, maintains 1:1 mapping and the input order. For example: 'cat'")
    # Fixed help text: "Atleast ... is expected first one" -> grammatical sentence
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
                        help="Listing file containing file paths. At least two paths per line are expected: "
                             "the first one is input and the last one is output")
    parser.add_argument('-d', '--delim', type=str, default=DELIM,
                        help="delimiter for paths in input. default=None => split by all whitespaces (space, tab etc.)")
    # Fixed typo: "Score:" -> "Scope:" to match the --skip option's help text
    parser.add_argument('-l', '--limit', type=int, default=0,
                        help="Limit data stream to these many lines. Scope: for debugging and testing")
    parser.add_argument('-s', '--skip', type=int, default=0,
                        help="Skip the first n records. Scope: for debugging and testing")
    parser.add_argument('-p', '--parents', action='store_true', dest='make_parents',
                        help="Create parent directories for output files if they do not exist")
    return parser.parse_args()

if __name__ == '__main__':
Expand Down
Loading
Loading