diff --git a/internal/controller/bucket_controller.go b/internal/controller/bucket_controller.go index 78af8f117..2a70136cb 100644 --- a/internal/controller/bucket_controller.go +++ b/internal/controller/bucket_controller.go @@ -32,9 +32,7 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,7 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" @@ -51,6 +49,7 @@ import ( "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -126,7 +125,7 @@ var bucketFailConditions = []string{ // BucketReconciler reconciles a v1.Bucket object. type BucketReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -348,13 +347,13 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1. // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -388,7 +387,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.Seria // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -525,7 +524,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.Seri if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" { curRev := digest.Digest(curArtifact.Revision) if curRev.Validate() == nil && index.Digest(curRev.Algorithm()) == curRev { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) return sreconcile.ResultSuccess, nil } } @@ -582,7 +581,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.Seri // Update symlink on a "best effort" basis url, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, "failed to update status URL symlink: %s", err) } if url != "" { @@ -626,7 +625,7 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc "GarbageCollectionFailed", ) } else if deleted != "" { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected artifacts for deleted resource") } obj.Status.Artifact = nil @@ -641,7 +640,7 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc ) } if len(delFiles) > 0 { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected %d artifacts", len(delFiles)) return nil } @@ -649,32 +648,6 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } -// eventLogf records events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { - r.annotatedEventLogf(ctx, obj, nil, eventType, reason, messageFmt, args...) -} - -// annotatedEventLogf records annotated events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *BucketReconciler) annotatedEventLogf(ctx context.Context, - obj runtime.Object, annotations map[string]string, eventType string, reason string, messageFmt string, args ...interface{}) { - msg := fmt.Sprintf(messageFmt, args...) - // Log and emit event. - if eventType == corev1.EventTypeWarning { - ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) - } else { - ctrl.LoggerFrom(ctx).Info(msg) - } - r.AnnotatedEventf(obj, annotations, eventType, reason, "%s", msg) -} - // fetchEtagIndex fetches the current etagIndex for the in the obj specified // bucket using the given provider, while filtering them using .sourceignore // rules. After fetching an object, the etag value in the index is updated to diff --git a/internal/controller/bucket_controller_test.go b/internal/controller/bucket_controller_test.go index 00ed46cb7..9529162f1 100644 --- a/internal/controller/bucket_controller_test.go +++ b/internal/controller/bucket_controller_test.go @@ -30,19 +30,22 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" @@ -85,7 +88,7 @@ func TestBucketReconciler_deleteBeforeFinalizer(t *testing.T) { r := &BucketReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -383,7 +386,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.Bucket{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } @@ -918,7 +921,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { } r := &BucketReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), @@ -1385,7 +1388,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { } r := &BucketReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), @@ -1589,7 +1592,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { r := &BucketReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } @@ -1712,7 +1715,7 @@ func TestBucketReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(bucketReadyCondition), summarize.WithReconcileResult(sreconcile.ResultSuccess), @@ -1739,7 +1742,7 @@ func TestBucketReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.Bucket) newObjBeforeFunc func(obj *sourcev1.Bucket) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -1753,7 +1756,12 @@ func TestBucketReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "recovery from failure", @@ -1768,7 +1776,12 @@ func TestBucketReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "recovery and new artifact", @@ -1783,7 +1796,12 @@ func TestBucketReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with 2 fetched files from", + }, }, { name: "no updates", @@ -1804,7 +1822,7 @@ func TestBucketReconciler_notify(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.Bucket{ Spec: sourcev1.BucketSpec{ @@ -1832,12 +1850,15 @@ func TestBucketReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/gitrepository_controller.go b/internal/controller/gitrepository_controller.go index a3c27c9b6..1c5b61939 100644 --- a/internal/controller/gitrepository_controller.go +++ b/internal/controller/gitrepository_controller.go @@ -30,14 +30,13 @@ import ( "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/auth/githubapp" authutils "github.com/fluxcd/pkg/auth/utils" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/logger" "github.com/fluxcd/pkg/runtime/secrets" "github.com/go-git/go-git/v5/plumbing/transport" ssh "golang.org/x/crypto/ssh" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" @@ -48,7 +47,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/cache" @@ -165,7 +164,7 @@ func getPatchOptions(ownedConditions []string, controllerName string) []patch.Op // GitRepositoryReconciler reconciles a v1.GitRepository object. type GitRepositoryReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -377,13 +376,13 @@ func (r *GitRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *so // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -437,7 +436,7 @@ func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, sp *patc // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -870,7 +869,7 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pat if curArtifact := obj.GetArtifact(); curArtifact.HasRevision(artifact.Revision) && !includes.Diff(obj.Status.IncludedArtifacts) && !gitContentConfigChanged(obj, includes) { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", curArtifact.Revision) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", curArtifact.Revision) return sreconcile.ResultSuccess, nil } @@ -947,7 +946,7 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pat if fi, err := os.Lstat(r.Storage.LocalPath(artifact)); err == nil { if fi.Mode()&os.ModeSymlink != 0 { if err := os.Remove(r.Storage.LocalPath(*symArtifact)); err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, "failed to remove (deprecated) symlink: %s", err) } } @@ -1218,7 +1217,7 @@ func (r *GitRepositoryReconciler) verifySignature(ctx context.Context, obj *sour mode := obj.Spec.Verification.GetMode() obj.Status.SourceVerificationMode = &mode conditions.MarkTrue(obj, sourcev1.SourceVerifiedCondition, reason, "%s", message.String()) - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, reason, "%s", message.String()) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, reason, "%s", message.String()) return sreconcile.ResultSuccess, nil } @@ -1256,7 +1255,7 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc "GarbageCollectionFailed", ) } else if deleted != "" { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected artifacts for deleted resource") } obj.Status.Artifact = nil @@ -1271,7 +1270,7 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc ) } if len(delFiles) > 0 { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected %d artifacts", len(delFiles)) return nil } @@ -1279,22 +1278,6 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc return nil } -// eventLogf records events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *GitRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { - msg := fmt.Sprintf(messageFmt, args...) - // Log and emit event. - if eventType == corev1.EventTypeWarning { - ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) - } else { - ctrl.LoggerFrom(ctx).Info(msg) - } - r.Eventf(obj, eventType, reason, "%s", msg) -} - // gitContentConfigChanged evaluates the current spec with the observations of // the artifact in the status to determine if artifact content configuration has // changed and requires rebuilding the artifact. Rebuilding the artifact is also diff --git a/internal/controller/gitrepository_controller_test.go b/internal/controller/gitrepository_controller_test.go index 84b2074a8..a364617aa 100644 --- a/internal/controller/gitrepository_controller_test.go +++ b/internal/controller/gitrepository_controller_test.go @@ -38,8 +38,8 @@ import ( . "github.com/onsi/gomega" sshtestdata "golang.org/x/crypto/ssh/testdata" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,6 +47,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" @@ -55,6 +56,7 @@ import ( "github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/ssh" @@ -232,7 +234,7 @@ func TestGitRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &GitRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -345,7 +347,7 @@ func TestGitRepositoryReconciler_reconcileSource_emptyRepository(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -902,7 +904,7 @@ func TestGitRepositoryReconciler_reconcileSource_authStrategy(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -1084,7 +1086,7 @@ func TestGitRepositoryReconciler_getAuthOpts_provider(t *testing.T) { obj := &sourcev1.GitRepository{} r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1305,7 +1307,7 @@ func TestGitRepositoryReconciler_reconcileSource_checkoutStrategy(t *testing.T) WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.GitRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } @@ -1509,7 +1511,7 @@ func TestGitRepositoryReconciler_reconcileArtifact(t *testing.T) { resetChmod(tt.dir, 0o750, 0o600) r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1660,7 +1662,7 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: storage, requeueDependency: dependencyInterval, features: features.FeatureGates(), @@ -1917,7 +1919,7 @@ func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.GitRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -1971,7 +1973,7 @@ func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -2762,7 +2764,7 @@ func TestGitRepositoryReconciler_verifySignature(t *testing.T) { } r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -2915,7 +2917,7 @@ func TestGitRepositoryReconciler_ConditionsUpdate(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, features: features.FeatureGates(), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), @@ -3165,7 +3167,7 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(gitRepositoryReadyCondition), summarize.WithBiPolarityConditionTypes(sourcev1.SourceVerifiedCondition), @@ -3206,7 +3208,7 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { oldObjBeforeFunc func(obj *sourcev1.GitRepository) newObjBeforeFunc func(obj *sourcev1.GitRepository) commit git.Commit - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -3220,8 +3222,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.GitRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - commit: concreteCommit, - wantEvent: "Normal NewArtifact stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "recovery from failure", @@ -3236,8 +3243,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: concreteCommit, - wantEvent: "Normal Succeeded stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "recovery and new artifact", @@ -3252,8 +3264,13 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: concreteCommit, - wantEvent: "Normal NewArtifact stored artifact for commit 'test commit'", + commit: concreteCommit, + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact for commit 'test commit'", + }, }, { name: "no updates", @@ -3281,15 +3298,20 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - commit: partialCommit, // no-op will always result in partial commit. - wantEvent: "Normal Succeeded stored artifact for commit 'sha1:b9b3feadba509cb9b22e968a5d27e96c2bc2ff91'", + commit: partialCommit, // no-op will always result in partial commit. + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact for commit 'sha1:b9b3feadba509cb9b22e968a5d27e96c2bc2ff91'", + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.GitRepository{} newObj := oldObj.DeepCopy() @@ -3310,12 +3332,15 @@ func TestGitRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } @@ -3446,7 +3471,7 @@ func TestGitRepositoryReconciler_fetchIncludes(t *testing.T) { r := &GitRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), patchOptions: getPatchOptions(gitRepositoryReadyCondition.Owned, "sc"), } diff --git a/internal/controller/helmchart_controller.go b/internal/controller/helmchart_controller.go index 9197e11e7..b58280246 100644 --- a/internal/controller/helmchart_controller.go +++ b/internal/controller/helmchart_controller.go @@ -38,9 +38,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -51,12 +49,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/git" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -128,7 +127,7 @@ var helmChartFailConditions = []string{ // HelmChartReconciler reconciles a HelmChart object type HelmChartReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Storage *storage.Storage @@ -348,13 +347,13 @@ func (r *HelmChartReconciler) notify(ctx context.Context, oldObj, newObj *source // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - reasonForBuild(build), "%s", build.Summary()) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + reasonForBuild(build), eventv1.ActionApplied, "%s", build.Summary()) ctrl.LoggerFrom(ctx).Info(build.Summary()) } else { if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - reasonForBuild(build), "%s", build.Summary()) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + reasonForBuild(build), eventv1.ActionApplied, "%s", build.Summary()) ctrl.LoggerFrom(ctx).Info(build.Summary()) } } @@ -388,7 +387,7 @@ func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, sp *patch.Se // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -460,7 +459,7 @@ func (r *HelmChartReconciler) reconcileSource(ctx context.Context, sp *patch.Ser if helmRepo, ok := s.(*sourcev1.HelmRepository); !ok || helmRepo.Spec.Type != sourcev1.HelmRepositoryTypeOCI { conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, "NoSourceArtifact", "no artifact available for %s source '%s'", obj.Spec.SourceRef.Kind, obj.Spec.SourceRef.Name) - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "NoSourceArtifact", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "NoSourceArtifact", "no artifact available for %s source '%s'", obj.Spec.SourceRef.Kind, obj.Spec.SourceRef.Name) return sreconcile.ResultRequeue, nil } @@ -482,7 +481,7 @@ func (r *HelmChartReconciler) reconcileSource(ctx context.Context, sp *patch.Ser // a sudden (partial) disappearance of observed state. // TODO(hidde): include specific name/version information? if depNum := build.ResolvedDependencies; build.Complete() && depNum > 0 { - r.Eventf(obj, eventv1.EventTypeTrace, "ResolvedDependencies", "resolved %d chart dependencies", depNum) + r.Eventf(obj, nil, eventv1.EventTypeTrace, "ResolvedDependencies", eventv1.ActionReconciled, "resolved %d chart dependencies", depNum) } // Handle any build error @@ -623,7 +622,7 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj * // If we succeed in loading the index, cache it. if httpChartRepo.Index != nil { if err = r.Cache.Set(repo.GetArtifact().Path, httpChartRepo.Index, r.TTL); err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) } } }() @@ -752,7 +751,7 @@ func (r *HelmChartReconciler) buildFromTarballArtifact(ctx context.Context, obj defer func() { err := dm.Clear() if err != nil { - r.eventLogf(ctx, obj, corev1.EventTypeWarning, meta.FailedReason, + sreconcile.EventLogf(ctx, r, obj, corev1.EventTypeWarning, meta.FailedReason, "dependency manager cleanup error: %s", err) } }() @@ -845,7 +844,7 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, _ *patch.Se // Return early if the build path equals the current artifact path if curArtifact := obj.GetArtifact(); curArtifact != nil && r.Storage.LocalPath(*curArtifact) == b.Path { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) return sreconcile.ResultSuccess, nil } @@ -894,7 +893,7 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, _ *patch.Se // Update symlink on a "best effort" basis symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, "failed to update status URL symlink: %s", err) } if symURL != "" { @@ -968,7 +967,7 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1. "GarbageCollectionFailed", ) } else if deleted != "" { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected artifacts for deleted resource") } obj.Status.Artifact = nil @@ -983,7 +982,7 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1. ) } if len(delFiles) > 0 { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected %d artifacts", len(delFiles)) return nil } @@ -1202,22 +1201,6 @@ func (r *HelmChartReconciler) requestsForBucketChange(ctx context.Context, o cli return reqs } -// eventLogf records events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *HelmChartReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { - msg := fmt.Sprintf(messageFmt, args...) - // Log and emit event. - if eventType == corev1.EventTypeWarning { - ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) - } else { - ctrl.LoggerFrom(ctx).Info(msg) - } - r.Eventf(obj, eventType, reason, "%s", msg) -} - // observeChartBuild records the observation on the given given build and error on the object. func observeChartBuild(ctx context.Context, sp *patch.SerialPatcher, pOpts []patch.Option, obj *sourcev1.HelmChart, build *chart.Build, err error) { if build.HasMetadata() { diff --git a/internal/controller/helmchart_controller_test.go b/internal/controller/helmchart_controller_test.go index 4101987b2..151d1ca8a 100644 --- a/internal/controller/helmchart_controller_test.go +++ b/internal/controller/helmchart_controller_test.go @@ -50,11 +50,11 @@ import ( "helm.sh/helm/v4/pkg/chart/v2/loader" helmreg "helm.sh/helm/v4/pkg/registry" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" oras "oras.land/oras-go/v2/registry/remote" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -62,11 +62,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/testserver" @@ -112,7 +114,7 @@ func TestHelmChartReconciler_deleteBeforeFinalizer(t *testing.T) { r := &HelmChartReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, } @@ -520,7 +522,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -794,7 +796,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: st, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -1131,7 +1133,7 @@ func TestHelmChartReconciler_buildFromHelmRepository(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -1384,7 +1386,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: st, CosignVerifierFactory: testCosignVerifierFactory, @@ -1627,7 +1629,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { WithScheme(testEnv.Scheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: st, Getters: testGetters, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -1838,7 +1840,7 @@ func TestHelmChartReconciler_reconcileArtifact(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmChart{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -2028,7 +2030,7 @@ func TestHelmChartReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) r := &HelmChartReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), @@ -2298,7 +2300,7 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(helmChartReadyCondition), summarize.WithBiPolarityConditionTypes(sourcev1.SourceVerifiedCondition), @@ -2326,7 +2328,7 @@ func TestHelmChartReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.HelmChart) newObjBeforeFunc func(obj *sourcev1.HelmChart) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -2340,7 +2342,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmChart) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "recovery from failure", @@ -2355,7 +2362,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "recovery and new artifact", @@ -2370,7 +2382,12 @@ func TestHelmChartReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal ChartPackageSucceeded packaged", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "ChartPackageSucceeded", + Action: eventv1.ActionApplied, + Note: "packaged", + }, }, { name: "no updates", @@ -2390,7 +2407,7 @@ func TestHelmChartReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.HelmChart{} newObj := oldObj.DeepCopy() @@ -2416,12 +2433,15 @@ func TestHelmChartReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } @@ -2726,7 +2746,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_authStrategy(t *testing.T) { r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"), } @@ -2885,7 +2905,7 @@ func TestHelmChartRepository_reconcileSource_verifyOCISourceSignature_keyless(t r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -3191,7 +3211,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, @@ -3443,7 +3463,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes r := &HelmChartReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Getters: testGetters, Storage: st, CosignVerifierFactory: testCosignVerifierFactory, diff --git a/internal/controller/helmrepository_controller.go b/internal/controller/helmrepository_controller.go index 0caf5c9e3..f4f179460 100644 --- a/internal/controller/helmrepository_controller.go +++ b/internal/controller/helmrepository_controller.go @@ -29,8 +29,6 @@ import ( helmgetter "helm.sh/helm/v4/pkg/getter" helmreg "helm.sh/helm/v4/pkg/registry" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,12 +37,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -105,7 +104,7 @@ var helmRepositoryFailConditions = []string{ // HelmRepositoryReconciler reconciles a v1.HelmRepository object. type HelmRepositoryReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder helper.Metrics Getters helmgetter.Providers @@ -299,13 +298,13 @@ func (r *HelmRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *s // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } } @@ -340,7 +339,7 @@ func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, sp *pat // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -544,7 +543,7 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pa r.Cache.SetExpiration(artifact.Path, r.TTL) } - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) return sreconcile.ResultSuccess, nil } @@ -597,14 +596,14 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pa // authentication. Using the Artifact.Path is safe as the path is in // the format of: ///. if err := r.Cache.Set(artifact.Path, chartRepo.Index, r.TTL); err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) } } // Update index symlink. indexURL, err := r.Storage.Symlink(*artifact, "index.yaml") if err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, "failed to update status URL symlink: %s", err) } if indexURL != "" { @@ -653,7 +652,7 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour "GarbageCollectionFailed", ) } else if deleted != "" { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected artifacts for deleted resource") } // Clean status sub-resource @@ -672,7 +671,7 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour ) } if len(delFiles) > 0 { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected %d artifacts", len(delFiles)) return nil } @@ -680,22 +679,6 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour return nil } -// eventLogf records events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *HelmRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { - msg := fmt.Sprintf(messageFmt, args...) - // Log and emit event. - if eventType == corev1.EventTypeWarning { - ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) - } else { - ctrl.LoggerFrom(ctx).Info(msg) - } - r.Eventf(obj, eventType, reason, "%s", msg) -} - // migrateToStatic is HelmRepository OCI migration to static object. func (r *HelmRepositoryReconciler) migrationToStatic(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.HelmRepository) (result ctrl.Result, err error) { // Skip migration if suspended and not being deleted. diff --git a/internal/controller/helmrepository_controller_test.go b/internal/controller/helmrepository_controller_test.go index f76d4f221..12f75edb1 100644 --- a/internal/controller/helmrepository_controller_test.go +++ b/internal/controller/helmrepository_controller_test.go @@ -33,21 +33,23 @@ import ( helmgetter "helm.sh/helm/v4/pkg/getter" repo "helm.sh/helm/v4/pkg/repo/v1" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/helmtestserver" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/secrets" @@ -86,7 +88,7 @@ func TestHelmRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &HelmRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, } // NOTE: Only a real API server responds with an error in this scenario. @@ -354,7 +356,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(helmRepositoryReadyCondition.Owned, "sc"), } @@ -1028,7 +1030,7 @@ func TestHelmRepositoryReconciler_reconcileSource(t *testing.T) { } r := &HelmRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Client: clientBuilder.Build(), Storage: testStorage, Getters: testGetters, @@ -1171,7 +1173,7 @@ func TestHelmRepositoryReconciler_reconcileArtifact(t *testing.T) { WithScheme(testEnv.GetScheme()). WithStatusSubresource(&sourcev1.HelmRepository{}). Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, Cache: tt.cache, TTL: 1 * time.Minute, @@ -1443,7 +1445,7 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) { } ctx := context.TODO() - summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher) + summarizeHelper := summarize.NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summarizeOpts := []summarize.Option{ summarize.WithConditions(helmRepositoryReadyCondition), summarize.WithReconcileResult(sreconcile.ResultSuccess), @@ -1469,7 +1471,7 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { resErr error oldObjBeforeFunc func(obj *sourcev1.HelmRepository) newObjBeforeFunc func(obj *sourcev1.HelmRepository) - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -1483,7 +1485,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: nil} }, - wantEvent: "Normal NewArtifact stored fetched index of unknown size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of unknown size", + }, }, { name: "new artifact", @@ -1492,7 +1499,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: &aSize} }, - wantEvent: "Normal NewArtifact stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of size", + }, }, { name: "recovery from failure", @@ -1507,7 +1519,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy", Size: &aSize} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored fetched index of size", + }, }, { name: "recovery and new artifact", @@ -1522,7 +1539,12 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb", Size: &aSize} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored fetched index of size", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored fetched index of size", + }, }, { name: "no updates", @@ -1542,7 +1564,7 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.HelmRepository{} newObj := oldObj.DeepCopy() @@ -1565,12 +1587,15 @@ func TestHelmRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/ocirepository_controller.go b/internal/controller/ocirepository_controller.go index ec941a1fd..94f1b78be 100644 --- a/internal/controller/ocirepository_controller.go +++ b/internal/controller/ocirepository_controller.go @@ -20,7 +20,6 @@ import ( "context" cryptotls "crypto/tls" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -42,14 +41,12 @@ import ( "github.com/sigstore/cosign/v3/pkg/cosign" "helm.sh/helm/v4/pkg/registry" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/reconcile" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" @@ -57,6 +54,7 @@ import ( "github.com/fluxcd/pkg/oci" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -139,7 +137,7 @@ type ociRepositoryReconcileFunc func(ctx context.Context, sp *patch.SerialPatche type OCIRepositoryReconciler struct { client.Client helper.Metrics - kuberecorder.EventRecorder + events.EventRecorder Storage *storage.Storage ControllerName string @@ -982,7 +980,7 @@ func (r *OCIRepositoryReconciler) keychain(ctx context.Context, obj *sourcev1.OC secretRef := types.NamespacedName{Namespace: obj.Namespace, Name: obj.Spec.SecretRef.Name} err := r.Get(ctx, secretRef, &imagePullSecret) if err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.AuthenticationFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.AuthenticationFailedReason, "auth secret '%s' not found", obj.Spec.SecretRef.Name) return nil, fmt.Errorf("failed to get secret '%s': %w", secretRef, err) } @@ -1085,7 +1083,7 @@ func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, sp *patc // matches the actual artifact if !artifactMissing { if err := r.Storage.VerifyArtifact(*artifact); err != nil { - r.Eventf(obj, corev1.EventTypeWarning, "ArtifactVerificationFailed", "failed to verify integrity of artifact: %s", err.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, "ArtifactVerificationFailed", eventv1.ActionFailed, "failed to verify integrity of artifact: %s", err.Error()) if err = r.Storage.Remove(*artifact); err != nil { return sreconcile.ResultEmpty, fmt.Errorf("failed to remove artifact after digest mismatch: %w", err) @@ -1149,7 +1147,7 @@ func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pat // The artifact is up-to-date if obj.GetArtifact().HasRevision(artifact.Revision) && !ociContentConfigChanged(obj) { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) return sreconcile.ResultSuccess, nil } @@ -1232,7 +1230,7 @@ func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, sp *pat // Update symlink on a "best effort" basis url, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, "failed to update status URL symlink: %s", err) } if url != "" { @@ -1276,7 +1274,7 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc "GarbageCollectionFailed", ) } else if deleted != "" { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected artifacts for deleted resource") } obj.Status.Artifact = nil @@ -1291,7 +1289,7 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc ) } if len(delFiles) > 0 { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", + sreconcile.EventLogf(ctx, r, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded", "garbage collected %d artifacts", len(delFiles)) return nil } @@ -1299,22 +1297,6 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc return nil } -// eventLogf records events, and logs at the same time. -// -// This log is different from the debug log in the EventRecorder, in the sense -// that this is a simple log. While the debug log contains complete details -// about the event. -func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { - msg := fmt.Sprintf(messageFmt, args...) - // Log and emit event. - if eventType == corev1.EventTypeWarning { - ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) - } else { - ctrl.LoggerFrom(ctx).Info(msg) - } - r.Eventf(obj, eventType, reason, "%s", msg) -} - // notify emits notification related to the reconciliation. func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, res sreconcile.Result, resErr error) { // Notify successful reconciliation for new artifact and recovery from any @@ -1343,13 +1325,13 @@ func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *so // Notify on new artifact and failure recovery. if !oldObj.GetArtifact().HasDigest(newObj.GetArtifact().Digest) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + "NewArtifact", eventv1.ActionApplied, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } else { if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, "%s", message) + r.AnnotatedEventf(newObj, nil, annotations, corev1.EventTypeNormal, + meta.SucceededReason, eventv1.ActionReconciled, "%s", message) ctrl.LoggerFrom(ctx).Info(message) } } diff --git a/internal/controller/ocirepository_controller_test.go b/internal/controller/ocirepository_controller_test.go index 3b18e1fe1..6d5f10ebd 100644 --- a/internal/controller/ocirepository_controller_test.go +++ b/internal/controller/ocirepository_controller_test.go @@ -49,9 +49,9 @@ import ( "github.com/sigstore/cosign/v3/cmd/cosign/cli/sign" "github.com/sigstore/cosign/v3/pkg/cosign" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" oras "oras.land/oras-go/v2/registry/remote" ctrl "sigs.k8s.io/controller-runtime" @@ -60,6 +60,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" intdigest "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" @@ -68,6 +69,7 @@ import ( "github.com/fluxcd/pkg/oci" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/tar" @@ -110,7 +112,7 @@ func TestOCIRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { r := &OCIRepositoryReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, } @@ -806,7 +808,7 @@ func TestOCIRepository_reconcileSource_authStrategy(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1266,7 +1268,7 @@ func TestOCIRepository_reconcileSource_remoteReference(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1469,7 +1471,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignatureNotation(t *testi r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -1833,7 +1835,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceTrustPolicyNotation(t *tes r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2130,7 +2132,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignatureCosign(t *testing r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2400,7 +2402,7 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignature_keyless(t *testi r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -2586,7 +2588,7 @@ func TestOCIRepository_reconcileSource_noop(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -2818,7 +2820,7 @@ func TestOCIRepository_reconcileArtifact(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -2983,7 +2985,7 @@ func TestOCIRepository_getArtifactRef(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -3317,7 +3319,7 @@ func TestOCIRepository_reconcileStorage(t *testing.T) { r := &OCIRepositoryReconciler{ Client: clientBuilder.Build(), - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), } @@ -3380,7 +3382,7 @@ func TestOCIRepository_ReconcileDelete(t *testing.T) { g := NewWithT(t) r := &OCIRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Storage: testStorage, CosignVerifierFactory: testCosignVerifierFactory, patchOptions: getPatchOptions(ociRepositoryReadyCondition.Owned, "sc"), @@ -3419,7 +3421,7 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { oldObjBeforeFunc func(obj *sourcev1.OCIRepository) newObjBeforeFunc func(obj *sourcev1.OCIRepository) commit git.Commit - wantEvent string + wantEvent *eventsv1.Event }{ { name: "error - no event", @@ -3441,7 +3443,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { }, } }, - wantEvent: "Normal NewArtifact stored artifact with revision 'xxx' from 'oci://newurl.io', origin source 'https://github.com/stefanprodan/podinfo', origin revision '6.1.8/b3b00fe35424a45d373bf4c7214178bc36fd7872'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with revision 'xxx' from 'oci://newurl.io', origin source 'https://github.com/stefanprodan/podinfo', origin revision '6.1.8/b3b00fe35424a45d373bf4c7214178bc36fd7872'", + }, }, { name: "recovery from failure", @@ -3457,7 +3464,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "xxx", Digest: "yyy"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal Succeeded stored artifact with revision 'xxx' from 'oci://newurl.io'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "Succeeded", + Action: eventv1.ActionReconciled, + Note: "stored artifact with revision 'xxx' from 'oci://newurl.io'", + }, }, { name: "recovery and new artifact", @@ -3473,7 +3485,12 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { obj.Status.Artifact = &meta.Artifact{Revision: "aaa", Digest: "bbb"} conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") }, - wantEvent: "Normal NewArtifact stored artifact with revision 'aaa' from 'oci://newurl.io'", + wantEvent: &eventsv1.Event{ + Type: "Normal", + Reason: "NewArtifact", + Action: eventv1.ActionApplied, + Note: "stored artifact with revision 'aaa' from 'oci://newurl.io'", + }, }, { name: "no updates", @@ -3502,7 +3519,7 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - recorder := record.NewFakeRecorder(32) + recorder := events.NewFakeRecorder(32, false) oldObj := &sourcev1.OCIRepository{} newObj := oldObj.DeepCopy() @@ -3522,12 +3539,15 @@ func TestOCIRepositoryReconciler_notify(t *testing.T) { select { case x, ok := <-recorder.Events: - g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") - if tt.wantEvent != "" { - g.Expect(x).To(ContainSubstring(tt.wantEvent)) + g.Expect(ok).To(Equal(tt.wantEvent != nil), "unexpected event received") + if tt.wantEvent != nil { + g.Expect(x.Type).To(Equal(tt.wantEvent.Type)) + g.Expect(x.Reason).To(Equal(tt.wantEvent.Reason)) + g.Expect(x.Action).To(Equal(tt.wantEvent.Action)) + g.Expect(x.Note).To(ContainSubstring(tt.wantEvent.Note)) } default: - if tt.wantEvent != "" { + if tt.wantEvent != nil { t.Errorf("expected some event to be emitted") } } diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 53da2f74e..d2ad3a0c5 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -43,7 +43,6 @@ import ( helmreg "helm.sh/helm/v4/pkg/registry" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" @@ -52,6 +51,7 @@ import ( "github.com/fluxcd/pkg/artifact/digest" "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/testenv" "github.com/fluxcd/pkg/testserver" @@ -322,7 +322,7 @@ func TestMain(m *testing.M) { if err := (&GitRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, GitRepositoryReconcilerOptions{ @@ -333,7 +333,7 @@ func TestMain(m *testing.M) { if err := (&BucketReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, BucketReconcilerOptions{ @@ -347,7 +347,7 @@ func TestMain(m *testing.M) { if err := (&OCIRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Storage: testStorage, }).SetupWithManager(testEnv, OCIRepositoryReconcilerOptions{ @@ -358,7 +358,7 @@ func TestMain(m *testing.M) { if err := (&HelmRepositoryReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, @@ -373,7 +373,7 @@ func TestMain(m *testing.M) { if err := (&HelmChartReconciler{ Client: testEnv, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32, false), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, diff --git a/internal/reconcile/event.go b/internal/reconcile/event.go new file mode 100644 index 000000000..1dbec884d --- /dev/null +++ b/internal/reconcile/event.go @@ -0,0 +1,47 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconcile + +import ( + "context" + "errors" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1" + "github.com/fluxcd/pkg/runtime/events" +) + +// EventLogf records an event and logs the message at the same time. +// +// This log is different from the debug log in the EventRecorder, in the sense +// that this is a simple log. While the debug log contains complete details +// about the event. +func EventLogf(ctx context.Context, rec events.EventRecorder, obj runtime.Object, eventType, reason, messageFmt string, args ...interface{}) { + msg := fmt.Sprintf(messageFmt, args...) + action := eventv1.ActionReconciled + if eventType == corev1.EventTypeWarning { + action = eventv1.ActionFailed + ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) + } else { + ctrl.LoggerFrom(ctx).Info(msg) + } + rec.Eventf(obj, nil, eventType, reason, action, "%s", msg) +} diff --git a/internal/reconcile/summarize/processor.go b/internal/reconcile/summarize/processor.go index fb2e655c2..06d0c05f9 100644 --- a/internal/reconcile/summarize/processor.go +++ b/internal/reconcile/summarize/processor.go @@ -20,12 +20,12 @@ import ( "context" corev1 "k8s.io/api/core/v1" - kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/events" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/object" "github.com/fluxcd/source-controller/internal/reconcile" @@ -34,12 +34,12 @@ import ( // ResultProcessor processes the results of reconciliation (the object, result // and error). Any errors during processing need not result in the // reconciliation failure. The errors can be recorded as logs and events. -type ResultProcessor func(context.Context, kuberecorder.EventRecorder, client.Object, reconcile.Result, error) +type ResultProcessor func(context.Context, events.EventRecorder, client.Object, reconcile.Result, error) // RecordReconcileReq is a ResultProcessor that checks the reconcile // annotation value and sets it in the object status as // status.lastHandledReconcileAt. -func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, _ error) { +func RecordReconcileReq(ctx context.Context, recorder events.EventRecorder, obj client.Object, _ reconcile.Result, _ error) { if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok { object.SetStatusLastHandledReconcileAt(obj, v) } @@ -49,7 +49,7 @@ func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder // configured in the given error. Logging and event recording are the handled // actions at present. As more configurations are added to serror.Config, more // action handlers can be added here. -func ErrorActionHandler(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, err error) { +func ErrorActionHandler(ctx context.Context, recorder events.EventRecorder, obj client.Object, _ reconcile.Result, err error) { switch e := err.(type) { case *serror.Generic: if e.Log { @@ -80,7 +80,7 @@ func logError(ctx context.Context, eventType string, err error, msg string, keys } // recordEvent records events based on the passed error configurations. -func recordEvent(recorder kuberecorder.EventRecorder, obj client.Object, eventType string, notification bool, err error, reason string) { +func recordEvent(recorder events.EventRecorder, obj client.Object, eventType string, notification bool, err error, reason string) { if eventType == serror.EventTypeNone { return } @@ -88,16 +88,16 @@ func recordEvent(recorder kuberecorder.EventRecorder, obj client.Object, eventTy case corev1.EventTypeNormal: if notification { // K8s native event and notification-controller event. - recorder.Eventf(obj, corev1.EventTypeNormal, reason, "%s", err.Error()) + recorder.Eventf(obj, nil, corev1.EventTypeNormal, reason, eventv1.ActionApplied, "%s", err.Error()) } else { // K8s native event only. - recorder.Eventf(obj, eventv1.EventTypeTrace, reason, "%s", err.Error()) + recorder.Eventf(obj, nil, eventv1.EventTypeTrace, reason, eventv1.ActionApplied, "%s", err.Error()) } case corev1.EventTypeWarning: // TODO: Due to the current implementation of the event recorder, all // the K8s warning events are also sent as notification controller // notifications. Once the recorder becomes capable of separating the // two, conditionally record events. - recorder.Eventf(obj, corev1.EventTypeWarning, reason, "%s", err.Error()) + recorder.Eventf(obj, nil, corev1.EventTypeWarning, reason, eventv1.ActionFailed, "%s", err.Error()) } } diff --git a/internal/reconcile/summarize/processor_test.go b/internal/reconcile/summarize/processor_test.go index 44f68b5bf..d7af0a00f 100644 --- a/internal/reconcile/summarize/processor_test.go +++ b/internal/reconcile/summarize/processor_test.go @@ -22,10 +22,10 @@ import ( . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/events" sourcev1 "github.com/fluxcd/source-controller/api/v1" "github.com/fluxcd/source-controller/internal/object" @@ -119,7 +119,7 @@ func TestRecordReconcileReq(t *testing.T) { } ctx := context.TODO() - RecordReconcileReq(ctx, record.NewFakeRecorder(32), obj, reconcile.ResultEmpty, nil) + RecordReconcileReq(ctx, events.NewFakeRecorder(32, false), obj, reconcile.ResultEmpty, nil) if tt.afterFunc != nil { tt.afterFunc(g, obj) diff --git a/internal/reconcile/summarize/summary.go b/internal/reconcile/summarize/summary.go index 8650a0907..7a7ea8762 100644 --- a/internal/reconcile/summarize/summary.go +++ b/internal/reconcile/summarize/summary.go @@ -22,11 +22,11 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" kerrors "k8s.io/apimachinery/pkg/util/errors" - kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/source-controller/internal/reconcile" @@ -50,12 +50,12 @@ type Conditions struct { // Helper is SummarizeAndPatch helper. type Helper struct { - recorder kuberecorder.EventRecorder + recorder events.EventRecorder serialPatcher *patch.SerialPatcher } // NewHelper returns an initialized Helper. -func NewHelper(recorder kuberecorder.EventRecorder, serialPatcher *patch.SerialPatcher) *Helper { +func NewHelper(recorder events.EventRecorder, serialPatcher *patch.SerialPatcher) *Helper { return &Helper{ recorder: recorder, serialPatcher: serialPatcher, diff --git a/internal/reconcile/summarize/summary_test.go b/internal/reconcile/summarize/summary_test.go index c4c16e4eb..c9acbca38 100644 --- a/internal/reconcile/summarize/summary_test.go +++ b/internal/reconcile/summarize/summary_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -34,6 +33,7 @@ import ( "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" sourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -351,7 +351,7 @@ func TestSummarizeAndPatch(t *testing.T) { serialPatcher := patch.NewSerialPatcher(obj, c) - summaryHelper := NewHelper(record.NewFakeRecorder(32), serialPatcher) + summaryHelper := NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summaryOpts := []Option{ WithReconcileResult(tt.result), WithReconcileError(tt.reconcileErr), @@ -479,7 +479,7 @@ func TestSummarizeAndPatch_Intermediate(t *testing.T) { g.Expect(c.Create(ctx, obj)).To(Succeed()) serialPatcher := patch.NewSerialPatcher(obj, c) - summaryHelper := NewHelper(record.NewFakeRecorder(32), serialPatcher) + summaryHelper := NewHelper(events.NewFakeRecorder(32, false), serialPatcher) summaryOpts := []Option{ WithConditions(tt.conditions...), WithResultBuilder(reconcile.AlwaysRequeueResultBuilder{RequeueAfter: interval}), diff --git a/main.go b/main.go index 75d897bd8..271ae2de6 100644 --- a/main.go +++ b/main.go @@ -28,7 +28,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" @@ -195,7 +194,12 @@ func main() { metrics := helper.NewMetrics(mgr, metrics.MustMakeRecorder(), sourcev1.SourceFinalizer) cacheRecorder := cache.MustMakeMetrics() - eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName) + + eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) + if err != nil { + setupLog.Error(err, "unable to create event recorder") + os.Exit(1) + } algo, err := artdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo) if err != nil { @@ -328,15 +332,6 @@ func main() { } } -func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder { - eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) - if err != nil { - setupLog.Error(err, "unable to create event recorder") - os.Exit(1) - } - return eventRecorder -} - func mustSetupManager(metricsAddr, healthAddr string, maxConcurrent int, watchOpts helper.WatchOptions, clientOpts client.Options, leaderOpts leaderelection.Options) ctrl.Manager {