]> average.org Git - loctrkd.git/commitdiff
WIP retoure messaging
authorEugene Crosser <crosser@average.org>
Sun, 8 May 2022 22:46:12 +0000 (00:46 +0200)
committerEugene Crosser <crosser@average.org>
Sun, 8 May 2022 22:46:12 +0000 (00:46 +0200)
gps303/backlog.py
gps303/collector.py
gps303/evstore.py
gps303/gps303proto.py
gps303/lookaside.py
gps303/qry.py
gps303/storage.py
gps303/termconfig.py
gps303/watch.py
gps303/wsgateway.py
gps303/zmsg.py

index 39dfcbcfbb6b2d70ada9f0871e9ae83dbebea8fa..1322286457be68471ee3605d4850429afab904dc 100644 (file)
@@ -3,24 +3,16 @@
 from .opencellid import qry_cell
 from .evstore import initdb, fetch
 from .gps303proto import GPS_POSITIONING, WIFI_POSITIONING, parse_message
-from .zmsg import LocEvt
 
-OCDB = None
 
-def blinit(evdb, ocdb):
-    global OCDB
-    OCDB = ocdb
+def blinit(evdb):
     initdb(evdb)
 
+
 def backlog(imei, backlog):
     result = []
-    for packet in fetch(imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog):
+    for packet in fetch(
+        imei, (GPS_POSITIONING.PROTO, WIFI_POSITIONING.PROTO), backlog
+    ):
         msg = parse_message(packet)
-        if isinstance(msg, GPS_POSITIONING):
-            result.append(LocEvt(devtime=msg.devtime, lon=msg.longitude,
-                lat=msg.latitude, is_gps=True, imei=imei))
-        elif isinstance(msg, WIFI_POSITIONING):
-            lat, lon = qry_cell(OCDB, msg.mcc, msg.gsm_cells)
-            result.append(LocEvt(devtime=msg.devtime, lon=lon,
-                lat=lat, is_gps=False, imei=imei))
     return reversed(result)
index 9f305e55f60197f38d84e820a793192bb93abbc2..e63da322f6ef1fb7c5f5da11d945c610aa135958 100644 (file)
@@ -166,7 +166,16 @@ def runserver(conf):
                     while True:
                         try:
                             msg = zpull.recv(zmq.NOBLOCK)
