Skip to content

Query and Save Sora Videos (Python)

Overview of this page

Sora Video Query and Save (Asynchronous Task) Python Example

Section titled “Sora Video Query and Save (Asynchronous Task) Python Example”
import requests
import json
import time
import os # <--- Import the os library for file and folder operations
# --- 1. API key issued by 4All API (placeholder value; substitute your own) ---
API_KEY = "sk-JKzyxxxxxxxxxxxxxxxxxxxxxxxxxx"

# --- 2. Host name of the API endpoint (the "proxy address"), without a scheme ---
# NOTE(review): this value contains a space, which a real host name cannot;
# it looks like a placeholder -- confirm and replace before running.
API_HOST = "sg.4All API.com"

# --- 3. ID of the task returned when the job was submitted ---
# Replace it with the new ID each time a new task is submitted.
TASK_ID = "video_8087fea9-6e80-435b-b96b-6e47f4071673"

# --- 4. Seconds to wait between two status queries ---
POLL_INTERVAL = 10

# --- 5. Folder that receives downloaded videos ---
# A relative path; download_video() creates the folder on demand.
SAVE_FOLDER = "video_downloads"

# Status endpoint of the task, and the bearer-token header sent with every request.
API_URL = "https://" + API_HOST + "/v1/videos/" + TASK_ID
headers = {"Authorization": "Bearer " + API_KEY}
# --- Function to automatically download the video ---
def download_video(video_url_path, save_path, timeout=60):
    """Download a video from the API host and write it to a local file.

    Failures are printed rather than raised, so a failed download never
    aborts the caller.

    Args:
        video_url_path: Path relative to the API host, as returned by the
            API (for example, /v1/videos/...). It is appended verbatim to
            ``https://{API_HOST}``.
        save_path: Local path of the file to write (for example,
            video_downloads/task_id.mp4). Missing parent folders are
            created; a bare file name is written to the current directory.
        timeout: Timeout in seconds handed to ``requests.get``, so a
            stalled server cannot block the download forever.

    Note:
        The target file is opened before the body is streamed, so a
        transfer that fails part-way can leave an incomplete file at
        ``save_path``.
    """
    # 1. Construct the full download URL
    full_download_url = f"https://{API_HOST}{video_url_path}"
    print(f"\nDownloading video from: {full_download_url}")
    try:
        # 2. Stream the response so a large file is never held in memory.
        #    The download endpoint is sent the same Authorization header
        #    as the status endpoint.
        with requests.get(
            full_download_url, headers=headers, stream=True, timeout=timeout
        ) as r:
            r.raise_for_status()  # Turn 4xx/5xx responses into an exception

            # 3. Make sure the save folder exists. dirname() is '' for a
            #    bare file name, and os.makedirs('') would raise.
            parent_dir = os.path.dirname(save_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            # 4. Write the body to disk in 8 KB chunks
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f"--- Video saved successfully to: {save_path} ---")
    except requests.exceptions.RequestException as e:
        print(f"!!! Failed to download video: {e} !!!")
    except OSError as e:
        # RequestException is itself an OSError subclass, so this handler
        # must come second; it covers local failures such as a permission
        # error or a full disk.
        print(f"!!! Failed to save video to {save_path}: {e} !!!")
# --- End of function ---
# Poll the task's status endpoint until the task finishes, then download
# the resulting video. Stops on success, on failure, on a non-retryable
# request error, or when the user presses Ctrl+C.

# Upper bound, in seconds, on how long one status request may wait for the
# server; without it a stalled connection would block polling forever.
REQUEST_TIMEOUT = 30

print(f"Starting to poll task: {TASK_ID}")
print(f"Query URL: {API_URL}\n")
try:
    while True:
        try:
            response = requests.get(API_URL, headers=headers, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.Timeout:
            # A slow or stalled server is treated like a temporary 5xx:
            # wait one interval and ask again.
            print(f"Request timed out (limit {REQUEST_TIMEOUT} seconds), retrying in {POLL_INTERVAL} seconds...")
            time.sleep(POLL_INTERVAL)
            continue

        # Handle temporary server errors (for example, 502, 503)
        if response.status_code >= 500:
            print(f"Received server error {response.status_code}, retrying in {POLL_INTERVAL} seconds...")
            time.sleep(POLL_INTERVAL)
            continue

        # Any remaining error status (for example, 401, 404, 400) is not
        # retried: raise and let the handlers below report it.
        response.raise_for_status()

        data = response.json()
        status = data.get("status")
        progress = data.get("progress", 0)
        print(f"[{time.strftime('%H:%M:%S')}] Status: {status} | Progress: {progress}%")

        # Task succeeded: show the full response, then download the video
        if status in ("completed", "succeeded"):
            print("\n--- Task succeeded! ---")
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            print(pretty_json)

            video_relative_url = data.get("url")
            if video_relative_url:
                # The saved file is named after the task; the .mp4
                # extension is an assumption about the returned format.
                file_name = f"{TASK_ID}.mp4"
                save_file_path = os.path.join(SAVE_FOLDER, file_name)
                download_video(video_relative_url, save_file_path)
            else:
                print("!!! The task has completed, but the response did not contain the 'url' field, so the video cannot be downloaded.")
            break

        # Task failed: show the full response and stop polling
        if status == "failed":
            print("\n--- Task failed! ---")
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            print(pretty_json)
            break

        # Any other status means the task is still in progress
        time.sleep(POLL_INTERVAL)
except requests.exceptions.HTTPError as http_err:
    print(f"\nHTTP error: {http_err}")
    print(f"Response content: {response.text}")
except requests.exceptions.RequestException as err:
    print(f"\nRequest error: {err}")
except KeyboardInterrupt:
    print("\nThe user stopped polling.")