Nft bulk transfer (#14329)

* Add NFT bulk transfer API

* Fix pre-commit

* Improve unit test

* Fix precommit

* Improve unit test

* Fix response json serialization
This commit is contained in:
Kronus91 2023-01-19 11:21:04 -08:00 committed by GitHub
parent b4c7464af1
commit 35b6738c49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 249 additions and 4 deletions

View file

@ -187,7 +187,8 @@ class WalletRpcApi:
"/nft_add_uri": self.nft_add_uri,
"/nft_calculate_royalties": self.nft_calculate_royalties,
"/nft_mint_bulk": self.nft_mint_bulk,
"/nft_set_did_bulk": self.nft_set_bulk_nft_did,
"/nft_set_did_bulk": self.nft_set_did_bulk,
"/nft_transfer_bulk": self.nft_transfer_bulk,
# Pool Wallet
"/pw_join_pool": self.pw_join_pool,
"/pw_self_pool": self.pw_self_pool,
@ -2198,7 +2199,7 @@ class WalletRpcApi:
spend_bundle = await nft_wallet.set_nft_did(nft_coin_info, did_id, fee=fee)
return {"wallet_id": wallet_id, "success": True, "spend_bundle": spend_bundle}
async def nft_set_bulk_nft_did(self, request):
async def nft_set_did_bulk(self, request):
"""
Bulk set DID for NFTs across different wallets.
accepted `request` dict keys:
@ -2273,10 +2274,83 @@ class WalletRpcApi:
await nft_wallet.update_coin_status(coin, True)
for wallet_id in nft_dict.keys():
self.service.wallet_state_manager.state_changed("nft_coin_did_set", wallet_id)
return {"wallet_id": nft_dict.keys(), "success": True, "spend_bundle": spend_bundle}
return {"wallet_id": list(nft_dict.keys()), "success": True, "spend_bundle": spend_bundle}
else:
raise ValueError("Couldn't set DID on given NFT")
async def nft_transfer_bulk(self, request):
"""
Bulk transfer NFTs to an address.
accepted `request` dict keys:
- required `nft_coin_list`: [{"nft_coin_id": COIN_ID/NFT_ID, "wallet_id": WALLET_ID},....]
- required `target_address`, Transfer NFTs to this address
- optional `fee`, in mojos, defaults to 0
:param request:
:return:
"""
if len(request["nft_coin_list"]) > MAX_NFT_CHUNK_SIZE:
return {"success": False, "error": f"You can only transfer {MAX_NFT_CHUNK_SIZE} NFTs at once"}
address = request["target_address"]
if isinstance(address, str):
puzzle_hash = decode_puzzle_hash(address)
else:
return dict(success=False, error="target_address parameter missing")
nft_dict: Dict[uint32, List[NFTCoinInfo]] = {}
tx_list: List[TransactionRecord] = []
coin_ids = []
fee = uint64(request.get("fee", 0))
for nft_coin in request["nft_coin_list"]:
if "nft_coin_id" not in nft_coin or "wallet_id" not in nft_coin:
log.error(f"Cannot transfer NFT :{nft_coin}, missing nft_coin_id or wallet_id.")
continue
wallet_id = uint32(nft_coin["wallet_id"])
nft_wallet = self.service.wallet_state_manager.wallets[wallet_id]
assert isinstance(nft_wallet, NFTWallet)
nft_coin_id = nft_coin["nft_coin_id"]
if nft_coin_id.startswith(AddressType.NFT.hrp(self.service.config)):
nft_id = decode_puzzle_hash(nft_coin_id)
nft_coin_info = await nft_wallet.get_nft(nft_id)
else:
nft_coin_id = bytes32.from_hexstr(nft_coin_id)
nft_coin_info = await nft_wallet.get_nft_coin_by_id(nft_coin_id)
assert nft_coin_info is not None
if wallet_id in nft_dict:
nft_dict[wallet_id].append(nft_coin_info)
else:
nft_dict[wallet_id] = [nft_coin_info]
first = True
nft_wallet = None
for wallet_id, nft_list in nft_dict.items():
nft_wallet = self.service.wallet_state_manager.wallets[wallet_id]
assert isinstance(nft_wallet, NFTWallet)
if not first:
tx_list.extend(await nft_wallet.bulk_transfer_nft(nft_list, puzzle_hash))
else:
tx_list.extend(await nft_wallet.bulk_transfer_nft(nft_list, puzzle_hash, fee))
for coin in nft_list:
coin_ids.append(coin.coin.name())
first = False
spend_bundles: List[SpendBundle] = []
refined_tx_list: List[TransactionRecord] = []
for tx in tx_list:
if tx.spend_bundle is not None:
spend_bundles.append(tx.spend_bundle)
refined_tx_list.append(dataclasses.replace(tx, spend_bundle=None))
if len(spend_bundles) > 0:
spend_bundle = SpendBundle.aggregate(spend_bundles)
# Add all spend bundles to the first tx
refined_tx_list[0] = dataclasses.replace(refined_tx_list[0], spend_bundle=spend_bundle)
for tx in refined_tx_list:
await self.service.wallet_state_manager.add_pending_transaction(tx)
for coin in coin_ids:
await nft_wallet.update_coin_status(coin, True)
for wallet_id in nft_dict.keys():
self.service.wallet_state_manager.state_changed("nft_coin_did_set", wallet_id)
return {"wallet_id": list(nft_dict.keys()), "success": True, "spend_bundle": spend_bundle}
else:
raise ValueError("Couldn't transfer given NFTs")
async def nft_get_by_did(self, request) -> EndpointResult:
did_id: Optional[bytes32] = None
if "did_id" in request:

View file

@ -1100,6 +1100,43 @@ class NFTWallet:
refined_tx_list[0] = dataclasses.replace(refined_tx_list[0], spend_bundle=spend_bundle)
return refined_tx_list
async def bulk_transfer_nft(
self,
nft_list: List[NFTCoinInfo],
puzzle_hash: bytes32,
fee: uint64 = uint64(0),
) -> List[TransactionRecord]:
self.log.debug("Transfer NFTs %s to %s", nft_list, puzzle_hash.hex())
nft_tx_record = []
spend_bundles = []
first = True
for nft_coin_info in nft_list:
if not first:
fee = uint64(0)
nft_tx_record.extend(
await self.generate_signed_transaction(
[uint64(nft_coin_info.coin.amount)],
[puzzle_hash],
coins={nft_coin_info.coin},
fee=fee,
new_owner=b"",
new_did_inner_hash=b"",
)
)
first = False
refined_tx_list: List[TransactionRecord] = []
for tx in nft_tx_record:
if tx.spend_bundle is not None:
spend_bundles.append(tx.spend_bundle)
refined_tx_list.append(dataclasses.replace(tx, spend_bundle=None))
if len(spend_bundles) > 0:
spend_bundle = SpendBundle.aggregate(spend_bundles)
# Add all spend bundles to the first tx
refined_tx_list[0] = dataclasses.replace(refined_tx_list[0], spend_bundle=spend_bundle)
return refined_tx_list
async def set_nft_did(self, nft_coin_info: NFTCoinInfo, did_id: bytes, fee: uint64 = uint64(0)) -> SpendBundle:
self.log.debug("Setting NFT DID with parameters: nft=%s did=%s", nft_coin_info, did_id)
unft = UncurriedNFT.uncurry(*nft_coin_info.full_puzzle.uncurry())

View file

@ -1281,8 +1281,10 @@ async def test_nft_bulk_set_did(self_hostname: str, two_wallet_nodes: Any, trust
nft_coin_list = [
{"wallet_id": nft_wallet_0_id, "nft_coin_id": nft1.nft_coin_id.hex()},
{"wallet_id": nft_wallet_1_id, "nft_coin_id": nft2.nft_coin_id.hex()},
{"wallet_id": nft_wallet_1_id},
{"nft_coin_id": nft2.nft_coin_id.hex()},
]
resp = await api_0.nft_set_bulk_nft_did(dict(did_id=hmr_did_id, nft_coin_list=nft_coin_list))
resp = await api_0.nft_set_did_bulk(dict(did_id=hmr_did_id, nft_coin_list=nft_coin_list))
coins_response = await wait_rpc_state_condition(
30, api_0.nft_get_nfts, [{"wallet_id": nft_wallet_0_id}], lambda x: len(x["nft_list"]) == 1
)
@ -1308,6 +1310,138 @@ async def test_nft_bulk_set_did(self_hostname: str, two_wallet_nodes: Any, trust
assert coins[1].owner_did.hex() == hex_did_id
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_nft_bulk_transfer(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 2
full_nodes, wallets, _ = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_0 = wallet_node_0.wallet_state_manager.main_wallet
wallet_1 = wallet_node_1.wallet_state_manager.main_wallet
api_0 = WalletRpcApi(wallet_node_0)
api_1 = WalletRpcApi(wallet_node_1)
ph = await wallet_0.get_new_puzzlehash()
ph1 = await wallet_1.get_new_puzzlehash()
address = encode_puzzle_hash(ph1, AddressType.XCH.hrp(wallet_node_1.config))
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for _ in range(1, num_blocks + 1):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph))
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(30, wallet_0.get_unconfirmed_balance, funds)
await time_out_assert(30, wallet_0.get_confirmed_balance, funds)
did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet(
wallet_node_0.wallet_state_manager, wallet_0, uint64(1)
)
spend_bundle_list = await wallet_node_0.wallet_state_manager.tx_store.get_unconfirmed_for_wallet(did_wallet.id())
spend_bundle = spend_bundle_list[0].spend_bundle
await time_out_assert_not_none(30, full_node_api.full_node.mempool_manager.get_spendbundle, spend_bundle.name())
for _ in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph))
await time_out_assert(30, wallet_0.get_pending_change_balance, 0)
await time_out_assert(30, wallet_0.get_confirmed_balance, 3999999999999)
hex_did_id = did_wallet.get_my_DID()
hmr_did_id = encode_puzzle_hash(bytes32.from_hexstr(hex_did_id), AddressType.DID.hrp(wallet_node_0.config))
await time_out_assert(30, wallet_is_synced, True, wallet_node_0, full_node_api)
res = await api_0.create_new_wallet(dict(wallet_type="nft_wallet", name="NFT WALLET 1", did_id=hmr_did_id))
assert isinstance(res, dict)
assert res.get("success")
nft_wallet_0_id = res["wallet_id"]
res = await api_0.create_new_wallet(dict(wallet_type="nft_wallet", name="NFT WALLET 2"))
assert isinstance(res, dict)
assert res.get("success")
nft_wallet_1_id = res["wallet_id"]
await time_out_assert(30, did_wallet.get_confirmed_balance, 1)
# Create a NFT with DID
resp = await api_0.nft_mint_nft(
{
"wallet_id": nft_wallet_0_id,
"hash": "0xD4584AD463139FA8C0D9F68F4B59F185",
"uris": ["https://www.chia.net/img/branding/chia-logo.svg"],
"mu": ["https://www.chia.net/img/branding/chia-logo.svg"],
"did_id": hmr_did_id,
}
)
sb = await make_new_block_with(resp, full_node_api, ph)
# ensure hints are generated
assert compute_memos(sb)
await wait_rpc_state_condition(
30, api_0.nft_get_nfts, [{"wallet_id": nft_wallet_0_id}], lambda x: len(x["nft_list"]) > 0
)
resp = await api_0.nft_mint_nft(
{
"wallet_id": nft_wallet_1_id,
"hash": "0xD4584AD463139FA8C0D9F68F4B59F186",
"uris": ["https://www.chia.net/img/branding/chia-logo.svg"],
"mu": ["https://www.chia.net/img/branding/chia-logo.svg"],
"did_id": "",
}
)
sb = await make_new_block_with(resp, full_node_api, ph)
# ensure hints are generated
assert compute_memos(sb)
# Check DID NFT
coins_response = await wait_rpc_state_condition(
30, api_0.nft_get_nfts, [{"wallet_id": nft_wallet_0_id}], lambda x: len(x["nft_list"]) == 1
)
coins = coins_response["nft_list"]
nft1 = coins[0]
assert len(coins) == 1
assert coins[0].owner_did is not None
coins_response = await wait_rpc_state_condition(
30, api_0.nft_get_nfts, [{"wallet_id": nft_wallet_1_id}], lambda x: len(x["nft_list"]) == 1
)
coins = coins_response["nft_list"]
nft2 = coins[0]
assert len(coins) == 1
assert coins[0].owner_did is None
nft_coin_list = [
{"wallet_id": nft_wallet_0_id, "nft_coin_id": nft1.nft_coin_id.hex()},
{"wallet_id": nft_wallet_1_id, "nft_coin_id": nft2.nft_coin_id.hex()},
{"wallet_id": nft_wallet_1_id},
{"nft_coin_id": nft2.nft_coin_id.hex()},
]
resp = await api_0.nft_transfer_bulk(dict(target_address=address, nft_coin_list=nft_coin_list))
sb = await make_new_block_with(resp, full_node_api, ph)
# ensure hints are generated
assert compute_memos(sb)
await time_out_assert(30, get_wallet_number, 2, wallet_node_1.wallet_state_manager)
coins_response = await wait_rpc_state_condition(
30, api_1.nft_get_nfts, [{"wallet_id": 2}], lambda x: len(x["nft_list"]) == 2
)
coins = coins_response["nft_list"]
assert coins[0].launcher_id == nft1.launcher_id
assert coins[1].launcher_id == nft2.launcher_id
assert coins[0].owner_did is None
assert coins[1].owner_did is None
@pytest.mark.parametrize(
"trusted",
[True, False],