Sora查询视频及保存(Python)
Sora查询视频及保存(Python)
Section titled “Sora查询视频及保存(Python)”本页总览
Sora查询视频及保存(异步任务)Python示例;
Section titled “Sora查询视频及保存(异步任务)Python示例;”import requestsimport jsonimport timeimport os # <--- 导入 os 库用于操作文件和文件夹
# --- 1. 填入你在4All API获取的 API Key ---API_KEY = "sk-JKzyxxxxxxxxxxxxxxxxxxxxxxxxxx"
# --- 2. 填入你的 API 主机地址 ("代理地址") ---API_HOST = "sg.4All API.com"
# --- 3. 填入你上次提交任务时获得的 任务ID ---TASK_ID = "video_8087fea9-6e80-435b-b96b-6e47f4071673" # (下次运行时记得换成新的)
# --- 4. 设置轮询间隔 (秒) ---POLL_INTERVAL = 10
# --- 5. 设置视频保存的文件夹 ---# (这个文件夹会自动创建在脚本运行的同一目录下)SAVE_FOLDER = "video_downloads"
# 构造 API URL 和请求头API_URL = f"https://{API_HOST}/v1/videos/{TASK_ID}"headers = { 'Authorization': f'Bearer {API_KEY}'}
# --- 自动下载视频的函数 ---def download_video(video_url_path, save_path): """ 根据相对路径下载视频并保存。 video_url_path: API 返回的相对路径 (例如 /v1/videos/...) save_path: 本地保存的完整路径 (例如 video_downloads/task_id.mp4) """ # 1. 构造完整的下载 URL full_download_url = f"https://{API_HOST}{video_url_path}"
print(f"\n正在下载视频从: {full_download_url}")
try: # 2. 发送下载请求 (使用 stream=True 来处理大文件,防止内存溢出) # 注意:下载文件通常也需要同样的 Authorization 头部 with requests.get(full_download_url, headers=headers, stream=True) as r: r.raise_for_status() # 检查下载请求是否成功 (不是 404, 401 等)
# 3. 确保保存文件夹存在 # os.path.dirname(save_path) 会获取 "video_downloads" os.makedirs(os.path.dirname(save_path), exist_ok=True)
# 4. 以二进制写入('wb')模式打开文件,并分块写入 with open(save_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): # 8KB f.write(chunk)
print(f"--- 视频已成功保存到: {save_path} ---")
except requests.exceptions.RequestException as e: print(f"!!! 下载视频失败: {e} !!!")# --- 函数结束 ---
print(f"开始轮询任务: {TASK_ID}")print(f"查询地址: {API_URL}\n")
try: while True: response = requests.get(API_URL, headers=headers)
# 处理临时的服务器错误 (例如 502, 503) if response.status_code >= 500: print(f"收到服务器错误 {response.status_code},{POLL_INTERVAL}秒后重试...") time.sleep(POLL_INTERVAL) continue
# 处理其他客户端错误 (例如 401, 404, 400) response.raise_for_status()
data = response.json()
status = data.get("status") progress = data.get("progress", 0)
print(f"[{time.strftime('%H:%M:%S')}] 状态: {status} | 进度: {progress}%")
# 检查任务是否成功 if status == "completed" or status == "succeeded": print("\n--- 任务成功! ---") pretty_json = json.dumps(data, indent=2, ensure_ascii=False) print(pretty_json)
# --- 新增的自动下载逻辑 --- video_relative_url = data.get("url") if video_relative_url: # 构造保存文件名 (我们假设它是 mp4) file_name = f"{TASK_ID}.mp4" save_file_path = os.path.join(SAVE_FOLDER, file_name)
# 调用下载函数 download_video(video_relative_url, save_file_path) else: print("!!! 任务已完成,但响应中未找到 'url' 字段,无法下载视频。") # --- 下载逻辑结束 ---
break # 退出 while 循环
# 检查任务是否失败 elif status == "failed": print("\n--- 任务失败! ---") pretty_json = json.dumps(data, indent=2, ensure_ascii=False) print(pretty_json) break # 退出 while 循环
# 如果任务还在进行中,等待一段时间 time.sleep(POLL_INTERVAL)
except requests.exceptions.HTTPError as http_err: print(f"\nHTTP 错误: {http_err}") print(f"响应内容: {response.text}")except requests.exceptions.RequestException as err: print(f"\n请求发生错误: {err}")except KeyboardInterrupt: print("\n用户停止了轮询。")