rest (working on it) [skip ci]

This commit is contained in:
Matt Hauff 2023-06-30 15:54:42 -07:00
parent c94e9118fe
commit 6e4aa410fe
No known key found for this signature in database
35 changed files with 3005 additions and 632 deletions

@ -1 +1 @@
Subproject commit a1154d5d3ce00abecd2f9e0b8d079df145b52843
Subproject commit 018f0b08f6890d92905053680d9824ef31a0feb9

View File

@ -1620,3 +1620,52 @@ def _revoke_vc(
"reuse_puzhash": reuse_puzhash,
}
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, revoke_vc))
@vcs_cmd.command("approve_r_cats", help="Claim any R-CATs that are currently pending VC approval")
@click.option(
"-wp",
"--wallet-rpc-port",
help="Set the port where the Wallet is hosting the RPC interface. See the rpc_port under wallet in config.yaml",
type=int,
default=None,
)
@click.option("-f", "--fingerprint", help="Set the fingerprint to specify which key to use", type=int)
@click.option("-i", "--id", help="Id of the wallet with the pending approval balance", type=int, required=True)
@click.option(
"-a", "--min-amount-to-claim", help="The minimum amount to approve to move into the wallet", type=str, required=True
)
@click.option(
"-m", "--fee", type=str, default=0, show_default=True, help="Blockchain fee for approval transaction, in XCH"
)
@click.option("-ma", "--min-coin-amount", type=int, help="The minimum coin amount to select")
@click.option("-l", "--max-coin-amount", type=int, help="The maximum coin amount to select")
@click.option(
"--reuse",
help="Reuse existing address for the change.",
is_flag=True,
default=False,
)
def approve_r_cats_cmd(
wallet_rpc_port: Optional[int],
fingerprint: int,
id: int,
min_amount_to_claim: str,
fee: str,
min_coin_amount: Optional[int],
max_coin_amount: Optional[int],
reuse: bool,
) -> None: # pragma: no cover
extra_params = {
"id": id,
"min_amount_to_claim": min_amount_to_claim,
"fee": fee,
"min_coin_amount": min_coin_amount,
"max_coin_amount": max_coin_amount,
"reuse": reuse,
}
import asyncio
from .wallet_funcs import approve_r_cats
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, approve_r_cats))

View File

@ -83,16 +83,16 @@ def print_transaction(
print("")
def get_mojo_per_unit(wallet_type: WalletType) -> int:
def get_mojo_per_unit(wallet_type: WalletType) -> int: # pragma: no cover
mojo_per_unit: int
if wallet_type in {
WalletType.STANDARD_WALLET,
WalletType.POOLING_WALLET,
WalletType.DATA_LAYER,
WalletType.VC,
}: # pragma: no cover
}:
mojo_per_unit = units["chia"]
elif wallet_type == WalletType.CAT:
elif wallet_type in {WalletType.CAT, WalletType.CRCAT}:
mojo_per_unit = units["cat"]
else:
raise LookupError(f"Operation is not supported for Wallet type {wallet_type.name}")
@ -116,7 +116,7 @@ async def get_unit_name_for_wallet_id(
wallet_type: WalletType,
wallet_id: int,
wallet_client: WalletRpcClient,
):
): # pragma: no cover
if wallet_type in {
WalletType.STANDARD_WALLET,
WalletType.POOLING_WALLET,
@ -124,7 +124,7 @@ async def get_unit_name_for_wallet_id(
WalletType.VC,
}: # pragma: no cover
name = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"].upper()
elif wallet_type == WalletType.CAT:
elif wallet_type in {WalletType.CAT, WalletType.CRCAT}:
name = await wallet_client.get_cat_name(wallet_id=wallet_id)
else:
raise LookupError(f"Operation is not supported for Wallet type {wallet_type.name}")
@ -297,7 +297,7 @@ async def send(args: dict, wallet_client: WalletRpcClient, fingerprint: int) ->
if clawback_time_lock > 0
else None,
)
elif typ == WalletType.CAT:
elif typ in {WalletType.CAT, WalletType.CRCAT}:
print("Submitting transaction...")
res = await wallet_client.cat_spend(
wallet_id,
@ -772,8 +772,8 @@ async def cancel_offer(args: dict, wallet_client: WalletRpcClient, fingerprint:
print(f"Use chia wallet get_offers --id {trade_record.trade_id} -f {fingerprint} to view cancel status")
def wallet_coin_unit(typ: WalletType, address_prefix: str) -> Tuple[str, int]:
if typ == WalletType.CAT:
def wallet_coin_unit(typ: WalletType, address_prefix: str) -> Tuple[str, int]: # pragma: no cover
if typ in {WalletType.CAT, WalletType.CRCAT}:
return "", units["cat"]
if typ in [WalletType.STANDARD_WALLET, WalletType.POOLING_WALLET, WalletType.MULTI_SIG]:
return address_prefix, units["chia"]
@ -791,7 +791,7 @@ def print_balance(amount: int, scale: int, address_prefix: str, *, decimal_only:
return ret
async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint: int) -> None:
async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint: int) -> None: # pragma: no cover
wallet_type: Optional[WalletType] = None
if "type" in args:
wallet_type = WalletType(args["type"])
@ -831,24 +831,32 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint
)
spendable_balance: str = print_balance(balances["spendable_balance"], scale, address_prefix)
my_did: Optional[str] = None
ljust = 23
if typ == WalletType.CRCAT:
ljust = 36
print()
print(f"{summary['name']}:")
print(f"{indent}{'-Total Balance:'.ljust(23)} {total_balance}")
print(f"{indent}{'-Pending Total Balance:'.ljust(23)} {unconfirmed_wallet_balance}")
print(f"{indent}{'-Spendable:'.ljust(23)} {spendable_balance}")
print(f"{indent}{'-Type:'.ljust(23)} {typ.name}")
print(f"{indent}{'-Total Balance:'.ljust(ljust)} {total_balance}")
if typ == WalletType.CRCAT:
print(
f"{indent}{'-Balance Pending VC Approval:'.ljust(ljust)} "
f"{print_balance(balances['pending_approval_balance'], scale, address_prefix)}"
)
print(f"{indent}{'-Pending Total Balance:'.ljust(ljust)} {unconfirmed_wallet_balance}")
print(f"{indent}{'-Spendable:'.ljust(ljust)} {spendable_balance}")
print(f"{indent}{'-Type:'.ljust(ljust)} {typ.name}")
if typ == WalletType.DECENTRALIZED_ID:
get_did_response = await wallet_client.get_did_id(wallet_id)
my_did = get_did_response["my_did"]
print(f"{indent}{'-DID ID:'.ljust(23)} {my_did}")
print(f"{indent}{'-DID ID:'.ljust(ljust)} {my_did}")
elif typ == WalletType.NFT:
get_did_response = await wallet_client.get_nft_wallet_did(wallet_id)
my_did = get_did_response["did_id"]
if my_did is not None and len(my_did) > 0:
print(f"{indent}{'-DID ID:'.ljust(23)} {my_did}")
print(f"{indent}{'-DID ID:'.ljust(ljust)} {my_did}")
elif len(asset_id) > 0:
print(f"{indent}{'-Asset ID:'.ljust(23)} {asset_id}")
print(f"{indent}{'-Wallet ID:'.ljust(23)} {wallet_id}")
print(f"{indent}{'-Asset ID:'.ljust(ljust)} {asset_id}")
print(f"{indent}{'-Wallet ID:'.ljust(ljust)} {wallet_id}")
print(" ")
trusted_peers: Dict = config["wallet"].get("trusted_peers", {})
@ -1408,3 +1416,40 @@ async def revoke_vc(args: Dict, wallet_client: WalletRpcClient, fingerprint: int
address_prefix=selected_network_address_prefix(config),
mojo_per_unit=get_mojo_per_unit(wallet_type=WalletType.STANDARD_WALLET),
)
async def approve_r_cats(args: Dict, wallet_client: WalletRpcClient, fingerprint: int) -> None: # pragma: no cover
config = load_config(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME)
txs = await wallet_client.crcat_approve_pending(
wallet_id=uint32(args["id"]),
min_amount_to_claim=uint64(int(Decimal(args["min_amount_to_claim"]) * units["cat"])),
fee=uint64(int(Decimal(args["fee"]) * units["chia"])),
min_coin_amount=uint64(args["min_coin_amount"]) if args["min_coin_amount"] is not None else None,
max_coin_amount=uint64(args["max_coin_amount"]) if args["max_coin_amount"] is not None else None,
reuse_puzhash=args["reuse"],
)
print("VC successfully approved R-CATs!")
print("Relevant TX records:")
print("")
for tx in txs:
try:
wallet_type = await get_wallet_type(wallet_id=tx.wallet_id, wallet_client=wallet_client)
mojo_per_unit = get_mojo_per_unit(wallet_type=wallet_type)
name = await get_unit_name_for_wallet_id(
config=config,
wallet_type=wallet_type,
wallet_id=tx.wallet_id,
wallet_client=wallet_client,
)
except LookupError as e:
print(e.args[0])
return
print_transaction(
tx,
verbose=False,
name=name,
address_prefix=selected_network_address_prefix(config),
mojo_per_unit=mojo_per_unit,
)

View File

@ -78,6 +78,7 @@ from chia.wallet.util.query_filter import HashFilter, TransactionTypeFilter
from chia.wallet.util.transaction_type import CLAWBACK_TRANSACTION_TYPES, TransactionType
from chia.wallet.util.wallet_sync_utils import fetch_coin_spend_for_coin_state
from chia.wallet.util.wallet_types import CoinType, WalletType
from chia.wallet.vc_wallet.cr_cat_wallet import CRCATWallet
from chia.wallet.vc_wallet.vc_store import VCProofs
from chia.wallet.vc_wallet.vc_wallet import VCWallet
from chia.wallet.wallet import CHIP_0002_SIGN_MESSAGE_PREFIX, Wallet
@ -236,6 +237,8 @@ class WalletRpcApi:
"/vc_add_proofs": self.vc_add_proofs,
"/vc_get_proofs_for_root": self.vc_get_proofs_for_root,
"/vc_revoke": self.vc_revoke,
# CR-CATs
"/crcat_approve_pending": self.crcat_approve_pending,
}
def get_connections(self, request_node_type: Optional[NodeType]) -> List[Dict[str, Any]]:
@ -814,9 +817,13 @@ class WalletRpcApi:
wallet_balance["wallet_type"] = wallet.type()
if self.service.logged_in_fingerprint is not None:
wallet_balance["fingerprint"] = self.service.logged_in_fingerprint
if wallet.type() == WalletType.CAT:
if wallet.type() in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, CATWallet)
wallet_balance["asset_id"] = wallet.get_asset_id()
if wallet.type() == WalletType.CRCAT:
assert isinstance(wallet, CRCATWallet)
wallet_balance["pending_approval_balance"] = await wallet.get_pending_approval_balance()
return wallet_balance
async def get_wallet_balance(self, request: Dict) -> EndpointResult:
@ -909,7 +916,7 @@ class WalletRpcApi:
record: Optional[WalletCoinRecord] = await self.service.wallet_state_manager.coin_store.get_coin_record(
coin.name()
)
assert record is not None, f"Cannot find coin record for clawback transaction {tx['name']}"
assert record is not None, f"Cannot find coin record for type {tx['type']} transaction {tx['name']}"
tx["metadata"] = record.parsed_metadata().to_json_dict()
tx["metadata"]["coin_id"] = coin.name().hex()
tx["metadata"]["spent"] = record.spent
@ -955,7 +962,7 @@ class WalletRpcApi:
assert isinstance(wallet, Wallet)
raw_puzzle_hash = await wallet.get_puzzle_hash(create_new)
address = encode_puzzle_hash(raw_puzzle_hash, prefix)
elif wallet.type() == WalletType.CAT:
elif wallet.type() in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, CATWallet)
raw_puzzle_hash = await wallet.standard_wallet.get_puzzle_hash(create_new)
address = encode_puzzle_hash(raw_puzzle_hash, prefix)
@ -1036,13 +1043,13 @@ class WalletRpcApi:
wallet = self.service.wallet_state_manager.wallets[wallet_id]
async with self.service.wallet_state_manager.lock:
if wallet.type() == WalletType.CAT:
if wallet.type() in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, CATWallet)
transaction: Dict = (await self.cat_spend(request, hold_lock=False))["transaction"]
else:
transaction = (await self.create_signed_transaction(request, hold_lock=False))["signed_tx"]
tr = TransactionRecord.from_json_dict_convenience(transaction)
if wallet.type() != WalletType.CAT:
if wallet.type() not in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, Wallet)
await wallet.push_transaction(tr)
@ -1076,7 +1083,9 @@ class WalletRpcApi:
tx_id_list: List[bytes] = []
for coin_id, coin_record in coin_records.coin_id_to_record.items():
try:
coins[coin_record.coin] = coin_record.parsed_metadata()
metadata = coin_record.parsed_metadata()
assert isinstance(metadata, ClawbackMetadata)
coins[coin_record.coin] = metadata
if len(coins) >= batch_size:
tx_id_list.extend((await self.service.wallet_state_manager.spend_clawback_coins(coins, tx_fee)))
coins = {}
@ -1161,7 +1170,7 @@ class WalletRpcApi:
wallet = state_mgr.wallets[wallet_id]
async with state_mgr.lock:
all_coin_records = await state_mgr.coin_store.get_unspent_coins_for_wallet(wallet_id)
if wallet.type() == WalletType.CAT:
if wallet.type() in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, CATWallet)
spendable_coins: List[WalletCoinRecord] = await wallet.get_cat_spendable_coins(all_coin_records)
else:
@ -3012,7 +3021,7 @@ class WalletRpcApi:
wallet = self.service.wallet_state_manager.main_wallet
assert isinstance(
wallet, (Wallet, CATWallet)
wallet, (Wallet, CATWallet, CRCATWallet)
), "create_signed_transaction only works for standard and CAT wallets"
if "additions" not in request or len(request["additions"]) < 1:
@ -3607,3 +3616,41 @@ class WalletRpcApi:
return {
"transactions": [tx.to_json_dict_convenience(self.service.config) for tx in txs],
}
async def crcat_approve_pending(self, request) -> Dict:
"""
Moving any "pending approval" CR-CATs into the spendable balance of the wallet
:param request: Required 'wallet_id'. Optional 'min_amount_to_claim' (deafult: full balance).
Standard transaction params 'fee' & 'reuse_puzhash'.
:return: all relevant 'transactions'
"""
@streamable
@dataclasses.dataclass(frozen=True)
class CRCATApprovePending(Streamable):
wallet_id: uint32
min_amount_to_claim: uint64
fee: uint64 = uint64(0)
min_coin_amount: Optional[uint64] = None
max_coin_amount: Optional[uint64] = None
excluded_coin_amounts: Optional[List[uint64]] = None
reuse_puzhash: Optional[bool] = None
parsed_request = CRCATApprovePending.from_json_dict(request)
cr_cat_wallet = self.service.wallet_state_manager.wallets[parsed_request.wallet_id]
assert isinstance(cr_cat_wallet, CRCATWallet)
txs = await cr_cat_wallet.claim_pending_approval_balance(
parsed_request.min_amount_to_claim,
fee=parsed_request.fee,
min_coin_amount=parsed_request.min_coin_amount,
max_coin_amount=parsed_request.max_coin_amount,
excluded_coin_amounts=parsed_request.excluded_coin_amounts,
reuse_puzhash=parsed_request.reuse_puzhash,
)
for tx in txs:
await self.service.wallet_state_manager.add_pending_transaction(tx)
return {
"transactions": [tx.to_json_dict_convenience(self.service.config) for tx in txs],
}

View File

@ -1263,3 +1263,27 @@ class WalletRpcClient(RpcClient):
"vc_revoke", {"vc_parent_id": vc_parent_id.hex(), "fee": fee, "reuse_puzhash": reuse_puzhash}
)
return [TransactionRecord.from_json_dict_convenience(tx) for tx in response["transactions"]]
async def crcat_approve_pending(
self,
wallet_id: uint32,
min_amount_to_claim: uint64,
fee: uint64 = uint64(0),
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
exclude_coin_amounts: Optional[List[uint64]] = None,
reuse_puzhash: Optional[bool] = None,
) -> List[TransactionRecord]:
response = await self.fetch(
"crcat_approve_pending",
{
"wallet_id": wallet_id,
"min_amount_to_claim": min_amount_to_claim,
"fee": fee,
"min_coin_amount": min_coin_amount,
"max_coin_amount": max_coin_amount,
"exclude_coin_amounts": exclude_coin_amounts,
"reuse_puzhash": reuse_puzhash,
},
)
return [TransactionRecord.from_json_dict_convenience(tx) for tx in response["transactions"]]

View File

