🌊

[SQLMap] 長く利用されるPython製アプリケーションをコードリーディングする

2024/05/02に公開

SQLMapについて

OSS InsightのWeb Scanner部門で常に人気があるリポジトリの一つがSQLMapです。*1
https://ossinsight.io/collections/web-scanner
https://github.com/sqlmapproject/sqlmap

2006年から2024年現在も活発に開発が行われている珍しいPython アプリケーションです。触ってみながらコードリーディングしたいと思います。

SQLMapはSQL Injectionに特化したツールで、5つのdetect / exploitを機械的に実行します。まずはその5つについて理解しましょう。

SQLi techniques

  1. Boolean-based blind: ブラインドSQLインジェクションの一種で、HTTPリクエスト内の影響を受けるパラメータに、真偽を判断するSQLステートメントを挿入します。出力を文字単位で推測します。
  2. Time-based blind: ブラインドSQLインジェクションの一種で、バックエンドDBMSを一定時間待機させるSQLステートメントを挿入します。出力を文字単位で推測します。
  3. Error-based: データベース固有のエラーメッセージを引き起こすステートメントを挿入し、DBMSエラーメッセージを解析して情報を取得します。
  4. UNION query-based: UNION ALL SELECTを含むSQLステートメントを挿入し、SELECTステートメントの出力を取得します。
  5. Stacked queries: スタックされたクエリをサポートする場合、セミコロンに続いて実行するSQLステートメントを挿入します。データ定義やデータ操作ステートメントを実行し、ファイルシステムへのアクセスやOSコマンドの実行などを可能にします。

スクリーンショットのような形でtechniqueごとに実行クエリが出力されます。具体的な値は実際に見て確かめるのが良いでしょう。*2

触ってみる

https://zenn.dev/shimakaze_soft/articles/0cfab83adf0f71 がものすごく参考になります。

プロジェクト構造

❯ tree -L 2 -d .
.
├── data
│   ├── html
│   ├── procs
│   ├── shell
│   ├── txt
│   ├── udf
│   └── xml
├── doc
│   └── translations
├── extra
│   ├── beep
│   ├── cloak
│   ├── dbgtool
│   ├── icmpsh
│   ├── runcmd
│   ├── shellcodeexec
│   ├── shutils
│   └── vulnserver
├── lib
│   ├── controller
│   ├── core
│   ├── parse
│   ├── request
│   ├── takeover
│   ├── techniques
│   └── utils
├── plugins
│   ├── dbms
│   └── generic
├── tamper
└── thirdparty
    ├── ansistrm
    ├── beautifulsoup
    ├── bottle
    ├── chardet
    ├── clientform
    ├── colorama
    ├── fcrypt
    ├── identywaf
    ├── keepalive
    ├── magic
    ├── multipart
    ├── odict
    ├── prettyprint
    ├── pydes
    ├── six
    ├── socks
    ├── termcolor
    └── wininetpton

pyproject.tomlもなければrequirements.txtもありません。コアな機能は全て標準ライブラリで構築されています。さすが10年選手です。
コード検索するとmymssqlのように特定のDBに関する処理で外部ライブラリを用いてる場合があります。マイクロカーネルアーキテクチャ(pluginシステム)によってDBMSの接続を抽象化しているため、本当に必要になった時のみpip installを行う形式です。

https://github.com/sqlmapproject/sqlmap/blob/master/plugins/dbms/sybase/connector.py#L8-L12

アーキテクチャ

先ほど述べたようにSQLはDBごとに微妙に異なるので様々なDBに対応する必要が出てきます。DBMSの抽象化層はgenericにあります。

https://github.com/sqlmapproject/sqlmap/tree/master/plugins/generic

❯ tree -L 2 plugins/generic   
plugins/generic
├── __init__.py
├── connector.py
├── custom.py
├── databases.py
├── entries.py
├── enumeration.py
├── filesystem.py
├── fingerprint.py
├── misc.py
├── search.py
├── syntax.py
├── takeover.py
└── users.py

ステート管理

基本実装はlib配下で行われます。

データはdata.pyで定義した変数で保持され、kb.xxxもしくはBackend.xxxのような形でステートを持ってくる方式を取っています。
https://github.com/sqlmapproject/sqlmap/blob/master/lib/core/data.py

スコープが短いものはkbから直接持ってきて、様々なところで利用するものはBackend Classのようなある程度構造化された形から持ってくるといった使い分けをしていそうです。

処理の流れ

まずはdetectの部分です。
ユーザー入力によって渡されたURLはcontroler.py@start関数でスキャンします。

controler.py@start
@stackedmethod
def start():
    """
    URLの安定性およびGET、POST、Cookie、User-Agentパラメーターすべてに対してチェックを行う関数を呼び出す
    それらが動的でSQLインジェクションの影響を受けているかどうかを確認する
    """
    targetCount = 0
    initialHeaders = list(conf.httpHeaders)

    # kb.targetsは事前にparseされたターゲットのリストが入っている
    for targetUrl, targetMethod, targetData, targetCookie, targetHeaders in kb.targets:
        targetCount += 1

        try:
            conf.url = targetUrl
            conf.method = targetMethod.upper().strip() if targetMethod else targetMethod
            conf.data = targetData
            conf.cookie = targetCookie
            conf.httpHeaders = list(initialHeaders)
            conf.httpHeaders.extend(targetHeaders or [])

            # コンフィグの設定を元にUAを変更する
            if conf.randomAgent or conf.mobile:
                for header, value in initialHeaders:
                    if header.upper() == HTTP_HEADER.USER_AGENT.upper():
                        conf.httpHeaders.append((header, value))
                        break

            # ヘッダーの重複削除
            conf.httpHeaders = [conf.httpHeaders[i] for i in xrange(len(conf.httpHeaders)) if conf.httpHeaders[i][0].upper() not in (__[0].upper() for __ in conf.httpHeaders[i + 1:])]

            # TODO
            initTargetEnv()
            # TODO
            parseTargetUrl()

            if conf.multipleTargets:
                if conf.forms and conf.method:
                    message = "[%d/%s] Form:\n%s %s" % (targetCount, len(kb.targets) if isListLike(kb.targets) else '?', conf.method, targetUrl)
                else:
                    message = "[%d/%s] URL:\n%s %s" % (targetCount, len(kb.targets) if isListLike(kb.targets) else '?', HTTPMETHOD.GET, targetUrl)

                if conf.cookie:
                    message += "\nCookie: %s" % conf.cookie

                if conf.data is not None:
                    message += "\n%s data: %s" % ((conf.method if conf.method != HTTPMETHOD.GET else None) or HTTPMETHOD.POST, urlencode(conf.data or "") if re.search(r"\A\s*[<{]", conf.data or "") is None else conf.data)

                # フォームに対するテスト
                if conf.forms and conf.method:
                    if conf.method == HTTPMETHOD.GET and targetUrl.find("?") == -1:
                        continue
                    message += "\ndo you want to test this form? [Y/n/q] "
                    choice = readInput(message, default='Y').upper()
                    if choice == 'N':
                        continue
                    elif choice == 'Q':
                        break
                    else:
                        if conf.method != HTTPMETHOD.GET:
                            message = "Edit %s data [default: %s]%s: " % (conf.method, urlencode(conf.data or "") if re.search(r"\A\s*[<{]", conf.data or "None") is None else conf.data, " (Warning: blank fields detected)" if conf.data and extractRegexResult(EMPTY_FORM_FIELDS_REGEX, conf.data) else "")
                            conf.data = readInput(message, default=conf.data)
                            conf.data = _randomFillBlankFields(conf.data)
                            conf.data = urldecode(conf.data) if conf.data and urlencode(DEFAULT_GET_POST_DELIMITER, None) not in conf.data else conf.data
                        else:
                            if '?' in targetUrl:
                                firstPart, secondPart = targetUrl.split('?', 1)
                                message = "Edit GET data [default: %s]: " % secondPart
                                test = readInput(message, default=secondPart)
                                test = _randomFillBlankFields(test)
                                conf.url = "%s?%s" % (firstPart, test)
                        
                        # TODO
                        parseTargetUrl()

                else:
                    # スコープに含まれるURLかどうか
                    if not conf.scope:
                        message += "\ndo you want to test this URL? [Y/n/q]"
                        choice = readInput(message, default='Y').upper()

                        if choice == 'N':
                            dataToStdout(os.linesep)
                            continue
                        elif choice == 'Q':
                            break
                    else:
                        pass


            # TODO
            setupTargetEnv()

            if conf.rParam and kb.originalPage:
                kb.randomPool = dict([_ for _ in kb.randomPool.items() if isinstance(_[1], list)])

                for match in re.finditer(r"(?si)<select[^>]+\bname\s*=\s*[\"']([^\"']+)(.+?)</select>", kb.originalPage):
                    name, _ = match.groups()
                    options = tuple(re.findall(r"<option[^>]+\bvalue\s*=\s*[\"']([^\"']+)", _))
                    if options:
                        kb.randomPool[name] = options

            # TODO
            checkWaf()

            parameters = list(conf.parameters.keys())
            orderList = (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI, PLACE.POST, PLACE.GET)
            for place in orderList[::-1]:
                if place in parameters:
                    parameters.remove(place)
                    parameters.insert(0, place)

            proceed = True
            for place in parameters:
                # conf.levelを元にテストするパターンを絞り込む
                # Test User-Agent and Referer headers only if
                # --level >= 3
                skip = (place == PLACE.USER_AGENT and (kb.testOnlyCustom or conf.level < 3))
                skip |= (place == PLACE.REFERER and (kb.testOnlyCustom or conf.level < 3))

                # --param-filter
                skip |= (len(conf.paramFilter) > 0 and place.upper() not in conf.paramFilter)

                # Test Host header only if
                # --level >= 5
                skip |= (place == PLACE.HOST and (kb.testOnlyCustom or conf.level < 5))

                # Test Cookie header only if --level >= 2
                skip |= (place == PLACE.COOKIE and (kb.testOnlyCustom or conf.level < 2))
                skip |= (place == PLACE.USER_AGENT and intersect(USER_AGENT_ALIASES, conf.skip, True) not in ([], None))
                skip |= (place == PLACE.REFERER and intersect(REFERER_ALIASES, conf.skip, True) not in ([], None))
                skip |= (place == PLACE.COOKIE and intersect(PLACE.COOKIE, conf.skip, True) not in ([], None))
                skip |= (place == PLACE.HOST and intersect(PLACE.HOST, conf.skip, True) not in ([], None))

                skip &= not (place == PLACE.USER_AGENT and intersect(USER_AGENT_ALIASES, conf.testParameter, True))
                skip &= not (place == PLACE.REFERER and intersect(REFERER_ALIASES, conf.testParameter, True))
                skip &= not (place == PLACE.HOST and intersect(HOST_ALIASES, conf.testParameter, True))
                skip &= not (place == PLACE.COOKIE and intersect((PLACE.COOKIE,), conf.testParameter, True))

                if skip:
                    continue

                if place not in conf.paramDict or place not in conf.parameters:
                    continue

                paramDict = conf.paramDict[place]
                paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place

                # paramDictにはスキャン対象のパラメータが入っている
                for parameter, value in paramDict.items():
                    if not proceed:
                        break

                    kb.vainRun = False
                    testSqlInj = True
                    paramKey = (conf.hostname, conf.path, place, parameter)

                    if kb.processUserMarks:
                        if testSqlInj and place not in (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI):
                            if kb.processNonCustom is None:
                                message = "other non-custom parameters found. "
                                message += "Do you want to process them too? [Y/n/q] "
                                choice = readInput(message, default='Y').upper()

                                if choice == 'Q':
                                    raise SqlmapUserQuitException
                                else:
                                    kb.processNonCustom = choice == 'Y'
                            if not kb.processNonCustom:
                                continue

                    if paramKey in kb.testedParams:
                        testSqlInj = False


                    elif any(_ in conf.testParameter for _ in (parameter, removePostHintPrefix(parameter))):
                        pass

                    elif parameter in conf.rParam:
                        testSqlInj = False


                    elif parameter in conf.skip or kb.postHint and parameter.split(' ')[-1] in conf.skip:
                        testSqlInj = False


                    elif conf.paramExclude and (re.search(conf.paramExclude, parameter, re.I) or kb.postHint and re.search(conf.paramExclude, parameter.split(' ')[-1], re.I) or re.search(conf.paramExclude, place, re.I)):
                        testSqlInj = False


                    elif conf.csrfToken and re.search(conf.csrfToken, parameter, re.I):
                        testSqlInj = False


                    # Ignore session-like parameters for --level < 4
                    elif conf.level < 4 and (parameter.upper() in IGNORE_PARAMETERS or any(_ in parameter.lower() for _ in CSRF_TOKEN_PARAMETER_INFIXES) or parameter.upper().startswith(GOOGLE_ANALYTICS_COOKIE_PREFIX)):
                        testSqlInj = False


                    elif PAYLOAD.TECHNIQUE.BOOLEAN in conf.technique or conf.skipStatic:
                        check = checkDynParam(place, parameter, value)
                        if not check:
                            if conf.skipStatic:
                                testSqlInj = False

                    kb.testedParams.add(paramKey)

                    if testSqlInj:
                        try:
                            if place == PLACE.COOKIE:
                                pushValue(kb.mergeCookies)
                                kb.mergeCookies = False

                            # チェック1: TODO
                            check = heuristicCheckSqlInjection(place, parameter)
                            if check != HEURISTIC_TEST.POSITIVE:
                                if conf.smart or (kb.ignoreCasted and check == HEURISTIC_TEST.CASTED):
                                    continue


                            # チェック2: TODO
                            injection = checkSqlInjection(place, parameter, value)
                            proceed = not kb.endDetection
                            injectable = False
                            if getattr(injection, "place", None) is not None:
                                if NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE in injection.notes:
                                    kb.falsePositives.append(injection)
                                else:
                                    injectable = True
                                    kb.injections.append(injection)

                                    # In case when user wants to end detection phase (Ctrl+C)
                                    if not proceed:
                                        break
                        finally:
                            if place == PLACE.COOKIE:
                                kb.mergeCookies = popValue()


            if kb.injection.place is not None and kb.injection.parameter is not None:
                if conf.multipleTargets:
                    message = "do you want to exploit this SQL injection? [Y/n] "
                    condition = readInput(message, default='Y', boolean=True)
                else:
                    condition = True
                if condition:
                    # Exploit
                    action()
        
        # エラー処理
        # ctrl+cは連続で押された時に強制終了
        # 1回ならば次のターゲットに進むかどうかを確認する
        except KeyboardInterrupt:
            if kb.lastCtrlCTime and (time.time() - kb.lastCtrlCTime < 1):
                kb.multipleCtrlC = True
                raise SqlmapUserQuitException("user aborted (Ctrl+C was pressed multiple times)")
            kb.lastCtrlCTime = time.time()
            if conf.multipleTargets:
                message = "do you want to skip to the next target in list? [Y/n/q]"
                choice = readInput(message, default='Y').upper()

                if choice == 'N':
                    return False
                elif choice == 'Q':
                    raise SqlmapUserQuitException
            else:
                raise
        # Skipする場合はpass
        except SqlmapSkipTargetException:
            pass
        # それ以外は基本raiseして全ターゲットに対するスキャンを終了する
        except SqlmapUserQuitException:
            raise
        except SqlmapSilentQuitException:
            raise
        except SqlmapBaseException as ex:
            if conf.multipleTargets:
                _saveToResultsFile()
            else:
                return False

        finally:
            showHttpErrorCodes()

    return True

次にexploitの部分です。
action.py@action関数でexploitを実行します。
以下のような機能が備わっています。

  • directly connect to the database: DBMSのipやテーブル名などの情報から直接接続を試します。
  • Fingerprint and enumeration
    • error messages, banner parsing, functions output comparison, specific featuresといった情報からバックエンドの技術スタックとバージョンを推定する
    • DBMSのセッションを取得するための情報を収集して管理者かどうかのチェック
    • その他enumaration(users, password hashes, privileges, roles, databases, tables and columns)
action.py@action
def action():
    """
    影響を受けたURLパラメーターに対するSQLインジェクションを悪用する関数です
    可能な範囲でバックエンドのデータベース管理システムまたはオペレーティングシステムから要求されたデータを抽出します(Enumeration)
    """

    # First of all we have to identify the back-end database management
    # system to be able to go ahead with the injection
    setHandler()

    conf.dumper.singleString(conf.dbmsHandler.getFingerprint())
    kb.fingerprinted = True

    # Enumeration options
    # Optionで指定された様々な方法でenumerationを実施します
    if conf.getBanner:
        conf.dumper.banner(conf.dbmsHandler.getBanner())

    if conf.getCurrentUser:
        conf.dumper.currentUser(conf.dbmsHandler.getCurrentUser())

    if conf.getCurrentDb:
        conf.dumper.currentDb(conf.dbmsHandler.getCurrentDb())

    if conf.getHostname:
        conf.dumper.hostname(conf.dbmsHandler.getHostname())

    if conf.isDba:
        conf.dumper.dba(conf.dbmsHandler.isDba())

    if conf.getUsers:
        conf.dumper.users(conf.dbmsHandler.getUsers())

    if conf.getStatements:
        conf.dumper.statements(conf.dbmsHandler.getStatements())

    if conf.getPasswordHashes:
        try:
            conf.dumper.userSettings("database management system users password hashes", conf.dbmsHandler.getPasswordHashes(), "password hash", CONTENT_TYPE.PASSWORDS)
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getPrivileges:
        try:
            conf.dumper.userSettings("database management system users privileges", conf.dbmsHandler.getPrivileges(), "privilege", CONTENT_TYPE.PRIVILEGES)
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getRoles:
        try:
            conf.dumper.userSettings("database management system users roles", conf.dbmsHandler.getRoles(), "role", CONTENT_TYPE.ROLES)
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getDbs:
        try:
            conf.dumper.dbs(conf.dbmsHandler.getDbs())
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getTables:
        try:
            conf.dumper.dbTables(conf.dbmsHandler.getTables())
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.commonTables:
        try:
            conf.dumper.dbTables(tableExists(paths.COMMON_TABLES))
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getSchema:
        try:
            conf.dumper.dbTableColumns(conf.dbmsHandler.getSchema(), CONTENT_TYPE.SCHEMA)
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getColumns:
        try:
            conf.dumper.dbTableColumns(conf.dbmsHandler.getColumns(), CONTENT_TYPE.COLUMNS)
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.getCount:
        try:
            conf.dumper.dbTablesCount(conf.dbmsHandler.getCount())
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.commonColumns:
        try:
            conf.dumper.dbTableColumns(columnExists(paths.COMMON_COLUMNS))
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.dumpTable:
        try:
            conf.dbmsHandler.dumpTable()
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.dumpAll:
        try:
            conf.dbmsHandler.dumpAll()
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.search:
        try:
            conf.dbmsHandler.search()
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    if conf.sqlQuery:
        for query in conf.sqlQuery.strip(';').split(';'):
            query = query.strip()
            if query:
                conf.dumper.sqlQuery(query, conf.dbmsHandler.sqlQuery(query))

    if conf.sqlShell:
        conf.dbmsHandler.sqlShell()

    if conf.sqlFile:
        conf.dbmsHandler.sqlFile()

    # User-defined function options
    if conf.udfInject:
        conf.dbmsHandler.udfInjectCustom()

    # File system options
    if conf.fileRead:
        conf.dumper.rFile(conf.dbmsHandler.readFile(conf.fileRead))

    if conf.fileWrite:
        conf.dbmsHandler.writeFile(conf.fileWrite, conf.fileDest, conf.fileWriteType)

    if conf.commonFiles:
        try:
            conf.dumper.rFile(fileExists(paths.COMMON_FILES))
        except SqlmapNoneDataException as ex:
            logger.critical(ex)
        except:
            raise

    # Operating system options
    if conf.osCmd:
        conf.dbmsHandler.osCmd()

    if conf.osShell:
        conf.dbmsHandler.osShell()

    if conf.osPwn:
        conf.dbmsHandler.osPwn()

    if conf.osSmb:
        conf.dbmsHandler.osSmb()

    if conf.osBof:
        conf.dbmsHandler.osBof()

    # Windows registry options
    if conf.regRead:
        conf.dumper.registerValue(conf.dbmsHandler.regRead())

    if conf.regAdd:
        conf.dbmsHandler.regAdd()

    if conf.regDel:
        conf.dbmsHandler.regDel()

    # Miscellaneous options
    if conf.cleanup:
        conf.dbmsHandler.cleanup()

    if conf.direct:
        conf.dbmsConnector.close()

まとめ

かなりdeadcodeが多く読解するには複雑な状態になってますが、コアのシンプルさを保ったまま変化の激しい部分をうまく抽象化できていることが長く開発されている所以だと感じました。
最近のPythonはパッケージ周りで特に負債が出来まくっているので参考にしてGoにリプレイスしていきたいですね。

備考

*1 zaproxy, nucleiも開発が活発なスキャナの一つ
*2 active scanなのでクラウドのサービスに対して実行する場合は動作を理解した上で実行すること。

Discussion