Coverage for src / codeaudit / pypi_package_scan.py: 46%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-09 09:33 +0200

1""" 

2License GPLv3 or higher. 

3 

4(C) 2025 Created by Maikel Mardjan - https://nocomplexity.com/ 

5 

6This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. 

7 

8This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

9 

10You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. 

11 

12 

13Public API functions for Python Code Audit aka codeaudit on pypi.org 

14""" 

15 

16import gzip 

17import json 

18import tarfile 

19import tempfile 

20import zlib 

21from urllib.error import HTTPError, URLError 

22from urllib.request import Request, urlopen 

23 

24from codeaudit import __version__ 

25 

# Default HTTP request headers used when downloading package sources.
# Only content codings that get_package_source() can actually decode with the
# standard library (gzip and deflate) are advertised. "br" (Brotli) is not
# listed: a Brotli-encoded response cannot be decoded by that function and is
# rejected there as an unexpected content encoding.
NOCX_HEADERS = {
    "user-agent": f"Python Code Audit /{__version__} (https://github.com/nocomplexity/codeaudit)",
    "Accept": "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}

33 

34 

def get_pypi_package_info(package_name, timeout=10):
    """Fetch the PyPI JSON metadata of a package.

    The JSON response is needed to get the download URL of the sdist.

    Args:
        package_name: Project name as published on pypi.org.
        timeout: Seconds to wait on the network before giving up.

    Returns:
        The decoded JSON document on success.
        ``False`` when the name is empty or PyPI answers with an HTTP error
        status (for example 404 for an unknown package).
        ``None`` when the request fails at the network level or the response
        body is not valid JSON.
    """
    from urllib.parse import quote  # local import: only needed in this function

    name = str(package_name or "").strip()
    if not name:
        return False  # Nothing to look up; skip the pointless request.

    # Percent-encode the name so characters such as "/", "?", "#" or spaces
    # cannot change the request path or make urlopen reject the URL.
    url = f"https://pypi.org/pypi/{quote(name, safe='')}/json"

    try:
        with urlopen(url, timeout=timeout) as response:
            return json.load(response)
    except HTTPError:
        # urlopen raises HTTPError for 4xx/5xx status codes instead of
        # returning the response. An unknown package yields a 40x with the
        # JSON body {"message": "Not Found"}; error handling is kept simple.
        return False  # No package with this name found on pypi.org!
    except (URLError, OSError) as e:
        # URLError covers connection failures; a timeout while reading the
        # body surfaces as a plain OSError subclass, so catch that as well.
        print(f"Network error: {e}")
        return None
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Invalid JSON response: {e}")
        return None

49 

50 

def get_pypi_download_info(package_name):
    """Retrieve the sdist download URL of the latest release of a package.

    Uses the PyPI JSON API (https://docs.pypi.org/api/json/). The API result
    is a nested dict with all published release info, so finding the correct
    sdist download URL needs some logic.

    Args:
        package_name: Project name as published on pypi.org.

    Returns:
        ``False`` when no metadata could be retrieved or the metadata has no
        version string. Otherwise a dict
        ``{"download_url": <url or None>, "release": <version>}`` where
        ``download_url`` is ``None`` if the release has no ``.tar.gz`` sdist.
    """
    data = get_pypi_package_info(package_name)
    if not data:
        return False

    # Get the official "latest" version string from the API metadata.
    # `or {}` also covers a key that is present but null.
    latest_version = (data.get("info") or {}).get("version")
    if not latest_version:
        return False

    # Files associated with that specific version. The PyPI docs mark the
    # "releases" key as deprecated; the top-level "urls" list holds the files
    # of the latest release and is used when "releases" has no entry for it.
    release_files = (
        (data.get("releases") or {}).get(latest_version)
        or data.get("urls")
        or []
    )

    sdist_download_url = None

    # Explicitly look for the source distribution (sdist).
    for file_info in release_files:
        if file_info.get("packagetype") != "sdist":
            continue
        url = file_info.get("url")
        # PEP 527: only .tar.gz files are extracted; older source formats
        # are not supported.
        if url and url.endswith(".tar.gz"):
            sdist_download_url = url
            break  # Found it, stop looking

    return {"download_url": sdist_download_url, "release": latest_version}

80 

81 

def get_package_source(url, nocxheaders=NOCX_HEADERS, nocxtimeout=10):
    """Download a package source archive and extract it for SAST scanning.

    Args:
        url: Download URL of a ``.tar.gz`` source distribution.
        nocxheaders: HTTP request headers to send; falsy means no extra headers.
        nocxtimeout: Network timeout in seconds.

    Returns:
        A tuple ``(temp_dir, tmpdir_obj)`` on success, where ``temp_dir`` is
        the path the archive was extracted into and ``tmpdir_obj`` is the
        owning ``tempfile.TemporaryDirectory``. The caller controls its
        lifetime: make sure to clean up, e.g. ``tmpdir_obj.cleanup()``, which
        deletes everything.
        ``None`` on any failure; the error is printed and a temporary
        directory that was already created is removed.
    """
    tmpdir_obj = None
    try:
        request = Request(url, headers=nocxheaders or {})
        with urlopen(request, timeout=nocxtimeout) as response:
            content = response.read()
            content_encoding = response.headers.get("Content-Encoding")

        # Content coding names are case-insensitive; normalise before comparing.
        encoding = (content_encoding or "").strip().lower()
        if encoding in ("gzip", "x-gzip"):
            content = gzip.decompress(content)
        elif encoding == "deflate":
            # The HTTP "deflate" coding is zlib-wrapped data, but some servers
            # send a raw deflate stream; try the standard form first.
            try:
                content = zlib.decompress(content)
            except zlib.error:
                content = zlib.decompress(content, -zlib.MAX_WBITS)
        elif encoding not in ("", "identity"):
            raise ValueError(f"Unexpected content encoding: {content_encoding}")

        # Not used as a context manager on purpose: the directory must
        # outlive this function so the caller can scan its contents.
        tmpdir_obj = tempfile.TemporaryDirectory(prefix="codeaudit_")
        temp_dir = tmpdir_obj.name

        tar_path = f"{temp_dir}/package.tar.gz"
        with open(tar_path, "wb") as f:
            f.write(content)

        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(
                path=temp_dir, filter="data"
            )  # nosec Possible risks are mitigated as far as possible, see architecture notes.

        return temp_dir, tmpdir_obj  # return both so caller controls lifetime

    except Exception as e:
        print(e)
        # Do not leave a half-populated directory behind on failure.
        if tmpdir_obj is not None:
            tmpdir_obj.cleanup()
        return None