首页
论坛
课程
招聘
[原创]基于深度学习的恶意软件分类器(一)
2022-2-19 18:37 13922

[原创]基于深度学习的恶意软件分类器(一)

2022-2-19 18:37
13922

一.前言

1.实验内容

通过将程序的16进制序列按照一定的规则转换为灰度图作为输入数据,随后使用VGG16深度卷积神经网络作为模型,利用深度卷积神经网络在图像识别上的准确性来来构建恶意程序分类器。该实验将会在两个公开数据集上进行测试,两个公开数据集的数据分别属于不同的恶意程序家族。因此,构建的分类器要解决的问题将是一个多分类问题。

2.实验环境

  • Python版本:3.6.13

  • Pytorch版本:1.8.1

  • CUDA版本11.4

二.灰度图的生成

最初提出将程序的16进制序列转换为灰度图作为输入数据的是由2011年发表的一篇论文中阐述的,该论文的链接是:Malware Images: Visualization and Automatic  Classification,感兴趣的,可以将论文下载下来,下面讲一下转换思路。

当程序被编译器编译完成以后,将会以二进制的形式保存在磁盘中。但是程序解析的时候,都是按照一个字节(8比特)来解析,所以解析器一般是按照16进制形式展现一个程序的二进制文件,如下图所示:

而每个字节的大小在0-255的范围内,这个范围和像素点的范围是一样的。因此,可以考虑将程序的每个字节当成一个像素点,转换成一个二维的数组,然后生成相应的灰度图,过程如下图所示:

这里就有一个问题,每个程序的字节数是不同的。因此,需要按照一定的规则来指定生成的图像的宽和高,作者在论文中给出的参考如下:

根据文件大小的不同来固定生成的图像的宽度,剩余的字节就填充到高度去,以此,来生成相应的灰度图来表征程序。此外,作者认为,生成灰度图的时候,应当把程序的PE头部分删除,只留下节区,因为节区中的内容才保存了程序要执行的指令,使用到的数据等到。如下图所示,可以看到不同节区生成的灰度图的纹理存在着明显的不同:

最终生成的灰度图如下图所示,其中上面三张图是其中一个类别的恶意程序生成的灰度图,而下面的三张图则是另一类别恶意程序生成的灰度图。从中可以看到,不同家族的恶意程序所生成的灰度图的纹理存在非常明显的差异,这就给模型的训练带来的便利。

三.公开数据集

1.Maling数据集

上述论文作者按照其思路,将如下所示的25个家族的恶意程序转换成数据集供给大家实验。其中,最后一列代表的是该家族恶意程序的数量,可以看出,数据集存在明显的类别不均衡的问题,事实上,这也是该领域的一大问题。

该数据集一共有九千多条数据,其中的百分之90(8000多条)用来作为训练数据,百分之10(900多条)用来作为测试数据。训练数据和测试数据的形式如下图所示:

每种家族的恶意程序都被保存到了一个文件夹中,文件夹的名称就是程序所属的家族,文件夹中则保存了每个家族的恶意软件生成的灰度图,如下图所示:

该数据集提供了测试集的标签,因此使用准确率来判断模型性能,如下是该数据集的数据类代码

from torch.utils.data import Dataset
import os
import cv2
from torchvision import transforms
import pandas as pd
import glob
import torch


# 定义数据类来读取文件
class MalwareDataset(Dataset):
    def __init__(self, file_path):
        self.root_path = file_path
        self.file_path = []
        self.y_data = []
        self.len = 0

        category = 0    # 每种样本的类别(0-24)
        for dir_name in os.listdir(self.root_path):
            for file_name in os.listdir(os.path.join(self.root_path, dir_name)):
                self.file_path.append(os.path.join(self.root_path, dir_name, file_name))    # 存储文件路径
                self.y_data.append(category)    # 存储类别
                self.len = self.len + 1
            category += 1
        self.transforms_data = transforms.Compose(
            # [transforms.RandomRotation(-45, 45)],
            [transforms.ToTensor()]
        )

    def __getitem__(self, index):
        data_path = self.file_path[index]
        image = cv2.imread(data_path)
        image = cv2.resize(image, (224, 224))
        image = self.transforms_data(image)

        return image, self.y_data[index]

    def __len__(self):
        return self.len

2.微软公开数据集

该数据集是微软2015年在kaggle举行的恶意软件分类比赛上提供的数据集,该比赛的网址是:Microsoft Malware Classification Challenge。该数据集的训练集样本的测试集样本数量差不多,都是1W出头的数量。trainLabels.csv文件保存了训练集中的恶意程序类别,其中ID是软件的名称,Class是所属的类别,如下图所示:

对于每个恶意软件,数据集都给出了其16进制序列和汇编指令,分别保存在.bytes和.asm文件中。

