comp/comp

458 lines
17 KiB
Python
Executable file

#!/usr/bin/env python3
# comp - Curses Online Media Player
# Copyright (C) 2017 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import curses
import json
from argparse import ArgumentParser
from configparser import ConfigParser
from curses.ascii import ctrl
from datetime import datetime
from functools import reduce
from gettext import gettext as _, textdomain
from itertools import cycle
from os import linesep, makedirs
from os.path import abspath, dirname, expanduser, isfile
from random import choice
from time import gmtime, strftime
from threading import Thread
from youtube_dl import YoutubeDL
from mpv import MPV
# Bind the translation domain used by every _() call in this program
textdomain('comp')

# Configuration files: the per-user one is preferred over the system-wide one
SYSTEM_CONFIG = '/etc/comp/settings.ini'
USER_CONFIG = expanduser('~/.config/comp/settings.ini')

# File that receives mpv's log messages
MPV_LOG = expanduser('~/.cache/comp/mpv.log')

# Every play mode is spelt '<action>-<pick>'; the order is the cycling order
MODES = (
    'play-current',
    'play-all',
    'play-selected',
    'repeat-current',
    'repeat-all',
    'repeat-selected',
    'shuffle-all',
    'shuffle-selected',
)

# Length of the longest translated mode name
MODE_STR_LEN = len(max((_(mode) for mode in MODES), key=len))
def mpv_logger(loglevel, component, message):
    """Append one timestamped record to the file named by MPV_LOG.

    The record has the form '<ISO timestamp> [<loglevel>] <component>:
    <message>' and is terminated by a single newline.

    Args:
        loglevel: severity label placed between square brackets
        component: name of the part that produced the message
        message: the text to be logged
    """
    # The file is opened in text mode, which already translates '\n' to
    # the platform line ending; writing os.linesep here would double
    # the carriage return on platforms where it is '\r\n'.
    mpv_log = '{} [{}] {}: {}\n'.format(datetime.now().isoformat(),
                                        loglevel, component, message)
    with open(MPV_LOG, 'a') as f:
        f.write(mpv_log)
