🌐

P4Runtimeでのテーブルエントリ追加をPythonを使って試してみる

2024/05/10に公開

はじめに

P4Runtime APIは,P4で定義されたデータプレーン要素を制御するコントロールプレーンの仕様です.
P4コントローラはこのP4RuntimeAPIを用いて,テーブルへのエントリ追加削除やPacketIn/PacketOutの処理を行います.

P4 TutorialでP4を学習したのですが,コントローラの処理がいまいち理解できないなーと.
やっぱりP4Runtimeを理解するにはgRPCからコードを書かないと,ということでgRPCの処理からコントローラを作ったときのメモです.

コントローラを作成するにあたっては,P4 turorialのコードを読んで参考にしています.
また,P4 TutorialのBasicスイッチを対象にコントローラを作成します.

https://github.com/p4lang/tutorials

P4Runtime

P4RuntimeのAPI仕様は,Protocol Bufferで定義されています.

https://github.com/p4lang/p4runtime/tree/main/proto/p4

P4Runtimeでの接続の流れ

P4Runtimeでは,以下の流れでP4デバイスと接続します.

  1. セッションの確立
  2. Primary選出
    1. P4Rumtimeは複数のセッションを持つことができる
    2. Primaryはroleの範囲内でデータプレーンの排他的な制御が可能
    3. device_idrole_idのペアごとに,election_idが最大のクライアントをPrimaryとする
  3. ForwardingPipelineConfigの設定
    1. データプレーンを設定する
  4. Entityの書き込みと読み取り

コントローラ実装

本記事では,以下のファイル構造で進めます.
p4src/basic.p4はP4Tutorialから取得したP4ソースファイルです.

./
├── mininet_lib/
│   ├── __init__.py
│   └── simple_net.py
├── p4controller/
│   ├── __init__.py
│   └── p4runtime_client.py
├── p4src/
│   ├── build/
│   └── basic.p4
├── README.md
├── requirements.txt
└── simple_net_controller.py

セッションの確立(StreamChannelの確立)

P4コントローラは,StreamChannelでP4デバイスとセッションを確立します.

P4コントローラ自体のクラスとStreamChannelに関する処理は以下のコードです.StreamChannelの確立は,’stub’にリクエストメッセージの’Queue’をイテレータとして渡すことで行います.

また,以下のコードでは,P4InfoのファイルとP4 Device Configのファイルを読み込んでいます.P4InfoとP4 Device Configは,P4ソースプログラムをコンパイルすると出力されるファイルです.P4Infoでは,P4RuntimeでアクセスできるP4エンティティの使用が定められています.P4 Device Configには,ForwardingPipelineConfigが含まれています.これらの情報は,後述するForwardingPipelineConfigの設定で必要です.

p4runtime_client.py
class P4RuntimeClient:

    def __init__(self, ip: str, port: str, device_id: int, p4info_txt: str, p4device_json: str, election_id_high: int,
                 election_id_low: int, logger: Logger = None):
        self.ip = ip
        self.port = port
        self.device_id = device_id

        self.p4info = p4info_pb2.P4Info()
        with open(p4info_txt, "r") as f:
            google.protobuf.text_format.Merge(f.read(), self.p4info)
        with open(p4device_json, "rb") as f:
            self.device_config = f.read()

        # Election ID (上位64ビット)
        self.election_id_high = election_id_high
        # Election ID (下位64ビット)
        self.election_id_low = election_id_low

        self.logger = logger

        self.channel = None
        self.stub = None

        self.stream_replies = None
        self.req_queue = Queue()

    def establish_stream_channel(self):
        self.channel = grpc.insecure_channel(self.ip + ":" + self.port)
        self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)

        self.stream_replies = self.stub.StreamChannel(iter(self.req_queue.get, None))
   
    def close_stream_channel(self):
        self.stream_replies.cancel()

Primary選出

コントローラは,セッションを確立したあと,Primaryを選出するためにMasterArbitrationUpdateによりelection_idを通知します.

