comp/comp

397 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
# comp - Curses Online Media Player
# Copyright (C) 2017 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import curses
import json
from argparse import ArgumentParser
from configparser import ConfigParser
from curses.ascii import ctrl
from datetime import datetime
from gettext import gettext as _, textdomain
from itertools import cycle
from os import linesep, makedirs
from os.path import abspath, dirname, expanduser, isfile
from random import choice
from time import gmtime, strftime
from threading import Thread
from youtube_dl import YoutubeDL
from mpv import MPV
# Init gettext: look translations up in the 'comp' message domain
textdomain('comp')
# Global constants
# Configuration files; the main script reads the user one when it exists
# and the system-wide one otherwise.
SYSTEM_CONFIG = '/etc/comp/settings.ini'
USER_CONFIG = expanduser('~/.config/comp/settings.ini')
# File that mpv_logger appends mpv's log messages to
MPV_LOG = expanduser('~/.cache/comp/mpv.log')
# Play modes, each spelt '<action>-<pick>': Comp.update_playlist splits the
# name at the dash to decide how to play (play, repeat, shuffle) and which
# entries to play (current, all, selected).  The main loop cycles through
# this tuple in order.
MODES = ('play-current', 'play-all', 'play-selected', 'repeat-current',
'repeat-all', 'repeat-selected', 'shuffle-all', 'shuffle-selected')
# Length of the longest translated mode name; the resize handler uses it
# to work out the minimum terminal width.
MODE_STR_LEN = max(len(_(mode)) for mode in MODES)
def mpv_logger(loglevel, component, message):
    """Append one timestamped mpv log record to the MPV_LOG file.

    The record has the form 'ISO-TIMESTAMP [loglevel] component: message'
    followed by the platform line separator.
    """
    timestamp = datetime.now().isoformat()
    record = '{} [{}] {}: {}{}'.format(timestamp, loglevel, component,
                                       message, linesep)
    with open(MPV_LOG, 'a') as log_file:
        log_file.write(record)
def _secpair2hhmmss(pos, duration):
"""Quick hack to convert a pair of seconds to HHMMSS / HHMMSS
string as MPV.get_property_osd_string isn't available.
"""
postime, durationtime = gmtime(pos), gmtime(duration)
# Let's hope media durations are shorter than a day
timestr = '%M:%S' if duration < 3600 else '%H:%M:%S'
return '{} / {}'.format(strftime(timestr, postime),
strftime(timestr, durationtime))
class Comp:
    """Meta object for drawing and playing.

    Attributes:
        entries (list): playlist entries, each one a dict.  The keys
            'current', 'error', 'playing' and 'selected' hold the flags
            used for drawing and for choosing what to play.
        mode (str): play mode spelt '<action>-<pick>', e.g. 'play-current'
        mp (MPV): the player
        playlist (iterator): entries queued for playing
        reading (bool): whether gets() is currently reading user input
        start (int): index of the first entry shown on screen
        stdscr: curses standard screen
        y (int): screen row of the current entry (row 0 is the header)
    """

    def idx(self):
        """Return the index in entries of the current entry."""
        return self.start + self.y - 1

    def current(self):
        """Return the current entry.

        Raises IndexError if there is no entry at the current position,
        e.g. when entries is empty.
        """
        return self.entries[self.idx()]

    def __init__(self, entries, mode, mpv_vo, mpv_vid, ytdlf):
        """Set up the player and the curses screen, then draw everything.

        Args:
            entries (list): playlist entries (dicts); their flags are reset.
            mode (str): initial play mode.
            mpv_vo: value for mpv's 'vo' option, left alone if false.
            mpv_vid: value for mpv's 'vid' property.
            ytdlf: value passed to mpv as ytdl_format.
        """
        self.mode, self.reading, self.start, self.y = mode, False, 0, 1
        self.entries, self.playlist = entries, iter(())
        self.setno('current', 'error', 'playing', 'selected')
        if self.entries: self.current()['current'] = True

        self.mp = MPV(input_default_bindings=True, input_vo_keyboard=True,
                      log_handler=mpv_logger, ytdl=True, ytdl_format=ytdlf)
        if mpv_vo: self.mp['vo'] = mpv_vo
        self.mp._set_property('vid', mpv_vid)
        # Any change to one of these properties redraws the status line.
        for prop in ('mute', 'pause', 'time-pos', 'vid'):
            self.mp.observe_property(prop, self.generic_event_handler)

        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        self.stdscr.keypad(True)
        curses.curs_set(False)
        curses.start_color()
        curses.use_default_colors()
        # Pairs 1-7: colour N on the default background.
        for colour in range(1, 8):
            curses.init_pair(colour, colour, -1)
        # Pairs 8-14: default foreground on colours 7, 1, 2, ..., 6.  Pair
        # N + 8 is therefore the inverted counterpart of pair N for N < 7.
        curses.init_pair(8, -1, 7)
        for colour in range(1, 7):
            curses.init_pair(colour + 8, -1, colour)
        self.reprint()

    def setno(self, *keys):
        """Set all keys of each entry in entries to False."""
        for entry in self.entries:
            for key in keys:
                entry[key] = False

    def update_status(self, message='', msgattr=curses.A_NORMAL):
        """Redraw the status line and the message line below it.

        Args:
            message (str): text for the bottom line, cut to fit the screen.
            msgattr: curses attribute used to draw message.

        Returns:
            True if the position, duration and title of the media could not
            be drawn, which generic_event_handler takes as the cue to play
            the next entry; False otherwise.  Nothing is drawn and False is
            returned while gets() is reading input.
        """
        if self.reading: return False
        right = ' {} {}{} '.format(
            _(self.mode), ' ' if self.mp._get_property('mute', bool) else 'A',
            ' ' if self.mp._get_property('vid') == 'no' else 'V')
        self.stdscr.addstr(curses.LINES - 2, 0, right.rjust(curses.COLS),
                           curses.color_pair(14))
        try:
            left = ' {} {} '.format(
                _secpair2hhmmss(self.mp._get_property('time-pos', int),
                                self.mp._get_property('duration', int)),
                '|' if self.mp._get_property('pause', bool) else '>')
            self.stdscr.addstr(curses.LINES - 2, 0, left,
                               curses.color_pair(14))
            title_len = curses.COLS - len(left + right)
            title = self.mp._get_property('media-title')
            center = title.ljust(title_len)[:title_len]
            self.stdscr.addstr(curses.LINES - 2, len(left), center,
                               curses.color_pair(14) | curses.A_BOLD)
        except Exception:
            # Deliberately broad: whatever goes wrong here (most notably
            # the properties being unavailable) is reported as "nothing
            # to show" through the return value.
            failed = True
        else:
            failed = False
        # The message line is drawn in both cases, so that messages also
        # show up while nothing is being played.
        self.stdscr.move(curses.LINES - 1, 0)
        self.stdscr.clrtoeol()
        # Leave the last column alone: writing to the bottom-right cell of
        # the screen makes curses raise an error.
        self.stdscr.addstr(curses.LINES - 1, 0,
                           message[:curses.COLS - 1], msgattr)
        self.stdscr.refresh()
        return failed

    def update_playlist(self):
        """Rebuild playlist from entries according to mode.

        The second half of mode picks the entries (the current one, all of
        them or the selected ones) and the first half tells how to iterate
        over them (once, repeatedly or at random).  When nothing is picked
        the playlist is simply empty.
        """
        action, pick = self.mode.split('-')
        if pick == 'current':
            entries = [self.current()] if self.entries else []
        elif pick == 'all':
            entries = self.entries
        else:
            entries = [entry for entry in self.entries
                       if entry.get('selected')]

        if not entries:
            # Avoid choice() raising IndexError on an empty sequence.
            self.playlist = iter(())
        elif action == 'play':
            self.playlist = iter(entries)
        elif action == 'repeat':
            self.playlist = cycle(entries)
        else:
            # Endless stream of random picks (choice never returns None).
            self.playlist = iter(lambda: choice(entries), None)

    def getlink(self, entry):
        """Return an URL from the given entry.

        The URL is looked up with youtube-dl only the first time and then
        cached in the entry under the key 'webpage_url'.
        """
        if 'webpage_url' not in entry:
            with YoutubeDL({'quiet': True}) as ytdl:
                info = ytdl.extract_info(entry['url'], download=False,
                                         ie_key=entry.get('ie_key'))
            entry['webpage_url'] = info.get('webpage_url')
        return entry['webpage_url']

    def play(self):
        """Play the next entry of playlist, if any, in a new thread."""
        def mpv_play(entry):
            self.mp.play(self.getlink(entry))
            self.mp.wait_for_playback()
            self.setno('playing')
            self.reprint()

        try:
            entry = next(self.playlist)
        except (StopIteration, IndexError):
            # IndexError: a shuffled playlist whose entries have all been
            # removed in the meantime.
            pass
        else:
            play_thread = Thread(target=mpv_play, args=(entry,), daemon=True)
            play_thread.start()
            self.mp._set_property('pause', False, bool)
            entry['playing'] = True
            self.reprint()

    def reattr(self, y=None):
        """Set the attribute of screen row y according to its entry's flags.

        y defaults to the row of the current entry.  Precedence is error,
        playing, selected; the current entry gets the inverted colour pair.
        """
        if y is None: y = self.y
        entry = self.entries[self.start + y - 1]
        invert = 8 if entry.setdefault('current', False) else 0
        if entry.setdefault('error', False):
            attr = curses.color_pair(1 + invert) | curses.A_BOLD
        elif entry.setdefault('playing', False):
            attr = curses.color_pair(3 + invert) | curses.A_BOLD
        elif entry.setdefault('selected', False):
            attr = curses.color_pair(5 + invert) | curses.A_BOLD
        elif invert:
            attr = curses.color_pair(12) | curses.A_BOLD
        else:
            attr = curses.color_pair(0)
        self.stdscr.chgat(y, 0, attr)

    def reprint(self):
        """Redraw the whole screen: header, visible entries and status."""
        self.stdscr.clear()
        self.stdscr.addstr(0, 1, _('Title'))
        # default=0 keeps this working when there is no entry at all.
        # NOTE(review): the minimum of 6 presumably matches len('Source').
        sitenamelen = max(max((len(entry.get('ie_key') or '')
                               for entry in self.entries), default=0), 6)
        self.stdscr.addstr(0, curses.COLS - sitenamelen - 1, _('Source'))
        self.stdscr.chgat(0, 0, curses.color_pair(10) | curses.A_BOLD)
        for i, entry in enumerate(self.entries[self.start:][:curses.LINES-3]):
            # Source name right-aligned first, then the title over it.
            self.stdscr.addstr(
                i + 1, 0, (entry.get('ie_key') or '').rjust(curses.COLS - 1))
            self.stdscr.addstr(
                i + 1, 1,
                (entry.get('title') or '')[:curses.COLS-sitenamelen-3])
            self.reattr(i + 1)
        self.update_status()

    def move(self, delta):
        """Move the cursor delta entries down (up if delta is negative).

        The target is clamped to the first and the last entry.  The screen
        is scrolled and fully redrawn when the target is not visible,
        otherwise only the two affected rows are updated.
        """
        if not (self.entries and delta): return
        self.current()['current'] = False
        self.reattr()
        start, maxy = self.start, min(len(self.entries), curses.LINES - 3)
        if self.idx() + delta <= 0:
            self.start, self.y = 0, 1
        elif self.idx() + delta >= len(self.entries):
            self.start, self.y = len(self.entries) - maxy, maxy
        elif self.y + delta < 1:
            self.start += self.y + delta - 1
            self.y = 1
        elif self.y + delta > curses.LINES - 3:
            self.start += self.y + delta - maxy
            self.y = maxy
        else:
            self.y += delta
        self.current()['current'] = True
        if self.start == start:
            self.reattr()
            self.stdscr.refresh()
        else:
            self.reprint()

    def gets(self, prompt):
        """Print prompt on the bottom line and return what the user types.

        While reading, the cursor and echoing are turned on and
        update_status is kept from drawing.
        """
        self.stdscr.addstr(curses.LINES - 1, 0, prompt)
        self.reading = True
        curses.curs_set(True)
        curses.echo()
        b = self.stdscr.getstr(curses.LINES - 1, len(prompt))
        self.reading = False
        curses.curs_set(False)
        curses.noecho()
        return b.decode()

    def generic_event_handler(self, event):
        """Reprint status line and play next entry if the last one is
        ended without caring about the event.
        """
        if self.update_status(): self.play()

    def close(self):
        """Restore the terminal settings and terminate the player."""
        curses.nocbreak()
        self.stdscr.keypad(False)
        curses.echo()
        curses.endwin()
        self.mp.terminate()
# Command line arguments
parser = ArgumentParser(description=_("Curses Online Media Player"))
parser.add_argument('-j', '--json-playlist', required=False, metavar='path',
                    help=_('path to playlist in JSON format'))
parser.add_argument('-u', '--online-playlist', required=False, metavar='URL',
                    help=_('URL to an playlist on Youtube'))
args = parser.parse_args()

# Settings: the user's file takes precedence over the system-wide one.
config = ConfigParser()
config.read(USER_CONFIG if isfile(USER_CONFIG) else SYSTEM_CONFIG)
mode = config.get('comp', 'play-mode', fallback='play-current')
# An unknown mode would break both mode cycling and playlist building.
if mode not in MODES: mode = 'play-current'
ytdlf = config.get('youtube-dl', 'format', fallback='best')

# Load the playlist; json_file is the default path offered when saving.
if args.json_playlist:
    json_file = args.json_playlist
    with open(json_file) as f:
        entries = json.load(f)
elif args.online_playlist:
    with YoutubeDL({'extract_flat': 'in_playlist', 'quiet': True}) as ytdl:
        info = ytdl.extract_info(args.online_playlist, download=False)
        entries = info.get('entries', [])
    json_file = ''
else:
    entries = []
    json_file = ''

makedirs(dirname(MPV_LOG), exist_ok=True)
comp = Comp(
    entries,
    mode,
    config.get('mpv', 'video-output', fallback=None),
    config.get('mpv', 'video', fallback='auto'),
    ytdlf)

