DICOM Metadata Upload to Visual Layer
This page contains the complete, ready-to-use Python script for uploading extracted DICOM metadata to Visual Layer datasets with automatic field type detection and DICOM-specific handling.
Back to DICOM Converter Documentation
Return to the main DICOM converter guide for the complete workflow, setup instructions, and usage examples.
Installation Requirements
Before using this script, install the required Python packages:
pip install pandas requests pyjwt
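To confirm the packages installed correctly, a quick sanity check like the following should print without errors (note that the pyjwt package is imported as jwt):
python -c "import pandas, requests, jwt; print('dependencies OK')"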
Usage
Save this script as upload_csv_with_json_mapping.py and run:
# For cloud installations
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
--dataset-id=your-dataset-id \
--base-url=https://app.visual-layer.com \
--api-key=your-api-key \
--api-secret=your-api-secret
# For on-premises installations
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
--dataset-id=your-dataset-id \
--base-url=http://localhost:2080 \
--api-key=your-api-key
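Expected Input Files
The script expects two inputs: dicom_metadata.csv, which must contain an image_filename column alongside the extracted DICOM fields, and metadata.json, the Visual Layer export whose media_items entries provide the file_name to media_id mapping. The minimal sketch below (with a placeholder media_id) illustrates how the script derives an image ID from each exported file name before matching it against the CSV:
import os

# Hypothetical excerpt of a Visual Layer metadata.json export (media_id is a placeholder)
metadata = {
    "media_items": [
        {"media_id": "example-media-id", "file_name": "series-00000_image-00000.jpg"},
    ]
}

# Same derivation the script performs: drop the directory and the file extension
def extract_image_id(filename: str) -> str:
    return os.path.splitext(os.path.basename(filename))[0]

for item in metadata["media_items"]:
    print(extract_image_id(item["file_name"]), "->", item["media_id"])
# prints: series-00000_image-00000 -> example-media-id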
Complete Script Code
#!/usr/bin/env python3
"""
CSV Metadata with JSON Mapping Workflow for Visual Layer
Uploads DICOM metadata from dicom_metadata.csv to Visual Layer.
Uses metadata.json for file path mapping to Visual Layer media IDs.
"""
import csv
import json
import requests
import ast
import argparse
import os
import sys
import pandas as pd
import re
import time
import jwt
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone, timedelta
class CsvJsonMetadataProcessor:
def __init__(self, dataset_id: str, base_url: str, api_key: str = None, api_secret: str = None, enum_fields: list = None):
self.dataset_id = dataset_id
# Store raw base URL for backward compatibility
self.raw_base_url = base_url.rstrip('/')
# Automatically add /api/v1/datasets if not present
if not base_url.endswith('/api/v1/datasets'):
if base_url.endswith('/'):
base_url = base_url.rstrip('/')
self.base_url = f"{base_url}/api/v1/datasets"
else:
self.base_url = base_url
self.session = requests.Session()
self._temp_files = [] # Track temporary files for cleanup
self.enum_fields = enum_fields or [] # Store user-specified enum fields
# Add authentication headers if provided
if api_key and api_secret:
# Generate JWT token as per Visual Layer documentation
jwt_token = self._generate_jwt(api_key, api_secret)
self.session.headers.update({
'Authorization': f'Bearer {jwt_token}'
})
def _generate_jwt(self, api_key: str, api_secret: str) -> str:
"""Generate JWT token for Visual Layer API authentication."""
jwt_algorithm = "HS256"
jwt_header = {
'alg': jwt_algorithm,
'typ': 'JWT',
'kid': api_key,
}
now = datetime.now(tz=timezone.utc)
expiration = now + timedelta(minutes=10)
payload = {
'sub': api_key,
'iat': int(now.timestamp()),
'exp': int(expiration.timestamp()),
'iss': 'sdk'
}
return jwt.encode(payload=payload, key=api_secret, algorithm=jwt_algorithm, headers=jwt_header)
def extract_image_id_from_filename(self, filename: str) -> str:
"""Extract unique identifier from DICOM filename.
Examples:
- series-00000_image-00000.jpg -> series-00000_image-00000
- series-00001_image-00005.jpg -> series-00001_image-00005
"""
# Remove directory path first
basename = os.path.basename(filename)
# Remove file extension
name_without_ext = os.path.splitext(basename)[0]
return name_without_ext
def read_csv(self, csv_file: str) -> List[Dict[str, Any]]:
"""Read CSV file and return list of records."""
if not os.path.exists(csv_file):
raise FileNotFoundError(f"File not found: {csv_file}")
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
return list(reader)
def read_metadata_json(self, json_file: str) -> List[Dict[str, Any]]:
"""Read metadata JSON file from Visual Layer containing filename->media_id mapping."""
if not os.path.exists(json_file):
raise FileNotFoundError(f"Metadata JSON file not found: {json_file}")
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extract media_items array
media_items = data.get('media_items', [])
# Convert to the expected format with media_id and filename fields
export_items = []
for item in media_items:
export_items.append({
'media_id': item.get('media_id'),
'filename': item.get('file_name') # Use file_name from JSON
})
return export_items
def create_image_id_mapping(self, export_items: List[Dict[str, Any]]) -> Dict[str, List[str]]:
"""Create mapping from image_id to list of media_ids using export CSV data."""
mapping = {}
print("🔍 Creating image_id to media_id mapping from export CSV...")
for item in export_items:
media_id = item.get('media_id')
file_name = item.get('filename', '')
if media_id and file_name:
image_id = self.extract_image_id_from_filename(file_name)
if image_id:
# Add to list instead of overwriting
if image_id not in mapping:
mapping[image_id] = []
mapping[image_id].append(media_id)
# Show statistics about duplicates
total_files = sum(len(media_ids) for media_ids in mapping.values())
duplicate_count = sum(1 for media_ids in mapping.values() if len(media_ids) > 1)
print(f" ✅ Created mapping for {len(mapping)} unique image IDs covering {total_files} files")
if duplicate_count > 0:
print(f" 📊 Found {duplicate_count} image IDs with multiple files")
return mapping
def analyze_csv_fields(self, csv_data: List[Dict[str, Any]]) -> Dict[str, str]:
"""Analyze CSV fields and determine their types."""
if not csv_data:
return {}
field_types = {}
sample_row = csv_data[0]
# Skip these fields as they're not suitable for custom metadata
skip_fields = {'image_filename', 'original_dicom_path', 'series_folder', 'conversion_timestamp'}
# Fields that contain numeric arrays - should be treated as strings
numeric_array_fields = {
'image_position_patient', 'image_orientation_patient', 'pixel_spacing'
}
for field_name, value in sample_row.items():
if field_name in skip_fields:
continue
# Check if user specified this field as enum
if field_name in self.enum_fields:
field_types[field_name] = 'enum'
# Handle numeric array fields as strings
elif field_name in numeric_array_fields:
field_types[field_name] = 'string'
# Handle image_type as multi-enum (contains text arrays)
elif field_name == 'image_type':
field_types[field_name] = 'multi-enum'
# Check for DICOM date fields (YYYYMMDD format)
elif self._is_dicom_date(field_name, value):
field_types[field_name] = 'datetime'
# Check for DICOM time fields (HHMMSS format)
elif self._is_dicom_time(field_name, value):
field_types[field_name] = 'datetime' # Convert times to datetime
# Determine field type based on value patterns
elif self._is_float(value):
field_types[field_name] = 'float'
elif self._is_date(value):
field_types[field_name] = 'datetime'
elif self._is_list(value):
# Check if it's multi-enum or single enum
unique_values = set()
for row in csv_data:
list_items = self._parse_list_value(row[field_name])
unique_values.update(list_items)
if len(unique_values) <= 20: # API enum limit
field_types[field_name] = 'multi-enum'
else:
field_types[field_name] = 'string'
else:
# For DICOM data, treat most fields as string unless specified as enum
field_types[field_name] = 'string'
return field_types
def _is_dicom_date(self, field_name: str, value: str) -> bool:
"""Check if field is a DICOM date field (YYYYMMDD format)."""
date_field_names = ['study_date', 'series_date', 'acquisition_date', 'instance_creation_date', 'patient_birth_date']
if field_name not in date_field_names:
return False
if not isinstance(value, str) or len(value.strip()) != 8:
return False
try:
# Try to parse as YYYYMMDD
datetime.strptime(value.strip(), '%Y%m%d')
return True
except (ValueError, TypeError):
return False
def _is_dicom_time(self, field_name: str, value: str) -> bool:
"""Check if field is a DICOM time field (HHMMSS format)."""
time_field_names = ['study_time', 'series_time', 'acquisition_time', 'instance_creation_time']
if field_name not in time_field_names:
return False
if not isinstance(value, str) or len(value.strip()) != 6:
return False
try:
# Try to parse as HHMMSS
datetime.strptime(value.strip(), '%H%M%S')
return True
except (ValueError, TypeError):
return False
def _is_float(self, value: str) -> bool:
"""Check if value can be converted to float."""
if not isinstance(value, str) or not value.strip():
return False
try:
float(value)
return '.' in value or 'e' in value.lower()
except (ValueError, TypeError):
return False
def _is_date(self, value: str) -> bool:
"""Check if value looks like a date using pandas parsing."""
if not isinstance(value, str) or not value.strip():
return False
try:
# Use pandas to parse the date - it's very flexible with formats
pd.to_datetime(value.strip())
return True
except (ValueError, TypeError, pd.errors.ParserError):
return False
def _is_list(self, value: str) -> bool:
"""Check if value looks like a list."""
if not isinstance(value, str):
return False
value = value.strip()
# Check for DICOM image_type format: "['ORIGINAL', 'PRIMARY', 'LOCALIZER']"
if value.startswith('["[') or value.startswith("'["):
return True
# Check for CSV-style quoted lists: "[""item1"",""item2""]"
if value.startswith('["') and value.endswith('"]'):
return True
# Check for standard list formats
if (value.startswith('[') and value.endswith(']')) or ',' in value:
try:
ast.literal_eval(value)
return True
except (ValueError, SyntaxError):
return ',' in value
return False
def _parse_list_value(self, value: str) -> List[str]:
"""Parse list value from CSV, handling DICOM image_type format."""
if not value:
return []
value = value.strip()
# Handle DICOM image_type format: "['ORIGINAL', 'PRIMARY', 'LOCALIZER']"
if value.startswith('"[') and value.endswith(']"'):
# Remove outer quotes
inner_value = value[1:-1] # Remove outer quotes
try:
return ast.literal_eval(inner_value)
except:
pass
# Remove outer quotes if present (from CSV)
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
try:
# Handle double-quoted strings in CSV
cleaned = value.replace('""', '"')
return ast.literal_eval(cleaned)
except:
# Fallback: split by comma if it looks like a simple list
if ',' in value:
return [item.strip().strip('"\'') for item in value.split(',')]
return [value.strip().strip('"\'')]
def create_custom_field(self, field_name: str, field_type: str, values: List[str]) -> Optional[str]:
"""Create a custom field and return field_id."""
print(f"🔧 Creating custom field: {field_name} ({field_type})")
# Prepare field data based on type
field_data = {
"field_name": field_name,
"field_type": field_type
}
# Add enum values for enum types
if field_type in ['enum', 'multi-enum'] and values:
unique_values = list(set(values)) # Remove duplicates
field_data["enum_options"] = unique_values[:20] # Limit enum values to API max
if field_type == 'multi-enum':
field_data["field_type"] = 'enum' # API only accepts 'enum'
field_data["is_multi"] = True
else:
field_data["is_multi"] = False
print(f" 📝 Adding {len(field_data['enum_options'])} enum values")
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"
try:
response = self.session.post(url, json=field_data, headers={'Content-Type': 'application/json'})
if response.status_code == 200:
result = response.json()
task_id = result.get('task_id')
print(f" ✅ Created field with task ID: {task_id}")
return task_id
elif "already exists" in response.text:
print(f" 🔄 Field already exists, skipping creation")
return None # Skip upload for existing fields
else:
print(f" ❌ Failed to create field: {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f" ❌ Request failed: {str(e)}")
return None
def upload_field_data(self, field_id: str, csv_data: List[Dict[str, Any]],
field_name: str, field_type: str, image_id_mapping: Dict[str, List[str]]) -> Optional[str]:
"""Upload data for a custom field using image_filename mapping."""
print(f" 📤 Uploading data for field: {field_name}")
# Prepare upload data using image_filename mapping
upload_data = []
matched_count = 0
for row in csv_data:
image_filename = row.get('image_filename', '').strip()
if not image_filename:
continue
# Extract image_id from filename
image_id = self.extract_image_id_from_filename(image_filename)
# Look up media_ids using image_id
media_ids = image_id_mapping.get(image_id, [])
if not media_ids:
continue # Skip if no matching media_ids found
value = row.get(field_name, '')
if value: # Only upload non-empty values
# Convert value based on field type
if field_type == 'float':
try:
value = float(value)
except (ValueError, TypeError):
continue
elif field_type == 'datetime':
# Handle DICOM dates and times
try:
if self._is_dicom_date(field_name, value):
# Parse DICOM date format YYYYMMDD
dt = datetime.strptime(value.strip(), '%Y%m%d')
value = dt.strftime('%Y-%m-%dT00:00:00Z')
elif self._is_dicom_time(field_name, value):
# Parse DICOM time format HHMMSS and convert to today's date + time
time_str = value.strip()
if len(time_str) == 6:
# Create datetime with today's date + DICOM time
from datetime import date
today = date.today()
dt = datetime.strptime(f"{today.strftime('%Y%m%d')}{time_str}", '%Y%m%d%H%M%S')
value = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
# Invalid time format, skip
continue
else:
# Use pandas to parse and format datetime to ISO 8601 UTC
dt = pd.to_datetime(value.strip())
# Convert to UTC and format as ISO 8601
if dt.tz is None:
# If no timezone, assume UTC
value = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
# Convert to UTC and format
value = dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
except (ValueError, TypeError, pd.errors.ParserError):
# If parsing fails, skip this value
continue
elif field_type == 'multi-enum':
# Parse list for multi-enum - ensure all values are strings
try:
parsed_list = self._parse_list_value(value)
value = [str(item).strip() for item in parsed_list]
except:
value = [str(value).strip()] if value else []
elif field_type == 'enum':
# Single enum - ensure string
value = str(value).strip()
elif field_type == 'string':
# Ensure string type
value = str(value).strip()
if value != '' and value is not None:
# Create upload entry for each media_id
for media_id in media_ids:
upload_data.append({
"media_id": media_id,
"value": value
})
matched_count += 1
total_files = len(upload_data)
unique_items = len([row for row in csv_data if row.get('image_filename', '').strip() and image_id_mapping.get(self.extract_image_id_from_filename(row.get('image_filename', '').strip()))])
print(f" 📊 Processed {unique_items} image files from {len(csv_data)} CSV rows")
print(f" 📁 Created {total_files} file entries for upload")
if not upload_data:
print(f" ⚠️ No data to upload for field {field_name}")
return None
# Save metadata to temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(upload_data, f, indent=2)
temp_file = f.name
# Upload file to existing task
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"
try:
with open(temp_file, 'rb') as f:
files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
response = self.session.post(url, files=files)
# Clean up temp file
os.unlink(temp_file)
if response.status_code in [200, 202]:
print(f" ✅ Upload completed successfully")
return field_id
else:
print(f" ❌ Failed to upload data: {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f" ❌ Request failed: {str(e)}")
return None
def check_task_status(self, task_id: str) -> str:
"""Check the status of an upload task."""
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"
try:
response = self.session.get(url)
if response.status_code == 200:
result = response.json()
status = result.get('status', 'unknown')
# Check for errors and provide detailed information
if status == 'COMPLETED_WITH_ERRORS':
error_count = result.get('error_count', 0)
inserted_rows = result.get('inserted_rows', 0)
print(f" ⚠️ Task completed with {error_count} errors, {inserted_rows} rows inserted")
# Show sample errors if available
sample_errors = result.get('sample_errors', [])
if sample_errors:
print(f" 📋 Sample errors:")
for error in sample_errors[:3]: # Show first 3 errors
print(f" - Row {error.get('row_index', '?')}: {error.get('reason', 'Unknown error')}")
if len(sample_errors) > 3:
print(f" - ... and {len(sample_errors) - 3} more errors")
elif status == 'COMPLETED':
inserted_rows = result.get('inserted_rows', 0)
print(f" ✅ Task completed successfully, {inserted_rows} rows inserted")
return status
else:
return 'error'
except requests.RequestException:
return 'error'
def wait_for_task_completion(self, task_id: str, field_name: str, polling_interval: int = 5) -> str:
"""Wait for task completion by polling status endpoint indefinitely."""
print(f" ⏳ Waiting for task completion... (Press Ctrl+C to stop)")
start_time = time.time()
while True:
status = self.check_task_status(task_id)
if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
elapsed = int(time.time() - start_time)
print(f" ✅ Task completed after {elapsed}s")
return status
elif status == 'error':
print(f" ❌ Error checking task status")
return 'error'
elif status == 'IN_PROGRESS':
elapsed = int(time.time() - start_time)
print(f" ⏳ Task in progress... (elapsed: {elapsed}s)")
else:
elapsed = int(time.time() - start_time)
print(f" 📋 Task status: {status} (elapsed: {elapsed}s)")
# Wait before next check
time.sleep(polling_interval)
def save_progress(self, task_ids: Dict[str, Dict[str, str]], completed_fields: List[str]):
"""Save progress to a JSON file."""
progress = {
'task_ids': task_ids,
'completed_fields': completed_fields
}
with open('csv_json_workflow_progress.json', 'w') as f:
json.dump(progress, f, indent=2)
def load_progress(self) -> tuple:
"""Load progress from JSON file."""
try:
with open('csv_json_workflow_progress.json', 'r') as f:
progress = json.load(f)
return progress.get('task_ids', {}), progress.get('completed_fields', [])
except FileNotFoundError:
return {}, []
def cleanup_temp_files(self):
"""Remove temporary files created during processing."""
for temp_file in self._temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
print(f" 🧹 Cleaned up temporary file: {temp_file}")
except OSError as e:
print(f" ⚠️ Could not remove {temp_file}: {e}")
def process_workflow(self, csv_file: str, json_file: str, resume: bool = False):
"""Main workflow processing function using CSV for metadata and JSON for mapping."""
try:
print("🚀 Starting CSV Metadata Workflow")
print(f"📁 CSV metadata: {csv_file}")
print(f"🗂️ JSON metadata: {json_file}")
# Load progress if resuming
task_ids = {}
completed_fields = []
if resume:
task_ids, completed_fields = self.load_progress()
print(f"📋 Resuming workflow - {len(completed_fields)} fields already completed")
# Read CSV data (contains DICOM metadata with image_filename)
print("\n📊 Reading CSV metadata...")
csv_data = self.read_csv(csv_file)
print(f" ✅ Loaded {len(csv_data)} CSV records")
# Verify image_filename field exists
if not csv_data or 'image_filename' not in csv_data[0]:
raise ValueError("CSV must contain 'image_filename' field")
# Read JSON metadata for filename->media_id mapping
print("\n🗂️ Reading JSON metadata...")
export_items = self.read_metadata_json(json_file)
print(f" ✅ Loaded {len(export_items)} media items")
            # Create image_id mapping from the metadata JSON export
image_id_mapping = self.create_image_id_mapping(export_items)
if not image_id_mapping:
raise ValueError("Could not create image_id mapping - check export CSV format")
# Analyze fields
print("\n🔍 Analyzing CSV fields...")
field_types = self.analyze_csv_fields(csv_data)
print(" 📋 Field analysis:")
for field_name, field_type in field_types.items():
status = "✅ Completed" if field_name in completed_fields else "⏳ Pending"
print(f" {field_name}: {field_type} - {status}")
print(f"\n🎯 Processing {len(field_types)} custom fields...")
# Process each field
for field_name, field_type in field_types.items():
if field_name in completed_fields:
print(f"\n⏭️ Skipping completed field: {field_name}")
continue
print(f"\n🔄 Processing field: {field_name} ({field_type})")
try:
# Collect unique values for enum types
values = []
if field_type in ['enum', 'multi-enum']:
for row in csv_data:
if field_type == 'multi-enum':
values.extend(self._parse_list_value(row[field_name]))
else:
val = row[field_name]
if val:
values.append(str(val))
# Create custom field
field_id = self.create_custom_field(field_name, field_type, values)
if not field_id:
print(f" ⏭️ Skipping field {field_name} (already exists or failed to create)")
continue
# Upload data
task_id = self.upload_field_data(field_id, csv_data, field_name, field_type, image_id_mapping)
if not task_id:
raise Exception(f"Failed to upload data for {field_name}")
# Store task info
task_ids[field_name] = {
'field_id': field_id,
'task_id': task_id
}
# Wait for task completion before moving to next field
print(f" 📤 Upload completed, monitoring task status...")
final_status = self.wait_for_task_completion(task_id, field_name)
if final_status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
completed_fields.append(field_name)
status_icon = "✅" if final_status == 'COMPLETED' else "⚠️"
print(f" {status_icon} Field {field_name} {final_status.lower()}")
else:
print(f" ❌ Field {field_name} failed or timed out (status: {final_status})")
# Continue with next field even if this one failed
except Exception as e:
print(f" ❌ Error processing field {field_name}: {str(e)}")
print(f" ⏭️ Skipping field and continuing with next field...")
self.save_progress(task_ids, completed_fields)
continue
print()
print("🎉 Workflow completed!")
print(f"✅ Successfully processed {len(completed_fields)} fields")
# Show task summary
if task_ids:
print("\n📋 Task Summary:")
for field_name, task_info in task_ids.items():
field_id = task_info['field_id']
task_id = task_info['task_id']
if field_name in completed_fields:
status = "✅ completed"
else:
status = "❌ failed/timeout"
print(f" {field_name}: {status}")
# Clean up progress file when workflow completes successfully
progress_file = "csv_json_workflow_progress.json"
if os.path.exists(progress_file):
try:
os.remove(progress_file)
print(f"\n🧹 Cleaned up progress file: {progress_file}")
except OSError as e:
print(f"⚠️ Could not remove progress file {progress_file}: {e}")
finally:
# Always clean up temporary files
self.cleanup_temp_files()
def main():
parser = argparse.ArgumentParser(description='CSV Metadata Workflow for Visual Layer')
parser.add_argument('csv_file', help='Path to dicom_metadata.csv file')
parser.add_argument('json_file', help='Path to metadata.json file from Visual Layer (filename->media_id mapping)')
parser.add_argument('--dataset-id', required=True, help='Visual Layer dataset ID')
parser.add_argument('--base-url', default='https://app.visual-layer.com',
help='Base URL (default: https://app.visual-layer.com)')
parser.add_argument('--api-key', required=True, help='API key for authentication')
parser.add_argument('--api-secret', help='API secret for authentication')
parser.add_argument('--enum-fields', nargs='*', default=[], help='List of column names to treat as enum fields (e.g., --enum-fields patient_sex modality manufacturer)')
parser.add_argument('--resume', action='store_true', help='Resume from previous progress')
args = parser.parse_args()
# Create processor and run workflow
processor = CsvJsonMetadataProcessor(args.dataset_id, args.base_url, args.api_key, args.api_secret, args.enum_fields)
processor.process_workflow(args.csv_file, args.json_file, args.resume)
if __name__ == "__main__":
main()
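Resuming an Interrupted Upload
The script writes its progress to csv_json_workflow_progress.json after each field, so an interrupted upload can be resumed without re-creating fields that already completed. A resumed run that also forces specific columns to be treated as enum fields might look like this (argument values are placeholders):
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
--dataset-id=your-dataset-id \
--base-url=https://app.visual-layer.com \
--api-key=your-api-key \
--api-secret=your-api-secret \
--enum-fields patient_sex modality manufacturer \
--resume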