其中,.bytes文件保存的软件的16进制序列去除了PE头,如下图所示:

.asm文件中的汇编指令则是使用IDA生成的,如下图所示:

这个数据集特别的大,所以解压的时候最好使用py7zr用来解压,下面是参考代码:

import os
import py7zr


# 辅助函数,将.bytes文件从.7z压缩包中解压出来 减少硬盘消耗
if __name__ == '__main__':
    base_dir = "D:\\AI\\dataset\\malware-classification"
    file_path = os.path.join(base_dir, "test.7z")   # 要解压的压缩包文件
    dest_path = os.path.join(base_dir, "test")      # 解压出来的文件保存的位置
    if not os.path.exists(dest_path):
        os.makedirs(dest_path)
    with py7zr.SevenZipFile(file_path, mode="r") as archive:
        file_names = archive.getnames()
        byte_file = []
        for file_name in file_names:
            if file_name[-1] == 'm':    # m解压的是.asm文件,s解压的是.bytes文件
                byte_file.append(file_name)
        # print(byte_file)
        archive.extract(dest_path, byte_file)
    print("Extract file Ok")

在本实验中,只使用到了.bytes文件。由于.bytes文件中保存的是程序的16进制序列,因此,需要将其按照上述规则转换为对应的灰度图像。下面给出相应的参考代码:

import os.path
import numpy as np
from PIL import Image
from ImageDataset import get_file_name
from CalImageArray import get_gray_image
import glob


# 将.bytes中的文本16进制读出来
def read_bytes(file_path):
    res_bytes = []
    with open(file_path, mode='r') as fp:
        for byte_line in fp.readlines():    # 循环读出每一行
            str_bytes = byte_line.split(" ")  # 根据空格划分为数组
            for str_byte in str_bytes:
                if str_byte[0] == '?':  # 舍弃无用字符
                    continue
                byte = int(str_byte, 16)    # 将字符按照16进制转成数字
                if byte <= 0xFF:            # 每行前面的地址都要舍弃
                    res_bytes.append(byte)
    return res_bytes
    

# 将数组转为灰度图像
def get_gray_image(array_byte):
    byte_len = len(array_byte)
    file_size = int(byte_len / 1024)
    file_size_array = [10, 30, 60, 100, 200, 500, 1000]
    image_width_array = [32, 64, 128, 256, 384, 512, 768]
    image_width = 0

    for i in range(7):
        if file_size < file_size_array[i]:
            image_width = image_width_array[i]
            break
    if image_width == 0:
        image_width = 1024

    image_height = int(byte_len / image_width)
    image_bytes = np.zeros([image_height, image_width])
    k = 0

    for i in range(image_height):
        for j in range(image_width):
            image_bytes[i][j] = array_byte[k]
            k += 1
    return image_bytes
    

def get_gray_images(base_dir):
    count = 0
    # 保存生成的灰度图像的路径
    dest_path = os.path.join(base_dir, "..", "train_gray_images")
    if not os.path.exists(dest_path):
        os.makedirs(dest_path)
    # 获取保存.bytes文件的路径
    bytes_file_path = glob.glob(os.path.join(base_dir, "*.bytes"))
    for file_path in bytes_file_path:
        # 保存的图片名称
        img_path = os.path.join(dest_path, get_file_name(file_path)) + ".png"
        # 获取.bytes文件中的16进制序列
        array_byte = read_bytes(file_path)
        if len(array_byte) == 0:
            array_byte = [0] * (224 * 224)
        # 转换成相应的二维数组
        img = get_gray_image(array_byte)
        img = np.uint8(img)
        # 生成图片
        img = Image.fromarray(img)
        img.save(img_path)

        count += 1
        if count % 100 == 99:
            print(count)
    print("Get gray image ok! get %d images" % count)

该数据集没有提供测试集的标签,想要测试分类器的性能需要将测试结果上传到以上的数据集官网中。官网按照以下的公式计算损失,来对测试结果进行评分,验证分类器的精度:

其中N表示样本总数,M表示恶意软件的家族数,pij表示预测第i个样本分类为第j类的概率。当第i个样本属于第j类的时候,yij等于1,否则等于0。以下是根据这个数据集的特点编写的数据类:

class MalwareTrainDataset(Dataset):
    def __init__(self, file_path):
        self.root_path = file_path

        self.file_path = glob.glob(os.path.join(self.root_path, "*.png"))
        train_label_path = os.path.join(self.root_path, "..", "trainLabels.csv")
               df = pd.read_csv(train_label_path)
        self.y_data = get_train_label(self.file_path, df)
        self.len = len(self.file_path)
        self.transforms_data = transforms.Compose(
            [transforms.ToTensor()]
        )

    def __getitem__(self, index):
        image = cv2.imread(self.file_path[index])
        image = cv2.resize(image, (224, 224))
        image = self.transforms_data(image)

        return image, self.y_data[index]

    def __len__(self):
        return self.len


