Update upstream source from tag 'upstream/0.4.2'

Update to upstream version '0.4.2'
with Debian dir 7222150841
This commit is contained in:
Arnaud Ferraris 2023-10-04 12:15:54 +02:00
commit c82cf9e70f
20 changed files with 431 additions and 350 deletions

26
.editorconfig Normal file
View file

@ -0,0 +1,26 @@
# EditorConfig is awesome: http://EditorConfig.org
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
charset = utf-8
# 4 space indentation
[*.{py,java,r,R}]
indent_style = space
indent_size = 4
# 2 space indentation
[*.{js,json,y{a,}ml,html,cwl}]
indent_style = space
indent_size = 2
[*.{md,Rmd,rst}]
trim_trailing_whitespace = false
indent_style = space
indent_size = 2

View file

@ -5,7 +5,7 @@
![Expanded satellite SNR view](https://github.com/flathub/page.codeberg.tpikonen.satellite/raw/master/screenshot-snr.png)
![Speedometer and track recording](https://github.com/flathub/page.codeberg.tpikonen.satellite/raw/master/screenshot-track.png)
Satellite is an adaptive GTK / libhandy application which displays global navigation satellite system
Satellite is an adaptive GTK3 / libhandy application which displays global navigation satellite system
(GNSS: GPS et al.) data obtained from [ModemManager](https://www.freedesktop.org/wiki/Software/ModemManager/)
or [gnss-share](https://gitlab.com/postmarketOS/gnss-share). It can also save your position to a GPX-file.
@ -15,7 +15,7 @@ GPL-3.0
## Dependencies:
python 3.6+, gi, Gtk, libhandy, pydbus, pynmea2, gpxpy
python 3.6+, gi, Gtk3, libhandy, libmm-glib, pynmea2, gpxpy
## Installing and running
@ -45,9 +45,9 @@ Run the script `bin/satellite`.
Run
pip3 install --user ./
pip install --user ./
in the source tree root.
in the source tree root (use `pipx` instead of `pip` if necessary).
This creates an executable Python script in `$HOME/.local/bin/satellite`.
@ -55,7 +55,7 @@ This creates an executable Python script in `$HOME/.local/bin/satellite`.
Run
flatpak-builder --install --user build-dir flatpak/page.codeberg.tpikonen.satellite.json
flatpak-builder --install --user build-dir flatpak/page.codeberg.tpikonen.satellite.yaml
in the source tree root to install a local build to the user flatpak repo.

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import os

View file

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright 2021-2022 Teemu Ikonen -->
<!-- Copyright 2021-2023 Teemu Ikonen -->
<!-- SPDX-License-Identifier: GPL-3.0-only -->
<component type="desktop-application">
<id>page.codeberg.tpikonen.satellite</id>
@ -9,10 +9,11 @@
<summary>Check your GPS reception and save your tracks</summary>
<description>
<p>Satellite displays global navigation satellite system (GNSS: that's GPS,
Galileo, Glonass etc.) data obtained from the ModemManager API. You can use
it to check the navigation satellite signal strength in your location and
see your speed, coordinates and other parameters once a fix is obtained.
It can also save GPX-tracks of your travels.</p>
Galileo, Glonass etc.) data obtained from an NMEA source in your device.
Currently the ModemManager and gnss-share APIs are supported. You can use
it to check the navigation satellite signal strength and see your speed,
coordinates and other parameters once a fix is obtained. It can also save
GPX-tracks of your travels.</p>
</description>
<launchable type="desktop-id">page.codeberg.tpikonen.satellite.desktop</launchable>
<url type="homepage">https://codeberg.org/tpikonen/satellite</url>
@ -48,6 +49,35 @@
</screenshot>
</screenshots>
<releases>
<release version="0.4.2" date="2023-09-23">
<description>
<p>The geoidal release</p>
<ul>
<li>Add 'Geoidal separation' field to dataframe</li>
<li>Display DOPs (PDOP, HDOP, VDOP) on a single dataframe line</li>
<li>Various small fixes to gnss-share source, logging, NMEA parsing etc.</li>
</ul>
</description>
</release>
<release version="0.4.1" date="2023-05-26">
<description>
<p>The automatic release</p>
<ul>
<li>Autodetect sources and source quirks when --source option is not given</li>
<li>Some small fixes to mm_glib_source, NMEA parsing, flatpak, etc.</li>
</ul>
</description>
</release>
<release version="0.4.0" date="2023-03-22">
<description>
<p>The managerial release</p>
<ul>
<li>Use mm-glib to talk to ModemManager, remove pydbus</li>
<li>Support 'quirks' in the ModemManager source, e.g. Quectel talker fixes</li>
<li>Various reliability fixes</li>
</ul>
</description>
</release>
<release version="0.3.1" date="2022-11-17">
<description>
<p>The quickfix release </p>

View file

@ -1,39 +0,0 @@
{
"app-id": "page.codeberg.tpikonen.satellite",
"runtime": "org.gnome.Platform",
"runtime-version": "43",
"sdk": "org.gnome.Sdk",
"command": "satellite",
"rename-desktop-file": "satellite.desktop",
"finish-args": [
"--socket=fallback-x11",
"--socket=wayland",
"--share=ipc",
"--device=dri",
"--talk-name=org.gtk.vfs.*",
"--system-talk-name=org.freedesktop.ModemManager1.*",
"--filesystem=xdg-documents/satellite-tracks:create"
],
"cleanup": [
],
"modules": [
"python3-requirements.json",
{
"name": "satellite",
"sources": [
{
"type": "git",
"path": "../",
"branch": "main"
}
],
"buildsystem": "simple",
"build-commands": [
"pip3 install --verbose --no-index --no-deps --no-build-isolation --prefix=${FLATPAK_DEST} ./"
],
"post-install": [
"install -Dm644 data/appdata.xml $FLATPAK_DEST/share/metainfo/$FLATPAK_ID.appdata.xml"
]
}
]
}

View file

@ -0,0 +1,42 @@
app-id: page.codeberg.tpikonen.satellite
runtime: org.gnome.Platform
runtime-version: "45"
sdk: org.gnome.Sdk
command: satellite
rename-desktop-file: satellite.desktop
finish-args:
- --socket=fallback-x11
- --socket=wayland
- --share=ipc
- --device=dri
- --talk-name=org.gtk.vfs.*
- --system-talk-name=org.freedesktop.ModemManager1.*
- --filesystem=xdg-documents/satellite-tracks:create
- --filesystem=/run/gnss-share.sock:ro
cleanup: []
modules:
- python3-requirements.json
- name: ModemManager
config-opts:
- --without-udev
- --with-udev-base-dir=/app/lib/udev
- --with-systemdsystemunitdir=/app/lib/systemd/system
- --without-examples
- --without-tests
- --without-mbim
- --without-qmi
- --without-qrtr
- --without-man
sources:
- type: archive
url: https://gitlab.freedesktop.org/mobile-broadband/ModemManager/-/archive/1.20.6/ModemManager-1.20.6.tar.gz
sha256: d3e8112810e48ba32e80757fced218cf65b135b5a2987dad6b431d8cfbba765f
- name: satellite
sources:
- type: git
path: ../
branch: main
buildsystem: simple
build-commands:
- pip3 install --verbose --no-index --no-deps --no-build-isolation --prefix=${FLATPAK_DEST} ./
- install -Dm644 data/appdata.xml $FLATPAK_DEST/share/metainfo/$FLATPAK_ID.appdata.xml

View file

@ -26,22 +26,8 @@
"sources": [
{
"type": "file",
"url": "https://files.pythonhosted.org/packages/c9/13/6117f735c3e8083bfce0ccd31a1d561fc2adb0e0e2d1ab3ace12256a3513/pynmea2-1.18.0-py3-none-any.whl",
"sha256": "098f9ffd89c4a6c5e137b8b59e5b38194888d4a557c50b003ebcf2c3c15ec22e"
}
]
},
{
"name": "python3-pydbus",
"buildsystem": "simple",
"build-commands": [
"pip3 install --verbose --exists-action=i --no-index --find-links=\"file://${PWD}\" --prefix=${FLATPAK_DEST} \"pydbus\" --no-build-isolation"
],
"sources": [
{
"type": "file",
"url": "https://files.pythonhosted.org/packages/92/56/27148014c2f85ce70332f18612f921f682395c7d4e91ec103783be4fce00/pydbus-0.6.0-py2.py3-none-any.whl",
"sha256": "66b80106352a718d80d6c681dc2a82588048e30b75aab933e4020eb0660bf85e"
"url": "https://files.pythonhosted.org/packages/75/24/1f575eb17a8135e54b3c243ff87e2f4d6b2389942836021d0628ed837559/pynmea2-1.19.0-py3-none-any.whl",
"sha256": "5138558b4fb5daa587b2c17de99eb43df0297039de1c98010c996624abfb00eb"
}
]
}

View file

@ -1,3 +1,2 @@
gpxpy
pynmea2
pydbus

View file

@ -1,4 +1,4 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
__version__ = "0.3.1"
__version__ = "0.4.2"

View file

@ -1,4 +1,4 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import sys

View file

@ -1,9 +1,8 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import argparse
import gi
import gpxpy
import importlib.resources as resources
import os
import re
import signal
@ -12,26 +11,28 @@ import time
import tokenize
from datetime import datetime
import importlib.resources as resources
import gi
import gpxpy
import satellite.nmea as nmea
import satellite.quectel as quectel
from satellite import __version__
from .mm_glib_source import ModemManagerGLibNmeaSource
from .nmeasource import (
ModemNoNMEAError,
ModemLockedError,
ModemError,
NmeaSourceNotFoundError,
QuectelNmeaSource,
GnssShareNmeaSource,
ModemError,
ModemLockedError,
ModemNoNMEAError,
NmeaSourceNotFoundError,
)
from .util import bearing_to_arrow, have_touchscreen, now, unique_filename
from .widgets import text_barchart, DataFrame
from satellite import __version__
from .widgets import DataFrame, text_barchart
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
gi.require_version('Handy', '1')
from gi.repository import Gdk, Gio, GLib, Gtk, Handy # noqa: E402
from gi.repository import GLib, Gdk, Gio, Gtk, Handy # noqa: E402, I100
appname = 'Satellite'
app_id = 'page.codeberg.tpikonen.satellite'
@ -46,17 +47,21 @@ class SatelliteApp(Gtk.Application):
Handy.init()
desc = "Displays navigation satellite data and saves GPX tracks"
parser = argparse.ArgumentParser(description=desc)
parser = argparse.ArgumentParser(
description=desc, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
'-c', '--console-output', dest='console_output',
action='store_true', default=False,
help='Output satellite data to console')
parser.add_argument(
'-s', '--source', dest='source',
default='quectel',
help='Select NMEA source. Options are '
'\'quectel\' (default) for Quectel Modems or '
'\'gnss-share\' when using gnss-share')
choices=['auto', 'quectel', 'mm', 'gnss-share'],
default='auto',
help="Select NMEA source. Options are:\n"
"'auto' (default) Automatic source detection\n"
"'quectel' ModemManager with Quectel quirks\n"
"'mm' ModemManager without quirks\n"
"'gnss-share' Read from gnss-share socket\n")
self.args = parser.parse_args()
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT,
@ -90,7 +95,7 @@ class SatelliteApp(Gtk.Application):
self.source = None
self.infolabel.set_markup("<tt>" + "\n"*10 + "</tt>")
self.infolabel.set_markup("<tt>" + "\n" * 10 + "</tt>")
self.dataframe = DataFrame()
# self.dataframe.header.set_text("Satellite info")
@ -117,7 +122,7 @@ class SatelliteApp(Gtk.Application):
self.last_data = None
self.last_speed = None
self.last_update = None
self.source_lost = False
self.had_error = False
self.sigint_received = False
self.refresh_rate = 1 # Really delay between updates in seconds
@ -161,8 +166,6 @@ class SatelliteApp(Gtk.Application):
def on_startup(self, app):
self.create_actions()
# Initialize modem after GUI startup
GLib.idle_add(self.init_source)
def on_activate(self, app):
self.setup_styles()
@ -171,33 +174,56 @@ class SatelliteApp(Gtk.Application):
if have_touchscreen():
self.datascroll.connect('edge-overshot', self.on_edge_overshot)
self.log_msg(f"{appname} version {__version__} started")
# Initialize modem after GUI startup
GLib.timeout_add(1000, self.init_source, None)
def on_shutdown(self, app):
"""Called after main loop exits."""
print("Cleaning up...")
self.gpx_write()
if self.source is not None:
self.source.close()
print("...done.")
def init_source(self):
self.log_msg(f"{appname} version {__version__} started")
def init_source(self, unused):
source_init = False
self.log_msg(f'Trying to initialize source "{self.args.source}"')
if self.args.source == 'quectel':
source_init = self.init_quectel_source()
elif self.args.source == 'gnss-share':
source_init = self.init_gnss_share_source()
if not source_init:
self.log_msg('No NmeaSource initialized')
return False # Remove from idle_add
if self.args.source == 'auto':
self.log_msg("Detecting NMEA sources...")
if not source_init:
source_init = self.init_gnss_share_source(autodetect=True)
if not source_init:
source_init = self.init_mm_source(
quirks=['detect'], autodetect=True)
if not source_init:
self.log_msg('NMEA source not found')
dialog = Gtk.MessageDialog(
parent=self.window, modal=True,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text="Could not find an NMEA source")
dialog.set_title("Error initializing NMEA source")
dialog.run()
dialog.destroy()
return GLib.SOURCE_REMOVE
else:
self.log_msg(f'NMEA source "{self.args.source}" selected')
if self.args.source == 'quectel':
source_init = self.init_mm_source(quirks=['QuectelTalker'])
elif self.args.source == 'mm':
source_init = self.init_mm_source()
elif self.args.source == 'gnss-share':
source_init = self.init_gnss_share_source()
if not source_init:
self.log_msg('Could not initialize NMEA source')
return GLib.SOURCE_REMOVE
self.log_msg(
f"Source is {self.source.manufacturer}, model {self.source.model}"
+ f", revision {self.source.revision}"
if self.source.revision else "")
f"Source is {self.source.manufacturer}"
+ (f", model {self.source.model}" if self.source.model else "")
+ (f", revision {self.source.revision}" if self.source.revision else "")
+ (f" using {', '.join(self.source.quirks)} quirks"
if hasattr(self.source, "quirks") and self.source.quirks else ""))
if (self.source.model and self.source.model.startswith("QUECTEL")):
constellations = quectel.get_constellations(self.source)
@ -218,14 +244,15 @@ class SatelliteApp(Gtk.Application):
return GLib.SOURCE_REMOVE
def init_gnss_share_source(self):
def init_gnss_share_source(self, autodetect=False):
try:
self.source = GnssShareNmeaSource(self.location_update_cb)
self.source.initialize()
except Exception as e:
fatal = False
if autodetect:
return False
self.log_msg(str(e))
dtext = 'Can you access "/var/run/gnss-share.sock"?\n' + str(e)
dtext = str(e)
dialog = Gtk.MessageDialog(
parent=self.window, modal=True,
message_type=Gtk.MessageType.ERROR,
@ -233,44 +260,40 @@ class SatelliteApp(Gtk.Application):
dialog.set_title("Error initializing NMEA source")
dialog.run()
dialog.destroy()
if fatal:
self.quit()
return False
return True
def init_quectel_source(self):
def init_mm_source(self, quirks=[], autodetect=False):
try:
self.source = QuectelNmeaSource(
self.source = ModemManagerGLibNmeaSource(
self.location_update_cb,
refresh_rate=self.refresh_rate,
quirks=quirks,
# save_filename=unique_filename(self.gpx_save_dir + '/nmeas',
# '.txt')
)
self.source.initialize()
except Exception as e:
fatal = False
if autodetect:
return False
if isinstance(e, ModemLockedError):
self.log_msg("Modem is locked")
dtext = "Please unlock the Modem"
else:
fatal = isinstance(e, gi.repository.GLib.GError)
self.log_msg("Error initializing NMEA source")
etext = str(e)
self.log_msg(f"Error initializing ModemManager NMEA source: {etext}")
dtext = etext if etext else (
"Could not find or initialize NMEA source")
"Could not initialize ModemManager NMEA source")
dialog = Gtk.MessageDialog(
parent=self.window, modal=True,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.CLOSE if fatal else Gtk.ButtonsType.OK,
buttons=Gtk.ButtonsType.OK,
text=dtext)
dialog.set_title("Error initializing NMEA source")
dialog.run()
dialog.destroy()
if fatal:
self.quit()
return
return False
@ -301,7 +324,7 @@ class SatelliteApp(Gtk.Application):
version=__version__,
comments="A program for showing navigation satellite data",
license_type=Gtk.License.GPL_3_0_ONLY,
copyright="Copyright 2021-2022 Teemu Ikonen",
copyright="Copyright 2021-2023 Teemu Ikonen",
)
adlg.present()
@ -407,6 +430,12 @@ class SatelliteApp(Gtk.Application):
fixage = to_str(data.get("fixage"), "%0.0f s")
return "%s / %s" % (up_age, fixage)
def get_dops(xkey):
pdop = to_str(data.get("pdop"), "%1.1f")
hdop = to_str(data.get("hdop"), "%1.1f")
vdop = to_str(data.get("vdop"), "%1.1f")
return f"{pdop} / {hdop} / {vdop}"
mode2fix = {
"2": "2 D",
"3": "3 D",
@ -415,19 +444,19 @@ class SatelliteApp(Gtk.Application):
# Mapping: Data key, description, converter func
order = [
("mode", "Fix type", lambda x: mode2fix.get(x, "No Fix")),
("mode_indicator", "Modes (GP,GL,GA)", lambda x: str(x)),
("mode_indicator", "Modes (GP,GL,GA)",
lambda x: str(x) if x is not None else "n/a"),
("actives", "Active / in use sats", get_actives),
("visibles", "Receiving sats", lambda x: str(len(
list(r for r in x if r['snr'] > 0.0)))),
[r for r in x if r['snr'] > 0.0]))),
("visibles", "Visible sats", lambda x: str(len(x))),
# ("fixage", "Age of fix", lambda x: to_str(x, "%0.0f s")),
("fixage", "Age of update / fix", get_ages),
("systime", "Sys. Time", lambda x: x.strftime(utcfmt)),
("latlon", "Latitude",
lambda x: "%0.6f" % x[0] if x else "-"),
("latlon", "Longitude",
lambda x: "%0.6f" % x[1] if x else "-"),
("altitude", "Altitude", lambda x: to_str(x, "%0.1f m")),
("latlon", "Latitude", lambda x: "%0.6f" % x[0] if x else "-"),
("latlon", "Longitude", lambda x: "%0.6f" % x[1] if x else "-"),
("altitude", "Altitude", lambda x: to_str(x, "%0.1f m")),
("geoid_sep", "Geoidal separation", lambda x: to_str(x, "%0.1f m")),
# ("fixtime", "Time of fix",
# lambda x: x.strftime(utcfmt) if x else "-"),
# ("date", "Date of fix",
@ -436,9 +465,7 @@ class SatelliteApp(Gtk.Application):
("true_course", "True Course",
lambda x: to_str(x, "%0.1f deg ")
+ (bearing_to_arrow(x) if x is not None else "")),
("pdop", "PDOP", lambda x: to_str(x)),
("hdop", "HDOP", lambda x: to_str(x)),
("vdop", "VDOP", lambda x: to_str(x)),
("pdop", "PDOP/HDOP/VDOP", get_dops),
]
descs = []
vals = []
@ -471,10 +498,9 @@ class SatelliteApp(Gtk.Application):
self.last_mode = mode
def set_speedlabel(self, speed, bearing=None):
spd = str(int(3.6*speed)) if speed else "-"
spd = str(int(3.6 * speed)) if speed else "-"
arrow = bearing_to_arrow(bearing) if bearing is not None else ""
speedfmt = ('<span size="50000">%s%s</span>\n' +
'<span size="30000">%s</span>')
speedfmt = '<span size="50000">%s%s</span>\n<span size="30000">%s</span>'
speedstr = speedfmt % (spd, arrow, "km/h")
self.speedlabel.set_markup(speedstr)
@ -539,8 +565,14 @@ class SatelliteApp(Gtk.Application):
def update(self):
try:
nmeas = self.source.get()
if self.had_error:
self.log_msg("Getting updates")
self.main_box.set_sensitive(True)
self.had_error = False
data = nmea.parse(nmeas)
except Exception as e:
fatal = False
nmeas = None
show_dialog = False
etext = str(e)
dtext = None
@ -552,37 +584,29 @@ class SatelliteApp(Gtk.Application):
elif isinstance(e, ModemError):
dtext = "Modem error: " + str(e)
elif isinstance(e, NmeaSourceNotFoundError):
if not self.source_lost:
if not self.had_error:
dtext = etext if etext else "Modem disappeared"
self.source_lost = True
self.had_error = True
self.main_box.set_sensitive(False)
else:
dtext = etext if etext else "Unknown error"
if show_dialog:
dialog = Gtk.MessageDialog(
parent=self.window, modal=True,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK, text=dtext)
dialog.set_title("Unrecoverable error" if fatal else "Error")
dialog.run()
dialog.destroy()
if fatal:
self.quit()
return
elif dtext is not None:
self.log_msg(dtext)
return
if not self.had_error:
if show_dialog:
dialog = Gtk.MessageDialog(
parent=self.window, modal=True,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK, text=dtext)
dialog.set_title("Error")
dialog.run()
dialog.destroy()
elif dtext is not None:
self.log_msg(dtext)
self.had_error = True
if self.last_data is None:
return
else:
data = self.last_data
if not nmeas:
return
if self.source_lost:
self.log_msg("Modem appeared")
self.main_box.set_sensitive(True)
self.source_lost = False
data = nmea.parse(nmeas)
data["updateage"] = ((time.time() - self.last_update)
if self.last_update else None)
@ -603,7 +627,7 @@ class SatelliteApp(Gtk.Application):
# log
mode = data["mode"]
mode = int(mode) if mode else 0
mode = int(mode) if mode else self.last_mode
if mode != self.last_mode:
if mode > 1:
self.log_msg(f"Got lock, mode: {mode}")

158
satellite/mm_glib_source.py Normal file
View file

@ -0,0 +1,158 @@
# Copyright 2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import re
import gi
from pynmea2.nmea import NMEASentence
from satellite.nmeasource import ( # noqa: E402
ModemError,
ModemLockedError,
ModemNoNMEAError,
NmeaSource,
NmeaSourceNotFoundError,
)
gi.require_version('ModemManager', '1.0')
from gi.repository import Gio, ModemManager # noqa: E402, I100
class ModemManagerGLibNmeaSource(NmeaSource):
def __init__(self, update_callback, quirks=[], **kwargs):
super().__init__(update_callback, **kwargs)
self.bus = None
self.manager = None
self.modem = None
self.mlocation = None
self.old_refresh_rate = None
self.old_sources_enabled = None
self.old_signals_location = None
self.location_updated = None
self.quirks = set(quirks)
def initialize(self):
# If reinitializing, disconnect old update cb
if self.mlocation is not None:
self.mlocation.disconnect_by_func(self.update_callback)
self.bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
self.manager = ModemManager.Manager.new_sync(
self.bus, Gio.DBusObjectManagerClientFlags.DO_NOT_AUTO_START, None)
if self.manager.get_name_owner() is None:
raise NmeaSourceNotFoundError("ModemManager is not running")
objs = self.manager.get_objects()
if objs:
self.modem = objs[0].get_modem()
self.mlocation = objs[0].get_modem_location()
else:
raise NmeaSourceNotFoundError("No Modems Found")
self.manufacturer = self.modem.get_manufacturer()
self.model = self.modem.get_model()
self.revision = self.modem.get_revision()
if 'detect' in self.quirks:
self.quirks.remove('detect')
if (self.model.startswith('QUECTEL')
and self.manufacturer == 'QUALCOMM INCORPORATED'):
self.quirks.add('QuectelTalker')
# Detect SDM845 GNSS unit and disable MSB assistance,
# which causes stalling at startup due to some bug somewhere
if (self.manufacturer == 'QUALCOMM INCORPORATED'
and self.model == '0'
and self.revision.find('SDM845') >= 0):
self.quirks.add('NoMSB')
try:
state = self.modem.get_state()
if int(state) > 0:
if self.old_refresh_rate is None:
self.old_refresh_rate = self.mlocation.props.gps_refresh_rate
if self.old_sources_enabled is None:
self.old_sources_enabled = self.mlocation.props.enabled
if self.old_signals_location is None:
self.old_signals_location = self.mlocation.props.signals_location
caps = self.mlocation.get_capabilities()
if not caps & ModemManager.ModemLocationSource.GPS_NMEA:
raise NmeaSourceNotFoundError(
"Modem does not support NMEA")
enable = ModemManager.ModemLocationSource.GPS_NMEA
if (caps & ModemManager.ModemLocationSource.AGPS_MSB
and 'NoMSB' not in self.quirks):
enable |= ModemManager.ModemLocationSource.AGPS_MSB
self.mlocation.setup_sync(enable, True, None)
else:
raise ModemError("Modem state is: %d" % state)
except AttributeError as e:
if state == ModemManager.ModemState.LOCKED:
raise ModemLockedError from e
else:
raise e
except gi.repository.GLib.GError as e:
# Ignore error on AGPS enablement by this hack
if 'agps-msb' not in str(e):
raise e
self.mlocation.set_gps_refresh_rate_sync(self.refresh_rate, None)
self.mlocation.connect('notify::location', self.update_callback)
self.initialized = True
def _really_get(self):
if not self.initialized:
self.initialize()
try:
loc = self.mlocation.get_signaled_gps_nmea()
except Exception as e:
self.initialized = False
raise e
if loc is None:
raise ModemNoNMEAError
nmeas = loc.get_traces()
if nmeas is None:
self.initialized = False
raise ModemNoNMEAError
if 'QuectelTalker' in self.quirks:
nmeas = self.quectel_talker_quirk(nmeas)
return '\r\n'.join(nmeas)
def close(self):
if self.mlocation is None:
return
try:
self.mlocation.disconnect_by_func(self.update_callback)
except TypeError:
pass # Ignore error when nothing is connected
if self.old_sources_enabled is not None:
self.mlocation.setup_sync(
ModemManager.ModemLocationSource(self.old_sources_enabled),
self.old_signals_location, None)
if self.old_refresh_rate is not None:
self.mlocation.set_gps_refresh_rate_sync(self.old_refresh_rate, None)
def quectel_talker_quirk(self, nmeas):
pq_re = re.compile(r"""
^\s*\$?
(?P<talker>PQ)
(?P<sentence>\w{3})
(?P<data>[^*]*)
(?:[*](?P<checksum>[A-F0-9]{2}))$""", re.VERBOSE)
out = []
for nmea in (n for n in nmeas if n):
mo = pq_re.match(nmea)
if mo:
# The last extra data field is Signal ID, these are
# 1 = GPS, 2 = Glonass, 3 = Galileo, 4 = BeiDou, 5 = QZSS
# Determine talker from Signal ID
talker = 'QZ' if mo.group('data').endswith('5') else 'BD'
# Fake talker and checksum
fake = talker + "".join(mo.group(2, 3))
out.append('$' + fake + "*%02X" % NMEASentence.checksum(fake))
else:
out.append(nmea)
return out

View file

@ -1,31 +0,0 @@
# Copyright 2021-2022 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
# flake8: noqa
# See /usr/include/ModemManager/ModemManager-enums.h in modemmanager-dev
MM_MODEM_LOCATION_SOURCE_NONE = 0
MM_MODEM_LOCATION_SOURCE_3GPP_LAC_CI = 1 << 0
MM_MODEM_LOCATION_SOURCE_GPS_RAW = 1 << 1
MM_MODEM_LOCATION_SOURCE_GPS_NMEA = 1 << 2
MM_MODEM_LOCATION_SOURCE_CDMA_BS = 1 << 3
MM_MODEM_LOCATION_SOURCE_GPS_UNMANAGED = 1 << 4
MM_MODEM_LOCATION_SOURCE_AGPS_MSA = 1 << 5
MM_MODEM_LOCATION_SOURCE_AGPS_MSB = 1 << 6
MM_MODEM_LOCATION_ASSISTANCE_DATA_TYPE_NONE = 0
MM_MODEM_LOCATION_ASSISTANCE_DATA_TYPE_XTRA = 1 << 0
MM_MODEM_STATE_FAILED = -1
MM_MODEM_STATE_UNKNOWN = 0
MM_MODEM_STATE_INITIALIZING = 1
MM_MODEM_STATE_LOCKED = 2
MM_MODEM_STATE_DISABLED = 3
MM_MODEM_STATE_DISABLING = 4
MM_MODEM_STATE_ENABLING = 5
MM_MODEM_STATE_ENABLED = 6
MM_MODEM_STATE_SEARCHING = 7
MM_MODEM_STATE_REGISTERED = 8
MM_MODEM_STATE_DISCONNECTING = 9
MM_MODEM_STATE_CONNECTING = 10
MM_MODEM_STATE_CONNECTED = 11

