Ms.hint refactor (#11222)

* Some work on hints

* More cleanup and test_generator_tools

* Change relevant calls

* More refactor and cleanup

* More refactor

* Small fix

* Create wrapper object PeakPostProcessingResult

* Lint and small fix

* Fix a hint bug

* Fix hint update_wallets

* Fix test_full_sync

* pre-commit

* Start with PR review comments

* More efficient iteration

* Remove tx_removals_additions_and_hints

* Revert mozilla ca

* Fix issue with startup
This commit is contained in:
Mariano Sorgente 2022-05-05 11:19:57 -04:00 committed by GitHub
parent cf5113ae87
commit e0df18f85d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 492 additions and 266 deletions

View file

@ -95,7 +95,7 @@ jobs:
- name: Test core-full_node code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/core/full_node/test_address_manager.py tests/core/full_node/test_block_height_map.py tests/core/full_node/test_conditions.py tests/core/full_node/test_full_node.py tests/core/full_node/test_mempool.py tests/core/full_node/test_mempool_performance.py tests/core/full_node/test_node_load.py tests/core/full_node/test_peer_store_resolver.py tests/core/full_node/test_performance.py tests/core/full_node/test_transactions.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/core/full_node/test_address_manager.py tests/core/full_node/test_block_height_map.py tests/core/full_node/test_conditions.py tests/core/full_node/test_full_node.py tests/core/full_node/test_generator_tools.py tests/core/full_node/test_hint_management.py tests/core/full_node/test_mempool.py tests/core/full_node/test_mempool_performance.py tests/core/full_node/test_node_load.py tests/core/full_node/test_peer_store_resolver.py tests/core/full_node/test_performance.py tests/core/full_node/test_transactions.py
- name: Process coverage data
run: |

View file

@ -94,7 +94,7 @@ jobs:
- name: Test core-full_node code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/core/full_node/test_address_manager.py tests/core/full_node/test_block_height_map.py tests/core/full_node/test_conditions.py tests/core/full_node/test_full_node.py tests/core/full_node/test_mempool.py tests/core/full_node/test_mempool_performance.py tests/core/full_node/test_node_load.py tests/core/full_node/test_peer_store_resolver.py tests/core/full_node/test_performance.py tests/core/full_node/test_transactions.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/core/full_node/test_address_manager.py tests/core/full_node/test_block_height_map.py tests/core/full_node/test_conditions.py tests/core/full_node/test_full_node.py tests/core/full_node/test_generator_tools.py tests/core/full_node/test_hint_management.py tests/core/full_node/test_mempool.py tests/core/full_node/test_mempool_performance.py tests/core/full_node/test_node_load.py tests/core/full_node/test_peer_store_resolver.py tests/core/full_node/test_performance.py tests/core/full_node/test_transactions.py
- name: Process coverage data
run: |

View file

@ -134,7 +134,7 @@ class SpendSim:
await self.db_wrapper.close()
async def new_peak(self):
await self.mempool_manager.new_peak(self.block_records[-1], [])
await self.mempool_manager.new_peak(self.block_records[-1], None)
def new_coin_record(self, coin: Coin, coinbase=False) -> CoinRecord:
return CoinRecord(

View file

@ -67,6 +67,15 @@ class ReceiveBlockResult(Enum):
DISCONNECTED_BLOCK = 5 # Block's parent (previous pointer) is not in this blockchain
@dataclasses.dataclass
class StateChangeSummary:
peak: BlockRecord
fork_height: uint32
rolled_back_records: List[CoinRecord]
new_npc_results: List[NPCResult]
new_rewards: List[Coin]
class Blockchain(BlockchainInterface):
constants: ConsensusConstants
constants_json: Dict
@ -193,12 +202,7 @@ class Blockchain(BlockchainInterface):
block: FullBlock,
pre_validation_result: PreValidationResult,
fork_point_with_peak: Optional[uint32] = None,
) -> Tuple[
ReceiveBlockResult,
Optional[Err],
Optional[uint32],
Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
]:
) -> Tuple[ReceiveBlockResult, Optional[Err], Optional[StateChangeSummary]]:
"""
This method must be called under the blockchain lock
Adds a new block into the blockchain, if it's valid and connected to the current
@ -215,24 +219,26 @@ class Blockchain(BlockchainInterface):
The result of adding the block to the blockchain (NEW_PEAK, ADDED_AS_ORPHAN, INVALID_BLOCK,
DISCONNECTED_BLOCK, ALREDY_HAVE_BLOCK)
An optional error if the result is not NEW_PEAK or ADDED_AS_ORPHAN
A fork point if the result is NEW_PEAK
A list of changes to the coin store, and changes to hints, if the result is NEW_PEAK
A StateChangeSumamry iff NEW_PEAK, with:
- A fork point if the result is NEW_PEAK
- A list of coin changes as a result of rollback
- A list of NPCResult for any new transaction block added to the chain
"""
genesis: bool = block.height == 0
if self.contains_block(block.header_hash):
return ReceiveBlockResult.ALREADY_HAVE_BLOCK, None, None, ([], {})
return ReceiveBlockResult.ALREADY_HAVE_BLOCK, None, None
if not self.contains_block(block.prev_header_hash) and not genesis:
return (ReceiveBlockResult.DISCONNECTED_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None, ([], {}))
return ReceiveBlockResult.DISCONNECTED_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None
if not genesis and (self.block_record(block.prev_header_hash).height + 1) != block.height:
return ReceiveBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None, ([], {})
return ReceiveBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None
npc_result: Optional[NPCResult] = pre_validation_result.npc_result
required_iters = pre_validation_result.required_iters
if pre_validation_result.error is not None:
return ReceiveBlockResult.INVALID_BLOCK, Err(pre_validation_result.error), None, ([], {})
return ReceiveBlockResult.INVALID_BLOCK, Err(pre_validation_result.error), None
assert required_iters is not None
error_code, _ = await validate_block_body(
@ -250,7 +256,7 @@ class Blockchain(BlockchainInterface):
validate_signature=not pre_validation_result.validated_signature,
)
if error_code is not None:
return ReceiveBlockResult.INVALID_BLOCK, error_code, None, ([], {})
return ReceiveBlockResult.INVALID_BLOCK, error_code, None
block_record = block_to_block_record(
self.constants,
@ -265,23 +271,23 @@ class Blockchain(BlockchainInterface):
header_hash: bytes32 = block.header_hash
# Perform the DB operations to update the state, and rollback if something goes wrong
await self.block_store.add_full_block(header_hash, block, block_record)
fork_height, peak_height, records, (coin_record_change, hint_changes) = await self._reconsider_peak(
records, state_change_summary = await self._reconsider_peak(
block_record, genesis, fork_point_with_peak, npc_result
)
# Then update the memory cache. It is important that this task is not cancelled and does not throw
# Then update the memory cache. It is important that this is not cancelled and does not throw
# This is done after all async/DB operations, so there is a decreased chance of failure.
self.add_block_record(block_record)
if fork_height is not None:
self.__height_map.rollback(fork_height)
if state_change_summary is not None:
self.__height_map.rollback(state_change_summary.fork_height)
for fetched_block_record in records:
self.__height_map.update_height(
fetched_block_record.height,
fetched_block_record.header_hash,
fetched_block_record.sub_epoch_summary_included,
)
if peak_height is not None:
self._peak_height = peak_height
await self.__height_map.maybe_flush()
if state_change_summary is not None:
self._peak_height = block_record.height
except BaseException as e:
self.block_store.rollback_cache_block(header_hash)
log.error(
@ -290,23 +296,14 @@ class Blockchain(BlockchainInterface):
)
raise
if fork_height is not None:
# new coin records added
assert coin_record_change is not None
return ReceiveBlockResult.NEW_PEAK, None, fork_height, (coin_record_change, hint_changes)
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None, ([], {})
# This is done outside the try-except in case it fails, since we do not want to revert anything if it does
await self.__height_map.maybe_flush()
def get_hint_list(self, npc_result: NPCResult) -> List[Tuple[bytes32, bytes]]:
if npc_result.conds is None:
return []
h_list = []
for spend in npc_result.conds.spends:
for puzzle_hash, amount, hint in spend.create_coin:
if hint != b"":
coin_id = Coin(spend.coin_id, puzzle_hash, amount).name()
h_list.append((coin_id, hint))
return h_list
if state_change_summary is not None:
# new coin records added
return ReceiveBlockResult.NEW_PEAK, None, state_change_summary
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None
async def _reconsider_peak(
self,
@ -314,21 +311,16 @@ class Blockchain(BlockchainInterface):
genesis: bool,
fork_point_with_peak: Optional[uint32],
npc_result: Optional[NPCResult],
) -> Tuple[
Optional[uint32],
Optional[uint32],
List[BlockRecord],
Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
]:
) -> Tuple[List[BlockRecord], Optional[StateChangeSummary]]:
"""
When a new block is added, this is called, to check if the new block is the new peak of the chain.
This also handles reorgs by reverting blocks which are not in the heaviest chain.
It returns the height of the fork between the previous chain and the new chain, or returns
None if there was no update to the heaviest chain.
It returns the summary of the applied changes, including the height of the fork between the previous chain
and the new chain, or returns None if there was no update to the heaviest chain.
"""
peak = self.get_peak()
latest_coin_state: Dict[bytes32, CoinRecord] = {}
hint_coin_state: Dict[bytes, Dict[bytes32, CoinRecord]] = {}
rolled_back_state: Dict[bytes32, CoinRecord] = {}
if genesis:
if peak is None:
@ -341,26 +333,26 @@ class Blockchain(BlockchainInterface):
tx_removals, tx_additions = [], []
if block.is_transaction_block():
assert block.foliage_transaction_block is not None
added = await self.coin_store.new_block(
await self.coin_store.new_block(
block.height,
block.foliage_transaction_block.timestamp,
block.get_included_reward_coins(),
tx_additions,
tx_removals,
)
else:
added, _ = [], []
await self.block_store.set_in_chain([(block_record.header_hash,)])
await self.block_store.set_peak(block_record.header_hash)
return uint32(0), uint32(0), [block_record], (added, {})
return None, None, [], ([], {})
return [block_record], StateChangeSummary(
block_record, uint32(0), [], [], list(block.get_included_reward_coins())
)
return [], None
assert peak is not None
if block_record.weight <= peak.weight:
# This is not a heavier block than the heaviest we have seen, so we don't change the coin set
return None, None, [], ([], {})
return [], None
# Find the fork. if the block is just being appended, it will return the peak
# Finds the fork. if the block is just being appended, it will return the peak
# If no blocks in common, returns -1, and reverts all blocks
if block_record.prev_hash == peak.header_hash:
fork_height: int = peak.height
@ -370,14 +362,14 @@ class Blockchain(BlockchainInterface):
fork_height = find_fork_point_in_chain(self, block_record, peak)
if block_record.prev_hash != peak.header_hash:
roll_changes: List[CoinRecord] = await self.coin_store.rollback_to_block(fork_height)
for coin_record in roll_changes:
latest_coin_state[coin_record.name] = coin_record
for coin_record in await self.coin_store.rollback_to_block(fork_height):
rolled_back_state[coin_record.name] = coin_record
# Collect all blocks from fork point to new peak
# Collects all blocks from fork point to new peak
blocks_to_add: List[Tuple[FullBlock, BlockRecord]] = []
curr = block_record.header_hash
# Backtracks up to the fork point, pulling all the required blocks from DB (that will soon be in the chain)
while fork_height < 0 or curr != self.height_to_hash(uint32(fork_height)):
fetched_full_block: Optional[FullBlock] = await self.block_store.get_full_block(curr)
fetched_block_record: Optional[BlockRecord] = await self.block_store.get_block_record(curr)
@ -389,12 +381,16 @@ class Blockchain(BlockchainInterface):
break
curr = fetched_block_record.prev_hash
records_to_add = []
records_to_add: List[BlockRecord] = []
npc_results: List[NPCResult] = []
reward_coins: List[Coin] = []
for fetched_full_block, fetched_block_record in reversed(blocks_to_add):
records_to_add.append(fetched_block_record)
if not fetched_full_block.is_transaction_block():
# Coins are only created in TX blocks so there are no state updates for this block
continue
# We need to recompute the additions and removals, since they are not stored on DB (only generator is).
if fetched_block_record.header_hash == block_record.header_hash:
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(
fetched_full_block, npc_result
@ -402,35 +398,21 @@ class Blockchain(BlockchainInterface):
else:
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(fetched_full_block, None)
# Collect the NPC results for later post-processing
if npc_res is not None:
npc_results.append(npc_res)
# Apply the coin store changes for each block that is now in the blockchain
assert fetched_full_block.foliage_transaction_block is not None
added_rec = await self.coin_store.new_block(
await self.coin_store.new_block(
fetched_full_block.height,
fetched_full_block.foliage_transaction_block.timestamp,
fetched_full_block.get_included_reward_coins(),
tx_additions,
tx_removals,
)
removed_rec: List[CoinRecord] = await self.coin_store.get_coin_records(tx_removals)
assert len(removed_rec) == len(tx_removals)
# Set additions first, then removals in order to handle ephemeral coin state
# Add in height order is also required
record: Optional[CoinRecord]
for record in added_rec:
assert record
latest_coin_state[record.name] = record
for record in removed_rec:
latest_coin_state[record.name] = record
if npc_res is not None:
hint_list: List[Tuple[bytes32, bytes]] = self.get_hint_list(npc_res)
await self.hint_store.add_hints(hint_list)
# There can be multiple coins for the same hint
for coin_id, hint in hint_list:
key = hint
if key not in hint_coin_state:
hint_coin_state[key] = {}
hint_coin_state[key][coin_id] = latest_coin_state[coin_id]
# Collect the new reward coins for later post-processing
reward_coins.extend(fetched_full_block.get_included_reward_coins())
# we made it to the end successfully
# Rollback sub_epoch_summaries
@ -439,11 +421,9 @@ class Blockchain(BlockchainInterface):
# Changes the peak to be the new peak
await self.block_store.set_peak(block_record.header_hash)
return (
uint32(max(fork_height, 0)),
block_record.height,
records_to_add,
(list(latest_coin_state.values()), hint_coin_state),
return records_to_add, StateChangeSummary(
block_record, uint32(max(fork_height, 0)), list(rolled_back_state.values()), npc_results, reward_coins
)
async def get_tx_removals_and_additions(

View file

@ -17,7 +17,7 @@ from blspy import AugSchemeMPL
import chia.server.ws_connection as ws # lgtm [py/import-and-import-from]
from chia.consensus.block_creation import unfinished_block_to_full_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain import Blockchain, ReceiveBlockResult
from chia.consensus.blockchain import Blockchain, ReceiveBlockResult, StateChangeSummary
from chia.consensus.blockchain_interface import BlockchainInterface
from chia.consensus.constants import ConsensusConstants
from chia.consensus.cost_calculator import NPCResult
@ -26,6 +26,7 @@ from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary
from chia.consensus.multiprocess_validation import PreValidationResult
from chia.consensus.pot_iterations import calculate_sp_iters
from chia.full_node.block_store import BlockStore
from chia.full_node.hint_management import get_hints_and_subscription_coin_ids
from chia.full_node.lock_queue import LockQueue, LockClient
from chia.full_node.bundle_tools import detect_potential_template_generator
from chia.full_node.coin_store import CoinStore
@ -78,6 +79,15 @@ from chia.util.db_synchronous import db_synchronous_on
from chia.util.db_version import lookup_db_version, set_db_version_async
# This is the result of calling peak_post_processing, which is then fed into peak_post_processing_2
@dataclasses.dataclass
class PeakPostProcessingResult:
mempool_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]] # The result of calling MempoolManager.new_peak
fns_peak_result: FullNodeStorePeakResult # The result of calling FullNodeStore.new_peak
hints: List[Tuple[bytes32, bytes]] # The hints added to the DB
lookup_coin_ids: List[bytes32] # The coin IDs that we need to look up to notify wallets of changes
class FullNode:
block_store: BlockStore
full_node_store: FullNodeStore
@ -272,18 +282,17 @@ class FullNode:
f"time taken: {int(time_taken)}s"
)
async with self._blockchain_lock_high_priority:
pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_peak(), [])
pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_peak(), None)
assert len(pending_tx) == 0 # no pending transactions when starting up
peak: Optional[BlockRecord] = self.blockchain.get_peak()
if peak is not None:
full_peak = await self.blockchain.get_full_peak()
mempool_new_peak_result, fns_peak_result = await self.peak_post_processing(
full_peak, peak, max(peak.height - 1, 0), None, []
)
await self.peak_post_processing_2(
full_peak, peak, max(peak.height - 1, 0), None, ([], {}), mempool_new_peak_result, fns_peak_result
state_change_summary = StateChangeSummary(peak, max(peak.height - 1, 0), [], [], [])
ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
full_peak, state_change_summary, None
)
await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result)
if self.config["send_uncompact_interval"] != 0:
sanitize_weight_proof_only = False
if "sanitize_weight_proof_only" in self.config:
@ -322,7 +331,7 @@ class FullNode:
try:
while not self._shut_down:
# We use a semaphore to make sure we don't send more than 200 concurrent calls of respond_transaction.
# However doing them one at a time would be slow, because they get sent to other processes.
# However, doing them one at a time would be slow, because they get sent to other processes.
await self.respond_transaction_semaphore.acquire()
item: TransactionQueueEntry = (await self.transaction_queue.get())[1]
asyncio.create_task(self._handle_one_transaction(item))
@ -437,27 +446,25 @@ class FullNode:
if not response:
raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}")
async with self._blockchain_lock_high_priority:
success, advanced_peak, fork_height, coin_changes = await self.receive_block_batch(
response.blocks, peer, None
)
state_change_summary: Optional[StateChangeSummary]
success, state_change_summary = await self.receive_block_batch(response.blocks, peer, None)
if not success:
raise ValueError(f"Error short batch syncing, failed to validate blocks {height}-{end_height}")
if advanced_peak:
peak = self.blockchain.get_peak()
if state_change_summary is not None:
try:
peak_fb: Optional[FullBlock] = await self.blockchain.get_full_peak()
assert peak is not None and peak_fb is not None and fork_height is not None
mempool_new_peak_result, fns_peak_result = await self.peak_post_processing(
peak_fb, peak, fork_height, peer, coin_changes[0]
assert peak_fb is not None
ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
peak_fb,
state_change_summary,
peer,
)
await self.peak_post_processing_2(
peak_fb, peak, fork_height, peer, coin_changes, mempool_new_peak_result, fns_peak_result
)
except asyncio.CancelledError:
# Still do post processing after cancel
await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result)
except Exception:
# Still do post processing after cancel (or exception)
peak_fb = await self.blockchain.get_full_peak()
assert peak is not None and peak_fb is not None and fork_height is not None
await self.peak_post_processing(peak_fb, peak, fork_height, peer, coin_changes[0])
assert peak_fb is not None
await self.peak_post_processing(peak_fb, state_change_summary, peer)
raise
finally:
self.log.info(f"Added blocks {height}-{end_height}")
@ -942,7 +949,7 @@ class FullNode:
)
batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
async def fetch_block_batches(batch_queue, peers_with_peak: List[ws.WSChiaConnection]):
async def fetch_block_batches(batch_queue: asyncio.Queue, peers_with_peak: List[ws.WSChiaConnection]):
try:
for start_height in range(fork_point_height, target_peak_sb_height, batch_size):
end_height = min(target_peak_sb_height, start_height + batch_size)
@ -973,17 +980,17 @@ class FullNode:
# finished signal with None
await batch_queue.put(None)
async def validate_block_batches(batch_queue):
async def validate_block_batches(inner_batch_queue: asyncio.Queue):
advanced_peak = False
while True:
res = await batch_queue.get()
res = await inner_batch_queue.get()
if res is None:
self.log.debug("done fetching blocks")
return
peer, blocks = res
start_height = blocks[0].height
end_height = blocks[-1].height
success, advanced_peak, fork_height, coin_states = await self.receive_block_batch(
success, state_change_summary = await self.receive_block_batch(
blocks, peer, None if advanced_peak else uint32(fork_point_height), summaries
)
if success is False:
@ -992,9 +999,16 @@ class FullNode:
await peer.close(600)
raise ValueError(f"Failed to validate block batch {start_height} to {end_height}")
self.log.info(f"Added blocks {start_height} to {end_height}")
peak = self.blockchain.get_peak()
if len(coin_states) > 0 and fork_height is not None:
await self.update_wallets(peak.height, fork_height, peak.header_hash, coin_states)
peak: Optional[BlockRecord] = self.blockchain.get_peak()
if state_change_summary is not None:
advanced_peak = True
assert peak is not None
# Hints must be added to the DB. The other post-processing tasks are not required when syncing
hints_to_add, lookup_coin_ids = get_hints_and_subscription_coin_ids(
state_change_summary, self.coin_subscriptions, self.ph_subscriptions
)
await self.hint_store.add_hints(hints_to_add)
await self.update_wallets(state_change_summary, hints_to_add, lookup_coin_ids)
await self.send_peak_to_wallets()
self.blockchain.clean_block_record(end_height - self.constants.BLOCKS_CACHE_SIZE)
@ -1029,51 +1043,47 @@ class FullNode:
async def update_wallets(
self,
height: uint32,
fork_height: uint32,
peak_hash: bytes32,
state_update: Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
):
state_change_summary: StateChangeSummary,
hints: List[Tuple[bytes32, bytes]],
lookup_coin_ids: List[bytes32],
) -> None:
# Looks up coin records in DB for the coins that wallets are interested in
new_states: List[CoinRecord] = await self.coin_store.get_coin_records(list(lookup_coin_ids))
# Re-arrange to a map, and filter out any non-ph sized hint
coin_id_to_ph_hint: Dict[bytes32, bytes32] = {
coin_id: bytes32(hint) for coin_id, hint in hints if len(hint) == 32
}
changes_for_peer: Dict[bytes32, Set[CoinState]] = {}
for coin_record in state_change_summary.rolled_back_records + [s for s in new_states if s is not None]:
cr_name: bytes32 = coin_record.name
for peer in self.coin_subscriptions.get(cr_name, []):
if peer not in changes_for_peer:
changes_for_peer[peer] = set()
changes_for_peer[peer].add(coin_record.coin_state)
states, hint_state = state_update
for peer in self.ph_subscriptions.get(coin_record.coin.puzzle_hash, []):
if peer not in changes_for_peer:
changes_for_peer[peer] = set()
changes_for_peer[peer].add(coin_record.coin_state)
for coin_record in states:
if coin_record.name in self.coin_subscriptions:
subscribed_peers = self.coin_subscriptions[coin_record.name]
for peer in subscribed_peers:
if cr_name in coin_id_to_ph_hint:
for peer in self.ph_subscriptions.get(coin_id_to_ph_hint[cr_name], []):
if peer not in changes_for_peer:
changes_for_peer[peer] = set()
changes_for_peer[peer].add(coin_record.coin_state)
if coin_record.coin.puzzle_hash in self.ph_subscriptions:
subscribed_peers = self.ph_subscriptions[coin_record.coin.puzzle_hash]
for peer in subscribed_peers:
if peer not in changes_for_peer:
changes_for_peer[peer] = set()
changes_for_peer[peer].add(coin_record.coin_state)
# This is just a verification that the assumptions justifying the ignore below
# are valid.
hint: bytes
for hint, records in hint_state.items():
# While `hint` is typed as a `bytes`, and this is locally verified
# immediately above, if it has length 32 then it might match an entry in
# `self.ph_subscriptions`. It is unclear if there is a more proper means
# of handling this situation.
subscribed_peers = self.ph_subscriptions.get(hint) # type: ignore[call-overload]
if subscribed_peers is not None:
for peer in subscribed_peers:
if peer not in changes_for_peer:
changes_for_peer[peer] = set()
for record in records.values():
changes_for_peer[peer].add(record.coin_state)
for peer, changes in changes_for_peer.items():
if peer not in self.server.all_connections:
continue
ws_peer: ws.WSChiaConnection = self.server.all_connections[peer]
state = CoinStateUpdate(height, fork_height, peak_hash, list(changes))
state = CoinStateUpdate(
state_change_summary.peak.height,
state_change_summary.fork_height,
state_change_summary.peak.header_hash,
list(changes),
)
msg = make_msg(ProtocolMessageTypes.coin_state_update, state)
await ws_peer.send_message(msg)
@ -1083,9 +1093,9 @@ class FullNode:
peer: ws.WSChiaConnection,
fork_point: Optional[uint32],
wp_summaries: Optional[List[SubEpochSummary]] = None,
) -> Tuple[bool, bool, Optional[uint32], Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]]]:
advanced_peak = False
fork_height: Optional[uint32] = uint32(0)
) -> Tuple[bool, Optional[StateChangeSummary]]:
# Precondition: All blocks must be contiguous blocks, index i+1 must be the parent of index i
# Returns a bool for success, as well as a StateChangeSummary if the peak was advanced
blocks_to_validate: List[FullBlock] = []
for i, block in enumerate(all_blocks):
@ -1093,7 +1103,7 @@ class FullNode:
blocks_to_validate = all_blocks[i:]
break
if len(blocks_to_validate) == 0:
return True, False, fork_height, ([], {})
return True, None
# Validates signatures in multiprocessing since they take a while, and we don't have cached transactions
# for these blocks (unlike during normal operation where we validate one at a time)
@ -1114,45 +1124,48 @@ class FullNode:
self.log.error(
f"Invalid block from peer: {peer.get_peer_logging()} {Err(pre_validation_results[i].error)}"
)
return False, advanced_peak, fork_height, ([], {})
return False, None
# Dicts because deduping
all_coin_changes: Dict[bytes32, CoinRecord] = {}
all_hint_changes: Dict[bytes, Dict[bytes32, CoinRecord]] = {}
agg_state_change_summary: Optional[StateChangeSummary] = None
for i, block in enumerate(blocks_to_validate):
assert pre_validation_results[i].required_iters is not None
result, error, fork_height, coin_changes = await self.blockchain.receive_block(
state_change_summary: Optional[StateChangeSummary]
advanced_peak = agg_state_change_summary is not None
result, error, state_change_summary = await self.blockchain.receive_block(
block, pre_validation_results[i], None if advanced_peak else fork_point
)
coin_record_list, hint_records = coin_changes
# Update all changes
for record in coin_record_list:
all_coin_changes[record.name] = record
for hint, list_of_records in hint_records.items():
if hint not in all_hint_changes:
all_hint_changes[hint] = {}
for record in list_of_records.values():
all_hint_changes[hint][record.name] = record
if result == ReceiveBlockResult.NEW_PEAK:
advanced_peak = True
assert state_change_summary is not None
# Since all blocks are contiguous, we can simply append the rollback changes and npc results
if agg_state_change_summary is None:
agg_state_change_summary = state_change_summary
else:
# Keeps the old, original fork_height, since the next blocks will have fork height h-1
# Groups up all state changes into one
agg_state_change_summary = StateChangeSummary(
state_change_summary.peak,
agg_state_change_summary.fork_height,
agg_state_change_summary.rolled_back_records + state_change_summary.rolled_back_records,
agg_state_change_summary.new_npc_results + state_change_summary.new_npc_results,
agg_state_change_summary.new_rewards + state_change_summary.new_rewards,
)
elif result == ReceiveBlockResult.INVALID_BLOCK or result == ReceiveBlockResult.DISCONNECTED_BLOCK:
if error is not None:
self.log.error(f"Error: {error}, Invalid block from peer: {peer.get_peer_logging()} ")
return False, advanced_peak, fork_height, ([], {})
return False, agg_state_change_summary
block_record = self.blockchain.block_record(block.header_hash)
if block_record.sub_epoch_summary_included is not None:
if self.weight_proof_handler is not None:
await self.weight_proof_handler.create_prev_sub_epoch_segments()
if advanced_peak:
if agg_state_change_summary is not None:
self._state_changed("new_peak")
self.log.debug(
f"Total time for {len(blocks_to_validate)} blocks: {time.time() - pre_validate_start}, "
f"advanced: {advanced_peak}"
f"advanced: True"
)
return True, advanced_peak, fork_height, (list(all_coin_changes.values()), all_hint_changes)
return True, agg_state_change_summary
async def _finish_sync(self):
"""
@ -1172,13 +1185,11 @@ class FullNode:
peak_fb: FullBlock = await self.blockchain.get_full_peak()
if peak is not None:
mempool_new_peak_result, fns_peak_result = await self.peak_post_processing(
peak_fb, peak, max(peak.height - 1, 0), None, []
)
await self.peak_post_processing_2(
peak_fb, peak, max(peak.height - 1, 0), None, ([], {}), mempool_new_peak_result, fns_peak_result
state_change_summary = StateChangeSummary(peak, max(peak.height - 1, 0), [], [], [])
ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
peak_fb, state_change_summary, None
)
await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result)
if peak is not None and self.weight_proof_handler is not None:
await self.weight_proof_handler.get_proof_of_weight(peak.header_hash)
@ -1261,22 +1272,22 @@ class FullNode:
async def peak_post_processing(
self,
block: FullBlock,
record: BlockRecord,
fork_height: uint32,
state_change_summary: StateChangeSummary,
peer: Optional[ws.WSChiaConnection],
coin_changes: List[CoinRecord],
):
) -> PeakPostProcessingResult:
"""
Must be called under self.blockchain.lock. This updates the internal state of the full node with the
latest peak information. It also notifies peers about the new peak.
"""
record = state_change_summary.peak
difficulty = self.blockchain.get_next_difficulty(record.header_hash, False)
sub_slot_iters = self.blockchain.get_next_slot_iters(record.header_hash, False)
self.log.info(
f"🌱 Updated peak to height {record.height}, weight {record.weight}, "
f"hh {record.header_hash}, "
f"forked at {fork_height}, rh: {record.reward_infusion_new_challenge}, "
f"forked at {state_change_summary.fork_height}, rh: {record.reward_infusion_new_challenge}, "
f"total iters: {record.total_iters}, "
f"overflow: {record.overflow}, "
f"deficit: {record.deficit}, "
@ -1288,6 +1299,17 @@ class FullNode:
f"{len(block.transactions_generator_ref_list) if block.transactions_generator else 'No tx'}"
)
if (
self.full_node_store.previous_generator is not None
and state_change_summary.fork_height < self.full_node_store.previous_generator.block_height
):
self.full_node_store.previous_generator = None
hints_to_add, lookup_coin_ids = get_hints_and_subscription_coin_ids(
state_change_summary, self.coin_subscriptions, self.ph_subscriptions
)
await self.hint_store.add_hints(hints_to_add)
sub_slots = await self.blockchain.get_sp_and_ip_sub_slots(record.header_hash)
assert sub_slots is not None
@ -1295,9 +1317,9 @@ class FullNode:
self.blockchain.clean_block_records()
fork_block: Optional[BlockRecord] = None
if fork_height != block.height - 1 and block.height != 0:
if state_change_summary.fork_height != block.height - 1 and block.height != 0:
# This is a reorg
fork_hash: Optional[bytes32] = self.blockchain.height_to_hash(fork_height)
fork_hash: Optional[bytes32] = self.blockchain.height_to_hash(state_change_summary.fork_height)
assert fork_hash is not None
fork_block = self.blockchain.block_record(fork_hash)
@ -1340,8 +1362,9 @@ class FullNode:
)
# Update the mempool (returns successful pending transactions added to the mempool)
new_npc_results: List[NPCResult] = state_change_summary.new_npc_results
mempool_new_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]] = await self.mempool_manager.new_peak(
self.blockchain.get_peak(), coin_changes
self.blockchain.get_peak(), new_npc_results[-1] if len(new_npc_results) > 0 else None
)
# Check if we detected a spent transaction, to load up our generator cache
@ -1350,23 +1373,22 @@ class FullNode:
if generator_arg:
self.log.info(f"Saving previous generator for height {block.height}")
self.full_node_store.previous_generator = generator_arg
return mempool_new_peak_result, fns_peak_result
return PeakPostProcessingResult(mempool_new_peak_result, fns_peak_result, hints_to_add, lookup_coin_ids)
async def peak_post_processing_2(
self,
block: FullBlock,
record: BlockRecord,
fork_height: uint32,
peer: Optional[ws.WSChiaConnection],
coin_changes: Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
mempool_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]],
fns_peak_result: FullNodeStorePeakResult,
state_change_summary: StateChangeSummary,
ppp_result: PeakPostProcessingResult,
):
"""
Does NOT need to be called under the blockchain lock. Handle other parts of post processing like communicating
with peers
"""
for bundle, result, spend_name in mempool_peak_result:
record = state_change_summary.peak
for bundle, result, spend_name in ppp_result.mempool_peak_result:
self.log.debug(f"Added transaction to mempool: {spend_name}")
mempool_item = self.mempool_manager.get_mempool_item(spend_name)
assert mempool_item is not None
@ -1382,12 +1404,12 @@ class FullNode:
await self.server.send_to_all([msg], NodeType.FULL_NODE)
# If there were pending end of slots that happen after this peak, broadcast them if they are added
if fns_peak_result.added_eos is not None:
if ppp_result.fns_peak_result.added_eos is not None:
broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
fns_peak_result.added_eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
fns_peak_result.added_eos.challenge_chain.get_hash(),
ppp_result.fns_peak_result.added_eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
ppp_result.fns_peak_result.added_eos.challenge_chain.get_hash(),
uint8(0),
fns_peak_result.added_eos.reward_chain.end_of_slot_vdf.challenge,
ppp_result.fns_peak_result.added_eos.reward_chain.end_of_slot_vdf.challenge,
)
msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast)
await self.server.send_to_all([msg], NodeType.FULL_NODE)
@ -1409,7 +1431,7 @@ class FullNode:
record.header_hash,
record.height,
record.weight,
fork_height,
state_change_summary.fork_height,
block.reward_chain_block.get_unfinished().get_hash(),
),
)
@ -1425,10 +1447,10 @@ class FullNode:
record.header_hash,
record.height,
record.weight,
fork_height,
state_change_summary.fork_height,
),
)
await self.update_wallets(record.height, fork_height, record.header_hash, coin_changes)
await self.update_wallets(state_change_summary, ppp_result.hints, ppp_result.lookup_coin_ids)
await self.server.send_to_all([msg], NodeType.WALLET)
self._state_changed("new_peak")
@ -1501,8 +1523,8 @@ class FullNode:
)
# This recursion ends here, we cannot recurse again because transactions_generator is not None
return await self.respond_block(block_response, peer)
coin_changes: Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]] = ([], {})
mempool_new_peak_result, fns_peak_result = None, None
state_change_summary: Optional[StateChangeSummary] = None
ppp_result: Optional[PeakPostProcessingResult] = None
async with self._blockchain_lock_high_priority:
# After acquiring the lock, check again, because another asyncio thread might have added it
if self.blockchain.contains_block(header_hash):
@ -1527,7 +1549,6 @@ class FullNode:
if Err(pre_validation_results[0].error) == Err.INVALID_PREV_BLOCK_HASH:
added = ReceiveBlockResult.DISCONNECTED_BLOCK
error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH
fork_height: Optional[uint32] = None
else:
raise ValueError(
f"Failed to validate block {header_hash} height "
@ -1538,24 +1559,15 @@ class FullNode:
pre_validation_results[0] if pre_validation_result is None else pre_validation_result
)
assert result_to_validate.required_iters == pre_validation_results[0].required_iters
added, error_code, fork_height, coin_changes = await self.blockchain.receive_block(
(added, error_code, state_change_summary) = await self.blockchain.receive_block(
block, result_to_validate, None
)
if (
self.full_node_store.previous_generator is not None
and fork_height is not None
and fork_height < self.full_node_store.previous_generator.block_height
):
self.full_node_store.previous_generator = None
if added == ReceiveBlockResult.ALREADY_HAVE_BLOCK:
return None
elif added == ReceiveBlockResult.INVALID_BLOCK:
assert error_code is not None
self.log.error(f"Block {header_hash} at height {block.height} is invalid with code {error_code}.")
raise ConsensusError(error_code, [header_hash])
elif added == ReceiveBlockResult.DISCONNECTED_BLOCK:
self.log.info(f"Disconnected block {header_hash} at height {block.height}")
if raise_on_disconnected:
@ -1563,11 +1575,8 @@ class FullNode:
return None
elif added == ReceiveBlockResult.NEW_PEAK:
# Only propagate blocks which extend the blockchain (becomes one of the heads)
new_peak: Optional[BlockRecord] = self.blockchain.get_peak()
assert new_peak is not None and fork_height is not None
mempool_new_peak_result, fns_peak_result = await self.peak_post_processing(
block, new_peak, fork_height, peer, coin_changes[0]
)
assert state_change_summary is not None
ppp_result = await self.peak_post_processing(block, state_change_summary, peer)
elif added == ReceiveBlockResult.ADDED_AS_ORPHAN:
self.log.info(
@ -1579,22 +1588,16 @@ class FullNode:
except asyncio.CancelledError:
# We need to make sure to always call this method even when we get a cancel exception, to make sure
# the node stays in sync
new_peak = self.blockchain.get_peak()
if added == ReceiveBlockResult.NEW_PEAK:
assert new_peak is not None
assert fork_height is not None
await self.peak_post_processing(block, new_peak, fork_height, peer, coin_changes[0])
assert state_change_summary is not None
await self.peak_post_processing(block, state_change_summary, peer)
raise
validation_time = time.time() - validation_start
if mempool_new_peak_result is not None:
assert new_peak is not None
assert fork_height is not None
assert fns_peak_result is not None
await self.peak_post_processing_2(
block, new_peak, fork_height, peer, coin_changes, mempool_new_peak_result, fns_peak_result
)
if ppp_result is not None:
assert state_change_summary is not None
await self.peak_post_processing_2(block, peer, state_change_summary, ppp_result)
percent_full_str = (
(

View file

@ -0,0 +1,56 @@
from typing import Dict, List, Set, Tuple
from chia.consensus.blockchain import StateChangeSummary
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
def get_hints_and_subscription_coin_ids(
state_change_summary: StateChangeSummary,
coin_subscriptions: Dict[bytes32, Set[bytes32]],
ph_subscriptions: Dict[bytes32, Set[bytes32]],
) -> Tuple[List[Tuple[bytes32, bytes]], List[bytes32]]:
# Precondition: all hints passed in are max 32 bytes long
# Returns the hints that we need to add to the DB, and the coin ids that need to be looked up
# Finds the coin IDs that we need to lookup in order to notify wallets of hinted transactions
hint: bytes
hints_to_add: List[Tuple[bytes32, bytes]] = []
# Goes through additions and removals for each block and flattens to a map and a set
lookup_coin_ids: Set[bytes32] = set()
def add_if_coin_subscription(coin_id: bytes32) -> None:
if coin_id in coin_subscriptions:
lookup_coin_ids.add(coin_id)
def add_if_ph_subscription(puzzle_hash: bytes32, coin_id: bytes32) -> None:
if puzzle_hash in ph_subscriptions:
lookup_coin_ids.add(coin_id)
for npc_result in state_change_summary.new_npc_results:
if npc_result.conds is not None:
for spend in npc_result.conds.spends:
# Record all coin_ids that we are interested in, that had changes
add_if_coin_subscription(spend.coin_id)
add_if_ph_subscription(spend.puzzle_hash, spend.coin_id)
for new_ph, new_am, hint in spend.create_coin:
addition_coin: Coin = Coin(spend.coin_id, new_ph, new_am)
addition_coin_name = addition_coin.name()
add_if_coin_subscription(addition_coin_name)
add_if_ph_subscription(addition_coin.puzzle_hash, addition_coin_name)
if len(hint) == 32:
add_if_ph_subscription(bytes32(hint), addition_coin_name)
if len(hint) > 0:
assert len(hint) <= 32
hints_to_add.append((addition_coin_name, hint))
# Goes through all new reward coins
for reward_coin in state_change_summary.new_rewards:
reward_coin_name: bytes32 = reward_coin.name()
add_if_coin_subscription(reward_coin_name)
add_if_ph_subscription(reward_coin.puzzle_hash, reward_coin_name)
return hints_to_add, list(lookup_coin_ids)

View file

@ -35,6 +35,9 @@ class HintStore:
return coin_ids
async def add_hints(self, coin_hint_list: List[Tuple[bytes32, bytes]]) -> None:
if len(coin_hint_list) == 0:
return None
async with self.db_wrapper.write_db() as conn:
if self.db_wrapper.db_version == 2:
cursor = await conn.executemany(

View file

@ -514,7 +514,7 @@ class MempoolManager:
return None
async def new_peak(
self, new_peak: Optional[BlockRecord], coin_changes: List[CoinRecord]
self, new_peak: Optional[BlockRecord], last_npc_result: Optional[NPCResult]
) -> List[Tuple[SpendBundle, NPCResult, bytes32]]:
"""
Called when a new peak is available, we try to recreate a mempool for the new tip.
@ -530,13 +530,14 @@ class MempoolManager:
use_optimization: bool = self.peak is not None and new_peak.prev_transaction_block_hash == self.peak.header_hash
self.peak = new_peak
if use_optimization:
if use_optimization and last_npc_result is not None:
# We don't reinitialize a mempool, just kick removed items
for coin_record in coin_changes:
if coin_record.name in self.mempool.removals:
item = self.mempool.removals[coin_record.name]
self.mempool.remove_from_pool(item)
self.remove_seen(item.spend_bundle_name)
if last_npc_result.conds is not None:
for spend in last_npc_result.conds.spends:
if spend.coin_id in self.mempool.removals:
item = self.mempool.removals[spend.coin_id]
self.mempool.remove_from_pool(item)
self.remove_seen(item.spend_bundle_name)
else:
old_pool = self.mempool
self.mempool = Mempool(self.mempool_max_total_cost)

View file

@ -75,7 +75,11 @@ async def _validate_and_add_block(
await check_block_store_invariant(blockchain)
return None
result, err, _, _ = await blockchain.receive_block(block, results, fork_point_with_peak=fork_point_with_peak)
(
result,
err,
_,
) = await blockchain.receive_block(block, results, fork_point_with_peak=fork_point_with_peak)
await check_block_store_invariant(blockchain)
if expected_error is None and expected_result != ReceiveBlockResult.INVALID_BLOCK:

View file

@ -1721,7 +1721,7 @@ class TestPreValidation:
assert res[n].error is None
block = blocks_to_validate[n]
start_rb = time.time()
result, err, _, _ = await empty_blockchain.receive_block(block, res[n])
result, err, _ = await empty_blockchain.receive_block(block, res[n])
end_rb = time.time()
times_rb.append(end_rb - start_rb)
assert err is None
@ -1815,7 +1815,10 @@ class TestBodyValidation:
)
# Ignore errors from pre-validation, we are testing block_body_validation
repl_preval_results = dataclasses.replace(pre_validation_results[0], error=None, required_iters=uint64(1))
assert (await b.receive_block(blocks[-1], repl_preval_results))[0:-1] == (ReceiveBlockResult.NEW_PEAK, None, 2)
code, err, state_change = await b.receive_block(blocks[-1], repl_preval_results)
assert code == ReceiveBlockResult.NEW_PEAK
assert err is None
assert state_change.fork_height == 2
@pytest.mark.asyncio
@pytest.mark.parametrize("opcode", [ConditionOpcode.AGG_SIG_ME, ConditionOpcode.AGG_SIG_UNSAFE])
@ -1871,7 +1874,8 @@ class TestBodyValidation:
)
# Ignore errors from pre-validation, we are testing block_body_validation
repl_preval_results = dataclasses.replace(pre_validation_results[0], error=None, required_iters=uint64(1))
assert (await b.receive_block(blocks[-1], repl_preval_results))[0:-1] == expected
res, error, state_change = await b.receive_block(blocks[-1], repl_preval_results)
assert (res, error, state_change.fork_height if state_change else None) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
@ -2365,7 +2369,7 @@ class TestBodyValidation:
mempool_mode=False,
height=softfork_height,
)
result, err, _, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
result, err, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
assert err == Err.INVALID_BLOCK_COST
# too low
@ -2390,7 +2394,7 @@ class TestBodyValidation:
mempool_mode=False,
height=softfork_height,
)
result, err, _, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
result, err, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
assert err == Err.INVALID_BLOCK_COST
# too high
@ -2416,7 +2420,7 @@ class TestBodyValidation:
height=softfork_height,
)
result, err, _, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
result, err, _ = await b.receive_block(block_2, PreValidationResult(None, uint64(1), npc_result, False))
assert err == Err.INVALID_BLOCK_COST
# when the CLVM program exceeds cost during execution, it will fail with
@ -3341,12 +3345,12 @@ async def test_reorg_flip_flop(empty_blockchain, bt):
preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
[block1], {}, validate_signatures=False
)
result, err, _, _ = await b.receive_block(block1, preval[0], fork_point_with_peak=fork_height)
result, err, _ = await b.receive_block(block1, preval[0], fork_point_with_peak=fork_height)
assert not err
preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
[block2], {}, validate_signatures=False
)
result, err, _, _ = await b.receive_block(block2, preval[0], fork_point_with_peak=fork_height)
result, err, _ = await b.receive_block(block2, preval[0], fork_point_with_peak=fork_height)
assert not err
assert b.get_peak().height == 39

