git: 54b955f4df5e - main - netlink: add support for decoding genl ops/groups in pytest
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Thu, 01 Jun 2023 10:46:09 UTC
The branch main has been updated by melifaro: URL: https://cgit.FreeBSD.org/src/commit/?id=54b955f4df5e76b5679ba7f3eb6bb2d5fc62923d commit 54b955f4df5e76b5679ba7f3eb6bb2d5fc62923d Author: Alexander V. Chernikov <melifaro@FreeBSD.org> AuthorDate: 2023-06-01 10:45:29 +0000 Commit: Alexander V. Chernikov <melifaro@FreeBSD.org> CommitDate: 2023-06-01 10:45:29 +0000 netlink: add support for decoding genl ops/groups in pytest MFC after: 2 weeks --- tests/atf_python/sys/netlink/message.py | 39 ++++++++++++++++++++----- tests/atf_python/sys/netlink/netlink.py | 3 ++ tests/atf_python/sys/netlink/netlink_generic.py | 39 +++++++++++++++++++++++-- tests/atf_python/sys/netlink/utils.py | 2 ++ 4 files changed, 73 insertions(+), 10 deletions(-) diff --git a/tests/atf_python/sys/netlink/message.py b/tests/atf_python/sys/netlink/message.py index 1e2b71775102..98a1e3bb21c5 100644 --- a/tests/atf_python/sys/netlink/message.py +++ b/tests/atf_python/sys/netlink/message.py @@ -194,11 +194,32 @@ class StdNetlinkMessage(BaseNetlinkMessage): raise return self + def parse_child(self, data: bytes, attr_key, attr_map): + attrs, _ = self.parse_attrs(data, attr_map) + return NlAttrNested(attr_key, attrs) + + def parse_child_array(self, data: bytes, attr_key, attr_map): + ret = [] + off = 0 + while len(data) - off >= 4: + nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4]) + if nla_len + off > len(data): + raise ValueError( + "attr length {} > than the remaining length {}".format( + nla_len, len(data) - off + ) + ) + nla_type = raw_nla_type & 0x3FFF + val = self.parse_child(data[off + 4 : off + nla_len], nla_type, attr_map) + ret.append(val) + off += align4(nla_len) + return NlAttrNested(attr_key, ret) + def parse_attrs(self, data: bytes, attr_map): ret = [] off = 0 while len(data) - off >= 4: - nla_len, raw_nla_type = struct.unpack("@HH", data[off:off + 4]) + nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4]) if nla_len + off > len(data): raise ValueError( "attr length {} > than the remaining length {}".format( @@ -208,16 +229,20 @@ class StdNetlinkMessage(BaseNetlinkMessage): nla_type = raw_nla_type & 0x3FFF if nla_type in attr_map: v = attr_map[nla_type] - val = v["ad"].cls.from_bytes(data[off:off + nla_len], v["ad"].val) + val = v["ad"].cls.from_bytes(data[off : off + nla_len], v["ad"].val) if "child" in v: # nested - attrs, _ = self.parse_attrs( - data[off + 4:off + nla_len], v["child"] - ) - val = NlAttrNested(v["ad"].val, attrs) + child_data = data[off + 4 : off + nla_len] + if v.get("is_array", False): + # Array of nested attributes + val = self.parse_child_array( + child_data, v["ad"].val, v["child"] + ) + else: + val = self.parse_child(child_data, v["ad"].val, v["child"]) else: # unknown attribute - val = NlAttr(raw_nla_type, data[off + 4:off + nla_len]) + val = NlAttr(raw_nla_type, data[off + 4 : off + nla_len]) ret.append(val) off += align4(nla_len) return ret, off diff --git a/tests/atf_python/sys/netlink/netlink.py b/tests/atf_python/sys/netlink/netlink.py index 9b5906815489..f8f886b09b24 100644 --- a/tests/atf_python/sys/netlink/netlink.py +++ b/tests/atf_python/sys/netlink/netlink.py @@ -265,6 +265,9 @@ class Nlsock: # k = struct.pack("@BBHII", 12, 38, 0, self.pid, mask) # self.sock_fd.bind(k) + def join_group(self, group_id: int): + self.sock_fd.setsockopt(270, 1, group_id) + def write_message(self, msg, verbose=True): if verbose: print("vvvvvvvv OUT vvvvvvvv") diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py index b49a30c1e8e7..80c6eea72a93 100644 --- a/tests/atf_python/sys/netlink/netlink_generic.py +++ b/tests/atf_python/sys/netlink/netlink_generic.py @@ -9,6 +9,7 @@ from enum import Enum from atf_python.sys.netlink.attrs import NlAttr from atf_python.sys.netlink.attrs import NlAttrIp4 from atf_python.sys.netlink.attrs import NlAttrIp6 +from atf_python.sys.netlink.attrs import NlAttrNested from atf_python.sys.netlink.attrs import NlAttrS32 from atf_python.sys.netlink.attrs import NlAttrStr from atf_python.sys.netlink.attrs import NlAttrU16 @@ -94,6 +95,16 @@ class GenlCtrlAttrType(Enum): CTRL_ATTR_OP = 10 +class GenlCtrlAttrOpType(Enum): + CTRL_ATTR_OP_ID = 1 + CTRL_ATTR_OP_FLAGS = 2 + + +class GenlCtrlAttrMcastGroupsType(Enum): + CTRL_ATTR_MCAST_GRP_NAME = 1 + CTRL_ATTR_MCAST_GRP_ID = 2 + + genl_ctrl_attrs = prepare_attrs_map( [ AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16), @@ -101,6 +112,28 @@ genl_ctrl_attrs = prepare_attrs_map( AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32), AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32), AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32), + AttrDescr( + GenlCtrlAttrType.CTRL_ATTR_OPS, + NlAttrNested, + [ + AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32), + AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32), + ], + True, + ), + AttrDescr( + GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS, + NlAttrNested, + [ + AttrDescr( + GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr + ), + AttrDescr( + GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32 + ), + ], + True, + ), ] ) @@ -220,13 +253,13 @@ class NlAttrTS(NlAttr): @staticmethod def _validate(data): assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN - nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN]) + nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN]) assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN @classmethod def _parse(cls, data): - nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN]) - val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:]) + nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN]) + val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :]) return cls(nla_type, val) def __bytes__(self): diff --git a/tests/atf_python/sys/netlink/utils.py b/tests/atf_python/sys/netlink/utils.py index 7a41791b5318..f1d0ba3321ed 100644 --- a/tests/atf_python/sys/netlink/utils.py +++ b/tests/atf_python/sys/netlink/utils.py @@ -34,6 +34,7 @@ class AttrDescr(NamedTuple): val: Enum cls: "NlAttr" child_map: Any = None + is_array: bool = False def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]: @@ -42,6 +43,7 @@ def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]: ret[ad.val.value] = {"ad": ad} if ad.child_map: ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map) + ret[ad.val.value]["is_array"] = ad.is_array return ret