import time
from typing import Any, Dict
import requests
class DatasetExporter:
    """
    HTTP client for a dataset export API.

    Provides calls to start an export, poll its status, block until it
    finishes, and download the resulting file. Every request is sent with
    the same ``Accept`` header and the same per-request timeout.
    """

    def __init__(self, base_url: str, request_timeout: float = 30.0):
        """
        Create an exporter bound to a service root URL.
        Args:
            base_url: Root URL of the service; trailing slashes are removed
            request_timeout: Value passed to requests as the ``timeout`` of
                every HTTP call, in seconds (default: 30.0)
        """
        self.base_url = base_url.rstrip("/")
        self.headers = {"Accept": "application/json, text/plain, */*"}
        # Without a timeout, requests waits indefinitely on a stalled server.
        self.request_timeout = request_timeout

    def _url(self, path: str) -> str:
        """Append a server-relative path to base_url, adding a leading slash if missing."""
        if not path.startswith("/"):
            path = "/" + path
        return f"{self.base_url}{path}"

    def _get_json(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body, raising on HTTP errors."""
        response = requests.get(
            self._url(path),
            headers=self.headers,
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def initiate_export(
        self,
        dataset_id: str,
        file_name: str,
        export_format: str = "json",
        include_images: bool = False,
    ) -> Dict[str, Any]:
        """
        Initiate an export of a dataset.
        Args:
            dataset_id: The ID of the dataset to export
            file_name: Name of the export file
            export_format: Format of the export (default: 'json')
            include_images: Whether to include images in export (default: False)
        Returns:
            Dict containing the export task information
        Raises:
            requests.HTTPError: If the server answers with an error status
        """
        params = {
            "file_name": file_name,
            "export_format": export_format,
            # Sent as the lowercase strings "true" / "false".
            "include_images": str(include_images).lower(),
        }
        return self._get_json(
            f"/api/v1/dataset/{dataset_id}/export_context_async", params
        )

    def check_export_status(
        self, dataset_id: str, export_task_id: str
    ) -> Dict[str, Any]:
        """
        Check the status of an export task.
        Args:
            dataset_id: The ID of the dataset
            export_task_id: The ID of the export task to check
        Returns:
            Dict containing the status information
        Raises:
            requests.HTTPError: If the server answers with an error status
        """
        return self._get_json(
            f"/api/v1/dataset/{dataset_id}/export_status",
            {"export_task_id": export_task_id},
        )

    def wait_for_export(
        self,
        dataset_id: str,
        export_task_id: str,
        check_interval: int = 5,
        timeout: int = 300,
    ) -> Dict[str, Any]:
        """
        Wait for an export task to complete.
        Args:
            dataset_id: The ID of the dataset
            export_task_id: The ID of the export task to check
            check_interval: Time in seconds between status checks (default: 5)
            timeout: Maximum time to wait in seconds (default: 300)
        Returns:
            Dict containing the final status information
        Raises:
            RuntimeError: If the task reports the status "FAILED"
            TimeoutError: If the export doesn't complete within the timeout period
            KeyError: If a status response has no "status" field
        """
        # The monotonic clock is unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            status = self.check_export_status(dataset_id, export_task_id)
            state = status["status"]
            if state == "COMPLETED":
                return status
            if state == "FAILED":
                raise RuntimeError(
                    f"Export failed: {status.get('result_message')}"
                )
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("Export timed out")
            # Never sleep past the deadline, so the wait does not overshoot
            # the timeout by a whole polling interval.
            time.sleep(max(0.0, min(check_interval, remaining)))

    def download_export(self, download_uri: str, output_path: str) -> None:
        """
        Download the exported file from the given URI.
        Args:
            download_uri: The URI path from the export status
            output_path: Local path where the file should be saved
        Returns:
            None
        Raises:
            requests.HTTPError: If the server answers with an error status
        """
        # The context manager releases the streamed connection even when
        # writing the file fails part-way through.
        with requests.get(
            self._url(download_uri),
            headers=self.headers,
            stream=True,
            timeout=self.request_timeout,
        ) as response:
            response.raise_for_status()
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)