基于深度学习的Webshell检测研究
本文最后更新于 84 天前,其中的信息可能已经有所发展或发生改变。

特征提取

获取php文件

def get_php_file(base_dir):
    """Collect the PHP files under ``base_dir`` that can be decoded.

    Walks ``base_dir`` recursively and reads every ``*.php`` file with the
    ``unicode_escape`` codec to check that it decodes.

    WARNING: this function is destructive. A file that raises
    ``UnicodeDecodeError`` is deleted from disk and its path is printed.

    Args:
        base_dir: Root directory to search.

    Returns:
        list[str]: ``os.path.realpath`` paths of the files that decoded.
    """
    file_list = []
    for path, _dirs, files in os.walk(base_dir):
        for file in files:
            if not file.endswith('.php'):
                continue
            filename = os.path.realpath(os.path.join(path, file))
            try:
                # One read is enough to validate decoding; the context manager
                # closes the handle before a possible os.remove below.
                with open(filename, "r", encoding='unicode_escape') as handle:
                    handle.read()
            except UnicodeDecodeError:
                # Undecodable samples are dropped from the data set on disk.
                os.remove(filename)
                print(filename)
            else:
                file_list.append(filename)
    return file_list

获取opcode

# Extract the opcode sequence of a PHP file
def get_file_opcode(filename):
    """Return the opcode names found in the VLD dump of ``filename``.

    Runs ``php -dvld.active=1 -dvld.execute=0 <filename>`` (requires the
    ``php`` binary with the VLD extension), captures stdout and stderr
    together, and extracts the first run of two or more ``A-Z``/``_``
    characters followed by whitespace from each line.

    Args:
        filename: Path of the PHP file to dump.

    Returns:
        list[str]: Matched opcode names in output order.
    """
    # Pass an argument list rather than one concatenated string: a string
    # without shell=True is treated as the program name on POSIX, and a
    # path containing spaces would be split into several arguments.
    cmd = ['php', '-dvld.active=1', '-dvld.execute=0', filename]
    # The context manager closes the pipe and waits for the child process.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
        output = p.stdout.readlines()

    opcodes = []
    pattern = re.compile(r'([A-Z_]{2,})\s+')
    # The first 8 and last 3 lines are skipped (presumably VLD's header and
    # footer -- verify against the installed VLD version).
    for line in output[8:-3]:
        # Undecodable bytes are replaced instead of matching against the
        # repr of the bytes object.
        match = pattern.search(line.decode('utf-8', errors='replace'))
        if match:
            opcodes.append(match.group(1))
    return opcodes

获取AST语法树

def get_file_ast(filename):
    """Return the output lines of ``php getParser.php <filename>``.

    ``getParser.php`` is resolved relative to the current working directory.
    stdout and stderr are captured together.

    Args:
        filename: Path of the PHP file to parse.

    Returns:
        list[str]: Output lines from the 7th line onward, each decoded with
        ``unicode_escape`` and still carrying its line terminator.
    """
    # Pass an argument list rather than one concatenated string: a string
    # without shell=True is treated as the program name on POSIX, and a
    # path containing spaces would be split into several arguments.
    cmd = ['php', 'getParser.php', filename]
    # The context manager closes the pipe and waits for the child process.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
        output = p.stdout.readlines()

    # The first 6 lines are skipped (reason not visible here -- presumably
    # leading noise in the PHP output; verify against real output).
    return [str(line, encoding='unicode_escape') for line in output[6:]]

getParser.php

<?php
// Command-line helper: parses the PHP file given as the first argument and
// prints the type of every AST node, one per line.
// Usage: php getParser.php <file.php>
// Presumably relies on Composer's autoloader to load the PhpParser classes.
require_once "vendor/autoload.php";

use PhpParser\Error;
use PhpParser\NodeDumper;
use PhpParser\ParserFactory;
use PhpParser\Node;

/**
 * Visitor whose leaveNode() hook echoes the node's type followed by a newline.
 */
class MyNodeVisitor extends PhpParser\NodeVisitorAbstract{
    public function leaveNode(Node $node) {
        echo $node->getType();
        echo "\n";
    }
}

