























# -*- coding: utf-8 -*-
"""
IDA Pro 9.3 Script - Extract Function Referenced Data Regions

Output format: IDA LST style (.rdata:XXXXXXXX db/dq/...)

Usage:
    Run the script, then enter a function name or address in the dialog.
"""

import idaapi
import idautils
import idc
import ida_bytes
import ida_segment
import ida_name
import ida_funcs
import ida_xref
import os
import json
import struct
import re
import bisect
from collections import OrderedDict, Counter

# ==================== Configuration ====================
OUTPUT_DIR = ""            # output directory; empty -> directory of the IDB
OUTPUT_FORMAT = "lst"      # "lst" | "json" | "binary" | "all"
MAX_DATA_SIZE = 0x100000   # upper bound for a single data read (1 MB)
FOLLOW_POINTERS = True     # follow pointer values found in collected data
MAX_DEPTH = 2              # maximum pointer-following depth
LST_BYTES_PER_LINE = 16    # intended hex-dump width for "unknown" blocks
# =======================================================


# ========================================================
# Compatibility layer: is the database 64-bit?
# ========================================================
def is_64bit():
    """Return True when the current database is 64-bit.

    Three lookups are tried in order, each falling through to the next on
    failure: ``ida_ida.inf_is_64bit()``, ``idaapi.get_inf_structure()``,
    and finally the raw ``INF_LFLAGS`` bit test. Returns False when none
    of them works.
    """
    try:
        import ida_ida
        return ida_ida.inf_is_64bit()
    except (ImportError, AttributeError):
        pass
    try:
        return idaapi.get_inf_structure().is_64bit()
    except AttributeError:
        pass
    try:
        # LFLG_64BIT is 0x0004 in the IDA SDK; 0x0002 is LFLG_PC_FLAT and
        # would report flat 32-bit databases as 64-bit.
        lflg_64bit = getattr(idc, "LFLG_64BIT", 0x0004)
        return (idc.get_inf_attr(idc.INF_LFLAGS) & lflg_64bit) != 0
    except Exception:
        # Deliberate best-effort: the last fallback must never abort the script.
        pass
    return False


# ========================================================
# Logging
# ========================================================
def log(msg, level="INFO"):
    """Print ``msg`` prefixed with a marker chosen by ``level``.

    Known levels are "INFO", "OK", "WARN" and "ERROR"; anything else gets
    the "[?]" marker.
    """
    prefix = {"INFO": "[*]", "OK": "[+]", "WARN": "[!]", "ERROR": "[-]"}.get(level, "[?]")
    print(f"{prefix} {msg}")


# ========================================================
# Output directory
# ========================================================
def get_output_dir():
    """Return the output directory, creating it when missing.

    Uses ``OUTPUT_DIR`` when set, otherwise the directory part of the IDB
    path. Falls back to the current working directory when neither yields
    a non-empty path, because ``os.makedirs("")`` raises.
    """
    d = OUTPUT_DIR or os.path.dirname(idc.get_idb_path() or "") or os.getcwd()
    os.makedirs(d, exist_ok=True)
    return d


# ========================================================
# Segment helpers
# ========================================================
DATA_SEG_NAMES = {
    ".data", ".rdata", ".rodata", ".bss", ".idata", ".edata", ".tls", ".rsrc",
    ".got", ".got.plt", ".init_array", ".fini_array", "DATA", "BSS",
}


def is_data_segment(seg):
    """Return True when ``seg`` looks like a data segment.

    A segment qualifies either by its type (SEG_DATA, SEG_BSS, SEG_NULL,
    SEG_UNDF) or because its lower-cased name starts with one of the
    lower-cased ``DATA_SEG_NAMES`` entries. ``None`` is never a data segment.
    """
    if seg is None:
        return False
    if seg.type in (idaapi.SEG_DATA, idaapi.SEG_BSS, idaapi.SEG_NULL, idaapi.SEG_UNDF):
        return True
    # Guard against a missing name so .lower() cannot fail.
    sname = (ida_segment.get_segm_name(seg) or "").lower()
    return sname.startswith(tuple(n.lower() for n in DATA_SEG_NAMES))


def is_code_segment(seg):
    """Return True when ``seg`` is not None and its type is SEG_CODE."""
    return seg is not None and seg.type == idaapi.SEG_CODE


