Make peer.Peer.pull thread-safe for real this time using queue.Queue

This commit is contained in:
Nguyễn Gia Phong 2019-09-20 19:55:31 +07:00
parent 17ff7d02dd
commit 84f829628b
4 changed files with 46 additions and 46 deletions

View File

@ -6,11 +6,12 @@ Mininalist peer-to-peer first-person shooter
## Goals
* To be written in pure Python and thus be portable
* Written in pure Python and thus be portable
* Easy to read codebase as well as easy on resources
* Logically generate visuals
* Be functional when possible
* Generative visuals
* Functional when possible
* P2P communication based on calculated *trust*
* Modularized for the ease of bot scripting
## Installation

View File

@ -16,7 +16,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with Axuy. If not, see <https://www.gnu.org/licenses/>.
from functools import wraps
from itertools import (chain, combinations_with_replacement,
permutations, product)
from random import choices, shuffle
@ -41,14 +40,6 @@ def color(code, value):
return COLORS[code] * (value + 1) * 0.5
def forever(func):
"""Return a function that calls func forever."""
@wraps(func)
def loop(*args, **kwargs):
while True: func(*args, **kwargs)
return loop
def mapidgen(replacement=False):
"""Return a randomly generated map ID."""
mapid = list(range(48))

View File

@ -16,15 +16,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with Axuy. If not, see <https://www.gnu.org/licenses/>.
__version__ = '0.0.5'
__version__ = '0.0.6'
__doc__ = 'Axuy main loop'
from argparse import ArgumentParser, RawTextHelpFormatter
from pickle import dumps, loads
from queue import Empty, Queue
from socket import socket, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR
from threading import Event, Thread
from threading import Thread
from .misc import forever, mapgen, mapidgen
from .misc import mapgen, mapidgen
from .pico import Picobot
from .view import ConfigReader, View
@ -39,7 +40,7 @@ class Peer:
self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.sock.bind((config.host, config.port))
self.addr = self.sock.getsockname()
self.updated = Event()
self.q = Queue()
if config.seeder is None:
mapid, self.peers = mapidgen(), []
@ -53,11 +54,15 @@ class Peer:
self.view = View(self.addr, self.pico, self.space, config)
Thread(target=self.serve, args=(mapid,), daemon=True).start()
Thread(target=self.push, daemon=True).start()
Thread(target=self.pull, daemon=True).start()
@property
def is_running(self) -> bool:
"""Peer status."""
return self.view.is_running
def serve(self, mapid):
"""Initiate peers."""
"""Initiate other peers."""
with socket() as server: # TCP server
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(self.addr)
@ -68,36 +73,39 @@ class Peer:
conn.send(dumps((mapid, self.peers+[self.addr])))
conn.close()
@forever
def push(self):
"""Send own state to peers."""
self.updated.wait()
def pull(self):
"""Receive other peers' state."""
while self.is_running: self.q.put(self.sock.recvfrom(1<<16))
def update(self):
"""Update the local states and send them to other peers."""
while True:
try:
data, addr = self.q.get_nowait()
except Empty:
break
else:
health, pos, rot, shards = loads(data)
if addr not in self.view.picos:
self.peers.append(addr)
self.view.add_pico(addr)
self.view.picos[addr].sync(health, pos, rot, shards)
self.q.task_done()
self.view.update()
pico = self.pico
shards = {i: (s.pos, s.rot, s.power)
for i, s in pico.shards.items()}
data = dumps([pico.health, pico.pos, pico.rot, shards])
for peer in self.peers:
self.sock.sendto(data, peer)
self.updated.clear()
@forever
def pull(self):
"""Receive peers' state."""
data, addr = self.sock.recvfrom(1<<16)
health, pos, rot, shards = loads(data)
if addr not in self.view.picos:
self.peers.append(addr)
self.view.add_pico(addr)
self.view.picos[addr].sync(health, pos, rot, shards)
def update(self):
"""Locally update the internal states."""
self.view.update()
self.updated.set()
for peer in self.peers: self.sock.sendto(data, peer)
def __enter__(self): return self
def __exit__(self, exc_type, exc_value, traceback):
while not self.q.empty():
self.q.get()
self.q.task_done()
self.q.join()
self.sock.close()
self.view.close()
@ -117,13 +125,13 @@ def main():
'--host',
help='host to bind this peer to (fallback: {})'.format(config.host))
parser.add_argument(
'--port', type=int,
'-p', '--port', type=int,
help='port to bind this peer to (fallback: {})'.format(config.port))
parser.add_argument('--seeder',
parser.add_argument('-s', '--seeder',
help='address of the peer that created the map')
# All these options specific for a graphical peer need to be modularized.
parser.add_argument(
'-s', '--size', type=int, nargs=2, metavar=('X', 'Y'),
'--size', type=int, nargs=2, metavar=('X', 'Y'),
help='the desired screen size (fallback: {}x{})'.format(*config.size))
parser.add_argument(
'--vsync', action='store_true', default=None,
@ -145,4 +153,4 @@ def main():
config.read_args(args)
with Peer(config) as peer:
while peer.view.is_running: peer.update()
while peer.is_running: peer.update()

View File

@ -6,7 +6,7 @@ with open('README.md') as f:
setup(
name='axuy',
version='0.0.5',
version='0.0.6',
description='Minimalist first-person shooter',
long_description=long_description,
long_description_content_type='text/markdown',
@ -15,7 +15,7 @@ setup(
author_email='vn.mcsinyx@gmail.com',
license='AGPLv3+',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Development Status :: 3 - Alpha',
'Environment :: MacOS X',
'Environment :: Win32 (MS Windows)',
'Environment :: X11 Applications',