]> average.org Git - loctrkd.git/commitdiff
the whole shebang is working now
authorEugene Crosser <crosser@average.org>
Thu, 21 Apr 2022 22:58:15 +0000 (00:58 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 21 Apr 2022 22:58:15 +0000 (00:58 +0200)
gps303/collector.py
gps303/gps303proto.py
gps303/lookaside.py
gps303/termconfig.py [new file with mode: 0644]
gps303/zmsg.py

index be1bdeccc609c35b85bea167ceda15fed65bb6b9..921833c083be0ac8aecda838a338a4f72b5f818c 100644 (file)
@@ -32,7 +32,6 @@ class Client:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
-        self.imei = None
 
     def recv(self):
         """Read from the socket and parse complete messages"""
@@ -127,21 +126,23 @@ class Clients:
     def response(self, resp):
         if resp.imei in self.by_imei:
             self.by_imei[resp.imei].send(resp.packet)
+        else:
+            log.info("Not connected (IMEI %s)", resp.imei)
 
 
 def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
     zpub.bind(conf.get("collector", "publishurl"))
-    zsub = zctx.socket(zmq.SUB)
-    zsub.connect(conf.get("collector", "listenurl"))
+    zpull = zctx.socket(zmq.PULL)
+    zpull.bind(conf.get("collector", "listenurl"))
     tcpl = socket(AF_INET6, SOCK_STREAM)
     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     tcpl.bind(("", conf.getint("collector", "port")))
     tcpl.listen(5)
     tcpfd = tcpl.fileno()
     poller = zmq.Poller()
-    poller.register(zsub, flags=zmq.POLLIN)
+    poller.register(zpull, flags=zmq.POLLIN)
     poller.register(tcpfd, flags=zmq.POLLIN)
     clients = Clients()
     try:
@@ -149,19 +150,19 @@ def runserver(conf):
             tosend = []
             topoll = []
             tostop = []
-            events = poller.poll(10)
+            events = poller.poll(1000)
             for sk, fl in events:
-                if sk is zsub:
+                if sk is zpull:
                     while True:
                         try:
-                            msg = zsub.recv(zmq.NOBLOCK)
+                            msg = zpull.recv(zmq.NOBLOCK)
                             tosend.append(Resp(msg))
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
                     clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
-                else:
+                elif fl & zmq.POLLIN:
                     received = clients.recv(sk)
                     if received is None:
                         log.debug(
@@ -192,15 +193,18 @@ def runserver(conf):
                                 clients.response(
                                     Resp(imei=imei, packet=respmsg)
                                 )
+                else:
+                    log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for fd in tostop:
                 poller.unregister(fd)
                 clients.stop(fd)
             for zmsg in tosend:
+                log.debug("Sending to the client: %s", zmsg)
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                poller.register(fd)
+                poller.register(fd, flags=zmq.POLLIN)
     except KeyboardInterrupt:
         pass
 
index 445c34ac21d1e7c3a9fc86637bec0398280a55f6..32aae55970b134409342ad766dfdf17710a7251e 100755 (executable)
@@ -58,6 +58,7 @@ log = getLogger("gps303")
 
 class GPS303Pkt:
     PROTO: int
+    INLINE = True
 
     def __init__(self, *args, **kwargs):
         assert len(args) == 0
@@ -96,15 +97,15 @@ class GPS303Pkt:
 
     @classmethod
     def inline_response(cls, packet):
-        return cls.make_packet(b"")
+        if cls.INLINE:
+            return cls.make_packet(b"")
+        else:
+            return None
 
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
-
-    @classmethod
-    def inline_response(cls, packet):
-        return None
+    INLINE = False
 
 
 class LOGIN(GPS303Pkt):
@@ -117,18 +118,16 @@ class LOGIN(GPS303Pkt):
         self.ver = unpack("B", payload[-1:])[0]
         return self
 
-    def response(self):
-        return self.make_packet(b"")
-
 
 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
     PROTO = 0x05
+    INLINE = False
 
     def response(self, supnum=0):
         # 1: The device automatically answers Pickup effect
         # 2: Automatically Answering Two-way Calls
         # 3: Ring manually answer the two-way call
-        return super().response(b"")
+        return self.make_packet(pack("B", supnum))
 
 
 class HEARTBEAT(GPS303Pkt):
@@ -163,9 +162,6 @@ class _GPS_POSITIONING(GPS303Pkt):
     def inline_response(cls, packet):
         return cls.make_packet(packet[2:8])
 
-    def response(self):
-        return self.make_packet(self.dtime)
-
 
 class GPS_POSITIONING(_GPS_POSITIONING):
     PROTO = 0x10
@@ -177,6 +173,7 @@ class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
 
 class STATUS(GPS303Pkt):
     PROTO = 0x13
+    INLINE = False
 
     @classmethod
     def from_packet(cls, length, payload):
@@ -196,12 +193,8 @@ class STATUS(GPS303Pkt):
             self.signal = None
         return self
 
-    @classmethod
-    def inline_response(cls, packet):
-        return None
-
     def response(self, upload_interval=25):  # Set interval in minutes
-        return super().response(pack("B", upload_interval))
+        return self.make_packet(pack("B", upload_interval))
 
 
 class HIBERNATION(GPS303Pkt):
@@ -210,6 +203,7 @@ class HIBERNATION(GPS303Pkt):
 
 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
     PROTO = 0x15
+    INLINE = False
 
     def response(self):  # Server can send to initiate factory reset
         return self.make_packet(b"")
@@ -217,9 +211,10 @@ class RESET(GPS303Pkt):  # Device sends when it got reset SMS
 
 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
     PROTO = 0x16
+    INLINE = False
 
     def response(self, number=3):  # Number of whitelist entries
-        return super().response(pack("B", number))
+        return self.make_packet(pack("B", number))
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
@@ -258,9 +253,6 @@ class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
     def inline_response(cls, packet):
         return cls.make_packet(packet[2:8])
 
-    def response(self):
-        return super().response(self.dtime)
-
 
 class TIME(GPS303Pkt):
     PROTO = 0x30
@@ -271,16 +263,13 @@ class TIME(GPS303Pkt):
             "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
         )
 
-    def response(self):
-        payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
-        return super().response(payload)
-
 
 class PROHIBIT_LBS(GPS303Pkt):
     PROTO = 0x33
+    INLINE = False
 
     def response(self, status=1):  # Server sent, 0-off, 1-on
-        return super().response(pack("B", status))
+        return self.make_packet(pack("B", status))
 
 
 class MOM_PHONE(GPS303Pkt):
@@ -290,9 +279,6 @@ class MOM_PHONE(GPS303Pkt):
 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
     PROTO = 0x44
 
-    def response(self):
-        return super().response(b"")
-
 
 class STOP_ALARM(GPS303Pkt):
     PROTO = 0x56
@@ -300,6 +286,7 @@ class STOP_ALARM(GPS303Pkt):
 
 class SETUP(GPS303Pkt):
     PROTO = 0x57
+    INLINE = False
 
     def response(
         self,
@@ -333,7 +320,7 @@ class SETUP(GPS303Pkt):
             ]
             + [b";".join([el.encode() for el in phoneNumbers])]
         )