View file

@ -2,13 +2,14 @@ import logging
import pytest
from clvm.casts import int_to_bytes
from chia.consensus.blockchain import Blockchain
from chia.full_node.hint_store import HintStore
from chia.protocols.full_node_protocol import RespondBlock
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.condition_with_args import ConditionWithArgs
from chia.types.spend_bundle import SpendBundle
from tests.blockchain.blockchain_test_utils import _validate_and_add_block, _validate_and_add_block_no_error
from chia.util.ints import uint64
from tests.util.db_connection import DBConnection
from tests.wallet_tools import WalletTool
@ -104,8 +105,8 @@ class TestHintStore:
assert rows[0][0] == 4
@pytest.mark.asyncio
async def test_hints_in_blockchain(self, empty_blockchain, bt): # noqa: F811
blockchain: Blockchain = empty_blockchain
async def test_hints_in_blockchain(self, bt, wallet_nodes): # noqa: F811
full_node_1, full_node_2, server_1, server_2, wallet_a, wallet_receiver = wallet_nodes
blocks = bt.get_consecutive_blocks(
5,
@ -115,12 +116,12 @@ class TestHintStore:
pool_reward_puzzle_hash=bt.pool_ph,
)
for block in blocks:
await _validate_and_add_block(blockchain, block)
await full_node_1.full_node.respond_block(RespondBlock(block), None)
wt: WalletTool = bt.get_pool_wallet_tool()
puzzle_hash = 32 * b"\0"
puzzle_hash = bytes32(32 * b"\0")
amount = int_to_bytes(1)
hint = 32 * b"\5"
hint = bytes32(32 * b"\5")
coin_spent = list(blocks[-1].get_included_reward_coins())[0]
condition_dict = {
ConditionOpcode.CREATE_COIN: [ConditionWithArgs(ConditionOpcode.CREATE_COIN, [puzzle_hash, amount, hint])]
@ -136,12 +137,12 @@ class TestHintStore:
10, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
)
for block in blocks:
await _validate_and_add_block_no_error(blockchain, block)
for block in blocks[-10:]:
await full_node_1.full_node.respond_block(RespondBlock(block), None)
get_hint = await blockchain.hint_store.get_coin_ids(hint)
get_hint = await full_node_1.full_node.blockchain.hint_store.get_coin_ids(hint)
assert get_hint[0] == Coin(coin_spent.name(), puzzle_hash, 1).name()
assert get_hint[0] == Coin(coin_spent.name(), puzzle_hash, uint64(1)).name()
@pytest.mark.asyncio
async def test_counts(self, db_version):

View file

@ -0,0 +1,52 @@
from typing import List
from chia.types.blockchain_format.coin import Coin
from chia.types.spend_bundle_conditions import Spend, SpendBundleConditions
from chia.util.generator_tools import tx_removals_and_additions
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint64
coin_ids = [std_hash(i.to_bytes(4, "big")) for i in range(10)]
phs = [std_hash(i.to_bytes(4, "big")) for i in range(10)]
spends: List[Spend] = [
Spend(
coin_ids[0],
phs[0],
None,
uint64(5),
[
(phs[2], uint64(123), b""),
(phs[3], uint64(0), b"1" * 300),
(phs[4], uint64(0), b"1" * 300),
],
[],
),
Spend(
coin_ids[1],
phs[0],
None,
uint64(2),
[
(phs[5], uint64(123), b""),
(phs[6], uint64(0), b"1" * 300),
(phs[7], uint64(0), b"1" * 300),
],
[],
),
]
def test_tx_removals_and_additions() -> None:
conditions = SpendBundleConditions(spends, uint64(0), uint32(0), uint64(0), [], uint64(0))
expected_rems = [coin_ids[0], coin_ids[1]]
expected_additions = []
for spend in spends:
for puzzle_hash, am, _ in spend.create_coin:
expected_additions.append(Coin(spend.coin_id, puzzle_hash, am))
rems, adds = tx_removals_and_additions(conditions)
assert rems == expected_rems
assert adds == expected_additions
def test_empty_conditions() -> None:
assert tx_removals_and_additions(None) == ([], [])