# ========================================================
# Data type classification
# ========================================================
def determine_data_type(flags):
    """Map IDA item ``flags`` to a short type name.

    The predicates are checked in a fixed order and the first match wins;
    "unknown" is returned when none matches.
    """
    checks = (
        (ida_bytes.is_byte, "byte"),
        (ida_bytes.is_word, "word"),
        (ida_bytes.is_dword, "dword"),
        (ida_bytes.is_qword, "qword"),
        (ida_bytes.is_strlit, "string"),
        (ida_bytes.is_struct, "struct"),
        (ida_bytes.is_float, "float"),
        (ida_bytes.is_double, "double"),
        (ida_bytes.is_align, "align"),
        (ida_bytes.is_code, "code"),
    )
    for predicate, type_name in checks:
        if predicate(flags):
            return type_name
    return "unknown"


def ida_known_item_size(ea):
    """Return ``(size, reliable)`` for the item at ``ea``.

    ``reliable`` is True only when the item has one of the defined data
    types and a positive item size. Code items always yield ``(0, False)``.
    Any other item with a positive size yields ``(size, False)``.
    """
    dtype = determine_data_type(ida_bytes.get_flags(ea))
    if dtype == "code":
        return 0, False
    sz = idc.get_item_size(ea)
    if sz and sz > 0:
        reliable = dtype in ("byte", "word", "dword", "qword", "float",
                             "double", "string", "struct", "align")
        return sz, reliable
    return 0, False


# ========================================================
# Smart data boundaries
# ========================================================
def compute_data_size(ea, all_known_eas, seg):
    """Decide how many bytes to read at ``ea``.

    Args:
        ea: start address of the data.
        all_known_eas: sorted list of every referenced address collected so far.
        seg: segment containing ``ea`` (may be None).

    Returns:
        ``(size, method)`` where ``method`` is a short tag describing how the
        size was chosen: the IDA-defined item size, the distance to the next
        anchor, the distance to the segment end, or a 1-byte fallback. The
        inferred sizes are capped at ``MAX_DATA_SIZE``.
    """
    sz, reliable = ida_known_item_size(ea)
    if reliable and sz > 0:
        return sz, "ida_defined"

    next_anchor = _find_next_anchor(ea, all_known_eas, seg)
    if next_anchor is not None and next_anchor > ea:
        return min(next_anchor - ea, MAX_DATA_SIZE), f"inferred_to_next(0x{next_anchor:X})"

    if seg:
        capped = min(seg.end_ea - ea, MAX_DATA_SIZE)
        if capped > 0:
            return capped, "segment_end"
    return 1, "fallback_1byte"


def _find_next_anchor(ea, all_known_eas, seg):
    """Return the closest boundary address after ``ea``, or None.

    Candidates are the next collected reference, the next defined item head,
    the next named address (when the lookup is available) and the segment
    end. Only candidates in ``(ea, seg_end]`` are kept.
    """
    candidates = []

    idx = bisect.bisect_right(all_known_eas, ea)
    if idx < len(all_known_eas):
        candidates.append(all_known_eas[idx])

    # next_head() already searches strictly after its start address, so
    # passing ea + 1 would skip an item head located at exactly ea + 1.
    nh = idc.next_head(ea, idaapi.BADADDR)
    if nh and nh != idaapi.BADADDR:
        candidates.append(nh)

    # NOTE(review): looked up defensively so a build without this helper does
    # not raise AttributeError - confirm it exists in the targeted IDAPython.
    get_next_name_ea = getattr(ida_name, "get_next_name_ea", None)
    if get_next_name_ea is not None:
        next_named = get_next_name_ea(ea)
        if next_named and next_named != idaapi.BADADDR:
            candidates.append(next_named)

    seg_end = seg.end_ea if seg else idaapi.BADADDR
    if seg:
        candidates.append(seg_end)

    candidates = [c for c in candidates if ea < c <= seg_end]
    return min(candidates) if candidates else None


# ========================================================
# Safe reads
# ========================================================
def read_safe(ea, size):
    """Read up to ``min(size, MAX_DATA_SIZE)`` bytes at ``ea``.

    Returns ``b""`` for a non-positive size or when the read yields nothing.
    """
    if size <= 0:
        return b""
    data = ida_bytes.get_bytes(ea, min(size, MAX_DATA_SIZE))
    return bytes(data) if data else b""


