ssiv/src/ssiv/sdlui_process.py

257 lines
9.2 KiB
Python

# This file is part of ssiv.
#
# ssiv is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ssiv is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with ssiv. If not, see <https://www.gnu.org/licenses/>.
from _thread import allocate_lock
from threading import Thread
from .constants import *
from .image_loader_receive import *
from .mt import *
from .sdl2e import *
from .tools import *
from .widget import VideoSubSystem,Window
__all__=['SDLUI_Process']
class Agent:
    """Stop flag for a single job that also follows the environment's flag.

    Reading ``stop`` gives the agent's own flag once it has been raised and
    otherwise falls through to ``env.stop``; assigning ``stop`` only ever
    changes the agent's own flag, never the environment.
    """

    __slots__ = ('_stop', '_env')

    def __new__(cls, env):
        # All state is installed in __new__; the class defines no __init__.
        self = super().__new__(cls)
        self._env = env
        self._stop = False
        return self

    @property
    def stop(self):
        """Own flag when raised, else the current value of ``env.stop``."""
        if self._stop:
            return self._stop
        return self._env.stop

    @stop.setter
    def stop(self, value):
        # Normalise any truthy/falsy value to a real bool.
        self._stop = bool(value)
def load_wand(pipe,env):
if not (path:=pipe.recv_bytes()):
assert verb(__name__,'load: cancel load')
return
container,filename=unpackstr(path)
assert verb(__name__,f'load: {prettypath(container,filename)}')
wandrecv=enumerate(WandReceive(pipe,(agent:=Agent(env))))
while True:
try:
n,result=next(wandrecv)
except StopIteration:
break
except Exception as e:
assert error(__name__,'load: receive exception',err=e)
break
if n==0:
n_loop,n_frames,is_thum=result
assert trace(__name__,f'load: {prettypath(container,filename)} {n_frames=}')
env.holdthumbnail((container,filename))
if not is_thum:
env.holdcanvas((container,filename))
env.cancelrequire((container,filename),not is_thum)
if is_thum:
env.puttask(SDL_WAND,SDL_WAND_THUMBNAILSTARTED,
args=(n_loop,n_frames),data=path)
else:
env.puttask(SDL_WAND,SDL_WAND_LOADINGSTARTED,
args=(n_loop,n_frames),data=path)
continue
frameprop,databuffer=result
match (frameprop.pixfmt,frameprop.bpp,frameprop.depth):
case ('RGB',3,8):
pixfmt=SDL_PIXELFORMAT_RGB24
case ('RGBA',4,8):
pixfmt=SDL_PIXELFORMAT_RGBA32
case _:
assert error(__name__,'load: unsupported format:',
'pixfmt={0.pixfmt} bpp={0.bpp} depth={0.depth}'.format(frameprop),
prettypath(container,filename))
agent.stop=True
continue
assert trace(__name__,'load_wand:',prettypath(container,filename),
'thumbnail' if is_thum else f'{n}/{n_frames}')
if is_thum:
env.puttask(SDL_WAND,SDL_WAND_THUMBNAILDECODED,
args=(n,pixfmt,frameprop,databuffer),data=path)
assert info(__name__,f'load: thumbnail {prettypath(container,filename)}')
else:
env.puttask(SDL_WAND,SDL_WAND_FRAMEDECODED,
args=(n,pixfmt,frameprop,databuffer),data=path)
if n==1:
env.puttask(SDL_WAND,SDL_WAND_THUMBNAILFINISHED,data=path)
if n==n_frames:
env.puttask(SDL_WAND,SDL_WAND_LOADINGFINISHED,data=path)
if n<1:
assert verb(__name__,f'load: cancel {prettypath(container,filename)}')
else:
assert True if is_thum else \
info(__name__,f'load: loaded {prettypath(container,filename)}')
def list_path(pipe, container, env):
    """Ask the peer on *pipe* for the contents of *container* and store them.

    The container name is sent UTF-8 encoded; the reply is read as a series
    of byte chunks terminated by an empty one and decoded with ``unpackstr``.
    A non-empty result is registered with ``env.putcontainer``; an empty one
    removes the container via ``env.delpath``.  ``env.action_view()`` is
    called afterwards in both cases.
    """
    pipe.send_bytes(container.encode('utf8'))
    payload = bytearray()
    while True:
        chunk = pipe.recv_bytes()
        if not chunk:
            # An empty chunk marks the end of the reply.
            break
        payload.extend(chunk)
    filenames = unpackstr(payload)
    if filenames:
        env.putcontainer(container, filenames)
    else:
        env.delpath(container)
    env.action_view()
