log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
self.sock.close()
self.buffer = b""
- self.imei = None
def recv(self):
"""Read from the socket and parse complete messages"""
def response(self, resp):
if resp.imei in self.by_imei:
self.by_imei[resp.imei].send(resp.packet)
+ else:
+ log.info("Not connected (IMEI %s)", resp.imei)
def runserver(conf):
zctx = zmq.Context()
zpub = zctx.socket(zmq.PUB)
zpub.bind(conf.get("collector", "publishurl"))
- zsub = zctx.socket(zmq.SUB)
- zsub.connect(conf.get("collector", "listenurl"))
+ zpull = zctx.socket(zmq.PULL)
+ zpull.bind(conf.get("collector", "listenurl"))
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
tcpl.bind(("", conf.getint("collector", "port")))
tcpl.listen(5)
tcpfd = tcpl.fileno()
poller = zmq.Poller()
- poller.register(zsub, flags=zmq.POLLIN)
+ poller.register(zpull, flags=zmq.POLLIN)
poller.register(tcpfd, flags=zmq.POLLIN)
clients = Clients()
try:
tosend = []
topoll = []
tostop = []
- events = poller.poll(10)
+ events = poller.poll(1000)
for sk, fl in events:
- if sk is zsub:
+ if sk is zpull:
while True:
try:
- msg = zsub.recv(zmq.NOBLOCK)
+ msg = zpull.recv(zmq.NOBLOCK)
tosend.append(Resp(msg))
except zmq.Again:
break
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
topoll.append((clntsock, clntaddr))
- else:
+ elif fl & zmq.POLLIN:
received = clients.recv(sk)
if received is None:
log.debug(
clients.response(
Resp(imei=imei, packet=respmsg)
)
+ else:
+ log.debug("Stray event: %s on socket %s", fl, sk)
# poll queue consumed, make changes now
for fd in tostop:
poller.unregister(fd)
clients.stop(fd)
for zmsg in tosend:
+ log.debug("Sending to the client: %s", zmsg)
clients.response(zmsg)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
- poller.register(fd)
+ poller.register(fd, flags=zmq.POLLIN)
except KeyboardInterrupt:
pass
class GPS303Pkt:
PROTO: int
+ INLINE = True
def __init__(self, *args, **kwargs):
assert len(args) == 0
@classmethod
def inline_response(cls, packet):
- return cls.make_packet(b"")
+ if cls.INLINE:
+ return cls.make_packet(b"")
+ else:
+ return None
class UNKNOWN(GPS303Pkt):
PROTO = 256 # > 255 is impossible in real packets
-
- @classmethod
- def inline_response(cls, packet):
- return None
+ INLINE = False
class LOGIN(GPS303Pkt):
self.ver = unpack("B", payload[-1:])[0]
return self
- def response(self):
- return self.make_packet(b"")
-
class SUPERVISION(GPS303Pkt): # Server sends supervision number status
PROTO = 0x05
+ INLINE = False
def response(self, supnum=0):
# 1: The device automatically answers Pickup effect
# 2: Automatically Answering Two-way Calls
# 3: Ring manually answer the two-way call
- return super().response(b"")
+ return self.make_packet(pack("B", supnum))
class HEARTBEAT(GPS303Pkt):
def inline_response(cls, packet):
return cls.make_packet(packet[2:8])
- def response(self):
- return self.make_packet(self.dtime)
-
class GPS_POSITIONING(_GPS_POSITIONING):
PROTO = 0x10
class STATUS(GPS303Pkt):
PROTO = 0x13
+ INLINE = False
@classmethod
def from_packet(cls, length, payload):
self.signal = None
return self
- @classmethod
- def inline_response(cls, packet):
- return None
-
def response(self, upload_interval=25): # Set interval in minutes
- return super().response(pack("B", upload_interval))
+ return self.make_packet(pack("B", upload_interval))
class HIBERNATION(GPS303Pkt):
class RESET(GPS303Pkt): # Device sends when it got reset SMS
PROTO = 0x15
+ INLINE = False
def response(self): # Server can send to initiate factory reset
return self.make_packet(b"")
class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
PROTO = 0x16
+ INLINE = False
def response(self, number=3): # Number of whitelist entries
- return super().response(pack("B", number))
+ return self.make_packet(pack("B", number))
class _WIFI_POSITIONING(GPS303Pkt):
def inline_response(cls, packet):
return cls.make_packet(packet[2:8])
- def response(self):
- return super().response(self.dtime)
-
class TIME(GPS303Pkt):
PROTO = 0x30
"!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
)
- def response(self):
- payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
- return super().response(payload)
-
class PROHIBIT_LBS(GPS303Pkt):
PROTO = 0x33
+ INLINE = False
def response(self, status=1): # Server sent, 0-off, 1-on
- return super().response(pack("B", status))
+ return self.make_packet(pack("B", status))
class MOM_PHONE(GPS303Pkt):
class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
PROTO = 0x44
- def response(self):
- return super().response(b"")
-
class STOP_ALARM(GPS303Pkt):
PROTO = 0x56
class SETUP(GPS303Pkt):
PROTO = 0x57
+ INLINE = False
def response(
self,
]
+ [b";".join([el.encode() for el in phoneNumbers])]
)
- return super().response(payload)
+ return self.make_packet(payload)
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
class WIFI_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x69
-
- @classmethod
- def inline_response(cls, packet):
- return None
+ INLINE = False
def response(self, lat=None, lon=None):
if lat is None or lon is None:
class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
PROTO = 0x98
+ INLINE = False
@classmethod
def from_packet(cls, length, payload):
self.interval = unpack("!H", payload[:2])
return self
- @classmethod
- def inline_response(cls, packet):
- return cls.make_packet(packet[2:4])
-
- def response(self):
- return self.make_packet(pack("!H", self.interval))
+ def response(self, interval=10):
+ return self.make_packet(pack("!H", interval))
class SOS_ALARM(GPS303Pkt):
-"""
-For when responding to the terminal is not trivial
-"""
+""" Estimate coordinates from WIFI_POSITIONING and send back """
-from .gps303proto import *
+from datetime import datetime, timezone
+from logging import getLogger
+from struct import pack
+import zmq
+
+from . import common
+from .gps303proto import parse_message, proto_by_name, WIFI_POSITIONING
from .opencellid import qry_cell
+from .zmsg import Bcast, Resp
+
+log = getLogger("gps303/lookaside")
+
+
+def runserver(conf):
+ zctx = zmq.Context()
+ zsub = zctx.socket(zmq.SUB)
+ zsub.connect(conf.get("collector", "publishurl"))
+ topic = pack("B", proto_by_name("WIFI_POSITIONING"))
+ zsub.setsockopt(zmq.SUBSCRIBE, topic)
+ zpush = zctx.socket(zmq.PUSH)
+ zpush.connect(conf.get("collector", "listenurl"))
+
+ try:
+ while True:
+ zmsg = Bcast(zsub.recv())
+ msg = parse_message(zmsg.packet)
+ log.debug(
+ "IMEI %s from %s at %s: %s",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
+ msg,
+ )
+ if not isinstance(msg, WIFI_POSITIONING):
+ log.error(
+ "IMEI %s from %s at %s: %s",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(
+ tz=timezone.utc
+ ),
+ msg,
+ )
+ continue
+ lat, lon = qry_cell(
+ conf["opencellid"]["dbfn"], msg.mcc, msg.gsm_cells
+ )
+ resp = Resp(imei=zmsg.imei, packet=msg.response(lat=lat, lon=lon))
+ log.debug("Response for lat=%s, lon=%s: %s", lat, lon, resp)
+ zpush.send(resp.packed)
+
+ except KeyboardInterrupt:
+ pass
+
-def prepare_response(conf, msg):
- if isinstance(msg, WIFI_POSITIONING):
- lat, lon = qry_cell(conf["opencellid"]["dbfn"],
- msg.mcc, msg.gsm_cells)
- return {"lat": lat, "lon": lon}
- return {}
+if __name__.endswith("__main__"):
+ runserver(common.init(log))
--- /dev/null
+""" For when responding to the terminal is not trivial """
+
+from datetime import datetime, timezone
+from logging import getLogger
+from struct import pack
+import zmq
+
+from . import common
+from .gps303proto import parse_message, proto_by_name
+from .zmsg import Bcast, Resp
+
+log = getLogger("gps303/termconfig")
+
+
+def runserver(conf):
+ zctx = zmq.Context()
+ zsub = zctx.socket(zmq.SUB)
+ zsub.connect(conf.get("collector", "publishurl"))
+ for protoname in (
+ "SUPERVISION",
+ "STATUS",
+ "RESET",
+ "WHITELIST_TOTAL",
+ "PROHIBIT_LBS",
+ "SETUP",
+ "POSITION_UPLOAD_INTERVAL",
+ ):
+ topic = pack("B", proto_by_name(protoname))
+ zsub.setsockopt(zmq.SUBSCRIBE, topic)
+ zpush = zctx.socket(zmq.PUSH)
+ zpush.connect(conf.get("collector", "listenurl"))
+
+ try:
+ while True:
+ zmsg = Bcast(zsub.recv())
+ msg = parse_message(zmsg.packet)
+ log.debug(
+ "IMEI %s from %s at %s: %s",
+ zmsg.imei,
+ zmsg.peeraddr,
+ datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
+ msg,
+ )
+ # TODO get data from the config
+ resp = Resp(imei=zmsg.imei, packet=msg.response())
+ log.debug("Response: %s", resp)
+ zpush.send(resp.packed)
+
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__.endswith("__main__"):
+ runserver(common.init(log))
__all__ = "Bcast", "Resp"
+
def pack_peer(peeraddr):
saddr, port, _x, _y = peeraddr
addr6 = ip.ip_address(saddr)
addr = addr6.ipv4_mapped
if addr is None:
addr = addr6
- return pack("B", addr.version) + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16] + pack("!H", port)
+ return (
+ pack("B", addr.version)
+ + (addr.packed + b"\0\0\0\0\0\0\0\0\0\0\0\0")[:16]
+ + pack("!H", port)
+ )
+
def unpack_peer(buffer):
version = buffer[0]
+ str(kwargs)
)
+ def __repr__(self):
+ return "{}({})".format(
+ self.__class__.__name__,
+ ", ".join(
+ [
+ "{}={}".format(
+ k,
+ 'bytes.fromhex("{}")'.format(getattr(self, k).hex())
+ if isinstance(getattr(self, k), bytes)
+ else getattr(self, k),
+ )
+ for k, _ in self.KWARGS
+ ]
+ ),
+ )
+
def decode(self, buffer):
raise RuntimeError(
self.__class__.__name__ + "must implement `encode()` method"
return (
pack("B", self.proto)
+ ("0000000000000000" if self.imei is None else self.imei).encode()
- + (b"\0\0\0\0\0\0\0\0" if self.when is None else pack("!d", self.when))
+ + (
+ b"\0\0\0\0\0\0\0\0"
+ if self.when is None
+ else pack("!d", self.when)
+ )
+ pack_peer(self.peeraddr)
+ self.packet
)