@ -7,6 +7,7 @@ from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.streamable import Streamable, streamable
from chia.wallet.lineage_proof import LineageProof
from chia.wallet.vc_wallet.cr_cat_drivers import ProofsChecker
@streamable
@ -24,3 +25,12 @@ class LegacyCATInfo(Streamable):
limitations_program_hash: bytes32
my_tail: Optional[Program] # this is the program
lineage_proofs: List[Tuple[bytes32, Optional[LineageProof]]] # {coin.name(): lineage_proof}
@streamable
@dataclass(frozen=True)
class CRCATInfo(Streamable):
limitations_program_hash: bytes32
my_tail: Optional[Program] # this is the program
authorized_providers: List[bytes32]
proofs_checker: ProofsChecker

View File

@ -238,7 +238,19 @@ class CATWallet:
wallet: Wallet,
puzzle_driver: PuzzleInfo,
name: Optional[str] = None,
) -> CATWallet:
# We're hinting this as Any for mypy by should explore adding this to the wallet protocol and hinting properly
potential_subclasses: Dict[AssetType, Any] = {},
) -> Any:
next_layer: Optional[PuzzleInfo] = puzzle_driver.also()
if next_layer is not None:
if AssetType(next_layer.type()) in potential_subclasses:
return await potential_subclasses[AssetType(next_layer.type())].create_from_puzzle_info(
wallet_state_manager,
wallet,
puzzle_driver,
name,
potential_subclasses,
)
return await cls.get_or_create_wallet_for_cat(
wallet_state_manager,
wallet,
@ -565,7 +577,7 @@ class CATWallet:
self,
fee: uint64,
amount_to_claim: uint64,
announcement_to_assert: Optional[Announcement] = None,
announcements_to_assert: Optional[Set[Announcement]] = None,
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
@ -600,7 +612,25 @@ class CATWallet:
coins=chia_coins,
origin_id=origin_id, # We specify this so that we know the coin that is making the announcement
negative_change_allowed=False,
coin_announcements_to_consume={announcement_to_assert} if announcement_to_assert is not None else None,
coin_announcements_to_consume=announcements_to_assert if announcements_to_assert is not None else None,
reuse_puzhash=reuse_puzhash,
)
assert chia_tx.spend_bundle is not None
else:
chia_coins = await self.standard_wallet.select_coins(
fee,
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
)
origin_id = list(chia_coins)[0].name()
selected_amount = sum([c.amount for c in chia_coins])
chia_tx = await self.standard_wallet.generate_signed_transaction(
uint64(selected_amount + amount_to_claim - fee),
(await self.standard_wallet.get_puzzle_hash(not reuse_puzhash)),
coins=chia_coins,
negative_change_allowed=True,
coin_announcements_to_consume=announcements_to_assert if announcements_to_assert is not None else None,
reuse_puzhash=reuse_puzhash,
)
assert chia_tx.spend_bundle is not None
@ -615,23 +645,6 @@ class CATWallet:
assert message is not None
announcement = Announcement(origin_id, message)
else:
chia_coins = await self.standard_wallet.select_coins(
fee,
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
)
selected_amount = sum([c.amount for c in chia_coins])
chia_tx = await self.standard_wallet.generate_signed_transaction(
uint64(selected_amount + amount_to_claim - fee),
(await self.standard_wallet.get_puzzle_hash(not reuse_puzhash)),
coins=chia_coins,
negative_change_allowed=True,
coin_announcements_to_consume={announcement_to_assert} if announcement_to_assert is not None else None,
reuse_puzhash=reuse_puzhash,
)
assert chia_tx.spend_bundle is not None
return chia_tx, announcement
@ -733,7 +746,7 @@ class CATWallet:
chia_tx, _ = await self.create_tandem_xch_tx(
fee,
uint64(regular_chia_to_claim),
announcement_to_assert=announcement,
announcements_to_assert={announcement},
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,

View File

@ -13,6 +13,7 @@ from chia.wallet.nft_wallet.singleton_outer_puzzle import SingletonOuterPuzzle
from chia.wallet.nft_wallet.transfer_program_puzzle import TransferProgramPuzzle
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.uncurried_puzzle import UncurriedPuzzle
from chia.wallet.vc_wallet.cr_outer_puzzle import CROuterPuzzle
"""
This file provides a central location for acquiring drivers for outer puzzles like CATs, NFTs, etc.
@ -40,6 +41,7 @@ class AssetType(Enum):
METADATA = "metadata"
OWNERSHIP = "ownership"
ROYALTY_TRANSFER_PROGRAM = "royalty transfer program"
CR = "credential restricted"
def match_puzzle(puzzle: UncurriedPuzzle) -> Optional[PuzzleInfo]:
@ -78,4 +80,5 @@ driver_lookup: Dict[AssetType, DriverProtocol] = {
AssetType.METADATA: MetadataOuterPuzzle(*function_args),
AssetType.OWNERSHIP: OwnershipOuterPuzzle(*function_args),
AssetType.ROYALTY_TRANSFER_PROGRAM: TransferProgramPuzzle(*function_args),
AssetType.CR: CROuterPuzzle(*function_args),
}

View File

@ -4,6 +4,7 @@
"calculate_synthetic_public_key": "624c5d5704d0decadfc0503e71bbffb6cdfe45025bce7cf3e6864d1eafe8f65e",
"cat_v2": "37bef360ee858133b69d595a906dc45d01af50379dad515eb9518abb7c1d2a7a",
"chialisp_deserialisation": "94ec19077f9a34e0b11ad2456af0170f4cc03f11230ca42e3f82e6e644ac4f5d",
"conditions_w_fee_announce": "1a169582dc619f2542f8eb79f02823e1595ba0aca53820f503eda5ff20b47856",
"covenant_layer": "b982796850336aabf9ab17c3f21e299f0c633444117ab5e9ebeafadf1860d9fc",
"create_nft_launcher_from_did": "7a32d2d9571d3436791c0ad3d7fcfdb9c43ace2b0f0ff13f98d29f0cc093f445",
"credential_restriction": "2fdfc1f058cfd65e7ec4e253bfeb394da163ecd0036f508df8629b0a2b8fde96",

View File

@ -28,9 +28,13 @@ from chia.wallet.trading.offer import OFFER_MOD_OLD_HASH, NotarizedPayment, Offe
from chia.wallet.trading.trade_status import TradeStatus
from chia.wallet.trading.trade_store import TradeStore
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.uncurried_puzzle import uncurry_puzzle
from chia.wallet.util.compute_hints import compute_spend_hints_and_additions
from chia.wallet.util.query_filter import HashFilter
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import ProofsChecker, construct_pending_approval_state
from chia.wallet.vc_wallet.vc_wallet import VCWallet
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_coin_record import WalletCoinRecord
@ -413,6 +417,7 @@ class TradeManager:
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
reuse_puzhash: Optional[bool] = None,
taking: bool = False,
) -> Union[Tuple[Literal[True], TradeRecord, None], Tuple[Literal[False], None, str]]:
if driver_dict is None:
driver_dict = {}
@ -426,6 +431,7 @@ class TradeManager:
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
reuse_puzhash=reuse_puzhash,
taking=taking,
)
if not result[0] or result[1] is None:
raise Exception(f"Error creating offer: {result[2]}")
@ -462,6 +468,7 @@ class TradeManager:
max_coin_amount: Optional[uint64] = None,
old: bool = False,
reuse_puzhash: Optional[bool] = None,
taking: bool = False,
) -> Union[Tuple[Literal[True], Offer, None], Tuple[Literal[False], None, str]]:
"""
Offer is dictionary of wallet ids and amount
@ -553,6 +560,10 @@ class TradeManager:
else:
raise ValueError(f"Wallet for asset id {asset_id} is not properly integrated with TradeManager")
requested_payments = await self.check_for_requested_payment_modifications(
requested_payments, driver_dict, taking
)
potential_special_offer: Optional[Offer] = await self.check_for_special_offer_making(
offer_dict_no_ints, driver_dict, solver, fee, min_coin_amount, max_coin_amount, old
)
@ -610,6 +621,7 @@ class TradeManager:
coins=set(selected_coins),
puzzle_announcements_to_consume=announcements_to_assert,
reuse_puzhash=reuse_puzhash,
add_authorizations_to_cr_cats=False,
)
all_transactions.extend(txs)
@ -656,16 +668,22 @@ class TradeManager:
settlement_coins: List[Coin] = [c for coins in offer.get_offered_coins().values() for c in coins]
settlement_coin_ids: List[bytes32] = [c.name() for c in settlement_coins]
additions: List[Coin] = final_spend_bundle.not_ephemeral_additions()
hint_dict: Dict[bytes32, bytes32] = {}
additions_dict: Dict[bytes32, Coin] = {}
for hinted_coins in (compute_spend_hints_and_additions(spend) for spend in final_spend_bundle.coin_spends):
hint_dict.update({id: hc.hint for id, hc in hinted_coins.items() if hc.hint is not None})
additions_dict.update({id: hc.coin for id, hc in hinted_coins.items()})
removals: List[Coin] = final_spend_bundle.removals()
additions: List[Coin] = list(a for a in additions_dict.values() if a not in removals)
all_fees = uint64(final_spend_bundle.fees())
txs = []
addition_dict: Dict[uint32, List[Coin]] = {}
for addition in additions:
wallet_identifier = await self.wallet_state_manager.get_wallet_identifier_for_puzzle_hash(
addition.puzzle_hash
wallet_identifier = await self.wallet_state_manager.get_wallet_identifier_for_coin(
addition,
hint_dict,
)
if wallet_identifier is not None:
if addition.parent_coin_info in settlement_coin_ids:
@ -688,7 +706,7 @@ class TradeManager:
trade_id=offer.name(),
type=uint32(TransactionType.INCOMING_TRADE.value),
name=std_hash(final_spend_bundle.name() + addition.name()),
memos=[],
memos=[(coin_id, [hint]) for coin_id, hint in hint_dict.items()],
)
)
else: # This is change
@ -698,8 +716,9 @@ class TradeManager:
# While we want additions to show up as separate records, removals of the same wallet should show as one
removal_dict: Dict[uint32, List[Coin]] = {}
for removal in removals:
wallet_identifier = await self.wallet_state_manager.get_wallet_identifier_for_puzzle_hash(
removal.puzzle_hash
wallet_identifier = await self.wallet_state_manager.get_wallet_identifier_for_coin(
removal,
hint_dict,
)
if wallet_identifier is not None:
removal_dict.setdefault(wallet_identifier.id, [])
@ -734,7 +753,7 @@ class TradeManager:
trade_id=offer.name(),
type=uint32(TransactionType.OUTGOING_TRADE.value),
name=std_hash(final_spend_bundle.name() + removal_tree_hash),
memos=[],
memos=[(coin_id, [hint]) for coin_id, hint in hint_dict.items()],
)
)
@ -783,16 +802,21 @@ class TradeManager:
max_coin_amount=max_coin_amount,
old=offer.old,
reuse_puzhash=reuse_puzhash,
taking=True,
)
if not result[0] or result[1] is None:
raise ValueError(result[2])
success, take_offer, error = result
complete_offer = await self.check_for_final_modifications(Offer.aggregate([offer, take_offer]), solver)
complete_offer, valid_spend_solver = await self.check_for_final_modifications(
Offer.aggregate([offer, take_offer]), solver, reuse_puzhash
)
self.log.info("COMPLETE OFFER: %s", complete_offer.to_bech32())
assert complete_offer.is_valid()
final_spend_bundle: SpendBundle = complete_offer.to_valid_spend()
final_spend_bundle: SpendBundle = complete_offer.to_valid_spend(
solver=Solver({**valid_spend_solver.info, **solver.info})
)
await self.maybe_create_wallets_for_offer(complete_offer)
tx_records: List[TransactionRecord] = await self.calculate_tx_records_for_offer(complete_offer, True)
@ -908,7 +932,9 @@ class TradeManager:
offered, requested, infos = offer.summary()
return {"offered": offered, "requested": requested, "fees": offer.fees(), "infos": infos}
async def check_for_final_modifications(self, offer: Offer, solver: Solver) -> Offer:
async def check_for_final_modifications(
self, offer: Offer, solver: Solver, reuse_puzhash: Optional[bool] = None
) -> Tuple[Offer, Solver]:
for puzzle_info in offer.driver_dict.values():
if (
puzzle_info.check_type(
@ -919,6 +945,66 @@ class TradeManager:
)
and puzzle_info.also()["updater_hash"] == ACS_MU_PH # type: ignore
):
return await DataLayerWallet.finish_graftroot_solutions(offer, solver)
return (await DataLayerWallet.finish_graftroot_solutions(offer, solver), Solver({}))
elif puzzle_info.check_type(
[
AssetType.CAT.value,
AssetType.CR.value,
]
):
# get VC wallet
for _, wallet in self.wallet_state_manager.wallets.items():
if WalletType(wallet.type()) == WalletType.VC:
assert isinstance(wallet, VCWallet)
return await wallet.add_vc_authorization(offer, solver, reuse_puzhash)
else:
raise ValueError("No VCs to approve CR-CATs with") # pragma: no cover
return offer
return offer, Solver({})
async def check_for_requested_payment_modifications(
self,
requested_payments: Dict[Optional[bytes32], List[Payment]],
driver_dict: Dict[bytes32, PuzzleInfo],
taking: bool,
) -> Dict[Optional[bytes32], List[Payment]]:
# This function exclusively deals with CR-CATs for now
if not taking:
for asset_id, puzzle_info in driver_dict.items():
if puzzle_info.check_type(
[
AssetType.CAT.value,
AssetType.CR.value,
]
):
vc = await (
await self.wallet_state_manager.get_or_create_vc_wallet()
).get_vc_with_provider_in_and_proofs(
puzzle_info["also"]["authorized_providers"],
ProofsChecker.from_program(uncurry_puzzle(puzzle_info["also"]["proofs_checker"])).flags,
)
if vc is None:
raise ValueError("Cannot request CR-CATs that you cannot approve with a VC") # pragma: no cover
return {
asset_id: [
dataclasses.replace(
payment,
puzzle_hash=construct_pending_approval_state(
payment.puzzle_hash, payment.amount
).get_tree_hash(),
)
for payment in payments
]
if asset_id is not None
and driver_dict[asset_id].check_type(
[
AssetType.CAT.value,
AssetType.CR.value,
]
)
else payments
for asset_id, payments in requested_payments.items()
}
else:
return requested_payments

View File

@ -443,7 +443,7 @@ class Offer:
# A "valid" spend means that this bundle can be pushed to the network and will succeed
# This differs from the `to_spend_bundle` method which deliberately creates an invalid SpendBundle
def to_valid_spend(self, arbitrage_ph: Optional[bytes32] = None) -> SpendBundle:
def to_valid_spend(self, arbitrage_ph: Optional[bytes32] = None, solver: Solver = Solver({})) -> SpendBundle:
if not self.is_valid():
raise ValueError("Offer is currently incomplete")
@ -521,6 +521,7 @@ class Offer:
"sibling_spends": sibling_spends,
"sibling_puzzles": sibling_puzzles,
"sibling_solutions": sibling_solutions,
**solver.info,
}
),
offer_mod,

View File

@ -124,3 +124,6 @@ class TransactionRecord(Streamable):
# we tried to push it to mempool and got a fee error so it's a temporary error
return True
return False
def hint_dict(self) -> Dict[bytes32, bytes32]:
return {coin_id: bytes32(memos[0]) for coin_id, memos in self.memos if len(memos) > 0 and len(memos[0]) == 32}

View File

@ -13,9 +13,11 @@ class TransactionType(IntEnum):
INCOMING_CLAWBACK_RECEIVE = 6
INCOMING_CLAWBACK_SEND = 7
OUTGOING_CLAWBACK = 8
INCOMING_CRCAT_PENDING = 9
CLAWBACK_TRANSACTION_TYPES = {
TransactionType.INCOMING_CLAWBACK_SEND.value,
TransactionType.INCOMING_CLAWBACK_RECEIVE.value,
TransactionType.INCOMING_CRCAT_PENDING.value,
}

View File

@ -26,11 +26,14 @@ class WalletType(IntEnum):
DATA_LAYER = 11
DATA_LAYER_OFFER = 12
VC = 13
CRCAT = 57
class CoinType(IntEnum):
NORMAL = 0
CLAWBACK = 1
CRCAT_PENDING = 2
CRCAT = 3
class RemarkDataType(IntEnum):

View File

