Decouple atari server. (#13065)
* Checkpoint decouple atari server. * Add click cli. * Lint. * Remove run_server. * Updates to DL HTTP service * Lint - config parameter optional with default * changes based on comments Co-authored-by: Earle Lowe <e.lowe@chia.net>
This commit is contained in:
parent
c59bf12f8e
commit
1fdacc75a7
6 changed files with 98 additions and 18 deletions
|
@ -84,6 +84,7 @@ if getattr(sys, "frozen", False):
|
|||
name_map = {
|
||||
"chia": "chia",
|
||||
"chia_data_layer": "start_data_layer",
|
||||
"chia_data_layer_http": "start_data_layer_http",
|
||||
"chia_wallet": "start_wallet",
|
||||
"chia_full_node": "start_full_node",
|
||||
"chia_harvester": "start_harvester",
|
||||
|
@ -1240,8 +1241,6 @@ def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen,
|
|||
# we need to pass on the possibly altered CHIA_ROOT
|
||||
os.environ["CHIA_ROOT"] = str(root_path)
|
||||
|
||||
log.debug(f"Launching service with CHIA_ROOT: {os.environ['CHIA_ROOT']}")
|
||||
|
||||
# Insert proper e
|
||||
service_array = service_command.split()
|
||||
service_executable = executable_for_service(service_array[0])
|
||||
|
@ -1252,6 +1251,8 @@ def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen,
|
|||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||||
|
||||
log.debug(f"Launching service {service_array} with CHIA_ROOT: {os.environ['CHIA_ROOT']}")
|
||||
|
||||
# CREATE_NEW_PROCESS_GROUP allows graceful shutdown on windows, by CTRL_BREAK_EVENT signal
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
|
||||
|
@ -1261,6 +1262,7 @@ def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen,
|
|||
process = subprocess.Popen(
|
||||
service_array, shell=False, startupinfo=startupinfo, creationflags=creationflags, env=environ_copy
|
||||
)
|
||||
|
||||
pid_path = pid_path_for_service(root_path, service_command)
|
||||
try:
|
||||
pid_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
|
|
@ -10,7 +10,6 @@ import aiohttp
|
|||
import aiosqlite
|
||||
|
||||
from chia.data_layer.data_layer_errors import KeyNotFoundError
|
||||
from chia.data_layer.data_layer_server import DataLayerServer
|
||||
from chia.data_layer.data_layer_util import (
|
||||
DiffData,
|
||||
InternalNode,
|
||||
|
@ -45,7 +44,6 @@ from chia.wallet.transaction_record import TransactionRecord
|
|||
|
||||
class DataLayer:
|
||||
data_store: DataStore
|
||||
data_layer_server: DataLayerServer
|
||||
db_wrapper: DBWrapper
|
||||
batch_update_db_wrapper: DBWrapper
|
||||
db_path: Path
|
||||
|
@ -84,7 +82,6 @@ class DataLayer:
|
|||
).replace("CHALLENGE", config["selected_network"])
|
||||
self.server_files_location = path_from_root(root_path, server_files_replaced)
|
||||
self.server_files_location.mkdir(parents=True, exist_ok=True)
|
||||
self.data_layer_server = DataLayerServer(root_path, self.config, self.log)
|
||||
self.none_bytes = bytes32([0] * 32)
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
|
@ -100,8 +97,6 @@ class DataLayer:
|
|||
self.data_store = await DataStore.create(self.db_wrapper)
|
||||
self.wallet_rpc = await self.wallet_rpc_init
|
||||
self.subscription_lock: asyncio.Lock = asyncio.Lock()
|
||||
if self.config.get("run_server", False):
|
||||
await self.data_layer_server.start()
|
||||
|
||||
self.periodically_manage_data_task: asyncio.Task[Any] = asyncio.create_task(self.periodically_manage_data())
|
||||
return True
|
||||
|
@ -113,8 +108,6 @@ class DataLayer:
|
|||
async def _await_closed(self) -> None:
|
||||
if self.connection is not None:
|
||||
await self.connection.close()
|
||||
if self.config.get("run_server", False):
|
||||
await self.data_layer_server.stop()
|
||||
try:
|
||||
self.periodically_manage_data_task.cancel()
|
||||
except asyncio.CancelledError:
|
||||
|
|
|
@ -1,13 +1,31 @@
|
|||
import asyncio
|
||||
import functools
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
import click
|
||||
from aiohttp import web
|
||||
|
||||
from chia.data_layer.download_data import is_filename_valid
|
||||
from chia.server.upnp import UPnP
|
||||
from chia.util.chia_logging import initialize_logging
|
||||
from chia.util.config import load_config
|
||||
from chia.util.default_root import DEFAULT_ROOT_PATH
|
||||
from chia.util.path import path_from_root
|
||||
from chia.util.setproctitle import setproctitle
|
||||
|
||||
# from chia.cmds.chia import monkey_patch_click
|
||||
|
||||
|
||||
# See: https://bugs.python.org/issue29288
|
||||
"".encode("idna")
|
||||
|
||||
SERVICE_NAME = "data_layer"
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -15,9 +33,29 @@ class DataLayerServer:
|
|||
root_path: Path
|
||||
config: Dict[str, Any]
|
||||
log: logging.Logger
|
||||
shutdown_event: asyncio.Event
|
||||
|
||||
async def start(self) -> None:
|
||||
self.log.info("Starting Data Layer Server.")
|
||||
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
# pylint: disable=E1101
|
||||
signal.signal(signal.SIGBREAK, self._accept_signal)
|
||||
signal.signal(signal.SIGINT, self._accept_signal)
|
||||
signal.signal(signal.SIGTERM, self._accept_signal)
|
||||
else:
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.add_signal_handler(
|
||||
signal.SIGINT,
|
||||
functools.partial(self._accept_signal, signal_number=signal.SIGINT),
|
||||
)
|
||||
loop.add_signal_handler(
|
||||
signal.SIGTERM,
|
||||
functools.partial(self._accept_signal, signal_number=signal.SIGTERM),
|
||||
)
|
||||
|
||||
self.log.info("Starting Data Layer HTTP Server.")
|
||||
|
||||
self.host_ip = self.config["host_ip"]
|
||||
self.port = self.config["host_port"]
|
||||
|
||||
# Setup UPnP for the data_layer_service port
|
||||
|
@ -33,17 +71,23 @@ class DataLayerServer:
|
|||
app.add_routes([web.get("/{filename}", self.file_handler)])
|
||||
self.runner = web.AppRunner(app)
|
||||
await self.runner.setup()
|
||||
self.site = web.TCPSite(self.runner, self.config["host_ip"], port=self.port)
|
||||
self.site = web.TCPSite(self.runner, self.host_ip, port=self.port)
|
||||
await self.site.start()
|
||||
self.log.info("Started Data Layer Server.")
|
||||
self.log.info("Started Data Layer HTTP Server.")
|
||||
await self.shutdown_event.wait()
|
||||
|
||||
async def stop(self) -> None:
|
||||
def stop(self) -> None:
|
||||
self.shutdown_event.set()
|
||||
self.upnp.release(self.port)
|
||||
# UPnP.shutdown() is a blocking call, waiting for the UPnP thread to exit
|
||||
self.upnp.shutdown()
|
||||
|
||||
self.log.info("Stopped Data Layer Server.")
|
||||
await self.runner.cleanup()
|
||||
async def close_runner() -> None:
|
||||
await self.runner.cleanup()
|
||||
|
||||
asyncio.create_task(close_runner())
|
||||
|
||||
self.log.info("Stopped Data Layer HTTP Server.")
|
||||
|
||||
async def file_handler(self, request: web.Request) -> web.Response:
|
||||
filename = request.match_info["filename"]
|
||||
|
@ -58,3 +102,43 @@ class DataLayerServer:
|
|||
body=content,
|
||||
)
|
||||
return response
|
||||
|
||||
def _accept_signal(self, signal_number: int, stack_frame: Any = None) -> None:
|
||||
self.log.info("Got SIGINT or SIGTERM signal - stopping")
|
||||
|
||||
self.stop()
|
||||
|
||||
|
||||
async def async_start(root_path: Path) -> int:
|
||||
|
||||
shutdown_event = asyncio.Event()
|
||||
|
||||
dl_config = load_config(root_path=root_path, filename="config.yaml", sub_config=SERVICE_NAME)
|
||||
setproctitle("data_layer_http")
|
||||
initialize_logging(
|
||||
service_name="data_layer_http",
|
||||
logging_config=dl_config["logging"],
|
||||
root_path=root_path,
|
||||
)
|
||||
|
||||
data_layer_server = DataLayerServer(root_path, dl_config, log, shutdown_event)
|
||||
await data_layer_server.start()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option(
|
||||
"-r",
|
||||
"--root-path",
|
||||
type=click.Path(exists=True, writable=True, file_okay=False),
|
||||
default=DEFAULT_ROOT_PATH,
|
||||
show_default=True,
|
||||
help="Config file root",
|
||||
)
|
||||
def main(root_path: str = str(DEFAULT_ROOT_PATH)) -> int:
|
||||
return asyncio.run(async_start(Path(root_path)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
|
@ -561,8 +561,6 @@ data_layer:
|
|||
# Data for running a data layer server.
|
||||
host_ip: 0.0.0.0
|
||||
host_port: 8575
|
||||
# Switch this to True if we want to run the server.
|
||||
run_server: True
|
||||
# Data for running a data layer client.
|
||||
manage_data_interval: 60
|
||||
selected_network: *selected_network
|
||||
|
|
|
@ -2,10 +2,12 @@ from typing import KeysView, Generator
|
|||
|
||||
SERVICES_FOR_GROUP = {
|
||||
"all": (
|
||||
"chia_harvester chia_timelord_launcher chia_timelord chia_farmer chia_full_node chia_wallet chia_data_layer"
|
||||
"chia_harvester chia_timelord_launcher chia_timelord chia_farmer "
|
||||
"chia_full_node chia_wallet chia_data_layer chia_data_layer_http"
|
||||
).split(),
|
||||
# TODO: should this be `data_layer`?
|
||||
"data": "chia_wallet chia_data_layer".split(),
|
||||
"data_layer_http": "chia_data_layer_http".split(),
|
||||
"node": "chia_full_node".split(),
|
||||
"harvester": "chia_harvester".split(),
|
||||
"farmer": "chia_harvester chia_farmer chia_full_node chia_wallet".split(),
|
||||
|
|
1
setup.py
1
setup.py
|
@ -134,6 +134,7 @@ kwargs = dict(
|
|||
"chia_timelord_launcher = chia.timelord.timelord_launcher:main",
|
||||
"chia_full_node_simulator = chia.simulator.start_simulator:main",
|
||||
"chia_data_layer = chia.server.start_data_layer:main",
|
||||
"chia_data_layer_http = chia.data_layer.data_layer_server:main",
|
||||
]
|
||||
},
|
||||
package_data={
|
||||
|
|
Loading…
Reference in a new issue