Create Job Script
Creating effective job scripts is essential for successful job execution on Vantage clusters. This guide walks through the process of developing, testing, and optimizing job scripts for various computational workloads.
Planning Your Script
Define Requirements
Before writing code, clearly define your script requirements:
Computational Goals
- Primary Objective: What computation or analysis will the script perform?
- Input Data: What data sources and formats will be processed?
- Expected Output: What results, files, or data products should be generated?
- Success Criteria: How will you measure successful completion?
Resource Requirements
- CPU Needs: Number of cores and processing requirements
- Memory Requirements: RAM needed for data processing and intermediate results
- Storage Needs: Disk space for input, output, and temporary files
- Runtime Estimates: Expected execution time and scheduling constraints
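These requirements map directly onto scheduler directives. As a minimal sketch, assuming a Slurm cluster (consistent with the #SBATCH examples later in this guide), a job needing 4 cores, 16 GB of RAM, and two hours of runtime might request:
# Requested in the job script header
#SBATCH --cpus-per-task=4   # CPU needs
#SBATCH --mem=16G           # Memory requirement
#SBATCH --time=02:00:00     # Runtime estimate (HH:MM:SS)
#SBATCH --tmp=20G           # Local scratch space, where the site supports this directive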
Dependencies and Environment
- Software Requirements: Programming languages, libraries, and tools
- Environment Modules: Required software modules and versions
- Container Images: Docker or Singularity containers if needed
- License Requirements: Software licensing and availability constraints
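It is worth confirming these dependencies interactively before encoding them in a script. A sketch, assuming an Lmod-style module system and a Singularity/Apptainer container runtime (the module names and image path are illustrative):
# List available versions of a required tool
module avail python
# Load specific versions and confirm they resolve
module load python/3.9 gcc/9.3.0
python3 --version
# Or run tools from a container image when they are not installed natively
singularity exec my_analysis.sif python3 --version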
Script Development Process
Step 1: Choose Script Framework
Select the appropriate scripting approach:
Bash Shell Scripts
Best for system operations and workflow orchestration:
#!/bin/bash
#SBATCH --job-name=my_workflow
#SBATCH --cpus-per-task=4
#SBATCH --mem=16G
#SBATCH --time=02:00:00
set -e  # Exit immediately if any command fails
set -u  # Treat references to unset variables as errors
set -o pipefail  # Fail a pipeline if any of its stages fail
# Script implementation goes here
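Assuming the cluster uses Slurm, as the #SBATCH directives above imply, the script is submitted with sbatch and tracked through the queue:
# Submit the script; Slurm prints the assigned job ID
sbatch my_workflow.sh
# Check the job's status in the queue
squeue -u "$USER"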
Python Scripts
Ideal for data processing and scientific computing:
#!/usr/bin/env python3
"""
Job script for data analysis workflow
"""
import argparse
import logging
import sys
from pathlib import Path
def setup_logging():
    """Configure logging for the job script"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
def main():
    parser = argparse.ArgumentParser(description='Data analysis job')
    parser.add_argument('--input', required=True, help='Input data file')
    parser.add_argument('--output', required=True, help='Output directory')
    args = parser.parse_args()
    setup_logging()
    # Implementation goes here
if __name__ == "__main__":
    main()
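A Python script like this is typically launched from the batch script rather than submitted directly; a sketch of the corresponding line (file names are illustrative):
# Inside the submitting bash job script
python3 analysis.py --input data/input.csv --output results/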
R Scripts
Perfect for statistical analysis and modeling:
#!/usr/bin/env Rscript
# Parse command line arguments
args <- commandArgs(trailingOnly = TRUE)
if (length(args) < 2) {
  stop("Usage: script.R <input_file> <output_dir>")
}
input_file <- args[1]
output_dir <- args[2]
# Ensure output directory exists
dir.create(output_dir, recursive = TRUE, showWarnings = FALSE)
# Script implementation goes here
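As with the Python example, this script would usually be invoked from the batch script (file names are illustrative):
# Inside the submitting bash job script
Rscript analysis.R data/input.csv results/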
Step 2: Implement Core Logic
Develop the main computational components:
Data Input and Validation
# Validate input files exist and are readable
validate_inputs() {
    local input_file="$1"
    if [[ ! -f "$input_file" ]]; then
        echo "Error: Input file '$input_file' not found" >&2
        exit 1
    fi
    if [[ ! -r "$input_file" ]]; then
        echo "Error: Input file '$input_file' not readable" >&2
        exit 1
    fi
    echo "Input validation successful"
}
Environment Setup
# Setup computational environment
setup_environment() {
    echo "Setting up environment..."
    # Load required modules
    module purge
    module load python/3.9
    module load gcc/9.3.0
    # Activate virtual environment
    if [[ -n "${VIRTUAL_ENV_PATH:-}" ]]; then
        source "$VIRTUAL_ENV_PATH/bin/activate"
    fi
    # Set environment variables
    export OMP_NUM_THREADS="${SLURM_CPUS_PER_TASK:-1}"
    export PYTHONPATH="${PYTHONPATH:-}:$(pwd)/src"
    echo "Environment setup complete"
}
Main Processing Logic
def process_data(input_file, output_dir):
    """Main data processing function"""
    import pandas as pd
    # Load and validate data
    try:
        data = pd.read_csv(input_file)
        logging.info(f"Loaded data with {len(data)} rows")
    except Exception as e:
        logging.error(f"Failed to load data: {e}")
        raise
    # Perform analysis
    results = {}
    # Statistical summary
    results['summary'] = data.describe()
    # Custom analysis (assumes 'category' and 'value' columns exist)
    results['processed'] = data.groupby('category').agg({
        'value': ['mean', 'std', 'count']
    })
    # Save results
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    results['summary'].to_csv(output_path / 'summary.csv')
    results['processed'].to_csv(output_path / 'processed.csv')
    logging.info(f"Results saved to {output_dir}")
    return results
Step 3: Add Error Handling
Implement comprehensive error management:
Error Detection and Reporting
# Error handling function
handle_error() {
    local exit_code=$?
    local line_number=$1
    echo "Error occurred in script at line $line_number (exit code: $exit_code)" >&2
    echo "Cleaning up temporary files..." >&2
    cleanup_temp_files
    # Send notification if configured
    if [[ -n "${ERROR_NOTIFICATION_EMAIL:-}" ]]; then
        echo "Job script failed at line $line_number" | \
            mail -s "Job Failure: $SLURM_JOB_NAME" "$ERROR_NOTIFICATION_EMAIL"
    fi
    exit $exit_code
}
# Set error trap
trap 'handle_error $LINENO' ERR
Resource Cleanup
# Cleanup function
cleanup_temp_files() {
    echo "Performing cleanup..."
    # Remove temporary directories
    if [[ -n "${TEMP_DIR:-}" && -d "$TEMP_DIR" ]]; then
        rm -rf "$TEMP_DIR"
        echo "Removed temporary directory: $TEMP_DIR"
    fi
    # Clean up any running background processes
    jobs -p | xargs -r kill
    echo "Cleanup complete"
}
# Ensure cleanup on exit
trap cleanup_temp_files EXIT
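The cleanup function assumes TEMP_DIR was created earlier in the script; mktemp is a safe way to do that, since it guarantees a unique directory:
# Create a unique temporary directory before any processing begins
TEMP_DIR="$(mktemp -d "${TMPDIR:-/tmp}/job_XXXXXX")"
echo "Using temporary directory: $TEMP_DIR"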
Retry Logic
import logging
import time
from functools import wraps
def retry(max_attempts=3, delay=1, backoff=2):
    """Decorator for retrying failed operations"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 1
            wait = delay  # local copy: rebinding delay here would make it local and break the closure
            while attempt <= max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        logging.error(f"Function {func.__name__} failed after {max_attempts} attempts")
                        raise
                    logging.warning(f"Attempt {attempt} failed: {e}. Retrying in {wait} seconds...")
                    time.sleep(wait)
                    wait *= backoff
                    attempt += 1
        return wrapper
    return decorator
@retry(max_attempts=3, delay=2)
def download_data(url, output_file):
    """Download data with retry logic"""
    # Implementation with potential network failures
    pass
Step 4: Add Monitoring and Logging
Implement comprehensive monitoring:
Performance Monitoring
# Performance monitoring function
monitor_resources() {
    local pid=$1
    local log_file="$2"
    while kill -0 "$pid" 2>/dev/null; do
        # Log CPU and memory usage
        ps -o pid,pcpu,pmem,rss,vsz,comm -p "$pid" >> "$log_file"
        sleep 30
    done
}
# Start main process and monitor it
python3 analysis.py &
MAIN_PID=$!
# Start resource monitoring in background
monitor_resources "$MAIN_PID" "resource_usage.log" &
MONITOR_PID=$!
# Wait for the main process and capture its exit code (safe under set -e)
MAIN_EXIT_CODE=0
wait "$MAIN_PID" || MAIN_EXIT_CODE=$?
# Stop monitoring
kill "$MONITOR_PID" 2>/dev/null || true
exit $MAIN_EXIT_CODE
Progress Tracking
import logging
from tqdm import tqdm
def process_large_dataset(data_chunks):
    """Process data with progress tracking"""
    total_chunks = len(data_chunks)
    results = []
    with tqdm(total=total_chunks, desc="Processing chunks") as pbar:
        for i, chunk in enumerate(data_chunks):
            try:
                result = process_chunk(chunk)
                results.append(result)
                # Log progress periodically
                if (i + 1) % 10 == 0:
                    logging.info(f"Processed {i + 1}/{total_chunks} chunks")
                pbar.update(1)
            except Exception as e:
                logging.error(f"Failed to process chunk {i}: {e}")
                # Continue with next chunk or exit based on requirements
    return results
Advanced Script Features
Parameterization
Make scripts flexible with parameters:
# Default parameter values
INPUT_FILE=""
OUTPUT_DIR="./results"
NUM_THREADS="${SLURM_CPUS_PER_TASK:-4}"
MEMORY_LIMIT="16G"
DEBUG_MODE=false
# Parse command line arguments
parse_arguments() {
    while [[ $# -gt 0 ]]; do
        case $1 in
            -i|--input)
                INPUT_FILE="$2"
                shift 2
                ;;
            -o|--output)
                OUTPUT_DIR="$2"
                shift 2
                ;;
            -t|--threads)
                NUM_THREADS="$2"
                shift 2
                ;;
            -m|--memory)
                MEMORY_LIMIT="$2"
                shift 2
                ;;
            -d|--debug)
                DEBUG_MODE=true
                shift
                ;;
            -h|--help)
                show_help
                exit 0
                ;;
            *)
                echo "Unknown parameter: $1" >&2
                show_help
                exit 1
                ;;
        esac
    done
}
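The function consumes the script's arguments via "$@", so a typical entry point parses first and then verifies required parameters (show_help is assumed to be defined elsewhere in the script):
# Entry point: parse arguments, then verify the required ones were provided
parse_arguments "$@"
if [[ -z "$INPUT_FILE" ]]; then
    echo "Error: --input is required" >&2
    show_help
    exit 1
fi
echo "Processing $INPUT_FILE with $NUM_THREADS threads into $OUTPUT_DIR"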
Configuration Files
Support external configuration:
import yaml
import json
from pathlib import Path
def load_config(config_file):
    """Load configuration from file"""
    config_path = Path(config_file)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_file}")
    if config_path.suffix.lower() in ('.yaml', '.yml'):
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    elif config_path.suffix.lower() == '.json':
        with open(config_path, 'r') as f:
            return json.load(f)
    else:
        raise ValueError(f"Unsupported configuration format: {config_path.suffix}")
# Example configuration usage
def main():
    config = load_config('job_config.yaml')
    # Use configuration values
    input_file = config['data']['input_file']
    output_dir = config['data']['output_dir']
    processing_params = config['processing']
    # Process with configuration
    process_data(input_file, output_dir, **processing_params)
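For reference, a job_config.yaml matching the keys this code reads might look like the following; the keys under processing are illustrative and must match the keyword arguments process_data accepts. Written as a bash heredoc, it can even be generated from the job script itself:
# Generate an example job_config.yaml ('processing' keys are illustrative)
cat > job_config.yaml <<'EOF'
data:
  input_file: data/input.csv
  output_dir: results/
processing:
  chunk_size: 1000
  normalize: true
EOF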
Checkpoint and Resume
Implement checkpoint functionality:
import logging
import pickle
import time
from pathlib import Path
class JobCheckpoint:
    def __init__(self, checkpoint_dir="./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
    def save_checkpoint(self, step, state, checkpoint_name="main"):
        """Save current processing state"""
        checkpoint_file = self.checkpoint_dir / f"{checkpoint_name}_step_{step}.pkl"
        checkpoint_data = {
            'step': step,
            'state': state,
            'timestamp': time.time()
        }
        with open(checkpoint_file, 'wb') as f:
            pickle.dump(checkpoint_data, f)
        logging.info(f"Checkpoint saved: {checkpoint_file}")
    def load_checkpoint(self, checkpoint_name="main"):
        """Load the latest checkpoint"""
        pattern = f"{checkpoint_name}_step_*.pkl"
        checkpoint_files = list(self.checkpoint_dir.glob(pattern))
        if not checkpoint_files:
            return None
        # Find latest checkpoint
        latest_file = max(checkpoint_files, key=lambda x: x.stat().st_mtime)
        with open(latest_file, 'rb') as f:
            checkpoint_data = pickle.load(f)
        logging.info(f"Checkpoint loaded: {latest_file}")
        return checkpoint_data
# Usage example
def main():
    checkpoint = JobCheckpoint()
    # Try to resume from checkpoint
    saved_state = checkpoint.load_checkpoint()
    if saved_state:
        start_step = saved_state['step'] + 1
        current_state = saved_state['state']
        logging.info(f"Resuming from step {start_step}")
    else:
        start_step = 0
        current_state = initialize_state()
        logging.info("Starting from beginning")
    # Process with checkpointing (total_steps and process_step are workload-specific)
    for step in range(start_step, total_steps):
        current_state = process_step(step, current_state)
        # Save checkpoint every 10 steps
        if step % 10 == 0:
            checkpoint.save_checkpoint(step, current_state)
Testing and Validation
Local Testing
Test scripts on local systems:
# Create test environment
setup_test_environment() {
    echo "Setting up test environment..."
    # Create test data
    mkdir -p test_data
    echo "sample,data,for,testing" > test_data/input.csv
    # Set test parameters
    export SLURM_CPUS_PER_TASK=2
    export SLURM_JOB_ID=test_job
    export SLURM_JOB_NAME=test_script
    echo "Test environment ready"
}
# Run tests
run_tests() {
    echo "Running script tests..."
    setup_test_environment
    # Test with minimal data
    bash my_script.sh -i test_data/input.csv -o test_results/
    # Validate results
    if [[ -f "test_results/summary.csv" ]]; then
        echo "Test PASSED: Output file created"
    else
        echo "Test FAILED: No output file found"
        exit 1
    fi
    echo "All tests passed"
}
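Static checks are a cheap complement to these tests and catch many scripting errors before a job ever queues. A sketch, assuming shellcheck is installed (bash -n is built in):
# Syntax-check the bash script without executing it
bash -n my_script.sh
# Lint for common shell pitfalls (requires shellcheck)
shellcheck my_script.sh
# Byte-compile the Python components to catch syntax errors early
python3 -m py_compile analysis.py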
Performance Testing
Validate resource usage and performance:
import time
import psutil
import logging
class PerformanceProfiler:
    def __init__(self):
        self.start_time = None
        self.peak_memory = 0
        self.cpu_samples = []
    def start_profiling(self):
        """Start performance monitoring"""
        self.start_time = time.time()
        self.peak_memory = 0
        self.cpu_samples = []
        logging.info("Performance profiling started")
    def sample_resources(self):
        """Sample current resource usage (system-wide; use psutil.Process() for per-process figures)"""
        memory_mb = psutil.virtual_memory().used / 1024 / 1024
        cpu_percent = psutil.cpu_percent()
        self.peak_memory = max(self.peak_memory, memory_mb)
        self.cpu_samples.append(cpu_percent)
    def report_performance(self):
        """Generate performance report"""
        if self.start_time is None:
            return
        duration = time.time() - self.start_time
        avg_cpu = sum(self.cpu_samples) / len(self.cpu_samples) if self.cpu_samples else 0
        report = f"""
Performance Report:
- Duration: {duration:.2f} seconds
- Peak Memory: {self.peak_memory:.2f} MB
- Average CPU: {avg_cpu:.2f}%
- CPU Samples: {len(self.cpu_samples)}
"""
        logging.info(report)
        return {
            'duration': duration,
            'peak_memory_mb': self.peak_memory,
            'avg_cpu_percent': avg_cpu
        }
# Usage in main script
def main():
    profiler = PerformanceProfiler()
    profiler.start_profiling()
    try:
        # Your main processing logic here (processing_steps and process_step are workload-specific)
        for i in range(processing_steps):
            process_step(i)
            profiler.sample_resources()
    finally:
        profiler.report_performance()
Deployment and Optimization
Script Optimization
Optimize for cluster execution:
I/O Optimization
# Use local scratch space for intensive I/O
setup_scratch_space() {
    # Use node-local storage if available
    if [[ -n "${SLURM_JOB_ID:-}" ]]; then
        SCRATCH_DIR="/tmp/job_${SLURM_JOB_ID}"
    else
        SCRATCH_DIR="/tmp/script_$$"
    fi
    mkdir -p "$SCRATCH_DIR"
    # Copy input data to scratch
    echo "Copying input data to scratch space..."
    cp -r "$INPUT_DIR"/* "$SCRATCH_DIR/"
    # Set working directory to scratch
    cd "$SCRATCH_DIR"
    echo "Working directory: $(pwd)"
}
# Copy results back on completion
copy_results_back() {
    echo "Copying results back to shared storage..."
    cp -r results/ "$OUTPUT_DIR/"
    echo "Results copied successfully"
}
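A sketch of how these helpers fit together in a job script; if results must survive a mid-job failure, combine copy_results_back with the EXIT trap pattern from the error-handling section (file names are illustrative):
setup_scratch_space
# Run the main workload from fast node-local storage
python3 analysis.py --input input.csv --output results/
copy_results_back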
Parallel Processing
import logging
import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
def parallel_processing(data_chunks, num_workers=None):
    """Process data chunks in parallel"""
    if num_workers is None:
        # Respect the scheduler's CPU allocation rather than the node's full core count
        num_workers = int(os.environ.get('SLURM_CPUS_PER_TASK', mp.cpu_count()))
    logging.info(f"Starting parallel processing with {num_workers} workers")
    results = []
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Submit all tasks
        future_to_chunk = {
            executor.submit(process_chunk, chunk): i
            for i, chunk in enumerate(data_chunks)
        }
        # Collect results as they complete
        for future in as_completed(future_to_chunk):
            chunk_id = future_to_chunk[future]
            try:
                result = future.result()
                results.append((chunk_id, result))
                logging.info(f"Completed chunk {chunk_id}")
            except Exception as e:
                logging.error(f"Chunk {chunk_id} failed: {e}")
    # Sort results by chunk ID
    results.sort(key=lambda x: x[0])
    return [result for _, result in results]
Resource Management
Optimize resource utilization:
# Dynamic resource allocation
allocate_resources() {
    # Get allocated resources from scheduler
    NUM_CPUS="${SLURM_CPUS_PER_TASK:-1}"
    MEMORY_MB="${SLURM_MEM_PER_NODE:-8192}"
    # Calculate optimal thread counts
    if [[ $NUM_CPUS -ge 16 ]]; then
        PYTHON_THREADS=$((NUM_CPUS - 2))  # Leave 2 cores for system
        OMP_THREADS=$NUM_CPUS
    else
        PYTHON_THREADS=$NUM_CPUS
        OMP_THREADS=$NUM_CPUS
    fi
    # Set environment variables
    export OMP_NUM_THREADS=$OMP_THREADS
    export MKL_NUM_THREADS=$OMP_THREADS
    export OPENBLAS_NUM_THREADS=$OMP_THREADS
    echo "Resource allocation:"
    echo "  CPUs: $NUM_CPUS"
    echo "  Memory: ${MEMORY_MB}MB"
    echo "  Python threads: $PYTHON_THREADS"
    echo "  OpenMP threads: $OMP_THREADS"
}
Best Practices Summary
Code Quality
- Use version control for script management
- Write clear, well-commented code
- Follow language-specific style guidelines
- Implement comprehensive error handling
- Add thorough input validation
Performance
- Profile scripts to identify bottlenecks
- Optimize I/O operations and data movement
- Use appropriate parallelization strategies
- Monitor and tune resource usage
- Implement efficient algorithms and data structures
Reliability
- Test scripts thoroughly before deployment
- Implement checkpoint and resume functionality
- Add comprehensive logging and monitoring
- Design for graceful failure and recovery
- Validate outputs and intermediate results
Maintainability
- Use configuration files for parameters
- Document script purpose and usage
- Follow consistent naming conventions
- Implement modular, reusable components
- Plan for script evolution and updates
Next Steps
After creating your script:
- Share Job Scripts - Collaborate with team members
- Delete Job Scripts - Manage script lifecycle
- Test in staging environment before production deployment
- Monitor performance and optimize based on usage patterns