-        return super().response(payload)
+        return self.make_packet(payload)
 
 
 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
@@ -346,10 +333,7 @@ class RESTORE_PASSWORD(GPS303Pkt):
 
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
-
-    @classmethod
-    def inline_response(cls, packet):
-        return None
+    INLINE = False
 
     def response(self, lat=None, lon=None):
         if lat is None or lon is None:
@@ -383,6 +367,7 @@ class VIBRATION_RECEIVED(GPS303Pkt):
 
 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
     PROTO = 0x98
+    INLINE = False
 
     @classmethod
     def from_packet(cls, length, payload):
@@ -390,12 +375,8 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
         self.interval = unpack("!H", payload[:2])
         return self
 
-    @classmethod
-    def inline_response(cls, packet):
-        return cls.make_packet(packet[2:4])
-
-    def response(self):
-        return self.make_packet(pack("!H", self.interval))
+    def response(self, interval=10):
+        return self.make_packet(pack("!H", interval))
 
 
 class SOS_ALARM(GPS303Pkt):
index d9af989aff2a3b58711938bbb548053355fdd03a..05b7a49a62a7240bfc39477d10a4deab97422021 100644 (file)
@@ -1,13 +1,59 @@
-"""
-For when responding to the terminal is not trivial
-"""
+""" Estimate coordinates from WIFI_POSITIONING and send back """
 
-from .gps303proto import *
+from datetime import datetime, timezone
+from logging import getLogger
+from struct import pack
+import zmq
+
+from . import common
+from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING
 from .opencellid import qry_cell
+from .zmsg import Bcast, Resp
+
+log = getLogger("gps303/lookaside")
+
+
+def runserver(conf):
+    zctx = zmq.Context()
+    zsub = zctx.socket(zmq.SUB)
+    zsub.connect(conf.get("collector", "publishurl"))
+    topic = pack("B", proto_by_name("WIFI_POSITIONING"))
+    zsub.setsockopt(zmq.SUBSCRIBE, topic)
+    zpush = zctx.socket(zmq.PUSH)
+    zpush.connect(conf.get("collector", "listenurl"))
+
+    try:
+        while True:
+            zmsg = Bcast(zsub.recv())
+            msg = parse_message(zmsg.packet)
+            log.debug(
+                "IMEI %s from %s at %s: %s",
+                zmsg.imei,
+                zmsg.peeraddr,
+                datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
+                msg,
+            )
+            if not isinstance(msg, WIFI_POSITIONING):
+                log.error(
+                    "IMEI %s from %s at %s: %s",
+                    zmsg.imei,
+                    zmsg.peeraddr,
+                    datetime.fromtimestamp(zmsg.when).astimezone(
+                        tz=timezone.utc
+                    ),
+                    msg,
+                )
+                continue
+            lat, lon = qry_cell(
+                conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells
+            )
+            resp = Resp(imei=zmsg.imei, packet=msg.response(lat=lat, lon=lon))
+            log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
+            zpush.send(resp.packed)
+
+    except KeyboardInterrupt:
+        pass
+
 
-def prepare_response(conf, msg):
-    if isinstance(msg, WIFI_POSITIONING):
-        lat, lon = qry_cell(conf["opencellid"]["dbfn"],
-                msg.mcc, msg.gsm_cells)
-        return {"lat": lat, "lon": lon}
-    return {}
+if __name__.endswith("__main__"):
+    runserver(common.init(log))
diff --git a/gps303/termconfig.py b/gps303/termconfig.py
new file mode 100644 (file)
index 0000000..10c6286
--- /dev/null
@@ -0,0 +1,54 @@
+""" For when responding to the terminal is not trivial """
+
+from datetime import datetime, timezone
+from logging import getLogger
+from struct import pack
+import zmq
+
+from . import common
+from .gps303proto import parse_message, proto_by_name
+from .zmsg import Bcast, Resp
+
+log = getLogger("gps303/termconfig")
+
+
+def runserver(conf):
+    zctx = zmq.Context()
+    zsub = zctx.socket(zmq.SUB)
+    zsub.connect(conf.get("collector", "publishurl"))
+    for protoname in (
+        "SUPERVISION",
+        "STATUS",
+        "RESET",
+        "WHITELIST_TOTAL",
+        "PROHIBIT_LBS",
+        "SETUP",
+        "POSITION_UPLOAD_INTERVAL",
+    ):
+        topic = pack("B", proto_by_name(protoname))
+        zsub.setsockopt(zmq.SUBSCRIBE, topic)
+    zpush = zctx.socket(zmq.PUSH)
+    zpush.connect(conf.get("collector", "listenurl"))
+
+    try:
+        while True:
+            zmsg = Bcast(zsub.recv())
+            msg = parse_message(zmsg.packet)
+            log.debug(
+                "IMEI %s from %s at %s: %s",
+                zmsg.imei,
+                zmsg.peeraddr,
+                datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
+                msg,
+            )
+            # TODO get data from the config
+            resp = Resp(imei=zmsg.imei, packet=msg.response())
+            log.debug("Response: %s", resp)
+            zpush.send(resp.packed)
+
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__.endswith("__main__"):
+    runserver(common.init(log))
index 7ab3ce4caf755271bd91531cc81f9103240d8b35..6e591ab65c2fb9a8c4ec7ff7a9bcab8d69e7de08 100644 (file)
@@ -5,13 +5,19 @@ from struct import pack, unpack
 
 __all__ = "Bcast", "Resp"
 
+
 def pack_peer(peeraddr):
     saddr, port, _x, _y = peeraddr
     addr6 = ip.ip_address(saddr)
     addr = addr6.ipv4_mapped
     if addr is None:
         addr = addr6
-    return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
+    return (
+        pack("B", addr.version)
+        + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16]
+        + pack("!H", port)
+    )
+
 
 def unpack_peer(buffer):
     version = buffer[0]
@@ -41,6 +47,22 @@ class _Zmsg:
                 + str(kwargs)
             )
 
+    def __repr__(self):
+        return "{}({})".format(
+            self.__class__.__name__,
+            ", ".join(
+                [
+                    "{}={}".format(
+                        k,
+                        'bytes.fromhex("{}")'.format(getattr(self, k).hex())
+                        if isinstance(getattr(self, k), bytes)
+                        else getattr(self, k),
+                    )
+                    for k, _ in self.KWARGS
+                ]
+            ),
+        )
+
     def decode(self, buffer):
         raise RuntimeError(
             self.__class__.__name__ + "must implement `encode()` method"
@@ -69,7 +91,11 @@ class Bcast(_Zmsg):
         return (
             pack("B", self.proto)
             + ("0000000000000000" if self.imei is None else self.imei).encode()
-            + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
+            + (
+                b"\0\0\0\0\0\0\0\0"
+                if self.when is None
+                else pack("!d", self.when)
+            )
             + pack_peer(self.peeraddr)
             + self.packet
         )