temp_dir.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. from __future__ import absolute_import
  2. import errno
  3. import itertools
  4. import logging
  5. import os.path
  6. import tempfile
  7. from contextlib import contextmanager
  8. from pip._vendor.contextlib2 import ExitStack
  9. from pip._vendor.six import ensure_text
  10. from pip._internal.utils.compat import WINDOWS
  11. from pip._internal.utils.misc import enum, rmtree
  12. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  13. if MYPY_CHECK_RUNNING:
  14. from typing import Any, Dict, Iterator, Optional, TypeVar, Union
  15. _T = TypeVar('_T', bound='TempDirectory')
  16. logger = logging.getLogger(__name__)
  17. # Kinds of temporary directories. Only needed for ones that are
  18. # globally-managed.
  19. tempdir_kinds = enum(
  20. BUILD_ENV="build-env",
  21. EPHEM_WHEEL_CACHE="ephem-wheel-cache",
  22. REQ_BUILD="req-build",
  23. )
  24. _tempdir_manager = None # type: Optional[ExitStack]
  25. @contextmanager
  26. def global_tempdir_manager():
  27. # type: () -> Iterator[None]
  28. global _tempdir_manager
  29. with ExitStack() as stack:
  30. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  31. try:
  32. yield
  33. finally:
  34. _tempdir_manager = old_tempdir_manager
  35. class TempDirectoryTypeRegistry(object):
  36. """Manages temp directory behavior
  37. """
  38. def __init__(self):
  39. # type: () -> None
  40. self._should_delete = {} # type: Dict[str, bool]
  41. def set_delete(self, kind, value):
  42. # type: (str, bool) -> None
  43. """Indicate whether a TempDirectory of the given kind should be
  44. auto-deleted.
  45. """
  46. self._should_delete[kind] = value
  47. def get_delete(self, kind):
  48. # type: (str) -> bool
  49. """Get configured auto-delete flag for a given TempDirectory type,
  50. default True.
  51. """
  52. return self._should_delete.get(kind, True)
  53. _tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
  54. @contextmanager
  55. def tempdir_registry():
  56. # type: () -> Iterator[TempDirectoryTypeRegistry]
  57. """Provides a scoped global tempdir registry that can be used to dictate
  58. whether directories should be deleted.
  59. """
  60. global _tempdir_registry
  61. old_tempdir_registry = _tempdir_registry
  62. _tempdir_registry = TempDirectoryTypeRegistry()
  63. try:
  64. yield _tempdir_registry
  65. finally:
  66. _tempdir_registry = old_tempdir_registry
  67. class _Default(object):
  68. pass
  69. _default = _Default()
  70. class TempDirectory(object):
  71. """Helper class that owns and cleans up a temporary directory.
  72. This class can be used as a context manager or as an OO representation of a
  73. temporary directory.
  74. Attributes:
  75. path
  76. Location to the created temporary directory
  77. delete
  78. Whether the directory should be deleted when exiting
  79. (when used as a contextmanager)
  80. Methods:
  81. cleanup()
  82. Deletes the temporary directory
  83. When used as a context manager, if the delete attribute is True, on
  84. exiting the context the temporary directory is deleted.
  85. """
  86. def __init__(
  87. self,
  88. path=None, # type: Optional[str]
  89. delete=_default, # type: Union[bool, None, _Default]
  90. kind="temp", # type: str
  91. globally_managed=False, # type: bool
  92. ):
  93. super(TempDirectory, self).__init__()
  94. if delete is _default:
  95. if path is not None:
  96. # If we were given an explicit directory, resolve delete option
  97. # now.
  98. delete = False
  99. else:
  100. # Otherwise, we wait until cleanup and see what
  101. # tempdir_registry says.
  102. delete = None
  103. # The only time we specify path is in for editables where it
  104. # is the value of the --src option.
  105. if path is None:
  106. path = self._create(kind)
  107. self._path = path
  108. self._deleted = False
  109. self.delete = delete
  110. self.kind = kind
  111. if globally_managed:
  112. assert _tempdir_manager is not None
  113. _tempdir_manager.enter_context(self)
  114. @property
  115. def path(self):
  116. # type: () -> str
  117. assert not self._deleted, (
  118. "Attempted to access deleted path: {}".format(self._path)
  119. )
  120. return self._path
  121. def __repr__(self):
  122. # type: () -> str
  123. return "<{} {!r}>".format(self.__class__.__name__, self.path)
  124. def __enter__(self):
  125. # type: (_T) -> _T
  126. return self
  127. def __exit__(self, exc, value, tb):
  128. # type: (Any, Any, Any) -> None
  129. if self.delete is not None:
  130. delete = self.delete
  131. elif _tempdir_registry:
  132. delete = _tempdir_registry.get_delete(self.kind)
  133. else:
  134. delete = True
  135. if delete:
  136. self.cleanup()
  137. def _create(self, kind):
  138. # type: (str) -> str
  139. """Create a temporary directory and store its path in self.path
  140. """
  141. # We realpath here because some systems have their default tmpdir
  142. # symlinked to another directory. This tends to confuse build
  143. # scripts, so we canonicalize the path by traversing potential
  144. # symlinks here.
  145. path = os.path.realpath(
  146. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  147. )
  148. logger.debug("Created temporary directory: %s", path)
  149. return path
  150. def cleanup(self):
  151. # type: () -> None
  152. """Remove the temporary directory created and reset state
  153. """
  154. self._deleted = True
  155. if not os.path.exists(self._path):
  156. return
  157. # Make sure to pass unicode on Python 2 to make the contents also
  158. # use unicode, ensuring non-ASCII names and can be represented.
  159. # This is only done on Windows because POSIX platforms use bytes
  160. # natively for paths, and the bytes-text conversion omission avoids
  161. # errors caused by the environment configuring encodings incorrectly.
  162. if WINDOWS:
  163. rmtree(ensure_text(self._path))
  164. else:
  165. rmtree(self._path)
  166. class AdjacentTempDirectory(TempDirectory):
  167. """Helper class that creates a temporary directory adjacent to a real one.
  168. Attributes:
  169. original
  170. The original directory to create a temp directory for.
  171. path
  172. After calling create() or entering, contains the full
  173. path to the temporary directory.
  174. delete
  175. Whether the directory should be deleted when exiting
  176. (when used as a contextmanager)
  177. """
  178. # The characters that may be used to name the temp directory
  179. # We always prepend a ~ and then rotate through these until
  180. # a usable name is found.
  181. # pkg_resources raises a different error for .dist-info folder
  182. # with leading '-' and invalid metadata
  183. LEADING_CHARS = "-~.=%0123456789"
  184. def __init__(self, original, delete=None):
  185. # type: (str, Optional[bool]) -> None
  186. self.original = original.rstrip('/\\')
  187. super(AdjacentTempDirectory, self).__init__(delete=delete)
  188. @classmethod
  189. def _generate_names(cls, name):
  190. # type: (str) -> Iterator[str]
  191. """Generates a series of temporary names.
  192. The algorithm replaces the leading characters in the name
  193. with ones that are valid filesystem characters, but are not
  194. valid package names (for both Python and pip definitions of
  195. package).
  196. """
  197. for i in range(1, len(name)):
  198. for candidate in itertools.combinations_with_replacement(
  199. cls.LEADING_CHARS, i - 1):
  200. new_name = '~' + ''.join(candidate) + name[i:]
  201. if new_name != name:
  202. yield new_name
  203. # If we make it this far, we will have to make a longer name
  204. for i in range(len(cls.LEADING_CHARS)):
  205. for candidate in itertools.combinations_with_replacement(
  206. cls.LEADING_CHARS, i):
  207. new_name = '~' + ''.join(candidate) + name
  208. if new_name != name:
  209. yield new_name
  210. def _create(self, kind):
  211. # type: (str) -> str
  212. root, name = os.path.split(self.original)
  213. for candidate in self._generate_names(name):
  214. path = os.path.join(root, candidate)
  215. try:
  216. os.mkdir(path)
  217. except OSError as ex:
  218. # Continue if the name exists already
  219. if ex.errno != errno.EEXIST:
  220. raise
  221. else:
  222. path = os.path.realpath(path)
  223. break
  224. else:
  225. # Final fallback on the default behavior.
  226. path = os.path.realpath(
  227. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  228. )
  229. logger.debug("Created temporary directory: %s", path)
  230. return path