def get_string_at(ea):
    """Return the string literal at ``ea`` as text, or None when there is none.

    The string type reported for ``ea`` is used, defaulting to
    ``idc.STRTYPE_C`` when it is missing or negative. Contents are decoded
    as UTF-8 with replacement; if decoding is impossible the ``repr`` of the
    raw value is returned instead.
    """
    str_type = idc.get_str_type(ea)
    if str_type is None or str_type < 0:
        str_type = idc.STRTYPE_C
    s = idc.get_strlit_contents(ea, -1, str_type)
    if s is None:
        return None
    try:
        return s.decode("utf-8", errors="replace")
    except Exception:
        # Best-effort: a value without a usable .decode() is shown via repr().
        return repr(s)


# ========================================================
# IDA LST style rendering core
# ========================================================
def _fmt_addr(ea, seg_name, addr_width):
    """Build an address prefix such as '.rdata:0000000140928A80'."""
    return f"{seg_name}:{ea:0{addr_width}X}"


def _ascii_char(b):
    """Return the character for printable ASCII byte ``b``, else None.

    The single quote is excluded because it delimits rendered strings.
    """
    if 0x20 <= b < 0x7F and b != ord("'"):
        return chr(b)
    return None


def _render_string_literal(raw: bytes) -> str:
    """Render ``raw`` as an IDA-style string operand, e.g. ``'hello', 0``.

    Runs of printable characters are merged into one quoted chunk, NUL bytes
    are written as ``0`` and every other byte as two hex digits plus ``h``.
    Chunks are joined with ", ".
    """
    parts = []
    i = 0
    total = len(raw)
    while i < total:
        b = raw[i]
        if b == 0:
            # Terminators are listed on their own.
            parts.append("0")
            i += 1
        elif _ascii_char(b):
            # Merge the whole run of consecutive printable characters.
            run = []
            while i < total and _ascii_char(raw[i]) is not None:
                run.append(_ascii_char(raw[i]))
                i += 1
            parts.append(f"'{''.join(run)}'")
        else:
            parts.append(f"{b:02X}h")
            i += 1
    return ", ".join(parts)