MasterArbitrationUpdateに関する処理は以下です.

p4runtime_client.py
    def master_arbitration_update(self):
        req = p4runtime_pb2.StreamMessageRequest()
        req.arbitration.device_id = self.device_id
        req.arbitration.election_id.high = self.election_id_high
        req.arbitration.election_id.low = self.election_id_low

        self.logger.debug("MasterArbitrationUpdate[Request]: %s" % str(req))
        self.req_queue.put(req)
        for rep in self.stream_replies:
            self.logger.debug("MasterArbitrationUpdate[Reply]: %s" % str(rep))
            return rep

ForwardingPipelineConfigの設定

次に,コントローラからForwardingPipelineConfigを設定します.ForwardingPipelineConfigに関する処理を以下のように実装します.

p4runtime_client.py
    def set_pipeline_config_forward(self):
        req = p4runtime_pb2.SetForwardingPipelineConfigRequest()
        req.election_id.high = self.election_id_high
        req.election_id.low = self.election_id_low
        req.device_id = self.device_id

        req.config.p4info.CopyFrom(self.p4info)
        req.config.p4_device_config = self.device_config
        req.config.cookie.cookie = 1
        req.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
        self.logger.debug("SetPipelineConfigForward[Request]: %s" % str(req))
        rep = self.stub.SetForwardingPipelineConfig(req)
        self.logger.debug("SetPipelineConfigForward[Reply]: %s" % str(rep))
        return rep

    def get_pipeline_config_forward(self):
        req = p4runtime_pb2.GetForwardingPipelineConfigRequest()
        req.device_id = self.device_id
        self.logger.debug("GetPipelineConfigForward[Request]: %s" % str(req))
        rep = self.stub.GetForwardingPipelineConfig(req)
        self.logger.debug("GetPipelineConfigForward[Reply]: %s" % str(rep))
        return rep

テーブルエントリの追加

ForwardingPipelineConfig設定後,Writeを使用して,テーブルエントリといったエンティティを操作します.以下では,テーブルを読み書きできるメソッドを追加します.

p4runtime_client.py

    def write_table_entries(self, table_entries: List[TableEntry]):
        req = p4runtime_pb2.WriteRequest()
        req.device_id = self.device_id
        req.election_id.high = self.election_id_high
        req.election_id.low = self.election_id_low
        for entry in table_entries:
            update = req.updates.add()
            update.type = entry.get("update_type", p4runtime_pb2.Update.INSERT)
            table_entry = build_table_entry(entry)
            update.entity.table_entry.CopyFrom(table_entry)
        self.logger.debug("WriteTableEntries [Request]: %s" % str(req))
        rep = self.stub.Write(req)
        self.logger.debug("WriteTableEntries [Reply]: %s" % str(rep))

    def read_table_entries(self, table_id=0):
        req = p4runtime_pb2.ReadRequest()
        req.device_id = self.device_id
        entity = req.entities.add()
        table_entry = entity.table_entry
        table_entry.table_id = table_id

        self.logger.debug("ReadTableEntries [Request]: %s" % str(req))
        for rep in self.stub.Read(req):
            self.logger.debug("ReadTableEntries [Rep]: %s" % str(rep))
            yield rep

build_table_entryでは,以下のようにTableEntryメッセージを作成しています.型を付けたかったのでTypeDictを使ってます.

p4runtime_client.py
class Action(TypedDict, total=False):
    action_id: int
    params: List[Tuple[int, bytes]]  # list of (param_id, value)

class FieldMatch(TypedDict, total=False):
    # ref: https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto#L229
    field_id: int
    match_type: p4runtime_pb2.FieldMatch.Exact | p4runtime_pb2.FieldMatch.Ternary | p4runtime_pb2.FieldMatch.LPM | p4runtime_pb2.FieldMatch.Range | p4runtime_pb2.FieldMatch.Optional | Any
    value: bytes  # Exact/Ternary/LPM/Optional
    mask: bytes  # Ternary
    prefix_len: int  # LPM
    low: bytes  # Range
    high: bytes  # Range

