258 lines
6.7 KiB
Python
258 lines
6.7 KiB
Python
import datetime
|
|
import pynmea2
|
|
|
|
# Scale factor applied to speeds reported in knots to obtain metres per second.
MS_PER_KNOT = 0.514444
|
|
|
|
# Datetime of the most recent fix seen by parse(). It is kept at module level
# so that "fixage" can still be computed when a later call yields no fix.
lastfix_dt = None
|
|
|
|
|
|
def dget(key):
    """Build a getter that looks up ``key`` in a parsed-message dict.

    The returned callable takes a dict and returns ``d.get(key)``: the stored
    value as-is, or ``None`` when the key is absent.
    """
    return lambda d: d.get(key)
|
|
|
|
|
|
def fget(key, scale=1.0):
    """Build a getter that reads ``key`` as a float and scales it.

    Args:
        key: Name of the field to look up in the parsed-message dict.
        scale: Factor the converted value is multiplied by.

    Returns:
        A callable taking a dict and returning ``scale * float(value)``, or
        ``None`` when the value cannot be converted.
    """
    def fn(d):
        try:
            return scale * float(d.get(key))
        except (TypeError, ValueError):
            # ValueError: empty or malformed string.
            # TypeError: key absent, so d.get() gave None and float(None) fails.
            return None
    return fn
|
|
|
|
|
|
def iget(key, default=None):
    """Build a getter that reads ``key`` as an integer.

    Args:
        key: Name of the field to look up in the parsed-message dict.
        default: Value returned when the field cannot be converted.

    Returns:
        A callable taking a dict and returning ``int(value)``, or ``default``
        when the value is missing, empty or not a valid integer.
    """
    def fn(d):
        try:
            return int(d.get(key))
        except (TypeError, ValueError):
            # ValueError: empty or malformed string.
            # TypeError: key absent, so d.get() gave None and int(None) fails.
            return default
    return fn
|
|
|
|
|
|
def get_time(d):
    """Return the message's ``timestamp`` field converted by pynmea2.

    Returns ``None`` when the field is absent or empty; otherwise the raw
    value is handed to ``pynmea2.nmea_utils.timestamp``.
    """
    raw = d.get("timestamp")
    if not raw:
        return None
    return pynmea2.nmea_utils.timestamp(raw)
|
|
|
|
|
|
def get_date(d):
    """Return the message's ``datestamp`` field converted by pynmea2.

    Returns ``None`` when the field is absent or empty; otherwise the raw
    value is handed to ``pynmea2.nmea_utils.datestamp``.
    """
    raw = d.get("datestamp")
    if not raw:
        return None
    return pynmea2.nmea_utils.datestamp(raw)
|
|
|
|
|
|
def get_latlon(mdict):
    """Convert the lat/lon fields of a parsed message to signed decimal degrees.

    The first two characters of ``lat`` and the first three of ``lon`` are
    read as whole degrees, the remainder as minutes.  Latitude is negated for
    ``lat_dir == 'S'`` and longitude for ``lon_dir == 'W'``.

    Returns:
        ``(latitude, longitude)`` as floats, or ``None`` when any of the four
        fields is missing or empty.
    """
    lat, lon, lat_dir, lon_dir = (
        mdict.get(name) for name in ('lat', 'lon', 'lat_dir', 'lon_dir')
    )
    if not (lat and lon and lat_dir and lon_dir):
        return None

    # Split each value into degrees and minutes (latitude first, then
    # longitude).
    lat_degrees, lat_minutes = float(lat[:2]), float(lat[2:])
    lon_degrees, lon_minutes = float(lon[:3]), float(lon[3:])

    latitude = lat_degrees + lat_minutes / 60
    if lat_dir == 'S':
        latitude = -1 * latitude

    longitude = lon_degrees + lon_minutes / 60
    if lon_dir == 'W':
        longitude = -1 * longitude

    return (latitude, longitude)
|
|
|
|
|
|
def get_altitude_gga(mdict):
    """Return the altitude from a parsed GGA message dict.

    Returns:
        The ``altitude`` field as a float, or ``None`` when the field is
        missing, empty or not a valid number.

    Raises:
        ValueError: If an altitude is present but ``altitude_units`` is not
            ``'M'``.
    """
    try:
        altitude = float(mdict.get('altitude'))
    except (TypeError, ValueError):
        # ValueError: empty or malformed string.
        # TypeError: field absent, so float(None) fails.
        return None
    alt1_units = mdict.get('altitude_units')
    if alt1_units != 'M':  # altitude unit should be valid here
        raise ValueError("Expected altitude in meters")
    return altitude
