Batch Processing and Automation
Processing large numbers of datasets manually is time-consuming and error-prone. This tutorial demonstrates how to create automated pipelines that process multiple files consistently and efficiently.
Estimated time: 60 minutes
Prerequisites:
- Completed Python Scripting Fundamentals
- Understanding of Volvicon's core operations
- Custom Scripting feature enabled (available on Professional and above licenses)
Batch Processing Concepts
Why Automate?
| Manual Processing | Automated Processing |
|---|---|
| Repetitive clicking | Single script execution |
| Human error variability | Consistent parameters |
| Limited to working hours | Runs overnight |
| One dataset at a time | Parallel or sequential batches |
| No audit trail | Logged operations |
Pipeline Architecture
A typical batch pipeline consists of:
- Input discovery — Find files to process
- Iteration — Loop through each file
- Processing — Apply operations
- Output — Export results
- Logging — Record success/failure
File Discovery
Finding Files with Python
import os
def find_dicom_folders(root_path):
    """Return every directory under root_path that holds at least one .dcm file."""
    return [
        dirpath
        for dirpath, _dirnames, filenames in os.walk(root_path)
        if any(name.endswith('.dcm') for name in filenames)
    ]
def find_nifti_files(root_path):
    """Return paths of all NIfTI (.nii / .nii.gz) files found under root_path."""
    from pathlib import Path
    matches = Path(root_path).rglob("*.nii*")
    return list(map(str, matches))
# Example usage: discover inputs under a root folder and report the counts.
input_root = "C:/Data/Scans"
dicom_dirs = find_dicom_folders(input_root)
nifti_files = find_nifti_files(input_root)
print(f"Found {len(dicom_dirs)} DICOM folders")
print(f"Found {len(nifti_files)} NIfTI files")
Organizing Output
import os
from datetime import datetime
def create_output_structure(base_path, dataset_name):
    """Create a timestamped output tree (masks/surfaces/reports/logs) and return its root."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    root = os.path.join(base_path, f"{dataset_name}_{stamp}")
    for name in ("masks", "surfaces", "reports", "logs"):
        os.makedirs(os.path.join(root, name), exist_ok=True)
    return root
Basic Batch Pipeline
Processing Multiple NIfTI Files
"""
Basic Batch Processing Pipeline
Process multiple NIfTI files with threshold segmentation.
"""
import os
from pathlib import Path
import ScriptingApi as api
def process_single_file(app, input_path, output_dir):
    """Import one volume, threshold it, and export the mask and surface.

    Args:
        app: ScriptingApi Application instance.
        input_path: Path to a .nii / .nii.gz volume on disk.
        output_dir: Folder receiving <name>_mask.nii and <name>_surface.stl.

    Returns:
        True on success.

    Raises:
        RuntimeError: If surface generation produced no surface.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    # Get filename without extension
    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz
    # Import volume
    volume_name = volume_operations.import_3d_image_from_disk(input_path)
    try:
        # Threshold segmentation
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = 200
        threshold_params.upper_threshold = 3000
        threshold_params.filter_regions = True
        threshold_params.min_region_size = 100
        threshold_params.keep_largest = False
        mask_name = mask_operations.threshold(volume_name, threshold_params)
        # Generate surface
        mask_to_surface_params = api.MaskToSurfaceParams()
        mask_to_surface_params.smooth_iterations = 20
        mask_to_surface_params.smooth_factor = 0.06
        mask_to_surface_params.triangle_reduction_percent = 50
        surface_results = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)
        # Ensure we have a surface to export
        if not surface_results:
            raise RuntimeError(f"No surface generated for mask '{mask_name}'")
        # Normalize return type and pick the first surface id
        if isinstance(surface_results, (list, tuple)):
            surface_id = surface_results[0]
        else:
            surface_id = surface_results
        surface_operations = app.get_surface_operations()
        # Export results
        mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
        surface_path = os.path.join(output_dir, f"{basename}_surface.stl")
        mask_operations.export_mask_image_to_disk(mask_name, mask_path)
        surface_operations.export_surface_to_disk(surface_id, surface_path)
    finally:
        # Fix: the original closed the project only on success; a failure left
        # the dataset loaded, leaking memory into subsequent batch iterations.
        app.close_project()
    return True
def main():
    """Discover every NIfTI file under the input folder and process each one."""
    app = api.Application()
    # Configuration
    input_dir = "C:/Data/Input"
    output_dir = "C:/Data/Output"
    # Make sure the destination exists before any export happens.
    os.makedirs(output_dir, exist_ok=True)
    # Recursive discovery catches both .nii and .nii.gz files.
    files = list(map(str, Path(input_dir).rglob("*.nii*")))
    print(f"Found {len(files)} files to process")
    for i, filepath in enumerate(files):
        print(f"Processing {i+1}/{len(files)}: {os.path.basename(filepath)}")
        try:
            process_single_file(app, filepath, output_dir)
        except Exception as e:
            print(f" Failed: {e}")
        else:
            print(" Success")
    print("Batch processing complete!")

main()
Error Handling and Logging
Comprehensive Logging
"""
Batch Pipeline with Logging
Records all operations and errors to a log file.
"""
import os
from pathlib import Path
import logging
from datetime import datetime
import ScriptingApi as api
def setup_logging(output_dir):
    """Attach a timestamped file handler plus console output; return the module logger."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = os.path.join(output_dir, f"batch_log_{stamp}.txt")
    handlers = [logging.FileHandler(log_path), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
    return logging.getLogger(__name__)
def process_with_logging(app, input_path, output_dir, logger):
    """Process a file with detailed logging.

    Imports the volume, thresholds it, logs the mask volume, exports the
    mask, and returns a result record. The project is always closed — even
    on failure — so a bad dataset cannot leak into the next iteration.

    Returns:
        Dict with input, output, volume, and status keys.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()
    # Strip .nii / .nii.gz to build output names.
    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz
    logger.info(f"Starting processing: {basename}")
    # Import
    logger.info(" Importing volume...")
    volume_name = volume_operations.import_3d_image_from_disk(input_path)
    try:
        logger.info(f" Imported as: {volume_name}")
        # Get volume info
        dims = volume_operations.get_dimensions(volume_name)
        spacing = volume_operations.get_spacing(volume_name)
        logger.info(f" Dimensions: {dims}")
        logger.info(f" Spacing: {spacing}")
        # Segment
        logger.info(" Running threshold segmentation...")
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = 200
        threshold_params.upper_threshold = 3000
        threshold_params.filter_regions = True
        threshold_params.min_region_size = 100
        threshold_params.keep_largest = False
        mask_name = mask_operations.threshold(volume_name, threshold_params)
        # Statistics
        requested_stats = [api.LabelStatisticType.Volume]
        mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
        logger.info(f" Mask volume: {mask_statistics_result.total_volume:.2f} mm³")
        # Export
        output_path = os.path.join(output_dir, f"{basename}_mask.nii")
        mask_operations.export_mask_image_to_disk(mask_name, output_path)
        logger.info(f" Exported to: {output_path}")
    finally:
        # Fix: the original only closed the project on success, leaking the
        # loaded dataset into subsequent batch iterations after a failure.
        app.close_project()
    logger.info(f"Completed: {basename}")
    return {
        "input": input_path,
        "output": output_path,
        "volume": mask_statistics_result.total_volume,
        "status": "success"
    }
def main():
    """Run the logged batch pipeline over every NIfTI file in the input folder."""
    app = api.Application()
    input_dir = "C:/Data/Input"
    output_dir = "C:/Data/Output"
    os.makedirs(output_dir, exist_ok=True)
    logger = setup_logging(output_dir)
    banner = "=" * 50
    logger.info(banner)
    logger.info("Batch Processing Started")
    logger.info(banner)
    files = list(map(str, Path(input_dir).rglob("*.nii*")))
    logger.info(f"Found {len(files)} files")
    results = []
    for i, filepath in enumerate(files):
        logger.info(f"[{i+1}/{len(files)}] {os.path.basename(filepath)}")
        try:
            results.append(process_with_logging(app, filepath, output_dir, logger))
        except Exception as e:
            logger.error(f"Failed: {e}")
            results.append({
                "input": filepath,
                "status": "failed",
                "error": str(e)
            })
    # Derive the tallies from the collected records.
    success_count = sum(1 for r in results if r.get("status") == "success")
    failure_count = len(results) - success_count
    # Summary
    logger.info(banner)
    logger.info("Batch Processing Complete")
    logger.info(f" Successful: {success_count}")
    logger.info(f" Failed: {failure_count}")
    logger.info(banner)

main()
Advanced Pipeline Features
Configuration Files
Use JSON configuration for flexible pipelines:
"""
Configurable Batch Pipeline
Reads processing parameters from a JSON configuration file.
"""
import os
import json
from pathlib import Path
import ScriptingApi as api
def load_config(config_path):
    """Read and parse the JSON pipeline configuration file at config_path."""
    return json.loads(Path(config_path).read_text())
