
Custom Metadata Upload Script

This page contains the complete, ready-to-use Python script for uploading custom metadata from CSV files to Visual Layer datasets, with comprehensive validation and error handling. The script handles the complete workflow: it exports dataset mappings, validates the CSV data, creates metadata fields, uploads values, and monitors progress.

Back to Custom Metadata Upload Documentation

Return to the main custom metadata upload guide for setup instructions, usage examples, and workflow details.

Key Features

  • CSV-Based Workflow - Reads metadata from CSV files that include a file_fullpath column
  • Field Type Specification - Supports all six Visual Layer field types via command-line arguments
  • Comprehensive Validation - Validates all data before upload with detailed error messages
  • Enum Limit Checking - Pre-upload validation against the 600-unique-values limit for enum and multi-enum fields (see the pre-flight check sketch after this list)
  • Flexible Datetime - Accepts multiple datetime formats and converts to ISO 8601 UTC
  • Progress Tracking - Detailed status reporting throughout the workflow
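
Before running the script on a large CSV, you can do a quick pre-flight check of the enum constraint yourself. The snippet below is only a minimal sketch that mirrors how the script counts unique values; metadata.csv, status, and tags are placeholders for your own file and enum/multi-enum columns.

import pandas as pd

MAX_ENUM_VALUES = 600  # Visual Layer limit for enum and multi-enum fields

df = pd.read_csv("metadata.csv")

# Single-select enum column: count distinct values directly.
status_unique = df["status"].nunique()
print(f"status: {status_unique} unique values "
      f"({'OK' if status_unique <= MAX_ENUM_VALUES else 'exceeds limit'})")

# Multi-enum column: split comma-separated lists and count distinct entries.
tag_values = set()
for cell in df["tags"].dropna():
    tag_values.update(v.strip() for v in str(cell).split(",") if v.strip())
print(f"tags: {len(tag_values)} unique values "
      f"({'OK' if len(tag_values) <= MAX_ENUM_VALUES else 'exceeds limit'})")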

Installation Requirements

Before using this script, install the required Python packages:
# For cloud installations (requires PyJWT for authentication)
pip install requests pandas pyjwt

# For on-premises installations (PyJWT optional)
pip install requests pandas

Usage

Save this script as custom_metadata_upload_script.py and run:
# For cloud installations (with authentication)
python custom_metadata_upload_script.py \
  --csv metadata.csv \
  --dataset-id your-dataset-id \
  --base-url https://app.visual-layer.com \
  --api-key YOUR-API-KEY \
  --api-secret YOUR-API-SECRET \
  --string-fields patient_id notes \
  --float-fields score confidence \
  --datetime-fields created_at updated_at \
  --enum-fields status category \
  --multi-enum-fields tags labels \
  --link-fields pdf_url

# For on-premises installations (no authentication)
python custom_metadata_upload_script.py \
  --csv metadata.csv \
  --dataset-id your-dataset-id \
  --base-url http://localhost:8080 \
  --string-fields patient_id notes \
  --float-fields score confidence \
  --datetime-fields created_at updated_at \
  --enum-fields status category \
  --multi-enum-fields tags labels \
  --link-fields pdf_url
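
If you prefer to drive the uploader from another Python program instead of the command line, you can import the class directly. A minimal sketch, assuming the script is saved as custom_metadata_upload_script.py next to your code and the dataset ID and field names are replaced with your own:

from custom_metadata_upload_script import CSVMetadataUploader

uploader = CSVMetadataUploader(
    dataset_id="your-dataset-id",
    base_url="http://localhost:8080",   # on-premises; pass api_key/api_secret for cloud
    string_fields=["patient_id", "notes"],
    enum_fields=["status"],
)
uploader.process_workflow("metadata.csv")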

CSV Format

The CSV file must include a file_fullpath column matching the Visual Layer export format:
file_fullpath,patient_id,score,created_at,status,tags,pdf_url
images/img1.jpg,P001,95.5,2024-01-15,active,"tag1, tag2",https://example.com/doc.pdf
images/img2.jpg,P002,87.2,2024-01-16,pending,"tag3, tag4",https://example.com/doc2.pdf
This script is designed for Visual Layer installations (cloud or on-premises). You can modify the validation logic and field detection to suit your specific needs.
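
If your metadata currently lives in another system, a small pandas snippet can produce a compliant CSV. This is only a sketch; the rows below are example values, and each file_fullpath must match a path returned by your dataset's export.

import pandas as pd

# Example rows; replace with your own metadata. file_fullpath must match the export paths exactly.
rows = [
    {"file_fullpath": "images/img1.jpg", "patient_id": "P001", "score": 95.5,
     "created_at": "2024-01-15", "status": "active", "tags": "tag1, tag2",
     "pdf_url": "https://example.com/doc.pdf"},
    {"file_fullpath": "images/img2.jpg", "patient_id": "P002", "score": 87.2,
     "created_at": "2024-01-16", "status": "pending", "tags": "tag3, tag4",
     "pdf_url": "https://example.com/doc2.pdf"},
]

pd.DataFrame(rows).to_csv("metadata.csv", index=False)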

Complete Script Code

#!/usr/bin/env python3
"""
Generic CSV-to-Visual-Layer Metadata Upload Script
Reads a CSV file and uploads metadata to Visual Layer with configurable field types.

Requirements:
    pip install requests pandas pyjwt   # pyjwt is only needed for Visual Layer Cloud authentication

Usage:
    python custom_metadata_upload_script.py \
        --csv metadata.csv \
        --dataset-id <dataset-id> \
        --base-url http://localhost:8080 \
        --string-fields patient_id notes \
        --float-fields score confidence \
        --datetime-fields created_at updated_at \
        --enum-fields status category \
        --multi-enum-fields tags labels \
        --link-fields pdf_url image_url

Documentation:
    https://docs.visual-layer.com/docs/Creating-Datasets/custom-metadata
"""

import json
import requests
import argparse
import sys
import time
import tempfile
import os
import csv
import pandas as pd
try:
    import jwt  # PyJWT; only required for Visual Layer Cloud authentication
except ImportError:  # On-premises installations can run without PyJWT
    jwt = None
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse


class ValidationError(Exception):
    """Custom exception for validation errors."""
    pass


class CSVMetadataUploader:
    """Upload custom metadata from CSV to Visual Layer with field type validation."""

    # Maximum unique values for enum fields per Visual Layer documentation
    MAX_ENUM_VALUES = 600

    def __init__(self, dataset_id: str, base_url: str,
                 string_fields: List[str] = None,
                 float_fields: List[str] = None,
                 datetime_fields: List[str] = None,
                 enum_fields: List[str] = None,
                 multi_enum_fields: List[str] = None,
                 link_fields: List[str] = None,
                 batch_size: int = 50000,
                 api_key: str = None,
                 api_secret: str = None):
        """
        Initialize uploader with field type specifications.

        Args:
            dataset_id: Visual Layer dataset ID
            base_url: Base URL for Visual Layer API
            string_fields: List of field names to treat as strings
            float_fields: List of field names to treat as floats
            datetime_fields: List of field names to treat as datetimes (ISO 8601 UTC)
            enum_fields: List of field names to treat as enums (single-select)
            multi_enum_fields: List of field names to treat as multi-enums
            link_fields: List of field names to treat as links (URLs)
            batch_size: Number of records per batch (default: 50000)
            api_key: API key for Visual Layer Cloud authentication (optional)
            api_secret: API secret for Visual Layer Cloud authentication (optional)
        """
        self.dataset_id = dataset_id
        self.raw_base_url = base_url.rstrip('/')
        self.batch_size = batch_size

        # Build field type mappings
        self.field_types = {}
        for field in (string_fields or []):
            self.field_types[field] = 'string'
        for field in (float_fields or []):
            self.field_types[field] = 'float'
        for field in (datetime_fields or []):
            self.field_types[field] = 'datetime'
        for field in (enum_fields or []):
            self.field_types[field] = 'enum'
        for field in (multi_enum_fields or []):
            self.field_types[field] = 'multi-enum'
        for field in (link_fields or []):
            self.field_types[field] = 'link'

        # Add /api/v1/datasets to base URL if not present
        if not base_url.endswith('/api/v1/datasets'):
            self.base_url = f"{base_url.rstrip('/')}/api/v1/datasets"
        else:
            self.base_url = base_url

        self.session = requests.Session()
        self._temp_files = []

        # Add authentication headers if provided
        if api_key and api_secret:
            jwt_token = self._generate_jwt(api_key, api_secret)
            self.session.headers.update({
                'Authorization': f'Bearer {jwt_token}'
            })

    def _generate_jwt(self, api_key: str, api_secret: str) -> str:
        """Generate JWT token for Visual Layer API authentication."""
        if jwt is None:
            raise ImportError("PyJWT is required for cloud authentication: pip install pyjwt")
        jwt_algorithm = "HS256"
        jwt_header = {
            'alg': jwt_algorithm,
            'typ': 'JWT',
            'kid': api_key,
        }

        now = datetime.now(tz=timezone.utc)
        expiration = now + timedelta(minutes=10)

        payload = {
            'sub': api_key,
            'iat': int(now.timestamp()),
            'exp': int(expiration.timestamp()),
            'iss': 'sdk'
        }

        return jwt.encode(payload=payload, key=api_secret, algorithm=jwt_algorithm, headers=jwt_header)

    def export_dataset(self) -> Dict[str, str]:
        """Export dataset and return mapping of filename -> media_id."""
        print("📤 Exporting dataset to get media_id mappings...")

        url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                csv_content = response.text
                csv_reader = csv.DictReader(csv_content.splitlines())

                mapping = {}
                for row in csv_reader:
                    filename = row.get('filename', '')
                    media_id = row.get('media_id', '')
                    if media_id and filename:
                        mapping[filename] = media_id

                print(f"   ✅ Exported {len(mapping)} media items")
                return mapping
            else:
                print(f"   ❌ Failed to export dataset: {response.status_code} - {response.text}")
                return {}
        except Exception as e:
            print(f"   ❌ Export failed: {str(e)}")
            return {}

    def validate_float(self, value: str, field_name: str, row_num: int) -> float:
        """Validate and convert float value."""
        try:
            return float(value)
        except (ValueError, TypeError):
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Invalid float value '{value}'. "
                f"Must be a valid number."
            )

    def validate_datetime(self, value: str, field_name: str, row_num: int) -> str:
        """
        Validate datetime format (ISO 8601 UTC).

        Accepts formats:
        - YYYY-MM-DDTHH:MM:SSZ (e.g., 2024-01-15T10:30:00Z)
        - YYYY-MM-DD (will be converted to YYYY-MM-DDT00:00:00Z)
        """
        if not value or not value.strip():
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Empty datetime value"
            )

        value = value.strip()

        # Try to parse common datetime formats
        formats_to_try = [
            '%Y-%m-%dT%H:%M:%SZ',       # 2024-01-15T10:30:00Z
            '%Y-%m-%dT%H:%M:%S',        # 2024-01-15T10:30:00
            '%Y-%m-%d %H:%M:%S',        # 2024-01-15 10:30:00
            '%Y-%m-%d',                 # 2024-01-15
        ]

        for fmt in formats_to_try:
            try:
                dt = datetime.strptime(value, fmt)
                # Always return ISO 8601 UTC format
                return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            except ValueError:
                continue

        raise ValidationError(
            f"Row {row_num}, field '{field_name}': Invalid datetime format '{value}'. "
            f"Expected ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DD"
        )

    def validate_enum(self, value: str, field_name: str, row_num: int) -> str:
        """Validate enum value."""
        if not value or not value.strip():
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Empty enum value"
            )

        value = value.strip()
        return value

    def validate_multi_enum(self, value: str, field_name: str, row_num: int) -> List[str]:
        """
        Validate multi-enum value (list of strings).

        Accepts formats:
        - Comma-separated: "tag1, tag2, tag3"
        - JSON array: ["tag1", "tag2", "tag3"]
        """
        if not value or not value.strip():
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Empty multi-enum value"
            )

        value = value.strip()

        # Try to parse as JSON array first
        if value.startswith('[') and value.endswith(']'):
            try:
                parsed = json.loads(value)
                if not isinstance(parsed, list):
                    raise ValidationError(
                        f"Row {row_num}, field '{field_name}': Multi-enum must be a list"
                    )
                values = [str(v).strip() for v in parsed]
            except json.JSONDecodeError:
                raise ValidationError(
                    f"Row {row_num}, field '{field_name}': Invalid JSON array format"
                )
        else:
            # Parse as comma-separated values
            values = [v.strip() for v in value.split(',')]

        # Validate each value
        for v in values:
            if not v:
                raise ValidationError(
                    f"Row {row_num}, field '{field_name}': Empty value in multi-enum list"
                )

        return values

    def validate_link(self, value: str, field_name: str, row_num: int) -> str:
        """Validate URL format for link field."""
        if not value or not value.strip():
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Empty link value"
            )

        value = value.strip()

        # Parse URL
        try:
            parsed = urlparse(value)
            # Check if it has a scheme (http, https, etc.)
            if not parsed.scheme:
                raise ValidationError(
                    f"Row {row_num}, field '{field_name}': Invalid URL '{value}'. "
                    f"Must include protocol (e.g., http:// or https://)"
                )
            return value
        except Exception:
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Invalid URL format '{value}'"
            )

    def validate_and_convert_value(self, value: str, field_name: str,
                                   field_type: str, row_num: int) -> Any:
        """Validate and convert value based on field type."""
        # Skip empty values (None, NaN cells produced by pandas, or blank strings)
        if value is None or (isinstance(value, float) and pd.isna(value)):
            return None
        if isinstance(value, str) and not value.strip():
            return None

        try:
            if field_type == 'float':
                return self.validate_float(value, field_name, row_num)
            elif field_type == 'datetime':
                return self.validate_datetime(value, field_name, row_num)
            elif field_type == 'enum':
                return self.validate_enum(value, field_name, row_num)
            elif field_type == 'multi-enum':
                return self.validate_multi_enum(value, field_name, row_num)
            elif field_type == 'link':
                return self.validate_link(value, field_name, row_num)
            elif field_type == 'string':
                return str(value).strip()
            else:
                return str(value).strip()
        except ValidationError:
            raise
        except Exception as e:
            raise ValidationError(
                f"Row {row_num}, field '{field_name}': Unexpected validation error: {str(e)}"
            )

    def read_csv(self, csv_path: str, filename_to_media_id: Dict[str, str]) -> Dict[str, Dict[str, Any]]:
        """
        Read CSV file and return validated metadata dictionary.

        Returns:
            Dict mapping file_fullpath -> metadata
        """
        print(f"📖 Reading and validating CSV file: {csv_path}")

        if not self.field_types:
            raise ValueError(
                "No field types specified. Please provide at least one field type using "
                "--string-fields, --float-fields, --datetime-fields, --enum-fields, "
                "--multi-enum-fields, or --link-fields"
            )

        try:
            # Load CSV with pandas for analysis
            df = pd.read_csv(csv_path)

            print(f"   ✅ Loaded {len(df)} rows")

            # Check enum/multi-enum fields for unique value count
            print(f"   🔍 Checking enum field constraints...")
            enum_errors = []

            for field_name, field_type in self.field_types.items():
                if field_type in ['enum', 'multi-enum']:
                    if field_name not in df.columns:
                        enum_errors.append(f"Field '{field_name}' not found in CSV columns")
                        continue

                    if field_type == 'enum':
                        # For enum, count unique values directly
                        unique_count = df[field_name].nunique()
                        if unique_count > self.MAX_ENUM_VALUES:
                            enum_errors.append(
                                f"Field '{field_name}' (enum): Has {unique_count} unique values, "
                                f"exceeds limit of {self.MAX_ENUM_VALUES}"
                            )
                        else:
                            print(f"      ✅ {field_name} (enum): {unique_count} unique values")

                    elif field_type == 'multi-enum':
                        # For multi-enum, collect all unique values across all lists
                        unique_values = set()
                        for value in df[field_name].dropna():
                            if isinstance(value, str):
                                # Parse the value
                                if value.strip().startswith('['):
                                    # JSON array format
                                    try:
                                        parsed = json.loads(value.strip())
                                        if isinstance(parsed, list):
                                            unique_values.update(str(v).strip() for v in parsed)
                                    except json.JSONDecodeError:
                                        pass
                                else:
                                    # Comma-separated format
                                    unique_values.update(v.strip() for v in value.split(',') if v.strip())

                        unique_count = len(unique_values)
                        if unique_count > self.MAX_ENUM_VALUES:
                            enum_errors.append(
                                f"Field '{field_name}' (multi-enum): Has {unique_count} unique values "
                                f"across all lists, exceeds limit of {self.MAX_ENUM_VALUES}"
                            )
                        else:
                            print(f"      ✅ {field_name} (multi-enum): {unique_count} unique values")

            if enum_errors:
                print(f"\n   ❌ Enum Constraint Errors ({len(enum_errors)} total):")
                for error in enum_errors:
                    print(f"      • {error}")
                print()
                raise ValidationError(
                    f"Enum fields exceed the {self.MAX_ENUM_VALUES} unique values limit. "
                    f"Please reduce unique values or use 'string' field type instead."
                )

            print(f"   🔍 Validating {len(self.field_types)} metadata fields...")

            # Print field type configuration
            print(f"\n   📋 Field Configuration:")
            for field_name, field_type in sorted(self.field_types.items()):
                print(f"      • {field_name}: {field_type}")
            print()

            # Convert back to list of dicts for row-by-row processing
            rows = df.to_dict('records')

        except ValidationError:
            raise
        except Exception as e:
            print(f"   ❌ Failed to read CSV: {str(e)}")
            return {}

        metadata_by_path = {}
        skipped_count = 0
        validation_errors = []

        for row_num, row in enumerate(rows, start=2):  # Start at 2 (header is row 1)
            # Get file_fullpath for mapping
            file_fullpath = row.get('file_fullpath', '')
            if not file_fullpath:
                skipped_count += 1
                validation_errors.append(f"Row {row_num}: Missing 'file_fullpath' column")
                continue

            # Skip if not in Visual Layer dataset
            if file_fullpath not in filename_to_media_id:
                skipped_count += 1
                continue

            # Validate and build metadata dictionary
            flattened = {}
            for field_name, field_type in self.field_types.items():
                value = row.get(field_name, '')

                try:
                    validated_value = self.validate_and_convert_value(
                        value, field_name, field_type, row_num
                    )

                    # Only add non-None values
                    if validated_value is not None:
                        flattened[field_name] = validated_value

                except ValidationError as e:
                    validation_errors.append(str(e))
                    # Stop at first validation error for this row
                    break

            # Only add if we have at least one metadata field
            if flattened:
                metadata_by_path[file_fullpath] = flattened

        # Report validation errors
        if validation_errors:
            print(f"\n   ❌ Validation Errors ({len(validation_errors)} total):")
            # Show first 10 errors
            for error in validation_errors[:10]:
                print(f"      • {error}")
            if len(validation_errors) > 10:
                print(f"      • ... and {len(validation_errors) - 10} more errors")
            print()
            raise ValidationError(f"CSV validation failed with {len(validation_errors)} errors")

        if skipped_count > 0:
            print(f"   ⚠️  Skipped {skipped_count} rows not in Visual Layer dataset")

        print(f"   ✅ Validated {len(metadata_by_path)} rows successfully\n")
        return metadata_by_path

    def create_custom_field(self, field_name: str, field_type: str,
                           metadata_by_path: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """Create a custom field and return field_id (task_id)."""
        print(f"   🔧 Creating field: {field_name} ({field_type})")

        field_data = {
            "field_name": field_name,
            "field_type": field_type
        }

        # For enum/multi-enum fields, collect unique values from the data
        if field_type in ['enum', 'multi-enum']:
            unique_values = set()
            for metadata in metadata_by_path.values():
                if field_name in metadata:
                    value = metadata[field_name]
                    if field_type == 'multi-enum' and isinstance(value, list):
                        unique_values.update(value)
                    elif value and isinstance(value, str):
                        unique_values.add(value)

            enum_options = sorted(list(unique_values))
            field_data["enum_options"] = enum_options

            if field_type == 'multi-enum':
                field_data["field_type"] = 'enum'  # API expects 'enum' for both
                field_data["is_multi"] = True
            else:
                field_data["is_multi"] = False

            print(f"      📋 Enum options ({len(enum_options)}): {enum_options[:10]}{' ...' if len(enum_options) > 10 else ''}")

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

        try:
            response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                result = response.json()
                task_id = result.get('task_id')
                print(f"      ✅ Created (task_id: {task_id})")
                return task_id
            elif "already exists" in response.text:
                print(f"      🔄 Field already exists, skipping")
                return None
            else:
                print(f"      ❌ Failed: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"      ❌ Request failed: {str(e)}")
            return None

    def upload_field_data(self, field_id: str, field_name: str, field_type: str,
                         metadata_by_path: Dict[str, Dict[str, Any]],
                         filename_to_media_id: Dict[str, str]) -> Optional[str]:
        """Upload data for a custom field in a single request."""
        upload_data = []

        for file_fullpath, metadata in metadata_by_path.items():
            media_id = filename_to_media_id.get(file_fullpath)
            if not media_id or field_name not in metadata:
                continue

            value = metadata[field_name]
            if value is None or value == '':
                continue

            upload_data.append({
                "media_id": media_id,
                "value": value
            })

        if not upload_data:
            print(f"      ⚠️  No data to upload")
            return None

        print(f"      📤 Uploading {len(upload_data)} values")

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

        # Save data to temp file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f, indent=2)
            temp_file = f.name

        self._temp_files.append(temp_file)

        # Upload
        try:
            with open(temp_file, 'rb') as f:
                files = {'file': ('metadata.json', f, 'application/json')}
                response = self.session.post(url, files=files)

            if response.status_code not in [200, 202]:
                print(f"      ❌ Upload failed: {response.status_code} - {response.text}")
                return None

            print(f"      ✅ Upload completed")
            return field_id

        except Exception as e:
            print(f"      ❌ Upload failed: {str(e)}")
            return None

    def wait_for_task_completion(self, task_id: str, field_name: str,
                                 polling_interval: int = 5) -> str:
        """Wait for task completion by polling status endpoint."""
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"

        while True:
            try:
                response = self.session.get(url)
                if response.status_code == 200:
                    result = response.json()
                    status = result.get('status', 'unknown')

                    if status == 'COMPLETED':
                        inserted_rows = result.get('inserted_rows', 0)
                        print(f"      ✅ Completed ({inserted_rows} rows)")
                        return 'COMPLETED'
                    elif status == 'COMPLETED_WITH_ERRORS':
                        error_count = result.get('error_count', 0)
                        inserted_rows = result.get('inserted_rows', 0)
                        print(f"      ⚠️  Completed with {error_count} errors ({inserted_rows} rows)")
                        return 'COMPLETED_WITH_ERRORS'
                    elif status == 'IN_PROGRESS':
                        time.sleep(polling_interval)
                        continue
                    else:
                        time.sleep(polling_interval)
                        continue
                else:
                    print(f"      ❌ Status check failed")
                    return 'error'
            except Exception:
                return 'error'

    def cleanup_temp_files(self):
        """Remove temporary files."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except OSError:
                pass

    def process_workflow(self, csv_path: str):
        """Main workflow: export → read CSV → validate → create fields → upload data."""
        try:
            print("=" * 70)
            print("🚀 Generic CSV Metadata Upload to Visual Layer")
            print("=" * 70)
            print(f"📁 CSV File: {csv_path}")
            print(f"🎯 Dataset ID: {self.dataset_id}")
            print(f"🌐 Base URL: {self.raw_base_url}")
            print("=" * 70)
            print()

            # Step 1: Export dataset to get media_id mappings
            filename_to_media_id = self.export_dataset()
            if not filename_to_media_id:
                raise Exception("Failed to export dataset")
            print()

            # Step 2: Read CSV, validate, and build metadata
            metadata_by_path = self.read_csv(csv_path, filename_to_media_id)
            if not metadata_by_path:
                raise Exception("No valid rows found in CSV after validation")

            # Step 3: Process each field
            print(f"🔧 Creating and uploading {len(self.field_types)} custom fields...")
            print()

            completed_count = 0
            failed_fields = []

            for field_name, field_type in self.field_types.items():
                print(f"📝 Processing: {field_name} ({field_type})")

                try:
                    # Create field
                    field_id = self.create_custom_field(field_name, field_type, metadata_by_path)
                    if not field_id:
                        print(f"   ⏭️  Skipping (field exists or creation failed)\n")
                        continue

                    # Upload data
                    task_id = self.upload_field_data(field_id, field_name, field_type,
                                                    metadata_by_path, filename_to_media_id)
                    if not task_id:
                        failed_fields.append(field_name)
                        print(f"   ❌ Upload failed\n")
                        continue

                    # Wait for completion
                    status = self.wait_for_task_completion(task_id, field_name)
                    if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                        completed_count += 1
                    else:
                        failed_fields.append(field_name)

                    print()  # Blank line between fields

                except Exception as e:
                    print(f"   ❌ Error: {str(e)}\n")
                    failed_fields.append(field_name)
                    continue

            # Final summary
            print("=" * 70)
            print("🎉 Workflow Completed!")
            print("=" * 70)
            print(f"✅ Successfully uploaded: {completed_count}/{len(self.field_types)} fields")
            print(f"📊 Total rows processed: {len(metadata_by_path)}")

            if failed_fields:
                print(f"❌ Failed fields ({len(failed_fields)}): {', '.join(failed_fields)}")

            print("=" * 70)

        except ValidationError as e:
            print(f"\n❌ Validation failed: {str(e)}")
            sys.exit(1)
        except Exception as e:
            print(f"\n❌ Workflow failed: {str(e)}")
            sys.exit(1)
        finally:
            self.cleanup_temp_files()


def main():
    parser = argparse.ArgumentParser(
        description='Generic CSV metadata upload to Visual Layer with field type validation',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:

  # Basic upload with different field types
  python custom_metadata_upload_script.py \\
    --csv metadata.csv \\
    --dataset-id abc-123 \\
    --base-url http://localhost:8080 \\
    --string-fields patient_id notes description \\
    --float-fields score confidence temperature \\
    --datetime-fields created_at updated_at \\
    --enum-fields status category priority \\
    --multi-enum-fields tags labels keywords \\
    --link-fields pdf_url documentation_link

  # Simple upload with just strings and enums
  python custom_metadata_upload_script.py \\
    --csv data.csv \\
    --dataset-id abc-123 \\
    --base-url http://localhost:8080 \\
    --string-fields name description \\
    --enum-fields category status

CSV Format Requirements:
  - Must have a 'file_fullpath' column matching Visual Layer export format
  - File paths should match those returned by the dataset export API
  - Datetime values: YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DD
  - Float values: Valid decimal numbers
  - Enum values: Max 20 characters each
  - Multi-enum values: Comma-separated or JSON array ["val1", "val2"]
  - Link values: Must include protocol (http:// or https://)

Field Type Documentation:
  https://docs.visual-layer.com/docs/Creating-Datasets/custom-metadata
        """
    )

    parser.add_argument('--csv', required=True,
                       help='Path to CSV file with metadata')
    parser.add_argument('--dataset-id', required=True,
                       help='Visual Layer dataset ID')
    parser.add_argument('--base-url', default='http://localhost:8080',
                       help='Base URL (default: http://localhost:8080)')
    parser.add_argument('--batch-size', type=int, default=50000,
                       help='Records per upload batch (default: 50000)')
    parser.add_argument('--api-key', default=None,
                       help='API key for Visual Layer Cloud authentication')
    parser.add_argument('--api-secret', default=None,
                       help='API secret for Visual Layer Cloud authentication')

    # Field type specifications
    parser.add_argument('--string-fields', nargs='*', default=[],
                       help='List of fields to treat as strings')
    parser.add_argument('--float-fields', nargs='*', default=[],
                       help='List of fields to treat as floats (decimal numbers)')
    parser.add_argument('--datetime-fields', nargs='*', default=[],
                       help='List of fields to treat as datetimes (ISO 8601 UTC format)')
    parser.add_argument('--enum-fields', nargs='*', default=[],
                       help='List of fields to treat as enums (single-select, max 20 chars)')
    parser.add_argument('--multi-enum-fields', nargs='*', default=[],
                       help='List of fields to treat as multi-enums (multi-select, max 20 chars each)')
    parser.add_argument('--link-fields', nargs='*', default=[],
                       help='List of fields to treat as links (URLs)')

    args = parser.parse_args()

    # Validate that at least one field type is specified
    if not any([args.string_fields, args.float_fields, args.datetime_fields,
                args.enum_fields, args.multi_enum_fields, args.link_fields]):
        parser.error(
            "At least one field type must be specified. Use --string-fields, --float-fields, "
            "--datetime-fields, --enum-fields, --multi-enum-fields, or --link-fields"
        )

    # Create uploader and run workflow
    uploader = CSVMetadataUploader(
        dataset_id=args.dataset_id,
        base_url=args.base_url,
        string_fields=args.string_fields,
        float_fields=args.float_fields,
        datetime_fields=args.datetime_fields,
        enum_fields=args.enum_fields,
        multi_enum_fields=args.multi_enum_fields,
        link_fields=args.link_fields,
        batch_size=args.batch_size,
        api_key=args.api_key,
        api_secret=args.api_secret
    )

    uploader.process_workflow(args.csv)


if __name__ == "__main__":
    main()