[SQLMap] 長く利用されるPython製アプリケーションをコードリーディングする
SQLMapについて
OSS InsightのWeb Scanner部門で常に人気があるリポジトリの一つがSQLMapです。*1
2006年から2024年現在も活発に開発が行われている珍しいPython アプリケーションです。触ってみながらコードリーディングしたいと思います。
SQLMapはSQL Injectionに特化したツールで、5つのdetect / exploitを機械的に実行します。まずはその5つについて理解しましょう。
SQLi techniques
- Boolean-based blind: ブラインドSQLインジェクションの一種で、HTTPリクエスト内の影響を受けるパラメータに、真偽を判断するSQLステートメントを挿入します。出力を文字単位で推測します。
- Time-based blind: ブラインドSQLインジェクションの一種で、バックエンドDBMSを一定時間待機させるSQLステートメントを挿入します。出力を文字単位で推測します。
- Error-based: データベース固有のエラーメッセージを引き起こすステートメントを挿入し、DBMSエラーメッセージを解析して情報を取得します。
- UNION query-based: UNION ALL SELECTを含むSQLステートメントを挿入し、SELECTステートメントの出力を取得します。
- Stacked queries: スタックされたクエリをサポートする場合、セミコロンに続いて実行するSQLステートメントを挿入します。データ定義やデータ操作ステートメントを実行し、ファイルシステムへのアクセスやOSコマンドの実行などを可能にします。
スクリーンショットのような形でtechniqueごとに実行クエリが出力されます。具体的な値は実際に見て確かめるのが良いでしょう。*2
触ってみる
https://zenn.dev/shimakaze_soft/articles/0cfab83adf0f71 がものすごく参考になります。
プロジェクト構造
❯ tree -L 2 -d .
.
├── data
│ ├── html
│ ├── procs
│ ├── shell
│ ├── txt
│ ├── udf
│ └── xml
├── doc
│ └── translations
├── extra
│ ├── beep
│ ├── cloak
│ ├── dbgtool
│ ├── icmpsh
│ ├── runcmd
│ ├── shellcodeexec
│ ├── shutils
│ └── vulnserver
├── lib
│ ├── controller
│ ├── core
│ ├── parse
│ ├── request
│ ├── takeover
│ ├── techniques
│ └── utils
├── plugins
│ ├── dbms
│ └── generic
├── tamper
└── thirdparty
├── ansistrm
├── beautifulsoup
├── bottle
├── chardet
├── clientform
├── colorama
├── fcrypt
├── identywaf
├── keepalive
├── magic
├── multipart
├── odict
├── prettyprint
├── pydes
├── six
├── socks
├── termcolor
└── wininetpton
pyproject.tomlもなければrequirements.txtもありません。コアな機能は全て標準ライブラリで構築されています。さすが10年選手です。
コード検索するとmymssqlのように特定のDBに関する処理で外部ライブラリを用いてる場合があります。マイクロカーネルアーキテクチャ(pluginシステム)によってDBMSの接続を抽象化しているため、本当に必要になった時のみpip installを行う形式です。
アーキテクチャ
先ほど述べたようにSQLはDBごとに微妙に異なるので様々なDBに対応する必要が出てきます。DBMSの抽象化層はgenericにあります。
❯ tree -L 2 plugins/generic
plugins/generic
├── __init__.py
├── connector.py
├── custom.py
├── databases.py
├── entries.py
├── enumeration.py
├── filesystem.py
├── fingerprint.py
├── misc.py
├── search.py
├── syntax.py
├── takeover.py
└── users.py
ステート管理
基本実装はlib配下で行われます。
データはdata.pyで定義した変数で保持され、kb.xxxもしくはBackend.xxxのような形でステートを持ってくる方式を取っています。
スコープが短いものはkbから直接持ってきて、様々なところで利用するものはBackend Classのようなある程度構造化された形から持ってくるといった使い分けをしていそうです。
処理の流れ
まずはdetectの部分です。
ユーザー入力によって渡されたURLはcontroler.py@start関数でスキャンします。
controler.py@start
@stackedmethod
def start():
"""
URLの安定性およびGET、POST、Cookie、User-Agentパラメーターすべてに対してチェックを行う関数を呼び出す
それらが動的でSQLインジェクションの影響を受けているかどうかを確認する
"""
targetCount = 0
initialHeaders = list(conf.httpHeaders)
# kb.targetsは事前にparseされたターゲットのリストが入っている
for targetUrl, targetMethod, targetData, targetCookie, targetHeaders in kb.targets:
targetCount += 1
try:
conf.url = targetUrl
conf.method = targetMethod.upper().strip() if targetMethod else targetMethod
conf.data = targetData
conf.cookie = targetCookie
conf.httpHeaders = list(initialHeaders)
conf.httpHeaders.extend(targetHeaders or [])
# コンフィグの設定を元にUAを変更する
if conf.randomAgent or conf.mobile:
for header, value in initialHeaders:
if header.upper() == HTTP_HEADER.USER_AGENT.upper():
conf.httpHeaders.append((header, value))
break
# ヘッダーの重複削除
conf.httpHeaders = [conf.httpHeaders[i] for i in xrange(len(conf.httpHeaders)) if conf.httpHeaders[i][0].upper() not in (__[0].upper() for __ in conf.httpHeaders[i + 1:])]
# TODO
initTargetEnv()
# TODO
parseTargetUrl()
if conf.multipleTargets:
if conf.forms and conf.method:
message = "[%d/%s] Form:\n%s %s" % (targetCount, len(kb.targets) if isListLike(kb.targets) else '?', conf.method, targetUrl)
else:
message = "[%d/%s] URL:\n%s %s" % (targetCount, len(kb.targets) if isListLike(kb.targets) else '?', HTTPMETHOD.GET, targetUrl)
if conf.cookie:
message += "\nCookie: %s" % conf.cookie
if conf.data is not None:
message += "\n%s data: %s" % ((conf.method if conf.method != HTTPMETHOD.GET else None) or HTTPMETHOD.POST, urlencode(conf.data or "") if re.search(r"\A\s*[<{]", conf.data or "") is None else conf.data)
# フォームに対するテスト
if conf.forms and conf.method:
if conf.method == HTTPMETHOD.GET and targetUrl.find("?") == -1:
continue
message += "\ndo you want to test this form? [Y/n/q] "
choice = readInput(message, default='Y').upper()
if choice == 'N':
continue
elif choice == 'Q':
break
else:
if conf.method != HTTPMETHOD.GET:
message = "Edit %s data [default: %s]%s: " % (conf.method, urlencode(conf.data or "") if re.search(r"\A\s*[<{]", conf.data or "None") is None else conf.data, " (Warning: blank fields detected)" if conf.data and extractRegexResult(EMPTY_FORM_FIELDS_REGEX, conf.data) else "")
conf.data = readInput(message, default=conf.data)
conf.data = _randomFillBlankFields(conf.data)
conf.data = urldecode(conf.data) if conf.data and urlencode(DEFAULT_GET_POST_DELIMITER, None) not in conf.data else conf.data
else:
if '?' in targetUrl:
firstPart, secondPart = targetUrl.split('?', 1)
message = "Edit GET data [default: %s]: " % secondPart
test = readInput(message, default=secondPart)
test = _randomFillBlankFields(test)
conf.url = "%s?%s" % (firstPart, test)
# TODO
parseTargetUrl()
else:
# スコープに含まれるURLかどうか
if not conf.scope:
message += "\ndo you want to test this URL? [Y/n/q]"
choice = readInput(message, default='Y').upper()
if choice == 'N':
dataToStdout(os.linesep)
continue
elif choice == 'Q':
break
else:
pass
# TODO
setupTargetEnv()
if conf.rParam and kb.originalPage:
kb.randomPool = dict([_ for _ in kb.randomPool.items() if isinstance(_[1], list)])
for match in re.finditer(r"(?si)<select[^>]+\bname\s*=\s*[\"']([^\"']+)(.+?)</select>", kb.originalPage):
name, _ = match.groups()
options = tuple(re.findall(r"<option[^>]+\bvalue\s*=\s*[\"']([^\"']+)", _))
if options:
kb.randomPool[name] = options
# TODO
checkWaf()
parameters = list(conf.parameters.keys())
orderList = (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI, PLACE.POST, PLACE.GET)
for place in orderList[::-1]:
if place in parameters:
parameters.remove(place)
parameters.insert(0, place)
proceed = True
for place in parameters:
# conf.levelを元にテストするパターンを絞り込む
# Test User-Agent and Referer headers only if
# --level >= 3
skip = (place == PLACE.USER_AGENT and (kb.testOnlyCustom or conf.level < 3))
skip |= (place == PLACE.REFERER and (kb.testOnlyCustom or conf.level < 3))
# --param-filter
skip |= (len(conf.paramFilter) > 0 and place.upper() not in conf.paramFilter)
# Test Host header only if
# --level >= 5
skip |= (place == PLACE.HOST and (kb.testOnlyCustom or conf.level < 5))
# Test Cookie header only if --level >= 2
skip |= (place == PLACE.COOKIE and (kb.testOnlyCustom or conf.level < 2))
skip |= (place == PLACE.USER_AGENT and intersect(USER_AGENT_ALIASES, conf.skip, True) not in ([], None))
skip |= (place == PLACE.REFERER and intersect(REFERER_ALIASES, conf.skip, True) not in ([], None))
skip |= (place == PLACE.COOKIE and intersect(PLACE.COOKIE, conf.skip, True) not in ([], None))
skip |= (place == PLACE.HOST and intersect(PLACE.HOST, conf.skip, True) not in ([], None))
skip &= not (place == PLACE.USER_AGENT and intersect(USER_AGENT_ALIASES, conf.testParameter, True))
skip &= not (place == PLACE.REFERER and intersect(REFERER_ALIASES, conf.testParameter, True))
skip &= not (place == PLACE.HOST and intersect(HOST_ALIASES, conf.testParameter, True))
skip &= not (place == PLACE.COOKIE and intersect((PLACE.COOKIE,), conf.testParameter, True))
if skip:
continue
if place not in conf.paramDict or place not in conf.parameters:
continue
paramDict = conf.paramDict[place]
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
# paramDictにはスキャン対象のパラメータが入っている
for parameter, value in paramDict.items():
if not proceed:
break
kb.vainRun = False
testSqlInj = True
paramKey = (conf.hostname, conf.path, place, parameter)
if kb.processUserMarks:
if testSqlInj and place not in (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI):
if kb.processNonCustom is None:
message = "other non-custom parameters found. "
message += "Do you want to process them too? [Y/n/q] "
choice = readInput(message, default='Y').upper()
if choice == 'Q':
raise SqlmapUserQuitException
else:
kb.processNonCustom = choice == 'Y'
if not kb.processNonCustom:
continue
if paramKey in kb.testedParams:
testSqlInj = False
elif any(_ in conf.testParameter for _ in (parameter, removePostHintPrefix(parameter))):
pass
elif parameter in conf.rParam:
testSqlInj = False
elif parameter in conf.skip or kb.postHint and parameter.split(' ')[-1] in conf.skip:
testSqlInj = False
elif conf.paramExclude and (re.search(conf.paramExclude, parameter, re.I) or kb.postHint and re.search(conf.paramExclude, parameter.split(' ')[-1], re.I) or re.search(conf.paramExclude, place, re.I)):
testSqlInj = False
elif conf.csrfToken and re.search(conf.csrfToken, parameter, re.I):
testSqlInj = False
# Ignore session-like parameters for --level < 4
elif conf.level < 4 and (parameter.upper() in IGNORE_PARAMETERS or any(_ in parameter.lower() for _ in CSRF_TOKEN_PARAMETER_INFIXES) or parameter.upper().startswith(GOOGLE_ANALYTICS_COOKIE_PREFIX)):
testSqlInj = False
elif PAYLOAD.TECHNIQUE.BOOLEAN in conf.technique or conf.skipStatic:
check = checkDynParam(place, parameter, value)
if not check:
if conf.skipStatic:
testSqlInj = False
kb.testedParams.add(paramKey)
if testSqlInj:
try:
if place == PLACE.COOKIE:
pushValue(kb.mergeCookies)
kb.mergeCookies = False
# チェック1: TODO
check = heuristicCheckSqlInjection(place, parameter)
if check != HEURISTIC_TEST.POSITIVE:
if conf.smart or (kb.ignoreCasted and check == HEURISTIC_TEST.CASTED):
continue
# チェック2: TODO
injection = checkSqlInjection(place, parameter, value)
proceed = not kb.endDetection
injectable = False
if getattr(injection, "place", None) is not None:
if NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE in injection.notes:
kb.falsePositives.append(injection)
else:
injectable = True
kb.injections.append(injection)
# In case when user wants to end detection phase (Ctrl+C)
if not proceed:
break
finally:
if place == PLACE.COOKIE:
kb.mergeCookies = popValue()
if kb.injection.place is not None and kb.injection.parameter is not None:
if conf.multipleTargets:
message = "do you want to exploit this SQL injection? [Y/n] "
condition = readInput(message, default='Y', boolean=True)
else:
condition = True
if condition:
# Exploit
action()
# エラー処理
# ctrl+cは連続で押された時に強制終了
# 1回ならば次のターゲットに進むかどうかを確認する
except KeyboardInterrupt:
if kb.lastCtrlCTime and (time.time() - kb.lastCtrlCTime < 1):
kb.multipleCtrlC = True
raise SqlmapUserQuitException("user aborted (Ctrl+C was pressed multiple times)")
kb.lastCtrlCTime = time.time()
if conf.multipleTargets:
message = "do you want to skip to the next target in list? [Y/n/q]"
choice = readInput(message, default='Y').upper()
if choice == 'N':
return False
elif choice == 'Q':
raise SqlmapUserQuitException
else:
raise
# Skipする場合はpass
except SqlmapSkipTargetException:
pass
# それ以外は基本raiseして全ターゲットに対するスキャンを終了する
except SqlmapUserQuitException:
raise
except SqlmapSilentQuitException:
raise
except SqlmapBaseException as ex:
if conf.multipleTargets:
_saveToResultsFile()
else:
return False
finally:
showHttpErrorCodes()
return True
次にexploitの部分です。
action.py@action関数でexploitを実行します。
以下のような機能が備わっています。
- directly connect to the database: DBMSのipやテーブル名などの情報から直接接続を試します。
- Fingerprint and enumeration
- error messages, banner parsing, functions output comparison, specific featuresといった情報からバックエンドの技術スタックとバージョンを推定する
- DBMSのセッションを取得するための情報を収集して管理者かどうかのチェック
- その他enumaration(users, password hashes, privileges, roles, databases, tables and columns)
action.py@action
def action():
"""
影響を受けたURLパラメーターに対するSQLインジェクションを悪用する関数です
可能な範囲でバックエンドのデータベース管理システムまたはオペレーティングシステムから要求されたデータを抽出します(Enumeration)
"""
# First of all we have to identify the back-end database management
# system to be able to go ahead with the injection
setHandler()
conf.dumper.singleString(conf.dbmsHandler.getFingerprint())
kb.fingerprinted = True
# Enumeration options
# Optionで指定された様々な方法でenumerationを実施します
if conf.getBanner:
conf.dumper.banner(conf.dbmsHandler.getBanner())
if conf.getCurrentUser:
conf.dumper.currentUser(conf.dbmsHandler.getCurrentUser())
if conf.getCurrentDb:
conf.dumper.currentDb(conf.dbmsHandler.getCurrentDb())
if conf.getHostname:
conf.dumper.hostname(conf.dbmsHandler.getHostname())
if conf.isDba:
conf.dumper.dba(conf.dbmsHandler.isDba())
if conf.getUsers:
conf.dumper.users(conf.dbmsHandler.getUsers())
if conf.getStatements:
conf.dumper.statements(conf.dbmsHandler.getStatements())
if conf.getPasswordHashes:
try:
conf.dumper.userSettings("database management system users password hashes", conf.dbmsHandler.getPasswordHashes(), "password hash", CONTENT_TYPE.PASSWORDS)
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getPrivileges:
try:
conf.dumper.userSettings("database management system users privileges", conf.dbmsHandler.getPrivileges(), "privilege", CONTENT_TYPE.PRIVILEGES)
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getRoles:
try:
conf.dumper.userSettings("database management system users roles", conf.dbmsHandler.getRoles(), "role", CONTENT_TYPE.ROLES)
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getDbs:
try:
conf.dumper.dbs(conf.dbmsHandler.getDbs())
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getTables:
try:
conf.dumper.dbTables(conf.dbmsHandler.getTables())
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.commonTables:
try:
conf.dumper.dbTables(tableExists(paths.COMMON_TABLES))
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getSchema:
try:
conf.dumper.dbTableColumns(conf.dbmsHandler.getSchema(), CONTENT_TYPE.SCHEMA)
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getColumns:
try:
conf.dumper.dbTableColumns(conf.dbmsHandler.getColumns(), CONTENT_TYPE.COLUMNS)
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.getCount:
try:
conf.dumper.dbTablesCount(conf.dbmsHandler.getCount())
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.commonColumns:
try:
conf.dumper.dbTableColumns(columnExists(paths.COMMON_COLUMNS))
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.dumpTable:
try:
conf.dbmsHandler.dumpTable()
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.dumpAll:
try:
conf.dbmsHandler.dumpAll()
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.search:
try:
conf.dbmsHandler.search()
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
if conf.sqlQuery:
for query in conf.sqlQuery.strip(';').split(';'):
query = query.strip()
if query:
conf.dumper.sqlQuery(query, conf.dbmsHandler.sqlQuery(query))
if conf.sqlShell:
conf.dbmsHandler.sqlShell()
if conf.sqlFile:
conf.dbmsHandler.sqlFile()
# User-defined function options
if conf.udfInject:
conf.dbmsHandler.udfInjectCustom()
# File system options
if conf.fileRead:
conf.dumper.rFile(conf.dbmsHandler.readFile(conf.fileRead))
if conf.fileWrite:
conf.dbmsHandler.writeFile(conf.fileWrite, conf.fileDest, conf.fileWriteType)
if conf.commonFiles:
try:
conf.dumper.rFile(fileExists(paths.COMMON_FILES))
except SqlmapNoneDataException as ex:
logger.critical(ex)
except:
raise
# Operating system options
if conf.osCmd:
conf.dbmsHandler.osCmd()
if conf.osShell:
conf.dbmsHandler.osShell()
if conf.osPwn:
conf.dbmsHandler.osPwn()
if conf.osSmb:
conf.dbmsHandler.osSmb()
if conf.osBof:
conf.dbmsHandler.osBof()
# Windows registry options
if conf.regRead:
conf.dumper.registerValue(conf.dbmsHandler.regRead())
if conf.regAdd:
conf.dbmsHandler.regAdd()
if conf.regDel:
conf.dbmsHandler.regDel()
# Miscellaneous options
if conf.cleanup:
conf.dbmsHandler.cleanup()
if conf.direct:
conf.dbmsConnector.close()
まとめ
かなりdeadcodeが多く読解するには複雑な状態になってますが、コアのシンプルさを保ったまま変化の激しい部分をうまく抽象化できていることが長く開発されている所以だと感じました。
最近のPythonはパッケージ周りで特に負債が出来まくっているので参考にしてGoにリプレイスしていきたいですね。
備考
*1 zaproxy, nucleiも開発が活発なスキャナの一つ
*2 active scanなのでクラウドのサービスに対して実行する場合は動作を理解した上で実行すること。
Discussion