Skip to content
49 changes: 48 additions & 1 deletion fileglancer/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from fileglancer.issues import create_jira_ticket, get_jira_ticket_details, delete_jira_ticket
from fileglancer.utils import format_timestamp, guess_content_type, parse_range_header
from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext, UserContextConfigurationError
from fileglancer.filestore import Filestore
from fileglancer.filestore import Filestore, RootCheckError
from fileglancer.log import AccessLogMiddleware

from x2s3.utils import get_read_access_acl, get_nosuchbucket_response, get_error_response
Expand Down Expand Up @@ -937,6 +937,30 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s
full_path = filestore._check_path_in_root(subpath)
file_handle = open(full_path, 'rb')

except RootCheckError as e:
# Path attempts to escape root directory - try to find a valid fsp for this absolute path
logger.info(f"RootCheckError caught for {filestore_name}/{subpath}: {e}")

# Use the full_path from the exception
full_path = e.full_path

with db.get_db_session(settings.db_url) as session:
match = db.find_fsp_from_absolute_path(session, full_path)

if match:
fsp, relative_subpath = match
# Construct the correct URL
if relative_subpath:
redirect_url = f"/api/content/{fsp.name}?subpath={relative_subpath}"
else:
redirect_url = f"/api/content/{fsp.name}"

logger.info(f"Redirecting from /api/content/{filestore_name}?subpath={subpath} to {redirect_url}")
return RedirectResponse(url=redirect_url, status_code=307)

