X-Git-Url: http://average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=be1bdeccc609c35b85bea167ceda15fed65bb6b9;hb=3dea189c7bb47f02db07b52fdcda53fdb986fd2b;hp=08178f272bd57d4d1cd102cbd6c4de45c66fb6fc;hpb=c772f9219b7a90be63174ef8e66c2f1249200c85;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 08178f2..be1bdec 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,40 +1,24 @@ """ TCP server that communicates with terminals """ from logging import getLogger -from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time from struct import pack import zmq from . import common -from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message +from .gps303proto import ( + HIBERNATION, + LOGIN, + inline_response, + parse_message, + proto_of_message, +) +from .zmsg import Bcast, Resp log = getLogger("gps303/collector") -class Bcast: - """Zmq message to broadcast what was received from the terminal""" - - def __init__(self, imei, msg): - self.as_bytes = ( - pack("B", proto_of_message(msg)) - + ("0000000000000000" if imei is None else imei).encode() - + msg - ) - - -class Resp: - """Zmq message received from a third party to send to the terminal""" - - def __init__(self, *args, **kwargs): - if not kwargs and len(args) == 1 and isinstance(args[0], bytes): - self.imei = msg[:16].decode() - self.payload = msg[16:] - elif len(args) == 0: - self.imei = kwargs["imei"] - self.payload = kwargs["payload"] - - class Client: """Connected socket to the terminal plus buffer and metadata""" @@ -95,7 +79,7 @@ class Client: log.info( "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei ) - msgs.append(packet) + msgs.append((when, self.addr, packet)) return msgs def send(self, buffer): @@ -134,15 +118,15 @@ class Clients: if msgs is None: return None result = [] - for msg in msgs: - if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... + for when, peeraddr, packet in msgs: + if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... self.by_imei[clnt.imei] = clnt - result.append((clnt.imei, msg)) + result.append((clnt.imei, when, peeraddr, packet)) return result def response(self, resp): if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.payload) + self.by_imei[resp.imei].send(resp.packet) def runserver(conf): @@ -151,7 +135,7 @@ def runserver(conf): zpub.bind(conf.get("collector", "publishurl")) zsub = zctx.socket(zmq.SUB) zsub.connect(conf.get("collector", "listenurl")) - tcpl = socket(AF_INET, SOCK_STREAM) + tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) @@ -178,18 +162,36 @@ def runserver(conf): clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) else: - for imei, msg in clients.recv(sk): - zpub.send(Bcast(imei, msg).as_bytes) - if ( - msg is None - or proto_of_message(msg) == HIBERNATION.PROTO - ): - log.debug( - "HIBERNATION from fd %d (IMEI %s)", sk, imei + received = clients.recv(sk) + if received is None: + log.debug( + "Terminal gone from fd %d (IMEI %s)", sk, imei + ) + tostop.append(sk) + else: + for imei, when, peeraddr, packet in received: + proto = proto_of_message(packet) + zpub.send( + Bcast( + proto=proto, + imei=imei, + when=when, + peeraddr=peeraddr, + packet=packet, + ).packed ) - tostop.append(sk) - elif proto_of_message(msg) == LOGIN.PROTO: - clients.response(Resp(imei=imei, payload=LOGIN.response())) + if proto == HIBERNATION.PROTO: + log.debug( + "HIBERNATION from fd %d (IMEI %s)", + sk, + imei, + ) + tostop.append(sk) + respmsg = inline_response(packet) + if respmsg is not None: + clients.response( + Resp(imei=imei, packet=respmsg) + ) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd)