+""" TCP server that communicates with terminals """
+
from getopt import getopt
from logging import getLogger, StreamHandler, DEBUG, INFO
from logging.handlers import SysLogHandler
import zmq
from .config import readconfig
-from .GT06mod import handle_packet, make_response, LOGIN, set_config
+from .gps303proto import handle_packet, make_response, LOGIN, set_config
CONF = "/etc/gps303.conf"
class Bcast:
+ """Zmq message to broadcast what was received from the terminal"""
def __init__(self, imei, msg):
self.as_bytes = imei.encode() + msg.encode()
-class Zmsg:
+class Resp:
+ """Zmq message received from a third party to send to the terminal"""
def __init__(self, msg):
self.imei = msg[:16].decode()
self.payload = msg[16:]
class Client:
- def __init__(self, clntsock, clntaddr):
- self.clntsock = clntsock
- self.clntaddr = clntaddr
+ """Connected socket to the terminal plus buffer and metadata"""
+ def __init__(self, sock, addr):
+ self.sock = sock
+ self.addr = addr
self.buffer = b""
self.imei = None
def close(self):
- self.clntsock.close()
+ log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
+ self.sock.close()
+ self.buffer = b""
+ self.imei = None
def recv(self):
- packet = self.clntsock.recv(4096)
- if not packet:
+ """ Read from the socket and parse complete messages """
+ try:
+ segment = self.sock.recv(4096)
+ except OSError:
+ log.warning("Reading from fd %d (IMEI %s): %s",
+ self.sock.fileno(), self.imei, e)
+ return None
+ if not segment: # Terminal has closed connection
+ log.info("EOF reading from fd %d (IMEI %s)",
+ self.sock.fileno(), self.imei)
return None
when = time()
- self.buffer += packet
- # implement framing properly
- msg = handle_packet(packet, self.clntaddr, when)
- self.buffer = self.buffer[len(packet):]
- if isinstance(msg, LOGIN):
- self.imei = msg.imei
- return msg
+ self.buffer += segment
+ msgs = []
+ while True:
+ framestart = self.buffer.find(b"xx")
+ if framestart == -1: # No frames, return whatever we have
+ break
+ if framestart > 0: # Should not happen, report
+ log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
+ self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+ self.buffer = self.buffer[framestart:]
+ # At this point, buffer starts with a packet
+ frameend = self.buffer.find(b"\r\n", 4)
+ if frameend == -1: # Incomplete frame, return what we have
+ break
+ msg = parse_message(self.buffer[:frameend])
+ self.buffer = self.buffer[frameend+2:]
+ if isinstance(msg, LOGIN):
+ self.imei = msg.imei
+ log.info("LOGIN from fd %d: IMEI %s",
+ self.sock.fileno(), self.imei)
+ msgs.append(msg)
+ return msgs
def send(self, buffer):
- self.clntsock.send(buffer)
-
+ try:
+ self.sock.send(b"xx" + buffer + b"\r\n")
+ except OSError as e:
+ log.error("Sending to fd %d (IMEI %s): %s",
+ self.sock.fileno, self.imei, e)
class Clients:
def __init__(self):
def stop(self, fd):
clnt = by_fd[fd]
+ log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
clnt.close()
if clnt.imei:
del self.by_imei[clnt.imei]
def recv(self, fd):
clnt = by_fd[fd]
- msg = clnt.recv()
- if isinstance(msg, LOGIN):
- self.by_imei[clnt.imei] = clnt
- return clnt.imei, msg
+ msgs = clnt.recv()
+ result = []
+ for msg in msgs:
+ if isinstance(msg, LOGIN):
+ self.by_imei[clnt.imei] = clnt
+ result.append(clnt.imei, msg)
+ return result
- def response(self, zmsg):
- if zmsg.imei in self.by_imei:
- clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
+ def response(self, resp):
+ if resp.imei in self.by_imei:
+ self.by_imei[resp.imei].send(resp.payload)
def runserver(opts, conf):
while True:
try:
msg = zsub.recv(zmq.NOBLOCK)
- tosend.append(Zmsg(msg))
+ tosend.append(Resp(msg))
except zmq.Again:
break
elif sk == tcpfd:
imei, msg = clients.recv(sk)
zpub.send(Bcast(imei, msg).as_bytes)
if msg is None or isinstance(msg, HIBERNATION):
+ log.debug("HIBERNATION from fd %d", sk)
tostop.append(sk)
# poll queue consumed, make changes now
for fd in tostop: