Skip to main content

Batch Processing and Automation

Processing large numbers of datasets manually is time-consuming and error-prone. This tutorial demonstrates how to create automated pipelines that process multiple files consistently and efficiently.

Estimated time: 60 minutes

Prerequisites:

  • Completed Python Scripting Fundamentals
  • Understanding of Volvicon's core operations
  • Custom Scripting feature enabled (available on Professional and above licenses)

Batch Processing Concepts

Why Automate?

Manual Processing           | Automated Processing
----------------------------|-------------------------------
Repetitive clicking         | Single script execution
Human error variability     | Consistent parameters
Limited to working hours    | Runs overnight
One dataset at a time       | Parallel or sequential batches
No audit trail              | Logged operations

Pipeline Architecture

A typical batch pipeline consists of:

  1. Input discovery — Find files to process
  2. Iteration — Loop through each file
  3. Processing — Apply operations
  4. Output — Export results
  5. Logging — Record success/failure

File Discovery

Finding Files with Python

import os

def find_dicom_folders(root_path):
    """Find all folders containing DICOM files.

    Walks the directory tree under *root_path* and returns every folder
    that directly contains at least one DICOM (``.dcm``) file.

    Args:
        root_path: Root directory to search recursively.

    Returns:
        list[str]: Paths of folders holding one or more DICOM files.
    """
    dicom_folders = []
    for dirpath, _dirnames, filenames in os.walk(root_path):
        # Match case-insensitively: many exporters write ".DCM".
        if any(f.lower().endswith('.dcm') for f in filenames):
            dicom_folders.append(dirpath)
    return dicom_folders

def find_nifti_files(root_path):
    """Recursively collect every NIfTI file (.nii / .nii.gz) under *root_path*."""
    from pathlib import Path

    matches = Path(root_path).rglob("*.nii*")
    return [str(match) for match in matches]

# Example usage: discover inputs under a root folder, then report counts.
input_root = "C:/Data/Scans"
dicom_dirs = find_dicom_folders(input_root)   # folders that contain .dcm files
nifti_files = find_nifti_files(input_root)    # individual .nii / .nii.gz paths

print(f"Found {len(dicom_dirs)} DICOM folders")
print(f"Found {len(nifti_files)} NIfTI files")

Organizing Output

import os
from datetime import datetime

def create_output_structure(base_path, dataset_name):
    """Create an organized, timestamped output folder structure."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_root = os.path.join(base_path, f"{dataset_name}_{stamp}")

    # One sub-folder per artifact category.
    for subdir in ("masks", "surfaces", "reports", "logs"):
        os.makedirs(os.path.join(output_root, subdir), exist_ok=True)

    return output_root

Basic Batch Pipeline

Processing Multiple NIfTI Files

"""
Basic Batch Processing Pipeline
Process multiple NIfTI files with threshold segmentation.
"""
import os
from pathlib import Path
import ScriptingApi as api

def process_single_file(app, input_path, output_dir):
    """Process a single volume file.

    Imports the volume at *input_path*, runs a fixed threshold
    segmentation, converts the mask to a surface, and exports both the
    mask (NIfTI) and the surface (STL) into *output_dir*.

    Args:
        app: Connected ScriptingApi Application instance.
        input_path: Path to the input NIfTI file.
        output_dir: Folder that receives the exported results.

    Returns:
        True on success.

    Raises:
        RuntimeError: If surface generation yields no surface.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()

    # Get filename without extension (second replace handles .nii.gz)
    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")

    try:
        # Import volume
        volume_name = volume_operations.import_3d_image_from_disk(input_path)

        # Threshold segmentation
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = 200
        threshold_params.upper_threshold = 3000
        threshold_params.filter_regions = True
        threshold_params.min_region_size = 100
        threshold_params.keep_largest = False

        mask_name = mask_operations.threshold(volume_name, threshold_params)

        # Generate surface
        mask_to_surface_params = api.MaskToSurfaceParams()
        mask_to_surface_params.smooth_iterations = 20
        mask_to_surface_params.smooth_factor = 0.06
        mask_to_surface_params.triangle_reduction_percent = 50

        surface_results = mask_operations.convert_to_surface_objects(
            [mask_name], mask_to_surface_params)

        # Ensure we have a surface to export
        if not surface_results:
            raise RuntimeError(f"No surface generated for mask '{mask_name}'")

        # Normalize return type and pick the first surface id
        if isinstance(surface_results, (list, tuple)):
            surface_id = surface_results[0]
        else:
            surface_id = surface_results

        surface_operations = app.get_surface_operations()

        # Export results
        mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
        surface_path = os.path.join(output_dir, f"{basename}_surface.stl")

        mask_operations.export_mask_image_to_disk(mask_name, mask_path)
        surface_operations.export_surface_to_disk(surface_id, surface_path)
    finally:
        # Close the project even on failure: the batch loop catches the
        # exception and continues, so an unclosed project would leak
        # memory into the next iteration.
        app.close_project()

    return True

def main():
    """Run the basic batch pipeline over every NIfTI file in the input folder."""
    app = api.Application()

    # Configuration
    input_dir = "C:/Data/Input"
    output_dir = "C:/Data/Output"

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Find all NIfTI files
    files = [str(path) for path in Path(input_dir).rglob("*.nii*")]

    print(f"Found {len(files)} files to process")

    # Process each file; a failure on one file must not stop the batch.
    for index, filepath in enumerate(files):
        print(f"Processing {index+1}/{len(files)}: {os.path.basename(filepath)}")
        try:
            process_single_file(app, filepath, output_dir)
        except Exception as e:
            print(f" Failed: {e}")
        else:
            print(" Success")

    print("Batch processing complete!")

main()

Error Handling and Logging

Comprehensive Logging

"""
Batch Pipeline with Logging
Records all operations and errors to a log file.
"""
import os
from pathlib import Path
import logging
from datetime import datetime
import ScriptingApi as api