def render_ida_lst_entry(entry: dict, addr_width: int = 16) -> list:
    """Render one collected entry as IDA LST style lines.

    Args:
        entry: dict with the keys "ea_int", "segment", "type", "name",
            "data_bytes", "from_ea" and "depth"; "ptr_chain" and
            "string_value" are optional.
        addr_width: number of hex digits used for addresses.

    Returns:
        A list of lines without trailing newlines. A named entry gets a
        label line carrying the xref/depth/chain comment; otherwise that
        comment is attached to the first data line.
    """
    ea = entry["ea_int"]
    seg_name = entry["segment"]
    dtype = entry["type"]
    name = entry["name"]
    raw = bytes(entry["data_bytes"])
    size = len(raw)
    from_ea = entry["from_ea"]
    depth = entry["depth"]

    lines = []

    # ---------- header comment: xref + depth + pointer chain ----------
    comment_parts = []
    if from_ea and from_ea != "N/A":
        comment_parts.append(f"DATA XREF: {from_ea}")
    if depth > 0:
        comment_parts.append(f"depth={depth}")
    if entry.get("ptr_chain"):
        comment_parts.append("chain: " + " -> ".join(entry["ptr_chain"]))

    def one_line(addr, mnemonic, operand, comment=""):
        # Layout: address prefix, mnemonic padded to 6, operand, optional comment.
        pfx = _fmt_addr(addr, seg_name, addr_width)
        cmt = f" ; {comment}" if comment else ""
        return f"{pfx} {mnemonic:<6} {operand}{cmt}"

    def tail_bytes(start):
        # Bytes beyond the first scalar are dumped one per line.
        return [
            one_line(ea + i, "db", f"{raw[i]:3d} ; {raw[i]:02X}h")
            for i in range(start, size)
        ]

    # --- label line (only when the address is named) ---
    if name:
        label_line = f"{_fmt_addr(ea, seg_name, addr_width)} {name}"
        if comment_parts:
            label_line += f" ; {'; '.join(comment_parts)}"
        lines.append(label_line)
        # The data lines must not repeat the comment.
        comment_parts = []

    head_cmt = "; ".join(comment_parts)
    cmt_suffix = ("; " + head_cmt) if comment_parts else ""

    # ===== qword (pointer / offset) =====
    if dtype == "qword" and size >= 8:
        val = struct.unpack_from("<Q", raw)[0]
        # Show a symbolic offset when the value resolves to a name.
        target_name = ida_name.get_name(val) if val else ""
        operand = f"offset {target_name}" if target_name else f"{val:016X}h"
        lines.append(one_line(ea, "dq", operand, head_cmt))
        lines.extend(tail_bytes(8))
        return lines

    # ===== dword =====
    if dtype == "dword" and size >= 4:
        val = struct.unpack_from("<I", raw)[0]
        lines.append(one_line(ea, "dd", f"{val:08X}h", head_cmt))
        lines.extend(tail_bytes(4))
        return lines

    # ===== word =====
    if dtype == "word" and size >= 2:
        val = struct.unpack_from("<H", raw)[0]
        lines.append(one_line(ea, "dw", f"{val:04X}h", head_cmt))
        lines.extend(tail_bytes(2))
        return lines

    # ===== byte =====
    if dtype == "byte" and size == 1:
        b = raw[0]
        ch = _ascii_char(b)
        operand = f"{b:3d} ; {b:02X}h" + (f" '{ch}'" if ch else "")
        lines.append(one_line(ea, "db", operand, head_cmt))
        return lines

    # ===== string =====
    if dtype == "string":
        sv = entry.get("string_value") or ""
        str_repr = _render_string_literal(raw)
        preview = sv[:60].replace("\n", "\\n") if sv else raw[:60].hex()
        comment = f'"{preview}"'
        # Keep the xref comment for unnamed strings, which have no label line.
        if cmt_suffix:
            comment = comment + " " + cmt_suffix
        lines.append(one_line(ea, "db", str_repr, comment))
        return lines

    # ===== float =====
    if dtype == "float" and size >= 4:
        val = struct.unpack_from("<f", raw)[0]
        lines.append(one_line(ea, "dd", f"{val} ; float", head_cmt))
        # Emit the remaining bytes like the integer branches do.
        lines.extend(tail_bytes(4))
        return lines

    # ===== double =====
    if dtype == "double" and size >= 8:
        val = struct.unpack_from("<d", raw)[0]
        lines.append(one_line(ea, "dq", f"{val} ; double", head_cmt))
        lines.extend(tail_bytes(8))
        return lines

    # ===== align =====
    if dtype == "align":
        lines.append(one_line(ea, "align", str(size), head_cmt))
        return lines

    # ===== unknown / struct / code / fallback: one byte per line =====
    for i, b in enumerate(raw):
        ch = _ascii_char(b)
        cmt = f"{b:02X}h" + (f" '{ch}'" if ch else "")
        # Only the first line carries the xref comment.
        if i == 0 and cmt_suffix:
            cmt = cmt + " " + cmt_suffix
        lines.append(one_line(ea + i, "db", f"{b:3d}", cmt))
    return lines


def build_lst_text(collected: OrderedDict, func_ea: int, func_name: str) -> str:
    """Build the complete LST text for all collected entries.

    Entries are rendered in iteration order; a separator comment is inserted
    whenever the segment name changes. The result is a single string with
    lines joined by "\\n".
    """
    addr_width = 16 if is_64bit() else 8
    ptr_size = 8 if is_64bit() else 4
    # A solid rule; '"; =" * 35' produced "; =; =; =..." instead.
    banner = "; " + "=" * 66

    out = [
        banner,
        f"; Function Data Extraction — {func_name} @ 0x{func_ea:X}",
        f"; Entries: {len(collected)} PtrSize: {ptr_size*8}-bit",
        banner,
        "",
    ]

    cur_seg = None
    for entry in collected.values():
        seg_name = entry["segment"]
        # Insert a separator comment when the segment changes.
        if seg_name != cur_seg:
            if cur_seg is not None:
                out.append("")
            out.append(f"; {'─'*66}")
            out.append(f"; Segment: {seg_name}")
            out.append(f"; {'─'*66}")
            cur_seg = seg_name
        out.extend(render_ida_lst_entry(entry, addr_width))

    out.append("")
    out.append("; end of extraction")
    return "\n".join(out)


