Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 133 additions & 49 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ type Driver struct {

// if you add fields, remember to update docs:
// https://github.com/docker/docs/blob/main/content/build/drivers/kubernetes.md
minReplicas int
deployment *appsv1.Deployment
configMaps []*corev1.ConfigMap
deploymentClient kubeclient.DeploymentClient
podClient kubeclient.PodClient
configMapClient kubeclient.ConfigMapClient
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
minReplicas int
deployment *appsv1.Deployment
statefulSet *appsv1.StatefulSet
configMaps []*corev1.ConfigMap
deploymentClient kubeclient.DeploymentClient
statefulSetClient kubeclient.StatefulSetClient
podClient kubeclient.PodClient
configMapClient kubeclient.ConfigMapClient
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
}

func (d *Driver) IsMobyDriver() bool {
Expand All @@ -65,31 +67,18 @@ func (d *Driver) Config() driver.InitConfig {

func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
return progress.Wrap("[internal] booting buildkit", l, func(sub progress.SubLogger) error {
_, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error for bootstrap %q", d.deployment.Name)
}

for _, cfg := range d.configMaps {
// create ConfigMap first if exists
_, err = d.configMapClient.Create(ctx, cfg, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error while calling configMapClient.Create for %q", cfg.Name)
}
_, err = d.configMapClient.Update(ctx, cfg, metav1.UpdateOptions{})
if err != nil {
return errors.Wrapf(err, "error while calling configMapClient.Update for %q", cfg.Name)
}
}
if d.deployment != nil {
if err := bootstrap(ctx, d, d.deploymentClient, d.deployment.Name, d.deployment); err != nil {
return err
}
}

_, err = d.deploymentClient.Create(ctx, d.deployment, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "error while calling deploymentClient.Create for %q", d.deployment.Name)
if d.statefulSet != nil {
if err := bootstrap(ctx, d, d.statefulSetClient, d.statefulSet.Name, d.statefulSet); err != nil {
return err
}
}

return sub.Wrap(
fmt.Sprintf("waiting for %d pods to be ready, timeout: %s", d.minReplicas, units.HumanDuration(d.timeout)),
func() error {
Expand All @@ -98,11 +87,76 @@ func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
})
}

type appClient[S any] interface {
Get(ctx context.Context, name string, opts metav1.GetOptions) (*S, error)
Create(ctx context.Context, spec *S, opts metav1.CreateOptions) (*S, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}

func bootstrap[S any](ctx context.Context, d *Driver, client appClient[S], name string, spec *S) error {
if _, err := client.Get(ctx, name, metav1.GetOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error for bootstrap %q", name)
}

for _, cfg := range d.configMaps {
// create ConfigMap first if exists
if _, err = d.configMapClient.Create(ctx, cfg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error while calling configMapClient.Create for %q", cfg.Name)
}

if _, err = d.configMapClient.Update(ctx, cfg, metav1.UpdateOptions{}); err != nil {
return errors.Wrapf(err, "error while calling configMapClient.Update for %q", cfg.Name)
}
}
}

if _, err = client.Create(ctx, spec, metav1.CreateOptions{}); err != nil {
return errors.Wrapf(err, "error while calling Create for %q", d.deployment.Name)
}
}
return nil
}

func (d *Driver) wait(ctx context.Context) error {
if d.deployment != nil {
if err := d.waitDeployments(ctx); err != nil {
return err
}
}

if d.statefulSet != nil {
if err := d.waitStatefulSets(ctx); err != nil {
return err
}
}
return nil
}

func (d *Driver) waitDeployments(ctx context.Context) error {
return wait(ctx, d, d.deploymentClient, d.deployment.Name, func(s *appsv1.Deployment) error {
if s.Status.ReadyReplicas < int32(d.minReplicas) {
return errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, s.Status.ReadyReplicas)
}
return nil
})
}

func (d *Driver) waitStatefulSets(ctx context.Context) error {
return wait(ctx, d, d.statefulSetClient, d.statefulSet.Name, func(s *appsv1.StatefulSet) error {
if s.Status.ReadyReplicas < int32(d.minReplicas) {
return errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, s.Status.ReadyReplicas)
}
return nil
})
}

func wait[S any](ctx context.Context, d *Driver, client appClient[S], name string, check func(*S) error) error {
// TODO: use watch API
var (
err error
depl *appsv1.Deployment
spec *S
)

timeoutChan := time.After(d.timeout)
Expand All @@ -116,31 +170,50 @@ func (d *Driver) wait(ctx context.Context) error {
case <-timeoutChan:
return err
case <-ticker.C:
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
spec, err = client.Get(ctx, name, metav1.GetOptions{})
if err == nil {
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
if err = check(spec); err == nil {
return nil
}
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, depl.Status.ReadyReplicas)
}
}
}
}

func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
depl, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
if err != nil {
// TODO: return err if err != ErrNotFound
return &driver.Info{
Status: driver.Inactive,
}, nil
func (d *Driver) Info(ctx context.Context) (_ *driver.Info, err error) {
var depl *appsv1.Deployment
if d.deployment != nil {
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
if err != nil {
// TODO: return err if err != ErrNotFound
return &driver.Info{
Status: driver.Inactive,
}, nil
}
if depl.Status.ReadyReplicas <= 0 {
return &driver.Info{
Status: driver.Stopped,
}, nil
}
}
if depl.Status.ReadyReplicas <= 0 {
return &driver.Info{
Status: driver.Stopped,
}, nil

var stat *appsv1.StatefulSet
if d.statefulSet != nil {
stat, err = d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err != nil {
// TODO: return err if err != ErrNotFound
return &driver.Info{
Status: driver.Inactive,
}, nil
}
if depl.Status.ReadyReplicas <= 0 {
return &driver.Info{
Status: driver.Stopped,
}, nil
}
}
pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl)

pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl, stat)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -182,11 +255,22 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
if d.deployment != nil {
if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
}
}
}

