bg left:30% width:300px

Implementing Protocols

A step-by-step guide

󠁼

👨🏻‍💻 Guillaume Valadon ✉️ guillaume@valadon.net


Scapy Concepts

abstracted fields encoding and decoding

  • simplified protocols implementations
    • they are called layers
  • a layer is an object
    • it inherits from the Packet object
  • each layer is a list of fields

Navigating Layers

```python >>> p = Ether() / IP () / UDP() >>> p[IP] <IP frag=0 proto=udp |<UDP |>> >>> p[IP].underlayer <Ether type=IPv4 |<IP frag=0 proto=udp |<UDP |>>> >>> p[IP].payload <UDP |> >>> p.payload.payload <UDP |> ```
* `p[IP]`: access a layer by name * `p[IP].underlayer`: access its ancestor * `p[IP].payload`: access its predecessor

Fields

  • many types (see scapy/fields.py)
    • ByteField, IPField, StrField
  • variants
    • X*: display the field in hexadecimal
    • LE*: Little Endian integer
    • Signed*: signed integer
    • *Enum*: use a dictionary to name values

implementing a layer consists in combining fields


Frequent Patterns


A Simple Layer

```python class UDP(Packet): name = "UDP" fields_desc = [ShortEnumField("sport", 53, UDP_SERVICES), ShortEnumField("dport", 53, UDP_SERVICES), ShortField("len", None), XShortField("chksum", None), ] ``` > see `scapy/layers/inet.py`
* mandatory elements * `Packet` inheritance * a `name` * a `fields_desc` containing a list of fields * three `ShortField` variants * default values

Computing Default Values

```python def post_build(self, packet, payload): packet += payload tmp_len = self.len if tmp_len is None: tmp_len = len(packet) packet = packet[:4] packet += struct.pack("!H", tmp_len) packet += packet[6:] return p ```
* `None` is replaced by 0 * `post_build()` method alters the `bytes` values

computing the chksum value is similar


Matching Answers

```python def hashret(self): return self.payload.hashret() def answers(self, other): if not isinstance(other, UDP): return 0 return self.payload.answers(other.payload) ``` > see `UDP` in `scapy/layers/inet.py`
* `hashret()` method * constructs a hash common to the query & the answer * `answers()` method * returns `True` if `other` is an answer to `self`

Finding The Next Layer

```python bind_layers(Ether, MACControl, type=MAC_CONTROL_ETHER_TYPE) def guess_payload_class(self, payload): try: op_code = (orb(payload[0]) << 8) + orb(payload[1]) return MAC_CTRL_CLASSES[op_code] except KeyError: pass return Packet.guess_payload_class(self, payload) ``` > see `MACControl` in `scapy/contrib/mpls.py`
* `bind_layers()` function * instructs Scapy to tie layers together * `guess_payload_class()` method * returns the class of the payload

Packets As A Single Field

```python class Dot11EltRSN(Dot11Elt): name = "802.11 RSN information" match_subclass = True fields_desc = [ ByteEnumField("ID", 48, _dot11_id_enum), ByteField("len", None), LEShortField("version", 1), PacketField("group_cipher_suite", RSNCipherSuite(), RSNCipherSuite), # --- 8< --- 8< --- 8< --- ``` > see `scapy/layers/dot11.py`
* `PacketField` decodes a field as a packet

Packet As A List Of Fields

```python class GTPEchoResponse(Packet): name = "GTP Echo Response" fields_desc = [PacketListField("IE_list", [], IE_Dispatcher)] # --- 8< --- 8< --- 8< --- ``` > see `scapy/layers/dot11.py`
* `PacketListField` decodes a field as a list of packets * here `IE_Dispatcher` could be a class or a function that returns a class * it makes it possible to handle different types of packets

Notable Fields


Encoding Bits

