博客友链延迟检测和互链检测
闲来无事给博客添加了友链延迟检测和互链检测
思路也是很简单,js脚本提取元素获取链接后检测即可。
但是也不是很简单,获取对方友链页面信息涉及到跨域,还得写个python脚本做代理,不过也不难,经过1小时摸索就做出来一个比较完善的。
各位想使用的话就让GPT帮你魔改下,JS给他,右键审查元素,复制列表元素的outerHTML,让它根据你的网站简单适配下就好。
注意:友链检测的原理是在描述文本(你给每个友链的描述部分,或者Slogan位置)中检测<friend-link>友链页面<friend-link>
字符串,然后他会自动移除标志符部分的字符串并且开始检测,当然,你可以正常的显示口号,不受影响,它只会正则匹配<friend-link>友链页面<friend-link>
。例如这样:
以下是脚本,js注入在友链页,python在跳板机部署好填写对应IP给js。
<script>
(() => {
const MAX_ATTEMPTS = 8;
const INITIAL_ATTEMPTS = 3;
const STD_THRESHOLD = 50;
const TIMEOUT_LIMIT = 3000;
const GLOBAL_CONCURRENCY = 4;
const START_DELAY_MS = 1000;
const SELF_DOMAIN = 'xx.xxx.xx';//本站的域名,用来判断对面有没有添加自己的链接
const PROXY_URL = 'https://xxxx.xxxx.xxx/proxy?url='; // 跳板机的 URL
// 友链白名单(直接认为是双向友链)
const WHITELIST = [
'*检索名称,部分博客为js动态渲染,获取不到',
];
const items = document.querySelectorAll('.flink-list-item');
const pingStatusMap = new Map();
const linkStatusMap = new Map();
// 计算标准差
function standardDeviation(values) {
const avg = values.reduce((a, b) => a + b, 0) / values.length;
const squareDiffs = values.map(v => Math.pow(v - avg, 2));
return Math.sqrt(squareDiffs.reduce((a, b) => a + b, 0) / values.length);
}
// 单次 ping
function pingOnce(url) {
return new Promise(resolve => {
const start = performance.now();
Promise.race([
fetch(url, { method: 'GET', mode: 'no-cors' }),
new Promise((_, reject) => setTimeout(() => reject('TIMEOUT'), TIMEOUT_LIMIT))
])
.then(() => resolve(Math.round(performance.now() - start)))
.catch(() => resolve(TIMEOUT_LIMIT));
});
}
// 稳定的 ping,尝试多次获取稳定结果
async function stablePing(url) {
let times = [], attempts = 0;
while (attempts < MAX_ATTEMPTS) {
const time = await pingOnce(url);
times.push(time);
attempts++;
// 如果次数超过初始尝试次数,判断标准差是否符合阈值
if (times.length >= INITIAL_ATTEMPTS) {
const std = standardDeviation(times);
if (std <= STD_THRESHOLD) break;
}
}
// 排除异常值
if (times.length > 2) {
times.sort((a, b) => a - b);
times = times.slice(1, -1);
}
// 返回平均值和尝试的次数
const avg = Math.round(times.reduce((a, b) => a + b, 0) / times.length);
return { avg, times, count: attempts };
}
// 批量任务并发处理
async function runWithPool(tasks, limit) {
const results = [];
const executing = [];
for (const task of tasks) {
const p = task();
results.push(p);
executing.push(p);
// 控制并发限制
if (executing.length >= limit) {
await Promise.race(executing);
executing.splice(0, executing.length);
}
}
return Promise.all(results);
}
// 第一步:清除 friend-link 标签,插入状态容器
items.forEach(item => {
const descSpan = item.querySelector('.flink-item-desc');
const nameSpan = item.querySelector('.flink-item-name');
// 提取并清除 friend-link 标签内容
const match = descSpan?.textContent.match(/<friend-link>(.*?)<\/friend-link>/);
const linkURL = match ? match[1] : null;
if (match) descSpan.textContent = descSpan.textContent.replace(match[0], '');
// 创建容器行
const infoLine = document.createElement('div');
infoLine.style.fontSize = '0.65em';
infoLine.style.marginTop = '1px';
infoLine.style.textAlign = 'left';
infoLine.style.paddingLeft = '1px';
nameSpan.parentElement.insertBefore(infoLine, nameSpan.nextSibling);
// ping 状态栏
const pingSpan = document.createElement('span');
pingSpan.style.color = 'gray';
pingSpan.textContent = '● PINGing';
infoLine.appendChild(pingSpan);
pingStatusMap.set(item, pingSpan);
// 友链检测标志(仅有 friend-link 的才添加)
const friendName = nameSpan.textContent.trim();
if (linkURL) {
const statusSpan = document.createElement('span');
statusSpan.style.marginLeft = '8px';
statusSpan.style.color = 'goldenrod';
statusSpan.textContent = '↻ 互链查询中...';
infoLine.appendChild(statusSpan);
linkStatusMap.set(item, { url: linkURL, div: statusSpan });
// 如果该友链在白名单中,直接标记为双向友链,跳过代理检查
if (WHITELIST.includes(friendName)) {
statusSpan.textContent = '↔︎ 双向友链';
statusSpan.style.color = 'green';
console.log(`[友链] ✅ ${friendName} 被加入白名单,直接双向友链`);
linkStatusMap.set(item, { url: linkURL, div: statusSpan, isWhitelisted: true });
}
}
});
// 第二步:ping 操作
setTimeout(() => {
const pingTasks = Array.from(pingStatusMap.entries()).map(([item, pingSpan]) => {
const link = item.querySelector('a.cf-friends-link');
const url = link.href;
return async () => {
try {
const { avg, times, count } = await stablePing(url);
if (avg >= TIMEOUT_LIMIT) return;
pingSpan.textContent = `● ${avg}ms (${count})`;
pingSpan.style.color = avg <= 250 ? 'green' : 'orange';
pingSpan.title = times.join(', ') + ' ms';
console.log(`[PING] ${url} → ${avg}ms`);
} catch (e) {
// ping 错误被忽略
}
};
});
runWithPool(pingTasks, GLOBAL_CONCURRENCY);
}, START_DELAY_MS);
// 第三步:友链检测
const linkTasks = Array.from(linkStatusMap.values()).map(({ url, div, isWhitelisted }) => {
return async () => {
// 如果该友链在白名单中,跳过代理检查
if (isWhitelisted) {
return; // 已经由白名单处理,不再检查
}
try {
console.log(`[友链] 正在检查 ${url}`);
// 通过代理服务器进行请求,只在友链检测时走代理
const res = await fetch(PROXY_URL + encodeURIComponent(url), { method: 'GET' });
const data = await res.json();
if (data.status_code === 200 && data.body.includes(SELF_DOMAIN)) {
div.textContent = '↔︎ 双向友链';
div.style.color = 'green';
console.log(`[友链] ✅ ${url} 包含 ${SELF_DOMAIN}`);
} else {
div.textContent = '← 单向友链';
div.style.color = 'red';
console.log(`[友链] ❌ ${url} 未包含 ${SELF_DOMAIN}`);
}
} catch (err) {
console.log(`[友链] ❌ ${url} 访问失败`);
div.remove(); // 失败不显示任何标志
}
};
});
runWithPool(linkTasks, 2);
})();
</script>
from flask import Flask, request, jsonify
import requests
import logging
import time
from collections import OrderedDict
app = Flask(__name__)
# 设置日志格式与等级
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
# 缓存相关配置
CACHE_TTL = 300 # 缓存有效时间(秒)
MAX_CACHE_SIZE = 100 * 1024 * 1024 # 最大缓存大小 100MB(字节)
# 缓存结构:url -> (时间戳, 内容, 状态码, 大小)
cache = OrderedDict()
cache_total_size = 0
# 缓存清理函数
def evict_cache_if_needed():
global cache_total_size
# 当缓存总大小超过上限时,删除最旧的缓存项
while cache_total_size > MAX_CACHE_SIZE:
old_url, (ts, body, status, size) = cache.popitem(last=False)
cache_total_size -= size
logging.info(f"已移除缓存项:{old_url}(大小:{size} 字节)")
@app.route('/proxy', methods=['GET'])
def proxy():
global cache_total_size
url = request.args.get('url')
if not url:
logging.warning("请求中未提供 URL")
return jsonify({'error': '未提供 URL 参数'}), 400
current_time = time.time()
# 检查缓存是否命中
if url in cache:
timestamp, body, status_code, size = cache[url]
if current_time - timestamp < CACHE_TTL:
logging.info(f"缓存命中:{url}")
return jsonify({
'status_code': status_code,
'url': url,
'body': body,
'cached': True
})
else:
# 缓存已过期,删除
del cache[url]
cache_total_size -= size
logging.info(f"缓存过期:{url}")
try:
logging.info(f"请求远程资源:{url}")
response = requests.get(url)
body = response.text
status_code = response.status_code
# 计算响应内容的字节大小
body_size = len(body.encode('utf-8'))
# 存入缓存
cache[url] = (current_time, body, status_code, body_size)
cache.move_to_end(url) # 保证字典按访问顺序排列
cache_total_size += body_size
# 检查是否需要清理缓存
evict_cache_if_needed()
return jsonify({
'status_code': status_code,
'url': url,
'body': body,
'cached': False
})
except requests.exceptions.RequestException as e:
logging.error(f"请求出错:{url} - {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# 启动 Flask 服务,监听所有 IP,端口为 22335
app.run(debug=True, host='0.0.0.0', port=22335)
欢迎订阅和交换友链
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 砂纸の小屋
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果