View file

@ -1,10 +1,11 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import datetime
import pynmea2
import re
import pynmea2
MS_PER_KNOT = 0.514444
lastfix_dt = None
@ -28,7 +29,7 @@ def fget(key, scale=1.0):
def iget(key, default=None):
def fn(d):
try:
return int(d.get(key))
return int(d.get(key, default))
except ValueError:
return default
return fn
@ -61,9 +62,9 @@ def get_latlon(mdict):
lat_min = float(lat[2:])
lon_deg = float(lon[:3])
lon_min = float(lon[3:])
flat = lat_deg + lat_min/60
flat = lat_deg + lat_min / 60
flat = -1 * flat if lat_dir == 'S' else flat
flon = lon_deg + lon_min/60
flon = lon_deg + lon_min / 60
flon = -1 * flon if lon_dir == 'W' else flon
return (flat, flon)
@ -98,12 +99,10 @@ getters = {
'GGA': get_altitude_gga,
'GNS': fget('altitude'),
},
"fixtime": {
"fixtime": { # Time of position report
'RMC': get_time,
'GGA': get_time,
},
"time": { # Reported also when no fix
'GNS': get_time,
'GGA': get_time,
},
"date": {
'RMC': get_date,
@ -131,6 +130,7 @@ getters = {
},
"num_sats": {
'GNS': iget('num_sats', default=0),
'GGA': iget('num_sats', default=0),
},
"pdop": {
'GSA': fget('pdop'),
@ -143,7 +143,8 @@ getters = {
"vdop": {
'GSA': fget('vdop'),
},
"geo_sep": {
"geoid_sep": {
'GGA': fget('geo_sep'),
'GNS': fget('geo_sep'),
},
"sel_mode": {
@ -182,7 +183,7 @@ def parse(nmeas, always_add_prefix=False):
return float(s) if s else empty_val
def add_prn_prefix(prns, talker, always=always_add_prefix):
"""Add constellation prefix to PRN string"""
"""Add constellation prefix to PRN string."""
beidou_prefix = "C"
galileo_prefix = "E"
glonass_prefix = "R"
@ -227,7 +228,7 @@ def parse(nmeas, always_add_prefix=False):
'snr': fl(getattr(msg, f'snr_{n}', None), 0.0),
})
elif isinstance(msg, pynmea2.types.GSA):
for n in range(1, 12+1):
for n in range(1, 12 + 1):
prns = getattr(msg, f'sv_id{n:02d}')
if prns and prns.isdigit():
actives.append(add_prn_prefix(prns, msg.talker))
@ -248,13 +249,13 @@ def parse(nmeas, always_add_prefix=False):
}
out.update({k: msg_get(msgs, k) for k in getters.keys()})
datenow = datetime.datetime.utcnow()
datenow = datetime.datetime.now(datetime.timezone.utc)
fixtime = out.get('fixtime')
fixdate = out.get('date')
if fixdate is None and fixtime is not None:
# We have a fix but no RMC sentence
fixdate = datenow.date()
fixdt = (datetime.datetime.combine(fixdate, fixtime)
fixdt = (datetime.datetime.combine(fixdate, fixtime, datetime.timezone.utc)
if (fixtime and fixdate) else None)
out["datetime"] = fixdt
out["systime"] = datenow

View file

@ -1,12 +1,8 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import re
import satellite.modem_manager_defs as mm
import os.path
import socket
from pydbus import SystemBus
from pynmea2.nmea import NMEASentence
from gi.repository import GLib
@ -73,11 +69,12 @@ class UnixSocketNmeaSource(NmeaSource):
def on_read_data_available(self, io_channel, condition, **unused):
self.update_callback()
return True
def initialize(self):
if self.socket_file_path is None or \
not os.path.exists(self.socket_file_path):
return
if (self.socket_file_path is None
or not os.path.exists(self.socket_file_path)):
raise FileNotFoundError(f"Could not open socket {self.socket_file_path}")
self.s = socket.socket(socket.AF_UNIX,
socket.SOCK_NONBLOCK | socket.SOCK_STREAM)
@ -113,118 +110,7 @@ class GnssShareNmeaSource(UnixSocketNmeaSource):
super().__init__(update_callback,
socket_file_path='/var/run/gnss-share.sock',
**kwargs)
class ModemManagerNmeaSource(NmeaSource):
def __init__(self, update_callback, **kwargs):
super().__init__(update_callback, **kwargs)
self.bus = SystemBus()
self.manager = self.bus.get('.ModemManager1')
self.modem = None
self.old_refresh_rate = None
self.old_sources_enabled = None
self.old_signals = None
self.location_updated = None
def initialize(self):
objs = self.manager.GetManagedObjects()
mkeys = list(objs.keys())
if mkeys:
mstr = mkeys[0]
else:
raise NmeaSourceNotFoundError("No Modems Found")
info = objs[mstr]['org.freedesktop.ModemManager1.Modem']
self.manufacturer = info.get('Manufacturer')
self.model = info.get('Model')
self.revision = info.get('Revision')
self.modem = self.bus.get('.ModemManager1', mstr)
try:
if self.modem.State > 0:
if self.old_refresh_rate is None:
self.old_refresh_rate = self.modem.GpsRefreshRate
if self.old_sources_enabled is None:
self.old_sources_enabled = self.modem.Enabled
if self.old_signals is None:
self.old_signals = self.modem.SignalsLocation
cap = self.modem.Capabilities
if (cap & mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA) == 0:
raise NmeaSourceNotFoundError(
"Modem does not support NMEA")
self.modem.Setup(
(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA
| (cap & mm.MM_MODEM_LOCATION_SOURCE_AGPS_MSB)),
True)
else:
raise ModemError("Modem state is: %d" % self.modem.State)
except AttributeError as e:
if self.modem.State == mm.MM_MODEM_STATE_LOCKED:
raise ModemLockedError from e
else:
raise ModemError from e
except Exception as e:
raise e
self.modem.SetGpsRefreshRate(self.refresh_rate)
self.location_updated = self.bus.subscribe(
sender='org.freedesktop.ModemManager1',
iface='org.freedesktop.DBus.Properties',
signal='PropertiesChanged',
arg0='org.freedesktop.ModemManager1.Modem.Location',
signal_fired=self.update_callback)
self.initialized = True
def _really_get(self):
if not self.initialized:
self.initialize()
try:
loc = self.modem.GetLocation()
except Exception as e:
self.initialized = False
raise e
retval = loc.get(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA)
if retval is None:
self.initialized = False
raise ModemNoNMEAError
return retval
def close(self):
if self.location_updated is not None:
self.location_updated.disconnect()
if self.old_sources_enabled is not None:
self.modem.Setup(self.old_sources_enabled, self.old_signals)
if self.old_refresh_rate is not None:
self.modem.SetGpsRefreshRate(self.old_refresh_rate)
class QuectelNmeaSource(ModemManagerNmeaSource):
def _really_get(self):
return self.fix_talker(super()._really_get())
def fix_talker(self, nmeas):
pq_re = re.compile(r'''
^\s*\$?
(?P<talker>PQ)
(?P<sentence>\w{3})
(?P<data>[^*]*)
(?:[*](?P<checksum>[A-F0-9]{2}))$''', re.VERBOSE)
out = []
for nmea in (n for n in nmeas.split('\r\n') if n):
mo = pq_re.match(nmea)
if mo:
# The last extra data field is Signal ID, these are
# 1 = GPS, 2 = Glonass, 3 = Galileo, 4 = BeiDou, 5 = QZSS
# Determine talker from Signal ID
talker = 'QZ' if mo.group('data').endswith('5') else 'BD'
# Fake talker and checksum
fake = talker + "".join(mo.group(2, 3))
out.append('$' + fake + "*%02X" % NMEASentence.checksum(fake))
else:
out.append(nmea)
return "\r\n".join(out)
self.manufacturer = "gnss-share"
class ReplayNmeaSource(NmeaSource):

View file

@ -1,8 +1,7 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import re
from datetime import datetime, timezone
from .util import (

View file

@ -1,11 +1,10 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import gi
import os
from datetime import datetime, timezone
import gi
gi.require_version('Gdk', '3.0')
from gi.repository import Gdk # noqa: E402
@ -16,15 +15,13 @@ week_now = int((now.timestamp() - gps_epoch.timestamp()) / one_week)
def have_touchscreen():
"""Return True if the default seat of default display has touch capability
"""
"""Return True if the default seat of default display has touch capability."""
return bool(Gdk.Display.get_default_seat(
Gdk.Display.get_default()).get_capabilities() & Gdk.SeatCapabilities.TOUCH)
def datetime_from_gpstime(week, millisecs, fix_week=False):
"""Return a datetime object formed from GPS week number and
milliseconds from week start.
"""Return a datetime from GPS week number and milliseconds from week start.
If fix_week is True, set the bits above 10 in week number from
current date, see
@ -38,7 +35,7 @@ def datetime_from_gpstime(week, millisecs, fix_week=False):
def gpstime_from_datetime(dt):
"""Return a (gps_week, millisec) tuple from a datetime object"""
"""Return a (gps_week, millisec) tuple from a datetime object."""
if dt < gps_epoch:
raise ValueError("Time cannot be less than GPS epoch")
ts = dt.timestamp()
@ -51,7 +48,7 @@ def gpstime_from_datetime(dt):
def unique_filename(namestem, ext, timestamp=False):
if timestamp:
namestem += "-" + datetime.now().isoformat(
'_', 'seconds').replace(':', '.')
'_', 'seconds').replace(':', '.')
name = None
for count in ('~%d' % n if n > 0 else '' for n in range(100)):
test = namestem + count + ext
@ -77,7 +74,7 @@ def bearing_to_arrow(bearing):
'\u2196',
'\u2191',
]
edges = list(22.5 + 45.0 * n for n in range(0, 8)) + [360.0]
edges = [22.5 + 45.0 * n for n in range(0, 8)] + [360.0]
angle = bearing - (bearing // 360) * 360
index = next(ind for (ind, e) in enumerate(edges) if angle < e)

View file

@ -1,10 +1,9 @@
# Copyright 2021-2022 Teemu Ikonen
# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import gi
import importlib.resources as resources
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk # noqa: E402
@ -18,8 +17,7 @@ def text_barchart(data, highlights, height=None, width=30):
height Number of lines in the generated bar chart
width Width of the generated bar chart in chars
"""
sdata = list((d[0] if d[0] else '',
int(d[1]) if d[1] else 0) for d in data)
sdata = [(d[0] if d[0] else '', int(d[1]) if d[1] else 0) for d in data]
sdata.sort(key=lambda x: x[1], reverse=True)
dstr = ''
@ -41,14 +39,14 @@ def text_barchart(data, highlights, height=None, width=30):
cmax_xaxis = cmaxbar + 3
for d in sdata[:barlines]:
block = '\u2585' if d[0] in highlights else '='
dstr += "%3s\u2502%s %d\n" % (d[0], block*int(scale*d[1]), d[1])
dstr += "%3s\u2502%s %d\n" % (d[0], block * int(scale * d[1]), d[1])
if barlines < len(sdata):
dstr += " \u256a\n"
elif (len(sdata) - axislines) < height:
# Add empty lines to y-axis
dstr += ' \u2502\n' * (height - len(sdata) - axislines)
dstr += " \u251c" + '\u2500'*(cmax_xaxis) + '\u2524\n'
dstr += " 0" + ' '*(cmax_xaxis - 1) + str(max_xaxis)
dstr += " \u251c" + '\u2500' * (cmax_xaxis) + '\u2524\n'
dstr += " 0" + ' ' * (cmax_xaxis - 1) + str(max_xaxis)
return dstr

View file

@ -1,3 +1,8 @@
[flake8]
exclude=.git,__pycache__,build
max-line-length=88
ignore = B902, BLK100, CCR001, CNL100, D1, I201, Q000, W503
[pycodestyle]
count=1
max-line-length = 88

View file

@ -29,7 +29,7 @@ setuptools.setup(
"Bug Tracker": "https://codeberg.org/tpikonen/satellite/issues",
},
classifiers=[
"Development Status :: 3 - Alpha",
"Development Status :: 4 - Beta",
"Environment :: X11 Applications :: GTK",
"Intended Audience :: End Users/Desktop",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",