11import re
22from abc import abstractmethod
3- from typing import Optional
3+ from typing import List , Optional , Tuple
44
55import structlog
66from litellm import ChatCompletionRequest , ChatCompletionSystemMessage , ModelResponse
@@ -44,12 +44,11 @@ def _hide_secret(self, match: Match) -> str:
4444 pass
4545
4646 @abstractmethod
47- def _notify_secret (self , secret ) :
47+ def _notify_secret (self , match : Match , protected_text : List [ str ]) -> None :
4848 """
4949 Notify about a found secret
50- TODO: We should probably not notify about a secret value but rather
51- an obfuscated string. It might be nice to report the context as well
52- (e.g. the file or a couple of lines before and after)
50+ TODO: If the secret came from a CodeSnippet we should notify about that. This would
51+ involve using the CodeSnippetExtractor step that is further down the pipeline.
5352 """
5453 pass
5554
@@ -92,6 +91,21 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int
9291
9392 return start , end
9493
94+ def _get_surrounding_secret_lines (
95+ self , protected_text : List [str ], secret_line : int , surrounding_lines : int = 3
96+ ) -> str :
97+ """
98+ Get the lines before and after the secret line to provide context.
99+
100+ Args:
101+ protected_text: The text with secrets replaced
102+ secret_line: The line number of the secret
103+ """
104+ lines = "" .join (protected_text ).split ("\n " )
105+ start_line = max (secret_line - surrounding_lines , 0 )
106+ end_line = min (secret_line + surrounding_lines , len (lines ))
107+ return "\n " .join (lines [start_line :end_line ])
108+
95109 def obfuscate (self , text : str ) -> tuple [str , int ]:
96110 matches = CodegateSignatures .find_in_string (text )
97111 if not matches :
@@ -100,7 +114,7 @@ def obfuscate(self, text: str) -> tuple[str, int]:
100114 logger .debug (f"Found { len (matches )} secrets in the user message" )
101115
102116 # Convert line positions to absolute positions and extend boundaries
103- absolute_matches = []
117+ absolute_matches : List [ Tuple [ int , int , Match ]] = []
104118 for match in matches :
105119 start = self ._get_absolute_position (match .line_number , match .start_index , text )
106120 end = self ._get_absolute_position (match .line_number , match .end_index , text )
@@ -119,37 +133,30 @@ def obfuscate(self, text: str) -> tuple[str, int]:
119133 protected_text = list (text )
120134
121135 # Store matches for logging
122- found_secrets = []
136+ found_secrets = 0
123137
124138 # Replace each match with its encrypted value
139+ logger .info ("\n Found secrets:" )
125140 for start , end , match in absolute_matches :
126141 hidden_secret = self ._hide_secret (match )
127- self ._notify_secret (hidden_secret )
142+ self ._notify_secret (match , protected_text )
128143
129144 # Replace the secret in the text
130145 protected_text [start :end ] = hidden_secret
131- # Store for logging
132- found_secrets .append (
133- {
134- "service" : match .service ,
135- "type" : match .type ,
136- "original" : match .value ,
137- "encrypted" : hidden_secret ,
138- }
139- )
140146
141- # Log the findings
142- logger .info ("\n Found secrets:" )
143- for secret in found_secrets :
144- logger .info (f"\n Service: { secret ['service' ]} " )
145- logger .info (f"Type: { secret ['type' ]} " )
146- logger .info (f"Original: { secret ['original' ]} " )
147- logger .info (f"Encrypted: { secret ['encrypted' ]} " )
147+ found_secrets += 1
148+ # Log the findings
149+ logger .info (
150+ f"\n Service: { match .service } "
151+ f"\n Type: { match .type } "
152+ f"\n Original: { match .value } "
153+ f"\n Encrypted: { hidden_secret } "
154+ )
148155
149156 # Convert back to string
150157 protected_string = "" .join (protected_text )
151158 print (f"\n Protected text:\n { protected_string } " )
152- return protected_string , len ( found_secrets )
159+ return protected_string , found_secrets
153160
154161
155162class SecretsEncryptor (SecretsModifier ):
@@ -175,7 +182,9 @@ def _hide_secret(self, match: Match) -> str:
175182 )
176183 return f"REDACTED<${ encrypted_value } >"
177184
178- def _notify_secret (self , notify_string ):
185+ def _notify_secret (self , match : Match , protected_text : List [str ]) -> None :
186+ secret_lines = self ._get_surrounding_secret_lines (protected_text , match .line_number )
187+ notify_string = f"{ match .service } - { match .type } :\n { secret_lines } "
179188 self ._context .add_alert (
180189 self ._name , trigger_string = notify_string , severity_category = AlertSeverity .CRITICAL
181190 )
@@ -194,7 +203,7 @@ def _hide_secret(self, match: Match) -> str:
194203 """
195204 return "*" * 32
196205
197- def _notify_secret (self , secret ) :
206+ def _notify_secret (self , match : Match , protected_text : List [ str ]) -> None :
198207 pass
199208
200209
0 commit comments