Coverage for src / codeaudit / privacy_lint.py: 72%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-09 09:33 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-09 09:33 +0200
1"""
2License GPLv3 or higher.
4(C) 2026 Created by Maikel Mardjan - https://nocomplexity.com/
6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
12EGRESS DETECTION LOGIC - see docs
13"""
15import ast
16import datetime
17import re
18from importlib.resources import files
19from pathlib import Path
21# from codeaudit.api_interfaces import version
22from codeaudit import __version__
23from codeaudit.filehelpfunctions import (
24 collect_python_source_files,
25 get_filename_from_path,
26 is_ast_parsable,
27 read_in_source_file,
28)
29from codeaudit.pypi_package_scan import get_package_source, get_pypi_download_info
31SECRETS_LIST = files("codeaudit.data").joinpath("secretslist.txt")
def data_egress_scan(input_path):
    """Scan a Python file, a local directory or a PyPI package for privacy leaks.

    The input is tried, in this order, as:
      1. a local directory (every Python file found in it is scanned),
      2. a single, AST-parsable ``*.py`` file,
      3. the name of a package on PyPI (downloaded to a temporary directory,
         scanned, and cleaned up afterwards).

    Args:
        input_path (str): Path to a local directory, path to a Python
            file, or the name of a PyPI package to scan.

    Returns:
        dict: Scan metadata (``name``, ``version``, ``generated_on``) merged
        with the scan results:
          - directory / PyPI package: ``package_name`` (plus
            ``package_release`` for PyPI) and the directory scan output;
          - single file: ``file_name`` and ``file_privacy_check`` with the
            single result stored under key ``"0"``;
          - anything else, including a PyPI package for which no download
            URL is available: a dictionary with only an ``"Error"`` key.

    Errors about the kind of input are reported through the returned
    dictionary; exceptions raised by the helper functions are not caught here.
    """
    file_path = Path(input_path)
    timestamp_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    output = {
        "name": "Python_Code_Audit",
        "version": __version__,
        "generated_on": timestamp_str,
    }

    # 1. Local directory scan.
    if file_path.is_dir():
        output |= {"package_name": get_filename_from_path(input_path)}
        output |= _codeaudit_directory_spyscan(input_path)
        return output

    # 2. Single Python file that exists and can be parsed.
    if (
        file_path.suffix.lower() == ".py"
        and file_path.is_file()
        and is_ast_parsable(input_path)
    ):
        name_dict = {"FileName": get_filename_from_path(input_path)}
        # Only one file, stored under "0" so consumers can treat the result
        # of a single file and of a package scan in the same way.
        file_output = {"0": spy_check(input_path)}
        output |= {"file_name": name_dict, "file_privacy_check": file_output}
        return output

    # 3. PyPI package: at this point input_path is taken to be a package name.
    if pypi_data := get_pypi_download_info(input_path):
        url = pypi_data["download_url"]
        release = pypi_data["release"]
        if url is None:
            # Without a download URL there is nothing to scan; report this
            # explicitly instead of silently returning no scan result.
            return {
                "Error": f"No download URL found for PyPI package {input_path}."
            }
        src_dir, tmp_handle = get_package_source(url)
        output |= {"package_name": input_path, "package_release": release}
        try:
            output |= _codeaudit_directory_spyscan(src_dir)
        finally:
            # Always delete the temporary download directory.
            tmp_handle.cleanup()
        return output

    # Not a directory, not a valid Python file and not a known PyPI package.
    return {
        "Error": "File is not a *.py file, does not exist or is not a valid directory path towards a Python package."
    }
def spy_check(file):
    """Scan one source file for secret values and label the result.

    Args:
        file: Path of the Python source file to read and scan.

    Returns:
        dict: ``{"file_name": <name>, "privacy_check_result": <findings>}``
        where the findings come from ``collect_secret_values``.
    """
    source_text = read_in_source_file(file)
    findings = collect_secret_values(source_text)
    return {
        "file_name": get_filename_from_path(file),
        "privacy_check_result": findings,
    }
def _codeaudit_directory_spyscan(input_path):
    """Run the privacy scan on every Python file found in a directory.

    Also used for PyPI packages, in which case ``input_path`` is the
    temporary directory the package was unpacked into.

    Args:
        input_path: Directory to search for Python source files.

    Returns:
        dict: ``{"file_privacy_check": {index: spy_check result, ...}}`` with
        integer indexes in the order the files were collected, or
        ``{"Error": ...}`` when no Python files were found.
    """
    files_to_check = collect_python_source_files(input_path)

    # A directory holding exactly one Python file is a valid scan target;
    # only a truly empty collection is reported as an error.
    if not files_to_check:
        return {"Error": f"Directory path {input_path} contains no Python files."}

    file_output = {
        index: spy_check(file) for index, file in enumerate(files_to_check)
    }
    return {"file_privacy_check": file_output}
