from .opencellid import qry_cell
from .evstore import initdb, fetch
from .gps303proto import GPS_POSITIONING, WIFI_POSITIONING, parse_message
-from .zmsg import LocEvt
-OCDB = None
-def blinit(evdb, ocdb):
- global OCDB
- OCDB = ocdb
+def blinit(evdb):
initdb(evdb)
+
def backlog(imei, backlog):
result = []
- for packet in fetch(imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog):
+ for packet in fetch(
+ imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog
+ ):
msg = parse_message(packet)
- if isinstance(msg, GPS_POSITIONING):
- result.append(LocEvt(devtime=msg.devtime, lon=msg.longitude,
- lat=msg.latitude, is_gps=True, imei=imei))
- elif isinstance(msg, WIFI_POSITIONING):
- lat, lon = qry_cell(OCDB, msg.mcc, msg.gsm_cells)
- result.append(LocEvt(devtime=msg.devtime, lon=lon,
- lat=lat, is_gps=False, imei=imei))
return reversed(result)
while True:
try:
msg = zpull.recv(zmq.NOBLOCK)
- tosend.append(Resp(msg))
+ zmsg = Resp(msg)
+ zpub.send(
+ Bcast(
+ is_incoming=False,
+ proto=proto_of_message(zmsg.packet),
+ imei=zmsg.imei,
+ packet=zmsg.packet,
+ ).packed
+ )
+ tosend.append(zmsg)
except zmq.Again:
break
elif sk == tcpfd:
parms = {
k: kwargs[k] if k in kwargs else v
for k, v in (
+ ("is_incoming", True),
("peeraddr", None),
("when", 0.0),
("imei", None),
assert len(kwargs) <= len(parms)
DB.execute(
"""insert or ignore into events
- (tstamp, imei, peeraddr, proto, packet)
+ (tstamp, imei, peeraddr, proto, packet, is_incoming)
values
- (:when, :imei, :peeraddr, :proto, :packet)
+ (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
""",
parms,
)
return
def out_decode(self, length, packet):
- # Necessary to emulate terminal, which is not implemented
- raise NotImplementedError(
- self.__class__.__name__ + ".decode() not implemented"
- )
+ # Overridden in subclasses, otherwise do not decode payload
+ return
def in_encode(self):
# Necessary to emulate terminal, which is not implemented
class WIFI_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x69
RESPOND = Respond.EXT
- OUT_KWARGS = (("lat", float, None), ("lon", float, None))
+ OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
def out_encode(self):
- if self.lat is None or self.lon is None:
+ if self.latitude is None or self.longitude is None:
return b""
- return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
+ return "{:+#010.8g},{:+#010.8g}".format(
+ self.latitude, self.longitude
+ ).encode()
+
+ def out_decode(self, length, payload):
+ lat, lon = payload.decode().split(",")
+ self.latitude = float(lat)
+ self.longitude = float(lon)
class MANUAL_POSITIONING(GPS303Pkt):
return None
-def parse_message(packet):
+def parse_message(packet, is_incoming=True):
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])
payload = packet[2:]
if proto in CLASSES:
- return CLASSES[proto].In(length, payload)
+ if is_incoming:
+ return CLASSES[proto].In(length, payload)
+ else:
+ return CLASSES[proto].Out(length, payload)
else:
retobj = UNKNOWN.In(length, payload)
retobj.PROTO = proto # Override class attr with object attr
import zmq
from . import common
-from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING
+from .gps303proto import parse_message, WIFI_POSITIONING
from .opencellid import qry_cell
-from .zmsg import Bcast, Resp
+from .zmsg import Bcast, Resp, topic
log = getLogger("gps303/lookaside")
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
zsub.connect(conf.get("collector", "publishurl"))
- topic = pack("B", proto_by_name("WIFI_POSITIONING"))
- zsub.setsockopt(zmq.SUBSCRIBE, topic)
+ tosub = topic(WIFI_POSITIONING.PROTO)
+ zsub.setsockopt(zmq.SUBSCRIBE, tosub)
zpush = zctx.socket(zmq.PUSH)
zpush.connect(conf.get("collector", "listenurl"))
datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
msg,
)
- if not isinstance(msg, WIFI_POSITIONING):
- log.error(
- "IMEI %s from %s at %s: %s",
- zmsg.imei,
- zmsg.peeraddr,
- datetime.fromtimestamp(zmsg.when).astimezone(
- tz=timezone.utc
- ),
- msg,
- )
- continue
lat, lon = qry_cell(
conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells
)
- resp = Resp(imei=zmsg.imei, packet=msg.Out(lat=lat, lon=lon).packed)
+ resp = Resp(
+ imei=zmsg.imei,
+ packet=msg.Out(latitude=lat, longitude=lon).packed,
+ )
log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
zpush.send(resp.packed)
)
for tstamp, imei, peeraddr, proto, packet in c:
+ if len(packet) > packet[0] + 1:
+ print("proto", packet[1] , "datalen", len(packet),
+ "msg.length", packet[0], file=sys.stderr)
msg = parse_message(packet)
print(
datetime.fromtimestamp(tstamp)
while True:
zmsg = Bcast(zsub.recv())
log.debug(
- "IMEI %s from %s at %s: %s",
+ "%s IMEI %s from %s at %s: %s",
+ "I" if zmsg.is_incoming else "O",
zmsg.imei,
zmsg.peeraddr,
datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
else:
peeraddr = None
stow(
+ is_incoming=zmsg.is_incoming,
peeraddr=peeraddr,
when=zmsg.when,
imei=zmsg.imei,
from . import common
from .gps303proto import *
-from .zmsg import Bcast, Resp
+from .zmsg import Bcast, Resp, topic
log = getLogger("gps303/termconfig")
"SETUP",
"POSITION_UPLOAD_INTERVAL",
):
- topic = pack("B", proto_by_name(protoname))
- zsub.setsockopt(zmq.SUBSCRIBE, topic)
+ tosub = topic(proto_by_name(protoname))
+ zsub.setsockopt(zmq.SUBSCRIBE, tosub)
zpush = zctx.socket(zmq.PUSH)
zpush.connect(conf.get("collector", "listenurl"))
import zmq
from . import common
+from .gps303proto import parse_message
from .zmsg import Bcast
log = getLogger("gps303/watch")
try:
while True:
zmsg = Bcast(zsub.recv())
- msg = parse_message(zmsg.packet)
- print(zmsg.imei, msg)
+ msg = parse_message(zmsg.packet, zmsg.is_incoming)
+ print("I" if zmsg.is_incoming else "O", zmsg.imei, msg)
except KeyboardInterrupt:
pass
from . import common
from .backlog import blinit, backlog
-from .zmsg import LocEvt
+from .gps303proto import (
+ GPS_POSITIONING,
+ WIFI_POSITIONING,
+ parse_message,
+)
+from .zmsg import Bcast, topic
log = getLogger("gps303/wsgateway")
htmlfile = None
return msgs
def wants(self, imei):
- log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno())
+ log.debug(
+ "wants %s? set is %s on fd %d",
+ imei,
+ self.imeis,
+ self.sock.fileno(),
+ )
return True # TODO: check subscriptions
def send(self, message):
def runserver(conf):
global htmlfile
- blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn"))
+ blinit(conf.get("storage", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile")
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
- zsub.connect(conf.get("lookaside", "publishurl"))
+ zsub.connect(conf.get("collector", "publishurl"))
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setblocking(False)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
while True:
neededsubs = clients.subs()
for imei in neededsubs - activesubs:
- zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(GPS_POSITIONING.PROTO, True),
+ )
+ zsub.setsockopt(
+ zmq.SUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False),
+ )
for imei in activesubs - neededsubs:
- zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(GPS_POSITIONING.PROTO, True),
+ )
+ zsub.setsockopt(
+ zmq.UNSUBSCRIBE,
+ topic(WIFI_POSITIONING.PROTO, False),
+ )
activesubs = neededsubs
log.debug("Subscribed to: %s", activesubs)
tosend = []
if sk is zsub:
while True:
try:
- zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
+ zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
+ msg = parse_message(zmsg.packet)
tosend.append(zmsg)
+ log.debug("Got %s", zmsg)
except zmq.Again:
break
elif sk == tcpfd:
import ipaddress as ip
from struct import pack, unpack
-__all__ = "Bcast", "Resp"
+__all__ = "Bcast", "Resp", "topic"
def pack_peer(peeraddr):
try:
- saddr, port, _x, _y = peeraddr
+ if peeraddr is None:
+ saddr = "::"
+ port = 0
+ else:
+ saddr, port, _x, _y = peeraddr
addr = ip.ip_address(saddr)
except ValueError:
saddr, port = peeraddr
)
+def topic(proto, is_incoming=True, imei=None):
+ return (
+ pack("BB", is_incoming, proto) + b"" if imei is None else imei.encode()
+ )
+
+
class Bcast(_Zmsg):
"""Zmq message to broadcast what was received from the terminal"""
KWARGS = (
+ ("is_incoming", True),
("proto", 256),
("imei", None),
("when", None),
@property
def packed(self):
return (
- pack("B", self.proto)
+ pack("BB", int(self.is_incoming), self.proto)
+ ("0000000000000000" if self.imei is None else self.imei).encode()
+ (
b"\0\0\0\0\0\0\0\0"
)
def decode(self, buffer):
- self.proto = buffer[0]
- self.imei = buffer[1:17].decode()
+ self.is_incoming = bool(buffer[0])
+ self.proto = buffer[1]
+ self.imei = buffer[2:18].decode()
if self.imei == "0000000000000000":
self.imei = None
- self.when = unpack("!d", buffer[17:25])[0]
- self.peeraddr = unpack_peer(buffer[25:43])
- self.packet = buffer[43:]
+ self.when = unpack("!d", buffer[18:26])[0]
+ self.peeraddr = unpack_peer(buffer[26:44])
+ self.packet = buffer[44:]
class Resp(_Zmsg):
"""Zmq message received from a third party to send to the terminal"""
- KWARGS = (("imei", None), ("packet", b""))
+ KWARGS = (("imei", None), ("when", None), ("packet", b""))
@property
def packed(self):
return (
"0000000000000000" if self.imei is None else self.imei.encode()
- ) + self.packet
+ ) + (
+ b"\0\0\0\0\0\0\0\0"
+ if self.when is None
+ else pack("!d", self.when)
+ ) + self.packet
def decode(self, buffer):
self.imei = buffer[:16].decode()
- self.packet = buffer[16:]
+ self.when = unpack("!d", buffer[16:24])[0]
+ self.packet = buffer[24:]