parse_message,
proto_of_message,
)
+from .zmsg import Bcast, Resp
log = getLogger("gps303/collector")
-class Bcast:
- """Zmq message to broadcast what was received from the terminal"""
-
- def __init__(self, imei, msg):
- self.as_bytes = (
- pack("B", proto_of_message(msg))
- + ("0000000000000000" if imei is None else imei).encode()
- + msg
- )
-
-
-class Resp:
- """Zmq message received from a third party to send to the terminal"""
-
- def __init__(self, *args, **kwargs):
- if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
- self.imei = msg[:16].decode()
- self.payload = msg[16:]
- elif len(args) == 0:
- self.imei = kwargs["imei"]
- self.payload = kwargs["payload"]
-
-
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
log.info(
"LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
)
- msgs.append(packet)
+ msgs.append((when, self.addr, packet))
return msgs
def send(self, buffer):
if msgs is None:
return None
result = []
- for msg in msgs:
- if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly...
+ for when, peeraddr, packet in msgs:
+ if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly...
self.by_imei[clnt.imei] = clnt
- result.append((clnt.imei, msg))
+ result.append((clnt.imei, when, peeraddr, packet))
return result
def response(self, resp):
if resp.imei in self.by_imei:
- self.by_imei[resp.imei].send(resp.payload)
+ self.by_imei[resp.imei].send(resp.packet)
def runserver(conf):
)
tostop.append(sk)
else:
- for imei, msg in received:
- zpub.send(Bcast(imei, msg).as_bytes)
- if proto_of_message(msg) == HIBERNATION.PROTO:
+ for imei, when, peeraddr, packet in received:
+ proto = proto_of_message(packet)
+ zpub.send(
+ Bcast(
+ proto=proto,
+ imei=imei,
+ when=when,
+ peeraddr=peeraddr,
+ packet=packet,
+ ).packed
+ )
+ if proto == HIBERNATION.PROTO:
log.debug(
"HIBERNATION from fd %d (IMEI %s)",
sk,
imei,
)
tostop.append(sk)
- respmsg = inline_response(msg)
+ respmsg = inline_response(packet)
if respmsg is not None:
clients.response(
- Resp(imei=imei, payload=respmsg)
+ Resp(imei=imei, packet=respmsg)
)
# poll queue consumed, make changes now
for fd in tostop:
""" Store zmq broadcasts to sqlite """
+from datetime import datetime, timezone
from getopt import getopt
from logging import getLogger
from logging.handlers import SysLogHandler
from . import common
from .evstore import initdb, stow
from .gps303proto import parse_message
+from .zmsg import Bcast
log = getLogger("gps303/storage")
+
def runserver(conf):
dbname = conf.get("storage", "dbfn")
log.info('Using Sqlite3 database "%s"', dbname)
try:
while True:
- zmsg = zsub.recv()
- imei = zmsg[1:17].decode()
- packet = zmsg[17:]
- msg = parse_message(packet)
- log.debug("From IMEI %s: %s", imei, msg)
- stow("", time(), imei, msg.length, msg.PROTO, msg.payload)
+ zmsg = Bcast(zsub.recv())
+ msg = parse_message(zmsg.packet)
+ log.debug("IMEI %s from %s at %s: %s", zmsg.imei, zmsg.peeraddr, datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), msg)
+ stow(
+ zmsg.peeraddr,
+ zmsg.when,
+ zmsg.imei,
+ msg.length,
+ msg.PROTO,
+ msg.payload,
+ )
except KeyboardInterrupt:
pass
--- /dev/null
+""" Zeromq messages """
+
+import ipaddress as ip
+from struct import pack, unpack
+
+__all__ = "Bcast", "Resp"
+
+def pack_peer(peeraddr):
+ saddr, port = peeraddr
+ addr = ip.ip_address(saddr)
+ return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
+
+def unpack_peer(buffer):
+ version = buffer[0]
+ if version not in (4, 6):
+ return None
+ if version == 4:
+ addr = ip.IPv4Address(buffer[1:5])
+ else:
+ addr = ip.IPv6Address(buffer[1:17])
+ port = unpack("!H", buffer[17:19])[0]
+ return (addr, port)
+
+
+class _Zmsg:
+ def __init__(self, *args, **kwargs):
+ if len(args) == 1:
+ self.decode(args[0])
+ elif bool(kwargs):
+ for k, v in self.KWARGS:
+ setattr(self, k, kwargs.get(k, v))
+ else:
+ raise RuntimeError(
+ self.__class__.__name__
+ + ": both args "
+ + str(args)
+ + " and kwargs "
+ + str(kwargs)
+ )
+
+ def decode(self, buffer):
+ raise RuntimeError(
+ self.__class__.__name__ + "must implement `encode()` method"
+ )
+
+ @property
+ def packed(self):
+ raise RuntimeError(
+ self.__class__.__name__ + "must implement `encode()` method"
+ )
+
+
+class Bcast(_Zmsg):
+ """Zmq message to broadcast what was received from the terminal"""
+
+ KWARGS = (
+ ("proto", 256),
+ ("imei", None),
+ ("when", None),
+ ("peeraddr", None),
+ ("packet", b""),
+ )
+
+ @property
+ def packed(self):
+ return (
+ pack("B", self.proto)
+ + ("0000000000000000" if self.imei is None else self.imei).encode()
+ + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
+ + pack_peer(self.peeraddr)
+ + self.packet
+ )
+
+ def decode(self, buffer):
+ self.proto = buffer[0]
+ self.imei = buffer[1:17].decode()
+ if self.imei == "0000000000000000":
+ self.imei = None
+ self.when = unpack("!d", buffer[17:25])[0]
+ self.peeraddr = unpack_peer(buffer[25:44])
+ self.packet = buffer[44:]
+
+
+class Resp(_Zmsg):
+ """Zmq message received from a third party to send to the terminal"""
+
+ KWARGS = (("imei", None), ("packet", b""))
+
+ @property
+ def packed(self):
+ return (
+ "0000000000000000" if self.imei is None else self.imei.encode()
+ ) + self.packet
+
+ def decode(self, buffer):
+ self.imei = buffer[:16].decode()
+ self.packet = buffer[16:]