
Custom Metadata Upload to Visual Layer

This page contains the complete, ready-to-use Python script for uploading custom metadata to Visual Layer datasets, with automatic field discovery and type detection. The script scans a folder for images and their corresponding .metadata.json files, exports the dataset to map filenames to media IDs, discovers all metadata fields, and detects data types without manual configuration.

Back to Custom Metadata Automation Documentation

Return to the main custom metadata automation guide for setup instructions, usage examples, and workflow details.

Key Features

  • Automatic Dataset Export - No manual export step required
  • Auto-Discovery Mode - Automatically finds and processes all metadata fields
  • Manual Override - Specify individual fields with the --field flag when needed
  • Intelligent Type Detection - Detects float, datetime, enum, multi-enum, link, and string types
  • Robust Error Handling - Falls back to string type if conversion fails
  • String Truncation - Automatically handles Visual Layer’s 255-character limit

Installation Requirements

Before using this script, install the required Python packages:
pip install pandas requests

Usage

Save this script as upload_metadata_from_folder.py and run:
# Auto-detect all fields (recommended)
python upload_metadata_from_folder.py \
  --folder /path/to/folder \
  --dataset-id your-dataset-id \
  --base-url http://localhost:8080

# Manually specify fields and their types
python upload_metadata_from_folder.py \
  --folder /path/to/folder \
  --dataset-id your-dataset-id \
  --base-url http://localhost:8080 \
  --field annotated_at datetime \
  --field confidence float \
  --field category enum
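
Before running the full upload on a large folder, you may want to confirm the dataset ID and base URL are reachable. A minimal sketch, reusing the same export_media_id endpoint the script calls internally; the base URL and dataset ID below are placeholders:

import requests

BASE_URL = "http://localhost:8080"   # placeholder
DATASET_ID = "your-dataset-id"       # placeholder

# Same endpoint the script's export_dataset() uses (note the singular 'dataset' in the path)
url = f"{BASE_URL}/api/v1/dataset/{DATASET_ID}/export_media_id"
response = requests.get(url, timeout=30)

if response.status_code == 200:
    # The response is a CSV with filename -> media_id rows
    rows = response.text.splitlines()
    print(f"Dataset reachable: {max(len(rows) - 1, 0)} media rows exported")
else:
    print(f"Export failed: {response.status_code} - {response.text[:200]}")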

Expected Folder Structure

The script expects images and their corresponding metadata files:
your-folder/
├── image1.jpg
├── image1.jpg.metadata.json
├── image2.png
├── image2.png.metadata.json
├── image3.jpg
└── image3.jpg.metadata.json
Each .metadata.json file should contain field-value pairs:
{
  "annotated_at": "2024-01-15T10:30:00Z",
  "confidence": 0.95,
  "category": "approved",
  "tags": ["quality", "verified"]
}
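If your metadata currently lives elsewhere (a spreadsheet, a database, or in-memory records), here is a minimal sketch of writing it out in the sidecar layout shown above; the folder path, filenames, and field values are illustrative:

import json
from pathlib import Path

folder = Path("/path/to/folder")    # illustrative
records = {                         # illustrative metadata keyed by image filename
    "image1.jpg": {"annotated_at": "2024-01-15T10:30:00Z", "confidence": 0.95},
    "image2.png": {"category": "approved", "tags": ["quality", "verified"]},
}

for image_name, metadata in records.items():
    if not (folder / image_name).exists():
        continue  # skip entries without a matching image, mirroring the script's behavior
    sidecar = folder / f"{image_name}.metadata.json"
    sidecar.write_text(json.dumps(metadata, indent=2), encoding="utf-8")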
This is an example script designed for on-premises Visual Layer installations. You can modify the field detection logic and processing to suit your specific needs.
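For example, to force certain fields to a fixed type instead of relying on auto-detection, one approach (a sketch, assuming the FolderMetadataProcessor class from the script below is defined in the same file or imported) is to override _auto_detect_field_type in a subclass; the field names here are illustrative:

class MyProcessor(FolderMetadataProcessor):
    # Fields that should always be uploaded as plain strings (illustrative names)
    FORCE_STRING = {"barcode", "notes"}

    def _auto_detect_field_type(self, field_name, sample_values):
        if field_name in self.FORCE_STRING:
            return 'string'
        # Fall back to the built-in detection for everything else
        return super()._auto_detect_field_type(field_name, sample_values)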

Complete Script Code

#!/usr/bin/env python3
"""
Folder-based Metadata Upload for Visual Layer
Scans a folder for images and their corresponding .metadata.json files.
Automatically discovers and uploads all metadata fields with intelligent type detection.

Features:
- Auto-detection: Discovers all fields and detects types (float, datetime, enum, string, etc.)
- Manual override: Specify fields manually with --field flag
- Robust: Falls back to string type if conversion fails

Usage:
    # Auto-detect all fields (recommended):
    python upload_metadata_from_folder.py \\
        --folder "path/to/folder" \\
        --dataset-id <dataset-id> \\
        --base-url http://localhost:8080

    # Manual field specification:
    python upload_metadata_from_folder.py \\
        --folder "path/to/folder" \\
        --dataset-id <dataset-id> \\
        --base-url http://localhost:8080 \\
        --field annotated_at datetime \\
        --field conf_classification float
"""

import json
import requests
import argparse
import os
import sys
import pandas as pd
import time
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path

class FolderMetadataProcessor:
    def __init__(self, dataset_id: str, base_url: str):
        self.dataset_id = dataset_id
        self.raw_base_url = base_url.rstrip('/')

        # Automatically add /api/v1/datasets if not present
        if not base_url.endswith('/api/v1/datasets'):
            if base_url.endswith('/'):
                base_url = base_url.rstrip('/')
            self.base_url = f"{base_url}/api/v1/datasets"
        else:
            self.base_url = base_url

        self.session = requests.Session()
        self._temp_files = []

    def export_dataset(self) -> Dict[str, str]:
        """Export dataset and return mapping of filename -> media_id."""
        print("📤 Exporting dataset to get media_id mappings...")

        # Use the export_media_id endpoint to get CSV with filename -> media_id mapping
        # Note: This endpoint uses singular 'dataset' not 'datasets'
        url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                # Parse CSV response
                import csv
                import io

                csv_content = response.text
                csv_reader = csv.DictReader(io.StringIO(csv_content))

                # Build mapping from filename to media_id
                mapping = {}
                for row in csv_reader:
                    filename = row.get('filename', '')
                    media_id = row.get('media_id', '')

                    if media_id and filename:
                        # Extract just the filename without path
                        basename = os.path.basename(filename)
                        mapping[basename] = media_id

                print(f"   ✅ Exported {len(mapping)} media items")
                return mapping
            else:
                print(f"   ❌ Failed to export dataset: {response.status_code} - {response.text}")
                return {}
        except requests.RequestException as e:
            print(f"   ❌ Export request failed: {str(e)}")
            return {}
        except Exception as e:
            print(f"   ❌ Failed to parse export CSV: {str(e)}")
            return {}

    def scan_folder(self, folder_path: str) -> List[Tuple[str, str]]:
        """Scan folder and return list of (image_path, metadata_path) tuples."""
        print(f"🔍 Scanning folder: {folder_path}")

        folder = Path(folder_path)
        if not folder.exists():
            raise FileNotFoundError(f"Folder not found: {folder_path}")

        pairs = []

        # Find all .metadata.json files
        for metadata_file in folder.glob("*.metadata.json"):
            # Extract original image filename
            # e.g., "image.jpg.metadata.json" -> "image.jpg"
            image_name = metadata_file.name.replace(".metadata.json", "")
            image_path = folder / image_name

            # Check if corresponding image exists
            if image_path.exists():
                pairs.append((str(image_path), str(metadata_file)))
            else:
                print(f"   ⚠️  Found metadata without image: {metadata_file.name}")

        print(f"   ✅ Found {len(pairs)} image + metadata pairs")
        return pairs

    def load_metadata_files(self, pairs: List[Tuple[str, str]]) -> Dict[str, Dict[str, Any]]:
        """Load all metadata.json files and return dict of filename -> metadata."""
        print("📖 Loading metadata files...")

        metadata_by_file = {}

        for image_path, metadata_path in pairs:
            image_filename = os.path.basename(image_path)

            try:
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                    metadata_by_file[image_filename] = metadata
            except Exception as e:
                print(f"   ⚠️  Failed to load {metadata_path}: {str(e)}")
                continue

        print(f"   ✅ Loaded {len(metadata_by_file)} metadata files")
        return metadata_by_file

    def discover_and_analyze_fields(self, metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Discover all fields and auto-detect their types."""
        print("🔍 Discovering and analyzing all fields...")

        # Collect all unique field names
        all_fields = set()
        for filename, metadata in metadata_by_file.items():
            all_fields.update(metadata.keys())

        print(f"   📋 Found {len(all_fields)} unique fields")

        field_types = {}

        # Analyze each field
        for field_name in sorted(all_fields):
            # Collect sample values (more samples for better enum detection)
            sample_values = []
            for filename, metadata in metadata_by_file.items():
                if field_name in metadata:
                    sample_values.append(metadata[field_name])
                    if len(sample_values) >= 100:  # Sample up to 100 values for enum detection
                        break

            # Detect type
            field_type = self._auto_detect_field_type(field_name, sample_values)
            field_types[field_name] = field_type
            print(f"      {field_name}: {field_type}")

        print(f"   ✅ Analyzed {len(field_types)} fields")
        return field_types

    def _auto_detect_field_type(self, field_name: str, sample_values: List[Any]) -> str:
        """Auto-detect field type based on sample values."""
        if not sample_values:
            return 'string'

        # Get first non-null value
        first_value = None
        for val in sample_values:
            if val is not None and val != '':
                first_value = val
                break

        if first_value is None:
            return 'string'

        # Check type based on value patterns
        if self._is_float(first_value):
            return 'float'
        elif self._is_date(first_value):
            return 'datetime'
        elif self._is_url(first_value):
            # Check if most values are URLs
            url_count = sum(1 for v in sample_values if self._is_url(v))
            if url_count / len(sample_values) >= 0.7:
                return 'link'
            else:
                return 'string'
        elif self._is_list(first_value):
            # Check if it's multi-enum or string
            unique_values = set()
            for val in sample_values:
                if self._is_list(val):
                    list_items = self._parse_list_value(val)
                    unique_values.update(list_items)

            if len(unique_values) <= 20:
                return 'multi-enum'
            else:
                return 'string'
        else:
            # Check if it's an enum (limited unique string values)
            unique_values = set()
            for val in sample_values:
                if val is not None and val != '':
                    # Only consider string-like values for enum (skip dicts/lists)
                    if not isinstance(val, (dict, list)):
                        unique_values.add(str(val).strip())

            # If ≤100 unique values and has at least some values, treat as enum
            if len(unique_values) > 0 and len(unique_values) <= 100:
                return 'enum'
            else:
                # Default to string
                return 'string'

    def _is_float(self, value: Any) -> bool:
        """Check if value can be converted to float."""
        if not isinstance(value, (str, int, float)):
            return False
        try:
            float(value)
            # Require a decimal point or exponent so plain integers are not treated as floats
            return '.' in str(value) or 'e' in str(value).lower()
        except (ValueError, TypeError):
            return False

    def _is_date(self, value: Any) -> bool:
        """Check if value looks like a date using pandas parsing."""
        if not isinstance(value, str) or not value.strip():
            return False

        try:
            pd.to_datetime(value.strip())
            return True
        except (ValueError, TypeError, pd.errors.ParserError):
            return False

    def _is_url(self, value: Any) -> bool:
        """Check if value is a valid absolute URL with protocol."""
        if not isinstance(value, str):
            return False

        value_lower = value.strip().lower()

        # ONLY accept actual URLs starting with protocol
        return value_lower.startswith(('http://', 'https://', 'ftp://'))

    def _is_list(self, value: Any) -> bool:
        """Check if value looks like a list."""
        # If it's already a list, return True
        if isinstance(value, list):
            return True

        if not isinstance(value, str):
            return False

        value = value.strip()

        # Check for standard list formats
        if value.startswith('[') and value.endswith(']'):
            try:
                import ast
                ast.literal_eval(value)
                return True
            except (ValueError, SyntaxError):
                return False
        return False

    def _parse_list_value(self, value: Any) -> List[str]:
        """Parse list value."""
        if isinstance(value, list):
            return [str(v) for v in value]

        if not isinstance(value, str):
            return [str(value)]

        value = value.strip()

        try:
            import ast
            parsed = ast.literal_eval(value)
            if isinstance(parsed, list):
                return [str(v) for v in parsed]
        except (ValueError, SyntaxError):
            pass

        return [value]

    def validate_user_fields(self, user_fields: Dict[str, str],
                            metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Validate user-specified fields exist and types are correct."""
        print("🔍 Validating user-specified fields...")

        validated_fields = {}

        for field_name, field_type in user_fields.items():
            print(f"   Checking field: {field_name} ({field_type})")

            # Check if field exists in at least one metadata file
            field_exists = False
            sample_values = []

            for filename, metadata in metadata_by_file.items():
                if field_name in metadata:
                    field_exists = True
                    sample_values.append(metadata[field_name])
                    if len(sample_values) >= 10:  # Sample up to 10 values
                        break

            if not field_exists:
                print(f"      ❌ Field '{field_name}' not found in any metadata file")
                continue

            # Validate type with sample values
            type_valid = self._validate_field_type(field_name, field_type, sample_values)

            if type_valid:
                validated_fields[field_name] = field_type
                print(f"      ✅ Valid")
            else:
                print(f"      ❌ Type validation failed for '{field_name}'")

        print(f"   ✅ Validated {len(validated_fields)}/{len(user_fields)} fields")
        return validated_fields

    def _validate_field_type(self, field_name: str, field_type: str,
                            sample_values: List[Any]) -> bool:
        """Validate that sample values match expected field type."""
        if not sample_values:
            return True  # Empty is ok

        if field_type == 'string':
            return True  # String always works

        if field_type == 'datetime':
            # Try parsing with pandas
            for value in sample_values:
                if value is None or value == '':
                    continue
                try:
                    pd.to_datetime(str(value))
                except Exception:
                    return False
            return True

        if field_type == 'float':
            # Try converting to float
            for value in sample_values:
                if value is None or value == '':
                    continue
                try:
                    float(value)
                except Exception:
                    return False
            return True

        if field_type in ['enum', 'multi-enum']:
            # Collect unique values
            unique_values = set()
            for value in sample_values:
                if value is None or value == '':
                    continue
                if field_type == 'multi-enum':
                    if isinstance(value, list):
                        unique_values.update([str(v) for v in value])
                    else:
                        return False  # multi-enum must be array
                else:
                    unique_values.add(str(value))

            # Check if within API limit
            if len(unique_values) > 20:
                print(f"      ⚠️  Warning: {len(unique_values)} unique values (max 20 for enum)")
                return False

            return True

        if field_type == 'link':
            # Basic URL validation
            for value in sample_values:
                if value is None or value == '':
                    continue
                value_str = str(value)
                if not (value_str.startswith('http://') or
                       value_str.startswith('https://') or
                       value_str.startswith('ftp://')):
                    # Could be file path, allow it
                    pass
            return True

        return True

    def create_custom_field(self, field_name: str, field_type: str,
                           metadata_by_file: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """Create a custom field and return field_id."""
        print(f"🔧 Creating custom field: {field_name} ({field_type})")

        # Prepare field data
        field_data = {
            "field_name": field_name,
            "field_type": field_type
        }

        # Add enum values for enum types
        if field_type in ['enum', 'multi-enum']:
            unique_values = set()

            for filename, metadata in metadata_by_file.items():
                if field_name in metadata:
                    value = metadata[field_name]
                    if value is not None and value != '':
                        if field_type == 'multi-enum':
                            if isinstance(value, list):
                                unique_values.update([str(v) for v in value])
                        else:
                            unique_values.add(str(value))

            enum_values = list(unique_values)[:20]  # Limit to API max
            field_data["enum_options"] = enum_values

            if field_type == 'multi-enum':
                field_data["field_type"] = 'enum'  # API only accepts 'enum'
                field_data["is_multi"] = True
            else:
                field_data["is_multi"] = False

            print(f"   📝 Adding {len(enum_values)} enum values")

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

        try:
            response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                result = response.json()
                task_id = result.get('task_id')
                print(f"   ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                print(f"   🔄 Field already exists, skipping creation")
                return None
            else:
                print(f"   ❌ Failed to create field: {response.status_code} - {response.text}")
                return None
        except requests.RequestException as e:
            print(f"   ❌ Request failed: {str(e)}")
            return None

    def upload_field_data(self, field_id: str, field_name: str, field_type: str,
                         metadata_by_file: Dict[str, Dict[str, Any]],
                         filename_to_media_id: Dict[str, str]) -> Optional[str]:
        """Upload data for a custom field."""
        print(f"   📤 Uploading data for field: {field_name}")

        upload_data = []
        matched_count = 0
        skipped_count = 0

        for filename, metadata in metadata_by_file.items():
            # Get media_id for this file
            media_id = filename_to_media_id.get(filename)
            if not media_id:
                skipped_count += 1
                continue

            # Get field value
            if field_name not in metadata:
                continue

            value = metadata[field_name]
            if value is None or value == '':
                continue

            # Convert value based on field type
            try:
                converted_value = self._convert_value(value, field_name, field_type)
                if converted_value is not None:
                    upload_data.append({
                        "media_id": media_id,
                        "value": converted_value
                    })
                    matched_count += 1
            except Exception as e:
                print(f"      ⚠️  Failed to convert value for {filename}: {str(e)}")
                skipped_count += 1
                continue

        print(f"   📊 Processed {matched_count} files, skipped {skipped_count}")

        if not upload_data:
            print(f"   ⚠️  No data to upload for field {field_name}")
            return None

        # Save to temp file
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f, indent=2)
            temp_file = f.name

        self._temp_files.append(temp_file)

        # Upload
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

        try:
            with open(temp_file, 'rb') as f:
                files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
                response = self.session.post(url, files=files)

            if response.status_code in [200, 202]:
                print(f"   ✅ Upload completed successfully")
                return field_id
            else:
                print(f"   ❌ Failed to upload data: {response.status_code} - {response.text}")
                return None
        except requests.RequestException as e:
            print(f"   ❌ Request failed: {str(e)}")
            return None

    def _convert_value(self, value: Any, field_name: str, field_type: str) -> Any:
        """Convert value to appropriate type for upload. Falls back to string if conversion fails.
        Truncates string values to 255 characters (VL limit)."""
        try:
            if field_type == 'datetime':
                # Parse and format as ISO 8601
                dt = pd.to_datetime(str(value))
                if dt.tz is None:
                    return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                else:
                    return dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')

            elif field_type == 'float':
                return float(value)

            elif field_type == 'multi-enum':
                if isinstance(value, list):
                    return [str(v).strip() for v in value]
                else:
                    return [str(value).strip()]

            elif field_type == 'enum':
                return str(value).strip()

            elif field_type == 'string':
                # Handle nested objects/arrays - serialize to JSON
                if isinstance(value, (dict, list)):
                    json_str = json.dumps(value)
                    if len(json_str) > 255:
                        return json_str[:255]
                    return json_str

                str_value = str(value).strip()
                if len(str_value) > 255:
                    return str_value[:255]
                return str_value

            elif field_type == 'link':
                return str(value).strip()

            return str(value)

        except Exception as e:
            # Fallback to string if conversion fails (with 255 char limit)
            if isinstance(value, (dict, list)):
                json_str = json.dumps(value)
                return json_str[:255] if len(json_str) > 255 else json_str

            str_value = str(value)
            return str_value[:255] if len(str_value) > 255 else str_value

    def check_task_status(self, task_id: str) -> str:
        """Check the status of an upload task."""
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                result = response.json()
                status = result.get('status', 'unknown')

                if status == 'COMPLETED_WITH_ERRORS':
                    error_count = result.get('error_count', 0)
                    inserted_rows = result.get('inserted_rows', 0)
                    print(f"   ⚠️  Task completed with {error_count} errors, {inserted_rows} rows inserted")

                    sample_errors = result.get('sample_errors', [])
                    if sample_errors:
                        print(f"   📋 Sample errors:")
                        for error in sample_errors[:3]:
                            print(f"      - Row {error.get('row_index', '?')}: {error.get('reason', 'Unknown error')}")
                        if len(sample_errors) > 3:
                            print(f"      - ... and {len(sample_errors) - 3} more errors")

                elif status == 'COMPLETED':
                    inserted_rows = result.get('inserted_rows', 0)
                    print(f"   ✅ Task completed successfully, {inserted_rows} rows inserted")

                return status
            else:
                return 'error'
        except requests.RequestException:
            return 'error'

    def wait_for_task_completion(self, task_id: str, field_name: str, polling_interval: int = 5) -> str:
        """Wait for task completion by polling status endpoint."""
        print(f"   ⏳ Waiting for task completion... (Press Ctrl+C to stop)")
        start_time = time.time()

        while True:
            status = self.check_task_status(task_id)

            if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                elapsed = int(time.time() - start_time)
                print(f"   ✅ Task completed after {elapsed}s")
                return status
            elif status == 'error':
                print(f"   ❌ Error checking task status")
                return 'error'
            elif status == 'IN_PROGRESS':
                elapsed = int(time.time() - start_time)
                print(f"   ⏳ Task in progress... (elapsed: {elapsed}s)")
            else:
                elapsed = int(time.time() - start_time)
                print(f"   📋 Task status: {status} (elapsed: {elapsed}s)")

            time.sleep(polling_interval)

    def cleanup_temp_files(self):
        """Remove temporary files created during processing."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except OSError:
                pass

    def process_workflow(self, folder_path: str, user_fields: Optional[Dict[str, str]] = None):
        """Main workflow processing function."""
        try:
            print("🚀 Starting Folder Metadata Upload Workflow")
            print(f"📁 Folder: {folder_path}")

            # Step 1: Export dataset to get media_id mappings
            filename_to_media_id = self.export_dataset()
            if not filename_to_media_id:
                raise Exception("Failed to export dataset")

            # Step 2: Scan folder for images + metadata
            pairs = self.scan_folder(folder_path)
            if not pairs:
                raise Exception("No image + metadata pairs found in folder")

            # Step 3: Load all metadata files
            metadata_by_file = self.load_metadata_files(pairs)
            if not metadata_by_file:
                raise Exception("Failed to load metadata files")

            # Step 4: Determine fields to process
            if user_fields:
                # User-specified fields - validate them
                print(f"🎯 User-specified fields: {len(user_fields)}")
                validated_fields = self.validate_user_fields(user_fields, metadata_by_file)
                if not validated_fields:
                    raise Exception("No valid fields to upload")
            else:
                # Auto-detect all fields
                print("🤖 Auto-detection mode")
                validated_fields = self.discover_and_analyze_fields(metadata_by_file)
                if not validated_fields:
                    raise Exception("No fields discovered")

            print(f"\n🎯 Processing {len(validated_fields)} custom fields...")

            # Step 5: Process each field
            completed_fields = []

            for field_name, field_type in validated_fields.items():
                print(f"\n🔄 Processing field: {field_name} ({field_type})")

                try:
                    # Create custom field
                    field_id = self.create_custom_field(field_name, field_type, metadata_by_file)
                    if not field_id:
                        print(f"   ⏭️  Skipping field {field_name}")
                        continue

                    # Upload data
                    task_id = self.upload_field_data(field_id, field_name, field_type,
                                                    metadata_by_file, filename_to_media_id)
                    if not task_id:
                        print(f"   ❌ Failed to upload data for {field_name}")
                        continue

                    # Wait for completion
                    final_status = self.wait_for_task_completion(task_id, field_name)

                    if final_status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                        completed_fields.append(field_name)
                        status_icon = "✅" if final_status == 'COMPLETED' else "⚠️"
                        print(f"   {status_icon} Field {field_name} {final_status.lower()}")
                    else:
                        print(f"   ❌ Field {field_name} failed")

                except Exception as e:
                    print(f"   ❌ Error processing field {field_name}: {str(e)}")
                    continue

            print("\n🎉 Workflow completed!")
            print(f"✅ Successfully processed {len(completed_fields)}/{len(validated_fields)} fields")

        finally:
            self.cleanup_temp_files()


def main():
    parser = argparse.ArgumentParser(
        description='Upload metadata from folder to Visual Layer',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:

  # Auto-detect all fields (recommended):
  python upload_metadata_from_folder.py \\
    --folder "example shopic/" \\
    --dataset-id abc-123 \\
    --base-url http://localhost:8080

  # Manual field specification:
  python upload_metadata_from_folder.py \\
    --folder "example shopic/" \\
    --dataset-id abc-123 \\
    --base-url http://localhost:8080 \\
    --field annotated_at datetime \\
    --field conf_classification float \\
    --field barcode string \\
    --field quality enum
        """
    )

    parser.add_argument('--folder', required=True,
                       help='Path to folder containing images and .metadata.json files')
    parser.add_argument('--dataset-id', required=True,
                       help='Visual Layer dataset ID')
    parser.add_argument('--base-url', default='http://localhost:8080',
                       help='Base URL (default: http://localhost:8080)')
    parser.add_argument('--field', action='append', nargs=2, metavar=('NAME', 'TYPE'),
                       help='Field to upload (optional, repeatable). If not specified, auto-detects all fields. TYPE: string|float|datetime|enum|multi-enum|link')

    args = parser.parse_args()

    # Parse fields if provided
    user_fields = None
    if args.field:
        user_fields = {}
        valid_types = {'string', 'float', 'datetime', 'enum', 'multi-enum', 'link'}

        for field_name, field_type in args.field:
            if field_type not in valid_types:
                print(f"❌ Error: Invalid field type '{field_type}' for field '{field_name}'")
                print(f"   Valid types: {', '.join(sorted(valid_types))}")
                sys.exit(1)
            user_fields[field_name] = field_type

    # Create processor and run workflow
    processor = FolderMetadataProcessor(args.dataset_id, args.base_url)
    processor.process_workflow(args.folder, user_fields)


if __name__ == "__main__":
    main()

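If the script is interrupted while waiting for an upload task, you can check the task later with a direct request to the same status endpoint the script polls; the base URL, dataset ID, and task ID below are placeholders (the task ID is printed when the field is created):

import requests

BASE_URL = "http://localhost:8080"   # placeholder
DATASET_ID = "your-dataset-id"       # placeholder
TASK_ID = "your-task-id"             # placeholder, printed by create_custom_field()

url = f"{BASE_URL}/api/v1/datasets/{DATASET_ID}/custom_metadata/tasks/{TASK_ID}/status"
result = requests.get(url, timeout=30).json()
print(result.get('status'), '-', result.get('inserted_rows', 0), 'rows inserted')
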
Single JSON File Modification

If your metadata is in one JSON file for all images, add this method to the FolderMetadataProcessor class:
def load_single_json_metadata(self, json_file: str, folder_path: str) -> Dict[str, Dict[str, Any]]:
    """Load metadata from a single JSON file containing all image metadata."""
    print(f"📖 Loading metadata from single JSON file: {json_file}")

    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Handle dictionary format: {filename: {metadata}}
    if isinstance(data, dict):
        metadata_by_file = data
    # Handle array format: [{filename: ..., field: value, ...}]
    elif isinstance(data, list):
        metadata_by_file = {}
        for item in data:
            filename = item.get('filename') or item.get('file_name')
            if filename:
                item_copy = {k: v for k, v in item.items() if k not in ['filename', 'file_name']}
                metadata_by_file[filename] = item_copy
    else:
        raise ValueError("Unsupported metadata JSON structure: expected a dict or a list")

    # Verify images exist
    folder = Path(folder_path)
    valid_metadata = {}
    for filename, metadata in metadata_by_file.items():
        if (folder / filename).exists():
            valid_metadata[filename] = metadata

    print(f"   ✅ Loaded metadata for {len(valid_metadata)} images")
    return valid_metadata
Usage: In process_workflow(), replace the scan/load section with:
# Replace this:
pairs = self.scan_folder(folder_path)
metadata_by_file = self.load_metadata_files(pairs)

# With this:
metadata_by_file = self.load_single_json_metadata('/path/to/metadata.json', folder_path)
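If you prefer not to hard-code the JSON path, one option (a sketch; the --metadata-json flag is not part of the original script) is to add an argument in main() and thread it through to process_workflow():

# In main(), next to the existing arguments:
parser.add_argument('--metadata-json', default=None,
                    help='Optional single JSON file containing metadata for all images')

# In process_workflow(), accept the path and branch on it:
def process_workflow(self, folder_path, user_fields=None, metadata_json=None):
    ...
    if metadata_json:
        metadata_by_file = self.load_single_json_metadata(metadata_json, folder_path)
    else:
        pairs = self.scan_folder(folder_path)
        metadata_by_file = self.load_metadata_files(pairs)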