前言 最近读了 go fuzz 源码,好几处细节不太理解为什么要这么写。决定自己造个轮子,完成一个 fuzzer,加深自己的理解。先是读完了 Build simple fuzzer 系列,师傅写的非常诙谐,文章娓娓道来,受益良多。师傅是用 python 实现了一个能够 fuzz C 或 C++ 可执行文件的 fuzzer,并把 vsf 源码 托管到了 github 上,有 4 个 tag,和文章一一对应。
首先我会把师傅的最终代码进行事无巨细的分析,并把师傅的 binary ninja 自动下断点的代码改成 ida 脚本插件版本,更方便我们配合 ida 进行使用。最后我会说下我对师傅的变异算法的解读,并试图优化一下师傅的变异策略。在最后完成 go 的部分重构 。
vsf 整体 fuzz 流程
声明要 fuzz 的对象、语料库、断点等
语料读入,初始化变异器
变异器进行迭代
执行 fuzz 逻辑
记录根据当前变异器得到的文件所能观察到的块
更新 corpus
用户是否发出终止命令,发出了就保存 crash 文件,否则就回 3 继续执行
vsf 代码解读
https://github.com/carstein/vsf/releases
变异器类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class Mutator : def __init__ (self, core ): self .core = core self .trace = set () self .corpus = [] self .pool = [] self .samples = [] def __iter__ (self ): self ._fit_pool() self ._mutate_pool() return self def __next__ (self ): if not self .samples: self ._fit_pool() self ._mutate_pool() global stop_flag if stop_flag: raise StopIteration else : return self .samples.pop()
这个类实现了__iter__
方法,返回一个迭代器对象。在这个方法中,首先调用_fit_pool
方法和_mutate_pool
方法,用于初始化变异池和进行变异操作。然后返回 self 作为迭代器对象。这个类还实现了__next__
方法,用于返回下一个变异后的样本。在这个方法中,首先判断变异后的样本集 self.samples 是否为空,如果为空,则调用_fit_pool
方法和_mutate_pool()
方法,用于初始化变异池和进行变异操作。然后判断全局变量 stop_flag 的值,如果为 True,则抛出 StopIteration 异常,表示迭代结束;否则,返回 self.samples.pop()
,弹出并返回变异后的样本。
简单来说,这个类是一个用于变异操作的类,通过构造函数初始化一些实例变量,包括核心样本集、当前观察到的块集合、执行过的样本集、变异池和变异后的样本集。通过实现__iter__
方法返回一个迭代器对象,以及实现__next__
方法返回下一个变异后的样本。这个类可以用于生成变异后的样本集。
之后就是我们的初始化变异池函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def _fit_pool (self ): print ("### Fitting round\t\t" ) for sample in self .core: self .pool.append((sample, [])) print ("Pool size: {:d} [core samples promoted]" .format (len (self .pool))) for sample, trace in self .corpus: if trace - self .trace: self .pool.append((sample, trace)) print ("Pool size: {:d} [new traces promoted]" .format (len (self .pool))) if self .corpus and len (self .pool) < 100 : self .corpus.sort(reverse=True , key=lambda x: len (x[1 ])) for _ in range (min (100 - len (self .pool), len (self .corpus))): v = random.random() * random.random() * len (self .corpus) self .pool.append(self .corpus[int (v)]) self .corpus.pop(int (v)) print ("Pool size: {:d} [backfill from corpus]" .format (len (self .pool))) print ("### End of round\t\t" ) for _, t in self .corpus: self .trace |= t self .corpus = []
在方法的最后,遍历执行过的样本集 self.corpus,将每个样本对应的观察到的块集合与当前观察到的块集合进行合并操作,更新 self.trace 的值。之后将执行过的样本集 self.corpus 清空,即删除其中的所有样本。
总结起来,这个方法用于根据当前的观察到的块集合和执行过的样本集,将样本添加到变异池中。首先会将核心样本集中的样本添加到变异池中。然后,根据执行过的样本集中观察到的新块,将样本和块集合添加到变异池中。接着,如果变异池的大小小于 100,则根据 random 乘 random 的方式从执行过的样本集中“随机”选择一些样本进行补充。最后,更新当前观察到的块集合,并清空执行过的样本集。
1 2 3 4 5 6 7 8 9 10 11 def _mutate_pool (self ): while self .pool: sample, _ = self .pool.pop() for _ in range (10 ): self .samples.append(Mutator.mutate_sample(sample)) def update_corpus (self, data, trace=None ): self .corpus.append((data, trace))
之后就是师傅写的变异的具体函数了,首先是产生一个派发具体要调用哪个变异方法的函数,也就是上文的函数:mutate_sample
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @staticmethod def mutate_sample (sample ): _sample = sample[:] methods = [ Mutator.bit_flip, Mutator.byte_flip, Mutator.magic_number, Mutator.add_block, Mutator.remove_block, ] f = random.choice(methods) idx = random.choice(range (0 , len (_sample))) f(idx, _sample) return _sample
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 SIZE = [4 , 8 , 16 , 32 , 64 ] FLIP_ARRAY = [1 , 2 , 4 , 8 , 16 , 32 , 64 , 128 ] MAGIC_VALS = [ [0xFF ], [0x7F ], [0x00 ], [0xFF , 0xFF ], [0x00 , 0x00 ], [0xFF , 0xFF , 0xFF , 0xFF ], [0x00 , 0x00 , 0x00 , 0x00 ], [0x00 , 0x00 , 0x00 , 0x80 ], [0x00 , 0x00 , 0x00 , 0x40 ], [0xFF , 0xFF , 0xFF , 0x7F ], ] ... @staticmethod def bit_flip (index, _sample ): num = random.choice(SIZE) for idx in random.choices(range (len (_sample)), k=num): _sample[idx] = _sample[idx] ^ random.choice(FLIP_ARRAY) @staticmethod def byte_flip (index, _sample ): num = random.choice(SIZE) for idx in random.choices(range (len (_sample)), k=num): _sample[idx] = _sample[idx] ^ random.getrandbits(8 ) @staticmethod def magic_number (index, _sample ): selected_magic = random.choice(MAGIC_VALS) if index > (len (_sample) - len (selected_magic)): index = len (_sample) - len (selected_magic) for c, v in enumerate (selected_magic): _sample[index + c] = v @staticmethod def add_block (index, _sample ): size = random.choice(SIZE) _sample[index:index] = bytearray ((random.getrandbits(8 ) for i in range (size))) @staticmethod def remove_block (index, _sample ): size = random.choice(SIZE) _sample = _sample[:index] + _sample[index + size :]
防止 ASLR 随机化
让造成程序 crash 的样本单一化
这里解释一下为什么要搞个这个函数:首先得到了造成程序 crash 的文件是很好,但是贪多就不行了,假如对于每个能够造成程序 crash 的文件我们都进行记录的话,这会造成我们保存文件过多,反而不利于我们分析,因此需要去重。而对于 linux 程序来说,aslr 基本上已经是人手必备了,所以必须使用到绝对地址减去内存映射的方式。
1 2 3 4 5 6 7 8 def get_base (vmmap ): for m in vmmap: if "x" in m.permissions and m.pathname.endswith( os.path.basename(config["target" ]) ): return m.start
类似这种形式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 06:0030│ 0x7fffffffe1d8 ◂— 0x6887a0b1d2b37c3e 07:0038│ 0x7fffffffe1e0 —▸ 0x555555555060 (_start) ◂— endbr64 ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────[ BACKTRACE ]────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── ► 0 0x555555555149 main 1 0x7ffff7df3083 __libc_start_main+243 ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── pwndbg> vmmap LEGEND: STACK | HEAP | CODE | DATA | RWX | RODATA Start End Perm Size Offset File 0x555555554000 0x555555555000 r--p 1000 0 /home/main 0x555555555000 0x555555556000 r-xp 1000 1000 /home/main 0x555555556000 0x555555557000 r--p 1000 2000 /home/main 0x555555557000 0x555555558000 r--p 1000 2000 /home/main 0x555555558000 0x555555559000 rw-p 1000 3000 /home/main 0x7ffff7dcf000 0x7ffff7df1000 r--p 22000 0 /usr/lib/x86_64-linux-gnu/libc-2.31.so 0x7ffff7df1000 0x7ffff7f69000 r-xp 178000 22000 /usr/lib/x86_64-linux-gnu/libc-2.31.so 0x7ffff7f69000 0x7ffff7fb7000 r--p 4e000 19a000 /usr/lib/x86_64-linux-gnu/libc-2.31.so 0x7ffff7fb7000 0x7ffff7fbb000 r--p 4000 1e7000 /usr/lib/x86_64-linux-gnu/libc-2.31.so 0x7ffff7fbb000 0x7ffff7fbd000 rw-p 2000 1eb000 /usr/lib/x86_64-linux-gnu/libc-2.31.so 0x7ffff7fbd000 0x7ffff7fc3000 rw-p 6000 0 [anon_7ffff7fbd] 0x7ffff7fcb000 0x7ffff7fce000 r--p 3000 0 [vvar] 0x7ffff7fce000 0x7ffff7fcf000 r-xp 1000 0 [vdso] 0x7ffff7fcf000 0x7ffff7fd0000 r--p 1000 0 /usr/lib/x86_64-linux-gnu/ld-2.31.so 0x7ffff7fd0000 0x7ffff7ff3000 r-xp 23000 1000 /usr/lib/x86_64-linux-gnu/ld-2.31.so 0x7ffff7ff3000 0x7ffff7ffb000 r--p 8000 24000 /usr/lib/x86_64-linux-gnu/ld-2.31.so 0x7ffff7ffc000 0x7ffff7ffd000 r--p 1000 2c000 /usr/lib/x86_64-linux-gnu/ld-2.31.so 0x7ffff7ffd000 0x7ffff7ffe000 rw-p 1000 2d000 /usr/lib/x86_64-linux-gnu/ld-2.31.so 0x7ffff7ffe000 0x7ffff7fff000 rw-p 1000 0 [anon_7ffff7ffe] 0x7ffffffde000 0x7ffffffff000 rw-p 21000 0 [stack] 0xffffffffff600000 0xffffffffff601000 --xp 1000 0 [vsyscall]
获取断点的函数 1 2 3 4 5 6 7 8 9 10 11 12 def get_bpmap (path ): bpmap = [] if path and os.path.isfile(path): with open (path, "r" ) as fh: for line in fh.readlines(): bpmap.extend(list (map (lambda x: int (x.strip(), 16 ), line.split()))) else : print ("No breakpoint map; trace won't be generated" ) return bpmap
整体 fuzz 逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def execute_fuzz (dbg, data, bpmap ): trace = set () cmd = [config["target" ], config["file" ]] pid = debugger.child.createChild(cmd, no_stdout=True , env=None ) proc = dbg.addProcess(pid, True ) base = get_base(proc.readMappings()) if bpmap: for offset in bpmap: proc.createBreakpoint(base + offset) while True : proc.cont() event = dbg.waitProcessEvent() if event.signum == signal.SIGSEGV: crash_ip = ( proc.getInstrPointer() - base - 1 ) if crash_ip not in crashes: crashes[crash_ip] = data proc.detach() break elif event.signum == signal.SIGTRAP: ip = proc.getInstrPointer() br = proc.findBreakpoint(ip - 1 ).desinstall() proc.setInstrPointer(ip - 1 ) trace.add(ip - base - 1 ) elif event.signum == signal.SIGINT: print ("Stoping execution" ) proc.detach() break elif isinstance (event, debugger.ProcessExit): proc.detach() break else : print ("Something went wrong -> {}" .format (event)) return trace
整体看下来,就是把要 fuzz 的和要打断点的一个个打上,然后一个永真,不断执行下去,最后出错了就保存造成 crash 的数据,不然就更新自己的 trace。最后返回的是 trace,也就是程序命中的断点数目
main 函数执行逻辑 1 2 3 4 5 6 7 8 9 10 11 for sample in mutator: save_file(sample) trace = execute_fuzz(dbg, sample, bp_map) mutator.update_corpus(sample, trace) counter += 1 print ( "#{:3d} Coverage {:.2f}%\r" .format (counter, (len (trace) / len (bp_map)) * 100 ), end="" , )
就是保存文件,执行 fuzz ,然后根据 trace 不断更新变异器的过程。
ida 脚本插件 师傅在这里用 binary ninja 写了个自动下断脚本,我把它改成 ida 脚本的形式,效果完全相同:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import idaapiimport idautilsskip_func = [ "__libc_csu_init" , "__libc_csu_fini" , "_fini" , "__do_global_dtors_aux" , "_start" , "_init" , "sub_1034" , ] min_ea = idaapi.cvar.inf.min_ea max_ea = idaapi.cvar.inf.max_ea bv = idaapi.get_bytes(idaapi.get_fileregion_offset(min_ea), max_ea - min_ea) base = idaapi.get_segm_by_name(".text" ).start_ea for func_ea in idautils.Functions(base, idaapi.get_segm_by_name(".text" ).end_ea): func_name = idaapi.get_func_name(func_ea) if func_name in skip_func: continue func = idaapi.get_func(func_ea) if func_name is None or func.flags & idaapi.FUNC_LIB: continue output = "" for bb in idaapi.FlowChart(idaapi.get_func(func_ea)): output += "0x{:x} " .format (bb.start_ea - base) print (output)
在这里可以改成保存成文件的形式,方便之后加载到我们的断点中,这段代码的作用是遍历二进制文件中的所有函数,并输出每个函数的基本块地址。
首先,min_ea
和 max_ea
分别获取了当前二进制文件的起始地址和结束地址。然后,idaapi.get_bytes()
函数获取了整个二进制文件的字节流,并将其存储在 bv
变量中。接下来,base
获取了名为 .text
的段的起始地址,用于计算每个函数的基本块地址。idautils.Functions()
函数遍历了 .text
段中的所有函数,并将其存储在 func_ea
变量中。在循环中,首先使用 idaapi.get_func_name()
函数获取当前函数的名称,并检查是否在 skip_func
列表中。如果在列表中,则跳过当前函数。然后,使用 idaapi.get_func()
函数获取当前函数的 func_t
结构体,并检查是否为导入函数或无效函数。如果是,则跳过当前函数。最后,使用 idaapi.FlowChart()
函数获取当前函数的控制流程图,并遍历其中的所有基本块。对于每个基本块,计算其相对于 .text
段的偏移量,并将其添加到 output
变量中。最后,输出 output
变量,即当前函数的所有基本块地址。
改写的这个插件可以得到每个函数的基本块地址,以便进行后续的分析和调试。
变异算法 师傅在处理遗传算法的时候用的是这个算法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if self .corpus and len (self .pool) < 100 : self .corpus.sort(reverse = True , key = lambda x: len (x[1 ])) for _ in range (min (100 -len (self .pool), len (self .corpus))): v = random.random() * random.random() * len (self .corpus) self .pool.append(self .corpus[int (v)]) self .corpus.pop(int (v)) print ('Pool size: {:d} [backfill from corpus]' .format (len (self .pool))) print ('### End of round\t\t' )
事实上,我们可以画个直方图看下具体分布情况:
1 2 3 4 5 6 7 8 In [84 ]: import random ...: import matplotlib.pyplot as plt ...: data = [random.random() * random.random() for _ in range (100000 )] ...: plt.hist(data, bins=100 , density=True ) ...: plt.xlabel('Value' ) ...: plt.ylabel('Occurrence probability' ) ...: plt.title('Distribution of random.random() * random.random()' ) ...: plt.show()
看上去似乎是合理的,因为索引越靠前的越容易被我们选中。但是试想一下:假如我们的 corpus 命中的 trace 是100、1、1、1、1、1、1、1。也就是说,索引为 0 的代表 100,其余所有都是一个很小的 trace:1,trace 为 1 所占比例甚至还大于我们的 100,这显然不是很合理。当然我说的很极端的情况,但也足够说明。因此我们需要真正把 trace 量化到具体遗传变异策略中,而不是通过一个索引的方式解决。我觉得对于该算法,使用轮盘赌是更合理的,类似这种:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import randomdef roulette_wheel_selection (population, fitness ): total_fitness = sum (fitness) probabilities = [fit / total_fitness for fit in fitness] cumulative_probabilities = [ sum (probabilities[: i + 1 ]) for i in range (len (probabilities)) ] random_num = random.random() for i, prob in enumerate (cumulative_probabilities): if random_num <= prob: return population[i] return population[-1 ] population = ["A" , "B" , "C" , "D" , "E" ] fitness = [100 , 1 , 1 , 1 , 1 ] selected_individual = roulette_wheel_selection(population, fitness) print ("Selected individual:" , selected_individual)
这种情况下更加合理,这样的话,我们 A,也就是命中更多断点的样例获得了更大的选择概率:100/104。如果是之前的形式,我们优良的样例 A,选择概率甚至还没有命中 trace 为 1 的概率高:4/5。
go 重构思路 我把师傅的 0.1 版本完成了 go 的重构,整体 fuzzer 的整体思路也基本敲明白了,之后没事了再开发,把代码敲到 GitHub 里
逐行看一下:需要创建造成 crashes 的文件夹,之后从命令行中得到需要 fuzz 的语料,之后使用 getBytes 得到原始文件,然后设置 fuzz 的整体轮数,将原始文件先拷贝到 data 中,之后产生一个变异。下一行,师傅把变异结果放到了一个新的文件中(createNew 函数),之后就是真正要执行的 executeFuzz。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func main () { if _, err := os.Stat("crashes" ); os.IsNotExist(err) { if err := os.Mkdir("crashes" , 0755 ); err != nil { fmt.Printf("创建文件夹失败:%v\n" , err) os.Exit(1 ) } } if len (os.Args) < 2 { usage() } else { filename := os.Args[2 ] origData := getBytes(filename) counter := 0 for counter < 100 { data := make ([]byte , len (origData)) copy (data, origData) mutatedData := mutate(data) createNew(mutatedData) executeFuzz(mutatedData, counter) if counter%10 == 0 { fmt.Printf("Counter: %d\n" , counter) } counter++ } } }
executeFuzz executeFuzz函数严格意义上可以使用 cmd.SysProcAttr = &syscall.SysProcAttr{Ptrace: true}
的父子进程调试的方式,但因为接收信号过于方便,就用了信号量的方式)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func executeFuzz (data []byte , counter int ) { cmd := exec.Command("./main" , "data/mutated.jpg" ) if err := cmd.Start(); err != nil { fmt.Printf("启动子进程失败:%v\n" , err) os.Exit(1 ) } done := make (chan error , 1 ) go func () { done <- cmd.Wait() }() if err := <-done; err != nil { if err := writeDataToFile(counter, data); err != nil { fmt.Printf("写入文件失败:%v\n" , err) os.Exit(1 ) } fmt.Println("写入文件成功!" ) fmt.Printf("子进程退出:%v\n" , err) } } }
mutate 函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 const FLIP_RATIO = 0.01 var FLIP_ARRAY = []byte {1 , 2 , 4 , 8 , 16 , 32 , 64 , 128 }var MAGIC_VALS = [][]byte { {0xFF }, {0x7F }, {0x00 }, {0xFF , 0xFF }, {0x00 , 0x00 }, {0xFF , 0xFF , 0xFF , 0xFF }, {0x00 , 0x00 , 0x00 , 0x00 }, {0x00 , 0x00 , 0x00 , 0x80 }, {0x00 , 0x00 , 0x00 , 0x40 }, {0xFF , 0xFF , 0xFF , 0x7F }, } ... func mutate (data []byte ) []byte { flips := int (float64 (len (data)-4 ) * FLIP_RATIO) flipIndexes := rand.Perm(len (data) - 8 )[:flips] for _, idx := range flipIndexes { method := rand.Intn(2 ) if method == 0 { data[idx+2 ] = bitFlip(data[idx+2 ]) } else { magic(data, idx+2 ) } } return data } func bitFlip (byteVal byte ) byte { flipVal := FLIP_ARRAY[rand.Intn(len (FLIP_ARRAY))] return byteVal ^ flipVal } func magic (data []byte , idx int ) { pickedMagic := MAGIC_VALS[rand.Intn(len (MAGIC_VALS))] for i, m := range pickedMagic { data[idx+i] = m } }
getBytes 和 writeDataToFile 这种功能性函数不与赘述。只介绍逻辑函数。