Coverage for src / codeaudit / codeaudit_dashboard.py: 0%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-09 09:33 +0200

1""" 

2License GPLv3 or higher. 

3 

4(C) 2026 Created by Maikel Mardjan - https://nocomplexity.com/ 

5 

6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. 

7 

8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

9 

10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. 

11 

12 

13WASM Dashboard version of codeaudit - limited functionality - 

14""" 

15 

16import asyncio 

17import datetime 

18import inspect 

19import json 

20import sys 

21 

22import panel as pn 

23 

24pn.extension("vega") 

25 

26 

27from codeaudit.altairplots import ( 

28 ast_nodes_overview, 

29 complexity_heatmap, 

30 lines_of_code_overview, 

31 module_count_barchart, 

32 module_distribution_view, 

33 sast_files_overview, 

34 weaknesses_overview, 

35 weaknesses_radial_overview, 

36) 

37from codeaudit.api_helpers import _codeaudit_directory_scan_wasm 

38from codeaudit.api_interfaces import get_package_source, version 

39from codeaudit.dashboard_reports import ( 

40 create_statistics_overview, 

41 get_disclaimer_text, 

42 get_info_text, 

43 report_sast_results, 

44 report_used_modules, 

45) 

46 

# --- Environment Detection ---
# True when the "pyodide" module is already loaded in this interpreter, which
# the rest of this file uses to choose between browser and desktop code paths.
IS_PYODIDE: bool = "pyodide" in sys.modules

49 

50 

async def get_pypi_package_info_wasm(package_name):
    """Fetch the PyPI JSON metadata document for a package.

    Uses ``pyodide.http.pyfetch`` when ``IS_PYODIDE`` is true and
    ``urllib.request`` otherwise.

    Args:
        package_name: Project name to look up on pypi.org.

    Returns:
        The decoded JSON document on success, or ``False`` when the name is
        not a syntactically valid project name, the request fails, the
        response is not OK, or the body cannot be decoded.
    """
    import re

    # The name is interpolated into a URL path, so anything that is not a
    # valid project name (letters, digits, ".", "_", "-", starting and ending
    # with a letter or digit) is rejected before a request is made. This keeps
    # "/", "..", "?", "#" and whitespace from reshaping the request URL.
    if not isinstance(package_name, str) or not re.fullmatch(
        r"[A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?", package_name
    ):
        return False

    url = f"https://pypi.org/pypi/{package_name}/json"
    if IS_PYODIDE:
        from pyodide.http import pyfetch

        try:
            response = await pyfetch(url)
            if not response.ok:
                return False
            return await response.json()
        except Exception:
            # Best-effort lookup: any fetch/decode failure means "no data".
            # Catching Exception (not everything) lets task cancellation and
            # KeyboardInterrupt propagate.
            return False
    else:
        import urllib.request

        try:
            with urllib.request.urlopen(url) as response:
                return json.loads(response.read().decode("utf-8"))
        except Exception:
            # Same best-effort contract as the Pyodide branch.
            return False

71 

72 

async def get_pypi_download_info_wasm(package_name):
    """Find the source-distribution download URL of a package's current release.

    Args:
        package_name: Project name to look up on pypi.org.

    Returns:
        ``{"download_url": <url>, "release": <version>}`` for the first file
        of the version listed under ``info.version`` whose ``packagetype`` is
        ``"sdist"`` and whose URL ends in ``.tar.gz``; ``False`` when the
        metadata could not be fetched or no such file exists.
    """
    data = await get_pypi_package_info_wasm(package_name)
    if not data or "info" not in data:
        return False

    # "or {}" / "or []" also cover keys that are present but null, so a
    # malformed document yields False instead of an AttributeError/TypeError.
    version_str = (data.get("info") or {}).get("version")
    release_files = (data.get("releases") or {}).get(version_str) or []

    for file_info in release_files:
        # A file entry without a URL can never be downloaded; treat it as a
        # non-match rather than calling .endswith() on None.
        download_url = file_info.get("url") or ""
        if file_info.get("packagetype") == "sdist" and download_url.endswith(
            ".tar.gz"
        ):
            return {"download_url": download_url, "release": version_str}
    return False

87 

88 