class TableEntry(TypedDict, total=False):
    update_type: int
    table_id: int
    match: List[FieldMatch]
    is_default_action: bool
    action: Action
    priority: int

def build_table_entry(entry: TableEntry):
    table_entry = p4runtime_pb2.TableEntry()
    table_entry.table_id = entry["table_id"]
    table_entry.is_default_action = entry.get("is_default_action", False)
    table_entry.priority = entry.get("priority", 0)

    action = table_entry.action.action
    action.action_id = entry["action"]["action_id"]
    for action_param in entry["action"].get("params", []):
        param = action.params.add()
        param.param_id = action_param[0]
        param.value = action_param[1]

    for m in entry.get("match", []):
        match_type = m["match_type"]
        field_match = table_entry.match.add()
        field_match.field_id = m["field_id"]
        if match_type == p4runtime_pb2.FieldMatch.Exact:
            field_match.exact.value = m["value"]
        elif match_type == p4runtime_pb2.FieldMatch.Ternary:
            field_match.ternary.value = m["value"]
            field_match.ternary.mask = m["mask"]
        elif match_type == p4runtime_pb2.FieldMatch.LPM:
            field_match.lpm.value = m["value"]
            field_match.lpm.prefix_len = m["prefix_len"]
        elif match_type == p4runtime_pb2.FieldMatch.Range:
            field_match.range.low = m["low"]
            field_match.range.high = m["high"]
        elif match_type == p4runtime_pb2.FieldMatch.Optional:
            field_match.optional.value = m["value"]
        else:
            raise Exception("%s is unknown match type." % str(match_type))

    return table_entry

mininetでのネットワーク作成

テーブルエントリを追加して動くのを見たいだけなので,以下のようなシンプルなネットワークを考えます.

ネットワーク作成にはmininetを利用し,p1において,P4スイッチとしてbmv2を使用します.

simple_net.drawio.png

ネットワークの作成

ネットワーク図の通りにネットワークを作成します.
basic.p4では,ARPの処理が実装されていないため,あらかじめMACアドレスは設定しておきます.
なお,p1については,Pythonではなく,mininetのコンソールでbmv2を起動することを想定しています.

simple_net.py
from mininet.cli import CLI
from mininet.net import Mininet
from mininet.log import setLogLevel

def run_topo():
    net = Mininet()
    
    h1 = net.addHost("h1", ip="192.168.1.1/24", mac="08:00:00:00:00:01")
    h2 = net.addHost("h2", ip="192.168.2.2/24", mac="08:00:00:00:00:02")
    p1 = net.addHost("p1", ip=None, inNamespace=False)

    net.addLink(h1, p1, intfName1="h1_p1", intfName2="p1_h1")
    net.addLink(h2, p1, intfName1="h2_p1", intfName2="p1_h2")

    net.start()

    h1.cmdPrint("ip route add default via 192.168.1.254 dev h1_p1")
    h1.cmdPrint("arp -i h1_p1 -s 192.168.1.254 08:00:00:00:01:00")
    h2.cmdPrint("ip route add default via 192.168.2.254 dev h2_p1")
    h2.cmdPrint("arp -i h2_p1 -s 192.168.2.254 08:00:00:00:02:00")

    CLI(net)
    net.stop()

if __name__ == "__main__":
    setLogLevel("info")
    run_topo()

ネットワークの実行

mininetopennetworking/p4mnのDockerコンテナを流用して実行します.opennetworking/p4mnbmv2スイッチのネットワークをmininetで作成できる環境です.このコンテナのエントリポイントをbashとして,bashsimple_net.pyを実行します.

以下のコマンドでDockerコンテナを実行します.

$ docker run --privileged --rm -it -v $PWD:/workdir -w /workdir -p50001-50030:50001-50030 --name p4mn --hostname p4mn --entrypoint "bash" opennetworking/p4mn:stable