class Comp(object):
    """Meta object for drawing and playing.

    The plain attributes, the mpv instance and the curses screen are
    created in __new__; __init__ then configures the player observers
    and the terminal.  Use the object as a context manager so that the
    terminal is restored and mpv is terminated on exit.

    Attributes:
        active (bool): flag showing whether anything is being played
        entries (list): list of all tracks
        mode (str): the mode to pick and play tracks, '<action>-<pick>'
        mp (MPV): an mpv instance
        play_backward (bool): flag showing whether to play the previous track
        play_list (list): list of tracks according to mode
        played (list): list of previously played tracks
        playing (int): index of playing track in played
        playlist (iterator): iterator of tracks according to mode
        reading (bool): flag showing whether user input is being read
        start (int): index of the first track to be printed on screen
        scr (curses WindowObject): curses window object
        vid (str): value of mpv's vid property, 'no' disables video output
        y (int): the current y-coordinate
    """

    def __new__(cls, entries, mode, mpv_vo, mpv_vid, ytdlf):
        self = super(Comp, cls).__new__(cls)
        self.mode, self.vid = mode, mpv_vid
        self.active, self.play_backward, self.reading = False, False, False
        self.playing, self.start, self.y = -1, 0, 1
        self.entries, self.playlist, self.played = entries, iter(()), []
        self.play_list = []
        self.mp = MPV(input_default_bindings=True, input_vo_keyboard=True,
                      log_handler=mpv_logger, ytdl=True, ytdl_format=ytdlf)
        self.scr = curses.initscr()
        return self

    def setno(self, *keys):
        """Set all keys of each entry in entries to False."""
        for entry in self.entries:
            for key in keys:
                entry[key] = False

    @staticmethod
    def _sectoosd(pos, duration):
        """Quick hack to convert a pair of seconds to a
        'position / duration' string as MPV.get_property_osd_string
        isn't available.  Hours are only shown when duration is at
        least one hour.
        """
        # Let's hope media durations are shorter than a day
        timestr = '%M:%S' if duration < 3600 else '%H:%M:%S'
        return '{} / {}'.format(strftime(timestr, gmtime(pos)),
                                strftime(timestr, gmtime(duration)))

    def update_status(self, message='', msgattr=curses.A_NORMAL):
        """Update the status lines at the bottom of the screen.

        Args:
            message (str): if not empty, text for the very last line
            msgattr: curses attribute used to print message
        """
        def adds(s, a, y=None, x=0):
            """Print s unless the user is typing at a prompt."""
            if self.reading:
                return
            self.scr.addstr(curses.LINES - 2 if y is None else y, x, s, a)

        right = ' {} {}{} '.format(
            _(self.mode), ' ' if self.mp._get_property('mute', bool) else 'A',
            ' ' if self.vid == 'no' else 'V')
        adds(right.rjust(curses.COLS), curses.color_pair(12))
        try:
            left = ' {} {} '.format(
                self._sectoosd(self.mp._get_property('time-pos', int),
                               self.mp._get_property('duration', int)),
                '|' if self.mp._get_property('pause', bool) else '>')
            title_len = curses.COLS - len(left + right)
            center = self.mp._get_property(
                'media-title').ljust(title_len)[:title_len]
        except Exception:
            # Best effort: if the playback properties cannot be read or
            # formatted, only the right-hand part is shown.
            pass
        else:
            adds(left, curses.color_pair(12))
            adds(center, curses.color_pair(12) | curses.A_BOLD, x=len(left))
        if message:
            self.scr.move(curses.LINES - 1, 0)
            self.scr.clrtoeol()
            adds(message.ljust(curses.COLS)[:curses.COLS-1], msgattr,
                 y=curses.LINES-1, x=0)
        self.scr.refresh()

    def getlink(self, entry):
        """Return an URL from the given entry.

        The URL is stored in entry['webpage_url'] after the first
        extraction, so youtube-dl is only queried when that key is
        missing.
        """
        # dict.setdefault would evaluate the extraction even when the
        # URL is already known, hence the explicit membership test.
        if 'webpage_url' not in entry:
            with YoutubeDL({'quiet': True}) as ytdl:
                entry['webpage_url'] = ytdl.extract_info(
                    entry['url'], download=False, ie_key=entry.get('ie_key')
                ).get('webpage_url')
        return entry['webpage_url']

    def play(self, force=False):
        """Play the next track in a background thread.

        The track is the previous one in played if play_backward is set,
        the following one in played if the player went backward before,
        or else the next item of playlist.  Nothing happens when
        playlist is exhausted.

        Args:
            force (bool): unpause the player once the track is started
        """
        def mpv_play(entry, force):
            self.active = True
            self.setno('playing')
            entry['playing'] = True
            self.redraw()
            self.mp._set_property('vid', self.vid)
            try:
                self.mp.play(self.getlink(entry))
            except Exception:
                # Nothing has been started, so there is no playback to
                # wait for; just mark the entry.
                entry['error'] = True
            else:
                if force:
                    self.mp._set_property('pause', False, bool)
                self.mp.wait_for_playback()
            self.active = False
            entry['playing'] = False
            self.redraw()

        if self.play_backward and -self.playing < len(self.played):
            self.playing -= 1
            t = self.played[self.playing], force
        elif self.playing < -1:
            self.playing += 1
            t = self.played[self.playing], force
        else:
            try:
                self.played.append(next(self.playlist))
            except StopIteration:
                return
            else:
                t = self.played[-1], force

        self.play_backward = False
        play_thread = Thread(target=mpv_play, args=t, daemon=True)
        play_thread.start()

    def generic_event_handler(self, event):
        """Reprint status line and play next entry if the last one is
        ended without caring about the event.
        """
        self.update_status()
        if not self.active:
            self.play()

    def reattr(self, y=None):
        """Set the attributes of line y, if y is None the current line
        will be picked.  Lines that hold no entry are left untouched.
        """
        if y is None:
            y = self.y
        try:
            entry = self.entries[self.start + y - 1]
        except IndexError:
            return
        c = {'error': 1, 'playing': 3, 'selected': 5}
        # The initial 0 keeps reduce from failing on an entry without
        # any truthy field.
        color = ((8 if entry is self.current() else 0)
                 | reduce(int.__xor__,
                          (c.get(i, 0) for i in entry if entry[i]), 0))
        if color:
            self.scr.chgat(y, 0, curses.color_pair(color) | curses.A_BOLD)
        else:
            self.scr.chgat(y, 0, curses.A_NORMAL)

    def redraw(self):
        """Redraw the whole screen."""
        self.scr.clear()
        self.scr.addstr(0, 1, _('Title'))
        # The source column is at least as wide as its header
        sitenamelen = max([len(entry.get('ie_key') or '')
                           for entry in self.entries] + [6])
        self.scr.addstr(0, curses.COLS - sitenamelen - 1, _('Source'))
        self.scr.chgat(0, 0, curses.color_pair(10) | curses.A_BOLD)
        for i, entry in enumerate(self.entries[self.start:][:curses.LINES-3]):
            self.scr.addstr(
                i + 1, 0, (entry.get('ie_key') or '').rjust(curses.COLS - 1))
            self.scr.addstr(
                i + 1, 1,
                (entry.get('title') or '')[:curses.COLS-sitenamelen-3])
            self.reattr(i + 1)
        self.update_status()

    def __init__(self, entries, mode, mpv_vo, mpv_vid, ytdlf):
        self.setno('error', 'playing', 'selected')
        if mpv_vo:
            self.mp['vo'] = mpv_vo
        self.mp.observe_property('mute', lambda event: self.update_status())
        self.mp.observe_property('pause', lambda event: self.update_status())
        self.mp.observe_property('time-pos', self.generic_event_handler)

        curses.noecho()
        curses.cbreak()
        self.scr.keypad(True)
        curses.curs_set(False)
        curses.start_color()
        curses.use_default_colors()
        # Pairs 1-7: coloured text on the default background
        for i in range(1, 8):
            curses.init_pair(i, i, -1)
        # Pairs 8-15: default text on a coloured background.  reattr
        # may ask for any pair up to 15 (8 | 1 ^ 3 ^ 5).
        curses.init_pair(8, -1, 7)
        for i in range(1, 8):
            curses.init_pair(i + 8, -1, i)
        self.redraw()

    def __enter__(self):
        return self

    def idx(self):
        """Return the index of the current entry."""
        return self.start + self.y - 1

    def current(self):
        """Return the current entry, or an empty dict if there is none."""
        try:
            return self.entries[self.idx()]
        except (LookupError, TypeError):
            return {}

    def update_play_list(self, pick):
        """Update the list of entries to be played.

        Args:
            pick (str): 'current' for the entry under the cursor, 'all'
                for every entry, anything else for the selected ones
        """
        if pick == 'current':
            self.play_list = [self.current()]
        elif pick == 'all':
            self.play_list = self.entries
        else:
            self.play_list = [i for i in self.entries if i.get('selected')]

    def _shuffle(self):
        """Yield random tracks from play_list until it is empty."""
        while self.play_list:
            yield choice(self.play_list)

    def update_playlist(self):
        """Update the playlist to be used by play function."""
        action, pick = self.mode.split('-')
        self.update_play_list(pick)
        if action == 'play':
            self.playlist = iter(self.play_list)
        elif action == 'repeat':
            self.playlist = cycle(self.play_list)
        else:
            # random.choice fails on an empty sequence, so the
            # generator simply ends when there is nothing to pick.
            self.playlist = self._shuffle()
        self.played = self.played[:self.playing]

    def move(self, delta):
        """Move to the relatively next delta entry, scrolling the list
        and redrawing the screen when the target is not visible.
        """
        if not (self.entries and delta):
            return
        start, y = self.start, self.y
        maxy = min(len(self.entries), curses.LINES - 3)

        if self.idx() + delta <= 0:
            self.start, self.y = 0, 1
        elif self.idx() + delta >= len(self.entries):
            self.start, self.y = len(self.entries) - maxy, maxy
        elif self.y + delta < 1:
            self.start += self.y + delta - 1
            self.y = 1
        elif self.y + delta > curses.LINES - 3:
            self.start += self.y + delta - maxy
            self.y = maxy
        else:
            self.y += delta

        if self.start == start:
            # Same page: only the old and the new lines need updating
            self.reattr(y)
            self.reattr()
            self.scr.refresh()
        else:
            self.redraw()

    def gets(self, prompt):
        """Print the prompt string at the bottom of the screen then read
        from standard input.

        Returns:
            str: the decoded text typed by the user
        """
        self.scr.addstr(curses.LINES - 1, 0, prompt)
        self.reading = True
        curses.curs_set(True)
        curses.echo()
        try:
            b = self.scr.getstr(curses.LINES - 1, len(prompt))
        finally:
            # Leave input mode even if reading is interrupted
            self.reading = False
            curses.curs_set(False)
            curses.noecho()
        return b.decode()

    def seek(self, amount, reference='relative'):
        """Quick hack to fix MPV seek-double bug: relative amounts are
        halved before being passed to mpv.  Seek errors are ignored.
        """
        if reference == 'relative':
            amount /= 2
        try:
            self.mp.seek(amount, reference)
        except Exception:
            pass

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            curses.nocbreak()
            self.scr.keypad(False)
            curses.echo()
            curses.endwin()
        finally:
            # mpv must be shut down even if restoring the terminal fails
            self.mp.terminate()