async def get_package_source_wasm(url):
    """Download a ``.tar.gz`` archive with ``pyfetch`` and unpack it.

    The response body is additionally decompressed when the
    ``Content-Encoding`` header is ``gzip`` or ``deflate``, written to a
    fresh temporary directory as ``package.tar.gz`` and extracted there with
    the ``"data"`` tarfile extraction filter.

    Args:
        url: Address of the gzip-compressed tar archive.

    Returns:
        ``(temp_dir, tmpdir_obj)`` on success, where ``temp_dir`` is the path
        of the extraction directory and ``tmpdir_obj`` is the owning
        ``tempfile.TemporaryDirectory``; the caller is expected to call
        ``tmpdir_obj.cleanup()`` when done. ``None`` when the response is not
        OK or any step raises (the error is printed).
    """
    import gzip
    import tarfile
    import tempfile
    import zlib

    from pyodide.http import pyfetch

    # Tracked outside the try so the failure path can remove a directory that
    # was created before the write/extract step failed.
    tmpdir_obj = None
    try:
        response = await pyfetch(url)
        if not response.ok:
            return None

        content = await response.bytes()
        content_encoding = response.headers.get("Content-Encoding")

        if content_encoding == "gzip":
            content = gzip.decompress(content)
        elif content_encoding == "deflate":
            # Negative wbits: raw deflate stream without a zlib header.
            content = zlib.decompress(content, -zlib.MAX_WBITS)

        tmpdir_obj = tempfile.TemporaryDirectory(prefix="codeaudit_")
        temp_dir = tmpdir_obj.name

        tar_path = f"{temp_dir}/package.tar.gz"
        with open(tar_path, "wb") as f:
            f.write(content)

        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(path=temp_dir, filter="data")

        return temp_dir, tmpdir_obj

    except Exception as e:
        if tmpdir_obj is not None:
            # Nothing is handed back to the caller on failure, so the
            # partially filled directory must be removed here.
            tmpdir_obj.cleanup()
        print(f"WASM fetch error: {e}")
        return None

125 

126 

127# --- Logic for SAST scan to be WASM safe--- 

async def filescan_wasm(input_path, nosec=False):
    """Scan a PyPI package and return the scan report as a dict.

    PyPI packages only: ``input_path`` is used as the package name.

    Args:
        input_path: Name of the package on PyPI.
        nosec: Passed through to the directory scan as ``nosec_flag``.

    Returns:
        On success, the version info merged with ``generated_on``,
        ``package_name``, ``package_release`` and the directory scan output.
        Otherwise a dict with a single ``"Error"`` key.
    """
    report = version() | {
        "generated_on": datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    }

    download_info = await get_pypi_download_info_wasm(input_path)
    if not download_info:
        return {"Error": "Package not found on PyPI.org."}

    url = download_info.get("download_url")
    release = download_info.get("release")
    if url is None:
        return {"Error": "Package not found on PyPI.org."}

    # Browser builds fetch through Pyodide; elsewhere the regular helper is
    # used, which may hand back either a plain value or an awaitable.
    if IS_PYODIDE:
        fetched = await get_package_source_wasm(url)
    else:
        fetched = get_package_source(url)
        if inspect.isawaitable(fetched):
            fetched = await fetched

    if fetched is None:
        return {
            "Error": f"Could not download or extract package from {url}. "
            f"This may be due to browser restrictions."
        }

    src_dir, tmp_handle = fetched

    report |= {
        "package_name": input_path,
        "package_release": release,
    }

    try:
        report |= _codeaudit_directory_scan_wasm(src_dir, nosec_flag=nosec)
    finally:
        # Always remove the extracted sources, even when the scan raises.
        if tmp_handle:
            tmp_handle.cleanup()

    return report

183 

184 

185# - END of specific HELPERS to do CA things -# 

186 

187 

188# --- UI Component Definitions --- 

# Input field for the package to scan.
text_input = pn.widgets.TextInput(
    placeholder="Enter PyPI package (e.g., requests)...",
    name="Python Package Name",
)

# Trigger, progress feedback and raw-result widgets.
run_button = pn.widgets.Button(button_type="primary", name="Run Scan")
status = pn.pane.Markdown("### Ready to scan.")
result_pane = pn.pane.JSON({}, depth=-1, sizing_mode="stretch_both", name="JSON")
loading = pn.indicators.LoadingSpinner(
    name="Scanning...",
    bgcolor="light",
    color="primary",
    size=60,
    value=False,
)