class MalwareTestDataset(Dataset):
    def __init__(self, file_path):
        self.root_path = file_path
        self.file_path = glob.glob(os.path.join(self.root_path, "*.png"))
        self.len = len(self.file_path)
        self.transforms_data = transforms.Compose(
            [transforms.ToTensor()]
        )

    def __getitem__(self, index):
        image = cv2.imread(self.file_path[index])
        image = cv2.resize(image, (224, 224))
        image = self.transforms_data(image)
        file_name = get_file_name(self.file_path[index])

        return image, file_name

    def __len__(self):
        return self.len


# 从trainLabels.csv中获得文件名对应的类别
def get_train_label(file_path, df):
    train_label = []
    for fp in file_path:
        file_name = get_file_name(fp)
        train_label.append(df[df["Id"] == file_name]["Class"].astype(int).values[0] - 1)
        
    return torch.Tensor(train_label)


# 根据文件路径得出不带后缀的文件名
def get_file_name(file_path):
    file_name_begin = file_path.rfind("/")
    file_name_end = file_path.rfind(".")
    file_name = file_path[file_name_begin + 1:file_name_end]
    return file_name

四.模型

训练模型使用该论文所说的模型:Malware Classification with Deep Convolutional Neural Networks。论文所提供的模型,如下图所示:

不难看出,这个模型就是将VGG16模型,只不过模型的输出要修改为相应的数据集的恶意软件的类别,因此,相应的代码如下:

import torch.nn as nn
from torch.hub import load_state_dict_from_url

__all__ = [
    'VGG', 'vgg16'
]

model_urls = {
    'vgg16': 'https://download.pytorch.org/models/vgg16-397923af.pth',
}


class VGG(nn.Module):
    def __init__(self, features, num_classes=25, init_weights=True):
        super(VGG, self).__init__()
        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes)
        )
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)


def make_layers(cfg, batch_norm=False):
    layers = []
    in_channels = 3
    for v in cfg:
        if v == 'M':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        else:
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv2d, nn.ReLU(inplace=True)]
            in_channels = v
    return nn.Sequential(*layers)


cfgs = {
    'A': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'B': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'D': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'E': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


def _vgg(arch, cfg, batch_norm, pretrained, progress, **kwargs):
    if pretrained:
        kwargs['init_weights'] = False
    model = VGG(make_layers(cfgs[cfg], batch_norm=batch_norm), **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch], progress=progress)
        model.load_state_dict(state_dict, strict=False)
    return model