def process_with_config(app, input_path, output_dir, config):
    """Process one volume using parameters from a JSON config dict.

    Imports the volume, runs a configurable threshold segmentation,
    optionally generates and exports a surface, exports the mask, and
    always closes the project afterwards.

    Args:
        app: ScriptingApi Application instance.
        input_path: Path to the input .nii / .nii.gz file.
        output_dir: Folder receiving the exported mask and surface.
        config: Dict parsed from batch_config.json.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    surface_operations = app.get_surface_operations()
    # Strip .nii / .nii.gz to build output file names.
    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz
    # Import
    volume_name = volume_operations.import_3d_image_from_disk(input_path)
    try:
        # Threshold from config (fall back to documented defaults)
        threshold_config = config.get("threshold", {})
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = threshold_config.get("lower", 200)
        threshold_params.upper_threshold = threshold_config.get("upper", 3000)
        threshold_params.filter_regions = threshold_config.get("filter_regions", True)
        threshold_params.min_region_size = threshold_config.get("min_region_size", 100)
        threshold_params.keep_largest = False
        mask_name = mask_operations.threshold(volume_name, threshold_params)
        # Surface generation from config
        if config.get("generate_surface", True):
            surface_config = config.get("surface", {})
            mask_to_surface_params = api.MaskToSurfaceParams()
            mask_to_surface_params.smooth_iterations = surface_config.get("smooth_iterations", 20)
            mask_to_surface_params.smooth_factor = surface_config.get("smooth_factor", 0.06)
            mask_to_surface_params.triangle_reduction_percent = surface_config.get("triangle_reduction_percent", 50)
            surface_names = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)
            if not surface_names:
                raise RuntimeError(f"No surface generated for mask '{mask_name}'")
            surface_path = os.path.join(output_dir, f"{basename}_surface.stl")
            surface_operations.export_surface_to_disk(surface_names[0], surface_path)
        # Export mask
        mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
        mask_operations.export_mask_image_to_disk(mask_name, mask_path)
    finally:
        # Fix: close even on failure so the next file starts from a clean project.
        app.close_project()
def main():
    """Run the configurable pipeline over every NIfTI file found in input_dir."""
    app = api.Application()
    # Load configuration
    config = load_config("C:/Scripts/batch_config.json")
    input_dir = config.get("input_dir", "C:/Data/Input")
    output_dir = config.get("output_dir", "C:/Data/Output")
    os.makedirs(output_dir, exist_ok=True)
    for filepath in map(str, Path(input_dir).rglob("*.nii*")):
        print(f"Processing: {os.path.basename(filepath)}")
        try:
            process_with_config(app, filepath, output_dir, config)
        except Exception as e:
            print(f" Error: {e}")

main()
Example configuration file (batch_config.json):
{
"input_dir": "C:/Data/Input",
"output_dir": "C:/Data/Output",
"threshold": {
"lower": 200,
"upper": 3000,
"filter_regions": true,
"min_region_size": 100
},
"generate_surface": true,
"surface": {
"smooth_iterations": 20,
"smooth_factor": 0.06,
"triangle_reduction_percent": 50
}
}
Advanced Analysis with NumPy and pandas
Volume Statistics with NumPy
"""
Advanced Volume Analysis using NumPy
Calculate advanced statistics using NumPy arrays.
"""
import os
from pathlib import Path
import numpy as np
import ScriptingApi as api
def analyze_volume_with_numpy(app, volume_name):
    """Compute geometric and intensity summary statistics for a loaded volume.

    Returns a dict with dimensions, spacing, physical size, voxel counts,
    and the intensity range reported by the application.
    """
    volume_operations = app.get_volume_operations()
    # Query geometry and intensity metadata from the application.
    dims = volume_operations.get_dimensions(volume_name)
    spacing = volume_operations.get_spacing(volume_name)
    origin = volume_operations.get_origin(volume_name)
    scalar_range = volume_operations.get_scalar_range(volume_name)
    dims_arr = np.asarray(dims)
    spacing_arr = np.asarray(spacing)
    # Physical extent per axis, in mm.
    extent_mm = dims_arr * spacing_arr
    return {
        "dimensions": dims,
        "spacing": spacing,
        "physical_size_mm": extent_mm.tolist(),
        "total_volume_mm3": float(np.prod(extent_mm)),
        "voxel_volume_mm3": float(np.prod(spacing_arr)),
        "total_voxels": int(np.prod(dims_arr)),
        "intensity_min": scalar_range[0],
        "intensity_max": scalar_range[1],
        "intensity_range": scalar_range[1] - scalar_range[0]
    }
def batch_analyze_with_numpy(app, input_dir, output_dir):
    """Batch analyze multiple volumes and print NumPy summary statistics.

    Args:
        app: ScriptingApi Application instance.
        input_dir: Folder searched recursively for .nii / .nii.gz files.
        output_dir: Kept for interface compatibility; not used by this function.

    Returns:
        List of per-file statistics dicts from analyze_volume_with_numpy.
    """
    volume_operations = app.get_volume_operations()
    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    all_results = []
    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")
        print(f"[{i+1}/{len(files)}] Analyzing: {basename}")
        try:
            # Import volume
            volume_name = volume_operations.import_3d_image_from_disk(filepath)
            try:
                # Analyze with NumPy
                stats = analyze_volume_with_numpy(app, volume_name)
            finally:
                # Fix: close even when analysis fails, so a bad dataset is not
                # left loaded (and consuming memory) for the next iteration.
                app.close_project()
            stats["filename"] = basename
            all_results.append(stats)
            print(f" Volume: {stats['total_volume_mm3']:.2f} mm³")
        except Exception as e:
            print(f" Error: {e}")
    # Calculate summary statistics using NumPy
    if all_results:
        volumes = np.array([r["total_volume_mm3"] for r in all_results])
        print("\n" + "="*50)  # Fix: was "\\n", which printed a literal backslash-n
        print("SUMMARY STATISTICS (NumPy)")
        print("="*50)
        print(f"Total files analyzed: {len(all_results)}")
        print(f"Volume mean: {np.mean(volumes):.2f} mm³")
        print(f"Volume std: {np.std(volumes):.2f} mm³")
        print(f"Volume min: {np.min(volumes):.2f} mm³")
        print(f"Volume max: {np.max(volumes):.2f} mm³")
        print(f"Volume median: {np.median(volumes):.2f} mm³")
    return all_results
# Example usage: run the NumPy batch analysis over the default folders.
# NOTE(review): this runs at import time; wrap in a main() guard if reused.
app = api.Application()
results = batch_analyze_with_numpy(app, "C:/Data/Input", "C:/Data/Output")
pandas DataFrame Analysis
"""
Batch Processing with pandas DataFrames
Organize and analyze batch results using pandas.
"""
import os
from pathlib import Path
import pandas as pd
import ScriptingApi as api
def process_with_pandas(app, input_dir, output_dir):
    """Process multiple files and organize results in a pandas DataFrame.

    For each NIfTI file: import, read geometry/intensity metadata, threshold,
    compute the mask volume, export the mask, and record the outcome. Writes
    batch_results.csv (all rows) and summary_statistics.csv (successes only).

    Returns:
        pandas.DataFrame with one row per input file.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()
    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    # Create list to store per-file records
    data_records = []
    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")
        print(f"[{i+1}/{len(files)}] Processing: {basename}")
        try:
            # Import volume
            volume_name = volume_operations.import_3d_image_from_disk(filepath)
            try:
                # Get volume properties
                dims = volume_operations.get_dimensions(volume_name)
                spacing = volume_operations.get_spacing(volume_name)
                scalar_range = volume_operations.get_scalar_range(volume_name)
                # Threshold segmentation
                threshold_params = api.ThresholdParams()
                threshold_params.lower_threshold = 200
                threshold_params.upper_threshold = 3000
                threshold_params.filter_regions = True
                threshold_params.min_region_size = 100
                threshold_params.keep_largest = False
                mask_name = mask_operations.threshold(volume_name, threshold_params)
                # Calculate statistics
                requested_stats = [api.LabelStatisticType.Volume]
                mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
                # Export mask
                mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
                mask_operations.export_mask_image_to_disk(mask_name, mask_path)
            finally:
                # Fix: always close the project so a failure cannot leak the
                # loaded dataset into the next loop iteration.
                app.close_project()
            # Store record
            data_records.append({
                "filename": basename,
                "dim_x": dims[0],
                "dim_y": dims[1],
                "dim_z": dims[2],
                "spacing_x": spacing[0],
                "spacing_y": spacing[1],
                "spacing_z": spacing[2],
                "intensity_min": scalar_range[0],
                "intensity_max": scalar_range[1],
                "mask_volume_mm3": mask_statistics_result.total_volume,
                "status": "success"
            })
        except Exception as e:
            print(f" Error: {e}")
            data_records.append({
                "filename": basename,
                "status": "failed",
                "error": str(e)
            })
    # Create pandas DataFrame
    df = pd.DataFrame(data_records)
    # Display summary statistics
    print("\n" + "="*60)  # Fix: was "\\n", which printed a literal backslash-n
    print("PANDAS DATAFRAME ANALYSIS")
    print("="*60)
    print(f"\nTotal files: {len(df)}")
    if df.empty:
        # Fix: guard the empty-batch case; df['status'] on an empty frame
        # would raise KeyError.
        print("No files were processed.")
        return df
    successful_df = df[df['status'] == 'success']
    print(f"Success: {len(successful_df)}")
    print(f"Failed: {len(df[df['status'] == 'failed'])}")
    # Statistical summary for successful cases
    if len(successful_df) > 0:
        print("\nVolume Statistics:")
        print(successful_df['mask_volume_mm3'].describe())
        print("\nDimension Statistics:")
        print(successful_df[['dim_x', 'dim_y', 'dim_z']].describe())
    # Export full results to CSV
    csv_path = os.path.join(output_dir, "batch_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\nResults exported to: {csv_path}")
    # Export summary statistics.
    # Fix: the original referenced successful_df unconditionally here and
    # crashed with NameError/KeyError when every file had failed.
    if len(successful_df) > 0:
        summary_path = os.path.join(output_dir, "summary_statistics.csv")
        summary_df = successful_df['mask_volume_mm3'].describe().to_frame()
        summary_df.to_csv(summary_path)
        print(f"Summary statistics: {summary_path}")
    return df
