Custom Metadata Upload to Visual Layer
This page contains the complete, ready-to-use Python script for uploading custom metadata to Visual Layer datasets, with automatic field discovery and type detection.
The script processes a folder containing images and their corresponding .metadata.json files. It automatically exports the dataset, discovers all metadata fields, and intelligently detects data types without manual configuration.
Back to Custom Metadata Automation Documentation
Return to the main custom metadata automation guide for setup instructions, usage examples, and workflow details.
Key Features
- Automatic Dataset Export - No manual export step required
- Auto-Discovery Mode - Automatically finds and processes all metadata fields
- Manual Override - Specify individual fields with the --field flag when needed
- Intelligent Type Detection - Detects float, datetime, enum, multi-enum, link, and string types
- Robust Error Handling - Falls back to string type if conversion fails
- String Truncation - Automatically handles Visual Layer’s 255-character limit
Installation Requirements
Before using this script, install the required Python packages:
pip install pandas requests
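To confirm both packages import cleanly before running the script, a quick sanity check:
python -c "import pandas, requests; print('dependencies OK')"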
Usage
Save this script as upload_metadata_from_folder.py and run:
# Auto-detect all fields (recommended)
python upload_metadata_from_folder.py \
--folder /path/to/folder \
--dataset-id your-dataset-id \
--base-url http://localhost:8080
# Manually specify specific fields
python upload_metadata_from_folder.py \
--folder /path/to/folder \
--dataset-id your-dataset-id \
--base-url http://localhost:8080 \
--field annotated_at datetime \
--field confidence float \
--field category enum
Expected Folder Structure
The script expects images and their corresponding metadata files:
your-folder/
├── image1.jpg
├── image1.jpg.metadata.json
├── image2.png
├── image2.png.metadata.json
├── image3.jpg
└── image3.jpg.metadata.json
Each .metadata.json file should contain field-value pairs:
{
"annotated_at": "2024-01-15T10:30:00Z",
"confidence": 0.95,
"category": "approved",
"tags": ["quality", "verified"]
}
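Given values like these, the auto-detection heuristics in the script below should resolve the fields roughly as follows (annotated_at parses as a date, confidence carries a decimal point, category is a short string with few unique values, and tags is a list with few unique items):
- annotated_at → datetime
- confidence → float
- category → enum
- tags → multi-enum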
This is an example script designed for on-premises Visual Layer installations. You can modify the field detection logic and processing to suit your specific needs.
Complete Script Code
#!/usr/bin/env python3
"""
Folder-based Metadata Upload for Visual Layer
Scans a folder for images and their corresponding .metadata.json files.
Automatically discovers and uploads all metadata fields with intelligent type detection.
Features:
- Auto-detection: Discovers all fields and detects types (float, datetime, enum, string, etc.)
- Manual override: Specify fields manually with --field flag
- Robust: Falls back to string type if conversion fails
Usage:
# Auto-detect all fields (recommended):
python upload_metadata_from_folder.py \\
--folder "path/to/folder" \\
--dataset-id <dataset-id> \\
--base-url http://localhost:8080
# Manual field specification:
python upload_metadata_from_folder.py \\
--folder "path/to/folder" \\
--dataset-id <dataset-id> \\
--base-url http://localhost:8080 \\
--field annotated_at datetime \\
--field conf_classification float
"""
import json
import requests
import argparse
import os
import sys
import pandas as pd
import time
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
class FolderMetadataProcessor:
def __init__(self, dataset_id: str, base_url: str):
self.dataset_id = dataset_id
self.raw_base_url = base_url.rstrip('/')
# Automatically add /api/v1/datasets if not present
if not base_url.endswith('/api/v1/datasets'):
if base_url.endswith('/'):
base_url = base_url.rstrip('/')
self.base_url = f"{base_url}/api/v1/datasets"
else:
self.base_url = base_url
self.session = requests.Session()
self._temp_files = []
def export_dataset(self) -> Dict[str, str]:
"""Export dataset and return mapping of filename -> media_id."""
print("📤 Exporting dataset to get media_id mappings...")
# Use the export_media_id endpoint to get CSV with filename -> media_id mapping
# Note: This endpoint uses singular 'dataset' not 'datasets'
url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"
try:
response = self.session.get(url)
if response.status_code == 200:
# Parse CSV response
import csv
import io
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
# Build mapping from filename to media_id
mapping = {}
for row in csv_reader:
filename = row.get('filename', '')
media_id = row.get('media_id', '')
if media_id and filename:
# Extract just the filename without path
basename = os.path.basename(filename)
mapping[basename] = media_id
print(f" ✅ Exported {len(mapping)} media items")
return mapping
else:
print(f" ❌ Failed to export dataset: {response.status_code} - {response.text}")
return {}
except requests.RequestException as e:
print(f" ❌ Export request failed: {str(e)}")
return {}
except Exception as e:
print(f" ❌ Failed to parse export CSV: {str(e)}")
return {}
def scan_folder(self, folder_path: str) -> List[Tuple[str, str]]:
"""Scan folder and return list of (image_path, metadata_path) tuples."""
print(f"🔍 Scanning folder: {folder_path}")
folder = Path(folder_path)
if not folder.exists():
raise FileNotFoundError(f"Folder not found: {folder_path}")
pairs = []
# Find all .metadata.json files
for metadata_file in folder.glob("*.metadata.json"):
# Extract original image filename
# e.g., "image.jpg.metadata.json" -> "image.jpg"
image_name = metadata_file.name.replace(".metadata.json", "")
image_path = folder / image_name
# Check if corresponding image exists
if image_path.exists():
pairs.append((str(image_path), str(metadata_file)))
else:
print(f" ⚠️ Found metadata without image: {metadata_file.name}")
print(f" ✅ Found {len(pairs)} image + metadata pairs")
return pairs
def load_metadata_files(self, pairs: List[Tuple[str, str]]) -> Dict[str, Dict[str, Any]]:
"""Load all metadata.json files and return dict of filename -> metadata."""
print("📖 Loading metadata files...")
metadata_by_file = {}
for image_path, metadata_path in pairs:
image_filename = os.path.basename(image_path)
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
metadata_by_file[image_filename] = metadata
except Exception as e:
print(f" ⚠️ Failed to load {metadata_path}: {str(e)}")
continue
print(f" ✅ Loaded {len(metadata_by_file)} metadata files")
return metadata_by_file
def discover_and_analyze_fields(self, metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
"""Discover all fields and auto-detect their types."""
print("🔍 Discovering and analyzing all fields...")
# Collect all unique field names
all_fields = set()
for filename, metadata in metadata_by_file.items():
all_fields.update(metadata.keys())
print(f" 📋 Found {len(all_fields)} unique fields")
field_types = {}
# Analyze each field
for field_name in sorted(all_fields):
# Collect sample values (more samples for better enum detection)
sample_values = []
for filename, metadata in metadata_by_file.items():
if field_name in metadata:
sample_values.append(metadata[field_name])
if len(sample_values) >= 100: # Sample up to 100 values for enum detection
break
# Detect type
field_type = self._auto_detect_field_type(field_name, sample_values)
field_types[field_name] = field_type
print(f" {field_name}: {field_type}")
print(f" ✅ Analyzed {len(field_types)} fields")
return field_types
def _auto_detect_field_type(self, field_name: str, sample_values: List[Any]) -> str:
"""Auto-detect field type based on sample values."""
if not sample_values:
return 'string'
# Get first non-null value
first_value = None
for val in sample_values:
if val is not None and val != '':
first_value = val
break
if first_value is None:
return 'string'
# Check type based on value patterns
if self._is_float(first_value):
return 'float'
elif self._is_date(first_value):
return 'datetime'
elif self._is_url(first_value):
# Check if most values are URLs
url_count = sum(1 for v in sample_values if self._is_url(v))
if url_count / len(sample_values) >= 0.7:
return 'link'
else:
return 'string'
elif self._is_list(first_value):
# Check if it's multi-enum or string
unique_values = set()
for val in sample_values:
if self._is_list(val):
list_items = self._parse_list_value(val)
unique_values.update(list_items)
if len(unique_values) <= 20:
return 'multi-enum'
else:
return 'string'
else:
# Check if it's an enum (limited unique string values)
unique_values = set()
for val in sample_values:
if val is not None and val != '':
# Only consider string-like values for enum (skip dicts/lists)
if not isinstance(val, (dict, list)):
unique_values.add(str(val).strip())
# If ≤100 unique values and has at least some values, treat as enum
if len(unique_values) > 0 and len(unique_values) <= 100:
return 'enum'
else:
# Default to string
return 'string'
def _is_float(self, value: Any) -> bool:
"""Check if value can be converted to float."""
if not isinstance(value, (str, int, float)):
return False
try:
float_val = float(value)
# Check if it has decimal point (to distinguish from int)
return '.' in str(value) or 'e' in str(value).lower()
except (ValueError, TypeError):
return False
def _is_date(self, value: Any) -> bool:
"""Check if value looks like a date using pandas parsing."""
if not isinstance(value, str) or not value.strip():
return False
try:
pd.to_datetime(value.strip())
return True
except (ValueError, TypeError, pd.errors.ParserError):
return False
def _is_url(self, value: Any) -> bool:
"""Check if value is a valid absolute URL with protocol."""
if not isinstance(value, str):
return False
value_lower = value.strip().lower()
# ONLY accept actual URLs starting with protocol
return value_lower.startswith(('http://', 'https://', 'ftp://'))
def _is_list(self, value: Any) -> bool:
"""Check if value looks like a list."""
# If it's already a list, return True
if isinstance(value, list):
return True
if not isinstance(value, str):
return False
value = value.strip()
# Check for standard list formats
if value.startswith('[') and value.endswith(']'):
try:
import ast
ast.literal_eval(value)
return True
except (ValueError, SyntaxError):
return False
return False
def _parse_list_value(self, value: Any) -> List[str]:
"""Parse list value."""
if isinstance(value, list):
return [str(v) for v in value]
if not isinstance(value, str):
return [str(value)]
value = value.strip()
try:
import ast
parsed = ast.literal_eval(value)
if isinstance(parsed, list):
return [str(v) for v in parsed]
except (ValueError, SyntaxError):
pass
return [value]
def validate_user_fields(self, user_fields: Dict[str, str],
metadata_by_file: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
"""Validate user-specified fields exist and types are correct."""
print("🔍 Validating user-specified fields...")
validated_fields = {}
for field_name, field_type in user_fields.items():
print(f" Checking field: {field_name} ({field_type})")
# Check if field exists in at least one metadata file
field_exists = False
sample_values = []
for filename, metadata in metadata_by_file.items():
if field_name in metadata:
field_exists = True
sample_values.append(metadata[field_name])
if len(sample_values) >= 10: # Sample up to 10 values
break
if not field_exists:
print(f" ❌ Field '{field_name}' not found in any metadata file")
continue
# Validate type with sample values
type_valid = self._validate_field_type(field_name, field_type, sample_values)
if type_valid:
validated_fields[field_name] = field_type
print(f" ✅ Valid")
else:
print(f" ❌ Type validation failed for '{field_name}'")
print(f" ✅ Validated {len(validated_fields)}/{len(user_fields)} fields")
return validated_fields
def _validate_field_type(self, field_name: str, field_type: str,
sample_values: List[Any]) -> bool:
"""Validate that sample values match expected field type."""
if not sample_values:
return True # Empty is ok
if field_type == 'string':
return True # String always works
if field_type == 'datetime':
# Try parsing with pandas
for value in sample_values:
if value is None or value == '':
continue
try:
pd.to_datetime(str(value))
except Exception:
return False
return True
if field_type == 'float':
# Try converting to float
for value in sample_values:
if value is None or value == '':
continue
try:
float(value)
except Exception:
return False
return True
if field_type in ['enum', 'multi-enum']:
# Collect unique values
unique_values = set()
for value in sample_values:
if value is None or value == '':
continue
if field_type == 'multi-enum':
if isinstance(value, list):
unique_values.update([str(v) for v in value])
else:
return False # multi-enum must be array
else:
unique_values.add(str(value))
# Check if within API limit
if len(unique_values) > 20:
print(f" ⚠️ Warning: {len(unique_values)} unique values (max 20 for enum)")
return False
return True
if field_type == 'link':
# Basic URL validation
for value in sample_values:
if value is None or value == '':
continue
value_str = str(value)
if not (value_str.startswith('http://') or
value_str.startswith('https://') or
value_str.startswith('ftp://')):
# Could be file path, allow it
pass
return True
return True
def create_custom_field(self, field_name: str, field_type: str,
metadata_by_file: Dict[str, Dict[str, Any]]) -> Optional[str]:
"""Create a custom field and return field_id."""
print(f"🔧 Creating custom field: {field_name} ({field_type})")
# Prepare field data
field_data = {
"field_name": field_name,
"field_type": field_type
}
# Add enum values for enum types
if field_type in ['enum', 'multi-enum']:
unique_values = set()
for filename, metadata in metadata_by_file.items():
if field_name in metadata:
value = metadata[field_name]
if value is not None and value != '':
if field_type == 'multi-enum':
if isinstance(value, list):
unique_values.update([str(v) for v in value])
else:
unique_values.add(str(value))
enum_values = list(unique_values)[:20] # Limit to API max
field_data["enum_options"] = enum_values
if field_type == 'multi-enum':
field_data["field_type"] = 'enum' # API only accepts 'enum'
field_data["is_multi"] = True
else:
field_data["is_multi"] = False
print(f" 📝 Adding {len(enum_values)} enum values")
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"
try:
response = self.session.post(url, json=field_data)
if response.status_code == 200:
result = response.json()
task_id = result.get('task_id')
print(f" ✅ Created field with task ID: {task_id}")
return task_id
elif "already exists" in response.text:
print(f" 🔄 Field already exists, skipping creation")
return None
else:
print(f" ❌ Failed to create field: {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f" ❌ Request failed: {str(e)}")
return None
def upload_field_data(self, field_id: str, field_name: str, field_type: str,
metadata_by_file: Dict[str, Dict[str, Any]],
filename_to_media_id: Dict[str, str]) -> Optional[str]:
"""Upload data for a custom field."""
print(f" 📤 Uploading data for field: {field_name}")
upload_data = []
matched_count = 0
skipped_count = 0
for filename, metadata in metadata_by_file.items():
# Get media_id for this file
media_id = filename_to_media_id.get(filename)
if not media_id:
skipped_count += 1
continue
# Get field value
if field_name not in metadata:
continue
value = metadata[field_name]
if value is None or value == '':
continue
# Convert value based on field type
try:
converted_value = self._convert_value(value, field_name, field_type)
if converted_value is not None:
upload_data.append({
"media_id": media_id,
"value": converted_value
})
matched_count += 1
except Exception as e:
print(f" ⚠️ Failed to convert value for {filename}: {str(e)}")
skipped_count += 1
continue
print(f" 📊 Processed {matched_count} files, skipped {skipped_count}")
if not upload_data:
print(f" ⚠️ No data to upload for field {field_name}")
return None
# Save to temp file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(upload_data, f, indent=2)
temp_file = f.name
self._temp_files.append(temp_file)
# Upload
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"
try:
with open(temp_file, 'rb') as f:
files = {'file': (f'metadata_{field_name}.json', f, 'application/json')}
response = self.session.post(url, files=files)
if response.status_code in [200, 202]:
print(f" ✅ Upload completed successfully")
return field_id
else:
print(f" ❌ Failed to upload data: {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f" ❌ Request failed: {str(e)}")
return None
def _convert_value(self, value: Any, field_name: str, field_type: str) -> Any:
"""Convert value to appropriate type for upload. Falls back to string if conversion fails.
Truncates string values to 255 characters (VL limit)."""
try:
if field_type == 'datetime':
# Parse and format as ISO 8601
dt = pd.to_datetime(str(value))
if dt.tz is None:
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
return dt.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
elif field_type == 'float':
return float(value)
elif field_type == 'multi-enum':
if isinstance(value, list):
return [str(v).strip() for v in value]
else:
return [str(value).strip()]
elif field_type == 'enum':
return str(value).strip()
elif field_type == 'string':
# Handle nested objects/arrays - serialize to JSON
if isinstance(value, (dict, list)):
json_str = json.dumps(value)
if len(json_str) > 255:
return json_str[:255]
return json_str
str_value = str(value).strip()
if len(str_value) > 255:
return str_value[:255]
return str_value
elif field_type == 'link':
return str(value).strip()
return str(value)
except Exception as e:
# Fallback to string if conversion fails (with 255 char limit)
if isinstance(value, (dict, list)):
json_str = json.dumps(value)
return json_str[:255] if len(json_str) > 255 else json_str
str_value = str(value)
return str_value[:255] if len(str_value) > 255 else str_value
def check_task_status(self, task_id: str) -> str:
"""Check the status of an upload task."""
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{task_id}/status"
try:
response = self.session.get(url)
if response.status_code == 200:
result = response.json()
status = result.get('status', 'unknown')
if status == 'COMPLETED_WITH_ERRORS':
error_count = result.get('error_count', 0)
inserted_rows = result.get('inserted_rows', 0)
print(f" ⚠️ Task completed with {error_count} errors, {inserted_rows} rows inserted")
sample_errors = result.get('sample_errors', [])
if sample_errors:
print(f" 📋 Sample errors:")
for error in sample_errors[:3]:
print(f" - Row {error.get('row_index', '?')}: {error.get('reason', 'Unknown error')}")
if len(sample_errors) > 3:
print(f" - ... and {len(sample_errors) - 3} more errors")
elif status == 'COMPLETED':
inserted_rows = result.get('inserted_rows', 0)
print(f" ✅ Task completed successfully, {inserted_rows} rows inserted")
return status
else:
return 'error'
except requests.RequestException:
return 'error'
def wait_for_task_completion(self, task_id: str, field_name: str, polling_interval: int = 5) -> str:
"""Wait for task completion by polling status endpoint."""
print(f" ⏳ Waiting for task completion... (Press Ctrl+C to stop)")
start_time = time.time()
while True:
status = self.check_task_status(task_id)
if status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
elapsed = int(time.time() - start_time)
print(f" ✅ Task completed after {elapsed}s")
return status
elif status == 'error':
print(f" ❌ Error checking task status")
return 'error'
elif status == 'IN_PROGRESS':
elapsed = int(time.time() - start_time)
print(f" ⏳ Task in progress... (elapsed: {elapsed}s)")
else:
elapsed = int(time.time() - start_time)
print(f" 📋 Task status: {status} (elapsed: {elapsed}s)")
time.sleep(polling_interval)
def cleanup_temp_files(self):
"""Remove temporary files created during processing."""
for temp_file in self._temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except OSError:
pass
def process_workflow(self, folder_path: str, user_fields: Optional[Dict[str, str]] = None):
"""Main workflow processing function."""
try:
print("🚀 Starting Folder Metadata Upload Workflow")
print(f"📁 Folder: {folder_path}")
# Step 1: Export dataset to get media_id mappings
filename_to_media_id = self.export_dataset()
if not filename_to_media_id:
raise Exception("Failed to export dataset")
# Step 2: Scan folder for images + metadata
pairs = self.scan_folder(folder_path)
if not pairs:
raise Exception("No image + metadata pairs found in folder")
# Step 3: Load all metadata files
metadata_by_file = self.load_metadata_files(pairs)
if not metadata_by_file:
raise Exception("Failed to load metadata files")
# Step 4: Determine fields to process
if user_fields:
# User-specified fields - validate them
print(f"🎯 User-specified fields: {len(user_fields)}")
validated_fields = self.validate_user_fields(user_fields, metadata_by_file)
if not validated_fields:
raise Exception("No valid fields to upload")
else:
# Auto-detect all fields
print("🤖 Auto-detection mode")
validated_fields = self.discover_and_analyze_fields(metadata_by_file)
if not validated_fields:
raise Exception("No fields discovered")
print(f"\n🎯 Processing {len(validated_fields)} custom fields...")
# Step 5: Process each field
completed_fields = []
for field_name, field_type in validated_fields.items():
print(f"\n🔄 Processing field: {field_name} ({field_type})")
try:
# Create custom field
field_id = self.create_custom_field(field_name, field_type, metadata_by_file)
if not field_id:
print(f" ⏭️ Skipping field {field_name}")
continue
# Upload data
task_id = self.upload_field_data(field_id, field_name, field_type,
metadata_by_file, filename_to_media_id)
if not task_id:
print(f" ❌ Failed to upload data for {field_name}")
continue
# Wait for completion
final_status = self.wait_for_task_completion(task_id, field_name)
if final_status in ['COMPLETED', 'COMPLETED_WITH_ERRORS']:
completed_fields.append(field_name)
status_icon = "✅" if final_status == 'COMPLETED' else "⚠️"
print(f" {status_icon} Field {field_name} {final_status.lower()}")
else:
print(f" ❌ Field {field_name} failed")
except Exception as e:
print(f" ❌ Error processing field {field_name}: {str(e)}")
continue
print("\n🎉 Workflow completed!")
print(f"✅ Successfully processed {len(completed_fields)}/{len(validated_fields)} fields")
finally:
self.cleanup_temp_files()
def main():
parser = argparse.ArgumentParser(
description='Upload metadata from folder to Visual Layer',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Example usage:
# Auto-detect all fields (recommended):
python upload_metadata_from_folder.py \\
--folder "example shopic/" \\
--dataset-id abc-123 \\
--base-url http://localhost:8080
# Manual field specification:
python upload_metadata_from_folder.py \\
--folder "example shopic/" \\
--dataset-id abc-123 \\
--base-url http://localhost:8080 \\
--field annotated_at datetime \\
--field conf_classification float \\
--field barcode string \\
--field quality enum
"""
)
parser.add_argument('--folder', required=True,
help='Path to folder containing images and .metadata.json files')
parser.add_argument('--dataset-id', required=True,
help='Visual Layer dataset ID')
parser.add_argument('--base-url', default='http://localhost:8080',
help='Base URL (default: http://localhost:8080)')
parser.add_argument('--field', action='append', nargs=2, metavar=('NAME', 'TYPE'),
help='Field to upload (optional, repeatable). If not specified, auto-detects all fields. TYPE: string|float|datetime|enum|multi-enum|link')
args = parser.parse_args()
# Parse fields if provided
user_fields = None
if args.field:
user_fields = {}
valid_types = {'string', 'float', 'datetime', 'enum', 'multi-enum', 'link'}
for field_name, field_type in args.field:
if field_type not in valid_types:
print(f"❌ Error: Invalid field type '{field_type}' for field '{field_name}'")
print(f" Valid types: {', '.join(sorted(valid_types))}")
sys.exit(1)
user_fields[field_name] = field_type
# Create processor and run workflow
processor = FolderMetadataProcessor(args.dataset_id, args.base_url)
processor.process_workflow(args.folder, user_fields)
if __name__ == "__main__":
main()
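Programmatic Use and Customization
Because the upload logic lives in the FolderMetadataProcessor class, you can also drive it from your own Python code and adjust the type detection there, for example by pinning certain fields to a fixed type. Below is a minimal sketch, assuming the script above is saved as upload_metadata_from_folder.py; the subclass name and the forced-type mapping are illustrative, not part of the script:
from upload_metadata_from_folder import FolderMetadataProcessor
class PinnedTypeProcessor(FolderMetadataProcessor):
    """Force known fields to a fixed type; auto-detect everything else."""
    # Illustrative mapping - adjust to your own metadata fields
    FORCED_TYPES = {"barcode": "string", "confidence": "float"}
    def _auto_detect_field_type(self, field_name, sample_values):
        if field_name in self.FORCED_TYPES:
            return self.FORCED_TYPES[field_name]
        return super()._auto_detect_field_type(field_name, sample_values)
if __name__ == "__main__":
    processor = PinnedTypeProcessor("your-dataset-id", "http://localhost:8080")
    processor.process_workflow("/path/to/folder")  # auto-detection mode, with the overrides above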
Single JSON File Modification
If your metadata is in one JSON file for all images, add this method to the FolderMetadataProcessor class:
def load_single_json_metadata(self, json_file: str, folder_path: str) -> Dict[str, Dict[str, Any]]:
"""Load metadata from a single JSON file containing all image metadata."""
print(f"📖 Loading metadata from single JSON file: {json_file}")
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle dictionary format: {filename: {metadata}}
if isinstance(data, dict):
metadata_by_file = data
# Handle array format: [{filename: ..., field: value, ...}]
elif isinstance(data, list):
metadata_by_file = {}
for item in data:
filename = item.get('filename') or item.get('file_name')
if filename:
item_copy = {k: v for k, v in item.items() if k not in ['filename', 'file_name']}
metadata_by_file[filename] = item_copy
# Verify images exist
folder = Path(folder_path)
valid_metadata = {}
for filename, metadata in metadata_by_file.items():
if (folder / filename).exists():
valid_metadata[filename] = metadata
print(f" ✅ Loaded metadata for {len(valid_metadata)} images")
return valid_metadata
Then, in process_workflow(), replace the scan/load section with:
# Replace this:
pairs = self.scan_folder(folder_path)
metadata_by_file = self.load_metadata_files(pairs)
# With this:
metadata_by_file = self.load_single_json_metadata('/path/to/metadata.json', folder_path)
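If you would rather not hard-code the JSON path inside process_workflow(), an alternative is a small driver that calls the same class methods directly. Below is a minimal sketch, assuming the modified class (with load_single_json_metadata() added) is saved as upload_metadata_from_folder.py; the paths and dataset ID are placeholders:
from upload_metadata_from_folder import FolderMetadataProcessor
def upload_from_single_json(folder, json_file, dataset_id, base_url):
    processor = FolderMetadataProcessor(dataset_id, base_url)
    try:
        # Same steps as process_workflow(), but loading metadata from one JSON file
        filename_to_media_id = processor.export_dataset()
        if not filename_to_media_id:
            raise RuntimeError("Dataset export returned no media_id mappings")
        metadata_by_file = processor.load_single_json_metadata(json_file, folder)
        field_types = processor.discover_and_analyze_fields(metadata_by_file)
        for field_name, field_type in field_types.items():
            field_id = processor.create_custom_field(field_name, field_type, metadata_by_file)
            if not field_id:
                continue  # field skipped (already exists or creation failed)
            task_id = processor.upload_field_data(field_id, field_name, field_type,
                                                  metadata_by_file, filename_to_media_id)
            if task_id:
                processor.wait_for_task_completion(task_id, field_name)
    finally:
        processor.cleanup_temp_files()
if __name__ == "__main__":
    upload_from_single_json("/path/to/folder", "/path/to/metadata.json",
                            "your-dataset-id", "http://localhost:8080")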