# Placeholder content built from the (still empty) result object.
overview_visuals = create_statistics_overview(result_pane.object)

# Every tab starts out showing the same placeholder object.
tabs = pn.Tabs(
    *[
        (title, overview_visuals)
        for title in (
            "Package Overview",
            "Used Modules",
            "Complexity Insights",
            "Weaknesses Overview",
            "Weaknesses per file",
            "Weaknesses Details",
        )
    ],
    dynamic=True,
    sizing_mode="stretch_both",
)

213 

214# --- UI Callback --- 

215 

216 

async def run_scan(event):
    """Button callback: scan the entered package and refresh the dashboard.

    Reads the package name from ``text_input``, runs ``filescan_wasm`` and
    writes the outcome to ``status``, ``result_pane`` and ``tabs``. The button
    is disabled and the spinner shown for the duration of the scan; both are
    restored afterwards regardless of the outcome.

    Args:
        event: Click event supplied by the button; not used.
    """
    package_name = text_input.value.strip()
    if not package_name:
        status.object = "⚠️ Please enter a package name."
        return

    def _chart(spec):
        # All charts are shown with the Vega action menu enabled.
        return pn.pane.Vega(spec, show_actions=True)

    # Lock the UI while the scan is running.
    run_button.disabled = True
    loading.value = True
    status.object = f"🔄 Scanning: **{package_name}**..."

    try:
        # Yield once so the spinner can render before the scan starts.
        await asyncio.sleep(0.1)

        # No threads and no nested event loops: just await the scan.
        result = await filescan_wasm(package_name)

        if result is None:
            status.object = "❌ Error: Scan failed to return data."
        elif "Error" in result:
            status.object = f"{result['Error']}"
            result_pane.object = result
        else:
            result_pane.object = result
            status.object = f"✅ Scan completed for **{package_name}**"

            # Each page is built right before it is assigned, one tab at a
            # time, so a failing page leaves the earlier tabs updated.
            pages = (
                (
                    "Package Overview",
                    lambda: pn.Column(create_statistics_overview(result)),
                ),
                (
                    "Used Modules",
                    lambda: pn.Column(
                        pn.Row(
                            _chart(module_count_barchart(result)),
                            _chart(module_distribution_view(result)),
                        ),
                        report_used_modules(result),
                        pn.Spacer(height=60),
                    ),
                ),
                (
                    "Complexity Insights",
                    lambda: pn.Column(
                        _chart(complexity_heatmap(result)),
                        _chart(lines_of_code_overview(result)),
                        _chart(ast_nodes_overview(result)),
                        pn.Spacer(height=60),
                    ),
                ),
                (
                    "Weaknesses Overview",
                    lambda: pn.Column(_chart(weaknesses_overview(result))),
                ),
                (
                    "Weaknesses per file",
                    lambda: pn.Column(
                        _chart(sast_files_overview(result)),
                        _chart(weaknesses_radial_overview(result)),
                        pn.Spacer(height=60),
                    ),
                ),
                (
                    "Weaknesses Details",
                    lambda: pn.Column(
                        report_sast_results(result),
                        pn.Spacer(height=60),
                    ),
                ),
            )
            for position, (title, build_page) in enumerate(pages):
                tabs[position] = (title, build_page())

    except Exception as e:
        status.object = f"❌ Error: {e!s}"

    finally:
        # Unlock the UI again.
        loading.value = False
        run_button.disabled = False

307 

308 

# Wire the scan callback to the button.
run_button.on_click(run_scan)

# --- Layout ---

# Static texts shown at the bottom of the sidebar.
infotext = get_info_text()
disclaimer_text = get_disclaimer_text()

# Sidebar: heading, input controls, progress feedback, then the static texts.
ca_sidebar = pn.Column(
    *[
        "## Package Code Security Scan",
        text_input,
        run_button,
        loading,
        status,
        infotext,
        disclaimer_text,
    ],
    sizing_mode="stretch_width",
)

# Main area holds only the result tabs.
main_pane = pn.Column(tabs, sizing_mode="stretch_both")

app = pn.template.MaterialTemplate(
    title="Python Security Code Audit",
    header_background="#262626",
    main=[main_pane],
    sidebar=[ca_sidebar],
)

app.servable()