# If no match found, return the original error message
logger.error(f"No valid file share found for path: {full_path}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError:
logger.error(f"File not found in {filestore_name}: {subpath}")
raise HTTPException(status_code=404, detail="File or directory not found")
Expand Down Expand Up @@ -1030,6 +1054,29 @@ async def get_file_metadata(path_name: str, subpath: Optional[str] = Query(''),

return result

except RootCheckError as e:
# Path attempts to escape root directory - try to find a valid fsp for this absolute path
logger.info(f"RootCheckError caught for {filestore_name}/{subpath}: {e}")

full_path = e.full_path

with db.get_db_session(settings.db_url) as session:
match = db.find_fsp_from_absolute_path(session, full_path)

if match:
fsp, relative_subpath = match
# Construct the correct URL
if relative_subpath:
redirect_url = f"/api/files/{fsp.name}?subpath={relative_subpath}"
else:
redirect_url = f"/api/files/{fsp.name}"

logger.info(f"Redirecting from /api/files/{filestore_name}?subpath={subpath} to {redirect_url}")
return RedirectResponse(url=redirect_url, status_code=307)

# If no match found, return the original error message
logger.error(f"No valid file share found for path: {full_path}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError:
logger.error(f"File or directory not found: {subpath}")
raise HTTPException(status_code=404, detail="File or directory not found")
Expand Down
43 changes: 43 additions & 0 deletions fileglancer/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,49 @@ def _clear_sharing_key_cache():
logger.debug(f"Cleared entire sharing key cache, removed {old_size} entries")


def find_fsp_from_absolute_path(session: Session, absolute_path: str) -> Optional[tuple[FileSharePath, str]]:
"""
Find the file share path that exactly matches the given absolute path.

This function iterates through all file share paths and checks if the absolute
path exists within any of them. Returns the first exact match found.

Args:
session: Database session
absolute_path: Absolute file path to match against file shares

Returns:
Tuple of (FileSharePath, relative_subpath) if an exact match is found, None otherwise
"""
# Normalize the input path
normalized_path = os.path.normpath(absolute_path)

# Get all file share paths
paths = get_file_share_paths(session)

for fsp in paths:
# Expand ~ to user's home directory before matching
expanded_mount_path = os.path.expanduser(fsp.mount_path)
expanded_mount_path = os.path.normpath(expanded_mount_path)

# Check if the normalized path starts with this mount path
if normalized_path.startswith(expanded_mount_path):
# Calculate the relative subpath
if normalized_path == expanded_mount_path:
subpath = ""
logger.debug(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
return (fsp, subpath)
else:
# Ensure we're matching on a directory boundary
remainder = normalized_path[len(expanded_mount_path):]
if remainder.startswith(os.sep):
subpath = remainder.lstrip(os.sep)
logger.debug(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
return (fsp, subpath)

return None


def _validate_proxied_path(session: Session, fsp_name: str, path: str) -> None:
"""Validate a proxied path exists and is accessible"""
# Get mount path - check database first using existing session, then check local mounts
Expand Down
15 changes: 13 additions & 2 deletions fileglancer/filestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
# Default buffer size for streaming file contents
DEFAULT_BUFFER_SIZE = 8192


class RootCheckError(ValueError):
"""
Raised when a path attempts to escape the root directory of a Filestore.
This exception signals that the path may be an absolute path that belongs
to a different file share and should trigger fsp resolution logic.
"""
def __init__(self, message: str, full_path: str):
super().__init__(message)
self.full_path = full_path

class FileInfo(BaseModel):
"""
A class that represents a file or directory in a Filestore.
Expand Down Expand Up @@ -163,7 +174,7 @@ def _check_path_in_root(self, path: Optional[str]) -> str:
str: The full path to the file or directory.

Raises:
ValueError: If path attempts to escape root directory
RootCheckError: If path attempts to escape root directory
"""
if path is None or path == "":
full_path = self.root_path
Expand All @@ -174,7 +185,7 @@ def _check_path_in_root(self, path: Optional[str]) -> str:

# Ensure the resolved path is within the resolved root
if not full_path.startswith(root_real + os.sep) and full_path != root_real:
raise ValueError(f"Path ({full_path}) attempts to escape root directory ({root_real})")
raise RootCheckError(f"Path ({full_path}) attempts to escape root directory ({root_real})", full_path)
return full_path


Expand Down
153 changes: 153 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,156 @@ def test_create_proxied_path_with_home_dir(db_session, temp_dir):
if os.path.exists(test_path):
os.rmdir(test_path)


def test_find_fsp_from_absolute_path_exact_match(db_session, temp_dir):
"""Test finding FSP from absolute path with exact match"""
# Create a file share path
fsp = FileSharePathDB(
name="test_mount",
zone="testzone",
group="testgroup",
storage="local",
mount_path=temp_dir,
mac_path=temp_dir,
windows_path=temp_dir,
linux_path=temp_dir
)
db_session.add(fsp)
db_session.commit()

# Test exact match at mount root
result = find_fsp_from_absolute_path(db_session, temp_dir)
assert result is not None
assert result[0].name == "test_mount"
assert result[1] == ""

# Test with subdirectory
subdir = os.path.join(temp_dir, "subdir")
os.makedirs(subdir, exist_ok=True)
result = find_fsp_from_absolute_path(db_session, subdir)
assert result is not None
assert result[0].name == "test_mount"
assert result[1] == "subdir"

# Test with nested subdirectory
nested_dir = os.path.join(temp_dir, "subdir", "nested")
os.makedirs(nested_dir, exist_ok=True)
result = find_fsp_from_absolute_path(db_session, nested_dir)
assert result is not None
assert result[0].name == "test_mount"
assert result[1] == os.path.join("subdir", "nested")


def test_find_fsp_from_absolute_path_no_match(db_session, temp_dir):
"""Test finding FSP from absolute path with no match"""
# Create a file share path
fsp = FileSharePathDB(
name="test_mount",
zone="testzone",
group="testgroup",
storage="local",
mount_path=temp_dir,
mac_path=temp_dir,
windows_path=temp_dir,
linux_path=temp_dir
)
db_session.add(fsp)
db_session.commit()

# Test with path that doesn't match any FSP
non_matching_path = "/completely/different/path"
result = find_fsp_from_absolute_path(db_session, non_matching_path)
assert result is None


def test_find_fsp_from_absolute_path_with_home_dir(db_session):
"""Test finding FSP from absolute path with ~/ mount path"""
# Create a file share path using ~/ which should expand to current user's home
home_fsp = FileSharePathDB(
name="home",
zone="testzone",
group="testgroup",
storage="home",
mount_path="~/",
mac_path="~/",
windows_path="~/",
linux_path="~/"
)
db_session.add(home_fsp)
db_session.commit()

# Test with expanded home directory
home_dir = os.path.expanduser("~/")
result = find_fsp_from_absolute_path(db_session, home_dir)
assert result is not None
assert result[0].name == "home"
assert result[1] == ""

# Test with subdirectory in home
test_subpath = "test_subdir"
test_path = os.path.join(home_dir, test_subpath)
result = find_fsp_from_absolute_path(db_session, test_path)
assert result is not None
assert result[0].name == "home"
assert result[1] == test_subpath


def test_find_fsp_from_absolute_path_normalization(db_session, temp_dir):
"""Test that path normalization works correctly"""
# Create a file share path
fsp = FileSharePathDB(
name="test_mount",
zone="testzone",
group="testgroup",
storage="local",
mount_path=temp_dir,
mac_path=temp_dir,
windows_path=temp_dir,
linux_path=temp_dir
)
db_session.add(fsp)
db_session.commit()

# Test with trailing slashes
path_with_trailing_slash = temp_dir + "/"
result = find_fsp_from_absolute_path(db_session, path_with_trailing_slash)
assert result is not None
assert result[0].name == "test_mount"
assert result[1] == ""

# Test with double slashes
subdir = os.path.join(temp_dir, "subdir")
os.makedirs(subdir, exist_ok=True)
path_with_double_slash = temp_dir + "//subdir"
result = find_fsp_from_absolute_path(db_session, path_with_double_slash)
assert result is not None
assert result[0].name == "test_mount"
assert result[1] == "subdir"


def test_find_fsp_from_absolute_path_boundary_check(db_session, temp_dir):
"""Test that function correctly checks directory boundaries"""
# Create a file share path
fsp = FileSharePathDB(
name="test_mount",
zone="testzone",
group="testgroup",
storage="local",
mount_path=temp_dir,
mac_path=temp_dir,
windows_path=temp_dir,
linux_path=temp_dir
)
db_session.add(fsp)
db_session.commit()

# Test with a path that starts with the mount path but isn't a subdirectory
# For example, if temp_dir is "/tmp/test", then "/tmp/test2" should NOT match
parent_dir = os.path.dirname(temp_dir)
similar_path = temp_dir + "2" # e.g., /tmp/test2

# Only test if the similar path actually exists or if we can determine it won't match
result = find_fsp_from_absolute_path(db_session, similar_path)
# This should not match because similar_path is not a subdirectory of temp_dir
assert result is None or result[0].mount_path != temp_dir

Loading