def setup_logging(output_dir):
    """Configure logging to file and console."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = os.path.join(output_dir, f"batch_log_{stamp}.txt")

    # Mirror every record to the timestamped file and to stderr.
    handlers = [logging.FileHandler(log_path), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers
    )
    return logging.getLogger(__name__)

def process_with_logging(app, input_path, output_dir, logger):
    """Process a file with detailed logging.

    Imports the volume, logs its geometry, runs threshold segmentation,
    logs the resulting mask volume, and exports the mask to *output_dir*.

    Args:
        app: Connected ScriptingApi Application instance.
        input_path: Path to the input NIfTI file.
        output_dir: Folder that receives the exported mask.
        logger: Logger used for progress and diagnostics.

    Returns:
        dict: Record with input/output paths, mask volume, and status.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()

    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz

    logger.info(f"Starting processing: {basename}")

    try:
        # Import
        logger.info(" Importing volume...")
        volume_name = volume_operations.import_3d_image_from_disk(input_path)
        logger.info(f" Imported as: {volume_name}")

        # Get volume info
        dims = volume_operations.get_dimensions(volume_name)
        spacing = volume_operations.get_spacing(volume_name)
        logger.info(f" Dimensions: {dims}")
        logger.info(f" Spacing: {spacing}")

        # Segment
        logger.info(" Running threshold segmentation...")
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = 200
        threshold_params.upper_threshold = 3000
        threshold_params.filter_regions = True
        threshold_params.min_region_size = 100
        threshold_params.keep_largest = False

        mask_name = mask_operations.threshold(volume_name, threshold_params)

        # Statistics
        requested_stats = [api.LabelStatisticType.Volume]
        mask_statistics_result = measure_operations.compute_whole_mask_statistics(
            mask_name, volume_name, requested_stats)
        logger.info(f" Mask volume: {mask_statistics_result.total_volume:.2f} mm³")

        # Export
        output_path = os.path.join(output_dir, f"{basename}_mask.nii")
        mask_operations.export_mask_image_to_disk(mask_name, output_path)
        logger.info(f" Exported to: {output_path}")
    finally:
        # Close even on failure: the caller's except-branch continues the
        # batch, so a stale open project would otherwise leak memory.
        app.close_project()

    logger.info(f"Completed: {basename}")

    return {
        "input": input_path,
        "output": output_path,
        "volume": mask_statistics_result.total_volume,
        "status": "success"
    }

def main():
    """Run the logged batch pipeline and report a success/failure summary."""
    app = api.Application()

    input_dir = "C:/Data/Input"
    output_dir = "C:/Data/Output"
    os.makedirs(output_dir, exist_ok=True)

    logger = setup_logging(output_dir)
    banner = "=" * 50
    logger.info(banner)
    logger.info("Batch Processing Started")
    logger.info(banner)

    files = [str(path) for path in Path(input_dir).rglob("*.nii*")]
    logger.info(f"Found {len(files)} files")

    results = []
    success_count = 0
    failure_count = 0

    for index, filepath in enumerate(files):
        logger.info(f"[{index+1}/{len(files)}] {os.path.basename(filepath)}")
        try:
            results.append(process_with_logging(app, filepath, output_dir, logger))
        except Exception as e:
            logger.error(f"Failed: {e}")
            results.append({
                "input": filepath,
                "status": "failed",
                "error": str(e)
            })
            failure_count += 1
        else:
            success_count += 1

    # Summary
    logger.info(banner)
    logger.info("Batch Processing Complete")
    logger.info(f" Successful: {success_count}")
    logger.info(f" Failed: {failure_count}")
    logger.info(banner)

main()

Advanced Pipeline Features

Configuration Files

Use JSON configuration for flexible pipelines:

"""
Configurable Batch Pipeline
Reads processing parameters from a JSON configuration file.
"""
import os
import json
from pathlib import Path
import ScriptingApi as api

def load_config(config_path):
    """Load configuration from JSON file."""
    with open(config_path, 'r') as config_file:
        config = json.load(config_file)
    return config

def process_with_config(app, input_path, output_dir, config):
    """Process using configuration parameters.

    Threshold limits and (optional) surface-generation settings are read
    from *config*; missing keys fall back to the defaults shown inline.

    Args:
        app: Connected ScriptingApi Application instance.
        input_path: Path to the input NIfTI file.
        output_dir: Folder for the exported mask/surface.
        config: Parsed JSON configuration dict.

    Raises:
        RuntimeError: If surface generation was requested but produced
            no surface.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    surface_operations = app.get_surface_operations()

    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz

    try:
        # Import
        volume_name = volume_operations.import_3d_image_from_disk(input_path)

        # Threshold from config
        threshold_config = config.get("threshold", {})
        threshold_params = api.ThresholdParams()
        threshold_params.lower_threshold = threshold_config.get("lower", 200)
        threshold_params.upper_threshold = threshold_config.get("upper", 3000)
        threshold_params.filter_regions = threshold_config.get("filter_regions", True)
        threshold_params.min_region_size = threshold_config.get("min_region_size", 100)
        threshold_params.keep_largest = False

        mask_name = mask_operations.threshold(volume_name, threshold_params)

        # Surface generation from config
        if config.get("generate_surface", True):
            surface_config = config.get("surface", {})
            mask_to_surface_params = api.MaskToSurfaceParams()
            mask_to_surface_params.smooth_iterations = surface_config.get("smooth_iterations", 20)
            mask_to_surface_params.smooth_factor = surface_config.get("smooth_factor", 0.06)
            mask_to_surface_params.triangle_reduction_percent = surface_config.get("triangle_reduction_percent", 50)

            surface_names = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)

            if not surface_names:
                raise RuntimeError(f"No surface generated for mask '{mask_name}'")

            surface_path = os.path.join(output_dir, f"{basename}_surface.stl")
            surface_operations.export_surface_to_disk(surface_names[0], surface_path)

        # Export mask
        mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
        mask_operations.export_mask_image_to_disk(mask_name, mask_path)
    finally:
        # Close even on failure so the batch loop does not accumulate
        # open projects across files.
        app.close_project()

def main():
    """Process every NIfTI file using parameters from the JSON config."""
    app = api.Application()

    # Load configuration
    config = load_config("C:/Scripts/batch_config.json")

    input_dir = config.get("input_dir", "C:/Data/Input")
    output_dir = config.get("output_dir", "C:/Data/Output")

    os.makedirs(output_dir, exist_ok=True)

    # One file at a time; a failure is reported but never aborts the batch.
    for filepath in (str(p) for p in Path(input_dir).rglob("*.nii*")):
        print(f"Processing: {os.path.basename(filepath)}")
        try:
            process_with_config(app, filepath, output_dir, config)
        except Exception as e:
            print(f" Error: {e}")

main()

Example configuration file (batch_config.json):

{
"input_dir": "C:/Data/Input",
"output_dir": "C:/Data/Output",
"threshold": {
"lower": 200,
"upper": 3000,
"filter_regions": true,
"min_region_size": 100
},
"generate_surface": true,
"surface": {
"smooth_iterations": 20,
"smooth_factor": 0.06,
"triangle_reduction_percent": 50
}
}

Advanced Analysis with NumPy and pandas

Volume Statistics with NumPy

"""
Advanced Volume Analysis using NumPy
Calculate advanced statistics using NumPy arrays.
"""
import os
from pathlib import Path
import numpy as np
import ScriptingApi as api

def analyze_volume_with_numpy(app, volume_name):
    """Perform advanced volume analysis using NumPy."""
    volume_operations = app.get_volume_operations()

    # Query geometric and intensity metadata from the application.
    dims = volume_operations.get_dimensions(volume_name)
    spacing = volume_operations.get_spacing(volume_name)
    origin = volume_operations.get_origin(volume_name)
    intensity_min, intensity_max = volume_operations.get_scalar_range(volume_name)

    dims_array = np.array(dims)
    spacing_array = np.array(spacing)

    # Physical extent per axis (mm) and derived totals.
    physical_size = dims_array * spacing_array
    total_volume_mm3 = np.prod(physical_size)
    voxel_volume = np.prod(spacing_array)
    total_voxels = np.prod(dims_array)

    return {
        "dimensions": dims,
        "spacing": spacing,
        "physical_size_mm": physical_size.tolist(),
        "total_volume_mm3": float(total_volume_mm3),
        "voxel_volume_mm3": float(voxel_volume),
        "total_voxels": int(total_voxels),
        "intensity_min": intensity_min,
        "intensity_max": intensity_max,
        "intensity_range": intensity_max - intensity_min,
    }

def batch_analyze_with_numpy(app, input_dir, output_dir):
    """Batch analyze multiple volumes and save NumPy statistics.

    Args:
        app: Connected ScriptingApi Application instance.
        input_dir: Folder searched recursively for NIfTI files.
        output_dir: Output folder (reserved; not written to here).

    Returns:
        list[dict]: One statistics record per successfully analyzed file.
    """
    volume_operations = app.get_volume_operations()

    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    all_results = []

    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")

        print(f"[{i+1}/{len(files)}] Analyzing: {basename}")

        try:
            try:
                # Import volume
                volume_name = volume_operations.import_3d_image_from_disk(filepath)

                # Analyze with NumPy
                stats = analyze_volume_with_numpy(app, volume_name)
                stats["filename"] = basename
                all_results.append(stats)
            finally:
                # Close even on failure so one bad file does not leak
                # memory into the rest of the batch.
                app.close_project()

            print(f" Volume: {stats['total_volume_mm3']:.2f} mm³")

        except Exception as e:
            print(f" Error: {e}")

    # Calculate summary statistics using NumPy
    if all_results:
        volumes = np.array([r["total_volume_mm3"] for r in all_results])

        # "\n" (real newline) — the original "\\n" printed a literal backslash-n.
        print("\n" + "="*50)
        print("SUMMARY STATISTICS (NumPy)")
        print("="*50)
        print(f"Total files analyzed: {len(all_results)}")
        print(f"Volume mean: {np.mean(volumes):.2f} mm³")
        print(f"Volume std: {np.std(volumes):.2f} mm³")
        print(f"Volume min: {np.min(volumes):.2f} mm³")
        print(f"Volume max: {np.max(volumes):.2f} mm³")
        print(f"Volume median: {np.median(volumes):.2f} mm³")

    return all_results

# Example usage
app = api.Application()
results = batch_analyze_with_numpy(app, "C:/Data/Input", "C:/Data/Output")

pandas DataFrame Analysis

"""
Batch Processing with pandas DataFrames
Organize and analyze batch results using pandas.
"""
import os
from pathlib import Path
import pandas as pd
import ScriptingApi as api

def process_with_pandas(app, input_dir, output_dir):
    """Process multiple files and organize results in a pandas DataFrame.

    For each NIfTI file under *input_dir*: import, record geometry, run
    threshold segmentation, measure the mask volume, and export the mask
    to *output_dir*. All per-file records (including failures) are
    collected into a DataFrame that is also written to CSV.

    Args:
        app: Connected ScriptingApi Application instance.
        input_dir: Folder searched recursively for NIfTI files.
        output_dir: Folder for exported masks and CSV reports.

    Returns:
        pandas.DataFrame: One row per input file.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()

    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]

    # Per-file result records
    data_records = []

    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")

        print(f"[{i+1}/{len(files)}] Processing: {basename}")

        try:
            try:
                # Import volume
                volume_name = volume_operations.import_3d_image_from_disk(filepath)

                # Get volume properties
                dims = volume_operations.get_dimensions(volume_name)
                spacing = volume_operations.get_spacing(volume_name)
                scalar_range = volume_operations.get_scalar_range(volume_name)

                # Threshold segmentation
                threshold_params = api.ThresholdParams()
                threshold_params.lower_threshold = 200
                threshold_params.upper_threshold = 3000
                threshold_params.filter_regions = True
                threshold_params.min_region_size = 100
                threshold_params.keep_largest = False

                mask_name = mask_operations.threshold(volume_name, threshold_params)

                # Calculate statistics
                requested_stats = [api.LabelStatisticType.Volume]
                mask_statistics_result = measure_operations.compute_whole_mask_statistics(
                    mask_name, volume_name, requested_stats)

                # Store record
                data_records.append({
                    "filename": basename,
                    "dim_x": dims[0],
                    "dim_y": dims[1],
                    "dim_z": dims[2],
                    "spacing_x": spacing[0],
                    "spacing_y": spacing[1],
                    "spacing_z": spacing[2],
                    "intensity_min": scalar_range[0],
                    "intensity_max": scalar_range[1],
                    "mask_volume_mm3": mask_statistics_result.total_volume,
                    "status": "success"
                })

                # Export mask
                mask_path = os.path.join(output_dir, f"{basename}_mask.nii")
                mask_operations.export_mask_image_to_disk(mask_name, mask_path)
            finally:
                # Close even on failure so one bad file does not leak memory.
                app.close_project()

        except Exception as e:
            print(f" Error: {e}")
            data_records.append({
                "filename": basename,
                "status": "failed",
                "error": str(e)
            })

    # Create pandas DataFrame
    df = pd.DataFrame(data_records)

    # Filter once and reuse. The original re-filtered repeatedly and
    # could reference an unbound `successful_df` when every file failed.
    successful_df = df[df["status"] == "success"] if "status" in df.columns else df

    # Display summary statistics ("\n" gives a real blank line; the
    # page's "\\n" printed a literal backslash-n).
    print("\n" + "="*60)
    print("PANDAS DATAFRAME ANALYSIS")
    print("="*60)
    print(f"\nTotal files: {len(df)}")
    print(f"Success: {len(successful_df)}")
    print(f"Failed: {len(df) - len(successful_df)}")

    # Statistical summary for successful cases only
    if len(successful_df) > 0:
        print("\nVolume Statistics:")
        print(successful_df['mask_volume_mm3'].describe())

        print("\nDimension Statistics:")
        print(successful_df[['dim_x', 'dim_y', 'dim_z']].describe())

    # Export full results to CSV
    csv_path = os.path.join(output_dir, "batch_results.csv")
    df.to_csv(csv_path, index=False)
    print(f"\nResults exported to: {csv_path}")

    # Export summary statistics (only meaningful when something succeeded)
    if len(successful_df) > 0:
        summary_path = os.path.join(output_dir, "summary_statistics.csv")
        summary_df = successful_df['mask_volume_mm3'].describe().to_frame()
        summary_df.to_csv(summary_path)
        print(f"Summary statistics: {summary_path}")

    return df

