synthesize a traceback to get a normal exc_info from an exception

- as per https://docs.python.org/3.12/whatsnew/3.12.html#shutil, we must expect only an exception
  and *not* the full exc_info from the new onexc function (the documentation of this is very
  misleading and still uses the label "excinfo":
  https://docs.python.org/3.12/library/shutil.html#shutil.rmtree)
This commit is contained in:
Danny McClanahan 2023-07-28 01:00:06 -04:00
parent d0641740d5
commit eeb3d8fdff
No known key found for this signature in database
GPG Key ID: CE8D0DA71DEFC1DF
2 changed files with 33 additions and 6 deletions

View File

@ -1,5 +1,6 @@
import contextlib
import errno
import functools
import getpass
import hashlib
import io
@ -14,7 +15,8 @@ import urllib.parse
from functools import partial
from io import StringIO
from itertools import filterfalse, tee, zip_longest
from types import TracebackType
from pathlib import Path
from types import FunctionType, TracebackType
from typing import (
Any,
BinaryIO,
@ -67,6 +69,8 @@ T = TypeVar("T")
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
VersionInfo = Tuple[int, int, int]
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
OnExc = Callable[[FunctionType, Path, BaseException], Any]
OnErr = Callable[[FunctionType, Path, ExcInfo], Any]
def get_pip_version() -> str:
@ -121,22 +125,44 @@ def get_prog() -> str:
return "pip"
def bare_exc_to_onexc(exc_val: BaseException) -> ExcInfo:
exc_ty = type(exc_val)
tb = exc_val.__traceback__
if tb is None:
import inspect
frame = inspect.currentframe()
assert frame is not None
tb = TracebackType(None, frame, frame.f_lasti, frame.f_lineno)
return (exc_ty, exc_val, tb)
def extract_exc_info_arg(f: OnErr) -> OnExc:
def g(fn: FunctionType, p: Path, e: BaseException) -> Any:
info = bare_exc_to_onexc(e)
return f(fn, p, info)
return functools.update_wrapper(g, f)
# Retry every half second for up to 3 seconds
# Tenacity raises RetryError by default, explicitly raise the original exception
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def rmtree(
dir: str,
ignore_errors: bool = False,
onexc: Optional[Callable[[Any, Any, Any], Any]] = None,
onexc: Optional[OnErr] = None,
) -> None:
if ignore_errors:
onexc = _onerror_ignore
elif onexc is None:
if onexc is None:
onexc = _onerror_reraise
handler: OnErr = partial(rmtree_errorhandler, onexc=onexc)
if sys.version_info >= (3, 12):
shutil.rmtree(dir, onexc=partial(rmtree_errorhandler, onexc=onexc))
exc_handler = extract_exc_info_arg(handler)
shutil.rmtree(dir, onexc=exc_handler)
else:
shutil.rmtree(dir, onerror=partial(rmtree_errorhandler, onexc=onexc))
shutil.rmtree(dir, onerror=handler)
def _onerror_ignore(*_args: Any) -> None:

View File

@ -5,6 +5,7 @@ import os.path
import tempfile
import traceback
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import (
Any,
Callable,
@ -189,7 +190,7 @@ class TempDirectory:
def onerror(
func: Callable[[str], Any],
path: str,
path: Path,
exc_info: Tuple[Type[BaseException], BaseException, Any],
) -> None:
"""Log a warning for a `rmtree` error and continue"""