satellite-gtk/satellite/mm_pydbus_source.py

130 lines
4.5 KiB
Python

# Copyright 2021-2023 Teemu Ikonen
# SPDX-License-Identifier: GPL-3.0-only
import re
from pydbus import SystemBus
from pynmea2.nmea import NMEASentence
import satellite.modem_manager_defs as mm
from satellite.nmeasource import (
ModemError,
ModemLockedError,
ModemNoNMEAError,
NmeaSource,
NmeaSourceNotFoundError,
)
class ModemManagerPyDBusNmeaSource(NmeaSource):
    """NMEA source that reads location data from ModemManager via pydbus.

    The first modem listed by ModemManager is used.  The modem's location
    settings found on the first successful initialization are remembered
    so that ``close()`` can restore them.
    """

    def __init__(self, update_callback, **kwargs):
        """Open the system bus and fetch the ModemManager proxy object.

        Args:
            update_callback: Forwarded to ``NmeaSource.__init__``.
                ``initialize()`` later registers ``self.update_callback``
                as a signal handler (presumably the base class stores this
                argument under that name -- confirm in ``NmeaSource``).
            **kwargs: Forwarded unchanged to ``NmeaSource.__init__``.
        """
        super().__init__(update_callback, **kwargs)
        self.bus = SystemBus()
        self.manager = self.bus.get('.ModemManager1')
        self.modem = None
        # Modem settings captured once in initialize(), restored in close().
        self.old_refresh_rate = None
        self.old_sources_enabled = None
        self.old_signals = None
        # Handle returned by bus.subscribe(); None while not subscribed.
        self.location_updated = None

    def initialize(self):
        """Pick the first modem, enable NMEA output and subscribe to updates.

        May be called repeatedly: ``_really_get()`` calls it again whenever
        ``self.initialized`` is false.  Previously saved modem settings are
        not overwritten on later calls, and any earlier signal subscription
        is disconnected before a new one is made.

        Raises:
            NmeaSourceNotFoundError: ModemManager lists no objects, or the
                modem's ``Capabilities`` lack the GPS NMEA source bit.
            ModemLockedError: A modem attribute lookup failed and the modem
                state equals ``MM_MODEM_STATE_LOCKED``.
            ModemError: The modem state is not greater than zero, or a
                modem attribute lookup failed for another reason.
        """
        objs = self.manager.GetManagedObjects()
        # Use the first managed object path; there is no modem selection.
        mstr = next(iter(objs), None)
        if mstr is None:
            raise NmeaSourceNotFoundError("No Modems Found")

        info = objs[mstr]['org.freedesktop.ModemManager1.Modem']
        self.manufacturer = info.get('Manufacturer')
        self.model = info.get('Model')
        self.revision = info.get('Revision')
        self.modem = self.bus.get('.ModemManager1', mstr)

        try:
            if self.modem.State > 0:
                # Remember the pre-existing settings only once, so that a
                # re-initialization does not replace them with the values
                # this class has set itself.
                if self.old_refresh_rate is None:
                    self.old_refresh_rate = self.modem.GpsRefreshRate
                if self.old_sources_enabled is None:
                    self.old_sources_enabled = self.modem.Enabled
                if self.old_signals is None:
                    self.old_signals = self.modem.SignalsLocation
                cap = self.modem.Capabilities
                if (cap & mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA) == 0:
                    raise NmeaSourceNotFoundError(
                        "Modem does not support NMEA")
                # Enable GPS NMEA, plus AGPS MSB when the modem reports it.
                self.modem.Setup(
                    (mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA
                     | (cap & mm.MM_MODEM_LOCATION_SOURCE_AGPS_MSB)),
                    True)
            else:
                raise ModemError("Modem state is: %d" % self.modem.State)
        except AttributeError as e:
            # A missing proxy attribute is reported as a modem problem;
            # the locked state gets its own exception type.
            if self.modem.State == mm.MM_MODEM_STATE_LOCKED:
                raise ModemLockedError from e
            else:
                raise ModemError from e

        self.modem.SetGpsRefreshRate(self.refresh_rate)

        # Drop a subscription left over from an earlier initialize() call;
        # otherwise every re-initialization would add another live
        # subscription and update_callback would fire once per copy.
        if self.location_updated is not None:
            self.location_updated.disconnect()
            self.location_updated = None
        self.location_updated = self.bus.subscribe(
            sender='org.freedesktop.ModemManager1',
            iface='org.freedesktop.DBus.Properties',
            signal='PropertiesChanged',
            arg0='org.freedesktop.ModemManager1.Modem.Location',
            signal_fired=self.update_callback)
        self.initialized = True

    def _really_get(self):
        """Return the GPS NMEA entry of the modem's current location.

        Initializes the source first when ``self.initialized`` is false.

        Returns:
            The value stored under ``MM_MODEM_LOCATION_SOURCE_GPS_NMEA`` in
            the mapping returned by the modem's ``GetLocation()``.

        Raises:
            ModemNoNMEAError: The location mapping has no GPS NMEA entry.
            Exception: Whatever ``GetLocation()`` or ``initialize()`` raise.
                In both failure cases ``self.initialized`` is cleared so
                that the next call initializes again.
        """
        if not self.initialized:
            self.initialize()
        try:
            loc = self.modem.GetLocation()
        except Exception:
            self.initialized = False
            raise
        retval = loc.get(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA)
        if retval is None:
            self.initialized = False
            raise ModemNoNMEAError
        return retval

    def close(self):
        """Disconnect the signal subscription and restore modem settings.

        Each step is skipped when the corresponding value was never set,
        so calling this before a successful ``initialize()`` does nothing.
        """
        if self.location_updated is not None:
            self.location_updated.disconnect()
            # Forget the handle so a later initialize()/close() does not
            # try to disconnect it again.
            self.location_updated = None
        if self.old_sources_enabled is not None:
            self.modem.Setup(self.old_sources_enabled, self.old_signals)
        if self.old_refresh_rate is not None:
            self.modem.SetGpsRefreshRate(self.old_refresh_rate)