```python class Dot1AH(Packet): name = "802_1AH" fields_desc = [BitField("prio", 0, 3), BitField("dei", 0, 1), BitField("nca", 0, 1), BitField("res1", 0, 1), BitField("res2", 0, 2), ThreeBytesField("isid", 0)] ``` > see `scapy/layers/l2.py`
* `BitField` can read less than a byte * successive `BitField` must total 8 bits
--- # One Field And Multiple Types
```python class PIMv2JoinPruneAddrsBase(_PIMGenericTlvBase): fields_desc = [ ByteField("addr_family", 1), ByteField("encoding_type", 0), BitField("rsrvd", 0, 5), BitField("sparse", 0, 1), BitField("wildcard", 0, 1), BitField("rpt", 1, 1), ByteField("mask_len", 32), MultipleTypeField( [(IP6Field("src_ip", "::"), lambda pkt: pkt.addr_family == 2)], IPField("src_ip", "0.0.0.0") ), ] ``` > see `scapy/contrib/pim.py`
* `src_ip` could either be `IPField` or `IP6Field` * it depends on the `addr_family` field value
--- # Check A Condition
```python class DNS(DNSCompressedPacket): name = "DNS" fields_desc = [ ConditionalField(ShortField("length", None), lambda p: isinstance(p.underlayer, TCP)), ShortField("id", 0), BitField("qr", 0, 1), # --- 8< --- 8< --- 8< --- ``` > see `scapy/layers/dns.py`
* the `length` field only exists if the condition is `True` * for DNS, it is only valid when TCP is used
--- # Length & Value
```python class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option name = "Scapy6 Unknown Option" fields_desc = [_OTypeField("otype", 0x01, _hbhopts), FieldLenField("optlen", None, length_of="optdata", fmt="B"), StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen)] ``` > see `scapy/layers/inet6.py`
* `FieldLenField` encodes a count * `length_of` is used to compute it * `StrLenField` stores a value * `length_from` is used to get it
--- # Count & Value
```python class ReserveRelease(Packet): name = "Reserve / Release" fields_desc = [ByteEnumField("rcmd", 0, {0: "Read Reserve List", 1: "Set Reserve List", 2: "Force Set Reserve List"}), FieldLenField("nb_mac", None, count_of="mac_addrs"), FieldListField("mac_addrs", None, MACField("", ETHER_ANY), count_from=lambda pkt: pkt.nb_mac)] ``` > see `scapy/layers/inet6.py`
* `FieldLenField` encodes a count * `count_of` is used to retrieve it * `FieldListField` stores a list of fields * `count_from` is used to get the number of fields
--- # Allow Failures
```python class TCPerror(TCP): name = "TCP in ICMP" fields_desc = ( TCP.fields_desc[:2] + # MayEnd after the 8 first octets. [MayEnd(TCP.fields_desc[2])] + TCP.fields_desc[3:] ) >> TCPerror(raw(TCP())[:8]) ``` > see `scapy/contrib/aoe.py`
* it might be OK to receive fewer data than the full `Packet` * only the first fields are decoded
--- # Advanced Patterns --- # Find A Better Layer
```python class IPv46(IP, IPv6): name = "IPv4/6" @classmethod def dispatch_hook(cls, _pkt=None, *_, **kargs): if _pkt: if orb(_pkt[0]) >> 4 == 6: return IPv6 elif kargs.get("version") == 6: return IPv6 return IP conf.l2types.register(DLT_RAW, IPv46) ``` > see `scapy/layers/inet6.py`
* `dispatch_hook()` is used to dynamically change the layer * here, it allows choosing `IP` or `IPv6` while parsing Raw IP PCAP files
--- # TCP Reassembly
```python @classmethod def tcp_reassemble(cls, data, metadata, session): length = struct.unpack("!I", data[4:8])[0] + 8 if len(data) >= length: return DoIP(data) return None ``` > see `DoIP` in `scapy/contrib/automotive/doip.py`
* `tcp_reassemble()` allows to decode a layer when enough data is received * Scapy reconstructs the TCP session
--- # Modify The Dissection Result
```python def post_dissect(self, s): self.decrypt() ``` > see `Dot11WEP` in `scapy/layers/dot11.py`
* `post_dissect()` is called when the layer is fully decoded * if the WEP key is known, the frame is decrypted
--- # Discard Extra Bytes
```python def extract_padding(self, pkt): return "", pkt ``` > see `TFTP_Option` in `scapy/layers/tftp.py`
* extra bytes could be considered as padding * here `extract_padding()` informs Scapy, that there is not padding * this is useful for `PacketListField`
--- # Adding A New Field --- # Field States * **i (internal)** * how Scapy manipulates the field * **m (machine)** * how the field is sent over the network * **h (human)** * how the field is displayed for humans --- # Field State Conversion Methods * **i2h()** * nternal → human * **i2m()** * internal → machine * **m2i()** * machine → internal > these methods convert field states for specific use cases --- # Alter A Field Behavior
```python class ThreeBytesField(Field[int, int]): def __init__(self, name, default): Field.__init__(self, name, default, "!I") def addfield(self, pkt, s, val): return s + struct.pack(self.fmt, self.i2m(pkt, val))[1:4] def getfield(self, pkt, s): return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0]) ``` > see `scapy/fields.py`
* `getfield()` decodes three bytes from `s` * it returns the remaining part of `s` and the decode value * `addfield()` encodes three bytes
--- ![bg right:50% width:300px](/talks/2022_GreHack/images/scapy_logo.png) # Questions? ## Going Further * https://github.com/secdev/scapy * https://scapy.net * https://scapy.readthedocs.io