server: Enable and fix mypy for all chia.server files (#13990)

Co-authored-by: Kyle Altendorf <sda@fstab.net>
This commit is contained in:
dustinface 2022-11-29 01:00:45 +01:00 committed by GitHub
parent 23ac4f7c80
commit 898543b874
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 136 additions and 111 deletions

View file

@ -69,7 +69,7 @@ class ExtendedPeerInfo:
return out
@classmethod
def from_string(cls, peer_str: str):
def from_string(cls, peer_str: str) -> ExtendedPeerInfo:
blobs = peer_str.split(" ")
assert len(blobs) == 5
peer_info = TimestampedPeerInfo(blobs[0], uint16(int(blobs[1])), uint64(int(blobs[2])))
@ -143,7 +143,7 @@ class ExtendedPeerInfo:
return False
def get_selection_chance(self, now: Optional[int] = None):
def get_selection_chance(self, now: Optional[int] = None) -> float:
if now is None:
now = int(math.floor(time.time()))
chance = 1.0
@ -566,7 +566,7 @@ class AddressManager:
return addr
def cleanup(self, max_timestamp_difference: int, max_consecutive_failures: int):
def cleanup(self, max_timestamp_difference: int, max_consecutive_failures: int) -> None:
now = int(math.floor(time.time()))
for bucket in range(NEW_BUCKET_COUNT):
for pos in range(BUCKET_SIZE):
@ -579,7 +579,7 @@ class AddressManager:
):
self.clear_new_(bucket, pos)
def connect_(self, addr: PeerInfo, timestamp: int):
def connect_(self, addr: PeerInfo, timestamp: int) -> None:
info, _ = self.find_(addr)
if info is None:
return None
@ -615,7 +615,7 @@ class AddressManager:
addr: PeerInfo,
test_before_evict: bool = True,
timestamp: int = -1,
):
) -> None:
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
@ -627,14 +627,14 @@ class AddressManager:
addr: PeerInfo,
count_failures: bool,
timestamp: int = -1,
):
) -> None:
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
self.attempt_(addr, count_failures, timestamp)
# See if any to-be-evicted tried table entries have been tested and if so resolve the collisions.
async def resolve_tried_collisions(self) -> None:
    """Resolve to-be-evicted "tried" table collisions.

    Takes the address-manager lock and delegates to
    ``resolve_tried_collisions_``, which checks whether colliding tried-table
    entries have been tested and, if so, resolves the collisions.
    """
    async with self.lock:
        self.resolve_tried_collisions_()
@ -653,8 +653,8 @@ class AddressManager:
async with self.lock:
return self.get_peers_()
async def connect(self, addr: PeerInfo, timestamp: int = -1):
async def connect(self, addr: PeerInfo, timestamp: int = -1) -> None:
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
return self.connect_(addr, timestamp)
self.connect_(addr, timestamp)

View file

@ -30,10 +30,10 @@ class VettedPeer:
self.host = h
self.port = p
def __eq__(self, rhs: object) -> bool:
    """Two peers are equal when their (host, port) pairs match."""
    # NOTE(review): attribute access on a bare ``object`` is deliberate and
    # silenced for mypy rather than narrowed; confirm callers only ever
    # compare against peer-like objects (an AttributeError would propagate).
    return self.host == rhs.host and self.port == rhs.port  # type: ignore[no-any-return, attr-defined]

def __hash__(self) -> int:
    """Hash on the same (host, port) pair that __eq__ compares, keeping the
    eq/hash contract consistent."""
    return hash((self.host, self.port))
@ -68,8 +68,10 @@ class IntroducerPeers:
except ValueError:
return False
def get_peers(self, max_peers: int = 0, randomize: bool = False, recent_threshold=9999999) -> List[VettedPeer]:
target_peers = [peer for peer in self._peers if time.time() - peer.time_added < recent_threshold]
def get_peers(
self, max_peers: int = 0, randomize: bool = False, recent_threshold: float = 9999999
) -> List[VettedPeer]:
target_peers = [peer for peer in self._peers if time.time() - float(peer.time_added) < recent_threshold]
if not max_peers or max_peers > len(target_peers):
max_peers = len(target_peers)
if randomize:

