Fixes #3055 Uninstall causes paths to exceed MAX_PATH limit

This commit is contained in:
Steve Dower 2018-11-21 13:14:36 -08:00
parent 9c0a1daace
commit c7ae06c798
2 changed files with 115 additions and 16 deletions

View File

@ -17,7 +17,7 @@ from pip._internal.utils.misc import (
FakeFile, ask, dist_in_usersite, dist_is_local, egg_link_path, is_local,
normalize_path, renames,
)
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.temp_dir import TempDirectory, AdjacentTempDirectory
logger = logging.getLogger(__name__)
@ -86,16 +86,49 @@ def compact(paths):
sep = os.path.sep
short_paths = set()
for path in sorted(paths, key=len):
should_add = any(
should_skip = any(
path.startswith(shortpath.rstrip("*")) and
path[len(shortpath.rstrip("*").rstrip(sep))] == sep
for shortpath in short_paths
)
if not should_add:
if not should_skip:
short_paths.add(path)
return short_paths
def compress_for_rename(paths):
"""Returns a set containing the paths that need to be renamed.
This set may include directories when the original sequence of paths
included every file on disk.
"""
remaining = set(paths)
unchecked = sorted(set(os.path.split(p)[0] for p in remaining), key=len)
wildcards = set()
def norm_join(*a):
return os.path.normcase(os.path.join(*a))
for root in unchecked:
if any(root.startswith(w) for w in wildcards):
# This directory has already been handled.
continue
all_files = set()
all_subdirs = set()
for dirname, subdirs, files in os.walk(root):
all_subdirs.update(norm_join(root, dirname, d) for d in subdirs)
all_files.update(norm_join(root, dirname, f) for f in files)
# If all the files we found are in our remaining set of files to
# remove, then remove them from the latter set and add a wildcard
# for the directory.
if len(all_files - remaining) == 0:
remaining.difference_update(all_files)
wildcards.add(root + os.sep)
return remaining | wildcards
def compress_for_output_listing(paths):
"""Returns a tuple of 2 sets of which paths to display to user
@ -153,7 +186,7 @@ class UninstallPathSet(object):
self._refuse = set()
self.pth = {}
self.dist = dist
self.save_dir = TempDirectory(kind="uninstall")
self._save_dirs = []
self._moved_paths = []
def _permitted(self, path):
@ -193,9 +226,17 @@ class UninstallPathSet(object):
self._refuse.add(pth_file)
def _stash(self, path):
return os.path.join(
self.save_dir.path, os.path.splitdrive(path)[1].lstrip(os.path.sep)
)
best = None
for save_dir in self._save_dirs:
if not path.startswith(save_dir.original + os.sep):
continue
if not best or len(save_dir.original) > len(best.original):
best = save_dir
if best is None:
best = AdjacentTempDirectory(os.path.dirname(path))
best.create()
self._save_dirs.append(best)
return os.path.join(best.path, os.path.relpath(path, best.original))
def remove(self, auto_confirm=False, verbose=False):
"""Remove paths in ``self.paths`` with confirmation (unless
@ -215,12 +256,10 @@ class UninstallPathSet(object):
with indent_log():
if auto_confirm or self._allowed_to_proceed(verbose):
self.save_dir.create()
for path in sorted(compact(self.paths)):
for path in sorted(compact(compress_for_rename(self.paths))):
new_path = self._stash(path)
logger.debug('Removing file or directory %s', path)
self._moved_paths.append(path)
self._moved_paths.append((path, new_path))
renames(path, new_path)
for pth in self.pth.values():
pth.remove()
@ -251,20 +290,21 @@ class UninstallPathSet(object):
_display('Would remove:', will_remove)
_display('Would not remove (might be manually added):', will_skip)
_display('Would not remove (outside of prefix):', self._refuse)
if verbose:
_display('Will actually move:', compress_for_rename(self.paths))
return ask('Proceed (y/n)? ', ('y', 'n')) == 'y'
def rollback(self):
"""Rollback the changes previously made by remove()."""
if self.save_dir.path is None:
if not self._save_dirs:
logger.error(
"Can't roll back %s; was not uninstalled",
self.dist.project_name,
)
return False
logger.info('Rolling back uninstall of %s', self.dist.project_name)
for path in self._moved_paths:
tmp_path = self._stash(path)
for path, tmp_path in self._moved_paths:
logger.debug('Replacing %s', path)
renames(tmp_path, path)
for pth in self.pth.values():
@ -272,7 +312,8 @@ class UninstallPathSet(object):
def commit(self):
"""Remove temporary save dir: rollback will no longer be possible."""
self.save_dir.cleanup()
for save_dir in self._save_dirs:
save_dir.cleanup()
self._moved_paths = []
@classmethod

View File

@ -1,7 +1,9 @@
from __future__ import absolute_import
import itertools
import logging
import os.path
import shutil
import tempfile
from pip._internal.utils.misc import rmtree
@ -58,7 +60,7 @@ class TempDirectory(object):
self.cleanup()
def create(self):
"""Create a temporary directory and store it's path in self.path
"""Create a temporary directory and store its path in self.path
"""
if self.path is not None:
logger.debug(
@ -80,3 +82,59 @@ class TempDirectory(object):
if self.path is not None and os.path.exists(self.path):
rmtree(self.path)
self.path = None
class AdjacentTempDirectory(TempDirectory):
"""Helper class that creates a temporary directory adjacent to a real one.
Attributes:
original
The original directory to create a temp directory for.
path
After calling create() or entering, contains the full
path to the temporary directory.
delete
Whether the directory should be deleted when exiting
(when used as a contextmanager)
"""
# The characters that may be used to name the temp directory
LEADING_CHARS = "-~.+=%0123456789"
def __init__(self, original, delete=None):
super(AdjacentTempDirectory, self).__init__(delete=delete)
self.original = original.rstrip('/\\')
@classmethod
def _generate_names(cls, name):
"""Generates a series of temporary names.
The algorithm replaces the leading characters in the name
with ones that are valid filesystem characters, but are not
valid package names (for both Python and pip definitions of
package).
"""
for i in range(1, len(name)):
for candidate in itertools.permutations(cls.LEADING_CHARS, i):
yield ''.join(candidate) + name[i:]
def create(self):
root, name = os.path.split(self.original)
os.makedirs(root, exist_ok=True)
for candidate in self._generate_names(name):
path = os.path.join(root, candidate)
try:
os.mkdir(path)
except OSError:
pass
else:
self.path = os.path.realpath(path)
break
if not self.path:
# Final fallback on the default behavior.
self.path = os.path.realpath(
tempfile.mkdtemp(prefix="pip-{}-".format(self.kind))
)
logger.debug("Created temporary directory: {}".format(self.path))