1 from getopt import getopt
2 from logging import getLogger, StreamHandler, DEBUG, INFO
3 from logging.handlers import SysLogHandler
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
9 from .config import readconfig
10 from .GT06mod import handle_packet, make_response, LOGIN, set_config
12 CONF = "/etc/gps303.conf"
14 log = getLogger("gps303/collector")
18 def __init__(self, imei, msg):
19 self.as_bytes = imei.encode() + msg.encode()
23 def __init__(self, msg):
24 self.imei = msg[:16].decode()
25 self.payload = msg[16:]
29 def __init__(self, clntsock, clntaddr):
30 self.clntsock = clntsock
31 self.clntaddr = clntaddr
39 packet = self.clntsock.recv(4096)
44 # implement framing properly
45 msg = handle_packet(packet, self.clntaddr, when)
46 self.buffer = self.buffer[len(packet):]
47 if isinstance(msg, LOGIN):
51 def send(self, buffer):
52 self.clntsock.send(buffer)
60 def add(self, clntsock, clntaddr):
61 fd = clntsock.fileno()
62 self.by_fd[fd] = Client(clntsock, clntaddr)
69 del self.by_imei[clnt.imei]
75 if isinstance(msg, LOGIN):
76 self.by_imei[clnt.imei] = clnt
79 def response(self, zmsg):
80 if zmsg.imei in self.by_imei:
81 clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
84 def runserver(opts, conf):
86 zpub = zctx.socket(zmq.PUB)
87 zpub.bind(conf.get("collector", "publishurl"))
88 zsub = zctx.socket(zmq.SUB)
89 zsub.connect(conf.get("collector", "listenurl"))
90 tcpl = socket(AF_INET, SOCK_STREAM)
91 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
92 tcpl.bind(("", conf.getint("collector", "port")))
96 poller.register(zsub, flags=zmq.POLLIN)
97 poller.register(tcpfd, flags=zmq.POLLIN)
104 events = poller.poll(10)
105 for sk, fl in events:
109 msg = zsub.recv(zmq.NOBLOCK)
110 tosend.append(Zmsg(msg))
114 clntsock, clntaddr = ctlsock.accept()
115 topoll.append((clntsock, clntaddr))
117 imei, msg = clients.recv(sk)
118 zpub.send(Bcast(imei, msg).as_bytes)
119 if msg is None or isinstance(msg, HIBERNATION):
121 # poll queue consumed, make changes now
124 pollset.unregister(fd)
126 clients.response(zmsg)
127 for clntsock, clntaddr in topoll:
128 fd = clients.add(clntsock, clntaddr)
130 except KeyboardInterrupt:
134 if __name__.endswith("__main__"):
135 opts, _ = getopt(sys.argv[1:], "c:d")
137 conf = readconfig(opts["-c"] if "-c" in opts else CONF)
138 if sys.stdout.isatty():
139 log.addHandler(StreamHandler(sys.stderr))
141 log.addHandler(SysLogHandler(address="/dev/log"))
142 log.setLevel(DEBUG if "-d" in opts else INFO)
143 log.info("starting with options: %s", opts)
144 runserver(opts, conf)