def load_secrets_list(filename=SECRETS_LIST):
    """Read the secret patterns file and return its usable entries.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with '#' are skipped. Remaining entries are lowercased.

    Args:
        filename: File to read, opened as UTF-8 text. Defaults to
            ``SECRETS_LIST``.

    Returns:
        list[str]: Lowercased patterns in file order.
    """
    with open(filename, "r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [
            entry.lower()  # lower all patterns
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]
def match_secret(secrets, name, value):
    """Check whether a name or value contains a secret pattern.

    Assumptions:
        - `secrets` are already lowercased.

    Matching is case-insensitive on `name` and `value` and uses whole-word
    matching (regex word boundaries around the escaped pattern). Patterns are
    tried from shortest to longest; for each pattern the name is checked
    before the value, and the first pattern that matches is returned.

    A `name` or `value` of None means "not present" and never matches; it is
    not converted to the text "none".

    Args:
        secrets: Iterable of lowercased pattern strings.
        name: Identifier to inspect (any object, converted with str), or None.
        value: Value to inspect (any object, converted with str), or None.

    Returns:
        The matching secret (lowercased) if found, otherwise None.
    """
    # None must not turn into the searchable word "none".
    name_lower = "" if name is None else str(name).lower()
    value_lower = "" if value is None else str(value).lower()

    # Shorter secrets first to preserve original behavior (sort is stable).
    for secret_tag in sorted(secrets, key=len):
        pattern = re.compile(rf"\b{re.escape(secret_tag)}\b")
        if pattern.search(name_lower) or pattern.search(value_lower):
            return secret_tag

    return None
def has_privacy_findings(data):
    """Tell whether any scanned file produced at least one finding.

    Args:
        data (dict): Scan output; the per-file entries are read from its
            ``"file_privacy_check"`` key (an empty mapping when absent).

    Returns:
        bool: True as soon as one entry has a non-empty
        ``"privacy_check_result"``, otherwise False.
    """
    per_file = data.get("file_privacy_check", {})
    all_findings = (
        entry.get("privacy_check_result") for entry in per_file.values()
    )
    return any(findings and len(findings) > 0 for findings in all_findings)
def count_privacy_check_results(data):
    """Add up the number of findings over all scanned files.

    Args:
        data (dict): Scan output; the per-file entries are read from its
            ``"file_privacy_check"`` key (an empty mapping when absent).

    Returns:
        int: Total length of all non-empty ``"privacy_check_result"`` values.
        Entries that are not dicts are ignored.
    """
    total = 0
    for entry in data.get("file_privacy_check", {}).values():
        if not isinstance(entry, dict):
            continue
        if not entry.get("privacy_check_result"):
            continue
        total += len(entry.get("privacy_check_result", []))
    return total
