👀

夏休みの宿題でPort Scannerを自作してみた③~実装編~

2024/08/24に公開

これまでのお話

  • 夏だし,Port Scannerを自作してみたい --> せっかくなのでmy_portscannerを作る過程を紹介していきます。
  • 技術選定編では,Port Scannerを自作するにあたってどのような技術を使うかを考えました。
  • 開発環境準備編Port Scannerの開発環境を整えた。
  • 今回は,Port Scannerの実装をしていきます。

実装の前に

pythonをパッケージ化する

yt-dlpの構成を真似したらいい感じにパッケージ化できました。

全体構成としてはこんな感じ

$ pwd
/home/tomita/my_portscanner/src
$ tree
.
├── my_portscanner
│   ├── get_datetime.py
│   ├── __init__.py
│   ├── __main__.py
│   ├── options.py
│   ├── scan_tools
│   │   ├── ConnectScan.py
│   │   ├── __init__.py
│   │   ├── Scan.py
│   │   └── SynScan.py
│   └── version.py
└── tests
    ├── __init__.py
    ├── scan_tools
    │   ├── __init__.py
    │   ├── test_ConnectScan.py
    │   └── test_SynScan.py
    ├── test_get_datetime.py
    └── test_options.py

全体の動作のイメージとしては

  1. rye build後には以下のようにして実行できるようになります。

    my_portscanner localhost -sS
    
  2. my_portscannersrc/my_portscanner/__main__.pyを実行します。

    import my_portscanner
    
    
    if __name__ == "__main__":
        my_portscanner.main()
    
  3. src/my_portscanner/__init__.pymain関数が実行されます。 main関数に全体の処理が書いてあります。

__init__.pyのmain関数が冗長になると全体の処理が追いかけにくくなり,テストもしにくくなるので,細かい処理はなるべく外だしすることを心がけました。

例えば,options.pyはargparseというライブラリを使って引数処理を行っています。

引数は覚えやすいように-sS-sTのようにnmapに似た形式にしました。自作ツールあるあるなのですが,自分で作った引数って作っているときは最高にイケてると思うのに数ヶ月経つと全くおぼえていないんですよね笑。

buildにもryeを使う

rye buildを使うと,パッケージ化されたpythonのパッケージがdist/

rye build
ls dist
my_portscanner-0.1.0-py3-none-any.whl  my_portscanner-0.1.0.tar.gz

port scan部分の実装

クラス構成

  • 今回は,Connect ScanとSyn Scanの2種類のスキャン方法に絞ることにしました。
  • 今後,他のスキャン方法を追加することも考えられるので,各スキャン方法で共通する部分をScanクラスに切り出し,各スキャンクラスに継承させることにしました。

composition patternを使うほうが継承より美しそうですが,面倒だったので継承にしました。

├── my_portscanner
│   ├── scan_tools
│   │   ├── ConnectScan.py
│   │   ├── __init__.py
│   │   ├── Scan.py
│   │   └── SynScan.py
│   ├── __init__.py
│   ├── __main__.py

各スキャン方法について

コードを見るのが一番てっとり早いですが,イメージがつかみにくいと思うので,nmapのドキュメントを参考にしてください。

スキャン部分を作る

いきなりコードを書き出してもわかりにくいので,スモールスタートして小さいものを作り,これをパッケージ化して外部から呼び出せるようにしました。

  • Connect Scan
import socket


target_ip = 127.0.0.1
target_port = 22

s = socket.socket()
errno = s.connect_ex((target_ip, target_port))
s.close()

if errno == 0:
    print(f"TCP port {target_port} is open")
else:
    print(f"TCP port {target_port} is closed")
  • SYN Scan
from scapy.all import IP, TCP, sr1

target_ip = 127.0.0.1
target_port = 22

# SYNパケットを作成
syn_packet = IP(dst=target_ip)/TCP(dport=target_port, flags="S")

response_packet = sr1(syn_packet)

# SYN/ACKパケットが返ってきた場合は、ポートが開いていると判断
if (response_packet.haslayer(TCP) and 
    response_packet[TCP].flags == "SA"):
    print(f"TCP port {target_port} is open")
else:
    print(f"TCP port {target_port} is closed")

こんな感じでライブラリの動きを学んだら,Class化して__init__.pyから呼び出せるようにします。


引数処理ができるようにする

├── my_portscanner
│   ├── scan_tools
│   │   ├── ConnectScan.py
│   │   ├── __init__.py
│   │   ├── Scan.py
│   │   └── SynScan.py
│   ├── __init__.py
│   ├── __main__.py
│   ├── options.py ☆new☆

options.pyを追加して,引数処理を行うようにしました。
ライブラリとしてはargparseを使いました。


テスト作成

テストの実行はrye testで行えるみたいです。
rye testtestsディレクトリ以下にあるtest_*.pyを探してテストを実行してくれます。

ネットワーク通信部分はモック化してテストを行いました。自分はモックライブラリとしてunittest.mockを使いましたが,pytestを使うともっと簡単にモック化できるかもしれません。unittestでモックを書くのは初めてだったので単体テストの実装が一番苦労しました。
今回は趣味プログラミングなので,カバレッジ100%とかは目指さず,各機能が正しく動くかを確認する程度に留めました。

from io import StringIO
import sys
import unittest
from unittest.mock import patch, MagicMock
from my_portscanner.scan_tools.ConnectScan import ConnectScan


class TestConnectScan(unittest.TestCase):
    def setUp(self):
        """
        テスト実行時に毎回実行され,
        socket.socketのmockしたインスタンスを作成する。
        """
        # 共通のテストデータ
        self.target_ip = "192.168.150.2"
        self.target_port_list = [22, 80, 443]
        self.expected_open_ports = [22, 443]

        # mock_socket_instanceの作成
        self.mock_socket_instance = MagicMock()

        # connect_exの戻り値を設定
        def connect_ex_side_effect(address):
            ip, port = address
            if port in self.expected_open_ports:
                return 0  # open
            else:
                return 1  # close

        self.mock_socket_instance.connect_ex.side_effect = connect_ex_side_effect
        return

    @patch("my_portscanner.scan_tools.ConnectScan.socket.socket")
    def test_run(self, mock_socket):
        mock_socket.return_value = self.mock_socket_instance

        scan = ConnectScan(
            target_ip=self.target_ip, target_port_list=self.target_port_list
        )
        open_ports = scan.run()

        self.assertEqual(open_ports, self.expected_open_ports)

    @patch("my_portscanner.scan_tools.ConnectScan.socket.socket")
    def test_print_result(self, mock_socket):
        mock_socket.return_value = self.mock_socket_instance

        scan = ConnectScan(
            target_ip=self.target_ip, target_port_list=self.target_port_list
        )
        scan.run()

        # 標準出力をキャプチャ
        captured_output = StringIO()
        sys.stdout = captured_output

        # print_resultメソッドの実行
        scan.print_result()

        # 標準出力の内容を取得
        sys.stdout = sys.__stdout__
        output = captured_output.getvalue().strip()

        # 期待される出力
        expected_output = "PORT       STATE SERVICE\n22/tcp     open  unknown\n443/tcp    open  unknown"

        self.assertEqual(output, expected_output)


if __name__ == "__main__":
    unittest.main()

一例としてConnectScanのテストを載せておきます。
socket.socketのmockしたインスタンスを作成し,connect_exが呼ばれた際に,予め設定したポートがopenしている応答を返すようにside_effectを設定することで実際に通信を行わずにConnectScanの各関数のテストが行えます。

テストの方式はいわゆるロンドン派と呼ばれるものに近いと思われます。
参考: テストの流派


完成とその後の活動

こちらが現状の完成品になります。
my_portscanner
追加で実施した事項としては,

  • GitHub Actionsを使ってテストを自動化,リリースを自動化,Docker Container Registryへの配布を自動化
  • ドキュメントの整備
  • 例外処理をちゃんと書く
  • より詳細なスキャンができるように改善
    等を行いました。
    今後,async awaitを使った非同期処理や,他のスキャン方法の追加等思いついたものをどんどん実装していきたいです。
GitHubで編集を提案

Discussion