Ms.wlt node unsynced (#12875)

* Allow different reject message

* Sync correctly when localhost is not synced

* Remove logs

* Fix getting timestamp when local node is not synced

* Precommit

* Mypy

* Debug logging

* Is trusted

* revert gui

* Remove debug logging

* Remove more debug logs
This commit is contained in:
Mariano Sorgente 2022-08-12 01:34:23 +07:00 committed by GitHub
parent db48f53ede
commit 51df412652
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 38 deletions

View file

@ -33,12 +33,12 @@ VALID_REPLY_MESSAGE_MAP = {
pmt.request_signage_point_or_end_of_sub_slot: [pmt.respond_signage_point, pmt.respond_end_of_sub_slot],
pmt.request_compact_vdf: [pmt.respond_compact_vdf],
pmt.request_peers: [pmt.respond_peers],
pmt.request_header_blocks: [pmt.respond_header_blocks, pmt.reject_header_blocks],
pmt.request_header_blocks: [pmt.respond_header_blocks, pmt.reject_header_blocks, pmt.reject_block_headers],
pmt.register_interest_in_puzzle_hash: [pmt.respond_to_ph_update],
pmt.register_interest_in_coin: [pmt.respond_to_coin_update],
pmt.request_children: [pmt.respond_children],
pmt.request_ses_hashes: [pmt.respond_ses_hashes],
pmt.request_block_headers: [pmt.respond_block_headers, pmt.reject_block_headers],
pmt.request_block_headers: [pmt.respond_block_headers, pmt.reject_block_headers, pmt.reject_header_blocks],
pmt.request_peers_introducer: [pmt.respond_peers_introducer],
pmt.request_puzzle_solution: [pmt.respond_puzzle_solution, pmt.reject_puzzle_solution],
pmt.send_transaction: [pmt.transaction_ack],

View file

@ -333,18 +333,18 @@ async def request_header_blocks(
async def _fetch_header_blocks_inner(
all_peers: List[WSChiaConnection],
all_peers: List[Tuple[WSChiaConnection, bool]],
request_start: uint32,
request_end: uint32,
) -> Optional[Union[RespondHeaderBlocks, RespondBlockHeaders]]:
# We will modify this list, don't modify passed parameters.
bytes_api_peers = [peer for peer in all_peers if Capability.BLOCK_HEADERS in peer.peer_capabilities]
other_peers = [peer for peer in all_peers if Capability.BLOCK_HEADERS not in peer.peer_capabilities]
bytes_api_peers = [peer for peer in all_peers if Capability.BLOCK_HEADERS in peer[0].peer_capabilities]
other_peers = [peer for peer in all_peers if Capability.BLOCK_HEADERS not in peer[0].peer_capabilities]
random.shuffle(bytes_api_peers)
random.shuffle(other_peers)
for peer in bytes_api_peers + other_peers:
if Capability.BLOCK_HEADERS.name in peer.peer_capabilities:
for peer, is_trusted in bytes_api_peers + other_peers:
if Capability.BLOCK_HEADERS in peer.peer_capabilities:
response = await peer.request_block_headers(RequestBlockHeaders(request_start, request_end, False))
else:
response = await peer.request_header_blocks(RequestHeaderBlocks(request_start, request_end))
@ -354,7 +354,9 @@ async def _fetch_header_blocks_inner(
# Request to peer failed in some way, close the connection and remove the peer
# from our local list.
await peer.close()
if not is_trusted:
log.info(f"Closing peer {peer} since it does not have the blocks we asked for")
await peer.close()
return None
@ -363,7 +365,7 @@ async def fetch_header_blocks_in_range(
start: uint32,
end: uint32,
peer_request_cache: PeerRequestCache,
all_peers: List[WSChiaConnection],
all_peers: List[Tuple[WSChiaConnection, bool]],
) -> Optional[List[HeaderBlock]]:
blocks: List[HeaderBlock] = []
for i in range(start - (start % 32), end + 1, 32):

View file

@ -874,29 +874,41 @@ class WalletNode:
request.peak_hash,
)
def get_full_node_peer(self, synced_only: bool = False) -> Optional[WSChiaConnection]:
def get_full_node_peer(self) -> Optional[WSChiaConnection]:
"""
Get a full node, preferring synced & trusted > synced & untrusted > unsynced & trusted > unsynced & untrusted
"""
if self._server is None:
full_nodes: List[WSChiaConnection] = self.get_full_node_peers_in_order()
if len(full_nodes) == 0:
return None
return full_nodes[0]
nodes = self.server.get_full_node_connections()
if len(nodes) > 0:
synced_peers = set(node for node in nodes if node.peer_node_id in self.synced_peers)
trusted_peers = set(node for node in nodes if self.is_trusted(node))
if len(synced_peers & trusted_peers) > 0:
return random.choice(list(synced_peers & trusted_peers))
elif len(synced_peers) > 0:
return random.choice(list(synced_peers))
elif synced_only:
return None
elif len(trusted_peers) > 0:
return random.choice(list(trusted_peers))
def get_full_node_peers_in_order(self) -> List[WSChiaConnection]:
"""
Get all full nodes sorted:
preferring synced & trusted > synced & untrusted > unsynced & trusted > unsynced & untrusted
"""
if self._server is None:
return []
synced_and_trusted: List[WSChiaConnection] = []
synced: List[WSChiaConnection] = []
trusted: List[WSChiaConnection] = []
neither: List[WSChiaConnection] = []
all_nodes: List[WSChiaConnection] = self.server.get_full_node_connections().copy()
random.shuffle(all_nodes)
for node in all_nodes:
we_synced_to_it = node.peer_node_id in self.synced_peers
is_trusted = self.is_trusted(node)
if we_synced_to_it and is_trusted:
synced_and_trusted.append(node)
elif we_synced_to_it:
synced.append(node)
elif is_trusted:
trusted.append(node)
else:
return random.choice(list(nodes))
else:
return None
neither.append(node)
return synced_and_trusted + synced + trusted + neither
async def disconnect_and_stop_wpeers(self) -> None:
if self._server is None:
@ -933,16 +945,18 @@ class WalletNode:
if cache_ts is not None:
return cache_ts
peer: Optional[WSChiaConnection] = self.get_full_node_peer()
if peer is None:
raise ValueError("Cannot fetch timestamp, no peers")
self.log.debug(f"Fetching block at height: {height}")
last_tx_block: Optional[HeaderBlock] = await fetch_last_tx_from_peer(height, peer)
if last_tx_block is None:
raise ValueError(f"Error fetching blocks from peer {peer.get_peer_info()}")
assert last_tx_block.foliage_transaction_block is not None
self.get_cache_for_peer(peer).add_to_blocks(last_tx_block)
return last_tx_block.foliage_transaction_block.timestamp
peers: List[WSChiaConnection] = self.get_full_node_peers_in_order()
last_tx_block: Optional[HeaderBlock] = None
for peer in peers:
last_tx_block = await fetch_last_tx_from_peer(height, peer)
if last_tx_block is None:
continue
assert last_tx_block.foliage_transaction_block is not None
self.get_cache_for_peer(peer).add_to_blocks(last_tx_block)
return last_tx_block.foliage_transaction_block.timestamp
raise ValueError("Error fetching timestamp from all peers")
async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: WSChiaConnection):
if self._wallet_state_manager is None:
@ -1013,7 +1027,6 @@ class WalletNode:
if far_behind or len(self.synced_peers) == 0:
syncing = True
self.wallet_state_manager.set_sync_mode(True)
if not syncing and not (
self._secondary_peer_sync_task is None or self._secondary_peer_sync_task.done()
):
@ -1447,7 +1460,8 @@ class WalletNode:
if end == 0:
self.log.error("Error finding sub epoch")
return False
all_peers = self.server.get_full_node_connections()
all_peers_c = self.server.get_full_node_connections()
all_peers = [(con, self.is_trusted(con)) for con in all_peers_c]
blocks: Optional[List[HeaderBlock]] = await fetch_header_blocks_in_range(
start, end, peer_request_cache, all_peers
)