diff --git a/frontend/src/hooks/useStreamState.ts b/frontend/src/hooks/useStreamState.ts index 68e7f3e3..5fa29077 100644 --- a/frontend/src/hooks/useStreamState.ts +++ b/frontend/src/hooks/useStreamState.ts @@ -111,19 +111,16 @@ export function useStreamState() { ); // Check if a pipeline supports noise controls in video mode - // Derived from schema: if video mode has noise_scale defined, noise controls are supported + // Derived from schema: only show if video mode explicitly defines noise_scale with a value const supportsNoiseControls = useCallback( (pipelineId: PipelineId): boolean => { const schema = pipelineSchemas?.pipelines[pipelineId]; if (schema?.mode_defaults?.video) { - // Check if video mode explicitly defines noise_scale (not null/undefined) - return schema.mode_defaults.video.noise_scale !== undefined; + // Check if video mode explicitly defines noise_scale with a non-null value + const noiseScale = schema.mode_defaults.video.noise_scale; + return noiseScale !== undefined && noiseScale !== null; } - // Fallback: check if schema has noise_scale property at all - if (schema?.config_schema?.properties?.noise_scale) { - return true; - } - // If schemas haven't loaded yet, return false (controls will appear once schemas load) + // If video mode doesn't define noise_scale, don't show noise controls return false; }, [pipelineSchemas] diff --git a/src/scope/core/pipelines/__init__.py b/src/scope/core/pipelines/__init__.py index 9ea63303..006d8e4a 100644 --- a/src/scope/core/pipelines/__init__.py +++ b/src/scope/core/pipelines/__init__.py @@ -26,7 +26,7 @@ def __getattr__(name): return PassthroughPipeline # Config classes elif name == "BasePipelineConfig": - from .schema import BasePipelineConfig + from .base_schema import BasePipelineConfig return BasePipelineConfig elif name == "LongLiveConfig": @@ -45,6 +45,14 @@ def __getattr__(name): from .schema import PassthroughConfig return PassthroughConfig + elif name == "RewardForcingConfig": + from .schema import RewardForcingConfig + + return RewardForcingConfig + elif name == "MemFlowConfig": + from .schema import MemFlowConfig + + return MemFlowConfig raise AttributeError(f"module {__name__!r} has no attribute {name!r}") @@ -61,4 +69,6 @@ def __getattr__(name): "StreamDiffusionV2Config", "KreaRealtimeVideoConfig", "PassthroughConfig", + "RewardForcingConfig", + "MemFlowConfig", ] diff --git a/src/scope/core/pipelines/base_schema.py b/src/scope/core/pipelines/base_schema.py new file mode 100644 index 00000000..e06c52dd --- /dev/null +++ b/src/scope/core/pipelines/base_schema.py @@ -0,0 +1,315 @@ +"""Base Pydantic schema models for pipeline configuration. + +This module provides the base Pydantic models for pipeline configuration. +Pipeline-specific configs should import from this module to avoid circular imports. + +Pipeline-specific configs inherit from BasePipelineConfig and override defaults. +Each pipeline defines its supported modes and can provide mode-specific defaults. 
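+
+The field templates below (height_field, width_field, ...) let a subclass
+change a default while keeping the field's constraints and description; the
+MyConfig class here is a hypothetical illustration:
+
+    class MyConfig(BasePipelineConfig):
+        height: int = height_field(320)
+        width: int = width_field(576)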
+
+Child classes can override field defaults with type-annotated assignments:
+    height: int = 320
+    width: int = 576
+    denoising_steps: list[int] = [1000, 750, 500, 250]
+"""
+
+from typing import Annotated, Any, ClassVar, Literal
+
+from pydantic import BaseModel, ConfigDict, Field
+from pydantic.fields import FieldInfo
+
+
+# Field templates - use these to override defaults while keeping constraints/descriptions
+def height_field(default: int = 512) -> FieldInfo:
+    """Height field with standard constraints."""
+    return Field(default=default, ge=1, description="Output height in pixels")
+
+
+def width_field(default: int = 512) -> FieldInfo:
+    """Width field with standard constraints."""
+    return Field(default=default, ge=1, description="Output width in pixels")
+
+
+def denoising_steps_field(default: list[int] | None = None) -> FieldInfo:
+    """Denoising steps field."""
+    return Field(
+        default=default,
+        description="Denoising step schedule for progressive generation",
+    )
+
+
+def noise_scale_field(default: float | None = None) -> FieldInfo:
+    """Noise scale field with constraints."""
+    return Field(
+        default=default,
+        ge=0.0,
+        le=1.0,
+        description="Amount of noise to add during video generation (video mode only)",
+    )
+
+
+def noise_controller_field(default: bool | None = None) -> FieldInfo:
+    """Noise controller field."""
+    return Field(
+        default=default,
+        description="Enable dynamic noise control during generation (video mode only)",
+    )
+
+
+def input_size_field(default: int | None = None) -> FieldInfo:
+    """Input size field with constraints.
+
+    Defaults to None so that text-mode configs read as "no video input"
+    (is_video_mode() relies on this); video pipelines set an explicit count.
+    """
+    return Field(
+        default=default,
+        ge=1,
+        description="Expected input video frame count (video mode only)",
+    )
+
+
+def ref_images_field(default: list[str] | None = None) -> FieldInfo:
+    """Reference images field for VACE."""
+    return Field(
+        default=default,
+        description="List of reference image paths for VACE conditioning",
+    )
+
+
+def vace_context_scale_field(default: float = 1.0) -> FieldInfo:
+    """VACE context scale field with constraints."""
+    return Field(
+        default=default,
+        ge=0.0,
+        le=2.0,
+        description="Scaling factor for VACE hint injection (0.0 to 2.0)",
+    )
+
+
+# Type alias for input modes
+InputMode = Literal["text", "video"]
+
+
+class ModeDefaults(BaseModel):
+    """Mode-specific default values.
+
+    Use this to define mode-specific overrides in pipeline schemas.
+    Only include fields that differ from base defaults.
+    Set default=True to mark the default mode.
+
+    Example:
+        modes = {
+            "text": ModeDefaults(default=True),
+            "video": ModeDefaults(
+                height=512,
+                width=512,
+                noise_scale=0.7,
+                noise_controller=True,
+            ),
+        }
+    """
+
+    model_config = ConfigDict(extra="forbid")
+
+    # Whether this is the default mode
+    default: bool = False
+
+    # Resolution can differ per mode
+    height: int | None = None
+    width: int | None = None
+
+    # Core parameters
+    denoising_steps: list[int] | None = None
+
+    # Video mode parameters
+    noise_scale: float | None = None
+    noise_controller: bool | None = None
+    input_size: int | None = None
+
+
+class BasePipelineConfig(BaseModel):
+    """Base configuration for all pipelines.
+
+    This provides common parameters shared across all pipeline modes.
+    Pipeline-specific configs inherit from this and override defaults.
+
+    Mode support is declared via the `modes` class variable:
+        modes = {
+            "text": ModeDefaults(default=True),
+            "video": ModeDefaults(
+                height=512,
+                width=512,
+                noise_scale=0.7,
+            ),
+        }
+
+    Only include fields that differ from base defaults.
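+
+    At runtime, get_defaults_for_mode() merges these overrides onto the base
+    field defaults, skipping None values; for the example above,
+    get_defaults_for_mode("video") yields height=512, width=512 and
+    noise_scale=0.7 on top of the remaining base defaults.
+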
+ Use default=True to mark the default mode. + """ + + model_config = ConfigDict(extra="forbid") + + # Pipeline metadata - not configuration parameters, used for identification + pipeline_id: ClassVar[str] = "base" + pipeline_name: ClassVar[str] = "Base Pipeline" + pipeline_description: ClassVar[str] = "Base pipeline configuration" + pipeline_version: ClassVar[str] = "1.0.0" + docs_url: ClassVar[str | None] = None + estimated_vram_gb: ClassVar[float | None] = None + requires_models: ClassVar[bool] = False + supports_lora: ClassVar[bool] = False + supports_vace: ClassVar[bool] = False + + # UI capability metadata - tells frontend what controls to show + supports_cache_management: ClassVar[bool] = False + supports_kv_cache_bias: ClassVar[bool] = False + supports_quantization: ClassVar[bool] = False + min_dimension: ClassVar[int] = 1 + # Whether this pipeline contains modifications based on the original project + modified: ClassVar[bool] = False + # Recommended quantization based on VRAM: if user's VRAM > this threshold (GB), + # quantization=null is recommended, otherwise fp8_e4m3fn is recommended. + # None means no specific recommendation (pipeline doesn't benefit from quantization). + recommended_quantization_vram_threshold: ClassVar[float | None] = None + + # Mode configuration - keys are mode names, values are ModeDefaults with field overrides + # Use default=True to mark the default mode. Only include fields that differ from base. + modes: ClassVar[dict[str, ModeDefaults]] = {"text": ModeDefaults(default=True)} + + # Prompt and temporal interpolation support + supports_prompts: ClassVar[bool] = True + default_temporal_interpolation_method: ClassVar[Literal["linear", "slerp"]] = ( + "slerp" + ) + default_temporal_interpolation_steps: ClassVar[int] = 0 + + # Resolution settings - use field templates for consistency + height: int = height_field() + width: int = width_field() + + # Core parameters + manage_cache: bool = Field( + default=True, + description="Enable automatic cache management for performance optimization", + ) + base_seed: Annotated[int, Field(ge=0)] = Field( + default=42, + description="Base random seed for reproducible generation", + ) + denoising_steps: list[int] | None = denoising_steps_field() + + # Video mode parameters (None means not applicable/text mode) + noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = noise_scale_field() + noise_controller: bool | None = noise_controller_field() + input_size: int | None = input_size_field() + + # VACE (optional reference image conditioning) + ref_images: list[str] | None = ref_images_field() + vace_context_scale: float = vace_context_scale_field() + + @classmethod + def get_pipeline_metadata(cls) -> dict[str, str]: + """Return pipeline identification metadata. + + Returns: + Dict with id, name, description, version + """ + return { + "id": cls.pipeline_id, + "name": cls.pipeline_name, + "description": cls.pipeline_description, + "version": cls.pipeline_version, + } + + @classmethod + def get_supported_modes(cls) -> list[str]: + """Return list of supported mode names.""" + return list(cls.modes.keys()) + + @classmethod + def get_default_mode(cls) -> str: + """Return the default mode name. + + Returns the mode marked with default=True, or the first mode if none marked. 
+ """ + for mode_name, mode_config in cls.modes.items(): + if mode_config.default: + return mode_name + # Fallback to first mode if none marked as default + return next(iter(cls.modes.keys())) + + @classmethod + def get_defaults_for_mode(cls, mode: InputMode) -> dict[str, Any]: + """Get effective defaults for a specific mode. + + Merges base config defaults with mode-specific overrides. + + Args: + mode: The input mode ("text" or "video") + + Returns: + Dict of parameter names to their effective default values + """ + # Start with base defaults from model fields + base_instance = cls() + defaults = base_instance.model_dump() + + # Apply mode-specific overrides (excluding None values and the "default" flag) + mode_config = cls.modes.get(mode) + if mode_config: + for field_name, value in mode_config.model_dump( + exclude={"default"} + ).items(): + if value is not None: + defaults[field_name] = value + + return defaults + + @classmethod + def get_schema_with_metadata(cls) -> dict[str, Any]: + """Return complete schema with pipeline metadata and JSON schema. + + This is the primary method for API/UI schema generation. + + Returns: + Dict containing pipeline metadata + """ + metadata = cls.get_pipeline_metadata() + metadata["supported_modes"] = cls.get_supported_modes() + metadata["default_mode"] = cls.get_default_mode() + metadata["supports_prompts"] = cls.supports_prompts + metadata["default_temporal_interpolation_method"] = ( + cls.default_temporal_interpolation_method + ) + metadata["default_temporal_interpolation_steps"] = ( + cls.default_temporal_interpolation_steps + ) + metadata["docs_url"] = cls.docs_url + metadata["estimated_vram_gb"] = cls.estimated_vram_gb + metadata["requires_models"] = cls.requires_models + metadata["supports_lora"] = cls.supports_lora + metadata["supports_vace"] = cls.supports_vace + metadata["supports_cache_management"] = cls.supports_cache_management + metadata["supports_kv_cache_bias"] = cls.supports_kv_cache_bias + metadata["supports_quantization"] = cls.supports_quantization + metadata["min_dimension"] = cls.min_dimension + metadata["recommended_quantization_vram_threshold"] = ( + cls.recommended_quantization_vram_threshold + ) + metadata["modified"] = cls.modified + metadata["config_schema"] = cls.model_json_schema() + + # Include mode-specific defaults (excluding None values and the "default" flag) + mode_defaults = {} + for mode_name, mode_config in cls.modes.items(): + overrides = mode_config.model_dump(exclude={"default"}, exclude_none=True) + if overrides: + mode_defaults[mode_name] = overrides + if mode_defaults: + metadata["mode_defaults"] = mode_defaults + + return metadata + + def is_video_mode(self) -> bool: + """Check if this config represents video mode. + + Returns: + True if video mode parameters are set + """ + return self.input_size is not None diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 10102e44..5aa3c6c8 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -1,6 +1,8 @@ """Base interface for all pipelines.""" +import inspect from abc import ABC, abstractmethod +from pathlib import Path from typing import TYPE_CHECKING import torch @@ -19,14 +21,19 @@ class Requirements(BaseModel): class Pipeline(ABC): """Abstract base class for all pipelines. - Pipelines must implement get_config_class() to return their Pydantic config model. + Pipelines automatically get their config class from schema.yaml in their directory. 
This enables: - Validation via model_validate() / model_validate_json() - JSON Schema generation via model_json_schema() - Type-safe configuration access - API introspection and automatic UI generation - See schema.py for the BasePipelineConfig model and pipeline-specific configs. + To create a new pipeline: + 1. Create a directory for your pipeline (e.g., my_pipeline/) + 2. Add a schema.yaml with pipeline metadata and defaults + 3. Create pipeline.py with your Pipeline subclass + + See schema.py for the BasePipelineConfig model and available fields. For multi-mode pipeline support (text/video), pipelines use helper functions from defaults.py (resolve_input_mode, apply_mode_defaults_to_state, etc.). """ @@ -35,29 +42,36 @@ class Pipeline(ABC): def get_config_class(cls) -> type["BasePipelineConfig"]: """Return the Pydantic config class for this pipeline. - The config class should inherit from BasePipelineConfig and define: + Automatically loads from schema.yaml in the same directory as the + pipeline subclass. No need to override this method - just provide + a schema.yaml file. + + The config class defines: - pipeline_id: Unique identifier - pipeline_name: Human-readable name - pipeline_description: Capabilities description - - pipeline_version: Version string - Default parameter values for the pipeline Returns: - Pydantic config model class + Pydantic config model class loaded from schema.yaml + """ + from .schema_loader import load_config_from_yaml - Note: - Subclasses should override this method to return their config class. - The default implementation returns BasePipelineConfig. + # Find the directory containing this pipeline subclass + module = inspect.getmodule(cls) + if module is None or module.__file__ is None: + # Fallback to base config if we can't find the module + from .schema import BasePipelineConfig + return BasePipelineConfig - Example: - from .schema import LongLiveConfig + pipeline_dir = Path(module.__file__).parent + schema_path = pipeline_dir / "schema.yaml" - @classmethod - def get_config_class(cls) -> type[BasePipelineConfig]: - return LongLiveConfig - """ - from .schema import BasePipelineConfig + if schema_path.exists(): + return load_config_from_yaml(schema_path) + # Fallback to base config if no schema.yaml found + from .schema import BasePipelineConfig return BasePipelineConfig @abstractmethod diff --git a/src/scope/core/pipelines/krea_realtime_video/pipeline.py b/src/scope/core/pipelines/krea_realtime_video/pipeline.py index 082bf9a9..9d75e33e 100644 --- a/src/scope/core/pipelines/krea_realtime_video/pipeline.py +++ b/src/scope/core/pipelines/krea_realtime_video/pipeline.py @@ -1,6 +1,5 @@ import logging import time -from typing import TYPE_CHECKING import torch from diffusers.modular_pipelines import PipelineState @@ -15,16 +14,12 @@ ) from ..interface import Pipeline, Requirements from ..process import postprocess_chunk -from ..schema import KreaRealtimeVideoConfig from ..utils import Quantization, load_model_config, validate_resolution from ..wan2_1.components import WanDiffusionWrapper, WanTextEncoderWrapper from ..wan2_1.lora.mixin import LoRAEnabledPipeline from ..wan2_1.vae import WanVAEWrapper from .modular_blocks import KreaRealtimeVideoBlocks -if TYPE_CHECKING: - from ..schema import BasePipelineConfig - logger = logging.getLogger(__name__) DEFAULT_DENOISING_STEP_LIST = [1000, 750, 500, 250] @@ -36,10 +31,6 @@ class KreaRealtimeVideoPipeline(Pipeline, LoRAEnabledPipeline): - @classmethod - def get_config_class(cls) -> type["BasePipelineConfig"]: - 
return KreaRealtimeVideoConfig - def __init__( self, config, diff --git a/src/scope/core/pipelines/krea_realtime_video/schema.yaml b/src/scope/core/pipelines/krea_realtime_video/schema.yaml new file mode 100644 index 00000000..6095c536 --- /dev/null +++ b/src/scope/core/pipelines/krea_realtime_video/schema.yaml @@ -0,0 +1,33 @@ +pipeline_id: "krea-realtime-video" +pipeline_name: "Krea Realtime Video" +pipeline_description: > + A streaming pipeline and autoregressive video diffusion model from Krea. + The model is trained using Self-Forcing on Wan2.1 14b. +docs_url: "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/krea_realtime_video/docs/usage.md" +estimated_vram_gb: 32.0 +requires_models: true +supports_lora: true + +supports_cache_management: true +supports_kv_cache_bias: true +supports_quantization: true +min_dimension: 16 +modified: true +recommended_quantization_vram_threshold: 40.0 + +default_temporal_interpolation_method: "linear" +default_temporal_interpolation_steps: 4 + +height: 320 +width: 576 +denoising_steps: [1000, 750, 500, 250] + +modes: + text: + default: true + video: + height: 256 + width: 256 + noise_scale: 0.7 + noise_controller: true + denoising_steps: [1000, 750] diff --git a/src/scope/core/pipelines/longlive/pipeline.py b/src/scope/core/pipelines/longlive/pipeline.py index 9135d7b2..cd6d3729 100644 --- a/src/scope/core/pipelines/longlive/pipeline.py +++ b/src/scope/core/pipelines/longlive/pipeline.py @@ -1,6 +1,5 @@ import logging import time -from typing import TYPE_CHECKING import torch from diffusers.modular_pipelines import PipelineState @@ -15,7 +14,6 @@ ) from ..interface import Pipeline, Requirements from ..process import postprocess_chunk -from ..schema import LongLiveConfig from ..utils import Quantization, load_model_config, validate_resolution from ..wan2_1.components import WanDiffusionWrapper, WanTextEncoderWrapper from ..wan2_1.lora.mixin import LoRAEnabledPipeline @@ -24,19 +22,12 @@ from ..wan2_1.vae import WanVAEWrapper from .modular_blocks import LongLiveBlocks -if TYPE_CHECKING: - from ..schema import BasePipelineConfig - logger = logging.getLogger(__name__) DEFAULT_DENOISING_STEP_LIST = [1000, 750, 500, 250] class LongLivePipeline(Pipeline, LoRAEnabledPipeline, VACEEnabledPipeline): - @classmethod - def get_config_class(cls) -> type["BasePipelineConfig"]: - return LongLiveConfig - def __init__( self, config, diff --git a/src/scope/core/pipelines/longlive/schema.yaml b/src/scope/core/pipelines/longlive/schema.yaml new file mode 100644 index 00000000..8aeb0dd5 --- /dev/null +++ b/src/scope/core/pipelines/longlive/schema.yaml @@ -0,0 +1,30 @@ +pipeline_id: "longlive" +pipeline_name: "LongLive" +pipeline_description: > + A streaming pipeline and autoregressive video diffusion model from Nvidia, MIT, HKUST, HKU and THU. + The model is trained using Self-Forcing on Wan2.1 1.3b with modifications to support smoother prompt + switching and improved quality over longer time periods while maintaining fast generation. 
+docs_url: "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/longlive/docs/usage.md" +estimated_vram_gb: 20.0 +requires_models: true +supports_lora: true +supports_vace: true + +supports_cache_management: true +supports_quantization: true +min_dimension: 16 +modified: true + +height: 320 +width: 576 +denoising_steps: [1000, 750, 500, 250] + +modes: + text: + default: true + video: + height: 512 + width: 512 + noise_scale: 0.7 + noise_controller: true + denoising_steps: [1000, 750] diff --git a/src/scope/core/pipelines/memflow/pipeline.py b/src/scope/core/pipelines/memflow/pipeline.py index c4677e00..51765031 100644 --- a/src/scope/core/pipelines/memflow/pipeline.py +++ b/src/scope/core/pipelines/memflow/pipeline.py @@ -1,6 +1,5 @@ import logging import time -from typing import TYPE_CHECKING import torch from diffusers.modular_pipelines import PipelineState @@ -15,7 +14,6 @@ ) from ..interface import Pipeline, Requirements from ..process import postprocess_chunk -from ..schema import MemFlowConfig from ..utils import Quantization, load_model_config, validate_resolution from ..wan2_1.components import WanDiffusionWrapper, WanTextEncoderWrapper from ..wan2_1.lora.mixin import LoRAEnabledPipeline @@ -25,19 +23,12 @@ from .modular_blocks import MemFlowBlocks from .modules.causal_model import CausalWanModel -if TYPE_CHECKING: - from ..schema import BasePipelineConfig - logger = logging.getLogger(__name__) DEFAULT_DENOISING_STEP_LIST = [1000, 750, 500, 250] class MemFlowPipeline(Pipeline, LoRAEnabledPipeline, VACEEnabledPipeline): - @classmethod - def get_config_class(cls) -> type["BasePipelineConfig"]: - return MemFlowConfig - def __init__( self, config, diff --git a/src/scope/core/pipelines/memflow/schema.yaml b/src/scope/core/pipelines/memflow/schema.yaml new file mode 100644 index 00000000..1dde8f17 --- /dev/null +++ b/src/scope/core/pipelines/memflow/schema.yaml @@ -0,0 +1,29 @@ +pipeline_id: "memflow" +pipeline_name: "MemFlow" +pipeline_description: > + A streaming pipeline and autoregressive video diffusion model with memory-efficient flow matching. + Uses Wan2.1 1.3b as the base model with optimized memory management for longer video generation. 
+docs_url: "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/memflow/docs/usage.md"
+estimated_vram_gb: 20.0
+requires_models: true
+supports_lora: true
+supports_vace: true
+
+supports_cache_management: true
+supports_quantization: true
+min_dimension: 16
+modified: true
+
+height: 320
+width: 576
+denoising_steps: [1000, 750, 500, 250]
+
+modes:
+  text:
+    default: true
+  video:
+    height: 512
+    width: 512
+    noise_scale: 0.7
+    noise_controller: true
+    denoising_steps: [1000, 750]
diff --git a/src/scope/core/pipelines/passthrough/pipeline.py b/src/scope/core/pipelines/passthrough/pipeline.py
index eaea5aab..6ae18832 100644
--- a/src/scope/core/pipelines/passthrough/pipeline.py
+++ b/src/scope/core/pipelines/passthrough/pipeline.py
@@ -1,23 +1,13 @@
-from typing import TYPE_CHECKING
-
 import torch
 from einops import rearrange
 
 from ..interface import Pipeline, Requirements
 from ..process import postprocess_chunk, preprocess_chunk
-from ..schema import PassthroughConfig
-
-if TYPE_CHECKING:
-    from ..schema import BasePipelineConfig
 
 
 class PassthroughPipeline(Pipeline):
     """Passthrough pipeline for testing"""
 
-    @classmethod
-    def get_config_class(cls) -> type["BasePipelineConfig"]:
-        return PassthroughConfig
-
     def __init__(
         self,
         height: int = 512,
diff --git a/src/scope/core/pipelines/passthrough/schema.yaml b/src/scope/core/pipelines/passthrough/schema.yaml
new file mode 100644
index 00000000..b04317ef
--- /dev/null
+++ b/src/scope/core/pipelines/passthrough/schema.yaml
@@ -0,0 +1,12 @@
+pipeline_id: "passthrough"
+pipeline_name: "Passthrough"
+pipeline_description: "A pipeline that returns the input video without any processing; useful for testing and debugging."
+
+supports_prompts: false
+
+# Passthrough consumes video input directly (matches the previous config default)
+input_size: 4
+
+modes:
+  video:
+    default: true
diff --git a/src/scope/core/pipelines/reward_forcing/pipeline.py b/src/scope/core/pipelines/reward_forcing/pipeline.py
index 439cda7d..f2107482 100644
--- a/src/scope/core/pipelines/reward_forcing/pipeline.py
+++ b/src/scope/core/pipelines/reward_forcing/pipeline.py
@@ -1,6 +1,5 @@
 import logging
 import time
-from typing import TYPE_CHECKING
 
 import torch
 from diffusers.modular_pipelines import PipelineState
@@ -15,7 +14,6 @@
 )
 from ..interface import Pipeline, Requirements
 from ..process import postprocess_chunk
-from ..schema import RewardForcingConfig
 from ..utils import Quantization, load_model_config, validate_resolution
 from ..wan2_1.components import WanDiffusionWrapper, WanTextEncoderWrapper
 from ..wan2_1.lora.mixin import LoRAEnabledPipeline
@@ -23,19 +21,12 @@
 from ..wan2_1.vae import WanVAEWrapper
 from .modular_blocks import RewardForcingBlocks
 
-if TYPE_CHECKING:
-    from ..schema import BasePipelineConfig
-
 logger = logging.getLogger(__name__)
 
 DEFAULT_DENOISING_STEP_LIST = [1000, 750, 500, 250]
 
 
 class RewardForcingPipeline(Pipeline, LoRAEnabledPipeline, VACEEnabledPipeline):
-    @classmethod
-    def get_config_class(cls) -> type["BasePipelineConfig"]:
-        return RewardForcingConfig
-
     def __init__(
         self,
         config,
diff --git a/src/scope/core/pipelines/reward_forcing/schema.yaml b/src/scope/core/pipelines/reward_forcing/schema.yaml
new file mode 100644
index 00000000..79cbfbe1
--- /dev/null
+++ b/src/scope/core/pipelines/reward_forcing/schema.yaml
@@ -0,0 +1,29 @@
+pipeline_id: "reward-forcing"
+pipeline_name: "RewardForcing"
+pipeline_description: >
+  A streaming pipeline and autoregressive video diffusion model from ZJU, Ant Group, SIAS-ZJU, HUST and SJTU.
+  The model is trained with Rewarded Distribution Matching Distillation using Wan2.1 1.3b as the base model.
+docs_url: "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/reward_forcing/docs/usage.md"
+estimated_vram_gb: 20.0
+requires_models: true
+supports_lora: true
+supports_vace: true
+
+supports_cache_management: true
+supports_quantization: true
+min_dimension: 16
+modified: true
+
+height: 320
+width: 576
+denoising_steps: [1000, 750, 500, 250]
+
+modes:
+  text:
+    default: true
+  video:
+    height: 512
+    width: 512
+    noise_scale: 0.7
+    noise_controller: true
+    denoising_steps: [1000, 750]
diff --git a/src/scope/core/pipelines/schema.py b/src/scope/core/pipelines/schema.py
index 354916f8..dd3293c2 100644
--- a/src/scope/core/pipelines/schema.py
+++ b/src/scope/core/pipelines/schema.py
@@ -6,633 +6,46 @@
 - Type-safe configuration access
 - API introspection and automatic UI generation
 
-Pipeline-specific configs inherit from BasePipelineConfig and override defaults.
-Each pipeline defines its supported modes and can provide mode-specific defaults.
+Pipeline-specific configs are defined via schema.yaml files in their directories.
+The configs are automatically loaded and made available via this module.
+
+To create a new pipeline:
+1. Create a directory for your pipeline (e.g., my_pipeline/)
+2. Add a schema.yaml file with your pipeline's configuration
+3. Create pipeline.py with your Pipeline subclass - the config class is
+   loaded automatically from schema.yaml by Pipeline.get_config_class()
+   (see schema_loader.load_config_from_yaml for the loading logic)
+
+Example schema.yaml:
+    pipeline_id: "my-pipeline"
+    pipeline_name: "My Pipeline"
+    pipeline_description: "A pipeline that does X."
+    height: 320
+    width: 576
+    modes:
+      text:
+        default: true
+      video:
+        height: 512
+        width: 512
 """
 
-from typing import Annotated, Any, ClassVar, Literal
+from pathlib import Path
 
-from pydantic import BaseModel, ConfigDict, Field
+# Re-export base classes from base_schema for backwards compatibility
+from .base_schema import BasePipelineConfig, InputMode, ModeDefaults
+from .schema_loader import load_config_from_yaml
 
-# Type alias for input modes
-InputMode = Literal["text", "video"]
-
-
-class ModeDefaults(BaseModel):
-    """Mode-specific default values.
-
-    These override the base config defaults when operating in a specific mode.
-    Only non-None values will override the base defaults.
-    """
-
-    model_config = ConfigDict(extra="forbid")
-
-    # Resolution can differ per mode
-    height: int | None = None
-    width: int | None = None
-
-    # Core parameters
-    denoising_steps: list[int] | None = None
-
-    # Video mode parameters
-    noise_scale: float | None = None
-    noise_controller: bool | None = None
-
-
-class BasePipelineConfig(BaseModel):
-    """Base configuration for all pipelines.
-
-    This provides common parameters shared across all pipeline modes.
-    Pipeline-specific configs inherit from this and override defaults.
-
-    Mode support is declared via class variables:
-    - supported_modes: List of modes this pipeline supports ("text", "video")
-    - default_mode: The mode to use by default in the UI
-
-    Mode-specific defaults can be provided via the get_mode_defaults() class method.
- """ - - model_config = ConfigDict(extra="forbid") - - # Pipeline metadata - not configuration parameters, used for identification - pipeline_id: ClassVar[str] = "base" - pipeline_name: ClassVar[str] = "Base Pipeline" - pipeline_description: ClassVar[str] = "Base pipeline configuration" - pipeline_version: ClassVar[str] = "1.0.0" - docs_url: ClassVar[str | None] = None - estimated_vram_gb: ClassVar[float | None] = None - requires_models: ClassVar[bool] = False - supports_lora: ClassVar[bool] = False - supports_vace: ClassVar[bool] = False - - # UI capability metadata - tells frontend what controls to show - supports_cache_management: ClassVar[bool] = False - supports_kv_cache_bias: ClassVar[bool] = False - supports_quantization: ClassVar[bool] = False - min_dimension: ClassVar[int] = 1 - # Whether this pipeline contains modifications based on the original project - modified: ClassVar[bool] = False - # Recommended quantization based on VRAM: if user's VRAM > this threshold (GB), - # quantization=null is recommended, otherwise fp8_e4m3fn is recommended. - # None means no specific recommendation (pipeline doesn't benefit from quantization). - recommended_quantization_vram_threshold: ClassVar[float | None] = None - - # Mode support - override in subclasses - supported_modes: ClassVar[list[InputMode]] = ["text"] - default_mode: ClassVar[InputMode] = "text" - - # Prompt and temporal interpolation support - supports_prompts: ClassVar[bool] = True - default_temporal_interpolation_method: ClassVar[Literal["linear", "slerp"]] = ( - "slerp" - ) - default_temporal_interpolation_steps: ClassVar[int] = 0 - - # Resolution settings - height: int = Field(default=512, ge=1, description="Output height in pixels") - width: int = Field(default=512, ge=1, description="Output width in pixels") - - # Core parameters - manage_cache: bool = Field( - default=True, - description="Enable automatic cache management for performance optimization", - ) - base_seed: Annotated[int, Field(ge=0)] = Field( - default=42, - description="Base random seed for reproducible generation", - ) - denoising_steps: list[int] | None = Field( - default=None, - description="Denoising step schedule for progressive generation", - ) - - # Video mode parameters (None means not applicable/text mode) - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=None, - description="Amount of noise to add during video generation (video mode only)", - ) - noise_controller: bool | None = Field( - default=None, - description="Enable dynamic noise control during generation (video mode only)", - ) - input_size: int | None = Field( - default=None, - description="Expected input video frame count (video mode only)", - ) - - @classmethod - def get_pipeline_metadata(cls) -> dict[str, str]: - """Return pipeline identification metadata. - - Returns: - Dict with id, name, description, version - """ - return { - "id": cls.pipeline_id, - "name": cls.pipeline_name, - "description": cls.pipeline_description, - "version": cls.pipeline_version, - } - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """Return mode-specific default overrides. - - Override in subclasses to provide different defaults per mode. - Values in ModeDefaults override the base config defaults. - - Returns: - Dict mapping mode name to ModeDefaults with override values - """ - return {} - - @classmethod - def get_defaults_for_mode(cls, mode: InputMode) -> dict[str, Any]: - """Get effective defaults for a specific mode. 
- - Merges base config defaults with mode-specific overrides. - - Args: - mode: The input mode ("text" or "video") - - Returns: - Dict of parameter names to their effective default values - """ - # Start with base defaults from model fields - base_instance = cls() - defaults = base_instance.model_dump() - - # Apply mode-specific overrides - mode_defaults = cls.get_mode_defaults().get(mode) - if mode_defaults: - for field_name, value in mode_defaults.model_dump().items(): - if value is not None: - defaults[field_name] = value - - return defaults - - @classmethod - def get_schema_with_metadata(cls) -> dict[str, Any]: - """Return complete schema with pipeline metadata and JSON schema. - - This is the primary method for API/UI schema generation. - - Returns: - Dict containing pipeline metadata - """ - metadata = cls.get_pipeline_metadata() - metadata["supported_modes"] = cls.supported_modes - metadata["default_mode"] = cls.default_mode - metadata["supports_prompts"] = cls.supports_prompts - metadata["default_temporal_interpolation_method"] = ( - cls.default_temporal_interpolation_method - ) - metadata["default_temporal_interpolation_steps"] = ( - cls.default_temporal_interpolation_steps - ) - metadata["docs_url"] = cls.docs_url - metadata["estimated_vram_gb"] = cls.estimated_vram_gb - metadata["requires_models"] = cls.requires_models - metadata["supports_lora"] = cls.supports_lora - metadata["supports_vace"] = cls.supports_vace - metadata["supports_cache_management"] = cls.supports_cache_management - metadata["supports_kv_cache_bias"] = cls.supports_kv_cache_bias - metadata["supports_quantization"] = cls.supports_quantization - metadata["min_dimension"] = cls.min_dimension - metadata["recommended_quantization_vram_threshold"] = ( - cls.recommended_quantization_vram_threshold - ) - metadata["modified"] = cls.modified - metadata["config_schema"] = cls.model_json_schema() - - # Include mode-specific defaults if defined - mode_defaults = cls.get_mode_defaults() - if mode_defaults: - metadata["mode_defaults"] = { - mode: defaults.model_dump(exclude_none=True) - for mode, defaults in mode_defaults.items() - } - - return metadata - - def is_video_mode(self) -> bool: - """Check if this config represents video mode. - - Returns: - True if video mode parameters are set - """ - return self.input_size is not None - - -# Concrete pipeline configurations - - -class StreamDiffusionV2Config(BasePipelineConfig): - """Configuration for StreamDiffusion V2 pipeline. - - StreamDiffusionV2 supports both text-to-video and video-to-video modes. - Default mode is video (V2V was the original training focus). - """ - - pipeline_id: ClassVar[str] = "streamdiffusionv2" - pipeline_name: ClassVar[str] = "StreamDiffusionV2" - pipeline_description: ClassVar[str] = ( - "A streaming pipeline and autoregressive video diffusion model from the creators of the original " - "StreamDiffusion project. The model is trained using Self-Forcing on Wan2.1 1.3b with modifications " - "to support streaming." 
- ) - docs_url: ClassVar[str | None] = ( - "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/streamdiffusionv2/docs/usage.md" - ) - estimated_vram_gb: ClassVar[float | None] = 20.0 - requires_models: ClassVar[bool] = True - supports_lora: ClassVar[bool] = True - supports_vace: ClassVar[bool] = True - - # UI capabilities - supports_cache_management: ClassVar[bool] = True - supports_quantization: ClassVar[bool] = True - min_dimension: ClassVar[int] = 16 - modified: ClassVar[bool] = True - - # Mode support - supported_modes: ClassVar[list[InputMode]] = ["text", "video"] - default_mode: ClassVar[InputMode] = "video" - - # StreamDiffusion V2 defaults (video mode baseline since it's the default) - height: int = Field(default=512, ge=1, description="Output height in pixels") - width: int = Field(default=512, ge=1, description="Output width in pixels") - denoising_steps: list[int] | None = Field( - default=[750, 250], - description="Denoising step schedule for progressive generation", - ) - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=0.7, - description="Amount of noise to add during video generation", - ) - noise_controller: bool | None = Field( - default=True, - description="Enable dynamic noise control during generation", - ) - input_size: int | None = Field( - default=4, - description="Expected input video frame count", - ) - - # VACE (optional reference image conditioning for text mode) - ref_images: list[str] | None = Field( - default=None, - description="List of reference image paths for VACE conditioning in text mode", - ) - vace_context_scale: float = Field( - default=1.0, - ge=0.0, - le=2.0, - description="Scaling factor for VACE hint injection (0.0 to 2.0)", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """StreamDiffusionV2 mode-specific defaults.""" - return { - "text": ModeDefaults( - # Text mode: distinct resolution, no video input, no noise controls - height=512, - width=512, - noise_scale=None, - noise_controller=None, - denoising_steps=[1000, 750], - ), - "video": ModeDefaults( - # Video mode: requires input frames, noise controls active - noise_scale=0.7, - noise_controller=True, - ), - } - - -class LongLiveConfig(BasePipelineConfig): - """Configuration for LongLive pipeline. - - LongLive supports both text-to-video and video-to-video modes. - Default mode is text (T2V was the original training focus). - """ - - pipeline_id: ClassVar[str] = "longlive" - pipeline_name: ClassVar[str] = "LongLive" - pipeline_description: ClassVar[str] = ( - "A streaming pipeline and autoregressive video diffusion model from Nvidia, MIT, HKUST, HKU and THU. " - "The model is trained using Self-Forcing on Wan2.1 1.3b with modifications to support smoother prompt " - "switching and improved quality over longer time periods while maintaining fast generation." 
- ) - docs_url: ClassVar[str | None] = ( - "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/longlive/docs/usage.md" - ) - estimated_vram_gb: ClassVar[float | None] = 20.0 - requires_models: ClassVar[bool] = True - supports_lora: ClassVar[bool] = True - supports_vace: ClassVar[bool] = True - - # UI capabilities - supports_cache_management: ClassVar[bool] = True - supports_quantization: ClassVar[bool] = True - min_dimension: ClassVar[int] = 16 - modified: ClassVar[bool] = True - - # Mode support - supported_modes: ClassVar[list[InputMode]] = ["text", "video"] - default_mode: ClassVar[InputMode] = "text" - - # LongLive defaults (text mode baseline) - height: int = Field(default=320, ge=1, description="Output height in pixels") - width: int = Field(default=576, ge=1, description="Output width in pixels") - denoising_steps: list[int] | None = Field( - default=[1000, 750, 500, 250], - description="Denoising step schedule for progressive generation", - ) - # noise_scale is None by default (text mode), overridden in video mode - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=None, - description="Amount of noise to add during video generation (video mode only)", - ) - - # VACE (optional reference image conditioning) - ref_images: list[str] | None = Field( - default=None, - description="List of reference image paths for VACE conditioning", - ) - vace_context_scale: float = Field( - default=1.0, - ge=0.0, - le=2.0, - description="Scaling factor for VACE hint injection (0.0 to 2.0)", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """LongLive mode-specific defaults.""" - return { - "text": ModeDefaults( - # Text mode: no video input, no noise controls - noise_scale=None, - noise_controller=None, - ), - "video": ModeDefaults( - # Video mode: requires input frames, noise controls active - height=512, - width=512, - noise_scale=0.7, - noise_controller=True, - denoising_steps=[1000, 750], - ), - } - - -class KreaRealtimeVideoConfig(BasePipelineConfig): - """Configuration for Krea Realtime Video pipeline. - - Krea supports both text-to-video and video-to-video modes. - Default mode is text (T2V was the original training focus). - """ - - pipeline_id: ClassVar[str] = "krea-realtime-video" - pipeline_name: ClassVar[str] = "Krea Realtime Video" - pipeline_description: ClassVar[str] = ( - "A streaming pipeline and autoregressive video diffusion model from Krea. " - "The model is trained using Self-Forcing on Wan2.1 14b." 
- ) - docs_url: ClassVar[str | None] = ( - "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/krea_realtime_video/docs/usage.md" - ) - estimated_vram_gb: ClassVar[float | None] = 32.0 - requires_models: ClassVar[bool] = True - supports_lora: ClassVar[bool] = True - - # UI capabilities - supports_cache_management: ClassVar[bool] = True - supports_kv_cache_bias: ClassVar[bool] = True - supports_quantization: ClassVar[bool] = True - min_dimension: ClassVar[int] = 16 - modified: ClassVar[bool] = True - # Recommend quantization for systems with <= 40GB VRAM - recommended_quantization_vram_threshold: ClassVar[float | None] = 40.0 - - default_temporal_interpolation_method: ClassVar[Literal["linear", "slerp"]] = ( - "linear" - ) - default_temporal_interpolation_steps: ClassVar[int] = 4 - - # Mode support - supported_modes: ClassVar[list[InputMode]] = ["text", "video"] - default_mode: ClassVar[InputMode] = "text" - - # Krea defaults (text mode baseline) - distinct from LongLive (320x576) - height: int = Field(default=320, ge=1, description="Output height in pixels") - width: int = Field(default=576, ge=1, description="Output width in pixels") - denoising_steps: list[int] | None = Field( - default=[1000, 750, 500, 250], - description="Denoising step schedule for progressive generation", - ) - # noise_scale is None by default (text mode), overridden in video mode - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=None, - description="Amount of noise to add during video generation (video mode only)", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """Krea mode-specific defaults.""" - return { - "text": ModeDefaults( - # Text mode: no video input, no noise controls - noise_scale=None, - noise_controller=None, - ), - "video": ModeDefaults( - # Video mode: requires input frames, noise controls active - height=256, - width=256, - noise_scale=0.7, - noise_controller=True, - denoising_steps=[1000, 750], - ), - } - - -class RewardForcingConfig(BasePipelineConfig): - """Configuration for RewardForcing pipeline. - - RewardForcing supports both text-to-video and video-to-video modes. - Default mode is text (T2V was the original training focus). - """ - - pipeline_id: ClassVar[str] = "reward-forcing" - pipeline_name: ClassVar[str] = "RewardForcing" - pipeline_description: ClassVar[str] = ( - "A streaming pipeline and autoregressive video diffusion model from ZJU, Ant Group, SIAS-ZJU, HUST and SJTU. " - "The model is trained with Rewarded Distribution Matching Distillation using Wan2.1 1.3b as the base model." 
- ) - docs_url: ClassVar[str | None] = ( - "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/reward_forcing/docs/usage.md" - ) - estimated_vram_gb: ClassVar[float | None] = 20.0 - requires_models: ClassVar[bool] = True - supports_lora: ClassVar[bool] = True - supports_vace: ClassVar[bool] = True - - # UI capabilities - supports_cache_management: ClassVar[bool] = True - supports_quantization: ClassVar[bool] = True - min_dimension: ClassVar[int] = 16 - modified: ClassVar[bool] = True - - # Mode support - supported_modes: ClassVar[list[InputMode]] = ["text", "video"] - default_mode: ClassVar[InputMode] = "text" - - # RewardForcing defaults (text mode baseline) - height: int = Field(default=320, ge=1, description="Output height in pixels") - width: int = Field(default=576, ge=1, description="Output width in pixels") - denoising_steps: list[int] | None = Field( - default=[1000, 750, 500, 250], - description="Denoising step schedule for progressive generation", - ) - # noise_scale is None by default (text mode), overridden in video mode - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=None, - description="Amount of noise to add during video generation (video mode only)", - ) - - # VACE (optional reference image conditioning) - ref_images: list[str] | None = Field( - default=None, - description="List of reference image paths for VACE conditioning", - ) - vace_context_scale: float = Field( - default=1.0, - ge=0.0, - le=2.0, - description="Scaling factor for VACE hint injection (0.0 to 2.0)", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """RewardForcing mode-specific defaults.""" - return { - "text": ModeDefaults( - # Text mode: no video input, no noise controls - noise_scale=None, - noise_controller=None, - ), - "video": ModeDefaults( - # Video mode: requires input frames, noise controls active - height=512, - width=512, - noise_scale=0.7, - noise_controller=True, - denoising_steps=[1000, 750], - ), - } - - -class MemFlowConfig(BasePipelineConfig): - """Configuration for MemFlow pipeline. - - MemFlow supports both text-to-video and video-to-video modes. - Default mode is text (T2V was the original training focus). - """ - - pipeline_id: ClassVar[str] = "memflow" - pipeline_name: ClassVar[str] = "MemFlow" - pipeline_description: ClassVar[str] = ( - "A streaming pipeline and autoregressive video diffusion model from Kling." 
- ) - - # Mode support - supported_modes: ClassVar[list[InputMode]] = ["text", "video"] - default_mode: ClassVar[InputMode] = "text" - - # LongLive defaults (text mode baseline) - height: int = Field(default=320, ge=1, description="Output height in pixels") - width: int = Field(default=576, ge=1, description="Output width in pixels") - denoising_steps: list[int] | None = Field( - default=[1000, 750, 500, 250], - description="Denoising step schedule for progressive generation", - ) - # noise_scale is None by default (text mode), overridden in video mode - noise_scale: Annotated[float, Field(ge=0.0, le=1.0)] | None = Field( - default=None, - description="Amount of noise to add during video generation (video mode only)", - ) - - # VACE (optional reference image conditioning) - ref_images: list[str] | None = Field( - default=None, - description="List of reference image paths for VACE conditioning", - ) - vace_context_scale: float = Field( - default=1.0, - ge=0.0, - le=2.0, - description="Scaling factor for VACE hint injection (0.0 to 2.0)", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """MemFlow mode-specific defaults.""" - return { - "text": ModeDefaults( - # Text mode: no video input, no noise controls - noise_scale=None, - noise_controller=None, - ), - "video": ModeDefaults( - # Video mode: requires input frames, noise controls active - height=512, - width=512, - noise_scale=0.7, - noise_controller=True, - denoising_steps=[1000, 750], - ), - } - - -class PassthroughConfig(BasePipelineConfig): - """Configuration for Passthrough pipeline (testing). - - Passthrough only supports video mode - it passes through input video frames. - """ - - pipeline_id: ClassVar[str] = "passthrough" - pipeline_name: ClassVar[str] = "Passthrough" - pipeline_description: ClassVar[str] = ( - "A pipeline that returns the input video without any processing that is useful for testing and debugging." 
- ) - - # Mode support - video only - supported_modes: ClassVar[list[InputMode]] = ["video"] - default_mode: ClassVar[InputMode] = "video" - - # Does not support prompts - supports_prompts: ClassVar[bool] = False - - # Passthrough defaults - requires video input (distinct from StreamDiffusionV2) - height: int = Field(default=512, ge=1, description="Output height in pixels") - width: int = Field(default=512, ge=1, description="Output width in pixels") - input_size: int | None = Field( - default=4, - description="Expected input video frame count", - ) - - @classmethod - def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: - """Passthrough mode-specific defaults - no noise controls.""" - return { - "video": ModeDefaults( - # No noise controls for passthrough - it just passes frames through - ), - } +# Directory containing pipeline subdirectories +_PIPELINES_DIR = Path(__file__).parent +# Load pipeline configs directly from YAML files +LongLiveConfig = load_config_from_yaml(_PIPELINES_DIR / "longlive" / "schema.yaml") +PassthroughConfig = load_config_from_yaml(_PIPELINES_DIR / "passthrough" / "schema.yaml") +KreaRealtimeVideoConfig = load_config_from_yaml(_PIPELINES_DIR / "krea_realtime_video" / "schema.yaml") +RewardForcingConfig = load_config_from_yaml(_PIPELINES_DIR / "reward_forcing" / "schema.yaml") +StreamDiffusionV2Config = load_config_from_yaml(_PIPELINES_DIR / "streamdiffusionv2" / "schema.yaml") +MemFlowConfig = load_config_from_yaml(_PIPELINES_DIR / "memflow" / "schema.yaml") # Registry of pipeline config classes PIPELINE_CONFIGS: dict[str, type[BasePipelineConfig]] = { @@ -641,6 +54,7 @@ def get_mode_defaults(cls) -> dict[InputMode, ModeDefaults]: "krea-realtime-video": KreaRealtimeVideoConfig, "reward-forcing": RewardForcingConfig, "passthrough": PassthroughConfig, + "memflow": MemFlowConfig, } @@ -654,3 +68,21 @@ def get_config_class(pipeline_id: str) -> type[BasePipelineConfig] | None: Config class if found, None otherwise """ return PIPELINE_CONFIGS.get(pipeline_id) + + +__all__ = [ + # Base classes + "BasePipelineConfig", + "InputMode", + "ModeDefaults", + # Pipeline configs + "StreamDiffusionV2Config", + "LongLiveConfig", + "KreaRealtimeVideoConfig", + "RewardForcingConfig", + "MemFlowConfig", + "PassthroughConfig", + # Registry + "PIPELINE_CONFIGS", + "get_config_class", +] diff --git a/src/scope/core/pipelines/schema_loader.py b/src/scope/core/pipelines/schema_loader.py new file mode 100644 index 00000000..726b8f94 --- /dev/null +++ b/src/scope/core/pipelines/schema_loader.py @@ -0,0 +1,253 @@ +"""Dynamic schema loader for YAML-based pipeline configurations. + +This module provides utilities to load pipeline configuration schemas from YAML files, +automatically creating Pydantic model classes at runtime. + +Pipeline developers can simply provide a schema.yaml file in their pipeline directory +instead of implementing schema.py. The loader will automatically discover and parse +these YAML files to generate the corresponding config classes. + +Example schema.yaml: + pipeline_id: "my-pipeline" + pipeline_name: "My Pipeline" + pipeline_description: "A great pipeline that does amazing things." 
+    docs_url: "https://example.com/docs"
+    estimated_vram_gb: 20.0
+    requires_models: true
+    supports_lora: true
+    supports_vace: false
+
+    supports_cache_management: true
+    supports_quantization: true
+    min_dimension: 16
+    modified: true
+
+    # Instance-level field defaults
+    height: 320
+    width: 576
+    denoising_steps: [1000, 750, 500, 250]
+
+    # Mode configuration
+    modes:
+      text:
+        default: true
+      video:
+        height: 512
+        width: 512
+        noise_scale: 0.7
+        noise_controller: true
+        denoising_steps: [1000, 750]
+"""
+
+import logging
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+from .base_schema import BasePipelineConfig, ModeDefaults
+
+logger = logging.getLogger(__name__)
+
+# Cache for loaded config classes to avoid repeated parsing
+_config_class_cache: dict[str, type[BasePipelineConfig]] = {}
+
+
+# Class variables that should be set on the class, not as instance fields
+CLASS_VAR_FIELDS = {
+    "pipeline_id",
+    "pipeline_name",
+    "pipeline_description",
+    "pipeline_version",
+    "docs_url",
+    "estimated_vram_gb",
+    "requires_models",
+    "supports_lora",
+    "supports_vace",
+    "supports_cache_management",
+    "supports_kv_cache_bias",
+    "supports_quantization",
+    "min_dimension",
+    "modified",
+    "recommended_quantization_vram_threshold",
+    "supports_prompts",
+    "default_temporal_interpolation_method",
+    "default_temporal_interpolation_steps",
+    "modes",
+}
+
+# Instance fields that can be overridden with simple values
+INSTANCE_FIELDS = {
+    "height",
+    "width",
+    "denoising_steps",
+    "noise_scale",
+    "noise_controller",
+    "input_size",
+    "ref_images",
+    "vace_context_scale",
+    "manage_cache",
+    "base_seed",
+}
+
+
+def _parse_modes(modes_dict: dict[str, Any]) -> dict[str, ModeDefaults]:
+    """Parse modes dictionary from YAML into ModeDefaults objects.
+
+    Args:
+        modes_dict: Dictionary of mode names to their default values
+
+    Returns:
+        Dictionary of mode names to ModeDefaults instances
+    """
+    result = {}
+    for mode_name, mode_values in modes_dict.items():
+        if mode_values is None:
+            mode_values = {}
+        result[mode_name] = ModeDefaults(**mode_values)
+    return result
+
+
+def load_config_from_yaml(yaml_path: str | Path) -> type[BasePipelineConfig]:
+    """Load a pipeline config class from a YAML file.
+
+    This function parses the YAML file and dynamically creates a Pydantic
+    model class that inherits from BasePipelineConfig with the specified
+    class variables and field defaults.
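+
+    Example (illustrative; the path is hypothetical):
+        config_cls = load_config_from_yaml(Path("my_pipeline/schema.yaml"))
+        defaults = config_cls().model_dump()
+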
+    Args:
+        yaml_path: Path to the schema.yaml file
+
+    Returns:
+        A dynamically created config class
+
+    Raises:
+        FileNotFoundError: If the YAML file doesn't exist
+        yaml.YAMLError: If the YAML is malformed
+        ValueError: If required fields are missing
+    """
+    yaml_path = Path(yaml_path)
+
+    # Check cache first
+    cache_key = str(yaml_path.resolve())
+    if cache_key in _config_class_cache:
+        return _config_class_cache[cache_key]
+
+    if not yaml_path.exists():
+        raise FileNotFoundError(f"Schema file not found: {yaml_path}")
+
+    with open(yaml_path) as f:
+        config_data = yaml.safe_load(f)
+
+    if config_data is None:
+        raise ValueError(f"Empty or invalid YAML file: {yaml_path}")
+
+    # Validate required fields
+    if "pipeline_id" not in config_data:
+        raise ValueError(f"Missing required field 'pipeline_id' in {yaml_path}")
+
+    # Generate class name from pipeline_id
+    pipeline_id = config_data["pipeline_id"]
+    class_name = _generate_class_name(pipeline_id)
+
+    # Separate class variables from instance field overrides
+    class_vars: dict[str, Any] = {}
+    field_defaults: dict[str, Any] = {}
+
+    for key, value in config_data.items():
+        if key == "modes":
+            # Special handling for modes - parse into ModeDefaults objects
+            class_vars["modes"] = _parse_modes(value)
+        elif key in CLASS_VAR_FIELDS:
+            class_vars[key] = value
+        elif key in INSTANCE_FIELDS:
+            field_defaults[key] = value
+        else:
+            logger.warning(f"Unknown field '{key}' in {yaml_path}, ignoring")
+
+    # Create the dynamic class with annotations for field defaults
+    annotations: dict[str, Any] = {}
+    for field_name, value in field_defaults.items():
+        # Infer type from value
+        if isinstance(value, bool):
+            annotations[field_name] = bool
+        elif isinstance(value, int):
+            annotations[field_name] = int
+        elif isinstance(value, float):
+            annotations[field_name] = float
+        elif isinstance(value, list):
+            if value and isinstance(value[0], int):
+                annotations[field_name] = list[int]
+            elif value and isinstance(value[0], str):
+                annotations[field_name] = list[str]
+            else:
+                annotations[field_name] = list
+        elif value is None:
+            # An explicit null in YAML still needs an annotation: pydantic v2
+            # rejects non-annotated overrides of inherited fields, so reuse
+            # the annotation from the parent field
+            annotations[field_name] = BasePipelineConfig.model_fields[
+                field_name
+            ].annotation
+
+    # Create namespace for the new class
+    namespace: dict[str, Any] = {
+        "__annotations__": annotations,
+        **class_vars,
+        **field_defaults,
+    }
+
+    # Dynamically create the config class
+    config_class = type(class_name, (BasePipelineConfig,), namespace)
+
+    # Cache the class
+    _config_class_cache[cache_key] = config_class
+
+    return config_class
+
+
+def _generate_class_name(pipeline_id: str) -> str:
+    """Generate a class name from a pipeline ID.
+
+    Converts pipeline IDs like "krea-realtime-video" to "KreaRealtimeVideoConfig".
+
+    Args:
+        pipeline_id: The pipeline identifier
+
+    Returns:
+        A PascalCase class name ending in "Config"
+    """
+    # Replace hyphens and underscores with spaces, title case, remove spaces
+    parts = pipeline_id.replace("-", " ").replace("_", " ").split()
+    pascal_case = "".join(part.capitalize() for part in parts)
+    return f"{pascal_case}Config"
+
+
+def discover_pipeline_schemas(pipelines_dir: str | Path) -> dict[str, type[BasePipelineConfig]]:
+    """Discover all schema.yaml files in pipeline subdirectories.
+
+    Scans the given directory for subdirectories containing schema.yaml files
+    and loads each one.
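+
+    Example (illustrative; the returned keys depend on the schemas found):
+        configs = discover_pipeline_schemas(Path(__file__).parent)
+        longlive_config = configs.get("longlive")
+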
+    Args:
+        pipelines_dir: Path to the pipelines directory
+
+    Returns:
+        Dictionary mapping pipeline IDs to their config classes
+    """
+    pipelines_dir = Path(pipelines_dir)
+    configs = {}
+
+    for subdir in pipelines_dir.iterdir():
+        if not subdir.is_dir():
+            continue
+
+        schema_path = subdir / "schema.yaml"
+        if not schema_path.exists():
+            continue
+
+        try:
+            config_class = load_config_from_yaml(schema_path)
+            pipeline_id = config_class.pipeline_id
+            configs[pipeline_id] = config_class
+            logger.debug(f"Loaded schema for pipeline '{pipeline_id}' from {schema_path}")
+        except Exception as e:
+            logger.warning(f"Failed to load schema from {schema_path}: {e}")
+
+    return configs
diff --git a/src/scope/core/pipelines/streamdiffusionv2/pipeline.py b/src/scope/core/pipelines/streamdiffusionv2/pipeline.py
index 93a4a8e1..fcb41429 100644
--- a/src/scope/core/pipelines/streamdiffusionv2/pipeline.py
+++ b/src/scope/core/pipelines/streamdiffusionv2/pipeline.py
@@ -1,6 +1,5 @@
 import logging
 import time
-from typing import TYPE_CHECKING
 
 import torch
 from diffusers.modular_pipelines import PipelineState
@@ -15,7 +14,6 @@
 )
 from ..interface import Pipeline, Requirements
 from ..process import postprocess_chunk
-from ..schema import StreamDiffusionV2Config
 from ..utils import Quantization, load_model_config, validate_resolution
 from ..wan2_1.components import WanDiffusionWrapper, WanTextEncoderWrapper
 from ..wan2_1.lora.mixin import LoRAEnabledPipeline
@@ -23,19 +21,12 @@
 from .components import StreamDiffusionV2WanVAEWrapper
 from .modular_blocks import StreamDiffusionV2Blocks
 
-if TYPE_CHECKING:
-    from ..schema import BasePipelineConfig
-
 logger = logging.getLogger(__name__)
 
 DEFAULT_DENOISING_STEP_LIST = [750, 250]
 
 
 class StreamDiffusionV2Pipeline(Pipeline, LoRAEnabledPipeline, VACEEnabledPipeline):
-    @classmethod
-    def get_config_class(cls) -> type["BasePipelineConfig"]:
-        return StreamDiffusionV2Config
-
     def __init__(
         self,
         config,
diff --git a/src/scope/core/pipelines/streamdiffusionv2/schema.yaml b/src/scope/core/pipelines/streamdiffusionv2/schema.yaml
new file mode 100644
index 00000000..373a82dc
--- /dev/null
+++ b/src/scope/core/pipelines/streamdiffusionv2/schema.yaml
@@ -0,0 +1,33 @@
+pipeline_id: "streamdiffusionv2"
+pipeline_name: "StreamDiffusionV2"
+pipeline_description: >
+  A streaming pipeline and autoregressive video diffusion model from the creators of the original
+  StreamDiffusion project. The model is trained using Self-Forcing on Wan2.1 1.3b with modifications
+  to support streaming.
+docs_url: "https://github.com/daydreamlive/scope/blob/main/src/scope/core/pipelines/streamdiffusionv2/docs/usage.md"
+estimated_vram_gb: 20.0
+requires_models: true
+supports_lora: true
+supports_vace: true
+
+supports_cache_management: true
+supports_quantization: true
+min_dimension: 16
+modified: true
+
+denoising_steps: [750, 250]
+noise_scale: 0.7
+noise_controller: true
+input_size: 4
+
+modes:
+  text:
+    height: 512
+    width: 512
+    denoising_steps: [1000, 750]
+  video:
+    default: true
+    # Expose noise controls in video mode (the frontend reads
+    # mode_defaults.video.noise_scale to decide whether to show them)
+    noise_scale: 0.7
+    noise_controller: true