#
twitter 2026-05-10

X(Twitter) 热榜批量下载

By io 90 Views 109 MIN READ 0 Comments
基于 truvaze.com/api/media 接口的 Twitter 排行榜视频批量获取 & 下载工具

📦 环境准备

# 安装依赖
pip install requests tqdm urllib3

本流程默认读取系统代理(环境变量 HTTPS_PROXY / HTTP_PROXY),也可通过 --proxy 指定;未配置代理时使用直连

🚀 使用流程

第一步:获取视频 URL 列表

运行以下脚本,从排行榜 API 抓取视频直链并保存到 video_urls.txt。注意:脚本保存链接后默认会继续下载视频,若只想获取链接请加上 --no-download;若 video_urls.txt 已存在,脚本会跳过抓取,需加 --refresh-urls 才会重新获取

python get_urls.py

可选参数:

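| 参数 | 说明 | 默认值 |
| --- | --- | --- |
| --range | 时间范围: monthly/weekly/daily/all | monthly |
| --sort | 排序: favorite/pv/recent | favorite |
| -s | 起始页码 | 1 |
| -e | 结束页码 | 全部 |
| --proxy | 代理地址 | 系统代理 |
| --no-download | 仅获取链接不下载 | False |
| --anime-only | 仅动漫内容 | False |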

示例:

# 获取本月收藏榜前5页
python get_urls.py --range monthly --sort favorite -s 1 -e 5

# 获取本周播放榜(使用代理)
python get_urls.py --range weekly --sort pv --proxy http://127.0.0.1:7890

第二步:下载视频

video_urls.txt 生成后,运行下载脚本自动下载所有视频:

python download.py

下载的视频保存在 downloads/ 目录,支持断点续传和自动重试


📜 脚本一:get_urls.py — 获取排行榜视频 URL

#!/usr/bin/env python3
"""
Twitter排行榜视频批量下载工具
从 truvaze.com/api/media 接口获取视频列表并批量下载
"""

import json
import os
import time
import random
import threading
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional
import argparse
from urllib.parse import urlsplit, urlunsplit, parse_qsl

try:
    from tqdm import tqdm
except ImportError:
    tqdm = None

class TwitterVideoDownloader:
    """Twitter排行榜视频下载器"""
    
    def __init__(self, base_url: str = "https://truvaze.com/api/media",
                 per_page: int = 50,
                 output_dir: str = "downloads",
                 max_workers: int = 5,
                 proxy: Optional[str] = None):
        """
        Set up the downloader: HTTP session, proxy, output directory and
        the log of already-downloaded URLs.

        Args:
            base_url: API URL; may carry query parameters, which are split
                off and kept as defaults for every page request.
            per_page: Number of items requested per page.
            output_dir: Directory the videos and the download log go to.
            max_workers: Number of concurrent download threads.
            proxy: Proxy address such as http://127.0.0.1:7890; when not
                given, the proxy environment variables are consulted.
        """
        self.base_url = base_url
        self.per_page = per_page
        self.output_dir = output_dir
        self.max_workers = max_workers
        self.download_log_file = os.path.join(output_dir, "downloaded_urls.txt")
        self._downloaded_urls = set()
        self._downloaded_urls_lock = threading.Lock()

        # Take the first non-empty proxy variable, upper-case names first.
        system_proxy = None
        for variable in ("HTTPS_PROXY", "HTTP_PROXY", "https_proxy", "http_proxy"):
            system_proxy = os.getenv(variable)
            if system_proxy:
                break
        self.proxy = proxy or system_proxy

        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.session = session

        if not self.proxy:
            print("未配置代理,使用直连。")
        else:
            # The same proxy serves both schemes.
            session.proxies.update({'http': self.proxy, 'https': self.proxy})
            if proxy:
                print(f"已启用命令行代理: {self.proxy}")
            else:
                print(f"已启用系统代理: {self.proxy}")

        # The output directory must exist before the download log is read.
        os.makedirs(output_dir, exist_ok=True)
        self._load_downloaded_urls()
        clean_url, query_defaults = self._normalize_api_url(self.base_url)
        self.base_url = clean_url
        self.base_query_params = query_defaults

    def _normalize_api_url(self, raw_url: str):
        """
        支持传入带查询参数的完整 API URL:
        例如 https://.../api/media?range=monthly&page=1...
        返回 (不带query的base_url, query参数字典)。
        """
        parsed = urlsplit(raw_url)
        query_dict = {k: v for k, v in parse_qsl(parsed.query, keep_blank_values=True)}
        clean_url = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", ""))
        return clean_url, query_dict

    def _load_downloaded_urls(self):
        """加载历史已下载URL日志。"""
        if not os.path.exists(self.download_log_file):
            return
        try:
            with open(self.download_log_file, "r", encoding="utf-8") as f:
                for line in f:
                    url = line.strip()
                    if url:
                        self._downloaded_urls.add(url)
            print(f"已加载历史下载URL: {len(self._downloaded_urls)} 条")
        except OSError as e:
            print(f"读取下载日志失败,将继续运行: {e}")

    def _mark_url_downloaded(self, url: str):
        """记录URL到历史下载日志,避免后续重复下载。"""
        with self._downloaded_urls_lock:
            if url in self._downloaded_urls:
                return
            self._downloaded_urls.add(url)
            try:
                with open(self.download_log_file, "a", encoding="utf-8") as f:
                    f.write(url + "\n")
            except OSError as e:
                print(f"写入下载日志失败: {e}")

    def _exp_backoff_sleep(
        self,
        attempt: int,
        base: float = 1.0,
        cap: float = 60.0,
        jitter_ratio: float = 0.3
    ) -> float:
        """
        指数退避 + 随机抖动。
        attempt 从 1 开始计数。
        """
        exp = min(cap, base * (2 ** (attempt - 1)))
        jitter = exp * jitter_ratio
        wait_seconds = max(0.0, exp + random.uniform(-jitter, jitter))
        time.sleep(wait_seconds)
        return wait_seconds
        
    def fetch_page(self, page: int, sort: str = "favorite",
                   range_type: str = "monthly",
                   is_anime_only: int = 0) -> Optional[Dict]:
        """
        获取单页视频列表
        
        Args:
            page: 页码
            sort: 排序方式 (favorite, pv, recent)
            range_type: 时间范围 (monthly, weekly, daily, all)
            is_anime_only: 是否仅动漫 (0或1)
            
        Returns:
            API响应的JSON数据,失败返回None
        """
        params = dict(self.base_query_params)
        params.update({
            'range': range_type,
            'page': page,
            'per_page': self.per_page,
            'category': '',
            'ids': '',
            'isAnimeOnly': is_anime_only,
            'sort': sort
        })
        
        try:
            print(f"正在获取第 {page} 页...")
            response = self.session.get(self.base_url, params=params, timeout=30)
            if response.status_code == 429:
                print(f"第 {page} 页触发限流(429)")
                return {"_rate_limited": True}
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"获取第 {page} 页失败: {e}")
            return None
    
    def extract_video_urls(self, data: Dict) -> List[Dict]:
        """
        从API响应中提取视频信息
        
        Args:
            data: API响应的JSON数据
            
        Returns:
            视频信息列表,每个元素包含id, url, 标题等
        """
        videos = []
        items = data.get('items', [])
        
        for item in items:
            video_info = {
                'id': item.get('id'),
                'url': item.get('url'),
                'thumbnail': item.get('thumbnail'),
                'duration': item.get('time', 0),
                'views': item.get('pv', '0'),
                'likes': item.get('favorite', '0'),
                'tweet_account': item.get('tweet_account', 'unknown'),
                'url_cd': item.get('url_cd', '')
            }
            videos.append(video_info)
            
        return videos
    
    def get_all_video_urls(self, start_page: int = 1, end_page: Optional[int] = None,
                          sort: str = "favorite", range_type: str = "monthly",
                          is_anime_only: int = 0, delay: float = 0.5) -> List[Dict]:
        """
        获取所有视频链接
        
        Args:
            start_page: 起始页码
            end_page: 结束页码,None表示获取全部
            sort: 排序方式
            range_type: 时间范围 (monthly, weekly, daily, all)
            is_anime_only: 是否仅动漫 (0或1)
            delay: 请求延迟(秒),避免请求过快
            
        Returns:
            所有视频信息列表
        """
        all_videos = []
        
        # 先获取第一页,确定总页数
        api_max_retry = 6
        first_page_data = None
        for attempt in range(1, api_max_retry + 1):
            first_page_data = self.fetch_page(start_page, sort, range_type, is_anime_only)
            if first_page_data and not first_page_data.get("_rate_limited"):
                break
            if first_page_data and first_page_data.get("_rate_limited"):
                waited = self._exp_backoff_sleep(attempt, base=1.5, cap=90.0, jitter_ratio=0.35)
                print(f"第 {start_page} 页 429退避等待 {waited:.1f}s 后重试 ({attempt}/{api_max_retry})")
                continue
            break

        if not first_page_data:
            print("无法获取第一页数据")
            return []
        if first_page_data.get("_rate_limited"):
            print("第一页持续触发429,已停止本次任务")
            return []
        
        total_pages = first_page_data.get('lastPage', 0)
        total_videos = first_page_data.get('total', 0)
        
        if end_page:
            total_pages = min(total_pages, end_page)
        
        print(f"共发现 {total_pages} 页,总计 {total_videos} 个视频")
        
        # 获取第一页的视频
        videos = self.extract_video_urls(first_page_data)
        all_videos.extend(videos)
        print(f"第 1 页: 获取到 {len(videos)} 个视频")
        
        # 获取剩余页面
        page_iter = range(start_page + 1, total_pages + 1)
        if tqdm is not None:
            page_iter = tqdm(
                page_iter,
                total=max(0, total_pages - start_page),
                desc="获取分页进度",
                unit="page"
            )

        for page in page_iter:
            time.sleep(delay)  # 礼貌性延迟
            data = None
            for attempt in range(1, api_max_retry + 1):
                data = self.fetch_page(page, sort, range_type, is_anime_only)
                if data and not data.get("_rate_limited"):
                    break
                if data and data.get("_rate_limited"):
                    waited = self._exp_backoff_sleep(attempt, base=1.5, cap=90.0, jitter_ratio=0.35)
                    print(f"第 {page} 页 429退避等待 {waited:.1f}s 后重试 ({attempt}/{api_max_retry})")
                    continue
                break

            if data:
                if data.get("_rate_limited"):
                    print(f"第 {page} 页持续触发429,跳过")
                    continue
                videos = self.extract_video_urls(data)
                all_videos.extend(videos)
                print(f"第 {page} 页: 获取到 {len(videos)} 个视频 (累计: {len(all_videos)})")
            else:
                print(f"第 {page} 页获取失败,跳过")
                
        return all_videos
    
    def download_single_video(self, video_info: Dict, retry: int = 3) -> bool:
        """
        下载单个视频
        
        Args:
            video_info: 视频信息字典
            retry: 重试次数
            
        Returns:
            下载成功返回True,失败返回False
        """
        video_url = video_info.get('url')
        if not video_url:
            return False

        with self._downloaded_urls_lock:
            if video_url in self._downloaded_urls:
                return True
            
        video_id = video_info.get('id', 'unknown')
        account = video_info.get('tweet_account', 'unknown')
        
        # 从URL中提取文件扩展名
        if '.mp4' in video_url:
            ext = '.mp4'
        else:
            ext = '.mp4'  # 默认
        
        # 生成文件名
        filename = f"{video_id}_{account}_{video_info.get('url_cd', '')}{ext}"
        # 清理文件名中的非法字符
        filename = "".join(c for c in filename if c.isalnum() or c in '._-')
        filepath = os.path.join(self.output_dir, filename)
        
        # 如果文件已存在,跳过
        if os.path.exists(filepath):
            self._mark_url_downloaded(video_url)
            return True
        
        for attempt in range(retry):
            try:
                response = self.session.get(video_url, stream=True, timeout=60)
                if response.status_code == 429:
                    waited = self._exp_backoff_sleep(attempt + 1, base=2.0, cap=120.0, jitter_ratio=0.35)
                    if tqdm is not None:
                        tqdm.write(f"下载触发限流(429),等待 {waited:.1f}s 后重试 ({attempt + 1}/{retry})")
                    else:
                        print(f"下载触发限流(429),等待 {waited:.1f}s 后重试 ({attempt + 1}/{retry})")
                    continue
                response.raise_for_status()
                
                # 写入文件
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                self._mark_url_downloaded(video_url)
                return True
                
            except Exception as e:
                if tqdm is not None:
                    tqdm.write(f"下载失败 {filename} (尝试 {attempt + 1}/{retry}): {e}")
                else:
                    print(f"下载失败 {filename} (尝试 {attempt + 1}/{retry}): {e}")
                if attempt < retry - 1:
                    time.sleep(2)
                    
        return False
    
    def download_all_videos(self, videos: List[Dict], max_workers: int = None):
        """
        并发下载所有视频
        
        Args:
            videos: 视频信息列表
            max_workers: 并发数,默认使用初始化时的值
        """
        if not videos:
            print("没有视频需要下载")
            return

        # 任务内按 URL 去重,避免重复提交下载任务
        deduped_videos = []
        seen_urls = set()
        duplicate_count = 0
        for v in videos:
            url = v.get("url")
            if not url:
                continue
            if url in seen_urls:
                duplicate_count += 1
                continue
            seen_urls.add(url)
            deduped_videos.append(v)
            
        workers = max_workers or self.max_workers
        print(f"\n开始下载 {len(deduped_videos)} 个视频,并发数: {workers}")
        if duplicate_count:
            print(f"已跳过任务内重复URL: {duplicate_count} 条")
        print("=" * 50)
        
        success_count = 0
        fail_count = 0
        
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # 提交所有下载任务
            future_to_video = {
                executor.submit(self.download_single_video, video): video 
                for video in deduped_videos
            }
            
            # 处理完成的任务
            if tqdm is not None:
                with tqdm(total=len(deduped_videos), desc="下载进度", unit="video", dynamic_ncols=True) as pbar:
                    for future in as_completed(future_to_video):
                        video = future_to_video[future]
                        try:
                            if future.result():
                                success_count += 1
                            else:
                                fail_count += 1
                        except Exception as e:
                            tqdm.write(f"下载出错: {e}")
                            fail_count += 1
                        finally:
                            pbar.update(1)
                            pbar.set_postfix(success=success_count, fail=fail_count)
            else:
                done = 0
                total = len(deduped_videos)
                for future in as_completed(future_to_video):
                    video = future_to_video[future]
                    try:
                        if future.result():
                            success_count += 1
                        else:
                            fail_count += 1
                    except Exception as e:
                        print(f"下载出错: {e}")
                        fail_count += 1
                    finally:
                        done += 1
                        print(f"下载进度: {done}/{total}")
                    
        print("=" * 50)
        print(f"下载完成!成功: {success_count}, 失败: {fail_count}")
    
    def save_urls_to_file(self, videos: List[Dict], filename: str = "video_urls.txt"):
        """
        将视频链接保存到文件
        
        Args:
            videos: 视频信息列表
            filename: 保存的文件名
        """
        with open(filename, 'w', encoding='utf-8') as f:
            for video in videos:
                f.write(video['url'] + '\n')
        print(f"视频链接已保存到 {filename}")

    def load_urls_from_file(self, filename: str = "video_urls.txt") -> List[Dict]:
        """
        从文件加载视频URL,返回与下载流程兼容的视频信息列表。
        """
        if not os.path.exists(filename):
            return []

        videos = []
        seen = set()
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                url = line.strip()
                if not url:
                    continue
                if url in seen:
                    continue
                seen.add(url)

                # 用 URL 最后一段构造基础元信息,便于生成文件名
                tail = url.split('/')[-1].split('?')[0]
                url_cd = tail.replace('.mp4', '')
                videos.append({
                    'id': f'fromfile_{len(videos)+1}',
                    'url': url,
                    'thumbnail': '',
                    'duration': 0,
                    'views': 'N/A',
                    'likes': 'N/A',
                    'tweet_account': 'from_file',
                    'url_cd': url_cd
                })
        print(f"已从文件加载 {len(videos)} 条URL: {filename}")
        return videos
        
    def get_statistics(self, videos: List[Dict]):
        """获取视频统计信息"""
        if not videos:
            print("无数据")
            return
            
        total_duration = sum(v.get('duration', 0) for v in videos)
        total_hours = total_duration / 3600
        
        print("\n统计信息:")
        print(f"  总视频数: {len(videos)}")
        print(f"  总时长: {total_duration} 秒 ({total_hours:.1f} 小时)")
        
        # 统计各账号视频数量
        account_count = {}
        for v in videos:
            account = v.get('tweet_account', 'unknown')
            account_count[account] = account_count.get(account, 0) + 1
        
        top_accounts = sorted(account_count.items(), key=lambda x: x[1], reverse=True)[:10]
        print(f"  前10个活跃账号:")
        for account, count in top_accounts:
            print(f"    {account}: {count} 个视频")


def main(argv: Optional[List[str]] = None):
    """
    Command-line entry point: collect video URLs, then download them.

    When the URL file already exists (and --refresh-urls is not given) the
    API is not queried and the URLs are read from that file; otherwise the
    ranking pages are fetched and the URLs are written to the file. Videos
    are downloaded afterwards unless --no-download is given.

    Args:
        argv: Argument list to parse; None means sys.argv[1:].
    """
    parser = argparse.ArgumentParser(description='Twitter排行榜视频批量下载工具')
    parser.add_argument('--api-url', type=str, default='https://truvaze.com/api/media?range=monthly&page=1&per_page=50&category=&ids=&isAnimeOnly=0&sort=favorite',
                       help='API地址,支持完整URL+查询参数')
    parser.add_argument('--proxy', type=str, default='',
                       help='HTTP/HTTPS代理,例如: http://127.0.0.1:7890;不传则使用系统代理')
    parser.add_argument('-s', '--start-page', type=int, default=1,
                       help='起始页码 (默认: 1)')
    parser.add_argument('-e', '--end-page', type=int, default=None,
                       help='结束页码 (默认: 获取全部)')
    parser.add_argument('-p', '--per-page', type=int, default=50,
                       help='每页数量 (默认: 50)')
    parser.add_argument('-o', '--output', type=str, default='downloads',
                       help='输出目录 (默认: downloads)')
    parser.add_argument('-w', '--workers', type=int, default=5,
                       help='并发下载线程数 (默认: 5)')
    parser.add_argument('--sort', type=str, default='favorite',
                       choices=['favorite', 'favorit', 'pv', 'recent'],
                       help='排序方式: favorite(收藏), pv(播放), recent(最新)')
    parser.add_argument('--range', dest='range_type', type=str, default='monthly',
                       choices=['monthly', 'weekly', 'daily', 'all', ''],
                       help='时间范围 (默认: monthly)')
    parser.add_argument('--anime-only', action='store_true',
                       help='仅下载动漫内容')
    parser.add_argument('--no-download', action='store_true',
                       help='仅获取链接,不下载')
    parser.add_argument('--delay', type=float, default=0.5,
                       help='API请求延迟(秒) (默认: 0.5)')
    parser.add_argument('--url-file', type=str, default='video_urls.txt',
                       help='URL文件名 (默认: video_urls.txt)')
    parser.add_argument('--refresh-urls', action='store_true',
                       help='强制重新抓取URL并覆盖URL文件(即使文件已存在)')

    args = parser.parse_args(argv)

    downloader = TwitterVideoDownloader(
        base_url=args.api_url,
        per_page=args.per_page,
        output_dir=args.output,
        max_workers=args.workers,
        proxy=args.proxy or None
    )

    # The URL file lives next to this script.
    url_file = os.path.join(os.path.dirname(__file__), args.url_file)
    videos = []

    # Prefer the local URL file: when it exists, skip the API and download.
    if os.path.exists(url_file) and not args.refresh_urls:
        print(f"检测到已存在URL文件,跳过抓取: {url_file}")
        videos = downloader.load_urls_from_file(url_file)
    else:
        if os.path.exists(url_file):
            # Only reachable with --refresh-urls, so do not claim the file
            # is missing.
            print(f"已开启强制刷新,将重新抓取并覆盖: {url_file}")
            print("开始获取视频列表...")
        else:
            print("未检测到URL文件,开始获取视频列表...")
        print("-" * 50)

        # 'favorit' is accepted as an alias of 'favorite'.
        normalized_sort = 'favorite' if args.sort == 'favorit' else args.sort
        videos = downloader.get_all_video_urls(
            start_page=args.start_page,
            end_page=args.end_page,
            sort=normalized_sort,
            range_type=args.range_type,
            is_anime_only=1 if args.anime_only else 0,
            delay=args.delay
        )

        if not videos:
            print("未获取到任何视频")
            return

        # Statistics are only meaningful for data that came from the API.
        downloader.get_statistics(videos)

        downloader.save_urls_to_file(videos, url_file)

    if not videos:
        print("URL文件为空或无有效链接,结束。")
        return

    if not args.no_download:
        downloader.download_all_videos(videos)
    else:
        print("\n仅获取链接模式,跳过下载")
        print(f"可以使用 yt-dlp -a \"{url_file}\" 来下载")


# Run the command-line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()

📜 脚本二:download.py — 根据 video_urls.txt 下载视频

import os
import sys
from typing import Optional
from urllib.parse import urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
    from tqdm import tqdm
except ImportError:
    tqdm = None

try:
    # On Windows, replace characters the console cannot encode instead of
    # raising, to reduce garbled/failed Chinese output (the terminal's code
    # page may still decide what is shown). Best effort: any error is ignored.
    if callable(getattr(sys.stdout, "reconfigure", None)):
        sys.stdout.reconfigure(errors="replace")
except Exception:
    pass


def _download_with_fallback_progress(
    response, f, total: Optional[int], label: str
) -> None:
    """无 tqdm 时按已下载字节与 Content-Length 打印简单进度。"""
    downloaded = 0
    for chunk in response.iter_content(chunk_size=8192):
        if not chunk:
            continue
        f.write(chunk)
        downloaded += len(chunk)
        if total and total > 0:
            pct = min(100, downloaded * 100 // total)
            print(f"\r{label} {pct}% ({downloaded}/{total} B)", end="", flush=True)
    if total and total > 0:
        print()


def load_urls_from_file(file_path: str) -> list:
    """Read URLs line by line from a text file, dropping blank lines and repeats.

    Each repeated URL is reported on stdout. Raises FileNotFoundError
    when the file does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"URL 文件不存在: {file_path}")

    unique_urls = []
    known = set()
    with open(file_path, "r", encoding="utf-8") as f:
        for candidate in map(str.strip, f):
            if not candidate:
                continue
            if candidate not in known:
                known.add(candidate)
                unique_urls.append(candidate)
            else:
                print(f"URL重复,跳过: {candidate}")
    return unique_urls


def build_proxies(proxy_url: Optional[str] = None) -> Optional[dict]:
    """
    Build the proxy mapping for requests.

    Precedence: the proxy_url argument, then the environment variables
    HTTPS_PROXY, HTTP_PROXY, https_proxy, http_proxy (first non-empty
    wins). The lower-case names are checked as well because they are the
    common spelling on Unix-like systems and get_urls.py honours them too.

    Returns:
        {"http": proxy, "https": proxy}, or None when no proxy is
        configured.
    """
    proxy = proxy_url
    if not proxy:
        for variable in ("HTTPS_PROXY", "HTTP_PROXY", "https_proxy", "http_proxy"):
            proxy = os.getenv(variable)
            if proxy:
                break
    if not proxy:
        return None
    return {"http": proxy, "https": proxy}


def download_twitter_video(url, output_dir="./downloads", proxies: Optional[dict] = None):
    """
    Download one Twitter/X video with resume support and retries.

    A partially downloaded file is continued with an HTTP Range request.
    If the server ignores the range and sends the whole file, the local
    file is restarted instead of appended to.

    :param url: Direct link of the video.
    :param output_dir: Directory the file is saved to (created if needed).
    :param proxies: Optional requests-style proxy mapping.
    :return: True when the file is complete on disk, False when every
        attempt failed.
    """
    os.makedirs(output_dir, exist_ok=True)

    save_path = os.path.join(output_dir, _filename_from_url(url))
    desc = os.path.basename(save_path)

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    session = _build_retrying_session(proxies)
    try:
        max_attempts = 6
        for attempt in range(1, max_attempts + 1):
            try:
                # Size of what is already on disk; 0 means start from scratch.
                try:
                    existing = os.path.getsize(save_path)
                except OSError:
                    existing = 0

                req_headers = dict(headers)
                mode = "wb"
                if existing > 0:
                    req_headers["Range"] = f"bytes={existing}-"
                    mode = "ab"

                print(f"正在下载: {url}")
                if existing > 0:
                    print(f"断点续传: {save_path} (已存在 {existing} bytes)")

                # timeout is (connect, read); the read timeout is generous
                # so a stalled stream does not abort too early.
                response = session.get(
                    url,
                    headers=req_headers,
                    stream=True,
                    timeout=(15, 180),
                )
                if response.status_code == 416 and existing > 0:
                    # Range not satisfiable: the file may already be complete,
                    # or the remote size changed. Compare with a HEAD request.
                    response.close()
                    head = session.head(url, headers=headers, timeout=(15, 60), allow_redirects=True)
                    cl_head = head.headers.get("Content-Length")
                    total_head = int(cl_head) if cl_head and cl_head.isdigit() else None
                    if total_head is not None and total_head == existing:
                        print(f"已存在,跳过: {save_path}")
                        return True
                    try:
                        os.remove(save_path)
                        print(f"本地文件疑似不完整,已删除重下: {save_path}")
                    except OSError:
                        print(f"本地文件疑似不完整,但删除失败,将覆盖重下: {save_path}")
                    existing = 0
                    mode = "wb"
                    response = session.get(
                        url,
                        headers=dict(headers),
                        stream=True,
                        timeout=(15, 180),
                    )
                response.raise_for_status()

                # A server may ignore Range and answer 200 with the whole
                # file; appending that to the partial file would corrupt it.
                if existing > 0 and response.status_code != 206:
                    existing = 0
                    mode = "wb"

                cl = response.headers.get("Content-Length")
                total = int(cl) if cl and cl.isdigit() else None
                # For 206 the Content-Length covers only the remaining part.
                if response.status_code == 206 and total is not None:
                    total = existing + total

                with open(save_path, mode) as f:
                    if tqdm is not None:
                        bar_fmt = "{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}"
                        with tqdm(
                            total=total,
                            initial=existing,
                            unit="B",
                            unit_scale=True,
                            unit_divisor=1024,
                            desc=desc,
                            file=sys.stdout,
                            bar_format=bar_fmt,
                        ) as pbar:
                            for chunk in response.iter_content(chunk_size=1024 * 256):
                                if chunk:
                                    f.write(chunk)
                                    pbar.update(len(chunk))
                    else:
                        _download_with_fallback_progress(response, f, total, label=desc)

                # When the size is known, verify it; a short file is resumed
                # by the next attempt.
                try:
                    final_size = os.path.getsize(save_path)
                except OSError:
                    final_size = None

                if total is not None and final_size is not None and final_size < total:
                    print(f"下载未完成({final_size}/{total} bytes),稍后继续重试…")
                    continue

                print(f"下载成功: {save_path}")
                return True
            except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError) as e:
                print(f"网络波动(第 {attempt}/{max_attempts} 次): {e},将继续重试…")
                continue
            except Exception as e:
                print(f"下载失败(第 {attempt}/{max_attempts} 次): {e}")
                continue

        # Make the final outcome visible instead of returning silently.
        print(f"下载失败,已达最大重试次数({max_attempts}): {url}")
        return False
    finally:
        session.close()


def _filename_from_url(url: str) -> str:
    """Derive the local file name "<id>_<resolution>_<name>" from the URL path.

    Expects a path like .../<id>/<x>/<resolution>/<name>.mp4. Shorter
    paths, which would otherwise raise IndexError, fall back to the
    non-empty path segments joined by underscores.
    """
    path_parts = urlparse(url).path.split('/')
    if len(path_parts) >= 4:
        # Example: .../480x772/vsPghorwKeBNFQDY.mp4
        random_name = path_parts[-1]
        resolution = path_parts[-2]
        video_id = path_parts[-4]
        return f"{video_id}_{resolution}_{random_name}"
    return "_".join(part for part in path_parts if part) or "video.mp4"


def _build_retrying_session(proxies: Optional[dict] = None):
    """Create a requests session that retries 429/5xx responses and connection errors."""
    session = requests.Session()
    retry = Retry(
        total=8,
        connect=8,
        read=8,
        status=8,
        backoff_factor=0.8,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET", "HEAD"),
        raise_on_status=False,
        respect_retry_after_header=True,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    if proxies:
        session.proxies.update(proxies)
    return session

if __name__ == "__main__":
    # Input list and output directory both live next to this script.
    script_dir = os.path.dirname(__file__)
    urls_file = os.path.join(script_dir, "video_urls.txt")
    output_dir = os.path.join(script_dir, "downloads")

    # Put your proxy here if needed, for example:
    # proxy_url = "http://127.0.0.1:7890"
    # Left empty, the proxy environment variables are used instead.
    proxy_url = ""
    proxies = build_proxies(proxy_url=proxy_url or None)
    if not proxies:
        print("未配置代理,使用直连下载。")
    else:
        print(f"已启用代理: {proxies.get('https')}")

    video_urls = load_urls_from_file(urls_file)
    print(f"读取到 {len(video_urls)} 条 URL,开始下载...")

    for url in video_urls:
        download_twitter_video(url, output_dir=output_dir, proxies=proxies)

⚙️ 完整使用步骤

# 1. 安装依赖
pip install requests tqdm urllib3

# 2. 获取视频 URL 列表 → 生成 video_urls.txt
python get_urls.py

# 3. 根据 video_urls.txt 批量下载视频
python download.py

⚠️ 注意事项

  • 请遵守 truvaze.com 的服务条款
  • 视频版权归原作者所有,请合理使用
  • 建议使用代理下载,更稳定
  • 下载中途中断可重新运行,支持断点续传

本文由 io 原创

采用 CC BY-NC-SA 4.0 协议进行许可

转载请注明出处:https://godd.asia/index.php/archives/12/

TAGS: twitter

相关推荐

  • 暂无相关推荐,看看别的吧。

0 评论

发表评论