def collect_secret_values(source_code, secrets_file=SECRETS_LIST):
    """Scan Python source code for potential secret values indicating telemetry or data exfiltration.

    The source is parsed with ``ast``. Every node exposing ``targets``,
    every annotated assignment and every call argument (keyword and
    positional) is checked with ``match_secret`` against the patterns loaded
    from ``secrets_file``.
    Duplicate line results are filtered out.

    Args:
        source_code (str): Python source text to analyse.
        secrets_file: Patterns file handed to ``load_secrets_list``.
            Defaults to ``SECRETS_LIST``.

    Returns:
        list[dict]: One dict per finding, sorted by ``lineno``, with keys:
            - ``lineno``: line number of the node that matched,
            - ``code``: that source line together with the line before and
              the line after it (when they exist), joined by newlines,
            - ``matched``: the pattern returned by ``match_secret``,
            - ``call``: alias-resolved name of the call for call arguments,
              otherwise None.

    Raises:
        SyntaxError: Not caught here; raised by ``ast.parse`` when
            ``source_code`` is not valid Python.
    """
    secrets = load_secrets_list(secrets_file)
    results = []
    seen_keys = set()
    seen_lines = set()  # Track line contents to filter duplicates
    source_lines = source_code.splitlines()

    # -------------------------
    # Parse AST and detect aliases
    # -------------------------
    tree = ast.parse(source_code)
    # Maps an ``as`` alias to the full imported name; imports without an
    # alias are not recorded.
    aliases = {}

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for n in node.names:
                if n.asname:
                    aliases[n.asname] = n.name
        elif isinstance(node, ast.ImportFrom):
            module = node.module or ""
            for n in node.names:
                full = f"{module}.{n.name}" if module else n.name
                if n.asname:
                    aliases[n.asname] = full

    # -------------------------
    # Helpers
    # -------------------------
    # Return the node's ``value`` attribute, or None when it has none.
    def get_constant(node):
        return getattr(node, "value", None)

    # True when the node's ``value`` is an attribute access ``os.environ``
    # (used below on subscript nodes, i.e. ``os.environ[...]``).
    def is_os_environ(node):
        return (
            getattr(getattr(node, "value", None), "attr", None) == "environ"
            and getattr(
                getattr(getattr(node, "value", None), "value", None), "id", None
            )
            == "os"
        )

    # Textual name of an assignment target: the bare identifier for nodes
    # with ``id``, the unparsed source for nodes with ``attr`` or ``slice``,
    # None for anything else (e.g. tuple targets).
    def get_target_repr(node):
        if hasattr(node, "id"):
            return node.id
        if hasattr(node, "attr") or hasattr(node, "slice"):
            return ast.unparse(node)
        return None

    # Reduce a value node to the object that is matched against the secrets:
    #   - None stays None,
    #   - a constant yields its Python value,
    #   - ``os.environ[<key>]`` yields the key, other subscripts their source,
    #   - a call whose first positional argument is a constant yields that
    #     constant,
    #   - everything else yields its unparsed source text.
    def classify_value(node):
        if node is None:
            return None
        if isinstance(node, ast.Constant):
            return node.value
        if hasattr(node, "slice"):
            if is_os_environ(node):
                return get_constant(node.slice)
            return ast.unparse(node)
        if hasattr(node, "func") and getattr(node, "args", None):
            first_arg = node.args[0]
            if isinstance(first_arg, ast.Constant):
                return first_arg.value
        if hasattr(node, "id") or hasattr(node, "attr"):
            return ast.unparse(node)
        return ast.unparse(node)

    # Return the node's source line with one line of context before and
    # after (right-stripped, newline-joined); None when the node has no
    # ``lineno``.
    def get_original_line(node):
        lineno = getattr(node, "lineno", None)
        if lineno is None:
            return None
        lines = []
        if lineno > 1:
            lines.append(source_lines[lineno - 2].rstrip())
        if 1 <= lineno <= len(source_lines):
            lines.append(source_lines[lineno - 1].rstrip())
        if lineno < len(source_lines):
            lines.append(source_lines[lineno].rstrip())
        return "\n".join(lines)

    # Dotted name of the called function with import aliases resolved:
    # ``base.attr`` for attribute calls, the (resolved) identifier for plain
    # names, None for any other callee expression.
    def get_call_name(node):
        func = getattr(node, "func", None)
        if isinstance(func, ast.Attribute):
            base = ast.unparse(func.value)
            base = aliases.get(base, base)
            return f"{base}.{func.attr}"
        if isinstance(func, ast.Name):
            return aliases.get(func.id, func.id)
        return None

    # Record a finding when ``name`` or the classified value matches a
    # secret. ``node`` supplies the line number and the code excerpt.
    def add_value(name, value_node, node, call=None):
        value = classify_value(value_node)
        matched = match_secret(secrets, name, value)
        if matched is None:
            return

        lineno = getattr(node, "lineno", None)
        line_content = get_original_line(node)

        key = (lineno, matched, call)

        # Skip duplicate keys or duplicate line content
        # (the first finding in walk order wins).
        if key in seen_keys or line_content in seen_lines:
            return

        seen_keys.add(key)
        seen_lines.add(line_content)

        results.append(
            {
                "lineno": lineno,
                "code": line_content,
                "matched": matched,
                "call": call,
            }
        )

    # -------------------------
    # Walk AST
    # -------------------------
    for node in ast.walk(tree):
        # Assignments
        # (duck-typed: any node that has a ``targets`` attribute is handled)
        for target in getattr(node, "targets", []):
            name = get_target_repr(target)
            if name:
                add_value(name, getattr(node, "value", None), node)

        # Annotated assignments
        if isinstance(node, ast.AnnAssign):
            name = get_target_repr(node.target)
            if name:
                add_value(name, getattr(node, "value", None), node)

        # Function calls
        if isinstance(node, ast.Call):
            call_name = get_call_name(node)
            # Keyword arguments
            # (``**kwargs`` expansions have no ``arg`` and are skipped)
            for kw in node.keywords:
                if kw.arg:
                    add_value(kw.arg, kw.value, kw, call_name)
            # Positional arguments
            # (no name is available, so None is passed as the name)
            for arg in node.args:
                add_value(None, arg, node, call_name)

    return sorted(results, key=lambda item: item["lineno"])