comp/comp

387 lines
15 KiB
Python

#!/usr/bin/env python3
# comp - Curses Online Media Player
# Copyright (C) 2017 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import curses
import json
from argparse import ArgumentParser
from configparser import ConfigParser
from curses.ascii import ctrl
from datetime import datetime
from gettext import bindtextdomain, gettext, textdomain
from itertools import cycle
from os import linesep, makedirs
from os.path import abspath, dirname, expanduser, isfile
from random import choice
from time import gmtime, strftime
from threading import Thread
from youtube_dl import YoutubeDL
from mpv import MPV
# Route gettext lookups to the 'comp' message domain; `_` is the usual
# short alias used to mark translatable strings.
textdomain('comp')
_ = gettext

# Configuration is read from the per-user file when it exists, otherwise
# from the system-wide one.
SYSTEM_CONFIG = '/etc/comp/settings.ini'
USER_CONFIG = expanduser('~/.config/comp/settings.ini')
# File that receives mpv's log records.
MPV_LOG = expanduser('~/.cache/comp/mpv.log')

# Play modes, named '<action>-<scope>'; the order is the order in which
# the mode keys step through them.
MODES = (
    'play-current', 'play-all', 'play-selected',
    'repeat-current', 'repeat-all', 'repeat-selected',
    'shuffle-all', 'shuffle-selected',
)
# Width of the longest translated mode name, used for the minimum
# terminal width check.
MODE_STR_LEN = max(map(len, map(_, MODES)))
def mpv_logger(loglevel, component, message):
    """Append one mpv log record to the file named by MPV_LOG.

    The record is a single line of the form
    ``<ISO-8601 local time> [<loglevel>] <component>: <message>``.

    Args:
        loglevel: log level reported for the message.
        component: name of the part of mpv that emitted the message.
        message: the log text itself.
    """
    record = '{} [{}] {}: {}\n'.format(
        datetime.now().isoformat(), loglevel, component, message)
    # A text-mode file already translates '\n' to the platform's line
    # ending, so writing os.linesep here would give '\r\r\n' on Windows.
    # UTF-8 is forced so non-ASCII text (e.g. media titles) can always be
    # written regardless of the locale.
    with open(MPV_LOG, 'a', encoding='utf-8') as f:
        f.write(record)
def setno(entries, keys):
    """Reset every key in keys to False on each entry of entries.

    The entries are modified in place; nothing is returned.
    """
    cleared = dict.fromkeys(keys, False)
    for item in entries:
        item.update(cleared)
def getlink(entry):
    """Return the 'webpage_url' youtube-dl reports for the given entry.

    The entry's 'url' (and its 'ie_key', when present) is handed to
    YoutubeDL.extract_info without downloading anything.
    """
    options = {'quiet': True}
    with YoutubeDL(options) as extractor:
        info = extractor.extract_info(
            entry['url'], download=False, ie_key=entry.get('ie_key'))
    return info.get('webpage_url')
