# Useful Scripts

> Collection of Python scripts for automating Visual Layer workflows including custom metadata uploads, DICOM conversion, and caption extraction.

This page collects Python scripts for common Visual Layer administration tasks. Each script automates one workflow and can be adapted to your environment.

<Accordion title="Custom Metadata Upload Script" icon="file-code-2">
  **Purpose:** Upload custom metadata from a folder in which each image (for example `photo.jpg`) has a companion file named `photo.jpg.metadata.json`. Fields are discovered and their types detected automatically unless you specify them with `--field`.

  **Installation:**

  ```bash theme={"theme":"monokai"}
  pip install pandas requests
  ```

  **Usage:**

  ```bash theme={"theme":"monokai"}
  # Auto-detect all fields
  python upload_metadata_from_folder.py \
    --folder /path/to/folder \
    --dataset-id your-dataset-id \
    --base-url http://localhost:2080

  # Upload only selected fields, with explicit types
  python upload_metadata_from_folder.py \
    --folder /path/to/folder \
    --dataset-id your-dataset-id \
    --base-url http://localhost:2080 \
    --field annotated_at datetime \
    --field confidence float
  ```

  **Script Code:**

  ```python theme={"theme":"monokai"}
  #!/usr/bin/env python3
  """
  Folder-based Metadata Upload for Visual Layer
  Scans a folder for images and their corresponding .metadata.json files.
  Automatically discovers and uploads all metadata fields with intelligent type detection.
  """

  import json
  import requests
  import argparse
  import os
  import sys
  import pandas as pd
  import time
  from typing import Dict, List, Any, Optional, Tuple
  from pathlib import Path

  class FolderMetadataProcessor:
      def __init__(self, dataset_id: str, base_url: str):
          self.dataset_id = dataset_id
          # Accept either the server root or a URL that already ends in
          # /api/v1/datasets, and derive both forms from it.
          api_suffix = '/api/v1/datasets'
          root_url = base_url.rstrip('/')
          if root_url.endswith(api_suffix):
              root_url = root_url[:-len(api_suffix)]

          self.raw_base_url = root_url
          self.base_url = f"{root_url}{api_suffix}"

          self.session = requests.Session()
          self._temp_files = []

      def export_dataset(self) -> Dict[str, str]:
          """Export dataset and return mapping of filename -> media_id."""
          print("📤 Exporting dataset to get media_id mappings...")
          url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"

          try:
              response = self.session.get(url)
              if response.status_code == 200:
                  import csv
                  import io

                  csv_content = response.text
                  csv_reader = csv.DictReader(io.StringIO(csv_content))

                  mapping = {}
                  for row in csv_reader:
                      filename = row.get('filename', '')
                      media_id = row.get('media_id', '')

                      if media_id and filename:
                          basename = os.path.basename(filename)
                          mapping[basename] = media_id

                  print(f"   ✅ Exported {len(mapping)} media items")
                  return mapping
              else:
                  print(f"   ❌ Failed to export dataset: {response.status_code}")
                  return {}
          except Exception as e:
              print(f"   ❌ Export failed: {str(e)}")
              return {}

      def scan_folder(self, folder_path: str) -> List[Tuple[str, str]]:
          """Scan folder and return list of (image_path, metadata_path) tuples."""
          print(f"🔍 Scanning folder: {folder_path}")

          folder = Path(folder_path)
          if not folder.exists():
              raise FileNotFoundError(f"Folder not found: {folder_path}")

          pairs = []
          for metadata_file in folder.glob("*.metadata.json"):
              # Strip only the trailing ".metadata.json" to recover the image name
              image_name = metadata_file.name[:-len(".metadata.json")]
              image_path = folder / image_name

              if image_path.exists():
                  pairs.append((str(image_path), str(metadata_file)))

          print(f"   ✅ Found {len(pairs)} image + metadata pairs")
          return pairs

      def load_metadata_files(self, pairs: List[Tuple[str, str]]) -> Dict[str, Dict[str, Any]]:
          """Load all metadata.json files."""
          print("📖 Loading metadata files...")

          metadata_by_file = {}
          for image_path, metadata_path in pairs:
              image_filename = os.path.basename(image_path)

              try:
                  with open(metadata_path, 'r', encoding='utf-8') as f:
                      metadata = json.load(f)
                      metadata_by_file[image_filename] = metadata
              except Exception as e:
                  print(f"   ⚠️  Failed to load {metadata_path}: {str(e)}")

          print(f"   ✅ Loaded {len(metadata_by_file)} metadata files")
          return metadata_by_file

      def discover_and_analyze_fields(self, metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
          """Discover all fields and auto-detect their types."""
          print("🔍 Discovering and analyzing all fields...")

          all_fields = set()
          for metadata in metadata_by_file.values():
              all_fields.update(metadata.keys())

          print(f"   📋 Found {len(all_fields)} unique fields")

          field_types = {}
          for field_name in sorted(all_fields):
              sample_values = [metadata[field_name] for metadata in metadata_by_file.values() if field_name in metadata][:100]
              field_type = self._auto_detect_field_type(field_name, sample_values)
              field_types[field_name] = field_type
              print(f"      {field_name}: {field_type}")

          print(f"   ✅ Analyzed {len(field_types)} fields")
          return field_types

      def _auto_detect_field_type(self, field_name: str, sample_values: List[Any]) -> str:
          """Auto-detect field type based on sample values."""
          if not sample_values:
              return 'string'

          first_value = next((v for v in sample_values if v not in (None, '')), None)
          if first_value is None:
              return 'string'

          if self._is_float(first_value):
              return 'float'
          elif self._is_date(first_value):
              return 'datetime'
          elif self._is_url(first_value):
              return 'link'
          elif isinstance(first_value, list):
              unique_values = set()
              for val in sample_values:
                  if isinstance(val, list):
                      unique_values.update(val)
              return 'multi-enum' if len(unique_values) <= 20 else 'string'
          else:
              unique_values = {str(v).strip() for v in sample_values if v not in (None, '') and not isinstance(v, (dict, list))}
              # Keep the threshold in line with the 20-option cap applied in create_custom_field
              return 'enum' if 0 < len(unique_values) <= 20 else 'string'

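      def _is_float(self, value: Any) -> bool:
          """Return True for numeric values written in decimal or exponent form."""
          if isinstance(value, bool) or not isinstance(value, (str, int, float)):
              return False
          try:
              # Reject non-numeric strings such as "image.jpg" or "hello"
              float(value)
          except (TypeError, ValueError):
              return False
          # Plain integers such as 5 or "5" are not treated as floats
          return '.' in str(value) or 'e' in str(value).lower()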

      def _is_date(self, value: Any) -> bool:
          if not isinstance(value, str) or not value.strip():
              return False
          try:
              pd.to_datetime(value.strip())
              return True
          except Exception:
              return False

      def _is_url(self, value: Any) -> bool:
          return isinstance(value, str) and value.strip().lower().startswith(('http://', 'https://', 'ftp://'))

      def create_custom_field(self, field_name: str, field_type: str, metadata_by_file: Dict[str, Dict[str, Any]]) -> Optional[str]:
          """Create a custom field."""
          print(f"🔧 Creating custom field: {field_name} ({field_type})")

          field_data = {"field_name": field_name, "field_type": field_type}

          if field_type in ['enum', 'multi-enum']:
              unique_values = set()
              for metadata in metadata_by_file.values():
                  if field_name in metadata:
                      value = metadata[field_name]
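                      # Skip only missing values; 0 and False are valid options
                      if value not in (None, ''):
                          if field_type == 'multi-enum' and isinstance(value, list):
                              unique_values.update(str(v).strip() for v in value)
                          else:
                              unique_values.add(str(value).strip())

              # Sort for a deterministic option order before applying the 20-option cap
              field_data["enum_options"] = sorted(unique_values)[:20]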
              if field_type == 'multi-enum':
                  field_data["field_type"] = 'enum'
                  field_data["is_multi"] = True

          url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

          try:
              response = self.session.post(url, json=field_data)
              if response.status_code == 200:
                  task_id = response.json().get('task_id')
                  print(f"   ✅ Created field with task ID: {task_id}")
                  return task_id
              elif "already exists" in response.text:
                  print(f"   🔄 Field already exists")
                  return None
              else:
                  print(f"   ❌ Failed: {response.status_code}")
                  return None
          except Exception as e:
              print(f"   ❌ Error: {str(e)}")
              return None

      def upload_field_data(self, field_id: str, field_name: str, field_type: str,
                           metadata_by_file: Dict[str, Dict[str, Any]],
                           filename_to_media_id: Dict[str, str]) -> Optional[str]:
          """Upload data for a custom field."""
          print(f"   📤 Uploading data for field: {field_name}")

          upload_data = []
          for filename, metadata in metadata_by_file.items():
              media_id = filename_to_media_id.get(filename)
              if not media_id or field_name not in metadata:
                  continue

              value = metadata[field_name]
              if value in (None, ''):
                  continue

              try:
                  converted_value = self._convert_value(value, field_name, field_type)
                  if converted_value is not None:
                      upload_data.append({"media_id": media_id, "value": converted_value})
              except Exception:
                  continue

          if not upload_data:
              print(f"   ⚠️  No data to upload")
              return None

          import tempfile
          with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
              json.dump(upload_data, f)
              temp_file = f.name

          self._temp_files.append(temp_file)

          url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

          try:
              with open(temp_file, 'rb') as f:
                  files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
                  response = self.session.post(url, files=files)

              if response.status_code in [200, 202]:
                  print(f"   ✅ Upload completed")
                  return field_id
              else:
                  print(f"   ❌ Upload failed: {response.status_code}")
                  return None
          except Exception as e:
              print(f"   ❌ Error: {str(e)}")
              return None

      def _convert_value(self, value: Any, field_name: str, field_type: str) -> Any:
          """Convert value to appropriate type."""
          try:
              if field_type == 'datetime':
                  dt = pd.to_datetime(str(value))
                  return dt.strftime('%Y-%m-%dT%H:%M:%SZ') if dt.tz is None else dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
              elif field_type == 'float':
                  return float(value)
              elif field_type == 'multi-enum':
                  return [str(v).strip() for v in value] if isinstance(value, list) else [str(value).strip()]
              elif field_type == 'enum':
                  return str(value).strip()
              elif field_type == 'string':
                  if isinstance(value, (dict, list)):
                      return json.dumps(value)[:255]
                  return str(value).strip()[:255]
              else:
                  return str(value)
          except Exception:
              return str(value)[:255]

      def check_task_status(self, task_id: str) -> str:
          """Check task status."""
          url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"

          try:
              response = self.session.get(url)
              if response.status_code == 200:
                  result = response.json()
                  status = result.get('status', 'unknown')

                  if status == 'COMPLETED':
                      print(f"   ✅ Completed, {result.get('inserted_rows', 0)} rows inserted")
                  elif status == 'COMPLETED_WITH_ERRORS':
                      print(f"   ⚠️  Completed with {result.get('error_count', 0)} errors")

                  return status
              return 'error'
          except Exception:
              return 'error'

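      def wait_for_task_completion(self, task_id: str, field_name: str, timeout: int = 1800) -> str:
          """Wait for task completion, giving up after `timeout` seconds."""
          print(f"   ⏳ Waiting for completion...")
          start = time.time()

          while True:
              status = self.check_task_status(task_id)

              if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                  print(f"   ✅ Completed after {int(time.time() - start)}s")
                  return status
              elif status == 'error':
                  return 'error'
              elif time.time() - start > timeout:
                  # Avoid polling forever if the task never reaches a final status
                  print(f"   ❌ Timed out after {timeout}s (last status: {status})")
                  return 'timeout'

              time.sleep(5)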

      def cleanup_temp_files(self):
          """Remove temporary files."""
          for temp_file in self._temp_files:
              try:
                  if os.path.exists(temp_file):
                      os.remove(temp_file)
              except OSError:
                  pass

      def process_workflow(self, folder_path: str, user_fields: Optional[Dict[str, str]] = None):
          """Main workflow."""
          try:
              print("🚀 Starting Metadata Upload Workflow")

              filename_to_media_id = self.export_dataset()
              if not filename_to_media_id:
                  raise Exception("Export failed")

              pairs = self.scan_folder(folder_path)
              if not pairs:
                  raise Exception("No files found")

              metadata_by_file = self.load_metadata_files(pairs)
              if not metadata_by_file:
                  raise Exception("Failed to load metadata")

              if user_fields:
                  validated_fields = user_fields
              else:
                  validated_fields = self.discover_and_analyze_fields(metadata_by_file)

              print(f"\n🎯 Processing {len(validated_fields)} fields...")

              completed = []
              for field_name, field_type in validated_fields.items():
                  print(f"\n🔄 Processing: {field_name} ({field_type})")

                  try:
                      field_id = self.create_custom_field(field_name, field_type, metadata_by_file)
                      if not field_id:
                          continue

                      task_id = self.upload_field_data(field_id, field_name, field_type,
                                                      metadata_by_file, filename_to_media_id)
                      if not task_id:
                          continue

                      status = self.wait_for_task_completion(task_id, field_name)
                      if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                          completed.append(field_name)
                  except Exception as e:
                      print(f"   ❌ Error: {str(e)}")

              print(f"\n🎉 Completed {len(completed)}/{len(validated_fields)} fields")

          finally:
              self.cleanup_temp_files()

  def main():
      parser = argparse.ArgumentParser(description='Upload metadata from folder')
      parser.add_argument('--folder', required=True, help='Folder path')
      parser.add_argument('--dataset-id', required=True, help='Dataset ID')
      parser.add_argument('--base-url', default='http://localhost:2080', help='Base URL')
      parser.add_argument('--field', action='append', nargs=2, metavar=('NAME', 'TYPE'),
                         help='Field specification (optional)')

      args = parser.parse_args()

      user_fields = None
      if args.field:
          user_fields = {}
          valid_types = {'string', 'float', 'datetime', 'enum', 'multi-enum', 'link'}
          for name, field_type in args.field:
              if field_type not in valid_types:
                  print(f"❌ Invalid type: {field_type}")
                  sys.exit(1)
              user_fields[name] = field_type

      processor = FolderMetadataProcessor(args.dataset_id, args.base_url)
      processor.process_workflow(args.folder, user_fields)

  if __name__ == "__main__":
      main()
  ```
</Accordion>
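
The upload script above pairs each `*.metadata.json` file with an image whose name is the metadata filename minus the `.metadata.json` suffix, and it skips metadata files that have no matching image. The sketch below builds a small sample folder to illustrate that layout. The file names, field names, and values are hypothetical, and `find_pairs` is an illustrative helper that mirrors the pairing rule, not a function taken from the script.

```python
import json
import tempfile
from pathlib import Path

SUFFIX = ".metadata.json"


def find_pairs(folder: Path) -> list[tuple[str, str]]:
    """Return (image name, metadata name) pairs using the suffix naming rule."""
    pairs = []
    for metadata_file in sorted(folder.glob(f"*{SUFFIX}")):
        image_name = metadata_file.name[: -len(SUFFIX)]
        if (folder / image_name).exists():
            pairs.append((image_name, metadata_file.name))
    return pairs


def build_sample_folder(folder: Path) -> None:
    """Create one valid image + metadata pair and one orphaned metadata file."""
    (folder / "scan_001.jpg").write_bytes(b"")  # placeholder, not a real image
    sample = {
        "annotated_at": "2024-01-31T13:45:01Z",  # hypothetical field values
        "confidence": 0.87,
        "tags": ["outdoor", "daylight"],
    }
    (folder / f"scan_001.jpg{SUFFIX}").write_text(json.dumps(sample), encoding="utf-8")
    # No missing.jpg exists, so this metadata file is ignored
    (folder / f"missing.jpg{SUFFIX}").write_text("{}", encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    sample_folder = Path(tmp)
    build_sample_folder(sample_folder)
    pairs = find_pairs(sample_folder)
    print(pairs)
```

Running a check like this against your own folder before an upload is a quick way to spot metadata files whose image is missing or misnamed.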

<Accordion title="Caption Extraction Script" icon="file-code-2">
  **Purpose:** Extract the captions generated during a Visual Layer pipeline run so they can be reused when creating later datasets.

  **Installation:**

  ```bash theme={"theme":"monokai"}
  pip install pandas pyarrow
  ```

  **Usage:**

  ```bash theme={"theme":"monokai"}
  # Basic usage
  python process_annotations.py /.vl/tmp/[dataset-id]/input/metadata/image_annotations.parquet

  # Specify output
  python process_annotations.py input.parquet -o /path/to/output.parquet

  # Custom prefix
  python process_annotations.py input.parquet --prefix /custom/prefix
  ```

  **Script Code:**

  ```python theme={"theme":"monokai"}
  #!/usr/bin/env python3
  """
  Process parquet annotation files to extract filename and caption columns.
  Removes path prefixes from filenames.
  """

  import argparse
  import sys
  from pathlib import Path
  import pandas as pd

  def process_parquet(input_path, output_path=None, prefix_to_remove='/hostfs'):
      """Process parquet file to extract filename and caption columns."""
      input_file = Path(input_path)
      if not input_file.exists():
          raise FileNotFoundError(f"Input file not found: {input_path}")

      if input_file.suffix != '.parquet':
          raise ValueError(f"Must be parquet file, got: {input_file.suffix}")

      if output_path is None:
          output_file = input_file.parent / f"{input_file.stem}_processed.parquet"
      else:
          output_file = Path(output_path)

      print(f"Reading: {input_file}")

      try:
          df = pd.read_parquet(input_file)
      except Exception as e:
          raise RuntimeError(f"Failed to read parquet: {e}") from e

      print(f"Original shape: {df.shape}")
      print(f"Columns: {df.columns.tolist()}")

      required = ['filename', 'caption']
      missing = [col for col in required if col not in df.columns]
      if missing:
          raise ValueError(f"Missing columns: {missing}")

      # Copy the selection so the assignment below does not modify a view
      df = df[['filename', 'caption']].copy()

      print(f"Removing prefix '{prefix_to_remove}'...")
      df['filename'] = df['filename'].apply(
          lambda x: x.replace(prefix_to_remove, '', 1)
          if isinstance(x, str) and x.startswith(prefix_to_remove)
          else x
      )

      print(f"\nProcessed shape: {df.shape}")
      print(f"\nSample filenames:")
      print(df['filename'].head(3).tolist())

      print(f"\nSaving to: {output_file}")
      df.to_parquet(output_file, index=False)

      print(f"✓ Processed {len(df)} rows")
      print(f"✓ Saved to: {output_file}")

      return output_file

  def main():
      parser = argparse.ArgumentParser(
          description='Process parquet files to extract filename and caption'
      )

      parser.add_argument('input', help='Input parquet file')
      parser.add_argument('-o', '--output', help='Output path (optional)')
      parser.add_argument('--prefix', default='/hostfs', help='Prefix to remove')

      args = parser.parse_args()

      try:
          process_parquet(args.input, args.output, args.prefix)
          return 0
      except Exception as e:
          print(f"Error: {e}", file=sys.stderr)
          return 1

  if __name__ == '__main__':
      sys.exit(main())
  ```
</Accordion>
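
The caption script removes the prefix only when a filename starts with it, and leaves every other value unchanged. A dependency-free sketch of that rule follows. `strip_prefix` is an illustrative helper name and the sample paths are hypothetical.

```python
def strip_prefix(filename, prefix="/hostfs"):
    """Remove `prefix` from the start of `filename`; leave other values untouched."""
    if isinstance(filename, str) and filename.startswith(prefix):
        return filename[len(prefix):]
    return filename


samples = [
    "/hostfs/data/images/cat.jpg",  # prefix at the start: removed
    "/data/hostfs/images/dog.jpg",  # prefix in the middle: unchanged
    None,                           # non-string values pass through
]
cleaned = [strip_prefix(name) for name in samples]
print(cleaned)
```

If the processed filenames still do not match the paths in your new dataset, pass a different value to `--prefix`.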

<Accordion title="DICOM Converter Script" icon="file-code-2">
  **Purpose:** Convert DICOM medical imaging files to JPG format with comprehensive metadata extraction.

  **Installation:**

  ```bash theme={"theme":"monokai"}
  pip install pandas pydicom dicom2jpg opencv-python numpy
  ```

  **Usage:**

  ```bash theme={"theme":"monokai"}
  # Basic usage
  python dicom_converter.py /path/to/dicom/files

  # Custom output
  python dicom_converter.py /path/to/dicom/files --output /custom/output

  # Verbose logging
  python dicom_converter.py /path/to/dicom/files --verbose
  ```

  **Script Code:**

  This script converts DICOM files to JPG images and extracts their metadata. The full code is too long to include on this page; it covers:

  * DICOM metadata extraction (patient info, study data, equipment details)
  * Multiframe DICOM handling
  * Single-frame DICOM conversion
  * Windowing and rescaling
  * CSV metadata output

  The extracted metadata includes patient information, study details, equipment specifications, and image acquisition parameters.
</Accordion>
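
The windowing and rescaling step of the DICOM converter can be sketched as follows. This is a simplified linear window and an assumption about the approach, not the converter's actual code. The function name and sample values are hypothetical, and the parameters correspond to the DICOM attributes Rescale Slope, Rescale Intercept, Window Center, and Window Width. Plain Python keeps the sketch dependency-free; on real pixel data the same arithmetic would normally be vectorized with NumPy.

```python
def window_to_uint8(raw_pixels, slope, intercept, center, width):
    """Rescale stored pixel values, clip them to a window, and map to 0-255."""
    if width <= 0:
        raise ValueError("window width must be positive")

    lower = center - width / 2
    upper = center + width / 2

    output = []
    for raw in raw_pixels:
        value = raw * slope + intercept        # rescale stored value
        value = min(max(value, lower), upper)  # clip to the window
        output.append(round((value - lower) / width * 255))
    return output


# Hypothetical values: a window centered at 40 with a width of 80
pixels = window_to_uint8(
    [0, 1000, 1020, 1080, 2000],
    slope=1, intercept=-1000, center=40, width=80,
)
print(pixels)
```

Values below the window map to 0 and values above it map to 255, so the choice of center and width determines which intensity range keeps its contrast in the JPG output.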

<Accordion title="DICOM Metadata Upload Script" icon="file-code-2">
  **Purpose:** Upload DICOM metadata from CSV to Visual Layer with automatic field type detection.

  **Installation:**

  ```bash theme={"theme":"monokai"}
  pip install pandas requests pyjwt
  ```

  **Usage:**

  ```bash theme={"theme":"monokai"}
  # Cloud installation
  python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
    --dataset-id=your-dataset-id \
    --base-url=https://app.visual-layer.com \
    --api-key=your-api-key \
    --api-secret=your-api-secret

  # On-premises installation
  python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
    --dataset-id=your-dataset-id \
    --base-url=http://localhost:2080 \
    --api-key=your-api-key
  ```

  **Script Code:**

  This script uploads DICOM metadata from CSV files generated by the DICOM converter. Features include:

  * Automatic DICOM date/time format detection
  * Field type categorization (string, float, datetime, enum)
  * Multi-value field handling
  * JWT authentication support
  * Resume capability for large uploads

  The script parses DICOM date (`YYYYMMDD`) and time (`HHMMSS`) values, detects a type for each field, and runs the complete upload workflow with progress tracking.
</Accordion>
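
DICOM stores dates as `YYYYMMDD` and times as `HHMMSS`, optionally followed by a fractional part. A minimal sketch of converting such a pair to an ISO 8601 string is shown below. The helper name is hypothetical, and the output format is borrowed from the custom metadata upload script on this page (`%Y-%m-%dT%H:%M:%SZ`). Treating the value as UTC is an assumption, because DICOM date and time values carry no timezone by themselves.

```python
from datetime import datetime


def dicom_to_iso(date_value: str, time_value: str = "") -> str:
    """Convert a DICOM date (YYYYMMDD) and time (HHMMSS[.ffffff]) to ISO 8601."""
    # Drop fractional seconds and pad shortened times such as "1345" to six digits
    time_digits = time_value.strip().split(".")[0].ljust(6, "0")
    parsed = datetime.strptime(date_value.strip() + time_digits, "%Y%m%d%H%M%S")
    # Assumption: the value is reported as UTC without any offset conversion
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


print(dicom_to_iso("20240131", "134501"))
print(dicom_to_iso("20240131"))
```

Malformed input raises `ValueError`, so a caller can decide whether to skip the row or fall back to uploading the raw string.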

## Related Resources

<CardGroup cols={2}>
  <Card title="Custom Metadata" icon="file-braces-corner" href="/docs/advanced-features/custom-metadata">
    Working with custom metadata in Visual Layer
  </Card>

  <Card title="Importing Annotations" icon="database" href="/docs/advanced-dataset-management/importing-annotations">
    Import pre-existing annotation files
  </Card>

  <Card title="User Management" icon="users" href="/docs/self-hosting/user-management">
    Manage users in self-hosted installations
  </Card>

  <Card title="Troubleshooting" icon="bug" href="/docs/self-hosting/Troubleshooting">
    Common issues and solutions
  </Card>
</CardGroup>
