Custom Metadata Upload Script
Purpose: Automate custom metadata uploads from folders containing images and individual metadata JSON files, with automatic field discovery and type detection.
Installation:
pip install pandas requests
Usage:
# Auto-detect all fields
python upload_metadata_from_folder.py \
--folder /path/to/folder \
--dataset-id your-dataset-id \
--base-url http://localhost:2080
# Specify specific fields
python upload_metadata_from_folder.py \
--folder /path/to/folder \
--dataset-id your-dataset-id \
--base-url http://localhost:2080 \
--field annotated_at datetime \
--field confidence float
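The script pairs each image with a sidecar file named <image filename>.metadata.json in the same folder. The following sketch creates one such pair to show the expected layout; the folder path, field names, and values are purely illustrative, and the auto-detection described above would map them to datetime, float, link, and multi-enum fields respectively.
# Illustrative example only: writes a sidecar metadata file in the layout the script expects.
import json
from pathlib import Path

folder = Path("/path/to/folder")              # same folder passed via --folder
image_name = "cat_001.jpg"                    # an image file already in this folder and in the dataset

sidecar = folder / f"{image_name}.metadata.json"
sidecar.write_text(json.dumps({
    "annotated_at": "2024-05-01T09:30:00Z",   # would be detected as datetime
    "confidence": 0.87,                       # would be detected as float
    "source_url": "https://example.com/cats", # would be detected as link
    "tags": ["outdoor", "daytime"],           # would be detected as multi-enum
}, indent=2))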
Script Code:
#!/usr/bin/env python3
"""
Folder-based Metadata Upload for Visual Layer

Scans a folder for images and their corresponding .metadata.json files.
Automatically discovers and uploads all metadata fields with intelligent type detection.
"""
import json
import requests
import argparse
import os
import sys
import pandas as pd
import time
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path


class FolderMetadataProcessor:
    def __init__(self, dataset_id: str, base_url: str):
        self.dataset_id = dataset_id
        self.raw_base_url = base_url.rstrip('/')
        if not base_url.endswith('/api/v1/datasets'):
            if base_url.endswith('/'):
                base_url = base_url.rstrip('/')
            self.base_url = f"{base_url}/api/v1/datasets"
        else:
            self.base_url = base_url
        self.session = requests.Session()
        self._temp_files = []

    def export_dataset(self) -> Dict[str, str]:
        """Export dataset and return mapping of filename -> media_id."""
        print("📤 Exporting dataset to get media_id mappings...")
        url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"
        try:
            response = self.session.get(url)
            if response.status_code == 200:
                import csv
                import io
                csv_content = response.text
                csv_reader = csv.DictReader(io.StringIO(csv_content))
                mapping = {}
                for row in csv_reader:
                    filename = row.get('filename', '')
                    media_id = row.get('media_id', '')
                    if media_id and filename:
                        basename = os.path.basename(filename)
                        mapping[basename] = media_id
                print(f" ✅ Exported {len(mapping)} media items")
                return mapping
            else:
                print(f" ❌ Failed to export dataset: {response.status_code}")
                return {}
        except Exception as e:
            print(f" ❌ Export failed: {str(e)}")
            return {}

    def scan_folder(self, folder_path: str) -> List[Tuple[str, str]]:
        """Scan folder and return list of (image_path, metadata_path) tuples."""
        print(f"🔍 Scanning folder: {folder_path}")
        folder = Path(folder_path)
        if not folder.exists():
            raise FileNotFoundError(f"Folder not found: {folder_path}")
        pairs = []
        for metadata_file in folder.glob("*.metadata.json"):
            image_name = metadata_file.name.replace(".metadata.json", "")
            image_path = folder / image_name
            if image_path.exists():
                pairs.append((str(image_path), str(metadata_file)))
        print(f" ✅ Found {len(pairs)} image + metadata pairs")
        return pairs

    def load_metadata_files(self, pairs: List[Tuple[str, str]]) -> Dict[str, Dict[str, Any]]:
        """Load all metadata.json files."""
        print("📖 Loading metadata files...")
        metadata_by_file = {}
        for image_path, metadata_path in pairs:
            image_filename = os.path.basename(image_path)
            try:
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                metadata_by_file[image_filename] = metadata
            except Exception as e:
                print(f" ⚠️ Failed to load {metadata_path}: {str(e)}")
        print(f" ✅ Loaded {len(metadata_by_file)} metadata files")
        return metadata_by_file

    def discover_and_analyze_fields(self, metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """Discover all fields and auto-detect their types."""
        print("🔍 Discovering and analyzing all fields...")
        all_fields = set()
        for metadata in metadata_by_file.values():
            all_fields.update(metadata.keys())
        print(f" 📋 Found {len(all_fields)} unique fields")
        field_types = {}
        for field_name in sorted(all_fields):
            sample_values = [metadata[field_name] for metadata in metadata_by_file.values() if field_name in metadata][:100]
            field_type = self._auto_detect_field_type(field_name, sample_values)
            field_types[field_name] = field_type
            print(f" {field_name}: {field_type}")
        print(f" ✅ Analyzed {len(field_types)} fields")
        return field_types

    def _auto_detect_field_type(self, field_name: str, sample_values: List[Any]) -> str:
        """Auto-detect field type based on sample values."""
        if not sample_values:
            return 'string'
        first_value = next((v for v in sample_values if v not in (None, '')), None)
        if first_value is None:
            return 'string'
        if self._is_float(first_value):
            return 'float'
        elif self._is_date(first_value):
            return 'datetime'
        elif self._is_url(first_value):
            return 'link'
        elif isinstance(first_value, list):
            unique_values = set()
            for val in sample_values:
                if isinstance(val, list):
                    unique_values.update(val)
            return 'multi-enum' if len(unique_values) <= 20 else 'string'
        else:
            unique_values = {str(v).strip() for v in sample_values if v not in (None, '') and not isinstance(v, (dict, list))}
            return 'enum' if 0 < len(unique_values) <= 100 else 'string'

    def _is_float(self, value: Any) -> bool:
        """Return True only for values that parse as numbers written with a decimal point or exponent."""
        if isinstance(value, bool):
            return False
        if isinstance(value, float):
            return True
        if not isinstance(value, (str, int)):
            return False
        text = str(value).strip()
        try:
            float(text)
        except ValueError:
            return False
        return '.' in text or 'e' in text.lower()

    def _is_date(self, value: Any) -> bool:
        if not isinstance(value, str) or not value.strip():
            return False
        try:
            pd.to_datetime(value.strip())
            return True
        except Exception:
            return False

    def _is_url(self, value: Any) -> bool:
        return isinstance(value, str) and value.strip().lower().startswith(('http://', 'https://', 'ftp://'))

    def create_custom_field(self, field_name: str, field_type: str, metadata_by_file: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """Create a custom field."""
        print(f"🔧 Creating custom field: {field_name} ({field_type})")
        field_data = {"field_name": field_name, "field_type": field_type}
        if field_type in ['enum', 'multi-enum']:
            unique_values = set()
            for metadata in metadata_by_file.values():
                if field_name in metadata:
                    value = metadata[field_name]
                    if value:
                        if field_type == 'multi-enum' and isinstance(value, list):
                            unique_values.update(str(v) for v in value)
                        else:
                            unique_values.add(str(value))
            field_data["enum_options"] = list(unique_values)[:20]
            if field_type == 'multi-enum':
                field_data["field_type"] = 'enum'
                field_data["is_multi"] = True
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"
        try:
            response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                task_id = response.json().get('task_id')
                print(f" ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                print(f" 🔄 Field already exists")
                return None
            else:
                print(f" ❌ Failed: {response.status_code}")
                return None
        except Exception as e:
            print(f" ❌ Error: {str(e)}")
            return None

    def upload_field_data(self, field_id: str, field_name: str, field_type: str,
                          metadata_by_file: Dict[str, Dict[str, Any]],
                          filename_to_media_id: Dict[str, str]) -> Optional[str]:
        """Upload data for a custom field."""
        print(f" 📤 Uploading data for field: {field_name}")
        upload_data = []
        for filename, metadata in metadata_by_file.items():
            media_id = filename_to_media_id.get(filename)
            if not media_id or field_name not in metadata:
                continue
            value = metadata[field_name]
            if value in (None, ''):
                continue
            try:
                converted_value = self._convert_value(value, field_name, field_type)
                if converted_value is not None:
                    upload_data.append({"media_id": media_id, "value": converted_value})
            except Exception:
                continue
        if not upload_data:
            print(f" ⚠️ No data to upload")
            return None
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f)
            temp_file = f.name
        self._temp_files.append(temp_file)
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"
        try:
            with open(temp_file, 'rb') as f:
                files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
                response = self.session.post(url, files=files)
            if response.status_code in [200, 202]:
                print(f" ✅ Upload completed")
                return field_id
            else:
                print(f" ❌ Upload failed: {response.status_code}")
                return None
        except Exception as e:
            print(f" ❌ Error: {str(e)}")
            return None

    def _convert_value(self, value: Any, field_name: str, field_type: str) -> Any:
        """Convert value to appropriate type."""
        try:
            if field_type == 'datetime':
                dt = pd.to_datetime(str(value))
                return dt.strftime('%Y-%m-%dT%H:%M:%SZ') if dt.tz is None else dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
            elif field_type == 'float':
                return float(value)
            elif field_type == 'multi-enum':
                return [str(v).strip() for v in value] if isinstance(value, list) else [str(value).strip()]
            elif field_type == 'enum':
                return str(value).strip()
            elif field_type == 'string':
                if isinstance(value, (dict, list)):
                    return json.dumps(value)[:255]
                return str(value).strip()[:255]
            else:
                return str(value)
        except Exception:
            return str(value)[:255]

    def check_task_status(self, task_id: str) -> str:
        """Check task status."""
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"
        try:
            response = self.session.get(url)
            if response.status_code == 200:
                result = response.json()
                status = result.get('status', 'unknown')
                if status == 'COMPLETED':
                    print(f" ✅ Completed, {result.get('inserted_rows', 0)} rows inserted")
                elif status == 'COMPLETED_WITH_ERRORS':
                    print(f" ⚠️ Completed with {result.get('error_count', 0)} errors")
                return status
            return 'error'
        except Exception:
            return 'error'

    def wait_for_task_completion(self, task_id: str, field_name: str) -> str:
        """Wait for task completion."""
        print(f" ⏳ Waiting for completion...")
        start = time.time()
        while True:
            status = self.check_task_status(task_id)
            if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                print(f" ✅ Completed after {int(time.time() - start)}s")
                return status
            elif status == 'error':
                return 'error'
            time.sleep(5)

    def cleanup_temp_files(self):
        """Remove temporary files."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except Exception:
                pass

    def process_workflow(self, folder_path: str, user_fields: Optional[Dict[str, str]] = None):
        """Main workflow."""
        try:
            print("🚀 Starting Metadata Upload Workflow")
            filename_to_media_id = self.export_dataset()
            if not filename_to_media_id:
                raise Exception("Export failed")
            pairs = self.scan_folder(folder_path)
            if not pairs:
                raise Exception("No files found")
            metadata_by_file = self.load_metadata_files(pairs)
            if not metadata_by_file:
                raise Exception("Failed to load metadata")
            if user_fields:
                validated_fields = user_fields
            else:
                validated_fields = self.discover_and_analyze_fields(metadata_by_file)
            print(f"\n🎯 Processing {len(validated_fields)} fields...")
            completed = []
            for field_name, field_type in validated_fields.items():
                print(f"\n🔄 Processing: {field_name} ({field_type})")
                try:
                    field_id = self.create_custom_field(field_name, field_type, metadata_by_file)
                    if not field_id:
                        continue
                    task_id = self.upload_field_data(field_id, field_name, field_type,
                                                     metadata_by_file, filename_to_media_id)
                    if not task_id:
                        continue
                    status = self.wait_for_task_completion(task_id, field_name)
                    if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
                        completed.append(field_name)
                except Exception as e:
                    print(f" ❌ Error: {str(e)}")
            print(f"\n🎉 Completed {len(completed)}/{len(validated_fields)} fields")
        finally:
            self.cleanup_temp_files()


def main():
    parser = argparse.ArgumentParser(description='Upload metadata from folder')
    parser.add_argument('--folder', required=True, help='Folder path')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--base-url', default='http://localhost:2080', help='Base URL')
    parser.add_argument('--field', action='append', nargs=2, metavar=('NAME', 'TYPE'),
                        help='Field specification (optional)')
    args = parser.parse_args()
    user_fields = None
    if args.field:
        user_fields = {}
        valid_types = {'string', 'float', 'datetime', 'enum', 'multi-enum', 'link'}
        for name, field_type in args.field:
            if field_type not in valid_types:
                print(f"❌ Invalid type: {field_type}")
                sys.exit(1)
            user_fields[name] = field_type
    processor = FolderMetadataProcessor(args.dataset_id, args.base_url)
    processor.process_workflow(args.folder, user_fields)


if __name__ == "__main__":
    main()
Caption Extraction Script
Purpose: Extract caption data from Visual Layer pipeline runs so it can be reused for subsequent dataset creations.
Installation:
pip install pandas pyarrow
Usage:
# Basic usage
python process_annotations.py /.vl/tmp/[dataset-id]/input/metadata/image_annotations.parquet
# Specify output
python process_annotations.py input.parquet -o /path/to/output.parquet
# Custom prefix
python process_annotations.py input.parquet --prefix /custom/prefix
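Before running the script on a pipeline output, it can help to confirm that the parquet actually contains the filename and caption columns the script extracts. A minimal sanity check (the path is a placeholder):
import pandas as pd

df = pd.read_parquet("/.vl/tmp/[dataset-id]/input/metadata/image_annotations.parquet")
print(df.columns.tolist())                 # should include 'filename' and 'caption'
print(df[["filename", "caption"]].head())  # preview a few rows before processing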
Script Code:
#!/usr/bin/env python3
"""
Process parquet annotation files to extract filename and caption columns.
Removes path prefixes from filenames.
"""
import argparse
import sys
from pathlib import Path

import pandas as pd


def process_parquet(input_path, output_path=None, prefix_to_remove='/hostfs'):
    """Process parquet file to extract filename and caption columns."""
    input_file = Path(input_path)
    if not input_file.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")
    if input_file.suffix != '.parquet':
        raise ValueError(f"Must be parquet file, got: {input_file.suffix}")
    if output_path is None:
        output_file = input_file.parent / f"{input_file.stem}_processed.parquet"
    else:
        output_file = Path(output_path)
    print(f"Reading: {input_file}")
    try:
        df = pd.read_parquet(input_file)
    except Exception as e:
        raise RuntimeError(f"Failed to read parquet: {e}")
    print(f"Original shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")
    required = ['filename', 'caption']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    df = df[['filename', 'caption']]
    print(f"Removing prefix '{prefix_to_remove}'...")
    df['filename'] = df['filename'].apply(
        lambda x: x.replace(prefix_to_remove, '', 1)
        if isinstance(x, str) and x.startswith(prefix_to_remove)
        else x
    )
    print(f"\nProcessed shape: {df.shape}")
    print(f"\nSample filenames:")
    print(df['filename'].head(3).tolist())
    print(f"\nSaving to: {output_file}")
    df.to_parquet(output_file, index=False)
    print(f"✓ Processed {len(df)} rows")
    print(f"✓ Saved to: {output_file}")
    return output_file


def main():
    parser = argparse.ArgumentParser(
        description='Process parquet files to extract filename and caption'
    )
    parser.add_argument('input', help='Input parquet file')
    parser.add_argument('-o', '--output', help='Output path (optional)')
    parser.add_argument('--prefix', default='/hostfs', help='Prefix to remove')
    args = parser.parse_args()
    try:
        process_parquet(args.input, args.output, args.prefix)
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == '__main__':
    sys.exit(main())
DICOM Converter Script
Purpose: Convert DICOM medical imaging files to JPG format with comprehensive metadata extraction.
Installation:
pip install pandas pydicom dicom2jpg opencv-python numpy
Usage:
# Basic usage
python dicom_converter.py /path/to/dicom/files
# Custom output
python dicom_converter.py /path/to/dicom/files --output /custom/output
# Verbose logging
python dicom_converter.py /path/to/dicom/files --verbose
Script Code: This script converts DICOM files to JPG images and extracts metadata. Due to its length, the complete code is not reproduced here; it includes (a sketch of the core conversion step follows this list):
- DICOM metadata extraction (patient info, study data, equipment details)
- Multiframe DICOM handling
- Single-frame DICOM conversion
- Windowing and rescaling
- CSV metadata output
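As a rough illustration of the single-frame conversion step, the sketch below reads a DICOM file with pydicom, applies rescale and windowing, and writes a JPG with OpenCV. The function name, file paths, and simplified windowing logic are illustrative assumptions, not the converter's actual implementation.
# Minimal single-frame DICOM -> JPG sketch (illustrative, not the full converter).
import numpy as np
import cv2
import pydicom
from pydicom.multival import MultiValue

def dicom_to_jpg(dicom_path: str, jpg_path: str) -> None:
    ds = pydicom.dcmread(dicom_path)
    pixels = ds.pixel_array.astype(np.float32)   # assumes an uncompressed, single-frame image

    # Apply the modality rescale (slope/intercept) if the tags are present.
    pixels = pixels * float(getattr(ds, "RescaleSlope", 1.0)) + float(getattr(ds, "RescaleIntercept", 0.0))

    # Window the data if WindowCenter/WindowWidth are present, else use the full pixel range.
    center, width = getattr(ds, "WindowCenter", None), getattr(ds, "WindowWidth", None)
    if center is not None and width is not None:
        if isinstance(center, MultiValue):       # these tags can be multi-valued; take the first entry
            center, width = center[0], width[0]
        low, high = float(center) - float(width) / 2, float(center) + float(width) / 2
    else:
        low, high = float(pixels.min()), float(pixels.max())

    # Scale to 0-255 and write the JPG.
    pixels = np.clip(pixels, low, high)
    pixels = (pixels - low) / max(high - low, 1e-6) * 255.0
    cv2.imwrite(jpg_path, pixels.astype(np.uint8))

dicom_to_jpg("/path/to/dicom/files/example.dcm", "/custom/output/example.jpg")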
DICOM Metadata Upload Script
Purpose: Upload DICOM metadata from CSV to Visual Layer with automatic field type detection.
Installation:
pip install pandas requests pyjwt
Usage:
# Cloud installation
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
--dataset-id=your-dataset-id \
--base-url=https://app.visual-layer.com \
--api-key=your-api-key \
--api-secret=your-api-secret
# On-premises installation
python upload_csv_with_json_mapping.py dicom_metadata.csv metadata.json \
--dataset-id=your-dataset-id \
--base-url=http://localhost:2080 \
--api-key=your-api-key
Script Code: This script uploads DICOM metadata from CSV files generated by the DICOM converter. Features include (a sketch of the date/time normalization follows this list):
- Automatic DICOM date/time format detection
- Field type categorization (string, float, datetime, enum)
- Multi-value field handling
- JWT authentication support
- Resume capability for large uploads
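For context on the date/time handling: DICOM stores dates as DA values (YYYYMMDD) and times as TM values (HHMMSS with optional fractional seconds). The sketch below shows one way such values could be normalized to ISO 8601 strings before upload; the function name is an illustrative assumption and may differ from the script's actual implementation.
# Illustrative sketch: convert DICOM DA (YYYYMMDD) + TM (HHMMSS[.FFFFFF]) values
# into an ISO 8601 string. Assumes the timestamps can be treated as UTC.
from datetime import datetime

def dicom_datetime_to_iso(da: str, tm: str = "000000") -> str:
    tm = (tm or "000000").split(".")[0].ljust(6, "0")   # drop fractional seconds, pad short times
    dt = datetime.strptime(da + tm, "%Y%m%d%H%M%S")
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

print(dicom_datetime_to_iso("20240105", "142530.123456"))   # 2024-01-05T14:25:30Z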