三多运维

Python下载GitHub的飞牛NAS镜像包

2026/02/09
7
0

设计一个本地 Ubuntu 24.04 服务器自动下载 fnNAS 镜像的方案,支持版本号变化检测,Python下载GitHub的飞牛NAS镜像包有效解决国内网络稳定性问题,不耽误使用,适用于GitHub所有镜像拉取。

方案架构

本地 Ubuntu 24.04 服务器
    ↓ 定时任务 (cron/systemd timer)
自动检测脚本 (Python/Bash)
    ↓ 匹配版本模式
下载最新镜像 → 验证 → 通知

方案一:Python 脚本(推荐,功能完善)

步骤 1:安装依赖

bash

sudo apt update
sudo apt install -y python3-pip python3-venv curl jq

步骤 2:创建脚本目录

bash

sudo mkdir -p /opt/fnnas-sync
sudo chown $USER:$USER /opt/fnnas-sync
cd /opt/fnnas-sync

步骤 3:创建 Python 脚本

Python

#!/usr/bin/env python3
"""
fnNAS 镜像自动下载脚本
支持版本号变化检测和断点续传
"""

import os
import sys
import json
import re
import hashlib
import requests
from datetime import datetime
from pathlib import Path
import logging

# ==================== 配置区域 ====================
CONFIG = {
    # GitHub 仓库信息
    "github_repo": "ophub/fnnas",
    "github_api": "https://api.github.com/repos/ophub/fnnas/releases/latest",
    
    # 文件匹配模式(正则表达式)
    # 匹配:fnnas_amlogic_s905l_k6.12.41_2026.02.02.img.gz
    "filename_pattern": r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
    
    # 下载目录
    "download_dir": "/opt/fnnas-sync/downloads",
    
    # 状态文件(记录已下载的版本)
    "state_file": "/opt/fnnas-sync/state.json",
    
    # 日志文件
    "log_file": "/var/log/fnnas-sync.log",
    
    # 保留历史版本数量(0表示不删除旧版本)
    "keep_versions": 2,
    
    # 下载超时(秒)
    "timeout": 600,
    
    # 断点续传分块大小(字节)
    "chunk_size": 8192,
    
    # 是否验证文件完整性(如果有校验文件)
    "verify_checksum": True,
    
    # 通知配置(可选)
    "notify": {
        "enabled": False,
        "webhook_url": "",  # 钉钉/飞书/企业微信机器人地址
    }
}

# =================================================