def _secpair2hhmmss(pos, duration):
"""Quick hack to convert a pair of seconds to HHMMSS / HHMMSS
string as MPV.get_property_osd_string isn't available.
"""
if pos is None: return ''
postime, durationtime = gmtime(pos), gmtime(duration)
# Let's hope media durations are shorter than a day
timestr = '%M:%S' if duration < 3600 else '%H:%M:%S'
return ' {} / {}'.format(strftime(timestr, postime),
strftime(timestr, durationtime))
class Comp(object):
    """Meta object for drawing and playing.

    Attributes:
        entries: list of entry dicts shown in the list view.
        mode: play mode string of the form '<action>-<scope>'.
        start: index in entries of the first entry shown on screen.
        y: screen row of the current entry (row 1 is the first entry row).
        playlist: iterator over the entries being played.
        stdscr: the curses standard screen.
        mp: the MPV player instance.
    """

    def __init__(self, entries, mode, mpv_vo, mpv_vid, ytdlf):
        """Set up the player, the curses screen and the initial view.

        Args:
            entries: list of entry dicts; their 'current', 'error',
                'playing' and 'selected' flags are reset here.
            mode: initial play mode.
            mpv_vo: value for mpv's 'vo' option; ignored when falsy.
            mpv_vid: value for mpv's 'vid' property.
            ytdlf: format string passed to mpv as ytdl_format.
        """
        self.start = 0
        self.y = 1
        self.mode = mode
        self.entries = entries
        self.playlist = iter(())
        setno(self.entries, ['current', 'error', 'playing', 'selected'])
        if self.entries:
            self.entries[0]['current'] = True

        # The player is created before curses takes over the terminal so
        # that a failure here does not leave the terminal in curses mode.
        self.mp = MPV(input_default_bindings=True, input_vo_keyboard=True,
                      log_handler=mpv_logger, ytdl=True, ytdl_format=ytdlf)
        if mpv_vo:
            self.mp['vo'] = mpv_vo
        self.mp._set_property('vid', mpv_vid)

        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        self.stdscr.keypad(True)
        curses.curs_set(False)
        curses.start_color()
        curses.use_default_colors()
        # Pairs 1-7: colour N on the default background.
        for color in range(1, 8):
            curses.init_pair(color, color, -1)
        # Pairs 8-14: default foreground on a coloured background; these
        # are the "inverted" pairs used for the current row and the bars.
        curses.init_pair(8, -1, 7)
        for color in range(1, 7):
            curses.init_pair(color + 8, -1, color)

        # Draw the initial view; nothing else would paint the screen
        # before the first key press.
        self.reprint()

        def updatestt(*event):
            # Accept whatever arguments the observer is called with.
            self.update_status()

        for name in ('mute', 'pause', 'time-pos', 'vid'):
            self.mp.observe_property(name, updatestt)

    def _playing_title(self):
        """Return the title to show for the media being played.

        The stored title of the entry flagged 'playing' is preferred;
        mpv's 'media-title' property is the fallback.
        """
        media_title = self.mp._get_property('media-title') or ''
        for entry in self.entries:
            if entry.get('playing'):
                return entry.get('title') or media_title
        return media_title

    def update_status(self, message='', msgattr=curses.A_NORMAL):
        """Redraw the status bar and the message line below it.

        Args:
            message: text for the bottom line of the screen.
            msgattr: curses attribute used to draw message.
        """
        row = curses.LINES - 2
        bar = curses.color_pair(14)
        right = ' {} {}{} '.format(
            _(self.mode),
            ' ' if self.mp._get_property('mute', bool) else 'A',
            ' ' if self.mp._get_property('vid') == 'no' else 'V')
        try:
            left = _secpair2hhmmss(
                self.mp._get_property('time-pos', int),
                self.mp._get_property('duration', int))
        except Exception:
            # No usable position/duration: draw the bar without them.
            left = ''
        try:
            if left:
                left += ' | ' if self.mp._get_property('pause', bool) else ' > '
                title_len = max(curses.COLS - len(left + right), 0)
                center = self._playing_title().ljust(title_len)[:title_len]
                self.stdscr.addstr(row, 0, left, bar)
                self.stdscr.addstr(row, len(left), center,
                                   bar | curses.A_BOLD)
                self.stdscr.addstr(row, len(left + center), right, bar)
            else:
                self.stdscr.addstr(row, 0, right.rjust(curses.COLS), bar)
        finally:
            self.stdscr.move(curses.LINES - 1, 0)
            self.stdscr.clrtoeol()
            # Stop one column short: writing the bottom-right cell makes
            # curses raise an error.
            self.stdscr.addstr(curses.LINES - 1, 0,
                               message[:max(curses.COLS - 1, 0)], msgattr)
            self.stdscr.refresh()

    def choose_from(self, mode1):
        """Return the entries that the scope mode1 refers to.

        'all' returns the entry list itself; any other value returns a
        new list of the entries whose flag of that name is true.
        """
        if mode1 == 'all':
            return self.entries
        return [entry for entry in self.entries if entry.get(mode1)]

    def update_playlist(self):
        """Yield the entries to play according to the current mode.

        'play' yields each chosen entry once, 'repeat' cycles through
        them and any other action yields random picks for as long as
        there is something to pick from.
        """
        action, scope = self.mode.split('-', 1)
        entries2play = self.choose_from(scope)
        if action == 'play':
            yield from entries2play
        elif action == 'repeat':
            yield from cycle(entries2play)
        elif entries2play:
            while True:
                yield choice(entries2play)

    def play(self):
        """Play the entries produced by update_playlist() one by one."""
        self.playlist = self.update_playlist()
        for entry in self.playlist:
            setno(self.entries, ['playing'])
            entry['playing'] = True
            self.reprint()
            # Resolve the URL only once per entry and cache it; calling
            # getlink inside setdefault() would hit the network each time.
            if 'webpage_url' not in entry:
                entry['webpage_url'] = getlink(entry)
            self.mp.play(entry['webpage_url'])
            self.mp.wait_for_playback()

    def reattr(self, entry, y=None):
        """Apply the display attribute matching entry's flags to a row.

        Args:
            entry: the entry dict drawn on the row.
            y: screen row to restyle; defaults to the current row.
        """
        if y is None:
            y = self.y
        # The current row uses the pair with the same colour as background.
        invert = 8 if entry.get('current') else 0
        if entry.get('error'):
            attr = curses.color_pair(1 + invert) | curses.A_BOLD
        elif entry.get('playing'):
            attr = curses.color_pair(3 + invert) | curses.A_BOLD
        elif entry.get('selected'):
            attr = curses.color_pair(5 + invert) | curses.A_BOLD
        elif invert:
            attr = curses.color_pair(12) | curses.A_BOLD
        else:
            attr = curses.color_pair(0)
        self.stdscr.chgat(y, 0, attr)

    def reprint(self, entries2print=None):
        """Redraw the header, the given entries and the status lines.

        Args:
            entries2print: entries to draw from row 1 downwards; defaults
                to the window of entries beginning at self.start.
        """
        if entries2print is None:
            entries2print = self.entries[
                self.start : self.start + curses.LINES - 3]
        self.stdscr.clear()
        self.stdscr.addstr(0, 1, _('Title'))
        # Width of the source column: at least 6, also with no entries.
        sitenamelen = max(
            [6] + [len(entry.get('ie_key') or '') for entry in self.entries])
        self.stdscr.addstr(0, curses.COLS - sitenamelen - 1, _('Source'))
        self.stdscr.chgat(0, 0, curses.color_pair(10) | curses.A_BOLD)
        for y, entry in enumerate(entries2print, 1):
            source = entry.get('ie_key') or ''
            title = entry.get('title') or ''
            self.stdscr.addstr(y, 0, source.rjust(curses.COLS - 1))
            self.stdscr.addstr(y, 1, title[:curses.COLS - sitenamelen - 3])
            self.reattr(entry, y)
        self.update_status()

    def _set_current(self, index):
        """Flag the entry at index as the only current one."""
        setno(self.entries, ['current'])
        self.entries[index]['current'] = True

    def move(self, delta):
        """Move the cursor by delta entries, scrolling when needed.

        Args:
            delta: number of entries to move; negative moves up.

        Returns:
            The screen row of the current entry after the move.
        """
        if not self.entries:
            return self.y
        height = curses.LINES - 3
        count = len(self.entries)
        # 1-based position in entries that the move is aiming for.
        target = self.start + self.y + delta
        if target < 1:
            # Past the top: clamp to the first entry.
            if self.start + self.y == 1:
                return self.y
            self.start, self.y = 0, 1
            self._set_current(0)
            self.reprint()
        elif target > count:
            # Past the bottom: clamp to the last entry.
            self.start = max(count - height, 0)
            self.y = min(height, count)
            self._set_current(count - 1)
            self.reprint()
        elif self.y + delta < 1:
            # Target is above the window: scroll it to the first row.
            self.start = target - 1
            self.y = 1
            self._set_current(self.start)
            self.reprint()
        elif self.y + delta > height:
            # Target is below the window: scroll it to the last row.
            self.start = target - height
            self.y = height
            self._set_current(target - 1)
            self.reprint()
        else:
            # Target is already visible: restyle only the two rows.
            previous = self.entries[self.start + self.y - 1]
            previous['current'] = False
            self.reattr(previous)
            self.y += delta
            chosen = self.entries[self.start + self.y - 1]
            chosen['current'] = True
            self.reattr(chosen)
            self.stdscr.refresh()
        return self.y

    def close(self):
        """Restore the terminal settings and terminate the player."""
        curses.nocbreak()
        self.stdscr.keypad(False)
        curses.echo()
        curses.endwin()
        self.mp.terminate()
