Custom Metadata Upload Script
This page contains the complete, ready-to-use Python script for uploading custom metadata from CSV files to Visual Layer datasets with comprehensive validation and error handling.
Back to Custom Metadata Upload Documentation
Return to the main custom metadata upload guide for setup instructions, usage examples, and workflow details.
Key Features
- CSV-Based Workflow - Reads metadata from CSV files with a file_fullpath column
- Field Type Specification - Supports all six Visual Layer field types via command-line arguments
- Comprehensive Validation - Validates all data before upload with detailed error messages
- Enum Limit Checking - Validates the 600-unique-value limit for enum fields before upload (see the sketch after this list)
- Flexible Datetime - Accepts multiple datetime formats and converts to ISO 8601 UTC
- Progress Tracking - Detailed status reporting throughout the workflow
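For example, a minimal pre-flight sketch (assuming pandas and an example enum column named status in metadata.csv; adjust the names to your data) for counting unique values yourself before running the script:
import pandas as pd

df = pd.read_csv("metadata.csv")
unique_count = df["status"].nunique()
print(f"status: {unique_count} unique values (Visual Layer enum limit is 600)")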
Installation Requirements
Before using this script, install the required Python packages:
# For cloud installations (requires PyJWT for authentication)
pip install requests pandas pyjwt
# For on-premises installations (PyJWT optional)
pip install requests pandas
Usage
Save this script as custom_metadata_upload_script.py and run:
# For cloud installations (with authentication)
python custom_metadata_upload_script.py \
--csv metadata.csv \
--dataset-id your-dataset-id \
--base-url https://app.visual-layer.com \
--api-key YOUR-API-KEY \
--api-secret YOUR-API-SECRET \
--string-fields patient_id notes \
--float-fields score confidence \
--datetime-fields created_at updated_at \
--enum-fields status category \
--multi-enum-fields tags labels \
--link-fields pdf_url
# For on-premises installations (no authentication)
python custom_metadata_upload_script.py \
--csv metadata.csv \
--dataset-id your-dataset-id \
--base-url http://localhost:8080 \
--string-fields patient_id notes \
--float-fields score confidence \
--datetime-fields created_at updated_at \
--enum-fields status category \
--multi-enum-fields tags labels \
--link-fields pdf_url
CSV Format
The CSV file must include a file_fullpath column matching the Visual Layer export format:
file_fullpath,patient_id,score,created_at,status,tags,pdf_url
images/img1.jpg,P001,95.5,2024-01-15,active,"tag1, tag2",https://example.com/doc.pdf
images/img2.jpg,P002,87.2,2024-01-16,pending,"tag3, tag4",https://example.com/doc2.pdf
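If your metadata starts out in a pandas DataFrame, a minimal sketch (column names other than file_fullpath are illustrative) for writing a compatible CSV:
import pandas as pd

rows = [
    {"file_fullpath": "images/img1.jpg", "patient_id": "P001", "score": 95.5,
     "created_at": "2024-01-15", "status": "active", "tags": "tag1, tag2",
     "pdf_url": "https://example.com/doc.pdf"},
]
# to_csv quotes the comma-separated multi-enum values automatically
pd.DataFrame(rows).to_csv("metadata.csv", index=False)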
This script is designed for Visual Layer installations (cloud or on-premises). You can modify the validation logic and field detection to suit your specific needs.
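For example, here is a hedged sketch of one such modification: a stricter link validator that accepts only https:// URLs (a local policy choice, not a Visual Layer requirement), written in the style of the script's validate_* methods:
from urllib.parse import urlparse

class ValidationError(Exception):
    """Mirrors the exception class defined in the full script below."""

def validate_https_link(value: str, field_name: str, row_num: int) -> str:
    """Stricter variant of validate_link that accepts only https:// URLs."""
    value = (value or "").strip()
    parsed = urlparse(value)
    if parsed.scheme != "https" or not parsed.netloc:
        raise ValidationError(
            f"Row {row_num}, field '{field_name}': '{value}' must be an https:// URL"
        )
    return value
To use it, you would swap it in for the 'link' branch of validate_and_convert_value in the script below.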
Complete Script Code
#!/usr/bin/env python3
"""
Generic CSV-to-Visual-Layer Metadata Upload Script
Reads a CSV file and uploads metadata to Visual Layer with configurable field types.
Requirements:
    pip install requests pandas pyjwt  # pyjwt only needed for cloud authentication
Usage:
python custom_metadata_upload_script.py \
--csv metadata.csv \
--dataset-id <dataset-id> \
--base-url http://localhost:8080 \
--string-fields patient_id notes \
--float-fields score confidence \
--datetime-fields created_at updated_at \
--enum-fields status category \
--multi-enum-fields tags labels \
--link-fields pdf_url image_url
Documentation:
https://docs.visual-layer.com/docs/Creating-Datasets/custom-metadata
"""
import json
import requests
import argparse
import sys
import time
import tempfile
import os
import csv
import pandas as pd
try:
    import jwt  # PyJWT; only required for cloud (API key/secret) authentication
except ImportError:
    jwt = None  # On-premises installs without PyJWT can still run without auth
from typing import Dict, List, Any, Optional, Set
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
class ValidationError(Exception):
"""Custom exception for validation errors."""
pass
class CSVMetadataUploader:
"""Upload custom metadata from CSV to Visual Layer with field type validation."""
# Maximum unique values for enum fields per Visual Layer documentation
MAX_ENUM_VALUES = 600
def __init__(self, dataset_id: str, base_url: str,
string_fields: List[str] = None,
float_fields: List[str] = None,
datetime_fields: List[str] = None,
enum_fields: List[str] = None,
multi_enum_fields: List[str] = None,
link_fields: List[str] = None,
batch_size: int = 50000,
api_key: str = None,
api_secret: str = None):
"""
Initialize uploader with field type specifications.
Args:
dataset_id: Visual Layer dataset ID
base_url: Base URL for Visual Layer API
string_fields: List of field names to treat as strings
float_fields: List of field names to treat as floats
datetime_fields: List of field names to treat as datetimes (ISO 8601 UTC)
enum_fields: List of field names to treat as enums (single-select)
multi_enum_fields: List of field names to treat as multi-enums
link_fields: List of field names to treat as links (URLs)
            batch_size: Number of records per batch (reserved; the current version uploads each field's values in a single request)
api_key: API key for Visual Layer Cloud authentication (optional)
api_secret: API secret for Visual Layer Cloud authentication (optional)
"""
self.dataset_id = dataset_id
self.raw_base_url = base_url.rstrip('/')
self.batch_size = batch_size
# Build field type mappings
self.field_types = {}
for field in (string_fields or []):
self.field_types[field] = 'string'
for field in (float_fields or []):
self.field_types[field] = 'float'
for field in (datetime_fields or []):
self.field_types[field] = 'datetime'
for field in (enum_fields or []):
self.field_types[field] = 'enum'
for field in (multi_enum_fields or []):
self.field_types[field] = 'multi-enum'
for field in (link_fields or []):
self.field_types[field] = 'link'
# Add /api/v1/datasets to base URL if not present
if not base_url.endswith('/api/v1/datasets'):
self.base_url = f"{base_url.rstrip('/')}/api/v1/datasets"
else:
self.base_url = base_url
self.session = requests.Session()
self._temp_files = []
# Add authentication headers if provided
if api_key and api_secret:
jwt_token = self._generate_jwt(api_key, api_secret)
self.session.headers.update({
'Authorization': f'Bearer {jwt_token}'
})
    def _generate_jwt(self, api_key: str, api_secret: str) -> str:
        """Generate JWT token for Visual Layer API authentication."""
        if jwt is None:
            raise ImportError(
                "PyJWT is required for cloud authentication: pip install pyjwt"
            )
        jwt_algorithm = "HS256"
jwt_header = {
'alg': jwt_algorithm,
'typ': 'JWT',
'kid': api_key,
}
now = datetime.now(tz=timezone.utc)
expiration = now + timedelta(minutes=10)
payload = {
'sub': api_key,
'iat': int(now.timestamp()),
'exp': int(expiration.timestamp()),
'iss': 'sdk'
}
return jwt.encode(payload=payload, key=api_secret, algorithm=jwt_algorithm, headers=jwt_header)
def export_dataset(self) -> Dict[str, str]:
"""Export dataset and return mapping of filename -> media_id."""
print("📤 Exporting dataset to get media_id mappings...")
url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"
try:
response = self.session.get(url)
if response.status_code == 200:
csv_content = response.text
csv_reader = csv.DictReader(csv_content.splitlines())
mapping = {}
for row in csv_reader:
filename = row.get('filename', '')
media_id = row.get('media_id', '')
if media_id and filename:
mapping[filename] = media_id
print(f" ✅ Exported {len(mapping)} media items")
return mapping
else:
print(f" ❌ Failed to export dataset: {response.status_code} - {response.text}")
return {}
except Exception as e:
print(f" ❌ Export failed: {str(e)}")
return {}
def validate_float(self, value: str, field_name: str, row_num: int) -> float:
"""Validate and convert float value."""
try:
return float(value)
except (ValueError, TypeError):
raise ValidationError(
f"Row {row_num}, field '{field_name}': Invalid float value '{value}'. "
f"Must be a valid number."
)
def validate_datetime(self, value: str, field_name: str, row_num: int) -> str:
"""
Validate datetime format (ISO 8601 UTC).
Accepts formats:
- YYYY-MM-DDTHH:MM:SSZ (e.g., 2024-01-15T10:30:00Z)
- YYYY-MM-DD (will be converted to YYYY-MM-DDT00:00:00Z)
"""
if not value or not value.strip():
raise ValidationError(
f"Row {row_num}, field '{field_name}': Empty datetime value"
)
value = value.strip()
# Try to parse common datetime formats
formats_to_try = [
'%Y-%m-%dT%H:%M:%SZ', # 2024-01-15T10:30:00Z
'%Y-%m-%dT%H:%M:%S', # 2024-01-15T10:30:00
'%Y-%m-%d %H:%M:%S', # 2024-01-15 10:30:00
'%Y-%m-%d', # 2024-01-15
]
for fmt in formats_to_try:
try:
dt = datetime.strptime(value, fmt)
# Always return ISO 8601 UTC format
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
except ValueError:
continue
raise ValidationError(
f"Row {row_num}, field '{field_name}': Invalid datetime format '{value}'. "
f"Expected ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DD"
)
def validate_enum(self, value: str, field_name: str, row_num: int) -> str:
"""Validate enum value."""
if not value or not value.strip():
raise ValidationError(
f"Row {row_num}, field '{field_name}': Empty enum value"
)
value = value.strip()
return value
def validate_multi_enum(self, value: str, field_name: str, row_num: int) -> List[str]:
"""
Validate multi-enum value (list of strings).
Accepts formats:
- Comma-separated: "tag1, tag2, tag3"
- JSON array: ["tag1", "tag2", "tag3"]
"""
if not value or not value.strip():
raise ValidationError(
f"Row {row_num}, field '{field_name}': Empty multi-enum value"
)
value = value.strip()
# Try to parse as JSON array first
if value.startswith('[') and value.endswith(']'):
try:
parsed = json.loads(value)
if not isinstance(parsed, list):
raise ValidationError(
f"Row {row_num}, field '{field_name}': Multi-enum must be a list"
)
values = [str(v).strip() for v in parsed]
except json.JSONDecodeError:
raise ValidationError(
f"Row {row_num}, field '{field_name}': Invalid JSON array format"
)
else:
# Parse as comma-separated values
values = [v.strip() for v in value.split(',')]
# Validate each value
for v in values:
if not v:
raise ValidationError(
f"Row {row_num}, field '{field_name}': Empty value in multi-enum list"
)
return values
def validate_link(self, value: str, field_name: str, row_num: int) -> str:
"""Validate URL format for link field."""
if not value or not value.strip():
raise ValidationError(
f"Row {row_num}, field '{field_name}': Empty link value"
)
value = value.strip()
# Parse URL
try:
parsed = urlparse(value)
# Check if it has a scheme (http, https, etc.)
if not parsed.scheme:
raise ValidationError(
f"Row {row_num}, field '{field_name}': Invalid URL '{value}'. "
f"Must include protocol (e.g., http:// or https://)"
)
return value
except Exception:
raise ValidationError(
f"Row {row_num}, field '{field_name}': Invalid URL format '{value}'"
)
def validate_and_convert_value(self, value: str, field_name: str,
field_type: str, row_num: int) -> Any:
"""Validate and convert value based on field type."""
        # Skip missing/empty values (pandas represents empty CSV cells as NaN)
        if value is None or (isinstance(value, float) and pd.isna(value)):
            return None
        if isinstance(value, str) and not value.strip():
            return None
try:
if field_type == 'float':
return self.validate_float(value, field_name, row_num)
elif field_type == 'datetime':
return self.validate_datetime(value, field_name, row_num)
elif field_type == 'enum':
return self.validate_enum(value, field_name, row_num)
elif field_type == 'multi-enum':
return self.validate_multi_enum(value, field_name, row_num)
elif field_type == 'link':
return self.validate_link(value, field_name, row_num)
elif field_type == 'string':
return str(value).strip()
else:
return str(value).strip()
except ValidationError:
raise
except Exception as e:
raise ValidationError(
f"Row {row_num}, field '{field_name}': Unexpected validation error: {str(e)}"
)
def read_csv(self, csv_path: str, filename_to_media_id: Dict[str, str]) -> Dict[str, Dict[str, Any]]:
"""
Read CSV file and return validated metadata dictionary.
Returns:
Dict mapping file_fullpath -> metadata
"""
print(f"📖 Reading and validating CSV file: {csv_path}")
if not self.field_types:
raise ValueError(
"No field types specified. Please provide at least one field type using "
"--string-fields, --float-fields, --datetime-fields, --enum-fields, "
"--multi-enum-fields, or --link-fields"
)
try:
# Load CSV with pandas for analysis
df = pd.read_csv(csv_path)
print(f" ✅ Loaded {len(df)} rows")
# Check enum/multi-enum fields for unique value count
print(f" 🔍 Checking enum field constraints...")
enum_errors = []
for field_name, field_type in self.field_types.items():
if field_type in ['enum', 'multi-enum']:
if field_name not in df.columns:
enum_errors.append(f"Field '{field_name}' not found in CSV columns")
continue
if field_type == 'enum':
# For enum, count unique values directly
unique_count = df[field_name].nunique()
if unique_count > self.MAX_ENUM_VALUES:
enum_errors.append(
f"Field '{field_name}' (enum): Has {unique_count} unique values, "
f"exceeds limit of {self.MAX_ENUM_VALUES}"
)
else:
print(f" ✅ {field_name} (enum): {unique_count} unique values")
elif field_type == 'multi-enum':
# For multi-enum, collect all unique values across all lists
unique_values = set()
for value in df[field_name].dropna():
if isinstance(value, str):
# Parse the value
if value.strip().startswith('['):
# JSON array format
try:
parsed = json.loads(value.strip())
if isinstance(parsed, list):
unique_values.update(str(v).strip() for v in parsed)
except json.JSONDecodeError:
pass
else:
# Comma-separated format
unique_values.update(v.strip() for v in value.split(',') if v.strip())
unique_count = len(unique_values)
if unique_count > self.MAX_ENUM_VALUES:
enum_errors.append(
f"Field '{field_name}' (multi-enum): Has {unique_count} unique values "
f"across all lists, exceeds limit of {self.MAX_ENUM_VALUES}"
)
else:
print(f" ✅ {field_name} (multi-enum): {unique_count} unique values")
if enum_errors:
print(f"\n ❌ Enum Constraint Errors ({len(enum_errors)} total):")
for error in enum_errors:
print(f" • {error}")
print()
raise ValidationError(
f"Enum fields exceed the {self.MAX_ENUM_VALUES} unique values limit. "
f"Please reduce unique values or use 'string' field type instead."
)
print(f" 🔍 Validating {len(self.field_types)} metadata fields...")
# Print field type configuration
print(f"\n 📋 Field Configuration:")
for field_name, field_type in sorted(self.field_types.items()):
print(f" • {field_name}: {field_type}")
print()
# Convert back to list of dicts for row-by-row processing
rows = df.to_dict('records')
except ValidationError:
raise
except Exception as e:
print(f" ❌ Failed to read CSV: {str(e)}")
return {}
metadata_by_path = {}
skipped_count = 0
validation_errors = []
for row_num, row in enumerate(rows, start=2): # Start at 2 (header is row 1)
# Get file_fullpath for mapping
file_fullpath = row.get('file_fullpath', '')
if not file_fullpath:
skipped_count += 1
validation_errors.append(f"Row {row_num}: Missing 'file_fullpath' column")
continue
# Skip if not in Visual Layer dataset
if file_fullpath not in filename_to_media_id:
skipped_count += 1
continue
# Validate and build metadata dictionary
flattened = {}
for field_name, field_type in self.field_types.items():
value = row.get(field_name, '')
try:
validated_value = self.validate_and_convert_value(
value, field_name, field_type, row_num
)
# Only add non-None values
if validated_value is not None:
flattened[field_name] = validated_value
except ValidationError as e:
validation_errors.append(str(e))
# Stop at first validation error for this row
break
# Only add if we have at least one metadata field
if flattened:
metadata_by_path[file_fullpath] = flattened
# Report validation errors
if validation_errors:
print(f"\n ❌ Validation Errors ({len(validation_errors)} total):")
# Show first 10 errors
for error in validation_errors[:10]:
print(f" • {error}")
if len(validation_errors) > 10:
print(f" • ... and {len(validation_errors) - 10} more errors")
print()
raise ValidationError(f"CSV validation failed with {len(validation_errors)} errors")
if skipped_count > 0:
print(f" ⚠️ Skipped {skipped_count} rows not in Visual Layer dataset")
print(f" ✅ Validated {len(metadata_by_path)} rows successfully\n")
return metadata_by_path
def create_custom_field(self, field_name: str, field_type: str,
metadata_by_path: Dict[str, Dict[str, Any]]) -> Optional[str]:
"""Create a custom field and return field_id (task_id)."""
print(f" 🔧 Creating field: {field_name} ({field_type})")
field_data = {
"field_name": field_name,
"field_type": field_type
}
# For enum/multi-enum fields, collect unique values from the data
if field_type in ['enum', 'multi-enum']:
unique_values = set()
for metadata in metadata_by_path.values():
if field_name in metadata:
value = metadata[field_name]
if field_type == 'multi-enum' and isinstance(value, list):
unique_values.update(value)
elif value and isinstance(value, str):
unique_values.add(value)
enum_options = sorted(list(unique_values))
field_data["enum_options"] = enum_options
if field_type == 'multi-enum':
field_data["field_type"] = 'enum' # API expects 'enum' for both
field_data["is_multi"] = True
else:
field_data["is_multi"] = False
print(f" 📋 Enum options ({len(enum_options)}): {enum_options[:10]}{' ...' if len(enum_options) > 10 else ''}")
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"
try:
response = self.session.post(url, json=field_data)
if response.status_code == 200:
result = response.json()
task_id = result.get('task_id')
print(f" ✅ Created (task_id: {task_id})")
return task_id
elif "already exists" in response.text:
print(f" 🔄 Field already exists, skipping")
return None
else:
print(f" ❌ Failed: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f" ❌ Request failed: {str(e)}")
return None
def upload_field_data(self, field_id: str, field_name: str, field_type: str,
metadata_by_path: Dict[str, Dict[str, Any]],
filename_to_media_id: Dict[str, str]) -> Optional[str]:
"""Upload data for a custom field in a single request."""
upload_data = []
for file_fullpath, metadata in metadata_by_path.items():
media_id = filename_to_media_id.get(file_fullpath)
if not media_id or field_name not in metadata:
continue
value = metadata[field_name]
if value is None or value == '':
continue
upload_data.append({
"media_id": media_id,
"value": value
})
if not upload_data:
print(f" ⚠️ No data to upload")
return None
print(f" 📤 Uploading {len(upload_data)} values")
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"
# Save data to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(upload_data, f, indent=2)
temp_file = f.name
self._temp_files.append(temp_file)
# Upload
try:
with open(temp_file, 'rb') as f:
files = {'file': ('metadata.json', f, 'application/json')}
response = self.session.post(url, files=files)
if response.status_code not in [200, 202]:
print(f" ❌ Upload failed: {response.status_code} - {response.text}")
return None
print(f" ✅ Upload completed")
return field_id
except Exception as e:
print(f" ❌ Upload failed: {str(e)}")
return None
def wait_for_task_completion(self, task_id: str, field_name: str,
polling_interval: int = 5) -> str:
"""Wait for task completion by polling status endpoint."""
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"
while True:
try:
response = self.session.get(url)
if response.status_code == 200:
result = response.json()
status = result.get('status', 'unknown')
if status == 'COMPLETED':
inserted_rows = result.get('inserted_rows', 0)
print(f" ✅ Completed ({inserted_rows} rows)")
return 'COMPLETED'
elif status == 'COMPLETED_WITH_ERRORS':
error_count = result.get('error_count', 0)
inserted_rows = result.get('inserted_rows', 0)
print(f" ⚠️ Completed with {error_count} errors ({inserted_rows} rows)")
return 'COMPLETED_WITH_ERRORS'
elif status == 'IN_PROGRESS':
time.sleep(polling_interval)
continue
else:
time.sleep(polling_interval)
continue
else:
print(f" ❌ Status check failed")
return 'error'
except Exception:
return 'error'
def cleanup_temp_files(self):
"""Remove temporary files."""
for temp_file in self._temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except OSError:
pass
def process_workflow(self, csv_path: str):
"""Main workflow: export → read CSV → validate → create fields → upload data."""
try:
print("=" * 70)
print("🚀 Generic CSV Metadata Upload to Visual Layer")
print("=" * 70)
print(f"📁 CSV File: {csv_path}")
print(f"🎯 Dataset ID: {self.dataset_id}")
print(f"🌐 Base URL: {self.raw_base_url}")
print("=" * 70)
print()
# Step 1: Export dataset to get media_id mappings
filename_to_media_id = self.export_dataset()
if not filename_to_media_id:
raise Exception("Failed to export dataset")
print()
# Step 2: Read CSV, validate, and build metadata
metadata_by_path = self.read_csv(csv_path, filename_to_media_id)
if not metadata_by_path:
raise Exception("No valid rows found in CSV after validation")
# Step 3: Process each field
print(f"🔧 Creating and uploading {len(self.field_types)} custom fields...")
print()
completed_count = 0
failed_fields = []
for field_name, field_type in self.field_types.items():
print(f"📝 Processing: {field_name} ({field_type})")
try:
# Create field
field_id = self.create_custom_field(field_name, field_type, metadata_by_path)
if not field_id:
print(f" ⏭️ Skipping (field exists or creation failed)\n")
continue
# Upload data
task_id = self.upload_field_data(field_id, field_name, field_type,
metadata_by_path, filename_to_media_id)
if not task_id:
failed_fields.append(field_name)
print(f" ❌ Upload failed\n")
continue
# Wait for completion
status = self.wait_for_task_completion(task_id, field_name)
if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
completed_count += 1
else:
failed_fields.append(field_name)
print() # Blank line between fields
except Exception as e:
print(f" ❌ Error: {str(e)}\n")
failed_fields.append(field_name)
continue
# Final summary
print("=" * 70)
print("🎉 Workflow Completed!")
print("=" * 70)
print(f"✅ Successfully uploaded: {completed_count}/{len(self.field_types)} fields")
print(f"📊 Total rows processed: {len(metadata_by_path)}")
if failed_fields:
print(f"❌ Failed fields ({len(failed_fields)}): {', '.join(failed_fields)}")
print("=" * 70)
except ValidationError as e:
print(f"\n❌ Validation failed: {str(e)}")
sys.exit(1)
except Exception as e:
print(f"\n❌ Workflow failed: {str(e)}")
sys.exit(1)
finally:
self.cleanup_temp_files()
def main():
parser = argparse.ArgumentParser(
description='Generic CSV metadata upload to Visual Layer with field type validation',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Example usage:
# Basic upload with different field types
python custom_metadata_upload_script.py \\
--csv metadata.csv \\
--dataset-id abc-123 \\
--base-url http://localhost:8080 \\
--string-fields patient_id notes description \\
--float-fields score confidence temperature \\
--datetime-fields created_at updated_at \\
--enum-fields status category priority \\
--multi-enum-fields tags labels keywords \\
--link-fields pdf_url documentation_link
# Simple upload with just strings and enums
python custom_metadata_upload_script.py \\
--csv data.csv \\
--dataset-id abc-123 \\
--base-url http://localhost:8080 \\
--string-fields name description \\
--enum-fields category status
CSV Format Requirements:
- Must have a 'file_fullpath' column matching Visual Layer export format
- File paths should match those returned by the dataset export API
- Datetime values: YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DD
- Float values: Valid decimal numbers
- Enum values: Max 20 characters each
- Multi-enum values: Comma-separated or JSON array ["val1", "val2"]
- Link values: Must include protocol (http:// or https://)
Field Type Documentation:
https://docs.visual-layer.com/docs/Creating-Datasets/custom-metadata
"""
)
parser.add_argument('--csv', required=True,
help='Path to CSV file with metadata')
parser.add_argument('--dataset-id', required=True,
help='Visual Layer dataset ID')
parser.add_argument('--base-url', default='http://localhost:8080',
help='Base URL (default: http://localhost:8080)')
parser.add_argument('--batch-size', type=int, default=50000,
help='Records per upload batch (default: 50000)')
parser.add_argument('--api-key', default=None,
help='API key for Visual Layer Cloud authentication')
parser.add_argument('--api-secret', default=None,
help='API secret for Visual Layer Cloud authentication')
# Field type specifications
parser.add_argument('--string-fields', nargs='*', default=[],
help='List of fields to treat as strings')
parser.add_argument('--float-fields', nargs='*', default=[],
help='List of fields to treat as floats (decimal numbers)')
parser.add_argument('--datetime-fields', nargs='*', default=[],
help='List of fields to treat as datetimes (ISO 8601 UTC format)')
parser.add_argument('--enum-fields', nargs='*', default=[],
help='List of fields to treat as enums (single-select, max 20 chars)')
parser.add_argument('--multi-enum-fields', nargs='*', default=[],
help='List of fields to treat as multi-enums (multi-select, max 20 chars each)')
parser.add_argument('--link-fields', nargs='*', default=[],
help='List of fields to treat as links (URLs)')
args = parser.parse_args()
# Validate that at least one field type is specified
if not any([args.string_fields, args.float_fields, args.datetime_fields,
args.enum_fields, args.multi_enum_fields, args.link_fields]):
parser.error(
"At least one field type must be specified. Use --string-fields, --float-fields, "
"--datetime-fields, --enum-fields, --multi-enum-fields, or --link-fields"
)
# Create uploader and run workflow
uploader = CSVMetadataUploader(
dataset_id=args.dataset_id,
base_url=args.base_url,
string_fields=args.string_fields,
float_fields=args.float_fields,
datetime_fields=args.datetime_fields,
enum_fields=args.enum_fields,
multi_enum_fields=args.multi_enum_fields,
link_fields=args.link_fields,
batch_size=args.batch_size,
api_key=args.api_key,
api_secret=args.api_secret
)
uploader.process_workflow(args.csv)
if __name__ == "__main__":
main()