# Example usage: run the pandas batch pipeline and explore the results.
app = api.Application()
os.makedirs("C:/Data/Output", exist_ok=True)
df = process_with_pandas(app, "C:/Data/Input", "C:/Data/Output")
# Further pandas analysis.
# NOTE(review): nlargest/nsmallest assume the 'mask_volume_mm3' column exists,
# i.e. at least one file succeeded — confirm before reusing on failing batches.
print("\\nTop 5 largest volumes:")
print(df.nlargest(5, 'mask_volume_mm3')[['filename', 'mask_volume_mm3']])
print("\\nTop 5 smallest volumes:")
print(df.nsmallest(5, 'mask_volume_mm3')[['filename', 'mask_volume_mm3']])
Visualization with matplotlib
"""
Batch Processing with matplotlib Visualization
Create charts and plots from batch processing results.
Note: matplotlib is configured in GUI-less mode (Agg backend)
"""
import os
from pathlib import Path
import matplotlib
matplotlib.use('Agg') # Non-interactive backend for Qt compatibility
import matplotlib.pyplot as plt
import numpy as np
import ScriptingApi as api
def create_volume_histogram(app, volume_name, output_path):
    """Save a PNG histogram of volume intensities to output_path.

    NOTE: the plotted values are synthetic — random integers drawn from the
    volume's scalar range — as the inline note below explains. Replace with
    real intensity data for production use.

    Args:
        app: ScriptingApi Application instance.
        volume_name: Name of a volume already loaded in the application.
        output_path: Destination PNG path.
    """
    volume_operations = app.get_volume_operations()
    # Get volume properties
    scalar_range = volume_operations.get_scalar_range(volume_name)
    # NOTE(review): dims is fetched but never used in this demo function.
    dims = volume_operations.get_dimensions(volume_name)
    # Create histogram plot
    plt.figure(figsize=(10, 6))
    # Note: In real implementation, you would get actual intensity data
    # This is a demonstration using the scalar range
    plt.hist(np.random.randint(scalar_range[0], scalar_range[1], 10000),
             bins=50, color='skyblue', edgecolor='black', alpha=0.7)
    plt.xlabel('Intensity Value', fontsize=12)
    plt.ylabel('Frequency', fontsize=12)
    plt.title(f'Volume Intensity Distribution\\n{volume_name}', fontsize=14)
    plt.grid(True, alpha=0.3)
    # Save to file (not plt.show() - it will give a warning in Agg backend)
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    # Close the figure to free memory during long batch runs.
    plt.close()
    print(f" Histogram saved: {output_path}")
