设计一个本地 Ubuntu 24.04 服务器自动下载 fnNAS 镜像的方案,支持版本号变化检测,Python下载GitHub的飞牛NAS镜像包有效解决国内网络稳定性问题,不耽误使用,适用于GitHub所有镜像拉取。
本地 Ubuntu 24.04 服务器
↓ 定时任务 (cron/systemd timer)
自动检测脚本 (Python/Bash)
↓ 匹配版本模式
下载最新镜像 → 验证 → 通知

bash
sudo apt update
sudo apt install -y python3-pip python3-venv curl jq

bash
sudo mkdir -p /opt/fnnas-sync
sudo chown $USER:$USER /opt/fnnas-sync
cd /opt/fnnas-sync

Python
#!/usr/bin/env python3
"""
fnNAS 镜像自动下载脚本
支持版本号变化检测和断点续传
"""
import os
import sys
import json
import re
import hashlib
import requests
from datetime import datetime
from pathlib import Path
import logging
# ==================== Configuration ====================
CONFIG = {
    # GitHub repository information
    "github_repo": "ophub/fnnas",
    "github_api": "https://api.github.com/repos/ophub/fnnas/releases/latest",
    # Filename matching pattern (regular expression)
    # Matches: fnnas_amlogic_s905l_k6.12.41_2026.02.02.img.gz
    "filename_pattern": r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
    # Download directory
    "download_dir": "/opt/fnnas-sync/downloads",
    # State file (records which versions were already downloaded)
    "state_file": "/opt/fnnas-sync/state.json",
    # Log file (NOTE(review): /var/log needs root — confirm the service user)
    "log_file": "/var/log/fnnas-sync.log",
    # Number of historical versions to keep (0 means never delete old versions)
    "keep_versions": 2,
    # Download timeout (seconds)
    "timeout": 600,
    # Resumable-download chunk size (bytes)
    "chunk_size": 8192,
    # Whether to verify file integrity (when a checksum is published)
    "verify_checksum": True,
    # Notification settings (optional)
    "notify": {
        "enabled": False,
        "webhook_url": "",  # DingTalk / Feishu / WeCom bot webhook URL
    }
}
# ========================================================
class FnnasSync:
    """Synchronize fnNAS firmware images from GitHub Releases.

    Workflow: fetch the latest release metadata, find the asset matching
    ``filename_pattern``, download it with HTTP Range resume support,
    optionally verify a checksum published in the release notes, persist
    state, prune old local versions and send a webhook notification.
    """

    def __init__(self, config):
        """Initialize from a CONFIG-style dict and prepare the download dir.

        Args:
            config: dict with the keys documented in the module-level CONFIG.
        """
        self.config = config
        self.setup_logging()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'fnNAS-Sync-Bot/1.0',
            'Accept': 'application/vnd.github.v3+json'
        })
        # Make sure the target directory exists before any download starts.
        Path(config['download_dir']).mkdir(parents=True, exist_ok=True)
        # Load persisted state (list of already-downloaded versions).
        self.state = self.load_state()

    def setup_logging(self):
        """Configure logging to the log file and stdout.

        Fix: the original crashed with PermissionError when the log file
        (default: /var/log/...) was not writable by the current user; now
        it degrades gracefully to stdout-only logging.
        """
        handlers = [logging.StreamHandler(sys.stdout)]
        try:
            handlers.append(logging.FileHandler(self.config['log_file']))
        except OSError as e:
            print(f"无法打开日志文件 {self.config['log_file']}: {e}", file=sys.stderr)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=handlers
        )
        self.logger = logging.getLogger(__name__)

    def load_state(self):
        """Load the persisted download state, or a fresh default on failure."""
        if os.path.exists(self.config['state_file']):
            try:
                with open(self.config['state_file'], 'r') as f:
                    return json.load(f)
            except Exception as e:
                self.logger.warning(f"加载状态文件失败: {e}")
        return {"downloaded": [], "last_check": None}

    def save_state(self):
        """Persist the download state, stamping the last-check time."""
        self.state['last_check'] = datetime.now().isoformat()
        with open(self.config['state_file'], 'w') as f:
            json.dump(self.state, f, indent=2)

    def get_latest_release(self):
        """Fetch the latest GitHub Release metadata; return None on failure."""
        try:
            response = self.session.get(
                self.config['github_api'],
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            self.logger.error(f"获取 Release 信息失败: {e}")
            return None

    def find_target_asset(self, release):
        """Return the first release asset whose name matches the pattern.

        Logs the available asset names when nothing matches, to help the
        operator fix ``filename_pattern``.
        """
        assets = release.get('assets', [])
        pattern = re.compile(self.config['filename_pattern'])
        for asset in assets:
            if pattern.match(asset['name']):
                return asset
        self.logger.warning("未找到匹配的文件,可用文件列表:")
        for asset in assets:
            self.logger.warning(f" - {asset['name']}")
        return None

    def is_already_downloaded(self, asset):
        """Return True if the asset exists locally with the expected size."""
        filename = asset['name']
        filepath = os.path.join(self.config['download_dir'], filename)
        if not os.path.exists(filepath):
            return False
        local_size = os.path.getsize(filepath)
        remote_size = asset['size']
        if local_size == remote_size:
            # Fix: log the actual filename (was a corrupted placeholder).
            self.logger.info(f"文件已存在且大小匹配: {filename}")
            return True
        self.logger.info(f"文件大小不匹配,本地: {local_size}, 远程: {remote_size}")
        return False

    def download_file(self, asset):
        """Download an asset with resume support; return True on success.

        Uses an HTTP Range request to continue a partial download and
        verifies the final file size against the asset metadata.
        """
        url = asset['browser_download_url']
        filename = asset['name']
        filepath = os.path.join(self.config['download_dir'], filename)
        total_size = asset['size']

        # Detect an existing partial download.
        downloaded_size = 0
        if os.path.exists(filepath):
            downloaded_size = os.path.getsize(filepath)
            if downloaded_size == total_size:
                self.logger.info("文件已完整下载")
                return True
            self.logger.info(f"断点续传,已下载: {downloaded_size}/{total_size} bytes")

        headers = {}
        if downloaded_size > 0:
            headers['Range'] = f'bytes={downloaded_size}-'
        try:
            response = self.session.get(
                url,
                headers=headers,
                stream=True,
                timeout=self.config['timeout']
            )
            response.raise_for_status()
            # Fix: if the server ignored the Range header (200 instead of
            # 206 Partial Content), appending would corrupt the file —
            # restart the download from scratch instead.
            if downloaded_size > 0 and response.status_code != 206:
                self.logger.warning("服务器不支持断点续传,重新完整下载")
                downloaded_size = 0
            mode = 'ab' if downloaded_size > 0 else 'wb'
            with open(filepath, mode) as f:
                for chunk in response.iter_content(chunk_size=self.config['chunk_size']):
                    if chunk:
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        # Progress indicator on a single console line.
                        if total_size > 0:
                            percent = (downloaded_size / total_size) * 100
                            print(f"\r下载进度: {percent:.1f}% ({downloaded_size}/{total_size})", end='')
            print()  # newline after the progress line
            # Fix: log the actual filename (was a corrupted placeholder).
            self.logger.info(f"下载完成: {filename}")
            # Verify the final file size.
            final_size = os.path.getsize(filepath)
            if final_size != total_size:
                self.logger.error(f"文件大小验证失败: {final_size} != {total_size}")
                return False
            return True
        except Exception as e:
            self.logger.error(f"下载失败: {e}")
            return False

    def verify_checksum(self, asset, release):
        """Verify the file against a checksum found in the release notes.

        Scans the release body for a "<digest> <filename>" style line and
        infers the algorithm from the digest length (fix: the original
        always hashed with sha256 even when it matched an ``.md5`` entry).
        Returns True when verification passes or no checksum is published.
        """
        filename = asset['name']
        # Hex-digest length -> hashlib algorithm name.
        algo_by_len = {32: 'md5', 40: 'sha1', 64: 'sha256'}
        for line in release.get('body', '').split('\n'):
            if filename not in line:
                continue
            parts = line.split()
            if len(parts) < 2:
                continue
            expected_hash = parts[0].lower()
            algorithm = algo_by_len.get(len(expected_hash))
            if algorithm is None:
                # First token is not a recognizable digest; keep scanning.
                continue
            self.logger.info(f"找到校验和: {expected_hash[:16]}...")
            filepath = os.path.join(self.config['download_dir'], filename)
            actual_hash = self.calculate_hash(filepath, algorithm)
            if actual_hash == expected_hash:
                self.logger.info("✓ 校验和验证通过")
                return True
            self.logger.error("✗ 校验和不匹配!")
            return False
        return True  # No checksum published — skip verification.

    def calculate_hash(self, filepath, algorithm='sha256'):
        """Compute the hex digest of a file, reading it in 8 KiB chunks."""
        hash_obj = hashlib.new(algorithm)
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def cleanup_old_versions(self, current_filename):
        """Delete old image files beyond ``keep_versions``, newest first.

        The file named ``current_filename`` is never removed, even if it
        falls outside the retention window.
        """
        if self.config['keep_versions'] <= 0:
            return
        pattern = re.compile(self.config['filename_pattern'])
        download_dir = Path(self.config['download_dir'])
        # Collect all matching files with their modification times.
        files = []
        for f in download_dir.iterdir():
            if f.is_file() and pattern.match(f.name):
                files.append((f, f.stat().st_mtime))
        # Newest first, then drop everything past the retention count.
        files.sort(key=lambda x: x[1], reverse=True)
        if len(files) > self.config['keep_versions']:
            for old_file, _ in files[self.config['keep_versions']:]:
                if old_file.name != current_filename:
                    self.logger.info(f"删除旧版本: {old_file.name}")
                    old_file.unlink()

    def send_notification(self, message):
        """POST a text message to the configured webhook (best effort)."""
        if not self.config['notify']['enabled']:
            return
        webhook_url = self.config['notify']['webhook_url']
        if not webhook_url:
            return
        try:
            payload = {
                "msgtype": "text",
                "text": {"content": message}
            }
            self.session.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            # Notification failures must not fail the sync run.
            self.logger.error(f"发送通知失败: {e}")

    def run(self):
        """Main entry point: check, download, verify, record, clean, notify.

        Returns True on success (including "already up to date").
        """
        self.logger.info("=" * 50)
        self.logger.info("开始检查 fnNAS 镜像更新")
        release = self.get_latest_release()
        if not release:
            self.logger.error("无法获取 Release 信息,退出")
            return False
        version = release.get('tag_name', 'unknown')
        self.logger.info(f"最新版本: {version}")
        asset = self.find_target_asset(release)
        if not asset:
            self.logger.error("未找到匹配的镜像文件")
            return False
        filename = asset['name']
        # Fix: log the actual filename (was a corrupted placeholder).
        self.logger.info(f"目标文件: {filename}")
        self.logger.info(f"文件大小: {asset['size'] / 1024 / 1024:.2f} MB")
        if self.is_already_downloaded(asset):
            self.logger.info("文件已是最新,无需下载")
            self.save_state()
            return True
        self.logger.info("开始下载...")
        if not self.download_file(asset):
            self.logger.error("下载失败")
            return False
        # Fix: a failed checksum verification now aborts the run — the
        # original computed the result and silently discarded it.
        if self.config['verify_checksum']:
            if not self.verify_checksum(asset, release):
                self.logger.error("校验和验证失败")
                return False
        # Record the new version in the persisted state.
        self.state['downloaded'].append({
            'filename': filename,
            'version': version,
            'downloaded_at': datetime.now().isoformat(),
            'size': asset['size']
        })
        self.save_state()
        self.cleanup_old_versions(filename)
        # Fix: include the actual filename (was a corrupted placeholder).
        message = (
            f"fnNAS 镜像更新\n版本: {version}\n文件: {filename}\n"
            f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        )
        self.send_notification(message)
        self.logger.info("同步完成")
        return True
if __name__ == "__main__":
    # Optional override of the download directory via environment variable.
    env_dir = os.getenv('FNNAS_DOWNLOAD_DIR')
    if env_dir:
        CONFIG['download_dir'] = env_dir
    sync = FnnasSync(CONFIG)
    success = sync.run()
    sys.exit(0 if success else 1)

保存为 /opt/fnnas-sync/sync.py
创建服务文件 /etc/systemd/system/fnnas-sync.service:
ini
[Unit]
Description=fnNAS Mirror Sync Service
After=network.target
[Service]
Type=oneshot
User=root
WorkingDirectory=/opt/fnnas-sync
Environment="PYTHONUNBUFFERED=1"
Environment="FNNAS_DOWNLOAD_DIR=/opt/fnnas-sync/downloads"
ExecStart=/usr/bin/python3 /opt/fnnas-sync/sync.py
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target

创建定时器 /etc/systemd/system/fnnas-sync.timer:
ini
[Unit]
Description=Run fnNAS Sync every 6 hours
[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
Persistent=true
[Install]
WantedBy=timers.target

bash
# 重载配置
sudo systemctl daemon-reload
# 启用定时器
sudo systemctl enable fnnas-sync.timer
# 启动定时器
sudo systemctl start fnnas-sync.timer
# 立即执行一次测试
sudo systemctl start fnnas-sync.service
# 查看状态
sudo systemctl status fnnas-sync.service
sudo systemctl list-timers --all | grep fnnas

你的文件名模式:fnnas_amlogic_s905l_k6.12.41_****.**.**.img.gz
Python 配置中的正则已匹配:
Python
"filename_pattern": r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz"如果需要同时监控多个类似文件(如 s905d, s905x3 等),修改配置:
Python
# 支持多个设备
"filename_patterns": [
r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
r"fnnas_amlogic_s905d_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
]

当然也可以基于 Docker 容器。