From c5ebd8463d5eb2d024492a2eb7112d82adbca6c0 Mon Sep 17 00:00:00 2001 From: chelovek Date: Wed, 25 Jun 2025 17:33:55 +0300 Subject: [PATCH 1/3] filter --- README.md | 2 +- parser.py | 106 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 85 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 47533e7..08a1fd9 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # Как использовать: 1. Скачайте данный репозиторий на ваш пк -2. Установите зафисимости с помощью +2. Установите зависимости с помощью ``` pip install -r requirements.txt ``` diff --git a/parser.py b/parser.py index 9c86e82..7d40818 100644 --- a/parser.py +++ b/parser.py @@ -1,29 +1,88 @@ import json from pathlib import Path from typing import Dict, Any, List, Optional, Set -from export import export_data +from datasets import load_dataset +import orjson import typer import pandas as pd +import re Message = Dict[str, Any] Context = List[Optional[Message]] + app = typer.Typer() +BAD_WORDS = {"нежелательное_слово1", "нежелательное_слово2", "нежелательное_слово3"} + +#поиск emoji +EMOJI_RE = re.compile("[\U0001F600-\U0001F64F" + "\U0001F300-\U0001F5FF" + "\U0001F680-\U0001F6FF" + "\U0001F1E0-\U0001F1FF]+", flags=re.UNICODE) + +def contains_emoji(text: str) -> bool: + return bool(EMOJI_RE.search(text)) + +def contains_bad_words(text: str) -> bool: + lowered = text.lower() + return any(bad in lowered for bad in BAD_WORDS) + +def is_valid(text: Optional[str], min_len: int = 5, max_len: int = 500) -> bool: + if not text: + return False + if not (min_len <= len(text.strip()) <= max_len): + return False + if contains_bad_words(text): + return False + if contains_emoji(text): + return False + return True + +def export_data(path: Path): + print(f"Loading CSV dataset from {path / 'raw.csv'}") + data = load_dataset('csv', data_files={'train': str(path / "raw.csv")}) + data = data['train'].train_test_split(test_size=0.2) + + def is_pair_clean(sample): + return 
all(is_valid(sample.get(field)) for field in ['context_1', 'response']) + + print("Filtering dataset...") + data = data.filter(is_pair_clean) + + print("Saving train.jsonl and test.jsonl") + with open(path / 'train.jsonl', 'wb') as dataset: + for chunk in data['train']: + dataset.write(orjson.dumps(chunk, option=orjson.OPT_APPEND_NEWLINE)) + + with open(path / 'test.jsonl', 'wb') as dataset: + for chunk in data['test']: + dataset.write(orjson.dumps(chunk, option=orjson.OPT_APPEND_NEWLINE)) + print("Export finished") + + @app.command() def prepare_messages( tg_history_path: Path = typer.Option(..., help='Path to telegram history json file'), - output_path: Path = typer.Option(..., help='Path to output file'), + output_path: Path = typer.Option(..., help='Path to output directory'), ): - with tg_history_path.open() as messages_file: - messages = json.load(messages_file)['messages'] + print(f"Loading telegram history from {tg_history_path}") + with tg_history_path.open(encoding='utf-8') as messages_file: + data = json.load(messages_file) + messages = data.get('messages', []) + print(f"Loaded {len(messages)} messages") contexts = _create_contexts(messages) contexts = _transform_contexts(contexts) + print(f"Prepared {len(contexts)} contexts") + output_path.mkdir(parents=True, exist_ok=True) contexts_df = pd.DataFrame.from_records(contexts) contexts_df.drop_duplicates(inplace=True) - contexts_df.to_csv(output_path + '/raw.csv', index=False) + csv_path = output_path / 'raw.csv' + contexts_df.to_csv(csv_path, index=False) + print(f"Saved CSV to {csv_path}") + export_data(output_path) def _create_contexts(messages: List[Message]) -> List[Context]: @@ -40,9 +99,9 @@ def _create_contexts(messages: List[Message]) -> List[Context]: for message in messages: if ( - message['type'] != 'message' or - not message['text'] or - not isinstance(message['text'], str) or + message.get('type') != 'message' or + not message.get('text') or + not isinstance(message.get('text'), (str, list)) 
or message['id'] in visited_replies ): continue @@ -58,8 +117,13 @@ def _create_contexts(messages: List[Message]) -> List[Context]: _resolve_thread(contexts, replies_threads, visited_replies, id_to_message, message) continue - if cur_context[-1] and message['from_id'] == cur_context[-1]['from_id']: - contexts[-1][-1]['text'] += '\n' + message["text"] + if cur_context[-1] and message.get('from_id') == cur_context[-1].get('from_id'): + # Объединяем текст сообщений от одного отправителя подряд + if isinstance(cur_context[-1]['text'], list): + # Если предыдущий текст — список, соединяем + cur_context[-1]['text'].extend(message['text'] if isinstance(message['text'], list) else [message['text']]) + else: + cur_context[-1]['text'] += '\n' + (message['text'] if isinstance(message['text'], str) else ''.join(message['text'])) continue cur_context.pop(0) @@ -68,7 +132,6 @@ def _create_contexts(messages: List[Message]) -> List[Context]: return contexts - def _resolve_thread( contexts: List[Context], replies_threads: Dict[int, int], @@ -87,11 +150,9 @@ def _resolve_thread( visited_replies.add(cur_id) cur_id = replies_threads.get(cur_id) - def _transform_contexts(contexts: List[Context]) -> List[Dict[str, Optional[str]]]: return [_transform_context(context) for context in contexts if any(context)] - def _transform_context(context: Context) -> Dict[str, Optional[str]]: return { 'context_3': _transform_message(context[0]), @@ -100,21 +161,22 @@ def _transform_context(context: Context) -> Dict[str, Optional[str]]: 'response': _transform_message(context[3]), } - def _transform_message(message: Optional[Message]) -> Optional[str]: if not message: return None - if isinstance(message['text'], list): - texts = [text['text'] if isinstance(text, dict) else text for text in message['text']] - message['text'] = ''.join(texts) - - return message['text'] - + text = message.get('text') + if isinstance(text, list): + texts = [text_part['text'] if isinstance(text_part, dict) else text_part for 
text_part in text] + text = ''.join(texts) + return text def _create_default_list(message: Optional[Message] = None) -> List[Optional[Message]]: return [None, None, None, message] - -if __name__ == '__main__': +if __name__ == "__main__": + import sys + if len(sys.argv) > 1 and sys.argv[1] == "prepare-messages": + # Убираем первый аргумент, чтобы typer понял команду + sys.argv = sys.argv[1:] app() From 7800be54595e65c53e5931ca77c295fcf2247c89 Mon Sep 17 00:00:00 2001 From: chelovek Date: Wed, 25 Jun 2025 17:52:52 +0300 Subject: [PATCH 2/3] filter --- parser.py | 142 +++++++++++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 83 deletions(-) diff --git a/parser.py b/parser.py index 7d40818..d7027e1 100644 --- a/parser.py +++ b/parser.py @@ -1,33 +1,36 @@ import json +import re +import sys from pathlib import Path -from typing import Dict, Any, List, Optional, Set -from datasets import load_dataset +from typing import Dict, Any, List, Optional + import orjson -import typer import pandas as pd -import re +import typer +from datasets import load_dataset Message = Dict[str, Any] Context = List[Optional[Message]] app = typer.Typer() - BAD_WORDS = {"нежелательное_слово1", "нежелательное_слово2", "нежелательное_слово3"} -#поиск emoji EMOJI_RE = re.compile("[\U0001F600-\U0001F64F" "\U0001F300-\U0001F5FF" "\U0001F680-\U0001F6FF" "\U0001F1E0-\U0001F1FF]+", flags=re.UNICODE) + def contains_emoji(text: str) -> bool: return bool(EMOJI_RE.search(text)) + def contains_bad_words(text: str) -> bool: lowered = text.lower() return any(bad in lowered for bad in BAD_WORDS) + def is_valid(text: Optional[str], min_len: int = 5, max_len: int = 500) -> bool: if not text: return False @@ -39,6 +42,7 @@ def is_valid(text: Optional[str], min_len: int = 5, max_len: int = 500) -> bool: return False return True + def export_data(path: Path): print(f"Loading CSV dataset from {path / 'raw.csv'}") data = load_dataset('csv', data_files={'train': str(path / "raw.csv")}) @@ 
-51,13 +55,13 @@ def is_pair_clean(sample): data = data.filter(is_pair_clean) print("Saving train.jsonl and test.jsonl") - with open(path / 'train.jsonl', 'wb') as dataset: - for chunk in data['train']: - dataset.write(orjson.dumps(chunk, option=orjson.OPT_APPEND_NEWLINE)) + with open(path / 'train.jsonl', 'wb') as train_file: + for item in data['train']: + train_file.write(orjson.dumps(item, option=orjson.OPT_APPEND_NEWLINE)) + with open(path / 'test.jsonl', 'wb') as test_file: + for item in data['test']: + test_file.write(orjson.dumps(item, option=orjson.OPT_APPEND_NEWLINE)) - with open(path / 'test.jsonl', 'wb') as dataset: - for chunk in data['test']: - dataset.write(orjson.dumps(chunk, option=orjson.OPT_APPEND_NEWLINE)) print("Export finished") @@ -67,92 +71,69 @@ def prepare_messages( output_path: Path = typer.Option(..., help='Path to output directory'), ): print(f"Loading telegram history from {tg_history_path}") - with tg_history_path.open(encoding='utf-8') as messages_file: - data = json.load(messages_file) - messages = data.get('messages', []) + with tg_history_path.open(encoding='utf-8') as f: + messages = json.load(f).get("messages", []) + print(f"Loaded {len(messages)} messages") contexts = _create_contexts(messages) - contexts = _transform_contexts(contexts) - print(f"Prepared {len(contexts)} contexts") + transformed = _transform_contexts(contexts) + + print(f"Prepared {len(transformed)} contexts") output_path.mkdir(parents=True, exist_ok=True) - contexts_df = pd.DataFrame.from_records(contexts) - contexts_df.drop_duplicates(inplace=True) - csv_path = output_path / 'raw.csv' - contexts_df.to_csv(csv_path, index=False) - print(f"Saved CSV to {csv_path}") + df = pd.DataFrame.from_records(transformed) + df.drop_duplicates(inplace=True) + csv_path = output_path / "raw.csv" + df.to_csv(csv_path, index=False) + print(f"Saved to {csv_path}") export_data(output_path) + def _create_contexts(messages: List[Message]) -> List[Context]: - replies_threads = {} - 
id_to_message = {} - for message in messages: - id_to_message[message['id']] = message - if 'reply_to_message_id' in message: - replies_threads[message['reply_to_message_id']] = message['id'] - - contexts = [] - cur_context = _create_default_list() - visited_replies = set() - - for message in messages: - if ( - message.get('type') != 'message' or - not message.get('text') or - not isinstance(message.get('text'), (str, list)) or - message['id'] in visited_replies - ): - continue + contexts: List[Context] = [] + current_context: List[Optional[Message]] = [] - if 'forwarded_from' in message and cur_context: - contexts.append(cur_context) - cur_context = _create_default_list() - continue + last_author = None - if message['id'] in replies_threads: - contexts.append(cur_context) - cur_context = _create_default_list() - _resolve_thread(contexts, replies_threads, visited_replies, id_to_message, message) + for msg in messages: + if msg.get("type") != "message": + continue + text = msg.get("text") + if not text or not isinstance(text, (str, list)): continue - if cur_context[-1] and message.get('from_id') == cur_context[-1].get('from_id'): - # Объединяем текст сообщений от одного отправителя подряд - if isinstance(cur_context[-1]['text'], list): - # Если предыдущий текст — список, соединяем - cur_context[-1]['text'].extend(message['text'] if isinstance(message['text'], list) else [message['text']]) + # Объединяем сообщения одного автора + if last_author == msg.get("from_id") and current_context: + if isinstance(current_context[-1]['text'], list): + if isinstance(text, list): + current_context[-1]['text'].extend(text) + else: + current_context[-1]['text'].append(text) else: - cur_context[-1]['text'] += '\n' + (message['text'] if isinstance(message['text'], str) else ''.join(message['text'])) + if isinstance(text, list): + combined = ''.join(t['text'] if isinstance(t, dict) else t for t in text) + else: + combined = text + current_context[-1]['text'] += '\n' + combined continue 
- cur_context.pop(0) - cur_context.append(message) - contexts.append(cur_context.copy()) + # Новый автор — добавляем как новый шаг контекста + current_context.append(msg) + last_author = msg.get("from_id") - return contexts - -def _resolve_thread( - contexts: List[Context], - replies_threads: Dict[int, int], - visited_replies: Set[int], - id_to_message: Dict[int, Message], - message: Message, -) -> None: - cur_context = _create_default_list() - cur_id = message['id'] + if len(current_context) == 4: + contexts.append(current_context.copy()) + current_context.pop(0) - while cur_id: - cur_context.pop(0) - cur_context.append(id_to_message[cur_id]) - contexts.append(cur_context.copy()) + return contexts - visited_replies.add(cur_id) - cur_id = replies_threads.get(cur_id) def _transform_contexts(contexts: List[Context]) -> List[Dict[str, Optional[str]]]: return [_transform_context(context) for context in contexts if any(context)] + def _transform_context(context: Context) -> Dict[str, Optional[str]]: return { 'context_3': _transform_message(context[0]), @@ -161,22 +142,17 @@ def _transform_context(context: Context) -> Dict[str, Optional[str]]: 'response': _transform_message(context[3]), } + def _transform_message(message: Optional[Message]) -> Optional[str]: if not message: return None - - text = message.get('text') + text = message.get("text") if isinstance(text, list): - texts = [text_part['text'] if isinstance(text_part, dict) else text_part for text_part in text] - text = ''.join(texts) + return ''.join(t["text"] if isinstance(t, dict) else t for t in text) return text -def _create_default_list(message: Optional[Message] = None) -> List[Optional[Message]]: - return [None, None, None, message] if __name__ == "__main__": - import sys if len(sys.argv) > 1 and sys.argv[1] == "prepare-messages": - # Убираем первый аргумент, чтобы typer понял команду sys.argv = sys.argv[1:] app() From 81f213a2afbb0e1a9d2f1383ae5f1b87bb8d005e Mon Sep 17 00:00:00 2001 From: MarkProMaster229 
<164912605+MarkProMaster229@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:54:26 +0300 Subject: [PATCH 3/3] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 08a1fd9..e283c4f 100644 --- a/README.md +++ b/README.md @@ -18,3 +18,4 @@ python parser.py --tg_history_path /path/to/history/file.json --output_path /pat 1. raw.csv - файл с неочищенными данными 2. train.jsonl и test.jsonl - данные готовые для дальнейшей обработки ``` +P.S добавлен фильтр контента и сортировка диалога