|
| 1 | +"""Generate Kubernetes kubeconfig via a PrivX API Proxy client |
| 2 | +credential. |
| 3 | +
|
| 4 | +This example shows one practical SDK flow for Kubernetes access via PrivX: |
| 5 | +1. List PrivX API targets or resolve a target by name. |
| 6 | +2. Find an existing current-user client credential for that target. |
| 7 | +3. Optionally create the credential if it does not exist yet. |
| 8 | +4. Request the client credential secret in ``kubeconfig`` format. |
| 9 | +
|
| 10 | +Install the SDK: |
| 11 | +- From a local checkout: |
| 12 | + ``pip install .`` |
| 13 | +- Directly from GitHub: |
| 14 | + ``pip install git+https://github.com/SSHcom/privx-sdk-for-python.git`` |
| 15 | +
|
| 16 | +What you need before running this script: |
| 17 | +- ``config.py`` next to this script must point to your PrivX instance and |
| 18 | + contain valid API client credentials. |
| 19 | +- In PrivX, the selected API target must represent the Kubernetes API you want |
| 20 | + to access through API Proxy. |
| 21 | +- The authenticated PrivX user must be allowed to use that target. |
| 22 | +
|
| 23 | +Typical usage: |
| 24 | +- List targets: |
| 25 | + ``python3 your_path_to_the_script/get_kubeconfig_via_api_proxy.py`` |
| 26 | + ``--list-targets`` |
| 27 | +- Print kubeconfig to stdout: |
| 28 | + ``python3 your_path_to_the_script/get_kubeconfig_via_api_proxy.py`` |
| 29 | + ``--api-target my-k8s-api`` |
| 30 | +- Create missing credential and save kubeconfig: |
| 31 | + ``python3 your_path_to_the_script/get_kubeconfig_via_api_proxy.py`` |
| 32 | + ``--api-target my-k8s-api --create-if-missing --output ./kubeconfig.yaml`` |
| 33 | +- Use with kubectl: |
| 34 | + ``KUBECONFIG=./kubeconfig.yaml kubectl get ns`` |
| 35 | +""" |
| 36 | + |
| 37 | +import argparse |
| 38 | +import os |
| 39 | +import sys |
| 40 | +from http import HTTPStatus |
| 41 | +from pathlib import Path |
| 42 | +from typing import Dict, List, Optional |
| 43 | + |
| 44 | +import config |
| 45 | + |
| 46 | +try: |
| 47 | + import privx_api |
| 48 | +except ImportError: |
| 49 | + # Allow running the script from repository checkout without package install. |
| 50 | + sys.path.append(str(Path(__file__).resolve().parent.parent)) |
| 51 | + import privx_api |
| 52 | + |
| 53 | + |
| 54 | +def build_api() -> privx_api.PrivXAPI: |
| 55 | + """Initialize and authenticate the SDK client.""" |
| 56 | + api = privx_api.PrivXAPI( |
| 57 | + config.HOSTNAME, |
| 58 | + config.HOSTPORT, |
| 59 | + config.CA_CERT, |
| 60 | + config.OAUTH_CLIENT_ID, |
| 61 | + config.OAUTH_CLIENT_SECRET, |
| 62 | + ) |
| 63 | + api.authenticate(config.API_CLIENT_ID, config.API_CLIENT_SECRET) |
| 64 | + return api |
| 65 | + |
| 66 | + |
| 67 | +def find_api_target_by_name(api: privx_api.PrivXAPI, target_name: str) -> Dict: |
| 68 | + """Find exactly one API target by human-readable name.""" |
| 69 | + response = api.get_api_targets(limit=200, filter_param="") |
| 70 | + if not response.ok: |
| 71 | + raise RuntimeError(f"Failed to list API targets: {response.data}") |
| 72 | + |
| 73 | + items = response.data.get("items", []) |
| 74 | + matches = [item for item in items if item.get("name") == target_name] |
| 75 | + if not matches: |
| 76 | + raise RuntimeError( |
| 77 | + f"API target '{target_name}' not found among accessible targets." |
| 78 | + ) |
| 79 | + if len(matches) > 1: |
| 80 | + ids = [item.get("id") for item in matches] |
| 81 | + raise RuntimeError( |
| 82 | + f"Multiple API targets named '{target_name}' found. IDs: {ids}" |
| 83 | + ) |
| 84 | + return matches[0] |
| 85 | + |
| 86 | + |
| 87 | +def list_accessible_targets(api: privx_api.PrivXAPI) -> List[Dict]: |
| 88 | + """Return accessible API targets for quick operator discovery.""" |
| 89 | + response = api.get_api_targets(limit=200, filter_param="") |
| 90 | + if not response.ok: |
| 91 | + raise RuntimeError(f"Failed to list API targets: {response.data}") |
| 92 | + return response.data.get("items", []) |
| 93 | + |
| 94 | + |
| 95 | +def find_current_user_credential( |
| 96 | + api: privx_api.PrivXAPI, |
| 97 | + target_id: str, |
| 98 | + credential_name: Optional[str] = None, |
| 99 | +) -> Optional[Dict]: |
| 100 | + """Find an existing current-user client credential for this target.""" |
| 101 | + response = api.get_current_user_client_credentials(limit=200) |
| 102 | + if not response.ok: |
| 103 | + raise RuntimeError(f"Failed to list client credentials: {response.data}") |
| 104 | + |
| 105 | + items: List[Dict] = response.data.get("items", []) |
| 106 | + for item in items: |
| 107 | + item_target = (item.get("target") or {}).get("id") |
| 108 | + if item_target != target_id: |
| 109 | + continue |
| 110 | + if credential_name and item.get("name") != credential_name: |
| 111 | + continue |
| 112 | + return item |
| 113 | + return None |
| 114 | + |
| 115 | + |
| 116 | +def create_current_user_credential( |
| 117 | + api: privx_api.PrivXAPI, |
| 118 | + target_id: str, |
| 119 | + credential_name: str, |
| 120 | +) -> Dict: |
| 121 | + """Create a current-user client credential bound to the selected target.""" |
| 122 | + payload = { |
| 123 | + "name": credential_name, |
| 124 | + "comment": "Generated by get_kubeconfig_via_api_proxy.py", |
| 125 | + "target": {"id": target_id}, |
| 126 | + } |
| 127 | + response = api.create_current_user_client_credential(payload) |
| 128 | + if not response.ok: |
| 129 | + raise RuntimeError(f"Failed to create client credential: {response.data}") |
| 130 | + |
| 131 | + credential_id = response.data.get("id") |
| 132 | + if not credential_id: |
| 133 | + raise RuntimeError( |
| 134 | + "Credential was created but response did not include credential ID." |
| 135 | + ) |
| 136 | + return {"id": credential_id, "name": credential_name, "target": {"id": target_id}} |
| 137 | + |
| 138 | + |
| 139 | +def get_kubeconfig_secret(api: privx_api.PrivXAPI, credential_id: str) -> Dict: |
| 140 | + """Fetch secret material in kubeconfig format.""" |
| 141 | + response = api.get_current_user_client_credential_secret( |
| 142 | + credential_id, conf_format="kubeconfig" |
| 143 | + ) |
| 144 | + data = response.data |
| 145 | + if response.status != HTTPStatus.OK: |
| 146 | + raise RuntimeError(f"Failed to fetch kubeconfig secret: {data}") |
| 147 | + return data |
| 148 | + |
| 149 | + |
| 150 | +def save_or_print_kubeconfig(kubeconfig_yaml: str, output_path: Optional[str]) -> None: |
| 151 | + """Write kubeconfig to a file or print to stdout.""" |
| 152 | + if output_path: |
| 153 | + abs_path = os.path.abspath(output_path) |
| 154 | + os.makedirs(os.path.dirname(abs_path) or ".", exist_ok=True) |
| 155 | + with open(abs_path, "w", encoding="utf-8") as handle: |
| 156 | + handle.write(kubeconfig_yaml) |
| 157 | + try: |
| 158 | + os.chmod(abs_path, 0o600) |
| 159 | + except OSError: |
| 160 | + pass |
| 161 | + print(f"kubeconfig written to: {abs_path}") |
| 162 | + return |
| 163 | + |
| 164 | + print(kubeconfig_yaml) |
| 165 | + |
| 166 | + |
| 167 | +def parse_args() -> argparse.Namespace: |
| 168 | + parser = argparse.ArgumentParser( |
| 169 | + description=( |
| 170 | + "Fetch Kubernetes kubeconfig through PrivX API Proxy using a current " |
| 171 | + "user client credential." |
| 172 | + ), |
| 173 | + epilog=( |
| 174 | + "SDK installation:\n" |
| 175 | + " pip install .\n" |
| 176 | + " pip install " |
| 177 | + "git+https://github.com/SSHcom/privx-sdk-for-python.git\n\n" |
| 178 | + "Examples:\n" |
| 179 | + " python3 your_path_to_the_script/" |
| 180 | + "get_kubeconfig_via_api_proxy.py --list-targets\n" |
| 181 | + " python3 your_path_to_the_script/" |
| 182 | + "get_kubeconfig_via_api_proxy.py --api-target my-k8s-api\n" |
| 183 | + " python3 your_path_to_the_script/" |
| 184 | + "get_kubeconfig_via_api_proxy.py --api-target my-k8s-api " |
| 185 | + "--create-if-missing " |
| 186 | + "--output ./kubeconfig.yaml\n" |
| 187 | + " KUBECONFIG=./kubeconfig.yaml kubectl get namespaces" |
| 188 | + ), |
| 189 | + formatter_class=argparse.RawDescriptionHelpFormatter, |
| 190 | + ) |
| 191 | + parser.add_argument( |
| 192 | + "--api-target", |
| 193 | + help="PrivX API target name that represents the Kubernetes API endpoint.", |
| 194 | + ) |
| 195 | + parser.add_argument( |
| 196 | + "--list-targets", |
| 197 | + action="store_true", |
| 198 | + help="List accessible API targets and exit.", |
| 199 | + ) |
| 200 | + parser.add_argument( |
| 201 | + "--credential-name", |
| 202 | + default="k8s-api-proxy-credential", |
| 203 | + help="Credential name to search/create under the current PrivX user.", |
| 204 | + ) |
| 205 | + parser.add_argument( |
| 206 | + "--create-if-missing", |
| 207 | + action="store_true", |
| 208 | + help="Create the client credential if one does not exist.", |
| 209 | + ) |
| 210 | + parser.add_argument( |
| 211 | + "--output", |
| 212 | + help="Write kubeconfig YAML to this file path. If omitted, prints to stdout.", |
| 213 | + ) |
| 214 | + return parser.parse_args() |
| 215 | + |
| 216 | + |
| 217 | +def main() -> int: |
| 218 | + args = parse_args() |
| 219 | + api = build_api() |
| 220 | + |
| 221 | + if args.list_targets: |
| 222 | + targets = list_accessible_targets(api) |
| 223 | + if not targets: |
| 224 | + print("No accessible API targets found.") |
| 225 | + return 0 |
| 226 | + print("Accessible API targets:") |
| 227 | + for target in targets: |
| 228 | + print(f"- {target.get('name')} ({target.get('id')})") |
| 229 | + return 0 |
| 230 | + |
| 231 | + if not args.api_target: |
| 232 | + raise RuntimeError("--api-target is required unless --list-targets is used.") |
| 233 | + |
| 234 | + target = find_api_target_by_name(api, args.api_target) |
| 235 | + print(f"Using API target: {target.get('name')} ({target.get('id')})") |
| 236 | + |
| 237 | + credential = find_current_user_credential( |
| 238 | + api, |
| 239 | + target_id=target.get("id", ""), |
| 240 | + credential_name=args.credential_name, |
| 241 | + ) |
| 242 | + if credential is None: |
| 243 | + if not args.create_if_missing: |
| 244 | + print( |
| 245 | + "No matching client credential found. Re-run with " |
| 246 | + "--create-if-missing to create one." |
| 247 | + ) |
| 248 | + return 2 |
| 249 | + credential = create_current_user_credential( |
| 250 | + api, |
| 251 | + target_id=target.get("id", ""), |
| 252 | + credential_name=args.credential_name, |
| 253 | + ) |
| 254 | + print( |
| 255 | + f"Created client credential: {credential.get('name')} " |
| 256 | + f"({credential.get('id')})" |
| 257 | + ) |
| 258 | + else: |
| 259 | + print( |
| 260 | + f"Using existing client credential: {credential.get('name')} " |
| 261 | + f"({credential.get('id')})" |
| 262 | + ) |
| 263 | + |
| 264 | + kube_data = get_kubeconfig_secret(api, credential.get("id", "")) |
| 265 | + kubeconfig = ((kube_data.get("kubeconfig") or {}).get("data") or "").strip() |
| 266 | + if not kubeconfig: |
| 267 | + raise RuntimeError( |
| 268 | + "PrivX response did not contain kubeconfig.data. " |
| 269 | + f"Full response: {kube_data}" |
| 270 | + ) |
| 271 | + |
| 272 | + save_or_print_kubeconfig(kubeconfig, args.output) |
| 273 | + |
| 274 | + kubectl_commands = (kube_data.get("kubectl_commands") or {}).get("data") or [] |
| 275 | + if kubectl_commands: |
| 276 | + print("\nSuggested kubectl commands:") |
| 277 | + for command in kubectl_commands: |
| 278 | + print(f"- {command}") |
| 279 | + |
| 280 | + return 0 |
| 281 | + |
| 282 | + |
| 283 | +if __name__ == "__main__": |
| 284 | + try: |
| 285 | + raise SystemExit(main()) |
| 286 | + except Exception as exc: # pragma: no cover - dev script error path |
| 287 | + print(f"ERROR: {exc}", file=sys.stderr) |
| 288 | + raise SystemExit(1) |
0 commit comments