-                            tosend.append(Resp(msg))
+                            zmsg = Resp(msg)
+                            zpub.send(
+                                Bcast(
+                                    is_incoming=False,
+                                    proto=proto_of_message(zmsg.packet),
+                                    imei=zmsg.imei,
+                                    packet=zmsg.packet,
+                                ).packed
+                            )
+                            tosend.append(zmsg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
index ec463c7c988990e3360cc67d741f87024bedb290..76173624bed9069359f202f46d07e0eba3cc8491 100644 (file)
@@ -31,6 +31,7 @@ def stow(**kwargs):
     parms = {
         k: kwargs[k] if k in kwargs else v
         for k, v in (
+            ("is_incoming", True),
             ("peeraddr", None),
             ("when", 0.0),
             ("imei", None),
@@ -41,9 +42,9 @@ def stow(**kwargs):
     assert len(kwargs) <= len(parms)
     DB.execute(
         """insert or ignore into events
-                (tstamp, imei, peeraddr, proto, packet)
+                (tstamp, imei, peeraddr, proto, packet, is_incoming)
                 values
-                (:when, :imei, :peeraddr, :proto, :packet)
+                (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
         """,
         parms,
     )
index 136f86ba3df3e5a47044bfac09536cd691a1f45e..8f4ffcce124e23fd1a4baeb236e434578cbcae5f 100755 (executable)
@@ -186,10 +186,8 @@ class GPS303Pkt(metaclass=MetaPkt):
         return
 
     def out_decode(self, length, packet):
-        # Necessary to emulate terminal, which is not implemented
-        raise NotImplementedError(
-            self.__class__.__name__ + ".decode() not implemented"
-        )
+        # Overridden in subclasses, otherwise do not decode payload
+        return
 
     def in_encode(self):
         # Necessary to emulate terminal, which is not implemented
@@ -536,12 +534,19 @@ class RESTORE_PASSWORD(GPS303Pkt):
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
     RESPOND = Respond.EXT
-    OUT_KWARGS = (("lat", float, None), ("lon", float, None))
+    OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
 
     def out_encode(self):
-        if self.lat is None or self.lon is None:
+        if self.latitude is None or self.longitude is None:
             return b""
-        return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
+        return "{:+#010.8g},{:+#010.8g}".format(
+            self.latitude, self.longitude
+        ).encode()
+
+    def out_decode(self, length, payload):
+        lat, lon = payload.decode().split(",")
+        self.latitude = float(lat)
+        self.longitude = float(lon)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
@@ -648,12 +653,15 @@ def inline_response(packet):
     return None
 
 
-def parse_message(packet):
+def parse_message(packet, is_incoming=True):
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
     if proto in CLASSES:
-        return CLASSES[proto].In(length, payload)
+        if is_incoming:
+            return CLASSES[proto].In(length, payload)
+        else:
+            return CLASSES[proto].Out(length, payload)
     else:
         retobj = UNKNOWN.In(length, payload)
         retobj.PROTO = proto  # Override class attr with object attr
index ecd5dfa089cfbd03e0c2ff6795779d77b2e476d4..dd5a449bdbcfa771fd5358f25b1d56afd52b386a 100644 (file)
@@ -7,9 +7,9 @@ from struct import pack
 import zmq
 
 from . import common
-from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING
+from .gps303proto import parse_message, WIFI_POSITIONING
 from .opencellid import qry_cell
-from .zmsg import Bcast, Resp
+from .zmsg import Bcast, Resp, topic
 
 log = getLogger("gps303/lookaside")
 
@@ -18,8 +18,8 @@ def runserver(conf):
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
     zsub.connect(conf.get("collector", "publishurl"))
-    topic = pack("B", proto_by_name("WIFI_POSITIONING"))
-    zsub.setsockopt(zmq.SUBSCRIBE, topic)
+    tosub = topic(WIFI_POSITIONING.PROTO)
+    zsub.setsockopt(zmq.SUBSCRIBE, tosub)
     zpush = zctx.socket(zmq.PUSH)
     zpush.connect(conf.get("collector", "listenurl"))
 
@@ -34,21 +34,13 @@ def runserver(conf):
                 datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
                 msg,
             )
-            if not isinstance(msg, WIFI_POSITIONING):
-                log.error(
-                    "IMEI %s from %s at %s: %s",
-                    zmsg.imei,
-                    zmsg.peeraddr,
-                    datetime.fromtimestamp(zmsg.when).astimezone(
-                        tz=timezone.utc
-                    ),
-                    msg,
-                )
-                continue
             lat, lon = qry_cell(
                 conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells
             )
-            resp = Resp(imei=zmsg.imei, packet=msg.Out(lat=lat, lon=lon).packed)
+            resp = Resp(
+                imei=zmsg.imei,
+                packet=msg.Out(latitude=lat, longitude=lon).packed,
+            )
             log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
             zpush.send(resp.packed)
 
index 17228c650b3a67148bfbfb96a26ef6bc4874bcd1..4aeb33c4a14ea3f9c8425ea2312e1d25775e3d43 100644 (file)
@@ -21,6 +21,9 @@ c.execute(
 )
 
 for tstamp, imei, peeraddr, proto, packet in c:
+    if len(packet) >  packet[0] + 1:
+        print("proto", packet[1] , "datalen", len(packet),
+                "msg.length", packet[0], file=sys.stderr)
     msg = parse_message(packet)
     print(
         datetime.fromtimestamp(tstamp)
index 38ffaae863e979eb93fca5e948e516eaa497dae3..5368efc575e9910bae5da767c0d1adcede239624 100644 (file)
@@ -25,7 +25,8 @@ def runserver(conf):
         while True:
             zmsg = Bcast(zsub.recv())
             log.debug(
-                "IMEI %s from %s at %s: %s",
+                "%s IMEI %s from %s at %s: %s",
+                "I" if zmsg.is_incoming else "O",
                 zmsg.imei,
                 zmsg.peeraddr,
                 datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
@@ -37,6 +38,7 @@ def runserver(conf):
             else:
                 peeraddr = None
             stow(
+                is_incoming=zmsg.is_incoming,
                 peeraddr=peeraddr,
                 when=zmsg.when,
                 imei=zmsg.imei,
index 771f988f038dc6785d7d34a5ff1b1164db476beb..f481a84ff514fcd3b2cb475661453178b796b64b 100644 (file)
@@ -7,7 +7,7 @@ import zmq
 
 from . import common
 from .gps303proto import *
-from .zmsg import Bcast, Resp
+from .zmsg import Bcast, Resp, topic
 
 log = getLogger("gps303/termconfig")
 
@@ -22,8 +22,8 @@ def runserver(conf):
         "SETUP",
         "POSITION_UPLOAD_INTERVAL",
     ):
-        topic = pack("B", proto_by_name(protoname))
-        zsub.setsockopt(zmq.SUBSCRIBE, topic)
+        tosub = topic(proto_by_name(protoname))
+        zsub.setsockopt(zmq.SUBSCRIBE, tosub)
     zpush = zctx.socket(zmq.PUSH)
     zpush.connect(conf.get("collector", "listenurl"))
 
index ed51ad19ebea040062d3e3cca1ead844bfb9e5af..2e5b412fb89c87c98d0338ff3069f934f3132896 100644 (file)
@@ -5,6 +5,7 @@ from logging import getLogger
 import zmq
 
 from . import common
+from .gps303proto import parse_message
 from .zmsg import Bcast
 
 log = getLogger("gps303/watch")
@@ -19,8 +20,8 @@ def runserver(conf):
     try:
         while True:
             zmsg = Bcast(zsub.recv())
-            msg = parse_message(zmsg.packet)
-            print(zmsg.imei, msg)
+            msg = parse_message(zmsg.packet, zmsg.is_incoming)
+            print("I" if zmsg.is_incoming else "O", zmsg.imei, msg)
     except KeyboardInterrupt:
         pass
 
index 8a5af7f43ec37c94d1500c9ab3a46bf934991e33..80926fbceb53b381d067eb2560d82cbeb2aa9aad 100644 (file)
@@ -18,7 +18,12 @@ import zmq
 
 from . import common
 from .backlog import blinit, backlog
-from .zmsg import LocEvt
+from .gps303proto import (
+    GPS_POSITIONING,
+    WIFI_POSITIONING,
+    parse_message,
+)
+from .zmsg import Bcast, topic
 
 log = getLogger("gps303/wsgateway")
 htmlfile = None
@@ -157,7 +162,12 @@ class Client:
         return msgs
 
     def wants(self, imei):
-        log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno())
+        log.debug(
+            "wants %s? set is %s on fd %d",
+            imei,
+            self.imeis,
+            self.sock.fileno(),
+        )
         return True  # TODO: check subscriptions
 
     def send(self, message):
@@ -224,11 +234,11 @@ class Clients:
 def runserver(conf):
     global htmlfile
 
-    blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn"))
+    blinit(conf.get("storage", "dbfn"))
     htmlfile = conf.get("wsgateway", "htmlfile")
     zctx = zmq.Context()
     zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("lookaside", "publishurl"))
+    zsub.connect(conf.get("collector", "publishurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setblocking(False)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@@ -245,9 +255,23 @@ def runserver(conf):
         while True:
             neededsubs = clients.subs()
             for imei in neededsubs - activesubs:
-                zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+                zsub.setsockopt(
+                    zmq.SUBSCRIBE,
+                    topic(GPS_POSITIONING.PROTO, True),
+                )
+                zsub.setsockopt(
+                    zmq.SUBSCRIBE,
+                    topic(WIFI_POSITIONING.PROTO, False),
+                )
             for imei in activesubs - neededsubs:
-                zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+                zsub.setsockopt(
+                    zmq.UNSUBSCRIBE,
+                    topic(GPS_POSITIONING.PROTO, True),
+                )
+                zsub.setsockopt(
+                    zmq.UNSUBSCRIBE,
+                    topic(WIFI_POSITIONING.PROTO, False),
+                )
             activesubs = neededsubs
             log.debug("Subscribed to: %s", activesubs)
             tosend = []
@@ -259,8 +283,10 @@ def runserver(conf):
                 if sk is zsub:
                     while True:
                         try:
-                            zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
+                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
+                            msg = parse_message(zmsg.packet)
                             tosend.append(zmsg)
+                            log.debug("Got %s", zmsg)
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
index 211011392a296f200cc6cbce99188ceda20c1f44..8179616485f0d9f5b89148e4dcf12d21df4294ae 100644 (file)
@@ -3,12 +3,16 @@
 import ipaddress as ip
 from struct import pack, unpack
 
-__all__ = "Bcast", "Resp"
+__all__ = "Bcast", "Resp", "topic"
 
 
 def pack_peer(peeraddr):
     try:
-        saddr, port, _x, _y = peeraddr
+        if peeraddr is None:
+            saddr = "::"
+            port = 0
+        else:
+            saddr, port, _x, _y = peeraddr
         addr = ip.ip_address(saddr)
     except ValueError:
         saddr, port = peeraddr
@@ -75,10 +79,17 @@ class _Zmsg:
         )
 
 
+def topic(proto, is_incoming=True, imei=None):
+    return (
+        pack("BB", is_incoming, proto) + b"" if imei is None else imei.encode()
+    )
+
+
 class Bcast(_Zmsg):
     """Zmq message to broadcast what was received from the terminal"""
 
     KWARGS = (
+        ("is_incoming", True),
         ("proto", 256),
         ("imei", None),
         ("when", None),
@@ -89,7 +100,7 @@ class Bcast(_Zmsg):
     @property
     def packed(self):
         return (
-            pack("B", self.proto)
+            pack("BB", int(self.is_incoming), self.proto)
             + ("0000000000000000" if self.imei is None else self.imei).encode()
             + (
                 b"\0\0\0\0\0\0\0\0"
@@ -101,26 +112,32 @@ class Bcast(_Zmsg):
         )
 
     def decode(self, buffer):
-        self.proto = buffer[0]
-        self.imei = buffer[1:17].decode()
+        self.is_incoming = bool(buffer[0])
+        self.proto = buffer[1]
+        self.imei = buffer[2:18].decode()
         if self.imei == "0000000000000000":
             self.imei = None
-        self.when = unpack("!d", buffer[17:25])[0]
-        self.peeraddr = unpack_peer(buffer[25:43])
-        self.packet = buffer[43:]
+        self.when = unpack("!d", buffer[18:26])[0]
+        self.peeraddr = unpack_peer(buffer[26:44])
+        self.packet = buffer[44:]
 
 
 class Resp(_Zmsg):
     """Zmq message received from a third party to send to the terminal"""
 
-    KWARGS = (("imei", None), ("packet", b""))
+    KWARGS = (("imei", None), ("when", None), ("packet", b""))
 
     @property
     def packed(self):
         return (
             "0000000000000000" if self.imei is None else self.imei.encode()
-        ) + self.packet
+        ) + (
+                b"\0\0\0\0\0\0\0\0"
+                if self.when is None
+                else pack("!d", self.when)
+            ) + self.packet
 
     def decode(self, buffer):
         self.imei = buffer[:16].decode()
-        self.packet = buffer[16:]
+        self.when = unpack("!d", buffer[16:24])[0]
+        self.packet = buffer[24:]