Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ local = [
"sqlalchemy>=2.0.0,<3.0.0",
"sqlalchemy_utils",
"sqlmodel==0.0.18",
"croniter>=6.0.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this adds two more dependencies (croniter and humanize) to ZenML; I'm not sure it's worth it for this check.

Wouldn't the following implementation achieve the same thing:

  • Create the schedule in our DB (we already do this)
  • Pass it to the orchestrator to create the schedule (we already do this)
  • If the orchestrator fails to create the schedule (they already do some validation here), delete it from our DB (we don't do this yet)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most probably I will need croniter either way for the following (if implemented):

  1. To validate frequency (if we apply a validation on cron frequency).
  2. To calculate next occurrence (native scheduling).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of the alternative algorithm; it also aligns with your comment regarding custom cron syntax.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For native scheduling, we can at least keep the dependency server-side only, which I think is much nicer as it doesn't affect client envs and docker images built for pipeline execution.

]
server = [
"zenml[local]", # Includes all the DB dependencies
Expand Down
11 changes: 11 additions & 0 deletions src/zenml/config/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)

from zenml.logger import get_logger
from zenml.utils.time_utils import validate_cron_expression

logger = get_logger(__name__)

Expand Down Expand Up @@ -92,6 +93,16 @@ def _ensure_timezone(

return value

@field_validator("cron_expression", mode="before")
@classmethod
def _validate_cron_expression(cls, value: str | None) -> str | None:
    """Reject cron expressions that do not pass validation.

    Runs in "before" mode, i.e. on the raw input value.

    Args:
        value: The raw cron expression, or None if no cron expression
            was provided.

    Returns:
        The value unchanged, when it is None or a valid cron expression.

    Raises:
        ValueError: If the cron expression is not valid.
    """
    # An unset cron expression is passed through untouched.
    if value is not None and not validate_cron_expression(expr=value):
        raise ValueError(f"Cron expression {value} is not valid.")
    return value

@model_validator(mode="after")
def _ensure_cron_or_periodic_schedule_configured(self) -> "Schedule":
"""Ensures that the cron expression or start time + interval are set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client import ApiException

from zenml.config.base_settings import BaseSettings
from zenml.constants import (
Expand Down Expand Up @@ -1216,10 +1217,25 @@ def delete_schedule(self, schedule: "ScheduleResponse") -> None:
cron_job_name = schedule.run_metadata.get(
KUBERNETES_CRON_JOB_METADATA_KEY
)

if not cron_job_name:
raise RuntimeError("Unable to find cron job name for schedule.")
logger.warning(
"Unable to find cron job %s for schedule %.",
cron_job_name,
schedule.name,
)
return

self._k8s_batch_api.delete_namespaced_cron_job(
name=cron_job_name,
namespace=self.config.kubernetes_namespace,
)
try:
self._k8s_batch_api.delete_namespaced_cron_job(
name=cron_job_name,
namespace=self.config.kubernetes_namespace,
)
except ApiException as e:
if e.status == 404:
logger.warning(
"Unable to find cron job for schedule %s.", schedule.name
)
return
else:
raise e
22 changes: 21 additions & 1 deletion src/zenml/models/v2/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
ProjectScopedResponseMetadata,
ProjectScopedResponseResources,
)
from zenml.utils.time_utils import to_utc_timezone
from zenml.utils.time_utils import to_utc_timezone, validate_cron_expression

logger = get_logger(__name__)

Expand Down Expand Up @@ -76,6 +76,16 @@ def _ensure_tzunaware_utc(

return value

@field_validator("cron_expression", mode="before")
@classmethod
def _validate_cron_expression(cls, value: Optional[str]) -> Optional[str]:
    """Reject cron expressions that do not pass validation.

    Runs in "before" mode, i.e. on the raw input value.

    Args:
        value: The raw cron expression, or None if no cron expression
            was provided.

    Returns:
        The value unchanged, when it is None or a valid cron expression.

    Raises:
        ValueError: If the cron expression is not valid.
    """
    # `Optional[str]` matches the annotation style used by the model
    # fields in this module and, unlike `str | None`, does not rely on
    # PEP 604 support when the annotation is evaluated.
    if value is None:
        return value
    if not validate_cron_expression(expr=value):
        raise ValueError(f"Cron expression {value} is not valid.")
    return value

@model_validator(mode="after")
def _ensure_cron_or_periodic_schedule_configured(
self,
Expand Down Expand Up @@ -129,6 +139,16 @@ class ScheduleUpdate(BaseUpdate):
name: Optional[str] = None
cron_expression: Optional[str] = None

@field_validator("cron_expression", mode="before")
@classmethod
def _validate_cron_expression(cls, value: Optional[str]) -> Optional[str]:
    """Reject updated cron expressions that do not pass validation.

    Runs in "before" mode, i.e. on the raw input value.

    Args:
        value: The raw cron expression, or None if the update does not
            set a cron expression.

    Returns:
        The value unchanged, when it is None or a valid cron expression.

    Raises:
        ValueError: If the cron expression is not valid.
    """
    # `Optional[str]` matches the field annotations of this model and,
    # unlike `str | None`, does not rely on PEP 604 support when the
    # annotation is evaluated.
    if value is None:
        return value
    if not validate_cron_expression(expr=value):
        raise ValueError(f"Cron expression {value} is not valid.")
    return value


# ------------------ Response Model ------------------

Expand Down
38 changes: 38 additions & 0 deletions src/zenml/utils/time_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from croniter import CroniterBadCronError, CroniterBadDateError, croniter


def utc_now(tz_aware: Union[bool, datetime] = False) -> datetime:
"""Get the current time in the UTC timezone.
Expand Down Expand Up @@ -136,3 +138,39 @@ def expires_in(
if expires_at < now:
return expired_str
return seconds_to_human_readable(int((expires_at - now).total_seconds()))


def validate_cron_expression(
    expr: str, *, base_time: Optional[datetime] = None
) -> bool:
    """Validate a standard five-field cron expression using croniter.

    On top of what croniter checks, this function requires exactly five
    whitespace-separated fields and rejects numeric ranges whose lower
    bound is not strictly smaller than the upper bound (e.g. `5-3`).

    NOTE(review): a reviewer raised that some orchestrators may accept a
    custom cron syntax. Such dialects are not handled here and would be
    rejected - confirm whether that matters for the callers.

    Args:
        expr: Cron expression string to validate.
        base_time: Base datetime for croniter parsing. If not provided,
            uses now.

    Returns:
        True if the expression has five fields, contains no inverted or
        malformed range and can be parsed by croniter, False otherwise.
    """
    if not isinstance(expr, str) or not expr.strip():
        return False

    fields = expr.strip().split()
    if len(fields) != 5:
        return False

    # A field may be a comma-separated list, and every list item may carry
    # a "/step" suffix. Ranges therefore have to be inspected per item and
    # without the step; checking the whole field would wrongly reject valid
    # values such as "9-12,14-17" or "0-30/5".
    for field in fields:
        for item in field.split(","):
            range_part = item.split("/", 1)[0]
            if "-" not in range_part:
                continue

            bounds = range_part.split("-")
            if len(bounds) != 2 or not all(bounds):
                # More than one hyphen, or a missing bound such as "-5".
                return False

            low, high = bounds
            # Only purely numeric bounds are compared here. Named bounds
            # (e.g. "MON-FRI") are left for croniter to accept or reject.
            if low.isdecimal() and high.isdecimal():
                if int(low) >= int(high):
                    return False

    try:
        croniter(expr, base_time or datetime.now())
    except (CroniterBadCronError, CroniterBadDateError, ValueError):
        return False
    return True
90 changes: 90 additions & 0 deletions tests/unit/models/test_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import uuid
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

import pytest
from pydantic import ValidationError

from zenml.models.v2.core.schedule import ScheduleRequest, ScheduleUpdate


def test_schedule_request_object_validations():
    """Exercise the validators that run when building a ScheduleRequest."""

    def _common_fields():
        # Fresh identifiers for every request, as each case is independent.
        return {
            "project": uuid.uuid4(),
            "name": "daily schedule",
            "active": True,
            "orchestrator_id": uuid.uuid4(),
            "pipeline_id": uuid.uuid4(),
        }

    # A well-formed cron expression must be accepted.
    ScheduleRequest(cron_expression="* * * * *", **_common_fields())

    # An invalid cron expression must be rejected.
    with pytest.raises(ValidationError):
        ScheduleRequest(cron_expression="60 * * * *", **_common_fields())

    # A start time on its own, without any schedule option, must be
    # rejected.
    with pytest.raises(ValidationError):
        ScheduleRequest(
            start_time=datetime.now(tz=timezone.utc), **_common_fields()
        )

    # Timezone-aware inputs: noon in Europe/Berlin is expected to come
    # back as hour 11 with no tzinfo attached.
    berlin = ZoneInfo("Europe/Berlin")
    schedule = ScheduleRequest(
        interval_second=timedelta(minutes=60),
        start_time=datetime(2025, 1, 1, 12, 0, tzinfo=berlin),
        end_time=datetime(2025, 1, 10, 12, 0, tzinfo=berlin),
        **_common_fields(),
    )

    converted = (schedule.start_time, schedule.end_time)
    assert [moment.hour for moment in converted] == [11, 11]
    assert [moment.tzinfo for moment in converted] == [None, None]


def test_schedule_update_object_validations():
    """Exercise the cron validator that runs when building a ScheduleUpdate."""
    schedule_name = "daily schedule"

    # A well-formed cron expression must be accepted.
    ScheduleUpdate(name=schedule_name, cron_expression="* * * * *")

    # An invalid cron expression must be rejected.
    with pytest.raises(ValidationError):
        ScheduleUpdate(name=schedule_name, cron_expression="60 * * * *")
34 changes: 34 additions & 0 deletions tests/unit/utils/test_time_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from zenml.utils.time_utils import validate_cron_expression


def test_valid_cron_expressions_pass() -> None:
    """Check that every sample cron expression below is accepted."""
    accepted_samples = (
        "* * * * *",
        "*/5 0 * * 1-5",
        "0 12 * * 0",
        "15,30,45 9-17 * * 1-5",
        "0 0 1 1 *",
    )
    for sample in accepted_samples:
        is_valid = validate_cron_expression(sample)
        assert is_valid, f"Expected valid: {sample}"


def test_invalid_cron_expressions_fail() -> None:
    """Check that every sample input below is rejected."""
    rejected_samples = (
        # Not a usable string at all.
        None,
        "",
        # Wrong number of fields.
        "* * * *",
        "* * * * * *",
        # Remaining malformed samples.
        "60 * * * *",
        "* 24 * * *",
        "* * 0 * *",
        "* * * 13 *",
        "* * * * 8",
        "*/0 * * * *",
        "5-3 * * * *",
        "MON * * * *",
        "@daily",
        "15,30,45 9-a * * 1-5",
    )
    for sample in rejected_samples:
        is_valid = validate_cron_expression(sample)
        assert not is_valid, f"Expected invalid: {sample}"
Loading