#
twitter
2026-05-10
X(Twitter) 热榜批量下载
基于 truvaze.com/api/media 接口的 Twitter 排行榜视频批量获取 & 下载工具

📦 环境准备
# 安装依赖
pip install requests tqdm urllib3

本流程默认使用代理
🚀 使用流程
第一步:获取视频 URL 列表
运行以下脚本,从排行榜 API 抓取视频直链,保存到 video_urls.txt。
python get_urls.py

可选参数:
| 参数 | 说明 | 默认值 |
|---|---|---|
| `--range` | 时间范围: monthly/weekly/daily/all | monthly |
| `--sort` | 排序: favorite/pv/recent | favorite |
| `-s` | 起始页码 | 1 |
| `-e` | 结束页码 | 全部 |
| `--proxy` | 代理地址 | 系统代理 |
| `--no-download` | 仅获取链接不下载 | False |
| `--anime-only` | 仅动漫内容 | False |
示例:
# 获取本月收藏榜前5页
python get_urls.py --range monthly --sort favorite -s 1 -e 5
# 获取本周播放榜(使用代理)
python get_urls.py --range weekly --sort pv --proxy http://127.0.0.1:7890

第二步:下载视频
video_urls.txt 生成后,运行下载脚本自动下载所有视频:
python download.py

