Skip to content

Commit 10e7e78

Browse files
committed
feat(luminork): Add ability to see upgrade_available and upgrade schema
For an installed schema, we should see if there's an upgrade available and if so, add the ability to upgrade the schema
1 parent 3d242ef commit 10e7e78

File tree

6 files changed

+185
-2
lines changed

6 files changed

+185
-2
lines changed

lib/luminork-server/src/service/v1.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ pub use schemas::{
159159
},
160160
unlock_schema::UnlockedSchemaV1Response,
161161
update_schema_variant::UpdateSchemaVariantV1Request,
162+
upgrade_schema::UpgradeSchemaResponse,
162163
};
163164
pub use search::{
164165
SearchV1Request,
@@ -221,6 +222,7 @@ pub use crate::api_types::func_run::v1::{
221222
schemas::create_codegen::create_variant_codegen,
222223
schemas::create_management::create_variant_management,
223224
schemas::update_schema_variant::update_schema_variant,
225+
schemas::upgrade_schema::upgrade_schema,
224226
funcs::get_func_run::get_func_run,
225227
funcs::get_func::get_func,
226228
funcs::update_func::update_func,

lib/luminork-server/src/service/v1/schemas/create_schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ pub async fn create_schema(
9393
default_variant_id: created_schema_variant.id,
9494
variant_ids: variants.into_iter().map(|v| v.id).collect(),
9595
schema_id: schema.id(),
96+
upgrade_available: None,
9697
}))
9798
}
9899

lib/luminork-server/src/service/v1/schemas/find_schema.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use utoipa::{
2424
use super::{
2525
SchemaError,
2626
SchemaResult,
27+
check_schema_upgrade_available,
2728
};
2829
use crate::extract::{
2930
PosthogEventTracker,
@@ -133,6 +134,12 @@ pub async fn find_schema(
133134
},
134135
};
135136

137+
let upgrade_available = if installed {
138+
check_schema_upgrade_available(ctx, schema_id).await?
139+
} else {
140+
None
141+
};
142+
136143
tracker.track(
137144
ctx,
138145
"api_find_schema",
@@ -146,6 +153,7 @@ pub async fn find_schema(
146153
schema_id,
147154
category,
148155
installed,
156+
upgrade_available,
149157
}))
150158
}
151159

@@ -160,6 +168,8 @@ pub struct FindSchemaV1Response {
160168
pub category: Option<String>,
161169
#[schema(value_type = bool)]
162170
pub installed: bool,
171+
#[schema(value_type = Option<bool>)]
172+
pub upgrade_available: Option<bool>,
163173
}
164174

165175
enum SchemaReference {

lib/luminork-server/src/service/v1/schemas/get_schema.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use super::{
2323
SchemaResponseV1,
2424
SchemaResult,
2525
SchemaV1RequestPath,
26+
check_schema_upgrade_available,
2627
};
2728
use crate::extract::{
2829
PosthogEventTracker,
@@ -58,6 +59,7 @@ pub async fn get_schema(
5859
if let Ok(Some(schema)) = Schema::get_by_id_opt(ctx, schema_id).await {
5960
let default_variant_id = Schema::default_variant_id(ctx, schema_id).await?;
6061
let variants = SchemaVariant::list_for_schema(ctx, schema_id).await?;
62+
let upgrade_available = check_schema_upgrade_available(ctx, schema_id).await?;
6163

6264
tracker.track(
6365
ctx,
@@ -75,6 +77,7 @@ pub async fn get_schema(
7577
name: schema.name,
7678
default_variant_id,
7779
variant_ids: variants.into_iter().map(|v| v.id).collect_vec(),
80+
upgrade_available,
7881
}));
7982
}
8083

@@ -101,6 +104,7 @@ pub async fn get_schema(
101104
name: cached_schema.name,
102105
default_variant_id: cached_schema.default_variant_id,
103106
variant_ids: cached_schema.variant_ids,
107+
upgrade_available: None, // Uninstalled schema
104108
}));
105109
}
106110
}

