DICOM Metadata Upload to Visual Layer

This page contains the complete, ready-to-use Python script for uploading extracted DICOM metadata to Visual Layer datasets.
The script reads the CSV file generated by the DICOM converter, automatically detects DICOM date/time formats, categorizes field types, and handles the multi-value fields common in medical imaging data.
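For reference, DICOM stores dates as YYYYMMDD and times as HHMMSS; the script converts both to ISO 8601 timestamps before upload. The minimal sketch below mirrors the conversion performed in upload_field_data (the sample values are illustrative):

from datetime import datetime, date

# DICOM date YYYYMMDD -> ISO 8601 at midnight UTC
dt = datetime.strptime("20240115", "%Y%m%d")
print(dt.strftime("%Y-%m-%dT00:00:00Z"))   # 2024-01-15T00:00:00Z

# DICOM time HHMMSS -> ISO 8601 using today's date
today = date.today()
dt = datetime.strptime(f"{today.strftime('%Y%m%d')}" + "093015", "%Y%m%d%H%M%S")
print(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))    # e.g. 2024-06-01T09:30:15Z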

Back to DICOM Converter Documentation

Return to the main DICOM converter guide for the complete workflow, setup instructions, and usage examples.

Installation Requirements

Before using this script, install the required Python packages:
pip install pandas requests pyjwt

Usage

Save this script as upload_csv_with_json_mapping.py and run:
# For cloud installations
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
  --dataset-id=your-dataset-id \
  --base-url=https://app.visual-layer.com \
  --api-key=your-api-key \
  --api-secret=your-api-secret

# For on-premises installations
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
  --dataset-id=your-dataset-id \
  --base-url=http://localhost:2080 \
  --api-key=your-api-key
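
The metadata.json file is the media items export downloaded from Visual Layer; the script matches CSV rows to media IDs through it, so the CSV must contain an image_filename column and the JSON must contain a media_items array with media_id and file_name fields (this is the structure parsed by read_metadata_json below). A minimal sketch of the expected JSON shape, with illustrative IDs and filenames:

{
  "media_items": [
    {"media_id": "1f2e3d4c-0000-0000-0000-000000000000", "file_name": "series-00000_image-00000.jpg"},
    {"media_id": "5a6b7c8d-0000-0000-0000-000000000000", "file_name": "series-00000_image-00001.jpg"}
  ]
}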

Complete Script Code

#!/usr/bin/env python3
"""
CSV Metadata with JSON Mapping Workflow for Visual Layer
Uploads DICOM metadata from dicom_metadata.csv to Visual Layer.
Uses metadata.json for file path mapping to Visual Layer media IDs.
"""

import csv
import json
import requests
import ast
import argparse
import os
import sys
import pandas as pd
import re
import time
import tempfile
import jwt
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone, timedelta, date

class CsvJsonMetadataProcessor:
    def __init__(self, dataset_id: str, base_url: str, api_key: str = None, api_secret: str = None, enum_fields: list = None):
        self.dataset_id = dataset_id
        # Normalize the base URL and append /api/v1/datasets if it is not already present
        self.raw_base_url = base_url.rstrip('/')
        if self.raw_base_url.endswith('/api/v1/datasets'):
            self.base_url = self.raw_base_url
        else:
            self.base_url = f"{self.raw_base_url}/api/v1/datasets"
        self.session = requests.Session()
        self._temp_files = []  # Track temporary files for cleanup
        self.enum_fields = enum_fields or []  # Store user-specified enum fields

        # Add authentication headers if provided
        if api_key and api_secret:
            # Generate JWT token as per Visual Layer documentation
            jwt_token = self._generate_jwt(api_key, api_secret)
            self.session.headers.update({
                'Authorization': f'Bearer {jwt_token}'
            })

    def _generate_jwt(self, api_key: str, api_secret: str) -> str:
        """Generate JWT token for Visual Layer API authentication."""
        jwt_algorithm = "HS256"
        jwt_header = {
            'alg': jwt_algorithm,
            'typ': 'JWT',
            'kid': api_key,
        }

        now = datetime.now(tz=timezone.utc)
        expiration = now + timedelta(minutes=10)

        payload = {
            'sub': api_key,
            'iat': int(now.timestamp()),
            'exp': int(expiration.timestamp()),
            'iss': 'sdk'
        }

        return jwt.encode(payload=payload, key=api_secret, algorithm=jwt_algorithm, headers=jwt_header)

    def extract_image_id_from_filename(self, filename: str) -> str:
        """Extract unique identifier from DICOM filename.

        Examples:
        - series-00000_image-00000.jpg -> series-00000_image-00000
        - series-00001_image-00005.jpg -> series-00001_image-00005
        """
        # Remove directory path first
        basename = os.path.basename(filename)

        # Remove file extension
        name_without_ext = os.path.splitext(basename)[0]

        return name_without_ext

    def read_csv(self, csv_file: str) -> List[Dict[str, Any]]:
        """Read CSV file and return list of records."""
        if not os.path.exists(csv_file):
            raise FileNotFoundError(f"File not found: {csv_file}")

        with open(csv_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            return list(reader)


    def read_metadata_json(self, json_file: str) -> List[Dict[str, Any]]:
        """Read metadata JSON file from Visual Layer containing filename->media_id mapping."""
        if not os.path.exists(json_file):
            raise FileNotFoundError(f"Metadata JSON file not found: {json_file}")

        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Extract media_items array
        media_items = data.get('media_items', [])

        # Convert to the expected format with media_id and filename fields
        export_items = []
        for item in media_items:
            export_items.append({
                'media_id': item.get('media_id'),
                'filename': item.get('file_name')  # Use file_name from JSON
            })

        return export_items

    def create_image_id_mapping(self, export_items: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Create mapping from image_id to list of media_ids using the metadata JSON items."""
        mapping = {}

        print("🔍 Creating image_id to media_id mapping from metadata JSON...")

        for item in export_items:
            media_id = item.get('media_id')
            file_name = item.get('filename', '')

            if media_id and file_name:
                image_id = self.extract_image_id_from_filename(file_name)
                if image_id:
                    # Add to list instead of overwriting
                    if image_id not in mapping:
                        mapping[image_id] = []
                    mapping[image_id].append(media_id)

        # Show statistics about duplicates
        total_files = sum(len(media_ids) for media_ids in mapping.values())
        duplicate_count = sum(1 for media_ids in mapping.values() if len(media_ids) > 1)

        print(f"   ✅ Created mapping for {len(mapping)} unique image IDs covering {total_files} files")
        if duplicate_count > 0:
            print(f"   📊 Found {duplicate_count} image IDs with multiple files")

        return mapping

    def analyze_csv_fields(self, csv_data: List[Dict[str, Any]]) -> Dict[str, str]:
        """Analyze CSV fields and determine their types."""
        if not csv_data:
            return {}

        field_types = {}
        sample_row = csv_data[0]

        # Skip these fields as they're not suitable for custom metadata
        skip_fields = {'image_filename', 'original_dicom_path', 'series_folder', 'conversion_timestamp'}

        # Fields that contain numeric arrays - should be treated as strings
        numeric_array_fields = {
            'image_position_patient', 'image_orientation_patient', 'pixel_spacing'
        }

        for field_name, value in sample_row.items():
            if field_name in skip_fields:
                continue

            # Check if user specified this field as enum
            if field_name in self.enum_fields:
                field_types[field_name] = 'enum'
            # Handle numeric array fields as strings
            elif field_name in numeric_array_fields:
                field_types[field_name] = 'string'
            # Handle image_type as multi-enum (contains text arrays)
            elif field_name == 'image_type':
                field_types[field_name] = 'multi-enum'
            # Check for DICOM date fields (YYYYMMDD format)
            elif self._is_dicom_date(field_name, value):
                field_types[field_name] = 'datetime'
            # Check for DICOM time fields (HHMMSS format)
            elif self._is_dicom_time(field_name, value):
                field_types[field_name] = 'datetime'  # Convert times to datetime
            # Determine field type based on value patterns
            elif self._is_float(value):
                field_types[field_name] = 'float'
            elif self._is_date(value):
                field_types[field_name] = 'datetime'
            elif self._is_list(value):
                # Check if it's multi-enum or single enum
                unique_values = set()
                for row in csv_data:
                    list_items = self._parse_list_value(row[field_name])
                    unique_values.update(list_items)

                if len(unique_values) <= 20:  # API enum limit
                    field_types[field_name] = 'multi-enum'
                else:
                    field_types[field_name] = 'string'
            else:
                # For DICOM data, treat most fields as string unless specified as enum
                field_types[field_name] = 'string'

        return field_types

    def _is_dicom_date(self, field_name: str, value: str) -> bool:
        """Check if field is a DICOM date field (YYYYMMDD format)."""
        date_field_names = ['study_date', 'series_date', 'acquisition_date', 'instance_creation_date', 'patient_birth_date']
        if field_name not in date_field_names:
            return False

        if not isinstance(value, str) or len(value.strip()) != 8:
            return False

        try:
            # Try to parse as YYYYMMDD
            datetime.strptime(value.strip(), '%Y%m%d')
            return True
        except (ValueError, TypeError):
            return False

    def _is_dicom_time(self, field_name: str, value: str) -> bool:
        """Check if field is a DICOM time field (HHMMSS format)."""
        time_field_names = ['study_time', 'series_time', 'acquisition_time', 'instance_creation_time']
        if field_name not in time_field_names:
            return False

        if not isinstance(value, str) or len(value.strip()) != 6:
            return False

        try:
            # Try to parse as HHMMSS
            datetime.strptime(value.strip(), '%H%M%S')
            return True
        except (ValueError, TypeError):
            return False

    def _is_float(self, value: str) -> bool:
        """Check if value can be converted to float."""
        if not isinstance(value, str) or not value.strip():
            return False
        try:
            float(value)
            return '.' in value or 'e' in value.lower()
        except (ValueError, TypeError):
            return False

    def _is_date(self, value: str) -> bool:
        """Check if value looks like a date using pandas parsing."""
        if not isinstance(value, str) or not value.strip():
            return False

        try:
            # Use pandas to parse the date - it's very flexible with formats
            pd.to_datetime(value.strip())
            return True
        except (ValueError, TypeError, pd.errors.ParserError):
            return False

    def _is_list(self, value: str) -> bool:
        """Check if value looks like a list."""
        if not isinstance(value, str):
            return False

        value = value.strip()

        # Check for DICOM image_type format: "['ORIGINAL', 'PRIMARY', 'LOCALIZER']"
        if value.startswith('["[') or value.startswith("'["):
            return True

        # Check for CSV-style quoted lists: "[""item1"",""item2""]"
        if value.startswith('["') and value.endswith('"]'):
            return True

        # Check for standard list formats
        if (value.startswith('[') and value.endswith(']')) or ',' in value:
            try:
                ast.literal_eval(value)
                return True
            except (ValueError, SyntaxError):
                return ',' in value
        return False

    def _parse_list_value(self, value: str) -> List[str]:
        """Parse list value from CSV, handling DICOM image_type format."""
        if not value:
            return []

        value = value.strip()

        # Handle DICOM image_type format: "['ORIGINAL', 'PRIMARY', 'LOCALIZER']"
        if value.startswith('"[') and value.endswith(']"'):
            # Remove outer quotes
            inner_value = value[1:-1]  # Remove outer quotes
            try:
                return ast.literal_eval(inner_value)
            except (ValueError, SyntaxError):
                pass

        # Remove outer quotes if present (from CSV)
        if value.startswith('"') and value.endswith('"'):
            value = value[1:-1]

        try:
            # Handle double-quoted strings in CSV
            cleaned = value.replace('""', '"')
            return ast.literal_eval(cleaned)
        except (ValueError, SyntaxError):
            # Fallback: split by comma if it looks like a simple list
            if ',' in value:
                return [item.strip().strip('"\'') for item in value.split(',')]
            return [value.strip().strip('"\'')]

    def create_custom_field(self, field_name: str, field_type: str, values: List[str]) -> Optional[str]:
        """Create a custom field and return field_id."""
        print(f"🔧 Creating custom field: {field_name} ({field_type})")

        # Prepare field data based on type
        field_data = {
            "field_name": field_name,
            "field_type": field_type
        }

        # Add enum values for enum types
        if field_type in ['enum', 'multi-enum'] and values:
            unique_values = list(set(values))  # Remove duplicates
            field_data["enum_options"] = unique_values[:20]  # Limit enum values to API max
            if field_type == 'multi-enum':
                field_data["field_type"] = 'enum'  # API only accepts 'enum'
                field_data["is_multi"] = True
            else:
                field_data["is_multi"] = False
            print(f"   📝 Adding {len(field_data['enum_options'])} enum values")

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

        try:
            response = self.session.post(url, json=field_data, headers={'Content-Type': 'application/json'})
            if response.status_code == 200:
                result = response.json()
                task_id = result.get('task_id')
                print(f"   ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                print(f"   🔄 Field already exists, skipping creation")
                return None  # Skip upload for existing fields
            else:
                print(f"   ❌ Failed to create field: {response.status_code} - {response.text}")
                return None
        except requests.RequestException as e:
            print(f"   ❌ Request failed: {str(e)}")
            return None

    def upload_field_data(self, field_id: str, csv_data: List[Dict[str, Any]],
                         field_name: str, field_type: str, image_id_mapping: Dict[str, List[str]]) -> Optional[str]:
        """Upload data for a custom field using image_filename mapping."""

        print(f"   📤 Uploading data for field: {field_name}")

        # Prepare upload data using image_filename mapping
        upload_data = []
        matched_count = 0

        for row in csv_data:
            image_filename = row.get('image_filename', '').strip()
            if not image_filename:
                continue

            # Extract image_id from filename
            image_id = self.extract_image_id_from_filename(image_filename)

            # Look up media_ids using image_id
            media_ids = image_id_mapping.get(image_id, [])
            if not media_ids:
                continue  # Skip if no matching media_ids found

            value = row.get(field_name, '')
            if value:  # Only upload non-empty values
                # Convert value based on field type
                if field_type == 'float':
                    try:
                        value = float(value)
                    except (ValueError, TypeError):
                        continue
                elif field_type == 'datetime':
                    # Handle DICOM dates and times
                    try:
                        if self._is_dicom_date(field_name, value):
                            # Parse DICOM date format YYYYMMDD
                            dt = datetime.strptime(value.strip(), '%Y%m%d')
                            value = dt.strftime('%Y-%m-%dT00:00:00Z')
                        elif self._is_dicom_time(field_name, value):
                            # Parse DICOM time format HHMMSS and convert to today's date + time
                            time_str = value.strip()
                            if len(time_str) == 6:
                                # Combine today's date with the DICOM time
                                today = date.today()
                                dt = datetime.strptime(f"{today.strftime('%Y%m%d')}{time_str}", '%Y%m%d%H%M%S')
                                value = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                            else:
                                # Invalid time format, skip
                                continue
                        else:
                            # Use pandas to parse and format datetime to ISO 8601 UTC
                            dt = pd.to_datetime(value.strip())
                            # Convert to UTC and format as ISO 8601
                            if dt.tz is None:
                                # If no timezone, assume UTC
                                value = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                            else:
                                # Convert to UTC and format
                                value = dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
                    except (ValueError, TypeError, pd.errors.ParserError):
                        # If parsing fails, skip this value
                        continue
                elif field_type == 'multi-enum':
                    # Parse list for multi-enum - ensure all values are strings
                    try:
                        parsed_list = self._parse_list_value(value)
                        value = [str(item).strip() for item in parsed_list]
                    except (ValueError, SyntaxError):
                        value = [str(value).strip()] if value else []
                elif field_type == 'enum':
                    # Single enum - ensure string
                    value = str(value).strip()
                elif field_type == 'string':
                    # Ensure string type
                    value = str(value).strip()

                if value != '' and value is not None:
                    # Create upload entry for each media_id
                    for media_id in media_ids:
                        upload_data.append({
                            "media_id": media_id,
                            "value": value
                        })
                        matched_count += 1

        total_files = len(upload_data)
        unique_items = len([
            row for row in csv_data
            if row.get('image_filename', '').strip()
            and image_id_mapping.get(self.extract_image_id_from_filename(row.get('image_filename', '').strip()))
        ])
        print(f"   📊 Processed {unique_items} image files from {len(csv_data)} CSV rows")
        print(f"   📁 Created {total_files} file entries for upload")

        if not upload_data:
            print(f"   ⚠️  No data to upload for field {field_name}")
            return None

        # Save metadata to a temporary file and track it for cleanup
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f, indent=2)
            temp_file = f.name
        self._temp_files.append(temp_file)

        # Upload file to existing task
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

        try:
            with open(temp_file, 'rb') as f:
                files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
                response = self.session.post(url, files=files)

            # Clean up temp file
            os.unlink(temp_file)

            if response.status_code in [200, 202]:
                print(f"   ✅ Upload completed successfully")
                return field_id
            else:
                print(f"   ❌ Failed to upload data: {response.status_code} - {response.text}")
                return None
        except requests.RequestException as e:
            print(f"   ❌ Request failed: {str(e)}")
            return None

    def check_task_status(self, task_id: str) -> str:
        """Check the status of an upload task."""
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                result = response.json()
                status = result.get('status', 'unknown')

                # Check for errors and provide detailed information
                if status == 'COMPLETED_WITH_ERRORS':
                    error_count = result.get('error_count', 0)
                    inserted_rows = result.get('inserted_rows', 0)
                    print(f"   ⚠️  Task completed with {error_count} errors, {inserted_rows} rows inserted")

                    # Show sample errors if available
                    sample_errors = result.get('sample_errors', [])
                    if sample_errors:
                        print(f"   📋 Sample errors:")
                        for error in sample_errors[:3]:  # Show first 3 errors
                            print(f"      - Row {error.get('row_index', '?')}: {error.get('reason', 'Unknown error')}")
                        if len(sample_errors) > 3:
                            print(f"      - ... and {len(sample_errors) - 3} more errors")

                elif status == 'COMPLETED':
                    inserted_rows = result.get('inserted_rows', 0)
                    print(f"   ✅ Task completed successfully, {inserted_rows} rows inserted")

                return status
            else:
                return 'error'
        except requests.RequestException:
            return 'error'

    def wait_for_task_completion(self, task_id: str, field_name: str, polling_interval: int = 5) -> str:
        """Wait for task completion by polling status endpoint indefinitely."""
        print(f"   ⏳ Waiting for task completion... (Press Ctrl+C to stop)")
        start_time = time.time()

        while True:
            status = self.check_task_status(task_id)

            if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                elapsed = int(time.time() - start_time)
                print(f"   ✅ Task completed after {elapsed}s")
                return status
            elif status == 'error':
                print(f"   ❌ Error checking task status")
                return 'error'
            elif status == 'IN_PROGRESS':
                elapsed = int(time.time() - start_time)
                print(f"   ⏳ Task in progress... (elapsed: {elapsed}s)")
            else:
                elapsed = int(time.time() - start_time)
                print(f"   📋 Task status: {status} (elapsed: {elapsed}s)")

            # Wait before next check
            time.sleep(polling_interval)

    def save_progress(self, task_ids: Dict[str, Dict[str, str]], completed_fields: List[str]):
        """Save progress to a JSON file."""
        progress = {
            'task_ids': task_ids,
            'completed_fields': completed_fields
        }
        with open('csv_json_workflow_progress.json', 'w') as f:
            json.dump(progress, f, indent=2)

    def load_progress(self) -> tuple:
        """Load progress from JSON file."""
        try:
            with open('csv_json_workflow_progress.json', 'r') as f:
                progress = json.load(f)
            return progress.get('task_ids', {}), progress.get('completed_fields', [])
        except FileNotFoundError:
            return {}, []

    def cleanup_temp_files(self):
        """Remove temporary files created during processing."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                    print(f"   🧹 Cleaned up temporary file: {temp_file}")
            except OSError as e:
                print(f"   ⚠️  Could not remove {temp_file}: {e}")

    def process_workflow(self, csv_file: str, json_file: str, resume: bool = False):
        """Main workflow processing function using CSV for metadata and JSON for mapping."""
        try:
            print("🚀 Starting CSV Metadata Workflow")
            print(f"📁 CSV metadata: {csv_file}")
            print(f"🗂️  JSON metadata: {json_file}")

            # Load progress if resuming
            task_ids = {}
            completed_fields = []

            if resume:
                task_ids, completed_fields = self.load_progress()
                print(f"📋 Resuming workflow - {len(completed_fields)} fields already completed")

            # Read CSV data (contains DICOM metadata with image_filename)
            print("\n📊 Reading CSV metadata...")
            csv_data = self.read_csv(csv_file)
            print(f"   ✅ Loaded {len(csv_data)} CSV records")

            # Verify image_filename field exists
            if not csv_data or 'image_filename' not in csv_data[0]:
                raise ValueError("CSV must contain 'image_filename' field")

            # Read JSON metadata for filename->media_id mapping
            print("\n🗂️  Reading JSON metadata...")
            export_items = self.read_metadata_json(json_file)
            print(f"   ✅ Loaded {len(export_items)} media items")

            # Create image_id mapping from the metadata JSON
            image_id_mapping = self.create_image_id_mapping(export_items)
            if not image_id_mapping:
                raise ValueError("Could not create image_id mapping - check metadata JSON format")

            # Analyze fields
            print("\n🔍 Analyzing CSV fields...")
            field_types = self.analyze_csv_fields(csv_data)
            print("   📋 Field analysis:")
            for field_name, field_type in field_types.items():
                status = "✅ Completed" if field_name in completed_fields else "⏳ Pending"
                print(f"      {field_name}: {field_type} - {status}")

            print(f"\n🎯 Processing {len(field_types)} custom fields...")

            # Process each field
            for field_name, field_type in field_types.items():
                if field_name in completed_fields:
                    print(f"\n⏭️  Skipping completed field: {field_name}")
                    continue

                print(f"\n🔄 Processing field: {field_name} ({field_type})")

                try:
                    # Collect unique values for enum types
                    values = []
                    if field_type in ['enum', 'multi-enum']:
                        for row in csv_data:
                            if field_type == 'multi-enum':
                                values.extend(self._parse_list_value(row[field_name]))
                            else:
                                val = row[field_name]
                                if val:
                                    values.append(str(val))

                    # Create custom field
                    field_id = self.create_custom_field(field_name, field_type, values)
                    if not field_id:
                        print(f"   ⏭️  Skipping field {field_name} (already exists or failed to create)")
                        continue

                    # Upload data
                    task_id = self.upload_field_data(field_id, csv_data, field_name, field_type, image_id_mapping)
                    if not task_id:
                        raise Exception(f"Failed to upload data for {field_name}")

                    # Store task info
                    task_ids[field_name] = {
                        'field_id': field_id,
                        'task_id': task_id
                    }

                    # Wait for task completion before moving to next field
                    print(f"   📤 Upload completed, monitoring task status...")
                    final_status = self.wait_for_task_completion(task_id, field_name)

                    if final_status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                        completed_fields.append(field_name)
                        status_icon = "✅" if final_status == 'COMPLETED' else "⚠️"
                        print(f"   {status_icon} Field {field_name} {final_status.lower()}")
                    else:
                        print(f"   ❌ Field {field_name} failed or timed out (status: {final_status})")
                        # Continue with next field even if this one failed

                except Exception as e:
                    print(f"   ❌ Error processing field {field_name}: {str(e)}")
                    print(f"   ⏭️  Skipping field and continuing with next field...")
                    self.save_progress(task_ids, completed_fields)
                    continue

                print()

            print("🎉 Workflow completed!")
            print(f"✅ Successfully processed {len(completed_fields)} fields")

            # Show task summary
            if task_ids:
                print("\n📋 Task Summary:")
                for field_name, task_info in task_ids.items():
                    field_id = task_info['field_id']
                    task_id = task_info['task_id']

                    if field_name in completed_fields:
                        status = "✅ completed"
                    else:
                        status = "❌ failed/timeout"

                    print(f"   {field_name}: {status}")

            # Clean up progress file when workflow completes successfully
            progress_file = "csv_json_workflow_progress.json"
            if os.path.exists(progress_file):
                try:
                    os.remove(progress_file)
                    print(f"\n🧹 Cleaned up progress file: {progress_file}")
                except OSError as e:
                    print(f"⚠️  Could not remove progress file {progress_file}: {e}")

        finally:
            # Always clean up temporary files
            self.cleanup_temp_files()

def main():
    parser = argparse.ArgumentParser(description='CSV Metadata Workflow for Visual Layer')
    parser.add_argument('csv_file', help='Path to dicom_metadata.csv file')
    parser.add_argument('json_file', help='Path to metadata.json file from Visual Layer (filename->media_id mapping)')
    parser.add_argument('--dataset-id', required=True, help='Visual Layer dataset ID')
    parser.add_argument('--base-url', default='https://app.visual-layer.com',
                       help='Base URL (default: https://app.visual-layer.com)')
    parser.add_argument('--api-key', required=True, help='API key for authentication')
    parser.add_argument('--api-secret', help='API secret for authentication')
    parser.add_argument('--enum-fields', nargs='*', default=[], help='List of column names to treat as enum fields (e.g., --enum-fields patient_sex modality manufacturer)')
    parser.add_argument('--resume', action='store_true', help='Resume from previous progress')

    args = parser.parse_args()

    # Create processor and run workflow
    processor = CsvJsonMetadataProcessor(args.dataset_id, args.base_url, args.api_key, args.api_secret, args.enum_fields)
    processor.process_workflow(args.csv_file, args.json_file, args.resume)

if __name__ == "__main__":
    main()