下载的视频保存在 downloads/ 目录,支持断点续传和自动重试。
📜 脚本一:get_urls.py — 获取排行榜视频 URL
#!/usr/bin/env python3
"""
Twitter排行榜视频批量下载工具
从 truvaze.com/api/media 接口获取视频列表并批量下载
"""
import json
import os
import time
import random
import threading
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional
import argparse
from urllib.parse import urlsplit, urlunsplit, parse_qsl
try:
from tqdm import tqdm
except ImportError:
tqdm = None
class TwitterVideoDownloader:
    """Fetch ranked video listings from the media API and download them.

    The downloader pages through the listing endpoint, extracts the direct
    video links and downloads them concurrently into ``output_dir``. Every
    URL that was downloaded successfully is appended to
    ``downloaded_urls.txt`` inside the output directory so that later runs
    can skip it.

    NOTE(review): a single ``requests.Session`` is shared by all download
    threads, exactly as in the original design; confirm this is acceptable
    for the requests version in use.
    """

    def __init__(self, base_url: str = "https://truvaze.com/api/media",
                 per_page: int = 50,
                 output_dir: str = "downloads",
                 max_workers: int = 5,
                 proxy: Optional[str] = None):
        """Set up the HTTP session, proxy, output directory and history log.

        Args:
            base_url: API endpoint. It may carry a query string, which is
                split off and reused as default request parameters.
            per_page: Number of items requested per page.
            output_dir: Directory that receives downloaded files and the
                ``downloaded_urls.txt`` history log.
            max_workers: Default number of concurrent download threads.
            proxy: Proxy URL such as ``http://127.0.0.1:7890``. When omitted,
                the first non-empty value among the environment variables
                HTTPS_PROXY, HTTP_PROXY, https_proxy and http_proxy is used.
        """
        self.base_url = base_url
        self.per_page = per_page
        self.output_dir = output_dir
        self.max_workers = max_workers
        self.download_log_file = os.path.join(self.output_dir, "downloaded_urls.txt")
        self._downloaded_urls = set()
        self._downloaded_urls_lock = threading.Lock()

        env_proxy = (
            os.getenv("HTTPS_PROXY")
            or os.getenv("HTTP_PROXY")
            or os.getenv("https_proxy")
            or os.getenv("http_proxy")
        )
        self.proxy = proxy or env_proxy

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        if self.proxy:
            self.session.proxies.update({
                'http': self.proxy,
                'https': self.proxy,
            })
            if proxy:
                print(f"已启用命令行代理: {self.proxy}")
            else:
                print(f"已启用系统代理: {self.proxy}")
        else:
            print("未配置代理,使用直连。")

        # The history log lives inside the output directory, so create the
        # directory before trying to read the log.
        os.makedirs(output_dir, exist_ok=True)
        self._load_downloaded_urls()
        self.base_url, self.base_query_params = self._normalize_api_url(self.base_url)

    @staticmethod
    def _emit(message: str) -> None:
        """Print a message, going through ``tqdm.write`` when tqdm is available."""
        if tqdm is not None:
            tqdm.write(message)
        else:
            print(message)

    @staticmethod
    def _as_seconds(value):
        """Return a duration as a number, or 0 when it is missing or not numeric."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0

    def _normalize_api_url(self, raw_url: str):
        """Split an API URL into its query-free form and its query parameters.

        Accepts a full URL such as
        ``https://.../api/media?range=monthly&page=1``.

        Returns:
            A ``(base_url, params)`` tuple where ``base_url`` has neither query
            nor fragment and ``params`` maps each query key to its value
            (blank values are kept; for repeated keys the last one wins).
        """
        parsed = urlsplit(raw_url)
        query_dict = dict(parse_qsl(parsed.query, keep_blank_values=True))
        clean_url = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", ""))
        return clean_url, query_dict

    def _load_downloaded_urls(self):
        """Load previously downloaded URLs from the history log, if it exists."""
        if not os.path.exists(self.download_log_file):
            return
        try:
            with open(self.download_log_file, "r", encoding="utf-8") as f:
                self._downloaded_urls.update(
                    stripped for stripped in (line.strip() for line in f) if stripped
                )
            print(f"已加载历史下载URL: {len(self._downloaded_urls)} 条")
        except OSError as e:
            # A broken log must not prevent the run; worst case some files
            # are checked again.
            print(f"读取下载日志失败,将继续运行: {e}")

    def _mark_url_downloaded(self, url: str):
        """Record a URL in memory and in the history log (once per URL)."""
        with self._downloaded_urls_lock:
            if url in self._downloaded_urls:
                return
            self._downloaded_urls.add(url)
            try:
                with open(self.download_log_file, "a", encoding="utf-8") as f:
                    f.write(url + "\n")
            except OSError as e:
                print(f"写入下载日志失败: {e}")

    def _exp_backoff_sleep(
        self,
        attempt: int,
        base: float = 1.0,
        cap: float = 60.0,
        jitter_ratio: float = 0.3
    ) -> float:
        """Sleep with exponential backoff plus random jitter.

        Args:
            attempt: 1-based attempt counter; the delay doubles per attempt.
            base: Delay in seconds for the first attempt.
            cap: Upper bound for the un-jittered delay.
            jitter_ratio: Fraction of the delay used as +/- random jitter.

        Returns:
            The number of seconds actually slept.
        """
        exp = min(cap, base * (2 ** (attempt - 1)))
        jitter = exp * jitter_ratio
        wait_seconds = max(0.0, exp + random.uniform(-jitter, jitter))
        time.sleep(wait_seconds)
        return wait_seconds

    def fetch_page(self, page: int, sort: str = "favorite",
                   range_type: str = "monthly",
                   is_anime_only: int = 0) -> Optional[Dict]:
        """Fetch one page of the video listing.

        Args:
            page: Page number.
            sort: Sort order (favorite, pv, recent).
            range_type: Time range (monthly, weekly, daily, all).
            is_anime_only: 1 to restrict to anime content, otherwise 0.

        Returns:
            The decoded JSON response, ``{"_rate_limited": True}`` when the
            server answered 429, or ``None`` on any other failure.
        """
        # Parameters parsed from the configured URL are defaults; the
        # explicit arguments below override them.
        params = dict(self.base_query_params)
        params.update({
            'range': range_type,
            'page': page,
            'per_page': self.per_page,
            'category': '',
            'ids': '',
            'isAnimeOnly': is_anime_only,
            'sort': sort
        })
        try:
            print(f"正在获取第 {page} 页...")
            response = self.session.get(self.base_url, params=params, timeout=30)
            if response.status_code == 429:
                print(f"第 {page} 页触发限流(429)")
                return {"_rate_limited": True}
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            print(f"获取第 {page} 页失败: {e}")
            return None

    def _fetch_page_with_retry(self, page: int, sort: str, range_type: str,
                               is_anime_only: int, max_retry: int = 6) -> Optional[Dict]:
        """Fetch a page, backing off and retrying only while the API answers 429.

        Returns whatever the last ``fetch_page`` call returned: real data,
        ``None`` for a non-429 failure (not retried), or the rate-limit marker
        when every attempt was throttled.
        """
        data = None
        for attempt in range(1, max_retry + 1):
            data = self.fetch_page(page, sort, range_type, is_anime_only)
            if not data or not data.get("_rate_limited"):
                break
            waited = self._exp_backoff_sleep(attempt, base=1.5, cap=90.0, jitter_ratio=0.35)
            print(f"第 {page} 页 429退避等待 {waited:.1f}s 后重试 ({attempt}/{max_retry})")
        return data

    def extract_video_urls(self, data: Dict) -> List[Dict]:
        """Extract per-video metadata from an API response.

        Args:
            data: Decoded JSON response; videos are read from its ``items`` key.

        Returns:
            One dict per item with the keys id, url, thumbnail, duration,
            views, likes, tweet_account and url_cd. A missing or null
            ``items`` value yields an empty list.
        """
        items = data.get('items') or []
        return [
            {
                'id': item.get('id'),
                'url': item.get('url'),
                'thumbnail': item.get('thumbnail'),
                'duration': item.get('time', 0),
                'views': item.get('pv', '0'),
                'likes': item.get('favorite', '0'),
                'tweet_account': item.get('tweet_account', 'unknown'),
                'url_cd': item.get('url_cd', '')
            }
            for item in items
        ]

    def get_all_video_urls(self, start_page: int = 1, end_page: Optional[int] = None,
                           sort: str = "favorite", range_type: str = "monthly",
                           is_anime_only: int = 0, delay: float = 0.5) -> List[Dict]:
        """Collect video metadata from ``start_page`` through the last page.

        Args:
            start_page: First page to fetch.
            end_page: Last page to fetch; ``None`` means up to the page count
                reported by the API (``lastPage``).
            sort: Sort order.
            range_type: Time range (monthly, weekly, daily, all).
            is_anime_only: 1 to restrict to anime content, otherwise 0.
            delay: Pause in seconds before each follow-up page request.

        Returns:
            Metadata for all collected videos; empty when the first page
            cannot be fetched. Later pages that fail are skipped.
        """
        all_videos = []
        api_max_retry = 6

        # The first page also tells us how many pages there are.
        first_page_data = self._fetch_page_with_retry(
            start_page, sort, range_type, is_anime_only, api_max_retry
        )
        if not first_page_data:
            print("无法获取第一页数据")
            return []
        if first_page_data.get("_rate_limited"):
            print("第一页持续触发429,已停止本次任务")
            return []

        total_pages = first_page_data.get('lastPage') or 0
        total_videos = first_page_data.get('total', 0)
        if end_page:
            total_pages = min(total_pages, end_page)
        print(f"共发现 {total_pages} 页,总计 {total_videos} 个视频")

        videos = self.extract_video_urls(first_page_data)
        all_videos.extend(videos)
        # Report the page that was actually fetched, not a hard-coded 1.
        print(f"第 {start_page} 页: 获取到 {len(videos)} 个视频")

        page_iter = range(start_page + 1, total_pages + 1)
        if tqdm is not None:
            page_iter = tqdm(
                page_iter,
                total=max(0, total_pages - start_page),
                desc="获取分页进度",
                unit="page"
            )
        for page in page_iter:
            time.sleep(delay)  # be polite to the API
            data = self._fetch_page_with_retry(
                page, sort, range_type, is_anime_only, api_max_retry
            )
            if not data:
                print(f"第 {page} 页获取失败,跳过")
                continue
            if data.get("_rate_limited"):
                print(f"第 {page} 页持续触发429,跳过")
                continue
            videos = self.extract_video_urls(data)
            all_videos.extend(videos)
            print(f"第 {page} 页: 获取到 {len(videos)} 个视频 (累计: {len(all_videos)})")
        return all_videos

    def download_single_video(self, video_info: Dict, retry: int = 3) -> bool:
        """Download one video into the output directory.

        The body is streamed into ``<name>.part`` and renamed to the final
        name only after the transfer finished, so an interrupted download is
        never mistaken for a complete file on the next run.

        Args:
            video_info: Video metadata; ``url`` is required, while ``id``,
                ``tweet_account`` and ``url_cd`` are used for the file name.
            retry: Maximum number of attempts.

        Returns:
            True when the file was downloaded or already present, else False.
        """
        video_url = video_info.get('url')
        if not video_url:
            return False
        with self._downloaded_urls_lock:
            if video_url in self._downloaded_urls:
                return True

        video_id = video_info.get('id', 'unknown')
        account = video_info.get('tweet_account', 'unknown')
        raw_name = f"{video_id}_{account}_{video_info.get('url_cd', '')}.mp4"
        # Keep only characters that are safe in file names.
        filename = "".join(c for c in raw_name if c.isalnum() or c in '._-')
        filepath = os.path.join(self.output_dir, filename)

        if os.path.exists(filepath):
            self._mark_url_downloaded(video_url)
            return True

        temp_path = filepath + ".part"
        for attempt in range(1, retry + 1):
            try:
                # The context manager releases the connection on every path.
                with self.session.get(video_url, stream=True, timeout=60) as response:
                    rate_limited = response.status_code == 429
                    if not rate_limited:
                        response.raise_for_status()
                        with open(temp_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)
                if rate_limited:
                    waited = self._exp_backoff_sleep(attempt, base=2.0, cap=120.0, jitter_ratio=0.35)
                    self._emit(f"下载触发限流(429),等待 {waited:.1f}s 后重试 ({attempt}/{retry})")
                    continue
                os.replace(temp_path, filepath)
                self._mark_url_downloaded(video_url)
                return True
            except Exception as e:
                # Per-task boundary: any failure counts as a failed attempt.
                self._emit(f"下载失败 {filename} (尝试 {attempt}/{retry}): {e}")
                if attempt < retry:
                    time.sleep(2)

        # Best-effort cleanup of the partial file left by the last attempt.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        return False

    def download_all_videos(self, videos: List[Dict], max_workers: Optional[int] = None):
        """Download all videos concurrently and print a summary.

        Args:
            videos: Video metadata dicts; entries without ``url`` and repeated
                URLs are dropped before scheduling.
            max_workers: Thread count; defaults to the value given at
                construction time.
        """
        if not videos:
            print("没有视频需要下载")
            return

        # De-duplicate by URL so the same file is never scheduled twice.
        deduped_videos = []
        seen_urls = set()
        duplicate_count = 0
        for v in videos:
            url = v.get("url")
            if not url:
                continue
            if url in seen_urls:
                duplicate_count += 1
                continue
            seen_urls.add(url)
            deduped_videos.append(v)

        workers = max_workers or self.max_workers
        total = len(deduped_videos)
        print(f"\n开始下载 {total} 个视频,并发数: {workers}")
        if duplicate_count:
            print(f"已跳过任务内重复URL: {duplicate_count} 条")
        print("=" * 50)

        success_count = 0
        fail_count = 0
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(self.download_single_video, video)
                for video in deduped_videos
            ]
            pbar = None
            if tqdm is not None:
                pbar = tqdm(total=total, desc="下载进度", unit="video", dynamic_ncols=True)
            try:
                for done, future in enumerate(as_completed(futures), start=1):
                    try:
                        succeeded = bool(future.result())
                    except Exception as e:
                        self._emit(f"下载出错: {e}")
                        succeeded = False
                    if succeeded:
                        success_count += 1
                    else:
                        fail_count += 1
                    if pbar is not None:
                        pbar.update(1)
                        pbar.set_postfix(success=success_count, fail=fail_count)
                    else:
                        print(f"下载进度: {done}/{total}")
            finally:
                if pbar is not None:
                    pbar.close()

        print("=" * 50)
        print(f"下载完成!成功: {success_count}, 失败: {fail_count}")

    def save_urls_to_file(self, videos: List[Dict], filename: str = "video_urls.txt"):
        """Write one video URL per line to ``filename`` (overwriting it).

        Entries whose ``url`` is missing or empty are skipped.

        Args:
            videos: Video metadata dicts.
            filename: Destination file.
        """
        urls = [video.get('url') for video in videos]
        with open(filename, 'w', encoding='utf-8') as f:
            f.writelines(url + '\n' for url in urls if url)
        print(f"视频链接已保存到 {filename}")

    def load_urls_from_file(self, filename: str = "video_urls.txt") -> List[Dict]:
        """Load URLs from a text file as download-compatible metadata dicts.

        Blank lines and repeated URLs are ignored. A missing file yields an
        empty list.
        """
        if not os.path.exists(filename):
            return []
        videos = []
        seen = set()
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                url = line.strip()
                if not url or url in seen:
                    continue
                seen.add(url)
                # The last path segment (without query string and ".mp4")
                # stands in for url_cd so the generated file name is usable.
                tail = url.split('/')[-1].split('?')[0]
                videos.append({
                    'id': f'fromfile_{len(videos)+1}',
                    'url': url,
                    'thumbnail': '',
                    'duration': 0,
                    'views': 'N/A',
                    'likes': 'N/A',
                    'tweet_account': 'from_file',
                    'url_cd': tail.replace('.mp4', '')
                })
        print(f"已从文件加载 {len(videos)} 条URL: {filename}")
        return videos

    def get_statistics(self, videos: List[Dict]):
        """Print the video count, total duration and the ten busiest accounts.

        Durations that are missing or not numeric count as 0 seconds.
        """
        if not videos:
            print("无数据")
            return
        total_duration = sum(self._as_seconds(v.get('duration', 0)) for v in videos)
        total_hours = total_duration / 3600
        print("\n统计信息:")
        print(f" 总视频数: {len(videos)}")
        print(f" 总时长: {total_duration} 秒 ({total_hours:.1f} 小时)")

        account_count = {}
        for v in videos:
            account = v.get('tweet_account', 'unknown')
            account_count[account] = account_count.get(account, 0) + 1
        top_accounts = sorted(account_count.items(), key=lambda x: x[1], reverse=True)[:10]
        print(f" 前10个活跃账号:")
        for account, count in top_accounts:
            print(f" {account}: {count} 个视频")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the ranking downloader."""
    parser = argparse.ArgumentParser(description='Twitter排行榜视频批量下载工具')
    parser.add_argument('--api-url', type=str, default='https://truvaze.com/api/media?range=monthly&page=1&per_page=50&category=&ids=&isAnimeOnly=0&sort=favorite',
                        help='API地址,支持完整URL+查询参数')
    parser.add_argument('--proxy', type=str, default='',
                        help='HTTP/HTTPS代理,例如: http://127.0.0.1:7890;不传则使用系统代理')
    parser.add_argument('-s', '--start-page', type=int, default=1,
                        help='起始页码 (默认: 1)')
    parser.add_argument('-e', '--end-page', type=int, default=None,
                        help='结束页码 (默认: 获取全部)')
    parser.add_argument('-p', '--per-page', type=int, default=50,
                        help='每页数量 (默认: 50)')
    parser.add_argument('-o', '--output', type=str, default='downloads',
                        help='输出目录 (默认: downloads)')
    parser.add_argument('-w', '--workers', type=int, default=5,
                        help='并发下载线程数 (默认: 5)')
    # 'favorit' is accepted as an alias and mapped to 'favorite' in main().
    parser.add_argument('--sort', type=str, default='favorite',
                        choices=['favorite', 'favorit', 'pv', 'recent'],
                        help='排序方式: favorite(收藏), pv(播放), recent(最新)')
    parser.add_argument('--range', dest='range_type', type=str, default='monthly',
                        choices=['monthly', 'weekly', 'daily', 'all', ''],
                        help='时间范围 (默认: monthly)')
    parser.add_argument('--anime-only', action='store_true',
                        help='仅下载动漫内容')
    parser.add_argument('--no-download', action='store_true',
                        help='仅获取链接,不下载')
    parser.add_argument('--delay', type=float, default=0.5,
                        help='API请求延迟(秒) (默认: 0.5)')
    parser.add_argument('--url-file', type=str, default='video_urls.txt',
                        help='URL文件名 (默认: video_urls.txt)')
    parser.add_argument('--refresh-urls', action='store_true',
                        help='强制重新抓取URL并覆盖URL文件(即使文件已存在)')
    return parser


