# xch-blockchain/benchmarks/mempool.py

from __future__ import annotations
import asyncio
import cProfile
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from subprocess import check_call
from time import monotonic
from typing import Dict, Iterator, List, Optional, Tuple
from chia.consensus.coinbase import create_farmer_coin, create_pool_coin
from chia.consensus.cost_calculator import NPCResult
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.full_node.mempool_manager import MempoolManager
from chia.simulator.wallet_tools import WalletTool
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.types.spend_bundle_conditions import Spend, SpendBundleConditions
from chia.util.ints import uint32, uint64
from chia.util.misc import to_batches
# Number of simulated blocks (and reward-coin pairs) generated per peer.
NUM_ITERS = 200
# Number of concurrent peers whose spend bundles are added in parallel.
NUM_PEERS = 5
@contextmanager
def enable_profiler(profile: bool, name: str) -> Iterator[None]:
    """Optionally profile the enclosed code with cProfile.

    When ``profile`` is False this is a no-op context. Otherwise the stats
    are dumped to ``mempool-<name>.profile`` and rendered into a PNG call
    graph via the external ``gprof2dot`` and graphviz ``dot`` tools, which
    must be available on PATH.
    """
    if not profile:
        yield
        return

    # cProfile.Profile() gained context-manager support in Python 3.8.
    if sys.version_info < (3, 8):
        raise Exception(f"Python 3.8 or higher required when profiling is requested, running with: {sys.version}")

    with cProfile.Profile() as pr:
        yield

    pr.create_stats()
    output_file = f"mempool-{name}"
    pr.dump_stats(output_file + ".profile")
    # Convert the pstats dump to a graphviz .dot file, then render it as PNG
    # (dot writes the image to stdout, redirected into the opened file).
    check_call(["gprof2dot", "-f", "pstats", "-o", output_file + ".dot", output_file + ".profile"])
    with open(output_file + ".png", "w+") as f:
        check_call(["dot", "-T", "png", output_file + ".dot"], stdout=f)
    print(" output written to: %s.png" % output_file)
def make_hash(height: int) -> bytes32:
    """Derive a deterministic 32-byte hash from a block height."""
    encoded = height.to_bytes(32, "big")
    return bytes32(encoded)
@dataclass(frozen=True)
class BenchBlockRecord:
    """
    This is a subset of BlockRecord that the mempool manager uses for peak.
    """

    # Hash identifying this block (the benchmark derives it from the height).
    header_hash: bytes32
    # Height of this block.
    height: uint32
    # Block timestamp; None marks a non-transaction block.
    timestamp: Optional[uint64]
    # Height of the previous transaction block.
    prev_transaction_block_height: uint32
    # Hash of the previous transaction block, if any.
    prev_transaction_block_hash: Optional[bytes32]

    @property
    def is_transaction_block(self) -> bool:
        # A block is a transaction block exactly when it carries a timestamp.
        return self.timestamp is not None
def fake_block_record(block_height: uint32, timestamp: uint64) -> BenchBlockRecord:
    """Build a minimal fake block record suitable for MempoolManager.new_peak().

    The header hash is derived deterministically from the height, and the
    previous transaction block is assumed to be the block directly below.
    """
    previous_height = block_height - 1
    return BenchBlockRecord(
        header_hash=make_hash(block_height),
        height=block_height,
        timestamp=timestamp,
        prev_transaction_block_height=uint32(previous_height),
        prev_transaction_block_hash=make_hash(previous_height),
    )
