Skip to content

Commit ff80ad7

Browse files
authored
Merge pull request #207 from Argonus/support-cached-endpoints
Add resource version parameter for kubernetes strategy
2 parents da2a8bb + 81e1710 commit ff80ad7

File tree

5 files changed

+175
-3
lines changed

5 files changed

+175
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## Unreleased
44

5+
- Add `kubernetes_use_cached_resources` option to Kubernetes strategy
6+
7+
## 3.4.1
8+
59
- Use new cypher names
610
- Allow Epmd strategy to reconnect after connection failures
711
- Detect Self Signed Certificate Authority for Kubernetes Strategy

lib/strategy/kubernetes.ex

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ defmodule Cluster.Strategy.Kubernetes do
2525
- `:kubernetes_selector`
2626
- `:kubernetes_service_name`
2727
- `:kubernetes_ip_lookup_mode`
28+
- `:kubernetes_use_cached_resources`
2829
- `:mode`
2930
3031
## Getting `<basename>`
@@ -70,6 +71,11 @@ defmodule Cluster.Strategy.Kubernetes do
7071
7172
Then, this strategy will fetch the IP of all pods with that label and attempt to connect.
7273
74+
### `kubernetes_use_cached_resources` option
75+
76+
When setting this value, this strategy will use cached resource version value to fetch k8s resources.
77+
In k8s resources are incremented by 1 on every change, this version will set requested resourceVersion
78+
to 0, that will use cached versions of resources, take in mind that this may be outdated or unavailable.
7379
7480
### `:mode` option
7581
@@ -362,6 +368,9 @@ defmodule Cluster.Strategy.Kubernetes do
362368
selector = Keyword.fetch!(config, :kubernetes_selector)
363369
ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints)
364370

371+
use_cache = Keyword.get(config, :kubernetes_use_cached_resources, false)
372+
resource_version = if use_cache, do: 0, else: nil
373+
365374
master_name = Keyword.get(config, :kubernetes_master, @kubernetes_master)
366375
cluster_domain = System.get_env("CLUSTER_DOMAIN", "#{cluster_name}.local")
367376

@@ -380,12 +389,19 @@ defmodule Cluster.Strategy.Kubernetes do
380389

381390
cond do
382391
app_name != nil and selector != nil ->
383-
selector = URI.encode(selector)
392+
query_params =
393+
[]
394+
|> apply_param(:labelSelector, selector)
395+
|> apply_param(:resourceVersion, resource_version)
396+
|> URI.encode_query(:rfc3986)
384397

385398
path =
386399
case ip_lookup_mode do
387-
:endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
388-
:pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}"
400+
:endpoints ->
401+
"api/v1/namespaces/#{namespace}/endpoints?#{query_params}"
402+
403+
:pods ->
404+
"api/v1/namespaces/#{namespace}/pods?#{query_params}"
389405
end
390406

391407
headers = [{~c"authorization", ~c"Bearer #{token}"}]
@@ -440,6 +456,12 @@ defmodule Cluster.Strategy.Kubernetes do
440456
end
441457
end
442458

459+
defp apply_param(params, key, value) when value != nil do
460+
[{key, value} | params]
461+
end
462+
463+
defp apply_param(params, _key, _value), do: params
464+
443465
defp parse_response(:endpoints, resp) do
444466
case resp do
445467
%{"items" => items} when is_list(items) ->

test/fixtures/vcr_cassettes/kubernetes.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,38 @@
3131
"type": "ok"
3232
}
3333
},
34+
{
35+
"request": {
36+
"body": "",
37+
"headers": {
38+
"authorization": "***"
39+
},
40+
"method": "get",
41+
"options": {
42+
"httpc_options": [],
43+
"http_options": {
44+
"ssl": "[verify: :verify_none]"
45+
}
46+
},
47+
"request_body": "",
48+
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/endpoints?labelSelector=app=test_selector&resourceVersion=0"
49+
},
50+
"response": {
51+
"binary": false,
52+
"body": "{\"kind\":\"EndpointsList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"subsets\":[{\"addresses\":[{\"hostname\":\"my-hostname-0\",\"ip\":\"10.48.33.136\",\"nodeName\":\"gke-jshmrtn-cluster-default-pool-a61da41f-db9x\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037783\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}]}\n",
53+
"headers": {
54+
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
55+
"content-length": "877",
56+
"content-type": "application/json"
57+
},
58+
"status_code": [
59+
"HTTP/1.1",
60+
200,
61+
"OK"
62+
],
63+
"type": "ok"
64+
}
65+
},
3466
{
3567
"request": {
3668
"body": "",

test/fixtures/vcr_cassettes/kubernetes_pods.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,37 @@
3030
],
3131
"type": "ok"
3232
}
33+
},
34+
{
35+
"request": {
36+
"body": "",
37+
"headers": {
38+
"authorization": "***"
39+
},
40+
"method": "get",
41+
"options": {
42+
"httpc_options": [],
43+
"http_options": {
44+
"ssl": "[verify: :verify_none]"
45+
}
46+
},
47+
"request_body": "",
48+
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/pods?labelSelector=app=test_selector&resourceVersion=0"
49+
},
50+
"response": {
51+
"binary": false,
52+
"body": "{\"kind\":\"PodList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"spec\": { \"hostname\": \"my-hostname-0\" },\"status\":{\"podIP\": \"10.48.33.136\"}}]}\n",
53+
"headers": {
54+
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
55+
"content-length": "877",
56+
"content-type": "application/json"
57+
},
58+
"status_code": [
59+
"HTTP/1.1",
60+
200,
61+
"OK"
62+
],
63+
"type": "ok"
64+
}
3365
}
3466
]

