force \n/lf line endings in pre-commit (#15036)

* force \n/lf line endings in pre-commit

* more

* set -o pipefail

* Revert "set -o pipefail"

This reverts commit b359ec806b.
This commit is contained in:
Kyle Altendorf 2023-05-03 01:29:31 -04:00 committed by GitHub
parent 7f7bce8677
commit 1228892c43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1046 additions and 1044 deletions

View File

@ -1,12 +1,12 @@
contact_links:
- about: Ask a question or request support here
name: Ask for Support
url: >-
https://github.com/Chia-Network/chia-blockchain/discussions/new?category=support
- about: Request a new feature or idea here
name: Make a Request
url: >-
https://github.com/Chia-Network/chia-blockchain/discussions/new?category=ideas
- about: Get support on the Chia Keybase chat channels.
name: Join the Keybase.io support chat
url: 'https://keybase.io/team/chia_network.public'
contact_links:
- about: Ask a question or request support here
name: Ask for Support
url: >-
https://github.com/Chia-Network/chia-blockchain/discussions/new?category=support
- about: Request a new feature or idea here
name: Make a Request
url: >-
https://github.com/Chia-Network/chia-blockchain/discussions/new?category=ideas
- about: Get support on the Chia Keybase chat channels.
name: Join the Keybase.io support chat
url: 'https://keybase.io/team/chia_network.public'

View File

@ -22,6 +22,8 @@ repos:
rev: v4.3.0
hooks:
- id: check-yaml
- id: mixed-line-ending
args: ["--fix=lf"]
- id: end-of-file-fixer
exclude: ".*?(.hex|.clsp|.clvm|.clib)"
- id: trailing-whitespace

View File

@ -1,127 +1,127 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Optional
from chia.cmds.cmds_util import NODE_TYPES, get_any_service_client
from chia.rpc.rpc_client import RpcClient
async def add_node_connection(rpc_client: RpcClient, add_connection: str) -> None:
if ":" not in add_connection:
print("Enter a valid IP and port in the following format: 10.5.4.3:8000")
else:
ip, port = (
":".join(add_connection.split(":")[:-1]),
add_connection.split(":")[-1],
)
print(f"Connecting to {ip}, {port}")
try:
result = await rpc_client.open_connection(ip, int(port))
err = result.get("error")
if result["success"] is False or err is not None:
print(err)
except Exception:
print(f"Failed to connect to {ip}:{port}")
async def remove_node_connection(rpc_client: RpcClient, remove_connection: str) -> None:
from chia.server.outbound_message import NodeType
result_txt = ""
if len(remove_connection) != 8:
result_txt = "Invalid NodeID. Do not include '.'"
else:
connections = await rpc_client.get_connections()
for con in connections:
if remove_connection == con["node_id"].hex()[:8]:
print("Attempting to disconnect", "NodeID", remove_connection)
try:
await rpc_client.close_connection(con["node_id"])
except Exception:
result_txt = f"Failed to disconnect NodeID {remove_connection}"
else:
result_txt = (
f"NodeID {remove_connection}... {NodeType(con['type']).name} {con['peer_host']} disconnected"
)
elif result_txt == "":
result_txt = f"NodeID {remove_connection}... not found"
print(result_txt)
async def print_connections(rpc_client: RpcClient, trusted_peers: Dict[str, Any]) -> None:
import time
from chia.server.outbound_message import NodeType
from chia.util.network import is_trusted_peer
connections = await rpc_client.get_connections()
print("Connections:")
print("Type IP Ports NodeID Last Connect" + " MiB Up|Dwn")
for con in connections:
last_connect_tuple = time.struct_time(time.localtime(con["last_message_time"]))
last_connect = time.strftime("%b %d %T", last_connect_tuple)
mb_down = con["bytes_read"] / (1024 * 1024)
mb_up = con["bytes_written"] / (1024 * 1024)
host = con["peer_host"]
# Strip IPv6 brackets
host = host.strip("[]")
trusted: bool = is_trusted_peer(host, con["node_id"], trusted_peers, False)
# Nodetype length is 9 because INTRODUCER will be deprecated
if NodeType(con["type"]) is NodeType.FULL_NODE:
peak_height = con.get("peak_height", None)
connection_peak_hash = con.get("peak_hash", None)
if connection_peak_hash is None:
connection_peak_hash = "No Info"
else:
if connection_peak_hash.startswith(("0x", "0X")):
connection_peak_hash = connection_peak_hash[2:]
connection_peak_hash = f"{connection_peak_hash[:8]}..."
con_str = (
f"{NodeType(con['type']).name:9} {host:39} "
f"{con['peer_port']:5}/{con['peer_server_port']:<5}"
f" {con['node_id'].hex()[:8]}... "
f"{last_connect} "
f"{mb_up:7.1f}|{mb_down:<7.1f}"
f"\n "
)
if peak_height is not None:
con_str += f"-Height: {peak_height:8.0f} -Hash: {connection_peak_hash}"
else:
con_str += f"-Height: No Info -Hash: {connection_peak_hash}"
# Only show when Trusted is True
if trusted:
con_str += f" -Trusted: {trusted}"
else:
con_str = (
f"{NodeType(con['type']).name:9} {host:39} "
f"{con['peer_port']:5}/{con['peer_server_port']:<5}"
f" {con['node_id'].hex()[:8]}... "
f"{last_connect} "
f"{mb_up:7.1f}|{mb_down:<7.1f}"
)
print(con_str)
async def peer_async(
node_type: str,
rpc_port: Optional[int],
root_path: Path,
show_connections: bool,
add_connection: str,
remove_connection: str,
) -> None:
client_type = NODE_TYPES[node_type]
async with get_any_service_client(client_type, rpc_port, root_path) as (rpc_client, config):
if rpc_client is not None:
# Check or edit node connections
if show_connections:
trusted_peers: Dict[str, Any] = config["full_node"].get("trusted_peers", {})
await print_connections(rpc_client, trusted_peers)
# if called together with state, leave a blank line
if add_connection:
await add_node_connection(rpc_client, add_connection)
if remove_connection:
await remove_node_connection(rpc_client, remove_connection)
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Optional
from chia.cmds.cmds_util import NODE_TYPES, get_any_service_client
from chia.rpc.rpc_client import RpcClient
async def add_node_connection(rpc_client: RpcClient, add_connection: str) -> None:
if ":" not in add_connection:
print("Enter a valid IP and port in the following format: 10.5.4.3:8000")
else:
ip, port = (
":".join(add_connection.split(":")[:-1]),
add_connection.split(":")[-1],
)
print(f"Connecting to {ip}, {port}")
try:
result = await rpc_client.open_connection(ip, int(port))
err = result.get("error")
if result["success"] is False or err is not None:
print(err)
except Exception:
print(f"Failed to connect to {ip}:{port}")
async def remove_node_connection(rpc_client: RpcClient, remove_connection: str) -> None:
from chia.server.outbound_message import NodeType
result_txt = ""
if len(remove_connection) != 8:
result_txt = "Invalid NodeID. Do not include '.'"
else:
connections = await rpc_client.get_connections()
for con in connections:
if remove_connection == con["node_id"].hex()[:8]:
print("Attempting to disconnect", "NodeID", remove_connection)
try:
await rpc_client.close_connection(con["node_id"])
except Exception:
result_txt = f"Failed to disconnect NodeID {remove_connection}"
else:
result_txt = (
f"NodeID {remove_connection}... {NodeType(con['type']).name} {con['peer_host']} disconnected"
)
elif result_txt == "":
result_txt = f"NodeID {remove_connection}... not found"
print(result_txt)
async def print_connections(rpc_client: RpcClient, trusted_peers: Dict[str, Any]) -> None:
import time
from chia.server.outbound_message import NodeType
from chia.util.network import is_trusted_peer
connections = await rpc_client.get_connections()
print("Connections:")
print("Type IP Ports NodeID Last Connect" + " MiB Up|Dwn")
for con in connections:
last_connect_tuple = time.struct_time(time.localtime(con["last_message_time"]))
last_connect = time.strftime("%b %d %T", last_connect_tuple)
mb_down = con["bytes_read"] / (1024 * 1024)
mb_up = con["bytes_written"] / (1024 * 1024)
host = con["peer_host"]
# Strip IPv6 brackets
host = host.strip("[]")
trusted: bool = is_trusted_peer(host, con["node_id"], trusted_peers, False)
# Nodetype length is 9 because INTRODUCER will be deprecated
if NodeType(con["type"]) is NodeType.FULL_NODE:
peak_height = con.get("peak_height", None)
connection_peak_hash = con.get("peak_hash", None)
if connection_peak_hash is None:
connection_peak_hash = "No Info"
else:
if connection_peak_hash.startswith(("0x", "0X")):
connection_peak_hash = connection_peak_hash[2:]
connection_peak_hash = f"{connection_peak_hash[:8]}..."
con_str = (
f"{NodeType(con['type']).name:9} {host:39} "
f"{con['peer_port']:5}/{con['peer_server_port']:<5}"
f" {con['node_id'].hex()[:8]}... "
f"{last_connect} "
f"{mb_up:7.1f}|{mb_down:<7.1f}"
f"\n "
)
if peak_height is not None:
con_str += f"-Height: {peak_height:8.0f} -Hash: {connection_peak_hash}"
else:
con_str += f"-Height: No Info -Hash: {connection_peak_hash}"
# Only show when Trusted is True
if trusted:
con_str += f" -Trusted: {trusted}"
else:
con_str = (
f"{NodeType(con['type']).name:9} {host:39} "
f"{con['peer_port']:5}/{con['peer_server_port']:<5}"
f" {con['node_id'].hex()[:8]}... "
f"{last_connect} "
f"{mb_up:7.1f}|{mb_down:<7.1f}"
)
print(con_str)
async def peer_async(
node_type: str,
rpc_port: Optional[int],
root_path: Path,
show_connections: bool,
add_connection: str,
remove_connection: str,
) -> None:
client_type = NODE_TYPES[node_type]
async with get_any_service_client(client_type, rpc_port, root_path) as (rpc_client, config):
if rpc_client is not None:
# Check or edit node connections
if show_connections:
trusted_peers: Dict[str, Any] = config["full_node"].get("trusted_peers", {})
await print_connections(rpc_client, trusted_peers)
# if called together with state, leave a blank line
if add_connection:
await add_node_connection(rpc_client, add_connection)
if remove_connection:
await remove_node_connection(rpc_client, remove_connection)

View File

@ -1,216 +1,216 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Optional
import click
from chia.cmds.sim_funcs import async_config_wizard, farm_blocks, print_status, revert_block_height, set_auto_farm
from chia.util.default_root import SIMULATOR_ROOT_PATH
@click.group("sim", help="Configure and make requests to a Chia Simulator Full Node")
@click.option(
"-p",
"--rpc-port",
help=(
"Set the port where the Simulator is hosting the RPC interface. "
"See the rpc_port under full_node in config.yaml"
),
type=int,
default=None,
)
@click.option(
"--root-path", default=SIMULATOR_ROOT_PATH, help="Simulator root folder.", type=click.Path(), show_default=True
)
@click.option(
"-n",
"--simulator-name",
help="This name is used to determine the sub folder to use in the simulator root folder.",
type=str,
default="main",
)
@click.pass_context
def sim_cmd(ctx: click.Context, rpc_port: Optional[int], root_path: str, simulator_name: str) -> None:
ctx.ensure_object(dict)
ctx.obj["root_path"] = Path(root_path) / simulator_name
ctx.obj["sim_name"] = simulator_name
ctx.obj["rpc_port"] = rpc_port
@sim_cmd.command("create", help="Guides you through the process of setting up a Chia Simulator")
@click.option("-f", "--fingerprint", type=int, required=False, help="Use your fingerprint to skip the key prompt")
@click.option(
"-r",
"--reward-address",
type=str,
required=False,
help="Use this address instead of the default farming address.",
)
@click.option(
"-p", "--plot-directory", type=str, required=False, help="Use a different directory then 'simulator/plots'."
)
@click.option("-m", "--mnemonic", type=str, required=False, help="Add to keychain and use a specific mnemonic.")
@click.option("-a", "--auto-farm", type=bool, default=None, help="Enable or Disable auto farming")
@click.option(
"-d",
"--docker-mode",
is_flag=True,
hidden=True,
help="Run non-interactively in Docker Mode, & generate a new key if keychain is empty.",
)
@click.option("-b", "--no-bitfield", type=bool, is_flag=True, help="Do not use bitfield when generating plots")
@click.pass_context
def create_simulator_config(
ctx: click.Context,
fingerprint: Optional[int],
reward_address: Optional[str],
plot_directory: Optional[str],
mnemonic: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
no_bitfield: bool,
) -> None:
print(f"Using this Directory: {ctx.obj['root_path']}\n")
if fingerprint and mnemonic:
print("You can't use both a fingerprint and a mnemonic. Please choose one.")
return None
asyncio.run(
async_config_wizard(
ctx.obj["root_path"],
fingerprint,
reward_address,
plot_directory,
mnemonic,
auto_farm,
docker_mode,
not no_bitfield,
)
)
@sim_cmd.command("start", help="Start service groups while automatically using the right chia_root.")
@click.option("-r", "--restart", is_flag=True, help="Restart running services")
@click.option("-w", "--wallet", is_flag=True, help="Start wallet")
@click.pass_context
def sim_start_cmd(ctx: click.Context, restart: bool, wallet: bool) -> None:
from chia.cmds.start import start_cmd
group: tuple[str, ...] = ("simulator",)
if wallet:
group += ("wallet",)
ctx.invoke(start_cmd, restart=restart, group=group)
@sim_cmd.command("stop", help="Stop running services while automatically using the right chia_root.")
@click.option("-d", "--daemon", is_flag=True, help="Stop daemon")
@click.option("-w", "--wallet", is_flag=True, help="Stop wallet")
@click.pass_context
def sim_stop_cmd(ctx: click.Context, daemon: bool, wallet: bool) -> None:
from chia.cmds.stop import stop_cmd
group: Any = ("simulator",)
if wallet:
group += ("wallet",)
ctx.invoke(stop_cmd, daemon=daemon, group=group)
@sim_cmd.command("status", help="Get information about the state of the simulator.")
@click.option("-f", "--fingerprint", type=int, help="Get detailed information on this fingerprint.")
@click.option("--show-key/--no-show-key", help="Show detailed key information.")
@click.option("-c", "--show-coins", is_flag=True, help="Show all unspent coins.")
@click.option("-i", "--include-rewards", is_flag=True, help="Include reward coins when showing coins.")
@click.option("-a", "--show-addresses", is_flag=True, help="Show the balances of all addresses.")
@click.pass_context
def status_cmd(
ctx: click.Context,
fingerprint: Optional[int],
show_key: bool,
show_coins: bool,
include_rewards: bool,
show_addresses: bool,
) -> None:
asyncio.run(
print_status(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
fingerprint,
show_key,
show_coins,
include_rewards,
show_addresses,
)
)
@sim_cmd.command("revert", help="Reset chain to a previous block height.")
@click.option("-b", "--blocks", type=int, default=1, help="Number of blocks to go back.")
@click.option("-n", "--new-blocks", type=int, default=1, help="Number of new blocks to add during a reorg.")
@click.option("-r", "--reset", is_flag=True, help="Reset the chain to the genesis block")
@click.option(
"-f",
"--force",
is_flag=True,
help="Forcefully delete blocks, this is not a reorg but might be needed in very special circumstances."
" Note: Use with caution, this will break all wallets.",
)
@click.option("-d", "--disable-prompt", is_flag=True, help="Disable confirmation prompt when force reverting.")
@click.pass_context
def revert_cmd(
ctx: click.Context, blocks: int, new_blocks: int, reset: bool, force: bool, disable_prompt: bool
) -> None:
if force and not disable_prompt:
input_str = (
"Are you sure you want to force delete blocks? This should only ever be used in special circumstances,"
" and will break all wallets. \nPress 'y' to continue, or any other button to exit: "
)
if input(input_str) != "y":
return
if reset and not force:
print("\n The force flag (-f) is required to reset the chain to the genesis block. \n")
return
if reset and blocks != 1:
print("\nBlocks, '-b' must not be set if all blocks are selected by reset, '-r'. Exiting.\n")
return
asyncio.run(
revert_block_height(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
blocks,
new_blocks,
reset,
force,
)
)
@sim_cmd.command("farm", help="Farm blocks")
@click.option("-b", "--blocks", type=int, default=1, help="Amount of blocks to create")
@click.option("-n", "--non-transaction", is_flag=True, help="Allow non-transaction blocks")
@click.option("-a", "--target-address", type=str, default="", help="Block reward address")
@click.pass_context
def farm_cmd(ctx: click.Context, blocks: int, non_transaction: bool, target_address: str) -> None:
asyncio.run(
farm_blocks(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
blocks,
not non_transaction,
target_address,
)
)
@sim_cmd.command("autofarm", help="Enable or disable auto farming on transaction submission")
@click.argument("set-autofarm", type=click.Choice(["on", "off"]), nargs=1, required=True)
@click.pass_context
def autofarm_cmd(ctx: click.Context, set_autofarm: str) -> None:
autofarm = bool(set_autofarm == "on")
asyncio.run(
set_auto_farm(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
autofarm,
)
)
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Optional
import click
from chia.cmds.sim_funcs import async_config_wizard, farm_blocks, print_status, revert_block_height, set_auto_farm
from chia.util.default_root import SIMULATOR_ROOT_PATH
@click.group("sim", help="Configure and make requests to a Chia Simulator Full Node")
@click.option(
"-p",
"--rpc-port",
help=(
"Set the port where the Simulator is hosting the RPC interface. "
"See the rpc_port under full_node in config.yaml"
),
type=int,
default=None,
)
@click.option(
"--root-path", default=SIMULATOR_ROOT_PATH, help="Simulator root folder.", type=click.Path(), show_default=True
)
@click.option(
"-n",
"--simulator-name",
help="This name is used to determine the sub folder to use in the simulator root folder.",
type=str,
default="main",
)
@click.pass_context
def sim_cmd(ctx: click.Context, rpc_port: Optional[int], root_path: str, simulator_name: str) -> None:
ctx.ensure_object(dict)
ctx.obj["root_path"] = Path(root_path) / simulator_name
ctx.obj["sim_name"] = simulator_name
ctx.obj["rpc_port"] = rpc_port
@sim_cmd.command("create", help="Guides you through the process of setting up a Chia Simulator")
@click.option("-f", "--fingerprint", type=int, required=False, help="Use your fingerprint to skip the key prompt")
@click.option(
"-r",
"--reward-address",
type=str,
required=False,
help="Use this address instead of the default farming address.",
)
@click.option(
"-p", "--plot-directory", type=str, required=False, help="Use a different directory then 'simulator/plots'."
)
@click.option("-m", "--mnemonic", type=str, required=False, help="Add to keychain and use a specific mnemonic.")
@click.option("-a", "--auto-farm", type=bool, default=None, help="Enable or Disable auto farming")
@click.option(
"-d",
"--docker-mode",
is_flag=True,
hidden=True,
help="Run non-interactively in Docker Mode, & generate a new key if keychain is empty.",
)
@click.option("-b", "--no-bitfield", type=bool, is_flag=True, help="Do not use bitfield when generating plots")
@click.pass_context
def create_simulator_config(
ctx: click.Context,
fingerprint: Optional[int],
reward_address: Optional[str],
plot_directory: Optional[str],
mnemonic: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
no_bitfield: bool,
) -> None:
print(f"Using this Directory: {ctx.obj['root_path']}\n")
if fingerprint and mnemonic:
print("You can't use both a fingerprint and a mnemonic. Please choose one.")
return None
asyncio.run(
async_config_wizard(
ctx.obj["root_path"],
fingerprint,
reward_address,
plot_directory,
mnemonic,
auto_farm,
docker_mode,
not no_bitfield,
)
)
@sim_cmd.command("start", help="Start service groups while automatically using the right chia_root.")
@click.option("-r", "--restart", is_flag=True, help="Restart running services")
@click.option("-w", "--wallet", is_flag=True, help="Start wallet")
@click.pass_context
def sim_start_cmd(ctx: click.Context, restart: bool, wallet: bool) -> None:
from chia.cmds.start import start_cmd
group: tuple[str, ...] = ("simulator",)
if wallet:
group += ("wallet",)
ctx.invoke(start_cmd, restart=restart, group=group)
@sim_cmd.command("stop", help="Stop running services while automatically using the right chia_root.")
@click.option("-d", "--daemon", is_flag=True, help="Stop daemon")
@click.option("-w", "--wallet", is_flag=True, help="Stop wallet")
@click.pass_context
def sim_stop_cmd(ctx: click.Context, daemon: bool, wallet: bool) -> None:
from chia.cmds.stop import stop_cmd
group: Any = ("simulator",)
if wallet:
group += ("wallet",)
ctx.invoke(stop_cmd, daemon=daemon, group=group)
@sim_cmd.command("status", help="Get information about the state of the simulator.")
@click.option("-f", "--fingerprint", type=int, help="Get detailed information on this fingerprint.")
@click.option("--show-key/--no-show-key", help="Show detailed key information.")
@click.option("-c", "--show-coins", is_flag=True, help="Show all unspent coins.")
@click.option("-i", "--include-rewards", is_flag=True, help="Include reward coins when showing coins.")
@click.option("-a", "--show-addresses", is_flag=True, help="Show the balances of all addresses.")
@click.pass_context
def status_cmd(
ctx: click.Context,
fingerprint: Optional[int],
show_key: bool,
show_coins: bool,
include_rewards: bool,
show_addresses: bool,
) -> None:
asyncio.run(
print_status(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
fingerprint,
show_key,
show_coins,
include_rewards,
show_addresses,
)
)
@sim_cmd.command("revert", help="Reset chain to a previous block height.")
@click.option("-b", "--blocks", type=int, default=1, help="Number of blocks to go back.")
@click.option("-n", "--new-blocks", type=int, default=1, help="Number of new blocks to add during a reorg.")
@click.option("-r", "--reset", is_flag=True, help="Reset the chain to the genesis block")
@click.option(
"-f",
"--force",
is_flag=True,
help="Forcefully delete blocks, this is not a reorg but might be needed in very special circumstances."
" Note: Use with caution, this will break all wallets.",
)
@click.option("-d", "--disable-prompt", is_flag=True, help="Disable confirmation prompt when force reverting.")
@click.pass_context
def revert_cmd(
ctx: click.Context, blocks: int, new_blocks: int, reset: bool, force: bool, disable_prompt: bool
) -> None:
if force and not disable_prompt:
input_str = (
"Are you sure you want to force delete blocks? This should only ever be used in special circumstances,"
" and will break all wallets. \nPress 'y' to continue, or any other button to exit: "
)
if input(input_str) != "y":
return
if reset and not force:
print("\n The force flag (-f) is required to reset the chain to the genesis block. \n")
return
if reset and blocks != 1:
print("\nBlocks, '-b' must not be set if all blocks are selected by reset, '-r'. Exiting.\n")
return
asyncio.run(
revert_block_height(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
blocks,
new_blocks,
reset,
force,
)
)
@sim_cmd.command("farm", help="Farm blocks")
@click.option("-b", "--blocks", type=int, default=1, help="Amount of blocks to create")
@click.option("-n", "--non-transaction", is_flag=True, help="Allow non-transaction blocks")
@click.option("-a", "--target-address", type=str, default="", help="Block reward address")
@click.pass_context
def farm_cmd(ctx: click.Context, blocks: int, non_transaction: bool, target_address: str) -> None:
asyncio.run(
farm_blocks(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
blocks,
not non_transaction,
target_address,
)
)
@sim_cmd.command("autofarm", help="Enable or disable auto farming on transaction submission")
@click.argument("set-autofarm", type=click.Choice(["on", "off"]), nargs=1, required=True)
@click.pass_context
def autofarm_cmd(ctx: click.Context, set_autofarm: str) -> None:
autofarm = bool(set_autofarm == "on")
asyncio.run(
set_auto_farm(
ctx.obj["rpc_port"],
ctx.obj["root_path"],
autofarm,
)
)

View File

@ -1,498 +1,498 @@
from __future__ import annotations
import asyncio
import os
import sys
from pathlib import Path, PureWindowsPath
from random import randint
from typing import Any, Dict, List, Optional
from aiohttp import ClientConnectorError
from blspy import PrivateKey
from chia.cmds.cmds_util import get_any_service_client
from chia.cmds.start_funcs import async_start
from chia.consensus.coinbase import create_puzzlehash_for_pk
from chia.simulator.simulator_full_node_rpc_client import SimulatorFullNodeRpcClient
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.config import load_config, save_config
from chia.util.errors import KeychainFingerprintExists
from chia.util.ints import uint32
from chia.util.keychain import Keychain, bytes_to_mnemonic
from chia.wallet.derive_keys import (
master_sk_to_farmer_sk,
master_sk_to_pool_sk,
master_sk_to_wallet_sk,
master_sk_to_wallet_sk_unhardened,
)
def get_ph_from_fingerprint(fingerprint: int, key_id: int = 1) -> bytes32:
priv_key_and_entropy = Keychain().get_private_key_by_fingerprint(fingerprint)
if priv_key_and_entropy is None:
raise Exception("Fingerprint not found")
private_key = priv_key_and_entropy[0]
sk_for_wallet_id: PrivateKey = master_sk_to_wallet_sk(private_key, uint32(key_id))
puzzle_hash: bytes32 = create_puzzlehash_for_pk(sk_for_wallet_id.get_g1())
return puzzle_hash
def create_chia_directory(
chia_root: Path,
fingerprint: int,
farming_address: Optional[str],
plot_directory: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
) -> Dict[str, Any]:
"""
This function creates a new chia directory and returns a heavily modified config,
suitable for use in the simulator.
"""
from chia.cmds.init_funcs import chia_init
if not chia_root.is_dir() or not Path(chia_root / "config" / "config.yaml").exists():
# create chia directories & load config
chia_init(chia_root, testnet=True, fix_ssl_permissions=True)
config: Dict[str, Any] = load_config(chia_root, "config.yaml")
# apply standard block-tools config.
config["full_node"]["send_uncompact_interval"] = 0
config["full_node"]["target_uncompact_proofs"] = 30
config["full_node"]["peer_connect_interval"] = 50
config["full_node"]["sanitize_weight_proof_only"] = False
config["logging"]["log_level"] = "INFO" # extra logs for easier development
# make sure we don't try to connect to other nodes.
config["full_node"]["introducer_peer"] = None
config["wallet"]["introducer_peer"] = None
config["full_node"]["dns_servers"] = []
config["wallet"]["dns_servers"] = []
# create custom testnet (simulator0)
config["network_overrides"]["constants"]["simulator0"] = config["network_overrides"]["constants"][
"testnet0"
].copy()
config["network_overrides"]["config"]["simulator0"] = config["network_overrides"]["config"]["testnet0"].copy()
sim_genesis = "eb8c4d20b322be8d9fddbf9412016bdffe9a2901d7edb0e364e94266d0e095f7"
config["network_overrides"]["constants"]["simulator0"]["GENESIS_CHALLENGE"] = sim_genesis
# tell services to use simulator0
config["selected_network"] = "simulator0"
config["wallet"]["selected_network"] = "simulator0"
config["full_node"]["selected_network"] = "simulator0"
if not docker_mode: # We want predictable ports for our docker image.
# set ports and networks, we don't want to cause a port conflict.
port_offset = randint(1, 20000)
config["daemon_port"] -= port_offset
config["network_overrides"]["config"]["simulator0"]["default_full_node_port"] = 38444 + port_offset
# wallet
config["wallet"]["port"] += port_offset
config["wallet"]["rpc_port"] += port_offset
# full node
config["full_node"]["port"] -= port_offset
config["full_node"]["rpc_port"] += port_offset
# connect wallet to full node
config["wallet"]["full_node_peer"]["port"] = config["full_node"]["port"]
config["full_node"]["wallet_peer"]["port"] = config["wallet"]["port"]
# ui
config["ui"]["daemon_port"] = config["daemon_port"]
else:
config["self_hostname"] = "0.0.0.0" # Bind to all interfaces.
config["logging"]["log_stdout"] = True # Log to console.
else:
config = load_config(chia_root, "config.yaml")
# simulator overrides
config["simulator"]["key_fingerprint"] = fingerprint
if farming_address is None:
prefix = config["network_overrides"]["config"]["simulator0"]["address_prefix"]
farming_address = encode_puzzle_hash(get_ph_from_fingerprint(fingerprint), prefix)
config["simulator"]["farming_address"] = farming_address
if plot_directory is not None:
config["simulator"]["plot_directory"] = plot_directory
# Temporary change to fix win / linux differences.
config["simulator"]["plot_directory"] = str(Path(config["simulator"]["plot_directory"]))
if "//" in config["simulator"]["plot_directory"] and os.name != "nt":
# if we're on linux, we need to convert to a linux path.
config["simulator"]["plot_directory"] = str(PureWindowsPath(config["simulator"]["plot_directory"]).as_posix())
config["simulator"]["auto_farm"] = auto_farm if auto_farm is not None else True
farming_ph = decode_puzzle_hash(farming_address)
# modify genesis block to give the user the reward
simulator_consts = config["network_overrides"]["constants"]["simulator0"]
simulator_consts["GENESIS_PRE_FARM_FARMER_PUZZLE_HASH"] = farming_ph.hex()
simulator_consts["GENESIS_PRE_FARM_POOL_PUZZLE_HASH"] = farming_ph.hex()
# save config and return the config
save_config(chia_root, "config.yaml", config)
return config
def display_key_info(fingerprint: int, prefix: str) -> None:
"""
Display key info for a given fingerprint, similar to the output of `chia keys show`.
"""
print(f"Using fingerprint {fingerprint}")
private_key_and_seed = Keychain().get_private_key_by_fingerprint(fingerprint)
if private_key_and_seed is None:
print(f"Fingerprint {fingerprint} not found")
return
sk, seed = private_key_and_seed
print("\nFingerprint:", sk.get_g1().get_fingerprint())
print("Master public key (m):", sk.get_g1())
print("Farmer public key (m/12381/8444/0/0):", master_sk_to_farmer_sk(sk).get_g1())
print("Pool public key (m/12381/8444/1/0):", master_sk_to_pool_sk(sk).get_g1())
first_wallet_sk: PrivateKey = master_sk_to_wallet_sk_unhardened(sk, uint32(0))
wallet_address: str = encode_puzzle_hash(create_puzzlehash_for_pk(first_wallet_sk.get_g1()), prefix)
print(f"First wallet address: {wallet_address}")
assert seed is not None
print("Master private key (m):", bytes(sk).hex())
print("First wallet secret key (m/12381/8444/2/0):", master_sk_to_wallet_sk(sk, uint32(0)))
mnemonic = bytes_to_mnemonic(seed)
print(" Mnemonic seed (24 secret words):")
print(f"{mnemonic} \n")
def generate_and_return_fingerprint(mnemonic: Optional[str] = None) -> int:
"""
Generate and add new PrivateKey and return its fingerprint.
"""
from chia.util.keychain import generate_mnemonic
if mnemonic is None:
print("Generating private key")
mnemonic = generate_mnemonic()
try:
sk = Keychain().add_private_key(mnemonic, None)
fingerprint: int = sk.get_g1().get_fingerprint()
except KeychainFingerprintExists as e:
fingerprint = e.fingerprint
print(f"Fingerprint: {fingerprint} for provided private key already exists.")
return fingerprint
print(f"Added private key with public key fingerprint {fingerprint}")
return fingerprint
def select_fingerprint(
fingerprint: Optional[int] = None, mnemonic_string: Optional[str] = None, auto_generate_key: bool = False
) -> Optional[int]:
"""
Either select an existing fingerprint or create one and return it.
"""
if mnemonic_string:
fingerprint = generate_and_return_fingerprint(mnemonic_string)
fingerprints: list[int] = [pk.get_fingerprint() for pk in Keychain().get_all_public_keys()]
if fingerprint is not None and fingerprint in fingerprints:
return fingerprint
elif fingerprint is not None and fingerprint not in fingerprints:
print(f"Invalid Fingerprint. Fingerprint {fingerprint} was not found.")
return None
if auto_generate_key and len(fingerprints) == 1:
return fingerprints[0]
if len(fingerprints) == 0:
if not auto_generate_key:
if (
input("No keys in keychain. Press 'q' to quit, or press any other key to generate a new key.").lower()
== "q"
):
return None
# generate private key and add to wallet
fingerprint = generate_and_return_fingerprint()
else:
print("Fingerprints:")
print(
"If you already used one of these keys, select that fingerprint to skip the plotting process."
" Otherwise, select any key below."
)
for i, fp in enumerate(fingerprints):
row: str = f"{i + 1}) "
row += f"{fp}"
print(row)
val = None
prompt: str = f"Choose a simulator key [1-{len(fingerprints)}] ('q' to quit, or 'g' to generate a new key): "
while val is None:
val = input(prompt)
if val == "q":
return None
elif val == "g":
fingerprint = generate_and_return_fingerprint()
break
elif not val.isdigit():
val = None
else:
index = int(val) - 1
if index < 0 or index >= len(fingerprints):
print("Invalid value")
val = None
continue
else:
fingerprint = fingerprints[index]
assert fingerprint is not None
return fingerprint
async def generate_plots(config: Dict[str, Any], root_path: Path, fingerprint: int, bitfield: bool) -> None:
"""
Pre-Generate plots for the new simulator instance.
"""
from chia.simulator.block_tools import BlockTools, test_constants
from chia.simulator.start_simulator import PLOT_SIZE, PLOTS
farming_puzzle_hash = decode_puzzle_hash(config["simulator"]["farming_address"])
os.environ["CHIA_ROOT"] = str(root_path) # change env variable, to make it match what the daemon would set it to
# create block tools and use local keychain
bt = BlockTools(
test_constants,
root_path,
automated_testing=False,
plot_dir=config["simulator"].get("plot_directory", "plots"),
keychain=Keychain(),
)
await bt.setup_keys(fingerprint=fingerprint, reward_ph=farming_puzzle_hash)
existing_plots = await bt.setup_plots(
num_og_plots=PLOTS, num_pool_plots=0, num_non_keychain_plots=0, plot_size=PLOT_SIZE, bitfield=bitfield
)
print(f"{'New plots generated.' if existing_plots else 'Using Existing Plots'}\n")
async def get_current_height(root_path: Path) -> int:
async with get_any_service_client(SimulatorFullNodeRpcClient, root_path=root_path, consume_errors=False) as (
node_client,
_,
):
assert node_client is not None # this cant be None, because we don't catch errors
num_blocks = len(await node_client.get_all_blocks())
return num_blocks
async def async_config_wizard(
root_path: Path,
fingerprint: Optional[int],
farming_address: Optional[str],
plot_directory: Optional[str],
mnemonic_string: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
bitfield: bool,
) -> None:
# either return passed through fingerprint or get one
fingerprint = select_fingerprint(fingerprint, mnemonic_string, docker_mode)
if fingerprint is None:
# user cancelled wizard
return
# create chia directory & get config.
print("Creating chia directory & config...")
config = create_chia_directory(root_path, fingerprint, farming_address, plot_directory, auto_farm, docker_mode)
# Pre-generate plots by running block_tools init functions.
print("Please Wait, Generating plots...")
print("This may take up to a minute if you are on a slow machine")
await generate_plots(config, root_path, fingerprint, bitfield)
# final messages
final_farming_address = config["simulator"]["farming_address"]
print(f"\nFarming & Prefarm reward address: {final_farming_address}\n")
print("Configuration Wizard Complete.")
print("Starting Simulator now...\n\n")
sys.argv[0] = str(Path(sys.executable).parent / "chia") # fix path for tests
await async_start(root_path, config, ("simulator",), False)
# now we make sure the simulator has a genesis block
print("Please wait, generating genesis block.")
while True:
try:
num_blocks: int = await get_current_height(root_path)
except ClientConnectorError:
await asyncio.sleep(0.25)
else:
if num_blocks == 0:
await farm_blocks(None, root_path, 1, True, final_farming_address)
print("Genesis block generated, exiting.")
else:
print("Genesis block already exists, exiting.")
break
print(f"\nMake sure your CHIA_ROOT Environment Variable is set to: {root_path}")
def print_coin_record(
name: str,
address_prefix: str,
coin_record: CoinRecord,
) -> None:
from datetime import datetime
coin_address = encode_puzzle_hash(coin_record.coin.puzzle_hash, address_prefix)
print(f"Coin 0x{coin_record.name.hex()}")
print(f"Wallet Address: {coin_address}")
print(f"Confirmed at block: {coin_record.confirmed_block_index}")
print(f"Spent: {f'at Block {coin_record.spent_block_index}' if coin_record.spent else 'No'}")
print(f"Coin Amount: {coin_record.coin.amount} {name}")
print(f"Parent Coin ID: 0x{coin_record.coin.parent_coin_info.hex()}")
print(f"Created at: {datetime.fromtimestamp(float(coin_record.timestamp)).strftime('%Y-%m-%d %H:%M:%S')}\n")
async def print_coin_records(
config: Dict[str, Any],
node_client: SimulatorFullNodeRpcClient,
include_reward_coins: bool,
include_spent: bool = False,
) -> None:
import sys
coin_records: List[CoinRecord] = await node_client.get_all_coins(include_spent)
coin_records = [coin_record for coin_record in coin_records if not coin_record.coinbase or include_reward_coins]
address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
name = "mojo"
paginate = False # I might change this later.
if len(coin_records) != 0:
print("All Coins: ")
if paginate is True:
paginate = sys.stdout.isatty()
num_per_screen = 5 if paginate else len(coin_records)
# ripped from cmds/wallet_funcs.
for i in range(0, len(coin_records), num_per_screen):
for j in range(0, num_per_screen):
if i + j >= len(coin_records):
break
print_coin_record(
coin_record=coin_records[i + j],
name=name,
address_prefix=address_prefix,
)
if i + num_per_screen <= len(coin_records) and paginate:
print("Press q to quit, or c to continue")
while True:
entered_key = sys.stdin.read(1)
if entered_key == "q":
return None
elif entered_key == "c":
break
async def print_wallets(config: Dict[str, Any], node_client: SimulatorFullNodeRpcClient) -> None:
ph_and_amount = await node_client.get_all_puzzle_hashes()
address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
name = "mojo"
for puzzle_hash, (amount, num_tx) in ph_and_amount.items():
address = encode_puzzle_hash(puzzle_hash, address_prefix)
print(f"Address: {address} has a balance of: {amount} {name}, with a total of: {num_tx} transactions.\n")
async def print_status(
rpc_port: Optional[int],
root_path: Path,
fingerprint: Optional[int],
show_key: bool,
show_coins: bool,
include_reward_coins: bool,
show_addresses: bool,
) -> None:
"""
This command allows users to easily get the status of the simulator
and information about the state of and the coins in the simulated blockchain.
"""
from chia.cmds.show_funcs import print_blockchain_state
from chia.cmds.units import units
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, config):
if node_client is not None:
# Display keychain info
if show_key:
if fingerprint is None:
fingerprint = config["simulator"]["key_fingerprint"]
if fingerprint is not None:
display_key_info(
fingerprint, config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
)
else:
print(
"No fingerprint in config, either rerun 'cdv sim create' "
"or use --fingerprint to specify one, skipping key information."
)
# chain status ( basically chia show -s)
await print_blockchain_state(node_client, config)
print("")
# farming information
target_ph: bytes32 = await node_client.get_farming_ph()
farming_coin_records = await node_client.get_coin_records_by_puzzle_hash(target_ph, False)
prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
print(
f"Current Farming address: {encode_puzzle_hash(target_ph, prefix)}, "
f"with a balance of: "
f"{sum(coin_records.coin.amount for coin_records in farming_coin_records) / units['chia']} TXCH."
)
if show_addresses:
print("All Addresses: ")
await print_wallets(config, node_client)
if show_coins:
await print_coin_records(config, node_client, include_reward_coins)
async def revert_block_height(
rpc_port: Optional[int],
root_path: Path,
num_blocks: int,
num_new_blocks: int,
reset_chain_to_genesis: bool,
use_revert_blocks: bool,
) -> None:
"""
This function allows users to easily revert the chain to a previous state or perform a reorg.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, _):
if node_client is not None:
if use_revert_blocks:
if num_new_blocks != 1:
print(f"Ignoring num_new_blocks: {num_new_blocks}, because we are not performing a reorg.")
# in this case num_blocks is the number of blocks to delete
new_height: int = await node_client.revert_blocks(num_blocks, reset_chain_to_genesis)
print(
f"All transactions in Block: {new_height + num_blocks} and above were successfully deleted, "
"you should now delete & restart all wallets."
)
else:
# However, in this case num_blocks is the fork height.
new_height = await node_client.reorg_blocks(num_blocks, num_new_blocks, use_revert_blocks)
old_height = new_height - num_new_blocks
print(f"All transactions in Block: {old_height - num_blocks} and above were successfully reverted.")
print(f"Block Height is now: {new_height}")
async def farm_blocks(
rpc_port: Optional[int],
root_path: Path,
num_blocks: int,
transaction_blocks: bool,
target_address: str,
) -> None:
"""
This function is used to generate new blocks.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, config):
if node_client is not None:
if target_address == "":
target_address = config["simulator"]["farming_address"]
if target_address is None:
print(
"No target address in config, falling back to the temporary address currently in use. "
"You can use 'cdv sim create' or use --target-address to specify a different address."
)
target_ph: bytes32 = await node_client.get_farming_ph()
else:
target_ph = decode_puzzle_hash(target_address)
await node_client.farm_block(target_ph, num_blocks, transaction_blocks)
print(f"Farmed {num_blocks}{' Transaction' if transaction_blocks else ''} blocks")
block_height = (await node_client.get_blockchain_state())["peak"].height
print(f"Block Height is now: {block_height}")
async def set_auto_farm(rpc_port: Optional[int], root_path: Path, set_autofarm: bool) -> None:
"""
This function can be used to enable or disable Auto Farming.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, _):
if node_client is not None:
current = await node_client.get_auto_farming()
if current == set_autofarm:
print(f"Auto farming is already {'on' if set_autofarm else 'off'}")
return
result = await node_client.set_auto_farming(set_autofarm)
print(f"Auto farming is now {'on' if result else 'off'}")
from __future__ import annotations
import asyncio
import os
import sys
from pathlib import Path, PureWindowsPath
from random import randint
from typing import Any, Dict, List, Optional
from aiohttp import ClientConnectorError
from blspy import PrivateKey
from chia.cmds.cmds_util import get_any_service_client
from chia.cmds.start_funcs import async_start
from chia.consensus.coinbase import create_puzzlehash_for_pk
from chia.simulator.simulator_full_node_rpc_client import SimulatorFullNodeRpcClient
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.config import load_config, save_config
from chia.util.errors import KeychainFingerprintExists
from chia.util.ints import uint32
from chia.util.keychain import Keychain, bytes_to_mnemonic
from chia.wallet.derive_keys import (
master_sk_to_farmer_sk,
master_sk_to_pool_sk,
master_sk_to_wallet_sk,
master_sk_to_wallet_sk_unhardened,
)
def get_ph_from_fingerprint(fingerprint: int, key_id: int = 1) -> bytes32:
priv_key_and_entropy = Keychain().get_private_key_by_fingerprint(fingerprint)
if priv_key_and_entropy is None:
raise Exception("Fingerprint not found")
private_key = priv_key_and_entropy[0]
sk_for_wallet_id: PrivateKey = master_sk_to_wallet_sk(private_key, uint32(key_id))
puzzle_hash: bytes32 = create_puzzlehash_for_pk(sk_for_wallet_id.get_g1())
return puzzle_hash
def create_chia_directory(
chia_root: Path,
fingerprint: int,
farming_address: Optional[str],
plot_directory: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
) -> Dict[str, Any]:
"""
This function creates a new chia directory and returns a heavily modified config,
suitable for use in the simulator.
"""
from chia.cmds.init_funcs import chia_init
if not chia_root.is_dir() or not Path(chia_root / "config" / "config.yaml").exists():
# create chia directories & load config
chia_init(chia_root, testnet=True, fix_ssl_permissions=True)
config: Dict[str, Any] = load_config(chia_root, "config.yaml")
# apply standard block-tools config.
config["full_node"]["send_uncompact_interval"] = 0
config["full_node"]["target_uncompact_proofs"] = 30
config["full_node"]["peer_connect_interval"] = 50
config["full_node"]["sanitize_weight_proof_only"] = False
config["logging"]["log_level"] = "INFO" # extra logs for easier development
# make sure we don't try to connect to other nodes.
config["full_node"]["introducer_peer"] = None
config["wallet"]["introducer_peer"] = None
config["full_node"]["dns_servers"] = []
config["wallet"]["dns_servers"] = []
# create custom testnet (simulator0)
config["network_overrides"]["constants"]["simulator0"] = config["network_overrides"]["constants"][
"testnet0"
].copy()
config["network_overrides"]["config"]["simulator0"] = config["network_overrides"]["config"]["testnet0"].copy()
sim_genesis = "eb8c4d20b322be8d9fddbf9412016bdffe9a2901d7edb0e364e94266d0e095f7"
config["network_overrides"]["constants"]["simulator0"]["GENESIS_CHALLENGE"] = sim_genesis
# tell services to use simulator0
config["selected_network"] = "simulator0"
config["wallet"]["selected_network"] = "simulator0"
config["full_node"]["selected_network"] = "simulator0"
if not docker_mode: # We want predictable ports for our docker image.
# set ports and networks, we don't want to cause a port conflict.
port_offset = randint(1, 20000)
config["daemon_port"] -= port_offset
config["network_overrides"]["config"]["simulator0"]["default_full_node_port"] = 38444 + port_offset
# wallet
config["wallet"]["port"] += port_offset
config["wallet"]["rpc_port"] += port_offset
# full node
config["full_node"]["port"] -= port_offset
config["full_node"]["rpc_port"] += port_offset
# connect wallet to full node
config["wallet"]["full_node_peer"]["port"] = config["full_node"]["port"]
config["full_node"]["wallet_peer"]["port"] = config["wallet"]["port"]
# ui
config["ui"]["daemon_port"] = config["daemon_port"]
else:
config["self_hostname"] = "0.0.0.0" # Bind to all interfaces.
config["logging"]["log_stdout"] = True # Log to console.
else:
config = load_config(chia_root, "config.yaml")
# simulator overrides
config["simulator"]["key_fingerprint"] = fingerprint
if farming_address is None:
prefix = config["network_overrides"]["config"]["simulator0"]["address_prefix"]
farming_address = encode_puzzle_hash(get_ph_from_fingerprint(fingerprint), prefix)
config["simulator"]["farming_address"] = farming_address
if plot_directory is not None:
config["simulator"]["plot_directory"] = plot_directory
# Temporary change to fix win / linux differences.
config["simulator"]["plot_directory"] = str(Path(config["simulator"]["plot_directory"]))
if "//" in config["simulator"]["plot_directory"] and os.name != "nt":
# if we're on linux, we need to convert to a linux path.
config["simulator"]["plot_directory"] = str(PureWindowsPath(config["simulator"]["plot_directory"]).as_posix())
config["simulator"]["auto_farm"] = auto_farm if auto_farm is not None else True
farming_ph = decode_puzzle_hash(farming_address)
# modify genesis block to give the user the reward
simulator_consts = config["network_overrides"]["constants"]["simulator0"]
simulator_consts["GENESIS_PRE_FARM_FARMER_PUZZLE_HASH"] = farming_ph.hex()
simulator_consts["GENESIS_PRE_FARM_POOL_PUZZLE_HASH"] = farming_ph.hex()
# save config and return the config
save_config(chia_root, "config.yaml", config)
return config
def display_key_info(fingerprint: int, prefix: str) -> None:
"""
Display key info for a given fingerprint, similar to the output of `chia keys show`.
"""
print(f"Using fingerprint {fingerprint}")
private_key_and_seed = Keychain().get_private_key_by_fingerprint(fingerprint)
if private_key_and_seed is None:
print(f"Fingerprint {fingerprint} not found")
return
sk, seed = private_key_and_seed
print("\nFingerprint:", sk.get_g1().get_fingerprint())
print("Master public key (m):", sk.get_g1())
print("Farmer public key (m/12381/8444/0/0):", master_sk_to_farmer_sk(sk).get_g1())
print("Pool public key (m/12381/8444/1/0):", master_sk_to_pool_sk(sk).get_g1())
first_wallet_sk: PrivateKey = master_sk_to_wallet_sk_unhardened(sk, uint32(0))
wallet_address: str = encode_puzzle_hash(create_puzzlehash_for_pk(first_wallet_sk.get_g1()), prefix)
print(f"First wallet address: {wallet_address}")
assert seed is not None
print("Master private key (m):", bytes(sk).hex())
print("First wallet secret key (m/12381/8444/2/0):", master_sk_to_wallet_sk(sk, uint32(0)))
mnemonic = bytes_to_mnemonic(seed)
print(" Mnemonic seed (24 secret words):")
print(f"{mnemonic} \n")
def generate_and_return_fingerprint(mnemonic: Optional[str] = None) -> int:
"""
Generate and add new PrivateKey and return its fingerprint.
"""
from chia.util.keychain import generate_mnemonic
if mnemonic is None:
print("Generating private key")
mnemonic = generate_mnemonic()
try:
sk = Keychain().add_private_key(mnemonic, None)
fingerprint: int = sk.get_g1().get_fingerprint()
except KeychainFingerprintExists as e:
fingerprint = e.fingerprint
print(f"Fingerprint: {fingerprint} for provided private key already exists.")
return fingerprint
print(f"Added private key with public key fingerprint {fingerprint}")
return fingerprint
def select_fingerprint(
fingerprint: Optional[int] = None, mnemonic_string: Optional[str] = None, auto_generate_key: bool = False
) -> Optional[int]:
"""
Either select an existing fingerprint or create one and return it.
"""
if mnemonic_string:
fingerprint = generate_and_return_fingerprint(mnemonic_string)
fingerprints: list[int] = [pk.get_fingerprint() for pk in Keychain().get_all_public_keys()]
if fingerprint is not None and fingerprint in fingerprints:
return fingerprint
elif fingerprint is not None and fingerprint not in fingerprints:
print(f"Invalid Fingerprint. Fingerprint {fingerprint} was not found.")
return None
if auto_generate_key and len(fingerprints) == 1:
return fingerprints[0]
if len(fingerprints) == 0:
if not auto_generate_key:
if (
input("No keys in keychain. Press 'q' to quit, or press any other key to generate a new key.").lower()
== "q"
):
return None
# generate private key and add to wallet
fingerprint = generate_and_return_fingerprint()
else:
print("Fingerprints:")
print(
"If you already used one of these keys, select that fingerprint to skip the plotting process."
" Otherwise, select any key below."
)
for i, fp in enumerate(fingerprints):
row: str = f"{i + 1}) "
row += f"{fp}"
print(row)
val = None
prompt: str = f"Choose a simulator key [1-{len(fingerprints)}] ('q' to quit, or 'g' to generate a new key): "
while val is None:
val = input(prompt)
if val == "q":
return None
elif val == "g":
fingerprint = generate_and_return_fingerprint()
break
elif not val.isdigit():
val = None
else:
index = int(val) - 1
if index < 0 or index >= len(fingerprints):
print("Invalid value")
val = None
continue
else:
fingerprint = fingerprints[index]
assert fingerprint is not None
return fingerprint
async def generate_plots(config: Dict[str, Any], root_path: Path, fingerprint: int, bitfield: bool) -> None:
"""
Pre-Generate plots for the new simulator instance.
"""
from chia.simulator.block_tools import BlockTools, test_constants
from chia.simulator.start_simulator import PLOT_SIZE, PLOTS
farming_puzzle_hash = decode_puzzle_hash(config["simulator"]["farming_address"])
os.environ["CHIA_ROOT"] = str(root_path) # change env variable, to make it match what the daemon would set it to
# create block tools and use local keychain
bt = BlockTools(
test_constants,
root_path,
automated_testing=False,
plot_dir=config["simulator"].get("plot_directory", "plots"),
keychain=Keychain(),
)
await bt.setup_keys(fingerprint=fingerprint, reward_ph=farming_puzzle_hash)
existing_plots = await bt.setup_plots(
num_og_plots=PLOTS, num_pool_plots=0, num_non_keychain_plots=0, plot_size=PLOT_SIZE, bitfield=bitfield
)
print(f"{'New plots generated.' if existing_plots else 'Using Existing Plots'}\n")
async def get_current_height(root_path: Path) -> int:
async with get_any_service_client(SimulatorFullNodeRpcClient, root_path=root_path, consume_errors=False) as (
node_client,
_,
):
assert node_client is not None # this cant be None, because we don't catch errors
num_blocks = len(await node_client.get_all_blocks())
return num_blocks
async def async_config_wizard(
root_path: Path,
fingerprint: Optional[int],
farming_address: Optional[str],
plot_directory: Optional[str],
mnemonic_string: Optional[str],
auto_farm: Optional[bool],
docker_mode: bool,
bitfield: bool,
) -> None:
# either return passed through fingerprint or get one
fingerprint = select_fingerprint(fingerprint, mnemonic_string, docker_mode)
if fingerprint is None:
# user cancelled wizard
return
# create chia directory & get config.
print("Creating chia directory & config...")
config = create_chia_directory(root_path, fingerprint, farming_address, plot_directory, auto_farm, docker_mode)
# Pre-generate plots by running block_tools init functions.
print("Please Wait, Generating plots...")
print("This may take up to a minute if you are on a slow machine")
await generate_plots(config, root_path, fingerprint, bitfield)
# final messages
final_farming_address = config["simulator"]["farming_address"]
print(f"\nFarming & Prefarm reward address: {final_farming_address}\n")
print("Configuration Wizard Complete.")
print("Starting Simulator now...\n\n")
sys.argv[0] = str(Path(sys.executable).parent / "chia") # fix path for tests
await async_start(root_path, config, ("simulator",), False)
# now we make sure the simulator has a genesis block
print("Please wait, generating genesis block.")
while True:
try:
num_blocks: int = await get_current_height(root_path)
except ClientConnectorError:
await asyncio.sleep(0.25)
else:
if num_blocks == 0:
await farm_blocks(None, root_path, 1, True, final_farming_address)
print("Genesis block generated, exiting.")
else:
print("Genesis block already exists, exiting.")
break
print(f"\nMake sure your CHIA_ROOT Environment Variable is set to: {root_path}")
def print_coin_record(
name: str,
address_prefix: str,
coin_record: CoinRecord,
) -> None:
from datetime import datetime
coin_address = encode_puzzle_hash(coin_record.coin.puzzle_hash, address_prefix)
print(f"Coin 0x{coin_record.name.hex()}")
print(f"Wallet Address: {coin_address}")
print(f"Confirmed at block: {coin_record.confirmed_block_index}")
print(f"Spent: {f'at Block {coin_record.spent_block_index}' if coin_record.spent else 'No'}")
print(f"Coin Amount: {coin_record.coin.amount} {name}")
print(f"Parent Coin ID: 0x{coin_record.coin.parent_coin_info.hex()}")
print(f"Created at: {datetime.fromtimestamp(float(coin_record.timestamp)).strftime('%Y-%m-%d %H:%M:%S')}\n")
async def print_coin_records(
config: Dict[str, Any],
node_client: SimulatorFullNodeRpcClient,
include_reward_coins: bool,
include_spent: bool = False,
) -> None:
import sys
coin_records: List[CoinRecord] = await node_client.get_all_coins(include_spent)
coin_records = [coin_record for coin_record in coin_records if not coin_record.coinbase or include_reward_coins]
address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
name = "mojo"
paginate = False # I might change this later.
if len(coin_records) != 0:
print("All Coins: ")
if paginate is True:
paginate = sys.stdout.isatty()
num_per_screen = 5 if paginate else len(coin_records)
# ripped from cmds/wallet_funcs.
for i in range(0, len(coin_records), num_per_screen):
for j in range(0, num_per_screen):
if i + j >= len(coin_records):
break
print_coin_record(
coin_record=coin_records[i + j],
name=name,
address_prefix=address_prefix,
)
if i + num_per_screen <= len(coin_records) and paginate:
print("Press q to quit, or c to continue")
while True:
entered_key = sys.stdin.read(1)
if entered_key == "q":
return None
elif entered_key == "c":
break
async def print_wallets(config: Dict[str, Any], node_client: SimulatorFullNodeRpcClient) -> None:
ph_and_amount = await node_client.get_all_puzzle_hashes()
address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
name = "mojo"
for puzzle_hash, (amount, num_tx) in ph_and_amount.items():
address = encode_puzzle_hash(puzzle_hash, address_prefix)
print(f"Address: {address} has a balance of: {amount} {name}, with a total of: {num_tx} transactions.\n")
async def print_status(
rpc_port: Optional[int],
root_path: Path,
fingerprint: Optional[int],
show_key: bool,
show_coins: bool,
include_reward_coins: bool,
show_addresses: bool,
) -> None:
"""
This command allows users to easily get the status of the simulator
and information about the state of and the coins in the simulated blockchain.
"""
from chia.cmds.show_funcs import print_blockchain_state
from chia.cmds.units import units
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, config):
if node_client is not None:
# Display keychain info
if show_key:
if fingerprint is None:
fingerprint = config["simulator"]["key_fingerprint"]
if fingerprint is not None:
display_key_info(
fingerprint, config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
)
else:
print(
"No fingerprint in config, either rerun 'cdv sim create' "
"or use --fingerprint to specify one, skipping key information."
)
# chain status ( basically chia show -s)
await print_blockchain_state(node_client, config)
print("")
# farming information
target_ph: bytes32 = await node_client.get_farming_ph()
farming_coin_records = await node_client.get_coin_records_by_puzzle_hash(target_ph, False)
prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
print(
f"Current Farming address: {encode_puzzle_hash(target_ph, prefix)}, "
f"with a balance of: "
f"{sum(coin_records.coin.amount for coin_records in farming_coin_records) / units['chia']} TXCH."
)
if show_addresses:
print("All Addresses: ")
await print_wallets(config, node_client)
if show_coins:
await print_coin_records(config, node_client, include_reward_coins)
async def revert_block_height(
rpc_port: Optional[int],
root_path: Path,
num_blocks: int,
num_new_blocks: int,
reset_chain_to_genesis: bool,
use_revert_blocks: bool,
) -> None:
"""
This function allows users to easily revert the chain to a previous state or perform a reorg.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, _):
if node_client is not None:
if use_revert_blocks:
if num_new_blocks != 1:
print(f"Ignoring num_new_blocks: {num_new_blocks}, because we are not performing a reorg.")
# in this case num_blocks is the number of blocks to delete
new_height: int = await node_client.revert_blocks(num_blocks, reset_chain_to_genesis)
print(
f"All transactions in Block: {new_height + num_blocks} and above were successfully deleted, "
"you should now delete & restart all wallets."
)
else:
# However, in this case num_blocks is the fork height.
new_height = await node_client.reorg_blocks(num_blocks, num_new_blocks, use_revert_blocks)
old_height = new_height - num_new_blocks
print(f"All transactions in Block: {old_height - num_blocks} and above were successfully reverted.")
print(f"Block Height is now: {new_height}")
async def farm_blocks(
rpc_port: Optional[int],
root_path: Path,
num_blocks: int,
transaction_blocks: bool,
target_address: str,
) -> None:
"""
This function is used to generate new blocks.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, config):
if node_client is not None:
if target_address == "":
target_address = config["simulator"]["farming_address"]
if target_address is None:
print(
"No target address in config, falling back to the temporary address currently in use. "
"You can use 'cdv sim create' or use --target-address to specify a different address."
)
target_ph: bytes32 = await node_client.get_farming_ph()
else:
target_ph = decode_puzzle_hash(target_address)
await node_client.farm_block(target_ph, num_blocks, transaction_blocks)
print(f"Farmed {num_blocks}{' Transaction' if transaction_blocks else ''} blocks")
block_height = (await node_client.get_blockchain_state())["peak"].height
print(f"Block Height is now: {block_height}")
async def set_auto_farm(rpc_port: Optional[int], root_path: Path, set_autofarm: bool) -> None:
"""
This function can be used to enable or disable Auto Farming.
"""
async with get_any_service_client(SimulatorFullNodeRpcClient, rpc_port, root_path) as (node_client, _):
if node_client is not None:
current = await node_client.get_auto_farming()
if current == set_autofarm:
print(f"Auto farming is already {'on' if set_autofarm else 'off'}")
return
result = await node_client.set_auto_farming(set_autofarm)
print(f"Auto farming is now {'on' if result else 'off'}")

View File

@ -1,191 +1,191 @@
from __future__ import annotations
import logging
import random
from typing import Dict, List, Optional, Set
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64, uint128
from chia.wallet.wallet_coin_record import WalletCoinRecord
async def select_coins(
spendable_amount: uint128,
max_coin_amount: uint64,
spendable_coins: List[WalletCoinRecord],
unconfirmed_removals: Dict[bytes32, Coin],
log: logging.Logger,
amount: uint128,
exclude: Optional[List[Coin]] = None,
min_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
) -> Set[Coin]:
"""
Returns a set of coins that can be used for generating a new transaction.
"""
if exclude is None:
exclude = []
if min_coin_amount is None:
min_coin_amount = uint64(0)
if excluded_coin_amounts is None:
excluded_coin_amounts = []
if amount > spendable_amount:
error_msg = (
f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: {spendable_amount}"
)
log.warning(error_msg)
raise ValueError(error_msg)
log.debug(f"About to select coins for amount {amount}")
max_num_coins = 500
sum_spendable_coins = 0
valid_spendable_coins: List[Coin] = []
for coin_record in spendable_coins: # remove all the unconfirmed coins, excluded coins and dust.
if coin_record.coin.name() in unconfirmed_removals:
continue
if coin_record.coin in exclude:
continue
if coin_record.coin.amount < min_coin_amount or coin_record.coin.amount > max_coin_amount:
continue
if coin_record.coin.amount in excluded_coin_amounts:
continue
valid_spendable_coins.append(coin_record.coin)
sum_spendable_coins += coin_record.coin.amount
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_spendable_coins < amount:
raise ValueError(
f"Transaction for {amount} is greater than spendable balance of {sum_spendable_coins}. "
"There may be other transactions pending or our minimum coin amount is too high."
)
if amount == 0 and sum_spendable_coins == 0:
raise ValueError(
"No coins available to spend, you can not create a coin with an amount of 0,"
" without already having coins."
)
# Sort the coins by amount
valid_spendable_coins.sort(reverse=True, key=lambda r: r.amount)
# check for exact 1 to 1 coin match.
exact_match_coin: Optional[Coin] = check_for_exact_match(valid_spendable_coins, uint64(amount))
if exact_match_coin:
log.debug(f"selected coin with an exact match: {exact_match_coin}")
return {exact_match_coin}
# Check for an exact match with all of the coins smaller than the amount.
# If we have more, smaller coins than the amount we run the next algorithm.
smaller_coin_sum = 0 # coins smaller than target.
smaller_coins: List[Coin] = []
for coin in valid_spendable_coins:
if coin.amount < amount:
smaller_coin_sum += coin.amount
smaller_coins.append(coin)
if smaller_coin_sum == amount and len(smaller_coins) < max_num_coins and amount != 0:
log.debug(f"Selected all smaller coins because they equate to an exact match of the target.: {smaller_coins}")
return set(smaller_coins)
elif smaller_coin_sum < amount:
smallest_coin: Optional[Coin] = select_smallest_coin_over_target(amount, valid_spendable_coins)
assert smallest_coin is not None # Since we know we have enough, there must be a larger coin
log.debug(f"Selected closest greater coin: {smallest_coin.name()}")
return {smallest_coin}
elif smaller_coin_sum > amount:
coin_set: Optional[Set[Coin]] = knapsack_coin_algorithm(smaller_coins, amount, max_coin_amount, max_num_coins)
log.debug(f"Selected coins from knapsack algorithm: {coin_set}")
if coin_set is None:
coin_set = sum_largest_coins(amount, smaller_coins)
if coin_set is None or len(coin_set) > max_num_coins:
greater_coin = select_smallest_coin_over_target(amount, valid_spendable_coins)
if greater_coin is None:
raise ValueError(
f"Transaction of {amount} mojo would use more than "
f"{max_num_coins} coins. Try sending a smaller amount"
)
coin_set = {greater_coin}
return coin_set
else:
# if smaller_coin_sum == amount and (len(smaller_coins) >= max_num_coins or amount == 0)
potential_large_coin: Optional[Coin] = select_smallest_coin_over_target(amount, valid_spendable_coins)
if potential_large_coin is None:
raise ValueError("Too many coins are required to make this transaction")
log.debug(f"Resorted to selecting smallest coin over target due to dust.: {potential_large_coin}")
return {potential_large_coin}
# These algorithms were based off of the algorithms in:
# https://murch.one/wp-content/uploads/2016/11/erhardt2016coinselection.pdf
# we use this to check if one of the coins exactly matches the target.
def check_for_exact_match(coin_list: List[Coin], target: uint64) -> Optional[Coin]:
for coin in coin_list:
if coin.amount == target:
return coin
return None
# amount of coins smaller than target, followed by a list of all valid spendable coins.
# Coins must be sorted in descending amount order.
def select_smallest_coin_over_target(target: uint128, sorted_coin_list: List[Coin]) -> Optional[Coin]:
if sorted_coin_list[0].amount < target:
return None
for coin in reversed(sorted_coin_list):
if coin.amount >= target:
return coin
assert False # Should never reach here
# we use this to find the set of coins which have total value closest to the target, but at least the target.
# IMPORTANT: The coins have to be sorted in descending order or else this function will not work.
def knapsack_coin_algorithm(
smaller_coins: List[Coin], target: uint128, max_coin_amount: int, max_num_coins: int, seed: bytes = b"knapsack seed"
) -> Optional[Set[Coin]]:
best_set_sum = max_coin_amount
best_set_of_coins: Optional[Set[Coin]] = None
ran: random.Random = random.Random()
ran.seed(seed)
for i in range(1000):
# reset these variables every loop.
selected_coins: Set[Coin] = set()
selected_coins_sum = 0
n_pass = 0
target_reached = False
while n_pass < 2 and not target_reached:
for coin in smaller_coins:
# run 2 passes where the first pass may select a coin 50% of the time.
# the second pass runs to finish the set if the first pass didn't finish the set.
# this makes each trial random and increases the chance of getting a perfect set.
if (n_pass == 0 and bool(ran.getrandbits(1))) or (n_pass == 1 and coin not in selected_coins):
if len(selected_coins) > max_num_coins:
break
selected_coins_sum += coin.amount
selected_coins.add(coin)
if selected_coins_sum == target:
return selected_coins
if selected_coins_sum > target:
target_reached = True
if selected_coins_sum < best_set_sum:
best_set_of_coins = selected_coins.copy()
best_set_sum = selected_coins_sum
selected_coins_sum -= coin.amount
selected_coins.remove(coin)
n_pass += 1
return best_set_of_coins
# Adds up the largest coins in the list, resulting in the minimum number of selected coins. A solution
# is guaranteed if and only if the sum(coins) >= target. Coins must be sorted in descending amount order.
def sum_largest_coins(target: uint128, sorted_coins: List[Coin]) -> Optional[Set[Coin]]:
total_value = 0
selected_coins: Set[Coin] = set()
for coin in sorted_coins:
total_value += coin.amount
selected_coins.add(coin)
if total_value >= target:
return selected_coins
return None
from __future__ import annotations
import logging
import random
from typing import Dict, List, Optional, Set
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64, uint128
from chia.wallet.wallet_coin_record import WalletCoinRecord
async def select_coins(
spendable_amount: uint128,
max_coin_amount: uint64,
spendable_coins: List[WalletCoinRecord],
unconfirmed_removals: Dict[bytes32, Coin],
log: logging.Logger,
amount: uint128,
exclude: Optional[List[Coin]] = None,
min_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
) -> Set[Coin]:
"""
Returns a set of coins that can be used for generating a new transaction.
"""
if exclude is None:
exclude = []
if min_coin_amount is None:
min_coin_amount = uint64(0)
if excluded_coin_amounts is None:
excluded_coin_amounts = []
if amount > spendable_amount:
error_msg = (
f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: {spendable_amount}"
)
log.warning(error_msg)
raise ValueError(error_msg)
log.debug(f"About to select coins for amount {amount}")
max_num_coins = 500
sum_spendable_coins = 0
valid_spendable_coins: List[Coin] = []
for coin_record in spendable_coins: # remove all the unconfirmed coins, excluded coins and dust.
if coin_record.coin.name() in unconfirmed_removals:
continue
if coin_record.coin in exclude:
continue
if coin_record.coin.amount < min_coin_amount or coin_record.coin.amount > max_coin_amount:
continue
if coin_record.coin.amount in excluded_coin_amounts:
continue
valid_spendable_coins.append(coin_record.coin)
sum_spendable_coins += coin_record.coin.amount
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_spendable_coins < amount:
raise ValueError(
f"Transaction for {amount} is greater than spendable balance of {sum_spendable_coins}. "
"There may be other transactions pending or our minimum coin amount is too high."
)
if amount == 0 and sum_spendable_coins == 0:
raise ValueError(
"No coins available to spend, you can not create a coin with an amount of 0,"
" without already having coins."
)
# Sort the coins by amount
valid_spendable_coins.sort(reverse=True, key=lambda r: r.amount)
# check for exact 1 to 1 coin match.
exact_match_coin: Optional[Coin] = check_for_exact_match(valid_spendable_coins, uint64(amount))
if exact_match_coin:
log.debug(f"selected coin with an exact match: {exact_match_coin}")
return {exact_match_coin}
# Check for an exact match with all of the coins smaller than the amount.
# If we have more, smaller coins than the amount we run the next algorithm.
smaller_coin_sum = 0 # coins smaller than target.
smaller_coins: List[Coin] = []
for coin in valid_spendable_coins:
if coin.amount < amount:
smaller_coin_sum += coin.amount
smaller_coins.append(coin)
if smaller_coin_sum == amount and len(smaller_coins) < max_num_coins and amount != 0:
log.debug(f"Selected all smaller coins because they equate to an exact match of the target.: {smaller_coins}")
return set(smaller_coins)
elif smaller_coin_sum < amount:
smallest_coin: Optional[Coin] = select_smallest_coin_over_target(amount, valid_spendable_coins)
assert smallest_coin is not None # Since we know we have enough, there must be a larger coin
log.debug(f"Selected closest greater coin: {smallest_coin.name()}")
return {smallest_coin}
elif smaller_coin_sum > amount:
coin_set: Optional[Set[Coin]] = knapsack_coin_algorithm(smaller_coins, amount, max_coin_amount, max_num_coins)
log.debug(f"Selected coins from knapsack algorithm: {coin_set}")
if coin_set is None:
coin_set = sum_largest_coins(amount, smaller_coins)
if coin_set is None or len(coin_set) > max_num_coins:
greater_coin = select_smallest_coin_over_target(amount, valid_spendable_coins)
if greater_coin is None:
raise ValueError(
f"Transaction of {amount} mojo would use more than "
f"{max_num_coins} coins. Try sending a smaller amount"
)
coin_set = {greater_coin}
return coin_set
else:
# if smaller_coin_sum == amount and (len(smaller_coins) >= max_num_coins or amount == 0)
potential_large_coin: Optional[Coin] = select_smallest_coin_over_target(amount, valid_spendable_coins)
if potential_large_coin is None:
raise ValueError("Too many coins are required to make this transaction")
log.debug(f"Resorted to selecting smallest coin over target due to dust.: {potential_large_coin}")
return {potential_large_coin}
# These algorithms were based off of the algorithms in:
# https://murch.one/wp-content/uploads/2016/11/erhardt2016coinselection.pdf
# we use this to check if one of the coins exactly matches the target.
def check_for_exact_match(coin_list: List[Coin], target: uint64) -> Optional[Coin]:
for coin in coin_list:
if coin.amount == target:
return coin
return None
# amount of coins smaller than target, followed by a list of all valid spendable coins.
# Coins must be sorted in descending amount order.
def select_smallest_coin_over_target(target: uint128, sorted_coin_list: List[Coin]) -> Optional[Coin]:
if sorted_coin_list[0].amount < target:
return None
for coin in reversed(sorted_coin_list):
if coin.amount >= target:
return coin
assert False # Should never reach here
# we use this to find the set of coins which have total value closest to the target, but at least the target.
# IMPORTANT: The coins have to be sorted in descending order or else this function will not work.
def knapsack_coin_algorithm(
smaller_coins: List[Coin], target: uint128, max_coin_amount: int, max_num_coins: int, seed: bytes = b"knapsack seed"
) -> Optional[Set[Coin]]:
best_set_sum = max_coin_amount
best_set_of_coins: Optional[Set[Coin]] = None
ran: random.Random = random.Random()
ran.seed(seed)
for i in range(1000):
# reset these variables every loop.
selected_coins: Set[Coin] = set()
selected_coins_sum = 0
n_pass = 0
target_reached = False
while n_pass < 2 and not target_reached:
for coin in smaller_coins:
# run 2 passes where the first pass may select a coin 50% of the time.
# the second pass runs to finish the set if the first pass didn't finish the set.
# this makes each trial random and increases the chance of getting a perfect set.
if (n_pass == 0 and bool(ran.getrandbits(1))) or (n_pass == 1 and coin not in selected_coins):
if len(selected_coins) > max_num_coins:
break
selected_coins_sum += coin.amount
selected_coins.add(coin)
if selected_coins_sum == target:
return selected_coins
if selected_coins_sum > target:
target_reached = True
if selected_coins_sum < best_set_sum:
best_set_of_coins = selected_coins.copy()
best_set_sum = selected_coins_sum
selected_coins_sum -= coin.amount
selected_coins.remove(coin)
n_pass += 1
return best_set_of_coins
# Adds up the largest coins in the list, resulting in the minimum number of selected coins. A solution
# is guaranteed if and only if the sum(coins) >= target. Coins must be sorted in descending amount order.
def sum_largest_coins(target: uint128, sorted_coins: List[Coin]) -> Optional[Set[Coin]]:
total_value = 0
selected_coins: Set[Coin] = set()
for coin in sorted_coins:
total_value += coin.amount
selected_coins.add(coin)
if total_value >= target:
return selected_coins
return None