Pipeline Orchestrator¶
The Pipeline Orchestrator is the central component that coordinates the execution of all pipeline phases with comprehensive MLflow tracking and experiment management.
Overview¶
The `FineTunePipeline` class in `app/pipeline_invoker.py` provides:
- Phase Coordination: Manages fine-tuning, inference, and evaluation phases
- MLflow Integration: Complete experiment tracking and artifact logging
- Environment Variable Management: Centralized handling of environment variables such as API keys
- Error Handling: Robust error management with detailed logging
Key Features¶
- ✅ Phase Control: Run individual phases or complete pipeline
- ✅ MLflow Tracking: Automatic experiment tracking and metric logging
- ✅ Nested Runs: Organized experiment structure with parent/child runs
- ✅ Artifact Management: Automatic artifact logging and versioning
- ✅ Metric Sanitization: MLflow-compatible metric name sanitization
- ✅ System Metrics: Hardware and resource utilization tracking
- ✅ Configuration Logging: Complete configuration parameter tracking
Architecture¶
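The orchestrator is a thin coordinator around the three phase components. A minimal sketch of that structure, assuming the internals (only the class name and the methods shown under Usage below come from the actual code):

```python
class FineTunePipeline:
    """Coordinates fine-tuning, inference, and evaluation under MLflow tracking."""

    def __init__(self, config_path: str = "config.toml"):
        self.config_path = config_path  # TOML configuration, see Configuration below

    def run_pipeline(self) -> dict:
        # The parent MLflow run wraps the whole execution; each enabled
        # phase then executes as a nested child run.
        self.setup_mlflow()
        self.start_mlflow_run()
        try:
            results = {}
            results["finetuning"] = self.run_finetuning()
            results["inference"] = self.run_inference()
            results["evaluation"] = self.run_evaluation()
            return results
        finally:
            self.stop_mlflow_run()
```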
Usage¶
Complete Pipeline¶
```python
from app.pipeline_invoker import FineTunePipeline

# Initialize pipeline
pipeline = FineTunePipeline(config_path="config.toml")

# Run complete pipeline
results = pipeline.run_pipeline()
```
Individual Phases¶
```python
# Setup MLflow tracking
pipeline.setup_mlflow()
pipeline.start_mlflow_run()

# Run specific phases
finetuning_results = pipeline.run_finetuning()
inference_results = pipeline.run_inference()
evaluation_results = pipeline.run_evaluation()

# Finalize tracking
pipeline.stop_mlflow_run()
```
Command Line Interface¶
```bash
# Run complete pipeline
python app/pipeline_invoker.py --config config.toml

# Run with API keys
python app/pipeline_invoker.py --hf-key YOUR_HF_TOKEN --openai-key YOUR_OPENAI_KEY

# Enable specific phases
python app/pipeline_invoker.py --enable-finetuning --enable-inference --enable-evaluation
```
MLflow Integration¶
Experiment Structure¶
- Parent Run: Overall pipeline execution
- Child Runs: Individual phase executions (fine-tuning, inference, evaluation)
- Metrics: Performance metrics, durations, and system resources
- Parameters: Configuration parameters and run metadata
- Artifacts: Model files, evaluation reports, and output datasets
Tracked Metrics¶
- Pipeline execution times
- Phase-specific durations
- Model performance metrics
- System resource utilization
- Evaluation scores and statistics
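These values are recorded through standard MLflow logging calls. A minimal sketch, with illustrative metric names rather than the exact keys used by the pipeline:

```python
import time
import mlflow

with mlflow.start_run():
    start = time.time()
    # ... run a phase here ...
    mlflow.log_metric("finetuning_duration_seconds", time.time() - start)
    mlflow.log_metric("evaluation_mean_score", 0.87)  # illustrative value
```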
Logged Artifacts¶
- Fine-tuned models
- Inference outputs (JSONL format)
- Evaluation reports (Excel/JSON)
- Configuration files
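Artifact logging follows the usual MLflow pattern. A minimal sketch with hypothetical file paths:

```python
import mlflow

with mlflow.start_run():
    mlflow.log_artifact("outputs/inference_results.jsonl")  # hypothetical path
    mlflow.log_artifact("reports/evaluation_report.xlsx")   # hypothetical path
    mlflow.log_artifact("config.toml")
```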
Configuration¶
The pipeline orchestrator uses these configuration sections:
```toml
[mlflow]
tracking_uri = "http://localhost:5000"
experiment_name = "fine-tune-pipeline"

[pipeline]
enable_finetuning = true
enable_inference = true
enable_evaluation = true
stop_after_finetuning = false
stop_after_inference = false
```
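A minimal sketch of how such a configuration can be read and the phase flags checked before execution (illustrative only; the orchestrator's own loading code may differ):

```python
import tomllib  # standard library in Python 3.11+

with open("config.toml", "rb") as f:
    config = tomllib.load(f)

tracking_uri = config["mlflow"]["tracking_uri"]
enabled_phases = [
    phase
    for phase in ("finetuning", "inference", "evaluation")
    if config["pipeline"].get(f"enable_{phase}", False)
]
print(enabled_phases)  # e.g. ['finetuning', 'inference', 'evaluation']
```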
Error Handling¶
The orchestrator provides comprehensive error handling:
- Phase Isolation: Errors in one phase don't affect others
- MLflow Logging: Error details logged to MLflow for debugging
- Graceful Degradation: Pipeline continues with remaining phases when possible
- Detailed Messages: Clear error messages with context
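A minimal sketch of how per-phase isolation with MLflow error logging can look (the orchestrator's actual handling may differ):

```python
import mlflow

def run_phase_safely(phase_name: str, phase_fn):
    """Run one phase, record a failure in MLflow, and let the pipeline continue."""
    try:
        return phase_fn()
    except Exception as exc:
        mlflow.set_tag(f"{phase_name}_status", "failed")
        # Truncate long error messages; MLflow limits tag value length
        mlflow.set_tag(f"{phase_name}_error", str(exc)[:250])
        return None
```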
Best Practices¶
Experiment Organization¶
- Use descriptive experiment names
- Include timestamps in run names
- Tag runs with relevant metadata
- Use consistent naming conventions
Resource Management¶
- Monitor system metrics during runs
- Use appropriate hardware configurations
- Set reasonable timeout values
- Enable resource logging for optimization
Configuration Management¶
- Version control configuration files
- Use environment-specific configs
- Validate configurations before execution (see the sketch after this list)
- Document configuration changes
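A minimal sketch of pre-execution validation, checking keys listed in the Configuration section above (illustrative; not the pipeline's own validation code):

```python
def validate_config(config: dict) -> None:
    """Fail fast if required configuration sections or keys are missing."""
    if "tracking_uri" not in config.get("mlflow", {}):
        raise ValueError("Missing [mlflow].tracking_uri in configuration")
    if "pipeline" not in config:
        raise ValueError("Missing [pipeline] section in configuration")
```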
Troubleshooting¶
Common Issues¶
MLflow Connection Failed
```bash
# Check MLflow server status
mlflow server --host 0.0.0.0 --port 5000
```

Verify the tracking URI in the configuration:

```toml
tracking_uri = "http://localhost:5000"
```
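A quick connectivity check from Python (assuming MLflow 2.x and the local address above):

```python
import mlflow
from mlflow import MlflowClient

mlflow.set_tracking_uri("http://localhost:5000")
# Raises an exception if the tracking server is unreachable
print(MlflowClient().search_experiments())
```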
Phase Execution Errors
- Check individual component logs
- Verify configuration parameters
- Review MLflow run details
- Check system resources
Memory Issues
- Enable gradient checkpointing
- Use quantization options
- Reduce batch sizes
- Monitor system metrics
Advanced Features¶
Custom Metric Sanitization¶
The pipeline automatically sanitizes metric names for MLflow compatibility:
```python
import re

def sanitize_metric_name(name: str) -> str:
    """Sanitize metric names for MLflow"""
    name = name.replace("=", ":")
    return re.sub(r"[^a-zA-Z0-9_\-\. :/]", "_", name)
```
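For example, a metric name containing characters MLflow rejects is rewritten as follows:

```python
sanitize_metric_name("eval/f1=macro (test)")
# -> "eval/f1:macro _test_"
```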
System Metrics Logging¶
System resources are tracked automatically during pipeline runs.
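A minimal sketch using psutil snapshots alongside MLflow's built-in system metrics logging (the metric names are illustrative):

```python
import mlflow
import psutil

with mlflow.start_run(log_system_metrics=True):  # built-in tracking, MLflow >= 2.8
    # Manual snapshots can be logged in addition to the built-in metrics
    mlflow.log_metric("system/cpu_percent", psutil.cpu_percent(interval=1))
    mlflow.log_metric("system/memory_percent", psutil.virtual_memory().percent)
```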
Nested Run Management¶
Experiment tracking is organized with parent-child run relationships.
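A minimal sketch of the parent/child pattern, with illustrative run and metric names:

```python
import mlflow

with mlflow.start_run(run_name="pipeline"):  # parent run for the whole pipeline
    with mlflow.start_run(run_name="finetuning", nested=True):
        mlflow.log_metric("finetuning_duration_seconds", 1234.5)
    with mlflow.start_run(run_name="inference", nested=True):
        mlflow.log_metric("inference_duration_seconds", 87.2)
```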