Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions complex_tokenization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from complex_tokenization.tokenizer import (
BNETokenizer,
BoundlessBPETokenizer,
BPETokenizer,
SuperBPETokenizer,
Tokenizer,
)

__all__ = [
"Tokenizer",
"BPETokenizer",
"BNETokenizer",
"BoundlessBPETokenizer",
"SuperBPETokenizer",
]
Empty file.
40 changes: 0 additions & 40 deletions complex_tokenization/examples/bne.py

This file was deleted.

11 changes: 0 additions & 11 deletions complex_tokenization/examples/boundless_bpe.py

This file was deleted.

24 changes: 0 additions & 24 deletions complex_tokenization/examples/super_bpe.py

This file was deleted.

10 changes: 0 additions & 10 deletions complex_tokenization/examples/utils.py

This file was deleted.

106 changes: 106 additions & 0 deletions complex_tokenization/fast_bpe_trainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Fast BPE trainer using incremental pair counting.

Instead of rescanning the entire corpus for merge candidates each iteration,
maintains a running pair frequency count and only updates affected positions.
"""

from collections import Counter

from complex_tokenization.graph import GraphVertex, Node, NodesSequence, UnconnectedGraphs
from complex_tokenization.graphs.settings import GraphSettings
from complex_tokenization.graphs.units import utf8_clusters
from complex_tokenization.graphs.words import words


class FastBPETrainer:
def __init__(self, texts: list[str], connected: bool = False, units=utf8_clusters):
GraphSettings.ONLY_MINIMAL_MERGES = True
GraphSettings.MAX_MERGE_SIZE = 2
GraphSettings.USE_SINGLETONS = False

self.word_freqs: dict[tuple[bytes, ...], int] = Counter()
for text in texts:
tokens = self._text_to_token_tuples(text, connected, units)
for token_tuple in tokens:
self.word_freqs[token_tuple] += 1

self.merges: list[tuple[bytes, bytes]] = []

@staticmethod
def _text_to_token_tuples(text, connected, units) -> list[tuple[bytes, ...]]:
graph = words(text, connected=connected, units=units)
result = []

if isinstance(graph, UnconnectedGraphs):
subgraphs = graph.subgraphs
else:
subgraphs = (graph,)

for sg in subgraphs:
token_tuple = FastBPETrainer._flatten_to_bytes(sg)
if token_tuple and len(token_tuple) > 1:
result.append(token_tuple)
return result

@staticmethod
def _flatten_to_bytes(vertex: GraphVertex) -> tuple[bytes, ...]:
if isinstance(vertex, Node):
return (vertex.value,)
if isinstance(vertex, NodesSequence):
result = []
for n in vertex.nodes:
result.extend(FastBPETrainer._flatten_to_bytes(n))
return tuple(result)
return (bytes(vertex),)

def _get_pair_counts(self) -> Counter:
counts = Counter()
for word, freq in self.word_freqs.items():
for i in range(len(word) - 1):
counts[(word[i], word[i + 1])] += freq
return counts

def _apply_merge(self, pair: tuple[bytes, bytes]) -> dict[tuple[bytes, ...], int]:
a, b = pair
merged = a + b
new_freqs = {}

for word, freq in self.word_freqs.items():
new_word = []
i = 0
while i < len(word):
if i < len(word) - 1 and word[i] == a and word[i + 1] == b:
new_word.append(merged)
i += 2
else:
new_word.append(word[i])
i += 1
new_freqs[tuple(new_word)] = new_freqs.get(tuple(new_word), 0) + freq

self.word_freqs = new_freqs
return new_freqs

def train(self, num_merges: int = 100):
pair_counts = self._get_pair_counts()

for _ in range(num_merges):
if not pair_counts:
break

best_pair = max(pair_counts, key=pair_counts.get)
if pair_counts[best_pair] < 1:
break

self._apply_merge(best_pair)
self.merges.append(best_pair)

pair_counts = Counter()
for word, freq in self.word_freqs.items():
for i in range(len(word) - 1):
pair_counts[(word[i], word[i + 1])] += freq

def get_merges(self) -> list[tuple[str, ...]]:
return [
tuple(b.decode("utf-8", errors="replace") for b in pair)
for pair in self.merges
]
113 changes: 113 additions & 0 deletions complex_tokenization/tokenizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""High-level tokenizer API.

Usage:
tokenizer = BPETokenizer()
tokenizer.train(texts, num_merges=100)
merges = tokenizer.get_merges()

With language-specific decomposition:
from complex_tokenization.languages.hebrew.decompose import decompose_cluster
tokenizer = BPETokenizer()
tokenizer.register_script("Hebrew", decompose_cluster)
tokenizer.train(texts, num_merges=100)
"""

from collections.abc import Callable
from functools import reduce

from complex_tokenization.graph import GraphVertex, Node
from complex_tokenization.graphs.settings import GraphSettings
from complex_tokenization.graphs.units import characters, register_script, utf8, utf8_clusters
from complex_tokenization.graphs.words import words
from complex_tokenization.trainer import Trainer

UNIT_FUNCTIONS: dict[str, Callable[[str], GraphVertex]] = {
"utf8": utf8,
"utf8_clusters": utf8_clusters,
"characters": characters,
}


