Fix locks
This commit is contained in:
parent
9ed60f50ab
commit
9a8f9b0e3b
|
@ -97,3 +97,10 @@ def placeable(space, x, y, z, r):
|
|||
{twelve(x-r), twelve(x), twelve(x+r)},
|
||||
{twelve(y-r), twelve(y), twelve(y+r)},
|
||||
{nine(z-r), nine(z), nine(z+r)}))
|
||||
|
||||
|
||||
def whilst(func, cond=lambda:True):
|
||||
"""Call func until cond is broken."""
|
||||
def loop(*args, **kwargs):
|
||||
while cond(): func(*args, **kwargs)
|
||||
return loop
|
||||
|
|
61
axuy/peer.py
61
axuy/peer.py
|
@ -21,11 +21,10 @@ __doc__ = 'Axuy main loop'
|
|||
|
||||
from argparse import ArgumentParser, RawTextHelpFormatter
|
||||
from pickle import dumps, loads
|
||||
from random import shuffle
|
||||
from socket import socket, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from threading import RLock, Thread, Semaphore
|
||||
from threading import Event, RLock, Thread
|
||||
|
||||
from .misc import mapgen, mapidgen
|
||||
from .misc import mapgen, mapidgen, whilst
|
||||
from .pico import Picobot
|
||||
from .view import ConfigReader, View
|
||||
|
||||
|
@ -48,26 +47,20 @@ class Peer:
|
|||
mapid = data['mapid']
|
||||
self.peers.extend(data['peers'])
|
||||
|
||||
self.semaphore, lock = Semaphore(0), RLock()
|
||||
self.updated, lock = Event(), RLock()
|
||||
self.addr = config.host, config.port
|
||||
self.space = mapgen(mapid)
|
||||
self.pico = Picobot(self.addr, self.space)
|
||||
self.view = View(self.addr, self.pico, self.space, config, lock)
|
||||
|
||||
data_server = Thread(target=self.serve, args=(mapid,))
|
||||
data_server.daemon = True
|
||||
data_server.start()
|
||||
Thread(target=self.serve, args=(mapid,), daemon=True).start()
|
||||
|
||||
self.sock = socket(type=SOCK_DGRAM) # UDP
|
||||
self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
self.sock.bind(self.addr)
|
||||
|
||||
pusher = Thread(target=self.push)
|
||||
pusher.daemon = True
|
||||
pusher.start()
|
||||
puller = Thread(target=self.pull, args=(lock,))
|
||||
puller.daemon = True
|
||||
puller.start()
|
||||
Thread(target=self.push, daemon=True).start()
|
||||
Thread(target=self.pull, args=(lock,), daemon=True).start()
|
||||
|
||||
def serve(self, mapid):
|
||||
"""Initiate peers."""
|
||||
|
@ -81,28 +74,32 @@ class Peer:
|
|||
conn.send(dumps({'mapid': mapid, 'peers': self.peers}))
|
||||
conn.close()
|
||||
|
||||
@whilst
|
||||
def push(self):
|
||||
"""Send own state to peers."""
|
||||
while True:
|
||||
with self.semaphore:
|
||||
shards = {i: (s.pos, s.rot, s.power)
|
||||
for i, s in self.pico.shards.items()}
|
||||
data = dumps([self.pico.pos, self.pico.rot, shards])
|
||||
shuffle(self.peers)
|
||||
for peer in self.peers:
|
||||
self.semaphore.acquire()
|
||||
self.sock.sendto(data, peer)
|
||||
self.updated.wait()
|
||||
shards = {i: (s.pos, s.rot, s.power)
|
||||
for i, s in self.pico.shards.items()}
|
||||
data = dumps([self.pico.pos, self.pico.rot, shards])
|
||||
for peer in self.peers:
|
||||
self.sock.sendto(data, peer)
|
||||
self.updated.clear()
|
||||
|
||||
@whilst
|
||||
def pull(self, lock):
|
||||
"""Receive peers' state."""
|
||||
while True:
|
||||
data, addr = self.sock.recvfrom(1<<16)
|
||||
pos, rot, shards = loads(data)
|
||||
try:
|
||||
with lock: self.view.picos[addr].sync(pos, rot, shards)
|
||||
except KeyError:
|
||||
with lock: self.view.add_pico(addr, pos, rot)
|
||||
data, addr = self.sock.recvfrom(1<<16)
|
||||
pos, rot, shards = loads(data)
|
||||
with lock:
|
||||
if addr not in self.view.picos:
|
||||
self.peers.append(addr)
|
||||
self.view.add_pico(addr, pos, rot)
|
||||
self.view.picos[addr].sync(pos, rot, shards)
|
||||
|
||||
def update(self):
|
||||
"""Locally update the internal states."""
|
||||
self.view.update()
|
||||
self.updated.set()
|
||||
|
||||
def __enter__(self): return self
|
||||
|
||||
|
@ -138,7 +135,7 @@ def main():
|
|||
'--vsync', action='store_true', default=None,
|
||||
help='enable vertical synchronization (fallback: {})'.format(
|
||||
config.vsync))
|
||||
parser.add_argument('--no-vsync', action='store_false', dest='server',
|
||||
parser.add_argument('--no-vsync', action='store_false', dest='vsync',
|
||||
help='disable vertical synchronization')
|
||||
parser.add_argument(
|
||||
'--fov', type=float,
|
||||
|
@ -154,6 +151,4 @@ def main():
|
|||
config.read_args(args)
|
||||
|
||||
with Peer(config) as peer:
|
||||
while peer.view.is_running:
|
||||
for _ in peer.peers: peer.semaphore.release()
|
||||
peer.view.update()
|
||||
while peer.view.is_running: peer.update()
|
||||
|
|
21
axuy/view.py
21
axuy/view.py
|
@ -513,17 +513,16 @@ class View:
|
|||
self.prog2['vp'].write(vp)
|
||||
self.prog3['vp'].write(vp)
|
||||
|
||||
self.lock.acquire(blocking=False)
|
||||
for pico in self.picos.values():
|
||||
shards = {}
|
||||
for index, shard in pico.shards.items():
|
||||
if not shard.power: continue
|
||||
shard.update(self.fps)
|
||||
self.render_shard(shard)
|
||||
shards[index] = shard
|
||||
pico.shards = shards
|
||||
if pico is not self.camera: self.render_pico(pico)
|
||||
self.lock.release()
|
||||
with self.lock:
|
||||
for pico in self.picos.values():
|
||||
shards = {}
|
||||
for index, shard in pico.shards.items():
|
||||
if not shard.power: continue
|
||||
shard.update(self.fps)
|
||||
self.render_shard(shard)
|
||||
shards[index] = shard
|
||||
pico.shards = shards
|
||||
if pico is not self.camera: self.render_pico(pico)
|
||||
|
||||
def update(self):
|
||||
"""Handle input, update GLSL programs and render the map."""
|
||||
|
|
Loading…
Reference in New Issue