class FnnasSync:
    def __init__(self, config):
        self.config = config
        self.setup_logging()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'fnNAS-Sync-Bot/1.0',
            'Accept': 'application/vnd.github.v3+json'
        })
        
        # 确保目录存在
        Path(config['download_dir']).mkdir(parents=True, exist_ok=True)
        
        # 加载状态
        self.state = self.load_state()
        
    def setup_logging(self):
        """配置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler(self.config['log_file']),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def load_state(self):
        """加载已下载状态"""
        if os.path.exists(self.config['state_file']):
            try:
                with open(self.config['state_file'], 'r') as f:
                    return json.load(f)
            except Exception as e:
                self.logger.warning(f"加载状态文件失败: {e}")
        return {"downloaded": [], "last_check": None}
    
    def save_state(self):
        """保存下载状态"""
        self.state['last_check'] = datetime.now().isoformat()
        with open(self.config['state_file'], 'w') as f:
            json.dump(self.state, f, indent=2)
    
    def get_latest_release(self):
        """获取 GitHub 最新 Release 信息"""
        try:
            response = self.session.get(
                self.config['github_api'],
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            self.logger.error(f"获取 Release 信息失败: {e}")
            return None
    
    def find_target_asset(self, release):
        """查找匹配的文件"""
        assets = release.get('assets', [])
        pattern = re.compile(self.config['filename_pattern'])
        
        for asset in assets:
            name = asset['name']
            if pattern.match(name):
                return asset
        
        self.logger.warning("未找到匹配的文件,可用文件列表:")
        for asset in assets:
            self.logger.warning(f"  - {asset['name']}")
        return None
    
    def is_already_downloaded(self, asset):
        """检查是否已下载且完整"""
        filename = asset['name']
        filepath = os.path.join(self.config['download_dir'], filename)
        
        if not os.path.exists(filepath):
            return False
            
        # 检查文件大小
        local_size = os.path.getsize(filepath)
        remote_size = asset['size']
        
        if local_size == remote_size:
            self.logger.info(f"文件已存在且大小匹配: {filename}")
            return True
        else:
            self.logger.info(f"文件大小不匹配,本地: {local_size}, 远程: {remote_size}")
            return False
    
    def download_file(self, asset):
        """下载文件(支持断点续传)"""
        url = asset['browser_download_url']
        filename = asset['name']
        filepath = os.path.join(self.config['download_dir'], filename)
        total_size = asset['size']
        
        # 检查已下载部分
        downloaded_size = 0
        if os.path.exists(filepath):
            downloaded_size = os.path.getsize(filepath)
            if downloaded_size == total_size:
                self.logger.info("文件已完整下载")
                return True
            self.logger.info(f"断点续传,已下载: {downloaded_size}/{total_size} bytes")
        
        headers = {}
        if downloaded_size > 0:
            headers['Range'] = f'bytes={downloaded_size}-'
        
        try:
            response = self.session.get(
                url,
                headers=headers,
                stream=True,
                timeout=self.config['timeout']
            )
            response.raise_for_status()
            
            # 确定写入模式
            mode = 'ab' if downloaded_size > 0 else 'wb'
            
            with open(filepath, mode) as f:
                for chunk in response.iter_content(chunk_size=self.config['chunk_size']):
                    if chunk:
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        
                        # 显示进度
                        if total_size > 0:
                            percent = (downloaded_size / total_size) * 100
                            print(f"\r下载进度: {percent:.1f}% ({downloaded_size}/{total_size})", end='')
            
            print()  # 换行
            self.logger.info(f"下载完成: {filename}")
            
            # 验证文件大小
            final_size = os.path.getsize(filepath)
            if final_size != total_size:
                self.logger.error(f"文件大小验证失败: {final_size} != {total_size}")
                return False
                
            return True
            
        except Exception as e:
            self.logger.error(f"下载失败: {e}")
            return False
    
    def verify_checksum(self, asset, release):
        """验证文件校验和(如果 release 中有提供)"""
        # 查找同名的 checksum 文件
        checksum_extensions = ['.sha256', '.md5', '.sha256sum']
        filename = asset['name']
        
        for ext in checksum_extensions:
            checksum_name = filename + ext
            # 在 release body 或 assets 中查找
            for line in release.get('body', '').split('\n'):
                if checksum_name in line or filename in line:
                    # 尝试解析校验和
                    parts = line.split()
                    if len(parts) >= 2:
                        expected_hash = parts[0]
                        self.logger.info(f"找到校验和: {expected_hash[:16]}...")
                        # 计算实际校验和
                        filepath = os.path.join(self.config['download_dir'], filename)
                        actual_hash = self.calculate_hash(filepath, 'sha256')
                        if actual_hash == expected_hash:
                            self.logger.info("✓ 校验和验证通过")
                            return True
                        else:
                            self.logger.error("✗ 校验和不匹配!")
                            return False
        return True  # 没有找到校验文件,跳过验证
    
    def calculate_hash(self, filepath, algorithm='sha256'):
        """计算文件哈希值"""
        hash_obj = hashlib.new(algorithm)
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()
    
    def cleanup_old_versions(self, current_filename):
        """清理旧版本文件"""
        if self.config['keep_versions'] <= 0:
            return
            
        pattern = re.compile(self.config['filename_pattern'])
        download_dir = Path(self.config['download_dir'])
        
        # 查找所有匹配的文件
        files = []
        for f in download_dir.iterdir():
            if f.is_file() and pattern.match(f.name):
                files.append((f, f.stat().st_mtime))
        
        # 按修改时间排序
        files.sort(key=lambda x: x[1], reverse=True)
        
        # 删除旧版本
        if len(files) > self.config['keep_versions']:
            for old_file, _ in files[self.config['keep_versions']:]:
                if old_file.name != current_filename:
                    self.logger.info(f"删除旧版本: {old_file.name}")
                    old_file.unlink()
    
    def send_notification(self, message):
        """发送通知"""
        if not self.config['notify']['enabled']:
            return
            
        webhook_url = self.config['notify']['webhook_url']
        if not webhook_url:
            return
            
        try:
            payload = {
                "msgtype": "text",
                "text": {"content": message}
            }
            self.session.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            self.logger.error(f"发送通知失败: {e}")
    
    def run(self):
        """主运行逻辑"""
        self.logger.info("=" * 50)
        self.logger.info("开始检查 fnNAS 镜像更新")
        
        # 获取最新 Release
        release = self.get_latest_release()
        if not release:
            self.logger.error("无法获取 Release 信息,退出")
            return False
        
        version = release.get('tag_name', 'unknown')
        self.logger.info(f"最新版本: {version}")
        
        # 查找目标文件
        asset = self.find_target_asset(release)
        if not asset:
            self.logger.error("未找到匹配的镜像文件")
            return False
        
        filename = asset['name']
        self.logger.info(f"目标文件: {filename}")
        self.logger.info(f"文件大小: {asset['size'] / 1024 / 1024:.2f} MB")
        
        # 检查是否已下载
        if self.is_already_downloaded(asset):
            self.logger.info("文件已是最新,无需下载")
            self.save_state()
            return True
        
        # 下载文件
        self.logger.info("开始下载...")
        if not self.download_file(asset):
            self.logger.error("下载失败")
            return False
        
        # 验证校验和
        if self.config['verify_checksum']:
            self.verify_checksum(asset, release)
        
        # 更新状态
        self.state['downloaded'].append({
            'filename': filename,
            'version': version,
            'downloaded_at': datetime.now().isoformat(),
            'size': asset['size']
        })
        self.save_state()
        
        # 清理旧版本
        self.cleanup_old_versions(filename)
        
        # 发送通知
        message = f"fnNAS 镜像更新\n版本: {version}\n文件: {filename}\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        self.send_notification(message)
        
        self.logger.info("同步完成")
        return True


if __name__ == "__main__":
    # 支持通过环境变量覆盖配置
    if os.getenv('FNNAS_DOWNLOAD_DIR'):
        CONFIG['download_dir'] = os.getenv('FNNAS_DOWNLOAD_DIR')
    
    sync = FnnasSync(CONFIG)
    success = sync.run()
    sys.exit(0 if success else 1)

保存为 /opt/fnnas-sync/sync.py

步骤 4:创建 Systemd 服务

创建服务文件 /etc/systemd/system/fnnas-sync.service

ini

[Unit]
Description=fnNAS Mirror Sync Service
After=network.target

[Service]
Type=oneshot
User=root
WorkingDirectory=/opt/fnnas-sync
Environment="PYTHONUNBUFFERED=1"
Environment="FNNAS_DOWNLOAD_DIR=/opt/fnnas-sync/downloads"
ExecStart=/usr/bin/python3 /opt/fnnas-sync/sync.py
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

创建定时器 /etc/systemd/system/fnnas-sync.timer

ini

[Unit]
Description=Run fnNAS Sync every 6 hours

[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
Persistent=true

[Install]
WantedBy=timers.target

步骤 5:启动服务

bash

# 重载配置
sudo systemctl daemon-reload

# 启用定时器
sudo systemctl enable fnnas-sync.timer

# 启动定时器
sudo systemctl start fnnas-sync.timer

# 立即执行一次测试
sudo systemctl start fnnas-sync.service

# 查看状态
sudo systemctl status fnnas-sync.service
sudo systemctl list-timers --all | grep fnnas

针对需求优化

你的文件名模式:fnnas_amlogic_s905l_k6.12.41_****.**.**.img.gz

Python 配置中的正则已匹配:

Python

"filename_pattern": r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz"

如果需要同时监控多个类似文件(如 s905d, s905x3 等),修改配置:

Python

# 支持多个设备
"filename_patterns": [
    r"fnnas_amlogic_s905l_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
    r"fnnas_amlogic_s905d_k6\.12\.41_\d{4}\.\d{2}\.\d{2}\.img\.gz",
]

当然也可以基于 Docker 容器。