# ========================================================
# Function resolution: accepts a name or an address
# ========================================================
def resolve_function(input_str):
    """Resolve user input to a function start address.

    The input is first tried as an address ("0x..." prefix, a bare run of at
    least four hex digits, or any literal ``int(s, 0)`` accepts). If that
    does not land inside a function, it is tried as an exact name and
    finally as a case-insensitive substring of function names. Several
    fuzzy matches make the user pick an index among the first ten.

    Returns:
        ``(func_ea, "")`` on success, ``(None, message)`` on failure.
    """
    s = (input_str or "").strip()
    if not s:
        return None, "输入为空"

    func_ea = None

    # --- 1. interpret the input as an address ---
    try:
        if s.lower().startswith("0x") or re.fullmatch(r"[0-9a-fA-F]{4,}", s):
            candidate = int(s, 16)
        else:
            candidate = int(s, 0)
    except ValueError:
        candidate = None

    if candidate is not None:
        f = ida_funcs.get_func(candidate)
        if f is not None:
            func_ea = f.start_ea
            log(f"地址 0x{candidate:X} -> 函数 0x{func_ea:X}")
        else:
            log(f"0x{candidate:X} 不在已知函数内,尝试按名称查找...", "WARN")

    # --- 2. interpret the input as a name ---
    if func_ea is None:
        name_ea = ida_name.get_name_ea(idaapi.BADADDR, s)
        if name_ea not in (idaapi.BADADDR, 0):
            f = ida_funcs.get_func(name_ea)
            if f is None:
                return None, f"名称 '{s}' 对应地址 0x{name_ea:X} 不在任何函数内"
            func_ea = f.start_ea
            log(f"名称 '{s}' -> 函数 0x{func_ea:X}")
        else:
            log(f"精确名称未找到,模糊匹配 '{s}'...", "WARN")
            needle = s.lower()
            matches = []
            for f_ea in idautils.Functions():
                # Look each name up once instead of twice per function.
                f_name = ida_name.get_name(f_ea) or ""
                if needle in f_name.lower():
                    matches.append((f_ea, f_name))

            if len(matches) == 1:
                func_ea = matches[0][0]
                log(f"模糊匹配: {matches[0][1]} @ 0x{func_ea:X}", "OK")
            elif len(matches) > 1:
                show = matches[:10]
                log(f"模糊匹配到 {len(matches)} 个:", "WARN")
                for i, (mea, mn) in enumerate(show):
                    log(f" [{i}] {mn} @ 0x{mea:X}")
                choice = idaapi.ask_str(
                    "0", 0, f"找到 {len(matches)} 个,输入序号 (0~{len(show)-1}):"
                )
                if choice is None:
                    return None, "用户取消"
                try:
                    idx = int(choice.strip())
                except ValueError:
                    return None, f"无效序号: {choice}"
                if not 0 <= idx < len(show):
                    return None, f"序号 {idx} 超出范围"
                func_ea = show[idx][0]
            else:
                return None, f"未找到包含 '{s}' 的函数"

    if func_ea is None:
        return None, f"无法解析: '{s}'"
    return func_ea, ""