# Main loop.  The playlist is always accessed through comp.entries and the
# terminal is restored however the loop is left.
try:
    while True:
        c = comp.stdscr.getch()
        if c == 10:    # curses.KEY_ENTER doesn't work
            if not comp.entries: continue
            comp.update_playlist()
            comp.play()
        elif c == 32:    # space
            comp.mp._toggle_property('pause')
        elif c == 65:    # letter A
            comp.mp._toggle_property('mute')
        elif c == 77:    # letter M
            comp.mode = MODES[(MODES.index(comp.mode) - 1) % len(MODES)]
            comp.update_status()
        elif c == 86:    # letter V
            comp.mp._set_property(
                'vid',
                'auto' if comp.mp._get_property('vid') == 'no' else 'no')
        elif c == 87:    # letter W
            if not comp.entries: continue
            s = comp.gets(_('Save playlist to [{}]: ').format(json_file))
            if s: json_file = s
            try:
                makedirs(dirname(abspath(json_file)), exist_ok=True)
                with open(json_file, 'w') as f:
                    json.dump(comp.entries, f)
            except (OSError, TypeError, ValueError):
                comp.update_status(
                    _("'{}': Can't open file for writing").format(json_file),
                    curses.color_pair(1))
            else:
                comp.update_status(_("'{}' written").format(json_file))
        elif c == 99:    # letter c
            if not comp.entries: continue
            entry = comp.current()
            entry['selected'] = not entry.get('selected', False)
            comp.move(1)
        elif c == 100:    # letter d
            if not comp.entries: continue
            i = comp.idx()
            comp.entries.pop(i)
            if not comp.entries:
                comp.start, comp.y = 0, 1
            elif i == len(comp.entries):
                # The last entry was removed: step the cursor back so that
                # it still points at an existing entry.
                if comp.y > 1:
                    comp.y -= 1
                else:
                    comp.start -= 1
            if comp.entries: comp.current()['current'] = True
            comp.reprint()
        elif c == 109:    # letter m
            comp.mode = MODES[(MODES.index(comp.mode) + 1) % len(MODES)]
            comp.update_status()
        elif c == 113:    # letter q
            break
        elif c == 119:    # letter w
            if not comp.entries: continue
            # Download the entries picked by the current mode, the same
            # way Comp.update_playlist picks the ones to play.
            pick = comp.mode.split('-')[1]
            if pick == 'current':
                chosen = [comp.current()]
            elif pick == 'all':
                chosen = comp.entries
            else:
                chosen = [entry for entry in comp.entries
                          if entry.get('selected')]
            try:
                # Quiet as in Comp.getlink, presumably so that youtube-dl
                # doesn't write over the curses screen.
                with YoutubeDL({'format': ytdlf, 'quiet': True}) as ytdl:
                    ytdl.download([comp.getlink(entry) for entry in chosen])
            except Exception:
                # A failed download shouldn't bring the player down.
                comp.update_status(_('Download failed'),
                                   curses.color_pair(1))
        elif c in (curses.KEY_UP, 107):    # up arrow or letter k
            if not comp.entries: continue
            comp.move(-1)
        elif c in (curses.KEY_DOWN, 106):    # down arrow or letter j
            if not comp.entries: continue
            comp.move(1)
        elif c in (curses.KEY_LEFT, 104):    # left arrow or letter h
            if comp.mp._get_property('duration', int):
                comp.mp.seek(-2.5)
        elif c in (curses.KEY_RIGHT, 108):    # right arrow or letter l
            if comp.mp._get_property('duration', int):
                comp.mp.seek(2.5)
        elif c == curses.KEY_HOME:    # home
            if not comp.entries: continue
            comp.move(-len(comp.entries))
        elif c == curses.KEY_END:    # end
            if not comp.entries: continue
            comp.move(len(comp.entries))
        elif c == curses.KEY_NPAGE:    # page down
            if not comp.entries: continue
            comp.move(curses.LINES - 4)
        elif c == curses.KEY_PPAGE:    # page up
            if not comp.entries: continue
            comp.move(4 - curses.LINES)
        elif c == curses.KEY_F5:    # F5
            comp.reprint()
        elif c == curses.KEY_RESIZE:
            curses.update_lines_cols()
            if curses.COLS < MODE_STR_LEN + 42 or curses.LINES < 4:
                comp.stdscr.clear()
                sizeerr = _('Current size: {}x{}. Minimum size: {}x4.').format(
                    curses.COLS, curses.LINES, MODE_STR_LEN + 42)
                # Stop one cell short of the bottom-right corner, writing
                # to which makes curses raise an error.
                comp.stdscr.addstr(
                    0, 0, sizeerr[:max(curses.LINES*curses.COLS - 1, 0)])
            else:
                # Scroll so that the current entry becomes the first row.
                comp.start += comp.y - 1
                comp.y = 1
                comp.reprint()
finally:
    comp.close()