Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4334 +/- ##
==========================================
- Coverage 81.90% 79.74% -2.17%
==========================================
Files 330 353 +23
Lines 76380 91834 +15454
==========================================
+ Hits 62561 73229 +10668
- Misses 13819 18605 +4786
🚀 New features to boost your workflow:
|
245e388 to
7ee64b2
Compare
|
Any idea on how I can fix these failed tests? |
|
Tests are failing on versions older than 3.9. You're using 3.9+ specific features. Scapy should work with anything starting from 3.7, so this probably needs to be updated. |
Fixing response payload Some docstring and bug fixes. Finished CoAP server logic implementation Added client interaction Client/Server done. Added delayed response handling Fixing small problems Unit tests Documentation
|
@gpotter2 Thanks for the insights, it is ready for reviews :D |
- Moved the defines/enumerators to coap.py - Changed the send() function to match the SuperSocket declaration - Updated unit tests
|
Could you please implement a select function for your socket, so that sr, sr1 and sniff will work. Could you please also provide unit tests for these functions. |
|
Hello, thanks for the review, I updated the code to include a select function, I'm not sure about the error in the macos tests. About the changes, I'm not sure if those can be considered "correct", it feels like workarounds. Because:
Is there a better way of doing this? Thanks so far :) |
|
Thanks for the update. I'll review soon. |
| def hashret(self): | ||
| return struct.pack('I', self.msg_id) + self.token | ||
|
|
||
| def answers(self, other): |
There was a problem hiding this comment.
Wouldn't it make sense to implement the answers function based on coap_codes / the field "code"
There was a problem hiding this comment.
I added a simple verification where any kind of answer is accepted (within the response codes in the RFC's section 5.9), except if you are trying to answer with another request.
I did this way because I don't want to put too much constraints for the user.
scapy/contrib/coap_socket.py
Outdated
| return len(x) | ||
|
|
||
| def sr(self, *args, **kargs): | ||
| args[0].sport = self.impl.port |
There was a problem hiding this comment.
since you want to operate on the packet you get here, I suggest to change the function signature:
def sr(pkt, *args, **kargs):
pkt[UDP].sport = ...
...
There was a problem hiding this comment.
Hopefully I got this correct, did you mean:
def sr(self, pkt, *args, **kargs):
pkt[UDP] ...
There was a problem hiding this comment.
Looks good. Unfortunately I’m not fully done with the review. I try to find some time as soon as possible.
- Implemented coap.answers function - Fixed some types - Remove unnecessary override - Changed sr and sr1 functions signatures.
There was a problem hiding this comment.
Pull request overview
This PR implements a CoAP (Constrained Application Protocol) socket with client and server capabilities for Scapy, following RFC 7252. The implementation is modeled after the existing ISOTPSoftSocket and provides features including congestion control, retransmission mechanism, separate responses, and message duplication detection.
Changes:
- Added CoAP constants and helper methods (hashret/answers) to enable CoAP packet matching in send/receive operations
- Implemented CoAPSocket and CoAPSocketImpl classes providing client/server functionality with automatic retransmission and resource handling
- Added comprehensive unit tests covering basic client-server interactions, retransmission, delayed responses, and sr/sr1 operations
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 30 comments.
| File | Description |
|---|---|
| scapy/contrib/coap.py | Added CoAP protocol constants (message types, codes, options) and implemented hashret() and answers() methods for proper packet matching in sr/sr1 operations |
| scapy/contrib/coap_socket.py | Main implementation of CoAPSocket with client/server capabilities, resource management, retransmission logic, and separate response handling |
| test/contrib/coap_socket.uts | Unit tests covering resource discovery, basic request/response, error handling, retransmission, delayed responses, and sr/sr1 functionality |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _register_request_response(self, message_id, token, response): | ||
| # type: (int, int, dict) -> None | ||
| """Registers a response in case it get lost""" | ||
| if (message_id, token) not in self._duplication_dict.keys(): |
There was a problem hiding this comment.
The .keys() call is redundant when checking membership in a dictionary. Use "not in self._duplication_dict" instead of "not in self._duplication_dict.keys()" for better performance and more idiomatic Python.
| """Check CoAPSocket for the documentation""" | ||
| t = token | ||
| if isinstance(token, int): | ||
| t = token.to_bytes((token.bit_length() + 7) // 8, 'big') |
There was a problem hiding this comment.
Potential issue with token conversion when token is 0. In make_delayed_resp_packet, if token is the integer 0, then token.bit_length() returns 0, and the expression (0 + 7) // 8 evaluates to 0, resulting in an empty bytes object b''. This could be problematic if the original request had a non-empty token with value 0 (e.g., b'\x00'). Consider handling the token=0 case explicitly or ensuring at least 1 byte is used.
| t = token.to_bytes((token.bit_length() + 7) // 8, 'big') | |
| if token == 0: | |
| t = b"\x00" | |
| else: | |
| t = token.to_bytes((token.bit_length() + 7) // 8, 'big') |
| def check_duplicated(self, message_id, token): | ||
| # type: (int, int) -> bool | ||
| """Returns true if (message_id, token) duplicated.""" | ||
| return (message_id, token) in self._duplication_dict.keys() |
There was a problem hiding this comment.
The .keys() call is redundant when checking membership in a dictionary. Use "in self._duplication_dict" instead of "in self._duplication_dict.keys()" for better performance and more idiomatic Python.
| StrLenField("delta_ext", "", length_from=_get_delta_ext_size), | ||
| # noqa: E501 |
There was a problem hiding this comment.
The noqa comment is placed on a separate line rather than at the end of the long line. This breaks the typical pattern where noqa comments are placed inline. Move the comment to the end of line 163 instead of having it on line 164.
| StrLenField("delta_ext", "", length_from=_get_delta_ext_size), | |
| # noqa: E501 | |
| StrLenField("delta_ext", "", length_from=_get_delta_ext_size), # noqa: E501 |
| response["type"] = NON | ||
|
|
||
| # Add paymark (separator between options and payload) | ||
| if "paymark" not in response.keys(): |
There was a problem hiding this comment.
The .keys() call is redundant when checking membership in a dictionary. Use "not in response" instead of "not in response.keys()" for better performance and more idiomatic Python.
| if "paymark" not in response.keys(): | |
| if "paymark" not in response: |
| """Process a received request""" | ||
| coap_pkt = pkt[CoAP] | ||
| req_uri = "/" | ||
| token = int.from_bytes(coap_pkt.token, "big") # Can be up to 8 bytes |
There was a problem hiding this comment.
Potential issue with empty token handling. If coap_pkt.token is empty (b''), int.from_bytes(b'', 'big') returns 0. This means empty tokens and tokens with value 0 are indistinguishable in the duplication checking and request matching logic. Per RFC 7252, empty tokens are valid and should be handled distinctly from tokens with value 0. Consider storing tokens as bytes instead of converting to int, or handle the empty token case explicitly.
| token = int.from_bytes(coap_pkt.token, "big") # Can be up to 8 bytes |
| pkt, sa_ll = self.sock.ins.recvfrom(MTU) | ||
| pkt = (IP(src=sa_ll[0], dst=self.ip) / | ||
| UDP(sport=sa_ll[1], dport=self.port) / | ||
| CoAP(bytes(pkt))) | ||
| if pkt: | ||
| if not self._debug_drop_package(): | ||
| self._on_pkt_recv(pkt, sa_ll) | ||
| self._debug_drop_package_counter = 0 | ||
| else: | ||
| self._debug_drop_package_counter += 1 |
There was a problem hiding this comment.
Missing exception handling for malformed CoAP packets. If the received data cannot be parsed as a valid CoAP packet (line 596), CoAP(bytes(pkt)) could raise an exception that would crash the _recv thread. Consider wrapping the packet parsing and handling in a try-except block to gracefully handle malformed packets.
| pkt, sa_ll = self.sock.ins.recvfrom(MTU) | |
| pkt = (IP(src=sa_ll[0], dst=self.ip) / | |
| UDP(sport=sa_ll[1], dport=self.port) / | |
| CoAP(bytes(pkt))) | |
| if pkt: | |
| if not self._debug_drop_package(): | |
| self._on_pkt_recv(pkt, sa_ll) | |
| self._debug_drop_package_counter = 0 | |
| else: | |
| self._debug_drop_package_counter += 1 | |
| try: | |
| pkt, sa_ll = self.sock.ins.recvfrom(MTU) | |
| pkt = (IP(src=sa_ll[0], dst=self.ip) / | |
| UDP(sport=sa_ll[1], dport=self.port) / | |
| CoAP(bytes(pkt))) | |
| except Exception: | |
| # Malformed or otherwise invalid packet; log and ignore to keep _recv running | |
| log_coap_sock.exception("Failed to parse incoming CoAP packet") | |
| else: | |
| if pkt: | |
| if not self._debug_drop_package(): | |
| self._on_pkt_recv(pkt, sa_ll) | |
| self._debug_drop_package_counter = 0 | |
| else: | |
| self._debug_drop_package_counter += 1 |
| lst_resources=None, # type: Optional[None, list["CoAPResource"]] | ||
| sock=None, # type: Optional[None, SuperSocket, any] |
There was a problem hiding this comment.
The type hint syntax "Optional[None, list[...]]" is incorrect. Optional takes a single argument representing the type, not multiple arguments. It should be "Optional[list[CoAPResource]]" since Optional[X] is equivalent to Union[X, None].
| lst_resources=None, # type: Optional[None, list["CoAPResource"]] | |
| sock=None, # type: Optional[None, SuperSocket, any] | |
| lst_resources=None, # type: Optional[list["CoAPResource"]] | |
| sock=None, # type: Optional[object] |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Description
This PR implements a CoAP socket, pretty similar on how ISOTPSoftSocket works.
I implemented the basic message exchange, mostly based on the RFC-7252.
Known-limitations
General comments
It has a dependency for
from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler, I found nice how this is implemented, so I just used it, I didn't want to copy/paste again.Also I added some unit tests for the basic cases.
Quick usage