async def run_mempool_benchmark() -> None:
    """Benchmark MempoolManager hot paths.

    Builds per-peer spend bundles (plain, replace-by-fee and large aggregated
    ones), then times add_spend_bundle(), create_bundle_from_mempool() and
    new_peak() (both the incremental and the reorg path), in multi-threaded
    and single-threaded mode. Each phase prints wall-clock timings and writes
    cProfile call-graph output via enable_profiler().
    """
    # In-memory "coin store"; get_coin_record is the coin-lookup callback
    # handed to MempoolManager.
    all_coins: Dict[bytes32, CoinRecord] = {}

    async def get_coin_record(coin_id: bytes32) -> Optional[CoinRecord]:
        return all_coins.get(coin_id)

    wt = WalletTool(DEFAULT_CONSTANTS)

    spend_bundles: List[List[SpendBundle]] = []

    # these spend the same coins as spend_bundles but with a higher fee
    replacement_spend_bundles: List[List[SpendBundle]] = []

    # these spend the same coins as spend_bundles, but they are organized in
    # much larger bundles
    large_spend_bundles: List[List[SpendBundle]] = []

    timestamp = uint64(1631794488)
    height = uint32(1)

    print("Building SpendBundles")
    for peer in range(NUM_PEERS):
        print(f" peer {peer}")
        print(" reward coins")
        unspent: List[Coin] = []
        for idx in range(NUM_ITERS):
            height = uint32(height + 1)

            # 19 seconds per block
            timestamp = uint64(timestamp + 19)

            # farm rewards
            farmer_coin = create_farmer_coin(
                height, wt.get_new_puzzlehash(), uint64(250000000), DEFAULT_CONSTANTS.GENESIS_CHALLENGE
            )
            pool_coin = create_pool_coin(
                height, wt.get_new_puzzlehash(), uint64(1750000000), DEFAULT_CONSTANTS.GENESIS_CHALLENGE
            )
            all_coins[farmer_coin.name()] = CoinRecord(farmer_coin, height, uint32(0), True, timestamp)
            all_coins[pool_coin.name()] = CoinRecord(pool_coin, height, uint32(0), True, timestamp)
            unspent.extend([farmer_coin, pool_coin])

        print(" spend bundles")
        # NOTE(review): "idx" below is the leaked value from the loop above
        # (always NUM_ITERS - 1), so the fee only varies per peer here —
        # presumably intentional for the benchmark; confirm if fees should
        # differ per bundle.
        bundles: List[SpendBundle] = []
        for coin in unspent:
            tx: SpendBundle = wt.generate_signed_transaction(
                uint64(coin.amount // 2), wt.get_new_puzzlehash(), coin, fee=peer + idx
            )
            bundles.append(tx)
        spend_bundles.append(bundles)

        bundles = []
        print(" replacement spend bundles")
        # Same coins, but with a much higher fee so these can replace the
        # corresponding entries from spend_bundles via replace-by-fee.
        for coin in unspent:
            tx = wt.generate_signed_transaction(
                uint64(coin.amount // 2), wt.get_new_puzzlehash(), coin, fee=peer + idx + 10000000
            )
            bundles.append(tx)
        replacement_spend_bundles.append(bundles)

        bundles = []
        print(" large spend bundles")
        # Same coins again, aggregated into bundles of up to 200 spends each.
        for batch in to_batches(unspent, 200):
            print(f"{len(batch.entries)} coins")
            tx = SpendBundle.aggregate(
                [
                    wt.generate_signed_transaction(uint64(c.amount // 2), wt.get_new_puzzlehash(), c, fee=peer + idx)
                    for c in batch.entries
                ]
            )
            bundles.append(tx)
        large_spend_bundles.append(bundles)

    start_height = height

    for single_threaded in [False, True]:
        if single_threaded:
            print("\n== Single-threaded")
        else:
            print("\n== Multi-threaded")

        mempool = MempoolManager(get_coin_record, DEFAULT_CONSTANTS, single_threaded=single_threaded)

        height = start_height
        rec = fake_block_record(height, timestamp)
        await mempool.new_peak(rec, None)

        async def add_spend_bundles(spend_bundles: List[SpendBundle]) -> None:
            # Pre-validate then add each bundle; every add must succeed.
            for tx in spend_bundles:
                spend_bundle_id = tx.name()
                npc = await mempool.pre_validate_spendbundle(tx, None, spend_bundle_id)
                assert npc is not None
                _, status, error = await mempool.add_spend_bundle(tx, npc, spend_bundle_id, height)
                assert status == MempoolInclusionStatus.SUCCESS
                assert error is None

        suffix = "st" if single_threaded else "mt"

        print("\nProfiling add_spend_bundle() with large bundles")
        total_bundles = 0
        tasks = []
        # One concurrent task per peer, mimicking bundles arriving from
        # NUM_PEERS peers at once.
        with enable_profiler(True, f"add-large-{suffix}"):
            start = monotonic()
            for peer in range(NUM_PEERS):
                total_bundles += len(large_spend_bundles[peer])
                tasks.append(asyncio.create_task(add_spend_bundles(large_spend_bundles[peer])))
            await asyncio.gather(*tasks)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / total_bundles * 1000:0.2f}ms")

        # Fresh mempool so this phase is not skewed by the large bundles
        # already added above.
        mempool = MempoolManager(get_coin_record, DEFAULT_CONSTANTS, single_threaded=single_threaded)

        height = start_height
        rec = fake_block_record(height, timestamp)
        await mempool.new_peak(rec, None)

        print("\nProfiling add_spend_bundle()")
        total_bundles = 0
        tasks = []
        with enable_profiler(True, f"add-{suffix}"):
            start = monotonic()
            for peer in range(NUM_PEERS):
                total_bundles += len(spend_bundles[peer])
                tasks.append(asyncio.create_task(add_spend_bundles(spend_bundles[peer])))
            await asyncio.gather(*tasks)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / total_bundles * 1000:0.2f}ms")

        print("\nProfiling add_spend_bundle() with replace-by-fee")
        total_bundles = 0
        tasks = []
        with enable_profiler(True, f"replace-{suffix}"):
            start = monotonic()
            for peer in range(NUM_PEERS):
                total_bundles += len(replacement_spend_bundles[peer])
                tasks.append(asyncio.create_task(add_spend_bundles(replacement_spend_bundles[peer])))
            await asyncio.gather(*tasks)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / total_bundles * 1000:0.2f}ms")

        print("\nProfiling create_bundle_from_mempool()")
        with enable_profiler(True, f"create-{suffix}"):
            start = monotonic()
            for _ in range(500):
                mempool.create_bundle_from_mempool(rec.header_hash)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / 500 * 1000:0.2f}ms")

        print("\nProfiling new_peak() (optimized)")
        # One new block per known coin, each spending exactly that coin; the
        # steady +1 height advance keeps new_peak() on the incremental
        # (non-reorg) path.
        blocks: List[Tuple[BenchBlockRecord, NPCResult]] = []
        for coin_id in all_coins.keys():
            height = uint32(height + 1)
            timestamp = uint64(timestamp + 19)
            rec = fake_block_record(height, timestamp)
            npc_result = NPCResult(
                None,
                SpendBundleConditions(
                    [Spend(coin_id, bytes32(b" " * 32), None, None, None, None, None, None, [], [], 0)],
                    0,
                    0,
                    0,
                    None,
                    None,
                    [],
                    0,
                    0,
                    0,
                ),
                uint64(1000000000),
            )
            blocks.append((rec, npc_result))

        with enable_profiler(True, f"new-peak-{suffix}"):
            start = monotonic()
            for rec, npc_result in blocks:
                await mempool.new_peak(rec, npc_result)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / len(blocks) * 1000:0.2f}ms")

        print("\nProfiling new_peak() (reorg)")
        # Advancing height by 2 each time (while fake_block_record links the
        # previous transaction block to height - 1) makes every block look
        # like a chain reorg to the mempool, exercising the slow path.
        blocks = []
        for coin_id in all_coins.keys():
            height = uint32(height + 2)
            timestamp = uint64(timestamp + 28)
            rec = fake_block_record(height, timestamp)
            npc_result = NPCResult(
                None,
                SpendBundleConditions(
                    [Spend(coin_id, bytes32(b" " * 32), None, None, None, None, None, None, [], [], 0)],
                    0,
                    0,
                    0,
                    None,
                    None,
                    [],
                    0,
                    0,
                    0,
                ),
                uint64(1000000000),
            )
            blocks.append((rec, npc_result))

        with enable_profiler(True, f"new-peak-reorg-{suffix}"):
            start = monotonic()
            for rec, npc_result in blocks:
                await mempool.new_peak(rec, npc_result)
            stop = monotonic()
            print(f" time: {stop - start:0.4f}s")
            print(f" per call: {(stop - start) / len(blocks) * 1000:0.2f}ms")
if __name__ == "__main__":
    import logging

    # Surface warnings and errors on stderr during the benchmark without
    # drowning out the timing output.
    logger = logging.getLogger()
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.WARNING)

    asyncio.run(run_mempool_benchmark())