View file

@ -4,14 +4,15 @@ import asyncio
import math
import time
import traceback
from logging import Logger
from random import Random
from secrets import randbits
from typing import Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import dns.asyncresolver
from chia.protocols.full_node_protocol import RequestPeers, RespondPeers
from chia.protocols.introducer_protocol import RequestPeersIntroducer
from chia.protocols.introducer_protocol import RequestPeersIntroducer, RespondPeersIntroducer
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.server.address_manager import AddressManager, ExtendedPeerInfo
from chia.server.address_manager_sqlite_store import create_address_manager_from_db
@ -22,7 +23,7 @@ from chia.server.server import ChiaServer
from chia.server.ws_connection import WSChiaConnection
from chia.types.peer_info import PeerInfo, TimestampedPeerInfo
from chia.util.hash import std_hash
from chia.util.ints import uint64
from chia.util.ints import uint16, uint64
MAX_PEERS_RECEIVED_PER_REQUEST = 1000
MAX_TOTAL_PEERS_RECEIVED = 3000
@ -43,13 +44,13 @@ class FullNodeDiscovery:
server: ChiaServer,
target_outbound_count: int,
peer_store_resolver: PeerStoreResolver,
introducer_info: Optional[Dict],
introducer_info: Optional[Dict[str, Any]],
dns_servers: List[str],
peer_connect_interval: int,
selected_network: str,
default_port: Optional[int],
log,
):
log: Logger,
) -> None:
self.server: ChiaServer = server
self.is_closed = False
self.target_outbound_count = target_outbound_count
@ -66,14 +67,14 @@ class FullNodeDiscovery:
self.introducer_info = None
self.peer_connect_interval = peer_connect_interval
self.log = log
self.relay_queue: Optional[asyncio.Queue] = None
self.relay_queue: Optional[asyncio.Queue[Tuple[TimestampedPeerInfo, int]]] = None
self.address_manager: Optional[AddressManager] = None
self.connection_time_pretest: Dict = {}
self.received_count_from_peers: Dict = {}
self.connection_time_pretest: Dict[str, Any] = {}
self.received_count_from_peers: Dict[str, Any] = {}
self.lock = asyncio.Lock()
self.connect_peers_task: Optional[asyncio.Task] = None
self.serialize_task: Optional[asyncio.Task] = None
self.cleanup_task: Optional[asyncio.Task] = None
self.connect_peers_task: Optional[asyncio.Task[None]] = None
self.serialize_task: Optional[asyncio.Task[None]] = None
self.cleanup_task: Optional[asyncio.Task[None]] = None
self.initial_wait: int = 0
try:
self.resolver: Optional[dns.asyncresolver.Resolver] = dns.asyncresolver.Resolver()
@ -81,7 +82,7 @@ class FullNodeDiscovery:
self.resolver = None
self.log.exception("Error initializing asyncresolver")
self.pending_outbound_connections: Set[str] = set()
self.pending_tasks: Set[asyncio.Task] = set()
self.pending_tasks: Set[asyncio.Task[None]] = set()
self.default_port: Optional[int] = default_port
if default_port is None and selected_network in NETWORK_ID_DEFAULT_PORTS:
self.default_port = NETWORK_ID_DEFAULT_PORTS[selected_network]
@ -130,14 +131,14 @@ class FullNodeDiscovery:
if len(self.pending_tasks) > 0:
await asyncio.wait(self.pending_tasks)
def cancel_task_safe(self, task: Optional[asyncio.Task[None]]) -> None:
    """Request cancellation of ``task`` without letting an error escape.

    Args:
        task: The task to cancel; ``None`` is a no-op.

    Any exception raised by ``Task.cancel()`` is logged and swallowed —
    cancellation here is best-effort shutdown housekeeping.
    """
    if task is not None:
        try:
            task.cancel()
        except Exception as e:
            self.log.error(f"Error while canceling task.{e} {task}")
