AI Training

OVHcloud AI Training allows you to run machine learning training jobs on managed GPU infrastructure. This provider gives you full control over the job lifecycle from your Airflow DAGs.


Overview

AI Training provides:

  • Managed Infrastructure — No need to manage servers or GPU drivers
  • Multiple GPU Types — V100S, A100, and more
  • Object Storage Integration — Mount your data from OVHcloud Object Storage
  • Full Job Lifecycle — Submit, monitor, stop, and delete jobs

Available Regions:

Region  Location             API Endpoint
gra     Gravelines, France   https://gra.ai.cloud.ovh.net
bhs     Beauharnois, Canada  https://bhs.ai.cloud.ovh.net

Connection Setup

Create an Airflow connection with your AI Training token:

airflow connections add ovh_ai_training_default \
    --conn-type generic \
    --conn-login gra \
    --conn-password your-ai-token-here

The --conn-login specifies the region (gra or bhs).
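
Alternatively, the connection can be supplied through Airflow's standard AIRFLOW_CONN_{CONN_ID} environment variable. A sketch using the connection URI format (URL-encode the token if it contains special characters):

export AIRFLOW_CONN_OVH_AI_TRAINING_DEFAULT='generic://gra:your-ai-token-here@'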

See Getting Started for detailed configuration options.


Operators Overview

Operator                              Description
OVHCloudAITrainingSubmitJobOperator   Submit a new training job
OVHCloudAITrainingGetJobOperator      Get job details and status
OVHCloudAITrainingWaitForJobOperator  Wait for job completion
OVHCloudAITrainingStopJobOperator     Stop a running job
OVHCloudAITrainingDeleteJobOperator   Delete a job
OVHCloudAITrainingGetLogsOperator     Retrieve job logs

Submit a Training Job

Basic Example

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingSubmitJobOperator
)
from datetime import datetime

with DAG(
    dag_id='simple_training_job',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    submit = OVHCloudAITrainingSubmitJobOperator(
        task_id='submit_training',
        image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
        name='my-training-job',
        command=['python', '-c', 'print("Hello from AI Training!")'],
        flavor='ai1-1-gpu',
    )

With GPU and Environment Variables

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_gpu_training',
    image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
    name='gpu-training-job',
    command=['python', 'train.py'],
    flavor='ai1-1-gpu',
    gpu=1,
    gpu_model='V100S',
    env_vars={
        'EPOCHS': '100',
        'BATCH_SIZE': '32',
        'LEARNING_RATE': '0.001'
    },
)

With Object Storage Volumes

Mount data from OVHcloud Object Storage:

from apache_airflow_provider_ovhcloud_ai.hooks.ai_training import OVHCloudAITrainingHook

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='training_with_data',
    image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
    name='training-with-volumes',
    command=['python', 'train.py', '--data-dir', '/workspace/data'],
    flavor='ai1-1-gpu',
    volumes=[
        OVHCloudAITrainingHook.build_volume(
            container='my-training-data',
            alias='my-data-alias',
            mount_path='/workspace/data',
            permission='RO',  # Read-only
        ),
        OVHCloudAITrainingHook.build_volume(
            container='my-model-output',
            alias='my-output-alias',
            mount_path='/workspace/output',
            permission='RW',  # Read-write
        ),
    ],
)

Wait for Completion

Submit a job and wait for it to finish:

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_and_wait',
    image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
    name='training-job',
    command=['python', 'train.py'],
    flavor='ai1-1-gpu',
    wait_for_completion=True,  # Block until job completes
    check_interval=60,         # Check every 60 seconds
    wait_timeout=7200,         # Timeout after 2 hours
)

Submit Job Parameters

Parameter            Type            Default                    Description
image                str             Required                   Docker image to use
name                 str             None                       Job name
command              List[str]       None                       Command to execute
env_vars             Dict[str, str]  None                       Environment variables
flavor               str             None                       Instance flavor (e.g., ai1-1-gpu)
gpu                  int             0                          Number of GPUs
gpu_model            str             None                       GPU model (V100S, A100, etc.)
cpu                  int             None                       Number of CPUs
volumes              List[Dict]      None                       Volume configurations
labels               Dict[str, str]  None                       Job labels
ssh_public_keys      List[str]       None                       SSH keys for remote access
default_http_port    int             None                       HTTP port to expose
job_timeout          int             None                       Job timeout in seconds
unsecure_http        bool            False                      Allow unauthenticated HTTP access
wait_for_completion  bool            False                      Wait for job to complete
check_interval       int             30                         Seconds between status checks
wait_timeout         int             3600                       Max wait time in seconds
ovh_conn_id          str             'ovh_ai_training_default'  Connection ID
region               str             None                       Override connection region
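
Most of these parameters appear in the examples above. As a sketch of the less common ones (the image, name, and SSH key values are placeholders):

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_with_extras',
    image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
    name='job-with-extras',
    command=['python', 'train.py'],
    flavor='ai1-1-gpu',
    region='bhs',            # override the region stored on the connection
    default_http_port=8080,  # expose an HTTP port from the job
    ssh_public_keys=['ssh-ed25519 AAAA... user@host'],  # allow SSH access to the job
    job_timeout=7200,        # hard 2-hour limit enforced on the job itself
)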

Volume Permissions

Permission  Description
RO          Read-only access
RW          Read-write access
RWD         Read-write-delete access
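
For example, a scratch volume that the job can both write to and clean up would use RWD. A minimal sketch with hypothetical container and alias names, using the build_volume helper described below:

scratch = OVHCloudAITrainingHook.build_volume(
    container='scratch-space',
    alias='scratch-alias',
    mount_path='/workspace/tmp',
    permission='RWD',  # job may create, overwrite, and delete objects
)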

Complete Workflow Example

A complete DAG that submits a job, waits for completion, and retrieves logs:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingSubmitJobOperator,
    OVHCloudAITrainingWaitForJobOperator,
    OVHCloudAITrainingGetLogsOperator,
    OVHCloudAITrainingDeleteJobOperator,
)
from apache_airflow_provider_ovhcloud_ai.hooks.ai_training import OVHCloudAITrainingHook
from datetime import datetime

with DAG(
    dag_id='complete_training_workflow',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=['ovhcloud', 'training'],
) as dag:

    # Submit the training job
    submit = OVHCloudAITrainingSubmitJobOperator(
        task_id='submit_job',
        image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
        name='my-training-{{ ds_nodash }}',
        command=['python', 'train.py'],
        flavor='ai1-1-gpu',
        env_vars={
            'EPOCHS': '50',
            'MODEL_NAME': 'resnet50',
        },
        volumes=[
            OVHCloudAITrainingHook.build_volume(
                container='training-data',
                alias='data-alias',
                mount_path='/workspace/data',
                permission='RO',
            ),
            OVHCloudAITrainingHook.build_volume(
                container='model-output',
                alias='output-alias',
                mount_path='/workspace/output',
                permission='RW',
            ),
        ],
        labels={'project': 'ml-pipeline', 'env': 'production'},
    )

    # Wait for the job to complete
    wait = OVHCloudAITrainingWaitForJobOperator(
        task_id='wait_for_completion',
        job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
        target_states=['DONE'],
        failure_states=['FAILED', 'ERROR', 'INTERRUPTED'],
        check_interval=60,
        timeout=14400,  # 4 hours
    )

    # Get the logs
    logs = OVHCloudAITrainingGetLogsOperator(
        task_id='get_logs',
        job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
        tail=500,
    )

    # Clean up the job
    delete = OVHCloudAITrainingDeleteJobOperator(
        task_id='delete_job',
        job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
        trigger_rule='all_done',  # Run even if previous tasks failed
    )

    submit >> wait >> logs >> delete

Monitor Job Status

Get Job Details

from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingGetJobOperator
)

get_status = OVHCloudAITrainingGetJobOperator(
    task_id='get_job_status',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
)

Wait for Specific States

from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingWaitForJobOperator
)

# Wait for job to be running (useful for interactive jobs)
wait_running = OVHCloudAITrainingWaitForJobOperator(
    task_id='wait_until_running',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
    target_states=['RUNNING'],
    check_interval=30,
    timeout=600,
)

Job States

State         Description
QUEUED        Job is queued for execution
INITIALIZING  Job is being initialized
PENDING       Job is pending resource allocation
RUNNING       Job is currently running
DONE          Job completed successfully
FAILED        Job failed
ERROR         Job encountered an error
INTERRUPTED   Job was interrupted
STOPPING      Job is being stopped
STOPPED       Job was stopped
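
Any of these states can be passed to target_states or failure_states. For example, if a manually stopped job should not fail the DAG, treat STOPPED as a success state; a sketch reusing the XCom pattern from the workflow above:

wait = OVHCloudAITrainingWaitForJobOperator(
    task_id='wait_terminal',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
    target_states=['DONE', 'STOPPED'],  # both count as success here
    failure_states=['FAILED', 'ERROR', 'INTERRUPTED'],
    check_interval=60,
)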

Stop and Delete Jobs

Stop a Running Job

from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingStopJobOperator
)

stop = OVHCloudAITrainingStopJobOperator(
    task_id='stop_job',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
)

Delete a Job

from apache_airflow_provider_ovhcloud_ai.operators.ai_training import (
    OVHCloudAITrainingDeleteJobOperator
)

delete = OVHCloudAITrainingDeleteJobOperator(
    task_id='delete_job',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
)

Using the Hook Directly

For advanced use cases, use the hook directly:

from airflow import DAG
from airflow.operators.python import PythonOperator
from apache_airflow_provider_ovhcloud_ai.hooks.ai_training import OVHCloudAITrainingHook

def custom_training_logic(**context):
    hook = OVHCloudAITrainingHook(
        ovh_conn_id='ovh_ai_training_default',
        region='gra',
    )

    # Submit a job
    job = hook.submit_job(
        image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
        name='custom-job',
        command=['python', 'train.py'],
        flavor='ai1-1-gpu',
        env_vars=OVHCloudAITrainingHook.build_env_vars({
            'EPOCHS': '100',
        }),
    )

    job_id = job['id']
    print(f"Job submitted: {job_id}")

    # Wait for completion
    final_job = hook.wait_for_job(
        job_id=job_id,
        target_states=['DONE'],
        check_interval=60,
        timeout=7200,
    )

    # Get logs
    logs = hook.get_job_logs(job_id, tail=100)
    print(logs)

    return final_job

with DAG(...) as dag:
    task = PythonOperator(
        task_id='custom_training',
        python_callable=custom_training_logic,
    )

Helper Methods

Build Volume Configuration

from apache_airflow_provider_ovhcloud_ai.hooks.ai_training import OVHCloudAITrainingHook

volume = OVHCloudAITrainingHook.build_volume(
    container='my-bucket',           # Object Storage container name
    alias='my-data-alias',           # Data store alias
    mount_path='/workspace/data',    # Mount path in container
    prefix='datasets/v1',            # Optional: subfolder in container
    permission='RO',                 # RO, RW, or RWD
    cache=True,                      # Enable caching
)

Build Environment Variables

from apache_airflow_provider_ovhcloud_ai.hooks.ai_training import OVHCloudAITrainingHook

env_vars = OVHCloudAITrainingHook.build_env_vars({
    'EPOCHS': '100',
    'BATCH_SIZE': '32',
    'LEARNING_RATE': '0.001',
})
# Returns: [{"name": "EPOCHS", "value": "100"}, ...]
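
Note that the operator's env_vars parameter takes a plain dict (as in the operator examples above); build_env_vars produces the list-of-name/value form expected by hook.submit_job.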

Best Practices

1. Use Labels for Organization

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_job',
    image='...',
    labels={
        'project': 'recommendation-engine',
        'team': 'ml-platform',
        'env': 'production',
        'dag_id': '{{ dag.dag_id }}',
        'run_id': '{{ run_id }}',
    },
)

2. Set Appropriate Timeouts

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_job',
    image='...',
    job_timeout=14400,       # Job timeout: 4 hours
    wait_for_completion=True,
    wait_timeout=16000,      # Airflow wait timeout: slightly longer
)

3. Use Trigger Rules for Cleanup

delete = OVHCloudAITrainingDeleteJobOperator(
    task_id='cleanup_job',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
    trigger_rule='all_done',  # Always run, even on failure
)

4. Separate Submit and Wait for Long Jobs

For very long jobs, separate the submit and wait operators:

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_job',
    wait_for_completion=False,  # Don't block
    ...
)

# Use a sensor or separate DAG run for monitoring
wait = OVHCloudAITrainingWaitForJobOperator(
    task_id='wait_for_job',
    job_id='{{ ti.xcom_pull(task_ids="submit_job")["id"] }}',
    timeout=86400,  # 24 hours
    check_interval=300,  # Check every 5 minutes
)

Error Handling

Common errors and solutions:

Error            Cause                        Solution
Invalid region   Wrong region specified       Use gra or bhs
Token not found  Missing connection password  Check connection config
Job failed       Training script error        Check job logs
Timeout waiting  Job took too long            Increase wait_timeout
Invalid flavor   Unknown instance type        Check available flavors
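
For transient failures such as temporary API errors, the standard Airflow retry parameters inherited from BaseOperator apply to every operator in this provider. A sketch:

from datetime import timedelta

submit = OVHCloudAITrainingSubmitJobOperator(
    task_id='submit_job',
    image='pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime',
    name='retried-job',
    command=['python', 'train.py'],
    flavor='ai1-1-gpu',
    retries=3,                         # re-run the task up to 3 times on failure
    retry_delay=timedelta(minutes=5),  # wait 5 minutes between attempts
)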