1
0
mirror of synced 2026-04-29 22:38:46 +08:00

Merge pull request #275 from MiyakoMeow/main

添加DNS查询功能,优化IP地址筛选逻辑
This commit is contained in:
削微寒
2025-04-14 09:09:18 +08:00
committed by GitHub
3 changed files with 112 additions and 31 deletions

View File

@@ -1,3 +1,4 @@
requests-html==0.10.0
retry==0.9.2
lxml_html_clean
lxml_html_clean
aiodns==3.0.0

View File

@@ -6,8 +6,11 @@
# Date : 2020-05-19 15:27
# Desc : 获取最新的 GitHub 相关域名对应 IP
import re
from typing import Any, Optional
from typing import Any, Dict, List, Optional
from datetime import datetime
import sys
import asyncio
import aiodns
from pythonping import ping
from requests_html import HTMLSession
@@ -16,26 +19,36 @@ from retry import retry
from common import GITHUB_URLS, write_hosts_content
def get_best_ip(ip_list: list) -> str:
ping_timeout = 1
best_ip = ''
min_ms = ping_timeout * 1000
ip_set = set(ip_list)
for ip in ip_set:
ping_result = ping(ip, timeout=ping_timeout)
print(f'ping {ip} {ping_result.rtt_avg_ms} ms')
if ping_result.rtt_avg_ms == ping_timeout * 1000:
# 超时认为 IP 失效
continue
else:
if ping_result.rtt_avg_ms < min_ms:
min_ms = ping_result.rtt_avg_ms
best_ip = ip
PING_TIMEOUT_SEC: int = 1
DISCARD_LIST: List[str] = ["1.0.1.1", "1.2.1.1", "127.0.0.1"]
PING_LIST: Dict[str, int] = dict()
def ping_cached(ip: str) -> int:
global PING_LIST
if ip in PING_LIST:
return PING_LIST[ip]
ping_times = [ping(ip, timeout=PING_TIMEOUT_SEC).rtt_avg_ms for _ in range(3)]
ping_times.sort()
print(f'Ping {ip}: {ping_times} ms')
PING_LIST[ip] = ping_times[1] # 取中位数
return PING_LIST[ip]
def select_ip_from_list(ip_list: List[str]) -> Optional[str]:
if len(ip_list) == 0:
return None
ping_results = [(ip, ping_cached(ip)) for ip in ip_list]
ping_results.sort(key=lambda x: x[1])
best_ip = ping_results[0][0]
print(f"{ping_results}, selected {best_ip}")
return best_ip
@retry(tries=3)
def get_ip(session: Any, github_url: str) -> Optional[str]:
def get_ip_list_from_ipaddress_com(session: Any, github_url: str) -> Optional[List[str]]:
url = f'https://sites.ipaddress.com/{github_url}'
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)'
@@ -45,31 +58,94 @@ def get_ip(session: Any, github_url: str) -> Optional[str]:
rs = session.get(url, headers=headers, timeout=5)
pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
ip_list = re.findall(pattern, rs.html.text)
best_ip = get_best_ip(ip_list)
if best_ip:
return best_ip
else:
raise Exception("url: {github_url}, ipaddress empty")
return ip_list
except Exception as ex:
print("get: {url}, error: {ex}")
print(f"get: {url}, error: {ex}")
raise Exception
def main() -> None:
DNS_SERVER_LIST = [
"1.1.1.1", # Cloudflare
"8.8.8.8", # Google
"101.101.101.101", # Quad101
"101.102.103.104", # Quad101
]
def windows_compatibility_check():
if sys.platform == "win32":
# 检查 pycares 是否正常加载
try:
import pycares
except ImportError:
raise RuntimeError("请先执行 'pip install pycares'")
async def get_ip_list_from_dns(
domain,
record_type="A",
dns_server_list=["1.2.4.8", "114.114.114.114"],
):
# Windows 兼容性检查
windows_compatibility_check()
# 配置 DNS 服务器
resolver = aiodns.DNSResolver()
resolver.nameservers = dns_server_list
try:
# 执行异步查询
result = await resolver.query(domain, record_type)
return [answer.host for answer in result]
except aiodns.error.DNSError as e:
print(f"{domain}: DNS 查询失败: {e}")
return []
async def get_ip(session: Any, github_url: str) -> Optional[str]:
ip_list_web = []
try:
ip_list_web = get_ip_list_from_ipaddress_com(session, github_url)
except Exception as ex:
pass
ip_list_dns = []
try:
ip_list_dns = await get_ip_list_from_dns(github_url, dns_server_list=DNS_SERVER_LIST)
except Exception as ex:
pass
ip_list_set = set(ip_list_web + ip_list_dns)
for discard_ip in DISCARD_LIST:
ip_list_set.discard(discard_ip)
ip_list = list(ip_list_set)
ip_list.sort()
if len(ip_list) == 0:
return None
print(f"{github_url}: {ip_list}")
best_ip = select_ip_from_list(ip_list)
return best_ip
async def main() -> None:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{current_time} - Start script.')
session = HTMLSession()
content = ""
content_list = []
for index, github_url in enumerate(GITHUB_URLS):
print(f'Start Processing url: {index + 1}/{len(GITHUB_URLS)}, {github_url}')
try:
ip = get_ip(session, github_url)
content += ip.ljust(30) + github_url + "\n"
ip = await get_ip(session, github_url)
if ip is None:
print(f"{github_url}: IP Not Found")
ip = "# IP Address Not Found"
content += ip.ljust(30) + github_url
global PING_LIST
if PING_LIST.get(ip) is not None and PING_LIST.get(ip) == PING_TIMEOUT_SEC * 1000:
content += " # Timeout"
content += "\n"
content_list.append((ip, github_url,))
except Exception:
continue
print(f'Process url: {index + 1}/{len(GITHUB_URLS)}, {github_url}')
write_hosts_content(content, content_list)
# print(hosts_content)
@@ -77,5 +153,8 @@ def main() -> None:
print(f'{current_time} - End script.')
if __name__ == '__main__':
main()
if __name__ == "__main__":
if sys.platform == "win32":
# Windows 事件循环策略配置
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

View File

@@ -2,3 +2,4 @@ requests-html==0.10.0
pythonping==1.1.4
retry==0.9.2
lxml_html_clean
aiodns==3.0.0