View file

@ -0,0 +1,124 @@
from typing import List, Optional
import pytest
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain import Blockchain, StateChangeSummary
from chia.consensus.cost_calculator import NPCResult
from chia.full_node.hint_management import get_hints_and_subscription_coin_ids
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.spend_bundle_conditions import Spend, SpendBundleConditions
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint64
from tests.block_tools import BlockTools
from tests.blockchain.blockchain_test_utils import _validate_and_add_block
coin_ids = [std_hash(i.to_bytes(4, "big")) for i in range(10)]
phs = [std_hash(i.to_bytes(4, "big")) for i in range(10)]
spends: List[Spend] = [
Spend(
coin_ids[0],
phs[0],
None,
uint64(5),
[
(phs[2], uint64(123), b""),
(phs[4], uint64(3), b"1" * 32),
],
[],
),
Spend(
coin_ids[2],
phs[0],
None,
uint64(6),
[
(phs[7], uint64(123), b""),
(phs[4], uint64(6), b""),
(phs[9], uint64(123), b"1" * 32),
],
[],
),
Spend(
coin_ids[1],
phs[7],
None,
uint64(2),
[
(phs[5], uint64(123), b""),
(phs[6], uint64(5), b"1" * 3),
],
[],
),
]
@pytest.mark.asyncio
async def test_hints_to_add(bt: BlockTools, empty_blockchain: Blockchain) -> None:
blocks = bt.get_consecutive_blocks(2)
await _validate_and_add_block(empty_blockchain, blocks[0])
await _validate_and_add_block(empty_blockchain, blocks[1])
br: Optional[BlockRecord] = empty_blockchain.get_peak()
assert br is not None
sbc: SpendBundleConditions = SpendBundleConditions(spends, uint64(0), uint32(0), uint64(0), [], uint64(0))
npc_res = [NPCResult(None, None, uint64(0)), NPCResult(None, sbc, uint64(0))]
scs = StateChangeSummary(br, uint32(0), [], npc_res, [])
hints_to_add, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, {}, {})
assert len(lookup_coin_ids) == 0
first_coin_id: bytes32 = Coin(spends[0].coin_id, phs[4], uint64(3)).name()
second_coin_id: bytes32 = Coin(spends[2].coin_id, phs[6], uint64(5)).name()
third_coin_id: bytes32 = Coin(spends[1].coin_id, phs[9], uint64(123)).name()
assert set(hints_to_add) == {(first_coin_id, b"1" * 32), (second_coin_id, b"1" * 3), (third_coin_id, b"1" * 32)}
@pytest.mark.asyncio
async def test_lookup_coin_ids(bt: BlockTools, empty_blockchain: Blockchain) -> None:
blocks = bt.get_consecutive_blocks(2)
await _validate_and_add_block(empty_blockchain, blocks[0])
await _validate_and_add_block(empty_blockchain, blocks[1])
br: Optional[BlockRecord] = empty_blockchain.get_peak()
assert br is not None
sbc: SpendBundleConditions = SpendBundleConditions(spends, uint64(0), uint32(0), uint64(0), [], uint64(0))
npc_res = [NPCResult(None, None, uint64(0)), NPCResult(None, sbc, uint64(0))]
rewards: List[Coin] = [
Coin(coin_ids[8], phs[8], uint64(1)),
Coin(coin_ids[9], phs[9], uint64(2)),
Coin(coin_ids[5], phs[8], uint64(1234)),
]
scs = StateChangeSummary(br, uint32(0), [], npc_res, rewards)
# Removal ID and addition PH
coin_subscriptions = {coin_ids[1]: {bytes32(b"2" * 32)}}
ph_subscriptions = {phs[4]: {bytes32(b"3" * 32)}}
_, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, coin_subscriptions, ph_subscriptions)
first_coin_id: bytes32 = Coin(spends[0].coin_id, phs[4], uint64(3)).name()
second_coin_id: bytes32 = Coin(spends[1].coin_id, phs[4], uint64(6)).name()
assert set(lookup_coin_ids) == {coin_ids[1], first_coin_id, second_coin_id}
# Removal PH and addition ID
coin_subscriptions = {first_coin_id: {bytes32(b"5" * 32)}}
ph_subscriptions = {phs[0]: {bytes32(b"6" * 32)}}
_, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, coin_subscriptions, ph_subscriptions)
assert set(lookup_coin_ids) == {first_coin_id, coin_ids[0], coin_ids[2]}
# Subscribe to hint
third_coin_id: bytes32 = Coin(spends[1].coin_id, phs[9], uint64(123)).name()
ph_subscriptions = {bytes32(b"1" * 32): {bytes32(b"7" * 32)}}
_, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, {}, ph_subscriptions)
assert set(lookup_coin_ids) == {first_coin_id, third_coin_id}
# Reward PH
ph_subscriptions = {rewards[0].puzzle_hash: {bytes32(b"8" * 32)}}
_, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, {}, ph_subscriptions)
assert set(lookup_coin_ids) == {rewards[0].name(), rewards[2].name()}
# Reward coin id + reward ph
coin_subscriptions = {rewards[1].name(): {bytes32(b"9" * 32)}}
_, lookup_coin_ids = get_hints_and_subscription_coin_ids(scs, coin_subscriptions, ph_subscriptions)
assert set(lookup_coin_ids) == {rewards[1].name(), rewards[0].name(), rewards[2].name()}

