mirror of https://github.com/pypa/pip
AdjacentTempDirectory should fail on unwritable directory (#6215)
Based on #6225
This commit is contained in:
commit
7eb79b13d0
|
@ -32,9 +32,13 @@ pip-wheel-metadata
|
||||||
# Misc
|
# Misc
|
||||||
*~
|
*~
|
||||||
.*.sw?
|
.*.sw?
|
||||||
|
.env/
|
||||||
|
|
||||||
# For IntelliJ IDEs (basically PyCharm)
|
# For IntelliJ IDEs (basically PyCharm)
|
||||||
.idea/
|
.idea/
|
||||||
|
|
||||||
|
# For Visual Studio Code
|
||||||
|
.vscode/
|
||||||
|
|
||||||
# Scratch Pad for experiments
|
# Scratch Pad for experiments
|
||||||
.scratch/
|
.scratch/
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
``AdjacentTempDirectory`` fails on unwritable directory instead of locking up the uninstall command.
|
|
@ -1 +1 @@
|
||||||
Make failed uninstalls roll back more reliably and better at avoiding naming conflicts.
|
Make failed uninstalls roll back more reliably and better at avoiding naming conflicts.
|
||||||
|
|
|
@ -17,7 +17,7 @@ from pip._internal.utils.misc import (
|
||||||
FakeFile, ask, dist_in_usersite, dist_is_local, egg_link_path, is_local,
|
FakeFile, ask, dist_in_usersite, dist_is_local, egg_link_path, is_local,
|
||||||
normalize_path, renames, rmtree,
|
normalize_path, renames, rmtree,
|
||||||
)
|
)
|
||||||
from pip._internal.utils.temp_dir import AdjacentTempDirectory
|
from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ def compress_for_rename(paths):
|
||||||
# If all the files we found are in our remaining set of files to
|
# If all the files we found are in our remaining set of files to
|
||||||
# remove, then remove them from the latter set and add a wildcard
|
# remove, then remove them from the latter set and add a wildcard
|
||||||
# for the directory.
|
# for the directory.
|
||||||
if len(all_files - remaining) == 0:
|
if not (all_files - remaining):
|
||||||
remaining.difference_update(all_files)
|
remaining.difference_update(all_files)
|
||||||
wildcards.add(root + os.sep)
|
wildcards.add(root + os.sep)
|
||||||
|
|
||||||
|
@ -183,6 +183,111 @@ def compress_for_output_listing(paths):
|
||||||
return will_remove, will_skip
|
return will_remove, will_skip
|
||||||
|
|
||||||
|
|
||||||
|
class StashedUninstallPathSet(object):
|
||||||
|
"""A set of file rename operations to stash files while
|
||||||
|
tentatively uninstalling them."""
|
||||||
|
def __init__(self):
|
||||||
|
# Mapping from source file root to [Adjacent]TempDirectory
|
||||||
|
# for files under that directory.
|
||||||
|
self._save_dirs = {}
|
||||||
|
# (old path, new path) tuples for each move that may need
|
||||||
|
# to be undone.
|
||||||
|
self._moves = []
|
||||||
|
|
||||||
|
def _get_directory_stash(self, path):
|
||||||
|
"""Stashes a directory.
|
||||||
|
|
||||||
|
Directories are stashed adjacent to their original location if
|
||||||
|
possible, or else moved/copied into the user's temp dir."""
|
||||||
|
|
||||||
|
try:
|
||||||
|
save_dir = AdjacentTempDirectory(path)
|
||||||
|
save_dir.create()
|
||||||
|
except OSError:
|
||||||
|
save_dir = TempDirectory(kind="uninstall")
|
||||||
|
save_dir.create()
|
||||||
|
self._save_dirs[os.path.normcase(path)] = save_dir
|
||||||
|
|
||||||
|
return save_dir.path
|
||||||
|
|
||||||
|
def _get_file_stash(self, path):
|
||||||
|
"""Stashes a file.
|
||||||
|
|
||||||
|
If no root has been provided, one will be created for the directory
|
||||||
|
in the user's temp directory."""
|
||||||
|
path = os.path.normcase(path)
|
||||||
|
head, old_head = os.path.dirname(path), None
|
||||||
|
save_dir = None
|
||||||
|
|
||||||
|
while head != old_head:
|
||||||
|
try:
|
||||||
|
save_dir = self._save_dirs[head]
|
||||||
|
break
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
head, old_head = os.path.dirname(head), head
|
||||||
|
else:
|
||||||
|
# Did not find any suitable root
|
||||||
|
head = os.path.dirname(path)
|
||||||
|
save_dir = TempDirectory(kind='uninstall')
|
||||||
|
save_dir.create()
|
||||||
|
self._save_dirs[head] = save_dir
|
||||||
|
|
||||||
|
relpath = os.path.relpath(path, head)
|
||||||
|
if relpath and relpath != os.path.curdir:
|
||||||
|
return os.path.join(save_dir.path, relpath)
|
||||||
|
return save_dir.path
|
||||||
|
|
||||||
|
def stash(self, path):
|
||||||
|
"""Stashes the directory or file and returns its new location.
|
||||||
|
"""
|
||||||
|
if os.path.isdir(path):
|
||||||
|
new_path = self._get_directory_stash(path)
|
||||||
|
else:
|
||||||
|
new_path = self._get_file_stash(path)
|
||||||
|
|
||||||
|
self._moves.append((path, new_path))
|
||||||
|
if os.path.isdir(path) and os.path.isdir(new_path):
|
||||||
|
# If we're moving a directory, we need to
|
||||||
|
# remove the destination first or else it will be
|
||||||
|
# moved to inside the existing directory.
|
||||||
|
# We just created new_path ourselves, so it will
|
||||||
|
# be removable.
|
||||||
|
os.rmdir(new_path)
|
||||||
|
renames(path, new_path)
|
||||||
|
return new_path
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
"""Commits the uninstall by removing stashed files."""
|
||||||
|
for _, save_dir in self._save_dirs.items():
|
||||||
|
save_dir.cleanup()
|
||||||
|
self._moves = []
|
||||||
|
self._save_dirs = {}
|
||||||
|
|
||||||
|
def rollback(self):
|
||||||
|
"""Undoes the uninstall by moving stashed files back."""
|
||||||
|
for p in self._moves:
|
||||||
|
logging.info("Moving to %s\n from %s", *p)
|
||||||
|
|
||||||
|
for new_path, path in self._moves:
|
||||||
|
try:
|
||||||
|
logger.debug('Replacing %s from %s', new_path, path)
|
||||||
|
if os.path.isfile(new_path):
|
||||||
|
os.unlink(new_path)
|
||||||
|
elif os.path.isdir(new_path):
|
||||||
|
rmtree(new_path)
|
||||||
|
renames(path, new_path)
|
||||||
|
except OSError as ex:
|
||||||
|
logger.error("Failed to restore %s", new_path)
|
||||||
|
logger.debug("Exception: %s", ex)
|
||||||
|
|
||||||
|
self.commit()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def can_rollback(self):
|
||||||
|
return bool(self._moves)
|
||||||
|
|
||||||
|
|
||||||
class UninstallPathSet(object):
|
class UninstallPathSet(object):
|
||||||
"""A set of file paths to be removed in the uninstallation of a
|
"""A set of file paths to be removed in the uninstallation of a
|
||||||
requirement."""
|
requirement."""
|
||||||
|
@ -191,8 +296,7 @@ class UninstallPathSet(object):
|
||||||
self._refuse = set()
|
self._refuse = set()
|
||||||
self.pth = {}
|
self.pth = {}
|
||||||
self.dist = dist
|
self.dist = dist
|
||||||
self._save_dirs = []
|
self._moved_paths = StashedUninstallPathSet()
|
||||||
self._moved_paths = []
|
|
||||||
|
|
||||||
def _permitted(self, path):
|
def _permitted(self, path):
|
||||||
"""
|
"""
|
||||||
|
@ -230,22 +334,6 @@ class UninstallPathSet(object):
|
||||||
else:
|
else:
|
||||||
self._refuse.add(pth_file)
|
self._refuse.add(pth_file)
|
||||||
|
|
||||||
def _stash(self, path):
|
|
||||||
best = None
|
|
||||||
for save_dir in self._save_dirs:
|
|
||||||
if not path.startswith(save_dir.original + os.sep):
|
|
||||||
continue
|
|
||||||
if not best or len(save_dir.original) > len(best.original):
|
|
||||||
best = save_dir
|
|
||||||
if best is None:
|
|
||||||
best = AdjacentTempDirectory(os.path.dirname(path))
|
|
||||||
best.create()
|
|
||||||
self._save_dirs.append(best)
|
|
||||||
relpath = os.path.relpath(path, best.original)
|
|
||||||
if not relpath or relpath == os.path.curdir:
|
|
||||||
return best.path
|
|
||||||
return os.path.join(best.path, relpath)
|
|
||||||
|
|
||||||
def remove(self, auto_confirm=False, verbose=False):
|
def remove(self, auto_confirm=False, verbose=False):
|
||||||
"""Remove paths in ``self.paths`` with confirmation (unless
|
"""Remove paths in ``self.paths`` with confirmation (unless
|
||||||
``auto_confirm`` is True)."""
|
``auto_confirm`` is True)."""
|
||||||
|
@ -264,18 +352,14 @@ class UninstallPathSet(object):
|
||||||
|
|
||||||
with indent_log():
|
with indent_log():
|
||||||
if auto_confirm or self._allowed_to_proceed(verbose):
|
if auto_confirm or self._allowed_to_proceed(verbose):
|
||||||
for path in sorted(compact(compress_for_rename(self.paths))):
|
moved = self._moved_paths
|
||||||
new_path = self._stash(path)
|
|
||||||
|
for_rename = compress_for_rename(self.paths)
|
||||||
|
|
||||||
|
for path in sorted(compact(for_rename)):
|
||||||
|
moved.stash(path)
|
||||||
logger.debug('Removing file or directory %s', path)
|
logger.debug('Removing file or directory %s', path)
|
||||||
self._moved_paths.append((path, new_path))
|
|
||||||
if os.path.isdir(path) and os.path.isdir(new_path):
|
|
||||||
# If we're moving a directory, we need to
|
|
||||||
# remove the destination first or else it will be
|
|
||||||
# moved to inside the existing directory.
|
|
||||||
# We just created new_path ourselves, so it will
|
|
||||||
# be removable.
|
|
||||||
os.rmdir(new_path)
|
|
||||||
renames(path, new_path)
|
|
||||||
for pth in self.pth.values():
|
for pth in self.pth.values():
|
||||||
pth.remove()
|
pth.remove()
|
||||||
|
|
||||||
|
@ -312,28 +396,20 @@ class UninstallPathSet(object):
|
||||||
|
|
||||||
def rollback(self):
|
def rollback(self):
|
||||||
"""Rollback the changes previously made by remove()."""
|
"""Rollback the changes previously made by remove()."""
|
||||||
if not self._save_dirs:
|
if not self._moved_paths.can_rollback:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Can't roll back %s; was not uninstalled",
|
"Can't roll back %s; was not uninstalled",
|
||||||
self.dist.project_name,
|
self.dist.project_name,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
logger.info('Rolling back uninstall of %s', self.dist.project_name)
|
logger.info('Rolling back uninstall of %s', self.dist.project_name)
|
||||||
for path, tmp_path in self._moved_paths:
|
self._moved_paths.rollback()
|
||||||
logger.debug('Replacing %s', path)
|
|
||||||
if os.path.isdir(tmp_path) and os.path.isdir(path):
|
|
||||||
rmtree(path)
|
|
||||||
renames(tmp_path, path)
|
|
||||||
for pth in self.pth.values():
|
for pth in self.pth.values():
|
||||||
pth.rollback()
|
pth.rollback()
|
||||||
for save_dir in self._save_dirs:
|
|
||||||
save_dir.cleanup()
|
|
||||||
|
|
||||||
def commit(self):
|
def commit(self):
|
||||||
"""Remove temporary save dir: rollback will no longer be possible."""
|
"""Remove temporary save dir: rollback will no longer be possible."""
|
||||||
for save_dir in self._save_dirs:
|
self._moved_paths.commit()
|
||||||
save_dir.cleanup()
|
|
||||||
self._moved_paths = []
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dist(cls, dist):
|
def from_dist(cls, dist):
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import errno
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import os.path
|
import os.path
|
||||||
|
@ -118,22 +119,30 @@ class AdjacentTempDirectory(TempDirectory):
|
||||||
package).
|
package).
|
||||||
"""
|
"""
|
||||||
for i in range(1, len(name)):
|
for i in range(1, len(name)):
|
||||||
if name[i] in cls.LEADING_CHARS:
|
|
||||||
continue
|
|
||||||
for candidate in itertools.combinations_with_replacement(
|
for candidate in itertools.combinations_with_replacement(
|
||||||
cls.LEADING_CHARS, i - 1):
|
cls.LEADING_CHARS, i - 1):
|
||||||
new_name = '~' + ''.join(candidate) + name[i:]
|
new_name = '~' + ''.join(candidate) + name[i:]
|
||||||
if new_name != name:
|
if new_name != name:
|
||||||
yield new_name
|
yield new_name
|
||||||
|
|
||||||
|
# If we make it this far, we will have to make a longer name
|
||||||
|
for i in range(len(cls.LEADING_CHARS)):
|
||||||
|
for candidate in itertools.combinations_with_replacement(
|
||||||
|
cls.LEADING_CHARS, i):
|
||||||
|
new_name = '~' + ''.join(candidate) + name
|
||||||
|
if new_name != name:
|
||||||
|
yield new_name
|
||||||
|
|
||||||
def create(self):
|
def create(self):
|
||||||
root, name = os.path.split(self.original)
|
root, name = os.path.split(self.original)
|
||||||
for candidate in self._generate_names(name):
|
for candidate in self._generate_names(name):
|
||||||
path = os.path.join(root, candidate)
|
path = os.path.join(root, candidate)
|
||||||
try:
|
try:
|
||||||
os.mkdir(path)
|
os.mkdir(path)
|
||||||
except OSError:
|
except OSError as ex:
|
||||||
pass
|
# Continue if the name exists already
|
||||||
|
if ex.errno != errno.EEXIST:
|
||||||
|
raise
|
||||||
else:
|
else:
|
||||||
self.path = os.path.realpath(path)
|
self.path = os.path.realpath(path)
|
||||||
break
|
break
|
||||||
|
|
|
@ -5,8 +5,8 @@ from mock import Mock
|
||||||
|
|
||||||
import pip._internal.req.req_uninstall
|
import pip._internal.req.req_uninstall
|
||||||
from pip._internal.req.req_uninstall import (
|
from pip._internal.req.req_uninstall import (
|
||||||
UninstallPathSet, compact, compress_for_output_listing,
|
StashedUninstallPathSet, UninstallPathSet, compact,
|
||||||
compress_for_rename, uninstallation_paths,
|
compress_for_output_listing, compress_for_rename, uninstallation_paths,
|
||||||
)
|
)
|
||||||
from tests.lib import create_file
|
from tests.lib import create_file
|
||||||
|
|
||||||
|
@ -175,3 +175,96 @@ class TestUninstallPathSet(object):
|
||||||
ups.add(path1)
|
ups.add(path1)
|
||||||
ups.add(path2)
|
ups.add(path2)
|
||||||
assert ups.paths == {path1}
|
assert ups.paths == {path1}
|
||||||
|
|
||||||
|
|
||||||
|
class TestStashedUninstallPathSet(object):
|
||||||
|
WALK_RESULT = [
|
||||||
|
("A", ["B", "C"], ["a.py"]),
|
||||||
|
("A/B", ["D"], ["b.py"]),
|
||||||
|
("A/B/D", [], ["c.py"]),
|
||||||
|
("A/C", [], ["d.py", "e.py"]),
|
||||||
|
("A/E", ["F"], ["f.py"]),
|
||||||
|
("A/E/F", [], []),
|
||||||
|
("A/G", ["H"], ["g.py"]),
|
||||||
|
("A/G/H", [], ["h.py"]),
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def mock_walk(cls, root):
|
||||||
|
for dirname, subdirs, files in cls.WALK_RESULT:
|
||||||
|
dirname = os.path.sep.join(dirname.split("/"))
|
||||||
|
if dirname.startswith(root):
|
||||||
|
yield dirname[len(root) + 1:], subdirs, files
|
||||||
|
|
||||||
|
def test_compress_for_rename(self, monkeypatch):
|
||||||
|
paths = [os.path.sep.join(p.split("/")) for p in [
|
||||||
|
"A/B/b.py",
|
||||||
|
"A/B/D/c.py",
|
||||||
|
"A/C/d.py",
|
||||||
|
"A/E/f.py",
|
||||||
|
"A/G/g.py",
|
||||||
|
]]
|
||||||
|
|
||||||
|
expected_paths = [os.path.sep.join(p.split("/")) for p in [
|
||||||
|
"A/B/", # selected everything below A/B
|
||||||
|
"A/C/d.py", # did not select everything below A/C
|
||||||
|
"A/E/", # only empty folders remain under A/E
|
||||||
|
"A/G/g.py", # non-empty folder remains under A/G
|
||||||
|
]]
|
||||||
|
|
||||||
|
monkeypatch.setattr('os.walk', self.mock_walk)
|
||||||
|
|
||||||
|
actual_paths = compress_for_rename(paths)
|
||||||
|
assert set(expected_paths) == set(actual_paths)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def make_stash(cls, tmpdir, paths):
|
||||||
|
for dirname, subdirs, files in cls.WALK_RESULT:
|
||||||
|
root = os.path.join(tmpdir, *dirname.split("/"))
|
||||||
|
if not os.path.exists(root):
|
||||||
|
os.mkdir(root)
|
||||||
|
for d in subdirs:
|
||||||
|
os.mkdir(os.path.join(root, d))
|
||||||
|
for f in files:
|
||||||
|
with open(os.path.join(root, f), "wb"):
|
||||||
|
pass
|
||||||
|
|
||||||
|
pathset = StashedUninstallPathSet()
|
||||||
|
|
||||||
|
paths = [os.path.join(tmpdir, *p.split('/')) for p in paths]
|
||||||
|
stashed_paths = [(p, pathset.stash(p)) for p in paths]
|
||||||
|
|
||||||
|
return pathset, stashed_paths
|
||||||
|
|
||||||
|
def test_stash(self, tmpdir):
|
||||||
|
pathset, stashed_paths = self.make_stash(tmpdir, [
|
||||||
|
"A/B/", "A/C/d.py", "A/E/", "A/G/g.py",
|
||||||
|
])
|
||||||
|
|
||||||
|
for old_path, new_path in stashed_paths:
|
||||||
|
assert not os.path.exists(old_path)
|
||||||
|
assert os.path.exists(new_path)
|
||||||
|
|
||||||
|
assert stashed_paths == pathset._moves
|
||||||
|
|
||||||
|
def test_commit(self, tmpdir):
|
||||||
|
pathset, stashed_paths = self.make_stash(tmpdir, [
|
||||||
|
"A/B/", "A/C/d.py", "A/E/", "A/G/g.py",
|
||||||
|
])
|
||||||
|
|
||||||
|
pathset.commit()
|
||||||
|
|
||||||
|
for old_path, new_path in stashed_paths:
|
||||||
|
assert not os.path.exists(old_path)
|
||||||
|
assert not os.path.exists(new_path)
|
||||||
|
|
||||||
|
def test_rollback(self, tmpdir):
|
||||||
|
pathset, stashed_paths = self.make_stash(tmpdir, [
|
||||||
|
"A/B/", "A/C/d.py", "A/E/", "A/G/g.py",
|
||||||
|
])
|
||||||
|
|
||||||
|
pathset.rollback()
|
||||||
|
|
||||||
|
for old_path, new_path in stashed_paths:
|
||||||
|
assert os.path.exists(old_path)
|
||||||
|
assert not os.path.exists(new_path)
|
||||||
|
|
|
@ -553,6 +553,10 @@ class TestTempDirectory(object):
|
||||||
"ABC.dist-info",
|
"ABC.dist-info",
|
||||||
"_+-",
|
"_+-",
|
||||||
"_package",
|
"_package",
|
||||||
|
"A......B",
|
||||||
|
"AB",
|
||||||
|
"A",
|
||||||
|
"2",
|
||||||
])
|
])
|
||||||
def test_adjacent_directory_names(self, name):
|
def test_adjacent_directory_names(self, name):
|
||||||
def names():
|
def names():
|
||||||
|
@ -566,17 +570,82 @@ class TestTempDirectory(object):
|
||||||
# result that works, provided there are many of those
|
# result that works, provided there are many of those
|
||||||
# and that shorter names result in totally unique sets,
|
# and that shorter names result in totally unique sets,
|
||||||
# it's okay to skip part of the test.)
|
# it's okay to skip part of the test.)
|
||||||
some_names = list(itertools.islice(names(), 10000))
|
some_names = list(itertools.islice(names(), 1000))
|
||||||
assert len(some_names) == len(set(some_names))
|
# We should always get at least 1000 names
|
||||||
|
assert len(some_names) == 1000
|
||||||
|
|
||||||
# Ensure original name does not appear
|
# Ensure original name does not appear early in the set
|
||||||
assert not any(n == name for n in names())
|
assert name not in some_names
|
||||||
|
|
||||||
# Check the first group are correct
|
if len(name) > 2:
|
||||||
expected_names = ['~' + name[1:]]
|
# Names should be at least 90% unique (given the infinite
|
||||||
expected_names.extend('~' + c + name[2:] for c in chars)
|
# range of inputs, and the possibility that generated names
|
||||||
for x, y in zip(some_names, expected_names):
|
# may already exist on disk anyway, this is a much cheaper
|
||||||
assert x == y
|
# criteria to enforce than complete uniqueness).
|
||||||
|
assert len(some_names) > 0.9 * len(set(some_names))
|
||||||
|
|
||||||
|
# Ensure the first few names are the same length as the original
|
||||||
|
same_len = list(itertools.takewhile(
|
||||||
|
lambda x: len(x) == len(name),
|
||||||
|
some_names
|
||||||
|
))
|
||||||
|
assert len(same_len) > 10
|
||||||
|
|
||||||
|
# Check the first group are correct
|
||||||
|
expected_names = ['~' + name[1:]]
|
||||||
|
expected_names.extend('~' + c + name[2:] for c in chars)
|
||||||
|
for x, y in zip(some_names, expected_names):
|
||||||
|
assert x == y
|
||||||
|
|
||||||
|
else:
|
||||||
|
# All names are going to be longer than our original
|
||||||
|
assert min(len(x) for x in some_names) > 1
|
||||||
|
|
||||||
|
# All names are going to be unqiue
|
||||||
|
assert len(some_names) == len(set(some_names))
|
||||||
|
|
||||||
|
if len(name) == 2:
|
||||||
|
# All but the first name are going to end with our original
|
||||||
|
assert all(x.endswith(name) for x in some_names[1:])
|
||||||
|
else:
|
||||||
|
# All names are going to end with our original
|
||||||
|
assert all(x.endswith(name) for x in some_names)
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("name", [
|
||||||
|
"A",
|
||||||
|
"ABC",
|
||||||
|
"ABC.dist-info",
|
||||||
|
"_+-",
|
||||||
|
"_package",
|
||||||
|
])
|
||||||
|
def test_adjacent_directory_exists(self, name, tmpdir):
|
||||||
|
block_name, expect_name = itertools.islice(
|
||||||
|
AdjacentTempDirectory._generate_names(name), 2)
|
||||||
|
|
||||||
|
original = os.path.join(tmpdir, name)
|
||||||
|
blocker = os.path.join(tmpdir, block_name)
|
||||||
|
|
||||||
|
ensure_dir(original)
|
||||||
|
ensure_dir(blocker)
|
||||||
|
|
||||||
|
with AdjacentTempDirectory(original) as atmp_dir:
|
||||||
|
assert expect_name == os.path.split(atmp_dir.path)[1]
|
||||||
|
|
||||||
|
def test_adjacent_directory_permission_error(self, monkeypatch):
|
||||||
|
name = "ABC"
|
||||||
|
|
||||||
|
def raising_mkdir(*args, **kwargs):
|
||||||
|
raise OSError("Unknown OSError")
|
||||||
|
|
||||||
|
with TempDirectory() as tmp_dir:
|
||||||
|
original = os.path.join(tmp_dir.path, name)
|
||||||
|
|
||||||
|
ensure_dir(original)
|
||||||
|
monkeypatch.setattr("os.mkdir", raising_mkdir)
|
||||||
|
|
||||||
|
with pytest.raises(OSError):
|
||||||
|
with AdjacentTempDirectory(original):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TestGlibc(object):
|
class TestGlibc(object):
|
||||||
|
|
Loading…
Reference in New Issue