|
|
|
|
|
|
def get_mag_variation_rmc(mdict):
    """Return the signed magnetic variation from a parsed RMC message dict.

    Returns:
        The ``mag_variation`` field as a float, negated when ``mag_var_dir``
        is ``'W'``; ``None`` when the field is missing, empty or not a valid
        number.
    """
    try:
        val = float(mdict.get('mag_variation'))
    except (TypeError, ValueError):
        # ValueError: empty or malformed string.
        # TypeError: field absent, so float(None) fails.
        return None
    return -1 * val if mdict.get('mag_var_dir') == 'W' else val
|
|
|
|
|
|
# Lookup table of extractor functions: output variable name -> sentence type
# -> callable taking that sentence's parsed field dict and returning the
# (converted) value.
getters = {
    'latlon': {'RMC': get_latlon, 'GGA': get_latlon, 'GNS': get_latlon},
    'altitude': {'GGA': get_altitude_gga, 'GNS': fget('altitude')},
    'fixtime': {'RMC': get_time, 'GGA': get_time},
    # Reported also when no fix
    'time': {'GNS': get_time},
    'date': {'RMC': get_date},
    'valid': {'RMC': lambda fields: fields.get('status') == 'A'},
    # Both speed sources are scaled by MS_PER_KNOT.
    'speed': {
        'RMC': fget('spd_over_grnd', MS_PER_KNOT),
        'VTG': fget('spd_over_grnd_kts', MS_PER_KNOT),
    },
    'true_course': {'RMC': fget('true_course'), 'VTG': fget('true_track')},
    'mag_course': {'VTG': fget('mag_track')},
    'mag_variation': {'RMC': get_mag_variation_rmc},
    'mode_indicator': {'GNS': dget('mode_indicator')},
    'num_sats': {'GNS': iget('num_sats', default=0)},
    'pdop': {'GSA': fget('pdop')},
    'hdop': {
        'GGA': fget('horizontal_dil'),
        'GNS': fget('hdop'),
        'GSA': fget('hdop'),
    },
    'vdop': {'GSA': fget('vdop')},
    'geo_sep': {'GNS': fget('geo_sep')},
    'sel_mode': {'GSA': dget('mode')},
    'mode': {'GSA': dget('mode_fix_type')},
}
|
|
|
|
|
|
def msg_get(msgs, var):
    """Extract output variable ``var`` from the collected messages.

    Args:
        msgs: Mapping of sentence type (e.g. ``'RMC'``) to its field dict.
        var: Key into the module-level ``getters`` table.

    Returns:
        The result of the getter for the first sentence type in ``msgs``
        (in iteration order) that ``getters[var]`` has an entry for, or
        ``None`` when no such sentence is present.

    Raises:
        KeyError: If ``var`` is not a key of ``getters``.
    """
    extractors = getters[var]
    for sentence_type, fields in msgs.items():
        if sentence_type in extractors:
            # Only the first matching sentence is consulted.
            return extractors[sentence_type](fields)
    return None
|
|
|
|
|
|
def __get_fields(msg):
    """Map each field's attribute name to its raw data value.

    ``msg.fields`` entries are indexed with ``[1]`` to obtain the name and
    paired positionally with ``msg.data``.  Pairing stops at the shorter of
    the two sequences, so surplus data values or unfilled fields are dropped.
    """
    names = (field[1] for field in msg.fields)
    return dict(zip(names, msg.data))