# Command line interface
parser = ArgumentParser(description=_("Curses Online Media Player"))
parser.add_argument('-c', '--config', required=False,
                    help=_('path to custom config file'))
parser.add_argument('-u', '--online-playlist', required=False, metavar='URL',
                    help=_('URL to an playlist on Youtube'))
parser.add_argument('-j', '--json-playlist', required=False, metavar='path',
                    help=_('path to playlist in JSON format'))
args = parser.parse_args()

# Settings: a file given by --config takes precedence over the per-user
# file, which in turn takes precedence over the system-wide one.
config = ConfigParser()
if args.config:
    config.read(args.config)
else:
    config.read(USER_CONFIG if isfile(USER_CONFIG) else SYSTEM_CONFIG)

# Initial playlist; json_file is the default path offered when saving
if args.json_playlist:
    json_file = args.json_playlist
    with open(json_file) as f:
        entries = json.load(f)
elif args.online_playlist:
    with YoutubeDL({'extract_flat': 'in_playlist', 'quiet': True}) as ytdl:
        info = ytdl.extract_info(args.online_playlist, download=False)
    # Fall back to an empty playlist when nothing could be extracted
    entries = list((info or {}).get('entries') or [])
    json_file = ''
else:
    entries = []
    json_file = ''

# mpv_logger appends to MPV_LOG, so its directory has to exist
makedirs(dirname(MPV_LOG), exist_ok=True)
# An unknown play mode would break both mode cycling (MODES.index) and
# playlist building, so fall back to the default one.
play_mode = config.get('comp', 'play-mode', fallback='play-current')
if play_mode not in MODES:
    play_mode = 'play-current'

