|
1 | 1 | import re |
| 2 | +from abc import abstractmethod |
2 | 3 | from typing import Optional |
3 | 4 |
|
4 | 5 | import structlog |
|
14 | 15 | ) |
15 | 16 | from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep |
16 | 17 | from codegate.pipeline.secrets.manager import SecretsManager |
17 | | -from codegate.pipeline.secrets.signatures import CodegateSignatures |
| 18 | +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match |
18 | 19 | from codegate.pipeline.systemmsg import add_or_update_system_message |
19 | 20 |
|
20 | 21 | logger = structlog.get_logger("codegate") |
21 | 22 |
|
22 | 23 |
|
23 | | -class CodegateSecrets(PipelineStep): |
24 | | - """Pipeline step that handles secret information requests.""" |
| 24 | +class SecretsModifier: |
| 25 | + """ |
| 26 | + A class that helps obfuscate text by piping it through the secrets manager |
| 27 | + that finds the secrets and then calling hide_secret to modify them. |
| 28 | +
|
| 29 | + What modifications are done is up to the user who subclasses SecretsModifier |
| 30 | + """ |
25 | 31 |
|
def __init__(self):
    """Set up the secrets modifier and initialize the shared signatures."""
    super().__init__()
    # Signatures are initialized eagerly, at construction time, from the
    # "signatures.yaml" definition file.
    CodegateSignatures.initialize("signatures.yaml")