View file

@ -77,7 +77,7 @@ class TestDbUpgrade:
for block in blocks:
# await _validate_and_add_block(bc, block)
results = PreValidationResult(None, uint64(1), None, False)
result, err, _, _ = await bc.receive_block(block, results)
result, err, _ = await bc.receive_block(block, results)
assert err is None
finally:
await db_wrapper1.close()

View file

@ -146,7 +146,7 @@ async def make_db(db_file: Path, blocks: List[FullBlock]) -> None:
for block in blocks:
results = PreValidationResult(None, uint64(1), None, False)
result, err, _, _ = await bc.receive_block(block, results)
result, err, _ = await bc.receive_block(block, results)
assert err is None
finally:
await db_wrapper.close()

View file

@ -152,16 +152,14 @@ async def run_sync_test(
)
await full_node.respond_block(full_node_protocol.RespondBlock(b))
else:
success, advanced_peak, fork_height, coin_changes = await full_node.receive_block_batch(
block_batch, peer, None
)
success, summary = await full_node.receive_block_batch(block_batch, peer, None)
end_height = block_batch[-1].height
full_node.blockchain.clean_block_record(end_height - full_node.constants.BLOCKS_CACHE_SIZE)
if not success:
raise RuntimeError("failed to ingest block batch")
assert advanced_peak
assert summary is not None
time_per_block = (time.monotonic() - batch_start_time) / len(block_batch)
if not worst_batch_height or worst_batch_time_per_block > time_per_block: