Query and Save Sora Videos (Python)
Sora Video Query and Save (Python)
Section titled “Sora Video Query and Save (Python)”Overview of this page
Sora Video Query and Save (Asynchronous Task) Python Example
Section titled “Sora Video Query and Save (Asynchronous Task) Python Example;”
import json
import os  # <--- Import the os library for file and folder operations
import time

import requests
# --- 1. Enter the API Key you obtained from 4All API ---
API_KEY = "sk-JKzyxxxxxxxxxxxxxxxxxxxxxxxxxx"

# --- 2. Enter your API host address ("proxy address") ---
# NOTE(review): this placeholder contains a space, which is not valid in a
# hostname; replace it with the real host before running.
API_HOST = "sg.4All API.com"

# --- 3. Enter the Task ID you received when you submitted the task last time ---
# (remember to replace it with a new one next time)
TASK_ID = "video_8087fea9-6e80-435b-b96b-6e47f4071673"

# --- 4. Set the polling interval (seconds) ---
POLL_INTERVAL = 10

# --- 5. Set the folder where videos will be saved ---
# (This folder will be created automatically in the same directory as the script)
SAVE_FOLDER = "video_downloads"

# Construct the API URL and request headers
API_URL = f"https://{API_HOST}/v1/videos/{TASK_ID}"
headers = {
    'Authorization': f'Bearer {API_KEY}'
}
# --- Function to automatically download the video ---
def download_video(video_url_path, save_path, timeout=60):
    """Download and save a video based on a relative path.

    Network and file-system failures are printed, not raised.

    Args:
        video_url_path: The relative path returned by the API
            (for example, /v1/videos/...). It is appended to
            ``https://{API_HOST}`` to form the download URL.
        save_path: The full local path where the file will be saved
            (for example, video_downloads/task_id.mp4).
        timeout: Seconds passed to ``requests.get`` as its ``timeout``;
            without it a stalled server would block the script forever.

    Returns:
        True if the video was written to ``save_path``, False otherwise.
    """
    # 1. Construct the full download URL
    full_download_url = f"https://{API_HOST}{video_url_path}"

    print(f"\nDownloading video from: {full_download_url}")

    # Data is written to a sibling ".part" file and renamed only after the
    # transfer finishes, so an interrupted download never leaves a truncated
    # file at save_path.
    partial_path = f"{save_path}.part"

    try:
        # 2. Send the download request (use stream=True to handle large files
        #    and avoid memory overflow).
        # Note: downloading files usually also requires the same Authorization header
        with requests.get(
            full_download_url,
            headers=headers,
            stream=True,
            timeout=timeout,
        ) as r:
            # Check whether the download request was successful (not 404, 401, etc.)
            r.raise_for_status()

            # 3. Make sure the save folder exists.
            # os.path.dirname(save_path) gets "video_downloads"; it is "" for a
            # bare file name, and os.makedirs("") would raise, so skip it then.
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)

            # 4. Open the file in binary write ('wb') mode and write in chunks
            with open(partial_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):  # 8KB
                    if chunk:
                        f.write(chunk)

        # Atomically move the finished download into place.
        os.replace(partial_path, save_path)

    except (requests.exceptions.RequestException, OSError) as e:
        print(f"!!! Failed to download video: {e} !!!")
        # Best-effort cleanup of the incomplete file; it may not exist yet.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return False

    print(f"--- Video saved successfully to: {save_path} ---")
    return True
# --- End of function ---
print(f"Starting to poll task: {TASK_ID}")
print(f"Query URL: {API_URL}\n")

# Upper bound (seconds) on a single status request, so a stalled connection
# cannot block the polling loop forever.
REQUEST_TIMEOUT = 30

try:
    while True:
        try:
            response = requests.get(API_URL, headers=headers, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.ReadTimeout:
            # The server accepted the connection but did not answer in time;
            # treat it like a temporary server error and try again.
            print(f"Status request timed out, retrying in {POLL_INTERVAL} seconds...")
            time.sleep(POLL_INTERVAL)
            continue

        # Handle temporary server errors (for example, 502, 503)
        if response.status_code >= 500:
            print(f"Received server error {response.status_code}, retrying in {POLL_INTERVAL} seconds...")
            time.sleep(POLL_INTERVAL)
            continue

        # Handle other client errors (for example, 401, 404, 400)
        response.raise_for_status()

        data = response.json()

        status = data.get("status")
        progress = data.get("progress", 0)

        print(f"[{time.strftime('%H:%M:%S')}] Status: {status} | Progress: {progress}%")

        # Check whether the task succeeded
        if status in ("completed", "succeeded"):
            print("\n--- Task succeeded! ---")
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            print(pretty_json)

            # --- Added automatic download logic ---
            video_relative_url = data.get("url")
            if video_relative_url:
                # Construct the saved file name (we assume it is mp4)
                file_name = f"{TASK_ID}.mp4"
                save_file_path = os.path.join(SAVE_FOLDER, file_name)

                # Call the download function
                download_video(video_relative_url, save_file_path)
            else:
                print("!!! The task has completed, but the response did not contain the 'url' field, so the video cannot be downloaded.")
            # --- End of download logic ---

            break  # Exit the while loop

        # Check whether the task failed
        if status == "failed":
            print("\n--- Task failed! ---")
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            print(pretty_json)
            break  # Exit the while loop

        # If the task is still in progress, wait a while
        time.sleep(POLL_INTERVAL)

except requests.exceptions.HTTPError as http_err:
    # Raised by raise_for_status() above, so `response` is always bound here.
    print(f"\nHTTP error: {http_err}")
    print(f"Response content: {response.text}")
except requests.exceptions.RequestException as err:
    print(f"\nRequest error: {err}")
except KeyboardInterrupt:
    print("\nThe user stopped polling.")