Add tests

This commit is contained in:
jaseg 2016-02-19 15:34:34 +01:00
parent 51869a9184
commit f3cab6cdf1
2 changed files with 173 additions and 32 deletions

129
mpv-test.py Executable file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
import unittest
from unittest import mock
import math
import threading
from contextlib import contextmanager
import gc
import time
import mpv
MPV_ERRORS = [ l(ec) for ec, l in mpv.ErrorCode.EXCEPTION_DICT.items() if l ]
class TestProperties(unittest.TestCase):
@contextmanager
def swallow_mpv_errors(self):
try:
yield
except Exception as e:
for ex in MPV_ERRORS:
if e.args[:2] == ex.args:
break
else:
raise
def setUp(self):
self.m = mpv.MPV()
def test_sanity(self):
for name, (ptype, access) in mpv.ALL_PROPERTIES.items():
self.assertTrue('r' in access or 'w' in access)
self.assertRegex(name, '^[-0-9a-z]+$')
self.assertIn(ptype, (int, float, str, mpv.ynbool))
def test_read(self):
for name, (ptype, access) in mpv.ALL_PROPERTIES.items():
if 'r' in access:
name = name.replace('-', '_')
try:
with self.swallow_mpv_errors():
rv = getattr(self.m, name)
except Exception as e:
raise RuntimeError('Error while testing property', name)
if rv is not None: # Technically, any property can return None (even if of type e.g. int)
self.assertEqual(type(rv), ptype, msg=name)
def test_write(self):
for name, (ptype, access) in mpv.ALL_PROPERTIES.items():
if 'w' in access:
name = name.replace('-', '_')
with self.swallow_mpv_errors():
if ptype == int:
setattr(self.m, name, 0)
setattr(self.m, name, 1)
setattr(self.m, name, -1)
elif ptype == float:
setattr(self.m, name, 0.0)
setattr(self.m, name, 1)
setattr(self.m, name, 1.0)
setattr(self.m, name, -1.0)
setattr(self.m, name, math.nan)
elif ptype == str:
setattr(self.m, name, 'foo')
setattr(self.m, name, '')
setattr(self.m, name, 'bazbazbaz'*1000)
elif ptype == mpv.ynbool:
if 'r' in access:
setattr(self.m, name, 'yes')
self.assertTrue(getattr(self.m, name))
self.assertEqual(getattr(self.m, name), True)
setattr(self.m, name, b'yes')
self.assertTrue(getattr(self.m, name))
setattr(self.m, name, True)
self.assertTrue(getattr(self.m, name))
setattr(self.m, name, 'no')
self.assertFalse(getattr(self.m, name))
self.assertEqual(getattr(self.m, name), False)
setattr(self.m, name, b'no')
self.assertFalse(getattr(self.m, name))
setattr(self.m, name, False)
self.assertFalse(getattr(self.m, name))
else:
setattr(self.m, name, 'yes')
setattr(self.m, name, b'yes')
setattr(self.m, name, True)
def tearDown(self):
del self.m
class ObservePropertyTest(unittest.TestCase):
def test_observe_property(self):
handler = mock.Mock()
m = mpv.MPV()
m.loop = 'inf'
m.observe_property('loop', handler)
m.loop = 'no'
self.assertEqual(m.loop, 'no')
m.loop = 'inf'
self.assertEqual(m.loop, 'inf')
time.sleep(0.02)
m.unobserve_property(handler)
m.loop = 'no'
m.loop = 'inf'
m.terminate() # needed for synchronization of event thread
handler.has_calls([mock.call('loop', 'no'), mock.call('loop', 'inf')])
class TestLifecycle(unittest.TestCase):
def test_create_destroy(self):
thread_names = lambda: [ t.name for t in threading.enumerate() ]
self.assertNotIn('MPVEventHandlerThread', thread_names())
m = mpv.MPV()
self.assertIn('MPVEventHandlerThread', thread_names())
del m
gc.collect()
self.assertNotIn('MPVEventHandlerThread', thread_names())
if __name__ == '__main__':
unittest.main()

76
mpv.py
View File

