169 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
import json
from ipSearch.xdbSearcher import XdbSearcher
import re
import base64
from bs4 import BeautifulSoup
import socket
import time
def search_ip(ip):  # Look up geolocation / IDC information for an IP address
    """Return the ip2region record for ``ip``.

    The database at ``./ipSearch/ip2region.xdb`` is opened with a vector
    index loaded from the same file, queried once, and then closed again.

    Args:
        ip: IP address, passed unchanged to ``XdbSearcher.search``.

    Returns:
        The value returned by ``XdbSearcher.search``. Callers in this file
        treat it as a ``|``-separated string.
    """
    dbPath = "./ipSearch/ip2region.xdb"
    vi = XdbSearcher.loadVectorIndexFromFile(dbfile=dbPath)
    searcher = XdbSearcher(dbfile=dbPath, vectorIndex=vi)
    try:
        return searcher.search(ip)
    finally:
        # This function runs once per collected node; without closing, every
        # call would leave a searcher (and whatever it holds open) behind.
        # NOTE(review): relies on XdbSearcher.close() as provided by the
        # ip2region Python binding - confirm the local copy has it.
        searcher.close()
def use_zoomeye():  # Collect nodes via ZoomEye
    """Search ZoomEye for Tailscale DERP hosts located in China.

    Sends one GET request to ``https://www.zoomeye.org/api/search`` and keeps
    the matches whose ``portinfo.service`` equals ``"https"``.

    Returns:
        tuple: ``(True, node_list)`` on success, where every node is a dict
        with the keys ``"ip"``, ``"port"`` and ``"info"``. ``(False, [])``
        when the request fails, the status code is not 200, or the body does
        not contain a ``"matches"`` list.
    """
    url = 'https://www.zoomeye.org/api/search'
    params = {
        'q': '"Tailscale"+"DERP"+country:"CN"',
        'page': '1',
        'pageSize': '20',
        't': 'v4+v6+web'
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36'
    }
    try:
        # Without a timeout an unresponsive server would block the script
        # forever; network errors are reported through the status flag
        # instead of crashing the caller.
        info = requests.get(url, params=params, headers=headers, timeout=10)
    except requests.RequestException:
        return False, []
    if info.status_code != 200:
        return False, []
    try:
        data = json.loads(info.text)["matches"]
    except (ValueError, KeyError, TypeError):
        # Body is not JSON, or has no "matches" entry.
        return False, []
    if not isinstance(data, list):
        return False, []

    node_list = []
    for i in data:
        portinfo = i.get("portinfo", {})
        if portinfo.get("service", "N/A") != "https":
            continue
        ip = i["ip"]
        port = portinfo["port"]
        # Build a compact label from the "|"-separated region string:
        # separators become spaces, every "0" character is dropped, a word
        # that is immediately repeated is collapsed, then spaces are removed.
        # NOTE(review): the two empty-string replace() calls are no-ops as
        # written; they may originally have held invisible Unicode characters
        # that were lost - confirm against the real file.
        region = search_ip(ip).replace("|", " ").replace("0", "").replace("","").replace("","")
        ip_info = re.sub(r'\b(\w+)\s+\1\b', r'\1', region).replace(" ", "")
        # Append the node information to the list
        node_list.append({"ip": ip, "port": port, "info": ip_info})
    return True, node_list
def use_fofainfo():  # Collect nodes via FOFA
    """Scrape the FOFA result page for Tailscale DERP hosts located in China.

    Sends one GET request to ``https://fofa.info/result`` with the
    base64-encoded query, extracts ``<a href="https...">`` tags from the
    returned HTML and resolves each host name to an IP address.

    Returns:
        tuple: ``(True, node_list)`` on success, where every node is a dict
        with the keys ``"ip"``, ``"port"`` and ``"info"`` (``port`` is a
        string, ``"443"`` when the link has no explicit port).
        ``(False, [])`` when the request fails or the status code is not 200.
    """
    url = 'https://fofa.info/result'
    params = {
        'qbase64': base64.b64encode('body="DERP" && body="Tailscale" && country="CN"'.encode('utf-8')).decode('utf-8')
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36'
    }
    try:
        # Without a timeout an unresponsive server would block the script
        # forever; network errors are reported through the status flag
        # instead of crashing the caller.
        info = requests.get(url, params=params, headers=headers, timeout=10)
    except requests.RequestException:
        return False, []
    if info.status_code != 200:
        return False, []

    soup = BeautifulSoup(info.text, "html.parser")
    pattern = re.compile(r'<a\s+href="https[^"]+"[^>]*>')
    matches = re.findall(pattern, str(soup))
    node_list = []
    for match in matches:
        # Strip the surrounding tag text so only "host[:port]" remains.
        match = match.replace('<a href="https://', "").replace('" target="_blank">', "")
        parts = match.split(":")
        port = parts[1] if len(parts) > 1 else "443"
        try:
            ip = socket.gethostbyname(parts[0])
        except (OSError, UnicodeError):
            # One unresolvable or malformed link must not abort the whole
            # collection run; skip it and continue with the next match.
            continue
        print(ip, port)
        # Build a compact label from the "|"-separated region string:
        # separators become spaces, every "0" character is dropped, a word
        # that is immediately repeated is collapsed, then spaces are removed.
        # NOTE(review): the two empty-string replace() calls are no-ops as
        # written; they may originally have held invisible Unicode characters
        # that were lost - confirm against the real file.
        region = search_ip(ip).replace("|", " ").replace("0", "").replace("","").replace("","")
        ip_info = re.sub(r'\b(\w+)\s+\1\b', r'\1', region).replace(" ", "")
        # Append the node information to the list
        node_list.append({"ip": ip, "port": port, "info": ip_info})
    return True, node_list
def check_node(ip, port):
    """Probe ``https://{ip}:{port}`` and report whether it answers with 200.

    The request uses a 3 second timeout and skips certificate verification.
    The outcome is printed as well as returned.

    Args:
        ip: Host part of the URL to probe.
        port: Port part of the URL to probe.

    Returns:
        bool: ``True`` when the response status code is 200, ``False`` for
        any other status code or when the request raises.
    """
    try:
        response = requests.get(f"https://{ip}:{port}", timeout=3, verify=False)
        reachable = response.status_code == 200
    except Exception:
        # Best-effort probe: any request failure just means "not usable".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit through, so the script can still be stopped.
        reachable = False
    print(f"{ip}:{port}可用" if reachable else f"{ip}:{port}不可用")
    return reachable
def _dedupe_nodes(nodes):
    """Return ``nodes`` without repeated (ip, port) pairs, keeping the first."""
    unique = {}
    for node in nodes:
        # Ports are compared as text so that 443 and "443" count as the same
        # node; a dict keeps insertion order, so the result order is stable.
        unique.setdefault((node["ip"], str(node["port"])), node)
    return list(unique.values())


def _derp_port(port):
    """Return ``port`` as an int when it is purely numeric, else unchanged."""
    return int(port) if str(port).isdecimal() else port


if __name__ == "__main__":
    import os  # only the script entry point needs it

    # Collect nodes via ZoomEye
    status, zoomeye_node_list = use_zoomeye()
    print("zoomeye采集成功" if status else "zoomeye采集失败")

    # Collect nodes via FOFA
    status, fofa_node_list = use_fofainfo()
    print("fofa采集成功" if status else "fofa采集失败")

    # Merge the nodes from both sources
    temp_node_list = zoomeye_node_list + fofa_node_list
    print("共采集到" + str(len(temp_node_list)) + "个节点")

    # Write temp_nodes.json for later use
    with open("temp_nodes.json", "w", encoding="utf-8") as f:
        f.write(json.dumps(temp_node_list, ensure_ascii=False, indent=4))
    print("写入temp_nodes.json成功")

    # Read the existing nodes, then merge and de-duplicate
    try:
        with open("all_nodes.json", "r", encoding="utf-8") as f:
            all_nodes = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or malformed file: start from an empty list.
        all_nodes = []
    all_nodes += temp_node_list
    unique_data_list = _dedupe_nodes(all_nodes)

    # Check node connectivity. A new list is built instead of removing items
    # from the list being iterated, which would skip the element that follows
    # each removed one.
    unique_data_list = [
        node for node in unique_data_list
        if check_node(node["ip"], node["port"])
    ]
    print("共有" + str(len(unique_data_list)) + "个节点可用")

    # Write all_nodes.json
    with open("all_nodes.json", "w", encoding="utf-8") as f:
        f.write(json.dumps(unique_data_list, ensure_ascii=False, indent=4))
    print("写入all_nodes.json成功")

    # Assign region IDs starting at 900 and build the README table rows
    readme_rows = []
    for RegionID, node in enumerate(unique_data_list, start=900):
        node['RegionID'] = RegionID
        readme_rows.append(f"| {node['info']} | {RegionID} | {node['ip']} | {node['port']} |\n")
    readme_node_list = "".join(readme_rows)

    # Replace the node list in README.md with the latest node information
    with open("README.md", "r", encoding="utf-8") as f:
        readme = f.read()
    marker = "| :-: | :-: | :-: | :-: |"
    marker_pos = readme.find(marker)
    if marker_pos == -1:
        # Without this guard the slice below would be readme[:25] and the
        # README would be truncated to its first 25 characters.
        print("README.md中未找到节点表头,跳过更新")
    else:
        # Keep everything up to and including the character after the
        # marker line, then append the fresh rows.
        readme = readme[:marker_pos + len(marker) + 1] + readme_node_list
        # Save README.md
        with open("README.md", "w", encoding="utf-8") as f:
            f.write(readme)
        print("README.md文件已更新")

    # Save the nodes to ./config/Tailscale_<timestamp>.json
    os.makedirs("./config", exist_ok=True)
    with open(f"./config/Tailscale_{int(time.time())}.json", "w", encoding="utf-8") as f:
        f.write(json.dumps({
            "derpMap": {
                "OmitDefaultRegions": True,
                "Regions": {
                    str(node["RegionID"]): {
                        "RegionID": node["RegionID"],
                        "RegionCode": node["info"],
                        "Nodes": [
                            {
                                "Name": node["info"],
                                "RegionID": node["RegionID"],
                                "HostName": node["ip"],
                                # FOFA ports arrive as strings; emit numeric
                                # ports as JSON numbers.
                                "DERPPort": _derp_port(node["port"]),
                                "InsecureForTests": True
                            }
                        ]
                    }
                    for node in unique_data_list
                }
            }
        }, ensure_ascii=False, indent=4))