31 | 37 |
|
32 | | - @property |
33 | | - def name(self) -> str: |
| 38 | + @abstractmethod |
| 39 | + def _hide_secret(self, match: Match) -> str: |
34 | 40 | """ |
35 | | - Returns the name of this pipeline step. |
| 41 | + User-defined callable to hide a secret match to either obfuscate |
| 42 | + it or reversibly encrypt |
| 43 | + """ |
| 44 | + pass |
36 | 45 |
|
37 | | - Returns: |
38 | | - str: The identifier 'codegate-secrets'. |
| 46 | + @abstractmethod |
| 47 | + def _notify_secret(self, secret): |
39 | 48 | """ |
40 | | - return "codegate-secrets" |
| 49 | + Notify about a found secret |
| 50 | + TODO: We should probably not notify about a secret value but rather |
| 51 | + an obfuscated string. It might be nice to report the context as well |
| 52 | + (e.g. the file or a couple of lines before and after) |
| 53 | + """ |
| 54 | + pass |
41 | 55 |
|
42 | 56 | def _get_absolute_position(self, line_number: int, line_offset: int, text: str) -> int: |
43 | 57 | """ |
@@ -78,21 +92,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int |
78 | 92 |
|
79 | 93 | return start, end |
80 | 94 |
|
81 | | - def _redact_text( |
82 | | - self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext |
83 | | - ) -> tuple[str, int]: |
84 | | - """ |
85 | | - Find and encrypt secrets in the given text. |
86 | | -
|
87 | | - Args: |
88 | | - text: The text to protect |
89 | | - secrets_manager: .. |
90 | | - session_id: .. |
91 | | - context: The pipeline context to be able to log alerts |
92 | | - Returns: |
93 | | - Tuple containing protected text with encrypted values and the count of redacted secrets |
94 | | - """ |
95 | | - # Find secrets in the text |
| 95 | + def obfuscate(self, text: str) -> tuple[str, int]: |
96 | 96 | matches = CodegateSignatures.find_in_string(text) |
97 | 97 | if not matches: |
98 | 98 | return text, 0 |
@@ -123,48 +123,116 @@ def _redact_text( |
123 | 123 |
|
124 | 124 | # Replace each match with its encrypted value |
125 | 125 | for start, end, match in absolute_matches: |
126 | | - # Encrypt and store the value |
127 | | - encrypted_value = secrets_manager.store_secret( |
128 | | - match.value, |
129 | | - match.service, |
130 | | - match.type, |
131 | | - session_id, |
132 | | - ) |
133 | | - |
134 | | - # Create the replacement string |
135 | | - replacement = f"REDACTED<${encrypted_value}>" |
136 | | - # Store the protected text in DB. |
137 | | - context.add_alert( |
138 | | - self.name, trigger_string=replacement, severity_category=AlertSeverity.CRITICAL |
139 | | - ) |
| 126 | + hidden_secret = self._hide_secret(match) |
| 127 | + self._notify_secret(hidden_secret) |
140 | 128 |
|
141 | 129 | # Replace the secret in the text |
142 | | - protected_text[start:end] = replacement |
| 130 | + protected_text[start:end] = hidden_secret |
143 | 131 | # Store for logging |
144 | 132 | found_secrets.append( |
145 | 133 | { |
146 | 134 | "service": match.service, |
147 | 135 | "type": match.type, |
148 | 136 | "original": match.value, |
149 | | - "encrypted": encrypted_value, |
| 137 | + "encrypted": hidden_secret, |
150 | 138 | } |
151 | 139 | ) |
152 | 140 |
|
153 | | - # Convert back to string |
154 | | - protected_string = "".join(protected_text) |
155 | | - |
156 | 141 | # Log the findings |
157 | 142 | logger.info("\nFound secrets:") |
158 | | - |
159 | 143 | for secret in found_secrets: |
160 | 144 | logger.info(f"\nService: {secret['service']}") |
161 | 145 | logger.info(f"Type: {secret['type']}") |
162 | 146 | logger.info(f"Original: {secret['original']}") |
163 | | - logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>") |
| 147 | + logger.info(f"Encrypted: {secret['encrypted']}") |
164 | 148 |
|
| 149 | + # Convert back to string |
| 150 | + protected_string = "".join(protected_text) |
165 | 151 | print(f"\nProtected text:\n{protected_string}") |
166 | 152 | return protected_string, len(found_secrets) |
167 | 153 |
|
| 154 | + |
class SecretsEncryptor(SecretsModifier):
    """
    SecretsModifier that swaps each secret for a ``REDACTED<$...>`` marker.

    Every matched secret is stored through the secrets manager, and every
    notification is recorded as a CRITICAL alert on the pipeline context.
    """

    def __init__(
        self,
        secrets_manager: SecretsManager,
        context: PipelineContext,
        session_id: str,
    ):
        """
        Keep the collaborators needed to store secrets and raise alerts.

        Args:
            secrets_manager: Receives every matched secret via store_secret.
            context: Pipeline context on which alerts are added.
            session_id: Passed to the secrets manager along with each secret.
        """
        # Alerts are filed under this step name.
        self._name = "codegate-secrets"
        self._context = context
        self._session_id = session_id
        self._secrets_manager = secrets_manager
        super().__init__()

    def _hide_secret(self, match: Match) -> str:
        """Store the matched secret and wrap the returned value in ``REDACTED<$...>``."""
        stored_value = self._secrets_manager.store_secret(
            match.value, match.service, match.type, self._session_id
        )
        return f"REDACTED<${stored_value}>"

    def _notify_secret(self, notify_string):
        """Add a CRITICAL alert whose trigger string is ``notify_string``."""
        self._context.add_alert(
            self._name,
            trigger_string=notify_string,
            severity_category=AlertSeverity.CRITICAL,
        )
| 182 | + |
| 183 | + |
class SecretsObfuscator(SecretsModifier):
    """SecretsModifier that masks every secret with a fixed run of asterisks."""

    def __init__(self):
        """Defer entirely to the SecretsModifier initialisation."""
        super().__init__()

    def _hide_secret(self, match: Match) -> str:
        """
        Return the mask that replaces a secret.

        The mask is always 32 asterisks, independent of the matched value,
        so the length of the secret is not leaked.
        """
        mask_width = 32
        return mask_width * "*"

    def _notify_secret(self, secret):
        """Do nothing: this modifier does not report the secrets it masks."""
        return None
| 199 | + |
| 200 | + |
| 201 | +class CodegateSecrets(PipelineStep): |
| 202 | + """Pipeline step that handles secret information requests.""" |
| 203 | + |
def __init__(self):
    """Create the CodegateSecrets pipeline step; no extra state is set up here."""
    super().__init__()
| 207 | + |
@property
def name(self) -> str:
    """
    Name under which this pipeline step is identified.

    Returns:
        str: Always the identifier 'codegate-secrets'.
    """
    step_name = "codegate-secrets"
    return step_name
| 217 | + |
def _redact_text(
    self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
) -> tuple[str, int]:
    """
    Protect the secrets in the given text by delegating to a SecretsEncryptor.

    Args:
        text: The text to protect
        secrets_manager: Handed to the SecretsEncryptor, which stores secrets with it
        session_id: Handed to the SecretsEncryptor together with the secrets manager
        context: The pipeline context to be able to log alerts
    Returns:
        The result of SecretsEncryptor.obfuscate: a tuple containing the protected
        text and the count of redacted secrets
    """
    # A fresh encryptor is built per call, bound to this request's collaborators.
    return SecretsEncryptor(secrets_manager, context, session_id).obfuscate(text)
| 235 | + |
168 | 236 | async def process( |
169 | 237 | self, request: ChatCompletionRequest, context: PipelineContext |
170 | 238 | ) -> PipelineResult: |
|
0 commit comments