Dockerコンテナ内でsimple_net.pyを実行します.

root@p4mn:/workdir# python mininet_lib/simple_net.py

simple_net.pyでは,bmv2を起動していません.
そのため,mininetのコンソール上でp1 <command>を用いてp1でスイッチを起動します.

mininet> p1 simple_switch_grpc --device-id 1 -i 1@p1_h1 -i 2@p1_h2 --log-console --log-level info ./p4src/build/basic.bmv2.json -- --cpu-port 255 --grpc-server-addr 0.0.0.0:50001 &

この時点ではP4スイッチにエントリを追加していないため,h1h2で疎通を取ることができません.

mininet> h1 ping -c 3 h2
PING 192.168.2.2 (192.168.2.2) 56(84) bytes of data.

--- 192.168.2.2 ping statistics ---
3 packets transmitted, 0 received, 100% packet loss, time 2043ms
mininet> 

P4Runtimeコントローラでのエントリの追加

P4Runtimeクライアントの機能についてはp4runtime_client.pyで実装したので,後はアドレスの情報や設定するエントリを追加するだけです.
以下のような設定で,P4デバイスと接続しエントリを追加します.

simple_net_controller.py
import argparse
import binascii
import socket
from time import sleep
from logging import StreamHandler, DEBUG, getLogger
from typing import List

from p4.v1 import p4runtime_pb2

from p4controller.p4runtime_client import P4RuntimeClient, TableEntry

def get_args():
    """get args from command line"""
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", default="127.0.0.1", help="p4 switch address")
    parser.add_argument("--port", default="50001", help="p4 switch port")
    parser.add_argument("--device_id", default=1, help="p4 device id")
    parser.add_argument("--p4info_txt", default="./p4src/build/basic.p4info.txt", help="p4info file")
    parser.add_argument("--p4device_json", default="./p4src/build/basic.bmv2.json", help="p4 device json file")
    parser.add_argument("--election_id_high", default=0, help="Election ID (High)")
    parser.add_argument("--election_id_low", default=1, help="Election ID (Low)")

    args = parser.parse_args()
    return args

def new_p4runtime_client():
    args = get_args()
    logger = getLogger(__name__)
    logger.setLevel(DEBUG)
    logger.addHandler(StreamHandler())
    client = P4RuntimeClient(
        ip=args.ip,
        port=args.port,
        device_id=args.device_id,
        p4info_txt=args.p4info_txt,
        p4device_json=args.p4device_json,
        election_id_high=args.election_id_high,
        election_id_low=args.election_id_low,
        logger=logger
    )
    return client

if __name__ == "__main__":
    client = new_p4runtime_client()
    client.establish_stream_channel()
    client.master_arbitration_update()
    client.set_pipeline_config_forward()
    client.get_pipeline_config_forward()

    # 書き込むテーブルエントリ.各IDはp4infoから取得
    table_entries: List[TableEntry] = [
        {
            "update_type": p4runtime_pb2.Update.MODIFY,
            "table_id": 33574068,  # MyIngress.ipv4_lpm
            "is_default_action": True,
            "action": {
                "action_id": 16805608,  # MyIngress.drop
            }
        }, {
            "update_type": p4runtime_pb2.Update.INSERT,
            "table_id": 33574068,  # MyIngress.ipv4_lpm
            "match": [
                {
                    "field_id": 1,  # hdr.ipv4.dstAddr
                    "match_type": p4runtime_pb2.FieldMatch.LPM,
                    "value": socket.inet_aton("192.168.1.1"),
                    "prefix_len": 32,
                }
            ],
            "action": {
                "action_id": 16799317,  # MyIngress.ipv4_forward
                "params": [
                    (1, binascii.unhexlify("08:00:00:00:00:01".replace(':', ''))),  # (dstAddr, "08:00:00:00:00:01")
                    (2, (1).to_bytes(2, "big"))  # (port, 1)
                ]
            }
        }, {
            "update_type": p4runtime_pb2.Update.INSERT,
            "table_id": 33574068,  # MyIngress.ipv4_lpm
            "match": [
                {
                    "field_id": 1,  # hdr.ipv4.dstAddr
                    "match_type": p4runtime_pb2.FieldMatch.LPM,
                    "value": socket.inet_aton("192.168.2.2"),
                    "prefix_len": 32,
                }
            ],
            "action": {
                "action_id": 16799317,  # MyIngress.ipv4_forward
                "params": [
                    (1, binascii.unhexlify("08:00:00:00:00:02".replace(':', ''))),  # (dstAddr, "08:00:00:00:00:02")
                    (2, (2).to_bytes(2, "big"))  # (port, 2)
                ]
            }
        },
    ]
    client.write_table_entries(table_entries)
    client.read_table_entries()

    client.close_stream_channel()