// Do nothing unless a file path was passed on the command line.
if ($argc > 1) {
    // NOTE(review): the result of file_get_contents() is not checked; an
    // unreadable path is passed on to the parser as-is.
    $code = file_get_contents($argv[1]);

    $parser = (new ParserFactory)->create(ParserFactory::PREFER_PHP7);

    $traverser = new PhpParser\NodeTraverser();

    $traverser->addVisitor(new MyNodeVisitor());

    try {
        $ast = $parser->parse($code);
        // Walking the tree triggers MyNodeVisitor::leaveNode for the nodes.
        $traverser->traverse($ast);
       // $dumper = new NodeDumper;
       // echo $dumper->dump($ast) . "\n";
    } catch (Error $error) {
        // A PhpParser\Error is reported on stdout rather than raised.
        echo "Parse error: {$error->getMessage()}\n";
        return;
    }

}

获取全局变量

# Superglobal usage counts
def get_all_global_var(file_list):
    """Count occurrences of PHP superglobals in each file.

    The names are matched as literal text. Using them as regular expressions
    would never match, because a leading ``$`` is the end-of-string anchor.

    Args:
        file_list: Paths of the PHP files to scan.

    Returns:
        list[list[int]]: One row per readable file, with the counts of
        ``$_GET``, ``$_POST``, ``$_COOKIE``, ``$_REQUEST``, ``$_FILES`` and
        ``$_SESSION`` in that order. A file that raises
        ``UnicodeDecodeError`` is printed and contributes no row, so the
        result can be shorter than ``file_list``.
    """
    names = ['$_GET', '$_POST', '$_COOKIE', '$_REQUEST', '$_FILES', '$_SESSION']
    files_global_var = []
    for file in file_list:
        try:
            with open(file, "r", encoding='unicode_escape') as handle:
                txt = handle.read()
        except UnicodeDecodeError:
            print(file)
            continue
        # str.count gives the non-overlapping literal occurrence count.
        files_global_var.append([txt.count(name) for name in names])
    return files_global_var

获取句法特征

# Syntactic features
def get_all_syntax(file_list):
    """Compute per-line frequencies of control-flow keywords for each file.

    Keywords are matched case-sensitively as whole words, so ``elseif`` is
    not also counted as ``if`` and ``else``, and ``foreach`` is not also
    counted as ``for``.

    Args:
        file_list: Paths of the PHP files to scan.

    Returns:
        list[list]: One row per file with the occurrence count divided by the
        file's line count for ``if``, ``elseif``, ``else``, ``case``,
        ``for``, ``while`` and ``foreach`` in that order. An empty file
        yields a row of seven zeros.
    """
    keywords = ['if', 'elseif', 'else', 'case', 'for', 'while', 'foreach']
    matchers = [re.compile(r'\b' + re.escape(word) + r'\b') for word in keywords]
    files_syntax = []
    for file in file_list:
        # Read once; the full text is rebuilt from the lines.
        with open(file, "r", encoding='unicode_escape') as handle:
            lines = handle.readlines()
        size = len(lines)
        if size == 0:
            files_syntax.append([0] * len(keywords))
            continue
        txt = ''.join(lines)
        files_syntax.append([len(matcher.findall(txt)) / size for matcher in matchers])
    return files_syntax

获取敏感函数

# Sensitive functions
def get_all_sensitive_function(file_list):
    """Count occurrences of sensitive names in each file, per group.

    The three groups are: PHP code/command execution and callback functions,
    interpreter/tool names, and system-information commands.

    Names are matched as literal substrings. A name that contains another
    listed name (e.g. ``shell_exec`` contains ``exec``) therefore adds to
    both counts.

    Args:
        file_list: Paths of the PHP files to scan.

    Returns:
        list[list[int]]: One row per file with the summed occurrence count of
        each of the three groups, in the order above.
    """
    groups = [
        # 'usort' and 'uasort' are separate entries; without the comma the
        # adjacent literals merge into 'usortuasort' and neither is counted.
        ['eval', 'assert', 'preg_replace', 'create_function', 'array_map', 'call_user_func', 'call_user_func_array', 'array_filter', 'usort', 'uasort',
         'system', 'exec', 'passthru', 'shell_exec', 'pcntl_exec', 'popen'],
        ['perl', 'python', 'gcc', 'chmod', 'nohup'],
        ['uname', 'sysctl', 'whoami', '$OSTYPE', 'curl', 'ping'],
    ]
    files_sensitive_function = []
    for file in file_list:
        with open(file, "r", encoding='unicode_escape') as handle:
            txt = handle.read()
        # Literal counting: as a regex, '$OSTYPE' starts with the
        # end-of-string anchor and can never match.
        files_sensitive_function.append(
            [sum(txt.count(name) for name in group) for group in groups]
        )
    return files_sensitive_function

