Coverage for src / codeaudit / pypi_package_scan.py: 46%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-09 09:33 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-09 09:33 +0200
1"""
2License GPLv3 or higher.
4(C) 2025 Created by Maikel Mardjan - https://nocomplexity.com/
6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
13Public API functions for Python Code Audit aka codeaudit on pypi.org
14"""
16import gzip
17import json
18import tarfile
19import tempfile
20import zlib
21from urllib.error import HTTPError, URLError
22from urllib.request import Request, urlopen
24from codeaudit import __version__
26NOCX_HEADERS = {
27 "user-agent": f"Python Code Audit /{__version__} (https://github.com/nocomplexity/codeaudit)",
28 "Accept": "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
29 "Accept-Encoding": "gzip, deflate,br",
30 "Connection": "keep-alive",
31 "Upgrade-Insecure-Requests": "1",
32}
def get_pypi_package_info(package_name, timeout=10):
    """Fetch the PyPI JSON metadata of a package.

    The JSON response is needed to get the download URL of the sdist.

    Args:
        package_name: Name of the project as published on pypi.org.
        timeout: Seconds to wait for the network before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        The decoded JSON document on success.
        ``False`` when PyPI answers with an HTTP error status (4xx/5xx), e.g.
        when no package with this name exists.
        ``None`` when the request fails at the network level.
    """
    from urllib.parse import quote  # local import keeps the block self-contained

    # Percent-encode the name so characters such as "/", "?" or "#" cannot
    # change the path or query of the request URL. Valid PyPI names only use
    # unreserved characters and are therefore left untouched.
    safe_name = quote(str(package_name), safe="")
    url = f"https://pypi.org/pypi/{safe_name}/json"

    try:
        with urlopen(url, timeout=timeout) as response:
            return json.load(response)
    except HTTPError:
        # urlopen raises HTTPError for 4xx/5xx responses instead of returning
        # the response object. For an unknown package PyPI sends a 404 with
        # the JSON body {"message": "Not Found"}; error handling is kept
        # simple and every HTTP error is reported as "not found".
        return False  # No package with this name found on pypi.org!
    except (URLError, TimeoutError) as e:
        # A timeout while reading the body surfaces as TimeoutError rather
        # than URLError, so both are treated as network failures.
        print(f"Network error: {e}")
        return None
def get_pypi_download_info(package_name):
    """Look up the sdist download URL of the latest release of a package.

    Uses the PyPI JSON API (https://docs.pypi.org/api/json/). The API result
    is a nested dict holding every published release, so the files of the
    latest version have to be searched for the source distribution.

    Returns:
        ``False`` when no package metadata could be retrieved or the metadata
        carries no version string; otherwise a dict with the keys
        ``download_url`` (``None`` when the release has no ``.tar.gz`` sdist)
        and ``release``.
    """
    package_json = get_pypi_package_info(package_name)
    if not package_json:
        return False

    # The official "latest" version string comes from the API metadata.
    release = package_json.get("info", {}).get("version")
    if not release:
        return False

    # Files published for exactly that version.
    release_files = package_json.get("releases", {}).get(release, [])

    # Only source distributions are of interest.
    sdist_links = (
        entry.get("url")
        for entry in release_files
        if entry.get("packagetype") == "sdist"
    )
    # PEP 527: only .tar.gz archives are extracted; older source formats are
    # not supported. The first match wins.
    tarball_link = next(
        (link for link in sdist_links if link and link.endswith(".tar.gz")),
        None,
    )

    return {"download_url": tarball_link, "release": release}
def _decode_http_body(content, content_encoding):
    """Undo the HTTP Content-Encoding applied to a response body."""
    encoding = (content_encoding or "").strip().lower()
    if encoding in ("", "identity"):
        return content
    if encoding == "gzip":
        return gzip.decompress(content)
    if encoding == "deflate":
        # HTTP "deflate" is specified as the zlib format, but some servers
        # send a raw deflate stream instead; accept both.
        try:
            return zlib.decompress(content)
        except zlib.error:
            return zlib.decompress(content, -zlib.MAX_WBITS)
    raise ValueError(f"Unexpected content encoding: {content_encoding}")


def get_package_source(url, nocxheaders=NOCX_HEADERS, nocxtimeout=10):
    """Download a package source archive and extract it for SAST scanning.

    Make sure to clean up the temporary directory when done, e.g. with
    ``tmp_handle.cleanup()``, which deletes everything.

    Args:
        url: Location of the ``.tar.gz`` source archive.
        nocxheaders: HTTP request headers; a falsy value sends no extra headers.
        nocxtimeout: Network timeout in seconds.

    Returns:
        A tuple ``(temp_dir, tmpdir_obj)`` on success: the path of the
        directory holding the downloaded ``package.tar.gz`` and its extracted
        content, and the ``tempfile.TemporaryDirectory`` object that owns it.
        Both are returned so the caller controls the lifetime.
        ``None`` when downloading or extracting fails; the error is printed
        and a temporary directory that was already created is removed.
    """
    tmpdir_obj = None
    try:
        request = Request(url, headers=nocxheaders or {})
        with urlopen(request, timeout=nocxtimeout) as response:
            content = _decode_http_body(
                response.read(), response.headers.get("Content-Encoding")
            )

        # No context manager is used here on purpose: the directory must
        # outlive this function, so it is only deleted when the caller (or
        # the error path below) calls cleanup().
        tmpdir_obj = tempfile.TemporaryDirectory(prefix="codeaudit_")
        temp_dir = tmpdir_obj.name

        tar_path = f"{temp_dir}/package.tar.gz"
        with open(tar_path, "wb") as f:
            f.write(content)

        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(
                path=temp_dir, filter="data"
            )  # nosec Possible risks are mitigated as far as possible, see architecture notes.

        return temp_dir, tmpdir_obj  # return both so caller controls lifetime

    except Exception as e:
        # Best-effort boundary: report the problem and signal failure with
        # None instead of propagating the exception.
        print(e)
        if tmpdir_obj is not None:
            # Do not leave a half-written or partially extracted tree behind.
            tmpdir_obj.cleanup()
        return None