diff --git a/docs/tutorials/02-dataset-format.ipynb b/docs/tutorials/02-dataset-format.ipynb index 2338c76..eb7fc75 100644 --- a/docs/tutorials/02-dataset-format.ipynb +++ b/docs/tutorials/02-dataset-format.ipynb @@ -4,12 +4,13 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "This notebook answers the following questions:\n", - "1. What dataset format does `fev` expect?\n", - "2. How is this format different from other popular time series data formats?\n", - "3. How to convert my dataset into a format expected by `fev`?\n", + "# Dataset format\n", "\n", - "For information on how to convert a `datasets.Dataset` into other popular time series data formats see notebook [04-models.ipynb](./04-models.ipynb)." + "This notebook covers:\n", + "1. The **simple option**: pass a long-format DataFrame (or parquet file) directly to `fev`.\n", + "2. The **data schema** that `fev` expects.\n", + "3. The **native format** used internally by `fev`, and why we use it.\n", + "4. Formats that are **not supported**, and the reasoning." ] }, { @@ -20,6 +21,10 @@ "source": [ "import warnings\n", "import datasets\n", + "import pandas as pd\n", + "\n", + "import fev\n", + "import fev.utils\n", "\n", "warnings.simplefilter(\"ignore\")\n", "datasets.disable_progress_bars()" @@ -29,194 +34,17 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## What dataset format does `fev` expect?\n", - "We store time series datasets using the Hugging Face `datasets` library.\n", - "\n", - "We assume that all time series datasets obey the following schema:\n", - "- each dataset entry (=row) represents a single (univariate/multivariate) time series\n", - "- each entry contains\n", - " - 1/ a field of type `Sequence(timestamp)` that contains the **timestamps** of observations\n", - " - 2/ at least one field of type `Sequence(float)` that can be used as the **target** time series\n", - " - 3/ a field of type `string` that contains the **unique ID** of each time series\n", - "- all fields of type `Sequence` have the same length\n", + "## The simple option: a long-format DataFrame\n", "\n", - "A few notes about the above schema:\n", - "- The ID, timestamp and target fields can have arbitrary names. These names can be specified when creating an `fev.Task` object.\n", - "- In addition to the required fields above, the dataset can contain arbitrary other fields such as \n", - " - extra dynamic columns of type `Sequence`\n", - " - static features of type `Value` or `Image`\n", - "- The dataset itself contains no information about the forecasting task. For example, the dataset does not say which dynamic columns should be used as the target column or exogenous features, or which columns are known only in the past. Such design makes it easy to re-use the same dataset across multiple different tasks without data duplication.\n", + "If your data is already in a long-format pandas DataFrame (one row per `(id, timestamp)` observation), you can use it with `fev` without any manual conversion.\n", "\n", - "Here is an example of such dataset taken from https://huggingface.co/datasets/autogluon/chronos_datasets." + "Here is an example dataset:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "Dataset({\n", - " features: ['id', 'timestamp', 'target', 'city', 'station', 'measurement'],\n", - " num_rows: 270\n", - "})" - ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ds = datasets.load_dataset(\"autogluon/chronos_datasets\", \"monash_kdd_cup_2018\", split=\"train\")\n", - "ds.set_format(\"numpy\")\n", - "ds" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Each entry corresponds to a single time series" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'id': np.str_('T000000'),\n", - " 'timestamp': array(['2017-01-01T14:00:00.000', '2017-01-01T15:00:00.000',\n", - " '2017-01-01T16:00:00.000', ..., '2018-03-31T13:00:00.000',\n", - " '2018-03-31T14:00:00.000', '2018-03-31T15:00:00.000'],\n", - " dtype='datetime64[ms]'),\n", - " 'target': array([453., 417., 395., ..., 132., 158., 118.], dtype=float32),\n", - " 'city': np.str_('Beijing'),\n", - " 'station': np.str_('aotizhongxin_aq'),\n", - " 'measurement': np.str_('PM2.5')}" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ds[0]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The `datasets` library conveniently stores metadata about the different features of the dataset." - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'id': Value(dtype='string', id=None),\n", - " 'timestamp': Sequence(feature=Value(dtype='timestamp[ms]', id=None), length=-1, id=None),\n", - " 'target': Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None),\n", - " 'city': Value(dtype='string', id=None),\n", - " 'station': Value(dtype='string', id=None),\n", - " 'measurement': Value(dtype='string', id=None)}" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ds.features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## What are the advantages of the \"fev format\" compared to other common formats?\n", - "We find the above dataset format (\"fev format\") more convenient and practical compared to other popular formats for storing time series data.\n", - "\n", - "**Long-format data frame** is quite common for storing data, is human readable and widely used by practitioners.\n", - "\n", - "| item_id | timestamp | scaled_price | promotion_email | promotion_homepage | unit_sales | product_code | product_category | product_subcategory | location_code |\n", - "|----------:|:------------|---------------:|------------------:|---------------------:|-------------:|---------------:|:-------------------|:----------------------|----------------:|\n", - "| 1062_101 | 2018-01-01 | 0.87913 | 0 | 0 | 636 | 1062 | Beverages | Fruit Juice Mango | 101 |\n", - "| 1062_101 | 2018-01-08 | 0.994517 | 0 | 0 | 123 | 1062 | Beverages | Fruit Juice Mango | 101 |\n", - "| 1062_101 | 2018-01-15 | 1.00551 | 0 | 0 | 391 | 1062 | Beverages | Fruit Juice Mango | 101 |\n", - "| 1062_101 | 2018-01-22 | 1 | 0 | 0 | 339 | 1062 | Beverages | Fruit Juice Mango | 101 |\n", - "| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |\n", - "\n", - "The long-format data frame has two main limitations compared to the \"fev format\".\n", - "- Static features either need to be unnecessarily duplicated for each row, or need to be stored in a separate file.\n", - " - This becomes especially problematic if static features contain information such as images or text documents.\n", - "- Dealing with large datasets is challenging.\n", - " - Obtaining individual time series requires an expensive `groupby` operation.\n", - " - When sharding, we need custom logic to ensure that rows corresponding to the same `item_id` are kept in the same shard.\n", - " - We either constantly need to ensure that the rows are ordered chronologically, or need to sort the rows each time the data is used.\n", - "\n", - "In contrast, the \"fev format\" can easily distinguish between static & dynamic features using the `datasets.Features` metadata. Since one time series corresponds to a single row, it has no problems with sharding.\n", - "\n", - "**GluonTS format** is another popular choice for storing time series data (e.g., used in [LOTSA](https://huggingface.co/datasets/Salesforce/lotsa_data)).\n", - "\n", - "Each entry is encoded as a dictionary with a pre-defined schema shared across all datasets\n", - "\n", - "```json\n", - "{\n", - " \"start\": \"2024-01-01\", \n", - " \"freq\": \"1D\", \n", - " \"target\": [0.5, 1.2, ...], \n", - " \"feat_dynamic_real\": [[...]], \n", - " \"past_feat_dynamic_real\": [[...]], \n", - " \"feat_static_cat\": [...], \n", - " \"feat_static_real\": [...], \n", - " ...,\n", - "}\n", - "```\n", - "This format is efficient and can be immediately consumed by some ML models. However, it also has some drawbacks compared to the \"fev format\".\n", - "- It hard-codes the forecasting task definition into the dataset (i.e., which columns are used as target, which columns are known in the future vs. only in the past). This often leads to data duplication.\n", - " - For example, consider a dataset that contains energy demand & weather time series for some region. If you want to evaluate a model in 3 settings (weather forecast is available for the future; weather is known only in the past; weather is ignored, only historic demand is available), you will need to create 3 copies of the dataset.\n", - "- It only supports numeric data, so it's not future-proof.\n", - " - Incorporating multimodal data such images or text into time series forecasting tasks [is becoming popular](https://arxiv.org/abs/2410.18959). The GluonTS format cannot natively handle that.\n", - "- It relies on pandas `freq` aliases staying consistent over time - which is something that we [cannot take for granted](https://pandas.pydata.org/docs/whatsnew/v2.2.0.html#deprecate-aliases-m-q-y-etc-in-favour-of-me-qe-ye-etc-for-offsets).\n", - "\n", - "The \"fev format\" does not hard-code the task properties, natively deals with multimodal data and is not tied to the pandas versions." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## How to convert my dataset into a format expected by `fev`?\n", - "If your dataset is stored in a long-format data frame, you can convert into an fev-compatible `datasets.Dataset` object using a helper function" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import fev.utils" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, "outputs": [ { "data": { @@ -344,7 +172,7 @@ "4 101 " ] }, - "execution_count": 6, + "execution_count": 2, "metadata": {}, "output_type": "execute_result" } @@ -355,102 +183,83 @@ ] }, { - "cell_type": "code", - "execution_count": 7, + "cell_type": "markdown", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'item_id': Value(dtype='string', id=None),\n", - " 'product_code': Value(dtype='int64', id=None),\n", - " 'product_category': Value(dtype='string', id=None),\n", - " 'product_subcategory': Value(dtype='string', id=None),\n", - " 'location_code': Value(dtype='int64', id=None),\n", - " 'timestamp': Sequence(feature=Value(dtype='timestamp[us]', id=None), length=-1, id=None),\n", - " 'scaled_price': Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None),\n", - " 'promotion_email': Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None),\n", - " 'promotion_homepage': Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None),\n", - " 'unit_sales': Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None)}" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], "source": [ - "ds = fev.utils.convert_long_df_to_hf_dataset(df, id_column=\"item_id\", static_columns=[\"product_code\", \"product_category\", \"product_subcategory\", \"location_code\"])\n", - "ds.features" + "You have two ways to use a long-format DataFrame:\n", + "\n", + "**Option A** — save to parquet and point `fev.Task` at the file. `fev` detects long-format data automatically and converts it on load.\n", + "\n", + "```python\n", + "df.to_parquet(\"data.parquet\")\n", + "\n", + "task = fev.Task(\n", + " dataset_path=\"data.parquet\",\n", + " id_column=\"item_id\",\n", + " timestamp_column=\"timestamp\",\n", + " target=\"unit_sales\",\n", + " known_dynamic_columns=[\"scaled_price\", \"promotion_email\"],\n", + " static_columns=[\"product_code\", \"product_category\", \"product_subcategory\", \"location_code\"],\n", + " horizon=8,\n", + ")\n", + "```\n", + "\n", + "**Option B** — convert in-memory with `fev.utils.convert_long_df_to_hf_dataset` (useful when you want to inspect, save, or push the converted dataset to the Hugging Face Hub):" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'item_id': np.str_('1062_101'),\n", - " 'product_code': np.int64(1062),\n", - " 'product_category': np.str_('Beverages'),\n", - " 'product_subcategory': np.str_('Fruit Juice Mango'),\n", - " 'location_code': np.int64(101),\n", - " 'timestamp': array(['2018-01-01T00:00:00.000000', '2018-01-08T00:00:00.000000',\n", - " '2018-01-15T00:00:00.000000', '2018-01-22T00:00:00.000000',\n", - " '2018-01-29T00:00:00.000000', '2018-02-05T00:00:00.000000',\n", - " '2018-02-12T00:00:00.000000', '2018-02-19T00:00:00.000000',\n", - " '2018-02-26T00:00:00.000000', '2018-03-05T00:00:00.000000',\n", - " '2018-03-12T00:00:00.000000', '2018-03-19T00:00:00.000000',\n", - " '2018-03-26T00:00:00.000000', '2018-04-02T00:00:00.000000',\n", - " '2018-04-09T00:00:00.000000', '2018-04-16T00:00:00.000000',\n", - " '2018-04-23T00:00:00.000000', '2018-04-30T00:00:00.000000',\n", - " '2018-05-07T00:00:00.000000', '2018-05-14T00:00:00.000000',\n", - " '2018-05-21T00:00:00.000000', '2018-05-28T00:00:00.000000',\n", - " '2018-06-04T00:00:00.000000', '2018-06-11T00:00:00.000000',\n", - " '2018-06-18T00:00:00.000000', '2018-06-25T00:00:00.000000',\n", - " '2018-07-02T00:00:00.000000', '2018-07-09T00:00:00.000000',\n", - " '2018-07-16T00:00:00.000000', '2018-07-23T00:00:00.000000',\n", - " '2018-07-30T00:00:00.000000'], dtype='datetime64[us]'),\n", - " 'scaled_price': array([0.8791298 , 0.99451727, 1.005513 , 1. , 0.88330877,\n", - " 0.8728938 , 0.8780195 , 0.8884807 , 0.9889777 , 1.0055426 ,\n", - " 0.98920846, 1.0054836 , 1. , 1. , 1.011026 ,\n", - " 0.9945471 , 0.99454623, 1. , 0.99451727, 1. ,\n", - " 1. , 0.9945471 , 1.011026 , 1.0054251 , 1.0054537 ,\n", - " 1. , 1.005513 , 1. , 1. , 1.0123464 ,\n", - " 1.006248 ], dtype=float32),\n", - " 'promotion_email': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,\n", - " 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],\n", - " dtype=float32),\n", - " 'promotion_homepage': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,\n", - " 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],\n", - " dtype=float32),\n", - " 'unit_sales': array([636., 123., 391., 339., 661., 513., 555., 485., 339., 230., 202.,\n", - " 420., 418., 581., 472., 230., 176., 242., 270., 285., 258., 285.,\n", - " 377., 339., 310., 231., 393., 447., 486., 284., 392.],\n", - " dtype=float32)}" + "{'item_id': Value('string'),\n", + " 'product_code': Value('int64'),\n", + " 'product_category': Value('string'),\n", + " 'product_subcategory': Value('string'),\n", + " 'location_code': Value('int64'),\n", + " 'timestamp': List(Value('timestamp[ns]')),\n", + " 'scaled_price': List(Value('float64')),\n", + " 'promotion_email': List(Value('float64')),\n", + " 'promotion_homepage': List(Value('float64')),\n", + " 'unit_sales': List(Value('float64'))}" ] }, - "execution_count": 8, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "ds.with_format(\"numpy\")[0]" + "ds = fev.utils.convert_long_df_to_hf_dataset(\n", + " df,\n", + " id_column=\"item_id\",\n", + " timestamp_column=\"timestamp\",\n", + " static_columns=[\"product_code\", \"product_category\", \"product_subcategory\", \"location_code\"],\n", + ")\n", + "ds.features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "To verify that the dataset was converted correctly, use the `fev.utils.validate_time_series_dataset` method." + "You can save the converted dataset to a parquet file or push it to the Hugging Face Hub for reuse:\n", + "\n", + "```python\n", + "ds.to_parquet(\"data.parquet\")\n", + "# or\n", + "ds.push_to_hub(repo_id=YOUR_REPO_ID, config_name=CONFIG_NAME)\n", + "```\n", + "\n", + "To verify that a dataset is well-formed, use `fev.utils.validate_time_series_dataset`:" ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -461,45 +270,137 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "You can save the dataset to disk as a parquet file" + "## Data schema\n", + "\n", + "`fev` expects time series datasets to follow this schema:\n", + "\n", + "- each row represents a single (univariate or multivariate) time series\n", + "- each row contains\n", + " 1. a field of type `Sequence(timestamp)` with the **timestamps** of observations\n", + " 2. at least one field of type `Sequence(float)` that can be used as the **target** time series\n", + " 3. a field of type `string` with the **unique ID** of the time series\n", + "- all fields of type `Sequence` have the same length\n", + "- additional fields are allowed:\n", + " - extra dynamic columns of type `Sequence` (covariates)\n", + " - static features of type `Value`, `Image`, or any other Hugging Face `Features` type\n", + "\n", + "The names of the ID, timestamp, and target fields are arbitrary and configured on the `fev.Task`.\n", + "\n", + "Note that the dataset itself contains no information about the forecasting task. The same dataset can be reused across multiple tasks (e.g. different targets, horizons, or covariate setups) without data duplication.\n", + "\n", + "When you load a long-format DataFrame (Option A or B above), `fev` produces a dataset matching this schema. Here is an example:" ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 5, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'id': Value('string'),\n", + " 'timestamp': List(Value('timestamp[ms]')),\n", + " 'target': List(Value('float64')),\n", + " 'city': Value('string'),\n", + " 'station': Value('string'),\n", + " 'measurement': Value('string')}" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "# ds.to_parquet(DATASET_PATH)" + "ds = datasets.load_dataset(\"autogluon/chronos_datasets\", \"monash_kdd_cup_2018\", split=\"train\")\n", + "ds.set_format(\"numpy\")\n", + "ds.features" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'id': np.str_('T000000'),\n", + " 'timestamp': array(['2017-01-01T14:00:00.000', '2017-01-01T15:00:00.000',\n", + " '2017-01-01T16:00:00.000', ..., '2018-03-31T13:00:00.000',\n", + " '2018-03-31T14:00:00.000', '2018-03-31T15:00:00.000'],\n", + " dtype='datetime64[ms]'),\n", + " 'target': array([453., 417., 395., ..., 132., 158., 118.], dtype=float32),\n", + " 'city': np.str_('Beijing'),\n", + " 'station': np.str_('aotizhongxin_aq'),\n", + " 'measurement': np.str_('PM2.5')}" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Or directly push it to HF Hub" + "## Why the native format?\n", + "\n", + "`fev` stores datasets using the schema above (we'll call this the **native format**) rather than long-format DataFrames. The main reason is **flexibility**:\n", + "\n", + "- **Multimodal static features.** Static metadata is stored once per series, with full type information from `datasets.Features`. This naturally accommodates non-tabular content like images, text documents, or embeddings — things that don't fit cleanly into a long-format DataFrame.\n", + "- **Clean static/dynamic separation.** `Sequence` vs `Value` types make it explicit which columns vary over time and which don't, with no need to duplicate static values across rows or maintain a separate file.\n", + "- **Sharding-friendly.** One time series per row means rows can be sharded freely without splitting a single series across files.\n", + "- **No expensive groupby.** Individual time series are accessed by row index, not by grouping the data on the fly.\n", + "- **Future-proof.** New feature types from the Hugging Face `datasets` ecosystem (audio, video, etc.) can be incorporated without changing the format." ] }, { - "cell_type": "code", - "execution_count": 11, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# ds.push_to_hub(repo_id=YOUR_REPO_ID, config_name=CONFIG_NAME)" + "## What is not supported: the GluonTS format\n", + "\n", + "The GluonTS format is another popular choice for storing time series data (e.g. used in [LOTSA](https://huggingface.co/datasets/Salesforce/lotsa_data)). Each entry follows a fixed schema:\n", + "\n", + "```json\n", + "{\n", + " \"start\": \"2024-01-01\",\n", + " \"freq\": \"1D\",\n", + " \"target\": [0.5, 1.2, ...],\n", + " \"feat_dynamic_real\": [[...]],\n", + " \"past_feat_dynamic_real\": [[...]],\n", + " \"feat_static_cat\": [...],\n", + " \"feat_static_real\": [...]\n", + "}\n", + "```\n", + "\n", + "`fev` does not support this format, for the following reasons:\n", + "\n", + "- **Hard-codes the task definition.** The schema bakes in which columns are the target, which are known in the future, and which are only known in the past. Evaluating the same data under different assumptions (e.g. with vs. without future weather covariates) requires duplicating the dataset.\n", + "- **Numeric-only.** Multimodal static features (text, images) cannot be represented natively, even though they are increasingly relevant for time series tasks ([Multimodal Time Series Analysis: A Tutorial and Survey](https://arxiv.org/abs/2410.18959)).\n", + "- **Tied to pandas frequency aliases.** The `freq` field relies on pandas frequency strings, [which have changed over time](https://pandas.pydata.org/docs/whatsnew/v2.2.0.html#deprecate-aliases-m-q-y-etc-in-favour-of-me-qe-ye-etc-for-offsets) and are not stable across versions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "You can then use the path to your dataset when creating a `fev.Task`." + "---\n", + "\n", + "To convert a `datasets.Dataset` into formats consumed by other forecasting libraries, see [`04-adapters.ipynb`](./04-adapters.ipynb)." ] } ], "metadata": { "kernelspec": { - "display_name": "fev", + "display_name": ".venv", "language": "python", "name": "python3" }, @@ -513,7 +414,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.10" + "version": "3.11.15" } }, "nbformat": 4, diff --git a/src/fev/task.py b/src/fev/task.py index 116fc13..8397441 100644 --- a/src/fev/task.py +++ b/src/fev/task.py @@ -646,7 +646,29 @@ def _load_dataset( ) # Since we loaded with split=TRAIN and streaming=False, ds is a datasets.Dataset object assert isinstance(ds, datasets.Dataset) - ds.set_format("numpy") + + is_long_format = ( + self.id_column in ds.features + and self.timestamp_column in ds.features + and all(isinstance(feat, datasets.Value) for feat in ds.features.values()) + ) + if is_long_format: + if not ds.features[self.timestamp_column].dtype.startswith("timestamp"): + raise ValueError( + f"timestamp_column {self.timestamp_column!r} must have a timestamp dtype, " + f"got {ds.features[self.timestamp_column].dtype}" + ) + logger.debug( + f"Loaded dataset is in long format - converting to fev format with " + f"id_column={self.id_column!r}, timestamp_column={self.timestamp_column!r}, " + f"static_columns={self.static_columns!r}" + ) + ds = utils.convert_long_table_to_hf_dataset( + ds.data.table, + id_column=self.id_column, + timestamp_column=self.timestamp_column, + static_columns=self.static_columns, + ) required_columns = self.known_dynamic_columns + self.past_dynamic_columns + self.static_columns if self.generate_univariate_targets_from is None: @@ -683,10 +705,13 @@ def _load_dataset( num_proc=num_proc, ) - # Ensure that IDs are sorted alphabetically for consistent ordering + # Ensure that IDs are sorted alphabetically for consistent ordering. + # Sort directly on the underlying pyarrow Table to avoid `Dataset.sort` materializing + # an indices column that would later need a flatten_indices pass. if ds.features[self.id_column].dtype != "string": ds = ds.cast_column(self.id_column, datasets.Value("string")) - ds = ds.sort(self.id_column) + sorted_table = ds.data.table.sort_by([(self.id_column, "ascending")]) + ds = datasets.Dataset(sorted_table, info=ds.info, split=ds.split).with_format("numpy") self._freq = pd.infer_freq(ds[0][self.timestamp_column]) if self._freq is None: raise ValueError("Dataset contains irregular timestamps") diff --git a/src/fev/utils.py b/src/fev/utils.py index cb0318b..c4a658a 100644 --- a/src/fev/utils.py +++ b/src/fev/utils.py @@ -2,7 +2,6 @@ import warnings import datasets -import multiprocess as mp import numpy as np import pandas as pd import pyarrow as pa @@ -13,6 +12,7 @@ __all__ = [ "maybe_cache_from_s3", "convert_long_df_to_hf_dataset", + "convert_long_table_to_hf_dataset", "infer_column_types", "validate_time_series_dataset", "generate_univariate_targets_from_multivariate", @@ -159,10 +159,13 @@ def convert_long_df_to_hf_dataset( id_column: str = "id", timestamp_column: str = "timestamp", static_columns: list[str] | None = None, - num_proc: int = DEFAULT_NUM_PROC, ) -> datasets.Dataset: """Convert a long-format pandas DataFrame to a Hugging Face datasets.Dataset object. + Each unique value in `id_column` becomes a single row in the resulting dataset. Columns listed in + `static_columns` (plus `id_column`) become static features (`Value` type), and all remaining columns + become dynamic features (`Sequence` type) with one entry per timestamp. + Parameters ---------- df: @@ -173,25 +176,72 @@ def convert_long_df_to_hf_dataset( Name of the column containing the timestamp of time series observations. static_columns Names of columns that contain static (time-independent) features. - num_proc - Number of processes used to parallelize the computation. """ df[id_column] = df[id_column].astype(str) df[timestamp_column] = pd.to_datetime(df[timestamp_column]) - df = df.sort_values(by=[id_column, timestamp_column]) + table = pa.Table.from_pandas(df, preserve_index=False) + return convert_long_table_to_hf_dataset( + table, + id_column=id_column, + timestamp_column=timestamp_column, + static_columns=static_columns, + ) + + +def convert_long_table_to_hf_dataset( + table: pa.Table, + id_column: str = "id", + timestamp_column: str = "timestamp", + static_columns: list[str] | None = None, +) -> datasets.Dataset: + """Convert a long-format pyarrow Table to an fev-compatible `datasets.Dataset`. + + Each unique value in `id_column` becomes a single row. Columns listed in `static_columns` + (plus `id_column`) become static features (`Value` type), and all remaining columns become + dynamic features (`Sequence` type) with one entry per timestamp. + + Sorts the table by `(id_column, timestamp_column)`, then directly constructs `pa.ListArray` for + dynamic columns and indexes the first row of each group for static columns -- avoiding an + expensive Python-level groupby. + Parameters + ---------- + table + Long-format pyarrow Table containing the data. + id_column + Name of the column containing the unique ID of each time series. + timestamp_column + Name of the column containing the timestamp of time series observations. + static_columns + Names of columns that contain static (time-independent) features. + """ if static_columns is None: static_columns = [] - static_columns = [id_column] + static_columns + static_columns = list(static_columns) + + table = table.sort_by([(id_column, "ascending"), (timestamp_column, "ascending")]) - def process_entry(group: pd.DataFrame) -> dict: - static = group[static_columns].iloc[0].to_dict() - dynamic = group.drop(columns=static_columns).to_dict("list") - return {**static, **dynamic} + id_arr = table[id_column].combine_chunks().to_numpy(zero_copy_only=False) + n = len(id_arr) + if n == 0: + change_points = np.array([], dtype=np.int64) + else: + change_points = np.where(id_arr[:-1] != id_arr[1:])[0] + 1 + offsets_np = np.concatenate([[0], change_points, [n]]).astype(np.int32) + first_row_per_group = pa.array(offsets_np[:-1].astype(np.int64)) + offsets_arr = pa.array(offsets_np) + + static_set = {id_column, *static_columns} + new_columns: dict[str, pa.Array] = {} + for col in [id_column] + static_columns: + new_columns[col] = pc.take(table[col], first_row_per_group) + for col in table.column_names: + if col in static_set: + continue + values = table[col].combine_chunks() + new_columns[col] = pa.ListArray.from_arrays(offsets_arr, values) - with mp.Pool(processes=num_proc) as pool: - entries = pool.map(process_entry, [group for _, group in df.groupby(id_column, sort=False)]) - return datasets.Dataset.from_list(entries) + return datasets.Dataset(pa.table(new_columns)) def generate_fingerprint(dataset: datasets.Dataset, num_rows_to_check: int = 3) -> str | None: diff --git a/test/test_task.py b/test/test_task.py index 5bf8189..3023c08 100644 --- a/test/test_task.py +++ b/test/test_task.py @@ -1,5 +1,6 @@ import datasets import numpy as np +import pandas as pd import pydantic import pytest @@ -317,3 +318,39 @@ def test_when_covariate_columns_are_provided_then_only_correct_columns_are_loade past, future = window.get_input_data() assert set(past.column_names) == set(["id", "timestamp"] + task.target_columns + past_cols + known_cols) assert set(future.column_names) == set(["id", "timestamp"] + known_cols) + + +def test_when_dataset_is_long_format_parquet_then_task_loads_it(tmp_path): + rows = [] + for i in range(3): + for t in range(20): + rows.append( + { + "item_id": f"id_{i}", + "timestamp": pd.Timestamp("2020-01-01") + pd.Timedelta(days=t), + "target": float(i * 100 + t), + "category": ["A", "B", "C"][i], + } + ) + df = pd.DataFrame(rows) + parquet_path = tmp_path / "long.parquet" + df.to_parquet(parquet_path) + + task = fev.Task( + dataset_path=str(parquet_path), + id_column="item_id", + horizon=4, + static_columns=["category"], + ) + ds = task.load_full_dataset() + assert len(ds) == 3 + assert isinstance(ds.features["item_id"], datasets.Value) + assert isinstance(ds.features["category"], datasets.Value) + assert isinstance(ds.features["timestamp"], datasets.Sequence) + assert isinstance(ds.features["target"], datasets.Sequence) + + past, future = task.get_window(0).get_input_data() + assert len(past) == 3 + assert len(future) == 3 + assert len(past[0]["target"]) == 16 + assert len(future[0]["timestamp"]) == 4 diff --git a/test/test_utils.py b/test/test_utils.py index 758a895..8742577 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -375,3 +375,59 @@ def test_when_dataset_has_indices_then_flattened_before_expansion(): assert len(result) == 4 assert result[0]["id"] == "A_X" assert result[2]["id"] == "C_X" + + +# convert_long_df_to_hf_dataset tests + + +def _make_long_df(num_ids: int = 3, length: int = 5) -> pd.DataFrame: + rows = [] + for i in range(num_ids): + for t in range(length): + rows.append( + { + "item_id": f"id_{i}", + "timestamp": pd.Timestamp("2020-01-01") + pd.Timedelta(days=t), + "target": float(i * 100 + t), + "category": ["A", "B", "C"][i % 3], + } + ) + return pd.DataFrame(rows) + + +def test_when_long_df_provided_then_dynamic_and_static_columns_are_inferred(): + df = _make_long_df(num_ids=3, length=4) + ds = fev.utils.convert_long_df_to_hf_dataset(df, id_column="item_id", static_columns=["category"]) + + assert len(ds) == 3 + assert isinstance(ds.features["item_id"], datasets.Value) + assert isinstance(ds.features["category"], datasets.Value) + assert isinstance(ds.features["timestamp"], datasets.Sequence) + assert isinstance(ds.features["target"], datasets.Sequence) + assert ds[0]["item_id"] == "id_0" + assert list(ds[0]["target"]) == [0.0, 1.0, 2.0, 3.0] + assert ds[1]["item_id"] == "id_1" + assert list(ds[1]["target"]) == [100.0, 101.0, 102.0, 103.0] + + +def test_when_long_df_is_unsorted_then_output_is_sorted_by_id_and_timestamp(): + df = _make_long_df(num_ids=2, length=3) + df_shuffled = df.sample(frac=1.0, random_state=0).reset_index(drop=True) + ds = fev.utils.convert_long_df_to_hf_dataset(df_shuffled, id_column="item_id") + assert list(ds[0]["target"]) == [0.0, 1.0, 2.0] + assert list(ds[1]["target"]) == [100.0, 101.0, 102.0] + + +def test_when_long_df_is_provided_then_validate_time_series_dataset_passes(): + df = _make_long_df(num_ids=3, length=10) + ds = fev.utils.convert_long_df_to_hf_dataset(df, id_column="item_id", static_columns=["category"]) + fev.utils.validate_time_series_dataset(ds, id_column="item_id", timestamp_column="timestamp") + + +def test_when_long_df_provided_then_pyarrow_path_matches_old_groupby_output(): + # Sanity check: independently constructed reference matches the new pyarrow path. + df = _make_long_df(num_ids=4, length=6) + ds = fev.utils.convert_long_df_to_hf_dataset(df, id_column="item_id", static_columns=["category"]) + expected_per_id = {f"id_{i}": [float(i * 100 + t) for t in range(6)] for i in range(4)} + for row in ds: + assert list(row["target"]) == expected_per_id[row["item_id"]]