From 17d8bb320912b03c97f0d47633fd419524d24703 Mon Sep 17 00:00:00 2001 From: berimbolo13 Date: Tue, 5 May 2026 16:48:38 +0800 Subject: [PATCH 1/5] feat: add OpenMetadataEntityTag CRD Introduces a new CRD for declaratively tagging OpenMetadata entities discovered by ingestion (tables, topics, schemas, etc.). The reconciler queries OM's search endpoint for entities matching FQN patterns and applies a tag via the bulk tag-asset endpoints, recording assignments in status to drive drift detection and rename cleanup. --- api/v1alpha1/conditions.go | 14 + api/v1alpha1/openmetadataentitytag_types.go | 150 +++++++++ api/v1alpha1/tag_types.go | 26 ++ api/v1alpha1/zz_generated.deepcopy.go | 162 ++++++++++ cmd/main.go | 6 + ...ta.vortexa.com_openmetadataentitytags.yaml | 246 ++++++++++++++ config/crd/kustomization.yaml | 1 + config/rbac/kustomization.yaml | 3 + .../openmetadataentitytag_admin_role.yaml | 27 ++ .../openmetadataentitytag_editor_role.yaml | 33 ++ .../openmetadataentitytag_viewer_role.yaml | 29 ++ config/rbac/role.yaml | 3 + .../openmetadataentitytag_controller.go | 79 +++++ .../openmetadataentitytag_controller_test.go | 228 +++++++++++++ internal/handler/entitytag_handler.go | 300 ++++++++++++++++++ internal/handler/entitytag_handler_test.go | 152 +++++++++ internal/handler/entitytag_index.go | 53 ++++ internal/omclient/entitytag.go | 194 +++++++++++ internal/omclient/interface.go | 22 ++ internal/omclient/types.go | 38 +++ 20 files changed, 1766 insertions(+) create mode 100644 api/v1alpha1/openmetadataentitytag_types.go create mode 100644 api/v1alpha1/tag_types.go create mode 100644 config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml create mode 100644 config/rbac/openmetadataentitytag_admin_role.yaml create mode 100644 config/rbac/openmetadataentitytag_editor_role.yaml create mode 100644 config/rbac/openmetadataentitytag_viewer_role.yaml create mode 100644 internal/controller/openmetadataentitytag_controller.go create mode 100644 
internal/controller/openmetadataentitytag_controller_test.go create mode 100644 internal/handler/entitytag_handler.go create mode 100644 internal/handler/entitytag_handler_test.go create mode 100644 internal/handler/entitytag_index.go create mode 100644 internal/omclient/entitytag.go diff --git a/api/v1alpha1/conditions.go b/api/v1alpha1/conditions.go index 0f5895f..6d6e4a1 100644 --- a/api/v1alpha1/conditions.go +++ b/api/v1alpha1/conditions.go @@ -75,4 +75,18 @@ const ( // ReasonOwnerResolutionFailed indicates an owner could not be resolved to an OpenMetadata UUID. ReasonOwnerResolutionFailed = "OwnerResolutionFailed" + + // ReasonTagResolutionFailed indicates a tag FQN could not be resolved against OpenMetadata. + ReasonTagResolutionFailed = "TagResolutionFailed" + + // ReasonUnsupportedEntityType means this operator version doesn't support + // tagging the entity type the user specified in the CR. + ReasonUnsupportedEntityType = "UnsupportedEntityType" + + // ReasonEntitySearchFailed means the search request to OpenMetadata itself + // failed (network error, OM unreachable, server error). + ReasonEntitySearchFailed = "EntitySearchFailed" + + // ReasonTaggingFailed indicates applying or removing a tag on entities in OpenMetadata failed. + ReasonTaggingFailed = "TaggingFailed" ) diff --git a/api/v1alpha1/openmetadataentitytag_types.go b/api/v1alpha1/openmetadataentitytag_types.go new file mode 100644 index 0000000..eb58016 --- /dev/null +++ b/api/v1alpha1/openmetadataentitytag_types.go @@ -0,0 +1,150 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TaggableEntityType is the type of OpenMetadata entity that an +// OpenMetadataEntityTag can target. Limited to entities discovered by +// ingestion (tables, topics, schemas, etc.); the service-level entity +// itself is not tagged via this CR. +// +kubebuilder:validation:Enum=table;topic;databaseSchema;database;dashboard;mlmodel;pipeline;container;searchIndex +type TaggableEntityType string + +// Taggable entity type constants. Values match the OpenMetadata "type" field +// used in entity references. +const ( + TaggableEntityTypeTable TaggableEntityType = "table" + TaggableEntityTypeTopic TaggableEntityType = "topic" + TaggableEntityTypeDatabaseSchema TaggableEntityType = "databaseSchema" + TaggableEntityTypeDatabase TaggableEntityType = "database" + TaggableEntityTypeDashboard TaggableEntityType = "dashboard" + TaggableEntityTypeMlmodel TaggableEntityType = "mlmodel" + TaggableEntityTypePipeline TaggableEntityType = "pipeline" + TaggableEntityTypeContainer TaggableEntityType = "container" + TaggableEntityTypeSearchIndex TaggableEntityType = "searchIndex" +) + +// OpenMetadataEntityTagSpec defines the desired state of an +// OpenMetadataEntityTag, which applies tags to OpenMetadata entities the +// operator does not create directly (tables, topics, schemas, etc. discovered +// by ingestion). +type OpenMetadataEntityTagSpec struct { + // Match selects the entities this CR applies to. + Match EntityMatch `json:"match"` + + // Tag is the tag to apply to every matched entity. + Tag TagRef `json:"tag"` + + // OpenMetadataConnectionRef is the name of the cluster-scoped OpenMetadataConnection + // resource that defines the target OpenMetadata instance. 
+ // +kubebuilder:validation:MinLength=1 + OpenMetadataConnectionRef string `json:"openMetadataConnectionRef"` +} + +// EntityMatch selects entities by type and FQN pattern. +type EntityMatch struct { + // EntityType is the OpenMetadata entity type to match. + EntityType TaggableEntityType `json:"entityType"` + + // Includes is the list of FQN patterns whose union defines the matched set. + // Each pattern uses '*' as a wildcard (zero or more characters) and '?' for + // a single character, applied to the entity's full FQN. Patterns should + // typically begin with the service name to scope the match. Examples: + // "my-svc.raw.*", "my-svc.public.events". + // +kubebuilder:validation:MinItems=1 + Includes []string `json:"includes"` + + // Excludes is an optional list of FQN patterns to subtract from the + // included set. Same pattern syntax as Includes. + // +optional + Excludes []string `json:"excludes,omitempty"` +} + +// TagAssignment records a single (entity, tag) assignment the operator has +// made. Used to detect drift and to drive removal on CR deletion. +type TagAssignment struct { + // EntityType of the tagged entity. + EntityType TaggableEntityType `json:"entityType"` + + // EntityID is the OpenMetadata UUID of the tagged entity. + EntityID string `json:"entityId"` + + // FullyQualifiedName of the tagged entity. + FullyQualifiedName string `json:"fullyQualifiedName"` + + // TagFQN is the fully qualified name of the applied tag. + TagFQN string `json:"tagFQN"` +} + +// OpenMetadataEntityTagStatus defines the observed state of an OpenMetadataEntityTag. +type OpenMetadataEntityTagStatus struct { + // TagAssignments is the set of (entity, tag) assignments the operator has + // applied. + // +optional + TagAssignments []TagAssignment `json:"tagAssignments,omitempty"` + + // LastReconcileTime is the timestamp of the last successful reconciliation. 
+ // +optional + LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"` + + // ObservedGeneration is the most recent generation observed by the controller. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // Conditions represent the latest available observations of the resource's state. + // +listType=map + // +listMapKey=type + // +optional + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Entity Type",type=string,JSONPath=`.spec.match.entityType` +// +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` + +// OpenMetadataEntityTag is the Schema for the openmetadataentitytags API. +// It applies a tag to OpenMetadata entities matched by FQN pattern. +type OpenMetadataEntityTag struct { + metav1.TypeMeta `json:",inline"` + + // +optional + metav1.ObjectMeta `json:"metadata,omitzero"` + + // +required + Spec OpenMetadataEntityTagSpec `json:"spec"` + + // +optional + Status OpenMetadataEntityTagStatus `json:"status,omitzero"` +} + +// +kubebuilder:object:root=true + +// OpenMetadataEntityTagList contains a list of OpenMetadataEntityTag. +type OpenMetadataEntityTagList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitzero"` + Items []OpenMetadataEntityTag `json:"items"` +} + +func init() { + SchemeBuilder.Register(&OpenMetadataEntityTag{}, &OpenMetadataEntityTagList{}) +} diff --git a/api/v1alpha1/tag_types.go b/api/v1alpha1/tag_types.go new file mode 100644 index 0000000..fae5a17 --- /dev/null +++ b/api/v1alpha1/tag_types.go @@ -0,0 +1,26 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +// TagRef references a tag in OpenMetadata by its fully qualified name +// (e.g. "Tier.Tier3"). The operator validates the tag exists at reconcile time. +type TagRef struct { + // TagFQN is the fully qualified name of the tag, in the form + // "<classification>.<tag>" (e.g. "Tier.Tier3", "PII.Sensitive"). + // +kubebuilder:validation:MinLength=1 + TagFQN string `json:"tagFQN"` +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b8be808..e140bfd 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -60,6 +60,31 @@ func (in *ConnectionSpec) DeepCopy() *ConnectionSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EntityMatch) DeepCopyInto(out *EntityMatch) { + *out = *in + if in.Includes != nil { + in, out := &in.Includes, &out.Includes + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Excludes != nil { + in, out := &in.Excludes, &out.Excludes + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EntityMatch. +func (in *EntityMatch) DeepCopy() *EntityMatch { + if in == nil { + return nil + } + out := new(EntityMatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EntityReference) DeepCopyInto(out *EntityReference) { *out = *in @@ -273,6 +298,113 @@ func (in *OpenMetadataConnectionSpec) DeepCopy() *OpenMetadataConnectionSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OpenMetadataEntityTag) DeepCopyInto(out *OpenMetadataEntityTag) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenMetadataEntityTag. +func (in *OpenMetadataEntityTag) DeepCopy() *OpenMetadataEntityTag { + if in == nil { + return nil + } + out := new(OpenMetadataEntityTag) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *OpenMetadataEntityTag) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OpenMetadataEntityTagList) DeepCopyInto(out *OpenMetadataEntityTagList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]OpenMetadataEntityTag, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenMetadataEntityTagList. +func (in *OpenMetadataEntityTagList) DeepCopy() *OpenMetadataEntityTagList { + if in == nil { + return nil + } + out := new(OpenMetadataEntityTagList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. 
+func (in *OpenMetadataEntityTagList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OpenMetadataEntityTagSpec) DeepCopyInto(out *OpenMetadataEntityTagSpec) { + *out = *in + in.Match.DeepCopyInto(&out.Match) + out.Tag = in.Tag +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenMetadataEntityTagSpec. +func (in *OpenMetadataEntityTagSpec) DeepCopy() *OpenMetadataEntityTagSpec { + if in == nil { + return nil + } + out := new(OpenMetadataEntityTagSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OpenMetadataEntityTagStatus) DeepCopyInto(out *OpenMetadataEntityTagStatus) { + *out = *in + if in.TagAssignments != nil { + in, out := &in.TagAssignments, &out.TagAssignments + *out = make([]TagAssignment, len(*in)) + copy(*out, *in) + } + if in.LastReconcileTime != nil { + in, out := &in.LastReconcileTime, &out.LastReconcileTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenMetadataEntityTagStatus. +func (in *OpenMetadataEntityTagStatus) DeepCopy() *OpenMetadataEntityTagStatus { + if in == nil { + return nil + } + out := new(OpenMetadataEntityTagStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *OpenMetadataService) DeepCopyInto(out *OpenMetadataService) { *out = *in @@ -511,6 +643,36 @@ func (in *ServiceOMSpec) DeepCopy() *ServiceOMSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TagAssignment) DeepCopyInto(out *TagAssignment) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TagAssignment. +func (in *TagAssignment) DeepCopy() *TagAssignment { + if in == nil { + return nil + } + out := new(TagAssignment) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TagRef) DeepCopyInto(out *TagRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TagRef. +func (in *TagRef) DeepCopy() *TagRef { + if in == nil { + return nil + } + out := new(TagRef) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TestCaseOMSpec) DeepCopyInto(out *TestCaseOMSpec) { *out = *in diff --git a/cmd/main.go b/cmd/main.go index ef7569d..aaefc22 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -157,6 +157,12 @@ func main() { setupLog.Error(err, "Failed to create controller", "controller", "OpenMetadataTestCase") os.Exit(1) } + if err := (&controller.OpenMetadataEntityTagReconciler{ + Client: mgr.GetClient(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "Failed to create controller", "controller", "OpenMetadataEntityTag") + os.Exit(1) + } // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml b/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml new file mode 100644 index 0000000..328bc46 --- /dev/null +++ b/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml @@ -0,0 +1,246 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.20.1 + name: openmetadataentitytags.openmetadata.vortexa.com +spec: + group: openmetadata.vortexa.com + names: + kind: OpenMetadataEntityTag + listKind: OpenMetadataEntityTagList + plural: openmetadataentitytags + singular: openmetadataentitytag + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.match.entityType + name: Entity Type + type: string + - jsonPath: .status.conditions[?(@.type=="Ready")].status + name: Ready + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + OpenMetadataEntityTag is the Schema for the openmetadataentitytags API. + It applies a tag to OpenMetadata entities matched by FQN pattern. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. 
+ Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: |- + OpenMetadataEntityTagSpec defines the desired state of an + OpenMetadataEntityTag, which applies tags to OpenMetadata entities the + operator does not create directly (tables, topics, schemas, etc. discovered + by ingestion). + properties: + match: + description: Match selects the entities this CR applies to. + properties: + entityType: + description: |- + EntityType is the OpenMetadata entity type to match. Must be one of the + supported lower-level types (table, topic, databaseSchema, database, etc.). + enum: + - table + - topic + - databaseSchema + - database + - dashboard + - mlmodel + - pipeline + - container + - searchIndex + type: string + excludes: + description: |- + Excludes is an optional list of FQN patterns to subtract from the + included set. Same pattern syntax as Includes. + items: + type: string + type: array + includes: + description: |- + Includes is the list of FQN patterns whose union defines the matched set. + Each pattern uses '*' as a wildcard (zero or more characters) and '?' for + a single character, applied to the entity's full FQN. Patterns should + typically begin with the service name to scope the match. Examples: + "my-svc.raw.*", "my-svc.public.events". 
+ items: + type: string + minItems: 1 + type: array + required: + - entityType + - includes + type: object + openMetadataConnectionRef: + description: |- + OpenMetadataConnectionRef is the name of the cluster-scoped OpenMetadataConnection + resource that defines the target OpenMetadata instance. + minLength: 1 + type: string + tag: + description: |- + Tag is the tag to apply to every matched entity. + To apply more than one tag to the same entity set, use one CR per tag. + properties: + tagFQN: + description: |- + TagFQN is the fully qualified name of the tag, in the form + "<classification>.<tag>" (e.g. "Tier.Tier3", "PII.Sensitive"). + minLength: 1 + type: string + required: + - tagFQN + type: object + required: + - match + - openMetadataConnectionRef + - tag + type: object + status: + description: OpenMetadataEntityTagStatus defines the observed state of + an OpenMetadataEntityTag. + properties: + conditions: + description: Conditions represent the latest available observations + of the resource's state. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance.
+ format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + lastReconcileTime: + description: LastReconcileTime is the timestamp of the last successful + reconciliation. + format: date-time + type: string + observedGeneration: + description: ObservedGeneration is the most recent generation observed + by the controller. + format: int64 + type: integer + tagAssignments: + description: |- + TagAssignments is the set of (entity, tag) assignments the operator has + applied. The operator manages only these assignments additively — tags + set via the OM UI or by other controllers are left untouched. + items: + description: |- + TagAssignment records a single (entity, tag) assignment the operator has + made. Used to detect drift and to drive removal on CR deletion. + properties: + entityId: + description: EntityID is the OpenMetadata UUID of the tagged + entity. + type: string + entityType: + description: EntityType of the tagged entity. 
+ enum: + - table + - topic + - databaseSchema + - database + - dashboard + - mlmodel + - pipeline + - container + - searchIndex + type: string + fullyQualifiedName: + description: FullyQualifiedName of the tagged entity. + type: string + tagFQN: + description: TagFQN is the fully qualified name of the applied + tag. + type: string + required: + - entityId + - entityType + - fullyQualifiedName + - tagFQN + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 2ffe75b..9266a79 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -3,4 +3,5 @@ resources: - bases/openmetadata.vortexa.com_openmetadataservices.yaml - bases/openmetadata.vortexa.com_ingestionpipelines.yaml - bases/openmetadata.vortexa.com_openmetadatatestcases.yaml +- bases/openmetadata.vortexa.com_openmetadataentitytags.yaml # +kubebuilder:scaffold:crdkustomizeresource diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index c7ee862..cab48c1 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -31,4 +31,7 @@ resources: - openmetadatatestcase_admin_role.yaml - openmetadatatestcase_editor_role.yaml - openmetadatatestcase_viewer_role.yaml +- openmetadataentitytag_admin_role.yaml +- openmetadataentitytag_editor_role.yaml +- openmetadataentitytag_viewer_role.yaml diff --git a/config/rbac/openmetadataentitytag_admin_role.yaml b/config/rbac/openmetadataentitytag_admin_role.yaml new file mode 100644 index 0000000..2cb8c9a --- /dev/null +++ b/config/rbac/openmetadataentitytag_admin_role.yaml @@ -0,0 +1,27 @@ +# This rule is not used by the project openmetadata-operator itself. +# It is provided to allow the cluster admin to help manage permissions for users. +# +# Grants full permissions ('*') over openmetadataentitytags. 
+# This role is intended for users authorized to modify roles and bindings within the cluster, +# enabling them to delegate specific permissions to other users or groups as needed. + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: openmetadata-operator + app.kubernetes.io/managed-by: kustomize + name: openmetadataentitytag-admin-role +rules: +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags + verbs: + - '*' +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags/status + verbs: + - get diff --git a/config/rbac/openmetadataentitytag_editor_role.yaml b/config/rbac/openmetadataentitytag_editor_role.yaml new file mode 100644 index 0000000..34b731c --- /dev/null +++ b/config/rbac/openmetadataentitytag_editor_role.yaml @@ -0,0 +1,33 @@ +# This rule is not used by the project openmetadata-operator itself. +# It is provided to allow the cluster admin to help manage permissions for users. +# +# Grants permissions to create, update, and delete openmetadataentitytags. +# This role is intended for users who need to manage these resources +# but should not control RBAC or manage permissions for others. 
+ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: openmetadata-operator + app.kubernetes.io/managed-by: kustomize + name: openmetadataentitytag-editor-role +rules: +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags/status + verbs: + - get diff --git a/config/rbac/openmetadataentitytag_viewer_role.yaml b/config/rbac/openmetadataentitytag_viewer_role.yaml new file mode 100644 index 0000000..68a9ab7 --- /dev/null +++ b/config/rbac/openmetadataentitytag_viewer_role.yaml @@ -0,0 +1,29 @@ +# This rule is not used by the project openmetadata-operator itself. +# It is provided to allow the cluster admin to help manage permissions for users. +# +# Grants read-only access to openmetadataentitytags. +# This role is intended for users who need visibility into these resources +# without permissions to modify them. It is ideal for monitoring purposes and limited-access viewing. 
+ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: openmetadata-operator + app.kubernetes.io/managed-by: kustomize + name: openmetadataentitytag-viewer-role +rules: +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags + verbs: + - get + - list + - watch +- apiGroups: + - openmetadata.vortexa.com + resources: + - openmetadataentitytags/status + verbs: + - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 84bc75d..d1a66da 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -23,6 +23,7 @@ rules: - openmetadata.vortexa.com resources: - ingestionpipelines + - openmetadataentitytags - openmetadataservices - openmetadatatestcases verbs: @@ -37,6 +38,7 @@ rules: - openmetadata.vortexa.com resources: - ingestionpipelines/finalizers + - openmetadataentitytags/finalizers - openmetadataservices/finalizers - openmetadatatestcases/finalizers verbs: @@ -45,6 +47,7 @@ rules: - openmetadata.vortexa.com resources: - ingestionpipelines/status + - openmetadataentitytags/status - openmetadataservices/status - openmetadatatestcases/status verbs: diff --git a/internal/controller/openmetadataentitytag_controller.go b/internal/controller/openmetadataentitytag_controller.go new file mode 100644 index 0000000..9d13d45 --- /dev/null +++ b/internal/controller/openmetadataentitytag_controller.go @@ -0,0 +1,79 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + omv1alpha1 "github.com/VorTECHsa/openmetadata-operator/api/v1alpha1" + "github.com/VorTECHsa/openmetadata-operator/internal/finalizer" + "github.com/VorTECHsa/openmetadata-operator/internal/handler" + "github.com/VorTECHsa/openmetadata-operator/internal/omclient" +) + +// OpenMetadataEntityTagReconciler reconciles an OpenMetadataEntityTag object. +// It is a thin orchestrator: fetch the CR, manage the entity-tag finalizer, +// and delegate the matching/diff/apply loop to the EntityTagHandler. +type OpenMetadataEntityTagReconciler struct { + client.Client + Handler *handler.EntityTagHandler +} + +// +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags/finalizers,verbs=update + +// Reconcile handles a single reconciliation loop for an OpenMetadataEntityTag resource. +func (r *OpenMetadataEntityTagReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + et := &omv1alpha1.OpenMetadataEntityTag{} + if err := r.Get(ctx, req.NamespacedName, et); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + if !et.DeletionTimestamp.IsZero() { + return r.Handler.HandleDeletion(ctx, et) + } + + added, err := finalizer.EnsurePresent(ctx, r.Client, et) + if err != nil { + return ctrl.Result{}, err + } + if added { + return ctrl.Result{}, nil + } + + return r.Handler.Reconcile(ctx, et) +} + +// SetupWithManager sets up the controller with the Manager. If Handler is nil, +// a default EntityTagHandler is wired with real dependencies. 
+func (r *OpenMetadataEntityTagReconciler) SetupWithManager(mgr ctrl.Manager) error { + if r.Handler == nil { + r.Handler = &handler.EntityTagHandler{ + Client: r.Client, + Recorder: mgr.GetEventRecorder("openmetadataentitytag-controller"), + NewOMClient: omclient.NewEntityTagClient, + } + } + return ctrl.NewControllerManagedBy(mgr). + For(&omv1alpha1.OpenMetadataEntityTag{}). + Named("openmetadataentitytag"). + Complete(r) +} diff --git a/internal/controller/openmetadataentitytag_controller_test.go b/internal/controller/openmetadataentitytag_controller_test.go new file mode 100644 index 0000000..515f45d --- /dev/null +++ b/internal/controller/openmetadataentitytag_controller_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + omv1alpha1 "github.com/VorTECHsa/openmetadata-operator/api/v1alpha1" + "github.com/VorTECHsa/openmetadata-operator/internal/finalizer" + "github.com/VorTECHsa/openmetadata-operator/internal/handler" + "github.com/VorTECHsa/openmetadata-operator/internal/omclient" +) + +// stubEntityTagClient implements omclient.EntityTagClient with recorded calls. 
+type stubEntityTagClient struct { + searchResp []omclient.EntitySummary + tagIDs map[string]string + addCalls []stubBulkCall + removeCalls []stubBulkCall +} + +type stubBulkCall struct { + tagID string + assets []omclient.AssetRef +} + +func (s *stubEntityTagClient) SearchEntities(_ context.Context, _ string, _, _ []string) ([]omclient.EntitySummary, error) { + return s.searchResp, nil +} + +func (s *stubEntityTagClient) GetEntityByName(_ context.Context, _, fqn string) (string, error) { + if id, ok := s.tagIDs[fqn]; ok { + return id, nil + } + return "", &omclient.APIError{StatusCode: 404, Body: "not found"} +} + +func (s *stubEntityTagClient) BulkAddTagToAssets(_ context.Context, tagID string, assets []omclient.AssetRef) error { + s.addCalls = append(s.addCalls, stubBulkCall{tagID: tagID, assets: assets}) + return nil +} + +func (s *stubEntityTagClient) BulkRemoveTagFromAssets(_ context.Context, tagID string, assets []omclient.AssetRef) error { + s.removeCalls = append(s.removeCalls, stubBulkCall{tagID: tagID, assets: assets}) + return nil +} + +func newEntityTagReconciler(stub *stubEntityTagClient) *OpenMetadataEntityTagReconciler { + return &OpenMetadataEntityTagReconciler{ + Client: k8sClient, + Handler: &handler.EntityTagHandler{ + Client: k8sClient, + NewOMClient: func(_, _ string) omclient.EntityTagClient { return stub }, + }, + } +} + +var _ = Describe("OpenMetadataEntityTag Controller", func() { + const ( + resourceName = "trading-kafka-raw" + namespace = "default" + connectionName = "om-connection-et" + ) + + namespacedName := types.NamespacedName{Name: resourceName, Namespace: namespace} + + createPrereqs := func() { + conn := &omv1alpha1.OpenMetadataConnection{ + ObjectMeta: metav1.ObjectMeta{Name: connectionName}, + Spec: omv1alpha1.OpenMetadataConnectionSpec{ + URL: "http://openmetadata:8585/api", + AuthSecretRef: omv1alpha1.SecretReference{ + Name: "openmetadata-api-secret-et", Namespace: namespace, Key: "token", + }, + }, + } + 
Expect(k8sClient.Create(ctx, conn)).To(Succeed()) + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "openmetadata-api-secret-et", Namespace: namespace}, + Data: map[string][]byte{"token": []byte("test-jwt")}, + } + Expect(k8sClient.Create(ctx, secret)).To(Succeed()) + } + + createCR := func() *omv1alpha1.OpenMetadataEntityTag { + return &omv1alpha1.OpenMetadataEntityTag{ + ObjectMeta: metav1.ObjectMeta{Name: resourceName, Namespace: namespace}, + Spec: omv1alpha1.OpenMetadataEntityTagSpec{ + Match: omv1alpha1.EntityMatch{ + EntityType: omv1alpha1.TaggableEntityTypeTopic, + Includes: []string{"trading-kafka.raw.*"}, + }, + Tag: omv1alpha1.TagRef{TagFQN: "Tier.Tier5"}, + OpenMetadataConnectionRef: connectionName, + }, + } + } + + cleanUp := func() { + et := &omv1alpha1.OpenMetadataEntityTag{} + if err := k8sClient.Get(ctx, namespacedName, et); err == nil { + et.Finalizers = nil + _ = k8sClient.Update(ctx, et) + _ = k8sClient.Delete(ctx, et) + } + conn := &omv1alpha1.OpenMetadataConnection{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: connectionName}, conn); err == nil { + _ = k8sClient.Delete(ctx, conn) + } + secret := &corev1.Secret{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: "openmetadata-api-secret-et", Namespace: namespace}, secret); err == nil { + _ = k8sClient.Delete(ctx, secret) + } + } + + BeforeEach(func() { createPrereqs() }) + AfterEach(func() { cleanUp() }) + + It("returns empty result when CR does not exist", func() { + stub := &stubEntityTagClient{} + reconciler := newEntityTagReconciler(stub) + + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: "nonexistent", Namespace: namespace}, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + }) + + It("adds finalizer on first reconciliation", func() { + Expect(k8sClient.Create(ctx, createCR())).To(Succeed()) + stub := &stubEntityTagClient{} + reconciler := 
newEntityTagReconciler(stub) + + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + + updated := &omv1alpha1.OpenMetadataEntityTag{} + Expect(k8sClient.Get(ctx, namespacedName, updated)).To(Succeed()) + Expect(updated.Finalizers).To(ContainElement(finalizer.Name)) + }) + + It("applies tags to matched entities and records them in status", func() { + Expect(k8sClient.Create(ctx, createCR())).To(Succeed()) + // Filtering is server-side via ES, so the stub returns the already-matched + // set (what the OM /search/query endpoint would return for the include pattern). + stub := &stubEntityTagClient{ + searchResp: []omclient.EntitySummary{ + {ID: "uuid-raw-events", FullyQualifiedName: "trading-kafka.raw.events"}, + {ID: "uuid-raw-orders", FullyQualifiedName: "trading-kafka.raw.orders"}, + }, + tagIDs: map[string]string{"Tier.Tier5": "tier5-uuid"}, + } + reconciler := newEntityTagReconciler(stub) + + // First reconcile adds finalizer. + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + + // Second reconcile applies tags. + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + + // Both raw.* topics should have been added to the bulk call. 
+ Expect(stub.addCalls).To(HaveLen(1)) + Expect(stub.addCalls[0].tagID).To(Equal("tier5-uuid")) + Expect(stub.addCalls[0].assets).To(HaveLen(2)) + Expect(stub.removeCalls).To(BeEmpty()) + + updated := &omv1alpha1.OpenMetadataEntityTag{} + Expect(k8sClient.Get(ctx, namespacedName, updated)).To(Succeed()) + Expect(updated.Status.TagAssignments).To(HaveLen(2)) + }) + + It("removes tags on deletion via finalizer", func() { + Expect(k8sClient.Create(ctx, createCR())).To(Succeed()) + stub := &stubEntityTagClient{ + searchResp: []omclient.EntitySummary{ + {ID: "uuid-raw-events", FullyQualifiedName: "trading-kafka.raw.events"}, + }, + tagIDs: map[string]string{"Tier.Tier5": "tier5-uuid"}, + } + reconciler := newEntityTagReconciler(stub) + + // Reconcile twice to populate status.appliedTags. + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + + // Trigger deletion. + et := &omv1alpha1.OpenMetadataEntityTag{} + Expect(k8sClient.Get(ctx, namespacedName, et)).To(Succeed()) + Expect(k8sClient.Delete(ctx, et)).To(Succeed()) + + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) + Expect(err).NotTo(HaveOccurred()) + + // Bulk-remove was invoked for the previously-applied tag. + Expect(stub.removeCalls).To(HaveLen(1)) + Expect(stub.removeCalls[0].tagID).To(Equal("tier5-uuid")) + Expect(stub.removeCalls[0].assets).To(HaveLen(1)) + Expect(stub.removeCalls[0].assets[0].ID).To(Equal("uuid-raw-events")) + }) +}) diff --git a/internal/handler/entitytag_handler.go b/internal/handler/entitytag_handler.go new file mode 100644 index 0000000..933d0c4 --- /dev/null +++ b/internal/handler/entitytag_handler.go @@ -0,0 +1,300 @@ +/* +Copyright 2026. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "context" + "fmt" + "sort" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/events" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + omv1alpha1 "github.com/VorTECHsa/openmetadata-operator/api/v1alpha1" + "github.com/VorTECHsa/openmetadata-operator/internal/condition" + "github.com/VorTECHsa/openmetadata-operator/internal/finalizer" + "github.com/VorTECHsa/openmetadata-operator/internal/omclient" +) + +// tagsEntityTypePath is the OpenMetadata entity type path used to look up +// a tag by FQN (GET /api/v1/tags/name/{fqn}). +const tagsEntityTypePath = "tags" + +// EntityTagHandler contains the business logic for reconciling +// OpenMetadataEntityTag resources against the OpenMetadata API. +type EntityTagHandler struct { + Client client.Client + Recorder events.EventRecorder + NewOMClient func(baseURL, token string) omclient.EntityTagClient +} + +// Reconcile applies the desired tags to entities matched by the spec and +// reconciles any drift against status.appliedTags. 
+func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (ctrl.Result, error) { + logger := logf.FromContext(ctx) + + // --- Observe --- + + searchIndex, err := resolveEntitySearchIndex(et.Spec.Match.EntityType) + if err != nil { + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonUnsupportedEntityType, err.Error()) + return ctrl.Result{}, nil + } + + // Resolve OpenMetadataConnection. + conn := &omv1alpha1.OpenMetadataConnection{} + if err := h.Client.Get(ctx, types.NamespacedName{Name: et.Spec.OpenMetadataConnectionRef}, conn); err != nil { + logger.Error(err, "Failed to resolve OpenMetadataConnection", "ref", et.Spec.OpenMetadataConnectionRef) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonConnectionNotFound, err.Error()) + return ctrl.Result{}, err + } + + token, err := resolveAuthToken(ctx, h.Client, conn.Spec.AuthSecretRef) + if err != nil { + logger.Error(err, "Failed to resolve auth token") + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonAuthTokenUnavailable, err.Error()) + return ctrl.Result{}, err + } + + omClient := h.NewOMClient(conn.Spec.URL, token) + + // Resolve FQN to UUID: the endpoints that attach/detach the tag to/from assets take the tag's UUID in the URL path. 
+ tagFQN := et.Spec.Tag.TagFQN + tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) + if err != nil { + if omclient.IsNotFound(err) { + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, + fmt.Sprintf("tag not found: %q", tagFQN)) + return ctrl.Result{}, nil + } + logger.Error(err, "Failed to resolve tag", "tagFQN", tagFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, err.Error()) + return ctrl.Result{}, err + } + + // --- Compare --- + + matched, err := omClient.SearchEntities(ctx, searchIndex, et.Spec.Match.Includes, et.Spec.Match.Excludes) + if err != nil { + logger.Error(err, "Failed to search entities", "entityType", et.Spec.Match.EntityType, "searchIndex", searchIndex) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonEntitySearchFailed, err.Error()) + return ctrl.Result{}, err + } + + // --- Converge --- + + // Split status.TagAssignments into entries for the current desired tag (used to + // compute the add/remove diff) and entries for any other tag (left over + // from a previous spec — clean those up so renames take effect cleanly). 
+ previousForTag, staleByTag := splitAppliedByTag(et.Status.TagAssignments, tagFQN) + + toAdd, toRemove := diffAssets(matched, previousForTag, et.Spec.Match.EntityType) + if len(toAdd) > 0 { + if err := omClient.BulkAddTagToAssets(ctx, tagID, toAdd); err != nil { + logger.Error(err, "Failed to bulk-add tag", "tagFQN", tagFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + h.emitEvent(et, corev1.EventTypeWarning, omv1alpha1.ReasonTaggingFailed, err.Error()) + return ctrl.Result{}, err + } + } + if len(toRemove) > 0 { + if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, toRemove); err != nil { + logger.Error(err, "Failed to bulk-remove tag", "tagFQN", tagFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + h.emitEvent(et, corev1.EventTypeWarning, omv1alpha1.ReasonTaggingFailed, err.Error()) + return ctrl.Result{}, err + } + } + + // Rename cleanup: remove any stale tag we previously applied under a + // different FQN. 404 on the stale tag means it's already gone — fine. + for staleFQN, applied := range staleByTag { + staleID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, staleFQN) + if err != nil { + if omclient.IsNotFound(err) { + continue + } + logger.Error(err, "Failed to resolve stale tag for cleanup", "tagFQN", staleFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, err.Error()) + return ctrl.Result{}, err + } + if err := omClient.BulkRemoveTagFromAssets(ctx, staleID, assetRefsFromApplied(applied)); err != nil { + logger.Error(err, "Failed to bulk-remove stale tag", "tagFQN", staleFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + return ctrl.Result{}, err + } + } + + // New status: the desired tag on every matched entity. 
+ desiredApplied := make([]omv1alpha1.TagAssignment, 0, len(matched)) + for _, e := range matched { + desiredApplied = append(desiredApplied, omv1alpha1.TagAssignment{ + EntityType: et.Spec.Match.EntityType, + EntityID: e.ID, + FullyQualifiedName: e.FullyQualifiedName, + TagFQN: tagFQN, + }) + } + sort.Slice(desiredApplied, func(i, j int) bool { + return desiredApplied[i].FullyQualifiedName < desiredApplied[j].FullyQualifiedName + }) + + now := metav1.Now() + et.Status.TagAssignments = desiredApplied + et.Status.LastReconcileTime = &now + et.Status.ObservedGeneration = et.Generation + + msg := fmt.Sprintf("Applied %s to %d %s entit(ies)", tagFQN, len(matched), et.Spec.Match.EntityType) + h.setConditionAndPersist(ctx, et, metav1.ConditionTrue, omv1alpha1.ReasonInSync, msg) + h.emitEvent(et, corev1.EventTypeNormal, omv1alpha1.ReasonInSync, msg) + + logger.Info("Reconciled OpenMetadataEntityTag", + "entityType", et.Spec.Match.EntityType, "tagFQN", tagFQN, "matched", len(matched)) + + return ctrl.Result{RequeueAfter: requeueInterval}, nil +} + +// HandleDeletion removes our previously-applied tags from each entity recorded +// in status.appliedTags, then releases the finalizer. Iterates by TagFQN to +// support cleanup of historical specs (renames left lingering tags in status). 
+func (h *EntityTagHandler) HandleDeletion(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (ctrl.Result, error) { + logger := logf.FromContext(ctx) + + if !finalizer.IsPresent(et) { + return ctrl.Result{}, nil + } + + if len(et.Status.TagAssignments) > 0 { + conn := &omv1alpha1.OpenMetadataConnection{} + if err := h.Client.Get(ctx, types.NamespacedName{Name: et.Spec.OpenMetadataConnectionRef}, conn); err != nil { + logger.Error(err, "Cannot resolve OpenMetadataConnection during deletion, retrying") + return ctrl.Result{}, err + } + + token, err := resolveAuthToken(ctx, h.Client, conn.Spec.AuthSecretRef) + if err != nil { + logger.Error(err, "Cannot resolve auth token during deletion, retrying") + return ctrl.Result{}, err + } + + omClient := h.NewOMClient(conn.Spec.URL, token) + + // Group applied tags by FQN (typically one bucket — only multiple if a + // rename was in flight when the CR was deleted). Resolve each FQN to + // its UUID and bulk-remove. 404s are treated as success (already gone). + _, byTag := splitAppliedByTag(et.Status.TagAssignments, "") + for tagFQN, applied := range byTag { + tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) + if err != nil { + if omclient.IsNotFound(err) { + continue + } + logger.Error(err, "Failed to resolve tag for deletion", "tagFQN", tagFQN) + return ctrl.Result{}, err + } + if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, assetRefsFromApplied(applied)); err != nil { + logger.Error(err, "Failed to bulk-remove tag during deletion", "tagFQN", tagFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + return ctrl.Result{}, err + } + } + } + + if err := finalizer.EnsureAbsent(ctx, h.Client, et); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +// splitAppliedByTag partitions status entries into those whose TagFQN matches +// `desired` (returned as a flat slice) and the rest grouped by their TagFQN. 
+// Pass an empty `desired` to fall through and group everything. +func splitAppliedByTag(applied []omv1alpha1.TagAssignment, desired string) ([]omv1alpha1.TagAssignment, map[string][]omv1alpha1.TagAssignment) { + var current []omv1alpha1.TagAssignment + other := make(map[string][]omv1alpha1.TagAssignment) + for _, a := range applied { + if a.TagFQN == desired { + current = append(current, a) + } else { + other[a.TagFQN] = append(other[a.TagFQN], a) + } + } + return current, other +} + +// assetRefsFromApplied converts AppliedTag status entries into AssetRefs for +// the bulk tag-asset endpoint. +func assetRefsFromApplied(applied []omv1alpha1.TagAssignment) []omclient.AssetRef { + refs := make([]omclient.AssetRef, 0, len(applied)) + for _, a := range applied { + refs = append(refs, omclient.AssetRef{ + ID: a.EntityID, Type: string(a.EntityType), FullyQualifiedName: a.FullyQualifiedName, + }) + } + return refs +} + +// diffAssets computes adds (entities matched now but not previously applied) +// and removes (entities previously applied but no longer matched). Identity is +// by entity ID. 
+func diffAssets(matched []omclient.EntitySummary, previous []omv1alpha1.TagAssignment, entityType omv1alpha1.TaggableEntityType) (toAdd, toRemove []omclient.AssetRef) { + matchedByID := make(map[string]omclient.EntitySummary, len(matched)) + for _, e := range matched { + matchedByID[e.ID] = e + } + prevByID := make(map[string]omv1alpha1.TagAssignment, len(previous)) + for _, p := range previous { + prevByID[p.EntityID] = p + } + + for id, e := range matchedByID { + if _, ok := prevByID[id]; !ok { + toAdd = append(toAdd, omclient.AssetRef{ + ID: id, Type: string(entityType), FullyQualifiedName: e.FullyQualifiedName, + }) + } + } + for id, p := range prevByID { + if _, ok := matchedByID[id]; !ok { + toRemove = append(toRemove, omclient.AssetRef{ + ID: id, Type: string(p.EntityType), FullyQualifiedName: p.FullyQualifiedName, + }) + } + } + return toAdd, toRemove +} + +func (h *EntityTagHandler) setConditionAndPersist(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag, + status metav1.ConditionStatus, reason, message string) { + + condition.SetReady(&et.Status.Conditions, et.Generation, status, reason, message) + + if err := h.Client.Status().Update(ctx, et); err != nil { + logf.FromContext(ctx).Error(err, "Failed to update status condition") + } +} + +func (h *EntityTagHandler) emitEvent(obj *omv1alpha1.OpenMetadataEntityTag, eventType, reason, message string) { + if h.Recorder != nil { + h.Recorder.Eventf(obj, nil, eventType, reason, "Reconcile", message) + } +} diff --git a/internal/handler/entitytag_handler_test.go b/internal/handler/entitytag_handler_test.go new file mode 100644 index 0000000..584289c --- /dev/null +++ b/internal/handler/entitytag_handler_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "sort" + "testing" + + omv1alpha1 "github.com/VorTECHsa/openmetadata-operator/api/v1alpha1" + "github.com/VorTECHsa/openmetadata-operator/internal/omclient" +) + +func TestDiffAssets(t *testing.T) { + matched := []omclient.EntitySummary{ + {ID: "id-1", FullyQualifiedName: "svc.raw.t1"}, + {ID: "id-2", FullyQualifiedName: "svc.raw.t2"}, + {ID: "id-3", FullyQualifiedName: "svc.raw.t3"}, + } + previous := []omv1alpha1.TagAssignment{ + {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-2", FullyQualifiedName: "svc.raw.t2", TagFQN: "Tier.Tier3"}, + {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-4", FullyQualifiedName: "svc.raw.gone", TagFQN: "Tier.Tier3"}, + } + + add, rem := diffAssets(matched, previous, omv1alpha1.TaggableEntityTypeTable) + + sortByID := func(refs []omclient.AssetRef) { + sort.Slice(refs, func(i, j int) bool { return refs[i].ID < refs[j].ID }) + } + sortByID(add) + sortByID(rem) + + wantAdd := []string{"id-1", "id-3"} + wantRem := []string{"id-4"} + + if len(add) != len(wantAdd) { + t.Fatalf("want %d adds, got %d (%+v)", len(wantAdd), len(add), add) + } + for i, ref := range add { + if ref.ID != wantAdd[i] { + t.Errorf("add[%d].ID = %q, want %q", i, ref.ID, wantAdd[i]) + } + if ref.Type != string(omv1alpha1.TaggableEntityTypeTable) { + t.Errorf("add[%d].Type = %q, want %q", i, ref.Type, omv1alpha1.TaggableEntityTypeTable) + } + } + if len(rem) != len(wantRem) { + t.Fatalf("want %d removes, got %d (%+v)", len(wantRem), len(rem), rem) + } + if rem[0].ID != wantRem[0] { + t.Errorf("remove[0].ID = 
%q, want %q", rem[0].ID, wantRem[0]) + } + if rem[0].FullyQualifiedName != "svc.raw.gone" { + t.Errorf("remove[0].FQN = %q, want preserved from previous", rem[0].FullyQualifiedName) + } +} + +func TestDiffAssetsEmpty(t *testing.T) { + add, rem := diffAssets(nil, nil, omv1alpha1.TaggableEntityTypeTable) + if add != nil || rem != nil { + t.Errorf("nil inputs should produce nil diffs; got add=%v rem=%v", add, rem) + } +} + +func TestSplitAppliedByTag(t *testing.T) { + in := []omv1alpha1.TagAssignment{ + {TagFQN: "Tier.Tier3", EntityID: "1"}, + {TagFQN: "Tier.Tier5", EntityID: "2"}, + {TagFQN: "Tier.Tier3", EntityID: "3"}, + } + + // Desired = Tier.Tier3: those go into current; Tier5 is stale. + current, stale := splitAppliedByTag(in, "Tier.Tier3") + if len(current) != 2 { + t.Errorf("current size = %d, want 2", len(current)) + } + if len(stale["Tier.Tier5"]) != 1 { + t.Errorf("stale[Tier.Tier5] size = %d, want 1", len(stale["Tier.Tier5"])) + } + if _, ok := stale["Tier.Tier3"]; ok { + t.Errorf("desired tag should not appear in stale map") + } + + // Empty desired (used by HandleDeletion) puts everything in stale. 
+ current, stale = splitAppliedByTag(in, "") + if len(current) != 0 { + t.Errorf("expected empty current when desired is unset, got %d", len(current)) + } + if len(stale["Tier.Tier3"]) != 2 || len(stale["Tier.Tier5"]) != 1 { + t.Errorf("stale grouping wrong: %+v", stale) + } +} + +func TestAssetRefsFromApplied(t *testing.T) { + in := []omv1alpha1.TagAssignment{ + {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-1", FullyQualifiedName: "svc.db.s.t1", TagFQN: "Tier.Tier3"}, + {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-2", FullyQualifiedName: "svc.db.s.t2", TagFQN: "Tier.Tier3"}, + } + got := assetRefsFromApplied(in) + if len(got) != 2 { + t.Fatalf("len = %d, want 2", len(got)) + } + if got[0].ID != "id-1" || got[0].Type != "table" || got[0].FullyQualifiedName != "svc.db.s.t1" { + t.Errorf("unexpected first ref: %+v", got[0]) + } +} + +func TestResolveEntitySearchIndex(t *testing.T) { + tests := []struct { + in omv1alpha1.TaggableEntityType + want string + }{ + {omv1alpha1.TaggableEntityTypeTable, "table_search_index"}, + {omv1alpha1.TaggableEntityTypeTopic, "topic_search_index"}, + {omv1alpha1.TaggableEntityTypeDatabaseSchema, "database_schema_search_index"}, + {omv1alpha1.TaggableEntityTypeDatabase, "database_search_index"}, + {omv1alpha1.TaggableEntityTypeDashboard, "dashboard_search_index"}, + {omv1alpha1.TaggableEntityTypeMlmodel, "mlmodel_search_index"}, + {omv1alpha1.TaggableEntityTypePipeline, "pipeline_search_index"}, + {omv1alpha1.TaggableEntityTypeContainer, "container_search_index"}, + {omv1alpha1.TaggableEntityTypeSearchIndex, "search_entity_search_index"}, + } + for _, tt := range tests { + got, err := resolveEntitySearchIndex(tt.in) + if err != nil { + t.Errorf("resolveEntitySearchIndex(%q) returned error: %v", tt.in, err) + } + if got != tt.want { + t.Errorf("resolveEntitySearchIndex(%q) = %q, want %q", tt.in, got, tt.want) + } + } +} + +func TestResolveEntitySearchIndexUnknown(t *testing.T) { + _, err := 
resolveEntitySearchIndex(omv1alpha1.TaggableEntityType("nonexistent")) + if err == nil { + t.Fatal("expected error for unknown entity type") + } +} diff --git a/internal/handler/entitytag_index.go b/internal/handler/entitytag_index.go new file mode 100644 index 0000000..54b7b1e --- /dev/null +++ b/internal/handler/entitytag_index.go @@ -0,0 +1,53 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "errors" + "fmt" + + omv1alpha1 "github.com/VorTECHsa/openmetadata-operator/api/v1alpha1" +) + +// ErrUnsupportedEntityType signals a permanent spec error: the configured +// entityType has no search-index mapping in the operator. New entity types +// must be added here as we extend support. +var ErrUnsupportedEntityType = errors.New("unsupported entity type") + +// entityTypeSearchIndex maps each TaggableEntityType to the OpenMetadata +// search-index name used by the /v1/search/query endpoint. +// Index names follow the OM convention `_search_index`. 
+var entityTypeSearchIndex = map[omv1alpha1.TaggableEntityType]string{ + omv1alpha1.TaggableEntityTypeTable: "table_search_index", + omv1alpha1.TaggableEntityTypeTopic: "topic_search_index", + omv1alpha1.TaggableEntityTypeDatabaseSchema: "database_schema_search_index", + omv1alpha1.TaggableEntityTypeDatabase: "database_search_index", + omv1alpha1.TaggableEntityTypeDashboard: "dashboard_search_index", + omv1alpha1.TaggableEntityTypeMlmodel: "mlmodel_search_index", + omv1alpha1.TaggableEntityTypePipeline: "pipeline_search_index", + omv1alpha1.TaggableEntityTypeContainer: "container_search_index", + omv1alpha1.TaggableEntityTypeSearchIndex: "search_entity_search_index", +} + +// resolveEntitySearchIndex returns the search-index name for the given entity +// type, or ErrUnsupportedEntityType if it has no mapping. +func resolveEntitySearchIndex(t omv1alpha1.TaggableEntityType) (string, error) { + if idx, ok := entityTypeSearchIndex[t]; ok { + return idx, nil + } + return "", fmt.Errorf("%w: %q", ErrUnsupportedEntityType, t) +} diff --git a/internal/omclient/entitytag.go b/internal/omclient/entitytag.go new file mode 100644 index 0000000..a7e1994 --- /dev/null +++ b/internal/omclient/entitytag.go @@ -0,0 +1,194 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package omclient + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" +) + +// searchPageSize is how many hits to request per search page. 
+const searchPageSize = 1000 + +// fqnField is the OpenMetadata search field used for FQN matching. It's +// indexed as an exact-match keyword with a lowercase normaliser — wildcards +// match the full lowercased string, so there's no tokenisation surprise. +const fqnField = "fullyQualifiedName" + +// NewEntityTagClient creates a new OpenMetadata API client for entity-tag operations. +func NewEntityTagClient(baseURL, token string) EntityTagClient { + return newClient(baseURL, token) +} + +// Compile-time check. +var _ EntityTagClient = (*Client)(nil) + +// SearchEntities returns every entity in the named search index whose +// fullyQualifiedName matches any include pattern and no exclude pattern. +// Wildcards '*' (zero or more chars) and '?' (one char) in patterns are +// passed through to OpenMetadata's search endpoint unchanged. Pagination is +// handled internally. +func (c *Client) SearchEntities(ctx context.Context, searchIndex string, includes, excludes []string) ([]EntitySummary, error) { + if len(includes) == 0 { + return nil, nil + } + q := buildSearchQuery(includes, excludes) + + var ( + out []EntitySummary + from int + ) + for { + params := url.Values{} + params.Set("q", q) + params.Set("index", searchIndex) + params.Set("size", fmt.Sprintf("%d", searchPageSize)) + params.Set("from", fmt.Sprintf("%d", from)) + reqURL := fmt.Sprintf("%s/v1/search/query?%s", c.baseURL, params.Encode()) + + page, err := c.fetchSearchPage(ctx, reqURL) + if err != nil { + return nil, err + } + for _, h := range page.Hits.Hits { + out = append(out, h.Source) + } + if len(page.Hits.Hits) < searchPageSize { + break + } + from += searchPageSize + // Defensive cap to avoid runaway pagination on a misbehaving backend. 
+ if from >= page.Hits.Total.Value { + break + } + } + return out, nil +} + +func (c *Client) fetchSearchPage(ctx context.Context, reqURL string) (*searchResponse, error) { + resp, err := c.doRequest(ctx, http.MethodGet, reqURL, nil) + if err != nil { + return nil, fmt.Errorf("search entities: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading search response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, &APIError{StatusCode: resp.StatusCode, Body: string(body)} + } + + var page searchResponse + if err := json.Unmarshal(body, &page); err != nil { + return nil, fmt.Errorf("decoding search response: %w", err) + } + return &page, nil +} + +// buildSearchQuery composes the search query string sent to OpenMetadata in +// the form +// +// (field:p1 OR field:p2) AND NOT (field:e1 OR field:e2) +// +// where field = "fullyQualifiedName". Special characters in the literal +// portion of each pattern are escaped; '*' and '?' are passed through so +// they retain their wildcard meaning. +func buildSearchQuery(includes, excludes []string) string { + includeClause := joinPatternClauses(includes) + if len(excludes) == 0 { + return includeClause + } + excludeClause := joinPatternClauses(excludes) + return fmt.Sprintf("%s AND NOT %s", includeClause, excludeClause) +} + +func joinPatternClauses(patterns []string) string { + parts := make([]string, 0, len(patterns)) + for _, p := range patterns { + parts = append(parts, fmt.Sprintf("%s:%s", fqnField, escapeQueryValue(p))) + } + if len(parts) == 1 { + return parts[0] + } + return "(" + strings.Join(parts, " OR ") + ")" +} + +// escapeQueryValue escapes characters reserved by OpenMetadata's search +// query syntax inside a pattern, while preserving the wildcards '*' and '?'. +// Reserved chars: +// +// - - = & | > < ! 
( ) { } [ ] ^ " ~ : \ / +// +// The whole-word "AND", "OR", "NOT" operators are also reserved but cannot +// appear inside our keyword-field values, so we don't worry about them. +func escapeQueryValue(s string) string { + const reserved = `+-=&|> Date: Wed, 6 May 2026 15:12:25 +0800 Subject: [PATCH 2/5] refactor: clarify EntityTag reconcile flow with explicit applyDiff/applyRename paths --- ...ta.vortexa.com_openmetadataentitytags.yaml | 11 +- internal/handler/entitytag_handler.go | 282 ++++++++++-------- internal/handler/entitytag_handler_test.go | 60 ++-- 3 files changed, 201 insertions(+), 152 deletions(-) diff --git a/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml b/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml index 328bc46..6a41443 100644 --- a/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml +++ b/config/crd/bases/openmetadata.vortexa.com_openmetadataentitytags.yaml @@ -59,9 +59,7 @@ spec: description: Match selects the entities this CR applies to. properties: entityType: - description: |- - EntityType is the OpenMetadata entity type to match. Must be one of the - supported lower-level types (table, topic, databaseSchema, database, etc.). + description: EntityType is the OpenMetadata entity type to match. enum: - table - topic @@ -102,9 +100,7 @@ spec: minLength: 1 type: string tag: - description: |- - Tag is the tag to apply to every matched entity. - To apply more than one tag to the same entity set, use one CR per tag. + description: Tag is the tag to apply to every matched entity. properties: tagFQN: description: |- @@ -198,8 +194,7 @@ spec: tagAssignments: description: |- TagAssignments is the set of (entity, tag) assignments the operator has - applied. The operator manages only these assignments additively — tags - set via the OM UI or by other controllers are left untouched. + applied. 
items: description: |- TagAssignment records a single (entity, tag) assignment the operator has diff --git a/internal/handler/entitytag_handler.go b/internal/handler/entitytag_handler.go index 933d0c4..30f2d5d 100644 --- a/internal/handler/entitytag_handler.go +++ b/internal/handler/entitytag_handler.go @@ -39,16 +39,29 @@ import ( // a tag by FQN (GET /api/v1/tags/name/{fqn}). const tagsEntityTypePath = "tags" -// EntityTagHandler contains the business logic for reconciling -// OpenMetadataEntityTag resources against the OpenMetadata API. +// EntityTagHandler reconciles OpenMetadataEntityTag resources against +// the OpenMetadata API. type EntityTagHandler struct { Client client.Client Recorder events.EventRecorder NewOMClient func(baseURL, token string) omclient.EntityTagClient } -// Reconcile applies the desired tags to entities matched by the spec and -// reconciles any drift against status.appliedTags. +// Reconcile follows Observe → Compare → Converge: +// - Observe: validate inputs, resolve the OM connection, look up the tag's UUID. +// - Compare: search OM for entities matching the spec's includes/excludes. +// - Converge: pick one of two paths based on what status records: +// 1) Steady state — tag in status is the same as in spec, so we just +// add the tag to matched entities that don't have it yet and remove it +// from entities that the spec no longer matches. +// 2) Rename — tag in status differs from spec, so we add the new tag +// to every matched entity and remove the old tag from every entity +// listed in status. +// +// Status invariant: after every successful reconcile, every entry in +// status.TagAssignments shares the same tagFQN — the one written this run. +// We use that to detect renames cheaply: if the tagFQN recorded in status +// differs from the spec's, the user has changed the desired tag. 
func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (ctrl.Result, error) { logger := logf.FromContext(ctx) @@ -60,24 +73,11 @@ func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMet return ctrl.Result{}, nil } - // Resolve OpenMetadataConnection. - conn := &omv1alpha1.OpenMetadataConnection{} - if err := h.Client.Get(ctx, types.NamespacedName{Name: et.Spec.OpenMetadataConnectionRef}, conn); err != nil { - logger.Error(err, "Failed to resolve OpenMetadataConnection", "ref", et.Spec.OpenMetadataConnectionRef) - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonConnectionNotFound, err.Error()) - return ctrl.Result{}, err - } - - token, err := resolveAuthToken(ctx, h.Client, conn.Spec.AuthSecretRef) + omClient, err := h.resolveOMClient(ctx, et) if err != nil { - logger.Error(err, "Failed to resolve auth token") - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonAuthTokenUnavailable, err.Error()) return ctrl.Result{}, err } - omClient := h.NewOMClient(conn.Spec.URL, token) - - // Resolve FQN to UUID: the endpoints that attach/detach the tag to/from assets take the tag's UUID in the URL path. tagFQN := et.Spec.Tag.TagFQN tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) if err != nil { @@ -102,68 +102,25 @@ func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMet // --- Converge --- - // Split status.TagAssignments into entries for the current desired tag (used to - // compute the add/remove diff) and entries for any other tag (left over - // from a previous spec — clean those up so renames take effect cleanly). 
- previousForTag, staleByTag := splitAppliedByTag(et.Status.TagAssignments, tagFQN) - - toAdd, toRemove := diffAssets(matched, previousForTag, et.Spec.Match.EntityType) - if len(toAdd) > 0 { - if err := omClient.BulkAddTagToAssets(ctx, tagID, toAdd); err != nil { - logger.Error(err, "Failed to bulk-add tag", "tagFQN", tagFQN) - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) - h.emitEvent(et, corev1.EventTypeWarning, omv1alpha1.ReasonTaggingFailed, err.Error()) - return ctrl.Result{}, err - } - } - if len(toRemove) > 0 { - if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, toRemove); err != nil { - logger.Error(err, "Failed to bulk-remove tag", "tagFQN", tagFQN) - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) - h.emitEvent(et, corev1.EventTypeWarning, omv1alpha1.ReasonTaggingFailed, err.Error()) - return ctrl.Result{}, err - } - } - - // Rename cleanup: remove any stale tag we previously applied under a - // different FQN. 404 on the stale tag means it's already gone — fine. - for staleFQN, applied := range staleByTag { - staleID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, staleFQN) - if err != nil { - if omclient.IsNotFound(err) { - continue - } - logger.Error(err, "Failed to resolve stale tag for cleanup", "tagFQN", staleFQN) - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, err.Error()) + // Rename path triggers only when status records a tag (status non-empty) + // and that recorded tag differs from what the spec now wants. 
+ if oldTagFQN := recordedTagFQN(et); oldTagFQN != "" && oldTagFQN != tagFQN { + if err := h.applyRename(ctx, omClient, et, tagID, oldTagFQN, matched); err != nil { return ctrl.Result{}, err } - if err := omClient.BulkRemoveTagFromAssets(ctx, staleID, assetRefsFromApplied(applied)); err != nil { - logger.Error(err, "Failed to bulk-remove stale tag", "tagFQN", staleFQN) - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + } else { + if err := h.applyDiff(ctx, omClient, et, tagID, matched); err != nil { return ctrl.Result{}, err } } - // New status: the desired tag on every matched entity. - desiredApplied := make([]omv1alpha1.TagAssignment, 0, len(matched)) - for _, e := range matched { - desiredApplied = append(desiredApplied, omv1alpha1.TagAssignment{ - EntityType: et.Spec.Match.EntityType, - EntityID: e.ID, - FullyQualifiedName: e.FullyQualifiedName, - TagFQN: tagFQN, - }) - } - sort.Slice(desiredApplied, func(i, j int) bool { - return desiredApplied[i].FullyQualifiedName < desiredApplied[j].FullyQualifiedName - }) - + // Persist new status: the desired tag on every matched entity. 
+ et.Status.TagAssignments = buildAssignments(matched, et.Spec.Match.EntityType, tagFQN) now := metav1.Now() - et.Status.TagAssignments = desiredApplied et.Status.LastReconcileTime = &now et.Status.ObservedGeneration = et.Generation - msg := fmt.Sprintf("Applied %s to %d %s entit(ies)", tagFQN, len(matched), et.Spec.Match.EntityType) + msg := fmt.Sprintf("Applied %s to %d %s entities", tagFQN, len(matched), et.Spec.Match.EntityType) h.setConditionAndPersist(ctx, et, metav1.ConditionTrue, omv1alpha1.ReasonInSync, msg) h.emitEvent(et, corev1.EventTypeNormal, omv1alpha1.ReasonInSync, msg) @@ -173,9 +130,59 @@ func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMet return ctrl.Result{RequeueAfter: requeueInterval}, nil } -// HandleDeletion removes our previously-applied tags from each entity recorded -// in status.appliedTags, then releases the finalizer. Iterates by TagFQN to -// support cleanup of historical specs (renames left lingering tags in status). +// applyDiff is the steady-state path: status.TagAssignments either records the +// same tagFQN as the spec or is empty (first reconcile). Diff matched +// entities against what we previously applied — bulk-add newcomers, bulk-remove +// the ones that fell out of scope. 
+func (h *EntityTagHandler) applyDiff(ctx context.Context, omClient omclient.EntityTagClient, et *omv1alpha1.OpenMetadataEntityTag, tagID string, matched []omclient.EntitySummary) error { + toAdd, toRemove := diffAssets(matched, et.Status.TagAssignments, et.Spec.Match.EntityType) + + if len(toAdd) > 0 { + if err := omClient.BulkAddTagToAssets(ctx, tagID, toAdd); err != nil { + return h.failTagging(ctx, et, "Failed to bulk-add tag", err) + } + } + if len(toRemove) > 0 { + if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, toRemove); err != nil { + return h.failTagging(ctx, et, "Failed to bulk-remove tag", err) + } + } + return nil +} + +// applyRename is the rename path: status.TagAssignments records assignments +// under oldTagFQN, but the spec now wants tagFQN. Apply the new tag to every +// matched entity (no diff needed — by invariant none of them carry it yet +// from this CR), then remove the old tag from every entity we previously +// applied it to. 404 on the old tag's lookup means it's already gone in OM +// — fine, nothing to remove. 
+func (h *EntityTagHandler) applyRename(ctx context.Context, omClient omclient.EntityTagClient, et *omv1alpha1.OpenMetadataEntityTag, tagID, oldTagFQN string, matched []omclient.EntitySummary) error { + if newRefs := assetRefsFromMatched(matched, et.Spec.Match.EntityType); len(newRefs) > 0 { + if err := omClient.BulkAddTagToAssets(ctx, tagID, newRefs); err != nil { + return h.failTagging(ctx, et, "Failed to bulk-add tag during rename", err) + } + } + + oldTagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, oldTagFQN) + if err != nil { + if omclient.IsNotFound(err) { + return nil + } + logf.FromContext(ctx).Error(err, "Failed to resolve old tag for rename cleanup", "tagFQN", oldTagFQN) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, err.Error()) + return err + } + oldRefs := assetRefsFromAssignments(et.Status.TagAssignments) + if err := omClient.BulkRemoveTagFromAssets(ctx, oldTagID, oldRefs); err != nil { + return h.failTagging(ctx, et, "Failed to bulk-remove old tag during rename", err) + } + return nil +} + +// HandleDeletion removes our previously-applied tags from each recorded asset, +// then releases the finalizer. Iterates by tagFQN so the rare case of a CR +// being deleted mid-rename (status holds entries under more than one FQN) is +// handled correctly. 
func (h *EntityTagHandler) HandleDeletion(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (ctrl.Result, error) { logger := logf.FromContext(ctx) @@ -184,34 +191,20 @@ func (h *EntityTagHandler) HandleDeletion(ctx context.Context, et *omv1alpha1.Op } if len(et.Status.TagAssignments) > 0 { - conn := &omv1alpha1.OpenMetadataConnection{} - if err := h.Client.Get(ctx, types.NamespacedName{Name: et.Spec.OpenMetadataConnectionRef}, conn); err != nil { - logger.Error(err, "Cannot resolve OpenMetadataConnection during deletion, retrying") + omClient, err := h.resolveOMClient(ctx, et) + if err != nil { return ctrl.Result{}, err } - token, err := resolveAuthToken(ctx, h.Client, conn.Spec.AuthSecretRef) - if err != nil { - logger.Error(err, "Cannot resolve auth token during deletion, retrying") + tagFQN := recordedTagFQN(et) + tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) + if err != nil && !omclient.IsNotFound(err) { + logger.Error(err, "Failed to resolve tag for deletion", "tagFQN", tagFQN) return ctrl.Result{}, err } - - omClient := h.NewOMClient(conn.Spec.URL, token) - - // Group applied tags by FQN (typically one bucket — only multiple if a - // rename was in flight when the CR was deleted). Resolve each FQN to - // its UUID and bulk-remove. 404s are treated as success (already gone). - _, byTag := splitAppliedByTag(et.Status.TagAssignments, "") - for tagFQN, applied := range byTag { - tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) - if err != nil { - if omclient.IsNotFound(err) { - continue - } - logger.Error(err, "Failed to resolve tag for deletion", "tagFQN", tagFQN) - return ctrl.Result{}, err - } - if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, assetRefsFromApplied(applied)); err != nil { + // On 404 the tag is already gone in OM, so there's nothing to remove. 
+ if err == nil { + if err := omClient.BulkRemoveTagFromAssets(ctx, tagID, assetRefsFromAssignments(et.Status.TagAssignments)); err != nil { logger.Error(err, "Failed to bulk-remove tag during deletion", "tagFQN", tagFQN) h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) return ctrl.Result{}, err @@ -225,27 +218,82 @@ func (h *EntityTagHandler) HandleDeletion(ctx context.Context, et *omv1alpha1.Op return ctrl.Result{}, nil } -// splitAppliedByTag partitions status entries into those whose TagFQN matches -// `desired` (returned as a flat slice) and the rest grouped by their TagFQN. -// Pass an empty `desired` to fall through and group everything. -func splitAppliedByTag(applied []omv1alpha1.TagAssignment, desired string) ([]omv1alpha1.TagAssignment, map[string][]omv1alpha1.TagAssignment) { - var current []omv1alpha1.TagAssignment - other := make(map[string][]omv1alpha1.TagAssignment) - for _, a := range applied { - if a.TagFQN == desired { - current = append(current, a) - } else { - other[a.TagFQN] = append(other[a.TagFQN], a) - } +// resolveOMClient resolves the OpenMetadataConnection ref and auth token for +// the given CR, returning a configured client. Sets the appropriate condition +// and persists status on failure. 
+func (h *EntityTagHandler) resolveOMClient(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (omclient.EntityTagClient, error) { + logger := logf.FromContext(ctx) + + conn := &omv1alpha1.OpenMetadataConnection{} + if err := h.Client.Get(ctx, types.NamespacedName{Name: et.Spec.OpenMetadataConnectionRef}, conn); err != nil { + logger.Error(err, "Failed to resolve OpenMetadataConnection", "ref", et.Spec.OpenMetadataConnectionRef) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonConnectionNotFound, err.Error()) + return nil, err } - return current, other + + token, err := resolveAuthToken(ctx, h.Client, conn.Spec.AuthSecretRef) + if err != nil { + logger.Error(err, "Failed to resolve auth token") + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonAuthTokenUnavailable, err.Error()) + return nil, err + } + + return h.NewOMClient(conn.Spec.URL, token), nil +} + +// When a bulk tag-asset call fails. +func (h *EntityTagHandler) failTagging(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag, msg string, err error) error { + logf.FromContext(ctx).Error(err, msg) + h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) + h.emitEvent(et, corev1.EventTypeWarning, omv1alpha1.ReasonTaggingFailed, err.Error()) + return err +} + +// recordedTagFQN returns the tagFQN recorded in status, or "" if status is +// empty (first reconcile). Relies on the status invariant that every entry +// shares the same tagFQN, so the first entry is representative. +func recordedTagFQN(et *omv1alpha1.OpenMetadataEntityTag) string { + if len(et.Status.TagAssignments) == 0 { + return "" + } + return et.Status.TagAssignments[0].TagFQN +} + +// buildAssignments returns one TagAssignment per matched entity, sorted by +// FQN for stable status output. 
+func buildAssignments(matched []omclient.EntitySummary, entityType omv1alpha1.TaggableEntityType, tagFQN string) []omv1alpha1.TagAssignment { + out := make([]omv1alpha1.TagAssignment, 0, len(matched)) + for _, e := range matched { + out = append(out, omv1alpha1.TagAssignment{ + EntityType: entityType, + EntityID: e.ID, + FullyQualifiedName: e.FullyQualifiedName, + TagFQN: tagFQN, + }) + } + sort.Slice(out, func(i, j int) bool { + return out[i].FullyQualifiedName < out[j].FullyQualifiedName + }) + return out +} + +// assetRefsFromMatched converts search results into AssetRefs for the bulk +// tag-asset endpoint. +func assetRefsFromMatched(matched []omclient.EntitySummary, entityType omv1alpha1.TaggableEntityType) []omclient.AssetRef { + refs := make([]omclient.AssetRef, 0, len(matched)) + for _, e := range matched { + refs = append(refs, omclient.AssetRef{ + ID: e.ID, Type: string(entityType), FullyQualifiedName: e.FullyQualifiedName, + }) + } + return refs } -// assetRefsFromApplied converts AppliedTag status entries into AssetRefs for -// the bulk tag-asset endpoint. -func assetRefsFromApplied(applied []omv1alpha1.TagAssignment) []omclient.AssetRef { - refs := make([]omclient.AssetRef, 0, len(applied)) - for _, a := range applied { +// assetRefsFromAssignments converts status.TagAssignments entries into +// AssetRefs for the bulk tag-asset endpoint. +func assetRefsFromAssignments(assignments []omv1alpha1.TagAssignment) []omclient.AssetRef { + refs := make([]omclient.AssetRef, 0, len(assignments)) + for _, a := range assignments { refs = append(refs, omclient.AssetRef{ ID: a.EntityID, Type: string(a.EntityType), FullyQualifiedName: a.FullyQualifiedName, }) @@ -253,9 +301,9 @@ func assetRefsFromApplied(applied []omv1alpha1.TagAssignment) []omclient.AssetRe return refs } -// diffAssets computes adds (entities matched now but not previously applied) -// and removes (entities previously applied but no longer matched). Identity is -// by entity ID. 
+// diffAssets computes adds (in matched but not previously applied) and +// removes (previously applied but no longer matched). Identity is by +// entity ID. func diffAssets(matched []omclient.EntitySummary, previous []omv1alpha1.TagAssignment, entityType omv1alpha1.TaggableEntityType) (toAdd, toRemove []omclient.AssetRef) { matchedByID := make(map[string]omclient.EntitySummary, len(matched)) for _, e := range matched { diff --git a/internal/handler/entitytag_handler_test.go b/internal/handler/entitytag_handler_test.go index 584289c..72a92d1 100644 --- a/internal/handler/entitytag_handler_test.go +++ b/internal/handler/entitytag_handler_test.go @@ -75,41 +75,47 @@ func TestDiffAssetsEmpty(t *testing.T) { } } -func TestSplitAppliedByTag(t *testing.T) { - in := []omv1alpha1.TagAssignment{ - {TagFQN: "Tier.Tier3", EntityID: "1"}, - {TagFQN: "Tier.Tier5", EntityID: "2"}, - {TagFQN: "Tier.Tier3", EntityID: "3"}, - } - - // Desired = Tier.Tier3: those go into current; Tier5 is stale. - current, stale := splitAppliedByTag(in, "Tier.Tier3") - if len(current) != 2 { - t.Errorf("current size = %d, want 2", len(current)) - } - if len(stale["Tier.Tier5"]) != 1 { - t.Errorf("stale[Tier.Tier5] size = %d, want 1", len(stale["Tier.Tier5"])) - } - if _, ok := stale["Tier.Tier3"]; ok { - t.Errorf("desired tag should not appear in stale map") - } - - // Empty desired (used by HandleDeletion) puts everything in stale. 
- current, stale = splitAppliedByTag(in, "") - if len(current) != 0 { - t.Errorf("expected empty current when desired is unset, got %d", len(current)) +func TestRecordedTagFQN(t *testing.T) { + tests := []struct { + name string + assignments []omv1alpha1.TagAssignment + want string + }{ + {name: "empty status returns empty string", assignments: nil, want: ""}, + { + name: "single entry returns its tag", + assignments: []omv1alpha1.TagAssignment{ + {TagFQN: "Tier.Tier3", EntityID: "1"}, + }, + want: "Tier.Tier3", + }, + { + name: "multiple entries return first (invariant: all share same tag)", + assignments: []omv1alpha1.TagAssignment{ + {TagFQN: "Tier.Tier3", EntityID: "1"}, + {TagFQN: "Tier.Tier3", EntityID: "2"}, + }, + want: "Tier.Tier3", + }, } - if len(stale["Tier.Tier3"]) != 2 || len(stale["Tier.Tier5"]) != 1 { - t.Errorf("stale grouping wrong: %+v", stale) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + et := &omv1alpha1.OpenMetadataEntityTag{ + Status: omv1alpha1.OpenMetadataEntityTagStatus{TagAssignments: tt.assignments}, + } + if got := recordedTagFQN(et); got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + }) } } -func TestAssetRefsFromApplied(t *testing.T) { +func TestAssetRefsFromAssignments(t *testing.T) { in := []omv1alpha1.TagAssignment{ {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-1", FullyQualifiedName: "svc.db.s.t1", TagFQN: "Tier.Tier3"}, {EntityType: omv1alpha1.TaggableEntityTypeTable, EntityID: "id-2", FullyQualifiedName: "svc.db.s.t2", TagFQN: "Tier.Tier3"}, } - got := assetRefsFromApplied(in) + got := assetRefsFromAssignments(in) if len(got) != 2 { t.Fatalf("len = %d, want 2", len(got)) } From d54a977ff2455f1ddbcdea5a491579fe0f9ec7a8 Mon Sep 17 00:00:00 2001 From: berimbolo13 Date: Wed, 6 May 2026 16:32:00 +0800 Subject: [PATCH 3/5] refactor: tighten EntityTag comments and drop search pagination cap --- .../openmetadataentitytag_controller_test.go | 2 +- 
internal/handler/entitytag_handler.go | 12 +++++++----- internal/omclient/entitytag.go | 4 ---- internal/omclient/types.go | 3 --- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/internal/controller/openmetadataentitytag_controller_test.go b/internal/controller/openmetadataentitytag_controller_test.go index 515f45d..7d04536 100644 --- a/internal/controller/openmetadataentitytag_controller_test.go +++ b/internal/controller/openmetadataentitytag_controller_test.go @@ -205,7 +205,7 @@ var _ = Describe("OpenMetadataEntityTag Controller", func() { } reconciler := newEntityTagReconciler(stub) - // Reconcile twice to populate status.appliedTags. + // Reconcile twice: first adds the finalizer, second populates status.TagAssignments. _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) Expect(err).NotTo(HaveOccurred()) _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName}) diff --git a/internal/handler/entitytag_handler.go b/internal/handler/entitytag_handler.go index 30f2d5d..f7ff96e 100644 --- a/internal/handler/entitytag_handler.go +++ b/internal/handler/entitytag_handler.go @@ -179,10 +179,10 @@ func (h *EntityTagHandler) applyRename(ctx context.Context, omClient omclient.En return nil } -// HandleDeletion removes our previously-applied tags from each recorded asset, -// then releases the finalizer. Iterates by tagFQN so the rare case of a CR -// being deleted mid-rename (status holds entries under more than one FQN) is -// handled correctly. +// HandleDeletion removes our previously-applied tag from each recorded asset, +// then releases the finalizer. Relies on the status invariant that every entry +// in status.TagAssignments shares a single tagFQN, so a single bulk-remove +// suffices. 
func (h *EntityTagHandler) HandleDeletion(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag) (ctrl.Result, error) { logger := logf.FromContext(ctx) @@ -241,7 +241,9 @@ func (h *EntityTagHandler) resolveOMClient(ctx context.Context, et *omv1alpha1.O return h.NewOMClient(conn.Spec.URL, token), nil } -// When a bulk tag-asset call fails. +// failTagging logs a bulk tag-asset failure, records it on the CR's Ready +// condition with reason ReasonTaggingFailed, emits a warning event, and +// returns the original error for the caller to propagate. func (h *EntityTagHandler) failTagging(ctx context.Context, et *omv1alpha1.OpenMetadataEntityTag, msg string, err error) error { logf.FromContext(ctx).Error(err, msg) h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTaggingFailed, err.Error()) diff --git a/internal/omclient/entitytag.go b/internal/omclient/entitytag.go index a7e1994..7da1080 100644 --- a/internal/omclient/entitytag.go +++ b/internal/omclient/entitytag.go @@ -76,10 +76,6 @@ func (c *Client) SearchEntities(ctx context.Context, searchIndex string, include break } from += searchPageSize - // Defensive cap to avoid runaway pagination on a misbehaving backend. - if from >= page.Hits.Total.Value { - break - } } return out, nil } diff --git a/internal/omclient/types.go b/internal/omclient/types.go index e2966a2..003db6a 100644 --- a/internal/omclient/types.go +++ b/internal/omclient/types.go @@ -118,9 +118,6 @@ type searchHit struct { // response. 
type searchResponse struct { Hits struct { - Total struct { - Value int `json:"value"` - } `json:"total"` Hits []searchHit `json:"hits"` } `json:"hits"` } From d33d26f1f884a34408a88955b3d0dd3dd3007dd8 Mon Sep 17 00:00:00 2001 From: berimbolo13 Date: Wed, 6 May 2026 16:57:52 +0800 Subject: [PATCH 4/5] fix: declare EntityTag controller's dependent-resource RBAC markers --- internal/controller/openmetadataentitytag_controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/controller/openmetadataentitytag_controller.go b/internal/controller/openmetadataentitytag_controller.go index 9d13d45..7b7a3e4 100644 --- a/internal/controller/openmetadataentitytag_controller.go +++ b/internal/controller/openmetadataentitytag_controller.go @@ -39,6 +39,9 @@ type OpenMetadataEntityTagReconciler struct { // +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags/status,verbs=get;update;patch // +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataentitytags/finalizers,verbs=update +// +kubebuilder:rbac:groups=openmetadata.vortexa.com,resources=openmetadataconnections,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile handles a single reconciliation loop for an OpenMetadataEntityTag resource. 
func (r *OpenMetadataEntityTagReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { From 0ef45e67fd53b9b1903828252b341253ca08d2cc Mon Sep 17 00:00:00 2001 From: berimbolo13 Date: Wed, 6 May 2026 17:05:09 +0800 Subject: [PATCH 5/5] fix: requeue when EntityTag's spec tag is not found in OpenMetadata --- internal/handler/entitytag_handler.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/handler/entitytag_handler.go b/internal/handler/entitytag_handler.go index f7ff96e..38efb03 100644 --- a/internal/handler/entitytag_handler.go +++ b/internal/handler/entitytag_handler.go @@ -81,11 +81,6 @@ func (h *EntityTagHandler) Reconcile(ctx context.Context, et *omv1alpha1.OpenMet tagFQN := et.Spec.Tag.TagFQN tagID, err := omClient.GetEntityByName(ctx, tagsEntityTypePath, tagFQN) if err != nil { - if omclient.IsNotFound(err) { - h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, - fmt.Sprintf("tag not found: %q", tagFQN)) - return ctrl.Result{}, nil - } logger.Error(err, "Failed to resolve tag", "tagFQN", tagFQN) h.setConditionAndPersist(ctx, et, metav1.ConditionFalse, omv1alpha1.ReasonTagResolutionFailed, err.Error()) return ctrl.Result{}, err