diff --git a/python/lib/sift_client/_internal/low_level_wrappers/exports.py b/python/lib/sift_client/_internal/low_level_wrappers/exports.py new file mode 100644 index 000000000..63aa200cd --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/exports.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from sift.calculated_channels.v2.calculated_channels_pb2 import ( + CalculatedChannelAbstractChannelReference, +) +from sift.exports.v1.exports_pb2 import ( + AssetsAndTimeRange, + CalculatedChannelConfig, + ExportDataRequest, + ExportDataResponse, + ExportOptions, + GetDownloadUrlRequest, + GetDownloadUrlResponse, + RunsAndTimeRange, + TimeRange, +) +from sift.exports.v1.exports_pb2_grpc import ExportServiceStub + +from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.util.timestamp import to_pb_timestamp +from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate +from sift_client.transport import WithGrpcClient + +if TYPE_CHECKING: + from datetime import datetime + + from sift_client.sift_types.export import ExportOutputFormat + from sift_client.transport.grpc_transport import GrpcClient + + +def _build_calc_channel_configs( + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None, +) -> list[CalculatedChannelConfig]: + """Convert high-level calculated channel objects to proto CalculatedChannelConfig messages.""" + if not calculated_channels: + return [] + configs = [] + for cc in calculated_channels: + if isinstance(cc, CalculatedChannelCreate): + refs = cc.expression_channel_references or [] + else: + refs = cc.channel_references + configs.append( + CalculatedChannelConfig( + name=cc.name, + expression=cc.expression or "", + channel_references=[ + CalculatedChannelAbstractChannelReference( + channel_reference=ref.channel_reference, + channel_identifier=ref.channel_identifier, + ) + for 
ref in refs + ], + units=cc.units, + ) + ) + return configs + + +class ExportsLowLevelClient(LowLevelClientBase, WithGrpcClient): + """Low-level client for the DataExportAPI. + + This class provides a thin wrapper around the autogenerated gRPC bindings for the DataExportAPI. + """ + + def __init__(self, grpc_client: GrpcClient): + """Initialize the ExportsLowLevelClient. + + Args: + grpc_client: The gRPC client to use for making API calls. + """ + super().__init__(grpc_client) + + async def export_data( + self, + *, + output_format: ExportOutputFormat, + run_ids: list[str] | None = None, + asset_ids: list[str] | None = None, + start_time: datetime | None = None, + stop_time: datetime | None = None, + channel_ids: list[str] | None = None, + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = None, + simplify_channel_names: bool = False, + combine_runs: bool = False, + split_export_by_asset: bool = False, + split_export_by_run: bool = False, + ) -> str: + """Initiate a data export. + + Builds the ExportDataRequest proto and makes the gRPC call. + Sets whichever time_selection oneof fields are provided + (run_ids, asset_ids, or time range); the server validates + the request. + + Returns: + The job ID for the background export. 
+ """ + request = ExportDataRequest( + output_format=output_format.value, + export_options=ExportOptions( + use_legacy_format=False, + simplify_channel_names=simplify_channel_names, + combine_runs=combine_runs, + split_export_by_asset=split_export_by_asset, + split_export_by_run=split_export_by_run, + ), + channel_ids=channel_ids or [], + calculated_channel_configs=_build_calc_channel_configs(calculated_channels), + ) + + if run_ids is not None: + runs_and_time_range = RunsAndTimeRange(run_ids=run_ids) + if start_time: + runs_and_time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + runs_and_time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.runs_and_time_range.CopyFrom(runs_and_time_range) + + if asset_ids is not None: + assets_and_time_range = AssetsAndTimeRange(asset_ids=asset_ids) + if start_time: + assets_and_time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + assets_and_time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.assets_and_time_range.CopyFrom(assets_and_time_range) + + if run_ids is None and asset_ids is None: + time_range = TimeRange() + if start_time: + time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.time_range.CopyFrom(time_range) + + response = await self._grpc_client.get_stub(ExportServiceStub).ExportData(request) + response = cast("ExportDataResponse", response) + return response.job_id + + async def get_download_url(self, job_id: str) -> str: + """Get the download URL for a background export job. + + Args: + job_id: The job ID returned from export_data. + + Returns: + The presigned URL to download the exported zip file. 
+ """ + request = GetDownloadUrlRequest(job_id=job_id) + response = await self._grpc_client.get_stub(ExportServiceStub).GetDownloadUrl(request) + response = cast("GetDownloadUrlResponse", response) + return response.presigned_url diff --git a/python/lib/sift_client/_internal/util/channels.py b/python/lib/sift_client/_internal/util/channels.py new file mode 100644 index 000000000..8c3d39d82 --- /dev/null +++ b/python/lib/sift_client/_internal/util/channels.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate +from sift_client.sift_types.channel import ChannelReference + +if TYPE_CHECKING: + from sift_client.resources.channels import ChannelsAPIAsync + + +async def resolve_calculated_channels( + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None, + channels_api: ChannelsAPIAsync, +) -> list[CalculatedChannel | CalculatedChannelCreate] | None: + """Resolve channel reference identifiers from names to UUIDs. + + For each channel reference, looks up the identifier as a channel name. + If found, replaces it with the channel's UUID. If not found, assumes + the identifier is already a UUID and keeps it as-is. 
+ """ + if not calculated_channels: + return None + + resolved: list[CalculatedChannel | CalculatedChannelCreate] = [] + for cc in calculated_channels: + refs = ( + (cc.expression_channel_references or []) + if isinstance(cc, CalculatedChannelCreate) + else cc.channel_references + ) + + resolved_refs: list[ChannelReference] = [] + for ref in refs: + channel = await channels_api.find( + name=ref.channel_identifier, + assets=cc.asset_ids, + ) + if channel is not None: + ref = ChannelReference( + channel_reference=ref.channel_reference, + channel_identifier=channel._id_or_error, + ) + resolved_refs.append(ref) + + resolved.append( + CalculatedChannelCreate( + name=cc.name, + expression=cc.expression, + expression_channel_references=resolved_refs, + units=cc.units or None, + ) + ) + return resolved diff --git a/python/lib/sift_client/_internal/util/executor.py b/python/lib/sift_client/_internal/util/executor.py new file mode 100644 index 000000000..87525cce0 --- /dev/null +++ b/python/lib/sift_client/_internal/util/executor.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +import asyncio +from typing import Any, Callable + + +async def run_sync_function(fn: Callable[..., Any], *args: Any) -> Any: + """Run a synchronous function in a thread pool to avoid blocking the event loop.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, fn, *args) diff --git a/python/lib/sift_client/_internal/util/file.py b/python/lib/sift_client/_internal/util/file.py new file mode 100644 index 000000000..55f937cc8 --- /dev/null +++ b/python/lib/sift_client/_internal/util/file.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import warnings +import zipfile +from typing import TYPE_CHECKING + +from sift_client.errors import SiftWarning + +if TYPE_CHECKING: + from pathlib import Path + + from sift_client.transport.rest_transport import RestClient + + +def download_file(signed_url: str, output_path: Path, *, rest_client: RestClient) -> Path: + 
"""Download a file from a URL in streaming 4 MiB chunks. + + Args: + url: The URL to download from. + dest: Path where the file will be saved. Parent directories are created if needed. + rest_client: The SDK rest client to use for the download. + + Returns: + The path to the downloaded file. + + Raises: + requests.HTTPError: If the download request fails. + """ + output_path.parent.mkdir(parents=True, exist_ok=True) + # Strip the session's default Authorization header, presigned URLs carry their own auth + with rest_client.get(signed_url, stream=True, headers={"Authorization": None}) as response: + response.raise_for_status() + with output_path.open("wb") as file: + for chunk in response.iter_content(chunk_size=4194304): # 4 MiB + if chunk: + file.write(chunk) + return output_path + + +def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) -> list[Path]: + """Extract a zip file to a directory. + + Args: + zip_path: Path to the zip file. + output_dir: Directory to extract contents into. Created if it doesn't exist. + delete_zip: If True (default), delete the zip file after extraction. + + Returns: + List of paths to the extracted files (excludes directories). + + Raises: + zipfile.BadZipFile: If the file is not a valid zip. 
+ """ + output_dir.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as zip_file: + names = zip_file.namelist() + zip_file.extractall(output_dir) + if delete_zip: + try: + zip_path.unlink() + except OSError: + warnings.warn(f"Failed to delete zip file '{zip_path}'", SiftWarning, stacklevel=2) + return [output_dir / name for name in names if not name.endswith("/")] diff --git a/python/lib/sift_client/_tests/_internal/test_channels.py b/python/lib/sift_client/_tests/_internal/test_channels.py new file mode 100644 index 000000000..3b6be637d --- /dev/null +++ b/python/lib/sift_client/_tests/_internal/test_channels.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from sift_client._internal.util.channels import resolve_calculated_channels +from sift_client.sift_types.calculated_channel import ( + CalculatedChannel, + CalculatedChannelCreate, + ChannelReference, +) +from sift_client.sift_types.channel import Channel + + +class TestResolveCalculatedChannels: + @pytest.mark.asyncio + async def test_none_passthrough(self): + api = MagicMock() + api.find = AsyncMock(return_value=None) + assert await resolve_calculated_channels(None, channels_api=api) is None + + @pytest.mark.asyncio + async def test_resolves_name_to_uuid(self): + mock_ch = MagicMock(spec=Channel) + mock_ch._id_or_error = "resolved-uuid" + api = MagicMock() + api.find = AsyncMock(return_value=mock_ch) + + cc = MagicMock(spec=CalculatedChannel) + cc.name, cc.expression, cc.units = "calc", "$1 + 10", "m/s" + cc.asset_ids = ["asset-1"] + cc.channel_references = [ + ChannelReference(channel_reference="$1", channel_identifier="sensor.vel") + ] + + result = await resolve_calculated_channels([cc], channels_api=api) + assert result is not None + assert len(result) == 1 + refs = result[0].expression_channel_references + assert refs is not None + assert refs[0].channel_identifier == "resolved-uuid" + + 
@pytest.mark.asyncio + async def test_keeps_identifier_when_not_found(self): + api = MagicMock() + api.find = AsyncMock(return_value=None) + cc = CalculatedChannelCreate( + name="x", + expression="$1", + units="m", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1") + ], + ) + result = await resolve_calculated_channels([cc], channels_api=api) + assert result is not None + assert result[0] == cc diff --git a/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py b/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py index 86841f2b6..be3050032 100644 --- a/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py +++ b/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py @@ -9,6 +9,7 @@ import pytest from sift_client._internal.sync_wrapper import generate_sync_api +from sift_client._internal.util.executor import run_sync_function from sift_client.resources._base import ResourceBase @@ -82,6 +83,12 @@ async def async_method_with_exception(self) -> None: await asyncio.sleep(0.01) raise ValueError("Test exception") + async def async_method_with_executor(self) -> str: + """Test async method that uses run_in_executor, like wait_and_download.""" + self._record_call("async_method_with_executor") + result = await run_sync_function(lambda: "executor_result") + return result + async def async_method_with_complex_args( self, arg1: str, arg2: dict[str, Any] | None = None, *args, **kwargs ) -> dict[str, Any]: @@ -183,3 +190,44 @@ def test_complex_arguments(self, mock_resource_sync): assert result["args"] == ("extra_arg",) assert result["kwargs"] == {"keyword": "keyword_value"} assert mock_resource_sync._async_impl.get_call_count("async_method_with_complex_args") == 1 + + +class TestSyncWrapperEventLoopScenarios: + """Test sync wrapper with run_in_executor under different event loop scenarios.""" + + @pytest.fixture + def mock_resource_sync(self): + mock_client = MockClient() + MockResource = 
generate_sync_api(MockResourceAsync, "MockResource") # noqa: N806 + return MockResource(mock_client, value="testVal") + + def test_sync_no_event_loop(self, mock_resource_sync): + """Plain sync call with no active event loop in the calling thread.""" + result = mock_resource_sync.async_method_with_executor() + assert result == "executor_result" + + def test_with_user_event_loop(self, mock_resource_sync): + """User has their own event loop running in another thread.""" + user_loop = asyncio.new_event_loop() + user_thread = threading.Thread(target=user_loop.run_forever, daemon=True) + user_thread.start() + try: + result = mock_resource_sync.async_method_with_executor() + assert result == "executor_result" + finally: + user_loop.call_soon_threadsafe(user_loop.stop) + user_thread.join(timeout=1.0) + user_loop.close() + + def test_sync_from_async(self, mock_resource_sync): + """Sync API called from inside a running async function.""" + + async def caller(): + return mock_resource_sync.async_method_with_executor() + + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(caller()) + assert result == "executor_result" + finally: + loop.close() diff --git a/python/lib/sift_client/_tests/resources/test_exports.py b/python/lib/sift_client/_tests/resources/test_exports.py new file mode 100644 index 000000000..4cebe86d0 --- /dev/null +++ b/python/lib/sift_client/_tests/resources/test_exports.py @@ -0,0 +1,325 @@ +"""Pytest tests for the Exports API. 
+ +These tests demonstrate and validate the usage of the Data Export API including: +- Basic export operations (by run, by asset, by time range) +- Wait and download functionality +- Input validation and error handling +- Calculated channel configuration and resolution +""" + +from __future__ import annotations + +import asyncio +import uuid +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio + +from sift_client._internal.low_level_wrappers.exports import _build_calc_channel_configs + +if TYPE_CHECKING: + from sift_client import SiftClient +from sift_client.resources import DataExportAPI +from sift_client.resources.exports import DataExportAPIAsync +from sift_client.resources.ingestion import TracingConfig +from sift_client.sift_types.calculated_channel import ( + CalculatedChannel, + CalculatedChannelCreate, + ChannelReference, +) +from sift_client.sift_types.channel import ChannelDataType +from sift_client.sift_types.export import ExportOutputFormat +from sift_client.sift_types.ingestion import ChannelConfig, FlowConfig, IngestionConfigCreate +from sift_client.sift_types.job import Job, JobStatus + +START = datetime(2025, 1, 1, tzinfo=timezone.utc) +STOP = datetime(2025, 1, 2, tzinfo=timezone.utc) +CSV = ExportOutputFormat.CSV + + +@pytest.fixture +def exports_api_async(sift_client: SiftClient): + """Get the async data export API instance.""" + return sift_client.async_.data_export + + +@pytest.fixture +def exports_api_sync(sift_client: SiftClient): + """Get the synchronous data export API instance.""" + return sift_client.data_export + + +@pytest.fixture +def mock_client(): + client = MagicMock() + client.grpc_client = MagicMock() + client.async_ = MagicMock() + client.async_.jobs = MagicMock() + client.async_.channels = MagicMock() + client.async_.channels.find = AsyncMock(return_value=None) + return client + + +@pytest.fixture +def 
mock_job(): + job = MagicMock(spec=Job) + job._id_or_error = "job-123" + job.job_status = JobStatus.FINISHED + return job + + +@pytest.fixture +def exports_api(mock_client, mock_job): + with patch("sift_client.resources.exports.ExportsLowLevelClient", autospec=True) as mock_ll: + api = DataExportAPIAsync(mock_client) + api._low_level_client = mock_ll.return_value + api._low_level_client.export_data = AsyncMock(return_value="job-123") + mock_client.async_.jobs.get = AsyncMock(return_value=mock_job) + return api + + +@pytest.mark.integration +def test_client_binding(sift_client): + assert sift_client.data_export + assert isinstance(sift_client.data_export, DataExportAPI) + assert sift_client.async_.data_export + assert isinstance(sift_client.async_.data_export, DataExportAPIAsync) + + +INGEST_TIMESTAMP = datetime(2025, 6, 1, tzinfo=timezone.utc) + + +@pytest_asyncio.fixture(scope="session") +async def ingested_export_channel(sift_client, nostromo_asset): + """Ingest a single data point into a unique channel on the nostromo asset for export tests.""" + channel_name = f"export-test-{uuid.uuid4().hex[:8]}" + + flow_config = FlowConfig( + name="export-test-flow", + channels=[ChannelConfig(name=channel_name, data_type=ChannelDataType.DOUBLE)], + ) + ingestion_config = IngestionConfigCreate( + asset_name=nostromo_asset.name, + flows=[flow_config], + ) + + async with await sift_client.async_.ingestion.create_ingestion_config_streaming_client( + ingestion_config=ingestion_config, + tracing_config=TracingConfig.disabled(), # suppresses ./logs + ) as stream: + await stream.send( + flow_config.as_flow(timestamp=INGEST_TIMESTAMP, values={channel_name: 42.0}) + ) + + channel = None + for _ in range(20): + channel = sift_client.channels.find(name=channel_name, asset=nostromo_asset._id_or_error) + if channel is not None: + break + await asyncio.sleep(0.5) + assert channel is not None, f"Channel {channel_name} did not appear after ingest" + + yield channel + + 
sift_client.channels.archive([channel]) + + +@pytest.mark.integration +class TestDataExportAPIAsync: + """Test suite for the async Data Export API functionality.""" + + class TestExport: + """Tests for the async export method.""" + + @pytest.mark.asyncio + async def test_export_by_run(self, exports_api_async, nostromo_run): + """Test exporting data scoped to a run.""" + start = nostromo_run.start_time + job = await exports_api_async.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), + output_format=CSV, + ) + assert isinstance(job, Job) + assert job.id_ is not None + + @pytest.mark.asyncio + async def test_export_by_asset( + self, exports_api_async, nostromo_asset, ingested_export_channel + ): + """Test exporting data scoped to an asset with specific channels.""" + job = await exports_api_async.export( + assets=[nostromo_asset], + start_time=INGEST_TIMESTAMP - timedelta(seconds=1), + stop_time=INGEST_TIMESTAMP + timedelta(seconds=1), + channels=[ingested_export_channel], + output_format=CSV, + ) + assert isinstance(job, Job) + + @pytest.mark.asyncio + async def test_export_by_time_range(self, exports_api_async, sift_client, nostromo_run): + """Test exporting data by time range with explicit channels.""" + channels = await sift_client.async_.channels.list_(limit=1) + assert channels, "No channels available" + start = nostromo_run.start_time + job = await exports_api_async.export( + start_time=start, + stop_time=start + timedelta(seconds=10), + channels=[channels[0]], + output_format=CSV, + ) + assert isinstance(job, Job) + + class TestWaitAndDownload: + """Tests for the async wait_and_download method.""" + + @pytest.mark.asyncio + async def test_wait_and_download(self, exports_api_async, nostromo_run, tmp_path): + """Test exporting data and downloading the result.""" + start = nostromo_run.start_time + job = await exports_api_async.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), 
+ output_format=CSV, + ) + files = job.wait_and_download(output_dir=tmp_path, timeout_secs=300) + assert len(files) > 0 + assert all(f.exists() for f in files) + + +@pytest.mark.integration +class TestDataExportAPISync: + """Test suite for the synchronous Data Export API functionality. + + Only includes basic sync tests to verify sync wrappers work. No specific sync behavior + difference tests are needed. + """ + + class TestExport: + """Tests for the sync export method.""" + + def test_export_by_run(self, exports_api_sync, nostromo_run): + """Test synchronous export scoped to a run.""" + start = nostromo_run.start_time + job = exports_api_sync.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), + output_format=CSV, + ) + assert isinstance(job, Job) + + def test_export_by_asset(self, exports_api_sync, nostromo_asset, ingested_export_channel): + """Test synchronous export scoped to an asset with specific channels.""" + job = exports_api_sync.export( + assets=[nostromo_asset], + start_time=INGEST_TIMESTAMP - timedelta(seconds=1), + stop_time=INGEST_TIMESTAMP + timedelta(seconds=1), + channels=[ingested_export_channel], + output_format=CSV, + ) + assert isinstance(job, Job) + + def test_export_by_time_range(self, exports_api_sync, sift_client, nostromo_run): + """Test synchronous export by time range with explicit channels.""" + channels = sift_client.channels.list_(limit=1) + assert channels, "No channels available" + start = nostromo_run.start_time + job = exports_api_sync.export( + start_time=start, + stop_time=start + timedelta(seconds=10), + channels=[channels[0]], + output_format=CSV, + ) + assert isinstance(job, Job) + + +class TestDictConversion: + @pytest.mark.asyncio + async def test_calculated_channel_dict_converted(self, exports_api): + await exports_api.export( + runs=["run-1"], + output_format=CSV, + calculated_channels=[ + { + "name": "calc", + "expression": "$1 + 1", + "expression_channel_references": [ + 
{"channel_reference": "$1", "channel_identifier": "ch-1"} + ], + } + ], + ) + cc = exports_api._low_level_client.export_data.call_args.kwargs["calculated_channels"] + assert cc is not None + assert len(cc) == 1 + assert isinstance(cc[0], CalculatedChannelCreate) + assert cc[0].name == "calc" + + +class TestExportValidation: + @pytest.mark.asyncio + async def test_runs_and_assets_raises(self, exports_api): + with pytest.raises(ValueError, match="not both"): + await exports_api.export( + runs=["r"], assets=["a"], start_time=START, stop_time=STOP, output_format=CSV + ) + + @pytest.mark.asyncio + async def test_nothing_provided_raises(self, exports_api): + with pytest.raises(ValueError, match="At least one"): + await exports_api.export(output_format=CSV) + + +class TestBuildCalcChannelConfigs: + @pytest.mark.parametrize("input_val", [None, []]) + def test_empty_input(self, input_val): + assert _build_calc_channel_configs(input_val) == [] + + def test_create_objects(self): + ccs = [ + CalculatedChannelCreate( + name="speed_doubled", + expression="$1 * 2", + units="m/s", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1") + ], + ), + CalculatedChannelCreate( + name="no_units", + expression="$1 + $2", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1"), + ChannelReference(channel_reference="$2", channel_identifier="ch-2"), + ], + ), + ] + result = _build_calc_channel_configs(ccs) + assert len(result) == 2 + assert result[0].name == "speed_doubled" + assert result[0].units == "m/s" + assert result[1].name == "no_units" + assert result[1].units == "" + assert len(result[1].channel_references) == 2 + + def test_existing_calculated_channel(self): + cc = MagicMock(spec=CalculatedChannel) + cc.name, cc.expression, cc.units = "derived", "$1 / $2", "m/s" + cc.channel_references = [ + ChannelReference(channel_reference="$1", channel_identifier="ch-dist"), + 
ChannelReference(channel_reference="$2", channel_identifier="ch-time"), + ] + result = _build_calc_channel_configs([cc]) + assert len(result) == 1 + assert result[0].name == "derived" + assert [r.channel_identifier for r in result[0].channel_references] == [ + "ch-dist", + "ch-time", + ] diff --git a/python/lib/sift_client/_tests/resources/test_jobs.py b/python/lib/sift_client/_tests/resources/test_jobs.py index ebc9ea09f..feb84707d 100644 --- a/python/lib/sift_client/_tests/resources/test_jobs.py +++ b/python/lib/sift_client/_tests/resources/test_jobs.py @@ -18,6 +18,7 @@ from sift_client.sift_types import Job from sift_client.sift_types.job import ( DataExportDetails, + DataExportStatusDetails, DataImportDetails, DataImportStatusDetails, JobStatus, @@ -525,3 +526,29 @@ def test_basic_list(self, jobs_api_sync): if jobs: assert isinstance(jobs[0], Job) + + +class TestWaitAndDownload: + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("status", "details", "match"), + [ + ( + JobStatus.FAILED, + DataExportStatusDetails(error_message="out of memory"), + r"failed.*out of memory", + ), + (JobStatus.FAILED, None, "failed"), + (JobStatus.CANCELLED, None, "cancelled"), + ], + ) + async def test_terminal_status_raises(self, status, details, match): + mock_client = MagicMock() + mock_client.grpc_client = MagicMock() + jobs_api = JobsAPIAsync(mock_client) + completed = MagicMock(spec=Job) + completed.job_status = status + completed.job_status_details = details + jobs_api.wait_until_complete = AsyncMock(return_value=completed) + with pytest.raises(RuntimeError, match=match): + await jobs_api.wait_and_download(job="job-err") diff --git a/python/lib/sift_client/client.py b/python/lib/sift_client/client.py index ae3302673..ed7aeba9a 100644 --- a/python/lib/sift_client/client.py +++ b/python/lib/sift_client/client.py @@ -7,6 +7,8 @@ CalculatedChannelsAPIAsync, ChannelsAPI, ChannelsAPIAsync, + DataExportAPI, + DataExportAPIAsync, FileAttachmentsAPI, FileAttachmentsAPIAsync, 
IngestionAPIAsync, @@ -101,9 +103,13 @@ class SiftClient( tags: TagsAPI """Instance of the Tags API for making synchronous requests.""" + test_results: TestResultsAPI """Instance of the Test Results API for making synchronous requests.""" + data_export: DataExportAPI + """Instance of the Data Export API for making synchronous requests.""" + async_: AsyncAPIs """Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor.""" @@ -152,6 +158,7 @@ def __init__( self.runs = RunsAPI(self) self.tags = TagsAPI(self) self.test_results = TestResultsAPI(self) + self.data_export = DataExportAPI(self) # Accessor for the asynchronous APIs self.async_ = AsyncAPIs( @@ -167,6 +174,7 @@ def __init__( runs=RunsAPIAsync(self), tags=TagsAPIAsync(self), test_results=TestResultsAPIAsync(self), + data_export=DataExportAPIAsync(self), ) @property diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index af9fe5e31..78b3b4eba 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -162,6 +162,7 @@ async def main(): from sift_client.resources.runs import RunsAPIAsync from sift_client.resources.tags import TagsAPIAsync from sift_client.resources.test_results import TestResultsAPIAsync +from sift_client.resources.exports import DataExportAPIAsync # ruff: noqa All imports needs to be imported before sync_stubs to avoid circular import from sift_client.resources.sync_stubs import ( @@ -176,6 +177,7 @@ async def main(): TagsAPI, TestResultsAPI, FileAttachmentsAPI, + DataExportAPI, ) import sys @@ -211,4 +213,6 @@ async def main(): "TestResultsAPI", "TestResultsAPIAsync", "TracingConfig", + "DataExportAPI", + "DataExportAPIAsync", ] diff --git a/python/lib/sift_client/resources/exports.py b/python/lib/sift_client/resources/exports.py new file mode 100644 index 000000000..ed8676960 --- /dev/null +++ b/python/lib/sift_client/resources/exports.py @@ 
class DataExportAPIAsync(ResourceBase):
    """High-level API for exporting data from Sift."""

    def __init__(self, sift_client: SiftClient):
        """Initialize the DataExportAPI.

        Args:
            sift_client: The Sift client to use.
        """
        super().__init__(sift_client)
        self._low_level_client = ExportsLowLevelClient(grpc_client=self.client.grpc_client)

    async def export(
        self,
        *,
        output_format: ExportOutputFormat,
        runs: list[str | Run] | None = None,
        assets: list[str | Asset] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channels: list[str | Channel] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate | dict] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> Job:
        """Export data from Sift.

        Initiates an export on the server and returns a Job handle. Use
        ``job.wait_and_download()`` to poll for completion and download the files.

        There are three ways to scope the export, determined by which arguments
        are provided:

        1. **By runs** — provide ``runs``. The ``start_time``/``stop_time`` are
           optional (if omitted, the full time range of each run is used). If no
           ``channels`` or ``calculated_channels`` are provided, all channels
           from the runs' assets are included.

        2. **By assets** — provide ``assets``. Both ``start_time`` and
           ``stop_time`` are **required**. If no ``channels`` or
           ``calculated_channels`` are provided, all channels from the assets
           are included.

        3. **By time range only** — provide ``start_time`` and ``stop_time``
           without ``runs`` or ``assets``. At least one of ``channels`` or
           ``calculated_channels`` **must** be provided to scope the data.

        You cannot provide both ``runs`` and ``assets`` at the same time.

        Args:
            output_format: The file format for the export (CSV or Sun/WinPlot).
            runs: One or more Run objects or run IDs to export data from.
            assets: One or more Asset objects or asset IDs to export data from.
            start_time: Start of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            stop_time: End of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            channels: Channel objects or channel IDs to include. If omitted and
                runs or assets are provided, all channels are exported. Required
                (along with ``calculated_channels``) in time-range-only mode.
            calculated_channels: Calculated channels to include in the export.
                Accepts existing CalculatedChannel objects,
                CalculatedChannelCreate definitions, or dictionaries that
                will be converted to CalculatedChannelCreate via model_validate.
            simplify_channel_names: Remove text preceding last period in channel
                names, only if the resulting simplified name is unique.
            combine_runs: Identical channels within the same asset across
                multiple runs will be combined into a single column.
            split_export_by_asset: Split each asset into a separate file, with
                asset name removed from channel name display.
            split_export_by_run: Split each run into a separate file, with run
                name removed from channel name display.

        Returns:
            A Job handle for the pending export.

        Raises:
            ValueError: If the combination of scoping arguments does not match
                one of the three documented modes.
        """
        # Validate the documented scoping modes up front so callers get a clear
        # client-side error instead of an opaque server-side rejection.
        if runs and assets:
            raise ValueError("Provide either 'runs' or 'assets', not both.")
        if not runs and not assets and not start_time and not stop_time:
            raise ValueError("At least one of 'runs', 'assets', or a time range must be provided.")
        if assets and (start_time is None or stop_time is None):
            # Mode 2: asset-scoped exports require a complete time range.
            raise ValueError(
                "Both 'start_time' and 'stop_time' are required when exporting by assets."
            )
        if not runs and not assets:
            # Mode 3: time-range-only exports need a complete range and at
            # least one channel source to scope the data.
            if start_time is None or stop_time is None:
                raise ValueError(
                    "Both 'start_time' and 'stop_time' are required for a time-range-only export."
                )
            if not channels and not calculated_channels:
                raise ValueError(
                    "At least one of 'channels' or 'calculated_channels' must be provided "
                    "for a time-range-only export."
                )

        # Normalize high-level objects to raw IDs for the low-level client.
        run_ids = [r._id_or_error if isinstance(r, Run) else r for r in runs] if runs else None
        asset_ids = (
            [a._id_or_error if isinstance(a, Asset) else a for a in assets] if assets else None
        )
        channel_ids = (
            [c._id_or_error if isinstance(c, Channel) else c for c in channels] if channels else []
        )
        # Dicts are coerced to CalculatedChannelCreate; existing objects pass through.
        normalized_calc_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = (
            [
                CalculatedChannelCreate.model_validate(cc) if isinstance(cc, dict) else cc
                for cc in calculated_channels
            ]
            if calculated_channels
            else None
        )
        resolved_calc_channels = await resolve_calculated_channels(
            normalized_calc_channels,
            channels_api=self.client.async_.channels,
        )

        job_id = await self._low_level_client.export_data(
            run_ids=run_ids,
            asset_ids=asset_ids,
            output_format=output_format,
            start_time=start_time,
            stop_time=stop_time,
            channel_ids=channel_ids,
            calculated_channels=resolved_calc_channels,
            simplify_channel_names=simplify_channel_names,
            combine_runs=combine_runs,
            split_export_by_asset=split_export_by_asset,
            split_export_by_run=split_export_by_run,
        )

        return await self.client.async_.jobs.get(job_id=job_id)
asyncio +import tempfile import time +import zipfile +from pathlib import Path from typing import TYPE_CHECKING from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient +from sift_client._internal.util.executor import run_sync_function +from sift_client._internal.util.file import download_file, extract_zip from sift_client.resources._base import ResourceBase -from sift_client.sift_types.job import Job, JobStatus, JobType +from sift_client.sift_types.job import DataExportStatusDetails, Job, JobStatus, JobType from sift_client.util import cel_utils as cel if TYPE_CHECKING: @@ -160,8 +165,8 @@ async def retry(self, job: Job | str) -> Job: async def wait_until_complete( self, - *, job: Job | str, + *, polling_interval_secs: int = 5, timeout_secs: int | None = None, ) -> Job: @@ -189,3 +194,74 @@ async def wait_until_complete( if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs: raise TimeoutError(f"Job {job_id} did not complete within {timeout_secs} seconds") await asyncio.sleep(polling_interval_secs) + + async def wait_and_download( + self, + job: Job | str, + *, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + output_dir: str | Path | None = None, + extract: bool = True, + ) -> list[Path]: + """Wait for a job to complete and download the result files. + + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, then downloads the result files. + + Args: + job: The Job or job ID to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + output_dir: Directory to save the downloaded files. If omitted, a + temporary directory is created automatically. + extract: If True (default) and the downloaded file is a zip, + extract it and delete the archive, returning paths to the + extracted files. Non-zip files are returned as-is regardless + of this flag. 
+ + Returns: + List of paths to the downloaded/extracted files. + + Raises: + RuntimeError: If the job fails or is cancelled. + TimeoutError: If the job does not complete within timeout_secs. + """ + job_id = job._id_or_error if isinstance(job, Job) else job + + completed_job = await self.wait_until_complete( + job=job_id, + polling_interval_secs=polling_interval_secs, + timeout_secs=timeout_secs, + ) + if completed_job.job_status == JobStatus.FAILED: + if ( + isinstance(completed_job.job_status_details, DataExportStatusDetails) + and completed_job.job_status_details.error_message + ): + raise RuntimeError( + f"Export job '{job_id}' failed. {completed_job.job_status_details.error_message}" + ) + raise RuntimeError(f"Export job '{job_id}' failed.") + if completed_job.job_status == JobStatus.CANCELLED: + raise RuntimeError(f"Export job '{job_id}' was cancelled.") + + presigned_url = await self.client.async_.data_export._low_level_client.get_download_url( + job_id=job_id + ) + output_dir = ( + Path(output_dir) + if output_dir is not None + else Path(tempfile.mkdtemp(prefix="sift_export_")) + ) + download_path = output_dir / job_id + + # Run the synchronous download in a thread pool to avoid blocking the event loop + rest_client = self.client.rest_client + await run_sync_function( + lambda: download_file(presigned_url, download_path, rest_client=rest_client) + ) + + if not extract or not zipfile.is_zipfile(download_path): + return [download_path] + return extract_zip(download_path, output_dir) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.py b/python/lib/sift_client/resources/sync_stubs/__init__.py index 3f6cc427c..acd73755e 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.py +++ b/python/lib/sift_client/resources/sync_stubs/__init__.py @@ -7,6 +7,7 @@ AssetsAPIAsync, CalculatedChannelsAPIAsync, ChannelsAPIAsync, + DataExportAPIAsync, FileAttachmentsAPIAsync, JobsAPIAsync, PingAPIAsync, @@ -28,11 +29,13 @@ ReportsAPI = 
class DataExportAPI:
    """Sync counterpart to `DataExportAPIAsync`.

    High-level API for exporting data from Sift.
    """

    def __init__(self, sift_client: SiftClient):
        """Initialize the DataExportAPI.

        Args:
            sift_client: The Sift client to use.
        """
        ...

    # NOTE(review): presumably the internal bridge that runs the wrapped async
    # coroutine to completion (via generate_sync_api) — confirm against the
    # sync-stub generator before relying on it.
    def _run(self, coro): ...
    def export(
        self,
        *,
        output_format: ExportOutputFormat,
        runs: list[str | Run] | None = None,
        assets: list[str | Asset] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channels: list[str | Channel] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate | dict] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> Job:
        """Export data from Sift.

        Initiates an export on the server and returns a Job handle. Use
        ``job.wait_and_download()`` to poll for completion and download the files.

        There are three ways to scope the export, determined by which arguments
        are provided:

        1. **By runs** — provide ``runs``. The ``start_time``/``stop_time`` are
           optional (if omitted, the full time range of each run is used). If no
           ``channels`` or ``calculated_channels`` are provided, all channels
           from the runs' assets are included.

        2. **By assets** — provide ``assets``. Both ``start_time`` and
           ``stop_time`` are **required**. If no ``channels`` or
           ``calculated_channels`` are provided, all channels from the assets
           are included.

        3. **By time range only** — provide ``start_time`` and ``stop_time``
           without ``runs`` or ``assets``. At least one of ``channels`` or
           ``calculated_channels`` **must** be provided to scope the data.

        You cannot provide both ``runs`` and ``assets`` at the same time.

        Args:
            output_format: The file format for the export (CSV or Sun/WinPlot).
            runs: One or more Run objects or run IDs to export data from.
            assets: One or more Asset objects or asset IDs to export data from.
            start_time: Start of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            stop_time: End of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            channels: Channel objects or channel IDs to include. If omitted and
                runs or assets are provided, all channels are exported. Required
                (along with ``calculated_channels``) in time-range-only mode.
            calculated_channels: Calculated channels to include in the export.
                Accepts existing CalculatedChannel objects,
                CalculatedChannelCreate definitions, or dictionaries that
                will be converted to CalculatedChannelCreate via model_validate.
            simplify_channel_names: Remove text preceding last period in channel
                names, only if the resulting simplified name is unique.
            combine_runs: Identical channels within the same asset across
                multiple runs will be combined into a single column.
            split_export_by_asset: Split each asset into a separate file, with
                asset name removed from channel name display.
            split_export_by_run: Split each run into a separate file, with run
                name removed from channel name display.

        Returns:
            A Job handle for the pending export.
        """
        ...
class ExportOutputFormat(Enum):
    """Supported output formats for data exports.

    Thin wrapper over the proto ``ExportOutputFormat`` enum so callers never
    touch raw protobuf constants directly.

    Attributes:
        CSV: Comma-separated values format.
        SUN: Sun (WinPlot) format (not used in certain environments).
    """

    # Each member's value is the corresponding proto enum constant, so it can
    # be passed straight into export request messages.
    CSV = ExportOutputFormatProto.EXPORT_OUTPUT_FORMAT_CSV
    SUN = ExportOutputFormatProto.EXPORT_OUTPUT_FORMAT_SUN
+ + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, then downloads the result files. + + Args: + polling_interval_secs: Seconds between status polls. Defaults to 5. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + output_dir: Directory to save the downloaded files. If omitted, a + temporary directory is created automatically. + extract: If True (default) and the downloaded file is a zip, + extract it and delete the archive, returning paths to the + extracted files. Non-zip files are returned as-is regardless + of this flag. + + Returns: + List of paths to the downloaded/extracted files. + + Raises: + RuntimeError: If the job fails or is cancelled. + TimeoutError: If the job does not complete within timeout_secs. + """ + return self.client.jobs.wait_and_download( + job=self, + polling_interval_secs=polling_interval_secs, + timeout_secs=timeout_secs, + output_dir=output_dir, + extract=extract, + ) diff --git a/python/lib/sift_client/util/util.py b/python/lib/sift_client/util/util.py index 3800f91a7..e82a8ccfe 100644 --- a/python/lib/sift_client/util/util.py +++ b/python/lib/sift_client/util/util.py @@ -7,6 +7,7 @@ AssetsAPIAsync, CalculatedChannelsAPIAsync, ChannelsAPIAsync, + DataExportAPIAsync, FileAttachmentsAPIAsync, IngestionAPIAsync, JobsAPIAsync, @@ -58,6 +59,9 @@ class AsyncAPIs(NamedTuple): test_results: TestResultsAPIAsync """Instance of the Test Results API for making asynchronous requests.""" + data_export: DataExportAPIAsync + """Instance of the Data Export API for making asynchronous requests.""" + def count_non_none(*args: Any) -> int: """Count the number of non-none arguments."""