获取统计学特征

def get_all_features(file_list):
    """Compute simple length statistics for each file.

    Args:
        file_list: Paths of the files to measure.

    Returns:
        list[list]: One ``[longest_line, avg, longest_str]`` row per file:
        the longest line length, the mean line length, and the length of the
        longest whitespace-separated token. Line lengths include the line
        terminator. An empty file yields ``[0, 0, 0]``.
    """
    files_features = []
    for file in file_list:
        with open(file, "r", encoding='unicode_escape') as handle:
            lines = handle.readlines()
        if not lines:
            files_features.append([0, 0, 0])
            continue
        line_lengths = [len(line) for line in lines]
        # Longest line
        longest_line = max(line_lengths)
        # Average line length
        avg = sum(line_lengths) / len(lines)
        # Longest token; default=0 covers files with only whitespace
        longest_str = max(
            (len(word) for line in lines for word in line.split()),
            default=0,
        )
        files_features.append([longest_line, avg, longest_str])
    return files_features

重合指数

# Index of coincidence
def get_index_of_coincidence(data):
    """Return the index of coincidence of ``data``.

    Only the 256 characters ``chr(0)`` .. ``chr(255)`` are counted; any other
    character in ``data`` is ignored.

    Args:
        data: Text to analyse (a ``str``).

    Returns:
        float: ``sum(n_c * (n_c - 1)) / (N * (N - 1))`` where ``n_c`` is the
        count of character ``c`` and ``N`` the number of counted characters.
        Returns ``0.0`` when fewer than two characters are counted, where the
        formula would divide by zero.
    """
    # Occurrences of each single-byte character in data
    counts = [data.count(chr(x)) for x in range(256)]
    # Number of counted characters
    total_char_count = sum(counts)
    if total_char_count < 2:
        return 0.0
    # Number of ordered pairs of positions holding the same character
    char_count = sum(one_count * (one_count - 1) for one_count in counts)
    return float(char_count) / (total_char_count * (total_char_count - 1))

信息熵

# Information entropy
def get_information_entropy(data):
    """Return the Shannon entropy (base 2) of ``data`` with spaces removed.

    Only the 256 characters ``chr(0)`` .. ``chr(255)`` contribute to the sum;
    probabilities are relative to the length of the space-stripped text.

    Args:
        data: Text to analyse (a ``str``).

    Returns:
        float: The entropy in bits. Returns ``0.0`` when nothing is left
        after removing spaces, where the probability would divide by zero.
    """
    stripped_data = data.replace(' ', '')  # drop spaces from the content
    total = len(stripped_data)
    if total == 0:
        return 0.0
    entropy = 0
    for x in range(256):
        p_x = float(stripped_data.count(chr(x))) / total  # probability of this character
        if p_x > 0:
            entropy += - p_x * math.log(p_x, 2)  # add this character's contribution
    return entropy

文件压缩比

# Compression ratio
def get_ratio(data):
    """Return the length of ``data`` divided by its zlib-compressed length.

    Args:
        data: Bytes-like content accepted by ``zlib.compress``.

    Returns:
        float: Original size over compressed size.
    """
    original_size = len(data)
    compressed_size = len(zlib.compress(data))
    return float(original_size) / float(compressed_size)

词向量提取

n-gram + TF-IDF

# Count 2- to 4-grams over tokens matching \b\w+\b; the vocabulary size is
# capped at opcode_max_features (defined elsewhere).
opcode_CoVec = CountVectorizer(ngram_range=(2, 4), decode_error="ignore",
                        max_features=opcode_max_features,
                        token_pattern=r'\b\w+\b',
                        min_df=1,
                        max_df=1.0)