# Example usage
app = api.Application()
os.makedirs("C:/Data/Output", exist_ok=True)
df = process_with_pandas(app, "C:/Data/Input", "C:/Data/Output")

# Further pandas analysis
print("\nTop 5 largest volumes:")
print(df.nlargest(5, 'mask_volume_mm3')[['filename', 'mask_volume_mm3']])

print("\nTop 5 smallest volumes:")
print(df.nsmallest(5, 'mask_volume_mm3')[['filename', 'mask_volume_mm3']])

Visualization with matplotlib

"""
Batch Processing with matplotlib Visualization
Create charts and plots from batch processing results.
Note: matplotlib is configured in GUI-less mode (Agg backend)
"""
import os
from pathlib import Path
import matplotlib
matplotlib.use('Agg') # Non-interactive backend for Qt compatibility
import matplotlib.pyplot as plt
import numpy as np
import ScriptingApi as api

def create_volume_histogram(app, volume_name, output_path):
    """Create histogram of volume intensities.

    Note: plots random demo data spanning the volume's scalar range; a
    real implementation would histogram the actual voxel intensities.

    Args:
        app: Connected ScriptingApi Application instance.
        volume_name: Name of the imported volume to describe.
        output_path: PNG file path the figure is saved to.
    """
    volume_operations = app.get_volume_operations()

    # Only the scalar range is needed for the demo histogram.
    scalar_range = volume_operations.get_scalar_range(volume_name)

    # Create histogram plot
    plt.figure(figsize=(10, 6))

    # randint() requires integer bounds; scalar ranges may come back as
    # floats, so cast explicitly — TODO confirm the API's return type.
    low, high = int(scalar_range[0]), int(scalar_range[1])
    plt.hist(np.random.randint(low, high, 10000),
             bins=50, color='skyblue', edgecolor='black', alpha=0.7)

    plt.xlabel('Intensity Value', fontsize=12)
    plt.ylabel('Frequency', fontsize=12)
    # "\n" (real newline) — the page's "\\n" rendered a literal backslash-n.
    plt.title(f'Volume Intensity Distribution\n{volume_name}', fontsize=14)
    plt.grid(True, alpha=0.3)

    # Save to file (plt.show() would warn under the Agg backend)
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()

    print(f" Histogram saved: {output_path}")

def create_batch_summary_plots(results_dict, output_dir):
    """Create summary plots from batch processing results."""

    # Pull the per-file data out of the result records.
    filenames = [record["filename"] for record in results_dict]
    volumes = [record["volume_mm3"] for record in results_dict]

    # Lay out a 2x2 summary figure.
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle('Batch Processing Summary Report', fontsize=16, fontweight='bold')

    bar_ax, hist_ax = axes[0]
    box_ax, table_ax = axes[1]

    # 1. Bar chart of volumes
    bar_ax.bar(range(len(volumes)), volumes, color='steelblue', alpha=0.7)
    bar_ax.set_xlabel('File Index', fontsize=10)
    bar_ax.set_ylabel('Volume (mm³)', fontsize=10)
    bar_ax.set_title('Volume by File', fontsize=12)
    bar_ax.grid(True, alpha=0.3, axis='y')

    # 2. Volume distribution histogram
    hist_ax.hist(volumes, bins=15, color='coral', edgecolor='black', alpha=0.7)
    hist_ax.set_xlabel('Volume (mm³)', fontsize=10)
    hist_ax.set_ylabel('Frequency', fontsize=10)
    hist_ax.set_title('Volume Distribution', fontsize=12)
    hist_ax.grid(True, alpha=0.3, axis='y')

    # 3. Box plot
    box_ax.boxplot(volumes, vert=True, patch_artist=True,
                   boxprops=dict(facecolor='lightgreen', alpha=0.7))
    box_ax.set_ylabel('Volume (mm³)', fontsize=10)
    box_ax.set_title('Volume Statistics', fontsize=12)
    box_ax.grid(True, alpha=0.3, axis='y')

    # 4. Summary statistics table
    table_ax.axis('off')

    volumes_array = np.array(volumes)
    stats_data = [
        ['Metric', 'Value'],
        ['Count', f'{len(volumes)}'],
        ['Mean', f'{np.mean(volumes_array):.2f} mm³'],
        ['Std Dev', f'{np.std(volumes_array):.2f} mm³'],
        ['Min', f'{np.min(volumes_array):.2f} mm³'],
        ['Max', f'{np.max(volumes_array):.2f} mm³'],
        ['Median', f'{np.median(volumes_array):.2f} mm³']
    ]

    table = table_ax.table(cellText=stats_data, cellLoc='left', loc='center',
                           colWidths=[0.4, 0.6])
    table.auto_set_font_size(False)
    table.set_fontsize(10)
    table.scale(1, 2)

    # Style header row
    for col in range(2):
        table[(0, col)].set_facecolor('#4CAF50')
        table[(0, col)].set_text_props(weight='bold', color='white')

    table_ax.set_title('Statistical Summary', fontsize=12, pad=20)

    # Save plot
    plt.tight_layout()
    output_path = os.path.join(output_dir, 'batch_summary.png')
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()

    print(f"Summary plots saved: {output_path}")

