Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/together/lib/cli/api/beta/jig/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ async def spinner_updater(self) -> None:
self.update_progress()
await asyncio.sleep(0.1)

async def upload_files(self, source_path: Path, volume_name: str) -> None:
async def upload_files(self, source_path: Path, volume_name: str, content_version: int) -> None:
"""Upload all files from source directory with progress tracking"""
# these require a running event loop
self.semaphore = asyncio.Semaphore(UPLOAD_CONCURRENCY_LIMIT)
self.progress_lock = asyncio.Lock()

source_prefix = f"{volume_name}/{source_path.name}"
source_prefix = f"{volume_name}/{content_version}/{source_path.name}"
files_to_upload: list[tuple[Path, str, int]] = []

for file_path in source_path.rglob("*"):
Expand Down Expand Up @@ -296,7 +296,8 @@ async def _create_volume(client: Together, name: str, source: str) -> None:
if not source_path.is_dir():
raise ValueError(f"Source path must be a directory: {source}")

source_prefix = f"{name}/{source_path.name}"
content_version = 1
source_prefix = f"{name}/{content_version}/{source_path.name}"

click.echo(f"\N{ROCKET} Creating volume '{name}' with source prefix '{source_prefix}'")
try:
Expand All @@ -310,7 +311,7 @@ async def _create_volume(client: Together, name: str, source: str) -> None:
raise RuntimeError(f"Failed to create volume: {e}") from e

try:
await Uploader(client).upload_files(source_path, volume_name=name)
await Uploader(client).upload_files(source_path, volume_name=name, content_version=content_version)
except Exception as e:
click.echo(f"\N{CROSS MARK} Upload failed: {e}")
click.echo(f"\N{WASTEBASKET} Cleaning up volume '{name}'")
Expand All @@ -330,16 +331,27 @@ async def _update_volume(client: Together, name: str, source: str) -> None:
raise ValueError(f"Source path must be a directory: {source}")

try:
client.beta.jig.volumes.retrieve(name)
volume = client.beta.jig.volumes.retrieve(name)
except APIStatusError as e:
if hasattr(e, "status_code") and e.status_code == 404:
raise ValueError(f"Volume '{name}' does not exist") from e
raise

source_prefix = f"{name}/{source_path.name}"
old_version = 1
if volume.content and volume.content.type == "files" and volume.content.source_prefix:
prefix_parts = volume.content.source_prefix.split("/")
if len(prefix_parts) > 2 and prefix_parts[0] == name:
try:
old_version = int(prefix_parts[1])
except ValueError:
# version is not encoded in the prefix, default to 1
pass

version = old_version + 1
source_prefix = f"{name}/{version}/{source_path.name}"

click.echo(f"\N{INFORMATION SOURCE} Uploading files for volume '{name}'")
await Uploader(client).upload_files(source_path, volume_name=name)
await Uploader(client).upload_files(source_path, volume_name=name, content_version=version)

click.echo(f"\N{INFORMATION SOURCE} Updating volume '{name}' with source prefix '{source_prefix}'")
client.beta.jig.volumes.update(
Expand Down