# Build the term-count matrix: entry [i][j] = k means term j occurs k times
# in document i. new_opcode_list is defined elsewhere (presumably one
# space-separated opcode string per file -- TODO confirm).
opcode_Covec_Mat = opcode_CoVec.fit_transform(new_opcode_list).toarray()

opcode_transformer = TfidfTransformer(smooth_idf=False)

# Re-weight the counts with TF-IDF and convert the result to a dense array.
opcode_Tfidf_Mat = opcode_transformer.fit_transform(opcode_Covec_Mat).toarray()

# NOTE(review): the value is already the result of .toarray(); this extra
# numpy.array() call looks redundant -- confirm before removing.
opcode_Tfidf_Mat = numpy.array(opcode_Tfidf_Mat)

Word2Vec

# Train the word2vec model
print("[*] Training Word2Vec model...")
from gensim.models import Word2Vec
# x is defined elsewhere (presumably a list of token lists, one per file --
# TODO confirm). Tokens seen at least once are kept; vectors have 10 dimensions.
model = Word2Vec(x,min_count = 1,vector_size=10)
# NOTE(review): gensim already trains when a corpus is passed to the
# constructor, so this call appears to add 20 further epochs -- confirm
# that the extra training is intended.
model.train(x, total_examples=model.corpus_count, epochs=20)
print("[+] Train success.")

机器学习

RandomForest

from sklearn.ensemble import RandomForestClassifier

def randomforest_train(x, y):
    """Train a random forest and print its evaluation.

    Prints the mean 5-fold cross-validation score over the full data set,
    then a classification report for the model fitted on the 70% training
    split and evaluated on the held-out 30% split.

    Args:
        x: Feature matrix.
        y: Labels; the report names the two classes 'Whitefile' and 'Webshell'.
    """
    rf_classifier = RandomForestClassifier(oob_score=True)

    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
    rf_model = rf_classifier.fit(x_train, y_train)

    print(cross_val_score(rf_model, x, y, cv=5).mean())
    # Evaluate the fitted model on the held-out split, as xgboost_train does.
    # Running cross_val_predict on the test split re-fitted fresh models on
    # test folds and never evaluated the model trained above.
    y_pred = rf_model.predict(x_test)
    print(classification_report(y_true=y_test, y_pred=y_pred, target_names=['Whitefile', 'Webshell'], digits=6))

randomforest_train(x, y)

XGBoost

import xgboost
from sklearn.metrics import classification_report


def xgboost_train(x, y):
    """Fit an XGBoost classifier and print its report on a held-out split.

    Args:
        x: Feature matrix.
        y: Labels; the report names the two classes 'Whitefile' and 'Webshell'.
    """
    booster = xgboost.XGBClassifier(eval_metric='logloss', use_label_encoder=False, max_depth=4)

    # Random 70/30 split into training and test features and labels.
    features_train, features_test, labels_train, labels_test = train_test_split(
        x, y, test_size=0.3, random_state=0
    )

    fitted = booster.fit(features_train, labels_train)
    predictions = fitted.predict(features_test)

    # Text report with per-class precision, recall and F1.
    report = classification_report(
        labels_test, predictions, digits=6, target_names=['Whitefile', 'Webshell']
    )
    print(':honeybee: XGBoost 分类模型训练结果:')
    print(report)

    # Disabled alternative kept for reference: a 5-fold cross-validated
    # report over the full data set, i.e.
    #   y_pred = cross_val_predict(xgb_classifier, x, y, cv=5)
    #   classification_report(y_true=y, y_pred=y_pred, ...)

xgboost_train(x, y)

Decision Tree

from sklearn.tree import DecisionTreeClassifier

def decisiontree_train(x, y):
    """Train a decision tree and print its evaluation.

    Prints the mean 5-fold cross-validation score over the full data set,
    then a classification report for the model fitted on the 70% training
    split and evaluated on the held-out 30% split.

    Args:
        x: Feature matrix.
        y: Labels; the report names the two classes 'Whitefile' and 'Webshell'.
    """
    dt_classifier = DecisionTreeClassifier()

    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
    dt_model = dt_classifier.fit(x_train, y_train)

    print(cross_val_score(dt_model, x, y, cv=5).mean())
    # Evaluate the fitted model on the held-out split, as xgboost_train does.
    # Running cross_val_predict on the test split re-fitted fresh models on
    # test folds and never evaluated the model trained above.
    y_pred = dt_model.predict(x_test)
    print(classification_report(y_true=y_test, y_pred=y_pred, target_names=['Whitefile', 'Webshell'], digits=6))

decisiontree_train(x, y)

深度学习

CNN

from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D

# Training configuration for a single-output binary classifier.
optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'

# 1-D convolutional model over inputs of shape (train_seq_length, vector_size);
# both names are defined elsewhere.
model = Sequential()
model.add(Conv1D(128, 3, activation='relu', input_shape=(train_seq_length, vector_size)))
# model.add(Conv1D(128, 3, activation='relu', input_shape=(max_features, 1)))
# model.add(Conv1D(128, 3, activation='relu', input_shape=(22, 1)))
model.add(Conv1D(128, 4, activation='relu'))
model.add(Conv1D(128, 5, activation='relu'))
# model.add(MaxPooling1D())
model.add(GlobalAveragePooling1D())
# model.add(Dropout(0.5))
# Single sigmoid unit as the output layer.
model.add(Dense(1, activation=activation))

model.compile(loss=train_loss,
              optimizer=optimizer,
              metrics=['accuracy'])

train_epoch = 20
train_batch = 100

# Train the CNN; x_train/y_train/x_test/y_test are defined elsewhere.
print('[*] traning started')
print('[+] epoch: {0}, batch_size: {1}'.format(train_epoch, train_batch))

# The test split is passed as validation data during fitting.
history = model.fit(x_train, y_train,
          batch_size=train_batch, epochs=train_epoch,
          validation_data=(x_test, y_test))

print('[+] training complete')

LSTM

from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D
import keras.layers as layers

# Training configuration for a single-output binary classifier.
optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'

train_batch = 100

model = Sequential()

# Two stacked bidirectional LSTM layers over inputs of shape
# (train_seq_length, vector_size); both names are defined elsewhere. The
# first layer returns the full sequence so the second can consume it.
model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(train_seq_length, vector_size)))
# model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(max_features, 1)))
# model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(22, 1)))
model.add(layers.Bidirectional(layers.LSTM(32)))

# Single sigmoid unit as the output layer.
model.add(layers.Dense(1, activation=activation))

model.compile(loss=train_loss,
    optimizer=optimizer,
    metrics=['accuracy'])

# Train the LSTM; x_train/y_train/x_test/y_test are defined elsewhere.
# NOTE(review): the epoch count 10 is hard-coded both in the message below
# and in model.fit(); keep the two in sync.
print('[*] traning started')
print('[+] epoch: {0}, batch_size: {1}'.format(10, train_batch))

# The test split is passed as validation data during fitting.
model.fit(x_train, y_train,
          batch_size=train_batch, epochs=10,
          validation_data=(x_test, y_test))

print('[+] training complete')

GRU

from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D
import keras.layers as layers

# Training configuration for a single-output binary classifier.
optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'

train_epoch = 10
train_batch = 100

# Single GRU layer (256 units, input and recurrent dropout 0.2) over inputs
# of shape (train_seq_length, vector_size); both names are defined elsewhere.
model = Sequential()
model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(train_seq_length, vector_size)))
# model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(max_features, 1)))
# model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(22, 1)))
# model.add(layers.SimpleRNN(128))

# Single sigmoid unit as the output layer.
model.add(layers.Dense(1, activation=activation))

model.compile(loss=train_loss,
    optimizer=optimizer,
    metrics=['accuracy'])

# Train the GRU; x_train/y_train/x_test/y_test are defined elsewhere.
print('[*] traning started')
print('[+] epoch: {0}, batch_size: {1}'.format(train_epoch, train_batch))

# The test split is passed as validation data during fitting.
model.fit(x_train, y_train,
          batch_size=train_batch, epochs=train_epoch,
          validation_data=(x_test, y_test))

print('[+] training complete')

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