diff --git a/datajunction-server/datajunction_server/api/preaggregations.py b/datajunction-server/datajunction_server/api/preaggregations.py
index b1c6ee673..bbab021f0 100644
--- a/datajunction-server/datajunction_server/api/preaggregations.py
+++ b/datajunction-server/datajunction_server/api/preaggregations.py
@@ -31,6 +31,7 @@
from datajunction_server.database.column import Column
from datajunction_server.database.availabilitystate import AvailabilityState
from datajunction_server.database.dimensionlink import DimensionLink
+from datajunction_server.database.measure import FrozenMeasure
from datajunction_server.database.preaggregation import (
PreAggregation,
VALID_PREAGG_STRATEGIES,
@@ -43,13 +44,15 @@
DJQueryServiceClientException,
)
from datajunction_server.internal.access.authentication.http import SecureAPIRouter
-from datajunction_server.models.node_type import NodeNameVersion
+from datajunction_server.models.node_type import NodeNameVersion, NodeType
from datajunction_server.models.dialect import Dialect
from datajunction_server.models.materialization import MaterializationStrategy
from datajunction_server.models.preaggregation import (
BackfillRequest,
BackfillInput,
BackfillResponse,
+ BulkDeactivateWorkflowsResponse,
+ DeactivatedWorkflowInfo,
GrainMode,
DEFAULT_SCHEDULE,
PlanPreAggregationsRequest,
@@ -59,12 +62,13 @@
PreAggMaterializationInput,
UpdatePreAggregationAvailabilityRequest,
WorkflowResponse,
+ WorkflowStatus,
WorkflowUrl,
)
from datajunction_server.construction.build_v3.preagg_matcher import (
get_temporal_partitions,
)
-from datajunction_server.models.decompose import PreAggMeasure
+from datajunction_server.models.decompose import MetricRef, PreAggMeasure
from datajunction_server.models.node_type import NodeType
from datajunction_server.models.query import ColumnMetadata, V3ColumnMetadata
from datajunction_server.service_clients import QueryServiceClient
@@ -131,15 +135,54 @@ async def _get_upstream_source_tables(
return []
-def _preagg_to_info(preagg: PreAggregation) -> PreAggregationInfo:
+async def _preagg_to_info(
+ preagg: PreAggregation,
+ session: AsyncSession,
+) -> PreAggregationInfo:
"""Convert a PreAggregation ORM object to a PreAggregationInfo response model."""
+ # Look up related metrics from FrozenMeasure relationships for each measure
+ measures_with_metrics: list[PreAggMeasure] = []
+ all_related_metrics: set[str] = set()
+
+ # Fetch all frozen measures in a single query to avoid N+1
+ measure_names = [measure.name for measure in preagg.measures or []]
+ frozen_measures = await FrozenMeasure.get_by_names(session, measure_names)
+ frozen_measures_map = {fm.name: fm for fm in frozen_measures}
+
+ for measure in preagg.measures or []:
+ # Find which metrics use this measure
+ measure_metrics: list[MetricRef] = []
+ frozen = frozen_measures_map.get(measure.name)
+ if frozen:
+ for nr in frozen.used_by_node_revisions:
+ if nr.type == NodeType.METRIC: # pragma: no branch
+ measure_metrics.append(
+ MetricRef(name=nr.name, display_name=nr.display_name),
+ )
+ all_related_metrics.add(nr.name)
+
+ # Create new PreAggMeasure with used_by_metrics populated
+ measures_with_metrics.append(
+ PreAggMeasure(
+ name=measure.name,
+ expression=measure.expression,
+ aggregation=measure.aggregation,
+ merge=measure.merge,
+ rule=measure.rule,
+ expr_hash=measure.expr_hash,
+ used_by_metrics=sorted(measure_metrics, key=lambda m: m.name)
+ if measure_metrics
+ else None,
+ ),
+ )
+
return PreAggregationInfo(
id=preagg.id,
node_revision_id=preagg.node_revision_id,
node_name=preagg.node_revision.name,
node_version=preagg.node_revision.version,
grain_columns=preagg.grain_columns,
- measures=preagg.measures,
+ measures=measures_with_metrics,
columns=preagg.columns,
sql=preagg.sql,
grain_group_hash=preagg.grain_group_hash,
@@ -151,6 +194,7 @@ def _preagg_to_info(preagg: PreAggregation) -> PreAggregationInfo:
status=preagg.status,
materialized_table_ref=preagg.materialized_table_ref,
max_partition=preagg.max_partition,
+ related_metrics=sorted(all_related_metrics) if all_related_metrics else None,
created_at=preagg.created_at,
updated_at=preagg.updated_at,
)
@@ -195,6 +239,10 @@ async def list_preaggregations(
default=None,
description="Filter by status: 'pending' or 'active'",
),
+ include_stale: bool = Query(
+ default=False,
+ description="Include pre-aggs from older node versions (stale)",
+ ),
limit: int = Query(default=50, ge=1, le=100),
offset: int = Query(default=0, ge=0),
*,
@@ -246,8 +294,16 @@ async def list_preaggregations(
f"Version '{node_version}' not found for node '{node_name}'",
)
stmt = stmt.where(PreAggregation.node_revision_id == target_revision.id)
+ elif include_stale:
+ # Include all revisions for this node
+ all_revisions_stmt = select(NodeRevision.id).where(
+ NodeRevision.node_id == node.id,
+ )
+ stmt = stmt.where(
+ PreAggregation.node_revision_id.in_(all_revisions_stmt),
+ )
else:
- # Use latest version
+ # Use latest version only (default)
stmt = stmt.where(PreAggregation.node_revision_id == node.current.id)
# Filter by grain_group_hash (direct lookup)
@@ -369,8 +425,8 @@ async def list_preaggregations(
)
preaggs = [p for p in preaggs if p.status == status]
- # Convert to response models
- items = [_preagg_to_info(p) for p in preaggs]
+ # Convert to response models (with related metrics lookup)
+ items = [await _preagg_to_info(p, session) for p in preaggs]
return PreAggregationListResponse(
items=items,
@@ -409,7 +465,7 @@ async def get_preaggregation(
if not preagg:
raise DJDoesNotExistException(f"Pre-aggregation with ID {preagg_id} not found")
- return _preagg_to_info(preagg)
+ return await _preagg_to_info(preagg, session)
# =============================================================================
@@ -601,7 +657,7 @@ async def plan_preaggregations(
loaded_preaggs = list(result.scalars().unique().all())
return PlanPreAggregationsResponse(
- preaggs=[_preagg_to_info(p) for p in loaded_preaggs],
+ preaggs=[await _preagg_to_info(p, session) for p in loaded_preaggs],
)
@@ -803,7 +859,7 @@ async def materialize_preaggregation(
labeled_urls.append(WorkflowUrl(label="workflow", url=url))
preagg.workflow_urls = labeled_urls
- preagg.workflow_status = "active"
+ preagg.workflow_status = WorkflowStatus.ACTIVE
# Also update schedule if it wasn't set (using default)
if not preagg.schedule:
preagg.schedule = schedule
@@ -817,7 +873,7 @@ async def materialize_preaggregation(
# Return pre-agg info with workflow URLs
await session.refresh(preagg, ["node_revision", "availability"])
- return _preagg_to_info(preagg)
+ return await _preagg_to_info(preagg, session)
class UpdatePreAggregationConfigRequest(BaseModel):
@@ -888,7 +944,7 @@ async def update_preaggregation_config(
preagg.lookback_window,
)
- return _preagg_to_info(preagg)
+ return await _preagg_to_info(preagg, session)
# =============================================================================
@@ -980,6 +1036,144 @@ async def delete_preagg_workflow(
)
+@router.delete(
+ "/preaggs/workflows",
+ response_model=BulkDeactivateWorkflowsResponse,
+ name="Bulk Deactivate Workflows",
+)
+async def bulk_deactivate_preagg_workflows(
+ node_name: str = Query(
+ description="Node name to deactivate workflows for (required)",
+ ),
+ stale_only: bool = Query(
+ default=False,
+ description="If true, only deactivate workflows for stale pre-aggs "
+ "(pre-aggs built for non-current node versions)",
+ ),
+ *,
+ session: AsyncSession = Depends(get_session),
+ request: Request,
+ query_service_client: QueryServiceClient = Depends(get_query_service_client),
+) -> BulkDeactivateWorkflowsResponse:
+ """
+ Bulk deactivate workflows for pre-aggregations of a node.
+
+ This is useful for cleaning up stale pre-aggregations after a node
+ has been updated. When stale_only=true, only deactivates workflows
+ for pre-aggs that were built for older node versions.
+
+ Staleness is determined by comparing the pre-agg's node_revision_id
+ to the node's current revision.
+ """
+ # Get the node and its current revision
+ node = await Node.get_by_name(
+ session,
+ node_name,
+ options=[
+ load_only(Node.id),
+ joinedload(Node.current).load_only(NodeRevision.id),
+ ],
+ )
+ if not node:
+ raise DJDoesNotExistException(f"Node '{node_name}' not found")
+
+ current_revision_id = node.current.id if node.current else None
+
+ # Build query for pre-aggs with active workflows
+ stmt = (
+ select(PreAggregation)
+ .options(joinedload(PreAggregation.node_revision))
+ .join(PreAggregation.node_revision)
+ .where(
+ NodeRevision.node_id == node.id,
+ PreAggregation.workflow_status == WorkflowStatus.ACTIVE,
+ )
+ )
+
+ # If stale_only, filter to non-current revisions
+ if stale_only and current_revision_id:
+ stmt = stmt.where(PreAggregation.node_revision_id != current_revision_id)
+
+ result = await session.execute(stmt)
+ preaggs = result.scalars().all()
+
+ if not preaggs:
+ return BulkDeactivateWorkflowsResponse(
+ deactivated_count=0,
+ deactivated=[],
+ skipped_count=0,
+ message="No active workflows found matching criteria",
+ )
+
+ deactivated = []
+ skipped_count = 0
+ request_headers = dict(request.headers)
+
+ for preagg in preaggs:
+ if not preagg.workflow_urls: # pragma: no cover
+ skipped_count += 1
+ continue
+
+ # Compute output_table for workflow identification
+ output_table = _compute_output_table(
+ preagg.node_revision.name,
+ preagg.grain_group_hash,
+ )
+
+ # Extract workflow name from URLs if available
+ workflow_name = None
+ if preagg.workflow_urls: # pragma: no branch
+ for wf_url in preagg.workflow_urls: # pragma: no branch
+ if (
+ hasattr(wf_url, "label") and wf_url.label == "scheduled"
+ ): # pragma: no branch
+ # Extract workflow name from URL path
+ workflow_name = wf_url.url.split("/")[-1] if wf_url.url else None
+ break
+
+ try:
+ query_service_client.deactivate_preagg_workflow(
+ output_table,
+ request_headers=request_headers,
+ )
+
+ # Clear workflow state
+ preagg.strategy = None
+ preagg.schedule = None
+ preagg.lookback_window = None
+ preagg.workflow_urls = None
+ preagg.workflow_status = None
+
+ deactivated.append(
+ DeactivatedWorkflowInfo(
+ id=preagg.id,
+ workflow_name=workflow_name,
+ ),
+ )
+
+ _logger.info(
+ "Bulk deactivate: deactivated workflow for preagg_id=%s",
+ preagg.id,
+ )
+ except Exception as e: # pragma: no cover
+ _logger.warning(
+ "Bulk deactivate: failed to deactivate workflow for preagg_id=%s: %s",
+ preagg.id,
+ str(e),
+ )
+ # Continue with other pre-aggs even if one fails
+
+ await session.commit()
+
+ return BulkDeactivateWorkflowsResponse(
+ deactivated_count=len(deactivated),
+ deactivated=deactivated,
+ skipped_count=skipped_count,
+ message=f"Deactivated {len(deactivated)} workflow(s) for node '{node_name}'"
+ + (" (stale only)" if stale_only else ""),
+ )
+
+
# =============================================================================
# Backfill & Run Endpoints
# =============================================================================
@@ -1206,4 +1400,4 @@ async def update_preaggregation_availability(
await session.commit()
await session.refresh(preagg, ["node_revision", "availability"])
- return _preagg_to_info(preagg)
+ return await _preagg_to_info(preagg, session)
diff --git a/datajunction-server/datajunction_server/database/measure.py b/datajunction-server/datajunction_server/database/measure.py
index 3e74f6026..d4eb5dce2 100644
--- a/datajunction-server/datajunction_server/database/measure.py
+++ b/datajunction-server/datajunction_server/database/measure.py
@@ -151,6 +151,27 @@ async def get_by_name(
result = await session.execute(statement)
return result.unique().scalar_one_or_none()
+ @classmethod
+ async def get_by_names(
+ cls,
+ session: AsyncSession,
+ names: list[str],
+ ) -> list["FrozenMeasure"]:
+ """
+ Get multiple measures by names in a single query.
+ """
+ if not names:
+ return [] # pragma: no cover
+ statement = (
+ select(FrozenMeasure)
+ .where(FrozenMeasure.name.in_(names))
+ .options(
+ selectinload(FrozenMeasure.used_by_node_revisions),
+ )
+ )
+ result = await session.execute(statement)
+ return list(result.unique().scalars().all())
+
@classmethod
async def find_by(
cls,
diff --git a/datajunction-server/datajunction_server/internal/nodes.py b/datajunction-server/datajunction_server/internal/nodes.py
index 26ff5d00f..3a81b25a2 100644
--- a/datajunction-server/datajunction_server/internal/nodes.py
+++ b/datajunction-server/datajunction_server/internal/nodes.py
@@ -2786,6 +2786,12 @@ async def revalidate_node(
await session.commit()
await session.refresh(node.current) # type: ignore
await session.refresh(node, ["current"])
+
+ # For metric nodes, derive frozen measures (ensures they exist even for
+ # metrics created via deployment or updated after initial creation)
+ if current_node_revision.type == NodeType.METRIC and background_tasks:
+ background_tasks.add_task(derive_frozen_measures, node.current.id) # type: ignore
+
return node_validator
diff --git a/datajunction-server/datajunction_server/models/decompose.py b/datajunction-server/datajunction_server/models/decompose.py
index 04653d480..48db5fc9f 100644
--- a/datajunction-server/datajunction_server/models/decompose.py
+++ b/datajunction-server/datajunction_server/models/decompose.py
@@ -26,6 +26,13 @@ class Aggregability(StrEnum):
NONE = "none"
+class MetricRef(BaseModel):
+ """Reference to a metric with name and display name."""
+
+ name: str
+ display_name: str | None = None
+
+
class AggregationRule(BaseModel):
"""
The aggregation rule for the metric component.
@@ -98,6 +105,7 @@ class PreAggMeasure(MetricComponent):
"""
expr_hash: str | None = None # Hash of expression for identity matching
+ used_by_metrics: list[MetricRef] | None = None # Metrics that use this measure
class DecomposedMetric(BaseModel):
diff --git a/datajunction-server/datajunction_server/models/node.py b/datajunction-server/datajunction_server/models/node.py
index a94b03037..504014aa3 100644
--- a/datajunction-server/datajunction_server/models/node.py
+++ b/datajunction-server/datajunction_server/models/node.py
@@ -870,7 +870,6 @@ def flatten_current(
for k, v in current_dict.items():
final_dict[k] = v
- print("final_dict", final_dict)
final_dict["dimension_links"] = [
link
for link in final_dict["dimension_links"] # type: ignore
diff --git a/datajunction-server/datajunction_server/models/preaggregation.py b/datajunction-server/datajunction_server/models/preaggregation.py
index 56913517c..3defb169f 100644
--- a/datajunction-server/datajunction_server/models/preaggregation.py
+++ b/datajunction-server/datajunction_server/models/preaggregation.py
@@ -17,6 +17,13 @@
from datajunction_server.models.query import V3ColumnMetadata
+class WorkflowStatus(StrEnum):
+ """Status of a pre-aggregation workflow."""
+
+ ACTIVE = "active"
+ PAUSED = "paused"
+
+
class WorkflowUrl(BaseModel):
"""A labeled workflow URL for scheduler-agnostic display."""
@@ -150,13 +157,18 @@ class PreAggregationInfo(BaseModel):
# Workflow state (persisted)
workflow_urls: Optional[List[WorkflowUrl]] = None # Labeled workflow URLs
- workflow_status: Optional[str] = None # "active" | "paused" | None
+ workflow_status: Optional[str] = (
+ None # WorkflowStatus.ACTIVE | WorkflowStatus.PAUSED | None
+ )
# Availability (derived from AvailabilityState)
status: str = "pending" # "pending" | "running" | "active"
materialized_table_ref: Optional[str] = None
max_partition: Optional[List[str]] = None
+ # Related metrics (computed from FrozenMeasure relationships)
+ related_metrics: Optional[List[str]] = None # Metric names that use these measures
+
# Metadata
created_at: datetime
updated_at: Optional[datetime] = None
@@ -330,6 +342,36 @@ class WorkflowResponse(BaseModel):
)
+class DeactivatedWorkflowInfo(BaseModel):
+ """Info about a single deactivated workflow."""
+
+ id: int = Field(description="Pre-aggregation ID")
+ workflow_name: Optional[str] = Field(
+ default=None,
+ description="Name of the deactivated workflow",
+ )
+
+
+class BulkDeactivateWorkflowsResponse(BaseModel):
+ """Response model for bulk workflow deactivation."""
+
+ deactivated_count: int = Field(
+ description="Number of workflows successfully deactivated",
+ )
+ deactivated: List[DeactivatedWorkflowInfo] = Field(
+ default_factory=list,
+ description="Details of each deactivated workflow",
+ )
+ skipped_count: int = Field(
+ default=0,
+ description="Number of pre-aggs skipped (no active workflow)",
+ )
+ message: Optional[str] = Field(
+ default=None,
+ description="Additional information about the operation",
+ )
+
+
# =============================================================================
# Backfill Models
# =============================================================================
diff --git a/datajunction-server/tests/api/preaggregations_test.py b/datajunction-server/tests/api/preaggregations_test.py
index ee14260e6..b619213a5 100644
--- a/datajunction-server/tests/api/preaggregations_test.py
+++ b/datajunction-server/tests/api/preaggregations_test.py
@@ -454,6 +454,117 @@ async def test_list_preaggs_invalid_node_version(self, client_with_preaggs):
assert response.status_code == 404
assert "Version" in response.json()["message"]
+ @pytest.mark.asyncio
+ async def test_list_preaggs_include_stale(self, client_with_preaggs):
+ """
+ Test include_stale parameter returns pre-aggs from all node versions.
+
+ This test:
+ 1. Gets current pre-aggs for a node
+ 2. Updates the node to create a new version (making existing pre-aggs stale)
+ 3. Creates new pre-aggs on the new version
+ 4. Verifies include_stale=false (default) only returns current version pre-aggs
+ 5. Verifies include_stale=true returns pre-aggs from all versions
+ """
+ client = client_with_preaggs["client"]
+
+ # Get current version and pre-aggs for v3.order_details
+ node_response = await client.get("/nodes/v3.order_details/")
+ assert node_response.status_code == 200
+ original_version = node_response.json()["version"]
+
+ # Get pre-aggs for current version (without include_stale)
+ current_response = await client.get(
+ "/preaggs/",
+ params={"node_name": "v3.order_details"},
+ )
+ assert current_response.status_code == 200
+ original_preagg_count = current_response.json()["total"]
+ assert original_preagg_count >= 1
+ original_preagg_ids = {item["id"] for item in current_response.json()["items"]}
+
+ # Update the node to create a new version (this makes existing pre-aggs stale)
+ update_response = await client.patch(
+ "/nodes/v3.order_details/",
+ json={"description": "Updated description to create new version"},
+ )
+ assert update_response.status_code == 200
+ new_version = update_response.json()["version"]
+ assert new_version != original_version, "Node version should have changed"
+
+ # Create a new pre-agg on the new version
+ plan_response = await client.post(
+ "/preaggs/plan",
+ json={
+ "metrics": ["v3.total_revenue"],
+ "dimensions": ["v3.order_details.order_id"], # Different grain
+ },
+ )
+ assert plan_response.status_code == 201
+ new_preagg_id = plan_response.json()["preaggs"][0]["id"]
+
+ # Without include_stale (default): should only return new version pre-aggs
+ default_response = await client.get(
+ "/preaggs/",
+ params={"node_name": "v3.order_details"},
+ )
+ assert default_response.status_code == 200
+ default_ids = {item["id"] for item in default_response.json()["items"]}
+
+ # Should include the new pre-agg
+ assert new_preagg_id in default_ids
+ # Should NOT include original pre-aggs (they're on old version)
+ assert len(default_ids & original_preagg_ids) == 0, (
+ "Default (no include_stale) should not return stale pre-aggs"
+ )
+
+ # With include_stale=true: should return pre-aggs from ALL versions
+ stale_response = await client.get(
+ "/preaggs/",
+ params={"node_name": "v3.order_details", "include_stale": "true"},
+ )
+ assert stale_response.status_code == 200
+ stale_data = stale_response.json()
+ stale_ids = {item["id"] for item in stale_data["items"]}
+
+ # Should include the new pre-agg
+ assert new_preagg_id in stale_ids
+ # Should also include original pre-aggs (stale ones)
+ assert original_preagg_ids <= stale_ids, (
+ "include_stale=true should return stale pre-aggs"
+ )
+ # Total should be more than just current version
+ assert stale_data["total"] > len(default_ids)
+
+ # Verify versions are different for stale vs current
+ versions_in_response = {item["node_version"] for item in stale_data["items"]}
+ assert original_version in versions_in_response
+ assert new_version in versions_in_response
+
+ @pytest.mark.asyncio
+ async def test_list_preaggs_include_stale_false_explicit(self, client_with_preaggs):
+ """Test that include_stale=false behaves same as default (no param)."""
+ client = client_with_preaggs["client"]
+
+ # Get pre-aggs with default (no include_stale param)
+ default_response = await client.get(
+ "/preaggs/",
+ params={"node_name": "v3.order_details"},
+ )
+ assert default_response.status_code == 200
+
+ # Get pre-aggs with explicit include_stale=false
+ explicit_response = await client.get(
+ "/preaggs/",
+ params={"node_name": "v3.order_details", "include_stale": "false"},
+ )
+ assert explicit_response.status_code == 200
+
+ # Should return same results
+ default_ids = {item["id"] for item in default_response.json()["items"]}
+ explicit_ids = {item["id"] for item in explicit_response.json()["items"]}
+ assert default_ids == explicit_ids
+
@pytest.mark.xdist_group(name="preaggregations")
class TestGetPreaggregationById:
@@ -473,7 +584,78 @@ async def test_get_preagg_by_id(self, client_with_preaggs):
assert data["id"] == preagg1.id
assert data["node_revision_id"] == preagg1.node_revision_id
assert data["grain_columns"] == preagg1.grain_columns
- assert data["measures"] == [m.model_dump() for m in preagg1.measures]
+ assert data["measures"] == [
+ {
+ "aggregation": "SUM",
+ "expr_hash": "83632b779d87",
+ "expression": "line_total",
+ "merge": "SUM",
+ "name": "line_total_sum_e1f61696",
+ "rule": {
+ "level": None,
+ "type": "full",
+ },
+ "used_by_metrics": [
+ {
+ "display_name": "Avg Order Value",
+ "name": "v3.avg_order_value",
+ },
+ {
+ "display_name": "Mom Revenue Change",
+ "name": "v3.mom_revenue_change",
+ },
+ {
+ "display_name": "Revenue Per Customer",
+ "name": "v3.revenue_per_customer",
+ },
+ {
+ "display_name": "Revenue Per Page View",
+ "name": "v3.revenue_per_page_view",
+ },
+ {
+ "display_name": "Revenue Per Visitor",
+ "name": "v3.revenue_per_visitor",
+ },
+ {
+ "display_name": "Total Revenue",
+ "name": "v3.total_revenue",
+ },
+ {
+ "display_name": "Trailing 7D Revenue",
+ "name": "v3.trailing_7d_revenue",
+ },
+ {
+ "display_name": "Trailing Wow Revenue Change",
+ "name": "v3.trailing_wow_revenue_change",
+ },
+ {
+ "display_name": "Wow Revenue Change",
+ "name": "v3.wow_revenue_change",
+ },
+ ],
+ },
+ {
+ "aggregation": "SUM",
+ "expr_hash": "221d2a4bfdae",
+ "expression": "quantity",
+ "merge": "SUM",
+ "name": "quantity_sum_06b64d2e",
+ "rule": {
+ "level": None,
+ "type": "full",
+ },
+ "used_by_metrics": [
+ {
+ "display_name": "Avg Items Per Order",
+ "name": "v3.avg_items_per_order",
+ },
+ {
+ "display_name": "Total Quantity",
+ "name": "v3.total_quantity",
+ },
+ ],
+ },
+ ]
assert data["sql"] == preagg1.sql
assert data["grain_group_hash"] == preagg1.grain_group_hash
assert data["strategy"] == preagg1.strategy.value
@@ -917,6 +1099,136 @@ async def test_deactivate_workflow_not_found(self, client_with_preaggs):
assert response.status_code == 404
+@pytest.mark.xdist_group(name="preaggregations")
+class TestBulkDeactivateWorkflows:
+ """Tests for DELETE /preaggs/workflows endpoint (bulk deactivation)."""
+
+ @pytest.mark.asyncio
+ async def test_bulk_deactivate_no_active_workflows(self, client_with_preaggs):
+ """Test bulk deactivate when no active workflows exist returns empty result."""
+ client = client_with_preaggs["client"]
+
+ # None of the preaggs have active workflows by default
+ response = await client.delete(
+ "/preaggs/workflows",
+ params={"node_name": "v3.order_details"},
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["deactivated_count"] == 0
+ assert data["deactivated"] == []
+ assert "No active workflows found" in data["message"]
+
+ @pytest.mark.asyncio
+ async def test_bulk_deactivate_node_not_found(self, client_with_preaggs):
+ """Test bulk deactivate for non-existent node returns 404."""
+ client = client_with_preaggs["client"]
+
+ response = await client.delete(
+ "/preaggs/workflows",
+ params={"node_name": "nonexistent.node"},
+ )
+
+ assert response.status_code == 404
+
+ @pytest.mark.asyncio
+ async def test_bulk_deactivate_success(
+ self,
+ client_with_preaggs,
+ mock_qs_for_preaggs,
+ ):
+ """Test successfully bulk deactivating workflows for a node."""
+ client = client_with_preaggs["client"]
+ session = client_with_preaggs["session"]
+ preagg8 = client_with_preaggs["preagg8"]
+ preagg9 = client_with_preaggs["preagg9"]
+
+ # Set up active workflows on preagg8 and preagg9 (both use v3.product.category)
+ preagg8_obj = await session.get(PreAggregation, preagg8.id)
+ preagg8_obj.workflow_urls = [
+ WorkflowUrl(label="scheduled", url="http://scheduler/workflow/preagg8"),
+ ]
+ preagg8_obj.workflow_status = "active"
+
+ preagg9_obj = await session.get(PreAggregation, preagg9.id)
+ preagg9_obj.workflow_urls = [
+ WorkflowUrl(label="scheduled", url="http://scheduler/workflow/preagg9"),
+ ]
+ preagg9_obj.workflow_status = "active"
+ await session.commit()
+
+ # Mock the deactivate method
+ mock_qs_for_preaggs.deactivate_preagg_workflow.return_value = {
+ "status": "paused",
+ }
+
+ # Bulk deactivate all workflows for v3.page_views_enriched node
+ # (preagg8 and preagg9 are based on metrics from v3.page_views_enriched)
+ response = await client.delete(
+ "/preaggs/workflows",
+ params={"node_name": "v3.page_views_enriched"},
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["deactivated_count"] == 2
+ assert len(data["deactivated"]) == 2
+
+ # Verify query service was called twice
+ assert mock_qs_for_preaggs.deactivate_preagg_workflow.call_count == 2
+
+ # Verify the deactivated IDs include our preaggs
+ deactivated_ids = {item["id"] for item in data["deactivated"]}
+ assert preagg8.id in deactivated_ids
+ assert preagg9.id in deactivated_ids
+
+ @pytest.mark.asyncio
+ async def test_bulk_deactivate_stale_only(
+ self,
+ client_with_preaggs,
+ mock_qs_for_preaggs,
+ ):
+ """Test bulk deactivate with stale_only=true only deactivates stale preaggs."""
+ client = client_with_preaggs["client"]
+ session = client_with_preaggs["session"]
+ preagg10 = client_with_preaggs["preagg10"]
+
+ # Set up active workflow on preagg10
+ preagg10_obj = await session.get(PreAggregation, preagg10.id)
+ preagg10_obj.workflow_urls = [
+ WorkflowUrl(label="scheduled", url="http://scheduler/workflow/preagg10"),
+ ]
+ preagg10_obj.workflow_status = "active"
+ await session.commit()
+
+ # With stale_only=true, since all preaggs are on current revision,
+ # nothing should be deactivated
+ response = await client.delete(
+ "/preaggs/workflows",
+ params={"node_name": "v3.page_views_enriched", "stale_only": "true"},
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ # No stale preaggs exist (all are on current revision)
+ assert data["deactivated_count"] == 0
+ assert "No active workflows found" in data["message"]
+
+ # Verify query service was NOT called
+ mock_qs_for_preaggs.deactivate_preagg_workflow.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_bulk_deactivate_missing_node_name(self, client_with_preaggs):
+ """Test bulk deactivate requires node_name parameter."""
+ client = client_with_preaggs["client"]
+
+ response = await client.delete("/preaggs/workflows")
+
+ # FastAPI returns 422 for missing required query params
+ assert response.status_code == 422
+
+
@pytest.mark.xdist_group(name="preaggregations")
class TestRunPreaggBackfill:
"""Tests for POST /preaggs/{id}/backfill endpoint."""
diff --git a/datajunction-ui/src/app/pages/NodePage/NodeMaterializationTab.jsx b/datajunction-ui/src/app/pages/NodePage/NodeMaterializationTab.jsx
index 78d126cd5..90af53462 100644
--- a/datajunction-ui/src/app/pages/NodePage/NodeMaterializationTab.jsx
+++ b/datajunction-ui/src/app/pages/NodePage/NodeMaterializationTab.jsx
@@ -11,6 +11,11 @@ import AvailabilityStateBlock from './AvailabilityStateBlock';
const cronstrue = require('cronstrue');
+/**
+ * Cube materialization tab - shows cube-specific materializations.
+ * For non-cube nodes, the parent component (index.jsx) renders
+ * NodePreAggregationsTab instead.
+ */
export default function NodeMaterializationTab({ node, djClient }) {
const [rawMaterializations, setRawMaterializations] = useState([]);
const [selectedRevisionTab, setSelectedRevisionTab] = useState(null);
diff --git a/datajunction-ui/src/app/pages/NodePage/NodePreAggregationsTab.jsx b/datajunction-ui/src/app/pages/NodePage/NodePreAggregationsTab.jsx
new file mode 100644
index 000000000..6cbecdb4f
--- /dev/null
+++ b/datajunction-ui/src/app/pages/NodePage/NodePreAggregationsTab.jsx
@@ -0,0 +1,656 @@
+import { useEffect, useState, useMemo, useContext } from 'react';
+import DJClientContext from '../../providers/djclient';
+import { labelize } from '../../../utils/form';
+import '../../../styles/preaggregations.css';
+
+const cronstrue = require('cronstrue');
+
+/**
+ * Pre-aggregations tab for non-cube nodes (transform, metric, dimension).
+ * Shows pre-aggs grouped by staleness (current vs stale versions).
+ */
+export default function NodePreAggregationsTab({ node }) {
+ const djClient = useContext(DJClientContext).DataJunctionAPI;
+ const [preaggs, setPreaggs] = useState([]);
+ const [loading, setLoading] = useState(true);
+ const [error, setError] = useState(null);
+ const [expandedIds, setExpandedIds] = useState(new Set());
+ const [expandedGrainIds, setExpandedGrainIds] = useState(new Set());
+ const [deactivating, setDeactivating] = useState(new Set());
+
+ const MAX_VISIBLE_GRAIN = 10;
+
+ // Fetch pre-aggregations for this node
+ useEffect(() => {
+ const fetchPreaggs = async () => {
+ if (!node?.name) return;
+
+ setLoading(true);
+ setError(null);
+
+ try {
+ const result = await djClient.listPreaggs({
+ node_name: node.name,
+ include_stale: true,
+ });
+ if (result._error) {
+ setError(result.message);
+ } else {
+ setPreaggs(result.items || []);
+ }
+ } catch (err) {
+ setError(err.message || 'Failed to load pre-aggregations');
+ } finally {
+ setLoading(false);
+ }
+ };
+
+ fetchPreaggs();
+ }, [node?.name, djClient]);
+
+ // Group pre-aggs by staleness
+ const { currentPreaggs, stalePreaggs } = useMemo(() => {
+ const currentVersion = node?.version;
+ const current = [];
+ const stale = [];
+
+ preaggs.forEach(preagg => {
+ if (preagg.node_version === currentVersion) {
+ current.push(preagg);
+ } else {
+ stale.push(preagg);
+ }
+ });
+
+ return { currentPreaggs: current, stalePreaggs: stale };
+ }, [preaggs, node?.version]);
+
+ // Auto-expand the first current pre-agg when data loads
+ useEffect(() => {
+ if (currentPreaggs.length > 0 && expandedIds.size === 0) {
+ setExpandedIds(new Set([currentPreaggs[0].id]));
+ }
+ }, [currentPreaggs]);
+
+ // Toggle expanded state for a pre-agg row
+ const toggleExpanded = id => {
+ setExpandedIds(prev => {
+ const next = new Set(prev);
+ if (next.has(id)) {
+ next.delete(id);
+ } else {
+ next.add(id);
+ }
+ return next;
+ });
+ };
+
+ // Deactivate a single pre-agg workflow
+ const handleDeactivate = async preaggId => {
+ if (
+ !window.confirm(
+ 'Are you sure you want to deactivate this workflow? ' +
+ 'The materialization will stop running.',
+ )
+ ) {
+ return;
+ }
+
+ setDeactivating(prev => new Set(prev).add(preaggId));
+
+ try {
+ const result = await djClient.deactivatePreaggWorkflow(preaggId);
+ if (result._error) {
+ alert(`Failed to deactivate: ${result.message}`);
+ } else {
+ // Refresh the list
+ const refreshed = await djClient.listPreaggs({
+ node_name: node.name,
+ include_stale: true,
+ });
+ if (!refreshed._error) {
+ setPreaggs(refreshed.items || []);
+ }
+ }
+ } catch (err) {
+ alert(`Error: ${err.message}`);
+ } finally {
+ setDeactivating(prev => {
+ const next = new Set(prev);
+ next.delete(preaggId);
+ return next;
+ });
+ }
+ };
+
+ // Bulk deactivate all stale workflows
+ const handleDeactivateAllStale = async () => {
+ const activeStale = stalePreaggs.filter(
+ p => p.workflow_status === 'active',
+ );
+ if (activeStale.length === 0) {
+ alert('No active stale workflows to deactivate.');
+ return;
+ }
+
+ if (
+ !window.confirm(
+ `Are you sure you want to deactivate ${activeStale.length} stale workflow(s)? ` +
+ 'These materializations are from older node versions and will stop running.',
+ )
+ ) {
+ return;
+ }
+
+ setDeactivating(prev => {
+ const next = new Set(prev);
+ activeStale.forEach(p => next.add(p.id));
+ return next;
+ });
+
+ try {
+ const result = await djClient.bulkDeactivatePreaggWorkflows(
+ node.name,
+ true,
+ );
+ if (result._error) {
+ alert(`Failed to deactivate: ${result.message}`);
+ } else {
+ // Refresh the list
+ const refreshed = await djClient.listPreaggs({
+ node_name: node.name,
+ include_stale: true,
+ });
+ if (!refreshed._error) {
+ setPreaggs(refreshed.items || []);
+ }
+ }
+ } catch (err) {
+ alert(`Error: ${err.message}`);
+ } finally {
+ setDeactivating(new Set());
+ }
+ };
+
+ // Format cron expression to human-readable
+ const formatSchedule = schedule => {
+ if (!schedule) return 'Not scheduled';
+ try {
+ return cronstrue.toString(schedule);
+ } catch {
+ return schedule;
+ }
+ };
+
+ // Render a single pre-agg row
+ const renderPreaggRow = (preagg, isStale = false) => {
+ const isExpanded = expandedIds.has(preagg.id);
+ const isDeactivating = deactivating.has(preagg.id);
+ const hasActiveWorkflow = preagg.workflow_status === 'active';
+
+ return (
+
+ {/* Collapsed header row */}
+
toggleExpanded(preagg.id)}
+ >
+
+ {isExpanded ? '\u25BC' : '\u25B6'}
+
+
+
+ {(() => {
+ const grainCols = preagg.grain_columns || [];
+ const maxVisible = MAX_VISIBLE_GRAIN;
+ const visibleCols = grainCols.slice(0, maxVisible);
+ const hiddenCount = grainCols.length - maxVisible;
+
+ return (
+ <>
+ {visibleCols.map((col, idx) => {
+ const parts = col.split('.');
+ const shortName = parts[parts.length - 1];
+ return (
+
+ {shortName}
+
+ );
+ })}
+ {hiddenCount > 0 && (
+
+ +{hiddenCount}
+
+ )}
+ >
+ );
+ })()}
+
+
+
+ {preagg.measures?.length || 0} measure
+ {(preagg.measures?.length || 0) !== 1 ? 's' : ''}
+
+
+ {preagg.related_metrics?.length > 0 && (
+
+ {preagg.related_metrics.length} metric
+ {preagg.related_metrics.length !== 1 ? 's' : ''}
+
+ )}
+
+ {hasActiveWorkflow ? (
+
+ Active
+
+ ) : preagg.workflow_status === 'paused' ? (
+
+ Paused
+
+ ) : (
+
+ Pending
+
+ )}
+
+ {preagg.schedule && (
+
+ {formatSchedule(preagg.schedule).toLowerCase()}
+
+ )}
+
+ {isStale && (
+
+ was {preagg.node_version}
+
+ )}
+
+
+ {/* Expanded details */}
+ {isExpanded && (
+
+ {isStale && (
+
+
⚠️
+
+ Built for {preagg.node_version} — current is{' '}
+ {node.version}
+
+
+ This workflow is still running but won't be used for
+ queries.
+
+
+
+ )}
+
+
+ {/* Config + Grain side by side */}
+
+ {/* Config */}
+
+
Config
+
+ {/* Table-style key-value pairs */}
+
+
+
+ | Strategy |
+
+ {preagg.strategy
+ ? labelize(preagg.strategy)
+ : 'Not set'}
+ |
+
+
+ | Schedule |
+
+ {preagg.schedule ? (
+ <>
+ {formatSchedule(preagg.schedule)}
+
+ ({preagg.schedule})
+
+ >
+ ) : (
+ 'Not scheduled'
+ )}
+ |
+
+ {preagg.lookback_window && (
+
+ | Lookback |
+
+ {preagg.lookback_window}
+ |
+
+ )}
+ {preagg.max_partition &&
+ preagg.max_partition.length > 0 && (
+
+ |
+ Max Partition
+ |
+
+ {preagg.max_partition.join(', ')}
+ |
+
+ )}
+
+
+
+ {/* Actions */}
+
+ {/* Workflow buttons - one per URL */}
+ {preagg.workflow_urls?.map((wf, idx) => {
+ const label = wf.label || 'Workflow';
+ const capitalizedLabel =
+ label.charAt(0).toUpperCase() + label.slice(1);
+ return (
+
+ {capitalizedLabel}
+
+ );
+ })}
+
+ {hasActiveWorkflow && (
+
+ )}
+
+
+
+
+ {/* Grain */}
+
+
Grain
+
+
+ {(() => {
+ const grainCols = preagg.grain_columns || [];
+ const isGrainExpanded = expandedGrainIds.has(preagg.id);
+ const visibleCols = isGrainExpanded
+ ? grainCols
+ : grainCols.slice(0, MAX_VISIBLE_GRAIN);
+ const hiddenCount =
+ grainCols.length - MAX_VISIBLE_GRAIN;
+
+ return (
+ <>
+ {visibleCols.map((col, idx) => {
+ const parts = col.split('.');
+ const nodeName = parts.slice(0, -1).join('.');
+ return (
+
+ {col}
+
+ );
+ })}
+ {!isGrainExpanded && hiddenCount > 0 && (
+
+ )}
+ {isGrainExpanded && hiddenCount > 0 && (
+
+ )}
+ >
+ );
+ })()}
+
+
+
+
+
+ {/* Measures */}
+
+
+ Measures
+
+ ⓘ
+
+
+
+
+
+
+ | Name |
+
+ Aggregation
+
+ ⓘ
+
+ |
+
+ Merge
+
+ ⓘ
+
+ |
+
+ Rule
+
+ ⓘ
+
+ |
+
+ Used By
+
+ ⓘ
+
+ |
+
+
+
+ {preagg.measures?.map((measure, idx) => (
+
+ |
+ {measure.name}
+ |
+
+
+ {measure.aggregation
+ ? `${measure.aggregation}(${measure.expression})`
+ : measure.expression}
+
+ |
+
+ {measure.merge && (
+
+ {measure.merge}
+
+ )}
+ |
+
+ {measure.rule && (
+
+ {typeof measure.rule === 'object'
+ ? measure.rule.type || ''
+ : measure.rule}
+
+ )}
+ |
+
+ {measure.used_by_metrics?.length > 0 && (
+
+ )}
+ |
+
+ ))}
+
+
+
+
+
+
+ )}
+
+ );
+ };
+
+ // Loading state
+ if (loading) {
+ return Loading pre-aggregations...
;
+ }
+
+ // Error state
+ if (error) {
+ return (
+
+ Error loading pre-aggregations: {error}
+
+ );
+ }
+
+ // No pre-aggs
+ if (preaggs.length === 0) {
+ return (
+
+
+ No pre-aggregations found for this node.
+
+
+ Pre-aggregations are created when you use the{' '}
+ Query Planner to plan materializations
+ for metrics derived from this node.
+
+
+ );
+ }
+
+ // Calculate if there are active stale workflows
+ const activeStaleCount = stalePreaggs.filter(
+ p => p.workflow_status === 'active',
+ ).length;
+
+ return (
+
+ {/* Current Version Section */}
+
+
+
+ Current Pre-Aggregations ({node.version})
+
+
+ {currentPreaggs.length} pre-aggregation
+ {currentPreaggs.length !== 1 ? 's' : ''}
+
+
+
+ {currentPreaggs.length > 0 ? (
+ currentPreaggs.map(preagg => renderPreaggRow(preagg, false))
+ ) : (
+
+ No pre-aggregations for the current version.
+
+ )}
+
+
+ {/* Stale Section */}
+ {stalePreaggs.length > 0 && (
+
+
+
+
+ Stale Pre-Aggregations ({stalePreaggs.length})
+
+
+ {activeStaleCount} active workflow
+ {activeStaleCount !== 1 ? 's' : ''}
+
+
+
+ {activeStaleCount > 0 && (
+
+ )}
+
+
+ {stalePreaggs.map(preagg => renderPreaggRow(preagg, true))}
+
+ )}
+
+ );
+}
diff --git a/datajunction-ui/src/app/pages/NodePage/__tests__/NodePage.test.jsx b/datajunction-ui/src/app/pages/NodePage/__tests__/NodePage.test.jsx
index a82c1d434..ebbc8ddea 100644
--- a/datajunction-ui/src/app/pages/NodePage/__tests__/NodePage.test.jsx
+++ b/datajunction-ui/src/app/pages/NodePage/__tests__/NodePage.test.jsx
@@ -6,6 +6,14 @@ import { NodePage } from '../Loadable';
import { MemoryRouter, Route, Routes } from 'react-router-dom';
import userEvent from '@testing-library/user-event';
+// Mock cronstrue for NodePreAggregationsTab
+jest.mock('cronstrue', () => ({
+ toString: () => 'Every day at midnight',
+}));
+
+// Mock CSS imports
+jest.mock('../../../../styles/preaggregations.css', () => ({}));
+
describe('', () => {
const domTestingLib = require('@testing-library/dom');
const { queryHelpers } = domTestingLib;
@@ -61,6 +69,8 @@ describe('', () => {
removeComplexDimensionLink: jest
.fn()
.mockResolvedValue({ status: 200 }),
+ listPreaggs: jest.fn().mockResolvedValue({ items: [] }),
+ deactivatePreaggWorkflow: jest.fn().mockResolvedValue({ status: 200 }),
},
};
};
@@ -609,13 +619,13 @@ describe('', () => {
it('renders an empty NodeMaterialization tab correctly', async () => {
const djClient = mockDJClient();
- djClient.DataJunctionAPI.node.mockReturnValue(mocks.mockMetricNode);
+ djClient.DataJunctionAPI.node.mockResolvedValue(mocks.mockMetricNode);
djClient.DataJunctionAPI.getMetric.mockResolvedValue(
mocks.mockMetricNodeJson,
);
- djClient.DataJunctionAPI.columns.mockReturnValue(mocks.metricNodeColumns);
- djClient.DataJunctionAPI.materializations.mockReturnValue([]);
- djClient.DataJunctionAPI.availabilityStates.mockReturnValue([]);
+ djClient.DataJunctionAPI.columns.mockResolvedValue(mocks.metricNodeColumns);
+ // For metric nodes, listPreaggs is called, not materializations
+ djClient.DataJunctionAPI.listPreaggs.mockResolvedValue({ items: [] });
const element = (
@@ -631,35 +641,40 @@ describe('', () => {
,
);
- await waitFor(
- () => {
- fireEvent.click(
- screen.getByRole('button', { name: 'Materializations' }),
- );
- expect(djClient.DataJunctionAPI.materializations).toHaveBeenCalledWith(
- mocks.mockMetricNode.name,
- );
- screen.getByText(
- 'No materialization workflows configured for this revision.',
- );
- },
- { timeout: 5000 },
- );
+
+ // For metric nodes, NodePreAggregationsTab is used, which calls listPreaggs
+ await waitFor(() => {
+ expect(djClient.DataJunctionAPI.listPreaggs).toHaveBeenCalledWith({
+ node_name: mocks.mockMetricNode.name,
+ include_stale: true,
+ });
+ });
+
+ // Check for the empty state text (for NodePreAggregationsTab)
+ expect(
+ screen.getByText('No pre-aggregations found for this node.'),
+ ).toBeInTheDocument();
});
it('renders the NodeMaterialization tab with materializations correctly', async () => {
const djClient = mockDJClient();
- djClient.DataJunctionAPI.node.mockReturnValue(mocks.mockTransformNode);
- djClient.DataJunctionAPI.getMetric.mockResolvedValue(
- mocks.mockMetricNodeJson,
- );
- djClient.DataJunctionAPI.columns.mockReturnValue(mocks.metricNodeColumns);
- djClient.DataJunctionAPI.materializations.mockReturnValue(
+ // Use cube node - only cubes use NodeMaterializationTab
+ // Override columns with explicit partition: null to avoid undefined.type_ error
+ const cubeNodeWithPartitions = {
+ ...mocks.mockCubeNode,
+ columns: mocks.mockCubeNode.columns.map(col => ({
+ ...col,
+ partition: null,
+ })),
+ };
+ djClient.DataJunctionAPI.node.mockResolvedValue(cubeNodeWithPartitions);
+ djClient.DataJunctionAPI.cube.mockResolvedValue(mocks.mockCubesCube);
+ djClient.DataJunctionAPI.columns.mockResolvedValue([]);
+ djClient.DataJunctionAPI.materializations.mockResolvedValue(
mocks.nodeMaterializations,
);
- djClient.DataJunctionAPI.availabilityStates.mockReturnValue([]);
-
- djClient.DataJunctionAPI.materializationInfo.mockReturnValue(
+ djClient.DataJunctionAPI.availabilityStates.mockResolvedValue([]);
+ djClient.DataJunctionAPI.materializationInfo.mockResolvedValue(
mocks.materializationInfo,
);
@@ -670,30 +685,28 @@ describe('', () => {
);
render(
,
);
- await waitFor(
- () => {
- fireEvent.click(
- screen.getByRole('button', { name: 'Materializations' }),
- );
- expect(djClient.DataJunctionAPI.node).toHaveBeenCalledWith(
- mocks.mockTransformNode.name,
- );
- expect(djClient.DataJunctionAPI.materializations).toHaveBeenCalledWith(
- mocks.mockTransformNode.name,
- );
- },
- { timeout: 3000 },
- );
- }, 60000);
+
+ // Wait for the node to load first
+ await waitFor(() => {
+ expect(djClient.DataJunctionAPI.node).toHaveBeenCalledWith(
+ 'default.repair_orders_cube',
+ );
+ });
+
+ // Then wait for materializations to be fetched
+ await waitFor(() => {
+ expect(djClient.DataJunctionAPI.materializations).toHaveBeenCalledWith(
+ mocks.mockCubeNode.name,
+ );
+ });
+ });
it('renders the NodeValidate tab', async () => {
const djClient = mockDJClient();
diff --git a/datajunction-ui/src/app/pages/NodePage/__tests__/NodePreAggregationsTab.test.jsx b/datajunction-ui/src/app/pages/NodePage/__tests__/NodePreAggregationsTab.test.jsx
new file mode 100644
index 000000000..beed36055
--- /dev/null
+++ b/datajunction-ui/src/app/pages/NodePage/__tests__/NodePreAggregationsTab.test.jsx
@@ -0,0 +1,654 @@
+import React from 'react';
+import { render, screen, fireEvent, waitFor } from '@testing-library/react';
+import { MemoryRouter } from 'react-router-dom';
+import NodePreAggregationsTab from '../NodePreAggregationsTab';
+import DJClientContext from '../../../providers/djclient';
+
+// Mock the CSS import
+jest.mock('../../../../styles/preaggregations.css', () => ({}));
+
+// Mock cronstrue - it's imported via require() in the component
+jest.mock('cronstrue', () => ({
+ toString: cron => {
+ if (cron === '0 0 * * *') return 'At 12:00 AM';
+ if (cron === '0 * * * *') return 'Every hour';
+ return cron || 'Not scheduled';
+ },
+}));
+
+// Mock labelize from utils/form
+jest.mock('../../../../utils/form', () => ({
+ labelize: str => {
+ if (!str) return '';
+ // Convert snake_case/SCREAMING_SNAKE to Title Case
+ return str
+ .toLowerCase()
+ .replace(/_/g, ' ')
+ .replace(/\b\w/g, c => c.toUpperCase());
+ },
+}));
+
+const mockNode = {
+ name: 'default.orders_fact',
+ version: 'v1.0',
+ type: 'transform',
+};
+
+const mockPreaggs = {
+ items: [
+ {
+ id: 1,
+ node_revision_id: 1,
+ node_name: 'default.orders_fact',
+ node_version: 'v1.0',
+ grain_columns: [
+ 'default.date_dim.date_id',
+ 'default.customer_dim.customer_id',
+ ],
+ measures: [
+ {
+ name: 'total_revenue',
+ expression: 'revenue',
+ aggregation: 'SUM',
+ merge: 'SUM',
+ rule: { type: 'full' },
+ used_by_metrics: [
+ { name: 'default.revenue_metric', display_name: 'Revenue' },
+ ],
+ },
+ {
+ name: 'order_count',
+ expression: '*',
+ aggregation: 'COUNT',
+ merge: 'SUM',
+ rule: { type: 'full' },
+ used_by_metrics: [
+ { name: 'default.order_count_metric', display_name: 'Order Count' },
+ ],
+ },
+ ],
+ sql: 'SELECT date_id, customer_id, SUM(revenue), COUNT(*) FROM orders GROUP BY 1, 2',
+ grain_group_hash: 'abc123',
+ strategy: 'full',
+ schedule: '0 0 * * *',
+ lookback_window: null,
+ workflow_urls: [
+ { label: 'scheduled', url: 'http://scheduler/workflow/123.main' },
+ ],
+ workflow_status: 'active',
+ status: 'active',
+ materialized_table_ref: 'analytics.preaggs.orders_fact_abc123',
+ max_partition: ['2024', '01', '15'],
+ related_metrics: ['default.revenue_metric', 'default.order_count_metric'],
+ created_at: '2024-01-01T00:00:00Z',
+ updated_at: '2024-01-15T00:00:00Z',
+ },
+ {
+ id: 2,
+ node_revision_id: 1,
+ node_name: 'default.orders_fact',
+ node_version: 'v1.0',
+ grain_columns: ['default.product_dim.category'],
+ measures: [
+ {
+ name: 'total_quantity',
+ expression: 'quantity',
+ aggregation: 'SUM',
+ merge: 'SUM',
+ rule: { type: 'full' },
+ used_by_metrics: null,
+ },
+ ],
+ sql: 'SELECT category, SUM(quantity) FROM orders GROUP BY 1',
+ grain_group_hash: 'def456',
+ strategy: 'incremental_time',
+ schedule: '0 * * * *',
+ lookback_window: '3 days',
+ workflow_urls: null,
+ workflow_status: null,
+ status: 'pending',
+ materialized_table_ref: null,
+ max_partition: null,
+ related_metrics: null,
+ created_at: '2024-01-10T00:00:00Z',
+ updated_at: null,
+ },
+ ],
+ total: 2,
+ limit: 50,
+ offset: 0,
+};
+
+const mockPreaggsWithStale = {
+ items: [
+ ...mockPreaggs.items,
+ {
+ id: 3,
+ node_revision_id: 0,
+ node_name: 'default.orders_fact',
+ node_version: 'v0.9', // Stale version
+ grain_columns: ['default.date_dim.date_id'],
+ measures: [
+ {
+ name: 'old_revenue',
+ expression: 'revenue',
+ aggregation: 'SUM',
+ merge: 'SUM',
+ rule: { type: 'full' },
+ used_by_metrics: null,
+ },
+ ],
+ sql: 'SELECT date_id, SUM(revenue) FROM orders GROUP BY 1',
+ grain_group_hash: 'old123',
+ strategy: 'full',
+ schedule: '0 0 * * *',
+ workflow_urls: [
+ { label: 'scheduled', url: 'http://scheduler/workflow/old.main' },
+ ],
+ workflow_status: 'active',
+ status: 'active',
+ materialized_table_ref: 'analytics.preaggs.orders_fact_old',
+ max_partition: ['2024', '01', '10'],
+ related_metrics: null,
+ created_at: '2023-12-01T00:00:00Z',
+ updated_at: '2024-01-10T00:00:00Z',
+ },
+ ],
+ total: 3,
+ limit: 50,
+ offset: 0,
+};
+
+const createMockDjClient = (preaggs = mockPreaggs) => ({
+ DataJunctionAPI: {
+ listPreaggs: jest.fn().mockResolvedValue(preaggs),
+ deactivatePreaggWorkflow: jest.fn().mockResolvedValue({ status: 'none' }),
+ bulkDeactivatePreaggWorkflows: jest.fn().mockResolvedValue({
+ deactivated_count: 1,
+ deactivated: [{ id: 3 }],
+ }),
+ },
+});
+
+const renderWithContext = (component, djClient) => {
+ return render(
+
+
+ {component}
+
+ ,
+ );
+};
+
+describe('', () => {
+ beforeEach(() => {
+ jest.clearAllMocks();
+ });
+
+ describe('Loading and Empty States', () => {
+ it('shows loading state initially', () => {
+ const djClient = createMockDjClient();
+ // Make the promise never resolve to keep loading state
+ djClient.DataJunctionAPI.listPreaggs.mockReturnValue(
+ new Promise(() => {}),
+ );
+
+ renderWithContext(, djClient);
+
+ expect(
+ screen.getByText('Loading pre-aggregations...'),
+ ).toBeInTheDocument();
+ });
+
+ it('shows empty state when no pre-aggregations exist', async () => {
+ const djClient = createMockDjClient({
+ items: [],
+ total: 0,
+ limit: 50,
+ offset: 0,
+ });
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(
+ screen.getByText('No pre-aggregations found for this node.'),
+ ).toBeInTheDocument();
+ });
+ });
+
+ it('shows error state when API fails', async () => {
+ const djClient = createMockDjClient();
+ djClient.DataJunctionAPI.listPreaggs.mockResolvedValue({
+ _error: true,
+ message: 'Failed to fetch',
+ });
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(
+ screen.getByText(/Error loading pre-aggregations/),
+ ).toBeInTheDocument();
+ });
+ });
+ });
+
+ describe('Section Headers', () => {
+ it('renders "Current Pre-Aggregations" section header with version', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(
+ screen.getByText('Current Pre-Aggregations (v1.0)'),
+ ).toBeInTheDocument();
+ });
+ });
+
+ it('renders "Stale Pre-Aggregations" section when stale preaggs exist', async () => {
+ const djClient = createMockDjClient(mockPreaggsWithStale);
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(
+ screen.getByText('Stale Pre-Aggregations (1)'),
+ ).toBeInTheDocument();
+ });
+ });
+
+ it('does not render stale section when no stale preaggs exist', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(
+ screen.getByText('Current Pre-Aggregations (v1.0)'),
+ ).toBeInTheDocument();
+ });
+
+ expect(
+ screen.queryByText(/Stale Pre-Aggregations/),
+ ).not.toBeInTheDocument();
+ });
+ });
+
+ describe('Pre-agg Row Header', () => {
+ it('renders grain columns as chips', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Should show short names from grain columns
+ expect(screen.getByText('date_id')).toBeInTheDocument();
+ expect(screen.getByText('customer_id')).toBeInTheDocument();
+ });
+ });
+
+ it('renders measure count', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('2 measures')).toBeInTheDocument();
+ expect(screen.getByText('1 measure')).toBeInTheDocument();
+ });
+ });
+
+ it('renders metric count badge when related metrics exist', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('2 metrics')).toBeInTheDocument();
+ });
+ });
+
+ it('renders status badges', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('Active')).toBeInTheDocument();
+ expect(screen.getByText('Pending')).toBeInTheDocument();
+ });
+ });
+
+ it('renders schedule in human-readable format', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Schedule appears in both header and config section, so use getAllByText
+ const scheduleElements = screen.getAllByText(/at 12:00 am/i);
+ expect(scheduleElements.length).toBeGreaterThan(0);
+ });
+ });
+ });
+
+ describe('Expanded Details', () => {
+ it('expands first pre-agg by default', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Config section should be visible for the first expanded preagg
+ expect(screen.getByText('Config')).toBeInTheDocument();
+ expect(screen.getByText('Grain')).toBeInTheDocument();
+ expect(screen.getByText('Measures')).toBeInTheDocument();
+ });
+ });
+
+ it('shows Config section with strategy and schedule', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('Strategy')).toBeInTheDocument();
+ expect(screen.getByText('Full')).toBeInTheDocument();
+ expect(screen.getByText('Schedule')).toBeInTheDocument();
+ });
+ });
+
+ it('shows Grain section with dimension links', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Full grain column names should appear in expanded section
+ const grainBadges = screen.getAllByText(
+ /default\.(date_dim|customer_dim)\./,
+ );
+ expect(grainBadges.length).toBeGreaterThan(0);
+ });
+ });
+
+ it('shows Measures table with aggregation and merge info', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('total_revenue')).toBeInTheDocument();
+ expect(screen.getByText('SUM(revenue)')).toBeInTheDocument();
+ });
+ });
+
+ it('shows metrics that use each measure', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Display names should appear
+ expect(screen.getByText('Revenue')).toBeInTheDocument();
+ expect(screen.getByText('Order Count')).toBeInTheDocument();
+ });
+ });
+
+ it('toggles expansion when clicking row header', async () => {
+ // Use single preagg to simplify test
+ const singlePreagg = {
+ items: [mockPreaggs.items[0]],
+ total: 1,
+ limit: 50,
+ offset: 0,
+ };
+ const djClient = createMockDjClient(singlePreagg);
+
+ renderWithContext(, djClient);
+
+ // Wait for initial render with expanded state
+ await waitFor(() => {
+ expect(screen.getByText('Config')).toBeInTheDocument();
+ });
+
+ // Find and click the row header to collapse it
+ const measureText = screen.getByText('2 measures');
+ fireEvent.click(measureText.closest('.preagg-row-header'));
+
+ // After collapse, Config should no longer be visible
+ await waitFor(
+ () => {
+ expect(screen.queryByText('Config')).not.toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+ });
+ });
+
+ describe('Workflow Actions', () => {
+ it('renders workflow button with capitalized label', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ const scheduledBtn = screen.getByText('Scheduled');
+ expect(scheduledBtn).toBeInTheDocument();
+ expect(scheduledBtn.closest('a')).toHaveAttribute(
+ 'href',
+ 'http://scheduler/workflow/123.main',
+ );
+ });
+ });
+
+ it('renders deactivate button for active workflows', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('Deactivate')).toBeInTheDocument();
+ });
+ });
+
+ it('calls deactivatePreaggWorkflow when deactivate is clicked', async () => {
+ const djClient = createMockDjClient();
+ window.confirm = jest.fn(() => true);
+
+ renderWithContext(, djClient);
+
+ await waitFor(
+ () => {
+ expect(screen.getByText('Deactivate')).toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+
+ fireEvent.click(screen.getByText('Deactivate'));
+
+ await waitFor(
+ () => {
+ expect(
+ djClient.DataJunctionAPI.deactivatePreaggWorkflow,
+ ).toHaveBeenCalledWith(1);
+ },
+ { timeout: 2000 },
+ );
+ });
+ });
+
+ describe('Stale Pre-aggregations', () => {
+ it('shows stale version warning in row header', async () => {
+ const djClient = createMockDjClient(mockPreaggsWithStale);
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('was v0.9')).toBeInTheDocument();
+ });
+ });
+
+ it('shows "Deactivate All Stale" button when active stale workflows exist', async () => {
+ const djClient = createMockDjClient(mockPreaggsWithStale);
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(screen.getByText('Deactivate All Stale')).toBeInTheDocument();
+ });
+ });
+
+ it('calls bulkDeactivatePreaggWorkflows when "Deactivate All Stale" is clicked', async () => {
+ const djClient = createMockDjClient(mockPreaggsWithStale);
+ window.confirm = jest.fn(() => true);
+
+ renderWithContext(, djClient);
+
+ await waitFor(
+ () => {
+ expect(screen.getByText('Deactivate All Stale')).toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+
+ fireEvent.click(screen.getByText('Deactivate All Stale'));
+
+ await waitFor(
+ () => {
+ expect(
+ djClient.DataJunctionAPI.bulkDeactivatePreaggWorkflows,
+ ).toHaveBeenCalledWith('default.orders_fact', true);
+ },
+ { timeout: 2000 },
+ );
+ });
+ });
+
+ describe('Grain Truncation', () => {
+ it('shows "+N more" button when grain has more than MAX_VISIBLE_GRAIN columns', async () => {
+ const manyGrainPreaggs = {
+ items: [
+ {
+ ...mockPreaggs.items[0],
+ grain_columns: [
+ 'default.dim1.col1',
+ 'default.dim2.col2',
+ 'default.dim3.col3',
+ 'default.dim4.col4',
+ 'default.dim5.col5',
+ 'default.dim6.col6',
+ 'default.dim7.col7',
+ 'default.dim8.col8',
+ 'default.dim9.col9',
+ 'default.dim10.col10',
+ 'default.dim11.col11',
+ 'default.dim12.col12',
+ ],
+ },
+ ],
+ total: 1,
+ limit: 50,
+ offset: 0,
+ };
+ const djClient = createMockDjClient(manyGrainPreaggs);
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ // Should show "+N more" since there are 12 columns and MAX_VISIBLE_GRAIN is 10
+ expect(screen.getByText('+2 more')).toBeInTheDocument();
+ });
+ });
+
+ it('shows "Show less" after expanding grain list', async () => {
+ const manyGrainPreaggs = {
+ items: [
+ {
+ ...mockPreaggs.items[0],
+ grain_columns: [
+ 'default.dim1.col1',
+ 'default.dim2.col2',
+ 'default.dim3.col3',
+ 'default.dim4.col4',
+ 'default.dim5.col5',
+ 'default.dim6.col6',
+ 'default.dim7.col7',
+ 'default.dim8.col8',
+ 'default.dim9.col9',
+ 'default.dim10.col10',
+ 'default.dim11.col11',
+ 'default.dim12.col12',
+ ],
+ },
+ ],
+ total: 1,
+ limit: 50,
+ offset: 0,
+ };
+ const djClient = createMockDjClient(manyGrainPreaggs);
+
+ renderWithContext(, djClient);
+
+ await waitFor(
+ () => {
+ expect(screen.getByText('+2 more')).toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+
+ // Click to expand
+ fireEvent.click(screen.getByText('+2 more'));
+
+ await waitFor(
+ () => {
+ expect(screen.getByText('Show less')).toBeInTheDocument();
+ // All columns should now be visible
+ expect(screen.getByText('default.dim12.col12')).toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+ });
+ });
+
+ describe('API Integration', () => {
+ it('calls listPreaggs with include_stale=true', async () => {
+ const djClient = createMockDjClient();
+
+ renderWithContext(, djClient);
+
+ await waitFor(() => {
+ expect(djClient.DataJunctionAPI.listPreaggs).toHaveBeenCalledWith({
+ node_name: 'default.orders_fact',
+ include_stale: true,
+ });
+ });
+ });
+
+ it('refreshes list after deactivate action', async () => {
+ const djClient = createMockDjClient();
+ window.confirm = jest.fn(() => true);
+
+ renderWithContext(, djClient);
+
+ await waitFor(
+ () => {
+ expect(screen.getByText('Deactivate')).toBeInTheDocument();
+ },
+ { timeout: 2000 },
+ );
+
+ fireEvent.click(screen.getByText('Deactivate'));
+
+ await waitFor(
+ () => {
+ // Should be called twice: initial load + refresh after deactivate
+ expect(djClient.DataJunctionAPI.listPreaggs).toHaveBeenCalledTimes(2);
+ },
+ { timeout: 2000 },
+ );
+ });
+ });
+});
diff --git a/datajunction-ui/src/app/pages/NodePage/index.jsx b/datajunction-ui/src/app/pages/NodePage/index.jsx
index 71c30f283..960dca292 100644
--- a/datajunction-ui/src/app/pages/NodePage/index.jsx
+++ b/datajunction-ui/src/app/pages/NodePage/index.jsx
@@ -11,6 +11,7 @@ import NotebookDownload from './NotebookDownload';
import DJClientContext from '../../providers/djclient';
import NodeValidateTab from './NodeValidateTab';
import NodeMaterializationTab from './NodeMaterializationTab';
+import NodePreAggregationsTab from './NodePreAggregationsTab';
import ClientCodePopover from './ClientCodePopover';
import WatchButton from './WatchNodeButton';
import NodesWithDimension from './NodesWithDimension';
@@ -131,7 +132,14 @@ export function NodePage() {
tabToDisplay = ;
break;
case 'materializations':
- tabToDisplay = ;
+ // Cube nodes use cube-specific materialization tab
+ // Other nodes (transform, metric, dimension) use pre-aggregations tab
+ tabToDisplay =
+ node?.type === 'cube' ? (
+
+ ) : (
+
+ );
break;
case 'linked':
tabToDisplay = ;
diff --git a/datajunction-ui/src/app/services/DJService.js b/datajunction-ui/src/app/services/DJService.js
index 95f7f985d..98a805629 100644
--- a/datajunction-ui/src/app/services/DJService.js
+++ b/datajunction-ui/src/app/services/DJService.js
@@ -1895,6 +1895,7 @@ export const DataJunctionAPI = {
if (filters.grain_mode) params.append('grain_mode', filters.grain_mode);
if (filters.measures) params.append('measures', filters.measures);
if (filters.status) params.append('status', filters.status);
+ if (filters.include_stale) params.append('include_stale', 'true');
return await (
await fetch(`${DJ_URL}/preaggs/?${params}`, {
@@ -2050,6 +2051,31 @@ export const DataJunctionAPI = {
return result;
},
+ // Bulk deactivate pre-aggregation workflows for a node
+ bulkDeactivatePreaggWorkflows: async function (nodeName, staleOnly = false) {
+ const params = new URLSearchParams();
+ params.append('node_name', nodeName);
+ if (staleOnly) params.append('stale_only', 'true');
+
+ const response = await fetch(`${DJ_URL}/preaggs/workflows?${params}`, {
+ method: 'DELETE',
+ credentials: 'include',
+ });
+ const result = await response.json();
+ if (!response.ok) {
+ return {
+ ...result,
+ _error: true,
+ _status: response.status,
+ message:
+ result.message ||
+ result.detail ||
+ 'Failed to bulk deactivate workflows',
+ };
+ }
+ return result;
+ },
+
// Get cube details including materializations
getCubeDetails: async function (cubeName) {
const response = await fetch(`${DJ_URL}/cubes/${cubeName}`, {
diff --git a/datajunction-ui/src/styles/preaggregations.css b/datajunction-ui/src/styles/preaggregations.css
new file mode 100644
index 000000000..f776e67f1
--- /dev/null
+++ b/datajunction-ui/src/styles/preaggregations.css
@@ -0,0 +1,547 @@
+/**
+ * Pre-aggregations Tab Styles
+ *
+ * Reusable CSS classes for the pre-aggregations UI components.
+ */
+
+/* =============================================================================
+ Layout
+ ============================================================================= */
+
+.preagg-container {
+ padding: 10px 0;
+}
+
+.preagg-section {
+ margin-bottom: 30px;
+}
+
+.preagg-two-column {
+ display: grid;
+ grid-template-columns: 1fr 1fr;
+ gap: 16px;
+}
+
+.preagg-stack {
+ display: flex;
+ flex-direction: column;
+ gap: 1.5em;
+}
+
+/* =============================================================================
+ Section Headers
+ ============================================================================= */
+
+.preagg-section-header {
+ display: flex;
+ align-items: center;
+ margin-bottom: 16px;
+ border-bottom: 2px solid #e5e7eb;
+ padding-bottom: 8px;
+}
+
+.preagg-section-header--stale {
+ border-bottom-color: #fcd34d;
+ justify-content: space-between;
+}
+
+.preagg-section-title {
+ margin: 0;
+ font-size: 16px;
+ font-weight: 600;
+ color: #374151;
+}
+
+.preagg-section-title--stale {
+ color: #92400e;
+}
+
+.preagg-section-count {
+ margin-left: 12px;
+ font-size: 13px;
+ color: #6b7280;
+}
+
+.preagg-section-count--stale {
+ color: #b45309;
+}
+
+/* =============================================================================
+ Pre-agg Row (Card Container)
+ ============================================================================= */
+
+.preagg-row {
+ border: 1px solid #e0e0e0;
+ border-radius: 8px;
+ margin-bottom: 10px;
+ background-color: #fff;
+}
+
+.preagg-row--stale {
+ background-color: #fffbeb;
+}
+
+/* Collapsed Header */
+.preagg-row-header {
+ display: flex;
+ align-items: center;
+ padding: 12px 16px;
+ cursor: pointer;
+ gap: 12px;
+}
+
+.preagg-row-toggle {
+ font-size: 14px;
+ color: #666;
+}
+
+.preagg-row-grain-chips {
+ display: flex;
+ align-items: center;
+ gap: 6px;
+ min-width: 180px;
+}
+
+.preagg-grain-chip {
+ padding: 2px 8px;
+ background-color: #f1f5f9;
+ border-radius: 4px;
+ color: #475569;
+ font-size: 12px;
+ font-weight: 500;
+ font-family: monospace;
+}
+
+.preagg-grain-chip--more {
+ background-color: #e2e8f0;
+ color: #64748b;
+}
+
+.preagg-row-measures {
+ font-size: 12px;
+ color: #563a12;
+ background: #fff6e9;
+ border-radius: 8px;
+ padding: 2px 8px;
+}
+
+.preagg-row-schedule {
+ font-size: 12px;
+ color: #888;
+}
+
+.preagg-row-version {
+ font-size: 12px;
+ color: #b45309;
+ font-style: italic;
+}
+
+/* Expanded Details */
+.preagg-details {
+ padding: 20px;
+ border-top: 1px solid #e0e0e0;
+ background-color: #f8fafc;
+}
+
+.preagg-details--stale {
+ background-color: #fefce8;
+}
+
+/* =============================================================================
+ Stale Warning Banner
+ ============================================================================= */
+
+.preagg-stale-banner {
+ background-color: #fef3c7;
+ border: 1px solid #fcd34d;
+ border-radius: 8px;
+ padding: 12px 16px;
+ margin-bottom: 20px;
+ font-size: 13px;
+ display: flex;
+ align-items: center;
+ gap: 10px;
+}
+
+.preagg-stale-banner-icon {
+ font-size: 18px;
+}
+
+.preagg-stale-banner-text {
+ color: #78350f;
+}
+
+/* =============================================================================
+ Card Boxes (Config, Grain, etc.)
+ ============================================================================= */
+
+.preagg-card {
+ background-color: #ffffff;
+ border-radius: 8px;
+ /* border: 1px solid #e2e8f0; */
+ padding: 16px;
+ height: fit-content;
+ box-sizing: border-box;
+}
+
+.preagg-card--compact {
+ padding: 12px 16px;
+}
+
+.preagg-card-label {
+ font-size: 12px;
+ font-weight: 600;
+ color: #64748b;
+ text-transform: uppercase;
+ letter-spacing: 0.05em;
+ margin-bottom: 8px;
+}
+
+.preagg-card-label--with-info {
+ display: flex;
+ align-items: center;
+ gap: 6px;
+}
+
+/* =============================================================================
+ Config Table
+ ============================================================================= */
+
+.preagg-config-table {
+ font-size: 13px;
+ border-collapse: collapse;
+ width: 100%;
+}
+
+.preagg-config-key {
+ padding: 4px 12px 4px 0;
+ color: #64748b;
+ font-weight: 500;
+ white-space: nowrap;
+ width: 100px;
+}
+
+.preagg-config-value {
+ padding: 4px 0;
+ color: #1e293b;
+}
+
+.preagg-config-value code {
+ font-size: 12px;
+ background-color: #f1f5f9;
+ padding: 2px 6px;
+ border-radius: 4px;
+}
+
+.preagg-config-schedule-cron {
+ margin-left: 6px;
+ font-size: 11px;
+ color: #94a3b8;
+ font-family: monospace;
+}
+
+/* =============================================================================
+ Actions
+ ============================================================================= */
+
+.preagg-actions {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 8px;
+ padding-top: 12px;
+ margin-top: 12px;
+ border-top: 1px solid #e2e8f0;
+}
+
+.preagg-action-btn {
+ display: inline-flex;
+ align-items: center;
+ padding: 5px 10px;
+ background-color: #ffffff;
+ border: 1px solid #e2e8f0;
+ border-radius: 6px;
+ color: #475569;
+ font-size: 12px;
+ font-weight: 500;
+ text-decoration: none;
+ cursor: pointer;
+}
+
+.preagg-action-btn:hover {
+ background-color: #f8fafc;
+ text-decoration: none;
+}
+
+.preagg-action-btn--danger {
+ border-color: #fecaca;
+ color: #dc2626;
+}
+
+.preagg-action-btn--danger:hover {
+ background-color: #fef2f2;
+}
+
+.preagg-action-btn--danger-fill {
+ background-color: #fee2e2;
+ border-color: #fca5a5;
+ color: #991b1b;
+}
+
+.preagg-action-btn:disabled {
+ cursor: not-allowed;
+ opacity: 0.7;
+}
+
+/* =============================================================================
+ Badges
+ ============================================================================= */
+
+/* Base badge */
+.preagg-badge {
+ padding: 4px 10px;
+ border-radius: 4px;
+ font-size: 12px;
+ font-weight: 500;
+ text-decoration: none;
+ display: inline-block;
+}
+
+/* Status badges (pill style) */
+.preagg-status-badge {
+ padding: 2px 8px;
+ border-radius: 12px;
+ font-size: 12px;
+}
+
+.preagg-status-badge--active {
+ background-color: #dcfce7;
+ color: #166534;
+}
+
+.preagg-status-badge--paused {
+ background-color: #fef3c7;
+ color: #92400e;
+}
+
+.preagg-status-badge--pending {
+ background-color: #f3f4f6;
+ color: #6b7280;
+}
+
+/* Metric count badge (in header row) */
+.preagg-metric-count-badge {
+ font-size: 12px;
+ color: #be123c;
+ background-color: #fff1f2;
+ padding: 2px 8px;
+ border-radius: 12px;
+}
+
+/* Grain badge */
+.preagg-grain-badge,
+.preagg-grain-badge:hover {
+ padding: 4px 10px;
+ background-color: #f1f5f9;
+ border-radius: 4px;
+ color: #1e40af;
+ font-size: 12px;
+ font-weight: 500;
+ text-decoration: none;
+ font-family: monospace;
+}
+
+.preagg-grain-badge:hover {
+ background-color: #e2e8f0;
+ text-decoration: none;
+}
+
+/* Aggregation badge (blue) */
+.preagg-agg-badge {
+ background-color: #dbeafe;
+ padding: 4px 10px;
+ border-radius: 4px;
+ color: #1e40af;
+ font-size: 12px;
+ font-weight: 500;
+}
+
+/* Merge badge (green) */
+.preagg-merge-badge {
+ background-color: #dcfce7;
+ padding: 4px 10px;
+ border-radius: 4px;
+ color: #166534;
+ font-size: 12px;
+ font-weight: 500;
+}
+
+/* Rule badge (gray) */
+.preagg-rule-badge {
+ color: #475569;
+ background-color: #f1f5f9;
+ padding: 4px 8px;
+ border-radius: 4px;
+ font-size: 11px;
+ font-weight: 500;
+}
+
+/* Metric badge (red/rose) */
+.preagg-metric-badge {
+ font-size: 11px;
+ color: #be123c;
+ background-color: #fff1f2;
+ padding: 3px 8px;
+ border-radius: 4px;
+ text-decoration: none;
+ border: 1px solid #fecdd3;
+ font-weight: 500;
+}
+
+.preagg-metric-badge:hover {
+ background-color: #ffe4e6;
+ text-decoration: none;
+}
+
+/* Expand/collapse button (used in grain section) */
+.preagg-expand-btn {
+ padding: 4px 10px;
+ background-color: #f1f5f9;
+ border-radius: 4px;
+ color: #64748b;
+ font-size: 12px;
+ font-weight: 500;
+ border: none;
+ cursor: pointer;
+}
+
+.preagg-expand-btn:hover {
+ background-color: #e2e8f0;
+}
+
+/* =============================================================================
+ Grain List
+ ============================================================================= */
+
+.preagg-grain-list {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 8px;
+}
+
+/* =============================================================================
+ Measures Table
+ ============================================================================= */
+
+.preagg-measures-table {
+ width: 100%;
+ font-size: 13px;
+ border-collapse: collapse;
+}
+
+.preagg-measures-table thead {
+ background-color: #fafafa;
+ border-bottom: 1px solid #e2e8f0;
+}
+
+.preagg-measures-table th {
+ padding: 10px 16px;
+ text-align: left;
+ font-weight: 500;
+ color: #64748b;
+ font-size: 12px;
+}
+
+.preagg-measures-table td {
+ padding: 12px 16px;
+}
+
+.preagg-measures-table tbody tr {
+ border-bottom: 1px solid #f1f5f9;
+}
+
+.preagg-measures-table tbody tr:last-child {
+ border-bottom: none;
+}
+
+.preagg-measure-name {
+ font-weight: 500;
+ color: #1e293b;
+ font-family: monospace;
+ font-size: 12px;
+}
+
+.preagg-metrics-list {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 6px;
+}
+
+/* =============================================================================
+ Info Icon
+ ============================================================================= */
+
+.preagg-info-icon {
+ cursor: help;
+ color: #94a3b8;
+ font-weight: normal;
+ margin-left: 4px;
+}
+
+/* =============================================================================
+ Empty State
+ ============================================================================= */
+
+.preagg-empty {
+ padding: 16px;
+ background-color: #f9fafb;
+ border-radius: 8px;
+ color: #6b7280;
+ font-size: 14px;
+}
+
+/* =============================================================================
+ Loading & Error States
+ ============================================================================= */
+
+.preagg-loading {
+ padding: 20px;
+ text-align: center;
+ color: #666;
+}
+
+.preagg-error {
+ padding: 20px;
+ margin: 20px 0;
+}
+
+.preagg-no-data {
+ padding: 20px;
+}
+
+.preagg-no-data-alert {
+ margin-bottom: 20px;
+ padding: 16px;
+}
+
+.preagg-no-data-text {
+ font-size: 14px;
+ color: #666;
+}
+
+/* =============================================================================
+ Section Header (Stale section - left side)
+ ============================================================================= */
+
+.preagg-section-header-left {
+ display: flex;
+ align-items: center;
+}
+
+/* =============================================================================
+ Card Modifier for Tables
+ ============================================================================= */
+
+.preagg-card--table {
+ padding: 0;
+ overflow: hidden;
+}