def batch_process_with_visualization(app, input_dir, output_dir):
    """Complete batch pipeline with matplotlib visualization.

    For each NIfTI file: import, save an intensity histogram, run
    threshold segmentation, and record the mask volume. Ends with the
    combined summary plots when at least one file succeeded.

    Args:
        app: Connected ScriptingApi Application instance.
        input_dir: Folder searched recursively for NIfTI files.
        output_dir: Folder for plots (a "plots" subfolder is created).

    Returns:
        list[dict]: filename/volume_mm3 record per successful file.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    measure_operations = app.get_measure_operations()

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    plots_dir = os.path.join(output_dir, "plots")
    os.makedirs(plots_dir, exist_ok=True)

    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    results = []

    for i, filepath in enumerate(files):
        basename = os.path.splitext(os.path.basename(filepath))[0]
        basename = basename.replace(".nii", "")

        print(f"[{i+1}/{len(files)}] Processing: {basename}")

        try:
            try:
                # Import and process
                volume_name = volume_operations.import_3d_image_from_disk(filepath)

                # Create individual volume histogram
                hist_path = os.path.join(plots_dir, f"{basename}_histogram.png")
                create_volume_histogram(app, volume_name, hist_path)

                # Threshold segmentation (parameters match the other
                # pipelines in this tutorial, including keep_largest,
                # which the original example omitted here)
                threshold_params = api.ThresholdParams()
                threshold_params.lower_threshold = 200
                threshold_params.upper_threshold = 3000
                threshold_params.filter_regions = True
                threshold_params.min_region_size = 100
                threshold_params.keep_largest = False

                mask_name = mask_operations.threshold(volume_name, threshold_params)

                # Get statistics
                requested_stats = [api.LabelStatisticType.Volume]
                mask_statistics_result = measure_operations.compute_whole_mask_statistics(
                    mask_name, volume_name, requested_stats)

                results.append({
                    "filename": basename,
                    "volume_mm3": mask_statistics_result.total_volume
                })
            finally:
                # Close even on failure so one bad file does not leak memory.
                app.close_project()

            print(f" Volume: {mask_statistics_result.total_volume:.2f} mm³")

        except Exception as e:
            print(f" Error: {e}")

    # Create summary plots
    if results:
        create_batch_summary_plots(results, output_dir)

    return results

# Example usage
app = api.Application()
results = batch_process_with_visualization(
    app,
    "C:/Data/Input",
    "C:/Data/Output"
)

print(f"\nProcessed {len(results)} files with visualization")

Advanced Plotting: Multi-File Comparison

"""
Compare multiple datasets with advanced matplotlib plots.
"""
import os
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import ScriptingApi as api

def create_comparison_plot(results_list, output_path):
    """Create comprehensive comparison plot for multiple datasets."""

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Multi-Dataset Comparison Analysis', fontsize=16, fontweight='bold')

    # Pull names and volumes out of the result records.
    dataset_names = [record["dataset_name"] for record in results_list]
    volumes = [record["volume_mm3"] for record in results_list]
    count = len(dataset_names)

    # 1. Comparison bar chart
    bar_ax = axes[0, 0]
    palette = plt.cm.viridis(np.linspace(0, 1, count))
    bars = bar_ax.bar(range(count), volumes, color=palette, alpha=0.8)
    bar_ax.set_xticks(range(count))
    bar_ax.set_xticklabels(dataset_names, rotation=45, ha='right')
    bar_ax.set_ylabel('Volume (mm³)', fontsize=11)
    bar_ax.set_title('Volume Comparison', fontsize=12, fontweight='bold')
    bar_ax.grid(True, alpha=0.3, axis='y')

    # Add value labels on bars
    for bar, vol in zip(bars, volumes):
        bar_ax.text(bar.get_x() + bar.get_width()/2., bar.get_height(),
                    f'{vol:.1f}',
                    ha='center', va='bottom', fontsize=9)

    # 2. Normalized comparison (percentage of max)
    norm_ax = axes[0, 1]
    max_volume = max(volumes)
    normalized = [(v/max_volume)*100 for v in volumes]
    norm_ax.barh(dataset_names, normalized, color='coral', alpha=0.7)
    norm_ax.set_xlabel('Percentage of Maximum (%)', fontsize=11)
    norm_ax.set_title('Normalized Volume Comparison', fontsize=12, fontweight='bold')
    norm_ax.grid(True, alpha=0.3, axis='x')

    # 3. Sorted ranking
    rank_ax = axes[1, 0]
    order = np.argsort(volumes)[::-1]
    sorted_names = [dataset_names[idx] for idx in order]
    sorted_volumes = [volumes[idx] for idx in order]

    rank_ax.plot(range(len(sorted_volumes)), sorted_volumes,
                 marker='o', linewidth=2, markersize=8, color='steelblue')
    rank_ax.set_xticks(range(len(sorted_names)))
    rank_ax.set_xticklabels(sorted_names, rotation=45, ha='right')
    rank_ax.set_ylabel('Volume (mm³)', fontsize=11)
    rank_ax.set_title('Volume Ranking (Highest to Lowest)', fontsize=12, fontweight='bold')
    rank_ax.grid(True, alpha=0.3)

    # 4. Statistics summary
    text_ax = axes[1, 1]
    text_ax.axis('off')

    volumes_array = np.array(volumes)
    summary_text = f"""
Statistical Summary:

Total Datasets: {len(volumes)}

Mean Volume: {np.mean(volumes_array):.2f} mm³
Std Deviation: {np.std(volumes_array):.2f} mm³

Minimum: {np.min(volumes_array):.2f} mm³
Maximum: {np.max(volumes_array):.2f} mm³

Median: {np.median(volumes_array):.2f} mm³
Range: {np.max(volumes_array) - np.min(volumes_array):.2f} mm³

Coefficient of Variation: {(np.std(volumes_array)/np.mean(volumes_array))*100:.2f}%
"""

    text_ax.text(0.1, 0.5, summary_text, fontsize=11, verticalalignment='center',
                 family='monospace', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()

    print(f"Comparison plot saved: {output_path}")

# Example usage with sample data
results = [
    {"dataset_name": "Patient_001", "volume_mm3": 1250.5},
    {"dataset_name": "Patient_002", "volume_mm3": 1180.3},
    {"dataset_name": "Patient_003", "volume_mm3": 1420.8},
    {"dataset_name": "Patient_004", "volume_mm3": 1095.2},
    {"dataset_name": "Patient_005", "volume_mm3": 1310.7}
]

create_comparison_plot(results, "C:/Data/Output/comparison_plot.png")

AI-Powered Batch Segmentation

TotalSegmentator Batch Processing

"""
AI Batch Segmentation Pipeline
Process multiple CT scans with TotalSegmentator.
"""
import os
from pathlib import Path
import csv
from datetime import datetime
import ScriptingApi as api

def run_ai_segmentation(app, input_path, output_dir, structures):
    """Run TotalSegmentator and export selected structures.

    Args:
        app: Connected ScriptingApi Application instance.
        input_path: Path to the input CT volume (NIfTI).
        output_dir: Folder for exported STL surfaces.
        structures: Lower-case substrings of structure names to keep.

    Returns:
        list[dict]: file/structure/volume_mm3 record per kept structure.

    Raises:
        RuntimeError: If surface generation yields no surface for a mask.
    """
    volume_operations = app.get_volume_operations()
    mask_operations = app.get_mask_operations()
    surface_operations = app.get_surface_operations()
    measure_operations = app.get_measure_operations()
    ai_segmentation = app.get_ai_segmentation()

    basename = os.path.splitext(os.path.basename(input_path))[0]
    basename = basename.replace(".nii", "")  # Handle .nii.gz

    try:
        # Import
        volume_name = volume_operations.import_3d_image_from_disk(input_path)

        # Run TotalSegmentator
        ai_segmentation.set_model_type(api.AiSegmentationModelType.TotalSegmentator)

        ts_params = api.TotalSegmentatorParams()
        ts_params.task = "total"
        ts_params.device = "gpu"

        mask_names = ai_segmentation.run_total_segmentator([volume_name], ts_params)

        # Process selected structures
        stats_data = []
        requested_stats = [api.LabelStatisticType.Volume]

        for mask_name in mask_names:
            # Skip masks that are not in the requested structure list
            structure_name = mask_name.lower().replace(" ", "_")
            if not any(struct in structure_name for struct in structures):
                continue

            # Get statistics
            mask_statistics_result = measure_operations.compute_whole_mask_statistics(
                mask_name, volume_name, requested_stats)
            stats_data.append({
                "file": basename,
                "structure": mask_name,
                "volume_mm3": mask_statistics_result.total_volume
            })

            # Generate and export surface
            mask_to_surface_params = api.MaskToSurfaceParams()
            mask_to_surface_params.smooth_iterations = 20
            mask_to_surface_params.smooth_factor = 0.06
            mask_to_surface_params.triangle_reduction_percent = 50

            surface_names = mask_operations.convert_to_surface_objects(
                [mask_name], mask_to_surface_params)
            if not surface_names:
                raise RuntimeError(f"No surface generated for mask '{mask_name}'")
            if isinstance(surface_names, (list, tuple)):
                surface_id = surface_names[0]
            else:
                surface_id = surface_names

            surface_path = os.path.join(output_dir, f"{basename}_{mask_name}.stl")
            surface_operations.export_surface_to_disk(surface_id, surface_path)
    finally:
        # Close even on failure (e.g. the RuntimeError above) so the
        # caller can continue with the next scan without leaking memory.
        app.close_project()

    return stats_data

def main():
    """Batch-run AI segmentation over a folder of CT scans and export a CSV."""
    app = api.Application()

    input_dir = "C:/Data/CT_Scans"
    output_dir = "C:/Data/AI_Results"
    os.makedirs(output_dir, exist_ok=True)

    # Structures to extract
    target_structures = [
        "liver",
        "spleen",
        "kidney_left",
        "kidney_right"
    ]

    files = [str(p) for p in Path(input_dir).rglob("*.nii*")]
    all_stats = []

    for i, filepath in enumerate(files):
        print(f"[{i+1}/{len(files)}] Processing: {os.path.basename(filepath)}")
        try:
            stats = run_ai_segmentation(app, filepath, output_dir, target_structures)
            all_stats.extend(stats)
        except Exception as e:
            print(f" Error: {e}")

    # Export statistics to CSV. The fieldnames match the keys produced by
    # run_ai_segmentation; the original header also declared
    # "surface_area_mm2", a column no record ever supplied.
    csv_path = os.path.join(output_dir, "organ_statistics.csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=["file", "structure", "volume_mm3"])
        writer.writeheader()
        writer.writerows(all_stats)

    print(f"Statistics exported to: {csv_path}")

main()

Generating Reports

CSV Statistics Export

"""
Generate statistical reports from batch processing.
"""
import os
import csv
import ScriptingApi as api

def collect_statistics(app, mask_name, volume_name):
    """Collect comprehensive statistics for a mask.

    Args:
        app: Connected ScriptingApi Application instance.
        mask_name: Name of the mask to measure.
        volume_name: Name of the volume the mask belongs to.

    Returns:
        dict: Mask volume in mm³ and total voxel count.
    """
    # Fix: the original assigned the operations object to `measure_ops`
    # but then called the undefined name `measure_operations` (NameError).
    measure_operations = app.get_measure_operations()

    # BoundingBox is requested but not currently read from the result;
    # kept so callers can extend the returned dict later.
    requested_stats = [
        api.LabelStatisticType.Volume,
        api.LabelStatisticType.BoundingBox
    ]

    mask_statistics_result = measure_operations.compute_whole_mask_statistics(
        mask_name, volume_name, requested_stats)

    return {
        "volume_mm3": mask_statistics_result.total_volume,
        "voxel_count": mask_statistics_result.total_voxel_count
    }

def export_batch_report(results, output_path):
    """Export batch results to CSV."""
    if not results:
        return

    # Column order follows the keys of the first record.
    header = list(results[0].keys())

    with open(output_path, 'w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=header)
        writer.writeheader()
        writer.writerows(results)

    print(f"Report exported to: {output_path}")

Summary Report Generation

def generate_summary_report(results, output_path):
    """Generate a text summary report."""
    successful = [r for r in results if r.get("status") == "success"]
    failed = [r for r in results if r.get("status") == "failed"]

    # Only count records that actually carry a (truthy) volume.
    volumes = [r.get("volume", 0) for r in successful if r.get("volume")]

    divider = "=" * 60
    lines = [divider, "BATCH PROCESSING SUMMARY REPORT", divider, ""]
    lines.append(f"Total files processed: {len(results)}")
    lines.append(f"Successful: {len(successful)}")
    lines.append(f"Failed: {len(failed)}")
    lines.append("")

    if volumes:
        lines.append("Volume Statistics:")
        lines.append(f" Mean: {sum(volumes)/len(volumes):.2f} mm³")
        lines.append(f" Min: {min(volumes):.2f} mm³")
        lines.append(f" Max: {max(volumes):.2f} mm³")
        lines.append("")

    if failed:
        lines.append("Failed Files:")
        lines.extend(f" - {entry.get('input')}: {entry.get('error')}" for entry in failed)

    lines.append("")
    lines.append(divider)

    with open(output_path, 'w') as report_file:
        report_file.write("\n".join(lines))

    print(f"Summary report saved to: {output_path}")

Complete Production Pipeline

"""
Production-Ready Batch Processing Pipeline
Complete example with all best practices.
"""
import os
import sys
from pathlib import Path
import json
import csv
import logging
from datetime import datetime
import ScriptingApi as api

class BatchProcessor:
    """Handles batch processing of volume data.

    Drives the full pipeline: discovers input files in the configured
    input directory, then imports, segments, measures, and exports each
    one, and finally writes a per-file CSV report plus a text summary
    under a timestamped output folder. Configuration comes from a JSON
    file (input_dir, output_dir, threshold settings, optional surface
    settings).
    """

    def __init__(self, config_path):
        """Load configuration and prepare output folders and logging.

        Parameters
        ----------
        config_path : str
            Path to the JSON configuration file.
        """
        self.config = self._load_config(config_path)
        self.app = api.Application()
        self.results = []  # one result dict appended per processed file
        # Output folders must exist before logging is configured, because
        # the log file lives inside the "logs" output directory.
        self._setup_output()
        self._setup_logging()

    def _load_config(self, path):
        # Parse the JSON configuration file and return it as a dict.
        with open(path, 'r') as f:
            return json.load(f)

    def _setup_output(self):
        """Create a timestamped output tree: masks/, surfaces/, reports/, logs/."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.output_root = os.path.join(
            self.config["output_dir"],
            f"batch_{timestamp}"
        )

        self.dirs = {
            "masks": os.path.join(self.output_root, "masks"),
            "surfaces": os.path.join(self.output_root, "surfaces"),
            "reports": os.path.join(self.output_root, "reports"),
            "logs": os.path.join(self.output_root, "logs")
        }

        for d in self.dirs.values():
            os.makedirs(d, exist_ok=True)

    def _setup_logging(self):
        """Log to both a file in the logs folder and the console."""
        log_path = os.path.join(self.dirs["logs"], "processing.log")

        # NOTE(review): basicConfig configures the root logger process-wide;
        # constructing several BatchProcessor instances in one process would
        # stack additional handlers — confirm single-instance usage.
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[
                logging.FileHandler(log_path),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def discover_files(self):
        """Find all input files.

        Returns the paths in the configured input directory matching the
        configured glob patterns (default "*.nii" / "*.nii.gz").
        """
        input_dir = self.config["input_dir"]
        extensions = self.config.get("extensions", ["*.nii", "*.nii.gz"])

        files = []
        for ext in extensions:
            # Use Path.glob for non-recursive extension matching
            files.extend([str(p) for p in Path(input_dir).glob(ext)])
        self.logger.info(f"Discovered {len(files)} files")
        return files

    def process_file(self, filepath):
        """Process a single file.

        Imports the volume, thresholds it into a mask, computes the mask
        volume, exports the mask and (optionally) a surface. Returns a
        result dict with "input", "name", "timestamp", "status"; on
        success also "volume_mm3" and output paths, on failure "error".
        Never raises — failures are caught, logged, and recorded so the
        batch can continue; the project is always closed afterwards.
        """
        volume_operations = self.app.get_volume_operations()
        mask_operations = self.app.get_mask_operations()
        surface_operations = self.app.get_surface_operations()
        measure_operations = self.app.get_measure_operations()

        basename = os.path.basename(filepath)
        # Strip NIfTI extensions (".nii.gz" first so no ".gz" remains).
        name = basename.replace(".nii.gz", "").replace(".nii", "")

        result = {
            "input": filepath,
            "name": name,
            "timestamp": datetime.now().isoformat()
        }

        try:
            # Import the 3D image into the application.
            self.logger.info(f"Importing: {basename}")
            volume_name = volume_operations.import_3d_image_from_disk(filepath)

            # Threshold segmentation using configured bounds; optional
            # region filtering drops small disconnected regions.
            self.logger.info(" Segmenting...")
            threshold_params = api.ThresholdParams()
            threshold_params.lower_threshold = self.config["threshold"]["lower"]
            threshold_params.upper_threshold = self.config["threshold"]["upper"]
            threshold_params.filter_regions = self.config["threshold"].get("filter_regions", True)
            threshold_params.min_region_size = self.config["threshold"].get("min_region_size", 100)
            threshold_params.keep_largest = False

            mask_name = mask_operations.threshold(volume_name, threshold_params)

            # Statistics: total mask volume only.
            requested_stats = [api.LabelStatisticType.Volume]
            mask_statistics_result = measure_operations.compute_whole_mask_statistics(mask_name, volume_name, requested_stats)
            result["volume_mm3"] = mask_statistics_result.total_volume

            # Export mask image next to the other batch outputs.
            mask_path = os.path.join(self.dirs["masks"], f"{name}_mask.nii")
            mask_operations.export_mask_image_to_disk(mask_name, mask_path)
            result["mask_output"] = mask_path

            # Surface generation (optional; enabled by default).
            if self.config.get("generate_surface", True):
                self.logger.info(" Generating surface...")
                surf_config = self.config.get("surface", {})

                mask_to_surface_params = api.MaskToSurfaceParams()
                mask_to_surface_params.smooth_iterations = surf_config.get("smooth_iterations", 20)
                mask_to_surface_params.smooth_factor = surf_config.get("smooth_factor", 0.06)
                mask_to_surface_params.triangle_reduction_percent = surf_config.get("triangle_reduction_percent", 50)

                surface_names = mask_operations.convert_to_surface_objects([mask_name], mask_to_surface_params)
                if not surface_names:
                    raise RuntimeError(f"No surface generated for mask '{mask_name}'")
                # The API may return a single name or a sequence; normalize
                # to one surface identifier.
                if isinstance(surface_names, (list, tuple)):
                    surface_id = surface_names[0]
                else:
                    surface_id = surface_names

                surface_path = os.path.join(self.dirs["surfaces"], f"{name}_surface.stl")
                surface_operations.export_surface_to_disk(surface_id, surface_path)
                result["surface_output"] = surface_path

            result["status"] = "success"
            self.logger.info(f" Complete: {mask_statistics_result.total_volume:.2f} mm³")

        except Exception as e:
            # Record the failure but let the batch continue with the next file.
            result["status"] = "failed"
            result["error"] = str(e)
            self.logger.error(f" Failed: {e}")

        finally:
            # Always release the project so memory does not accumulate
            # across files.
            self.app.close_project()

        return result

    def run(self):
        """Execute the batch pipeline.

        Discovers input files, processes them sequentially, then writes
        the CSV and summary reports.
        """
        self.logger.info("=" * 60)
        self.logger.info("BATCH PROCESSING STARTED")
        self.logger.info("=" * 60)

        files = self.discover_files()

        for i, filepath in enumerate(files):
            self.logger.info(f"[{i+1}/{len(files)}] {os.path.basename(filepath)}")
            result = self.process_file(filepath)
            self.results.append(result)

        self._generate_reports()

        self.logger.info("=" * 60)
        self.logger.info("BATCH PROCESSING COMPLETE")
        self.logger.info("=" * 60)

    def _generate_reports(self):
        """Generate output reports: per-file CSV plus a text summary."""
        # CSV report
        csv_path = os.path.join(self.dirs["reports"], "results.csv")
        # NOTE(review): "surface_area_mm2" is never populated by
        # process_file, so that column is always empty — confirm intent.
        fieldnames = ["name", "status", "volume_mm3", "surface_area_mm2", "error"]

        # extrasaction='ignore' drops result keys that are not report
        # columns (e.g. "input", "timestamp"); missing columns write empty.
        with open(csv_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.results)

        self.logger.info(f"CSV report: {csv_path}")

        # Summary
        success = len([r for r in self.results if r["status"] == "success"])
        failed = len([r for r in self.results if r["status"] == "failed"])

        summary_path = os.path.join(self.dirs["reports"], "summary.txt")
        with open(summary_path, 'w') as f:
            f.write(f"Batch Processing Summary\n")
            f.write(f"========================\n\n")
            f.write(f"Total: {len(self.results)}\n")
            f.write(f"Success: {success}\n")
            f.write(f"Failed: {failed}\n")

        self.logger.info(f"Summary: {summary_path}")

# Script entry point: build the processor from the config file and run it.
if __name__ == "__main__":
    CONFIG_PATH = "C:/Scripts/batch_config.json"
    BatchProcessor(CONFIG_PATH).run()

Best Practices

Memory Management

  • Close projects after processing each file
  • Process files sequentially for large datasets
  • Monitor system resources during execution

Error Recovery

  • Use try-except blocks around each file
  • Log errors with sufficient detail
  • Continue processing remaining files after failures
  • Save partial results periodically

Performance Optimization

  • Use fast mode for initial testing
  • Enable GPU processing when available
  • Batch similar operations together
  • Pre-validate input files before processing

Reproducibility

  • Save configuration files with results
  • Log all parameters used
  • Include timestamps in output folders
  • Version your processing scripts

Troubleshooting

| Issue | Solution |
| --- | --- |
| Out of memory | Close the project after each file; reduce the batch size |
| Script stops unexpectedly | Add try-except blocks; enable logging |
| Inconsistent results | Verify the configuration; check input file quality |
| Slow processing | Enable GPU for AI operations; optimize parameters |
| Missing output files | Check export paths; verify write permissions |
| API method not found | Ensure you are using correct method names (e.g., `import_3d_image_from_disk`) |

Next Steps


See Also