@@ -2,26 +2,38 @@ package action
22
33import (
44 "context"
5+ "encoding/json"
56 "fmt"
7+ "strings"
68 "time"
79
810 "github.com/operator-framework/api/pkg/operators/v1alpha1"
911 "github.com/operator-framework/operator-registry/pkg/image"
1012 "github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
1113 "github.com/spf13/pflag"
14+ corev1 "k8s.io/api/core/v1"
15+ apierrors "k8s.io/apimachinery/pkg/api/errors"
16+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1217 "k8s.io/apimachinery/pkg/types"
18+ "k8s.io/apimachinery/pkg/util/rand"
1319 "k8s.io/apimachinery/pkg/util/wait"
20+ "k8s.io/client-go/util/retry"
1421 "sigs.k8s.io/controller-runtime/pkg/client"
22+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1523
1624 "github.com/joelanford/kubectl-operator/internal/pkg/catalog"
1725 "github.com/joelanford/kubectl-operator/internal/pkg/log"
1826)
1927
28+ const grpcPort = "50051"
29+
2030type CatalogAdd struct {
2131 config * Configuration
2232
2333 CatalogSourceName string
2434 IndexImage string
35+ InjectBundles []string
36+ InjectBundleMode string
2537 DisplayName string
2638 Publisher string
2739 AddTimeout time.Duration
@@ -43,6 +55,10 @@ func (a *CatalogAdd) BindFlags(fs *pflag.FlagSet) {
4355 fs .StringVarP (& a .Publisher , "publisher" , "p" , "" , "publisher of the index" )
4456 fs .DurationVarP (& a .AddTimeout , "timeout" , "t" , time .Minute , "the amount of time to wait before cancelling the catalog addition" )
4557 fs .DurationVar (& a .CleanupTimeout , "cleanup-timeout" , time .Minute , "the amount to time to wait before cancelling cleanup" )
58+
59+ fs .StringArrayVarP (& a .InjectBundles , "inject-bundles" , "b" , nil , "inject extra bundles into the index at runtime" )
60+ fs .StringVarP (& a .InjectBundleMode , "inject-bundle-mode" , "m" , "" , "mode to use to inject bundles" )
61+ _ = fs .MarkHidden ("inject-bundle-mode" )
4662}
4763
4864func (a * CatalogAdd ) Run (ctx context.Context ) (* v1alpha1.CatalogSource , error ) {
@@ -70,13 +86,37 @@ func (a *CatalogAdd) Run(ctx context.Context) (*v1alpha1.CatalogSource, error) {
7086
7187 a .setDefaults (labels )
7288
89+ var registryPod * corev1.Pod
90+ if len (a .InjectBundles ) > 0 {
91+ if registryPod , err = a .createRegistryPod (ctx ); err != nil {
92+ return nil , err
93+ }
94+ }
95+
7396 opts := []catalog.Option {
74- catalog .Image (a .IndexImage ),
7597 catalog .DisplayName (a .DisplayName ),
7698 catalog .Publisher (a .Publisher ),
7799 }
100+
101+ if registryPod == nil {
102+ opts = append (opts , catalog .Image (a .IndexImage ))
103+ } else {
104+ address := fmt .Sprintf ("%s:%s" , registryPod .Status .PodIP , grpcPort )
105+ injectedBundlesJSON , err := json .Marshal (a .InjectBundles )
106+ if err != nil {
107+ return nil , fmt .Errorf ("json marshal injected bundles: %v" , err )
108+ }
109+ annotations := map [string ]string {
110+ "operators.operatorframework.io/injected-bundles" : string (injectedBundlesJSON ),
111+ }
112+ opts = append (opts ,
113+ catalog .Address (address ),
114+ catalog .Annotations (annotations ),
115+ )
116+ }
117+
78118 cs := catalog .Build (csKey , opts ... )
79- if err := a .add (ctx , cs ); err != nil {
119+ if err := a .add (ctx , cs , registryPod ); err != nil {
80120 defer a .cleanup (cs )
81121 return nil , err
82122 }
@@ -106,13 +146,85 @@ func (a *CatalogAdd) setDefaults(labels map[string]string) {
106146 a .Publisher = v
107147 }
108148 }
149+ if a .InjectBundleMode == "" {
150+ if strings .HasPrefix (a .IndexImage , "quay.io/operator-framework/upstream-opm-builder" ) {
151+ a .InjectBundleMode = "semver"
152+ } else {
153+ a .InjectBundleMode = "replaces"
154+ }
155+ }
156+ }
157+
158+ func (a * CatalogAdd ) createRegistryPod (ctx context.Context ) (* corev1.Pod , error ) {
159+ command := []string {
160+ "/bin/sh" ,
161+ "-c" ,
162+ fmt .Sprintf (`mkdir -p /database && \
163+ /bin/opm registry add -d /database/index.db --mode=%s -b %s && \
164+ /bin/opm registry serve -d /database/index.db -p %s` , a .InjectBundleMode , strings .Join (a .InjectBundles , "," ), grpcPort ),
165+ }
166+
167+ pod := & corev1.Pod {
168+ ObjectMeta : metav1.ObjectMeta {
169+ Name : fmt .Sprintf ("%s-%s" , a .CatalogSourceName , rand .String (4 )),
170+ Namespace : a .config .Namespace ,
171+ },
172+ Spec : corev1.PodSpec {
173+ Containers : []corev1.Container {
174+ {
175+ Name : "registry" ,
176+ Image : a .IndexImage ,
177+ Command : command ,
178+ },
179+ },
180+ },
181+ }
182+ if err := a .config .Client .Create (ctx , pod ); err != nil {
183+ return nil , err
184+ }
185+
186+ if err := wait .PollImmediateUntil (time .Millisecond * 250 , func () (bool , error ) {
187+ podKey , err := client .ObjectKeyFromObject (pod )
188+ if err != nil {
189+ return false , fmt .Errorf ("get pod key: %v" , err )
190+ }
191+ if err := a .config .Client .Get (ctx , podKey , pod ); err != nil {
192+ return false , err
193+ }
194+ if pod .Status .Phase == corev1 .PodRunning && pod .Status .PodIP != "" {
195+ return true , nil
196+ }
197+ return false , nil
198+ }, ctx .Done ()); err != nil {
199+ return nil , fmt .Errorf ("registry pod not ready: %v" , err )
200+ }
201+ return pod , nil
109202}
110203
111- func (a * CatalogAdd ) add (ctx context.Context , cs * v1alpha1.CatalogSource ) error {
204+ func (a * CatalogAdd ) add (ctx context.Context , cs * v1alpha1.CatalogSource , pod * corev1. Pod ) error {
112205 if err := a .config .Client .Create (ctx , cs ); err != nil {
113206 return fmt .Errorf ("create catalogsource: %v" , err )
114207 }
115208
209+ if pod != nil {
210+ retry .RetryOnConflict (retry .DefaultBackoff , func () error {
211+ podKey , err := client .ObjectKeyFromObject (pod )
212+ if err != nil {
213+ return fmt .Errorf ("get pod key: %v" , err )
214+ }
215+ if err := a .config .Client .Get (ctx , podKey , pod ); err != nil {
216+ return fmt .Errorf ("get registry pod: %v" , err )
217+ }
218+ if err := controllerutil .SetOwnerReference (cs , pod , a .config .Scheme ); err != nil {
219+ return fmt .Errorf ("set registry pod owner reference: %v" , err )
220+ }
221+ if err := a .config .Client .Update (ctx , pod ); err != nil {
222+ return fmt .Errorf ("update registry pod owner reference: %w" , err )
223+ }
224+ return nil
225+ })
226+ }
227+
116228 csKey , err := client .ObjectKeyFromObject (cs )
117229 if err != nil {
118230 return fmt .Errorf ("get catalogsource key: %v" , err )
@@ -136,7 +248,7 @@ func (a *CatalogAdd) add(ctx context.Context, cs *v1alpha1.CatalogSource) error
136248func (a * CatalogAdd ) cleanup (cs * v1alpha1.CatalogSource ) {
137249 ctx , cancel := context .WithTimeout (context .Background (), a .CleanupTimeout )
138250 defer cancel ()
139- if err := a .config .Client .Delete (ctx , cs ); err != nil {
251+ if err := a .config .Client .Delete (ctx , cs ); err != nil && ! apierrors . IsNotFound ( err ) {
140252 log .Printf ("delete catalogsource %q: %v" , cs .Name , err )
141253 }
142254}
0 commit comments