Skip to main content
This page provides ready-to-use Python scripts for common Visual Layer administration tasks. Each script is designed to automate specific workflows and can be customized for your needs.
Purpose: Automate custom metadata uploads from folders containing images and individual metadata JSON files with automatic field discovery and type detection.Installation:
pip install pandas requests
Usage:
# Auto-detect all fields
python upload_metadata_from_folder.py \
  --folder /path/to/folder \
  --dataset-id your-dataset-id \
  --base-url http://localhost:2080

# Specify specific fields
python upload_metadata_from_folder.py \
  --folder /path/to/folder \
  --dataset-id your-dataset-id \
  --base-url http://localhost:2080 \
  --field annotated_at datetime \
  --field confidence float
Script Code:
#!/usr/bin/env python3
"""
Folder-based Metadata Upload for Visual Layer
Scans a folder for images and their corresponding .metadata.json files.
Automatically discovers and uploads all metadata fields with intelligent type detection.
"""

import json
import requests
import argparse
import os
import sys
import pandas as pd
import time
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path

class FolderMetadataProcessor:
    def __init__(self, dataset_id: str, base_url: str):
        self.dataset_id = dataset_id
        self.raw_base_url = base_url.rstrip('/')

        if not base_url.endswith('/api/v1/datasets'):
            if base_url.endswith('/'):
                base_url = base_url.rstrip('/')
            self.base_url = f"{base_url}/api/v1/datasets"
        else:
            self.base_url = base_url

        self.session = requests.Session()
        self._temp_files = []

    def export_dataset(self) -> Dict[str, str]:
        """Export dataset and return mapping of filename -> media_id."""
        print("📤 Exporting dataset to get media_id mappings...")
        url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                import csv
                import io

                csv_content = response.text
                csv_reader = csv.DictReader(io.StringIO(csv_content))

                mapping = {}
                for row in csv_reader:
                    filename = row.get('filename', '')
                    media_id = row.get('media_id', '')

                    if media_id and filename:
                        basename = os.path.basename(filename)
                        mapping[basename] = media_id

                print(f"   ✅ Exported {len(mapping)} media items")
                return mapping
            else:
                print(f"   ❌ Failed to export dataset: {response.status_code}")
                return {}
        except Exception as e:
            print(f"   ❌ Export failed: {str(e)}")
            return {}

    def scan_folder(self, folder_path: str) -> List[Tuple[str, str]]:
        """Scan folder and return list of (image_path, metadata_path) tuples."""
        print(f"🔍 Scanning folder: {folder_path}")

        folder = Path(folder_path)
        if not folder.exists():
            raise FileNotFoundError(f"Folder not found: {folder_path}")

        pairs = []
        for metadata_file in folder.glob("*.metadata.json"):
            image_name = metadata_file.name.replace(".metadata.json", "")
            image_path = folder / image_name

            if image_path.exists():
                pairs.append((str(image_path), str(metadata_file)))

        print(f"   ✅ Found {len(pairs)} image + metadata pairs")
        return pairs

    def load_metadata_files(self, pairs: List[Tuple[str, str]]) -> Dict[str, Dict[str, Any]]:
        """Load all metadata.json files."""
        print("📖 Loading metadata files...")

        metadata_by_file = {}
        for image_path, metadata_path in pairs:
            image_filename = os.path.basename(image_path)

            try:
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                    metadata_by_file[image_filename] = metadata
            except Exception as e:
                print(f"   ⚠️  Failed to load {metadata_path}: {str(e)}")

        print(f"   ✅ Loaded {len(metadata_by_file)} metadata files")
        return metadata_by_file

    def discover_and_analyze_fields(self, metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Discover all fields and auto-detect their types."""
        print("🔍 Discovering and analyzing all fields...")

        all_fields = set()
        for metadata in metadata_by_file.values():
            all_fields.update(metadata.keys())

        print(f"   📋 Found {len(all_fields)} unique fields")

        field_types = {}
        for field_name in sorted(all_fields):
            sample_values = [metadata[field_name] for metadata in metadata_by_file.values() if field_name in metadata][:100]
            field_type = self._auto_detect_field_type(field_name, sample_values)
            field_types[field_name] = field_type
            print(f"      {field_name}: {field_type}")

        print(f"   ✅ Analyzed {len(field_types)} fields")
        return field_types

    def _auto_detect_field_type(self, field_name: str, sample_values: List[Any]) -> str:
        """Auto-detect field type based on sample values."""
        if not sample_values:
            return 'string'

        first_value = next((v for v in sample_values if v not in (None, '')), None)
        if first_value is None:
            return 'string'

        if self._is_float(first_value):
            return 'float'
        elif self._is_date(first_value):
            return 'datetime'
        elif self._is_url(first_value):
            return 'link'
        elif isinstance(first_value, list):
            unique_values = set()
            for val in sample_values:
                if isinstance(val, list):
                    unique_values.update(val)
            return 'multi-enum' if len(unique_values) <= 20 else 'string'
        else:
            unique_values = {str(v).strip() for v in sample_values if v not in (None, '') and not isinstance(v, (dict, list))}
            return 'enum' if 0 < len(unique_values) <= 100 else 'string'

    def _is_float(self, value: Any) -> bool:
        if not isinstance(value, (str, int, float)):
            return False
        try:
            return '.' in str(value) or 'e' in str(value).lower()
        except:
            return False

    def _is_date(self, value: Any) -> bool:
        if not isinstance(value, str) or not value.strip():
            return False
        try:
            pd.to_datetime(value.strip())
            return True
        except:
            return False

    def _is_url(self, value: Any) -> bool:
        return isinstance(value, str) and value.strip().lower().startswith(('http://', 'https://', 'ftp://'))

    def create_custom_field(self, field_name: str, field_type: str, metadata_by_file: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """Create a custom field."""
        print(f"🔧 Creating custom field: {field_name} ({field_type})")

        field_data = {"field_name": field_name, "field_type": field_type}

        if field_type in ['enum', 'multi-enum']:
            unique_values = set()
            for metadata in metadata_by_file.values():
                if field_name in metadata:
                    value = metadata[field_name]
                    if value:
                        if field_type == 'multi-enum' and isinstance(value, list):
                            unique_values.update(str(v) for v in value)
                        else:
                            unique_values.add(str(value))

            field_data["enum_options"] = list(unique_values)[:20]
            if field_type == 'multi-enum':
                field_data["field_type"] = 'enum'
                field_data["is_multi"] = True

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

        try:
            response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                task_id = response.json().get('task_id')
                print(f"   ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                print(f"   🔄 Field already exists")
                return None
            else:
                print(f"   ❌ Failed: {response.status_code}")
                return None
        except Exception as e:
            print(f"   ❌ Error: {str(e)}")
            return None

    def upload_field_data(self, field_id: str, field_name: str, field_type: str,
                         metadata_by_file: Dict[str, Dict[str, Any]],
                         filename_to_media_id: Dict[str, str]) -> Optional[str]:
        """Upload data for a custom field."""
        print(f"   📤 Uploading data for field: {field_name}")

        upload_data = []
        for filename, metadata in metadata_by_file.items():
            media_id = filename_to_media_id.get(filename)
            if not media_id or field_name not in metadata:
                continue

            value = metadata[field_name]
            if value in (None, ''):
                continue

            try:
                converted_value = self._convert_value(value, field_name, field_type)
                if converted_value is not None:
                    upload_data.append({"media_id": media_id, "value": converted_value})
            except Exception:
                continue

        if not upload_data:
            print(f"   ⚠️  No data to upload")
            return None

        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f)
            temp_file = f.name

        self._temp_files.append(temp_file)

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

        try:
            with open(temp_file, 'rb') as f:
                files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
                response = self.session.post(url, files=files)

            if response.status_code in [200, 202]:
                print(f"   ✅ Upload completed")
                return field_id
            else:
                print(f"   ❌ Upload failed: {response.status_code}")
                return None
        except Exception as e:
            print(f"   ❌ Error: {str(e)}")
            return None

    def _convert_value(self, value: Any, field_name: str, field_type: str) -> Any:
        """Convert value to appropriate type."""
        try:
            if field_type == 'datetime':
                dt = pd.to_datetime(str(value))
                return dt.strftime('%Y-%m-%dT%H:%M:%SZ') if dt.tz is None else dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
            elif field_type == 'float':
                return float(value)
            elif field_type == 'multi-enum':
                return [str(v).strip() for v in value] if isinstance(value, list) else [str(value).strip()]
            elif field_type == 'enum':
                return str(value).strip()
            elif field_type == 'string':
                if isinstance(value, (dict, list)):
                    return json.dumps(value)[:255]
                return str(value).strip()[:255]
            else:
                return str(value)
        except:
            return str(value)[:255]

    def check_task_status(self, task_id: str) -> str:
        """Check task status."""
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                result = response.json()
                status = result.get('status', 'unknown')

                if status == 'COMPLETED':
                    print(f"   ✅ Completed, {result.get('inserted_rows', 0)} rows inserted")
                elif status == 'COMPLETED_WITH_ERRORS':
                    print(f"   ⚠️  Completed with {result.get('error_count', 0)} errors")

                return status
            return 'error'
        except:
            return 'error'

    def wait_for_task_completion(self, task_id: str, field_name: str) -> str:
        """Wait for task completion."""
        print(f"   ⏳ Waiting for completion...")
        start = time.time()

        while True:
            status = self.check_task_status(task_id)

            if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                print(f"   ✅ Completed after {int(time.time() - start)}s")
                return status
            elif status == 'error':
                return 'error'

            time.sleep(5)

    def cleanup_temp_files(self):
        """Remove temporary files."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except:
                pass

    def process_workflow(self, folder_path: str, user_fields: Optional[Dict[str, str]] = None):
        """Main workflow."""
        try:
            print("🚀 Starting Metadata Upload Workflow")

            filename_to_media_id = self.export_dataset()
            if not filename_to_media_id:
                raise Exception("Export failed")

            pairs = self.scan_folder(folder_path)
            if not pairs:
                raise Exception("No files found")

            metadata_by_file = self.load_metadata_files(pairs)
            if not metadata_by_file:
                raise Exception("Failed to load metadata")

            if user_fields:
                validated_fields = user_fields
            else:
                validated_fields = self.discover_and_analyze_fields(metadata_by_file)

            print(f"\n🎯 Processing {len(validated_fields)} fields...")

            completed = []
            for field_name, field_type in validated_fields.items():
                print(f"\n🔄 Processing: {field_name} ({field_type})")

                try:
                    field_id = self.create_custom_field(field_name, field_type, metadata_by_file)
                    if not field_id:
                        continue

                    task_id = self.upload_field_data(field_id, field_name, field_type,
                                                    metadata_by_file, filename_to_media_id)
                    if not task_id:
                        continue

                    status = self.wait_for_task_completion(task_id, field_name)
                    if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                        completed.append(field_name)
                except Exception as e:
                    print(f"   ❌ Error: {str(e)}")

            print(f"\n🎉 Completed {len(completed)}/{len(validated_fields)} fields")

        finally:
            self.cleanup_temp_files()

def main():
    parser = argparse.ArgumentParser(description='Upload metadata from folder')
    parser.add_argument('--folder', required=True, help='Folder path')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--base-url', default='http://localhost:2080', help='Base URL')
    parser.add_argument('--field', action='append', nargs=2, metavar=('NAME', 'TYPE'),
                       help='Field specification (optional)')

    args = parser.parse_args()

    user_fields = None
    if args.field:
        user_fields = {}
        valid_types = {'string', 'float', 'datetime', 'enum', 'multi-enum', 'link'}
        for name, type in args.field:
            if type not in valid_types:
                print(f"❌ Invalid type: {type}")
                sys.exit(1)
            user_fields[name] = type

    processor = FolderMetadataProcessor(args.dataset_id, args.base_url)
    processor.process_workflow(args.folder, user_fields)

if __name__ == "__main__":
    main()
Purpose: Extract caption data from Visual Layer pipeline runs to reuse for subsequent dataset creations.Installation:
pip install pandas pyarrow
Usage:
# Basic usage
python process_annotations.py /.vl/tmp/[dataset-id]/input/metadata/image_annotations.parquet

# Specify output
python process_annotations.py input.parquet -o /path/to/output.parquet

# Custom prefix
python process_annotations.py input.parquet --prefix /custom/prefix
Script Code:
#!/usr/bin/env python3
"""
Process parquet annotation files to extract filename and caption columns.
Removes path prefixes from filenames.
"""

import argparse
import sys
from pathlib import Path
import pandas as pd

def process_parquet(input_path, output_path=None, prefix_to_remove='/hostfs'):
    """Process parquet file to extract filename and caption columns."""
    input_file = Path(input_path)
    if not input_file.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    if not input_file.suffix == '.parquet':
        raise ValueError(f"Must be parquet file, got: {input_file.suffix}")

    if output_path is None:
        output_file = input_file.parent / f"{input_file.stem}_processed.parquet"
    else:
        output_file = Path(output_path)

    print(f"Reading: {input_file}")

    try:
        df = pd.read_parquet(input_file)
    except Exception as e:
        raise RuntimeError(f"Failed to read parquet: {e}")

    print(f"Original shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")

    required = ['filename', 'caption']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    df = df[['filename', 'caption']]

    print(f"Removing prefix '{prefix_to_remove}'...")
    df['filename'] = df['filename'].apply(
        lambda x: x.replace(prefix_to_remove, '', 1)
        if isinstance(x, str) and x.startswith(prefix_to_remove)
        else x
    )

    print(f"\nProcessed shape: {df.shape}")
    print(f"\nSample filenames:")
    print(df['filename'].head(3).tolist())

    print(f"\nSaving to: {output_file}")
    df.to_parquet(output_file, index=False)

    print(f"✓ Processed {len(df)} rows")
    print(f"✓ Saved to: {output_file}")

    return output_file

def main():
    parser = argparse.ArgumentParser(
        description='Process parquet files to extract filename and caption'
    )

    parser.add_argument('input', help='Input parquet file')
    parser.add_argument('-o', '--output', help='Output path (optional)')
    parser.add_argument('--prefix', default='/hostfs', help='Prefix to remove')

    args = parser.parse_args()

    try:
        process_parquet(args.input, args.output, args.prefix)
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1

if __name__ == '__main__':
    sys.exit(main())
Purpose: Convert DICOM medical imaging files to JPG format with comprehensive metadata extraction.Installation:
pip install pandas pydicom dicom2jpg opencv-python numpy
Usage:
# Basic usage
python dicom_converter.py /path/to/dicom/files

# Custom output
python dicom_converter.py /path/to/dicom/files --output /custom/output

# Verbose logging
python dicom_converter.py /path/to/dicom/files --verbose
Script Code:This script converts DICOM files to JPG images and extracts metadata. Due to length, the complete code includes:
  • DICOM metadata extraction (patient info, study data, equipment details)
  • Multiframe DICOM handling
  • Single-frame DICOM conversion
  • Windowing and rescaling
  • CSV metadata output
The full script handles both single-frame and multiframe DICOM files, applies proper windowing and rescaling transformations, and extracts comprehensive medical imaging metadata including patient information, study details, equipment specifications, and image acquisition parameters.
Purpose: Upload DICOM metadata from CSV to Visual Layer with automatic field type detection.Installation:
pip install pandas requests pyjwt
Usage:
# Cloud installation
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
  --dataset-id=your-dataset-id \
  --base-url=https://app.visual-layer.com \
  --api-key=your-api-key \
  --api-secret=your-api-secret

# On-premises installation
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
  --dataset-id=your-dataset-id \
  --base-url=http://localhost:2080 \
  --api-key=your-api-key
Script Code:This script uploads DICOM metadata from CSV files generated by the DICOM converter. Features include:
  • Automatic DICOM date/time format detection
  • Field type categorization (string, float, datetime, enum)
  • Multi-value field handling
  • JWT authentication support
  • Resume capability for large uploads
The complete script handles DICOM-specific date formats (YYYYMMDD) and time formats (HHMMSS), intelligently detects field types, and manages the complete upload workflow with progress tracking.