diff --git a/.azure/functional-test-pipeline.yml b/.azure/functional-test-pipeline.yml index 5464118c8..c67fb8243 100644 --- a/.azure/functional-test-pipeline.yml +++ b/.azure/functional-test-pipeline.yml @@ -38,8 +38,8 @@ jobs: # Matrix. strategy: matrix: - Mac: - test_os: Darwin + # Mac: + # test_os: Darwin Linux: test_os: Linux Win: @@ -77,15 +77,15 @@ jobs: condition: and(eq(variables['agent.os'], 'Windows_NT'), succeeded()) # Download pyocd.yaml config file into test directory. - - task: UniversalPackages@0 + - task: DownloadPackage@1 name: install_test_config inputs: - command: download - vstsFeed: 'pyocd/config' - vstsFeedPackage: 'pyocd-test-config' - vstsPackageVersion: '*' - downloadDirectory: 'test' - verbosity: 'debug' + packageType: 'upack' + feed: 'pyocd/config' + definition: 'pyocd-test-config' + version: '*' + downloadPath: 'test' + files: '**' displayName: 'Install test config' # Linux/Mac: Activate venv and run automated test suite. diff --git a/pyocd/debug/sequences/functions.py b/pyocd/debug/sequences/functions.py index 32946385e..d2b7a5131 100644 --- a/pyocd/debug/sequences/functions.py +++ b/pyocd/debug/sequences/functions.py @@ -19,7 +19,9 @@ import logging import os +import re import shlex +import shutil import subprocess import threading from dataclasses import dataclass @@ -162,16 +164,14 @@ def _write_value(self, ap: MEM_AP, addr: int, size: int, value: int) -> None: def _int_to_bytes(value: int, size: int) -> bytes: return int(value & ((1 << (size * 8)) - 1)).to_bytes(size, "little") - def _expand_path(self, raw_path: str) -> Path: - """@brief Expand a path string from a debug sequence, replacing custom placeholders and environment variables.""" - # Lazy-initialize and cache placeholders since they don't change during a session + def _expand_placeholders(self, raw_path: str) -> str: + """@brief Expand debug-sequence placeholders in a string.""" + # Lazy-initialize and cache placeholders since they don't change during a session. if self._placeholders_cache is None: device = self.context.delegate.cmsis_pack_device pname = self.context.pname - if hasattr(self.target, 'get_output'): - output = self.target.get_output() - else: - output = {} + get_output = getattr(self.target, 'get_output', None) + output = get_output() if get_output else {} out_file_path = next((f for f, (_, _, p) in output.items() if (pname is None) or (pname == p)), '') out_folder_path = str(Path(out_file_path).parent) if out_file_path else '' @@ -185,17 +185,30 @@ def _expand_path(self, raw_path: str) -> Path: '$D': getattr(device, 'part_number', ''), } - # Unescape double characters - path_str = raw_path.replace("$$", "$").replace("##", "#").replace("%%", "%") + assert self._placeholders_cache is not None + placeholders = self._placeholders_cache + + # Expand placeholders and escaped marker characters in a single pass so replacement + # values are not recursively re-processed as placeholders. + tokens = re.compile(r'(\$\$|##|%%|\$P|#P|\$L|%L|\$S|\$D)') - # Replace custom placeholders using cached dictionary - for placeholder, value in self._placeholders_cache.items(): - path_str = path_str.replace(placeholder, str(value)) + def _replace_token(match: "re.Match[str]") -> str: + token = match.group(0) + if token == "$$": + return "$" + if token == "##": + return "#" + if token == "%%": + return "%" + return str(placeholders.get(token, token)) - # Expand environment variables - path_str = path_str.lstrip("\\/") - path_str = os.path.expandvars(path_str) - return Path(path_str).expanduser() + path_str = tokens.sub(_replace_token, raw_path) + + return path_str + + def _expand_path(self, raw_path: str) -> Path: + """@brief Expand a path string from a debug sequence, replacing custom placeholders and environment variables.""" + return Path(os.path.expanduser(os.path.expandvars(self._expand_placeholders(raw_path)))) def _get_ap_addr(self) -> APAddressBase: """@brief Return the AP address selected by __ap or __apid variables. @@ -854,7 +867,7 @@ def runapplication(self, path: str, args: str, workdir: str, timeout: int) -> in timeout_s = None if timeout == 0 else timeout / 1e6 split_args = shlex.split(args.replace('\\"', '"'), posix=True) if args else [] - split_args = [str(self._expand_path(arg)) for arg in split_args] + split_args = [self._expand_placeholders(arg) for arg in split_args] cmd = [str(app_path), *split_args] try: @@ -866,6 +879,44 @@ def runapplication(self, path: str, args: str, workdir: str, timeout: int) -> in except subprocess.TimeoutExpired as err: raise DebugSequenceRuntimeError(f"application '{app_path}' timed out after {timeout_s}s") from err + def runpythonscript(self, path: str, args: str, workdir: str, timeout: int) -> int: + """@brief Run a Python script with an external Python interpreter. + + @param path: Constant string representing the script path. Refer to character sequences + for path/file name place holders. + @param args: Constant string with arguments to pass to the script command line. + Same placeholder character sequences apply as for path. + @param workdir: A constant string with the work directory for running the script. + An empty string means that the work directory is the current project folder. + Same placeholder character sequences apply as for path. + @param timeout: Timeout in microseconds. + @return: Script specific exit code. + """ + script_path = self._expand_path(path) + cwd = None if workdir == "" else str(self._expand_path(workdir)) + timeout_s = None if timeout == 0 else timeout / 1e6 + + split_args = shlex.split(args.replace('\\"', '"'), posix=True) if args else [] + split_args = [self._expand_placeholders(arg) for arg in split_args] + + interpreter_path = shutil.which("python3") or shutil.which("python") + if interpreter_path is None: + raise DebugSequenceRuntimeError("could not locate system Python interpreter;" \ + "ensure 'python3'/'python' is available in PATH") + + cmd = [interpreter_path, str(script_path), *split_args] + + try: + result = subprocess.run(cmd, cwd=cwd, timeout=timeout_s, check=False, capture_output=True) + LOG.debug("RunPython stdout:\n%s", result.stdout.decode(errors='replace')) + return int(result.returncode) + except FileNotFoundError as err: + raise DebugSequenceRuntimeError( + f"failed to run python script '{script_path}' with interpreter '{interpreter_path}': {err}") from err + except subprocess.TimeoutExpired as err: + raise DebugSequenceRuntimeError( + f"python script '{script_path}' timed out after {timeout_s}s") from err + def filepathexists(self, path: str, timeout: int) -> int: """@brief Check for existence of a file path.