# Command line: a playlist comes either from a JSON file or from an
# online playlist URL; with neither, the player starts empty.
parser = ArgumentParser(description=_("Curses Online Media Player"))
parser.add_argument('-j', '--json-playlist', required=False, metavar='path',
                    help=_('path to playlist in JSON format'))
parser.add_argument('-u', '--online-playlist', required=False, metavar='URL',
                    help=_('URL to an playlist on Youtube'))
args = parser.parse_args()

# The user's settings take precedence over the system-wide ones.
config = ConfigParser()
config.read(USER_CONFIG if isfile(USER_CONFIG) else SYSTEM_CONFIG)

if args.json_playlist:
    json_file = args.json_playlist
    with open(json_file) as f:
        entries = json.load(f)
elif args.online_playlist:
    # argparse stores '--online-playlist' as `online_playlist`.
    # 'extract_flat' is passed so the playlist is only listed here;
    # each entry's URL is resolved later by getlink().
    with YoutubeDL({'extract_flat': 'in_playlist', 'quiet': True}) as ytdl:
        info = ytdl.extract_info(args.online_playlist, download=False)
        entries = info.get('entries', [])
    json_file = ''
else:
    entries = []
    json_file = ''

# mpv_logger appends to MPV_LOG, so its directory has to exist.
makedirs(dirname(MPV_LOG), exist_ok=True)