@ -281,23 +281,32 @@ def load_lua():
CDLL('liblua.so', mode=RTLD_GLOBAL)
def _event_loop(event_handle, _playback_cond, event_callbacks, _property_handlers):
def _event_loop(event_handle, playback_cond, event_callbacks, property_handlers):
for event in _event_generator(event_handle):
devent = event.as_dict() # copy data from ctypes
eid = devent['event_id']
if eid in (MpvEventID.SHUTDOWN, MpvEventID.END_FILE, MpvEventID.PAUSE):
with _playback_cond:
_playback_cond.notify_all()
if eid == MpvEventID.PROPERTY_CHANGE:
_property_handlers[devent['reply_userdata']](devent['event'])
if eid == MpvEventID.LOG_MESSAGE and log_handler is not None:
ev = devent['event']
log_handler('{}: {}: {}'.format(ev['level'], ev['prefix'], ev['text']))
for callback in event_callbacks:
callback.call(devent)
if eid == MpvEventID.SHUTDOWN:
_mpv_detach_destroy(event_handle)
return
try:
devent = event.as_dict() # copy data from ctypes
eid = devent['event_id']
if eid in (MpvEventID.SHUTDOWN, MpvEventID.END_FILE, MpvEventID.PAUSE):
with playback_cond:
playback_cond.notify_all()
if eid == MpvEventID.PROPERTY_CHANGE:
pc, handlerid = devent['event'], devent['reply_userdata']&0Xffffffffffffffff
if handlerid in property_handlers:
if 'value' in pc:
property_handlers[handlerid](pc['name'], pc['value'])
else:
property_handlers[handlerid](pc['name'], pc['data'], pc['format'])
if eid == MpvEventID.LOG_MESSAGE and log_handler is not None:
ev = devent['event']
log_handler('{}: {}: {}'.format(ev['level'], ev['prefix'], ev['text']))
for callback in event_callbacks:
callback.call(devent)
if eid == MpvEventID.SHUTDOWN:
_mpv_detach_destroy(event_handle)
return
except:
pass # It seems that when this thread runs into an exception, the MPV core is not able to terminate properly
# anymore. FIXME
class MPV:
""" See man mpv(1) for the details of the implemented commands. """
@ -451,14 +460,13 @@ class MPV:
def observe_property(self, name, handler):
self._property_handlers[hash(handler)] = handler
_mpv_observe_property(self.handle, hash(handler), name.encode(), MpvFormat.STRING)
_mpv_observe_property(self._event_handle, hash(handler), name.encode(), MpvFormat.STRING)
def unobserve_property(self, handler):
_mpv_unobserve_property(self.handle, hash(handler))
try:
del self._property_handlers[hash(handler)]
except KeyError:
pass
handlerid = hash(handler)
_mpv_unobserve_property(self._event_handle, handlerid)
if handlerid in self._property_handlers:
del self._property_handlers[handlerid]
@property
def metadata(self):
@ -577,9 +585,9 @@ ALL_PROPERTIES = {
'audio-bitrate': (float, 'r'),
'audio-samplerate': (int, 'r'),
'audio-channels': (str, 'r'),
'aid': (int, 'rw'),
'audio': (int, 'rw'),
'balance': (int, 'rw'),
'aid': (str, 'rw'),
'audio': (str, 'rw'), # alias for aid
'balance': (float, 'rw'),
'fullscreen': (ynbool, 'rw'),
'deinterlace': (str, 'rw'),
'colormatrix': (str, 'rw'),
@ -610,8 +618,8 @@ ALL_PROPERTIES = {
'osd-width': (int, 'r'),
'osd-height': (int, 'r'),
'osd-par': (float, 'r'),
'vid': (int, 'rw'),
'video': (int, 'rw'),
'vid': (str, 'rw'),
'video': (str, 'rw'), # alias for vid
'video-align-x': (float, 'rw'),
'video-align-y': (float, 'rw'),
'video-pan-x': (int, 'rw'),
@ -619,9 +627,9 @@ ALL_PROPERTIES = {
'video-zoom': (float, 'rw'),
'video-unscaled': (ynbool, 'w'),
'program': (int, 'w'),
'sid': (int, 'rw'),
'secondary-sid': (int, 'rw'),
'sub': (int, 'rw'),
'sid': (str, 'rw'),
'sub': (str, 'rw'), # alias for sid
'secondary-sid': (str, 'rw'),
'sub-delay': (float, 'rw'),
'sub-pos': (int, 'rw'),
'sub-visibility': (ynbool, 'rw'),
@ -642,8 +650,12 @@ ALL_PROPERTIES = {
def bindproperty(MPV, name, proptype, access):
def getter(self):
value = _ensure_encoding(_mpv_get_property_string(self.handle, name.encode()))
return proptype(value) if value is not None else value
cval = _mpv_get_property_string(self.handle, name.encode())
if cval is None:
return None
rv = proptype(cval.decode())
# _mpv_free(cval) FIXME
return rv
def setter(self, value):
_mpv_set_property_string(self.handle, name.encode(), str(proptype(value)).encode())
def barf(*args):