async def on_connect(self, peer: WSChiaConnection):
async def on_connect(self, peer: WSChiaConnection) -> None:
if (
peer.is_outbound is False
and peer.peer_server_port is not None
@ -164,7 +165,7 @@ class FullNodeDiscovery:
await peer.send_message(msg)
# Updates timestamps each time we receive a message for outbound connections.
async def update_peer_timestamp_on_message(self, peer: WSChiaConnection):
async def update_peer_timestamp_on_message(self, peer: WSChiaConnection) -> None:
if (
peer.is_outbound
and peer.peer_server_port is not None
@ -192,23 +193,23 @@ class FullNodeDiscovery:
(https://en.wikipedia.org/wiki/Poisson_distribution)
"""
def _poisson_next_send(self, now: float, avg_interval_seconds: int, random: Random) -> float:
    """Sample the next send time from a Poisson process.

    Returns a timestamp in the same unit as ``now`` (callers pass
    microseconds), drawn via the exponential inter-arrival formula so that
    successive events average ``avg_interval_seconds`` apart
    (https://en.wikipedia.org/wiki/Poisson_distribution).

    Args:
        now: Current time.
        avg_interval_seconds: Target mean interval between events.
        random: PRNG supplying the uniform 48-bit sample.
    """
    return now + (
        math.log(random.randrange(1 << 48) * -0.0000000000000035527136788 + 1) * avg_interval_seconds * -1000000.0
        + 0.5
    )
async def _introducer_client(self) -> None:
    """Connect to the configured introducer to obtain peers.

    No-op when ``self.introducer_info`` is unset. On a successful connection
    the local ``on_connect`` callback immediately sends a
    ``request_peers_introducer`` message to the introducer.
    """
    if self.introducer_info is None:
        return None

    async def on_connect(peer: WSChiaConnection) -> None:
        msg = make_msg(ProtocolMessageTypes.request_peers_introducer, RequestPeersIntroducer())
        await peer.send_message(msg)

    await self.server.start_client(self.introducer_info, on_connect)
async def _query_dns(self, dns_address):
async def _query_dns(self, dns_address: str) -> None:
try:
if self.default_port is None:
self.log.error(
@ -225,8 +226,8 @@ class FullNodeDiscovery:
peers.append(
TimestampedPeerInfo(
ip.to_text(),
self.default_port,
0,
uint16(self.default_port),
uint64(0),
)
)
self.log.info(f"Received {len(peers)} peers from DNS seeder, using rdtype = {rdtype}.")
@ -235,7 +236,7 @@ class FullNodeDiscovery:
except Exception as e:
self.log.warning(f"querying DNS introducer failed: {e}")
async def on_connect_callback(self, peer: WSChiaConnection):
async def on_connect_callback(self, peer: WSChiaConnection) -> None:
if self.server.on_connect is not None:
await self.server.on_connect(peer)
else:
@ -267,7 +268,7 @@ class FullNodeDiscovery:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
async def _connect_to_peers(self, random) -> None:
async def _connect_to_peers(self, random: Random) -> None:
next_feeler = self._poisson_next_send(time.time() * 1000 * 1000, 240, random)
retry_introducers = False
introducer_attempts: int = 0
@ -433,7 +434,7 @@ class FullNodeDiscovery:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
async def _periodically_serialize(self, random: Random):
async def _periodically_serialize(self, random: Random) -> None:
while not self.is_closed:
if self.address_manager is None:
await asyncio.sleep(10)
@ -462,7 +463,9 @@ class FullNodeDiscovery:
async with self.address_manager.lock:
self.address_manager.cleanup(max_timestamp_difference, max_consecutive_failures)
async def _respond_peers_common(self, request, peer_src, is_full_node) -> None:
async def _respond_peers_common(
self, request: Union[RespondPeers, RespondPeersIntroducer], peer_src: Optional[PeerInfo], is_full_node: bool
) -> None:
# Check if we got the peers from a full node or from the introducer.
peers_adjusted_timestamp = []
is_misbehaving = False
@ -506,21 +509,21 @@ class FullNodeDiscovery:
class FullNodePeers(FullNodeDiscovery):
self_advertise_task: Optional[asyncio.Task] = None
address_relay_task: Optional[asyncio.Task] = None
self_advertise_task: Optional[asyncio.Task[None]] = None
address_relay_task: Optional[asyncio.Task[None]] = None
def __init__(
self,
server,
target_outbound_count,
server: ChiaServer,
target_outbound_count: int,
peer_store_resolver: PeerStoreResolver,
introducer_info,
dns_servers,
peer_connect_interval,
selected_network,
default_port,
log,
):
introducer_info: Dict[str, Any],
dns_servers: List[str],
peer_connect_interval: int,
selected_network: str,
default_port: Optional[int],
log: Logger,
) -> None:
super().__init__(
server,
target_outbound_count,
@ -548,7 +551,7 @@ class FullNodePeers(FullNodeDiscovery):
self.cancel_task_safe(self.self_advertise_task)
self.cancel_task_safe(self.address_relay_task)
async def _periodically_self_advertise_and_clean_data(self):
async def _periodically_self_advertise_and_clean_data(self) -> None:
while not self.is_closed:
try:
try:
@ -615,10 +618,14 @@ class FullNodePeers(FullNodeDiscovery):
self.log.error(f"Request peers exception: {e}")
return None
async def respond_peers(self, request, peer_src, is_full_node: bool) -> None:
async def respond_peers(
self, request: Union[RespondPeers, RespondPeersIntroducer], peer_src: Optional[PeerInfo], is_full_node: bool
) -> None:
try:
await self._respond_peers_common(request, peer_src, is_full_node)
if is_full_node:
if peer_src is None:
return
await self.add_peers_neighbour(request.peer_list, peer_src)
if len(request.peer_list) == 1 and self.relay_queue is not None:
peer = request.peer_list[0]
@ -628,10 +635,11 @@ class FullNodePeers(FullNodeDiscovery):
self.log.error(f"Respond peers exception: {e}. Traceback: {traceback.format_exc()}")
return None
async def _address_relay(self):
async def _address_relay(self) -> None:
while not self.is_closed:
try:
try:
assert self.relay_queue is not None, "FullNodePeers.relay_queue should always exist"
relay_peer, num_peers = await self.relay_queue.get()
except asyncio.CancelledError:
return None
@ -662,6 +670,8 @@ class FullNodePeers(FullNodeDiscovery):
if index >= num_peers:
break
peer_info = connection.get_peer_info()
if peer_info is None:
continue
async with self.lock:
if peer_info not in self.neighbour_known_peers:
self.neighbour_known_peers[peer_info] = set()
@ -684,15 +694,15 @@ class FullNodePeers(FullNodeDiscovery):
class WalletPeers(FullNodeDiscovery):
def __init__(
self,
server,
target_outbound_count,
server: ChiaServer,
target_outbound_count: int,
peer_store_resolver: PeerStoreResolver,
introducer_info,
dns_servers,
peer_connect_interval,
selected_network,
default_port,
log,
introducer_info: Dict[str, Any],
dns_servers: List[str],
peer_connect_interval: int,
selected_network: str,
default_port: Optional[int],
log: Logger,
) -> None:
super().__init__(
server,
@ -717,5 +727,7 @@ class WalletPeers(FullNodeDiscovery):
return None
await self._close_common()
async def respond_peers(
    self, request: Union[RespondPeers, RespondPeersIntroducer], peer_src: Optional[PeerInfo], is_full_node: bool
) -> None:
    """Handle a peers response by feeding it into the shared bookkeeping.

    Thin wrapper over ``_respond_peers_common``; unlike the full-node
    variant, this does no neighbour/relay work beyond the common path.
    """
    await self._respond_peers_common(request, peer_src, is_full_node)

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Dict, Optional
from typing import Any, Dict, Optional
class PeerStoreResolver:
@ -13,7 +13,7 @@ class PeerStoreResolver:
def __init__(
self,
root_path: Path,
config: Dict,
config: Dict[str, Any],
*,
selected_network: str,
peers_file_path_key: str, # config key for the peers data file relative path

View file

@ -4,7 +4,7 @@ import dataclasses
import logging
import time
from collections import Counter
from typing import Dict, List
from typing import List
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.shared_protocol import Capability
@ -19,13 +19,13 @@ class RateLimiter:
incoming: bool
reset_seconds: int
current_minute: int
message_counts: Counter
message_cumulative_sizes: Counter
message_counts: Counter[ProtocolMessageTypes]
message_cumulative_sizes: Counter[ProtocolMessageTypes]
percentage_of_limit: int
non_tx_message_counts: int = 0
non_tx_cumulative_size: int = 0
def __init__(self, incoming: bool, reset_seconds=60, percentage_of_limit=100):
def __init__(self, incoming: bool, reset_seconds: int = 60, percentage_of_limit: int = 100):
"""
The incoming parameter affects whether counters are incremented
unconditionally or not. For incoming messages, the counters are always
@ -69,7 +69,7 @@ class RateLimiter:
proportion_of_limit: float = self.percentage_of_limit / 100
ret: bool = False
rate_limits: Dict = get_rate_limits_to_use(our_capabilities, peer_capabilities)
rate_limits = get_rate_limits_to_use(our_capabilities, peer_capabilities)
try:

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
from logging import Logger
from typing import Optional
from chia.server.server import ChiaServer
@ -8,7 +9,9 @@ from chia.types.peer_info import PeerInfo
from chia.util.network import get_host_addr
def start_reconnect_task(server: ChiaServer, peer_info_arg: PeerInfo, log, prefer_ipv6: Optional[bool]):
def start_reconnect_task(
server: ChiaServer, peer_info_arg: PeerInfo, log: Logger, prefer_ipv6: Optional[bool]
) -> asyncio.Task[None]:
"""
Start a background task that checks connection and reconnects periodically to a peer.
"""
@ -18,7 +21,7 @@ def start_reconnect_task(server: ChiaServer, peer_info_arg: PeerInfo, log, prefe
else:
peer_info = PeerInfo(get_host_addr(peer_info_arg, prefer_ipv6), peer_info_arg.port)
async def connection_check():
async def connection_check() -> None:
while True:
peer_retry = True
for _, connection in server.all_connections.items():

View file

@ -213,6 +213,9 @@ class ChiaServer:
chia_ca_crt_path, chia_ca_key_path, public_cert_path, public_key_path, log=log
)
node_id_cert_path = private_cert_path if public_cert_path is None else public_cert_path
assert node_id_cert_path is not None
return cls(
_port=port,
_local_type=local_type,
@ -228,7 +231,7 @@ class ChiaServer:
config=config,
ssl_context=ssl_context,
ssl_client_context=ssl_client_context,
node_id=calculate_node_id(private_cert_path if public_cert_path is None else public_cert_path),
node_id=calculate_node_id(node_id_cert_path),
exempt_peer_networks=[ip_network(net, strict=False) for net in config.get("exempt_peer_networks", [])],
introducer_peers=IntroducerPeers() if local_type is NodeType.INTRODUCER else None,
)

View file

@ -1,31 +1,31 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict
from typing import Any, Dict, Tuple
def public_ssl_paths(path: Path, config: Dict[str, Any]) -> Tuple[Path, Path]:
    """Return the (certificate, key) file paths for the public SSL pair.

    Args:
        path: Base directory the relative names in ``config`` are joined to.
        config: Service config; reads ``config["ssl"]["public_crt"]`` and
            ``config["ssl"]["public_key"]``.
    """
    return (
        path / config["ssl"]["public_crt"],
        path / config["ssl"]["public_key"],
    )
def private_ssl_paths(path: Path, config: Dict[str, Any]) -> Tuple[Path, Path]:
    """Return the (certificate, key) file paths for the private SSL pair.

    Args:
        path: Base directory the relative names in ``config`` are joined to.
        config: Service config; reads ``config["ssl"]["private_crt"]`` and
            ``config["ssl"]["private_key"]``.
    """
    return (
        path / config["ssl"]["private_crt"],
        path / config["ssl"]["private_key"],
    )
def private_ssl_ca_paths(path: Path, config: Dict[str, Any]) -> Tuple[Path, Path]:
    """Return the (certificate, key) file paths for the private SSL CA.

    Args:
        path: Base directory the relative names in ``config`` are joined to.
        config: Service config; reads ``config["private_ssl_ca"]["crt"]`` and
            ``config["private_ssl_ca"]["key"]``.
    """
    return (
        path / config["private_ssl_ca"]["crt"],
        path / config["private_ssl_ca"]["key"],
    )
def chia_ssl_ca_paths(path: Path, config: Dict):
def chia_ssl_ca_paths(path: Path, config: Dict[str, Any]) -> Tuple[Path, Path]:
return (
path / config["chia_ssl_ca"]["crt"],
path / config["chia_ssl_ca"]["key"],

View file

@ -11,12 +11,12 @@ from chia.rpc.data_layer_rpc_api import DataLayerRpcApi
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.server.outbound_message import NodeType
from chia.server.start_service import RpcInfo, Service, async_run
from chia.server.start_wallet import WalletNode
from chia.ssl.create_ssl import create_all_ssl
from chia.util.chia_logging import initialize_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
from chia.wallet.wallet_node import WalletNode
# See: https://bugs.python.org/issue29288
"".encode("idna")

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import pathlib
import sys
from typing import Dict, Optional
from typing import Any, Dict, Optional
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
@ -25,8 +25,8 @@ SERVICE_NAME = "farmer"
def create_farmer_service(
root_path: pathlib.Path,
config: Dict,
config_pool: Dict,
config: Dict[str, Any],
config_pool: Dict[str, Any],
consensus_constants: ConsensusConstants,
keychain: Optional[Keychain] = None,
connect_to_daemon: bool = True,

View file

@ -5,7 +5,7 @@ import os
import pathlib
import sys
from multiprocessing import freeze_support
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
@ -29,10 +29,10 @@ log = logging.getLogger(__name__)
def create_full_node_service(
root_path: pathlib.Path,
config: Dict,
config: Dict[str, Any],
consensus_constants: ConsensusConstants,
connect_to_daemon: bool = True,
override_capabilities: List[Tuple[uint16, str]] = None,
override_capabilities: Optional[List[Tuple[uint16, str]]] = None,
) -> Service[FullNode]:
service_config = config[SERVICE_NAME]

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import pathlib
import sys
from typing import Dict, Optional
from typing import Any, Dict, Optional
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
@ -24,7 +24,7 @@ SERVICE_NAME = "harvester"
def create_harvester_service(
root_path: pathlib.Path,
config: Dict,
config: Dict[str, Any],
consensus_constants: ConsensusConstants,
farmer_peer: Optional[PeerInfo],
connect_to_daemon: bool = True,

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import pathlib
import sys
from typing import Dict, Optional
from typing import Any, Dict, Optional
from chia.introducer.introducer import Introducer
from chia.introducer.introducer_api import IntroducerAPI
@ -20,7 +20,7 @@ SERVICE_NAME = "introducer"
def create_introducer_service(
root_path: pathlib.Path,
config: Dict,
config: Dict[str, Any],
advertised_port: Optional[int] = None,
connect_to_daemon: bool = True,
) -> Service[Introducer]:

View file

@ -7,10 +7,13 @@ import logging.config
import os
import signal
import sys
from typing import Any, Callable, Coroutine, Dict, Generic, List, Optional, Tuple, Type, TypeVar
from pathlib import Path
from types import FrameType
from typing import Any, Awaitable, Callable, Coroutine, Dict, Generic, List, Optional, Tuple, Type, TypeVar
from chia.daemon.server import service_launch_lock_path
from chia.server.ssl_context import chia_ssl_ca_paths, private_ssl_ca_paths
from chia.server.ws_connection import WSChiaConnection
from chia.util.lock import Lockfile, LockfileError
from ..protocols.shared_protocol import capabilities
@ -48,7 +51,7 @@ class ServiceException(Exception):
class Service(Generic[_T_RpcServiceProtocol]):
def __init__(
self,
root_path,
root_path: Path,
node: _T_RpcServiceProtocol,
peer_api: Any,
node_type: NodeType,
@ -60,9 +63,9 @@ class Service(Generic[_T_RpcServiceProtocol]):
upnp_ports: List[int] = [],
server_listen_ports: List[int] = [],
connect_peers: List[PeerInfo] = [],
on_connect_callback: Optional[Callable] = None,
on_connect_callback: Optional[Callable[[WSChiaConnection], Awaitable[None]]] = None,
rpc_info: Optional[RpcInfo] = None,
connect_to_daemon=True,
connect_to_daemon: bool = True,
max_request_body_size: Optional[int] = None,
override_capabilities: Optional[List[Tuple[uint16, str]]] = None,
) -> None:
@ -76,7 +79,7 @@ class Service(Generic[_T_RpcServiceProtocol]):
self._node_type = node_type
self._service_name = service_name
self.rpc_server: Optional[RpcServer] = None
self._rpc_close_task: Optional[asyncio.Task] = None
self._rpc_close_task: Optional[asyncio.Task[None]] = None
self._network_id: str = network_id
self.max_request_body_size = max_request_body_size
@ -131,7 +134,7 @@ class Service(Generic[_T_RpcServiceProtocol]):
self._on_connect_callback = on_connect_callback
self._advertised_port = advertised_port
self._reconnect_tasks: Dict[PeerInfo, Optional[asyncio.Task]] = {peer: None for peer in connect_peers}
self._reconnect_tasks: Dict[PeerInfo, Optional[asyncio.Task[None]]] = {peer: None for peer in connect_peers}
self.upnp: UPnP = UPnP()
async def start(self) -> None:
@ -217,7 +220,7 @@ class Service(Generic[_T_RpcServiceProtocol]):
functools.partial(self._accept_signal, signal_number=signal.SIGTERM),
)
def _accept_signal(self, signal_number: int, stack_frame=None):
def _accept_signal(self, signal_number: int, stack_frame: Optional[FrameType] = None) -> None:
self._log.info(f"got signal {signal_number}")
# we only handle signals in the main process. In the ProcessPoolExecutor

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import logging
import pathlib
import sys
from typing import Dict, Optional
from typing import Any, Dict, Optional
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
@ -28,7 +28,7 @@ log = logging.getLogger(__name__)
def create_timelord_service(
root_path: pathlib.Path,
config: Dict,
config: Dict[str, Any],
constants: ConsensusConstants,
connect_to_daemon: bool = True,
) -> Service[Timelord]:

View file

@ -4,7 +4,7 @@ import os
import pathlib
import sys
from multiprocessing import freeze_support
from typing import Dict, Optional
from typing import Any, Dict, Optional
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
@ -29,7 +29,7 @@ SERVICE_NAME = "wallet"
def create_wallet_service(
root_path: pathlib.Path,
config: Dict,
config: Dict[str, Any],
consensus_constants: ConsensusConstants,
keychain: Optional[Keychain] = None,
connect_to_daemon: bool = True,

View file

@ -14,7 +14,7 @@ class PeerInfo(Streamable):
host: str
port: uint16
def is_valid(self, allow_private_subnets=False) -> bool:
def is_valid(self, allow_private_subnets: bool = False) -> bool:
ip: Optional[Union[ipaddress.IPv6Address, ipaddress.IPv4Address]] = None
try:
ip = ipaddress.IPv6Address(self.host)
@ -36,7 +36,7 @@ class PeerInfo(Streamable):
return False
# Functions related to peer bucketing in new/tried tables.
def get_key(self):
def get_key(self) -> bytes:
try:
ip = ipaddress.IPv6Address(self.host)
except ValueError:
@ -46,18 +46,20 @@ class PeerInfo(Streamable):
key += bytes([self.port // 0x100, self.port & 0x0FF])
return key
def get_group(self):
def get_group(self) -> bytes:
# TODO: Port everything from Bitcoin.
ipv4 = 1
ip_v4: Optional[ipaddress.IPv4Address] = None
ip_v6: Optional[ipaddress.IPv6Address] = None
try:
ip = ipaddress.IPv4Address(self.host)
ip_v4 = ipaddress.IPv4Address(self.host)
except ValueError:
ip = ipaddress.IPv6Address(self.host)
ipv4 = 0
if ipv4:
group = bytes([1]) + ip.packed[:2]
ip_v6 = ipaddress.IPv6Address(self.host)
if ip_v4 is not None:
group = bytes([1]) + ip_v4.packed[:2]
elif ip_v6 is not None:
group = bytes([0]) + ip_v6.packed[:4]
else:
group = bytes([0]) + ip.packed[:4]
raise ValueError("PeerInfo.host is not an ip address")
return group

File diff suppressed because one or more lines are too long