Split PyDBus based ModemManager source to its own file
This commit is contained in:
parent
0861c78c76
commit
760f80fb49
|
@ -16,12 +16,12 @@ import importlib.resources as resources
|
|||
|
||||
import satellite.nmea as nmea
|
||||
import satellite.quectel as quectel
|
||||
from .mm_pydbus_source import QuectelNmeaSource
|
||||
from .nmeasource import (
|
||||
ModemNoNMEAError,
|
||||
ModemLockedError,
|
||||
ModemError,
|
||||
NmeaSourceNotFoundError,
|
||||
QuectelNmeaSource,
|
||||
GnssShareNmeaSource,
|
||||
)
|
||||
from .util import bearing_to_arrow, have_touchscreen, now, unique_filename
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
# Copyright 2021-2023 Teemu Ikonen
|
||||
# SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
import re
|
||||
import satellite.modem_manager_defs as mm
|
||||
from satellite.nmeasource import (
|
||||
NmeaSource,
|
||||
NmeaSourceNotFoundError,
|
||||
ModemError,
|
||||
ModemLockedError,
|
||||
ModemNoNMEAError,
|
||||
)
|
||||
|
||||
from pydbus import SystemBus
|
||||
from pynmea2.nmea import NMEASentence
|
||||
|
||||
|
||||
class ModemManagerPyDBusNmeaSource(NmeaSource):
|
||||
|
||||
def __init__(self, update_callback, **kwargs):
|
||||
super().__init__(update_callback, **kwargs)
|
||||
self.bus = SystemBus()
|
||||
self.manager = self.bus.get('.ModemManager1')
|
||||
self.modem = None
|
||||
self.old_refresh_rate = None
|
||||
self.old_sources_enabled = None
|
||||
self.old_signals = None
|
||||
self.location_updated = None
|
||||
|
||||
def initialize(self):
|
||||
objs = self.manager.GetManagedObjects()
|
||||
mkeys = list(objs.keys())
|
||||
if mkeys:
|
||||
mstr = mkeys[0]
|
||||
else:
|
||||
raise NmeaSourceNotFoundError("No Modems Found")
|
||||
info = objs[mstr]['org.freedesktop.ModemManager1.Modem']
|
||||
self.manufacturer = info.get('Manufacturer')
|
||||
self.model = info.get('Model')
|
||||
self.revision = info.get('Revision')
|
||||
self.modem = self.bus.get('.ModemManager1', mstr)
|
||||
|
||||
try:
|
||||
if self.modem.State > 0:
|
||||
if self.old_refresh_rate is None:
|
||||
self.old_refresh_rate = self.modem.GpsRefreshRate
|
||||
if self.old_sources_enabled is None:
|
||||
self.old_sources_enabled = self.modem.Enabled
|
||||
if self.old_signals is None:
|
||||
self.old_signals = self.modem.SignalsLocation
|
||||
cap = self.modem.Capabilities
|
||||
if (cap & mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA) == 0:
|
||||
raise NmeaSourceNotFoundError(
|
||||
"Modem does not support NMEA")
|
||||
self.modem.Setup(
|
||||
(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA
|
||||
| (cap & mm.MM_MODEM_LOCATION_SOURCE_AGPS_MSB)),
|
||||
True)
|
||||
else:
|
||||
raise ModemError("Modem state is: %d" % self.modem.State)
|
||||
except AttributeError as e:
|
||||
if self.modem.State == mm.MM_MODEM_STATE_LOCKED:
|
||||
raise ModemLockedError from e
|
||||
else:
|
||||
raise ModemError from e
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
self.modem.SetGpsRefreshRate(self.refresh_rate)
|
||||
self.location_updated = self.bus.subscribe(
|
||||
sender='org.freedesktop.ModemManager1',
|
||||
iface='org.freedesktop.DBus.Properties',
|
||||
signal='PropertiesChanged',
|
||||
arg0='org.freedesktop.ModemManager1.Modem.Location',
|
||||
signal_fired=self.update_callback)
|
||||
self.initialized = True
|
||||
|
||||
def _really_get(self):
|
||||
if not self.initialized:
|
||||
self.initialize()
|
||||
try:
|
||||
loc = self.modem.GetLocation()
|
||||
except Exception as e:
|
||||
self.initialized = False
|
||||
raise e
|
||||
|
||||
retval = loc.get(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA)
|
||||
if retval is None:
|
||||
self.initialized = False
|
||||
raise ModemNoNMEAError
|
||||
return retval
|
||||
|
||||
def close(self):
|
||||
if self.location_updated is not None:
|
||||
self.location_updated.disconnect()
|
||||
if self.old_sources_enabled is not None:
|
||||
self.modem.Setup(self.old_sources_enabled, self.old_signals)
|
||||
if self.old_refresh_rate is not None:
|
||||
self.modem.SetGpsRefreshRate(self.old_refresh_rate)
|
||||
|
||||
|
||||
class QuectelNmeaSource(ModemManagerPyDBusNmeaSource):
|
||||
|
||||
def _really_get(self):
|
||||
return self.fix_talker(super()._really_get())
|
||||
|
||||
def fix_talker(self, nmeas):
|
||||
pq_re = re.compile(r'''
|
||||
^\s*\$?
|
||||
(?P<talker>PQ)
|
||||
(?P<sentence>\w{3})
|
||||
(?P<data>[^*]*)
|
||||
(?:[*](?P<checksum>[A-F0-9]{2}))$''', re.VERBOSE)
|
||||
out = []
|
||||
for nmea in (n for n in nmeas.split('\r\n') if n):
|
||||
mo = pq_re.match(nmea)
|
||||
if mo:
|
||||
# The last extra data field is Signal ID, these are
|
||||
# 1 = GPS, 2 = Glonass, 3 = Galileo, 4 = BeiDou, 5 = QZSS
|
||||
# Determine talker from Signal ID
|
||||
talker = 'QZ' if mo.group('data').endswith('5') else 'BD'
|
||||
# Fake talker and checksum
|
||||
fake = talker + "".join(mo.group(2, 3))
|
||||
out.append('$' + fake + "*%02X" % NMEASentence.checksum(fake))
|
||||
else:
|
||||
out.append(nmea)
|
||||
|
||||
return "\r\n".join(out)
|
|
@ -1,12 +1,8 @@
|
|||
# Copyright 2021-2022 Teemu Ikonen
|
||||
# SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
import re
|
||||
import satellite.modem_manager_defs as mm
|
||||
import os.path
|
||||
import socket
|
||||
from pydbus import SystemBus
|
||||
from pynmea2.nmea import NMEASentence
|
||||
from gi.repository import GLib
|
||||
|
||||
|
||||
|
@ -115,118 +111,6 @@ class GnssShareNmeaSource(UnixSocketNmeaSource):
|
|||
**kwargs)
|
||||
|
||||
|
||||
class ModemManagerNmeaSource(NmeaSource):
|
||||
def __init__(self, update_callback, **kwargs):
|
||||
super().__init__(update_callback, **kwargs)
|
||||
self.bus = SystemBus()
|
||||
self.manager = self.bus.get('.ModemManager1')
|
||||
self.modem = None
|
||||
self.old_refresh_rate = None
|
||||
self.old_sources_enabled = None
|
||||
self.old_signals = None
|
||||
self.location_updated = None
|
||||
|
||||
def initialize(self):
|
||||
objs = self.manager.GetManagedObjects()
|
||||
mkeys = list(objs.keys())
|
||||
if mkeys:
|
||||
mstr = mkeys[0]
|
||||
else:
|
||||
raise NmeaSourceNotFoundError("No Modems Found")
|
||||
info = objs[mstr]['org.freedesktop.ModemManager1.Modem']
|
||||
self.manufacturer = info.get('Manufacturer')
|
||||
self.model = info.get('Model')
|
||||
self.revision = info.get('Revision')
|
||||
self.modem = self.bus.get('.ModemManager1', mstr)
|
||||
|
||||
try:
|
||||
if self.modem.State > 0:
|
||||
if self.old_refresh_rate is None:
|
||||
self.old_refresh_rate = self.modem.GpsRefreshRate
|
||||
if self.old_sources_enabled is None:
|
||||
self.old_sources_enabled = self.modem.Enabled
|
||||
if self.old_signals is None:
|
||||
self.old_signals = self.modem.SignalsLocation
|
||||
cap = self.modem.Capabilities
|
||||
if (cap & mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA) == 0:
|
||||
raise NmeaSourceNotFoundError(
|
||||
"Modem does not support NMEA")
|
||||
self.modem.Setup(
|
||||
(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA
|
||||
| (cap & mm.MM_MODEM_LOCATION_SOURCE_AGPS_MSB)),
|
||||
True)
|
||||
else:
|
||||
raise ModemError("Modem state is: %d" % self.modem.State)
|
||||
except AttributeError as e:
|
||||
if self.modem.State == mm.MM_MODEM_STATE_LOCKED:
|
||||
raise ModemLockedError from e
|
||||
else:
|
||||
raise ModemError from e
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
self.modem.SetGpsRefreshRate(self.refresh_rate)
|
||||
self.location_updated = self.bus.subscribe(
|
||||
sender='org.freedesktop.ModemManager1',
|
||||
iface='org.freedesktop.DBus.Properties',
|
||||
signal='PropertiesChanged',
|
||||
arg0='org.freedesktop.ModemManager1.Modem.Location',
|
||||
signal_fired=self.update_callback)
|
||||
self.initialized = True
|
||||
|
||||
def _really_get(self):
|
||||
if not self.initialized:
|
||||
self.initialize()
|
||||
try:
|
||||
loc = self.modem.GetLocation()
|
||||
except Exception as e:
|
||||
self.initialized = False
|
||||
raise e
|
||||
|
||||
retval = loc.get(mm.MM_MODEM_LOCATION_SOURCE_GPS_NMEA)
|
||||
if retval is None:
|
||||
self.initialized = False
|
||||
raise ModemNoNMEAError
|
||||
return retval
|
||||
|
||||
def close(self):
|
||||
if self.location_updated is not None:
|
||||
self.location_updated.disconnect()
|
||||
if self.old_sources_enabled is not None:
|
||||
self.modem.Setup(self.old_sources_enabled, self.old_signals)
|
||||
if self.old_refresh_rate is not None:
|
||||
self.modem.SetGpsRefreshRate(self.old_refresh_rate)
|
||||
|
||||
|
||||
class QuectelNmeaSource(ModemManagerNmeaSource):
|
||||
|
||||
def _really_get(self):
|
||||
return self.fix_talker(super()._really_get())
|
||||
|
||||
def fix_talker(self, nmeas):
|
||||
pq_re = re.compile(r'''
|
||||
^\s*\$?
|
||||
(?P<talker>PQ)
|
||||
(?P<sentence>\w{3})
|
||||
(?P<data>[^*]*)
|
||||
(?:[*](?P<checksum>[A-F0-9]{2}))$''', re.VERBOSE)
|
||||
out = []
|
||||
for nmea in (n for n in nmeas.split('\r\n') if n):
|
||||
mo = pq_re.match(nmea)
|
||||
if mo:
|
||||
# The last extra data field is Signal ID, these are
|
||||
# 1 = GPS, 2 = Glonass, 3 = Galileo, 4 = BeiDou, 5 = QZSS
|
||||
# Determine talker from Signal ID
|
||||
talker = 'QZ' if mo.group('data').endswith('5') else 'BD'
|
||||
# Fake talker and checksum
|
||||
fake = talker + "".join(mo.group(2, 3))
|
||||
out.append('$' + fake + "*%02X" % NMEASentence.checksum(fake))
|
||||
else:
|
||||
out.append(nmea)
|
||||
|
||||
return "\r\n".join(out)
|
||||
|
||||
|
||||
class ReplayNmeaSource(NmeaSource):
|
||||
def __init__(self, update_callback, replay_filename=None, refresh_rate=1,
|
||||
**kwargs):
|
||||
|
|
Loading…
Reference in New Issue