Skip to content

Commit 17b2af0

Browse files
authored
chore: reformat scaler code to match KEDA go style (#1295)
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
1 parent d2bed33 commit 17b2af0

File tree

3 files changed

+15
-90
lines changed

3 files changed

+15
-90
lines changed

scaler/handlers.go

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,7 @@ type impl struct {
4545
externalscaler.UnimplementedExternalScalerServer
4646
}
4747

48-
func newImpl(
49-
lggr logr.Logger,
50-
pinger *queuePinger,
51-
httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer,
52-
defaultTargetMetric int64,
53-
) *impl {
48+
func newImpl(lggr logr.Logger, pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, defaultTargetMetric int64) *impl {
5449
return &impl{
5550
lggr: lggr,
5651
pinger: pinger,
@@ -63,10 +58,7 @@ func (e *impl) Ping(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
6358
return &emptypb.Empty{}, nil
6459
}
6560

66-
func (e *impl) IsActive(
67-
ctx context.Context,
68-
sor *externalscaler.ScaledObjectRef,
69-
) (*externalscaler.IsActiveResponse, error) {
61+
func (e *impl) IsActive(ctx context.Context, sor *externalscaler.ScaledObjectRef) (*externalscaler.IsActiveResponse, error) {
7062
lggr := e.lggr.WithName("IsActive")
7163

7264
gmr, err := e.GetMetrics(ctx, &externalscaler.GetMetricsRequest{
@@ -91,10 +83,7 @@ func (e *impl) IsActive(
9183
return res, nil
9284
}
9385

94-
func (e *impl) StreamIsActive(
95-
scaledObject *externalscaler.ScaledObjectRef,
96-
server externalscaler.ExternalScaler_StreamIsActiveServer,
97-
) error {
86+
func (e *impl) StreamIsActive(scaledObject *externalscaler.ScaledObjectRef, server externalscaler.ExternalScaler_StreamIsActiveServer) error {
9887
// this function communicates with KEDA via the 'server' parameter.
9988
// we call server.Send (below) every streamInterval, which tells it to immediately
10089
// ping our IsActive RPC
@@ -127,10 +116,7 @@ func (e *impl) StreamIsActive(
127116
}
128117
}
129118

130-
func (e *impl) GetMetricSpec(
131-
_ context.Context,
132-
sor *externalscaler.ScaledObjectRef,
133-
) (*externalscaler.GetMetricSpecResponse, error) {
119+
func (e *impl) GetMetricSpec(_ context.Context, sor *externalscaler.ScaledObjectRef) (*externalscaler.GetMetricSpecResponse, error) {
134120
lggr := e.lggr.WithName("GetMetricSpec")
135121

136122
namespacedName := k8s.NamespacedNameFromScaledObjectRef(sor)
@@ -189,10 +175,7 @@ func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPending
189175
return res, nil
190176
}
191177

192-
func (e *impl) GetMetrics(
193-
_ context.Context,
194-
metricRequest *externalscaler.GetMetricsRequest,
195-
) (*externalscaler.GetMetricsResponse, error) {
178+
func (e *impl) GetMetrics(_ context.Context, metricRequest *externalscaler.GetMetricsRequest) (*externalscaler.GetMetricsResponse, error) {
196179
lggr := e.lggr.WithName("GetMetrics")
197180
sor := metricRequest.ScaledObjectRef
198181

@@ -222,8 +205,7 @@ func (e *impl) GetMetrics(
222205
count := e.pinger.counts()[key]
223206

224207
var metricValue int
225-
if httpso.Spec.ScalingMetric != nil &&
226-
httpso.Spec.ScalingMetric.Rate != nil {
208+
if httpso.Spec.ScalingMetric != nil && httpso.Spec.ScalingMetric.Rate != nil {
227209
metricValue = int(math.Ceil(count.RPS))
228210
lggr.V(1).Info(fmt.Sprintf("%d rps for %s", metricValue, httpso.GetName()))
229211
} else {

scaler/main.go

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,7 @@ func main() {
6464
setupLog.Error(err, "creating new Kubernetes ClientSet")
6565
os.Exit(1)
6666
}
67-
pinger := newQueuePinger(
68-
ctrl.Log,
69-
k8s.EndpointsFuncForK8sClientset(k8sCl),
70-
namespace,
71-
svcName,
72-
deplName,
73-
targetPortStr,
74-
)
67+
pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForK8sClientset(k8sCl), namespace, svcName, deplName, targetPortStr)
7568

7669
// create the endpoints informer
7770
endpInformer := k8s.NewInformerBackedEndpointsCache(
@@ -141,15 +134,7 @@ func main() {
141134
setupLog.Info("Bye!")
142135
}
143136

144-
func startGrpcServer(
145-
ctx context.Context,
146-
cfg *config,
147-
lggr logr.Logger,
148-
port int,
149-
pinger *queuePinger,
150-
httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer,
151-
targetPendingRequests int64,
152-
) error {
137+
func startGrpcServer(ctx context.Context, cfg *config, lggr logr.Logger, port int, pinger *queuePinger, httpsoInformer informershttpv1alpha1.HTTPScaledObjectInformer, targetPendingRequests int64) error {
153138
addr := fmt.Sprintf("0.0.0.0:%d", port)
154139
lggr.Info("starting grpc server", "address", addr)
155140

@@ -188,20 +173,9 @@ func startGrpcServer(
188173
}
189174
}()
190175

191-
grpc_health_v1.RegisterHealthServer(
192-
grpcServer,
193-
hs,
194-
)
176+
grpc_health_v1.RegisterHealthServer(grpcServer, hs)
195177

196-
externalscaler.RegisterExternalScalerServer(
197-
grpcServer,
198-
newImpl(
199-
lggr,
200-
pinger,
201-
httpsoInformer,
202-
targetPendingRequests,
203-
),
204-
)
178+
externalscaler.RegisterExternalScalerServer(grpcServer, newImpl(lggr, pinger, httpsoInformer, targetPendingRequests))
205179

206180
go func() {
207181
<-ctx.Done()

scaler/queue_pinger.go

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,7 @@ type queuePinger struct {
5454
status PingerStatus
5555
}
5656

57-
func newQueuePinger(
58-
lggr logr.Logger,
59-
getEndpointsFn k8s.GetEndpointsFunc,
60-
ns,
61-
svcName,
62-
deplName,
63-
adminPort string,
64-
) *queuePinger {
57+
func newQueuePinger(lggr logr.Logger, getEndpointsFn k8s.GetEndpointsFunc, ns, svcName, deplName, adminPort string) *queuePinger {
6558
pingMut := new(sync.RWMutex)
6659
pinger := &queuePinger{
6760
getEndpointsFn: getEndpointsFn,
@@ -77,11 +70,7 @@ func newQueuePinger(
7770
}
7871

7972
// start starts the queuePinger
80-
func (q *queuePinger) start(
81-
ctx context.Context,
82-
ticker *time.Ticker,
83-
endpCache k8s.EndpointsCache,
84-
) error {
73+
func (q *queuePinger) start(ctx context.Context, ticker *time.Ticker, endpCache k8s.EndpointsCache) error {
8574
endpoWatchIface, err := endpCache.Watch(q.interceptorNS, q.interceptorServiceName)
8675
if err != nil {
8776
return err
@@ -132,14 +121,7 @@ func (q *queuePinger) counts() map[string]queue.Count {
132121
func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error {
133122
q.pingMut.Lock()
134123
defer q.pingMut.Unlock()
135-
counts, err := fetchCounts(
136-
ctx,
137-
q.lggr,
138-
q.getEndpointsFn,
139-
q.interceptorNS,
140-
q.interceptorSvcName,
141-
q.adminPort,
142-
)
124+
counts, err := fetchCounts(ctx, q.lggr, q.getEndpointsFn, q.interceptorNS, q.interceptorSvcName, q.adminPort)
143125
if err != nil {
144126
q.lggr.Error(err, "getting request counts")
145127
q.status = PingerERROR
@@ -161,23 +143,10 @@ func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error {
161143
//
162144
// Upon any failure, a non-nil error is returned and the
163145
// other two return values are nil and 0, respectively.
164-
func fetchCounts(
165-
ctx context.Context,
166-
lggr logr.Logger,
167-
endpointsFn k8s.GetEndpointsFunc,
168-
ns,
169-
svcName,
170-
adminPort string,
171-
) (map[string]queue.Count, error) {
146+
func fetchCounts(ctx context.Context, lggr logr.Logger, endpointsFn k8s.GetEndpointsFunc, ns, svcName, adminPort string) (map[string]queue.Count, error) {
172147
lggr = lggr.WithName("queuePinger.requestCounts")
173148

174-
endpointURLs, err := k8s.EndpointsForService(
175-
ctx,
176-
ns,
177-
svcName,
178-
adminPort,
179-
endpointsFn,
180-
)
149+
endpointURLs, err := k8s.EndpointsForService(ctx, ns, svcName, adminPort, endpointsFn)
181150
if err != nil {
182151
return nil, err
183152
}

0 commit comments

Comments
 (0)