GDBを使用してLinuxプロセスにその場でパッチを適用する

Linuxでの関数フックの手法はよく知られており、インターネットで説明されています。 最も簡単な方法は、「クローン関数」を使用して動的ライブラリを作成し、LD_PRELOADメカニズムを使用して、プロセスのロード段階でインポートテーブルをオーバーライドすることです。

LD_PRELOADの欠点は、プロセスの開始を制御する必要があることです。 すでに実行中のプロセスの関数またはインポートテーブルにない関数をインターセプトするには、「スプライシング」を使用します。インターセプトされた関数の先頭でインターセプターに移動するコマンドを記録します。

Pythonには、C言語のデータや関数(つまり、Cインターフェースを備えた多数の動的ライブラリ)とやり取りできるctypesモジュールがctypesことも知られています。 したがって、プロセスの機能をインターセプトし、 ctypesを使用してCコールバックでラップされたPythonメソッドに送信することを妨げるものはありません。

制御をインターセプトし、ターゲットプロセスにコードをロードするには、Python( https://sourceware.org/gdb/current/onlinedocs/gdb/Python-API.html )で拡張モジュールの作成をサポートするGDBデバッガーを使用すると便利です。
ニュアンス
この例のコードは記事の最後に完全に記載されており、2つのファイルで構成されています。

  • pyinject.py-GDB拡張機能
  • hook.py-フック関数を備えたモジュール

GDB側では、コードはユーザーコマンドとして便利にフォーマットされます。 gdb.Commandクラスから継承することにより、新しいコマンドを作成できます。 GDBでコマンドを使用する場合、 invoke(argument, from_tty)メソッドが呼び出されます。

gdb.Parameter継承するカスタムパラメータを作成することもできます。 サンプル記事では、インターセプション関数でファイル名を指定するために使用されます。

実行中のPIDプロセスへの接続とモジュールのロードは、GDBを起動するとすぐに実行できます
 gdb -ex 'attach PID' -ex 'source pyinject.py' -ex 'set hookfile hook.py' 
このデバッグされたプロセスのフィールドが停止し、インタラクティブなGDBコマンドラインが起動します。ここで、新しいpyinjectコマンドが使用可能になります。

傍受は3つの段階に分けることができます。
  1. Pythonインタープリターをターゲットプロセスのアドレス空間に挿入する
  2. キャプチャされた機能に関する情報のキャプチャ
  3. 実際に傍受
節1と2はデバッガー側で簡単に実行でき、節3はすでにターゲットプロセス内にあります。

Pythonインタープリターインジェクション


Python GDBインターフェースのほとんどは、デバッグ機能を拡張するように設計されています。 他のすべてについては、 gdb.execute(command, from_tty, to_string)があります。これにより、任意のGDBコマンドを実行し、その出力を文字列として取得できます。
例:
 out = gdb.execute("info registers", False, True) 
また、式を評価し、結果をgdb.Valueとして返すgdb.parse_end_eval(expression)gdb.Valueます。

最初のステップは、Pythonライブラリをターゲットプロセスのアドレス空間にロードすることです。 これを行うには、ターゲットプロセスのコンテキストでdlopenを呼び出します。
gdb.executeまたはgdb.parse_and_evalcallコマンドを使用できます。
 # pyinject.py gdb.execute('call dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(gdb.history(0)) handle = gdb.parse_and_eval('dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(handle) 

その後、インタープリターを初期化できます
 # pyinject.py gdb.execute('call PyEval_InitThreads()') gdb.execute('call Py_Initialize()') 
最初の呼び出しはGIL(グローバルインタープリターロック)を作成し、2番目の呼び出しはPython C-APIを使用するために準備します。

そしてインターセプション関数でモジュールをロードします
 # pyinject.py fp = gdb.parse_and_eval('fopen("hook.py", "r")') assert long(fp) != 0 pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "hook.py", 1)' % fp) 
PyRun_AnyFileExは、 PyRun_AnyFileExモジュールのコンテキストでファイルからコードを実行します。
ニュアンス
上記は、ターゲットプロセスがPythonを(メインまたはスクリプト言語として)使用しない場合にのみ機能します。 そうでない場合、すべてが非常に複雑です。 主な問題は、デバッグのためにランダムな場所で停止したプロセスでは、Python C-API関数を使用できないことです(おそらくPy_AddPendingCallを除く)。

Hook.pyモジュール


hook.pyモジュールには、フック関数と、フック自体を実行するフッククラスが含まれています。
インターセプター関数は、デコレーターを使用して指定されます。 たとえば、標準ライブラリのopen関数の場合、引数を出力し、 origフィールドに格納されている元の関数を呼び出した結果を返します
 # hook.py @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int)) def python_open(fname, oflag): print "open: ", fname, oflag return python_open.orig(fname, oflag) 

@hookデコレーターは2つのパラメーターを受け入れます。デコレータはHookクラスに関数を登録し、変更せずに戻ります。
 # hook.py def hook(symbol, ctype): def deco(func): Hook.register(symbol, ctype, func) return func return deco 

registerメソッドは、クラスのインスタンスを作成し、 all_hooks辞書に格納します。 したがって、ファイルの実行後、 Hook.all_hooksのデコレーターのおかげで、インターセプターの利用可能な機能に関するすべての情報が得られます。
 # hook.py class Hook(object): all_hooks = {} @staticmethod def register(symbol, *args): Hook.all_hooks[symbol] = Hook(symbol, *args) 

1つの関数を呼び出してGDBからインターセプトするには、インターセプトを担当するHookクラスで静的メソッドを定義すると便利です
 # hook.py class Hook(object): @staticmethod def hook(symbol, *args): h = Hook.all_hooks[symbol] if h.active: return h.install(*args) 
*argsは、フックされた関数に関する追加情報をここに提供します。 どれが傍受の方法に依存します。

スプライシングの傍受方法


スプライシングは、元の関数の呼び出し方法に応じて、2つの亜種にグローバルに分割されます。

単純なフックでは、元の関数の呼び出しはいくつかのステップで構成されます。
  1. 元の関数の先頭が保存されたコピーから復元されます
  2. 電話をかける
  3. 始まりは、インターセプターへの遷移命令によって再び上書きされます
ニュアンス
欠点は明らかです。マルチスレッドプログラムでは、別のスレッドが関数の先頭を上書きするときに関数を呼び出さないことを保証できません。 これは、元の関数が呼び出されている間に他のスレッドを停止することで部分的に処理されます。 しかし、第一に、これを達成する標準的な方法はありません。第二に、mallocのような関数の呼び出しに失敗すると、デッドロックをキャッチできます。

トランポリンフックでは、元の関数の先頭が新しい場所にコピーされ、その後、元の関数の本体への遷移が記録されます。 このオプションでは、元の機能は常に新しいアドレスで使用できます。

トランポリンフックはマルチスレッドプログラムで動作しますが、インストールははるかに困難です。 整数個の命令を書き換える必要がありますが、一般的には逆アセンブラが使用されます。 x86_64アーキテクチャの出現により、 %ripレジスタ(現在のコマンドアドレス)に対するメモリアドレス指定の偏在性により、さらに多くの問題が追加されました。
ニュアンス
GDBのopen関数の始まりを見てみましょう。
 0x7f6cc8aa83e0 <open64+0>: 83 3d ed 33 2d 00 00 cmpl $0x0,0x2d33ed(%rip) 0x7f6cc8aa83e7 <open64+7>: 75 10 jne 0x7f6cc8aa83f9 <open64+25> 0x7f6cc8aa83e9 <__open_nocancel+0>: b8 02 00 00 00 mov $0x2,%eax 0x7f6cc8aa83ee <__open_nocancel+5>: 0f 05 syscall 

最初のコマンド " cmpl $0x0,0x2d33ed(%rip) "を別のアドレスに0x7f6cc8d7b7d4 、現在0x7f6cc8d7b7d4指している相対アドレス0x2d33ed(%rip)は別の場所(hi SIGSEGV)を指します。

この関数のトランポリンフックを作成するには、次のものが必要です。
  1. 関数の先頭でコマンドのサイズを決定します
  2. cmplコマンドの宛先アドレスから2 GB以下のメモリを割り当てます(オフセット0x2d33ed(%rip)署名付き32ビット)
  3. 先頭を新しい場所にコピーし、 cmpl %ripに関連するメモリアクセスにパッチを当てます
さらに、jumpコマンドは9バイトより短くする必要があります。 2つのエントリポイントを持つ関数であり、アドレス0x7f6cc8aa83e9すでに__open_nocancelです。 これは、32ビットのトランジションを許可するために、 openの開始からスプリングボードが2 GBを超えてはならないことを意味します(64ビットのトランジションはすべて9バイトより長い)。

原則として、GDBのすべての機能( gdb.execute() )の背後にあるため、トランポリンフックを正しく実装することを妨げるものはありませんが、簡単にするために、この記事では単純なフックを使用します。

単純なフックでは、唯一の制限はジャンプ命令の長さです。
2つのオプションがあります(メイン):

この記事では2番目の方法を使用します
 # hook.py class Hook(object): @staticmethod def get_indlongjmp(srcaddr, proxyaddr): s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6) return map(ord, s) 
get_indlongjmpは、 srcaddrアドレスからproxyaddr QWORDに格納されているアドレスにジャンプするためのコードを返します

これで、 Hookクラスの欠落しているメソッドを最終的に記述できます。 installメソッドは、元の関数addressproxyaddr補助ゾーンのアドレスを取得します。 その後、インターセプターに切り替えて、関数の先頭を書き換えます(以前はself.code保存していself.code
 # hook.py def install(self, address, proxyaddr): self.address = address self.proxyaddr = proxyaddr proxymemory = (c_void_p * 1).from_address(self.proxyaddr) proxymemory[0] = Hook.cast_to_void_p(self.cfunc) self.jmp = self.get_indlongjmp(self.address, self.proxyaddr) self.memory = (c_ubyte * len(self.jmp)).from_address(self.address) self.code = list(self.memory) self.patchmem(self.jmp) self.pyfunc.orig = self.origfunc() self.active = True 

patchmemは、 srcデータで元の関数の先頭を上書きします
 # hook.py def patchmem(self, src): for i in range(len(src)): self.memory[i] = src[i] 

origfuncは、インターセプターへの遷移を削除および設定するコードで関数呼び出しをラップします。
 # hook.py def origfunc(self): ofunc = self.ctype(self.address) def wrap(*args): self.patchmem(self.code) val = ofunc(*args) self.patchmem(self.jmp) return val return wrap 

最後の仕上げ


Pythonはアドレス空間にロードされ、hook.pyファイルはPythonにロードされます。 GDBモジュールのPython側からHook.hook(symbol, address, proxyaddr)を呼び出すことは残ります。

open 」機能のアドレスを見つける
 line = gdb.execute('info address %s' % "open" False, True) m = re.match(r'.*?(0x[0-9a-f]+)', line) addr = int(m.group(1), 16) 
ニュアンス
一般に、停止したプロセスのコードを書き直すために実行する前に、このコードの途中で停止しないように(またはそれに戻るように)する必要があります。 これを行う最も簡単な方法は、 gdb.execute("thread apply all backtrace")の出力を解析することgdb.execute("thread apply all backtrace")

addr近くにメモリを割り当てます
 prot = PROT_READ | PROT_WRITE | PROT_EXEC flags = MAP_PRIVATE | MAP_ANONYMOUS maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n' % (addr | 0x7FFFFFFF, 4096, prot, flags)) maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000) 
ニュアンス
最後の行は、結果の上位ビットを食い尽くすGDBのバグの回避策です。 引数(addr | 0x7FFFFFFF)は、文書化されていないプロパティmmapを使用して、目的のアドレスよりも小さいアドレスのメモリを(addr | 0x7FFFFFFF)ます。

トリックなしでは、正しい方法で少し長くなりますgdb.execute('info proc mappings', False, True)の出力を解析し、アドレス空間でaddrに最も近い穴を見つけ、 MAP_FIXED mmapをフェッチする必要があります。 もちろん、インターセプトされた関数ごとにメモリのページ全体を割り当てる必要はありません。

元の関数の書き換えを許可(別名SIGSEGV)
 gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot)) 

PyRun_SimpleStringを介してHook.hookを呼び出しPyRun_SimpleString
 pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"open\\", 0x%x, 0x%x)")' % (addr, maddr)) 

できた! これで、ターゲットプロセスでの「 open 」の呼び出しがインターセプトされ、 python_openからpython_openにルーティングされます。

サンプルファイル


完全なサンプルファイル(もう少し確認しますが、多くのニュアンスを考慮しません)
pyinject.py
 # pyinject.py import re import os RTLD_LAZY = 1 PROT_READ = 0x1 PROT_WRITE = 0x2 PROT_EXEC = 0x4 MAP_PRIVATE = 0x2 MAP_FIXED = 0x10 MAP_ANONYMOUS = 0x20 LIBPYTHON = 'libpython2.7.so' class ParamHookfile(gdb.Parameter): instance = None def __init__(self, default=''): super(ParamHookfile, self).__init__("hookfile", gdb.COMMAND_NONE, gdb.PARAM_FILENAME) self.value = default ParamHookfile.instance = self def get_set_string(self): return self.value def get_show_string(self, svalue): return svalue class CmdHook(gdb.Command): instance = None def __init__(self): super(CmdHook, self).__init__("pyinject", gdb.COMMAND_NONE) self.initialized = False CmdHook.instance = self def complete(self, text, word): matching = [s[4:] for s in dir(self) if s.startswith('cmd_') and s[4:].startswith(text)] return matching def invoke(self, subcmd, from_tty): self.dont_repeat() if subcmd.startswith("hook"): self.cmd_hook(*gdb.string_to_argv(subcmd)) elif subcmd.startswith("unhook"): self.cmd_unhook(*gdb.string_to_argv(subcmd)) else: gdb.write('unknown sub-command "%s"' % subcmd) def cmd_hook(self, *args): self.initialize() if not self.initialized: return pyret = gdb.parse_and_eval('PyRun_SimpleString("print Hook")') if long(pyret) != 0: hookfile = ParamHookfile.instance.value if not os.path.exists(hookfile): gdb.write('Use "set hookfile <path>"\n') return fp = gdb.parse_and_eval('fopen("%s", "r")' % hookfile) assert long(fp) != 0 pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "%s", 1)' % (fp, hookfile)) if long(pyret) != 0: gdb.write('Error loading "%s"\n' % hookfile) return for symbol in args: try: line = gdb.execute('info address %s' % symbol, False, True) m = re.match(r'.*?(0x[0-9a-f]+)', line) if m: addr = int(m.group(1), 16) except gdb.error: continue prot = PROT_READ | PROT_WRITE | PROT_EXEC flags = MAP_PRIVATE | MAP_ANONYMOUS # | MAP_FIXED maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n' % (addr | 0x7FFFFFFF , 4096, prot, flags)) maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000) gdb.write("mmap = 0x%x\n" % maddr) if maddr == 0: continue gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot)) pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"%s\\", 0x%x, 0x%x)")' % (symbol, addr, maddr)) if long(pyret) == 0: gdb.write('hook "%s" OK\n' % symbol) def cmd_unhook(self, *args): for symbol in args: pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.unhook(\\"%s\\")")' % (symbol)) if long(pyret) == 0: gdb.write('unhook "%s" OK\n' % symbol) def initialize(self): if self.initialized: return handle = gdb.parse_and_eval('dlopen("%s", %d)' % (LIBPYTHON, RTLD_LAZY)) if not long(handle): gdb.write('Cannot load library %s\n' % LIBPYTHON) return if not long(gdb.parse_and_eval('Py_IsInitialized()')): gdb.execute('call PyEval_InitThreads()') gdb.execute('call Py_Initialize()') self.initialized = True if __name__ == '__main__': ParamHookfile() CmdHook() 

hook.py
 # hook.py import struct from ctypes import (CFUNCTYPE, POINTER, c_ubyte, c_int, c_char_p, c_void_p) class Hook(object): all_hooks = {} @staticmethod def cast_to_void_p(pointer): return CFUNCTYPE(c_void_p, c_void_p)(lambda x: x)(pointer) @staticmethod def register(symbol, *args): Hook.all_hooks[symbol] = Hook(symbol, *args) def __init__(self, symbol, ctype, pyfunc): self.symbol = symbol self.ctype = ctype self.pyfunc = pyfunc self.cfunc = self.ctype(self.pyfunc) self.address = 0 self.proxyaddr = 0 self.jmp = None self.memory = None self.code = None self.active = False def install(self, address, proxyaddr): print "install:", hex(address) self.address = address self.proxyaddr = proxyaddr proxymemory = (c_void_p * 1).from_address(self.proxyaddr) proxymemory[0] = Hook.cast_to_void_p(self.cfunc) self.jmp = self.get_indlongjmp(self.address, self.proxyaddr) self.memory = (c_ubyte * len(self.jmp)).from_address(self.address) self.code = list(self.memory) self.patchmem(self.jmp) self.pyfunc.orig = self.origfunc() self.active = True def uninstall(self): self.patchmem(self.code) self.active = False def origfunc(self): ofunc = self.ctype(self.address) def wrap(*args): self.patchmem(self.code) val = ofunc(*args) self.patchmem(self.jmp) return val return wrap def patchmem(self, src): for i in range(len(src)): self.memory[i] = src[i] @staticmethod def get_indlongjmp(srcaddr, proxyaddr): # 64-bit indirect absolute jump (6 + 8 bytes) # ff 25 off32 jmpq *off32(%rip) try: s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6) return map(ord, s) except: print hex(proxyaddr), hex(srcaddr), hex(proxyaddr - srcaddr - 6) raise @staticmethod def hook(symbol, address, proxyaddr): h = Hook.all_hooks[symbol] if h.active: return h.install(address, proxyaddr) @staticmethod def unhook(symbol): h = Hook.all_hooks[symbol] if not h.active: return h.uninstall() def hook(symbol, ctype): def deco(func): Hook.register(symbol, ctype, func) return func return deco #int open (const char *__file, int __oflag, ...) @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int)) def python_open(fname, oflag): print "open: ", fname, oflag return python_open.orig(fname, oflag) 

例を実行する(絶対パスで改善する)
 gdb -ex 'attach PID' -ex 'source /path/pyinject.py' -ex 'set hookfile /path/hook.py' (gdb) pyinject hook open (gdb) continue 

Source: https://habr.com/ru/post/J237575/


All Articles