]> average.org Git - loctrkd.git/blobdiff - gps303/collector.py
WIP to reorganise to microservices
[loctrkd.git] / gps303 / collector.py
index d8ca86c0791a2b8c0cda3dae053153b649c863ce..7ffa7526cbce36f3ff98d4d88570b97ee8c84e94 100644 (file)
@@ -1,3 +1,5 @@
+""" TCP server that communicates with terminals """
+
 from getopt import getopt
 from logging import getLogger, StreamHandler, DEBUG, INFO
 from logging.handlers import SysLogHandler
@@ -7,7 +9,7 @@ import sys
 import zmq
 
 from .config import readconfig
-from .GT06mod import handle_packet, make_response, LOGIN, set_config
+from .gps303proto import handle_packet, make_response, LOGIN, set_config
 
 CONF = "/etc/gps303.conf"
 
@@ -15,42 +17,74 @@ log = getLogger("gps303/collector")
 
 
 class Bcast:
+    """Zmq message to broadcast what was received from the terminal"""
     def __init__(self, imei, msg):
         self.as_bytes = imei.encode() + msg.encode()
 
 
-class Zmsg:
+class Resp:
+    """Zmq message received from a third party to send to the terminal"""
     def __init__(self, msg):
         self.imei = msg[:16].decode()
         self.payload = msg[16:]
 
 
 class Client:
-    def __init__(self, clntsock, clntaddr):
-        self.clntsock = clntsock
-        self.clntaddr = clntaddr
+    """Connected socket to the terminal plus buffer and metadata"""
+    def __init__(self, sock, addr):
+        self.sock = sock
+        self.addr = addr
         self.buffer = b""
         self.imei = None
 
     def close(self):
-        self.clntsock.close()
+        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
+        self.sock.close()
+        self.buffer = b""
+        self.imei = None
 
     def recv(self):
-        packet = self.clntsock.recv(4096)
-        if not packet:
+        """ Read from the socket and parse complete messages """
+        try:
+            segment = self.sock.recv(4096)
+        except OSError:
+            log.warning("Reading from fd %d (IMEI %s): %s",
+                    self.sock.fileno(), self.imei, e)
+            return None
+        if not segment:  # Terminal has closed connection
+            log.info("EOF reading from fd %d (IMEI %s)",
+                    self.sock.fileno(), self.imei)
             return None
         when = time()
-        self.buffer += packet
-        # implement framing properly
-        msg = handle_packet(packet, self.clntaddr, when)
-        self.buffer = self.buffer[len(packet):]
-        if isinstance(msg, LOGIN):
-            self.imei = msg.imei
-        return msg
+        self.buffer += segment
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
+                        self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+                self.buffer = self.buffer[framestart:]
+            # At this point, buffer starts with a packet
+            frameend = self.buffer.find(b"\r\n", 4)
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            msg = parse_message(self.buffer[:frameend])
+            self.buffer = self.buffer[frameend+2:]
+            if isinstance(msg, LOGIN):
+                self.imei = msg.imei
+                log.info("LOGIN from fd %d: IMEI %s",
+                        self.sock.fileno(), self.imei)
+            msgs.append(msg)
+        return msgs
 
     def send(self, buffer):
-        self.clntsock.send(buffer)
-
+        try:
+            self.sock.send(b"xx" + buffer + b"\r\n")
+        except OSError as e:
+            log.error("Sending to fd %d (IMEI %s): %s",
+                    self.sock.fileno, self.imei, e)
 
 class Clients:
     def __init__(self):
@@ -64,6 +98,7 @@ class Clients:
 
     def stop(self, fd):
         clnt = by_fd[fd]
+        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
         clnt.close()
         if clnt.imei:
             del self.by_imei[clnt.imei]
@@ -71,14 +106,17 @@ class Clients:
 
     def recv(self, fd):
         clnt = by_fd[fd]
-        msg = clnt.recv()
-        if isinstance(msg, LOGIN):
-            self.by_imei[clnt.imei] = clnt
-        return clnt.imei, msg
+        msgs = clnt.recv()
+        result = []
+        for msg in msgs:
+            if isinstance(msg, LOGIN):
+                self.by_imei[clnt.imei] = clnt
+            result.append(clnt.imei, msg)
+        return result
 
-    def response(self, zmsg):
-        if zmsg.imei in self.by_imei:
-            clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
+    def response(self, resp):
+        if resp.imei in self.by_imei:
+            self.by_imei[resp.imei].send(resp.payload)
 
 
 def runserver(opts, conf):
@@ -107,7 +145,7 @@ def runserver(opts, conf):
                     while True:
                         try:
                             msg = zsub.recv(zmq.NOBLOCK)
-                            tosend.append(Zmsg(msg))
+                            tosend.append(Resp(msg))
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
@@ -117,6 +155,7 @@ def runserver(opts, conf):
                     imei, msg = clients.recv(sk)
                     zpub.send(Bcast(imei, msg).as_bytes)
                     if msg is None or isinstance(msg, HIBERNATION):
+                        log.debug("HIBERNATION from fd %d", sk)
                         tostop.append(sk)
             # poll queue consumed, make changes now
             for fd in tostop: