satellite-gtk/satellite/application.py

636 lines
23 KiB
Python
Executable File

# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import argparse
import importlib.resources as resources
import os
import re
import signal
import sys
import time
import tokenize
from datetime import datetime
import gi
import gpxpy
import satellite.nmea as nmea
import satellite.quectel as quectel
from satellite import __version__
from .mm_glib_source import ModemManagerGLibNmeaSource
from .nmeasource import (
GnssShareNmeaSource,
ModemError,
ModemLockedError,
ModemNoNMEAError,
NmeaSourceNotFoundError,
)
from .util import bearing_to_arrow, have_touchscreen, now, unique_filename
from .widgets import DataFrame, text_barchart
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
gi.require_version('Handy', '1')
from gi.repository import GLib, Gdk, Gio, Gtk, Handy # noqa: E402, I100
# Human-readable program name; used in the startup log line and the About dialog.
appname = 'Satellite'
# Application ID handed to Gtk.Application; also used as the About dialog icon name.
app_id = 'page.codeberg.tpikonen.satellite'
class SatelliteApp(Gtk.Application):
def __init__(self, *args, **kwargs):
    """Set up the application object.

    Parses the command line, loads the UI definitions, exposes every
    buildable widget as an attribute of the application, wires the
    application signals and initializes the internal state.

    Positional and keyword arguments are forwarded to Gtk.Application.

    Raises AttributeError when a widget name from the UI files collides
    with an attribute that already exists on the instance.
    """
    Gtk.Application.__init__(
        self, *args, application_id=app_id,
        flags=Gio.ApplicationFlags.FLAGS_NONE, **kwargs)
    Handy.init()

    # Command line interface
    source_help = (
        "Select NMEA source. Options are:\n"
        "'auto' (default) Automatic source detection\n"
        "'quectel' ModemManager with Quectel quirks\n"
        "'mm' ModemManager without quirks\n"
        "'gnss-share' Read from gnss-share socket\n")
    cli = argparse.ArgumentParser(
        description="Displays navigation satellite data and saves GPX tracks",
        formatter_class=argparse.RawTextHelpFormatter)
    cli.add_argument(
        '-c', '--console-output', dest='console_output',
        action='store_true', default=False,
        help='Output satellite data to console')
    cli.add_argument(
        '-s', '--source', dest='source',
        choices=['auto', 'quectel', 'mm', 'gnss-share'],
        default='auto',
        help=source_help)
    self.args = cli.parse_args()

    GLib.unix_signal_add(
        GLib.PRIORITY_DEFAULT, signal.SIGINT, self.sigint_handler)

    # Load the UI definitions shipped inside the package
    self.builder = Gtk.Builder()
    for ui_file in ("satellite.ui", "menus.ui"):
        self.builder.add_from_string(
            resources.read_text("satellite", ui_file))
    self.builder.connect_signals(self)

    # Make each buildable object reachable as self.<name>, where <name> is
    # the builder name with its identifier-like parts joined by '_'.
    for obj in self.builder.get_objects():
        if not isinstance(obj, Gtk.Buildable):
            continue
        # The following call looks ugly, but see Gnome bug 591085
        builder_name = Gtk.Buildable.get_name(obj)
        attr_name = '_'.join(re.findall(tokenize.Name, builder_name))
        if hasattr(self, attr_name):
            raise AttributeError(
                "instance %s already has an attribute %s" % (
                    self, attr_name))
        setattr(self, attr_name, obj)

    self.app_menu = self.builder.get_object('app-menu')
    self.menu_popover = Gtk.Popover.new_from_model(
        self.app_menu_button, self.app_menu)
    self.menu_popover.set_position(Gtk.PositionType.BOTTOM)

    self.source = None

    self.infolabel.set_markup("<tt>" + "\n" * 10 + "</tt>")

    self.dataframe = DataFrame()
    # self.dataframe.header.set_text("Satellite info")
    self.dataframe.header.set_visible(False)
    self.set_values({})  # Initialize dataframe
    self.dataframe.show()
    self.main_box.add(self.dataframe)

    # Primary-button / touch press on the carousel
    gesture = Gtk.GestureMultiPress.new(self.carousel)
    gesture.set_button(1)
    gesture.set_touch_only(False)
    gesture.connect("released", self.infolabel_released_cb)
    # Store the gesture on the carousel object
    setattr(self.carousel, "multi-press-gesture", gesture)

    self.set_speedlabel(None)
    self.leaflet.set_visible_child(self.databox)

    for signal_name, handler in (
            ('startup', self.on_startup),
            ('activate', self.on_activate),
            ('shutdown', self.on_shutdown)):
        self.connect(signal_name, handler)

    # Internal state
    self.last_mode = 1
    self.last_data = None
    self.last_speed = None
    self.last_update = None
    self.had_error = False
    self.sigint_received = False
    self.refresh_rate = 1  # Really delay between updates in seconds

    # GPX
    docdir = GLib.get_user_special_dir(
        GLib.UserDirectory.DIRECTORY_DOCUMENTS)
    if not docdir:
        docdir = os.path.join(GLib.get_home_dir(), 'Documents')
    self.gpx_save_dir = os.path.join(docdir, 'satellite-tracks')
    self.gpx_autosave_interval = 60
    self.gpxfile = None
    self.gpx = None
    self.gpx_segment = None
    self.gpx_counter = 0

    # Bar chart heights
    self.chart_small = 10
    self.chart_large = 30
    self.chart_size = self.chart_small

    self.inhibit_cookie = 0
