Coverage for src / codeaudit / privacy_lint.py: 72%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-09 09:33 +0200

1""" 

2License GPLv3 or higher. 

3 

4(C) 2026 Created by Maikel Mardjan - https://nocomplexity.com/ 

5 

6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. 

7 

8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

9 

10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. 

11 

12EGRESS DETECTION LOGIC - see docs 

13""" 

14 

15import ast 

16import datetime 

17import re 

18from importlib.resources import files 

19from pathlib import Path 

20 

21# from codeaudit.api_interfaces import version 

22from codeaudit import __version__ 

23from codeaudit.filehelpfunctions import ( 

24 collect_python_source_files, 

25 get_filename_from_path, 

26 is_ast_parsable, 

27 read_in_source_file, 

28) 

29from codeaudit.pypi_package_scan import get_package_source, get_pypi_download_info 

30 

31SECRETS_LIST = files("codeaudit.data").joinpath("secretslist.txt") 

32 

33 

def data_egress_scan(input_path):
    """Scan a Python file, a local directory or a PyPI package for privacy leaks.

    The input is tried, in this order, as:
        1. A local directory (scanned as a package).
        2. A single, AST-parsable ``*.py`` file.
        3. A PyPI package name (downloaded to a temporary directory, scanned,
           and cleaned up afterwards).

    Args:
        input_path (str): Path to a local directory, path to a Python
            file, or the name of a PyPI package to scan.

    Returns:
        dict: Scan metadata (``name``, ``version``, ``generated_on``) merged
        with the scan results:
            - Directory: ``package_name`` plus the directory scan output
              (``file_privacy_check``, or ``Error`` when the directory scan
              reports one).
            - Single file: ``file_name`` and ``file_privacy_check`` with the
              single result stored under key ``"0"``.
            - PyPI package: ``package_name``, ``package_release`` plus the
              directory scan output.
            - Anything else, or a PyPI package without a download URL: a
              dictionary holding only an ``"Error"`` key.

    Note:
        This function has no ``except`` clause of its own; exceptions raised
        by the helper functions it calls are not caught here.
    """
    file_path = Path(input_path)
    ca_version_info = {"name": "Python_Code_Audit", "version": __version__}
    timestamp_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    output = ca_version_info | {"generated_on": timestamp_str}

    # Case 1: local directory scan.
    if file_path.is_dir():
        package_name = get_filename_from_path(input_path)
        output |= {"package_name": package_name}
        output |= _codeaudit_directory_spyscan(input_path)
        return output

    # Case 2: a single Python file that can be parsed into an AST.
    if (
        file_path.suffix.lower() == ".py"
        and file_path.is_file()
        and is_ast_parsable(input_path)
    ):
        name_dict = {"FileName": get_filename_from_path(input_path)}
        # Only one file is scanned; it is stored under index "0" so consumers
        # can handle single-file output and package output the same way.
        file_output = {"0": spy_check(input_path)}
        output |= {"file_name": name_dict, "file_privacy_check": file_output}
        return output

    # Case 3: input_path is treated as a PyPI package name.
    if pypi_data := get_pypi_download_info(input_path):
        package_name = input_path
        url = pypi_data["download_url"]
        release = pypi_data["release"]
        if url is None:
            # Previously this path fell through and returned None, breaking
            # the "always a dict" contract that callers rely on.
            return {
                "Error": f"No downloadable source distribution found on PyPI for package {package_name}."
            }
        src_dir, tmp_handle = get_package_source(url)
        output |= {"package_name": package_name, "package_release": release}
        try:
            output |= _codeaudit_directory_spyscan(src_dir)
        finally:
            # Always remove the temporary download directory.
            tmp_handle.cleanup()
        return output

    # Neither a directory, a valid Python file, nor a known PyPI package.
    return {
        "Error": "File is not a *.py file, does not exist or is not a valid directory path towards a Python package."
    }

112 

113 

def spy_check(file):
    """Run the AST-based secret scan on a single source file.

    Args:
        file: Path of the Python source file to scan.

    Returns:
        dict: ``file_name`` (as produced by ``get_filename_from_path``) and
        ``privacy_check_result`` (the findings from ``collect_secret_values``).
    """
    findings = collect_secret_values(read_in_source_file(file))
    return {
        "file_name": get_filename_from_path(file),
        "privacy_check_result": findings,
    }

121 

122 

def _codeaudit_directory_spyscan(input_path):
    """Run the privacy scan on every Python file found in a directory.

    Also used for PyPI packages, which are unpacked into a temporary
    directory before being scanned.

    Args:
        input_path: Directory to scan.

    Returns:
        dict: ``{"file_privacy_check": {index: spy_check(file), ...}}`` with
        integer indexes in the order returned by
        ``collect_python_source_files``, or ``{"Error": ...}`` when no Python
        files were collected.
    """
    files_to_check = collect_python_source_files(input_path)

    # A directory with exactly one Python file must be scanned too; the
    # earlier `> 1` comparison wrongly reported it as containing no files.
    if len(files_to_check) == 0:
        output_msg = f"Directory path {input_path} contains no Python files."
        return {"Error": output_msg}

    file_output = {i: spy_check(file) for i, file in enumerate(files_to_check)}
    return {"file_privacy_check": file_output}

138 

139 

def load_secrets_list(filename=SECRETS_LIST):
    """Read the secrets pattern file and return its usable entries.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with ``#`` are skipped. The remaining entries are lowercased.

    Args:
        filename: File to read (UTF-8). Defaults to ``SECRETS_LIST``.

    Returns:
        list[str]: Lowercased patterns in file order.
    """
    with open(filename, "r", encoding="utf-8") as handle:
        stripped_entries = (raw_line.strip() for raw_line in handle)
        return [
            entry.lower()
            for entry in stripped_entries
            if entry and not entry.startswith("#")
        ]

155 

156 

def match_secret(secrets, name, value):
    """Check whether a name or value contains a secret as a whole word.

    Assumptions:
        - `secrets` are already lowercased.

    Matching is case-insensitive on `name` and `value` (both are converted
    with ``str()`` and lowercased). Secrets are tried shortest first; for
    each secret the name is checked before the value.

    A `name` or `value` of ``None`` means "nothing to inspect" and is treated
    as empty text. It is deliberately NOT stringified to ``"None"``, which
    would let a pattern such as ``none`` match every missing name or value.

    Args:
        secrets: Iterable of lowercased secret patterns (matched literally).
        name: Identifier to inspect, or ``None``.
        value: Value to inspect, or ``None``.

    Returns:
        The matching secret (lowercased) if found, otherwise None.
    """
    name_lower = "" if name is None else str(name).lower()
    value_lower = "" if value is None else str(value).lower()

    # Shorter secrets first to preserve original behavior
    for secret_tag in sorted(secrets, key=len):
        # re.escape: the pattern is a literal word, not a regular expression.
        pattern = re.compile(rf"\b{re.escape(secret_tag)}\b")

        if pattern.search(name_lower) or pattern.search(value_lower):
            return secret_tag

    return None

182 

183 

def has_privacy_findings(data):
    """Tell whether any scanned file reported at least one finding.

    Args:
        data: Scan output; per-file entries are read from its
            ``file_privacy_check`` mapping (treated as empty when absent).

    Returns:
        bool: True if some entry has a non-empty ``privacy_check_result``,
        otherwise False.
    """
    per_file = data.get("file_privacy_check", {})
    reported = (info.get("privacy_check_result") for info in per_file.values())
    return any(bool(found) and len(found) > 0 for found in reported)

197 

198 

def count_privacy_check_results(data):
    """Sum the number of findings over all scanned files.

    Entries of ``file_privacy_check`` that are not dictionaries, or whose
    ``privacy_check_result`` is missing or empty, contribute nothing.

    Args:
        data: Scan output containing an optional ``file_privacy_check`` mapping.

    Returns:
        int: Total number of findings.
    """
    total = 0
    for entry in data.get("file_privacy_check", {}).values():
        if not isinstance(entry, dict):
            continue
        findings = entry.get("privacy_check_result")
        if findings:
            total += len(findings)
    return total

211 

212 

def collect_secret_values(source_code, secrets_file=SECRETS_LIST):
    """Scan Python source code for names/values matching the secrets list.

    The source is parsed into an AST. Assignment targets, annotated
    assignments, call keyword arguments and call positional arguments are
    checked with ``match_secret`` against the patterns loaded from
    `secrets_file`. Duplicate line results are filtered out.

    Args:
        source_code: Python source text to analyse.
        secrets_file: Pattern file passed to ``load_secrets_list``.
            Defaults to ``SECRETS_LIST``.

    Returns:
        list[dict]: One dictionary per finding, sorted by ``lineno``, with:
            - ``lineno``: line number of the node that matched.
            - ``code``: that source line together with the line before and
              after it (when they exist), joined by newlines.
            - ``matched``: the secret pattern that matched.
            - ``call``: alias-resolved name of the called function for call
              arguments, otherwise None.

    Raises:
        SyntaxError: If `source_code` cannot be parsed by ``ast.parse``.
    """
    secrets = load_secrets_list(secrets_file)
    results = []
    seen_keys = set()
    seen_lines = set()  # Track line contents to filter duplicates

    # AST line numbers only count "\n", "\r\n" and "\r" as line breaks.
    # str.splitlines() additionally splits on form feed, vertical tab,
    # U+2028 and others, which would shift every later line and make the
    # reported code context point at the wrong lines.
    if isinstance(source_code, str):
        source_lines = re.split(r"\r\n|\r|\n", source_code)
        if source_lines and source_lines[-1] == "":
            # Mirror splitlines(): a trailing newline adds no empty last line.
            source_lines.pop()
    else:
        # Non-str input keeps the previous splitting behaviour.
        source_lines = source_code.splitlines()

    # -------------------------
    # Parse AST and detect aliases
    # -------------------------
    tree = ast.parse(source_code)
    aliases = {}  # alias name -> fully qualified imported name

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for n in node.names:
                if n.asname:
                    aliases[n.asname] = n.name
        elif isinstance(node, ast.ImportFrom):
            module = node.module or ""
            for n in node.names:
                full = f"{module}.{n.name}" if module else n.name
                if n.asname:
                    aliases[n.asname] = full

    # -------------------------
    # Helpers
    # -------------------------
    def get_constant(node):
        """Return the node's ``value`` attribute, or None when it has none."""
        return getattr(node, "value", None)

    def is_os_environ(node):
        """Return True when the subscripted object is exactly ``os.environ``."""
        base = getattr(node, "value", None)
        return (
            getattr(base, "attr", None) == "environ"
            and getattr(getattr(base, "value", None), "id", None) == "os"
        )

    def get_target_repr(node):
        """Return a textual name for an assignment target, or None."""
        if hasattr(node, "id"):
            return node.id
        if hasattr(node, "attr") or hasattr(node, "slice"):
            return ast.unparse(node)
        return None

    def classify_value(node):
        """Reduce a value node to the text/constant that should be matched."""
        if node is None:
            return None
        if isinstance(node, ast.Constant):
            return node.value
        if hasattr(node, "slice"):
            # os.environ["KEY"] -> inspect the key itself.
            if is_os_environ(node):
                return get_constant(node.slice)
            return ast.unparse(node)
        if hasattr(node, "func") and getattr(node, "args", None):
            # For calls, a constant first positional argument is inspected.
            first_arg = node.args[0]
            if isinstance(first_arg, ast.Constant):
                return first_arg.value
        # Everything else is matched against its source representation.
        return ast.unparse(node)

    def get_original_line(node):
        """Return the node's source line with one line of context each side."""
        lineno = getattr(node, "lineno", None)
        if lineno is None:
            return None
        lines = []
        if lineno > 1:
            lines.append(source_lines[lineno - 2].rstrip())
        if 1 <= lineno <= len(source_lines):
            lines.append(source_lines[lineno - 1].rstrip())
        if lineno < len(source_lines):
            lines.append(source_lines[lineno].rstrip())
        return "\n".join(lines)

    def get_call_name(node):
        """Return the called name with import aliases resolved, or None."""
        func = getattr(node, "func", None)
        if isinstance(func, ast.Attribute):
            base = ast.unparse(func.value)
            base = aliases.get(base, base)
            return f"{base}.{func.attr}"
        if isinstance(func, ast.Name):
            return aliases.get(func.id, func.id)
        return None

    def add_value(name, value_node, node, call=None):
        """Record a finding for `node` if its name or value matches a secret."""
        value = classify_value(value_node)
        matched = match_secret(secrets, name, value)
        if matched is None:
            return

        lineno = getattr(node, "lineno", None)
        line_content = get_original_line(node)

        key = (lineno, matched, call)

        # Skip duplicate keys or duplicate line content
        if key in seen_keys or line_content in seen_lines:
            return

        seen_keys.add(key)
        seen_lines.add(line_content)

        results.append(
            {
                "lineno": lineno,
                "code": line_content,
                "matched": matched,
                "call": call,
            }
        )

    # -------------------------
    # Walk AST
    # -------------------------
    for node in ast.walk(tree):
        # Assignments (any node exposing a `targets` list)
        for target in getattr(node, "targets", []):
            name = get_target_repr(target)
            if name:
                add_value(name, getattr(node, "value", None), node)

        # Annotated assignments
        if isinstance(node, ast.AnnAssign):
            name = get_target_repr(node.target)
            if name:
                add_value(name, getattr(node, "value", None), node)

        # Function calls
        if isinstance(node, ast.Call):
            call_name = get_call_name(node)
            # Keyword arguments (``**kwargs`` has no arg name and is skipped)
            for kw in node.keywords:
                if kw.arg:
                    add_value(kw.arg, kw.value, kw, call_name)
            # Positional arguments have no name; only the value is inspected
            for arg in node.args:
                add_value(None, arg, node, call_name)

    return sorted(results, key=lambda item: item["lineno"])