# ========================================================
# Collect the data referenced by a function
# ========================================================
def collect_function_refs(func_ea, follow_ptrs=True, max_depth=MAX_DEPTH):
    """Collect every data region referenced from the function at ``func_ea``.

    Args:
        func_ea: an address inside the function to analyse.
        follow_ptrs: when True, pointer-sized values found at the start of
            each collected entry are followed as further candidates.
        max_depth: maximum pointer-following depth.

    Returns:
        An ``OrderedDict`` mapping address -> entry dict, sorted by address.
        It is empty when ``func_ea`` is not inside a function.
    """
    # Local import so this block needs nothing beyond the file's own imports.
    from collections import deque

    raw_refs = {}
    visited_code = set()
    # FIFO work queue; a deque pops from the left in O(1).
    pending_ptrs = deque()

    func = ida_funcs.get_func(func_ea)
    if func is None:
        log(f"无法获取函数: 0x{func_ea:X}", "ERROR")
        return OrderedDict()

    func_name = ida_name.get_name(func_ea) or f"sub_{func_ea:X}"
    log(f"分析函数: {func_name} @ 0x{func_ea:X}")

    ptr_size = 8 if is_64bit() else 4

    # Enumerate every item head in every chunk of the function.
    all_insn_eas = []
    for chunk_start, chunk_end in idautils.Chunks(func_ea):
        ea = chunk_start
        while ea < chunk_end:
            all_insn_eas.append(ea)
            visited_code.add(ea)
            ea = idc.next_head(ea, chunk_end)
    log(f"函数共 {len(all_insn_eas)} 条指令")

    def record_ref(ref_ea, from_ea=None, depth=0, chain=None):
        # Keep the first sighting of each address that lies in some segment
        # and is not one of this function's own instruction heads.
        if ref_ea in (0, idaapi.BADADDR):
            return
        seg = ida_segment.getseg(ref_ea)
        if seg is None:
            return
        if is_code_segment(seg) and ref_ea in visited_code:
            return
        if ref_ea not in raw_refs:
            raw_refs[ref_ea] = {"from_ea": from_ea, "depth": depth, "chain": chain or []}

    log("--- 阶段1: DataRefsFrom ---")
    for insn_ea in all_insn_eas:
        for ref_ea in idautils.DataRefsFrom(insn_ea):
            record_ref(ref_ea, insn_ea, 0)

    log("--- 阶段2: 操作数解析 ---")
    for insn_ea in all_insn_eas:
        insn = idaapi.insn_t()
        if idaapi.decode_insn(insn, insn_ea) <= 0:
            continue
        for op in insn.ops:
            if op.type == idaapi.o_void:
                break
            ref_ea = idaapi.BADADDR
            if op.type in (idaapi.o_mem, idaapi.o_displ):
                ref_ea = op.addr
            elif op.type == idaapi.o_imm:
                # Immediates count only when they land in a data segment.
                cand = op.value
                if cand > 0x1000:
                    imm_seg = ida_segment.getseg(cand)
                    if imm_seg and is_data_segment(imm_seg):
                        ref_ea = cand
            if ref_ea not in (0, idaapi.BADADDR):
                record_ref(ref_ea, insn_ea, 0)

    log("--- 阶段3: xref 补充 ---")
    # NOTE(review): this walks data xrefs via xrefblk_t and probably overlaps
    # with phase 1 - confirm before removing.
    for insn_ea in all_insn_eas:
        xref = ida_xref.xrefblk_t()
        ok = xref.first_from(insn_ea, ida_xref.XREF_DATA)
        while ok:
            record_ref(xref.to, insn_ea, 0)
            ok = xref.next_from()

    log(f"第一轮共发现 {len(raw_refs)} 个候选地址")
    log("--- 第二轮: 智能边界计算 + 数据读取 ---")

    sorted_eas = sorted(raw_refs.keys())
    collected = OrderedDict()

    def build_entry(ref_ea, meta):
        # Read the data at ref_ea and describe it; None when outside any segment.
        seg = ida_segment.getseg(ref_ea)
        if seg is None:
            return None
        flags = ida_bytes.get_flags(ref_ea)
        dtype = determine_data_type(flags)
        name = ida_name.get_name(ref_ea) or ""
        seg_name = ida_segment.get_segm_name(seg)
        size, size_method = compute_data_size(ref_ea, sorted_eas, seg)
        data_bytes = read_safe(ref_ea, size)
        actual_size = len(data_bytes)

        entry = {
            "ea": f"0x{ref_ea:X}",
            "ea_int": ref_ea,
            "name": name,
            "segment": seg_name,
            "type": dtype,
            "size": actual_size,
            "size_method": size_method,
            "data_hex": data_bytes.hex(),
            "data_bytes": list(data_bytes),
            "from_ea": f"0x{meta['from_ea']:X}" if meta["from_ea"] else "N/A",
            "depth": meta["depth"],
            "ptr_chain": meta["chain"],
        }
        if dtype == "string":
            entry["string_value"] = get_string_at(ref_ea)

        # Little-endian value of the first ptr_size bytes (fewer when the
        # read is shorter, 0 when it is empty).
        entry["ptr_value"] = int.from_bytes(data_bytes[:ptr_size], "little")

        log(f" [D{meta['depth']}] 0x{ref_ea:X} | {seg_name:12s} | {dtype:8s} "
            f"| {actual_size:6d}B | [{size_method}] | {name or '<unnamed>'}")
        return entry

    def queue_pointer(entry, owner_ea, depth, chain):
        # Schedule the entry's leading pointer value for a later visit.
        if depth >= max_depth:
            return
        pv = entry["ptr_value"]
        if pv and pv != idaapi.BADADDR:
            pending_ptrs.append((pv, owner_ea, depth + 1, chain + [f"0x{owner_ea:X}"]))

    for ref_ea in sorted_eas:
        meta = raw_refs[ref_ea]
        entry = build_entry(ref_ea, meta)
        if entry:
            collected[ref_ea] = entry
            if follow_ptrs:
                queue_pointer(entry, ref_ea, meta["depth"], meta["chain"])

    if follow_ptrs:
        log(f"--- 阶段4: 指针追踪 (最大深度={max_depth}) ---")
        processed = set()
        while pending_ptrs:
            ptr_ea, from_ea, depth, chain = pending_ptrs.popleft()
            if ptr_ea in processed or ptr_ea in collected:
                continue
            processed.add(ptr_ea)
            seg = ida_segment.getseg(ptr_ea)
            if seg is None:
                continue
            if is_code_segment(seg) and ptr_ea in visited_code:
                continue
            # Register the target as an anchor before its size is computed.
            bisect.insort(sorted_eas, ptr_ea)
            meta = {"from_ea": from_ea, "depth": depth, "chain": chain}
            raw_refs[ptr_ea] = meta
            entry = build_entry(ptr_ea, meta)
            if entry:
                collected[ptr_ea] = entry
                queue_pointer(entry, ptr_ea, depth, chain)

    log(f"共收集 {len(collected)} 个数据条目")
    return OrderedDict(sorted(collected.items()))