def create_actions(self):
    """Register the 'menu', 'about' and 'record' application actions."""
    handlers = {
        'menu': self.on_menu,
        'about': self.on_about,
        'record': self.on_record,
    }
    for action_name, handler in handlers.items():
        simple_action = Gio.SimpleAction.new(action_name, None)
        simple_action.connect('activate', handler)
        self.add_action(simple_action)
def setup_styles(self):
    """Load the packaged main.css and install it for the default screen."""
    css_provider = Gtk.CssProvider()
    stylesheet = resources.read_binary("satellite", "main.css")
    css_provider.load_from_data(stylesheet)
    screen = Gdk.Screen.get_default()
    Gtk.StyleContext.add_provider_for_screen(
        screen, css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
def on_startup(self, app):
    """Handle the 'startup' signal by registering the application actions."""
    self.create_actions()
def on_activate(self, app):
    """Handle the 'activate' signal.

    Applies the stylesheet, shows the main window and schedules
    init_source() to run one second later.
    """
    self.setup_styles()
    self.add_window(self.window)
    self.window.show()

    # The edge-overshot handler is only connected on touchscreen devices.
    touch_available = have_touchscreen()
    if touch_available:
        self.datascroll.connect('edge-overshot', self.on_edge_overshot)

    self.log_msg(f"{appname} version {__version__} started")

    # Initialize modem after GUI startup
    startup_delay_ms = 1000
    GLib.timeout_add(startup_delay_ms, self.init_source, None)
def on_shutdown(self, app):
    """Handle the 'shutdown' signal: save the track and close the source.

    The NMEA source is closed even when writing the GPX file raises, so a
    failed save does not leave the source open. An exception from
    gpx_write() still propagates after the source has been closed.
    """
    print("Cleaning up...")
    try:
        self.gpx_write()
    finally:
        if self.source is not None:
            self.source.close()
    print("...done.")
def init_source(self, unused):
    """Select and initialize the NMEA source, then start periodic updates.

    With '--source auto' the gnss-share source is tried first and the
    ModemManager source (with quirk detection) second; an error dialog is
    shown when neither works. Otherwise only the requested source is tried.

    On success the source details are logged and timeout_cb() is scheduled
    every `refresh_rate` seconds.

    `unused` is the user-data argument passed by GLib.timeout_add().

    Always returns GLib.SOURCE_REMOVE, so the timeout that called this
    method does not fire again.
    """
    requested = self.args.source
    source_init = False
    if requested == 'auto':
        self.log_msg("Detecting NMEA sources...")
        source_init = self.init_gnss_share_source(autodetect=True)
        if not source_init:
            source_init = self.init_mm_source(
                quirks=['detect'], autodetect=True)
        if not source_init:
            self.log_msg('NMEA source not found')
            dialog = Gtk.MessageDialog(
                parent=self.window, modal=True,
                message_type=Gtk.MessageType.ERROR,
                buttons=Gtk.ButtonsType.OK,
                text="Could not find an NMEA source")
            dialog.set_title("Error initializing NMEA source")
            dialog.run()
            dialog.destroy()
            return GLib.SOURCE_REMOVE
    else:
        self.log_msg(f'NMEA source "{requested}" selected')
        if requested == 'quectel':
            source_init = self.init_mm_source(quirks=['QuectelTalker'])
        elif requested == 'mm':
            source_init = self.init_mm_source()
        elif requested == 'gnss-share':
            source_init = self.init_gnss_share_source()
        if not source_init:
            self.log_msg('Could not initialize NMEA source')
            return GLib.SOURCE_REMOVE

    source = self.source
    quirks = getattr(source, "quirks", None)
    self.log_msg(
        f"Source is {source.manufacturer}, model {source.model}"
        + (f", revision {source.revision}" if source.revision else "")
        + (f" using {', '.join(quirks)} quirks" if quirks else ""))

    if source.model and source.model.startswith("QUECTEL"):
        constellations = quectel.get_constellations(source)
        if constellations is not None:
            self.log_msg("Supported constellations: "
                         + ", ".join(constellations))
        xtradates = quectel.get_xtradata_dates(source, fix_week=False)
        if xtradates is not None:
            dt1, dt2 = xtradates
            self.log_msg("XTRA data is valid from %s to %s" % (
                dt1.isoformat(' ', 'minutes'),
                dt2.isoformat(' ', 'minutes')))
            # `now` is imported from .util, whose definition is not visible
            # here. Call it when it is a function, so that a timestamp and
            # not the function object is compared with dt2.
            current = now() if callable(now) else now
            self.log_msg("XTRA data is "
                         + ("VALID" if current < dt2 else "NOT valid"))

    GLib.timeout_add(self.refresh_rate * 1000, self.timeout_cb, None)
    return GLib.SOURCE_REMOVE
def init_gnss_share_source(self, autodetect=False):
    """Create and initialize the gnss-share NMEA source.

    Returns True on success and False on any exception. Unless
    `autodetect` is set, the exception text is logged and shown in a
    modal error dialog before returning False.
    """
    try:
        self.source = GnssShareNmeaSource(self.location_update_cb)
        self.source.initialize()
    except Exception as e:
        if not autodetect:
            self.log_msg(str(e))
            message = str(e)
            dialog = Gtk.MessageDialog(
                parent=self.window, modal=True,
                message_type=Gtk.MessageType.ERROR,
                buttons=Gtk.ButtonsType.OK, text=message)
            dialog.set_title("Error initializing NMEA source")
            dialog.run()
            dialog.destroy()
        return False
    return True
def init_mm_source(self, quirks=None, autodetect=False):
    """Create and initialize the ModemManager NMEA source.

    `quirks` is a list of quirk names passed on to the source. It defaults
    to a fresh empty list on every call, so no list object is shared
    between calls.

    Returns True on success and False on any exception. Unless
    `autodetect` is set, the failure is logged and shown in a modal error
    dialog; a locked modem gets a dedicated "unlock" message.
    """
    if quirks is None:
        quirks = []
    try:
        self.source = ModemManagerGLibNmeaSource(
            self.location_update_cb,
            refresh_rate=self.refresh_rate,
            quirks=quirks,
            # save_filename=unique_filename(self.gpx_save_dir + '/nmeas',
            #                               '.txt')
        )
        self.source.initialize()
    except Exception as e:
        if autodetect:
            return False
        if isinstance(e, ModemLockedError):
            self.log_msg("Modem is locked")
            dtext = "Please unlock the Modem"
        else:
            etext = str(e)
            self.log_msg(
                f"Error initializing ModemManager NMEA source: {etext}")
            dtext = etext if etext else (
                "Could not initialize ModemManager NMEA source")
        dialog = Gtk.MessageDialog(
            parent=self.window, modal=True,
            message_type=Gtk.MessageType.ERROR,
            buttons=Gtk.ButtonsType.OK,
            text=dtext)
        dialog.set_title("Error initializing NMEA source")
        dialog.run()
        dialog.destroy()
        return False
    return True
def sigint_handler(self):
    """Handle SIGINT delivered through the GLib main loop.

    The first interrupt asks the application to quit; a second one
    exits the process immediately.

    Returns GLib.SOURCE_CONTINUE after the first interrupt so that GLib
    keeps the signal source installed. A falsy return value would remove
    the source and this handler would never see a second interrupt.
    """
    if self.sigint_received:
        print("Interrupt signal (Ctrl-C) received again, force exit")
        sys.exit(0)
    print("Interrupt signal (Ctrl-C) received")
    self.sigint_received = True
    self.quit()
    return GLib.SOURCE_CONTINUE
def on_menu(self, action, param):
    """Handle the 'menu' action by opening the application menu popover."""
    self.menu_popover.popup()
def on_edge_overshot(self, scrolledwindow, pos):
    """Open the menu popover when the view is overshot at the top edge."""
    if pos != Gtk.PositionType.TOP:
        return
    self.menu_popover.popup()
def on_about(self, *args):
    """Handle the 'about' action by presenting a modal About dialog."""
    properties = {
        'transient_for': self.window,
        'modal': True,
        'program_name': appname,
        'logo_icon_name': app_id,
        'version': __version__,
        'comments': "A program for showing navigation satellite data",
        'license_type': Gtk.License.GPL_3_0_ONLY,
        'copyright': "Copyright 2021-2023 Teemu Ikonen",
    }
    about_dialog = Gtk.AboutDialog(**properties)
    about_dialog.present()
def on_record(self, *args):
    """Handle the 'record' action: start recording a new GPX track.

    Picks a unique, timestamped file name under `gpx_save_dir`, creates
    an empty GPX document with one track and one segment, writes it out
    immediately, disables the 'record' action and asks the session to
    inhibit suspend while recording.

    Raises FileExistsError when no unique file name could be found.
    """
    # Invariant: the 'record' action is disabled while a track is open.
    assert self.gpxfile is None
    namestem = self.gpx_save_dir + '/track'
    self.gpxfile = unique_filename(namestem, '.gpx', timestamp=True)
    if self.gpxfile is None:
        raise FileExistsError(namestem)
    os.makedirs(os.path.dirname(self.gpxfile), exist_ok=True)
    # TODO: In-app notification for renaming the track

    gpx = gpxpy.gpx.GPX()
    gpx_track = gpxpy.gpx.GPXTrack()
    gpx.tracks.append(gpx_track)
    gpx_segment = gpxpy.gpx.GPXTrackSegment()
    gpx_track.segments.append(gpx_segment)
    self.gpx = gpx
    self.gpx_segment = gpx_segment
    # Pre-load the autosave counter so that the first autosave happens
    # after a few recorded points and not after a full interval. This
    # must be `gpx_counter`, the attribute gpx_update() increments.
    self.gpx_counter = self.gpx_autosave_interval - 5

    self.record_revealer.set_reveal_child(True)
    self.lookup_action('record').set_enabled(False)
    self.gpx_write()
    self.log_msg("Started saving track to '%s'" % self.gpxfile)

    self.inhibit_cookie = self.inhibit(
        self.window, Gtk.ApplicationInhibitFlags.SUSPEND,
        "Recording GPX track")
    if self.inhibit_cookie == 0:
        self.log_msg("Failed to inhibit system suspend")
    else:
        self.log_msg("Inhibiting system suspend")
def on_stoprecord_button_clicked(self, *args):
    """Ask for confirmation and, on 'Yes', finish the current recording.

    On confirmation the track is written, the suspend inhibitor is
    released (only when one was actually obtained), the recording state
    is reset and the 'record' action is enabled again. The dialog is
    destroyed even when saving the track raises.
    """
    dialog = Gtk.MessageDialog(
        transient_for=self.window,
        flags=0,
        message_type=Gtk.MessageType.QUESTION,
        buttons=Gtk.ButtonsType.YES_NO,
        text="Stop recording?",
        secondary_text="Do you want stop recording a track?"
    )
    try:
        response = dialog.run()
        if response == Gtk.ResponseType.YES:
            self.gpx_write()
            self.log_msg("Track closed and saved to '%s'" % self.gpxfile)
            # A cookie of 0 means inhibit() failed in on_record(); there
            # is nothing to release in that case.
            if self.inhibit_cookie != 0:
                self.uninhibit(self.inhibit_cookie)
                self.inhibit_cookie = 0
            self.gpx = None
            self.gpxfile = None
            self.gpx_segment = None
            self.gpx_counter = 0
            self.record_revealer.set_reveal_child(False)
            self.lookup_action('record').set_enabled(True)
            # TODO: In-app notification for renaming the track
    finally:
        dialog.destroy()
def leaflet_forward_cb(self, button):
    """Navigate the leaflet forward."""
    direction = Handy.NavigationDirection.FORWARD
    self.leaflet.navigate(direction)
def leaflet_back_cb(self, button):
    """Navigate the leaflet back."""
    direction = Handy.NavigationDirection.BACK
    self.leaflet.navigate(direction)
def infolabel_released_cb(self, gesture, n_press, x, y):
    """Toggle the bar chart between its small and large height.

    Only reacts to a single press while the carousel position is at
    most 0.5; the chart is then redrawn from the last received data.
    """
    if n_press != 1:
        return
    if self.carousel.get_position() > 0.5:
        return
    if self.chart_size == self.chart_large:
        self.chart_size = self.chart_small
    else:
        self.chart_size = self.chart_large
    self.set_barchart(self.last_data)
def carousel_page_changed_cb(self, carousel, index):
    """Shrink an enlarged bar chart when the carousel moves to page 1.

    Always returns False.
    """
    chart_is_large = self.chart_size == self.chart_large
    if index == 1 and chart_is_large:
        self.chart_size = self.chart_small
        self.set_barchart(self.last_data)
    return False
def set_barchart(self, data):
    """Render the signal bar chart into the info label.

    `data` is a parsed NMEA dict with 'visibles' (entries carrying 'prn'
    and 'snr') and 'actives'. Returns the chart text, or an empty string
    without touching the label when `data` is None.
    """
    if data is None:
        return ''
    prn_snr_pairs = ((sat['prn'], sat['snr']) for sat in data['visibles'])
    chart_text = text_barchart(
        prn_snr_pairs, data['actives'], height=self.chart_size)
    self.infolabel.set_markup("<tt>" + chart_text + "</tt>")
    return chart_text
def set_values(self, data):
    """Fill the data frame with formatted values taken from `data`.

    Each row is defined by a key into `data`, a row title and a
    formatter. Keys missing from `data` are shown as "n/a". With
    --console-output every row is also printed as "title: value".
    """
    utcfmt = "%H:%M:%S UTC"
    mode2fix = {
        "2": "2 D",
        "3": "3 D",
    }

    def to_str(x, fmt="%s"):
        if x is None:
            return "-"
        return fmt % x

    def fix_type(mode):
        return mode2fix.get(mode, "No Fix")

    def mode_indicator(ind):
        return "n/a" if ind is None else str(ind)

    def get_actives(xkey):
        # Reads `data` directly; the looked-up value itself is ignored.
        actives = str(len(data.get("actives", [])))
        inuse = str(data.get("num_sats", "n/a"))
        return "%s / %s" % (actives, inuse)

    def receiving(sats):
        return str(len([r for r in sats if r['snr'] > 0.0]))

    def visible(sats):
        return str(len(sats))

    def get_ages(xkey):
        # Reads `data` directly; the looked-up value itself is ignored.
        up_age = to_str(data.get("updateage"), "%0.0f s")
        fixage = to_str(data.get("fixage"), "%0.0f s")
        return "%s / %s" % (up_age, fixage)

    def sys_time(stamp):
        return stamp.strftime(utcfmt)

    def latitude(pos):
        return "%0.6f" % pos[0] if pos else "-"

    def longitude(pos):
        return "%0.6f" % pos[1] if pos else "-"

    def altitude(alt):
        return to_str(alt, "%0.1f m")

    def speed(spd):
        return to_str(spd, "%0.1f m/s")

    def true_course(course):
        arrow = bearing_to_arrow(course) if course is not None else ""
        return to_str(course, "%0.1f deg ") + arrow

    # Rows in display order: data key, row title, formatter
    order = [
        ("mode", "Fix type", fix_type),
        ("mode_indicator", "Modes (GP,GL,GA)", mode_indicator),
        ("actives", "Active / in use sats", get_actives),
        ("visibles", "Receiving sats", receiving),
        ("visibles", "Visible sats", visible),
        ("fixage", "Age of update / fix", get_ages),
        ("systime", "Sys. Time", sys_time),
        ("latlon", "Latitude", latitude),
        ("latlon", "Longitude", longitude),
        ("altitude", "Altitude", altitude),
        ("speed", "Speed", speed),
        ("true_course", "True Course", true_course),
        ("pdop", "PDOP", to_str),
        ("hdop", "HDOP", to_str),
        ("vdop", "VDOP", to_str),
    ]

    descs = []
    vals = []
    for key, desc, fun in order:
        value = fun(data[key]) if key in data.keys() else "n/a"
        descs.append(desc)
        vals.append(value)
        if self.args.console_output:
            print(f"{desc}: {value}")

    # Row titles are only (re)installed when the row count differs.
    if self.dataframe.rows != len(descs):
        self.dataframe.set_rowtitles(descs)
    self.dataframe.set_values(vals)
def set_status(self, mode):
    """Show the status icon matching the fix mode on both status images.

    `mode` is converted with int(); a falsy value counts as 0. Nothing
    happens when the mode equals `last_mode`; otherwise `last_mode` is
    updated after the icons are set.
    """
    mode = 0 if not mode else int(mode)
    if mode == self.last_mode:
        return
    icon_by_mode = {
        2: 'face-smile-symbolic',
        3: 'face-cool-symbolic',
    }
    icon_name = icon_by_mode.get(mode, 'face-crying-symbolic')
    for status_image in (self.left_status, self.right_status):
        status_image.set_from_icon_name(icon_name, Gtk.IconSize.DND)
    self.last_mode = mode
def set_speedlabel(self, speed, bearing=None):
    """Update the large speed label.

    `speed` is multiplied by 3.6 and truncated to an integer for the
    "km/h" display; a falsy speed is shown as "-". When `bearing` is
    given, its arrow is appended to the number.
    """
    if speed:
        shown_speed = str(int(3.6 * speed))
    else:
        shown_speed = "-"
    arrow = "" if bearing is None else bearing_to_arrow(bearing)
    speedfmt = '<span size="50000">%s%s</span>\n<span size="30000">%s</span>'
    self.speedlabel.set_markup(speedfmt % (shown_speed, arrow, "km/h"))
def gpx_write(self):
    """Write the current GPX document to `gpxfile`.

    Does nothing when there is no GPX document or no file name. The file
    is written as UTF-8 explicitly, so the output does not depend on the
    locale's default encoding.
    """
    if not (self.gpx and self.gpxfile):
        return
    with open(self.gpxfile, 'w', encoding='utf-8') as fp:
        fp.write(self.gpx.to_xml('1.0'))  # v1.0 has speed-tag
def gpx_update(self, data):
    """Append the position in `data` to the track being recorded.

    The point is skipped when there is no position, no active recording,
    or when the fix age exceeds twice the refresh rate. The track is
    written to disk each time more than `gpx_autosave_interval` points
    have been added since the last autosave.
    """
    latlon = data.get("latlon")
    if not latlon or not self.gpx or not self.gpx_segment:
        return
    fixage = data.get("fixage")
    if fixage and fixage > (2 * self.refresh_rate):
        return

    # The number of satellites and the fix type are not stored in the point.
    segment_points = self.gpx_segment.points
    point = gpxpy.gpx.GPXTrackPoint(
        latlon[0], latlon[1],
        elevation=data.get("altitude"),
        time=data.get("datetime"),
        speed=data.get("speed"),
        horizontal_dilution=data.get("hdop"),
        vertical_dilution=data.get("vdop"),
        position_dilution=data.get("pdop"))
    segment_points.append(point)

    self.gpx_counter += 1
    if self.gpx_counter <= self.gpx_autosave_interval:
        return
    self.gpx_write()
    self.gpx_counter = 0
    self.log_msg("Auto-saving track")
def log_msg(self, text):
    """Append a time-stamped line to the log label and print it.

    When the label already holds more than `maxlines` lines, the oldest
    line is dropped before the new one is added.
    """
    maxlines = 100  # Maximum num of lines in GtkLabel
    stamped = datetime.now().strftime("[%H:%M:%S] ") + text
    lines = self.loglabel.get_text().split('\n')
    if len(lines) > maxlines:
        del lines[0]
    lines.append(stamped)
    self.loglabel.set_text("\n".join(lines))
    print(stamped)
def timeout_cb(self, x):
    """Periodic tick: grey out the data view when stale, then refresh.

    The main box is made insensitive when the last location update is
    older than twice the refresh rate, or when there has been none yet.
    Returns GLib.SOURCE_CONTINUE so the timeout keeps firing.
    """
    if self.last_update:
        elapsed = time.time() - self.last_update
    else:
        elapsed = 100
    stale = elapsed > 2 * self.refresh_rate
    if stale:
        self.main_box.set_sensitive(False)
    self.update()
    return GLib.SOURCE_CONTINUE
def location_update_cb(self, *args):
    """Callback handed to the NMEA source; refreshes the display.

    Records the time of the update, re-enables the main box and runs
    update().
    """
    received_at = time.time()
    self.last_update = received_at
    self.main_box.set_sensitive(True)
    self.update()
def update(self):
    """Fetch NMEA data from the source and refresh the whole display.

    On success the parsed data is shown, lock changes are logged and, while
    recording, valid data is appended to the GPX track.

    On failure the first error of a streak is reported (a dialog for a
    locked modem, a log line otherwise) and the display is refreshed from
    the last good data, if any. Such stale data is not appended to the
    track again.
    """
    fresh = True
    try:
        nmeas = self.source.get()
        if self.had_error:
            self.log_msg("Getting updates")
            self.main_box.set_sensitive(True)
            self.had_error = False
        data = nmea.parse(nmeas)
    except Exception as e:
        fresh = False
        show_dialog = False
        etext = str(e)
        if isinstance(e, ModemLockedError):
            dtext = "Please unlock the Modem"
            show_dialog = True
        elif isinstance(e, ModemNoNMEAError):
            dtext = "NMEA info not received with location"
        elif isinstance(e, ModemError):
            dtext = "Modem error: " + etext
        elif isinstance(e, NmeaSourceNotFoundError):
            # `had_error` must not be set here: it is checked just below
            # to decide whether this message gets reported at all.
            dtext = etext if etext else "Modem disappeared"
            self.main_box.set_sensitive(False)
        else:
            dtext = etext if etext else "Unknown error"

        # Report only the first error of a streak.
        if not self.had_error:
            if show_dialog:
                dialog = Gtk.MessageDialog(
                    parent=self.window, modal=True,
                    message_type=Gtk.MessageType.ERROR,
                    buttons=Gtk.ButtonsType.OK, text=dtext)
                dialog.set_title("Error")
                dialog.run()
                dialog.destroy()
            else:
                self.log_msg(dtext)
        self.had_error = True

        if self.last_data is None:
            return
        data = self.last_data

    data["updateage"] = ((time.time() - self.last_update)
                         if self.last_update else None)

    barchart = self.set_barchart(data)
    if self.args.console_output:
        print(barchart)
    self.set_values(data)

    speed = data['speed']
    bearing = data['true_course']
    self.set_speedlabel(speed, bearing)
    # Switch carousel page when movement starts or stops.
    if speed and not self.last_speed:
        self.carousel.scroll_to(self.speedlabel)
    elif not speed and self.last_speed:
        self.carousel.scroll_to(self.infolabel)
    self.last_speed = speed

    # log
    mode = data["mode"]
    mode = int(mode) if mode else 0
    if mode != self.last_mode:
        if mode > 1:
            self.log_msg(f"Got lock, mode: {mode}")
        else:
            self.log_msg("Lock lost")
        self.last_mode = mode

    # Only freshly parsed data goes into the track; re-used old data
    # would duplicate the previous point on every tick.
    if fresh and self.gpx is not None and data.get("valid"):
        self.gpx_update(data)

    self.last_data = data