from . import common
from .gps303proto import (
GPS303Conn,
- HIBERNATION,
- LOGIN,
+ is_goodbye_packet,
+ imei_from_packet,
inline_response,
parse_message,
proto_of_message,
e,
)
- def set_imei(self, imei: str) -> None:
- self.imei = imei
-
class Clients:
def __init__(self) -> None:
return None
result = []
for when, peeraddr, packet in msgs:
- if proto_of_message(packet) == LOGIN.PROTO:
- msg = parse_message(packet)
- if isinstance(msg, LOGIN): # Can be unparseable
- if clnt.imei is None:
- clnt.imei = msg.imei
+ if clnt.imei is None:
+ imei = imei_from_packet(packet)
+ if imei is not None:
+ log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
+ clnt.imei = imei
+ oldclnt = self.by_imei.get(clnt.imei)
+ if oldclnt is not None:
log.info(
- "LOGIN from fd %d (IMEI %s)",
- clnt.sock.fileno(),
- clnt.imei,
+ "Orphaning fd %d with the same IMEI",
+ oldclnt.sock.fileno(),
)
- oldclnt = self.by_imei.get(clnt.imei)
- if oldclnt is not None:
- log.info(
- "Orphaning fd %d with the same IMEI",
- oldclnt.sock.fileno(),
- )
- oldclnt.imei = None
+ oldclnt.imei = None
self.by_imei[clnt.imei] = clnt
else:
log.warning(
packet=packet,
).packed
)
- if proto == HIBERNATION.PROTO and handle_hibernate:
+ if is_goodbye_packet(packet) and handle_hibernate:
log.debug(
- "HIBERNATION from fd %d (IMEI %s)",
+ "Goodbye from fd %d (IMEI %s)",
sk,
imei,
)
return packet[1]
+def imei_from_packet(packet: bytes) -> Optional[str]:
+ if proto_of_message(packet) == LOGIN.PROTO:
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN):
+ return msg.imei
+ return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+ return proto_of_message(packet) == HIBERNATION.PROTO
+
+
def inline_response(packet: bytes) -> Optional[bytes]:
proto = proto_of_message(packet)
if proto in CLASSES: