Coverage for src / codeaudit / api_helpers.py: 10%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-09 09:33 +0200

1""" 

2License GPLv3 or higher. 

3 

4(C) 2026 Created by Maikel Mardjan - https://nocomplexity.com/ 

5 

6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. 

7 

8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

9 

10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. 

11 

12Helper functions used to build the public Python Code Audit API.

13""" 

14 

15import html 

16 

17import pandas as pd 

18 

19from codeaudit.api_interfaces import get_modules, get_overview 

20from codeaudit.checkmodules import get_all_modules 

21from codeaudit.filehelpfunctions import ( 

22 collect_python_source_files, 

23 get_filename_from_path, 

24) 

25from codeaudit.security_checks import ast_security_checks, perform_validations 

26from codeaudit.suppression import filter_sast_results 

27from codeaudit.totals import overview_per_file 

28 

29 

def _collect_issue_lines(filename, line, context=1):
    """
    Safely extract source code lines around a specific line number for display.

    Blank (whitespace-only) lines inside the selected window are dropped and
    the remaining lines are HTML-escaped before being wrapped for display.

    Args:
        filename (str): Path to the Python source file.
        line (int): Target line number (1-based).
        context (int, optional): Number of lines of context before and after
            the target line. Negative values are treated as 0. Defaults to 1.

    Returns:
        str: HTML-formatted code snippet with <pre><code> wrapper. Returns an
        empty string on failure: invalid arguments, a file that cannot be
        opened or decoded as UTF-8, or a target line beyond the end of the
        file.
    """
    # Validate inputs
    if not isinstance(filename, str) or not filename:
        return ""
    if not isinstance(line, int) or line <= 0:
        return ""

    try:
        with open(filename, "r", encoding="utf-8") as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError):
        return ""

    # The target line does not exist: report failure instead of returning an
    # empty wrapper or a snippet made only of unrelated trailing lines.
    if line > len(lines):
        return ""

    # A negative context would produce an inverted (empty) window.
    context = max(context, 0)

    # Calculate safe slice indices
    start = max(line - context - 1, 0)  # zero-based
    end = min(line + context, len(lines))

    snippet_lines = [
        text.rstrip("\n") for text in lines[start:end] if text.strip() != ""
    ]

    # Escape HTML to prevent injection
    escaped_lines = [html.escape(text) for text in snippet_lines]

    return (
        "<pre><code class='language-python'>"
        + "\n".join(escaped_lines)
        + "</code></pre>"
    )

72 

73 

def _get_test_info(error):
    """
    Look up the severity and info text that belong to a SAST error identifier.

    The table returned by ``ast_security_checks()`` is searched for a row
    whose 'construct' value equals ``error``. When that fails and ``error``
    contains "extractall", the 'tarfile.TarFile' row is used instead.

    Args:
        error (str): Identifier to match against the 'construct' column.

    Returns:
        tuple[str, str]: (severity, info_text). Falls back to ('unknown', '')
        when the input is invalid, the table cannot be loaded or lacks the
        expected columns, no row matches, or the lookup raises.
    """
    fallback = ("unknown", "")

    # Only non-empty strings can identify a construct.
    if not (isinstance(error, str) and error):
        return fallback

    try:
        checks = ast_security_checks()
    except Exception:
        return fallback

    # The lookup below relies on these three columns being present.
    expected = {"construct", "severity", "info"}
    if not (hasattr(checks, "columns") and expected.issubset(checks.columns)):
        return fallback

    # Constructs to try, in priority order: the exact identifier first, then
    # one deliberately narrow alias for extractall-style findings.
    candidates = [error]
    if "extractall" in error:
        candidates.append("tarfile.TarFile")

    try:
        for construct in candidates:
            matches = checks[checks["construct"] == construct]
            if matches.empty:
                continue
            first = matches.iloc[0]
            return (
                str(first.get("severity", "unknown")),
                str(first.get("info", "")),
            )
    except Exception:
        return fallback

    # Nothing matched: return the default rather than terminating.
    return fallback

119 

120 

def _build_weakness_details(sastresult, filename_location):
    """
    Turn raw SAST results into per-line issue details.

    Every (issue identifier, line number) pair becomes a detail dict holding
    the line, the identifier, its severity and info text, and an HTML code
    snippet. Entries that fail validation are skipped. Processing stops once
    more than 10000 line numbers have been iterated; skipped line numbers
    count towards that limit.

    Args:
        sastresult (dict): Mapping of issue identifiers to a list, tuple or
            set of line numbers.
        filename_location (str): Path to the source file for extracting code
            snippets. Paths containing ".." are rejected.

    Returns:
        dict: Dictionary keyed by line number. The value is a single detail
        dict, or a list of detail dicts when several issues share the line.
        An empty dict is returned for invalid arguments.
    """
    if not (isinstance(sastresult, dict) and sastresult):
        return {}

    # Basic path safety check: refuse anything that is not a string or that
    # contains a parent-directory marker.
    if not isinstance(filename_location, str) or ".." in filename_location:
        return {}

    max_issues = 10000  # upper bound on iterated line numbers
    details = {}
    processed = 0

    for validation, hits in sastresult.items():
        # Skip malformed entries: non-string keys or non-collection values.
        if not isinstance(validation, str) or not isinstance(
            hits, (list, tuple, set)
        ):
            continue

        # Metadata is looked up once per identifier, never fatally.
        try:
            severity, info_text = _get_test_info(validation)
        except Exception:
            severity, info_text = "unknown", ""

        for lineno in hits:
            processed += 1
            if processed > max_issues:
                # Limit reached: hand back what has been collected so far.
                return details

            if not isinstance(lineno, int) or lineno <= 0:
                continue

            try:
                snippet = _collect_issue_lines(filename_location, lineno)
            except Exception:
                snippet = ""

            entry = {
                "line": lineno,
                "validation": validation,
                "severity": severity,
                "info": info_text,
                "code": snippet,
            }

            # First issue on a line is stored bare; later ones promote the
            # value to a list and append to it.
            previous = details.get(lineno)
            if previous is None:
                details[lineno] = entry
            elif isinstance(previous, list):
                previous.append(entry)
            else:
                details[lineno] = [previous, entry]

    return details

203 

204 

def _codeaudit_scan_wasm(filename, nosec_flag):
    """Internal helper that runs a SAST scan on a single file (WASM-safe).

    Args:
        filename: Full filename, including path.
        nosec_flag: When truthy, the raw scan output is passed through
            ``filter_sast_results`` before the details are built.

    Returns:
        dict: ``{"file_name": ..., "sast_result": ...}`` on success. If
        anything in the scan raises, ``sast_result`` is an empty dict and an
        additional ``"error"`` key carries the exception text.
    """
    name_of_file = get_filename_from_path(filename)

    try:
        # Scan once; filtering is an optional post-processing step.
        scan = perform_validations(filename)
        if nosec_flag:
            scan = filter_sast_results(scan)

        # Defensive extraction: a missing "result" key yields no findings.
        findings = _build_weakness_details(scan.get("result", {}), filename)
    except Exception as exc:
        # WASM-safe: one failing file must never abort the whole scan.
        return {"file_name": name_of_file, "sast_result": {}, "error": str(exc)}

    return {"file_name": name_of_file, "sast_result": findings}

226 

227 

def _codeaudit_directory_scan_wasm(input_path, nosec_flag):
    """
    Scan every Python file found under a directory (WASM/Pyodide safe).

    Works for extracted PyPI packages.

    Args:
        input_path: Directory to scan.
        nosec_flag: Passed unchanged to ``_codeaudit_scan_wasm`` for each file.

    Returns:
        dict: On success a dict with the keys "statistics_overview",
        "module_overview" and "file_security_info"; the latter maps the
        enumeration index of each file to its merged overview, module and
        scan information. If file collection fails or finds nothing, a dict
        with a single "Error" key is returned instead.
    """

    def _or_empty(func, arg):
        # Best-effort call: any failure degrades to an empty dict.
        try:
            return func(arg)
        except Exception:
            return {}

    try:
        python_files = collect_python_source_files(input_path)
    except Exception as exc:
        return {"Error": f"Failed to collect Python files: {exc!s}"}

    if not python_files:
        return {"Error": f"Directory path {input_path} contains no Python files."}

    # Package-level metadata (each lookup is individually safe-guarded).
    modules_discovered = _or_empty(get_all_modules, input_path)
    package_overview = _or_empty(get_overview, input_path)

    # Per-file scanning.
    per_file = {}
    for index, path in enumerate(python_files):
        file_information = _or_empty(overview_per_file, path)
        module_information = _or_empty(get_modules, path)
        scan_output = _codeaudit_scan_wasm(path, nosec_flag)

        # The merge must never crash the scan, so fall back to a stub entry.
        try:
            merged = file_information | module_information | scan_output
        except Exception:
            merged = {
                "file_name": get_filename_from_path(path),
                "error": "Failed to merge scan results",
            }
        per_file[index] = merged

    return {
        "statistics_overview": package_overview,
        "module_overview": modules_discovered,
        "file_security_info": per_file,
    }