|
|
|
|
|
|
def parse(nmeas, always_add_prefix=False):
    """Parse a batch of NMEA sentences into one summary dict.

    Args:
        nmeas: String of NMEA sentences separated by ``'\\n'``.  Empty and
            whitespace-only lines are skipped.
        always_add_prefix: When true, PRNs that did not receive a prefix from
            their talker ID get ``'G'`` (PRN <= 32) or ``'R'`` (65..96).

    Returns:
        A dict containing:
          * ``"visibles"``: one dict per satellite listed in GSV sentences
            (``prn``, ``elevation``, ``azimuth``, ``snr``);
          * ``"actives"``: prefixed PRN strings from GSA sentences;
          * one entry per key of ``getters``, resolved through ``msg_get``;
          * ``"datetime"``: combined fix date and time, or ``None``;
          * ``"systime"``: ``datetime.datetime.utcnow()`` taken in this call;
          * ``"fixage"``: seconds between ``systime`` and the last known fix,
            or ``None`` if no fix has been seen yet.

    Side effects:
        Updates the module-level ``lastfix_dt`` whenever this batch yields a
        fix datetime.
    """
    global lastfix_dt
    visibles = []
    actives = []
    msgs = {}

    def fl(s, empty_val=None):
        """Convert ``s`` to float, or return ``empty_val`` if it is empty."""
        return float(s) if s else empty_val

    def add_prn_prefix(prns, talker, always=always_add_prefix):
        """Add constellation prefix to PRN string"""
        beidou_prefix = "C"
        galileo_prefix = "E"
        glonass_prefix = "R"
        gps_prefix = "G"
        qzss_prefix = "J"

        prn = int(prns)
        prefix = ''
        if talker == 'BD':
            prefix = beidou_prefix
        # Galileo PRNs are between 1 and 36
        elif (talker == 'GA') or (talker == 'GN' and prn <= 36):
            prefix = galileo_prefix
        elif talker == 'QZ':
            prefix = qzss_prefix
        elif always:
            # GPS PRNs are between 1 and 32
            if prn <= 32:
                prefix = gps_prefix
            # Glonass PRNs are between 65 and 96
            elif 65 <= prn <= 96:
                prefix = glonass_prefix

        return f"{prefix}{prn:02d}"

    # Strip each line before filtering so that whitespace-only lines (for
    # example a lone '\r' left over from CRLF input) are skipped instead of
    # being handed to the sentence parser.
    lines = (line.strip() for line in nmeas.split('\n'))
    parsed = [pynmea2.parse(line) for line in lines if line]

    for msg in parsed:
        if isinstance(msg, pynmea2.types.GSV):
            # Satellite attributes are probed by index; an index with no
            # matching attribute yields None via the getattr default.
            for n in range(1, (len(msg.data) - 4) // 4 + 1):
                prns = getattr(msg, f'sv_prn_num_{n}', None)
                if prns and prns.isdigit():
                    visibles.append({
                        'prn': add_prn_prefix(prns, msg.talker),
                        'elevation': fl(
                            getattr(msg, f'elevation_deg_{n}', None), 0.0),
                        'azimuth': fl(getattr(msg, f'azimuth_{n}', None), 0.0),
                        'snr': fl(getattr(msg, f'snr_{n}', None), 0.0),
                    })
        elif isinstance(msg, pynmea2.types.GSA):
            for n in range(1, 12 + 1):
                prns = getattr(msg, f'sv_id{n:02d}')
                if prns and prns.isdigit():
                    actives.append(add_prn_prefix(prns, msg.talker))
            # GSA sentences from the 'QZ' talker contribute active PRNs but
            # are not stored as the GSA field source.
            if msg.talker != 'QZ':
                msgs['GSA'] = __get_fields(msg)
        elif isinstance(msg, pynmea2.types.GGA):
            msgs['GGA'] = __get_fields(msg)
        elif isinstance(msg, pynmea2.types.RMC):
            msgs['RMC'] = __get_fields(msg)
        elif isinstance(msg, pynmea2.types.VTG):
            msgs['VTG'] = __get_fields(msg)
        elif isinstance(msg, pynmea2.types.GNS):
            msgs['GNS'] = __get_fields(msg)

    out = {
        "visibles": visibles,
        "actives": actives,
    }
    out.update({k: msg_get(msgs, k) for k in getters})

    # NOTE(review): utcnow() is naive; the subtraction below assumes the fix
    # time produced by pynmea2 is naive as well -- confirm for the pynmea2
    # version in use.
    datenow = datetime.datetime.utcnow()
    fixtime = out.get('fixtime')
    fixdate = out.get('date')
    if fixdate is None and fixtime is not None:
        # We have a fix but no RMC sentence
        fixdate = datenow.date()
    fixdt = (datetime.datetime.combine(fixdate, fixtime)
             if (fixtime and fixdate) else None)
    out["datetime"] = fixdt
    out["systime"] = datenow
    # Remember the latest fix so fixage keeps growing across fix-less batches.
    lastfix_dt = fixdt if fixdt else lastfix_dt
    out["fixage"] = ((datenow - lastfix_dt).total_seconds()
                     if lastfix_dt else None)
    return out
|