test/kubernetes_test.exs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,60 @@ defmodule Cluster.Strategy.KubernetesTest do
8181
end
8282
end
8383

84+
test "works with cached resources" do
85+
use_cassette "kubernetes", custom: true do
86+
capture_log(fn ->
87+
start_supervised!({Kubernetes,
88+
[
89+
%Cluster.Strategy.State{
90+
topology: :name,
91+
config: [
92+
kubernetes_node_basename: "test_basename",
93+
kubernetes_selector: "app=test_selector",
94+
kubernetes_use_cached_resources: true,
95+
# If you want to run the test freshly, you'll need to create a DNS Entry
96+
kubernetes_master: "cluster.localhost.",
97+
kubernetes_service_account_path:
98+
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
99+
],
100+
connect: {Nodes, :connect, [self()]},
101+
disconnect: {Nodes, :disconnect, [self()]},
102+
list_nodes: {Nodes, :list_nodes, [[]]}
103+
}
104+
]})
105+
106+
assert_receive {:connect, _}, 5_000
107+
end)
108+
end
109+
end
110+
111+
test "works with no cached resources" do
112+
use_cassette "kubernetes", custom: true do
113+
capture_log(fn ->
114+
start_supervised!({Kubernetes,
115+
[
116+
%Cluster.Strategy.State{
117+
topology: :name,
118+
config: [
119+
kubernetes_node_basename: "test_basename",
120+
kubernetes_selector: "app=test_selector",
121+
kubernetes_use_cached_resources: false,
122+
# If you want to run the test freshly, you'll need to create a DNS Entry
123+
kubernetes_master: "cluster.localhost.",
124+
kubernetes_service_account_path:
125+
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
126+
],
127+
connect: {Nodes, :connect, [self()]},
128+
disconnect: {Nodes, :disconnect, [self()]},
129+
list_nodes: {Nodes, :list_nodes, [[]]}
130+
}
131+
]})
132+
133+
assert_receive {:connect, _}, 5_000
134+
end)
135+
end
136+
end
137+
84138
test "works with dns and cluster_name" do
85139
use_cassette "kubernetes", custom: true do
86140
capture_log(fn ->
@@ -201,6 +255,34 @@ defmodule Cluster.Strategy.KubernetesTest do
201255
end
202256
end
203257

258+
test "works with pods and cached resources" do
259+
use_cassette "kubernetes_pods", custom: true do
260+
capture_log(fn ->
261+
start_supervised!({Kubernetes,
262+
[
263+
%Cluster.Strategy.State{
264+
topology: :name,
265+
config: [
266+
kubernetes_node_basename: "test_basename",
267+
kubernetes_selector: "app=test_selector",
268+
# If you want to run the test freshly, you'll need to create a DNS Entry
269+
kubernetes_master: "cluster.localhost.",
270+
kubernetes_ip_lookup_mode: :pods,
271+
kubernetes_use_cached_resources: true,
272+
kubernetes_service_account_path:
273+
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
274+
],
275+
connect: {Nodes, :connect, [self()]},
276+
disconnect: {Nodes, :disconnect, [self()]},
277+
list_nodes: {Nodes, :list_nodes, [[]]}
278+
}
279+
]})
280+
281+
assert_receive {:connect, :"test_basename@10.48.33.136"}, 5_000
282+
end)
283+
end
284+
end
285+
204286
test "works with pods and dns" do
205287
use_cassette "kubernetes_pods", custom: true do
206288
capture_log(fn ->

0 commit comments

Comments
 (0)