if d.statefulSet != nil {
if err := d.statefulSetClient.Delete(ctx, d.statefulSet.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error while calling statefulSetClient.Delete for %q", d.statefulSet.Name)
}
}
}

for _, cfg := range d.configMaps {
if err := d.configMapClient.Delete(ctx, cfg.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
Expand Down
17 changes: 11 additions & 6 deletions driver/kubernetes/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
d.defaultLoad = defaultLoad
d.timeout = timeout

d.deployment, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
d.deployment, d.statefulSet, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
if err != nil {
return nil, err
}
Expand All @@ -143,20 +143,23 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
return nil, err
}
d.deploymentClient = clients.Deployments
d.statefulSetClient = clients.StatefulSets
d.podClient = clients.Pods
d.configMapClient = clients.ConfigMaps

switch loadbalance {
case LoadbalanceSticky:
d.podChooser = &podchooser.StickyPodChooser{
Key: cfg.ContextPathHash,
PodClient: d.podClient,
Deployment: d.deployment,
Key: cfg.ContextPathHash,
PodClient: d.podClient,
Deployment: d.deployment,
StatefulSet: d.statefulSet,
}
case LoadbalanceRandom:
d.podChooser = &podchooser.RandomPodChooser{
PodClient: d.podClient,
Deployment: d.deployment,
PodClient: d.podClient,
Deployment: d.deployment,
StatefulSet: d.statefulSet,
}
}
return d, nil
Expand Down Expand Up @@ -199,6 +202,8 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
deploymentOpt.RequestsMemory = v
case k == "requests.ephemeral-storage":
deploymentOpt.RequestsEphemeralStorage = v
case k == "persistent-volume-claim.requests.storage":
deploymentOpt.RequestsPersistentStorage = v
case k == "limits.cpu":
deploymentOpt.LimitsCPU = v
case k == "limits.memory":
Expand Down
62 changes: 56 additions & 6 deletions driver/kubernetes/kubeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ type DeploymentClient interface {
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}

type StatefulSetClient interface {
Get(ctx context.Context, name string, opts metav1.GetOptions) (*appsv1.StatefulSet, error)
Create(ctx context.Context, deployment *appsv1.StatefulSet, opts metav1.CreateOptions) (*appsv1.StatefulSet, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}

type ConfigMapClient interface {
Create(ctx context.Context, configMap *corev1.ConfigMap, opts metav1.CreateOptions) (*corev1.ConfigMap, error)
Update(ctx context.Context, configMap *corev1.ConfigMap, opts metav1.UpdateOptions) (*corev1.ConfigMap, error)
Expand All @@ -29,9 +35,10 @@ type PodClient interface {
}

type Clients struct {
Deployments DeploymentClient
ConfigMaps ConfigMapClient
Pods PodClient
Deployments DeploymentClient
StatefulSets StatefulSetClient
ConfigMaps ConfigMapClient
Pods PodClient
}

func New(config *rest.Config, namespace string) (*Clients, error) {
Expand All @@ -51,9 +58,10 @@ func New(config *rest.Config, namespace string) (*Clients, error) {
}

return &Clients{
Deployments: &deploymentClient{client: appsClient, namespace: namespace},
ConfigMaps: &configMapClient{client: coreClient, namespace: namespace},
Pods: &podClient{client: coreClient, namespace: namespace},
Deployments: &deploymentClient{client: appsClient, namespace: namespace},
StatefulSets: &statefulSetClient{client: appsClient, namespace: namespace},
ConfigMaps: &configMapClient{client: coreClient, namespace: namespace},
Pods: &podClient{client: coreClient, namespace: namespace},
}, nil
}

Expand Down Expand Up @@ -110,6 +118,48 @@ func (c *deploymentClient) Delete(ctx context.Context, name string, opts metav1.
Error()
}

type statefulSetClient struct {
client rest.Interface
namespace string
}

func (c *statefulSetClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*appsv1.StatefulSet, error) {
result := &appsv1.StatefulSet{}
err := c.client.Get().
UseProtobufAsDefault().
Namespace(c.namespace).
Resource("statefulsets").
Name(name).
VersionedParams(&opts, ParameterCodec()).
Do(ctx).
Into(result)
return result, err
}

func (c *statefulSetClient) Create(ctx context.Context, statefulSet *appsv1.StatefulSet, opts metav1.CreateOptions) (*appsv1.StatefulSet, error) {
result := &appsv1.StatefulSet{}
err := c.client.Post().
UseProtobufAsDefault().
Namespace(c.namespace).
Resource("statefulsets").
VersionedParams(&opts, ParameterCodec()).
Body(statefulSet).
Do(ctx).
Into(result)
return result, err
}

func (c *statefulSetClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return c.client.Delete().
UseProtobufAsDefault().
Namespace(c.namespace).
Resource("statefulsets").
Name(name).
Body(&opts).
Do(ctx).
Error()
}

type configMapClient struct {
client rest.Interface
namespace string
Expand Down
Loading
Loading