def vgg16(pretrained=False, progress=True, **kwargs):
    """gray 16-layer model (configuration "D")

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    return _vgg('vgg16', 'D', False, pretrained, progress, **kwargs)

五.参数设置

论文中选择的损失函数是cross_entropy,优化器则是SGD,其相应的参数设置如下:

  • batch_size: 8

  • epochs: 25

  • lr: 0.001

  • decay: 0.0005

  • momentum: 0.9

因此,使用Configure保存了这些信息:

import os


# 存储程序需要的内容
class Configure:
    # 数据集存放路径,根据实际情况更改
    base_path = ""
    train_path = os.path.join(base_path, "train")
    test_path = os.path.join(base_path, "validation")

    model_path = "model.pth"    # 保存模型的路径
    load_model = False  # 是否要加载模型

    batch_size = 8
    epochs = 25
    lr = 0.001
    decay = 0.0005
    momentum = 0.9

最后给出主运行代码如下:

import os
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from VGG import vgg16
from MalwareDataset import MalwareDataset, MalwareTrainDataset, MalwareTestDataset
from Configure import Configure
import sys
import pandas as pd


def train(epoch):
    for batch_idx, data in enumerate(train_loader, 0):
        optimizer.zero_grad()   # 梯度清0

        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        y_pred = model(inputs)  # 前向传播
        loss = torch.nn.functional.cross_entropy(y_pred, labels.long())    # 计算损失

        if batch_idx % 100 == 99:
            print("epoch=%d, loss=%f" % (epoch, loss.item()))

        loss.backward()     # 反向传播
        optimizer.step()    # 梯度更新

# 用来测试Maling数据集
def test():
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            inputs, target = data
            inputs, target = inputs.to(device), target.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, dim=1)
            total += target.size(0)
            correct += (predicted == target).sum()
    acc = 1.0 * 100 * correct / total
    print('Accuracy on test set: %f %% [%d/%d]' % (acc, correct, total))


# 用来测试微软的公开数据集
def test():
    df = pd.read_csv(Configure.result_dir)
    with torch.no_grad():
        for inputs, file_name in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            predicted = F.softmax(outputs.data)
            # _, predicted = torch.max(outputs.data, dim=1)
            data_len = len(inputs)
            for i in range(data_len):
                dict_res = {"Id": file_name[i], "Prediction1": 0, "Prediction2": 0,
                            "Prediction3": 0, "Prediction4": 0, "Prediction5": 0,
                            "Prediction6": 0, "Prediction7": 0, "Prediction8": 0, "Prediction9": 0}
                for j in range(9):
                    dict_res["Prediction" + str(j + 1)] = predicted[i][j].item()
                df = df.append(dict_res, ignore_index=True)
    df.to_csv(Configure.result_dir, index=0)


def save_model(target_model, model_path):
    if os.path.exists(model_path):
        os.remove(model_path)
    torch.save(target_model.state_dict(), model_path)


def load_model(target_model, model_path):
    if not os.path.exists(model_path):
        print("模型路径错误,模型加载失败")
        sys.exit(0)
    else:
        target_model.load_state_dict(torch.load(model_path))
        target_model.eval()


if __name__ == '__main__':
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"

    Configure = Configure()

    # 定义Maling数据集的数据类
    train_dataset = MalwareDataset(Configure.train_path)
    train_loader = DataLoader(train_dataset, batch_size=Configure.batch_size,
                              shuffle=True, num_workers=2)

    test_dataset = MalwareDataset(Configure.test_path)
    test_loader = DataLoader(test_dataset, batch_size=Configure.batch_size,
                             shuffle=False, num_workers=2)

    # 微软公开数据集的数据类
    train_dataset = MalwareTrainDataset(Configure.train_path)
    train_loader = DataLoader(train_dataset, batch_size=Configure.batch_size,
                              shuffle=True, num_workers=2)

    test_dataset = MalwareTestDataset(Configure.test_path)
    test_loader = DataLoader(test_dataset, batch_size=Configure.batch_size,
                             shuffle=False, num_workers=2)

    if Configure.load_model:    # 选择加载模型
        model = vgg16()
        print("=====================开始加载模型================")
        load_model(model, Configure.model_path)
        print("=====================模型加载完成================")
    else:   # 选择训练模型
        model = vgg16(pretrained=True)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)

    if not Configure.load_model:
        optimizer = torch.optim.SGD(model.parameters(), lr=Configure.lr,
                                    weight_decay=Configure.decay, momentum=Configure.momentum)    # 定义优化器
        print("=====================开始训练模型================")
        for i in range(Configure.epochs):
            train(i)
        print("=====================模型训练完成================")
        save_model(model, Configure.model_path)
    print("=====================开始测试模型================")
    test()
    print("=====================模型测试完成================")

六.分类结果

对于Maling数据集,分类器的测试精度达到了百分之98,也就是935条测试数据集中的918条数据被正确的分类

在微软的公开数据集上分别有Private Score和Public Score两个分数,前者对测试集中的百分之70的样本进行损失计算,后者对剩余百分之30进行损失计算。该模型最终在Private Score的分数为0.06705,Public Score的分数为0.05235


【看雪培训】《Adroid高级研修班》2022年夏季班招生中!

最后于 2022-4-12 16:56 被1900编辑 ,原因:
收藏
点赞2
打赏
分享
最新回复 (8)
雪    币: 3776
活跃值: 活跃值 (1000)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
十年后 活跃值 2022-2-20 08:25
2
0
分析的很到位,谢谢分享
雪    币: 0
活跃值: 活跃值 (94)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
kofcoco 活跃值 2022-2-21 14:55
3
0
很强大,学习了
雪    币: 4854
活跃值: 活跃值 (2600)
能力值: ( LV7,RANK:150 )
在线值:
发帖
回帖
粉丝
淡然他徒弟 活跃值 1 2022-2-21 22:13
4
0
mark了
雪    币:
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
wx_咿呀咿呀 活跃值 2022-3-23 17:04
5
0
雪    币: 76
活跃值: 活跃值 (652)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
saloyun 活跃值 2022-3-24 17:27
6
0
雪    币: 200
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
mb_kbkqyusp 活跃值 2022-3-24 18:52
7
0
wa
雪    币:
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
mb_pyuhxzzy 活跃值 2022-5-24 19:23
8
0
请问maling数据集的下载地址是什么
雪    币: 17696
活跃值: 活跃值 (16004)
能力值: ( LV15,RANK:750 )
在线值:
发帖
回帖
粉丝
1900 活跃值 5 2022-5-24 19:27
9
0
mb_pyuhxzzy 请问maling数据集的下载地址是什么

论文里面有,不过之前有人分享了kaggle的,所以也可以去这里下载:https://www.kaggle.com/datasets/keerthicheepurupalli/malimg-dataset9010

最后于 2022-5-24 19:30 被1900编辑 ,原因:
游客
登录 | 注册 方可回帖
返回