536 lines
20 KiB
Python
Executable File
536 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# comp - Curses Omni Media Player
|
|
#
|
|
# comp is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# comp program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with comp. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
# Copyright (C) 2017 Nguyễn Gia Phong <vn.mcsinyx@gmail.com>
|
|
|
|
__version__ = '0.4.6'
|
|
|
|
import curses
|
|
import re
|
|
from argparse import ArgumentParser
|
|
from collections import deque
|
|
from configparser import ConfigParser
|
|
from curses.ascii import ctrl, alt
|
|
from functools import reduce
|
|
from gettext import bindtextdomain, gettext as _, textdomain
|
|
from os.path import expanduser
|
|
from threading import Thread
|
|
from traceback import print_exception
|
|
|
|
from mpv import MPV
|
|
from pkg_resources import resource_filename
|
|
|
|
from omp import extract_info, Omp
|
|
|
|
# Init gettext
|
|
bindtextdomain('omp', resource_filename('omp', 'locale'))
|
|
textdomain('omp')
|
|
|
|
# Global constants
|
|
SYSTEM_CONFIG = '/etc/comp/settings.ini'
|
|
USER_CONFIG = expanduser('~/.config/comp/settings.ini')
|
|
MODES = ("play-current", "play-all", "play-selected", "repeat-current",
|
|
"repeat-all", "repeat-selected", "shuffle-all", "shuffle-selected")
|
|
MODE_STR_LEN = max(len(_(mode)) for mode in MODES)
|
|
DURATION_COL_LEN = max(len(_("Duration")), 8)
|
|
|
|
|
|
def justified(s, width):
|
|
"""Return s left-justified of length width."""
|
|
return s.ljust(width)[:width]
|
|
|
|
|
|
class Comp(Omp):
|
|
"""Meta object for drawing and playing.
|
|
|
|
Attributes:
|
|
entries (list): list of all tracks
|
|
json_file (str): path to save JSON playlist
|
|
mode (str): the mode to pick and play tracks
|
|
mp (MPV): an mpv instance
|
|
play_backward (bool): flag show if to play the previous track
|
|
play_list (list): list of tracks according to mode
|
|
played (list): list of previously played tracks
|
|
playing (int): index of playing track in played
|
|
playlist (iterator): iterator of tracks according to mode
|
|
reading (bool): flag show if user input is being read
|
|
search_str (str): regex search string
|
|
scr (curses WindowObject): curses window object
|
|
start (int): index of the first track to be printed on screen
|
|
y (int): the current y-coordinate
|
|
"""
|
|
def __new__(cls, entries, json_file, mode, mpv_args, ytdlf):
|
|
self = object.__new__(cls)
|
|
self.play_backward, self.reading = False, False
|
|
self.playing, self.start, self.y = -1, 0, 1
|
|
self.json_file, self.mode = json_file, mode
|
|
self.entries, self.played = entries, []
|
|
self.playlist, self.search_str = iter(()), ''
|
|
self.mp = MPV(input_default_bindings=True, input_vo_keyboard=True,
|
|
ytdl=True, ytdl_format=ytdlf)
|
|
self.scr = curses.initscr()
|
|
return self
|
|
|
|
def adds(self, s, y, x=0, X=-1, attr=curses.A_NORMAL, lpad=1):
|
|
"""Paint the string s, added lpad spaces to the left, from
|
|
(y, x) to (y, X) with attributes attr, overwriting anything
|
|
previously on the display.
|
|
"""
|
|
if self.reading: return
|
|
curses.update_lines_cols()
|
|
y %= curses.LINES
|
|
x %= curses.COLS
|
|
length = X % curses.COLS - x + (y != curses.LINES - 1)
|
|
self.scr.addstr(y, x, (' '*lpad + s).ljust(length)[:length], attr)
|
|
|
|
def update_status(self, message='', msgattr=curses.A_NORMAL):
|
|
"""Update the status lines at the bottom of the screen."""
|
|
def add_status_str(s, x=0, X=-1, attr=curses.color_pair(12), lpad=1):
|
|
self.adds(s, curses.LINES - 2, x=x, X=X, attr=attr, lpad=lpad)
|
|
|
|
if self.mp.osd.duration is not None:
|
|
self.played[self.playing]['duration'] = self.mp.osd.duration
|
|
add_status_str(':', X=5, lpad=3)
|
|
if self.mp.video: add_status_str('V', x=1, X=2)
|
|
if self.mp.audio: add_status_str('A', X=1)
|
|
add_status_str(self.mp.osd.time_pos or '00:00:00', x=4, X=12)
|
|
add_status_str('/', x=13, X=14)
|
|
add_status_str(self.mp.osd.duration or '00:00:00', x=15, X=23)
|
|
add_status_str('|' if self.mp.pause else '>', x=24, X=25)
|
|
add_status_str(self.mp.media_title or '', x=26,
|
|
attr=curses.color_pair(12)|curses.A_BOLD)
|
|
add_status_str(_(self.mode), x=-2-len(_(self.mode)))
|
|
self.scr.refresh()
|
|
|
|
def print_msg(self, message, error=False):
|
|
"""Print the given message, in red is it's an error."""
|
|
attributes = curses.color_pair(1) if error else curses.A_NORMAL
|
|
self.adds(message, curses.LINES-1, attr=attributes, lpad=0)
|
|
self.scr.refresh()
|
|
|
|
def setno(self, *keys):
|
|
"""Set all keys of each entry in entries to False."""
|
|
for entry in self.entries:
|
|
for key in keys: entry[key] = False
|
|
|
|
def play(self, force=False):
|
|
"""Play the next track."""
|
|
def mpv_play(entry, force):
|
|
self.setno('playing')
|
|
entry['playing'] = True
|
|
try:
|
|
self.mp.play(entry['filename'])
|
|
except:
|
|
entry['error'] = True
|
|
self.print(entry)
|
|
if force: self.mp.pause = False
|
|
self.mp.wait_for_playback()
|
|
self.play()
|
|
entry['playing'] = False
|
|
self.print(entry)
|
|
|
|
if self.play_backward and -self.playing < len(self.played):
|
|
self.playing -= 1
|
|
t = self.played[self.playing], force
|
|
elif self.playing < -1:
|
|
self.playing += 1
|
|
t = self.played[self.playing], force
|
|
else:
|
|
try:
|
|
self.played.append(next(self.playlist))
|
|
except StopIteration:
|
|
return
|
|
else:
|
|
t = self.played[-1], force
|
|
|
|
self.play_backward = False
|
|
play_thread = Thread(target=mpv_play, args=t, daemon=True)
|
|
play_thread.start()
|
|
|
|
def _writeln(self, y, title, duration, attr):
|
|
title_len = curses.COLS - DURATION_COL_LEN - 3
|
|
self.adds(title, y, attr=attr)
|
|
self.adds(duration, y, x=title_len+1, attr=attr)
|
|
self.scr.refresh()
|
|
|
|
def print(self, entry=None, y=None):
|
|
"""Print the entry in the line y."""
|
|
if entry is y is None:
|
|
entry = self.current()
|
|
y = self.idx() - self.start + 1
|
|
elif entry is None:
|
|
entry = self.entries[self.start + y - 1]
|
|
elif y is None:
|
|
y = self.idx(entry) - self.start + 1
|
|
if y < 1 or y > curses.LINES - 3: return
|
|
|
|
c = {'error': 1, 'playing': 3, 'selected': 5}
|
|
color = ((8 if entry is self.current() else 0)
|
|
| reduce(int.__xor__, (c.get(i, 0) for i in entry if entry[i])))
|
|
if color:
|
|
self._writeln(y, entry['title'], entry['duration'],
|
|
curses.color_pair(color) | curses.A_BOLD)
|
|
else:
|
|
self._writeln(y, entry['title'], entry['duration'],
|
|
curses.A_NORMAL)
|
|
|
|
def refresh(self):
|
|
"""Redraw the whole screen."""
|
|
self._writeln(0, _("Title"), _("Duration"),
|
|
curses.color_pair(10) | curses.A_BOLD)
|
|
for i, entry in enumerate(self.entries[self.start:][:curses.LINES-3]):
|
|
self.print(entry, i + 1)
|
|
self.scr.clrtobot()
|
|
self.update_status()
|
|
|
|
def property_handler(self, name, val): self.update_status()
|
|
|
|
def __init__(self, entries, json_file, mode, mpv_args, ytdlf):
|
|
curses.noecho()
|
|
curses.cbreak()
|
|
self.scr.keypad(True)
|
|
curses.curs_set(False)
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
for i in range(1, 8): curses.init_pair(i, i, -1)
|
|
curses.init_pair(8, -1, 7)
|
|
for i in range(1, 7): curses.init_pair(i + 8, -1, i)
|
|
Omp.__init__(self, entries, json_file, mode, mpv_args, ytdlf)
|
|
self.refresh()
|
|
|
|
def __enter__(self): return self
|
|
|
|
def read_input(self, prompt):
|
|
"""Print the prompt string at the bottom of the screen then read
|
|
from standard input.
|
|
"""
|
|
self.adds(prompt, curses.LINES - 1, lpad=0)
|
|
self.reading = True
|
|
curses.curs_set(True)
|
|
curses.echo()
|
|
b = self.scr.getstr(curses.LINES - 1, len(prompt))
|
|
self.reading = False
|
|
curses.curs_set(False)
|
|
curses.noecho()
|
|
return b.decode()
|
|
|
|
def move(self, delta):
|
|
"""Move to the relatively next delta entry."""
|
|
if not (self.entries and delta): return
|
|
start, prev_entry = self.start, self.current()
|
|
maxy = min(len(self.entries), curses.LINES - 3)
|
|
|
|
if self.idx() + delta <= 0:
|
|
self.start, self.y = 0, 1
|
|
elif self.idx() + delta >= len(self.entries):
|
|
self.start, self.y = len(self.entries) - maxy, maxy
|
|
elif self.y + delta < 1:
|
|
self.start += self.y + delta - 1
|
|
self.y = 1
|
|
elif self.y + delta > curses.LINES - 3:
|
|
self.start += self.y + delta - maxy
|
|
self.y = maxy
|
|
else:
|
|
self.y += delta
|
|
|
|
if self.start == start:
|
|
self.print(prev_entry)
|
|
self.print()
|
|
else:
|
|
self.refresh()
|
|
|
|
def search(self, backward=False):
|
|
"""Prompt then search for a pattern."""
|
|
s = self.read_input(_("Search {}ward [{{}}]: ".format(
|
|
'back' if backward else 'for')).format(self.search_str))
|
|
if s: self.search_str = s
|
|
pattern = re.compile(self.search_str, re.IGNORECASE)
|
|
entries = deque(self.entries)
|
|
if backward:
|
|
entries.rotate(-self.idx())
|
|
entries.reverse()
|
|
else:
|
|
entries.rotate(-self.idx() - 1)
|
|
for entry in entries:
|
|
if pattern.search(entry['title']) is not None:
|
|
self.move(self.idx(entry) - self.idx())
|
|
return
|
|
self.print_msg(_("'{}' not found").format(self.search_str), error=True)
|
|
|
|
def resize(self):
|
|
curses.update_lines_cols()
|
|
self.scr.clear()
|
|
l = curses.LINES - 3
|
|
if curses.COLS < MODE_STR_LEN + 42 or l < 1: # too small
|
|
sizeerr = _("Current size: {}x{}. Minimum size: {}x4.").format(
|
|
curses.COLS, curses.LINES, MODE_STR_LEN + 42)
|
|
self.scr.addstr(0, 0, sizeerr[:curses.LINES*curses.COLS-1])
|
|
self.scr.refresh()
|
|
elif self.y > l: # shorter than the current entry
|
|
self.start += self.y - l
|
|
self.y = l
|
|
self.refresh()
|
|
elif 0 < self.start > len(self.entries) - l: # longer than the list
|
|
idx, self.start = self.idx(), min(0, len(self.entries) - l)
|
|
self.y = idx - self.start + 1
|
|
if self.y > l:
|
|
self.start += self.y - l
|
|
self.y = l
|
|
self.refresh()
|
|
else:
|
|
self.refresh()
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
curses.nocbreak()
|
|
self.scr.keypad(False)
|
|
curses.echo()
|
|
curses.endwin()
|
|
Omp.__exit__(self, exc_type, exc_value, traceback)
|
|
if exc_value is not None:
|
|
print_exception(exc_type, exc_value, traceback)
|
|
|
|
|
|
parser = ArgumentParser(description='Curses Omni Media Player')
|
|
parser.add_argument('-v', '--version', action='version',
|
|
version='%(prog)s {}'.format(__version__))
|
|
parser.add_argument('-e', '--extractor', default='youtube-dl',
|
|
choices=('json', 'mpv', 'youtube-dl'), required=False,
|
|
help='playlist extractor, default is youtube-dl')
|
|
parser.add_argument('playlist', help='path or URL to the playlist')
|
|
parser.add_argument('-c', '--config', default=USER_CONFIG, required=False,
|
|
help='path to the configuration file')
|
|
parser.add_argument('--vo', required=False, metavar='DRIVER',
|
|
help='specify the video output backend to be used. See\
|
|
VIDEO OUTPUT DRIVERS in mpv(1) for details and\
|
|
descriptions of available drivers')
|
|
parser.add_argument('-f', '--format', required=False, metavar='YTDL_FORMAT',
|
|
help='video format/quality to be passed to youtube-dl')
|
|
args = parser.parse_args()
|
|
entries = extract_info(args.playlist, args.extractor)
|
|
if entries is None:
|
|
print(_("'{}': Can't extract playlist").format(args.playlist))
|
|
exit()
|
|
json_file = args.playlist if args.extractor == 'json' else ''
|
|
config = ConfigParser()
|
|
config.read(args.config)
|
|
mode = config.get('comp', 'play-mode', fallback='play-current')
|
|
mpv_args = dict(config['mpv']) if 'mpv' in config else {}
|
|
if args.vo is not None: mpv_args['vo'] = args.vo
|
|
ytdlf = args.format or config.get('youtube-dl', 'format',
|
|
fallback='bestvideo+bestaudio')
|
|
|
|
with Comp(entries, json_file, mode, mpv_args, ytdlf) as comp:
|
|
while True:
|
|
c = comp.scr.get_wch()
|
|
comp.print_msg('')
|
|
# mpv keybindings
|
|
if c == curses.KEY_LEFT:
|
|
comp.seek(-5, precision='exact')
|
|
elif c == curses.KEY_RIGHT:
|
|
comp.seek(5, precision='exact')
|
|
elif c == curses.KEY_SLEFT: # Shifted Left-arrow
|
|
comp.seek(-1, precision='exact')
|
|
elif c == curses.KEY_SRIGHT: # Shifted Right-arrow
|
|
comp.seek(1, precision='exact')
|
|
elif c == curses.KEY_UP:
|
|
comp.seek(-60, precision='exact')
|
|
elif c == curses.KEY_DOWN:
|
|
comp.seek(60, precision='exact')
|
|
elif c == curses.KEY_PPAGE:
|
|
comp.add('chapter', 1)
|
|
elif c == curses.KEY_NPAGE:
|
|
comp.add('chapter', -1)
|
|
elif c == '[':
|
|
comp.multiply('speed', 0.9091)
|
|
elif c == ']':
|
|
comp.multiply('speed', 1.1)
|
|
elif c == '{':
|
|
comp.multiply('speed', 0.5)
|
|
elif c == '}':
|
|
comp.multiply('speed', 2.0)
|
|
elif c == curses.KEY_BACKSPACE:
|
|
comp.mp.speed = 1.0
|
|
elif c == 'q':
|
|
comp.print_msg(_("Save playlist? [Y/n]"))
|
|
if comp.scr.get_wch() not in _("Nn"): comp.dump_json()
|
|
break
|
|
elif c in ('p', ' '):
|
|
comp.cycle('pause')
|
|
elif c == '.':
|
|
comp.mp.frame_step()
|
|
elif c == ',':
|
|
comp.mp.frame_back_step()
|
|
elif c == '<':
|
|
try:
|
|
if comp.mp.time_pos < 1:
|
|
comp.next(backward=True)
|
|
else:
|
|
comp.seek(0, 'absolute')
|
|
except:
|
|
pass
|
|
elif c == '>':
|
|
comp.next()
|
|
elif c == '\n': # curses.KEY_ENTER doesn't work
|
|
comp.update_playlist()
|
|
comp.next(force=True)
|
|
elif c == 'O':
|
|
comp.mp.command('cycle-values', 'osd-level', 3, 1)
|
|
elif c in ('o', 'P'):
|
|
comp.mp.show_progress()
|
|
elif c == 'z':
|
|
comp.add('sub-delay', -0.1)
|
|
elif c == 'x':
|
|
comp.add('sub-delay', 0.1)
|
|
elif c == ctrl('+'):
|
|
comp.add('audio-delay', 0.1)
|
|
elif c == ctrl('-'):
|
|
comp.add('audio-delay', -0.1)
|
|
elif c in ('/', '9'):
|
|
comp.add('volume', -2)
|
|
elif c in ('*', '0'):
|
|
comp.add('volume', 2)
|
|
elif c == 'm':
|
|
comp.cycle('mute')
|
|
elif c == '1':
|
|
comp.add('contrast', -1)
|
|
elif c == '2':
|
|
comp.add('contrast', 1)
|
|
elif c == '3':
|
|
comp.add('brightness', -1)
|
|
elif c == '4':
|
|
comp.add('brightness', 1)
|
|
elif c == '5':
|
|
comp.add('gamma', -1)
|
|
elif c == '6':
|
|
comp.add('gamma', 1)
|
|
elif c == '7':
|
|
comp.add('saturation', -1)
|
|
elif c == '8':
|
|
comp.add('saturation', 1)
|
|
elif c == alt('0'):
|
|
comp.mp.window_scale = 0.5
|
|
elif c == alt('1'):
|
|
comp.mp.window_scale = 1.0
|
|
elif c == alt('2'):
|
|
comp.mp.window_scale = 2.0
|
|
elif c == 'd':
|
|
comp.cycle('deinterlace')
|
|
elif c == 'r':
|
|
comp.add('sub-pos', -1)
|
|
elif c == 't':
|
|
comp.add('sub-pos', 1)
|
|
elif c == 'v':
|
|
comp.cycle('sub-visibility')
|
|
elif c == 'V':
|
|
comp.cycle('sub-ass-vsfilter-aspect-compat')
|
|
elif c == 'u':
|
|
comp.mp.command('cycle-values', 'sub-ass-override', 'force', 'no')
|
|
elif c == 'j':
|
|
comp.cycle('sub', 'up')
|
|
elif c == 'J':
|
|
comp.cycle('sub', 'down')
|
|
elif c == '#':
|
|
comp.cycle('audio')
|
|
elif c == '_':
|
|
comp.cycle('video')
|
|
elif c == 'T':
|
|
comp.cycle('ontop')
|
|
elif c == 'f':
|
|
comp.cycle('fullscreen')
|
|
elif c == 's':
|
|
comp.mp.screenshot()
|
|
elif c == 'S':
|
|
comp.mp.screenshot(includes='')
|
|
elif c == alt('s'):
|
|
comp.mp.screenshot(mode='each-frame')
|
|
elif c == 'w':
|
|
comp.add('panscan', -0.1)
|
|
elif c == 'e':
|
|
comp.add('panscan', 0.1)
|
|
elif c == 'A':
|
|
comp.mp.command('cycle-values', 'video-aspect',
|
|
'16:9', '4:3', '2.35:1', '-1')
|
|
elif c == 'E':
|
|
comp.cycle('edition')
|
|
elif c == 'l':
|
|
comp.mp.command('ab-loop')
|
|
elif c == 'L':
|
|
comp.mp.command('cycle-values', 'loop-file', 'inf', 'no')
|
|
|
|
# Emacs keybindings
|
|
elif c == ctrl('p'):
|
|
comp.move(-1)
|
|
elif c == ctrl('n'):
|
|
comp.move(1)
|
|
elif c == alt('v'):
|
|
comp.move(4 - curses.LINES)
|
|
elif c == ctrl('v'):
|
|
comp.move(curses.LINES - 4)
|
|
elif c in (ctrl('<'), curses.KEY_HOME):
|
|
comp.move(-len(comp.entries))
|
|
elif c in (ctrl('>'), curses.KEY_END):
|
|
comp.move(len(comp.entries))
|
|
elif c == ctrl(' '):
|
|
comp.current()['selected'] = not comp.current().get('selected')
|
|
comp.move(1)
|
|
|
|
elif c == ctrl('o'):
|
|
extractor = comp.read_input(_("Playlist extractor: "))
|
|
filename = comp.read_input(_("Open: "))
|
|
entries = extract_info(filename, extractor)
|
|
if entries is None:
|
|
comp.print_msg(
|
|
_("'{}': Can't extract playlist").format(filename))
|
|
else:
|
|
comp.entries, comp.start, comp.y = entries, 0, 1
|
|
comp.refresh()
|
|
elif c == ctrl('i'):
|
|
extractor = comp.read_input(_("Playlist extractor: "))
|
|
filename = comp.read_input(_("Insert: "))
|
|
entries = extract_info(filename, extractor)
|
|
if entries is None:
|
|
comp.print_msg(
|
|
_("'{}': Can't extract playlist").format(filename))
|
|
else:
|
|
bottom = comp.entries[comp.idx():]
|
|
comp.entries = comp.entries[:comp.idx()]
|
|
comp.entries.extend(entries)
|
|
comp.entries.extend(bottom)
|
|
comp.refresh()
|
|
elif c == ctrl('f'):
|
|
comp.search()
|
|
elif c == alt('f'):
|
|
comp.search(backward=True)
|
|
elif c == alt('m'):
|
|
comp.mode = MODES[(MODES.index(comp.mode) + 1) % 8]
|
|
comp.update_status()
|
|
elif c == curses.KEY_DC:
|
|
comp.entries.pop(comp.idx())
|
|
if 1 < len(comp.entries) - curses.LINES + 4 == comp.start:
|
|
comp.start -= 1
|
|
elif comp.idx() == len(comp.entries):
|
|
comp.y -= 1
|
|
comp.refresh()
|
|
elif c == 'W':
|
|
comp.dump_json()
|
|
elif c in (curses.KEY_F5, curses.KEY_RESIZE):
|
|
comp.resize()
|
|
elif c == ':':
|
|
try:
|
|
comp.mp.command(*comp.read_input(':').split())
|
|
except:
|
|
comp.print_msg(_("Failed to execute command"), error=True)
|