diff --git a/fileglancer/app.py b/fileglancer/app.py index 1de9941a..98321fa5 100644 --- a/fileglancer/app.py +++ b/fileglancer/app.py @@ -33,7 +33,7 @@ from fileglancer.issues import create_jira_ticket, get_jira_ticket_details, delete_jira_ticket from fileglancer.utils import format_timestamp, guess_content_type, parse_range_header from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext, UserContextConfigurationError -from fileglancer.filestore import Filestore +from fileglancer.filestore import Filestore, RootCheckError from fileglancer.log import AccessLogMiddleware from x2s3.utils import get_read_access_acl, get_nosuchbucket_response, get_error_response @@ -937,6 +937,30 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s full_path = filestore._check_path_in_root(subpath) file_handle = open(full_path, 'rb') + except RootCheckError as e: + # Path attempts to escape root directory - try to find a valid fsp for this absolute path + logger.info(f"RootCheckError caught for {filestore_name}/{subpath}: {e}") + + # Use the full_path from the exception + full_path = e.full_path + + with db.get_db_session(settings.db_url) as session: + match = db.find_fsp_from_absolute_path(session, full_path) + + if match: + fsp, relative_subpath = match + # Construct the correct URL + if relative_subpath: + redirect_url = f"/api/content/{fsp.name}?subpath={relative_subpath}" + else: + redirect_url = f"/api/content/{fsp.name}" + + logger.info(f"Redirecting from /api/content/{filestore_name}?subpath={subpath} to {redirect_url}") + return RedirectResponse(url=redirect_url, status_code=307) + + # If no match found, return the original error message + logger.error(f"No valid file share found for path: {full_path}") + raise HTTPException(status_code=400, detail=str(e)) except FileNotFoundError: logger.error(f"File not found in {filestore_name}: {subpath}") raise HTTPException(status_code=404, detail="File or directory not 
def find_fsp_from_absolute_path(session: Session, absolute_path: str) -> Optional[tuple[FileSharePath, str]]:
    """
    Find the file share path whose mount path contains the given absolute path.

    Both the input path and each mount path are normalized with
    ``os.path.normpath``; mount paths additionally have ``~`` expanded. A share
    matches when the path equals its mount path, or lies below it on a
    directory boundary (so ``/tmp/test2`` does not match a mount at
    ``/tmp/test``). Shares are examined in the order returned by
    ``get_file_share_paths`` and the first match wins.

    Args:
        session: Database session
        absolute_path: Absolute file path to match against file shares

    Returns:
        Tuple of (FileSharePath, relative_subpath) for the first matching share,
        where relative_subpath is "" when the path is the mount path itself.
        None if no share contains the path.
    """
    normalized_path = os.path.normpath(absolute_path)

    for fsp in get_file_share_paths(session):
        # Expand ~ to the user's home directory before matching
        mount_path = os.path.normpath(os.path.expanduser(fsp.mount_path))

        if normalized_path == mount_path:
            subpath = ""
        else:
            # Compare against a separator-terminated prefix so that matches only
            # occur on a directory boundary. A mount path that already ends with
            # a separator (e.g. the filesystem root "/") must not get a second
            # one, otherwise nothing below a root mount could ever match.
            prefix = mount_path if mount_path.endswith(os.sep) else mount_path + os.sep
            if not normalized_path.startswith(prefix):
                continue
            subpath = normalized_path[len(prefix):]

        logger.debug(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
        return (fsp, subpath)

    return None
+ """ + def __init__(self, message: str, full_path: str): + super().__init__(message) + self.full_path = full_path + class FileInfo(BaseModel): """ A class that represents a file or directory in a Filestore. @@ -163,7 +174,7 @@ def _check_path_in_root(self, path: Optional[str]) -> str: str: The full path to the file or directory. Raises: - ValueError: If path attempts to escape root directory + RootCheckError: If path attempts to escape root directory """ if path is None or path == "": full_path = self.root_path @@ -174,7 +185,7 @@ def _check_path_in_root(self, path: Optional[str]) -> str: # Ensure the resolved path is within the resolved root if not full_path.startswith(root_real + os.sep) and full_path != root_real: - raise ValueError(f"Path ({full_path}) attempts to escape root directory ({root_real})") + raise RootCheckError(f"Path ({full_path}) attempts to escape root directory ({root_real})", full_path) return full_path diff --git a/tests/test_database.py b/tests/test_database.py index bb238612..ef5e021b 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -228,3 +228,156 @@ def test_create_proxied_path_with_home_dir(db_session, temp_dir): if os.path.exists(test_path): os.rmdir(test_path) + +def test_find_fsp_from_absolute_path_exact_match(db_session, temp_dir): + """Test finding FSP from absolute path with exact match""" + # Create a file share path + fsp = FileSharePathDB( + name="test_mount", + zone="testzone", + group="testgroup", + storage="local", + mount_path=temp_dir, + mac_path=temp_dir, + windows_path=temp_dir, + linux_path=temp_dir + ) + db_session.add(fsp) + db_session.commit() + + # Test exact match at mount root + result = find_fsp_from_absolute_path(db_session, temp_dir) + assert result is not None + assert result[0].name == "test_mount" + assert result[1] == "" + + # Test with subdirectory + subdir = os.path.join(temp_dir, "subdir") + os.makedirs(subdir, exist_ok=True) + result = find_fsp_from_absolute_path(db_session, subdir) + 
def test_find_fsp_from_absolute_path_with_home_dir(db_session):
    """A share mounted at "~/" is matched against the expanded home directory.

    Checks both the home directory itself (empty subpath) and a child of it.
    """
    tilde_mount = "~/"
    db_session.add(
        FileSharePathDB(
            name="home",
            zone="testzone",
            group="testgroup",
            storage="home",
            mount_path=tilde_mount,
            mac_path=tilde_mount,
            windows_path=tilde_mount,
            linux_path=tilde_mount,
        )
    )
    db_session.commit()

    expanded_home = os.path.expanduser(tilde_mount)
    child_name = "test_subdir"

    # (absolute path to look up, subpath expected relative to the home share)
    lookups = (
        (expanded_home, ""),
        (os.path.join(expanded_home, child_name), child_name),
    )
    for absolute_path, expected_subpath in lookups:
        match = find_fsp_from_absolute_path(db_session, absolute_path)
        assert match is not None
        assert match[0].name == "home"
        assert match[1] == expected_subpath
def test_find_fsp_from_absolute_path_boundary_check(db_session, temp_dir):
    """Test that a path sharing only a string prefix with a mount does not match it."""
    # Register a single file share rooted at temp_dir
    fsp = FileSharePathDB(
        name="test_mount",
        zone="testzone",
        group="testgroup",
        storage="local",
        mount_path=temp_dir,
        mac_path=temp_dir,
        windows_path=temp_dir,
        linux_path=temp_dir
    )
    db_session.add(fsp)
    db_session.commit()

    # A sibling whose name merely extends the mount path as a string, e.g.
    # "/tmp/test2" for a mount at "/tmp/test". The lookup is purely
    # string-based, so the sibling does not need to exist on disk.
    similar_path = temp_dir + "2"

    result = find_fsp_from_absolute_path(db_session, similar_path)
    # The sibling is not inside temp_dir, so it must not resolve to that mount.
    assert result is None or result[0].mount_path != temp_dir
def test_get_files_with_symlink_to_same_fsp(test_client, temp_dir):
    """Listing /api/files through a symlink that stays inside the share succeeds."""
    # Real directory inside the share, holding one file
    real_dir = os.path.join(temp_dir, "target_directory")
    os.makedirs(real_dir, exist_ok=True)
    with open(os.path.join(real_dir, "target_file.txt"), "w") as handle:
        handle.write("content in target")

    # Symlink inside the same share that points at the real directory
    os.symlink(real_dir, os.path.join(temp_dir, "link_to_target"))

    listing = test_client.get("/api/files/tempdir?subpath=link_to_target")
    assert listing.status_code == 200

    payload = listing.json()
    assert "files" in payload

    # The file in the real directory must be visible via the symlink
    listed_names = {entry["name"] for entry in payload["files"]}
    assert "target_file.txt" in listed_names
def test_get_files_with_nested_symlink_outside_fsp(test_client, temp_dir):
    """Listing via a symlink into a subdirectory of another share redirects there.

    A second share named "external" is registered for a fresh temporary
    directory, and a symlink in the original share points at a subdirectory of
    it. The unfollowed request must answer 307 with a location naming the
    "external" share and the subdirectory; the followed request must list the
    subdirectory's file.
    """
    outside_root = tempfile.mkdtemp()

    try:
        # Subdirectory of the external share, holding one file
        nested_dir = os.path.join(outside_root, "subdir")
        os.makedirs(nested_dir, exist_ok=True)
        with open(os.path.join(nested_dir, "external_file.txt"), "w") as handle:
            handle.write("external nested content")

        # Register the external directory as its own file share
        from fileglancer.database import FileSharePathDB, get_db_session
        from fileglancer.settings import get_settings

        with get_db_session(get_settings().db_url) as session:
            session.add(
                FileSharePathDB(
                    name="external",
                    zone="testzone",
                    group="testgroup",
                    storage="local",
                    mount_path=outside_root,
                    mac_path=outside_root,
                    windows_path=outside_root,
                    linux_path=outside_root,
                )
            )
            session.commit()

        # Symlink inside the original share that escapes to the external subdirectory
        os.symlink(nested_dir, os.path.join(temp_dir, "link_to_external_subdir"))

        request_url = "/api/files/tempdir?subpath=link_to_external_subdir"

        # Without following redirects we see the 307 and its target
        redirect = test_client.get(request_url, follow_redirects=False)
        assert redirect.status_code == 307
        assert "location" in redirect.headers
        assert redirect.headers["location"] == "/api/files/external?subpath=subdir"

        # Following the redirect yields the external subdirectory listing
        followed = test_client.get(request_url, follow_redirects=True)
        assert followed.status_code == 200
        payload = followed.json()
        assert "files" in payload
        listed_names = [entry["name"] for entry in payload["files"]]
        assert "external_file.txt" in listed_names

    finally:
        # Remove the external directory regardless of the outcome
        shutil.rmtree(outside_root)
def test_get_content_with_symlink_to_same_fsp(test_client, temp_dir):
    """Fetching /api/content through a symlink inside the share returns the target's text."""
    expected_text = "This is the target file content"

    # Real file inside the share
    real_file = os.path.join(temp_dir, "target_content.txt")
    with open(real_file, "w") as handle:
        handle.write(expected_text)

    # Symlink in the same share pointing at the real file
    os.symlink(real_file, os.path.join(temp_dir, "link_to_content"))

    reply = test_client.get("/api/content/tempdir?subpath=link_to_content")
    assert reply.status_code == 200
    assert reply.text == expected_text
get_settings() + + with get_db_session(settings.db_url) as session: + external_fsp = FileSharePathDB( + name="external", + zone="testzone", + group="testgroup", + storage="local", + mount_path=external_dir, + mac_path=external_dir, + windows_path=external_dir, + linux_path=external_dir + ) + session.add(external_fsp) + session.commit() + + # Create a symlink in the original FSP pointing to the external file + symlink_path = os.path.join(temp_dir, "link_to_external_content") + os.symlink(external_file, symlink_path) + + # Request content through the symlink - should get a redirect (307) that gets followed + response = test_client.get("/api/content/tempdir?subpath=link_to_external_content", follow_redirects=False) + assert response.status_code == 307 + + # Verify redirect location + assert "location" in response.headers + expected_location = "/api/content/external?subpath=external_content.txt" + assert response.headers["location"] == expected_location + + # Follow the redirect and verify we get the external file content + response_followed = test_client.get("/api/content/tempdir?subpath=link_to_external_content", follow_redirects=True) + assert response_followed.status_code == 200 + assert response_followed.text == external_content + + finally: + # Clean up external directory + shutil.rmtree(external_dir) + + +def test_get_content_with_symlink_no_matching_fsp(test_client, temp_dir): + """Test /api/content endpoint with a symlink pointing to a path with no matching FSP""" + # Create a separate directory outside the temp_dir + external_dir = tempfile.mkdtemp() + + try: + # Create a file in the external directory + external_file = os.path.join(external_dir, "orphan_content.txt") + with open(external_file, "w") as f: + f.write("orphan content") + + # Create a symlink in the original FSP pointing to the external file + # But DON'T create an FSP for it + symlink_path = os.path.join(temp_dir, "link_to_orphan_content") + os.symlink(external_file, symlink_path) + + # Request 
def test_head_content_with_symlink(test_client, temp_dir):
    """HEAD on /api/content through an in-share symlink reports range support and size."""
    body_text = "Content for HEAD request"

    # Real file inside the share
    real_file = os.path.join(temp_dir, "target_head.txt")
    with open(real_file, "w") as handle:
        handle.write(body_text)

    # Symlink in the same share pointing at the real file
    os.symlink(real_file, os.path.join(temp_dir, "link_to_head"))

    reply = test_client.head("/api/content/tempdir?subpath=link_to_head")
    assert reply.status_code == 200

    headers = reply.headers
    assert "Accept-Ranges" in headers
    assert headers["Accept-Ranges"] == "bytes"
    assert "Content-Length" in headers
    # The reported length must equal the length of the text written above
    assert int(headers["Content-Length"]) == len(body_text)