with Comp(
    entries,
    play_mode,
    config.get('mpv', 'video-output', fallback=None),
    config.get('mpv', 'video', fallback='auto'),
    config.get('youtube-dl', 'format', fallback='best')
) as comp:
    # Main loop: read one key per iteration until q is pressed.  The
    # key is read at the top so that `continue` never re-handles the
    # previous key.
    while True:
        c = comp.scr.getch()
        if c == 113:    # letter q
            break
        elif c == 10:   # curses.KEY_ENTER doesn't work
            comp.update_playlist()
            if comp.active:
                # End the current track so that the next one gets played
                comp.seek(100, 'absolute-percent')
                comp.mp._set_property('pause', False, bool)
            else:
                comp.play(force=True)
        elif c == 32:   # space
            entry = comp.current()
            if not entry:
                continue
            entry['selected'] = not entry.get('selected', False)
            comp.move(1)
        elif c == 60:   # <
            try:
                if comp.mp._get_property('time-pos', int) < 5:
                    comp.play_backward = True
                    comp.seek(100, 'absolute-percent')
                else:
                    comp.seek(0, 'absolute')
            except Exception:
                # Best effort: ignored when the position can't be used
                pass
        elif c == 62:   # >
            comp.seek(100, 'absolute-percent')
        elif c == 65:   # letter A
            comp.mp._toggle_property('mute')
        elif c == 77:   # letter M
            comp.mode = MODES[(MODES.index(comp.mode) - 1) % len(MODES)]
            comp.update_status()
        elif c == 85:   # letter U
            try:
                with YoutubeDL({'extract_flat': True, 'quiet': True}) as ytdl:
                    info = ytdl.extract_info(
                        comp.gets(_("Open online playlist: ")),
                        download=False)
            except Exception:
                comp.redraw()
            else:
                comp.entries = list((info or {}).get('entries') or [])
                comp.start, comp.y = 0, 1
                comp.setno('error', 'playing', 'selected')
                comp.redraw()
        elif c == 86:   # letter V
            comp.vid = 'auto' if comp.vid == 'no' else 'no'
            comp.mp._set_property('vid', comp.vid)
            comp.update_status()
        elif c == 87:   # letter W
            if not comp.entries:
                continue
            s = comp.gets(_('Save playlist to [{}]: ').format(json_file))
            if s:
                json_file = s
            try:
                makedirs(dirname(abspath(json_file)), exist_ok=True)
                with open(json_file, 'w') as f:
                    # Save what is on screen, which may differ from the
                    # playlist given on the command line
                    json.dump(comp.entries, f)
            except Exception:
                comp.update_status(
                    _("'{}': Can't open file for writing").format(json_file),
                    curses.color_pair(1))
            else:
                comp.update_status(_("'{}' written").format(json_file))
        elif c == 100:  # letter d
            i = comp.idx()
            if not 0 <= i < len(comp.entries):
                continue
            comp.entries.pop(i)
            # Keep the cursor on an existing entry after the last one
            # has been removed
            if comp.entries and i >= len(comp.entries):
                if comp.y > 1:
                    comp.y -= 1
                else:
                    comp.start -= 1
            comp.redraw()
        elif c == 112:  # letter p
            comp.mp._toggle_property('pause')
        elif c == 109:  # letter m
            comp.mode = MODES[(MODES.index(comp.mode) + 1) % len(MODES)]
            comp.update_status()
        elif c == 119:  # letter w
            comp.update_play_list(comp.mode.split('-')[1])
            try:
                with YoutubeDL({'quiet': True}) as ytdl:
                    ytdl.download([comp.getlink(i) for i in comp.play_list])
            except Exception:
                # A failed download should not bring the player down
                comp.update_status(_("Can't download the picked tracks"),
                                   curses.color_pair(1))
        elif c in (curses.KEY_UP, 107):     # up arrow or letter k
            comp.move(-1)
        elif c in (curses.KEY_DOWN, 106):   # down arrow or letter j
            comp.move(1)
        elif c in (curses.KEY_LEFT, 104):   # left arrow or letter h
            comp.seek(-5)
        elif c in (curses.KEY_RIGHT, 108):  # right arrow or letter l
            comp.seek(5)
        elif c == curses.KEY_HOME:  # home
            comp.move(-len(comp.entries))
        elif c == curses.KEY_END:   # end
            comp.move(len(comp.entries))
        elif c == curses.KEY_NPAGE:  # page down
            comp.move(curses.LINES - 4)
        elif c == curses.KEY_PPAGE:  # page up
            comp.move(4 - curses.LINES)
        elif c == curses.KEY_F5:    # F5
            comp.redraw()
        elif c == curses.KEY_RESIZE:
            curses.update_lines_cols()
            if curses.COLS < MODE_STR_LEN + 42 or curses.LINES < 4:
                comp.scr.clear()
                sizeerr = _('Current size: {}x{}. Minimum size: {}x4.').format(
                    curses.COLS, curses.LINES, MODE_STR_LEN + 42)
                # Writing to the bottom-right cell makes curses raise an
                # error, so stop one cell before it.
                try:
                    comp.scr.addstr(
                        0, 0,
                        sizeerr[:max(curses.LINES*curses.COLS - 1, 0)])
                except curses.error:
                    pass
            else:
                # Put the current entry at the top of the resized screen
                comp.start += comp.y - 1
                comp.y = 1
                comp.redraw()