From a26719bbb7fce9d27c4d854fcc07c6f3f489b10f Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Thu, 26 Mar 2026 15:47:02 -0700 Subject: [PATCH 1/2] python(feat): add data import api --- .../low_level_wrappers/data_imports.py | 212 ++++++++++++++ python/lib/sift_client/client.py | 7 + python/lib/sift_client/resources/__init__.py | 4 + .../lib/sift_client/resources/data_imports.py | 239 ++++++++++++++++ .../resources/sync_stubs/__init__.py | 3 + .../resources/sync_stubs/__init__.pyi | 146 ++++++++++ .../lib/sift_client/sift_types/data_import.py | 269 ++++++++++++++++++ python/lib/sift_client/util/util.py | 4 + 8 files changed, 884 insertions(+) create mode 100644 python/lib/sift_client/_internal/low_level_wrappers/data_imports.py create mode 100644 python/lib/sift_client/resources/data_imports.py create mode 100644 python/lib/sift_client/sift_types/data_import.py diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py b/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py new file mode 100644 index 000000000..d83f42142 --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, cast + +from sift.data_imports.v2.data_imports_pb2 import ( + CreateDataImportFromUploadRequest, + CreateDataImportFromUploadResponse, + CreateDataImportFromUrlRequest, + CreateDataImportFromUrlResponse, + DetectConfigRequest, + DetectConfigResponse, + GetDataImportRequest, + GetDataImportResponse, + ListDataImportsRequest, + ListDataImportsResponse, + RetryDataImportRequest, +) +from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub + +from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.util.executor import run_sync_function +from sift_client.sift_types.data_import import CsvImportConfig, DataImport +from sift_client.transport import WithGrpcClient, WithRestClient + +if TYPE_CHECKING: + from pathlib import Path + + from sift.data_imports.v2.data_imports_pb2 import DataTypeKey + + from sift_client.transport.grpc_transport import GrpcClient + from sift_client.transport.rest_transport import RestClient + +# Union of all supported config types. Extend this as new formats are added. +ImportConfig = CsvImportConfig + + +def _set_config_on_request( + request: CreateDataImportFromUploadRequest | CreateDataImportFromUrlRequest, + config: ImportConfig, +) -> None: + """Set the appropriate config field on a proto request based on the config type.""" + if isinstance(config, CsvImportConfig): + request.csv_config.CopyFrom(config._to_proto()) + else: + raise TypeError(f"Unsupported import config type: {type(config).__name__}") + + +logger = logging.getLogger(__name__) + + +class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient, WithRestClient): + """Low-level client for the DataImportService. + + This class provides a thin wrapper around the autogenerated bindings for the DataImportsAPI. + """ + + def __init__(self, grpc_client: GrpcClient, rest_client: RestClient): + WithGrpcClient.__init__(self, grpc_client=grpc_client) + WithRestClient.__init__(self, rest_client=rest_client) + + async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]: + """Create a data import and get back a presigned upload URL. + + Args: + config: The import configuration. + + Returns: + A tuple of (data_import_id, upload_url). + """ + request = CreateDataImportFromUploadRequest() + _set_config_on_request(request, config) + response = await self._grpc_client.get_stub( + DataImportServiceStub + ).CreateDataImportFromUpload(request) + response = cast("CreateDataImportFromUploadResponse", response) + return response.data_import_id, response.upload_url + + async def upload_file(self, upload_url: str, file_path: Path) -> None: + """Upload a file to a presigned URL. + + Runs the synchronous HTTP POST in a thread pool to avoid blocking + the event loop. + + Args: + upload_url: The presigned URL to upload to. + file_path: Path to the file to upload. + """ + rest_client = self._rest_client + + def _do_upload() -> None: + with open(file_path, "rb") as f: + response = rest_client.post(upload_url, data=f) + response.raise_for_status() + + await run_sync_function(_do_upload) + + async def create_from_url(self, url: str, config: ImportConfig) -> str: + """Create a data import from a remote URL. + + Args: + url: The URL to import from (HTTP or S3). + config: The import configuration. + + Returns: + The data_import_id. + """ + request = CreateDataImportFromUrlRequest(url=url) + _set_config_on_request(request, config) + response = await self._grpc_client.get_stub(DataImportServiceStub).CreateDataImportFromUrl( + request + ) + response = cast("CreateDataImportFromUrlResponse", response) + return response.data_import_id + + async def get(self, data_import_id: str) -> DataImport: + """Get a data import by ID. + + Args: + data_import_id: The ID of the data import. + + Returns: + The DataImport. + """ + request = GetDataImportRequest(data_import_id=data_import_id) + response = await self._grpc_client.get_stub(DataImportServiceStub).GetDataImport(request) + response = cast("GetDataImportResponse", response) + return DataImport._from_proto(response.data_import) + + async def list_( + self, + *, + page_size: int | None = None, + page_token: str | None = None, + query_filter: str = "", + order_by: str = "", + ) -> tuple[list[DataImport], str]: + """List data imports with optional filtering and pagination. + + Args: + page_size: Maximum number of results per page. + page_token: Token for the next page of results. + query_filter: CEL filter string. + order_by: Ordering string (e.g. "created_date desc"). + + Returns: + A tuple of (list of DataImports, next_page_token). + """ + request = ListDataImportsRequest( + filter=query_filter, + order_by=order_by, + ) + if page_size is not None: + request.page_size = page_size + if page_token: + request.page_token = page_token + + response = await self._grpc_client.get_stub(DataImportServiceStub).ListDataImports(request) + response = cast("ListDataImportsResponse", response) + data_imports = [DataImport._from_proto(di) for di in response.data_imports] + return data_imports, response.next_page_token + + async def list_all( + self, + *, + query_filter: str = "", + order_by: str = "", + max_results: int | None = None, + ) -> list[DataImport]: + """List all data imports, handling pagination automatically. + + Args: + query_filter: CEL filter string. + order_by: Ordering string (e.g. "created_date desc"). + max_results: Maximum total results to return. + + Returns: + A list of all matching DataImports. + """ + return await self._handle_pagination( + func=self.list_, + kwargs={"query_filter": query_filter, "order_by": order_by}, + max_results=max_results, + ) + + async def retry(self, data_import_id: str) -> None: + """Retry a failed data import. + + Only works for URL-based imports in a failed state. + + Args: + data_import_id: The ID of the data import to retry. + """ + request = RetryDataImportRequest(data_import_id=data_import_id) + await self._grpc_client.get_stub(DataImportServiceStub).RetryDataImport(request) + + async def detect_config( + self, data: bytes, data_type_key: DataTypeKey.ValueType + ) -> DetectConfigResponse: + """Call the DetectConfig RPC to auto-detect import configuration. + + Args: + data: A sample of the file content (e.g. the first 64 KiB). + data_type_key: The file type hint. + + Returns: + The raw DetectConfigResponse proto. The caller (resource API) + is responsible for converting to a sift_type. + """ + request = DetectConfigRequest(data=data, type=data_type_key) + response = await self._grpc_client.get_stub(DataImportServiceStub).DetectConfig(request) + return cast("DetectConfigResponse", response) diff --git a/python/lib/sift_client/client.py b/python/lib/sift_client/client.py index ed7aeba9a..95fd25b71 100644 --- a/python/lib/sift_client/client.py +++ b/python/lib/sift_client/client.py @@ -9,6 +9,8 @@ ChannelsAPIAsync, DataExportAPI, DataExportAPIAsync, + DataImportAPI, + DataImportAPIAsync, FileAttachmentsAPI, FileAttachmentsAPIAsync, IngestionAPIAsync, @@ -110,6 +112,9 @@ class SiftClient( data_export: DataExportAPI """Instance of the Data Export API for making synchronous requests.""" + data_import: DataImportAPI + """Instance of the Data Import API for making synchronous requests.""" + async_: AsyncAPIs """Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor.""" @@ -159,6 +164,7 @@ def __init__( self.tags = TagsAPI(self) self.test_results = TestResultsAPI(self) self.data_export = DataExportAPI(self) + self.data_import = DataImportAPI(self) # Accessor for the asynchronous APIs self.async_ = AsyncAPIs( @@ -175,6 +181,7 @@ def __init__( tags=TagsAPIAsync(self), test_results=TestResultsAPIAsync(self), data_export=DataExportAPIAsync(self), + data_import=DataImportAPIAsync(self), ) @property diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index 78b3b4eba..2b7a4c55b 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -162,6 +162,7 @@ async def main(): from sift_client.resources.runs import RunsAPIAsync from sift_client.resources.tags import TagsAPIAsync from sift_client.resources.test_results import TestResultsAPIAsync +from sift_client.resources.data_imports import DataImportAPIAsync from sift_client.resources.exports import DataExportAPIAsync # ruff: noqa All imports needs to be imported before sync_stubs to avoid circular import @@ -178,6 +179,7 @@ async def main(): TestResultsAPI, FileAttachmentsAPI, DataExportAPI, + DataImportAPI, ) import sys @@ -215,4 +217,6 @@ async def main(): "TracingConfig", "DataExportAPI", "DataExportAPIAsync", + "DataImportAPI", + "DataImportAPIAsync", ] diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py new file mode 100644 index 000000000..8ec2a3706 --- /dev/null +++ b/python/lib/sift_client/resources/data_imports.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from pathlib import Path +from typing import TYPE_CHECKING + +from sift.data_imports.v2.data_imports_pb2 import DATA_TYPE_KEY_CSV + +from sift_client._internal.low_level_wrappers.data_imports import DataImportsLowLevelClient +from sift_client.resources._base import ResourceBase +from sift_client.sift_types.data_import import ( + CsvImportConfig, + DataImport, + DataImportStatus, +) +from sift_client.util import cel_utils as cel + +if TYPE_CHECKING: + from sift_client._internal.low_level_wrappers.data_imports import ImportConfig + from sift_client.client import SiftClient + +logger = logging.getLogger(__name__) + +_DETECT_CONFIG_SAMPLE_SIZE = 65_536 # 64 KiB + + +class DataImportAPIAsync(ResourceBase): + """High-level API for importing data into Sift. + + Supports importing data from local files or remote URLs. Returns a + `DataImport` object that can be polled for status. + """ + + def __init__(self, sift_client: SiftClient): + """Initialize the DataImportAPI. + + Args: + sift_client: The Sift client to use. + """ + super().__init__(sift_client) + self._low_level_client = DataImportsLowLevelClient( + grpc_client=self.client.grpc_client, + rest_client=self.client.rest_client, + ) + + async def import_from_path( + self, + *, + file_path: str | Path, + config: ImportConfig, + ) -> DataImport: + """Import data from a local file. + + Creates a data import on the server and uploads the file to the + returned presigned URL. Returns a :class:`DataImport` that can be + polled for status via ``data_import.refresh()``. + + Args: + file_path: Path to the local file to import. + config: Import configuration describing the file format and column + mapping. + + Returns: + A :class:`DataImport` representing the import operation. + + Raises: + FileNotFoundError: If the file does not exist. + """ + path = Path(file_path) + if not path.is_file(): + raise FileNotFoundError(f"File not found: {file_path}") + + data_import_id, upload_url = await self._low_level_client.create_from_upload(config) + logger.info("Created data import %s", data_import_id) + + await self._low_level_client.upload_file(upload_url, path) + logger.info("Uploaded file to presigned URL for import %s", data_import_id) + + data_import = await self._low_level_client.get(data_import_id) + return self._apply_client_to_instance(data_import) + + async def import_from_url( + self, + *, + url: str, + config: ImportConfig, + ) -> DataImport: + """Import data from a remote URL (HTTP or S3). + + Returns a :class:`DataImport` that can be polled for status via + ``data_import.refresh()``. + + Args: + url: The URL to import from. + config: Import configuration describing the file format and column + mapping. + + Returns: + A :class:`DataImport` representing the import operation. + """ + data_import_id = await self._low_level_client.create_from_url(url, config) + logger.info("Created URL-based data import %s", data_import_id) + + data_import = await self._low_level_client.get(data_import_id) + return self._apply_client_to_instance(data_import) + + async def get(self, data_import_id: str) -> DataImport: + """Get a data import by ID. + + Args: + data_import_id: The ID of the data import. + + Returns: + The DataImport. + """ + data_import = await self._low_level_client.get(data_import_id) + return self._apply_client_to_instance(data_import) + + async def list_( + self, + *, + data_import_ids: list[str] | None = None, + status: DataImportStatus | None = None, + filter_query: str | None = None, + order_by: str | None = None, + limit: int | None = None, + ) -> list[DataImport]: + """List data imports with optional filtering. + + Args: + data_import_ids: Filter to imports with any of these IDs. + status: Filter to imports with this status. + filter_query: Explicit CEL filter string. + order_by: Ordering string (e.g. "created_date desc"). + limit: Maximum number of imports to return. If None, returns all. + + Returns: + A list of DataImport objects matching the filter criteria. + """ + filter_parts = [] + if data_import_ids: + filter_parts.append(cel.in_("data_import_id", data_import_ids)) + if status is not None: + filter_parts.append(cel.equals("status", str(status.value))) + if filter_query: + filter_parts.append(filter_query) + query_filter = cel.and_(*filter_parts) + + data_imports = await self._low_level_client.list_all( + query_filter=query_filter or "", + order_by=order_by or "", + max_results=limit, + ) + return self._apply_client_to_instances(data_imports) + + async def retry(self, data_import: str | DataImport) -> None: + """Retry a failed data import. + + Only works for URL-based imports in a failed state. + + Args: + data_import: The DataImport or data_import_id to retry. + """ + data_import_id = ( + data_import._id_or_error if isinstance(data_import, DataImport) else data_import + ) + await self._low_level_client.retry(data_import_id) + + async def detect_config(self, file_path: str | Path) -> CsvImportConfig: + """Auto-detect import configuration from a file. + + Reads a sample of the file, sends it to the server's DetectConfig + endpoint, and returns the detected configuration. You can inspect + and modify the result before passing it to :meth:`import_from_path`. + + Currently supports CSV files only. + + Args: + file_path: Path to the file to analyze. + + Returns: + The detected import config. + + Raises: + FileNotFoundError: If the file does not exist. + ValueError: If detection returns no config. + """ + path = Path(file_path) + if not path.is_file(): + raise FileNotFoundError(f"File not found: {file_path}") + + with open(path, "rb") as f: + sample = f.read(_DETECT_CONFIG_SAMPLE_SIZE) + + response = await self._low_level_client.detect_config(sample, DATA_TYPE_KEY_CSV) + + if response.HasField("csv_config"): + return CsvImportConfig._from_proto(response.csv_config) + + raise ValueError("Server returned an empty DetectConfig response.") + + async def wait_until_complete( + self, + data_import: str | DataImport, + *, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> DataImport: + """Wait until a data import reaches a terminal state. + + Polls the import status at the given interval until the import is + SUCCEEDED or FAILED, returning the completed DataImport. + + Args: + data_import: The DataImport or data_import_id to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The DataImport in its terminal state. + """ + data_import_id = ( + data_import._id_or_error if isinstance(data_import, DataImport) else data_import + ) + + start = time.monotonic() + while True: + result = await self.get(data_import_id) + if result.is_complete: + return result + if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs: + raise TimeoutError( + f"Data import '{data_import_id}' did not complete " + f"within {timeout_secs} seconds." + ) + await asyncio.sleep(polling_interval_secs) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.py b/python/lib/sift_client/resources/sync_stubs/__init__.py index acd73755e..982a028c6 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.py +++ b/python/lib/sift_client/resources/sync_stubs/__init__.py @@ -8,6 +8,7 @@ CalculatedChannelsAPIAsync, ChannelsAPIAsync, DataExportAPIAsync, + DataImportAPIAsync, FileAttachmentsAPIAsync, JobsAPIAsync, PingAPIAsync, @@ -30,12 +31,14 @@ TagsAPI = generate_sync_api(TagsAPIAsync, "TagsAPI") TestResultsAPI = generate_sync_api(TestResultsAPIAsync, "TestResultsAPI") DataExportAPI = generate_sync_api(DataExportAPIAsync, "DataExportAPI") +DataImportAPI = generate_sync_api(DataImportAPIAsync, "DataImportAPI") __all__ = [ "AssetsAPI", "CalculatedChannelsAPI", "ChannelsAPI", "DataExportAPI", + "DataImportAPI", "FileAttachmentsAPI", "JobsAPI", "PingAPI", diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 02e53aeb5..0deb2928e 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -13,6 +13,9 @@ if TYPE_CHECKING: import pandas as pd import pyarrow as pa + from sift_client._internal.low_level_wrappers.data_imports import ( + ImportConfig, + ) from sift_client.client import SiftClient from sift_client.sift_types.asset import Asset, AssetUpdate from sift_client.sift_types.calculated_channel import ( @@ -21,6 +24,7 @@ if TYPE_CHECKING: CalculatedChannelUpdate, ) from sift_client.sift_types.channel import Channel + from sift_client.sift_types.data_import import CsvImportConfig, DataImport, DataImportStatus from sift_client.sift_types.export import ExportOutputFormat from sift_client.sift_types.file_attachment import ( FileAttachment, @@ -621,6 +625,148 @@ class DataExportAPI: """ ... +class DataImportAPI: + """Sync counterpart to `DataImportAPIAsync`. + + High-level API for importing data into Sift. + + Supports importing data from local files or remote URLs. Returns a + `DataImport` object that can be polled for status. + """ + + def __init__(self, sift_client: SiftClient): + """Initialize the DataImportAPI. + + Args: + sift_client: The Sift client to use. + """ + ... + + def _run(self, coro): ... + def detect_config(self, file_path: str | Path) -> CsvImportConfig: + """Auto-detect import configuration from a file. + + Reads a sample of the file, sends it to the server's DetectConfig + endpoint, and returns the detected configuration. You can inspect + and modify the result before passing it to :meth:`import_from_path`. + + Currently supports CSV files only. + + Args: + file_path: Path to the file to analyze. + + Returns: + The detected import config. + + Raises: + FileNotFoundError: If the file does not exist. + ValueError: If detection returns no config. + """ + ... + + def get(self, data_import_id: str) -> DataImport: + """Get a data import by ID. + + Args: + data_import_id: The ID of the data import. + + Returns: + The DataImport. + """ + ... + + def import_from_path(self, *, file_path: str | Path, config: ImportConfig) -> DataImport: + """Import data from a local file. + + Creates a data import on the server and uploads the file to the + returned presigned URL. Returns a :class:`DataImport` that can be + polled for status via ``data_import.refresh()``. + + Args: + file_path: Path to the local file to import. + config: Import configuration describing the file format and column + mapping. + + Returns: + A :class:`DataImport` representing the import operation. + + Raises: + FileNotFoundError: If the file does not exist. + """ + ... + + def import_from_url(self, *, url: str, config: ImportConfig) -> DataImport: + """Import data from a remote URL (HTTP or S3). + + Returns a :class:`DataImport` that can be polled for status via + ``data_import.refresh()``. + + Args: + url: The URL to import from. + config: Import configuration describing the file format and column + mapping. + + Returns: + A :class:`DataImport` representing the import operation. + """ + ... + + def list_( + self, + *, + data_import_ids: list[str] | None = None, + status: DataImportStatus | None = None, + filter_query: str | None = None, + order_by: str | None = None, + limit: int | None = None, + ) -> list[DataImport]: + """List data imports with optional filtering. + + Args: + data_import_ids: Filter to imports with any of these IDs. + status: Filter to imports with this status. + filter_query: Explicit CEL filter string. + order_by: Ordering string (e.g. "created_date desc"). + limit: Maximum number of imports to return. If None, returns all. + + Returns: + A list of DataImport objects matching the filter criteria. + """ + ... + + def retry(self, data_import: str | DataImport) -> None: + """Retry a failed data import. + + Only works for URL-based imports in a failed state. + + Args: + data_import: The DataImport or data_import_id to retry. + """ + ... + + def wait_until_complete( + self, + data_import: str | DataImport, + *, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> DataImport: + """Wait until a data import reaches a terminal state. + + Polls the import status at the given interval until the import is + SUCCEEDED or FAILED, returning the completed DataImport. + + Args: + data_import: The DataImport or data_import_id to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The DataImport in its terminal state. + """ + ... + class FileAttachmentsAPI: """Sync counterpart to `FileAttachmentsAPIAsync`. diff --git a/python/lib/sift_client/sift_types/data_import.py b/python/lib/sift_client/sift_types/data_import.py new file mode 100644 index 000000000..fc0bf119a --- /dev/null +++ b/python/lib/sift_client/sift_types/data_import.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +from datetime import datetime # noqa: TC003 +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict +from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto +from sift.data_imports.v2.data_imports_pb2 import CsvConfig as CsvConfigProto +from sift.data_imports.v2.data_imports_pb2 import CsvTimeColumn as CsvTimeColumnProto +from sift.data_imports.v2.data_imports_pb2 import DataImport as DataImportProto +from sift.data_imports.v2.data_imports_pb2 import DataImportStatus as DataImportStatusProto +from sift.data_imports.v2.data_imports_pb2 import TimeFormat as TimeFormatProto + +from sift_client._internal.util.timestamp import to_pb_timestamp +from sift_client.sift_types._base import BaseType +from sift_client.sift_types.channel import ChannelDataType + +if TYPE_CHECKING: + from sift_client.client import SiftClient + + +# --------------------------------------------------------------------------- +# Enums +# --------------------------------------------------------------------------- + + +class TimeFormat(Enum): + """Supported time formats for data import columns.""" + + RELATIVE_NANOSECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_NANOSECONDS + RELATIVE_MICROSECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_MICROSECONDS + RELATIVE_MILLISECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_MILLISECONDS + RELATIVE_SECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_SECONDS + RELATIVE_MINUTES = TimeFormatProto.TIME_FORMAT_RELATIVE_MINUTES + RELATIVE_HOURS = TimeFormatProto.TIME_FORMAT_RELATIVE_HOURS + ABSOLUTE_RFC3339 = TimeFormatProto.TIME_FORMAT_ABSOLUTE_RFC3339 + ABSOLUTE_DATETIME = TimeFormatProto.TIME_FORMAT_ABSOLUTE_DATETIME + ABSOLUTE_UNIX_SECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_SECONDS + ABSOLUTE_UNIX_MILLISECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_MILLISECONDS + ABSOLUTE_UNIX_MICROSECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_MICROSECONDS + ABSOLUTE_UNIX_NANOSECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS + + +class DataImportStatus(Enum): + """Status of a data import.""" + + PENDING = DataImportStatusProto.DATA_IMPORT_STATUS_PENDING + IN_PROGRESS = DataImportStatusProto.DATA_IMPORT_STATUS_IN_PROGRESS + SUCCEEDED = DataImportStatusProto.DATA_IMPORT_STATUS_SUCCEEDED + FAILED = DataImportStatusProto.DATA_IMPORT_STATUS_FAILED + + +# --------------------------------------------------------------------------- +# CSV config types +# --------------------------------------------------------------------------- + + +class CsvTimeColumn(BaseModel): + """Time column configuration for CSV imports. + + Attributes: + column: The 1-indexed column number of the time column. + format: The time format used in this column. + relative_start_time: Required when using a relative time format. + """ + + model_config = ConfigDict(frozen=True) + + column: int + format: TimeFormat + relative_start_time: datetime | None = None + + def _to_proto(self) -> CsvTimeColumnProto: + proto = CsvTimeColumnProto( + column_number=self.column, + format=self.format.value, + ) + if self.relative_start_time is not None: + proto.relative_start_time.CopyFrom(to_pb_timestamp(self.relative_start_time)) + return proto + + +class CsvDataColumn(BaseModel): + """A data column definition for CSV imports. + + Attributes: + column: The 1-indexed column number. + name: Channel name. + data_type: The data type of the channel values. + units: Optional units string. + description: Optional channel description. + """ + + model_config = ConfigDict(frozen=True) + + column: int + name: str + data_type: ChannelDataType + units: str = "" + description: str = "" + + +class CsvImportConfig(BaseModel): + """Configuration for importing a CSV file. + + Attributes: + asset_name: Name of the asset to import data into. + run_name: Name for the run. Ignored if ``run_id`` is set. + run_id: ID of an existing run to append data to. + first_data_row: The first row containing data (1-indexed). Defaults to 2 to skip a header row. + time_column: Time column configuration. + data_columns: List of data column definitions. + """ + + model_config = ConfigDict(frozen=True) + + asset_name: str + run_name: str | None = None + run_id: str | None = None + first_data_row: int = 2 + time_column: CsvTimeColumn + data_columns: list[CsvDataColumn] + + def _to_proto(self) -> CsvConfigProto: + return CsvConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + first_data_row=self.first_data_row, + time_column=self.time_column._to_proto(), + data_columns={ + dc.column: ChannelConfigProto( + name=dc.name, + data_type=dc.data_type.value, + units=dc.units, + description=dc.description, + ) + for dc in self.data_columns + }, + ) + + @classmethod + def _from_proto(cls, proto: CsvConfigProto) -> CsvImportConfig: + """Create from a proto CsvConfig (e.g. from DetectConfig response).""" + time_column = CsvTimeColumn( + column=proto.time_column.column_number, + format=TimeFormat(proto.time_column.format), + ) + data_columns = [ + CsvDataColumn( + column=col_num, + name=ch_cfg.name, + data_type=ChannelDataType(ch_cfg.data_type), + units=ch_cfg.units, + description=ch_cfg.description, + ) + for col_num, ch_cfg in proto.data_columns.items() + ] + return cls( + asset_name=proto.asset_name, + run_name=proto.run_name or None, + run_id=proto.run_id or None, + first_data_row=proto.first_data_row or 2, + time_column=time_column, + data_columns=data_columns, + ) + + +# --------------------------------------------------------------------------- +# DataImport resource type +# --------------------------------------------------------------------------- + + +class DataImport(BaseType[DataImportProto, "DataImport"]): + """A data import in the Sift system. + + Represents the status and metadata of an import operation. Use + ``client.data_import.upload()`` to create one, or ``client.data_import.get()`` + to retrieve an existing import by ID. + """ + + # Required fields + status: DataImportStatus + created_date: datetime + modified_date: datetime + + # Optional fields + error_message: str | None + source_url: str | None + run_id: str | None + report_id: str | None + asset_id: str | None + data_start_time: datetime | None + data_stop_time: datetime | None + + # Config used for this import + csv_config: CsvImportConfig | None + + @classmethod + def _from_proto( + cls, proto: DataImportProto, sift_client: SiftClient | None = None + ) -> DataImport: + from datetime import timezone + + return cls( + proto=proto, + id_=proto.data_import_id, + status=DataImportStatus(proto.status), + error_message=proto.error_message or None, + created_date=proto.created_date.ToDatetime(tzinfo=timezone.utc), + modified_date=proto.modified_date.ToDatetime(tzinfo=timezone.utc), + source_url=proto.source_url or None, + run_id=proto.run_id if proto.HasField("_run_id") else None, + report_id=proto.report_id if proto.HasField("_report_id") else None, + asset_id=proto.asset_id if proto.HasField("_asset_id") else None, + data_start_time=( + proto.data_start_time.ToDatetime(tzinfo=timezone.utc) + if proto.HasField("_data_start_time") + else None + ), + data_stop_time=( + proto.data_stop_time.ToDatetime(tzinfo=timezone.utc) + if proto.HasField("_data_stop_time") + else None + ), + csv_config=( + CsvImportConfig._from_proto(proto.csv_config) + if proto.HasField("csv_config") + else None + ), + _client=sift_client, + ) + + @property + def is_pending(self) -> bool: + """Return True if the import is pending.""" + return self.status == DataImportStatus.PENDING + + @property + def is_in_progress(self) -> bool: + """Return True if the import is in progress.""" + return self.status == DataImportStatus.IN_PROGRESS + + @property + def is_succeeded(self) -> bool: + """Return True if the import succeeded.""" + return self.status == DataImportStatus.SUCCEEDED + + @property + def is_failed(self) -> bool: + """Return True if the import failed.""" + return self.status == DataImportStatus.FAILED + + @property + def is_complete(self) -> bool: + """Return True if the import reached a terminal state (succeeded or failed).""" + return self.status in (DataImportStatus.SUCCEEDED, DataImportStatus.FAILED) + + def refresh(self) -> DataImport: + """Refresh this import with the latest data from the API.""" + updated = self.client.data_import.get(self._id_or_error) + self._update(updated) + return self + + def retry(self) -> None: + """Retry this import. Only works for URL-based imports in a failed state.""" + self.client.data_import.retry(self._id_or_error) + self.refresh() diff --git a/python/lib/sift_client/util/util.py b/python/lib/sift_client/util/util.py index e82a8ccfe..98719cfdd 100644 --- a/python/lib/sift_client/util/util.py +++ b/python/lib/sift_client/util/util.py @@ -8,6 +8,7 @@ CalculatedChannelsAPIAsync, ChannelsAPIAsync, DataExportAPIAsync, + DataImportAPIAsync, FileAttachmentsAPIAsync, IngestionAPIAsync, JobsAPIAsync, @@ -62,6 +63,9 @@ class AsyncAPIs(NamedTuple): data_export: DataExportAPIAsync """Instance of the Data Export API for making asynchronous requests.""" + data_import: DataImportAPIAsync + """Instance of the Data Import API for making asynchronous requests.""" + def count_non_none(*args: Any) -> int: """Count the number of non-none arguments.""" From 265941cb7841ee833fbf7d2e1ec2893a314c71f3 Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Fri, 27 Mar 2026 08:46:42 -0700 Subject: [PATCH 2/2] add detect config data types --- .../lib/sift_client/resources/data_imports.py | 70 ++++++++++++++++--- .../resources/sync_stubs/__init__.pyi | 36 ++++++++-- .../lib/sift_client/sift_types/data_import.py | 28 ++++++++ 3 files changed, 116 insertions(+), 18 deletions(-) diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py index 8ec2a3706..bc2ac9cdf 100644 --- a/python/lib/sift_client/resources/data_imports.py +++ b/python/lib/sift_client/resources/data_imports.py @@ -6,11 +6,10 @@ from pathlib import Path from typing import TYPE_CHECKING -from sift.data_imports.v2.data_imports_pb2 import DATA_TYPE_KEY_CSV - from sift_client._internal.low_level_wrappers.data_imports import DataImportsLowLevelClient from sift_client.resources._base import ResourceBase from sift_client.sift_types.data_import import ( + EXTENSION_TO_DATA_TYPE_KEY, CsvImportConfig, DataImport, DataImportStatus, @@ -49,7 +48,10 @@ async def import_from_path( self, *, file_path: str | Path, - config: ImportConfig, + config: ImportConfig | None = None, + asset_name: str | None = None, + run_name: str | None = None, + run_id: str | None = None, ) -> DataImport: """Import data from a local file. @@ -57,21 +59,47 @@ async def import_from_path( returned presigned URL. Returns a :class:`DataImport` that can be polled for status via ``data_import.refresh()``. + When ``config`` is omitted the file format is auto-detected via + :meth:`detect_config` and a :class:`CsvImportConfig` is built using + the provided ``asset_name`` and optional ``run_name`` / ``run_id``. + Args: file_path: Path to the local file to import. config: Import configuration describing the file format and column - mapping. + mapping. When provided, ``asset_name``, ``run_name``, and + ``run_id`` are ignored. + asset_name: Name of the asset to import into. Required when + ``config`` is not provided. + run_name: Optional run name. Only used when ``config`` is not + provided. + run_id: Optional existing run ID. Only used when ``config`` is not + provided. Returns: A :class:`DataImport` representing the import operation. Raises: FileNotFoundError: If the file does not exist. + ValueError: If neither ``config`` nor ``asset_name`` is provided. """ path = Path(file_path) if not path.is_file(): raise FileNotFoundError(f"File not found: {file_path}") + if config is None: + if asset_name is None: + raise ValueError( + "Either 'config' or 'asset_name' must be provided." + ) + detected = await self.detect_config(file_path) + config = detected.model_copy( + update={ + "asset_name": asset_name, + "run_name": run_name, + "run_id": run_id, + } + ) + data_import_id, upload_url = await self._low_level_client.create_from_upload(config) logger.info("Created data import %s", data_import_id) @@ -168,14 +196,15 @@ async def retry(self, data_import: str | DataImport) -> None: ) await self._low_level_client.retry(data_import_id) - async def detect_config(self, file_path: str | Path) -> CsvImportConfig: + async def detect_config(self, file_path: str | Path) -> ImportConfig: """Auto-detect import configuration from a file. Reads a sample of the file, sends it to the server's DetectConfig - endpoint, and returns the detected configuration. You can inspect - and modify the result before passing it to :meth:`import_from_path`. + endpoint, and returns the detected configuration. The file format + is inferred from the file extension. You can inspect and modify the + result before passing it to :meth:`import_from_path`. - Currently supports CSV files only. + Supported extensions: .csv, .parquet, .tdms, .ch10, .ch11, .h5, .hdf5 Args: file_path: Path to the file to analyze. @@ -185,19 +214,38 @@ async def detect_config(self, file_path: str | Path) -> CsvImportConfig: Raises: FileNotFoundError: If the file does not exist. - ValueError: If detection returns no config. + ValueError: If the file extension is unsupported or detection + returns no config. """ path = Path(file_path) if not path.is_file(): raise FileNotFoundError(f"File not found: {file_path}") + ext = path.suffix.lower() + data_type_key = EXTENSION_TO_DATA_TYPE_KEY.get(ext) + if data_type_key is None: + raise ValueError( + f"Unsupported file extension '{ext}'. " + f"Supported: {', '.join(sorted(EXTENSION_TO_DATA_TYPE_KEY))}" + ) + with open(path, "rb") as f: sample = f.read(_DETECT_CONFIG_SAMPLE_SIZE) - response = await self._low_level_client.detect_config(sample, DATA_TYPE_KEY_CSV) + response = await self._low_level_client.detect_config(sample, data_type_key.value) if response.HasField("csv_config"): - return CsvImportConfig._from_proto(response.csv_config) + config = CsvImportConfig._from_proto(response.csv_config) + # The server's DetectConfig may include the time column in + # data_columns, but CreateDataImportFromUpload rejects that + # overlap. Filter it out so the config is import-ready. + time_col = config.time_column.column + filtered = [dc for dc in config.data_columns if dc.column != time_col] + if len(filtered) != len(config.data_columns): + config = config.model_copy(update={"data_columns": filtered}) + return config + + # TODO: Add parquet_config and hdf5_config once their config types are added. raise ValueError("Server returned an empty DetectConfig response.") diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 0deb2928e..2dd6e479a 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -643,14 +643,15 @@ class DataImportAPI: ... def _run(self, coro): ... - def detect_config(self, file_path: str | Path) -> CsvImportConfig: + def detect_config(self, file_path: str | Path) -> ImportConfig: """Auto-detect import configuration from a file. Reads a sample of the file, sends it to the server's DetectConfig - endpoint, and returns the detected configuration. You can inspect - and modify the result before passing it to :meth:`import_from_path`. + endpoint, and returns the detected configuration. The file format + is inferred from the file extension. You can inspect and modify the + result before passing it to :meth:`import_from_path`. - Currently supports CSV files only. + Supported extensions: .csv, .parquet, .tdms, .ch10, .ch11, .h5, .hdf5 Args: file_path: Path to the file to analyze. @@ -660,7 +661,8 @@ class DataImportAPI: Raises: FileNotFoundError: If the file does not exist. - ValueError: If detection returns no config. + ValueError: If the file extension is unsupported or detection + returns no config. """ ... @@ -675,23 +677,43 @@ class DataImportAPI: """ ... - def import_from_path(self, *, file_path: str | Path, config: ImportConfig) -> DataImport: + def import_from_path( + self, + *, + file_path: str | Path, + config: ImportConfig | None = None, + asset_name: str | None = None, + run_name: str | None = None, + run_id: str | None = None, + ) -> DataImport: """Import data from a local file. Creates a data import on the server and uploads the file to the returned presigned URL. Returns a :class:`DataImport` that can be polled for status via ``data_import.refresh()``. + When ``config`` is omitted the file format is auto-detected via + :meth:`detect_config` and a :class:`CsvImportConfig` is built using + the provided ``asset_name`` and optional ``run_name`` / ``run_id``. + Args: file_path: Path to the local file to import. config: Import configuration describing the file format and column - mapping. + mapping. When provided, ``asset_name``, ``run_name``, and + ``run_id`` are ignored. + asset_name: Name of the asset to import into. Required when + ``config`` is not provided. + run_name: Optional run name. Only used when ``config`` is not + provided. + run_id: Optional existing run ID. Only used when ``config`` is not + provided. Returns: A :class:`DataImport` representing the import operation. Raises: FileNotFoundError: If the file does not exist. + ValueError: If neither ``config`` nor ``asset_name`` is provided. """ ... diff --git a/python/lib/sift_client/sift_types/data_import.py b/python/lib/sift_client/sift_types/data_import.py index fc0bf119a..3ced4e9f6 100644 --- a/python/lib/sift_client/sift_types/data_import.py +++ b/python/lib/sift_client/sift_types/data_import.py @@ -6,6 +6,13 @@ from pydantic import BaseModel, ConfigDict from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto +from sift.data_imports.v2.data_imports_pb2 import ( + DATA_TYPE_KEY_CH10, + DATA_TYPE_KEY_CSV, + DATA_TYPE_KEY_HDF5, + DATA_TYPE_KEY_PARQUET_FLATDATASET, + DATA_TYPE_KEY_TDMS, +) from sift.data_imports.v2.data_imports_pb2 import CsvConfig as CsvConfigProto from sift.data_imports.v2.data_imports_pb2 import CsvTimeColumn as CsvTimeColumnProto from sift.data_imports.v2.data_imports_pb2 import DataImport as DataImportProto @@ -51,6 +58,27 @@ class DataImportStatus(Enum): FAILED = DataImportStatusProto.DATA_IMPORT_STATUS_FAILED +class DataTypeKey(Enum): + """Supported file types for data import detection.""" + + CSV = DATA_TYPE_KEY_CSV + PARQUET = DATA_TYPE_KEY_PARQUET_FLATDATASET + TDMS = DATA_TYPE_KEY_TDMS + CH10 = DATA_TYPE_KEY_CH10 + HDF5 = DATA_TYPE_KEY_HDF5 + + +EXTENSION_TO_DATA_TYPE_KEY: dict[str, DataTypeKey] = { + ".csv": DataTypeKey.CSV, + ".parquet": DataTypeKey.PARQUET, + ".tdms": DataTypeKey.TDMS, + ".ch10": DataTypeKey.CH10, + ".ch11": DataTypeKey.CH10, + ".h5": DataTypeKey.HDF5, + ".hdf5": DataTypeKey.HDF5, +} + + # --------------------------------------------------------------------------- # CSV config types # ---------------------------------------------------------------------------