lib/luminork-server/src/service/v1/schemas/mod.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use si_frontend_mv_types::{
4545
management::ManagementFuncKind,
4646
prop_schema::PropSchemaV1 as CachedPropSchemaV1,
4747
};
48+
use si_pkg::SiPkgError;
4849
use thiserror::Error;
4950
use utoipa::{
5051
self,
@@ -67,6 +68,7 @@ pub mod list_schemas;
6768
pub mod search_schemas;
6869
pub mod unlock_schema;
6970
pub mod update_schema_variant;
71+
pub mod upgrade_schema;
7072

7173
#[remain::sorted]
7274
#[derive(Debug, Error)]
@@ -87,8 +89,12 @@ pub enum SchemaError {
8789
MaterializedViews(#[from] dal_materialized_views::Error),
8890
#[error("schema missing asset func id: {0}")]
8991
MissingVariantFunc(SchemaVariantId),
92+
#[error("module error: {0}")]
93+
Module(#[from] dal::module::ModuleError),
9094
#[error("changes not permitted on HEAD change set")]
9195
NotPermittedOnHead,
96+
#[error("pkg error: {0}")]
97+
Pkg(#[from] dal::pkg::PkgError),
9298
#[error("prop error: {0}")]
9399
Prop(#[from] Box<PropError>),
94100
#[error("schema error: {0}")]
@@ -103,10 +109,18 @@ pub enum SchemaError {
103109
SchemaVariantNotFound(SchemaVariantId),
104110
#[error("schema variant {0} not a variant for the schema {1} error")]
105111
SchemaVariantNotMemberOfSchema(SchemaId, SchemaVariantId),
112+
#[error("sipkg error: {0}")]
113+
SiPkg(#[from] SiPkgError),
106114
#[error("slow runtime error: {0}")]
107115
SlowRuntime(#[from] dal::slow_rt::SlowRuntimeError),
108116
#[error("transactions error: {0}")]
109117
Transactions(#[from] TransactionsError),
118+
#[error("Cannot upgrade schema {0} - has unlocked variants that need to be locked first")]
119+
UnlockedVariantFoundForSchema(SchemaId),
120+
#[error("No cached module found for schema {0}")]
121+
UpgradableModuleNotFound(SchemaId),
122+
#[error("No schema variants imported for schema {0}")]
123+
UpgradeFailed(SchemaId),
110124
#[error("validation error: {0}")]
111125
Validation(String),
112126
#[error("variant authoring error: {0}")]
@@ -144,6 +158,10 @@ impl crate::service::v1::common::ErrorIntoResponse for SchemaError {
144158
SchemaError::SchemaNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
145159
SchemaError::SchemaNotFoundByName(_) => (StatusCode::NOT_FOUND, self.to_string()),
146160
SchemaError::SchemaVariantNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
161+
SchemaError::UpgradableModuleNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
162+
SchemaError::UnlockedVariantFoundForSchema(_) => {
163+
(StatusCode::PRECONDITION_FAILED, self.to_string())
164+
}
147165
SchemaError::SchemaVariantNotMemberOfSchema(_, _) => {
148166
(StatusCode::PRECONDITION_REQUIRED, self.to_string())
149167
}
@@ -187,6 +205,7 @@ pub fn routes() -> Router<AppState> {
187205
Router::new()
188206
.route("/", get(get_schema::get_schema))
189207
.route("/unlock", post(unlock_schema::unlock_schema))
208+
.route("/upgrade", post(upgrade_schema::upgrade_schema))
190209
.nest(
191210
"/variant",
192211
Router::new()
@@ -632,6 +651,8 @@ pub struct GetSchemaV1Response {
632651
pub default_variant_id: SchemaVariantId,
633652
#[schema(value_type = Vec<String>, example = json!(["01H9ZQD35JPMBGHH69BT0Q79VZ", "01H9ZQD35JPMBGHH69BT0Q79VY"]))]
634653
pub variant_ids: Vec<SchemaVariantId>,
654+
#[schema(value_type = Option<bool>)]
655+
pub upgrade_available: Option<bool>,
635656
}
636657

637658
#[derive(Serialize, Debug, ToSchema)]
@@ -698,21 +719,58 @@ pub struct SchemaResponse {
698719
pub installed: bool,
699720
}
700721

722+
/// Check if an upgrade is available for a given schema.
723+
///
724+
/// Returns:
725+
/// - `Some(true)` if an upgrade is available (installed hash is in past hashes)
726+
/// - `Some(false)` if no upgrade is available (hashes match or no newer version)
727+
/// - `None` if upgrade check is not applicable (no cached module or no installed module)
728+
pub async fn check_schema_upgrade_available(
729+
ctx: &DalContext,
730+
schema_id: SchemaId,
731+
) -> SchemaResult<Option<bool>> {
732+
// Get the latest cached module
733+
let Some(cached_module) = CachedModule::find_latest_for_schema_id(ctx, schema_id).await? else {
734+
return Ok(None);
735+
};
736+
737+
// Get the installed module
738+
let Some(installed_module) =
739+
dal::module::Module::find_for_module_schema_id(ctx, schema_id.into()).await?
740+
else {
741+
return Ok(None);
742+
};
743+
744+
// Check if hashes differ
745+
if cached_module.latest_hash == installed_module.root_hash() {
746+
return Ok(Some(false));
747+
}
748+
749+
// Check if installed hash is in past hashes (means upgrade available)
750+
let past_hashes = CachedModule::list_for_schema_id(ctx, schema_id)
751+
.await?
752+
.iter()
753+
.map(|cm| cm.latest_hash.to_owned())
754+
.collect::<HashSet<String>>();
755+
756+
Ok(Some(past_hashes.contains(installed_module.root_hash())))
757+
}
758+
701759
pub async fn get_full_schema_list(ctx: &DalContext) -> SchemaResult<Vec<SchemaResponse>> {
702760
let schema_ids = dal::Schema::list_ids(ctx).await?;
703761
let installed_schema_ids: HashSet<_> = schema_ids.iter().collect();
704762

705763
// Get cached modules with their metadata
706764
let cached_modules = CachedModule::latest_modules(ctx).await?;
707-
// Create a map of schema ID to cached module data
708765
let mut cached_module_map: HashMap<SchemaId, CachedModule> = HashMap::new();
709766
for module in cached_modules {
710767
cached_module_map.insert(module.schema_id, module);
711768
}
712769

713770
// Combine both sources to create a complete list
714771
let mut all_schemas: Vec<SchemaResponse> = Vec::new();
715-
// First add installed schemas from Schema::list_ids
772+
773+
// First add installed schemas
716774
for schema_id in &schema_ids {
717775
if let Some(module) = cached_module_map.get(schema_id) {
718776
// Schema is both installed and in cache
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use axum::{
2+
Json,
3+
extract::Path,
4+
};
5+
use dal::{
6+
Schema,
7+
SchemaVariant,
8+
cached_module::CachedModule,
9+
pkg::{
10+
ImportOptions,
11+
import_pkg_from_pkg,
12+
},
13+
};
14+
use serde::{
15+
Deserialize,
16+
Serialize,
17+
};
18+
use serde_json::json;
19+
use utoipa::ToSchema;
20+
21+
use super::{
22+
SchemaError,
23+
SchemaResult,
24+
SchemaV1RequestPath,
25+
};
26+
use crate::extract::{
27+
PosthogEventTracker,
28+
change_set::ChangeSetDalContext,
29+
};
30+
31+
#[derive(Deserialize, Serialize, Debug, ToSchema)]
32+
#[serde(rename_all = "camelCase")]
33+
pub struct UpgradeSchemaResponse {
34+
pub success: bool,
35+
}
36+
37+
#[utoipa::path(
38+
post,
39+
path = "/v1/w/{workspace_id}/change-sets/{change_set_id}/schemas/{schema_id}/upgrade",
40+
params(
41+
("workspace_id" = String, Path, description = "Workspace identifier"),
42+
("change_set_id" = String, Path, description = "Change Set identifier"),
43+
("schema_id" = String, Path, description = "Schema identifier"),
44+
),
45+
tag = "schemas",
46+
summary = "Upgrade a schema to the latest available version",
47+
responses(
48+
(status = 200, description = "Schema upgraded successfully", body = UpgradeSchemaResponse),
49+
(status = 401, description = "Unauthorized - Invalid or missing token"),
50+
(status = 404, description = "Schema not found"),
51+
(status = 404, description = "No cached module found for schema"),
52+
(status = 412, description = "Precondition failed - schema has unlocked variants or not installed"),
53+
(status = 500, description = "Internal server error", body = crate::service::v1::common::ApiError)
54+
)
55+
)]
56+
pub async fn upgrade_schema(
57+
ChangeSetDalContext(ref ctx): ChangeSetDalContext,
58+
tracker: PosthogEventTracker,
59+
Path(SchemaV1RequestPath { schema_id }): Path<SchemaV1RequestPath>,
60+
) -> SchemaResult<Json<UpgradeSchemaResponse>> {
61+
let schema_exists_locally = Schema::exists_locally(ctx, schema_id).await?;
62+
if !schema_exists_locally {
63+
return Err(SchemaError::SchemaNotFound(schema_id));
64+
}
65+
66+
// Check if there's an unlocked variant
67+
if SchemaVariant::get_unlocked_for_schema(ctx, schema_id)
68+
.await?
69+
.is_some()
70+
{
71+
return Err(SchemaError::UnlockedVariantFoundForSchema(schema_id));
72+
}
73+
74+
let Some(mut cached_module) = CachedModule::find_latest_for_schema_id(ctx, schema_id).await?
75+
else {
76+
return Err(SchemaError::UpgradableModuleNotFound(schema_id));
77+
};
78+
79+
let si_pkg = cached_module.si_pkg(ctx).await?;
80+
let metadata = si_pkg.metadata()?;
81+
82+
let (_, schema_variant_ids, _) = import_pkg_from_pkg(
83+
ctx,
84+
&si_pkg,
85+
Some(ImportOptions {
86+
schema_id: Some(schema_id.into()),
87+
..Default::default()
88+
}),
89+
)
90+
.await?;
91+
92+
if schema_variant_ids.is_empty() {
93+
return Err(SchemaError::UpgradeFailed(schema_id));
94+
}
95+
96+
tracker.track(
97+
ctx,
98+
"api_upgrade_schema",
99+
json!({
100+
"schema_id": schema_id,
101+
"pkg_name": metadata.name(),
102+
}),
103+
);
104+
105+
ctx.commit().await?;
106+
107+
Ok(Json(UpgradeSchemaResponse { success: true }))
108+
}

0 commit comments

Comments
 (0)