@ -2,16 +2,19 @@ from __future__ import annotations
import functools
from dataclasses import dataclass, replace
from enum import IntEnum
from typing import Iterable, List, Optional, Tuple, Type, TypeVar
from clvm.casts import int_to_bytes
from chia.types.announcement import Announcement
from chia.types.blockchain_format.coin import Coin, coin_as_list
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.util.hash import std_hash
from chia.util.ints import uint64
from chia.util.ints import uint16, uint64
from chia.util.streamable import Streamable, streamable
from chia.wallet.cat_wallet.cat_utils import CAT_MOD, construct_cat_puzzle
from chia.wallet.lineage_proof import LineageProof
from chia.wallet.payment import Payment
@ -29,6 +32,7 @@ from chia.wallet.vc_wallet.vc_drivers import (
)
# Mods
CREDENTIAL_RESTRICTION: Program = load_clvm_maybe_recompile(
"credential_restriction.clsp",
package_or_requirement="chia.wallet.vc_wallet.cr_puzzles",
@ -40,6 +44,11 @@ PROOF_FLAGS_CHECKER: Program = load_clvm_maybe_recompile(
package_or_requirement="chia.wallet.vc_wallet.cr_puzzles",
include_standard_libraries=True,
)
PENDING_VC_ANNOUNCEMENT: Program = load_clvm_maybe_recompile(
"conditions_w_fee_announce.clsp",
package_or_requirement="chia.wallet.vc_wallet.cr_puzzles",
include_standard_libraries=True,
)
# Basic drivers
@ -98,12 +107,11 @@ def construct_cr_layer(
return first_curry.curry(first_curry.get_tree_hash(), inner_puzzle)
# Coverage coming with CR-CAT Wallet
def match_cr_layer(
uncurried_puzzle: UncurriedPuzzle,
) -> Optional[Tuple[List[bytes32], Program, Program]]: # pragma: no cover
if uncurried_puzzle.mod == CREDENTIAL_RESTRICTION:
extra_uncurried_puzzle = uncurry_puzzle(uncurried_puzzle.mod)
) -> Optional[Tuple[List[bytes32], Program, Program]]:
extra_uncurried_puzzle = uncurry_puzzle(uncurried_puzzle.mod)
if extra_uncurried_puzzle.mod == CREDENTIAL_RESTRICTION:
return (
[bytes32(provider.atom) for provider in extra_uncurried_puzzle.args.at("rf").as_iter()],
extra_uncurried_puzzle.args.at("rrf"),
@ -136,6 +144,11 @@ def solve_cr_layer(
return solution
# For the "pending approval" state
def construct_pending_approval_state(puzzle_hash: bytes32, amount: uint64) -> Program:
return PENDING_VC_ANNOUNCEMENT.curry(Program.to([[51, puzzle_hash, amount, [puzzle_hash]]]))
_T_CRCAT = TypeVar("_T_CRCAT", bound="CRCAT")
@ -265,14 +278,12 @@ class CRCAT:
"""
if puzzle_reveal.mod != CAT_MOD:
return False, "top most layer is not a CAT" # pragma: no cover
layer_below_cat: UncurriedPuzzle = uncurry_puzzle(puzzle_reveal.args.at("rrf"))
layer_below_cat: UncurriedPuzzle = uncurry_puzzle(uncurry_puzzle(puzzle_reveal.args.at("rrf")).mod)
if layer_below_cat.mod != CREDENTIAL_RESTRICTION:
return False, "CAT is not credential restricted" # pragma: no cover
# Coverage coming with CR-CAT Wallet
return True, "" # pragma: no cover
return True, ""
# Coverage coming with CR-CAT Wallet
@staticmethod
def get_inner_puzzle(puzzle_reveal: UncurriedPuzzle) -> Program: # pragma: no cover
return uncurry_puzzle(puzzle_reveal.args.at("rrf")).args.at("rf")
@ -292,7 +303,7 @@ class CRCAT:
spend.solution.to_program().at("rf"),
[bytes32(ap.atom) for ap in second_uncurried_cr_layer.args.at("rf").as_iter()],
second_uncurried_cr_layer.args.at("rrf"),
first_uncurried_cr_layer.args.at("f").get_tree_hash(),
first_uncurried_cr_layer.args.at("rf").get_tree_hash(),
)
@classmethod
@ -355,6 +366,10 @@ class CRCAT:
uint64(parent_spend.coin.amount),
)
all_conditions: List[Program] = list(conditions.as_iter())
if len(all_conditions) > 1000:
raise RuntimeError("More than 1000 conditions not currently supported by CRCAT drivers") # pragma: no cover
# Almost complete except the coin's full puzzle hash which we want to use the class method to calculate
partially_completed_crcats: List[CRCAT] = [
CRCAT(
@ -365,7 +380,7 @@ class CRCAT:
proofs_checker,
bytes32(condition.at("rf").atom) if new_inner_puzzle_hash is None else new_inner_puzzle_hash,
)
for condition in conditions.as_iter()
for condition in all_conditions
if condition.at("f").as_int() == 51 and condition.at("rrf") != Program.to(-113)
]
@ -395,13 +410,13 @@ class CRCAT:
proof_checker_solution: Program,
provider_id: bytes32,
vc_launcher_id: bytes32,
vc_inner_puzhash: bytes32,
vc_inner_puzhash: Optional[bytes32], # Optional for incomplete spends
# Inner puzzle and solution
inner_puzzle: Program,
inner_solution: Program,
# For optimization purposes the conditions may already have been run
conditions: Optional[Iterable[Program]] = None,
) -> Tuple[List[bytes32], CoinSpend, List["CRCAT"]]:
) -> Tuple[List[Announcement], CoinSpend, List["CRCAT"]]:
"""
Spend a CR-CAT.
@ -412,7 +427,7 @@ class CRCAT:
Likely, spend_many is more useful.
"""
# Gather the output information
announcement_ids: List[bytes32] = []
announcements: List[Announcement] = []
new_inner_puzzle_hashes_and_amounts: List[Tuple[bytes32, uint64]] = []
if conditions is None:
conditions = inner_puzzle.run(inner_solution).as_iter() # pragma: no cover
@ -421,13 +436,13 @@ class CRCAT:
if condition.at("f").as_int() == 51 and condition.at("rrf").as_int() != -113:
new_inner_puzzle_hash: bytes32 = bytes32(condition.at("rf").atom)
new_amount: uint64 = uint64(condition.at("rrf").as_int())
announcement_ids.append(
std_hash(self.coin.name() + b"\xcd" + std_hash(new_inner_puzzle_hash + int_to_bytes(new_amount)))
announcements.append(
Announcement(self.coin.name(), b"\xcd" + std_hash(new_inner_puzzle_hash + int_to_bytes(new_amount)))
)
new_inner_puzzle_hashes_and_amounts.append((new_inner_puzzle_hash, new_amount))
return (
announcement_ids,
announcements,
CoinSpend(
self.coin,
self.construct_puzzle(inner_puzzle),
@ -438,7 +453,7 @@ class CRCAT:
proof_checker_solution,
provider_id,
vc_launcher_id,
vc_inner_puzhash,
vc_inner_puzhash, # type: ignore
self.coin.name(),
inner_solution,
),
@ -479,14 +494,14 @@ class CRCAT:
@classmethod
def spend_many(
cls: Type[_T_CRCAT],
inner_spends: List[Tuple[_T_CRCAT, Program, Program]], # CRCAT, inner puzzle, inner solution
inner_spends: List[Tuple[_T_CRCAT, int, Program, Program]], # CRCAT, extra_delta, inner puzzle, inner solution
# CR layer solving info
proof_of_inclusions: Program,
proof_checker_solution: Program,
provider_id: bytes32,
vc_launcher_id: bytes32,
vc_inner_puzhash: bytes32,
) -> Tuple[List[bytes32], List[CoinSpend], List[CRCAT]]:
vc_inner_puzhash: Optional[bytes32], # Optional for incomplete spends
) -> Tuple[List[Announcement], List[CoinSpend], List[CRCAT]]:
"""
Spend a multiple CR-CATs.
@ -501,28 +516,29 @@ class CRCAT:
def prev_index(index: int) -> int:
return index - 1
sorted_inner_spends: List[Tuple[_T_CRCAT, Program, Program]] = sorted(
sorted_inner_spends: List[Tuple[_T_CRCAT, int, Program, Program]] = sorted(
inner_spends,
key=lambda spend: spend[0].coin.name(),
)
all_expected_announcements: List[bytes32] = []
all_expected_announcements: List[Announcement] = []
all_coin_spends: List[CoinSpend] = []
all_new_crcats: List[CRCAT] = []
subtotal: int = 0
for i, inner_spend in enumerate(sorted_inner_spends):
crcat, inner_puzzle, inner_solution = inner_spend
crcat, extra_delta, inner_puzzle, inner_solution = inner_spend
conditions: List[Program] = list(inner_puzzle.run(inner_solution).as_iter())
output_amount: uint64 = uint64(
output_amount: int = (
sum(
c.at("rrf").as_int()
for c in conditions
if c.at("f").as_int() == 51 and c.at("rrf").as_int() != -113
)
- extra_delta
)
next_crcat, _, _ = sorted_inner_spends[next_index(i)]
prev_crcat, _, _ = sorted_inner_spends[prev_index(i)]
next_crcat, _, _, _ = sorted_inner_spends[next_index(i)]
prev_crcat, _, _, _ = sorted_inner_spends[prev_index(i)]
expected_announcements, coin_spend, new_crcats = crcat.do_spend(
prev_crcat.coin.name(),
LineageProof(
@ -533,7 +549,7 @@ class CRCAT:
uint64(next_crcat.coin.amount),
),
subtotal,
0, # TODO: add support for mint/melt
extra_delta,
proof_of_inclusions,
proof_checker_solution,
provider_id,
@ -564,10 +580,10 @@ class CRCATSpend:
inner_puzzle: Program
inner_solution: Program
children: List[CRCAT]
provider_specified: bool
incomplete: bool
inner_conditions: List[Program]
proof_of_inclusions: Program
# Coverage coming with CR-CAT wallet
@classmethod
def from_coin_spend(cls, spend: CoinSpend) -> CRCATSpend: # pragma: no cover
inner_puzzle: Program = CRCAT.get_inner_puzzle(uncurry_puzzle(spend.puzzle_reveal.to_program()))
@ -580,11 +596,13 @@ class CRCATSpend:
CRCAT.get_next_from_coin_spend(spend, conditions=inner_conditions),
spend.solution.to_program().at("f").at("rrrrf") == Program.to(None),
list(inner_conditions.as_iter()),
spend.solution.to_program().at("f").at("f"),
)
@streamable
@dataclass(frozen=True)
class ProofsChecker:
class ProofsChecker(Streamable):
flags: List[str]
def as_program(self) -> Program:
@ -593,10 +611,28 @@ class ProofsChecker:
return PROOF_FLAGS_CHECKER.curry(
[
Program.to((flag, 1))
Program.to((flag, "1"))
for flag in sorted(
self.flags,
key=functools.cmp_to_key(byte_sort_flags),
)
]
)
@classmethod
def from_program(cls, uncurried_puzzle: UncurriedPuzzle) -> ProofsChecker:
if uncurried_puzzle.mod != PROOF_FLAGS_CHECKER:
raise ValueError("Puzzle was not a proof checker") # pragma: no cover
return cls([flag.at("f").atom.decode("utf8") for flag in uncurried_puzzle.args.at("f").as_iter()])
class CRCATVersion(IntEnum):
V1 = uint16(1)
@streamable
@dataclass(frozen=True)
class CRCATMetadata(Streamable):
lineage_proof: LineageProof
inner_puzzle_hash: bytes32

View File

@ -0,0 +1,927 @@
from __future__ import annotations
import dataclasses
import logging
import time
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from blspy import G1Element, G2Element
from typing_extensions import Unpack
from chia.server.ws_connection import WSChiaConnection
from chia.types.announcement import Announcement
from chia.types.blockchain_format.coin import Coin, coin_as_list
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.spend_bundle import SpendBundle
from chia.util.byte_types import hexstr_to_bytes
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.misc import VersionedBlob
from chia.wallet.cat_wallet.cat_info import CRCATInfo
from chia.wallet.cat_wallet.cat_utils import CAT_MOD, construct_cat_puzzle
from chia.wallet.cat_wallet.cat_wallet import CATWallet
from chia.wallet.coin_selection import select_coins
from chia.wallet.lineage_proof import LineageProof
from chia.wallet.outer_puzzles import AssetType
from chia.wallet.payment import Payment
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.trading.offer import Offer
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.uncurried_puzzle import uncurry_puzzle
from chia.wallet.util.compute_hints import compute_spend_hints_and_additions
from chia.wallet.util.compute_memos import compute_memos
from chia.wallet.util.query_filter import HashFilter
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.wallet_sync_utils import fetch_coin_spend_for_coin_state
from chia.wallet.util.wallet_types import CoinType, WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import (
CRCAT,
CRCATMetadata,
CRCATVersion,
ProofsChecker,
construct_cr_layer,
construct_pending_approval_state,
)
from chia.wallet.vc_wallet.vc_drivers import VerifiedCredential
from chia.wallet.vc_wallet.vc_wallet import VCWallet
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_coin_record import MetadataTypes, WalletCoinRecord
from chia.wallet.wallet_info import WalletInfo
from chia.wallet.wallet_protocol import GSTOptionalArgs, WalletProtocol
if TYPE_CHECKING:
from chia.wallet.wallet_state_manager import WalletStateManager
class CRCATWallet(CATWallet):
wallet_state_manager: WalletStateManager
log: logging.Logger
wallet_info: WalletInfo
info: CRCATInfo
standard_wallet: Wallet
cost_of_single_tx: int
@staticmethod
def default_wallet_name_for_unknown_cat(limitations_program_hash_hex: str) -> str:
return f"CAT {limitations_program_hash_hex[:16]}..."
@staticmethod
async def create_new_cat_wallet(
wallet_state_manager: WalletStateManager,
wallet: Wallet,
cat_tail_info: Dict[str, Any],
amount: uint64,
name: Optional[str] = None,
) -> "CATWallet": # pragma: no cover
raise NotImplementedError("create_new_cat_wallet is a legacy method and is not available on CR-CAT wallets")
@staticmethod
async def get_or_create_wallet_for_cat(
wallet_state_manager: WalletStateManager,
wallet: Wallet,
limitations_program_hash_hex: str,
name: Optional[str] = None,
authorized_providers: Optional[List[bytes32]] = None,
proofs_checker: Optional[ProofsChecker] = None,
) -> CRCATWallet:
if authorized_providers is None or proofs_checker is None: # pragma: no cover
raise ValueError("get_or_create_wallet_for_cat was call on CRCATWallet without proper arguments")
self = CRCATWallet()
self.cost_of_single_tx = 78000000 # Measured in testing
self.standard_wallet = wallet
if name is None:
name = self.default_wallet_name_for_unknown_cat(limitations_program_hash_hex)
self.log = logging.getLogger(name)
tail_hash = bytes32.from_hexstr(limitations_program_hash_hex)
for id, w in wallet_state_manager.wallets.items():
if w.type() == CRCATWallet.type():
assert isinstance(w, CRCATWallet)
if w.get_asset_id() == limitations_program_hash_hex:
self.log.warning("Not creating wallet for already existing CR-CAT wallet")
return w
self.wallet_state_manager = wallet_state_manager
self.info = CRCATInfo(tail_hash, None, authorized_providers, proofs_checker)
info_as_string = bytes(self.info).hex()
self.wallet_info = await wallet_state_manager.user_store.create_wallet(name, WalletType.CRCAT, info_as_string)
await self.wallet_state_manager.add_new_wallet(self)
return self
@classmethod
async def create_from_puzzle_info(
cls,
wallet_state_manager: WalletStateManager,
wallet: Wallet,
puzzle_driver: PuzzleInfo,
name: Optional[str] = None,
# We're hinting this as Any for mypy by should explore adding this to the wallet protocol and hinting properly
potential_subclasses: Dict[AssetType, Any] = {},
) -> Any:
cr_layer: Optional[PuzzleInfo] = puzzle_driver.also()
if cr_layer is None: # pragma: no cover
raise ValueError("create_from_puzzle_info called on CRCATWallet with a non CR-CAT puzzle driver")
return await cls.get_or_create_wallet_for_cat(
wallet_state_manager,
wallet,
puzzle_driver["tail"].hex(),
name,
[bytes32(provider) for provider in cr_layer["authorized_providers"]],
ProofsChecker.from_program(uncurry_puzzle(cr_layer["proofs_checker"])),
)
@staticmethod
async def create(
wallet_state_manager: WalletStateManager,
wallet: Wallet,
wallet_info: WalletInfo,
) -> CRCATWallet:
self = CRCATWallet()
self.log = logging.getLogger(__name__)
self.cost_of_single_tx = 78000000
self.wallet_state_manager = wallet_state_manager
self.wallet_info = wallet_info
self.standard_wallet = wallet
self.info = CRCATInfo.from_bytes(hexstr_to_bytes(self.wallet_info.data))
return self
@classmethod
async def convert_to_cr(
cls,
cat_wallet: CATWallet,
authorized_providers: List[bytes32],
proofs_checker: ProofsChecker,
) -> None:
replace_self = cls()
replace_self.cost_of_single_tx = 78000000 # Measured in testing
replace_self.standard_wallet = cat_wallet.standard_wallet
replace_self.log = logging.getLogger(cat_wallet.get_name())
replace_self.wallet_state_manager = cat_wallet.wallet_state_manager
replace_self.info = CRCATInfo(
cat_wallet.cat_info.limitations_program_hash, None, authorized_providers, proofs_checker
)
replace_self.wallet_info = await cat_wallet.wallet_state_manager.user_store.update_wallet(
WalletInfo(
cat_wallet.id(), cat_wallet.get_name(), uint8(WalletType.CRCAT.value), bytes(replace_self.info).hex()
)
)
cat_wallet.wallet_state_manager.wallets[cat_wallet.id()] = replace_self
@classmethod
def type(cls) -> WalletType:
return WalletType.CRCAT
def id(self) -> uint32:
return self.wallet_info.id
def get_asset_id(self) -> str:
return self.info.limitations_program_hash.hex()
async def set_tail_program(self, tail_program: str) -> None: # pragma: no cover
raise NotImplementedError("set_tail_program is a legacy method and is not available on CR-CAT wallets")
async def coin_added(self, coin: Coin, height: uint32, peer: WSChiaConnection) -> None:
"""Notification from wallet state manager that wallet has been received."""
self.log.info(f"CR-CAT wallet has been notified that {coin.name().hex()} was added")
try:
coin_state = await self.wallet_state_manager.wallet_node.get_coin_state([coin.parent_coin_info], peer=peer)
coin_spend = await fetch_coin_spend_for_coin_state(coin_state[0], peer)
await self.add_crcat_coin(coin_spend, coin, height)
except Exception as e:
self.log.debug(f"Exception: {e}, traceback: {traceback.format_exc()}")
async def add_crcat_coin(self, coin_spend: CoinSpend, coin: Coin, height: uint32) -> None:
try:
new_cr_cats: List[CRCAT] = CRCAT.get_next_from_coin_spend(coin_spend)
hint_dict = {
id: hc.hint for id, hc in compute_spend_hints_and_additions(coin_spend).items() if hc.hint is not None
}
cr_cat: CRCAT = list(filter(lambda c: c.coin.name() == coin.name(), new_cr_cats))[0]
if (
await self.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash(
cr_cat.inner_puzzle_hash
)
is not None
):
self.log.info(f"Found CRCAT coin {coin.name().hex()}")
is_pending = False
elif (
cr_cat.inner_puzzle_hash
== construct_pending_approval_state(
hint_dict[coin.name()],
uint64(coin.amount),
).get_tree_hash()
):
self.log.info(f"Found pending approval CRCAT coin {coin.name().hex()}")
is_pending = True
created_timestamp = await self.wallet_state_manager.wallet_node.get_timestamp_for_height(uint32(height))
spend_bundle = SpendBundle([coin_spend], G2Element())
memos = compute_memos(spend_bundle)
# This will override the tx created in the wallet state manager
tx_record = TransactionRecord(
confirmed_at_height=height,
created_at_time=uint64(created_timestamp),
to_puzzle_hash=hint_dict[coin.name()],
amount=uint64(coin.amount),
fee_amount=uint64(0),
confirmed=True,
sent=uint32(0),
spend_bundle=None,
additions=[coin],
removals=[coin_spend.coin],
wallet_id=self.id(),
sent_to=[],
trade_id=None,
type=uint32(TransactionType.INCOMING_CRCAT_PENDING),
name=coin.name(),
memos=list(memos.items()),
)
await self.wallet_state_manager.tx_store.add_transaction_record(tx_record)
else: # pragma: no cover
self.log.error(f"Unknown CRCAT inner puzzle, coin ID: {coin.name().hex()}")
return None
coin_record = WalletCoinRecord(
coin,
uint32(height),
uint32(0),
False,
False,
WalletType.CRCAT,
self.id(),
CoinType.CRCAT_PENDING if is_pending else CoinType.CRCAT,
VersionedBlob(
CRCATVersion.V1.value,
bytes(
CRCATMetadata(
cr_cat.lineage_proof, hint_dict[coin.name()] if is_pending else cr_cat.inner_puzzle_hash
)
),
),
)
await self.wallet_state_manager.coin_store.add_coin_record(coin_record)
except Exception:
# The parent is not a CAT which means we need to scrub all of its children from our DB
self.log.error(f"Cannot add CRCAT coin: {traceback.format_exc()}")
child_coin_records = await self.wallet_state_manager.coin_store.get_coin_records_by_parent_id(
coin_spend.coin.name()
)
if len(child_coin_records) > 0:
for record in child_coin_records:
if record.wallet_id == self.id(): # pragma: no cover
await self.wallet_state_manager.coin_store.delete_coin_record(record.coin.name())
# We also need to make sure there's no record of the transaction
await self.wallet_state_manager.tx_store.delete_transaction_record(record.coin.name())
def require_derivation_paths(self) -> bool:
return False
def puzzle_for_pk(self, pubkey: G1Element) -> Program: # pragma: no cover
raise NotImplementedError("puzzle_for_pk is a legacy method and is not available on CR-CAT wallets")
def puzzle_hash_for_pk(self, pubkey: G1Element) -> bytes32: # pragma: no cover
raise NotImplementedError("puzzle_hash_for_pk is a legacy method and is not available on CR-CAT wallets")
async def get_new_cat_puzzle_hash(self) -> bytes32: # pragma: no cover
raise NotImplementedError("get_new_cat_puzzle_hash is a legacy method and is not available on CR-CAT wallets")
async def sign(self, spend_bundle: SpendBundle) -> SpendBundle: # pragma: no cover
raise NotImplementedError("get_new_cat_puzzle_hash is a legacy method and is not available on CR-CAT wallets")
async def inner_puzzle_for_cat_puzhash(self, cat_hash: bytes32) -> Program: # pragma: no cover
raise NotImplementedError(
"inner_puzzle_for_cat_puzhash is a legacy method and is not available on CR-CAT wallets"
)
async def get_cat_spendable_coins(self, records: Optional[Set[WalletCoinRecord]] = None) -> List[WalletCoinRecord]:
result: List[WalletCoinRecord] = []
record_list: Set[WalletCoinRecord] = await self.wallet_state_manager.get_spendable_coins_for_wallet(
self.id(), records
)
for record in record_list:
crcat: CRCAT = self.coin_record_to_crcat(record)
if crcat.lineage_proof is not None and not crcat.lineage_proof.is_none():
result.append(record)
return result
async def get_confirmed_balance(self, record_list: Optional[Set[WalletCoinRecord]] = None) -> uint128:
if record_list is None:
record_list = await self.wallet_state_manager.coin_store.get_unspent_coins_for_wallet(
self.id(), CoinType.CRCAT
)
amount: uint128 = uint128(0)
for record in record_list:
crcat: CRCAT = self.coin_record_to_crcat(record)
if crcat.lineage_proof is not None and not crcat.lineage_proof.is_none():
amount = uint128(amount + record.coin.amount)
self.log.info(f"Confirmed balance for cat wallet {self.id()} is {amount}")
return uint128(amount)
async def get_pending_approval_balance(self, record_list: Optional[Set[WalletCoinRecord]] = None) -> uint128:
if record_list is None:
record_list = await self.wallet_state_manager.coin_store.get_unspent_coins_for_wallet(
self.id(), CoinType.CRCAT_PENDING
)
amount: uint128 = uint128(0)
for record in record_list:
crcat: CRCAT = self.coin_record_to_crcat(record)
if crcat.lineage_proof is not None and not crcat.lineage_proof.is_none():
amount = uint128(amount + record.coin.amount)
self.log.info(f"Pending approval balance for cat wallet {self.id()} is {amount}")
return uint128(amount)
async def convert_puzzle_hash(self, puzzle_hash: bytes32) -> bytes32:
return puzzle_hash
@staticmethod
def get_metadata_from_record(coin_record: WalletCoinRecord) -> CRCATMetadata:
metadata: MetadataTypes = coin_record.parsed_metadata()
assert isinstance(metadata, CRCATMetadata)
return metadata
def coin_record_to_crcat(self, coin_record: WalletCoinRecord) -> CRCAT:
if coin_record.coin_type not in {CoinType.CRCAT, CoinType.CRCAT_PENDING}: # pragma: no cover
raise ValueError(f"Attempting to spend a non-CRCAT coin: {coin_record.coin.name().hex()}")
if coin_record.metadata is None: # pragma: no cover
raise ValueError(f"Attempting to spend a CRCAT coin without metadata: {coin_record.coin.name().hex()}")
try:
metadata: CRCATMetadata = CRCATWallet.get_metadata_from_record(coin_record)
crcat: CRCAT = CRCAT(
coin_record.coin,
self.info.limitations_program_hash,
metadata.lineage_proof,
self.info.authorized_providers,
self.info.proofs_checker.as_program(),
construct_pending_approval_state(
metadata.inner_puzzle_hash, uint64(coin_record.coin.amount)
).get_tree_hash()
if coin_record.coin_type == CoinType.CRCAT_PENDING
else metadata.inner_puzzle_hash,
)
return crcat
except Exception as e: # pragma: no cover
raise ValueError(f"Error parsing CRCAT metadata: {e}")
async def get_lineage_proof_for_coin(self, coin: Coin) -> Optional[LineageProof]: # pragma: no cover
raise RuntimeError("get_lineage_proof_for_coin is a legacy method and is not available on CR-CAT wallets")
async def _generate_unsigned_spendbundle(
self,
payments: List[Payment],
fee: uint64 = uint64(0),
cat_discrepancy: Optional[Tuple[int, Program, Program]] = None, # (extra_delta, tail_reveal, tail_solution)
coins: Optional[Set[Coin]] = None,
coin_announcements_to_consume: Optional[Set[Announcement]] = None,
puzzle_announcements_to_consume: Optional[Set[Announcement]] = None,
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
exclude_coins: Optional[Set[Coin]] = None,
reuse_puzhash: Optional[bool] = None,
add_authorizations_to_cr_cats: bool = True,
) -> Tuple[SpendBundle, List[TransactionRecord]]:
if coin_announcements_to_consume is not None: # pragma: no cover
coin_announcements_bytes: Optional[Set[bytes32]] = {a.name() for a in coin_announcements_to_consume}
else:
coin_announcements_bytes = None
if puzzle_announcements_to_consume is not None:
puzzle_announcements_bytes: Optional[Set[bytes32]] = {a.name() for a in puzzle_announcements_to_consume}
else:
puzzle_announcements_bytes = None
if cat_discrepancy is not None:
extra_delta, tail_reveal, tail_solution = cat_discrepancy
else:
extra_delta, tail_reveal, tail_solution = 0, Program.to([]), Program.to([])
payment_amount: int = sum([p.amount for p in payments])
starting_amount: int = payment_amount - extra_delta
if not add_authorizations_to_cr_cats:
reuse_puzhash = True
if reuse_puzhash is None:
reuse_puzhash_config = self.wallet_state_manager.config.get("reuse_public_key_for_change", None)
if reuse_puzhash_config is None:
reuse_puzhash = False # pragma: no cover
else:
reuse_puzhash = reuse_puzhash_config.get(
str(self.wallet_state_manager.wallet_node.logged_in_fingerprint), False
)
if coins is None:
if exclude_coins is None:
exclude_coins = set()
cat_coins = list(
await self.select_coins(
uint64(starting_amount),
exclude=list(exclude_coins),
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
)
)
elif exclude_coins is not None:
raise ValueError("Can't exclude coins when also specifically including coins") # pragma: no cover
else:
cat_coins = list(coins)
cat_coins = sorted(cat_coins, key=Coin.name) # need determinism because we need definitive origin coin
selected_cat_amount = sum([c.amount for c in cat_coins])
assert selected_cat_amount >= starting_amount
# Figure out if we need to absorb/melt some XCH as part of this
regular_chia_to_claim: int = 0
if payment_amount > starting_amount:
# TODO: The no coverage comment is because minting is broken for both this and the standard CAT wallet
fee = uint64(fee + payment_amount - starting_amount) # pragma: no cover
elif payment_amount < starting_amount:
regular_chia_to_claim = payment_amount
need_chia_transaction = (fee > 0 or regular_chia_to_claim > 0) and (fee - regular_chia_to_claim != 0)
# Calculate standard puzzle solutions
change = selected_cat_amount - starting_amount
primaries: List[Payment] = []
for payment in payments:
primaries.append(payment)
if change > 0:
origin_crcat_record = await self.wallet_state_manager.coin_store.get_coin_record(list(cat_coins)[0].name())
if origin_crcat_record is None:
raise RuntimeError("A CR-CAT coin was selected that we don't have a record for") # pragma: no cover
origin_crcat = self.coin_record_to_crcat(origin_crcat_record)
if reuse_puzhash:
change_puzhash = origin_crcat.inner_puzzle_hash
for payment in payments:
if change_puzhash == payment.puzzle_hash and change == payment.amount:
# We cannot create two coins has same id, create a new puzhash for the change
change_puzhash = await self.get_new_inner_hash()
break
else:
change_puzhash = await self.get_new_inner_hash()
primaries.append(Payment(change_puzhash, uint64(change), [change_puzhash]))
# Find the VC Wallet
vc_wallet: VCWallet
for wallet in self.wallet_state_manager.wallets.values():
if WalletType(wallet.type()) == WalletType.VC:
assert isinstance(wallet, VCWallet)
vc_wallet = wallet
break
else:
raise RuntimeError("CR-CATs cannot be spent without an appropriate VC") # pragma: no cover
# Loop through the coins we've selected and gather the information we need to spend them
vc: Optional[VerifiedCredential] = None
vc_announcements_to_make: List[bytes] = []
inner_spends: List[Tuple[CRCAT, int, Program, Program]] = []
chia_tx = None
first = True
announcement: Announcement
coin_ids: List[bytes32] = [coin.name() for coin in cat_coins]
coin_records: List[WalletCoinRecord] = (
await self.wallet_state_manager.coin_store.get_coin_records(coin_id_filter=HashFilter.include(coin_ids))
).records
assert len(coin_records) == len(cat_coins)
# sort the coin records to ensure they are in the same order as the CAT coins
coin_records = [rec for rec in sorted(coin_records, key=lambda rec: coin_ids.index(rec.coin.name()))]
for coin in coin_records:
if vc is None:
vc = await vc_wallet.get_vc_with_provider_in_and_proofs(
self.info.authorized_providers, self.info.proofs_checker.flags
)
crcat: CRCAT = self.coin_record_to_crcat(coin)
vc_announcements_to_make.append(crcat.expected_announcement())
if first:
announcement = Announcement(coin.name(), std_hash(b"".join([c.name() for c in cat_coins])))
if need_chia_transaction:
if fee > regular_chia_to_claim:
chia_tx, _ = await self.create_tandem_xch_tx(
fee,
uint64(regular_chia_to_claim),
announcements_to_assert={announcement},
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
reuse_puzhash=reuse_puzhash,
)
innersol = self.standard_wallet.make_solution(
primaries=primaries,
coin_announcements={announcement.message},
coin_announcements_to_assert=coin_announcements_bytes,
puzzle_announcements_to_assert=puzzle_announcements_bytes,
)
elif regular_chia_to_claim > fee:
chia_tx, xch_announcement = await self.create_tandem_xch_tx(
fee,
uint64(regular_chia_to_claim),
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
reuse_puzhash=reuse_puzhash,
)
assert xch_announcement is not None
innersol = self.standard_wallet.make_solution(
primaries=primaries,
coin_announcements={announcement.message},
coin_announcements_to_assert={xch_announcement.name()},
)
else:
innersol = self.standard_wallet.make_solution(
primaries=primaries,
coin_announcements={announcement.message},
coin_announcements_to_assert=coin_announcements_bytes,
puzzle_announcements_to_assert=puzzle_announcements_bytes,
)
else:
innersol = self.standard_wallet.make_solution(
primaries=[],
coin_announcements_to_assert={announcement.name()},
)
if first and cat_discrepancy is not None:
# TODO: This line is a hack, make_solution should allow us to pass extra conditions to it
innersol = Program.to(
[[], (1, Program.to([51, None, -113, tail_reveal, tail_solution]).cons(innersol.at("rfr"))), []]
)
inner_derivation_record = (
await self.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash(
crcat.inner_puzzle_hash
)
)
if inner_derivation_record is None:
raise RuntimeError( # pragma: no cover
f"CR-CAT {crcat} has an inner puzzle hash {crcat.inner_puzzle_hash} that we don't have the keys for"
)
inner_puzzle: Program = self.standard_wallet.puzzle_for_pk(inner_derivation_record.pubkey)
inner_spends.append(
(
crcat,
extra_delta if first else 0,
inner_puzzle,
innersol,
)
)
first = False
if vc is None: # pragma: no cover
raise RuntimeError("Spending no cat coins is not an appropriate use of _generate_unsigned_spendbundle")
if vc.proof_hash is None:
raise RuntimeError("CR-CATs found an appropriate VC but that VC contains no proofs") # pragma: no cover
proof_of_inclusions: Program = await vc_wallet.proof_of_inclusions_for_root_and_keys(
vc.proof_hash, self.info.proofs_checker.flags
)
expected_announcements, coin_spends, _ = CRCAT.spend_many(
inner_spends,
proof_of_inclusions,
Program.to(None), # TODO: With more proofs checkers, this may need to be flexible. For now, it's hardcoded.
vc.proof_provider,
vc.launcher_id,
vc.wrap_inner_with_backdoor().get_tree_hash() if add_authorizations_to_cr_cats else None,
)
if add_authorizations_to_cr_cats:
vc_txs: List[TransactionRecord] = await vc_wallet.generate_signed_transaction(
vc.launcher_id,
puzzle_announcements=set(vc_announcements_to_make),
coin_announcements_to_consume=set((*expected_announcements, announcement)),
reuse_puzhash=reuse_puzhash,
)
else:
vc_txs = []
for crcat, _, _, _ in inner_spends:
await self.standard_wallet.hack_populate_secret_key_for_puzzle_hash(crcat.inner_puzzle_hash)
return (
SpendBundle(
[
*coin_spends,
*(spend for tx in vc_txs if tx.spend_bundle is not None for spend in tx.spend_bundle.coin_spends),
*(
(
spend
for bundle in [chia_tx.spend_bundle]
if bundle is not None
for spend in bundle.coin_spends
)
if chia_tx is not None
else []
),
],
G2Element(),
),
[*vc_txs, *([chia_tx] if chia_tx is not None else [])],
)
async def generate_signed_transaction(
self,
amounts: List[uint64],
puzzle_hashes: List[bytes32],
fee: uint64 = uint64(0),
coins: Optional[Set[Coin]] = None,
ignore_max_send_amount: bool = False,
memos: Optional[List[List[bytes]]] = None,
coin_announcements_to_consume: Optional[Set[Announcement]] = None,
puzzle_announcements_to_consume: Optional[Set[Announcement]] = None,
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
reuse_puzhash: Optional[bool] = None,
**kwargs: Unpack[GSTOptionalArgs],
) -> List[TransactionRecord]:
exclude_cat_coins: Optional[Set[Coin]] = kwargs.get("excluded_cat_coins", None)
# (extra_delta, tail_reveal, tail_solution)
cat_discrepancy: Optional[Tuple[int, Program, Program]] = kwargs.get("cat_discrepancy", None)
add_authorizations_to_cr_cats: bool = kwargs.get("add_authorizations_to_cr_cats", True)
if memos is None:
memos = [[] for _ in range(len(puzzle_hashes))]
if not (len(memos) == len(puzzle_hashes) == len(amounts)):
raise ValueError("Memos, puzzle_hashes, and amounts must have the same length") # pragma: no cover
payments = []
for amount, puzhash, memo_list in zip(amounts, puzzle_hashes, memos):
memos_with_hint: List[bytes] = [puzhash]
memos_with_hint.extend(memo_list)
# Force wrap the outgoing coins in the pending state if not going to us
payments.append(
Payment(
construct_pending_approval_state(puzhash, amount).get_tree_hash()
if puzhash != Offer.ph()
and not await self.wallet_state_manager.puzzle_store.puzzle_hash_exists(puzhash)
else puzhash,
amount,
memos_with_hint,
)
)
payment_sum = sum([p.amount for p in payments])
if not ignore_max_send_amount:
max_send = await self.get_max_send_amount()
if payment_sum > max_send:
raise ValueError(f"Can't send more than {max_send} mojos in a single transaction") # pragma: no cover
unsigned_spend_bundle, other_txs = await self._generate_unsigned_spendbundle(
payments,
fee,
cat_discrepancy=cat_discrepancy, # (extra_delta, tail_reveal, tail_solution)
coins=coins,
coin_announcements_to_consume=coin_announcements_to_consume,
puzzle_announcements_to_consume=puzzle_announcements_to_consume,
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
exclude_coins=exclude_cat_coins,
reuse_puzhash=reuse_puzhash,
add_authorizations_to_cr_cats=add_authorizations_to_cr_cats,
)
signed_spend_bundle: SpendBundle = await self.wallet_state_manager.main_wallet.sign_transaction(
unsigned_spend_bundle.coin_spends
)
tx_list = [
TransactionRecord(
confirmed_at_height=uint32(0),
created_at_time=uint64(int(time.time())),
to_puzzle_hash=payment.puzzle_hash,
amount=payment.amount,
fee_amount=fee,
confirmed=False,
sent=uint32(0),
spend_bundle=signed_spend_bundle if i == 0 else None,
additions=signed_spend_bundle.additions() if i == 0 else [],
removals=signed_spend_bundle.removals() if i == 0 else [],
wallet_id=self.id(),
sent_to=[],
trade_id=None,
type=uint32(TransactionType.OUTGOING_TX.value),
name=signed_spend_bundle.name(),
memos=list(compute_memos(signed_spend_bundle).items()),
)
for i, payment in enumerate(payments)
]
return [*tx_list, *(dataclasses.replace(tx, spend_bundle=None) for tx in other_txs)]
async def claim_pending_approval_balance(
self,
min_amount_to_claim: uint64,
fee: uint64 = uint64(0),
coins: Optional[Set[Coin]] = None,
min_coin_amount: Optional[uint64] = None,
max_coin_amount: Optional[uint64] = None,
excluded_coin_amounts: Optional[List[uint64]] = None,
reuse_puzhash: Optional[bool] = None,
) -> List[TransactionRecord]:
# Select the relevant CR-CAT coins
crcat_records: Set[WalletCoinRecord] = await self.wallet_state_manager.coin_store.get_unspent_coins_for_wallet(
self.id(), CoinType.CRCAT_PENDING
)
if coins is None:
if max_coin_amount is None:
max_coin_amount = uint64(self.wallet_state_manager.constants.MAX_COIN_AMOUNT)
coins = await select_coins(
await self.get_pending_approval_balance(),
max_coin_amount,
list(crcat_records),
{},
self.log,
uint128(min_amount_to_claim),
None,
min_coin_amount,
excluded_coin_amounts,
)
# Select the relevant XCH coins
if fee > 0:
chia_coins = await self.standard_wallet.select_coins(
fee,
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
)
else:
chia_coins = set()
# Select the relevant VC coin
vc_wallet: VCWallet = await self.wallet_state_manager.get_or_create_vc_wallet()
vc: Optional[VerifiedCredential] = await vc_wallet.get_vc_with_provider_in_and_proofs(
self.info.authorized_providers, self.info.proofs_checker.flags
)
if vc is None: # pragma: no cover
raise RuntimeError(f"No VC exists that can approve spends for CR-CAT wallet {self.id()}")
if vc.proof_hash is None:
raise RuntimeError(f"VC {vc.launcher_id} has no proofs to authorize transaction") # pragma: no cover
proof_of_inclusions: Program = await vc_wallet.proof_of_inclusions_for_root_and_keys(
vc.proof_hash, self.info.proofs_checker.flags
)
# Generate the bundle nonce
nonce: bytes32 = Program.to(
[coin_as_list(c) for c in sorted(coins.union(chia_coins).union({vc.coin}), key=Coin.name)]
).get_tree_hash()
# Make CR-CAT bundle
crcats_and_puzhashes: List[Tuple[CRCAT, bytes32]] = [
(crcat, CRCATWallet.get_metadata_from_record(record).inner_puzzle_hash)
for record in [r for r in crcat_records if r.coin in coins]
for crcat in [self.coin_record_to_crcat(record)]
]
expected_announcements, coin_spends, _ = CRCAT.spend_many(
[
(
crcat,
0,
construct_pending_approval_state(inner_puzhash, uint64(crcat.coin.amount)),
Program.to([nonce]),
)
for crcat, inner_puzhash in crcats_and_puzhashes
],
proof_of_inclusions,
Program.to(None), # TODO: With more proofs checkers, this may need to be flexible. For now, it's hardcoded.
vc.proof_provider,
vc.launcher_id,
vc.wrap_inner_with_backdoor().get_tree_hash(),
)
claim_bundle: SpendBundle = SpendBundle(coin_spends, G2Element())
# Make the Fee TX
if fee > 0:
chia_tx, _ = await self.create_tandem_xch_tx(
fee,
uint64(0),
announcements_to_assert=set(Announcement(coin.name(), nonce) for coin in coins.union({vc.coin})),
min_coin_amount=min_coin_amount,
max_coin_amount=max_coin_amount,
excluded_coin_amounts=excluded_coin_amounts,
reuse_puzhash=reuse_puzhash,
)
if chia_tx.spend_bundle is None:
raise RuntimeError("Did not get spendbundle for fee transaction") # pragma: no cover
claim_bundle = SpendBundle.aggregate([claim_bundle, chia_tx.spend_bundle])
else:
chia_tx = None
# Make the VC TX
vc_txs: List[TransactionRecord] = await vc_wallet.generate_signed_transaction(
vc.launcher_id,
puzzle_announcements=set(crcat.expected_announcement() for crcat, _ in crcats_and_puzhashes),
coin_announcements={nonce},
coin_announcements_to_consume=set(expected_announcements),
reuse_puzhash=reuse_puzhash,
)
claim_bundle = SpendBundle.aggregate(
[claim_bundle, *(tx.spend_bundle for tx in vc_txs if tx.spend_bundle is not None)]
)
return [
TransactionRecord(
confirmed_at_height=uint32(0),
created_at_time=uint64(int(time.time())),
to_puzzle_hash=await self.wallet_state_manager.main_wallet.get_puzzle_hash(False),
amount=uint64(sum(c.amount for c in coins)),
fee_amount=fee,
confirmed=False,
sent=uint32(0),
spend_bundle=claim_bundle,
additions=claim_bundle.additions(),
removals=claim_bundle.removals(),
wallet_id=self.id(),
sent_to=[],
trade_id=None,
type=uint32(TransactionType.OUTGOING_TX.value),
name=claim_bundle.name(),
memos=list(compute_memos(claim_bundle).items()),
),
TransactionRecord(
confirmed_at_height=uint32(0),
created_at_time=uint64(int(time.time())),
to_puzzle_hash=await self.wallet_state_manager.main_wallet.get_puzzle_hash(False),
amount=uint64(sum(c.amount for c in coins)),
fee_amount=fee,
confirmed=False,
sent=uint32(0),
spend_bundle=None,
additions=[],
removals=[],
wallet_id=self.id(),
sent_to=[],
trade_id=None,
type=uint32(TransactionType.INCOMING_TX.value),
name=claim_bundle.name(),
memos=list(compute_memos(claim_bundle).items()),
),
*(dataclasses.replace(tx, spend_bundle=None) for tx in vc_txs),
*((dataclasses.replace(chia_tx, spend_bundle=None),) if chia_tx is not None else []),
]
async def match_puzzle_info(self, puzzle_driver: PuzzleInfo) -> bool:
if (
AssetType(puzzle_driver.type()) == AssetType.CAT
and puzzle_driver["tail"] == self.info.limitations_program_hash
):
inner_puzzle_driver: Optional[PuzzleInfo] = puzzle_driver.also()
if inner_puzzle_driver is None:
raise ValueError("Malformed puzzle driver passed to CRCATWallet.match_puzzle_info") # pragma: no cover
return (
AssetType(inner_puzzle_driver.type()) == AssetType.CR
and [bytes32(provider) for provider in inner_puzzle_driver["authorized_providers"]]
== self.info.authorized_providers
and ProofsChecker.from_program(uncurry_puzzle(inner_puzzle_driver["proofs_checker"]))
== self.info.proofs_checker
)
return False
async def get_puzzle_info(self, asset_id: bytes32) -> PuzzleInfo:
return PuzzleInfo(
{
"type": AssetType.CAT.value,
"tail": "0x" + self.info.limitations_program_hash.hex(),
"also": {
"type": AssetType.CR.value,
"authorized_providers": ["0x" + provider.hex() for provider in self.info.authorized_providers],
"proofs_checker": self.info.proofs_checker.as_program(),
},
}
)
async def match_hinted_coin(self, coin: Coin, hint: bytes32) -> bool:
return (
construct_cat_puzzle(
CAT_MOD,
self.info.limitations_program_hash,
construct_cr_layer(
self.info.authorized_providers,
self.info.proofs_checker.as_program(),
hint, # type: ignore
),
).get_tree_hash_precalc(hint)
== coin.puzzle_hash
or construct_cat_puzzle(
CAT_MOD,
self.info.limitations_program_hash,
construct_cr_layer(
self.info.authorized_providers,
self.info.proofs_checker.as_program(),
construct_pending_approval_state(hint, uint64(coin.amount)),
),
).get_tree_hash()
== coin.puzzle_hash
)
if TYPE_CHECKING:
_dummy: WalletProtocol = CRCATWallet()

View File

@ -0,0 +1,102 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
from clvm_tools.binutils import disassemble
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.uncurried_puzzle import UncurriedPuzzle, uncurry_puzzle
from chia.wallet.vc_wallet.cr_cat_drivers import PROOF_FLAGS_CHECKER, construct_cr_layer, match_cr_layer, solve_cr_layer
@dataclass(frozen=True)
class CROuterPuzzle:
_match: Callable[[UncurriedPuzzle], Optional[PuzzleInfo]]
_construct: Callable[[PuzzleInfo, Program], Program]
_solve: Callable[[PuzzleInfo, Solver, Program, Program], Program]
_get_inner_puzzle: Callable[[PuzzleInfo, UncurriedPuzzle], Optional[Program]]
_get_inner_solution: Callable[[PuzzleInfo, Program], Optional[Program]]
def match(self, puzzle: UncurriedPuzzle) -> Optional[PuzzleInfo]:
args: Optional[Tuple[List[bytes32], Program, Program]] = match_cr_layer(puzzle)
if args is None:
return None
authorized_providers, proofs_checker, inner_puzzle = args
constructor_dict: Dict[str, Any] = {
"type": "credential restricted",
"authorized_providers": ["0x" + ap.hex() for ap in authorized_providers],
"proofs_checker": disassemble(proofs_checker),
}
next_constructor = self._match(uncurry_puzzle(inner_puzzle))
if next_constructor is not None:
constructor_dict["also"] = next_constructor.info
return PuzzleInfo(constructor_dict)
def get_inner_puzzle(self, constructor: PuzzleInfo, puzzle_reveal: UncurriedPuzzle) -> Optional[Program]:
args: Optional[Tuple[List[bytes32], Program, Program]] = match_cr_layer(puzzle_reveal)
if args is None:
raise ValueError("This driver is not for the specified puzzle reveal") # pragma: no cover
_, _, inner_puzzle = args
also = constructor.also()
if also is not None:
deep_inner_puzzle: Optional[Program] = self._get_inner_puzzle(also, uncurry_puzzle(inner_puzzle))
return deep_inner_puzzle
else:
return inner_puzzle
def get_inner_solution(self, constructor: PuzzleInfo, solution: Program) -> Optional[Program]:
my_inner_solution: Program = solution.at("rrrrrrf")
also = constructor.also()
if also:
deep_inner_solution: Optional[Program] = self._get_inner_solution(also, my_inner_solution)
return deep_inner_solution
else:
return my_inner_solution
def asset_id(self, constructor: PuzzleInfo) -> Optional[bytes32]:
return None
def construct(self, constructor: PuzzleInfo, inner_puzzle: Program) -> Program:
also = constructor.also()
if also is not None:
inner_puzzle = self._construct(also, inner_puzzle)
return construct_cr_layer(
constructor["authorized_providers"],
constructor["proofs_checker"] if "proofs_checker" in constructor else PROOF_FLAGS_CHECKER,
inner_puzzle,
)
def solve(self, constructor: PuzzleInfo, solver: Solver, inner_puzzle: Program, inner_solution: Program) -> Program:
coin_bytes: bytes = solver["coin"]
coin = Coin(bytes32(coin_bytes[0:32]), bytes32(coin_bytes[32:64]), uint64.from_bytes(coin_bytes[64:72]))
coin_name: str = coin.name().hex()
if "vc_authorizations" in solver.info:
vc_info = solver["vc_authorizations"][coin_name]
else:
vc_info = [
# TODO: This is something of a hack here, doesn't really work for proofs checkers generally.
# The problem is that the CAT driver above us is running its inner puzzle (us) in order to get the
# conditions that are output. This is bad practice on the CAT driver's part, the protocol should support
# asking inner drivers for what conditions they return. Alas, since this is not supported, we have to
# do a hack that we know will work for the one known proof checker we currently have.
uncurry_puzzle(constructor["proofs_checker"]).args.at("f"),
None,
constructor["authorized_providers"][0], # Hack for similar reasons as above, we need a valid provider
None,
None,
]
also = constructor.also()
if also is not None:
inner_solution = self._solve(also, solver, inner_puzzle, inner_solution)
return solve_cr_layer( # type: ignore[call-arg]
*vc_info,
coin.name(),
inner_solution,
)

View File

@ -0,0 +1,3 @@
(mod (CONDITIONS nonce)
(c (list 60 nonce) CONDITIONS)
)

View File

@ -0,0 +1 @@
ff04ffff04ffff013cffff04ff05ff808080ff0280

View File

@ -54,6 +54,32 @@ class VCProofs:
else:
raise ValueError("Malformatted VCProofs program") # pragma: no cover
def prove_keys(self, keys: List[str], tree: Optional[Program] = None) -> Program:
if tree is None:
tree = self.as_program()
first = tree.first()
if first.atom is not None:
if first.atom.decode("utf8") in keys:
return tree
else:
tree_hash_as_atom: Program = Program.to(tree.get_tree_hash())
return tree_hash_as_atom
else:
rest = tree.rest()
first_tree = self.prove_keys(keys, first)
rest_tree = self.prove_keys(keys, rest)
if first_tree.atom is not None and rest_tree.atom is not None:
tree_hash_as_atom = Program.to(
Program.to((first_tree, rest_tree)).get_tree_hash_precalc(
bytes32(first_tree.atom), bytes32(rest_tree.atom)
)
)
return tree_hash_as_atom
else:
new_tree: Program = first_tree.cons(rest_tree)
return new_tree
_T_VCStore = TypeVar("_T_VCStore", bound="VCStore")
@ -167,15 +193,15 @@ class VCStore:
return _row_to_vc_record(row)
return None
# Coverage coming with CR-CAT Wallet
async def get_vc_records_by_providers(self, provider_ids: List[bytes32]) -> List[VCRecord]: # pragma: no cover
"""
Checks DB for VCs with a proof_provider in a specified list and returns them.
"""
async with self.db_wrapper.reader_no_transaction() as conn:
providers_param: str = ",".join(["?"] * len(provider_ids))
providers_param: str = ", ".join(["?"] * len(provider_ids))
cursor = await conn.execute(
f"SELECT * from vc_records WHERE proof_provider IN {providers_param} LIMIT 1000", provider_ids
f"SELECT * from vc_records WHERE proof_provider IN ({providers_param}) LIMIT 1000",
tuple(id.hex() for id in provider_ids),
)
rows = await cursor.fetchall()
await cursor.close()

View File

@ -4,9 +4,10 @@ import dataclasses
import logging
import time
import traceback
from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Type, TypeVar, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union
from blspy import G1Element, G2Element
from clvm.casts import int_to_bytes
from typing_extensions import Unpack
from chia.protocols.wallet_protocol import CoinState
@ -17,18 +18,23 @@ from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.spend_bundle import SpendBundle
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint64, uint128
from chia.wallet.did_wallet.did_wallet import DIDWallet
from chia.wallet.payment import Payment
from chia.wallet.puzzle_drivers import Solver
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import solution_for_conditions
from chia.wallet.sign_coin_spends import sign_coin_spends
from chia.wallet.trading.offer import Offer
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.uncurried_puzzle import uncurry_puzzle
from chia.wallet.util.compute_memos import compute_memos
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.wallet_sync_utils import fetch_coin_spend_for_coin_state
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import CRCAT, CRCATSpend, ProofsChecker, construct_pending_approval_state
from chia.wallet.vc_wallet.vc_drivers import VerifiedCredential
from chia.wallet.vc_wallet.vc_store import VCRecord, VCStore
from chia.wallet.vc_wallet.vc_store import VCProofs, VCRecord, VCStore
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_coin_record import WalletCoinRecord
from chia.wallet.wallet_info import WalletInfo
@ -297,6 +303,7 @@ class VCWallet:
magic_conditions=[magic_condition],
)
did_announcement, coin_spend, vc = vc_record.vc.do_spend(inner_puzzle, innersol, new_proof_hash)
await self.standard_wallet.hack_populate_secret_key_for_puzzle_hash(inner_puzhash)
spend_bundles = [
await sign_coin_spends(
[coin_spend],
@ -425,6 +432,185 @@ class VCWallet:
else:
return [tx] # pragma: no cover
async def add_vc_authorization(
self, offer: Offer, solver: Solver, reuse_puzhash: Optional[bool] = None
) -> Tuple[Offer, Solver]:
if reuse_puzhash is None:
reuse_puzhash_config = self.wallet_state_manager.config.get("reuse_public_key_for_change", None)
if reuse_puzhash_config is None:
reuse_puzhash = False # pragma: no cover
else:
reuse_puzhash = reuse_puzhash_config.get(
str(self.wallet_state_manager.wallet_node.logged_in_fingerprint), False
)
# Gather all of the CRCATs being spent and the CRCATs that each creates
crcat_spends: List[CRCATSpend] = []
other_spends: List[CoinSpend] = []
spends_to_fix: Dict[bytes32, CoinSpend] = {}
for spend in offer.to_valid_spend().coin_spends:
if CRCAT.is_cr_cat(uncurry_puzzle(spend.puzzle_reveal.to_program()))[0]:
crcat_spend: CRCATSpend = CRCATSpend.from_coin_spend(spend)
if crcat_spend.incomplete:
crcat_spends.append(crcat_spend)
if spend in offer._bundle.coin_spends:
spends_to_fix[spend.coin.name()] = spend
else:
if spend in offer._bundle.coin_spends: # pragma: no cover
other_spends.append(spend)
else:
if spend in offer._bundle.coin_spends:
other_spends.append(spend)
# Figure out what VC announcements are needed
announcements_to_make: Dict[bytes32, List[bytes32]] = {}
announcements_to_assert: Dict[bytes32, List[Announcement]] = {}
vcs: Dict[bytes32, VerifiedCredential] = {}
coin_args: Dict[str, List[str]] = {}
for crcat_spend in crcat_spends:
# Check first whether we can approve...
available_vcs: List[VCRecord] = [
vc_rec
for vc_rec in await self.store.get_vc_records_by_providers(crcat_spend.crcat.authorized_providers)
if vc_rec.confirmed_at_height != 0
]
if len(available_vcs) == 0: # pragma: no cover
raise ValueError(f"No VC available with provider in {crcat_spend.crcat.authorized_providers}")
vc: VerifiedCredential = available_vcs[0].vc
vc_to_use: bytes32 = vc.launcher_id
vcs[vc_to_use] = vc
# ...then whether or not we should
our_crcat: bool = (
await self.wallet_state_manager.get_wallet_identifier_for_puzzle_hash(
crcat_spend.crcat.inner_puzzle_hash
)
is not None
)
outputs_ok: bool = True
for cc in [c for c in crcat_spend.inner_conditions if c.at("f") == 51]:
if not (
( # it's coming to us
await self.wallet_state_manager.get_wallet_identifier_for_puzzle_hash(bytes32(cc.at("rf").atom))
is not None
)
or ( # it's going back where it came from
bytes32(cc.at("rf").atom) == crcat_spend.crcat.inner_puzzle_hash
)
or ( # it's going to the pending state
cc.at("rrr") != Program.to(None)
and cc.at("rrrf").atom is None
and bytes32(cc.at("rf").atom)
== construct_pending_approval_state(
bytes32(cc.at("rrrff").atom), uint64(cc.at("rrf").as_int())
).get_tree_hash()
)
or bytes32(cc.at("rf").atom) == Offer.ph() # it's going to the offer mod
):
outputs_ok = False # pragma: no cover
if our_crcat or outputs_ok:
announcements_to_make.setdefault(vc_to_use, [])
announcements_to_assert.setdefault(vc_to_use, [])
announcements_to_make[vc_to_use].append(crcat_spend.crcat.expected_announcement())
announcements_to_assert[vc_to_use].extend(
[
Announcement(
crcat_spend.crcat.coin.name(),
b"\xcd" + std_hash(crc.inner_puzzle_hash + int_to_bytes(crc.coin.amount)),
)
for crc in crcat_spend.children
]
)
coin_name: str = crcat_spend.crcat.coin.name().hex()
coin_args[coin_name] = [
await self.proof_of_inclusions_for_root_and_keys(
# It's on my TODO list to fix the below line -Quex
vc.proof_hash, # type: ignore
ProofsChecker.from_program(uncurry_puzzle(crcat_spend.crcat.proofs_checker)).flags,
),
"()", # not general
"0x" + vc.proof_provider.hex(),
"0x" + vc.launcher_id.hex(),
"0x" + vc.wrap_inner_with_backdoor().get_tree_hash().hex(),
]
if crcat_spend.crcat.coin.name() in spends_to_fix:
spend_to_fix: CoinSpend = spends_to_fix[crcat_spend.crcat.coin.name()]
other_spends.append(
dataclasses.replace(
spend_to_fix,
solution=spend_to_fix.solution.to_program().replace(
ff=coin_args[coin_name][0],
frf=Program.to(None), # not general
frrf=bytes32.from_hexstr(coin_args[coin_name][2]),
frrrf=bytes32.from_hexstr(coin_args[coin_name][3]),
frrrrf=bytes32.from_hexstr(coin_args[coin_name][4]),
),
)
)
else:
raise ValueError("Wallet cannot verify all spends in specified offer") # pragma: no cover
vc_spends: List[SpendBundle] = []
for launcher_id, vc in vcs.items():
vc_spends.append(
SpendBundle.aggregate(
[
tx.spend_bundle
for tx in (
await self.generate_signed_transaction(
launcher_id,
puzzle_announcements=set(announcements_to_make[launcher_id]),
coin_announcements_to_consume=set(announcements_to_assert[launcher_id]),
reuse_puzhash=reuse_puzhash,
)
)
if tx.spend_bundle is not None
]
)
)
return Offer.from_spend_bundle(
SpendBundle.aggregate(
[
SpendBundle(
[
*(
spend
for spend in offer.to_spend_bundle().coin_spends
if spend.coin.parent_coin_info == bytes32([0] * 32)
),
*other_spends,
],
offer._bundle.aggregated_signature,
),
*vc_spends,
]
)
), Solver({"vc_authorizations": coin_args})
async def get_vc_with_provider_in_and_proofs(
self, authorized_providers: List[bytes32], proofs: List[str]
) -> VerifiedCredential:
vc_records: List[VCRecord] = await self.store.get_vc_records_by_providers(authorized_providers)
if len(vc_records) == 0: # pragma: no cover
raise ValueError(f"VCWallet has no VCs with providers in the following list: {authorized_providers}")
else:
for rec in vc_records:
if rec.vc.proof_hash is None:
continue # pragma: no cover
vc_proofs: Optional[VCProofs] = await self.store.get_proofs_for_root(rec.vc.proof_hash)
if vc_proofs is None:
continue # pragma: no cover
if all(proof in vc_proofs.key_value_pairs for proof in proofs):
return rec.vc
raise ValueError(f"No authorized VC has the correct proofs: {proofs}") # pragma: no cover
async def proof_of_inclusions_for_root_and_keys(self, root: bytes32, keys: List[str]) -> Program:
vc_proofs: Optional[VCProofs] = await self.store.get_proofs_for_root(root)
if vc_proofs is None:
raise RuntimeError(f"No proofs exist for VC root: {root.hex()}") # pragma: no cover
else:
return vc_proofs.prove_keys(keys)
async def select_coins(
self,
amount: uint64,

View File

@ -421,6 +421,9 @@ class Wallet:
message_list.append(Coin(coin.name(), primary.puzzle_hash, primary.amount).name())
message: bytes32 = std_hash(b"".join(message_list))
puzzle: Program = await self.puzzle_for_puzzle_hash(coin.puzzle_hash)
assert (
puzzle.get_tree_hash() == coin.puzzle_hash
), "Decorated puzzle doesn't match the coin puzzle hash, please check your puzzle_decorators config."
solution: Program = self.make_solution(
primaries=primaries,
fee=fee,

View File

@ -10,8 +10,9 @@ from chia.util.ints import uint8, uint32, uint64
from chia.util.misc import VersionedBlob
from chia.wallet.puzzles.clawback.metadata import ClawbackMetadata, ClawbackVersion
from chia.wallet.util.wallet_types import CoinType, StreamableWalletIdentifier, WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import CRCATMetadata, CRCATVersion
MetadataTypes = Union[ClawbackMetadata]
MetadataTypes = Union[ClawbackMetadata, CRCATMetadata]
@dataclass(frozen=True)
@ -42,7 +43,11 @@ class WalletCoinRecord:
raise ValueError("Can't parse None metadata")
if self.coin_type == CoinType.CLAWBACK and self.metadata.version == ClawbackVersion.V1.value:
return ClawbackMetadata.from_bytes(self.metadata.blob)
if (
self.coin_type in {CoinType.CRCAT_PENDING, CoinType.CRCAT}
and self.metadata.version == CRCATVersion.V1.value
):
return CRCATMetadata.from_bytes(self.metadata.blob)
raise ValueError(f"Unknown metadata {self.metadata} for coin_type {self.coin_type}")
def name(self) -> bytes32:

View File

@ -74,6 +74,7 @@ from chia.wallet.util.wallet_sync_utils import (
subscribe_to_coin_updates,
subscribe_to_phs,
)
from chia.wallet.util.wallet_types import CoinType, WalletType
from chia.wallet.wallet_state_manager import WalletStateManager
from chia.wallet.wallet_weight_proof_handler import WalletWeightProofHandler, get_wp_fork_point
@ -1639,7 +1640,11 @@ class WalletNode:
async def _update_balance_cache(self, wallet_id: uint32) -> None:
assert self.wallet_state_manager.lock.locked(), "WalletStateManager.lock required"
wallet = self.wallet_state_manager.wallets[wallet_id]
unspent_records = await self.wallet_state_manager.coin_store.get_unspent_coins_for_wallet(wallet_id)
if wallet.type() == WalletType.CRCAT:
coin_type = CoinType.CRCAT
else:
coin_type = CoinType.NORMAL
unspent_records = await self.wallet_state_manager.coin_store.get_unspent_coins_for_wallet(wallet_id, coin_type)
balance = await wallet.get_confirmed_balance(unspent_records)
pending_balance = await wallet.get_unconfirmed_balance(unspent_records)
spendable_balance = await wallet.get_spendable_balance(unspent_records)

View File

@ -93,6 +93,8 @@ class GSTOptionalArgs(TypedDict):
trade_prices_list: NotRequired[Optional[Program]]
additional_bundles: NotRequired[List[SpendBundle]]
metadata_update: NotRequired[Optional[Tuple[str, str]]]
# CR-CAT Wallet
add_authorizations_to_cr_cats: NotRequired[bool]
# VCWallet
new_proof_hash: NotRequired[Optional[bytes32]]
provider_inner_puzhash: NotRequired[Optional[bytes32]]

View File

@ -8,7 +8,20 @@ import traceback
from contextlib import asynccontextmanager
from pathlib import Path
from secrets import token_bytes
from typing import TYPE_CHECKING, Any, AsyncIterator, Callable, Dict, Iterator, List, Optional, Set, Type, TypeVar
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
Set,
Type,
TypeVar,
Union,
)
import aiosqlite
from blspy import G1Element, G2Element, PrivateKey
@ -47,6 +60,7 @@ from chia.util.lru_cache import LRUCache
from chia.util.misc import UInt32Range, UInt64Range, VersionedBlob
from chia.util.path import path_from_root
from chia.wallet.cat_wallet.cat_constants import DEFAULT_CATS
from chia.wallet.cat_wallet.cat_info import CATInfo, CRCATInfo
from chia.wallet.cat_wallet.cat_utils import CAT_MOD, CAT_MOD_HASH, construct_cat_puzzle, match_cat_puzzle
from chia.wallet.cat_wallet.cat_wallet import CATWallet
from chia.wallet.db_wallet.db_wallet_puzzles import MIRROR_PUZZLE_HASH
@ -88,12 +102,14 @@ from chia.wallet.util.wallet_sync_utils import (
last_change_height_cs,
)
from chia.wallet.util.wallet_types import CoinType, WalletIdentifier, WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import CRCAT, ProofsChecker, construct_pending_approval_state
from chia.wallet.vc_wallet.cr_cat_wallet import CRCATWallet
from chia.wallet.vc_wallet.vc_drivers import VerifiedCredential
from chia.wallet.vc_wallet.vc_store import VCStore
from chia.wallet.vc_wallet.vc_wallet import VCWallet
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_blockchain import WalletBlockchain
from chia.wallet.wallet_coin_record import WalletCoinRecord
from chia.wallet.wallet_coin_record import MetadataTypes, WalletCoinRecord
from chia.wallet.wallet_coin_store import WalletCoinStore
from chia.wallet.wallet_info import WalletInfo
from chia.wallet.wallet_interested_store import WalletInterestedStore
@ -269,6 +285,12 @@ class WalletStateManager:
self.main_wallet,
wallet_info,
)
elif wallet_type == WalletType.CRCAT: # pragma: no cover
wallet = await CRCATWallet.create(
self,
self.main_wallet,
wallet_info,
)
if wallet is not None:
self.wallets[wallet_info.id] = wallet
@ -581,11 +603,13 @@ class WalletStateManager:
return spendable_amount
async def does_coin_belong_to_wallet(self, coin: Coin, wallet_id: int) -> bool:
async def does_coin_belong_to_wallet(
self, coin: Coin, wallet_id: int, hint_dict: Dict[bytes32, bytes32] = {}
) -> bool:
"""
Returns true if we have the key for this coin.
"""
wallet_identifier = await self.puzzle_store.get_wallet_identifier_for_puzzle_hash(coin.puzzle_hash)
wallet_identifier = await self.get_wallet_identifier_for_coin(coin, hint_dict)
return wallet_identifier is not None and wallet_identifier.id == wallet_id
async def get_confirmed_balance_for_wallet(
@ -598,7 +622,11 @@ class WalletStateManager:
"""
# lock only if unspent_coin_records is None
if unspent_coin_records is None:
unspent_coin_records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id)
if self.wallets[uint32(wallet_id)].type() == WalletType.CRCAT:
coin_type = CoinType.CRCAT
else:
coin_type = CoinType.NORMAL
unspent_coin_records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id, coin_type)
return uint128(sum(cr.coin.amount for cr in unspent_coin_records))
async def get_unconfirmed_balance(
@ -611,7 +639,13 @@ class WalletStateManager:
# This API should change so that get_balance_from_coin_records is called for Set[WalletCoinRecord]
# and this method is called only for the unspent_coin_records==None case.
if unspent_coin_records is None:
unspent_coin_records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id)
wallet_type: WalletType = self.wallets[uint32(wallet_id)].type()
if wallet_type == WalletType.CRCAT:
unspent_coin_records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id, CoinType.CRCAT)
pending_crcat = await self.coin_store.get_unspent_coins_for_wallet(wallet_id, CoinType.CRCAT_PENDING)
unspent_coin_records = unspent_coin_records.union(pending_crcat)
else:
unspent_coin_records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id)
unconfirmed_tx: List[TransactionRecord] = await self.tx_store.get_unconfirmed_for_wallet(wallet_id)
all_unspent_coins: Set[Coin] = {cr.coin for cr in unspent_coin_records}
@ -619,11 +653,14 @@ class WalletStateManager:
for record in unconfirmed_tx:
for addition in record.additions:
# This change or a self transaction
if await self.does_coin_belong_to_wallet(addition, wallet_id):
if await self.does_coin_belong_to_wallet(addition, wallet_id, record.hint_dict()):
all_unspent_coins.add(addition)
for removal in record.removals:
if await self.does_coin_belong_to_wallet(removal, wallet_id) and removal in all_unspent_coins:
if (
await self.does_coin_belong_to_wallet(removal, wallet_id, record.hint_dict())
and removal in all_unspent_coins
):
all_unspent_coins.remove(removal)
return uint128(sum(coin.amount for coin in all_unspent_coins))
@ -718,7 +755,8 @@ class WalletStateManager:
)
for coin in unspent_coins.records:
try:
metadata: ClawbackMetadata = coin.parsed_metadata()
metadata: MetadataTypes = coin.parsed_metadata()
assert isinstance(metadata, ClawbackMetadata)
if await metadata.is_recipient(self.puzzle_store):
coin_timestamp = await self.wallet_node.get_timestamp_for_height(coin.confirmed_block_height)
if current_timestamp - coin_timestamp >= metadata.time_lock:
@ -876,14 +914,65 @@ class WalletStateManager:
our_inner_puzzle: Program = self.main_wallet.puzzle_for_pk(derivation_record.pubkey)
asset_id: bytes32 = bytes32(bytes(tail_hash)[1:])
cat_puzzle = construct_cat_puzzle(CAT_MOD, asset_id, our_inner_puzzle, CAT_MOD_HASH)
is_crcat: bool = False
if cat_puzzle.get_tree_hash() != coin_state.coin.puzzle_hash:
return None
# Check if it is a CRCAT
if CRCAT.is_cr_cat(uncurry_puzzle(Program.from_bytes(bytes(coin_spend.puzzle_reveal)))):
is_crcat = True
else:
return None # pragma: no cover
if is_crcat:
# Since CRCAT wallet doesn't have derivation path, every CRCAT will go through this code path
crcat: CRCAT = next(
crc for crc in CRCAT.get_next_from_coin_spend(coin_spend) if crc.coin == coin_state.coin
)
# Make sure we control the inner puzzle or we control it if it's wrapped in the pending state
if (
await self.puzzle_store.get_derivation_record_for_puzzle_hash(crcat.inner_puzzle_hash) is None
and crcat.inner_puzzle_hash
!= construct_pending_approval_state(
hinted_coin.hint,
uint64(coin_state.coin.amount),
).get_tree_hash()
):
self.log.error(f"Unknown CRCAT inner puzzle, coin ID:{crcat.coin.name().hex()}") # pragma: no cover
return None # pragma: no cover
# Check if we already have a wallet
for wallet_info in await self.get_all_wallet_info_entries(wallet_type=WalletType.CRCAT):
crcat_info: CRCATInfo = CRCATInfo.from_bytes(bytes.fromhex(wallet_info.data))
if crcat_info.limitations_program_hash == asset_id:
return WalletIdentifier(wallet_info.id, WalletType(wallet_info.type))
# We didn't find a matching CR-CAT wallet, but maybe we have a matching CAT wallet that we can convert
for wallet_info in await self.get_all_wallet_info_entries(wallet_type=WalletType.CAT):
cat_info: CATInfo = CATInfo.from_bytes(bytes.fromhex(wallet_info.data))
found_cat_wallet = self.wallets[wallet_info.id]
assert isinstance(found_cat_wallet, CATWallet)
if cat_info.limitations_program_hash == crcat.tail_hash:
await CRCATWallet.convert_to_cr(
found_cat_wallet,
crcat.authorized_providers,
ProofsChecker.from_program(uncurry_puzzle(crcat.proofs_checker)),
)
self.state_changed("converted cat wallet to cr", wallet_info.id)
return WalletIdentifier(wallet_info.id, WalletType(WalletType.CRCAT))
if bytes(tail_hash).hex()[2:] in self.default_cats or self.config.get(
"automatically_add_unknown_cats", False
):
cat_wallet = await CATWallet.get_or_create_wallet_for_cat(
self, self.main_wallet, bytes(tail_hash).hex()[2:]
)
if is_crcat:
cat_wallet: Union[CATWallet, CRCATWallet] = await CRCATWallet.get_or_create_wallet_for_cat(
self,
self.main_wallet,
crcat.tail_hash.hex(),
authorized_providers=crcat.authorized_providers,
proofs_checker=ProofsChecker.from_program(uncurry_puzzle(crcat.proofs_checker)),
)
else:
cat_wallet = await CATWallet.get_or_create_wallet_for_cat(
self, self.main_wallet, bytes(tail_hash).hex()[2:]
)
return WalletIdentifier.create(cat_wallet)
else:
# Found unacknowledged CAT, save it in the database.
@ -1670,6 +1759,29 @@ class WalletStateManager:
return WalletIdentifier(uint32(wallet_id), self.wallets[uint32(wallet_id)].type())
return None
async def get_wallet_identifier_for_coin(
self, coin: Coin, hint_dict: Dict[bytes32, bytes32] = {}
) -> Optional[WalletIdentifier]:
wallet_identifier = await self.puzzle_store.get_wallet_identifier_for_puzzle_hash(coin.puzzle_hash)
if (
wallet_identifier is None
and coin.name() in hint_dict
and await self.puzzle_store.puzzle_hash_exists(hint_dict[coin.name()])
):
wallet_identifier = await self.get_wallet_identifier_for_hinted_coin(coin, hint_dict[coin.name()])
if wallet_identifier is None:
coin_record = await self.coin_store.get_coin_record(coin.name())
if coin_record is not None:
wallet_identifier = WalletIdentifier(uint32(coin_record.wallet_id), coin_record.wallet_type)
return wallet_identifier
async def get_wallet_identifier_for_hinted_coin(self, coin: Coin, hint: bytes32) -> Optional[WalletIdentifier]:
for wallet in self.wallets.values():
if await wallet.match_hinted_coin(coin, hint):
return WalletIdentifier(wallet.id(), wallet.type())
return None
async def coin_added(
self,
coin: Coin,
@ -1898,9 +2010,9 @@ class WalletStateManager:
async def get_wallet_for_asset_id(self, asset_id: str) -> Optional[WalletProtocol]:
for wallet_id, wallet in self.wallets.items():
if wallet.type() == WalletType.CAT:
if wallet.type() in (WalletType.CAT, WalletType.CRCAT):
assert isinstance(wallet, CATWallet)
if bytes(wallet.cat_info.limitations_program_hash).hex() == asset_id:
if wallet.get_asset_id() == asset_id:
return wallet
elif wallet.type() == WalletType.DATA_LAYER:
assert isinstance(wallet, DataLayerWallet)
@ -1928,6 +2040,9 @@ class WalletStateManager:
self.main_wallet,
puzzle_driver,
name,
potential_subclasses={
AssetType.CR: CRCATWallet,
},
)
async def add_new_wallet(self, wallet: WalletProtocol) -> None:
@ -1938,8 +2053,12 @@ class WalletStateManager:
async def get_spendable_coins_for_wallet(
self, wallet_id: int, records: Optional[Set[WalletCoinRecord]] = None
) -> Set[WalletCoinRecord]:
wallet_type = self.wallets[uint32(wallet_id)].type()
if records is None:
records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id)
if wallet_type == WalletType.CRCAT:
records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id, CoinType.CRCAT)
else:
records = await self.coin_store.get_unspent_coins_for_wallet(wallet_id)
# Coins that are currently part of a transaction
unconfirmed_tx: List[TransactionRecord] = await self.tx_store.get_unconfirmed_for_wallet(wallet_id)
@ -1947,7 +2066,7 @@ class WalletStateManager:
for tx in unconfirmed_tx:
for coin in tx.removals:
# TODO, "if" might not be necessary once unconfirmed tx doesn't contain coins for other wallets
if await self.does_coin_belong_to_wallet(coin, wallet_id):
if await self.does_coin_belong_to_wallet(coin, wallet_id, tx.hint_dict()):
removal_dict[coin.name()] = coin
# Coins that are part of the trade
@ -1993,7 +2112,7 @@ class WalletStateManager:
async def convert_puzzle_hash(self, wallet_id: uint32, puzzle_hash: bytes32) -> bytes32:
wallet = self.wallets[wallet_id]
# This should be general to wallets but for right now this is just for CATs so we'll add this if
if wallet.type() == WalletType.CAT.value:
if wallet.type() in (WalletType.CAT.value, WalletType.CRCAT.value):
assert isinstance(wallet, CATWallet)
return await wallet.convert_puzzle_hash(puzzle_hash)

View File

@ -162,7 +162,10 @@ def db_version(request) -> int:
return request.param
@pytest.fixture(scope="function", params=[1000000, 3886635, 4410000, 5496000])
SOFTFORK_HEIGHTS = [1000000, 3886635, 4410000, 5496000]
@pytest.fixture(scope="function", params=SOFTFORK_HEIGHTS)
def softfork_height(request) -> int:
return request.param
@ -688,11 +691,14 @@ async def daemon_connection_and_temp_keychain(
@pytest_asyncio.fixture(scope="function")
async def wallets_prefarm_services(two_wallet_nodes_services, self_hostname, trusted):
async def wallets_prefarm_services(two_wallet_nodes_services, self_hostname, trusted, request):
"""
Sets up the node with 10 blocks, and returns a payer and payee wallet.
"""
farm_blocks = 3
try:
farm_blocks = request.param
except AttributeError:
farm_blocks = 3
buffer = 1
full_nodes, wallets, bt = two_wallet_nodes_services
full_node_api = full_nodes[0]._api

File diff suppressed because one or more lines are too long

View File

@ -302,6 +302,10 @@ class TestDLWallet:
await time_out_assert(10, wallet_0.get_confirmed_balance, funds - 2000000000000)
await asyncio.sleep(0.5)
dl_coin_record = await dl_wallet.wallet_state_manager.coin_store.get_coin_record(new_record.coin_id)
assert dl_coin_record is not None
assert await dl_wallet.match_hinted_coin(dl_coin_record.coin, new_record.launcher_id)
previous_record = await dl_wallet.get_latest_singleton(launcher_id)
new_root = MerkleTree([Program.to("new root").get_tree_hash()]).calculate_root()

View File

@ -790,6 +790,12 @@ class TestDIDWallet:
assert metadata["Twitter"] == "Test"
assert metadata["GitHub"] == "测试"
# Test match_hinted_coin
assert await did_wallet_2.match_hinted_coin(
list(await did_wallet_2.select_coins(1))[0],
new_puzhash,
)
@pytest.mark.parametrize(
"trusted",
[True, False],

View File

@ -262,7 +262,7 @@ async def assert_get_balance(rpc_client: WalletRpcClient, wallet_node: WalletNod
expected_balance_dict["wallet_id"] = wallet.id()
expected_balance_dict["wallet_type"] = wallet.type()
expected_balance_dict["fingerprint"] = wallet_node.logged_in_fingerprint
if wallet.type() == WalletType.CAT:
if wallet.type() in {WalletType.CAT, WalletType.CRCAT}:
assert isinstance(wallet, CATWallet)
expected_balance_dict["asset_id"] = wallet.get_asset_id()
assert await rpc_client.get_wallet_balance(wallet.id()) == expected_balance_dict

View File

@ -127,6 +127,10 @@ class TestWalletSimulator:
assert await wallet.get_confirmed_balance() == expected_confirmed_balance
assert await wallet.get_unconfirmed_balance() == expected_confirmed_balance
# Test match_hinted_coin
selected_coin = list(await wallet.select_coins(uint64(0)))[0]
assert await wallet.match_hinted_coin(selected_coin, selected_coin.puzzle_hash)
@pytest.mark.parametrize(
"trusted",
[True, False],

View File

@ -0,0 +1,68 @@
from __future__ import annotations
from typing import List, Optional
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64
from chia.wallet.outer_puzzles import (
construct_puzzle,
create_asset_id,
get_inner_puzzle,
get_inner_solution,
match_puzzle,
solve_puzzle,
)
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.uncurried_puzzle import uncurry_puzzle
from chia.wallet.vc_wallet.cr_cat_drivers import construct_cr_layer
def test_cat_outer_puzzle() -> None:
authorized_providers: List[bytes32] = [bytes32([0] * 32), bytes32([0] * 32)]
proofs_checker: Program = Program.to(None)
ACS: Program = Program.to(1)
cr_puzzle: Program = construct_cr_layer(authorized_providers, proofs_checker, ACS)
double_cr_puzzle: Program = construct_cr_layer(authorized_providers, proofs_checker, cr_puzzle)
uncurried_cr_puzzle = uncurry_puzzle(double_cr_puzzle)
cr_driver: Optional[PuzzleInfo] = match_puzzle(uncurried_cr_puzzle)
assert cr_driver is not None
assert cr_driver.type() == "credential restricted"
assert cr_driver["authorized_providers"] == authorized_providers
assert cr_driver["proofs_checker"] == proofs_checker
inside_cr_driver: Optional[PuzzleInfo] = cr_driver.also()
assert inside_cr_driver is not None
assert inside_cr_driver.type() == "credential restricted"
assert inside_cr_driver["authorized_providers"] == authorized_providers
assert inside_cr_driver["proofs_checker"] == proofs_checker
assert construct_puzzle(cr_driver, ACS) == double_cr_puzzle
assert get_inner_puzzle(cr_driver, uncurried_cr_puzzle) == ACS
assert create_asset_id(cr_driver) is None
# Set up for solve
coin: Coin = Coin(bytes32([0] * 32), bytes32([0] * 32), uint64(0))
coin_as_hex: str = "0x" + coin.parent_coin_info.hex() + coin.puzzle_hash.hex() + bytes(uint64(coin.amount)).hex()
inner_solution = Program.to([[51, ACS.get_tree_hash(), 100]])
solution: Program = solve_puzzle(
cr_driver,
Solver(
{
"coin": coin_as_hex,
"vc_authorizations": {
coin.name().hex(): [
"()",
"()",
"()",
"()",
"()",
],
},
},
),
ACS,
inner_solution,
)
assert get_inner_solution(cr_driver, solution) == inner_solution

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import itertools
from typing import List, Optional, Tuple
import pytest
@ -394,6 +395,46 @@ async def test_viral_backdoor(cost_logger: CostLogger) -> None:
assert len(await client.get_coin_records_by_puzzle_hashes([wrapped_brick_hash], include_spent_coins=False)) > 0
@pytest.mark.asyncio
@pytest.mark.parametrize("num_proofs", range(1, 6))
async def test_proofs_checker(cost_logger: CostLogger, num_proofs: int) -> None:
async with sim_and_client() as (sim, client):
flags: List[str] = [str(i) for i in range(0, num_proofs)]
proofs_checker: ProofsChecker = ProofsChecker(flags)
# (mod (PROOFS_CHECKER proofs) (if (a PROOFS_CHECKER (list proofs)) () (x)))
proofs_checker_runner: Program = Program.fromhex(
"ff02ffff03ffff02ff02ffff04ff05ff808080ff80ffff01ff088080ff0180"
).curry(proofs_checker.as_program())
await sim.farm_block(proofs_checker_runner.get_tree_hash())
proof_checker_coin: Coin = (
await client.get_coin_records_by_puzzle_hashes(
[proofs_checker_runner.get_tree_hash()], include_spent_coins=False
)
)[0].coin
block_height: uint32 = sim.block_height
for i, proof_list in enumerate(itertools.permutations(flags, num_proofs)):
result: Tuple[MempoolInclusionStatus, Optional[Err]] = await client.push_tx(
cost_logger.add_cost(
f"Proofs Checker only - num_proofs: {num_proofs} - permutation: {i}",
SpendBundle(
[
CoinSpend(
proof_checker_coin,
proofs_checker_runner,
Program.to([[Program.to((flag, "1")) for flag in proof_list]]),
)
],
G2Element(),
),
)
)
assert result == (MempoolInclusionStatus.SUCCESS, None)
await sim.farm_block()
await sim.rewind(block_height)
@pytest.mark.asyncio
@pytest.mark.parametrize("test_syncing", [True, False])
async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None:
@ -490,8 +531,8 @@ async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None
assert len(await client.get_coin_records_by_puzzle_hashes([vc.coin.puzzle_hash], include_spent_coins=False)) > 0
# Update the proofs with a proper announcement
NEW_PROOFS: Program = Program.to((("test", True), ("test2", True)))
MALICIOUS_PROOFS: Program = Program.to(("malicious", True))
NEW_PROOFS: Program = Program.to((("test", "1"), ("test2", "1")))
MALICIOUS_PROOFS: Program = Program.to(("malicious", "1"))
NEW_PROOF_HASH: bytes32 = NEW_PROOFS.get_tree_hash()
expected_announcement, update_spend, vc = vc.do_spend(
ACS,
@ -629,6 +670,7 @@ async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None
[
(
cr_1 if error != "use_malicious_cats" else malicious_cr_1,
0,
ACS,
Program.to(
[
@ -643,6 +685,7 @@ async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None
),
(
cr_2 if error != "use_malicious_cats" else malicious_cr_2,
0,
ACS,
Program.to(
[
@ -680,7 +723,7 @@ async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None
if error not in ["use_malicious_cats", "attempt_honest_cat_piggyback"]
else malicious_cr_2.expected_announcement(),
],
*([61, a] for a in expected_announcements),
*([61, a.name()] for a in expected_announcements),
vc.standard_magic_condition(),
]
),
@ -702,7 +745,7 @@ async def test_vc_lifecycle(test_syncing: bool, cost_logger: CostLogger) -> None
assert result == (MempoolInclusionStatus.SUCCESS, None)
if test_syncing:
assert all(
CRCAT.is_cr_cat(uncurry_puzzle(spend.puzzle_reveal.to_program())) for spend in cr_cat_spends
CRCAT.is_cr_cat(uncurry_puzzle(spend.puzzle_reveal.to_program()))[0] for spend in cr_cat_spends
)
new_crcats = [crcat for spend in cr_cat_spends for crcat in CRCAT.get_next_from_coin_spend(spend)]
vc = VerifiedCredential.get_next_from_coin_spend(auth_spend)

View File

@ -1,19 +1,106 @@
from __future__ import annotations
from typing import Any, Optional
from typing import Any, Awaitable, Callable, List, Optional
import pytest
from blspy import G2Element
from typing_extensions import Literal
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.simulator.time_out_assert import time_out_assert, time_out_assert_not_none
from chia.types.blockchain_format.coin import coin_as_list
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.peer_info import PeerInfo
from chia.util.ints import uint16, uint64
from chia.types.spend_bundle import SpendBundle
from chia.util.bech32m import encode_puzzle_hash
from chia.util.ints import uint16, uint32, uint64
from chia.wallet.cat_wallet.cat_utils import CAT_MOD, construct_cat_puzzle
from chia.wallet.cat_wallet.cat_wallet import CATWallet
from chia.wallet.did_wallet.did_wallet import DIDWallet
from chia.wallet.util.query_filter import TransactionTypeFilter
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.vc_wallet.cr_cat_drivers import ProofsChecker, construct_cr_layer
from chia.wallet.vc_wallet.cr_cat_wallet import CRCATWallet
from chia.wallet.vc_wallet.vc_store import VCProofs, VCRecord
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_node import WalletNode
async def mint_cr_cat(
num_blocks: int,
wallet_0: Wallet,
wallet_node_0: WalletNode,
client_0: WalletRpcClient,
full_node_api: FullNodeSimulator,
authorized_providers: List[bytes32] = [],
tail: Program = Program.to(None),
proofs_checker: ProofsChecker = ProofsChecker(["foo", "bar"]),
) -> None:
our_puzzle: Program = await wallet_0.get_new_puzzle()
cat_puzzle: Program = construct_cat_puzzle(
CAT_MOD,
tail.get_tree_hash(),
Program.to(1),
)
CAT_AMOUNT_0 = uint64(100)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
tx = await client_0.create_signed_transaction(
[
{
"puzzle_hash": cat_puzzle.get_tree_hash(),
"amount": CAT_AMOUNT_0,
}
],
wallet_id=1,
)
spend_bundle = tx.spend_bundle
assert spend_bundle is not None
# Do the eve spend back to our wallet and add the CR layer
cat_coin = next(c for c in spend_bundle.additions() if c.amount == CAT_AMOUNT_0)
eve_spend = SpendBundle(
[
CoinSpend(
cat_coin,
cat_puzzle,
Program.to(
[
Program.to(
[
[
51,
construct_cr_layer(
authorized_providers,
proofs_checker.as_program(),
our_puzzle,
).get_tree_hash(),
CAT_AMOUNT_0,
[our_puzzle.get_tree_hash()],
],
[51, None, -113, tail, None],
[1, our_puzzle.get_tree_hash(), authorized_providers, proofs_checker.as_program()],
]
),
None,
cat_coin.name(),
coin_as_list(cat_coin),
[cat_coin.parent_coin_info, Program.to(1).get_tree_hash(), cat_coin.amount],
0,
0,
]
),
)
],
G2Element(),
)
spend_bundle = SpendBundle.aggregate([spend_bundle, eve_spend])
await client_0.push_tx(spend_bundle) # type: ignore [no-untyped-call]
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
@pytest.mark.parametrize(
@ -31,7 +118,7 @@ async def test_vc_lifecycle(self_hostname: str, two_wallet_nodes_services: Any,
wallet_node_0 = wallet_service_0._node
wallet_node_1 = wallet_service_1._node
wallet_0 = wallet_node_0.wallet_state_manager.main_wallet
wallet_1 = wallet_node_1.wallet_state_manager.main_wallet # just to farm to for processing TXs
wallet_1 = wallet_node_1.wallet_state_manager.main_wallet
client_0 = await WalletRpcClient.create(
bt.config["self_hostname"],
@ -39,6 +126,14 @@ async def test_vc_lifecycle(self_hostname: str, two_wallet_nodes_services: Any,
wallet_service_0.root_path,
wallet_service_0.config,
)
client_1 = await WalletRpcClient.create(
bt.config["self_hostname"],
wallet_service_1.rpc_server.listen_port,
wallet_service_1.root_path,
wallet_service_1.config,
)
wallet_node_0.config["automatically_add_unknown_cats"] = True
wallet_node_1.config["automatically_add_unknown_cats"] = True
if trusted:
wallet_node_0.config["trusted_peers"] = {
@ -54,6 +149,7 @@ async def test_vc_lifecycle(self_hostname: str, two_wallet_nodes_services: Any,
await wallet_node_0.server.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
await wallet_node_1.server.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_0)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
confirmed_balance: int = await wallet_0.get_confirmed_balance()
did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet(
wallet_node_0.wallet_state_manager, wallet_0, uint64(1)
@ -80,9 +176,8 @@ async def test_vc_lifecycle(self_hostname: str, two_wallet_nodes_services: Any,
new_vc_record: Optional[VCRecord] = await client_0.vc_get(vc_record.vc.launcher_id)
assert new_vc_record is not None
assert did_wallet.did_info.current_inner is not None
# Spend VC
proofs: VCProofs = VCProofs({"foo": "bar", "baz": "qux", "corge": "grault"})
proofs: VCProofs = VCProofs({"foo": "1", "bar": "1", "baz": "1", "qux": "1", "grault": "1"})
proof_root: bytes32 = proofs.root()
txs = await client_0.vc_spend(
vc_record.vc.launcher_id,
@ -127,15 +222,209 @@ async def test_vc_lifecycle(self_hostname: str, two_wallet_nodes_services: Any,
assert len(vc_records) == 1
assert fetched_proofs[proof_root.hex()] == proofs.key_value_pairs
await mint_cr_cat(num_blocks, wallet_0, wallet_node_0, client_0, full_node_api, [did_id])
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_0)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
confirmed_balance += 2_000_000_000_000 * num_blocks
confirmed_balance -= 100 # cat mint amount
# Send CR-CAT to another wallet
async def check_length(length: int, func: Callable[..., Awaitable[Any]], *args: Any) -> Optional[Literal[True]]:
if len(await func(*args)) == length:
return True
return None # pragma: no cover
await time_out_assert_not_none(
15, check_length, 1, wallet_node_0.wallet_state_manager.get_all_wallet_info_entries, WalletType.CRCAT
)
cr_cat_wallet_id_0: uint16 = (
await wallet_node_0.wallet_state_manager.get_all_wallet_info_entries(wallet_type=WalletType.CRCAT)
)[0].id
cr_cat_wallet_0: CRCATWallet = wallet_node_0.wallet_state_manager.wallets[cr_cat_wallet_id_0]
assert await wallet_node_0.wallet_state_manager.get_wallet_for_asset_id(cr_cat_wallet_0.get_asset_id()) is not None
wallet_1_addr = encode_puzzle_hash(await wallet_1.get_new_puzzlehash(), "txch")
tx = await client_0.cat_spend(
cr_cat_wallet_0.id(),
uint64(90),
wallet_1_addr,
uint64(2000000000),
memos=["hey"],
)
confirmed_balance -= 2000000000
await wallet_node_0.wallet_state_manager.add_pending_transaction(tx)
assert tx.spend_bundle is not None
spend_bundle = tx.spend_bundle
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_1)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_1, timeout=20)
await time_out_assert(15, wallet_0.get_confirmed_balance, confirmed_balance)
# Check the other wallet recieved it
await time_out_assert_not_none(
15, check_length, 1, wallet_node_1.wallet_state_manager.get_all_wallet_info_entries, WalletType.CRCAT
)
cr_cat_wallet_info = (
await wallet_node_1.wallet_state_manager.get_all_wallet_info_entries(wallet_type=WalletType.CRCAT)
)[0]
cr_cat_wallet_id_1: uint16 = cr_cat_wallet_info.id
cr_cat_wallet_1: CRCATWallet = wallet_node_1.wallet_state_manager.wallets[cr_cat_wallet_id_1]
assert await CRCATWallet.create( # just testing the create method doesn't throw
wallet_node_1.wallet_state_manager,
wallet_node_1.wallet_state_manager.main_wallet,
cr_cat_wallet_info,
)
await time_out_assert(15, cr_cat_wallet_1.get_confirmed_balance, 0)
await time_out_assert(15, cr_cat_wallet_1.get_pending_approval_balance, 90)
await time_out_assert(15, cr_cat_wallet_1.get_unconfirmed_balance, 90)
assert await client_1.get_wallet_balance(cr_cat_wallet_id_1) == {
"confirmed_wallet_balance": 0,
"unconfirmed_wallet_balance": 0,
"spendable_balance": 0,
"pending_change": 0,
"max_send_amount": 0,
"unspent_coin_count": 0,
"pending_coin_removal_count": 0,
"pending_approval_balance": 90,
"wallet_id": cr_cat_wallet_id_1,
"wallet_type": cr_cat_wallet_1.type().value,
"asset_id": cr_cat_wallet_1.get_asset_id(),
"fingerprint": wallet_node_1.logged_in_fingerprint,
}
pending_tx = await client_1.get_transactions(
cr_cat_wallet_1.id(),
0,
1,
reverse=True,
type_filter=TransactionTypeFilter.include([TransactionType.INCOMING_CRCAT_PENDING]),
)
assert len(pending_tx) == 1
# Send the VC to wallet_1 to use for the CR-CATs
txs = await client_0.vc_spend(vc_record.vc.launcher_id, new_puzhash=await wallet_1.get_new_puzzlehash())
spend_bundle = next(tx.spend_bundle for tx in txs if tx.spend_bundle is not None)
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_1)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_1, timeout=20)
vc_record_updated = await client_1.vc_get(vc_record.vc.launcher_id)
assert vc_record_updated is not None
await client_1.vc_add_proofs(proofs.key_value_pairs)
# Claim the pending approval to our wallet
txs = await client_1.crcat_approve_pending(
uint32(cr_cat_wallet_id_1),
uint64(90),
fee=uint64(90),
)
spend_bundle = next(tx.spend_bundle for tx in txs if tx.spend_bundle is not None)
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_1)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_1, timeout=20)
await time_out_assert(15, cr_cat_wallet_1.get_confirmed_balance, 90)
await time_out_assert(15, cr_cat_wallet_1.get_pending_approval_balance, 0)
await time_out_assert(15, cr_cat_wallet_1.get_unconfirmed_balance, 90)
await time_out_assert(
15, cr_cat_wallet_1.wallet_state_manager.get_confirmed_balance_for_wallet, 90, cr_cat_wallet_id_1
)
await time_out_assert_not_none(
10, check_vc_record_has_parent_id, vc_record_updated.vc.coin.name(), client_1, vc_record.vc.launcher_id
)
vc_record_updated = await client_1.vc_get(vc_record.vc.launcher_id)
assert vc_record_updated is not None
# Test melting a CRCAT
tx = await client_1.cat_spend(
cr_cat_wallet_id_1,
uint64(20),
wallet_1_addr,
uint64(0),
cat_discrepancy=(-50, Program.to(None), Program.to(None)),
reuse_puzhash=True,
)
await wallet_node_1.wallet_state_manager.add_pending_transaction(tx)
assert tx.spend_bundle is not None
spend_bundle = tx.spend_bundle
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_1)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_1, timeout=20)
# should go straight to confirmed because we sent to ourselves
await time_out_assert(15, cr_cat_wallet_1.get_confirmed_balance, 40)
await time_out_assert(15, cr_cat_wallet_1.get_pending_approval_balance, 0)
await time_out_assert(15, cr_cat_wallet_1.get_unconfirmed_balance, 40)
# Revoke VC
await time_out_assert_not_none(
10, check_vc_record_has_parent_id, vc_record_updated.vc.coin.name(), client_1, vc_record.vc.launcher_id
)
vc_record_updated = await client_1.vc_get(vc_record_updated.vc.launcher_id)
assert vc_record_updated is not None
txs = await client_0.vc_revoke(vc_record_updated.vc.coin.parent_coin_info, uint64(1))
confirmed_balance -= 1
spend_bundle = next(tx.spend_bundle for tx in txs if tx.spend_bundle is not None)
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_1)
await time_out_assert(15, wallet_0.get_confirmed_balance, confirmed_balance)
vc_record_revoked: Optional[VCRecord] = await client_0.vc_get(vc_record.vc.launcher_id)
vc_record_revoked: Optional[VCRecord] = await client_1.vc_get(vc_record.vc.launcher_id)
assert vc_record_revoked is None
assert (
len(await (await wallet_node_0.wallet_state_manager.get_or_create_vc_wallet()).store.get_unconfirmed_vcs()) == 0
)
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_cat_wallet_conversion(
self_hostname: str,
one_wallet_and_one_simulator_services: Any,
trusted: Any,
) -> None:
num_blocks = 1
full_nodes, wallets, bt = one_wallet_and_one_simulator_services
full_node_api: FullNodeSimulator = full_nodes[0]._api
full_node_server = full_node_api.full_node.server
wallet_service_0 = wallets[0]
wallet_node_0 = wallet_service_0._node
wallet_0 = wallet_node_0.wallet_state_manager.main_wallet
client_0 = await WalletRpcClient.create(
bt.config["self_hostname"],
wallet_service_0.rpc_server.listen_port,
wallet_service_0.root_path,
wallet_service_0.config,
)
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
await wallet_node_0.server.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_0)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
# Key point of test: create a normal CAT wallet first, and see if it gets converted to CR-CAT wallet
await CATWallet.get_or_create_wallet_for_cat(
wallet_node_0.wallet_state_manager, wallet_0, Program.to(None).get_tree_hash().hex()
)
did_id = bytes32([0] * 32)
await mint_cr_cat(num_blocks, wallet_0, wallet_node_0, client_0, full_node_api, [did_id])
await full_node_api.farm_blocks_to_wallet(count=num_blocks, wallet=wallet_0)
await full_node_api.wait_for_wallet_synced(wallet_node=wallet_node_0, timeout=20)
async def check_length(length: int, func: Callable[..., Awaitable[Any]], *args: Any) -> Optional[Literal[True]]:
if len(await func(*args)) == length:
return True
return None # pragma: no cover
await time_out_assert_not_none(
15, check_length, 1, wallet_node_0.wallet_state_manager.get_all_wallet_info_entries, WalletType.CRCAT
)
await time_out_assert_not_none(
15, check_length, 0, wallet_node_0.wallet_state_manager.get_all_wallet_info_entries, WalletType.CAT
)