class Listener:
__slots__=(
'_sender','_receiver','_env','_recv_lock','_lock',
)
def __new__(cls,*args,**kwds):
instance=super().__new__(cls)
instance._sender=None
instance._receiver=None
instance._env=None
instance._recv_lock=None
instance._lock=allocate_lock()
return instance
def __init__(self,sender,receiver,env):
self._sender=sender
self._receiver=receiver
self._env=env
def __repr__(self):
return self.__class__.__name__
def __enter__(self):
self._env.set_listener(self)
self._recv_lock=start_thread(target=self._recv)
return self
def __exit__(self,exc_type,exc_value,exc_tb):
with self._recv_lock:pass
sender=self._sender
self._sender=None
sender.close()
receiver=self._receiver
self._receiver=None
receiver.close()
self._env.set_listener(None)
def _recv(self,interval=10):
poll=self._receiver.poll
recvs=self._receiver.recvs
env=self._env
pool=ThreadPool(0)
assert trace(__name__,f'{self}: recv: loop in')
while not env.stop:
if not poll(interval):
continue
if not (msg:=recvs()):
break
data,*args=msg
assert trace(__name__,f'{self}: recv: {data}')
match data:
case 'stop':
env.action_quit()
case 'load':
pipe,=args
pool.apply_async(load_wand,(pipe,env))
case 'list':
pipe,path=args
env.putcontainer(path,[])
pool.apply_async(list_path,(pipe,path,env))
case 'cancel':
container,filename,s=args
pool.apply_async(env.cancelrequire,
((container,filename),s=='canvas'))
case 'delete':
container,filename=args
env.delpath(container,filename)
env.action_view()
case _:
assert warn(__name__,f'{self}: recv: unknown command: {data}')
assert trace(__name__,f'{self}: recv: loop out')
pool.join()
assert trace(__name__,f'{self}: recv: end')
def speak(self,command,*args):
assert is_mainthread()
assert trace(__name__,f'{self}: speak start: {command}')
if self._env.stop or command=='over':
try:
self._sender.sends('over')
except:
pass
return
try:
self._sender.sends(command,*args)
assert trace(__name__,f'{self}: speak end: {command}')
except Exception as e:
assert error(__name__,f'{self}: speak failed',exc=e)
def SDLUI_Process(sender, receiver, config):
    """Entry point of the SDL user-interface process.

    Opens the video subsystem, attaches a ``Listener`` on the given
    endpoints, creates and configures the window from *config*, reports the
    renderer's maximum texture size to the peer and then runs
    ``env.looper(window)`` until it returns.

    Parameters:
        sender: endpoint used by the listener to send commands to the peer.
        receiver: endpoint the listener's receive thread reads from.
        config: configuration object; the ``user_interface``, ``renderer``
            and ``style`` sections are read here.

    Returns:
        ``UI_SUCCESS``, also when the loop ended with a logged fault.
    """
    with VideoSubSystem(config) as env:
        with Listener(sender, receiver, env) as listener:
            with Window(title=progname.lower()) as window:
                window.screensaver = config.user_interface.screensaver
                # Only override the renderer driver when one is configured.
                if config.renderer.driver:
                    window.renderer_driver = config.renderer.driver
                window.renderer_vsync = not not config.renderer.vsync
                listener.speak('max_texture_size',
                               window.renderer.max_texture_width,
                               window.renderer.max_texture_height)
                window.renderer.set_draw_color(*config.style.bgcolor, 0)
                assert info(__name__, 'screensaver',
                            'enabled' if window.screensaver else 'disabled')
                assert verb(__name__, '\n'.join(window.available_window_drivers))
                assert info(__name__, f'using window driver: {window.window_driver}')
                assert verb(__name__, '\n'.join(window.available_renderer_drivers))
                assert info(__name__, 'using',
                            'software' if window.renderer.is_software else 'hardware',
                            'renderer driver:',
                            window.renderer.driver)
                assert info(__name__, 'renderer vsync',
                            'enabled' if window.renderer.vsync else 'disabled')
                assert verb(__name__, '\n'.join(window.renderer.available_pixfmts))
                window.resize(*config.style.size)
                window.move_to()
                window.fullscreen_change_videomode = config.user_interface.realfs
                window.fullscreen = config.user_interface.startfs
                window.show()
                window.renderer.clear()
                window.update()
                window.renderer.present()
                try:
                    env.looper(window)
                except Exception as e:
                    env.stop = True
                    # There is no ``self`` in this function; the old f-string
                    # raised NameError here and hid the real failure.
                    assert fault(__name__, 'SDLUI_Process: fault', exc=e)
    return UI_SUCCESS
# Local Variables:
# coding: utf-8
# mode: python
# python-indent-offset: 4
# indent-tabs-mode: nil
# End: