Background Job Management¶
Overview¶
The sktime-mcp server now supports non-blocking background jobs for long-running operations like model training. This architecture prevents the MCP server from becoming unresponsive when training large models.
Architecture¶
The system uses a multi-threaded, asynchronous architecture inspired by CodeGraphContext:
Key Components¶
- JobManager (
runtime/jobs.py) - Thread-safe job tracking - Executor (
runtime/executor.py) - Async execution with job tracking - MCP Server (
server.py) - Event loop coordination
How It Works¶
1. Non-Blocking Execution¶
When you call fit_predict_async, the server:
- Creates a job with a unique ID
- Schedules the training as an async coroutine on the event loop
- Returns immediately with the job ID
- Continues handling other requests while training runs in the background
# Traditional (blocking) - Server freezes during training
result = fit_predict(handle, "airline", horizon=12)
# Async (non-blocking) - Server stays responsive
job_info = fit_predict_async(handle, "airline", horizon=12)
# Returns: {"success": True, "job_id": "abc-123-def"}
# Check progress
status = check_job_status("abc-123-def")
# Returns: {"status": "running", "progress_percentage": 66.7, ...}
2. Progress Tracking¶
Jobs track their progress through multiple steps:
{
"job_id": "abc-123-def",
"status": "running",
"total_steps": 3,
"completed_steps": 2,
"current_step": "Generating predictions (horizon=12)...",
"progress_percentage": 66.7,
"estimated_time_remaining": 5.2,
"estimated_time_remaining_human": "5s"
}
3. Job Lifecycle¶
Available Tools¶
Training Tools¶
fit_predict_async¶
Train a model in the background (non-blocking).
Arguments:
- estimator_handle: Handle from instantiate_estimator
- dataset: Dataset name (e.g., "airline", "sunspots")
- horizon: Forecast horizon (default: 12)
Returns:
{
"success": true,
"job_id": "abc-123-def-456",
"message": "Training job started for ARIMA on airline...",
"estimator": "ARIMA",
"dataset": "airline",
"horizon": 12
}
Example:
# Instantiate model
model = instantiate_estimator("ARIMA", {"order": [1, 1, 1]})
# Returns: {"handle": "est_abc123"}
# Start training (non-blocking!)
job = fit_predict_async("est_abc123", "airline", horizon=24)
# Returns immediately with job_id
# Server is still responsive - you can do other things!
Job Management Tools¶
check_job_status¶
Check the status and progress of a background job.
Arguments:
- job_id: Job ID to check
Returns:
{
"success": true,
"job_id": "abc-123-def",
"status": "running",
"job_type": "fit_predict",
"estimator_name": "ARIMA",
"dataset_name": "airline",
"total_steps": 3,
"completed_steps": 2,
"current_step": "Generating predictions...",
"progress_percentage": 66.7,
"elapsed_time": 10.5,
"estimated_time_remaining": 5.2,
"estimated_time_remaining_human": "5s",
"result": null
}
When completed:
list_jobs¶
List all background jobs with optional filtering.
Arguments:
- status (optional): Filter by status ("pending", "running", "completed", "failed", "cancelled")
- limit (optional): Maximum number of jobs to return (default: 20)
Returns:
{
"success": true,
"count": 3,
"jobs": [
{
"job_id": "abc-123",
"status": "completed",
"estimator_name": "ARIMA",
...
},
{
"job_id": "def-456",
"status": "running",
"progress_percentage": 50.0,
...
}
]
}
Example:
# List all running jobs
running_jobs = list_jobs(status="running")
# List all jobs (any status)
all_jobs = list_jobs(limit=50)
cancel_job¶
Cancel a running or pending job.
Arguments:
- job_id: Job ID to cancel
Returns:
Note: You can only cancel jobs in "pending" or "running" status.
delete_job¶
Delete a job from the job manager.
Arguments:
- job_id: Job ID to delete
Returns:
cleanup_old_jobs¶
Remove jobs older than specified hours.
Arguments:
- max_age_hours (optional): Maximum age in hours (default: 24)
Returns:
Complete Workflow Example¶
# 1. Instantiate a model
model_result = instantiate_estimator("AutoARIMA")
handle = model_result["handle"]
# 2. Start training in background
job_result = fit_predict_async(handle, "airline", horizon=24)
job_id = job_result["job_id"]
# 3. Server is responsive - you can do other things!
# Check available datasets
datasets = list_datasets()
# Search for other estimators
estimators = search_estimators("prophet")
# 4. Check training progress
status = check_job_status(job_id)
print(f"Progress: {status['progress_percentage']}%")
print(f"Current step: {status['current_step']}")
print(f"Time remaining: {status['estimated_time_remaining_human']}")
# 5. Wait for completion (or poll periodically)
while status["status"] == "running":
time.sleep(1)
status = check_job_status(job_id)
# 6. Get results
if status["status"] == "completed":
predictions = status["result"]["predictions"]
print(f"Predictions: {predictions}")
else:
print(f"Job failed: {status['errors']}")
# 7. Clean up
delete_job(job_id)
Technical Details¶
Async Execution¶
The fit_predict_async method uses:
asyncio.run_in_executor()- Runs synchronous sktime operations in a thread poolawait asyncio.sleep(0.01)- Yields control to the event loop every 10msasyncio.run_coroutine_threadsafe()- Schedules coroutines on the main event loop
This prevents blocking the MCP server's event loop.
Thread Safety¶
The JobManager uses threading.Lock() to ensure thread-safe access to job data:
Progress Updates¶
Jobs update their status in real-time:
# Step 1: Load data
job_manager.update_job(job_id,
completed_steps=0,
current_step="Loading dataset 'airline'..."
)
# Step 2: Fit model
job_manager.update_job(job_id,
completed_steps=1,
current_step="Fitting ARIMA on airline..."
)
# Step 3: Predict
job_manager.update_job(job_id,
completed_steps=2,
current_step="Generating predictions..."
)
Memory Management¶
Jobs are automatically cleaned up after 24 hours (configurable):
Benefits¶
✅ Non-blocking - Server stays responsive during long operations
✅ Progress tracking - Real-time updates on training progress
✅ Time estimation - Estimated time remaining
✅ Error handling - Graceful failure with error messages
✅ Cancellation - Ability to cancel long-running jobs
✅ Memory efficient - Automatic cleanup of old jobs
Comparison: Blocking vs Non-Blocking¶
Traditional (Blocking)¶
# ❌ Server freezes for 30 seconds
result = fit_predict(handle, "large_dataset", horizon=100)
# Server cannot handle any other requests during this time
Async (Non-Blocking)¶
# ✅ Server returns immediately
job = fit_predict_async(handle, "large_dataset", horizon=100)
# Server continues handling other requests
# Check progress whenever you want
status = check_job_status(job["job_id"])
When to Use Each¶
Use fit_predict (blocking) when:¶
- Training on small datasets (< 1 second)
- You need results immediately
- Running in a script/notebook (not via MCP)
Use fit_predict_async (non-blocking) when:¶
- Training large models
- Working with big datasets
- Server responsiveness is critical
- You want progress updates
- Training might take > 5 seconds
Future Enhancements¶
Potential improvements:
- [ ] Support for custom progress callbacks
- [ ] Job persistence across server restarts
- [ ] Job priority queue
- [ ] Parallel job execution limits
- [ ] Streaming progress updates via WebSocket
- [ ] Job result caching