# Description: filter candidate nodes and merge the reachable ones into all_nodes.yaml
import json

import requests
import urllib3
import yaml
from urllib3.exceptions import InsecureRequestWarning

# Probes are sent with verify=False, so silence the warning urllib3 would
# otherwise print for every single request.
urllib3.disable_warnings(InsecureRequestWarning)


def try_node(ip, port):
    """Return True if ``https://{ip}:{port}`` answers with HTTP 200.

    The request uses a 3 second timeout and skips TLS certificate
    verification. Any failure while probing (timeout, refused connection,
    malformed address, ...) is reported as ``False``.
    """
    print(f"正在测试节点{ip}:{port}")
    try:
        response = requests.get(f"https://{ip}:{port}", timeout=3, verify=False)
    except Exception:
        # Best effort: whatever went wrong, the node is unusable. Unlike a
        # bare ``except``, this still lets Ctrl-C / SystemExit stop the run.
        return False
    return response.status_code == 200


def _load_nodes(path):
    """Return the node list stored in the YAML file at *path*, or ``[]``.

    A missing, unreadable or unparsable file yields an empty list, and so
    does an empty file (for which ``yaml.load`` returns ``None``).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            # NOTE(review): FullLoader can build more than plain data types;
            # if these files may come from an untrusted source, consider
            # switching to yaml.safe_load.
            loaded = yaml.load(f, Loader=yaml.FullLoader)
    except (OSError, UnicodeDecodeError, yaml.YAMLError):
        return []
    return loaded or []


def _dedupe_nodes(nodes):
    """Return *nodes* without duplicates, keeping first-seen order.

    Nodes are compared through their JSON text with sorted keys, so two
    mappings that differ only in key order count as the same node.
    """
    serialized = dict.fromkeys(json.dumps(node, sort_keys=True) for node in nodes)
    return [json.loads(text) for text in serialized]


def _filter_reachable(nodes):
    """Probe every node in *nodes* and return only the reachable ones."""
    reachable = []
    for node in nodes:
        ip = node["ip"]
        port = node["port"]
        if try_node(ip, port):
            reachable.append(node)
        else:
            print(f"节点{ip}:{port}不可连接")
    return reachable


def main():
    """Merge temp_nodes.yaml into all_nodes.yaml, keeping reachable nodes only."""
    # Candidate nodes; skipped when the file does not exist.
    nodes = _load_nodes("temp_nodes.yaml")
    # Previously stored nodes; the file is (re)created by the write below.
    all_nodes = _load_nodes("all_nodes.yaml")

    # Merge both sources and drop duplicates.
    unique_nodes = _dedupe_nodes(all_nodes + nodes)

    # Check connectivity. A new list is built instead of removing entries
    # from the list being iterated, which would skip the element following
    # each removal and leave untested nodes in the result.
    usable_nodes = _filter_reachable(unique_nodes)
    print(f"共有{len(usable_nodes)}个节点可用")

    # Write the surviving nodes back to all_nodes.yaml.
    with open("all_nodes.yaml", "w", encoding="utf-8") as f:
        yaml.dump(usable_nodes, f, allow_unicode=True)


if __name__ == "__main__":
    main()