def main():
    """Command-line entry point: obtain the URL list, then download it.

    An existing URL file (resolved relative to this script's directory) is
    reused unless ``--refresh-urls`` is given; otherwise the list is fetched
    from the API, summarised and written to that file. Downloading is skipped
    with ``--no-download``.
    """
    args = _build_arg_parser().parse_args()

    downloader = TwitterVideoDownloader(
        base_url=args.api_url,
        per_page=args.per_page,
        output_dir=args.output,
        max_workers=args.workers,
        proxy=args.proxy or None
    )

    url_file = os.path.join(os.path.dirname(__file__), args.url_file)
    url_file_exists = os.path.exists(url_file)

    if url_file_exists and not args.refresh_urls:
        # Prefer the local URL file: skip fetching and download directly.
        print(f"检测到已存在URL文件,跳过抓取: {url_file}")
        videos = downloader.load_urls_from_file(url_file)
    else:
        # Report the actual reason for fetching; the file exists here only
        # when a refresh was requested.
        if url_file_exists:
            print(f"已开启强制刷新,将重新抓取并覆盖: {url_file}")
        else:
            print("未检测到URL文件,开始获取视频列表...")
        print("-" * 50)
        normalized_sort = 'favorite' if args.sort == 'favorit' else args.sort
        videos = downloader.get_all_video_urls(
            start_page=args.start_page,
            end_page=args.end_page,
            sort=normalized_sort,
            range_type=args.range_type,
            is_anime_only=1 if args.anime_only else 0,
            delay=args.delay
        )
        if not videos:
            print("未获取到任何视频")
            return
        # Statistics are only meaningful for metadata fetched from the API.
        downloader.get_statistics(videos)
        downloader.save_urls_to_file(videos, url_file)

    if not videos:
        print("URL文件为空或无有效链接,结束。")
        return

    if not args.no_download:
        downloader.download_all_videos(videos)
    else:
        print("\n仅获取链接模式,跳过下载")
        print(f"可以使用 yt-dlp -a \"{url_file}\" 来下载")
