This repository was archived by the owner on Feb 20, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprefix.py
More file actions
341 lines (304 loc) · 14.1 KB
/
prefix.py
File metadata and controls
341 lines (304 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""Prefix entry point and REPL wiring."""
from __future__ import annotations
import argparse
import sys
import os
from typing import List, Optional
import threading
from extensions import PrefixExtensionError, ReplContext, load_runtime_services, RuntimeServices
from interpreter import PrefixRuntimeError, Environment, ExitSignal, Interpreter, TracebackFormatter
from lexer import PrefixParseError, Lexer
from parser import Parser, Statement
import tkinter # needed for the gui extension (ext\gui.py) to work for the frozen executable
def _print_error(msg: str) -> None:
"""Print an error message to stderr prefixed with U+0007 (BEL)."""
print("\u0007" + str(msg), file=sys.stderr)
def _print_internal_error(*, exc: BaseException, interpreter: Optional[Interpreter] = None, verbose: bool = False, traceback_json: bool = False) -> int:
"""Last-resort error handler.
Requirement: no Prefix error should ever surface as a Python traceback.
"""
# Let deliberate exits behave normally.
if isinstance(exc, SystemExit):
raise exc
if isinstance(exc, KeyboardInterrupt):
print("Interrupted", file=sys.stderr)
return 130
message = f"InternalError: {exc.__class__.__name__}: {exc}"
if interpreter is not None:
try:
location = None
if interpreter.logger.entries:
location = interpreter.logger.entries[-1].source_location
error = PrefixRuntimeError(message, location=location, rewrite_rule="INTERNAL")
if interpreter.logger.entries:
error.step_index = interpreter.logger.entries[-1].step_index
formatter = TracebackFormatter(interpreter)
_print_error(formatter.format_text(error, verbose=verbose))
if traceback_json:
_print_error(formatter.to_json(error))
return 1
except Exception:
# If formatting itself fails, fall back to a simple one-liner.
pass
_print_error(message)
return 1
def _parse_statements_from_source(text: str, filename: str, *, type_names: Optional[set[str]] = None) -> List[Statement]:
lexer = Lexer(text, filename)
tokens = lexer.tokenize()
parser = Parser(tokens, filename, text.splitlines(), type_names=type_names)
program = parser.parse()
return program.statements
def run_repl(*, verbose: bool, services: Optional[RuntimeServices]) -> int:
print("\x1b[38;2;153;221;255mPrefix\033[0m REPL. Enter statements, blank line to run buffer.") # "Prefix" in light blue
# Use "<repl>" as the REPL's effective source filename so that MAIN() and imports behave
had_output = False
# Tracks whether any PRINT has occurred since the last REPL input.
first_print_since_input = True
main_thread_id = threading.get_ident()
def _output_sink(text: str) -> None:
nonlocal had_output, first_print_since_input
# If this is not the first PRINT since the last REPL input,
# emit a leading newline so outputs appear on separate lines.
if threading.get_ident() != main_thread_id:
# Background-thread output: ensure it appears on its own line
# to avoid being glued to the prompt. Use a normal print so a
# trailing newline is emitted.
try:
# If not first print since input, keep the separation.
if not first_print_since_input:
print()
first_print_since_input = False
had_output = True
print(text)
except Exception:
# Best-effort: swallow to avoid crashing the REPL.
pass
return
# Main-thread REPL output: keep original inline behaviour so the
# REPL can control newlines and prompt placement.
if not first_print_since_input:
print()
first_print_since_input = False
had_output = True
# Print the text without appending a trailing newline.
print(text, end="", flush=True)
picked = services.hook_registry.pick_repl() if services is not None else None
if picked is not None:
_name, runner, _ext = picked
assert services is not None
ctx = ReplContext(
verbose=verbose,
services=services,
make_interpreter=lambda source, filename: Interpreter(
source=source,
filename=filename,
verbose=verbose,
services=services,
input_provider=(lambda: input()),
output_sink=_output_sink,
),
)
try:
return runner(ctx)
except PrefixExtensionError as exc:
_print_error(f"ExtensionError: {exc}")
return 1
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=None, verbose=verbose)
try:
interpreter = Interpreter(
source="",
filename="<repl>",
verbose=verbose,
services=services,
input_provider=(lambda: input()),
output_sink=_output_sink,
)
except PrefixExtensionError as exc:
_print_error(f"ExtensionError: {exc}")
return 1
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=None, verbose=verbose)
global_env = Environment()
# Make the REPL top-level frame mimic script top-level frame
global_frame = interpreter._new_frame("<top-level>", global_env, None)
interpreter.call_stack.append(global_frame)
buffer: List[str] = []
while True:
prompt = "\x1b[38;2;153;221;255m>>>\033[0m " if not buffer else "\x1b[38;2;153;221;255m..>\033[0m " # light blue
if had_output:
# Ensure prompt starts on a fresh line if the program printed anything
print()
had_output = False
try:
line = input(prompt)
except EOFError:
print()
break
# Reset per-input PRINT tracking so the next PRINT is treated as
# the first since this REPL input.
first_print_since_input = True
stripped = line.strip()
is_block_start = False
if not buffer:
uc = stripped.upper()
if uc.startswith("FUNC") or uc.startswith("IF") or uc.startswith("WHILE") or uc.startswith("FOR") or uc.startswith("PARFOR"):
is_block_start = True
if stripped.endswith("{"):
is_block_start = True
if not buffer and stripped != "" and not is_block_start:
try:
statements = _parse_statements_from_source(line, "<repl>", type_names=interpreter.type_registry.names())
try:
interpreter._execute_block(statements, global_env)
except ExitSignal as sig:
return sig.code
except PrefixRuntimeError as error:
if interpreter.logger.entries:
error.step_index = interpreter.logger.entries[-1].step_index
formatter = TracebackFormatter(interpreter)
_print_error(formatter.format_text(error, verbose=interpreter.verbose))
interpreter.call_stack = [global_frame]
except PrefixParseError:
# If a single-line parse fails, treat it as start of multi-line input
buffer.append(line)
except BaseException as exc:
_print_internal_error(exc=exc, interpreter=interpreter, verbose=verbose)
continue
if stripped == "" and buffer:
source_text = "\n".join(buffer)
buffer.clear()
try:
statements = _parse_statements_from_source(source_text, "<repl>", type_names=interpreter.type_registry.names())
interpreter._execute_block(statements, global_env)
except ExitSignal as sig:
return sig.code
except PrefixParseError as error:
_print_error(f"ParseError: {error}")
except PrefixRuntimeError as error:
if interpreter.logger.entries:
error.step_index = interpreter.logger.entries[-1].step_index
formatter = TracebackFormatter(interpreter)
_print_error(formatter.format_text(error, verbose=interpreter.verbose))
# reset call stack to single top-level frame to keep REPL usable
interpreter.call_stack = [global_frame]
except BaseException as exc:
_print_internal_error(exc=exc, interpreter=interpreter, verbose=verbose)
interpreter.call_stack = [global_frame]
continue
buffer.append(line)
return 0
def run_cli(argv: Optional[List[str]] = None) -> int:
try:
parser = argparse.ArgumentParser(description="Prefix reference interpreter")
parser.add_argument("inputs", nargs="*", help="Program path/source and/or extension files (.py/.prex)")
parser.add_argument("--ext", action="append", default=[], help="Extension path (.py) or pointer file (.prex)")
parser.add_argument("-source", "--source", dest="source_mode", action="store_true", help="Treat program argument as literal source text")
parser.add_argument("-verbose", "--verbose", dest="verbose", action="store_true", help="Emit env snapshots in tracebacks")
parser.add_argument("--traceback-json", action="store_true", help="Also emit JSON traceback")
args = parser.parse_args(argv)
except SystemExit:
# argparse uses SystemExit for -h and parse failures; preserve behavior.
raise
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=None)
inputs: List[str] = list(args.inputs or [])
ext_paths: List[str] = list(args.ext or [])
remaining: List[str] = []
for item in inputs:
lower = item.lower()
if lower.endswith(".py") or lower.endswith(".prex"):
ext_paths.append(item)
else:
remaining.append(item)
# Initialize `program` early so subsequent checks can reference it safely.
program: Optional[str] = remaining[0] if remaining else None
# If the caller didn't specify any extensions, look for a pointer file named
# ".prex" in the current working directory or (when a program file was
# provided) in the program's directory. If found, use it as the extension
# pointer file so the interpreter loads the extensions it points to.
if not ext_paths:
cwd_prex = os.path.abspath(".prex")
if os.path.exists(cwd_prex):
ext_paths.append(cwd_prex)
else:
# If a program path was given (and isn't literal source text),
# also check the program's directory for a .prex pointer file.
if program and not args.source_mode:
program_dir = os.path.dirname(os.path.abspath(program))
program_prex = os.path.join(program_dir, ".prex")
if os.path.exists(program_prex):
ext_paths.append(program_prex)
else:
# Also accept a pointer file that shares the program's
# basename but uses the .prex extension instead of the
# program extension (e.g. program.pre -> program.prex).
program_alt_prex = os.path.splitext(os.path.abspath(program))[0] + ".prex"
if os.path.exists(program_alt_prex):
ext_paths.append(program_alt_prex)
try:
services = load_runtime_services(ext_paths) if ext_paths else load_runtime_services([])
except PrefixExtensionError as exc:
_print_error(f"ExtensionError: {exc}")
return 1
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=None, verbose=bool(getattr(args, "verbose", False)))
program = None
if remaining:
if len(remaining) > 1:
_print_error("Too many non-extension inputs; expected a single program argument")
return 1
program = remaining[0]
if program is None:
if args.source_mode and not ext_paths:
_print_error("-source requires a program string")
return 1
# If only extensions are present, run REPL with the loaded extensions.
return run_repl(verbose=args.verbose, services=services)
if args.source_mode:
source_text = program
filename = "<repl>"
else:
filename = program
try:
with open(filename, "r", encoding="utf-8") as handle:
source_text = handle.read()
except OSError as exc:
_print_error(f"Failed to read {filename}: {exc}")
return 1
interpreter: Optional[Interpreter] = None
try:
interpreter = Interpreter(source=source_text, filename=filename, verbose=args.verbose, services=services)
except PrefixExtensionError as exc:
_print_error(f"ExtensionError: {exc}")
return 1
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=None, verbose=args.verbose, traceback_json=args.traceback_json)
try:
interpreter.run()
except PrefixParseError as error:
_print_error(f"ParseError: {error}")
return 1
except ExitSignal as sig:
return sig.code
except PrefixRuntimeError as error:
formatter = TracebackFormatter(interpreter)
_print_error(formatter.format_text(error, verbose=args.verbose))
if args.traceback_json:
_print_error(formatter.to_json(error))
return 1
except BaseException as exc:
return _print_internal_error(exc=exc, interpreter=interpreter, verbose=args.verbose, traceback_json=args.traceback_json)
return 0
if __name__ == "__main__":
try:
raise SystemExit(run_cli())
except KeyboardInterrupt:
print("Interrupted", file=sys.stderr)
raise SystemExit(130)
except SystemExit:
raise
except BaseException as exc:
# Absolute last-resort catch: never print a Python traceback.
code = _print_internal_error(exc=exc, interpreter=None)
raise SystemExit(code)