def create_batch_summary_plots(results_dict, output_dir):
    """Create a 2x2 summary figure (bar chart, histogram, box plot, stats table).

    Args:
        results_dict: List of dicts, each with "filename" and "volume_mm3" keys.
        output_dir: Folder receiving batch_summary.png.
    """
    # Extract data
    # NOTE(review): filenames is collected but not used in any subplot below.
    filenames = [r["filename"] for r in results_dict]
    volumes = [r["volume_mm3"] for r in results_dict]
    # Create figure with multiple subplots
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle('Batch Processing Summary Report', fontsize=16, fontweight='bold')
    # 1. Bar chart of volumes
    ax1 = axes[0, 0]
    ax1.bar(range(len(volumes)), volumes, color='steelblue', alpha=0.7)
    ax1.set_xlabel('File Index', fontsize=10)
    ax1.set_ylabel('Volume (mm³)', fontsize=10)
    ax1.set_title('Volume by File', fontsize=12)
    ax1.grid(True, alpha=0.3, axis='y')
    # 2. Volume distribution histogram
    ax2 = axes[0, 1]
    ax2.hist(volumes, bins=15, color='coral', edgecolor='black', alpha=0.7)
    ax2.set_xlabel('Volume (mm³)', fontsize=10)
    ax2.set_ylabel('Frequency', fontsize=10)
    ax2.set_title('Volume Distribution', fontsize=12)
    ax2.grid(True, alpha=0.3, axis='y')
    # 3. Box plot
    ax3 = axes[1, 0]
    ax3.boxplot(volumes, vert=True, patch_artist=True,
                boxprops=dict(facecolor='lightgreen', alpha=0.7))
    ax3.set_ylabel('Volume (mm³)', fontsize=10)
    ax3.set_title('Volume Statistics', fontsize=12)
    ax3.grid(True, alpha=0.3, axis='y')
    # 4. Summary statistics table (rendered on a blank axis)
    ax4 = axes[1, 1]
    ax4.axis('off')
    volumes_array = np.array(volumes)
    stats_data = [
        ['Metric', 'Value'],
        ['Count', f'{len(volumes)}'],
        ['Mean', f'{np.mean(volumes_array):.2f} mm³'],
        ['Std Dev', f'{np.std(volumes_array):.2f} mm³'],
        ['Min', f'{np.min(volumes_array):.2f} mm³'],
        ['Max', f'{np.max(volumes_array):.2f} mm³'],
        ['Median', f'{np.median(volumes_array):.2f} mm³']
    ]
    table = ax4.table(cellText=stats_data, cellLoc='left', loc='center',
                      colWidths=[0.4, 0.6])
    table.auto_set_font_size(False)
    table.set_fontsize(10)
    table.scale(1, 2)
    # Style header row
    for i in range(2):
        table[(0, i)].set_facecolor('#4CAF50')
        table[(0, i)].set_text_props(weight='bold', color='white')
    ax4.set_title('Statistical Summary', fontsize=12, pad=20)
    # Save plot and release the figure to free memory.
    plt.tight_layout()
    output_path = os.path.join(output_dir, 'batch_summary.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"Summary plots saved: {output_path}")
def batch_process_with_visualization(app, input_dir, output_dir):
    """Complete batch pipeline with matplotlib visualization.

    For each NIfTI file: imports it, saves an intensity histogram, runs a
    threshold segmentation, records the mask volume, and always closes the
    project. Finally writes the combined summary plots.

    Returns:
        List of dicts with "filename" and "volume_mm3" per successful file.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()
    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    plots_dir = os.path.join(output_dir, "plots")
    os.makedirs(plots_dir, exist_ok=True)
    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    results = []
    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")
        print(f"[{i+1}/{len(files)}] Processing: {basename}")
        try:
            # Import and process
            volume_name = volume_operations.import_3d_image_from_disk(filepath)
            try:
                # Create individual volume histogram
                hist_path = os.path.join(plots_dir, f"{basename}_histogram.png")
                create_volume_histogram(app, volume_name, hist_path)
                # Threshold segmentation
                threshold_params = api.ThresholdParams()
                threshold_params.lower_threshold = 200
                threshold_params.upper_threshold = 3000
                threshold_params.filter_regions = True
                threshold_params.min_region_size = 100
                # Consistency fix: every other pipeline in this tutorial sets
                # keep_largest explicitly; the original omitted it here.
                threshold_params.keep_largest = False
                mask_name = mask_operations.threshold(volume_name, threshold_params)
                # Get statistics
                requested_stats = [api.LabelStatisticType.Volume]
                mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
            finally:
                # Fix: the original leaked the open project when any step failed.
                app.close_project()
            results.append({
                "filename": basename,
                "volume_mm3": mask_statistics_result.total_volume
            })
            print(f" Volume: {mask_statistics_result.total_volume:.2f} mm³")
        except Exception as e:
            print(f" Error: {e}")
    # Create summary plots
    if results:
        create_batch_summary_plots(results, output_dir)
    return results
# Example usage: run the visualization pipeline over the default folders.
app = api.Application()
results = batch_process_with_visualization(
    app,
    "C:/Data/Input",
    "C:/Data/Output"
)
print(f"\\nProcessed {len(results)} files with visualization")
Advanced Plotting: Multi-File Comparison
"""
Compare multiple datasets with advanced matplotlib plots.
"""
import os
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import ScriptingApi as api
def create_comparison_plot(results_list, output_path):
    """Create a 2x2 comparison figure for multiple datasets and save it as PNG.

    Args:
        results_list: List of dicts with "dataset_name" and "volume_mm3" keys.
        output_path: Destination PNG path.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Multi-Dataset Comparison Analysis', fontsize=16, fontweight='bold')
    # Prepare data
    dataset_names = [r["dataset_name"] for r in results_list]
    volumes = [r["volume_mm3"] for r in results_list]
    # 1. Comparison bar chart (one distinct viridis color per dataset)
    ax1 = axes[0, 0]
    colors = plt.cm.viridis(np.linspace(0, 1, len(dataset_names)))
    bars = ax1.bar(range(len(dataset_names)), volumes, color=colors, alpha=0.8)
    ax1.set_xticks(range(len(dataset_names)))
    ax1.set_xticklabels(dataset_names, rotation=45, ha='right')
    ax1.set_ylabel('Volume (mm³)', fontsize=11)
    ax1.set_title('Volume Comparison', fontsize=12, fontweight='bold')
    ax1.grid(True, alpha=0.3, axis='y')
    # Add value labels on bars
    for i, (bar, vol) in enumerate(zip(bars, volumes)):
        height = bar.get_height()
        ax1.text(bar.get_x() + bar.get_width()/2., height,
                 f'{vol:.1f}',
                 ha='center', va='bottom', fontsize=9)
    # 2. Normalized comparison (percentage of max)
    ax2 = axes[0, 1]
    max_volume = max(volumes)
    normalized = [(v/max_volume)*100 for v in volumes]
    ax2.barh(dataset_names, normalized, color='coral', alpha=0.7)
    ax2.set_xlabel('Percentage of Maximum (%)', fontsize=11)
    ax2.set_title('Normalized Volume Comparison', fontsize=12, fontweight='bold')
    ax2.grid(True, alpha=0.3, axis='x')
    # 3. Sorted ranking (descending by volume)
    ax3 = axes[1, 0]
    sorted_indices = np.argsort(volumes)[::-1]
    sorted_names = [dataset_names[i] for i in sorted_indices]
    sorted_volumes = [volumes[i] for i in sorted_indices]
    ax3.plot(range(len(sorted_volumes)), sorted_volumes,
             marker='o', linewidth=2, markersize=8, color='steelblue')
    ax3.set_xticks(range(len(sorted_names)))
    ax3.set_xticklabels(sorted_names, rotation=45, ha='right')
    ax3.set_ylabel('Volume (mm³)', fontsize=11)
    ax3.set_title('Volume Ranking (Highest to Lowest)', fontsize=12, fontweight='bold')
    ax3.grid(True, alpha=0.3)
    # 4. Statistics summary rendered as monospace text on a blank axis
    ax4 = axes[1, 1]
    ax4.axis('off')
    volumes_array = np.array(volumes)
    summary_text = f"""
Statistical Summary:
Total Datasets: {len(volumes)}
Mean Volume: {np.mean(volumes_array):.2f} mm³
Std Deviation: {np.std(volumes_array):.2f} mm³
Minimum: {np.min(volumes_array):.2f} mm³
Maximum: {np.max(volumes_array):.2f} mm³
Median: {np.median(volumes_array):.2f} mm³
Range: {np.max(volumes_array) - np.min(volumes_array):.2f} mm³
Coefficient of Variation: {(np.std(volumes_array)/np.mean(volumes_array))*100:.2f}%
"""
    ax4.text(0.1, 0.5, summary_text, fontsize=11, verticalalignment='center',
             family='monospace', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    # Save and release the figure.
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"Comparison plot saved: {output_path}")
# Example usage with sample data (no application instance is needed,
# since create_comparison_plot only consumes plain dicts).
results = [
    {"dataset_name": "Patient_001", "volume_mm3": 1250.5},
    {"dataset_name": "Patient_002", "volume_mm3": 1180.3},
    {"dataset_name": "Patient_003", "volume_mm3": 1420.8},
    {"dataset_name": "Patient_004", "volume_mm3": 1095.2},
    {"dataset_name": "Patient_005", "volume_mm3": 1310.7}
]
create_comparison_plot(results, "C:/Data/Output/comparison_plot.png")
AI-Powered Batch Segmentation
TotalSegmentator Batch Processing
"""
AI Batch Segmentation Pipeline
Process multiple CT scans with TotalSegmentator.
"""
import os
from pathlib import Path
import csv
from datetime import datetime
import ScriptingApi as api
def run_ai_segmentation(app, input_path, output_dir, structures):
    """Run TotalSegmentator on one CT scan and export the selected structures.

    Args:
        app: ScriptingApi Application instance.
        input_path: Path to the input .nii / .nii.gz volume.
        output_dir: Folder receiving one STL surface per kept structure.
        structures: Lower-case substrings of structure names to keep.

    Returns:
        List of dicts with file, structure, and volume_mm3 per kept structure.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    surface_operations = app.get_surface_operations()
    measure_operations = app.get_measure_operations()
    ai_segmentation = app.get_ai_segmentation()
    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz
    # Import
    volume_name = volume_operations.import_3d_image_from_disk(input_path)
    try:
        # Run TotalSegmentator
        ai_segmentation.set_model_type(api.AiSegmentationModelType.TotalSegmentator)
        ts_params = api.TotalSegmentatorParams()
        ts_params.task = "total"
        ts_params.device = "gpu"
        mask_names = ai_segmentation.run_total_segmentator([volume_name], ts_params)
        # Process selected structures
        stats_data = []
        requested_stats = [api.LabelStatisticType.Volume]
        for mask_name in mask_names:
            # Check if this is one of the structures we want
            structure_name = mask_name.lower().replace(" ", "_")
            if any(struct in structure_name for struct in structures):
                # Get statistics
                mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
                stats_data.append({
                    "file": basename,
                    "structure": mask_name,
                    "volume_mm3": mask_statistics_result.total_volume
                })
                # Generate and export surface
                mask_to_surface_params = api.MaskToSurfaceParams()
                mask_to_surface_params.smooth_iterations = 20
                mask_to_surface_params.smooth_factor = 0.06
                mask_to_surface_params.triangle_reduction_percent = 50
                surface_names = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)
                if not surface_names:
                    raise RuntimeError(f"No surface generated for mask '{mask_name}'")
                # Normalize: the API may return a sequence or a single id.
                if isinstance(surface_names, (list, tuple)):
                    surface_id = surface_names[0]
                else:
                    surface_id = surface_names
                surface_path = os.path.join(output_dir, f"{basename}_{mask_name}.stl")
                surface_operations.export_surface_to_disk(surface_id, surface_path)
    finally:
        # Fix: close the project even when segmentation or export fails so the
        # next scan in the batch starts from a clean state.
        app.close_project()
    return stats_data
def main():
    """Batch-run AI segmentation over a folder of CT scans and export a CSV."""
    app = api.Application()
    input_dir = "C:/Data/CT_Scans"
    output_dir = "C:/Data/AI_Results"
    os.makedirs(output_dir, exist_ok=True)
    # Structures to extract
    target_structures = [
        "liver",
        "spleen",
        "kidney_left",
        "kidney_right"
    ]
    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    all_stats = []
    for i, filepath in enumerate(files):
        print(f"[{i+1}/{len(files)}] Processing: {os.path.basename(filepath)}")
        try:
            stats = run_ai_segmentation(app, filepath, output_dir, target_structures)
            all_stats.extend(stats)
        except Exception as e:
            print(f" Error: {e}")
    # Export statistics to CSV
    csv_path = os.path.join(output_dir, "organ_statistics.csv")
    with open(csv_path, 'w', newline='') as f:
        # Fix: the original header listed "surface_area_mm2", but
        # run_ai_segmentation never produces that key, so the column was
        # always empty. The header now matches the actual records.
        writer = csv.DictWriter(f, fieldnames=["file", "structure", "volume_mm3"])
        writer.writeheader()
        writer.writerows(all_stats)
    print(f"Statistics exported to: {csv_path}")

main()
Generating Reports
CSV Statistics Export
"""
Generate statistical reports from batch processing.
"""
import os
import csv
import ScriptingApi as api
def collect_statistics(app, mask_name, volume_name):
    """Collect volume and voxel-count statistics for a mask.

    Args:
        app: ScriptingApi Application instance.
        mask_name: Name of the mask to measure.
        volume_name: Name of the volume the mask belongs to.

    Returns:
        Dict with volume_mm3 and voxel_count keys.
    """
    # Fix: the original assigned `measure_ops` but then called the undefined
    # name `measure_operations`, raising NameError at runtime.
    measure_operations = app.get_measure_operations()
    requested_stats = [
        api.LabelStatisticType.Volume,
        api.LabelStatisticType.BoundingBox
    ]
    mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
    return {
        "volume_mm3": mask_statistics_result.total_volume,
        "voxel_count": mask_statistics_result.total_voxel_count
    }
def export_batch_report(results, output_path):
    """Write batch result records to CSV, using the first record's keys as columns.

    Does nothing when results is empty.
    """
    if not results:
        return
    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(results[0].keys()))
        writer.writeheader()
        writer.writerows(results)
    print(f"Report exported to: {output_path}")
Summary Report Generation
def generate_summary_report(results, output_path):
    """Write a human-readable text summary of a batch run to output_path."""
    successful = [r for r in results if r.get("status") == "success"]
    failed = [r for r in results if r.get("status") == "failed"]
    # Only count volumes that are present and truthy (skips 0/None).
    volumes = [r.get("volume", 0) for r in successful if r.get("volume")]
    divider = "=" * 60
    lines = [
        divider,
        "BATCH PROCESSING SUMMARY REPORT",
        divider,
        "",
        f"Total files processed: {len(results)}",
        f"Successful: {len(successful)}",
        f"Failed: {len(failed)}",
        "",
    ]
    if volumes:
        lines += [
            "Volume Statistics:",
            f" Mean: {sum(volumes)/len(volumes):.2f} mm³",
            f" Min: {min(volumes):.2f} mm³",
            f" Max: {max(volumes):.2f} mm³",
            "",
        ]
    if failed:
        lines.append("Failed Files:")
        lines.extend(f" - {f.get('input')}: {f.get('error')}" for f in failed)
        lines.append("")
    lines.append(divider)
    with open(output_path, 'w') as handle:
        handle.write("\n".join(lines))
    print(f"Summary report saved to: {output_path}")
Complete Production Pipeline
"""
Production-Ready Batch Processing Pipeline
Complete example with all best practices.
"""
import os
import sys
from pathlib import Path
import json
import csv
import logging
from datetime import datetime
import ScriptingApi as api
class BatchProcessor:
    """Handles batch processing of volume data.

    Drives the full pipeline for every file discovered in the configured
    input directory: import -> threshold segmentation -> volume
    statistics -> mask export -> optional surface export. One result
    record is collected per file and CSV/summary reports are written at
    the end of the run.
    """

    def __init__(self, config_path):
        """Load configuration and prepare output folders and logging.

        Parameters:
            config_path: Path to a JSON configuration file containing at
                least "input_dir", "output_dir" and a "threshold" block
                with "lower"/"upper" values.
        """
        self.config = self._load_config(config_path)
        self.app = api.Application()
        self.results = []
        self._setup_output()
        self._setup_logging()

    def _load_config(self, path):
        """Read the JSON configuration file and return it as a dict."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _setup_output(self):
        """Create a timestamped output tree (masks/surfaces/reports/logs)."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.output_root = os.path.join(
            self.config["output_dir"],
            f"batch_{timestamp}"
        )
        self.dirs = {
            "masks": os.path.join(self.output_root, "masks"),
            "surfaces": os.path.join(self.output_root, "surfaces"),
            "reports": os.path.join(self.output_root, "reports"),
            "logs": os.path.join(self.output_root, "logs")
        }
        for d in self.dirs.values():
            os.makedirs(d, exist_ok=True)

    def _setup_logging(self):
        """Attach file and console handlers to a dedicated logger.

        Handlers are attached to a named logger rather than via
        logging.basicConfig(), which silently does nothing when the host
        application has already configured the root logger.
        """
        log_path = os.path.join(self.dirs["logs"], "processing.log")
        self.logger = logging.getLogger(f"{__name__}.BatchProcessor")
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False  # avoid duplicate lines via the root logger
        self.logger.handlers.clear()   # re-instantiation must not stack handlers
        formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
        for handler in (logging.FileHandler(log_path), logging.StreamHandler()):
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _strip_extension(basename):
        """Drop a trailing .nii / .nii.gz from a file name.

        A plain str.replace() would also remove the pattern from the
        middle of unusual names (e.g. "scan.nii.backup.nii").
        """
        for suffix in (".nii.gz", ".nii"):
            if basename.endswith(suffix):
                return basename[:-len(suffix)]
        return basename

    def discover_files(self):
        """Find all input files matching the configured extensions.

        Returns a sorted, de-duplicated list of paths so runs are
        deterministic and overlapping patterns never yield the same
        file twice.
        """
        input_dir = self.config["input_dir"]
        extensions = self.config.get("extensions", ["*.nii", "*.nii.gz"])
        found = set()
        for pattern in extensions:
            # Path.glob is non-recursive; use "**/*.nii"-style patterns
            # in the config to search subfolders.
            found.update(str(p) for p in Path(input_dir).glob(pattern))
        files = sorted(found)
        self.logger.info(f"Discovered {len(files)} files")
        return files

    def process_file(self, filepath):
        """Run the full pipeline on a single file.

        Returns a result dict that always holds "input", "name",
        "timestamp" and "status" ("success"/"failed"); on success it
        also carries the measured volume and output paths, on failure
        the error text. Exceptions are caught so one bad file cannot
        abort the whole batch.
        """
        volume_operations = self.app.get_volume_operations()
        mask_operations = self.app.get_mask_operations()
        surface_operations = self.app.get_surface_operations()
        measure_operations = self.app.get_measure_operations()
        basename = os.path.basename(filepath)
        name = self._strip_extension(basename)
        result = {
            "input": filepath,
            "name": name,
            "timestamp": datetime.now().isoformat()
        }
        try:
            # Import the volume from disk.
            self.logger.info(f"Importing: {basename}")
            volume_name = volume_operations.import_3d_image_from_disk(filepath)
            # Threshold segmentation using the configured window.
            self.logger.info("  Segmenting...")
            threshold_params = api.ThresholdParams()
            threshold_params.lower_threshold = self.config["threshold"]["lower"]
            threshold_params.upper_threshold = self.config["threshold"]["upper"]
            threshold_params.filter_regions = self.config["threshold"].get("filter_regions", True)
            threshold_params.min_region_size = self.config["threshold"].get("min_region_size", 100)
            threshold_params.keep_largest = False
            mask_name = mask_operations.threshold(volume_name, threshold_params)
            # Volume statistics for the resulting mask.
            requested_stats = [api.LabelStatisticType.Volume]
            stats = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
            result["volume_mm3"] = stats.total_volume
            # Export the mask image.
            mask_path = os.path.join(self.dirs["masks"], f"{name}_mask.nii")
            mask_operations.export_mask_image_to_disk(mask_name, mask_path)
            result["mask_output"] = mask_path
            # Optional surface generation and STL export.
            if self.config.get("generate_surface", True):
                self.logger.info("  Generating surface...")
                surf_config = self.config.get("surface", {})
                mask_to_surface_params = api.MaskToSurfaceParams()
                mask_to_surface_params.smooth_iterations = surf_config.get("smooth_iterations", 20)
                mask_to_surface_params.smooth_factor = surf_config.get("smooth_factor", 0.06)
                mask_to_surface_params.triangle_reduction_percent = surf_config.get("triangle_reduction_percent", 50)
                surface_names = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)
                if not surface_names:
                    raise RuntimeError(f"No surface generated for mask '{mask_name}'")
                # The API may return a single id or a sequence of ids.
                if isinstance(surface_names, (list, tuple)):
                    surface_id = surface_names[0]
                else:
                    surface_id = surface_names
                surface_path = os.path.join(self.dirs["surfaces"], f"{name}_surface.stl")
                surface_operations.export_surface_to_disk(surface_id, surface_path)
                result["surface_output"] = surface_path
            result["status"] = "success"
            self.logger.info(f"  Complete: {stats.total_volume:.2f} mm³")
        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
            self.logger.error(f"  Failed: {e}")
        finally:
            # Always release the project so memory is reclaimed between
            # files, but never let cleanup mask the original exception.
            try:
                self.app.close_project()
            except Exception as close_error:
                self.logger.warning(f"  close_project failed: {close_error}")
        return result

    def run(self):
        """Execute the batch pipeline end to end."""
        self.logger.info("=" * 60)
        self.logger.info("BATCH PROCESSING STARTED")
        self.logger.info("=" * 60)
        files = self.discover_files()
        for i, filepath in enumerate(files):
            self.logger.info(f"[{i+1}/{len(files)}] {os.path.basename(filepath)}")
            self.results.append(self.process_file(filepath))
        self._generate_reports()
        self.logger.info("=" * 60)
        self.logger.info("BATCH PROCESSING COMPLETE")
        self.logger.info("=" * 60)

    def _generate_reports(self):
        """Write the CSV results table and a plain-text summary."""
        # CSV report. "surface_area_mm2" is a reserved column that stays
        # empty unless a result record supplies it; extrasaction='ignore'
        # drops record keys (input, timestamp, paths) not in the table.
        csv_path = os.path.join(self.dirs["reports"], "results.csv")
        fieldnames = ["name", "status", "volume_mm3", "surface_area_mm2", "error"]
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.results)
        self.logger.info(f"CSV report: {csv_path}")
        # Plain-text success/failure summary.
        success = sum(1 for r in self.results if r["status"] == "success")
        failed = sum(1 for r in self.results if r["status"] == "failed")
        summary_path = os.path.join(self.dirs["reports"], "summary.txt")
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.write("Batch Processing Summary\n")
            f.write("========================\n\n")
            f.write(f"Total: {len(self.results)}\n")
            f.write(f"Success: {success}\n")
            f.write(f"Failed: {failed}\n")
        self.logger.info(f"Summary: {summary_path}")
# Run the pipeline
if __name__ == "__main__":
    # Allow the config path to be overridden from the command line:
    #   python batch_pipeline.py [path/to/config.json]
    # falling back to the original hard-coded default.
    config_path = sys.argv[1] if len(sys.argv) > 1 else "C:/Scripts/batch_config.json"
    processor = BatchProcessor(config_path)
    processor.run()
Best Practices
Memory Management
- Close projects after processing each file
- Process files sequentially for large datasets
- Monitor system resources during execution
Error Recovery
- Use try-except blocks around each file
- Log errors with sufficient detail
- Continue processing remaining files after failures
- Save partial results periodically
Performance Optimization
- Use fast mode for initial testing
- Enable GPU processing when available
- Batch similar operations together
- Pre-validate input files before processing
Reproducibility
- Save configuration files with results
- Log all parameters used
- Include timestamps in output folders
- Version your processing scripts
Troubleshooting
| Issue | Solution |
|---|---|
| Out of memory | Close project after each file, reduce batch size |
| Script stops unexpectedly | Add try-except blocks, enable logging |
| Inconsistent results | Verify configuration, check input file quality |
| Slow processing | Enable GPU for AI operations, optimize parameters |
| Missing output files | Check export paths, verify write permissions |
| API method not found | Ensure using correct method names (e.g., import_3d_image_from_disk) |
Next Steps
- Explore the Scripting API Reference for additional operations
- Review Quality Analysis Tutorial for analysis pipelines
- See AI-Powered Segmentation Tutorial for advanced AI workflows