support calling daemon commands from chia rpc CLI (#15584)

* add daemon as a callable service

* re-adding "get_routes" command

* added a test for get_routes

* linter fix

* reformatted with black

* addressed PR feedback
This commit is contained in:
Jeff 2023-06-28 16:38:53 -07:00 committed by GitHub
parent 908c789399
commit fa6f3f67d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 4 deletions

View File

@ -12,10 +12,19 @@ from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
services: List[str] = ["crawler", "farmer", "full_node", "harvester", "timelord", "wallet", "data_layer"]
services: List[str] = ["crawler", "daemon", "farmer", "full_node", "harvester", "timelord", "wallet", "data_layer"]
async def call_endpoint(service: str, endpoint: str, request: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
if service == "daemon":
return await call_daemon_command(endpoint, request, config)
return await call_rpc_service_endpoint(service, endpoint, request, config)
async def call_rpc_service_endpoint(
service: str, endpoint: str, request: Dict[str, Any], config: Dict[str, Any]
) -> Dict[str, Any]:
from chia.rpc.rpc_client import RpcClient
port: uint16
@ -44,6 +53,26 @@ async def call_endpoint(service: str, endpoint: str, request: Dict[str, Any], co
return result
async def call_daemon_command(command: str, request: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
from chia.daemon.client import connect_to_daemon_and_validate
daemon = await connect_to_daemon_and_validate(DEFAULT_ROOT_PATH, config)
if daemon is None:
raise Exception("Failed to connect to chia daemon")
result: Dict[str, Any]
try:
ws_request = daemon.format_request(command, request)
ws_response = await daemon._get(ws_request)
result = ws_response["data"]
except Exception as e:
raise Exception(f"Request failed: {e}")
finally:
await daemon.close()
return result
def print_result(json_dict: Dict[str, Any]) -> None:
print(json.dumps(json_dict, indent=4, sort_keys=True))
@ -64,7 +93,7 @@ def endpoints_cmd(service: str) -> None:
try:
routes = get_routes(service, config)
for route in routes["routes"]:
print(route[1:])
print(route.lstrip("/"))
except Exception as e:
print(e)

View File

@ -356,7 +356,7 @@ class WebSocketServer:
elif command == "ping":
response = await ping()
else:
command_mapping = await self.get_command_mapping()
command_mapping = self.get_command_mapping()
if command in command_mapping:
response = await command_mapping[command](websocket=websocket, request=data)
else:
@ -366,7 +366,7 @@ class WebSocketServer:
full_response = format_response(message, response)
return full_response, {websocket}
async def get_command_mapping(self) -> Dict[str, Command]:
def get_command_mapping(self) -> Dict[str, Command]:
"""
Returns a mapping of commands to their respective function calls.
"""
@ -388,6 +388,7 @@ class WebSocketServer:
"get_status": self.get_status,
"get_version": self.get_version,
"get_plotters": self.get_plotters,
"get_routes": self.get_routes,
}
async def is_keyring_locked(self, websocket: WebSocketResponse, request: Dict[str, Any]) -> Dict[str, Any]:
@ -555,6 +556,11 @@ class WebSocketServer:
response: Dict[str, Any] = {"success": True, "plotters": plotters}
return response
async def get_routes(self, websocket: WebSocketResponse, request: Dict[str, Any]) -> Dict[str, Any]:
routes = list(self.get_command_mapping().keys())
response: Dict[str, Any] = {"success": True, "routes": routes}
return response
async def _keyring_status_changed(self, keyring_status: Dict[str, Any], destination: str):
"""
Attempt to communicate with the GUI to inform it of any keyring status changes

View File

@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
import aiohttp
import pytest
from aiohttp.web_ws import WebSocketResponse
from chia.daemon.keychain_server import (
DeleteLabelRequest,
@ -59,6 +60,14 @@ class Daemon:
services: Dict[str, Union[List[Service], Service]]
connections: Dict[str, Optional[List[Any]]]
def get_command_mapping(self) -> Dict[str, Any]:
return {
"get_routes": None,
"example_one": None,
"example_two": None,
"example_three": None,
}
def is_service_running(self, service_name: str) -> bool:
return WebSocketServer.is_service_running(cast(WebSocketServer, self), service_name)
@ -68,6 +77,11 @@ class Daemon:
async def is_running(self, request: Dict[str, Any]) -> Dict[str, Any]:
return await WebSocketServer.is_running(cast(WebSocketServer, self), request)
async def get_routes(self, request: Dict[str, Any]) -> Dict[str, Any]:
return await WebSocketServer.get_routes(
cast(WebSocketServer, self), websocket=WebSocketResponse(), request=request
)
test_key_data = KeyData.from_mnemonic(
"grief lock ketchup video day owner torch young work "
@ -414,6 +428,16 @@ async def test_running_services_with_services_and_connections(mock_daemon_with_s
)
@pytest.mark.asyncio
async def test_get_routes(mock_lonely_daemon):
daemon = mock_lonely_daemon
response = await daemon.get_routes({})
assert response == {
"success": True,
"routes": ["get_routes", "example_one", "example_two", "example_three"],
}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"service_request, expected_result, expected_exception",