コントローラの実行

Dockerコンテナで実行中のP4スイッチに対して,以下のコマンドでコントローラを実行しエントリを追加します.

$ python3 simple_net_controller.py
MasterArbitrationUpdate[Request]: arbitration {
  device_id: 1
  election_id {
    low: 1
  }
}

MasterArbitrationUpdate[Reply]: arbitration {
  device_id: 1
  election_id {
    low: 1
  }
  status {
    message: "Is master"
  }
}

...
()
...

ReadTableEntries [Request]: device_id: 1
entities {
  table_entry {
  }
}

ReadTableEntries [Rep]: entities {
  table_entry {
    table_id: 33574068
    match {
      field_id: 1
      lpm {
        value: "\300\250\001\001"
        prefix_len: 32
      }
    }
    action {
      action {
        action_id: 16799317
        params {
          param_id: 1
          value: "\010\000\000\000\000\001"
        }
        params {
          param_id: 2
          value: "\000\001"
        }
      }
    }
  }
}
entities {
  table_entry {
    table_id: 33574068
    match {
      field_id: 1
      lpm {
        value: "\300\250\002\002"
        prefix_len: 32
      }
    }
    action {
      action {
        action_id: 16799317
        params {
          param_id: 1
          value: "\010\000\000\000\000\002"
        }
        params {
          param_id: 2
          value: "\000\002"
        }
      }
    }
  }
}

entities {
  table_entry {
    table_id: 33574068
    match {
      field_id: 1
      lpm {
        value: "\300\250\001\001"
        prefix_len: 32
      }
    }
    action {
      action {
        action_id: 16799317
        params {
          param_id: 1
          value: "\010\000\000\000\000\001"
        }
        params {
          param_id: 2
          value: "\000\001"
        }
      }
    }
  }
}
entities {
  table_entry {
    table_id: 33574068
    match {
      field_id: 1
      lpm {
        value: "\300\250\002\002"
        prefix_len: 32
      }
    }
    action {
      action {
        action_id: 16799317
        params {
          param_id: 1
          value: "\010\000\000\000\000\002"
        }
        params {
          param_id: 2
          value: "\000\002"
        }
      }
    }
  }
}

ログの情報から,エントリが追加されている状態であることが確認できます.

また,pingを実行し,h1h2で疎通が取れていることを確認します.

mininet> h1 ping -c 3 h2
PING 192.168.2.2 (192.168.2.2) 56(84) bytes of data.
64 bytes from 192.168.2.2: icmp_seq=1 ttl=63 time=0.404 ms
64 bytes from 192.168.2.2: icmp_seq=2 ttl=63 time=0.306 ms
64 bytes from 192.168.2.2: icmp_seq=3 ttl=63 time=0.396 ms

--- 192.168.2.2 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2024ms
rtt min/avg/max/mdev = 0.306/0.368/0.404/0.049 ms

おわりに

P4の他のエンティティとの書き込みなどわかってないので,色々試したいですね.

参考文献

Discussion