-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol.py
More file actions
143 lines (98 loc) · 3.42 KB
/
protocol.py
File metadata and controls
143 lines (98 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import sys
class MQTT:
def __init__(self):
self.name = "mqtt"
self.num = 1
self.qos_list = ["0", "1", "2"]
self.max_message_length = TCP_MAX_MESSAGE_LENGTH
self.publish_flags_enabled = True
def is_tcp(self):
return True
def is_udp(self):
return False
class MQTT_SN:
def __init__(self):
self.name = "mqttsn"
self.num = 2
self.qos_list = ["0", "1", "2"]
self.max_message_length = UDP_MAX_MESSAGE_LENGTH
self.publish_flags_enabled = True
def is_tcp(self):
return False
def is_udp(self):
return True
class COAP:
def __init__(self):
self.name = "coap"
self.num = 3
self.qos_list = ["0", "1"]
self.max_message_length = UDP_MAX_MESSAGE_LENGTH
self.publish_flags_enabled = False
def is_tcp(self):
return False
def is_udp(self):
return True
class WEBSOCKET:
def __init__(self):
self.name = "websocket"
self.num = 4
self.qos_list = ["0", "1", "2"]
self.max_message_length = TCP_MAX_MESSAGE_LENGTH
self.publish_flags_enabled = True
def is_tcp(self):
return True
def is_udp(self):
return False
class AMQP:
def __init__(self):
self.name = "amqp"
self.num = 5
self.qos_list = ["1"]
self.max_message_length = TCP_MAX_MESSAGE_LENGTH
self.publish_flags_enabled = False
def is_tcp(self):
return True
def is_udp(self):
return False
TCP_MAX_MESSAGE_LENGTH = sys.maxsize
UDP_MAX_MESSAGE_LENGTH = 1400
__protocols = [MQTT(), MQTT_SN(), COAP(), WEBSOCKET(), AMQP()]
def protocol_names():
return [p.name for p in __protocols]
def get_protocol_name(num):
for protocol in __protocols:
if protocol.num == num:
return protocol.name
raise ValueError("invalid protocol value: " + num)
def get_protocol_num(name):
for protocol in __protocols:
if protocol.name == name:
return protocol.num
raise ValueError("invalid protocol value: " + name)
def is_tcp(protocol_val):
for protocol in __protocols:
if protocol.name == protocol_val or protocol.num == protocol_val:
return protocol.is_tcp()
raise ValueError("invalid protocol value: " + protocol_val)
def is_udp(protocol_val):
for protocol in __protocols:
if protocol.name == protocol_val or protocol.num == protocol_val:
return protocol.is_udp()
raise ValueError("invalid protocol value: " + protocol_val)
def qos_list(protocol_val):
for protocol in __protocols:
if protocol.name == protocol_val or protocol.num == protocol_val:
return protocol.qos_list
raise ValueError("invalid protocol value: " + protocol_val)
def is_message_length_valid(protocol_val, msg_length):
return msg_length <= get_max_message_length(protocol_val)
def get_max_message_length(protocol_val):
for protocol in __protocols:
if protocol.name == protocol_val or protocol.num == protocol_val:
return protocol.max_message_length
raise ValueError("invalid protocol value: " + protocol_val)
def publish_flags_enabled(protocol_val):
for protocol in __protocols:
if protocol.name == protocol_val or protocol.num == protocol_val:
return protocol.publish_flags_enabled
raise ValueError("invalid protocol value: " + protocol_val)