Feature Extraction
Getting PHP files
import os

def get_php_file(base_dir):
    """Recursively collect all .php files under base_dir, dropping files that cannot be decoded."""
    file_list = []
    for path, dirs, files in os.walk(base_dir):
        for file in files:
            if file.endswith('.php'):
                filename = os.path.realpath(os.path.join(path, file))
                try:
                    # Read once to make sure the file is decodable before keeping it
                    open(filename, "r", encoding='unicode_escape').read()
                    file_list.append(filename)
                except UnicodeDecodeError:
                    # Undecodable files are removed from the dataset
                    os.remove(filename)
                    print(filename)
    return file_list
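A minimal usage sketch (the white/ and black/ sample directories and the label convention are assumptions, not shown in the original): build the benign and webshell file lists separately, then derive the label vector y used by the classifiers further down.
# Hypothetical directory layout: benign samples under ./white, webshells under ./black
white_files = get_php_file('./white')
black_files = get_php_file('./black')
file_list = white_files + black_files
# 0 = benign (Whitefile), 1 = webshell, matching the target_names used in the reports below
y = [0] * len(white_files) + [1] * len(black_files)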
Getting opcodes
import re
import subprocess

# Get the opcode sequence of a PHP file
def get_file_opcode(filename):
    # Use the VLD extension to dump opcodes without executing the script
    cmd = ['php', '-dvld.active=1', '-dvld.execute=0', filename]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    opcodes = []
    pattern = r'([A-Z_]{2,})\s+'
    # Skip VLD's header and trailer lines and keep only the opcode column
    for line in p.stdout.readlines()[8:-3]:
        try:
            match = re.search(pattern, str(line, encoding='utf-8'))
        except UnicodeDecodeError:
            match = re.search(pattern, str(line))
        if match:
            opcodes.append(match.group(1))
    p.terminate()
    return opcodes
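The TF-IDF step further down vectorizes a list of per-file opcode "documents" (referred to there as new_opcode_list), which the original never shows being built. A minimal sketch, assuming file_list from get_php_file above:
# Join each file's opcode sequence into one space-separated string so
# CountVectorizer can treat it as a single document
new_opcode_list = [' '.join(get_file_opcode(f)) for f in file_list]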
Getting the AST
# Get the AST node types of a PHP file via getParser.php (below)
def get_file_ast(filename):
    cmd = ['php', 'getParser.php', filename]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    asts = []
    # Skip the leading output lines and collect one node type per line
    for line in p.stdout.readlines()[6:]:
        asts.append(str(line, encoding='unicode_escape'))
    p.terminate()
    return asts
getParser.php
<?php
require_once "vendor/autoload.php";

use PhpParser\Error;
use PhpParser\NodeDumper;
use PhpParser\ParserFactory;
use PhpParser\Node;

class MyNodeVisitor extends PhpParser\NodeVisitorAbstract
{
    // Print the type of every node as the traverser leaves it
    public function leaveNode(Node $node)
    {
        echo $node->getType();
        echo "\n";
    }
}

if ($argc > 1) {
    $code = file_get_contents($argv[1]);
    $parser = (new ParserFactory)->create(ParserFactory::PREFER_PHP7);
    $traverser = new PhpParser\NodeTraverser();
    $traverser->addVisitor(new MyNodeVisitor());
    try {
        $ast = $parser->parse($code);
        $traverser->traverse($ast);
        // $dumper = new NodeDumper;
        // echo $dumper->dump($ast) . "\n";
    } catch (Error $error) {
        echo "Parse error: {$error->getMessage()}\n";
        return;
    }
}
Getting superglobal variables
# Count occurrences of PHP superglobals in each file
def get_all_global_var(file_list):
    patterns = ['$_GET', '$_POST', '$_COOKIE', '$_REQUEST', '$_FILES', '$_SESSION']
    files_global_var = []
    for file in file_list:
        file_global_var = []
        try:
            txt = open(file, "r", encoding='unicode_escape').read()
            for pattern in patterns:
                # re.escape is needed: a bare '$' would be treated as a regex anchor and never match
                count = len(re.findall(re.escape(pattern), txt))
                file_global_var.append(count)
            files_global_var.append(file_global_var)
        except UnicodeDecodeError:
            print(file)
    return files_global_var
Getting syntactic features
# Syntactic features: frequency of control-flow keywords per line of code
def get_all_syntax(file_list):
    patterns = ['if', 'elseif', 'else', 'case', 'for', 'while', 'foreach']
    files_syntax = []
    for file in file_list:
        file_syntax = []
        txt = open(file, "r", encoding='unicode_escape').read()
        lines = open(file, "r", encoding='unicode_escape').readlines()
        size = len(lines)
        if size != 0:
            for pattern in patterns:
                count = len(re.findall(pattern, txt))
                file_syntax.append(count / size)
            files_syntax.append(file_syntax)
        else:
            files_syntax.append([0, 0, 0, 0, 0, 0, 0])
    return files_syntax
Getting sensitive functions
# Sensitive functions, grouped into PHP execution functions, system tools, and recon commands
def get_all_sensitive_function(file_list):
    patterns = [
        ['eval', 'assert', 'preg_replace', 'create_function', 'array_map', 'call_user_func',
         'call_user_func_array', 'array_filter', 'usort', 'uasort',
         'system', 'exec', 'passthru', 'shell_exec', 'pcntl_exec', 'popen'],
        ['perl', 'python', 'gcc', 'chmod', 'nohup'],
        ['uname', 'sysctl', 'whoami', '$OSTYPE', 'curl', 'ping']
    ]
    files_sensitive_function = []
    for file in file_list:
        file_sensitive_function = []
        txt = open(file, "r", encoding='unicode_escape').read()
        for pattern in patterns:
            length = 0
            for i in pattern:
                # Escape because '$OSTYPE' contains a regex metacharacter
                length += len(re.findall(re.escape(i), txt))
            file_sensitive_function.append(length)
        files_sensitive_function.append(file_sensitive_function)
    return files_sensitive_function
Getting statistical features
def get_all_features(file_list):
    files_features = []
    for file in file_list:
        lines = open(file, "r", encoding='unicode_escape').readlines()
        if len(lines) == 0:
            files_features.append([0, 0, 0])
        else:
            # Longest line length
            longest_line = max(len(line) for line in lines)
            # Average line length
            avg = sum(len(line) for line in lines) / len(lines)
            # Longest word length
            longest_str = 0
            for line in lines:
                if len(line.split()):
                    long = max(line.split(), key=len)
                    if len(long) > longest_str:
                        longest_str = len(long)
            files_features.append([longest_line, avg, longest_str])
    return files_features
Index of coincidence
# Index of coincidence
def get_index_of_coincidence(data):
    char_count = 0        # number of ways to pick two identical characters from data
    total_char_count = 0  # total number of characters counted in data
    # Iterate over the 256 single-byte characters
    for x in range(256):
        char = chr(x)
        # Number of occurrences of this character in data
        one_count = data.count(char)
        # Accumulate the number of identical pairs for this character
        char_count += one_count * (one_count - 1)
        total_char_count += one_count
    # IC = sum(n_i * (n_i - 1)) / (N * (N - 1))
    ic = float(char_count) / (total_char_count * (total_char_count - 1))
    return ic
Information entropy
import math

# Information entropy
def get_information_entropy(data):
    entropy = 0  # final entropy value
    stripped_data = data.replace(' ', '')  # drop spaces from the file content
    # Iterate over the 256 single-byte characters
    for x in range(256):
        p_x = float(stripped_data.count(chr(x))) / len(stripped_data)  # probability of this character
        if p_x > 0:
            entropy += -p_x * math.log(p_x, 2)  # accumulate -p * log2(p)
    return entropy
File compression ratio
import zlib

# File compression ratio
def get_ratio(data):
    # zlib.compress expects bytes; encode first if a str is passed
    if isinstance(data, str):
        data = data.encode('utf-8', errors='ignore')
    compressed = zlib.compress(data)
    return float(len(data)) / float(len(compressed))
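A minimal sketch of how these three per-file statistics might be combined into one feature row per file. The helper name get_all_statistics is an assumption, not part of the original; it only glues together the functions defined above.
# Hypothetical assembly of the per-file statistical features above
def get_all_statistics(file_list):
    rows = []
    for file in file_list:
        data = open(file, "r", encoding='unicode_escape').read()
        rows.append([
            get_index_of_coincidence(data),
            get_information_entropy(data),
            get_ratio(data),
        ])
    return rows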
Word Vector Extraction
n-gram + TF-IDF
import numpy
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer

opcode_CoVec = CountVectorizer(ngram_range=(2, 4), decode_error="ignore",
                               max_features=opcode_max_features,
                               token_pattern=r'\b\w+\b',
                               min_df=1,
                               max_df=1.0)
# Build the term-frequency matrix: [i][j] = frequency of term j in document i
opcode_Covec_Mat = opcode_CoVec.fit_transform(new_opcode_list).toarray()
opcode_transformer = TfidfTransformer(smooth_idf=False)
opcode_Tfidf_Mat = opcode_transformer.fit_transform(opcode_Covec_Mat).toarray()
opcode_Tfidf_Mat = numpy.array(opcode_Tfidf_Mat)
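The classical classifiers below take a feature matrix x and a label vector y that the original does not show being assembled. A minimal sketch, assuming the per-file feature functions above, the hypothetical get_all_statistics helper, and the y labels built from the white/black file lists; whether the original actually concatenated all feature groups like this is an assumption.
# Hypothetical feature assembly: hand-crafted features stacked next to the TF-IDF matrix
x = numpy.hstack([
    numpy.array(get_all_global_var(file_list)),
    numpy.array(get_all_syntax(file_list)),
    numpy.array(get_all_sensitive_function(file_list)),
    numpy.array(get_all_features(file_list)),
    numpy.array(get_all_statistics(file_list)),  # hypothetical helper from the sketch above
    opcode_Tfidf_Mat,
])
y = numpy.array(y)  # 0 = Whitefile, 1 = Webshell, as labelled earlier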
Word2Vec
# Train the Word2Vec model (here x is a list of token sequences, e.g. per-file opcode lists)
print("[*] Training Word2Vec model...")
from gensim.models import Word2Vec
model = Word2Vec(x, min_count=1, vector_size=10)
model.train(x, total_examples=model.corpus_count, epochs=20)
print("[+] Train success.")
Machine Learning
RandomForest
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict
from sklearn.metrics import classification_report

def randomforest_train(x, y):
    rf_classifier = RandomForestClassifier(oob_score=True)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
    rf_model = rf_classifier.fit(x_train, y_train)
    # Mean accuracy over 5-fold cross-validation on the full dataset
    print(cross_val_score(rf_model, x, y, cv=5).mean())
    y_pred = cross_val_predict(rf_classifier, x_test, y_test, cv=5)
    print(classification_report(y_true=y_test, y_pred=y_pred, target_names=['Whitefile', 'Webshell'], digits=6))

randomforest_train(x, y)
XGBoost
import xgboost
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

def xgboost_train(x, y):
    xgb_classifier = xgboost.XGBClassifier(eval_metric='logloss', use_label_encoder=False, max_depth=4)
    # train_test_split randomly splits the data into training and test subsets
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
    xgb_model = xgb_classifier.fit(x_train, y_train)
    y_pred = xgb_model.predict(x_test)
    # classification_report prints per-class precision, recall, F1 score, etc.
    report = classification_report(y_test, y_pred, digits=6, target_names=['Whitefile', 'Webshell'])
    print(':honeybee: XGBoost classification results:')
    print(report)
    '''
    # print(cross_val_score(xgb_classifier, x, y, cv=5).mean())
    y_pred = cross_val_predict(xgb_classifier, x, y, cv=5)
    print(':honeybee: [green]XGBoost classification results (5-fold cross-validation):')
    print(classification_report(y_true=y, y_pred=y_pred, target_names=['Whitefile', 'Webshell'], digits=6))
    '''

xgboost_train(x, y)
Decision Tree
from sklearn.tree import DecisionTreeClassifier

def decisiontree_train(x, y):
    dt_classifier = DecisionTreeClassifier()
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
    dt_model = dt_classifier.fit(x_train, y_train)
    print(cross_val_score(dt_model, x, y, cv=5).mean())
    y_pred = cross_val_predict(dt_classifier, x_test, y_test, cv=5)
    print(classification_report(y_true=y_test, y_pred=y_pred, target_names=['Whitefile', 'Webshell'], digits=6))

decisiontree_train(x, y)
Deep Learning
CNN
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D

optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'

model = Sequential()
model.add(Conv1D(128, 3, activation='relu', input_shape=(train_seq_length, vector_size)))
# model.add(Conv1D(128, 3, activation='relu', input_shape=(max_features, 1)))
# model.add(Conv1D(128, 3, activation='relu', input_shape=(22, 1)))
model.add(Conv1D(128, 4, activation='relu'))
model.add(Conv1D(128, 5, activation='relu'))
# model.add(MaxPooling1D())
model.add(GlobalAveragePooling1D())
# model.add(Dropout(0.5))
model.add(Dense(1, activation=activation))
model.compile(loss=train_loss,
              optimizer=optimizer,
              metrics=['accuracy'])

train_epoch = 20
train_batch = 100
# Train the CNN model
print('[*] training started')
print('[+] epoch: {0}, batch_size: {1}'.format(train_epoch, train_batch))
history = model.fit(x_train, y_train,
                    batch_size=train_batch, epochs=train_epoch,
                    validation_data=(x_test, y_test))
print('[+] training complete')
LSTM
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D
import keras.layers as layers

optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'
train_batch = 100

model = Sequential()
model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(train_seq_length, vector_size)))
# model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(max_features, 1)))
# model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(22, 1)))
model.add(layers.Bidirectional(layers.LSTM(32)))
model.add(layers.Dense(1, activation=activation))
model.compile(loss=train_loss,
              optimizer=optimizer,
              metrics=['accuracy'])

# Train the bidirectional LSTM model
print('[*] training started')
print('[+] epoch: {0}, batch_size: {1}'.format(10, train_batch))
model.fit(x_train, y_train,
          batch_size=train_batch, epochs=10,
          validation_data=(x_test, y_test))
print('[+] training complete')
GRU
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D
import keras.layers as layers

optimizer = 'adam'
train_loss = 'binary_crossentropy'
activation = 'sigmoid'
train_epoch = 10
train_batch = 100

model = Sequential()
model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(train_seq_length, vector_size)))
# model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(max_features, 1)))
# model.add(layers.GRU(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=False, input_shape=(22, 1)))
# model.add(layers.SimpleRNN(128))
model.add(layers.Dense(1, activation=activation))
model.compile(loss=train_loss,
              optimizer=optimizer,
              metrics=['accuracy'])

# Train the GRU model
print('[*] training started')
print('[+] epoch: {0}, batch_size: {1}'.format(train_epoch, train_batch))
model.fit(x_train, y_train,
          batch_size=train_batch, epochs=train_epoch,
          validation_data=(x_test, y_test))
print('[+] training complete')