class Tokenizer:
def __init__(
self,
units: str | Callable[[str], GraphVertex] = "utf8_clusters",
merge_size: int = 2,
connected: bool = False,
):
if isinstance(units, str):
if units not in UNIT_FUNCTIONS:
raise ValueError(f"Unknown units: {units!r}. Choose from {list(UNIT_FUNCTIONS)}")
self.units = UNIT_FUNCTIONS[units]
else:
self.units = units
self.merge_size = merge_size
self.connected = connected
self.trainer: Trainer | None = None

@staticmethod
def register_script(script: str, handler: Callable[[str], GraphVertex]):
register_script(script, handler)

def _build_graphs(self, texts: list[str]) -> tuple[GraphVertex, ...]:
return tuple(
words(text, connected=self.connected, units=self.units)
for text in texts
)

def train(self, texts: list[str], num_merges: int = 100,
known_merges: list[tuple[str, ...]] | None = None) -> list:
GraphSettings.ONLY_MINIMAL_MERGES = True
GraphSettings.MAX_MERGE_SIZE = self.merge_size

graphs = self._build_graphs(texts)
self.trainer = Trainer(graphs=graphs)

if known_merges:
for merge_strs in known_merges:
nodes = tuple(Node(value=s.encode("utf-8")) for s in merge_strs)
token = reduce(lambda a, b: a + b, nodes)
self.trainer.graph = self.trainer.graph.merge(token, nodes)
self.trainer.merges.append((token, nodes))

self.trainer.train(num_merges=num_merges)
return self.get_merges()

def get_merges(self) -> list[tuple[str, ...]]:
if self.trainer is None:
return []
return self.trainer.get_merges()


class BPETokenizer(Tokenizer):
def __init__(self, units="utf8_clusters"):
super().__init__(units=units, merge_size=2, connected=False)


class BNETokenizer(Tokenizer):
def __init__(self, n=4, units="utf8_clusters"):
super().__init__(units=units, merge_size=n, connected=False)


class BoundlessBPETokenizer(Tokenizer):
def __init__(self, units="utf8_clusters"):
super().__init__(units=units, merge_size=2, connected=True)


class SuperBPETokenizer(Tokenizer):
def __init__(self, units="utf8_clusters", disconnected_merges: int | None = None):
super().__init__(units=units, merge_size=2, connected=False)
self._disconnected_merges = disconnected_merges

def train(self, texts: list[str], num_merges: int = 100,
known_merges: list[tuple[str, ...]] | None = None) -> list:
disconnected_merges = self._disconnected_merges or num_merges // 2

phase1 = BPETokenizer(units=self.units)
phase1_merges = phase1.train(texts, num_merges=disconnected_merges, known_merges=known_merges)

phase2 = BoundlessBPETokenizer(units=self.units)
result = phase2.train(texts, num_merges=num_merges, known_merges=phase1_merges)

self.trainer = phase2.trainer
return result
62 changes: 62 additions & 0 deletions tests/languages/test_hebrew_training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Test training a tokenizer on Hebrew text with diacritics decomposition."""

from complex_tokenization.graphs.settings import GraphSettings
from complex_tokenization.graphs.units import register_script, utf8_clusters
from complex_tokenization.graphs.words import pretokenize
from complex_tokenization.languages.hebrew.decompose import decompose_cluster
from complex_tokenization.trainer import Trainer


def train_hebrew(texts, num_merges=10):
register_script("Hebrew", decompose_cluster)
GraphSettings.ONLY_MINIMAL_MERGES = True
GraphSettings.MAX_MERGE_SIZE = 2

graphs = tuple(utf8_clusters(t) for t in texts)
trainer = Trainer(graphs=graphs)
trainer.train(num_merges=num_merges)
return trainer


class TestHebrewTraining:
def test_simple_word_training(self):
texts = ["שלום שלום שלום"]
trainer = train_hebrew(texts, num_merges=5)
assert len(trainer.merges) > 0

def test_nikkud_text_training(self):
texts = ["בְּרֵאשִׁית בָּרָא אֱלֹהִים"] * 3
trainer = train_hebrew(texts, num_merges=10)
assert len(trainer.merges) > 0

def test_repeated_diacritics_merge(self):
"""Shared diacritics across words should produce frequent merges."""
texts = ["בָּ כָּ דָּ גָּ פָּ תָּ"] * 5
trainer = train_hebrew(texts, num_merges=15)
merge_bytes = [
b"".join(bytes(n) for n in nodes)
for _, nodes in trainer.merges
]
dagesh = "ּ".encode()
qamats = "ָ".encode()
assert any(dagesh in mb or qamats in mb for mb in merge_bytes), (
"Expected dagesh or qamats in early merges"
)

def test_mixed_nikkud_and_plain(self):
texts = ["שלום עולם", "בְּרֵאשִׁית"]
trainer = train_hebrew(texts, num_merges=5)
assert len(trainer.merges) > 0

def test_bytes_preserved(self):
register_script("Hebrew", decompose_cluster)
text = "שלום"
graph = utf8_clusters(text)
assert bytes(graph) == text.encode()

def test_pretokenize_hebrew(self):
text = "שלום עולם"
tokens = pretokenize(text)
assert len(tokens) == 2
assert tokens[0] == "שלום"
assert tokens[1] == " עולם"
Loading
Loading