巡风扫描器--nascan源码分析

mac2025-09-07  7

nascan部分的结构: lib init.py cider.py: CIDR形式IP地址解析; common.py: 其它方法; icmp.py: ICMP消息发送类; log.py: 控制台信息输出; mongo.py: 数据库连接; scan.py: 扫描与识别; start.py: 线程控制; plugin masscan.py :masscan调用脚本; nascan.py: 网络资产信息抓取引擎;

nascan代码分析

import thread from lib.common import * from lib.start import * if __name__ == "__main__": try: CONFIG_INI = get_config() # 读取配置,读取数据库中整个Config集合数据 log.write('info', None, 0, u'获取配置成功')#输出到控制台 STATISTICS = get_statistics() # 读取统计信息 MASSCAN_AC = [0] #值为1表示masscan正在扫描 NACHANGE = [0] #值为1表示能进入扫描阶段 thread.start_new_thread(monitor, (CONFIG_INI, STATISTICS, NACHANGE)) # 心跳线程 thread.start_new_thread(cruise, (STATISTICS, MASSCAN_AC)) # 失效记录删除线程 socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时 ac_data = [] while True: now_time = time.localtime() now_hour = now_time.tm_hour now_day = now_time.tm_mday now_date = str(now_time.tm_year) + \ str(now_time.tm_mon) + str(now_day) cy_day, ac_hour = CONFIG_INI['Cycle'].split('|') log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle']))#log.write()函数,格式化了输出在控制台界面的信息,并使用了线程锁,防止信息一时间输出过多,导致显示错行。 # 判断是否进入扫描时段或者能直接进入扫描阶段 if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: ac_data.append(now_date) #恢复原值,不能再次进入资产探测,直到新的事件触发该值改变 NACHANGE[0] = 0 log.write('info', None, 0, u'开始扫描') # 具体的资产发现操作 s = start(CONFIG_INI) # masscan扫描状态 s.masscan_ac = MASSCAN_AC s.statistics = STATISTICS s.run() time.sleep(60) except Exception, e: print e

1、CONFIG_INI = get_config() # 读取配置,读取数据库中整个Config集合数据 读取配置,get_config()进去 nascan/lib/common.py

# 信息识别Config集合, 配置统一格式化,返回dict类型 def get_config(): config = {} # Config集合共有vulscan、nascan两个子集合,获取Config集合中的nascan子集合的文档内容 config_info = mongo.na_db.Config.find_one({"type": "nascan"}) for name in config_info['config']: if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']: ''' cms识别、组件容器识别、语言技术识别、端口服务识别四个部分的文档内容赋值配按照 事先定义的格式进一步格式化分离数据, 方便后续取用. ''' config[name] = format_config(name, config_info['config'][name]['value']) else: config[name] = config_info['config'][name]['value'] return config 读取了mongodb里面的Config集合中的nascan子集合的文档内容; 2、get_statistics()读取统计信息返回时间 位于nascan/lib/common.py def get_statistics(): date_ = datetime.datetime.now().strftime('%Y-%m-%d') now_stati = mongo.na_db.Statistics.find_one({"date": date_}) if not now_stati: now_stati = {date_: {"add": 0, "update": 0, "delete": 0}} return now_stati else: return {date_: now_stati['info']}

3、 MASSCAN_AC = [0] #值为1表示masscan正在扫描 NACHANGE = [0] #值为1表示能进入扫描阶段 4、 thread.start_new_thread(monitor, (CONFIG_INI, STATISTICS, NACHANGE)) # 心跳线程 thread.start_new_thread(cruise, (STATISTICS, MASSCAN_AC)) # 失效记录删除线程 socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时

monitor–心跳线程 位于nascan/lib/common.py def monitor(CONFIG_INI, STATISTICS, NACHANGE): while True:#线程通过While True和设定延时,实现了监控资产列表,定时更新数据库、触发扫描、清理失效目标等操作。 try: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}}) if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0} mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True) new_config = get_config()#获取数据库最新的config集合数据 if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1# 比较扫描目标是否发生了变化, 变化就将值置为1, 表示需要重新扫描 CONFIG_INI.clear() CONFIG_INI.update(new_config) except Exception, e: print e time.sleep(30)

再一次调用get_config()获取数据库config集合中最新的数据; 如果scan_list的base的值发生变化,则将NACHANGE[0] = 1,更新config,重新进行扫描; - cruise–失效删除记录 位于nascan/lib/common.py

def cruise(STATISTICS,MASSCAN_AC): while True: now_str = datetime.datetime.now() week = int(now_str.weekday()) hour = int(now_str.hour) if week >= 1 and week <= 5 and hour >= 9 and hour <= 18: # 非工作时间不删除 try: data = mongo.NA_INFO.find().sort("time", 1) for history_info in data: while True: if MASSCAN_AC[0]: # 如果masscan正在扫描即不进行清理 time.sleep(10) else: break ip = history_info['ip'] port = history_info['port'] try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)#对目标(ip:port)进行sock连接 sock.connect((ip, int(port))) sock.close() except Exception, e: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') mongo.NA_INFO.remove({"ip": ip, "port": port})#进行sock连接,连接不上就删除数据库INFO里面的ip和port log.write('info', None, 0, '%s:%s delete' % (ip, port)) STATISTICS[date_]['delete'] += 1 del history_info["_id"] history_info['del_time'] = time_#然后将删除时间写入history_info history_info['type'] = 'delete' mongo.NA_HISTORY.insert(history_info) except: pass time.sleep(3600)

对目标(ip:port)进行sock连接,如果连接不上就删除数据库INFO里面的ip和port,然后将删除时间写入history_info; 5、log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle'])) log.write()函数,格式化了输出在控制台界面的信息; 6、if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: ac_data.append(now_date) 如果到达扫描的周期时间或者如果NACHANGE[0]的值为1,任何一个成立都可以重新扫描; 7、s = start(CONFIG_INI) 进入start()函数,位于nascan/lib/start.py

class start: def __init__(self, config): # 默认配置 self.config_ini = config self.queue = Queue.Queue() self.thread = int(self.config_ini['Thread']) self.scan_list = self.config_ini['Scan_list'].split('\n') self.mode = int(self.config_ini['Masscan'].split('|')[0]) self.icmp = int(self.config_ini['Port_list'].split('|')[0]) self.white_list = self.config_ini.get('White_list', '').split('\n') def run(self): global AC_PORT_LIST all_ip_list = [] for ip in self.scan_list: # 解析CIDR形式IP地址 if "/" in ip: ip = cidr.CIDR(ip) if not ip: continue # 获得完整目标IP地址列表 ip_list = self.get_ip_list(ip) for white_ip in self.white_list: if white_ip in ip_list: ip_list.remove(white_ip) # 当使用masscan扫描时 if self.mode == 1:#判断是否支持masscan扫描 masscan_path = self.config_ini['Masscan'].split('|')[2] masscan_rate = self.config_ini['Masscan'].split('|')[1] # 如果用户在前台关闭了ICMP存活探测则进行全IP段扫描 if self.icmp: ip_list = self.get_ac_ip(ip_list) # 默认使用icmp去探测获得存活主机 self.masscan_ac[0] = 1 # 可以继续masscan端口扫描 # 如果安装了Masscan即使用Masscan进行全端口扫描 AC_PORT_LIST = self.masscan( ip_list, masscan_path, masscan_rate) if not AC_PORT_LIST: continue self.masscan_ac[0] = 0 #不能再次用masscan进行端口扫描 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # ip地址加入队列 self.scan_start() # 开始扫描 开始端口banner获取和banner比对识别等 else: all_ip_list.extend(ip_list) # 不使用masscan时 if self.mode == 0: # 如果启用存活主机探测功能时,会用icmp echo探测存活的主机ip if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list) # IP地址加入队列 for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列 self.scan_start() # TCP探测模式开始扫描

if self.mode == 1 判断是否支持masscan扫描,如果支持就使用Masscan进行全端口扫描。如果没有开启,将ip添加到all_ip_list这个列表中。 masscan函数 位于nascan/lib/start.py

def masscan(self, ip, masscan_path, masscan_rate): try: if len(ip) == 0: return sys.path.append(sys.path[0] + "/plugin") m_scan = __import__("masscan")#动态加载plugin目录下的masscan result = m_scan.run(ip, masscan_path, masscan_rate) return result except Exception, e: print e print 'No masscan plugin detected'

动态加载plugin目录下的masscan.py masscan.py

def run(ip_list,path,rate): try: ip_file = open('target.log','w') ip_file.write("\n".join(ip_list)) ip_file.close() # 过滤可能导致命令执行的字符,过滤了;|&这三个字符 path = str(path).translate(None, ';|&`\n') rate = str(rate).translate(None, ';|&`\n') if not os.path.exists(path):return # 将path、rate加到命令后面执行 os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate)) result_file = open('tmp.log', 'r') result_json = result_file.readlines() result_file.close() del result_json[0] del result_json[-1] open_list = {} for res in result_json: try: ip = res.split()[3] port = res.split()[2] if ip in open_list: open_list[ip].append(port) else: open_list[ip] = [port] except:pass os.remove('target.log') os.remove('tmp.log') return open_list except: pass

先过滤可能导致命令执行的字符,过滤了;|&这三个字符; 然后将 将path、rate加到命令后面执行; 将扫描结果保存在tmp.log文件然后读取里面的内容; 8、scan_start()函数 不管有没有使用masscan扫描都会进入scan_start()函数,开始进行扫描;

def scan_start(self): for i in range(self.thread): # 开始扫描 t = ThreadNum(self.queue) t.setDaemon(True) t.mode = self.mode t.config_ini = self.config_ini#提供配置信息 t.statistics = self.statistics#提供统计信息 t.start() self.queue.join()

进入ThreadNum中

class ThreadNum(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue #run()函数,把IP地址和端口号列表传到另一个scan()函数中 def run(self): while True: try: # ip地址队列 task_host = self.queue.get(block=False) except: break try: # 如果使用masscan, 端口就用扫描到的已经开放的端口 if self.mode: port_list = AC_PORT_LIST[task_host] # 没有使用masscan, 使用默认端口 else: port_list = self.config_ini['Port_list'].split('|')[ 1].split('\n') # 根据banner识别端口开放的服务 _s = scan.scan(task_host, port_list) _s.config_ini = self.config_ini _s.statistics = self.statistics # 提供统计信# 提供配置信息 _s.run() except Exception, e: print e finally: self.queue.task_done()

_s = scan.scan(task_host, port_list) 这里ip地址和端口号传入到另一个scan函数中; scan()函数 位于/nascan/lib/scan.py

class scan: def __init__(self, task_host, port_list): self.ip = task_host self.port_list = port_list self.config_ini = {} def run(self): self.timeout = int(self.config_ini['Timeout']) for _port in self.port_list: self.server = '' self.banner = '' self.port = int(_port) # 基础单端口扫描获得开放端口banner self.scan_port() # 端口扫描 if not self.banner: continue self.server_discern() #服务识别 使用获得的banner进行服务类型识别 # 测试还剩下的一些没识别出来的端口服务是不是web服务器 if self.server == '': web_info = self.try_web() # 尝试web访问 if web_info: log.write('web', self.ip, self.port, web_info) time_ = datetime.datetime.now() # Info 集合更新 mongo.NA_INFO.update({'ip': self.ip, 'port': self.port}, {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}})

scan的run()函数先进行了端口扫描,然后进入server_discern()函数。

self.scan_port() # 端口扫描 def scan_port(self): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.connect((self.ip, self.port)) time.sleep(0.2) except Exception, e: return try: self.banner = sock.recv(1024) sock.close() if len(self.banner) <= 2: self.banner = 'NULL' except Exception, e: self.banner = 'NULL' log.write('portscan', self.ip, self.port, None) banner = '' hostname = self.ip2hostname(self.ip) time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') try: banner = unicode(self.banner, errors='replace') if self.banner == 'NULL': banner = '' mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) self.statistics[date_]['add'] += 1 except: if banner: history_info = mongo.NA_INFO.find_and_modify( query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True) if history_info: mongo.NA_INFO.insert( {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) self.statistics[date_]['update'] += 1 del history_info["_id"] history_info['del_time'] = time_ history_info['type'] = 'update' mongo.NA_HISTORY.insert(history_info)

scan_port()通过socket套接字连接,获得端口服务返回的banner信息;

self.server_discern() #服务识别 使用获得的banner进行服务类型识别 进入server_discern()函数 def server_discern(self): for mark_info in self.config_ini['Discern_server']: # 快速识别 try: # 服务名默认端口识别方法 banner匹配正则表达式 name, default_port, mode, reg = mark_info # 识别模式是default的,只判断端口号 if mode == 'default': if int(default_port) == self.port: self.server = name # 识别模式是banner的,正则匹配banner elif mode == 'banner': matchObj = re.search(reg, self.banner, re.I | re.M)#re.M多行匹配,影响 ^ 和 $; re.I使匹配对大小写不敏感; if matchObj: self.server = name if self.server: break except: continue # 处理没识别出来的也不太像(不严谨)web的服务 if not self.server and self.port not in [80, 443, 8080]: for mark_info in self.config_ini['Discern_server']: # 发包识别 try: name, default_port, mode, reg = mark_info if mode not in ['default', 'banner']: dis_sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM) dis_sock.connect((self.ip, self.port)) mode = mode.decode('string_escape') reg = reg.decode('string_escape') dis_sock.send(mode) time.sleep(0.3) dis_recv = dis_sock.recv(1024) dis_sock.close() matchObj = re.search(reg, dis_recv, re.I | re.M) if matchObj: self.server = name break except: pass if self.server: log.write("server", self.ip, self.port, self.server) mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, { "$set": {"server": self.server}})

server_discern()函数,通过正则表达式,依次比较,获得服务类型;

try_web()函数 def try_web(self): title_str, html = '', '' try: if self.port == 443: info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout) else: info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout) html = info.read() header = info.headers except urllib2.HTTPError, e: html = e.read() header = e.headers except: return if not header: return # 解压gzip if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']: html_data = StringIO.StringIO(html) gz = gzip.GzipFile(fileobj=html_data) html = gz.read() try: html_code = self.get_code(header, html).strip() if html_code and len(html_code) < 12: html = html.decode(html_code).encode('utf-8') except: pass try: title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M) if title: title_str = title.group(1) except: pass try: web_banner = str(header) + "\r\n\r\n" + html self.banner = web_banner history_info = mongo.NA_INFO.find_one( {"ip": self.ip, "port": self.port}) if 'server' not in history_info: tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} return web_info else: if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60: del history_info['_id'] history_info['del_time'] = datetime.datetime.now() mongo.NA_HISTORY.insert(history_info) tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} date_ = datetime.datetime.now().strftime('%Y-%m-%d') self.statistics[date_]['update'] += 1 log.write('info', None, 0, '%s:%s update web info' % (self.ip, self.port)) return web_info except: return

nascan代码大致流程

最新回复(0)