Starter Code
This commit is contained in:
		| @@ -0,0 +1,189 @@ | ||||
| """Support functions for working with wheel files. | ||||
| """ | ||||
|  | ||||
| import logging | ||||
| from email.message import Message | ||||
| from email.parser import Parser | ||||
| from typing import Dict, Tuple | ||||
| from zipfile import BadZipFile, ZipFile | ||||
|  | ||||
| from pip._vendor.packaging.utils import canonicalize_name | ||||
| from pip._vendor.pkg_resources import DistInfoDistribution, Distribution | ||||
|  | ||||
| from pip._internal.exceptions import UnsupportedWheel | ||||
| from pip._internal.utils.pkg_resources import DictMetadata | ||||
|  | ||||
| VERSION_COMPATIBLE = (1, 0) | ||||
|  | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class WheelMetadata(DictMetadata): | ||||
|     """Metadata provider that maps metadata decoding exceptions to our | ||||
|     internal exception type. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, metadata, wheel_name): | ||||
|         # type: (Dict[str, bytes], str) -> None | ||||
|         super().__init__(metadata) | ||||
|         self._wheel_name = wheel_name | ||||
|  | ||||
|     def get_metadata(self, name): | ||||
|         # type: (str) -> str | ||||
|         try: | ||||
|             return super().get_metadata(name) | ||||
|         except UnicodeDecodeError as e: | ||||
|             # Augment the default error with the origin of the file. | ||||
|             raise UnsupportedWheel( | ||||
|                 f"Error decoding metadata for {self._wheel_name}: {e}" | ||||
|             ) | ||||
|  | ||||
|  | ||||
| def pkg_resources_distribution_for_wheel(wheel_zip, name, location): | ||||
|     # type: (ZipFile, str, str) -> Distribution | ||||
|     """Get a pkg_resources distribution given a wheel. | ||||
|  | ||||
|     :raises UnsupportedWheel: on any errors | ||||
|     """ | ||||
|     info_dir, _ = parse_wheel(wheel_zip, name) | ||||
|  | ||||
|     metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")] | ||||
|  | ||||
|     metadata_text = {}  # type: Dict[str, bytes] | ||||
|     for path in metadata_files: | ||||
|         _, metadata_name = path.split("/", 1) | ||||
|  | ||||
|         try: | ||||
|             metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path) | ||||
|         except UnsupportedWheel as e: | ||||
|             raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e))) | ||||
|  | ||||
|     metadata = WheelMetadata(metadata_text, location) | ||||
|  | ||||
|     return DistInfoDistribution(location=location, metadata=metadata, project_name=name) | ||||
|  | ||||
|  | ||||
| def parse_wheel(wheel_zip, name): | ||||
|     # type: (ZipFile, str) -> Tuple[str, Message] | ||||
|     """Extract information from the provided wheel, ensuring it meets basic | ||||
|     standards. | ||||
|  | ||||
|     Returns the name of the .dist-info directory and the parsed WHEEL metadata. | ||||
|     """ | ||||
|     try: | ||||
|         info_dir = wheel_dist_info_dir(wheel_zip, name) | ||||
|         metadata = wheel_metadata(wheel_zip, info_dir) | ||||
|         version = wheel_version(metadata) | ||||
|     except UnsupportedWheel as e: | ||||
|         raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e))) | ||||
|  | ||||
|     check_compatibility(version, name) | ||||
|  | ||||
|     return info_dir, metadata | ||||
|  | ||||
|  | ||||
| def wheel_dist_info_dir(source, name): | ||||
|     # type: (ZipFile, str) -> str | ||||
|     """Returns the name of the contained .dist-info directory. | ||||
|  | ||||
|     Raises AssertionError or UnsupportedWheel if not found, >1 found, or | ||||
|     it doesn't match the provided name. | ||||
|     """ | ||||
|     # Zip file path separators must be / | ||||
|     subdirs = {p.split("/", 1)[0] for p in source.namelist()} | ||||
|  | ||||
|     info_dirs = [s for s in subdirs if s.endswith(".dist-info")] | ||||
|  | ||||
|     if not info_dirs: | ||||
|         raise UnsupportedWheel(".dist-info directory not found") | ||||
|  | ||||
|     if len(info_dirs) > 1: | ||||
|         raise UnsupportedWheel( | ||||
|             "multiple .dist-info directories found: {}".format(", ".join(info_dirs)) | ||||
|         ) | ||||
|  | ||||
|     info_dir = info_dirs[0] | ||||
|  | ||||
|     info_dir_name = canonicalize_name(info_dir) | ||||
|     canonical_name = canonicalize_name(name) | ||||
|     if not info_dir_name.startswith(canonical_name): | ||||
|         raise UnsupportedWheel( | ||||
|             ".dist-info directory {!r} does not start with {!r}".format( | ||||
|                 info_dir, canonical_name | ||||
|             ) | ||||
|         ) | ||||
|  | ||||
|     return info_dir | ||||
|  | ||||
|  | ||||
| def read_wheel_metadata_file(source, path): | ||||
|     # type: (ZipFile, str) -> bytes | ||||
|     try: | ||||
|         return source.read(path) | ||||
|         # BadZipFile for general corruption, KeyError for missing entry, | ||||
|         # and RuntimeError for password-protected files | ||||
|     except (BadZipFile, KeyError, RuntimeError) as e: | ||||
|         raise UnsupportedWheel(f"could not read {path!r} file: {e!r}") | ||||
|  | ||||
|  | ||||
| def wheel_metadata(source, dist_info_dir): | ||||
|     # type: (ZipFile, str) -> Message | ||||
|     """Return the WHEEL metadata of an extracted wheel, if possible. | ||||
|     Otherwise, raise UnsupportedWheel. | ||||
|     """ | ||||
|     path = f"{dist_info_dir}/WHEEL" | ||||
|     # Zip file path separators must be / | ||||
|     wheel_contents = read_wheel_metadata_file(source, path) | ||||
|  | ||||
|     try: | ||||
|         wheel_text = wheel_contents.decode() | ||||
|     except UnicodeDecodeError as e: | ||||
|         raise UnsupportedWheel(f"error decoding {path!r}: {e!r}") | ||||
|  | ||||
|     # FeedParser (used by Parser) does not raise any exceptions. The returned | ||||
|     # message may have .defects populated, but for backwards-compatibility we | ||||
|     # currently ignore them. | ||||
|     return Parser().parsestr(wheel_text) | ||||
|  | ||||
|  | ||||
| def wheel_version(wheel_data): | ||||
|     # type: (Message) -> Tuple[int, ...] | ||||
|     """Given WHEEL metadata, return the parsed Wheel-Version. | ||||
|     Otherwise, raise UnsupportedWheel. | ||||
|     """ | ||||
|     version_text = wheel_data["Wheel-Version"] | ||||
|     if version_text is None: | ||||
|         raise UnsupportedWheel("WHEEL is missing Wheel-Version") | ||||
|  | ||||
|     version = version_text.strip() | ||||
|  | ||||
|     try: | ||||
|         return tuple(map(int, version.split("."))) | ||||
|     except ValueError: | ||||
|         raise UnsupportedWheel(f"invalid Wheel-Version: {version!r}") | ||||
|  | ||||
|  | ||||
| def check_compatibility(version, name): | ||||
|     # type: (Tuple[int, ...], str) -> None | ||||
|     """Raises errors or warns if called with an incompatible Wheel-Version. | ||||
|  | ||||
|     pip should refuse to install a Wheel-Version that's a major series | ||||
|     ahead of what it's compatible with (e.g 2.0 > 1.1); and warn when | ||||
|     installing a version only minor version ahead (e.g 1.2 > 1.1). | ||||
|  | ||||
|     version: a 2-tuple representing a Wheel-Version (Major, Minor) | ||||
|     name: name of wheel or package to raise exception about | ||||
|  | ||||
|     :raises UnsupportedWheel: when an incompatible Wheel-Version is given | ||||
|     """ | ||||
|     if version[0] > VERSION_COMPATIBLE[0]: | ||||
|         raise UnsupportedWheel( | ||||
|             "{}'s Wheel-Version ({}) is not compatible with this version " | ||||
|             "of pip".format(name, ".".join(map(str, version))) | ||||
|         ) | ||||
|     elif version > VERSION_COMPATIBLE: | ||||
|         logger.warning( | ||||
|             "Installing from a newer Wheel-Version (%s)", | ||||
|             ".".join(map(str, version)), | ||||
|         ) | ||||
		Reference in New Issue
	
	Block a user