# ========================================================
# Statistics
# ========================================================
def print_statistics(collected):
    """Log entry count, total byte size and per-type / per-segment counts."""
    entries = list(collected.values())
    type_cnt = Counter(e["type"] for e in entries)
    seg_cnt = Counter(e["segment"] for e in entries)
    total_b = sum(e["size"] for e in entries)
    log("=" * 60)
    log(f" 总条目数: {len(collected)} 总数据量: {total_b} bytes ({total_b/1024:.2f} KB)")
    log(" 类型: " + " ".join(f"{t}×{c}" for t, c in type_cnt.most_common()))
    log(" 段: " + " ".join(f"{s}×{c}" for s, c in seg_cnt.most_common()))
    log("=" * 60)


# ========================================================
# Saving files
# ========================================================
def save_outputs(func_ea, collected, out_dir):
    """Write the collected entries in the format(s) chosen by OUTPUT_FORMAT.

    Files are named ``funcdata_<sanitised name>_0x<ea>`` plus ".lst",
    ".json" or ".bin" inside ``out_dir``.

    Returns:
        The list of paths that were written.
    """
    func_name = ida_name.get_name(func_ea) or f"sub_{func_ea:X}"
    # Replace every character that is not alphanumeric or one of "-_.".
    safe_name = "".join(c if c.isalnum() or c in "-_." else "_" for c in func_name)
    base = os.path.join(out_dir, f"funcdata_{safe_name}_0x{func_ea:X}")
    saved = []

    # ---- LST ----
    if OUTPUT_FORMAT in ("lst", "all"):
        path = base + ".lst"
        text = build_lst_text(collected, func_ea, func_name)
        with open(path, "w", encoding="utf-8") as f:
            f.write(text)
        saved.append(path)
        log(f"LST 已保存: {path}", "OK")

    # ---- JSON ----
    if OUTPUT_FORMAT in ("json", "all"):
        path = base + ".json"
        export = {
            "meta": {
                "func_ea": f"0x{func_ea:X}",
                "func_name": func_name,
                "entry_count": len(collected),
                "ida_version": idaapi.get_kernel_version(),
                "ptr_size": 8 if is_64bit() else 4,
            },
            # "ea_int" is left out; the "ea" hex string is exported instead.
            "entries": [
                {k: v for k, v in e.items() if k != "ea_int"}
                for e in collected.values()
            ],
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(export, f, indent=2, ensure_ascii=False)
        saved.append(path)
        log(f"JSON 已保存: {path}", "OK")

    # ---- Binary ----
    if OUTPUT_FORMAT in ("binary", "all"):
        path = base + ".bin"
        MAGIC, VER = b"FDAT", 2
        entries = list(collected.values())
        # File header: magic, version, entry count.
        parts = [struct.pack("<4sII", MAGIC, VER, len(entries))]
        for e in entries:
            raw = bytes(e["data_bytes"])
            name_b = e["name"].encode("utf-8") if e["name"] else b""
            # Record header: address, data length, name length; then name, data.
            parts.append(struct.pack("<QII", e["ea_int"], len(raw), len(name_b)))
            parts.append(name_b)
            parts.append(raw)
        blob = b"".join(parts)
        with open(path, "wb") as f:
            f.write(blob)
        saved.append(path)
        log(f"BIN 已保存: {path} ({len(blob)} bytes)", "OK")

    return saved


# ========================================================
# Batch processing
# ========================================================
def process_batch(input_list):
    """Resolve, collect and save each target in ``input_list``.

    Returns:
        A dict mapping each input string to a result dict whose "status" is
        "failed" (with "error"), "empty" (with "func_ea") or "ok" (with
        "func_ea", "entries" and "files").
    """
    results = {}
    for item in input_list:
        log(f"\n{'='*60}\n处理: {item}")
        func_ea, err = resolve_function(item)
        if func_ea is None:
            log(f"跳过 '{item}': {err}", "WARN")
            results[item] = {"status": "failed", "error": err}
            continue

        collected = collect_function_refs(
            func_ea, follow_ptrs=FOLLOW_POINTERS, max_depth=MAX_DEPTH
        )
        if not collected:
            results[item] = {"status": "empty", "func_ea": func_ea}
            continue

        print_statistics(collected)
        saved = save_outputs(func_ea, collected, get_output_dir())
        results[item] = {
            "status": "ok",
            "func_ea": func_ea,
            "entries": len(collected),
            "files": saved,
        }
    return results


# ========================================================
# Main entry point
# ========================================================
def main():
    """Ask for one or more targets and extract their referenced data.

    The dialog is pre-filled with the function under the cursor (or the
    cursor address). Targets are split on commas, semicolons and newlines;
    a single target is processed directly, several go through
    ``process_batch`` followed by a summary.
    """
    log("=" * 60)
    log(f"IDA Pro 9.3 - Function Data Extractor v4 "
        f"({'64' if is_64bit() else '32'}-bit)")
    log(f"输出格式: {OUTPUT_FORMAT}")
    log("=" * 60)

    cur_ea = idc.get_screen_ea()
    cur_func = ida_funcs.get_func(cur_ea)
    if cur_func:
        default = ida_name.get_name(cur_func.start_ea) or f"0x{cur_func.start_ea:X}"
    else:
        default = f"0x{cur_ea:X}"

    user_input = idaapi.ask_str(
        default, 0,
        "输入函数名称或地址(多个目标用逗号分隔)\n"
        "示例: main, 0x401000, sub_402ABC"
    )
    if user_input is None:
        log("已取消", "WARN")
        return

    targets = [t.strip() for t in re.split(r"[,;\n]+", user_input) if t.strip()]
    if not targets:
        log("未输入任何目标", "WARN")
        return
    log(f"待处理 {len(targets)} 个: {targets}")

    if len(targets) == 1:
        func_ea, err = resolve_function(targets[0])
        if func_ea is None:
            log(f"解析失败: {err}", "ERROR")
            return
        collected = collect_function_refs(
            func_ea, follow_ptrs=FOLLOW_POINTERS, max_depth=MAX_DEPTH
        )
        if not collected:
            log("未找到任何数据引用", "WARN")
            return
        print_statistics(collected)
        out_dir = get_output_dir()
        saved = save_outputs(func_ea, collected, out_dir)
        log("=" * 60)
        log(f"完成!输出目录: {out_dir}", "OK")
        for p in saved:
            log(f" {p}", "OK")
        log("=" * 60)
        return

    results = process_batch(targets)
    log("\n" + "=" * 60 + "\n批量汇总:")
    for target, r in results.items():
        status = r["status"]
        if status == "ok":
            log(f" [OK] {target:30s} -> {r['entries']} 条", "OK")
        elif status == "empty":
            log(f" [空] {target:30s} -> 无数据引用", "WARN")
        else:
            log(f" [ERR] {target:30s} -> {r.get('error','?')}", "ERROR")
    log("=" * 60)


if __name__ == "__main__":
    main()
# 此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。