comp = Comp(
    entries=entries,
    mode=config.get('comp', 'play-mode', fallback='play-current'),
    mpv_vo=config.get('mpv', 'video-output', fallback=None),
    mpv_vid=config.get('mpv', 'video', fallback='auto'),
    ytdlf=config.get('youtube-dl', 'format', fallback='best'))
# Main loop: read one key at a time and dispatch on it. All player and
# screen state lives on the `comp` object.
while True:
    c = comp.stdscr.getch()
    if c == 10:    # curses.KEY_ENTER doesn't work
        if not comp.entries: continue
        comp.mp._set_property('pause', False, bool)
        # Playback blocks until the media ends, so it gets its own thread.
        play_thread = Thread(target=comp.play, daemon=True)
        play_thread.start()
    elif c == 32:   # space
        comp.mp._toggle_property('pause')
    elif c == 65:   # letter A
        comp.mp._toggle_property('mute')
    elif c == 77:   # letter M
        comp.mode = MODES[(MODES.index(comp.mode) - 1) % len(MODES)]
        comp.update_status()
    elif c == 86:   # letter V
        comp.mp._set_property(
            'vid', 'auto' if comp.mp._get_property('vid') == 'no' else 'no')
    elif c == 87:   # letter W
        if not comp.entries: continue
        prompt = _('Save playlist to [{}]:').format(json_file)
        comp.stdscr.addstr(curses.LINES - 1, 0, prompt)
        curses.curs_set(True)
        curses.echo()
        s = comp.stdscr.getstr(curses.LINES - 1, len(prompt) + 1).decode()
        # An empty answer keeps the file name shown in the prompt.
        if s: json_file = s
        curses.curs_set(False)
        curses.noecho()
        try:
            makedirs(dirname(abspath(json_file)), exist_ok=True)
            with open(json_file, 'w') as f:
                json.dump(comp.entries, f)
        except OSError:
            comp.update_status(
                _("'{}': Can't open file for writing").format(json_file),
                curses.color_pair(1))
        else:
            comp.update_status(_("'{}' written").format(json_file))
    elif c == 99:   # letter c
        if not comp.entries: continue
        entry = comp.entries[comp.start + comp.y - 1]
        entry['selected'] = not entry.get('selected', False)
        comp.move(1)
    elif c == 100:  # letter d
        if not comp.entries: continue
        i = comp.start + comp.y - 1
        if i + 1 < len(comp.entries):
            # The following entry slides into the cursor's row.
            comp.entries.pop(i)
            comp.entries[i]['current'] = True
        elif len(comp.entries) > 1:
            # The last entry was removed: step the cursor back onto the
            # new last entry instead of leaving it past the end.
            comp.entries.pop(i)
            comp.entries[i - 1]['current'] = True
            if comp.y > 1:
                comp.y -= 1
            elif comp.start > 0:
                comp.start -= 1
        else:
            # Empty the list in place so every reference sees the change.
            del comp.entries[:]
        comp.reprint(comp.entries[comp.start : comp.start+curses.LINES-3])
    elif c == 109:  # letter m
        comp.mode = MODES[(MODES.index(comp.mode) + 1) % len(MODES)]
        comp.update_status()
    elif c == 113:  # letter q
        comp.close()
        break
    elif c == 119:  # letter w
        if not comp.entries: continue
        # Download whatever the scope part of the mode refers to.
        pick = comp.mode.split('-')[1]
        if pick == 'all':
            chosen = comp.entries
        else:
            chosen = [entry for entry in comp.entries if entry.get(pick)]
        ytdlf = config.get('youtube-dl', 'format', fallback='best')
        with YoutubeDL({'format': ytdlf}) as ytdl:
            ytdl.download([getlink(entry) for entry in chosen])
    elif c in (curses.KEY_UP, 107):     # up arrow or letter k
        if not comp.entries: continue
        comp.move(-1)
    elif c in (curses.KEY_DOWN, 106):   # down arrow or letter j
        if not comp.entries: continue
        comp.move(1)
    elif c in (curses.KEY_LEFT, 104):   # left arrow or letter h
        if comp.mp._get_property('duration', int):
            comp.mp.seek(-2.5)
    elif c in (curses.KEY_RIGHT, 108):  # right arrow or letter l
        if comp.mp._get_property('duration', int):
            comp.mp.seek(2.5)
    elif c == curses.KEY_HOME:  # home
        if not comp.entries: continue
        comp.move(-len(comp.entries))
    elif c == curses.KEY_END:   # end
        if not comp.entries: continue
        comp.move(len(comp.entries))
    elif c == curses.KEY_NPAGE: # page down
        if not comp.entries: continue
        comp.move(curses.LINES - 4)
    elif c == curses.KEY_PPAGE: # page up
        if not comp.entries: continue
        comp.move(4 - curses.LINES)
    elif c == curses.KEY_F5:    # F5
        comp.reprint(comp.entries[comp.start : comp.start+curses.LINES-3])
    elif c == curses.KEY_RESIZE:
        curses.update_lines_cols()
        if curses.COLS < MODE_STR_LEN + 42 or curses.LINES < 4:
            comp.stdscr.clear()
            sizeerr = _('Current size: {}x{}. Minimum size: {}x4.').format(
                curses.COLS,
                curses.LINES,
                MODE_STR_LEN + 42
            )
            # Leave the bottom-right cell alone; curses raises an error
            # when it is written.
            comp.stdscr.addstr(
                0, 0, sizeerr[:max(curses.LINES*curses.COLS - 1, 0)])
        else:
            # Put the current entry on the first row of the new window.
            comp.start += comp.y - 1
            comp.y = 1
            comp.reprint(
                comp.entries[comp.start : comp.start+curses.LINES-3])