if __name__ == "__main__":
    main()

📜 脚本二:download.py — 根据 video_urls.txt 下载视频
import os
import sys
from typing import Optional
from urllib.parse import urlparse
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
try:
from tqdm import tqdm
except ImportError:
tqdm = None
try:
    # Best effort, mainly for Windows consoles: replace characters the
    # terminal encoding cannot represent instead of raising on print.
    # (The terminal's code page may still garble the output.)
    sys.stdout.reconfigure(errors="replace")
except Exception:
    # Streams without reconfigure() or that reject the call are left as is.
    pass
def _download_with_fallback_progress(
    response, f, total: Optional[int], label: str
) -> None:
    """Stream a response body into ``f`` with a plain-text progress line.

    Used when tqdm is unavailable. Progress is printed only when ``total``
    (the expected size in bytes) is a positive number.

    :param response: object exposing ``iter_content(chunk_size=...)``
    :param f: writable binary file object
    :param total: expected total size in bytes, or None when unknown
    :param label: text shown in front of the percentage
    """
    show_progress = bool(total) and total > 0
    received = 0
    # filter(None, ...) drops empty chunks.
    for piece in filter(None, response.iter_content(chunk_size=8192)):
        f.write(piece)
        received += len(piece)
        if show_progress:
            percent = min(100, received * 100 // total)
            print(f"\r{label} {percent}% ({received}/{total} B)", end="", flush=True)
    if show_progress:
        # Finish the carriage-return progress line.
        print()
def load_urls_from_file(file_path: str) -> list:
    """Read URLs line by line, dropping blank lines and duplicates.

    :param file_path: path of the text file holding one URL per line
    :return: unique URLs in order of first appearance
    :raises FileNotFoundError: when ``file_path`` does not exist
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"URL 文件不存在: {file_path}")

    # A dict keeps insertion order, so it doubles as an ordered set.
    unique_urls = {}
    with open(file_path, "r", encoding="utf-8") as handle:
        for line in handle:
            candidate = line.strip()
            if not candidate:
                continue
            if candidate in unique_urls:
                print(f"URL重复,跳过: {candidate}")
            else:
                unique_urls[candidate] = None
    return list(unique_urls)
def build_proxies(proxy_url: Optional[str] = None) -> Optional[dict]:
    """Build the ``proxies`` mapping for requests.

    Precedence: the ``proxy_url`` argument, then the environment variables
    HTTPS_PROXY, HTTP_PROXY, https_proxy and http_proxy (first non-empty
    value wins). The lowercase names are checked as well so this script
    honours the same variables as the URL-fetching script.

    :param proxy_url: explicit proxy URL, e.g. ``http://127.0.0.1:7890``
    :return: ``{"http": proxy, "https": proxy}``, or None when no proxy is set
    """
    env_names = ("HTTPS_PROXY", "HTTP_PROXY", "https_proxy", "http_proxy")
    proxy = proxy_url or next(filter(None, map(os.getenv, env_names)), None)
    if not proxy:
        return None
    return {"http": proxy, "https": proxy}
def _filename_from_url(url: str) -> str:
    """Build the local file name ``<id>_<resolution>_<basename>`` for a video URL.

    The parts are taken by position from the end of the URL path, e.g.
    ``.../<id>/vid/480x772/vsPghorwKeBNFQDY.mp4``. Which segment ends up in
    the ``<id>`` slot depends on the URL layout. Positions the path is too
    short to provide are filled with ``unknown`` instead of raising.
    """
    path_parts = urlparse(url).path.split('/')

    def part(index: int) -> str:
        return path_parts[index] if len(path_parts) >= -index else "unknown"

    random_name = part(-1)   # e.g. vsPghorwKeBNFQDY.mp4
    resolution = part(-2)    # e.g. 480x772
    video_id = part(-4)      # e.g. 1781326937954365441
    return f"{video_id}_{resolution}_{random_name}"


def download_twitter_video(url, output_dir="./downloads", proxies: Optional[dict] = None):
    """
    Download one Twitter/X video, resuming a partial file when possible.

    An existing local file is continued with an HTTP ``Range`` request. If
    the server ignores the range and sends the whole body (any status other
    than 206), the file is rewritten from scratch instead of being appended
    to. Up to six attempts are made; connection-level and 429/5xx retries
    are additionally handled by the mounted urllib3 ``Retry`` policy.

    :param url: full direct link to the video
    :param output_dir: directory the file is saved into (created if missing)
    :param proxies: optional requests-style proxies mapping
    :return: True when the file is complete (downloaded or already present),
        False when every attempt failed
    """
    os.makedirs(output_dir, exist_ok=True)

    save_path = os.path.join(output_dir, _filename_from_url(url))
    desc = os.path.basename(save_path)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    retry = Retry(
        total=8,
        connect=8,
        read=8,
        status=8,
        backoff_factor=0.8,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET", "HEAD"),
        raise_on_status=False,
        respect_retry_after_header=True,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16)
    max_attempts = 6

    # The context manager closes the session (and its pooled connections)
    # on every exit path.
    with requests.Session() as session:
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        if proxies:
            session.proxies.update(proxies)

        for attempt in range(1, max_attempts + 1):
            try:
                try:
                    existing = os.path.getsize(save_path) if os.path.exists(save_path) else 0
                except OSError:
                    existing = 0

                req_headers = dict(headers)
                if existing > 0:
                    req_headers["Range"] = f"bytes={existing}-"

                print(f"正在下载: {url}")
                if existing > 0:
                    print(f"断点续传: {save_path} (已存在 {existing} bytes)")

                # timeout is (connect, read); the read timeout is generous so
                # a slow stream is not cut off mid-transfer.
                response = session.get(
                    url,
                    headers=req_headers,
                    stream=True,
                    timeout=(15, 180),
                )

                if response.status_code == 416 and existing > 0:
                    # Range not satisfiable: the local file is either already
                    # complete or the remote size changed; check with HEAD.
                    response.close()
                    head = session.head(url, headers=headers, timeout=(15, 60), allow_redirects=True)
                    cl_head = head.headers.get("Content-Length")
                    total_head = int(cl_head) if cl_head and cl_head.isdigit() else None
                    if total_head is not None and total_head == existing:
                        print(f"已存在,跳过: {save_path}")
                        return True
                    try:
                        os.remove(save_path)
                        print(f"本地文件疑似不完整,已删除重下: {save_path}")
                    except OSError:
                        print(f"本地文件疑似不完整,但删除失败,将覆盖重下: {save_path}")
                    existing = 0
                    response = session.get(
                        url,
                        headers=dict(headers),
                        stream=True,
                        timeout=(15, 180),
                    )

                with response:
                    response.raise_for_status()

                    # Append only when the server actually honoured the range.
                    resumed = existing > 0 and response.status_code == 206
                    if not resumed:
                        existing = 0
                    mode = "ab" if resumed else "wb"

                    cl = response.headers.get("Content-Length")
                    total = int(cl) if cl and cl.isdigit() else None
                    # On 206 Content-Length covers only the remaining bytes.
                    if resumed and total is not None:
                        total = existing + total

                    with open(save_path, mode) as f:
                        if tqdm is not None:
                            bar_fmt = "{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}"
                            with tqdm(
                                total=total,
                                initial=existing,
                                unit="B",
                                unit_scale=True,
                                unit_divisor=1024,
                                desc=desc,
                                file=sys.stdout,
                                bar_format=bar_fmt,
                            ) as pbar:
                                for chunk in response.iter_content(chunk_size=1024 * 256):
                                    if chunk:
                                        f.write(chunk)
                                        pbar.update(len(chunk))
                        else:
                            _download_with_fallback_progress(response, f, total, label=desc)

                # With a known size, verify completeness; a short file is
                # resumed on the next attempt.
                try:
                    final_size = os.path.getsize(save_path)
                except OSError:
                    final_size = None
                if total is not None and final_size is not None and final_size < total:
                    print(f"下载未完成({final_size}/{total} bytes),稍后继续重试…")
                    continue

                print(f"下载成功: {save_path}")
                return True
            except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError) as e:
                print(f"网络波动(第 {attempt}/{max_attempts} 次): {e},将继续重试…")
                continue
            except Exception as e:
                # Per-download boundary: report and retry so one bad URL does
                # not abort the whole batch.
                print(f"下载失败(第 {attempt}/{max_attempts} 次): {e}")
                continue

    return False
if __name__ == "__main__":
    script_dir = os.path.dirname(__file__)
    urls_file = os.path.join(script_dir, "video_urls.txt")
    output_dir = os.path.join(script_dir, "downloads")

    # Put your proxy here if needed, for example:
    #   proxy_url = "http://127.0.0.1:7890"
    # Left empty, the proxy is taken from the environment variables
    # HTTP_PROXY / HTTPS_PROXY.
    proxy_url = ""
    proxies = build_proxies(proxy_url=proxy_url or None)
    if not proxies:
        print("未配置代理,使用直连下载。")
    else:
        print(f"已启用代理: {proxies.get('https')}")

    video_urls = load_urls_from_file(urls_file)
    print(f"读取到 {len(video_urls)} 条 URL,开始下载...")
    for url in video_urls:
        download_twitter_video(url, output_dir=output_dir, proxies=proxies)

⚙️ 完整使用步骤
# 1. 安装依赖
pip install requests tqdm urllib3
# 2. 获取视频 URL 列表 → 生成 video_urls.txt
python get_urls.py
# 3. 根据 video_urls.txt 批量下载视频
python download.py

⚠️ 注意事项
- 请遵守 truvaze.com 的服务条款
- 视频版权归原作者所有,请合理使用
- 建议使用代理下载,更稳定
- 下载中途中断可重新运行,支持断点续传
TAGS:
twitter
相关推荐
- 暂无相关推荐,看看别的吧。
0 评论