class QuectelNmeaSource(ModemManagerPyDBusNmeaSource):
    """ModemManager NMEA source that rewrites 'PQ'-talker sentences."""

    def _really_get(self):
        """Return the parent's NMEA text after passing it to fix_talker()."""
        raw = super()._really_get()
        return self.fix_talker(raw)

    def fix_talker(self, nmeas):
        """Replace the 'PQ' talker in each matching sentence of *nmeas*.

        *nmeas* is split on '\\r\\n' and empty pieces are dropped.  A
        sentence matching the 'PQ' pattern gets the talker 'QZ' when its
        data part ends with '5' and 'BD' otherwise, and its checksum is
        recomputed with ``NMEASentence.checksum``.  All other sentences
        are passed through unchanged.

        Returns:
            The resulting sentences joined with '\\r\\n'.
        """
        # The pattern text is reproduced verbatim; under re.VERBOSE the
        # whitespace and newlines inside it are not significant.
        proprietary = re.compile(r"""
^\s*\$?
(?P<talker>PQ)
(?P<sentence>\w{3})
(?P<data>[^*]*)
(?:[*](?P<checksum>[A-F0-9]{2}))$""", re.VERBOSE)
        fixed = []
        for line in nmeas.split('\r\n'):
            if not line:
                continue
            found = proprietary.match(line)
            if found is None:
                fixed.append(line)
                continue
            # The trailing extra data field is the Signal ID:
            # 1 = GPS, 2 = Glonass, 3 = Galileo, 4 = BeiDou, 5 = QZSS.
            # It decides which talker replaces 'PQ'.
            if found.group('data').endswith('5'):
                talker = 'QZ'
            else:
                talker = 'BD'
            # Rebuild the sentence body with the new talker and give it a
            # checksum that matches the changed text.
            body = talker + found.group('sentence') + found.group('data')
            checksum = NMEASentence.checksum(body)
            fixed.append('$' + body + "*%02X" % checksum)
        return "\r\n".join(fixed)