Set Up and Use Distributed Training
Set up a dedicated architecture to support a C3 AI cluster that requires scalable, fault-tolerant distributed training. The C3 AI Distributed Training Framework (DTF) enables efficient and resilient training of large-scale models across distributed resources. DTF comes with built-in support for PyTorch Distributed, enabling Data Parallelism out of the box, while also being extensible to support other parallelism techniques and libraries. DTF integrates seamlessly with existing MlPipe APIs.
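To make the Data Parallelism idea concrete, the following stdlib-only sketch (illustrative code, not DTF or PyTorch APIs) shows the core pattern DTF builds on: each worker computes a gradient on its own shard of a batch, the gradients are averaged as an all-reduce would do, and every replica applies the same update so all copies of the model stay in sync.

```python
def shard(batch, num_workers):
    """Split a batch into roughly equal shards, one per worker."""
    k, m = divmod(len(batch), num_workers)
    return [batch[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
            for i in range(num_workers)]

def local_gradient(weight, shard_data):
    """Toy gradient of mean squared error for y = weight * x on one shard."""
    return sum(2 * (weight * x - y) * x for x, y in shard_data) / len(shard_data)

def data_parallel_step(weight, batch, num_workers, lr=0.1):
    """One step of data-parallel training: every worker computes a gradient
    on its shard, the gradients are averaged (the all-reduce step), and all
    replicas apply the same update."""
    grads = [local_gradient(weight, s) for s in shard(batch, num_workers)]
    avg_grad = sum(grads) / len(grads)
    return weight - lr * avg_grad

batch = [(i / 10, 3 * i / 10) for i in range(1, 9)]  # samples of y = 3 * x
w = 0.0
for _ in range(200):
    w = data_parallel_step(w, batch, num_workers=4)
# w converges to about 3.0
```

With equally sized shards, the averaged shard gradients equal the full-batch gradient, which is why data-parallel training matches single-node training step for step.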
This topic addresses how to enable distributed training in C3 AI Studio, introduce a dedicated service task node for clear resource isolation, and configure checkpoints to support failure recovery.
Create a shared environment in C3 AI Studio
Create a shared environment (also sometimes called a multi-node environment or MNE) called distributedtraining and begin the C3 AI Distributed Training configuration by completing the following steps:
1. In C3 AI Studio, click the Create environment button on the Home page.
2. In the Create new environment window, enter the name of the shared environment (for example, distributedtraining) in the Name field.
3. In the Server version field, enter the desired server version. By default, if you create an environment without entering a server version, the MNE uses the same server version as C3 AI Studio. Leave the server version blank unless you need a specific, supported GA server version.
4. Select Shared Environment > Create. A banner at the top of the C3 AI Studio instance indicates the environment is starting.
5. Access the new environment by selecting the environment name in the Current environment drop-down list on the C3 AI Studio Home page. Alternatively, select it in the Environments tab to open the Environment Details page.

If you receive an error message in the banner, such as "Failed to start new environment... User has reached maximum concurrent non-single-node environment of 0," C3 AI Studio might be configured to limit creation of shared environments for users with the StudioUser role. Contact your C3 AI Center of Excellence or Studio admin to obtain higher permissions.
Once the status changes from "Starting..." to "Running," you'll be able to create an application for the C3 AI Distributed Training.
Create the C3 AI Distributed Training application
To create an application for C3 AI Distributed Training, do the following:
1. From the C3 AI Studio Home page, select the MNE created in the steps above in the Current Environment dropdown list.
2. Select the ellipses menu (...) next to View Details and select Open console. This opens the C3 AI static console of the C3 AI Environment Management application (env/c3) in the C3 Agentic AI Platform.
3. Once the console page loads, create the C3 AI Distributed Training application for your environment by running the following example code snippet in the C3 AI console:
```javascript
var rootPkg = "distributedTraining"
var appName = "dt"
C3.env().startApp({rootPkg: rootPkg, name: appName})
```

The above code creates an application in your environment named dt that functions as the dedicated app for distributed training. This app contains the libraries and dependencies necessary to run distributed training.

You can also run the above code using the Google Chrome developer tools:
- Open Google Chrome.
- Select View > Developer > Developer Tools.
- Select the Console tab.

These actions open the C3 AI console of the C3 AI Environment Management application (env/c3), allowing you to interact with your environment while in the C3 Agentic AI Platform.
Configure C3 AI Distributed Training App Task Node Pool
Create and configure the C3 AI Distributed Training app with the necessary nodes to support distributed training.
The current implementation does not support sharing an App.NodePool across multiple distributed training tasks running simultaneously. If you need to run multiple distributed training tasks concurrently, you must create multiple node pools, each dedicated to a single training task.
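Because each pool can serve only one training task at a time, a guard on your scheduling side can prevent accidental sharing. The sketch below is plain Python with hypothetical pool and task names, not a C3 API; it simply enforces the one-task-per-pool rule:

```python
class NodePoolAllocator:
    """Tracks which training task owns each node pool; a pool hosts at most
    one distributed training task at a time."""

    def __init__(self, pool_names):
        self.owners = {name: None for name in pool_names}

    def acquire(self, pool, task_id):
        if pool not in self.owners:
            raise KeyError(f"Unknown pool {pool!r}")
        if self.owners[pool] is not None:
            raise RuntimeError(
                f"Pool {pool!r} is busy with {self.owners[pool]!r}; "
                "use a dedicated pool per concurrent training task")
        self.owners[pool] = task_id

    def release(self, pool):
        self.owners[pool] = None

alloc = NodePoolAllocator(["distrain", "distrain2"])
alloc.acquire("distrain", "job-a")   # first task takes the first pool
alloc.acquire("distrain2", "job-b")  # a concurrent task needs its own pool
```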
Set up distributed training app node pools by completing the following steps:
1. From the C3 AI Studio Home page, select the distributed training MNE.
2. From the C3 AI Studio Env tab, select distributedtraining from the Current Environment dropdown list.
3. Hover over the App card and select Open console. This opens the C3 AI console of the distributedtraining app, allowing you to interact with the necessary types and packages.
4. Configure a valid hardware profile for distributed training. Define a hardware profile with 8 CPUs and 16 GB of memory for each node that uses this profile:

```javascript
hardwareProfile = C3.HardwareProfile.upsertProfile({
  "name": '8vCpu16mem', // Name of the hardware profile
  "cpu": 8,             // Number of CPUs available on each node
  "memoryMb": 16000     // Memory available on each node, in MB
})
```

For more information about node pool configuration and hardware profiles, see Configure and Manage Node Pools.
Configure a service node pool for distributed training using the previously created hardware profile. Use a service node pool instead of a task node pool because task nodes run invalidation queues and might pick up other tasks; a service node pool does not run these queues.
For this scenario, four nodes are created.
```javascript
C3.app().configureNodePool(
  "distrain",               // name of the node pool to configure
  4,                        // target node count
  4,                        // minimum node count
  4,                        // maximum node count
  hardwareProfile,          // hardware profile for each node
  [c3.Server.Role.SERVICE], // server role this node pool functions as (SERVICE)
  false,                    // optional - whether autoscaling should be enabled
  0.5                       // optional - JVM max memory fraction for c3server
).update()
```
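When planning how many worker processes each node should run (a setting used later in this topic), it helps to sanity-check how a node's resources divide per process. A small illustrative helper, not a C3 API, using the numbers from the 8vCpu16mem profile:

```python
def per_process_resources(cpu_per_node, memory_mb_per_node, worker_processes_per_node):
    """Rough per-process share of one node's resources, to sanity-check that
    the planned worker processes fit the node's hardware profile."""
    if worker_processes_per_node > cpu_per_node:
        raise ValueError("more worker processes than CPUs on a node")
    return (cpu_per_node // worker_processes_per_node,
            memory_mb_per_node // worker_processes_per_node)

# With the 8vCpu16mem profile and 4 worker processes per node,
# each process gets about 2 CPUs and 4000 MB of memory.
cpus_per_proc, mem_per_proc = per_process_resources(8, 16000, 4)
```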
Create a simple model to run distributed training
To create a Simple Testing Model, follow these steps:
1. From the C3 AI Studio Home page, select the distributed training MNE from the Current Environment dropdown list.
2. From the C3 AI Studio Env page, select the distributedtraining app.
3. Hover over the app card and select Jupyter. This selection opens the C3 AI JupyterLab of the distributedtraining app. From JupyterLab, you can run Python commands to create and configure pipes for distributed training.
4. Once JupyterLab has loaded, select the first icon below the Notebook section.
5. On the Notebook, select a new runtime in the upper-right corner.
6. Select the py-pytorch runtime. Select Install if necessary.
7. In a new Notebook cell, create a simple model. The following example creates a basic neural network model built using PyTorch's nn.Module.

```python
# Define the model
import torch.nn as nn

class SimpleModel(nn.Module):
    def __init__(self):
        import torch.nn as nn
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(4, 10)
        self.layer2 = nn.Linear(10, 5)
        self.layer3 = nn.Linear(5, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.relu(self.layer2(x))
        x = self.layer3(x)
        return x

    def loss(self):
        import torch.nn as nn
        return nn.MSELoss()

    def optimizer(self):
        import torch.optim as optim
        return optim.SGD(self.parameters(), lr=0.01)
```

This network includes a few fully connected layers (Linear layers) and uses the ReLU activation function to introduce non-linearity. You can easily adapt this model for small-scale regression tasks. This network serves as a simple starting point for the distributed training example.
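To see what the layer dimensions imply before moving on, here is a NumPy stand-in for the same forward pass (illustration only, with biases omitted for brevity; the actual training uses the PyTorch model above):

```python
import numpy as np

rng = np.random.default_rng(0)
# Weight matrices matching SimpleModel's layers: 4 -> 10 -> 5 -> 1
W1 = rng.standard_normal((4, 10)).astype(np.float32)
W2 = rng.standard_normal((10, 5)).astype(np.float32)
W3 = rng.standard_normal((5, 1)).astype(np.float32)

def relu(a):
    return np.maximum(a, 0)

def forward(x):
    """Mirrors SimpleModel.forward: two ReLU-activated layers, then a linear output."""
    return relu(relu(x @ W1) @ W2) @ W3

out = forward(rng.standard_normal((100, 4)).astype(np.float32))  # batch of 100 samples
```

Each sample with 4 features maps to a single scalar output, which matches the 100x4 input and 100x1 target data created later in this topic.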
If the py-pytorch runtime is not selected, the execution fails. Select the runtime using the Select Kernel pop-up.

After you finish creating your model, run the cell (Shift + Return).
Instantiate the new model by running the following command in a new cell:

```python
model = SimpleModel()
```

In another cell, convert the model to a pipe by running the following:

```python
pipe = c3.PytorchPipe.convert(model)
```

With PytorchPipe, you can create some sample data by executing the following code in a separate cell:

```python
import numpy as np

np.random.seed(0)
x_np, y_np = np.random.rand(100, 4).astype(np.float32), np.random.rand(100, 1).astype(np.float32)
x, y = c3.Data.Persisted.from_numpy(x_np), c3.Data.Persisted.from_numpy(y_np)
```

Use the C3 AI DistributedTrainingSpec
DistributedTrainingSpec is a specification for configuring and managing distributed training operations in a C3 AI environment.
The DistributedTrainingSpec provides a flexible and robust interface for the configuration and management of distributed training jobs. It handles resource allocation, fault tolerance, and progress monitoring, ensuring efficient and reliable execution of distributed machine learning pipes.
To effectively use DistributedTrainingSpec, first ensure that the node pool is configured to allocate appropriate computational resources for your workload, balancing performance and cost. Here is a list of the available fields you can set in the spec:
- workerProcessesPerNode enables you to maximize node processing capacity. Consider starting with 2 or 4 processes per node and adjusting based on resource usage and training performance.
- minTotalWorkerProcesses ensures a sufficient baseline for distributed training. Set this value based on the total number of nodes and workerProcessesPerNode to maintain flexibility for scaling.
- timeout prevents jobs from running indefinitely or getting stuck by aborting training runs. Use a conservative timeout for test runs (such as 5 minutes) and increase it for longer, production-level training jobs.
- initialProgressCheckDelay allows worker processes enough time to connect to the master node, load the model and data, and begin training. For lightweight models, start with 2 minutes. For more complex setups, provide enough time for the worker processes to download the data and model from remote storage.
- progressTimeout and progressCheckInterval help detect and recover from stalled training. progressTimeout specifies how long training can run without progress before it is considered stalled, while progressCheckInterval sets the frequency of progress monitoring. You could use 10-second intervals with a 1-minute progressTimeout; for larger models, consider increasing these values to balance efficiency.
- masterPort specifies the port number used for master-worker communication within torch.distributed. You can use a distinct port for each training iteration you run.
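How initialProgressCheckDelay, progressCheckInterval, and progressTimeout interact can be sketched as a simple watchdog check (stdlib Python; the duration-string format mirrors the spec's values such as "30s" and "5m", but this is an illustration of the logic, not DTF internals):

```python
def parse_duration(s):
    """Parse duration strings such as '30s', '5m', or '2h' into seconds."""
    units = {"s": 1, "m": 60, "h": 3600}
    return int(s[:-1]) * units[s[-1]]

def is_stalled(now, start, last_progress,
               initial_delay="30s", progress_timeout="1m"):
    """A run counts as stalled only after the initial grace period has
    elapsed and no progress has been reported within progress_timeout."""
    if now - start < parse_duration(initial_delay):
        return False  # workers may still be connecting and loading data
    return now - last_progress > parse_duration(progress_timeout)

# Timeline in seconds: run starts at t=0, last progress reported at t=40.
assert not is_stalled(now=20, start=0, last_progress=0)   # still in the grace period
assert not is_stalled(now=80, start=0, last_progress=40)  # progressed 40s ago, under 1m
assert is_stalled(now=110, start=0, last_progress=40)     # no progress for 70s
```

In the real framework, such a check would run every progressCheckInterval; a stalled run is then terminated and can be retried from its last checkpoint.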
The following sample uses 5 nodes and 4 workerProcessesPerNode, for 20 minTotalWorkerProcesses. The spec includes a 5-minute timeout and an initialProgressCheckDelay of 30 seconds, and sets progressCheckInterval to verify progress every 10 seconds. Through progressTimeout, it terminates the training if there is no progress for 1 minute.
You can copy and paste this code snippet into a new notebook cell and execute it.
```python
distributedTrainingSpec = c3.DistributedTrainingSpec(
    workerProcessesPerNode=4,
    minTotalWorkerProcesses=4 * 5,  # 4 processes on each of 5 nodes
    initialProgressCheckDelay="30s",
    timeout="5m",
    progressTimeout="1m",
    progressCheckInterval="10s",
    nodePool='distrain')
```

Run a PyTorch pipe using C3 AI Distributed Training
After creating the DistributedTrainingSpec, add it to a PytorchPipeSpec by running the following command in a new cell:

```python
spec = c3.PytorchPipeSpec(distributedTrainingSpec=distributedTrainingSpec,
                          epochs=2000,
                          checkpointInterval=100)
```

PytorchPipe uses this spec for training on your data. Once you have the spec, call the train method by executing the following in a new cell:

```python
run = pipe.train(x, y, spec=spec)
```

The above call trains the PyTorch pipe using distributed training.
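The checkpointInterval in the spec above supports failure recovery: state is persisted every 100 epochs so a failed job can resume from the last checkpoint instead of restarting. The arithmetic can be sketched as follows (illustrative only; DTF manages checkpoints internally):

```python
def checkpoint_epochs(total_epochs, checkpoint_interval):
    """Epochs at which a checkpoint is written: 100, 200, ..., total_epochs."""
    return list(range(checkpoint_interval, total_epochs + 1, checkpoint_interval))

def resume_epoch(epochs_completed_at_crash, checkpoint_interval):
    """After a failure, training resumes from the last completed checkpoint
    rather than from epoch 0."""
    return (epochs_completed_at_crash // checkpoint_interval) * checkpoint_interval

schedule = checkpoint_epochs(2000, 100)  # 20 checkpoints over the full run
restart = resume_epoch(1234, 100)        # a crash at epoch 1234 resumes from 1200
```

A smaller interval lowers the amount of lost work after a failure at the cost of more frequent checkpoint writes.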
After you execute the above command, distributed training starts with the specified configuration. You can monitor the task node of the distributed training app by searching for DistributedTrainingCoordinator on the task node logs and PytorchPipe on the distributed training node logs.
Once the job has finished training, you can gather the results of your training by running the following commands:

```python
import torch

trained_pipe = run.result(timeoutMinutes=10)
y_pred = trained_pipe.process(x).result().to_numpy()
assert torch.nn.MSELoss()(torch.from_numpy(y_pred.copy()), torch.from_numpy(y_np)) < 0.1
```

**Note:** The processing of the pipe runs on task nodes and is not distributed. Only the training step runs in a distributed manner.
Validate improvements from distributed training
You can compare distributed training against a non-distributed run of the same pipe. To do so, run the following code snippet in a new notebook cell:

```python
# Run a non-distributed training spec
spec_nd = c3.PytorchPipeSpec(epochs=2000)
run_nd = pipe.train(x, y, spec=spec_nd)
trained_pipe_nd = run_nd.result()
y_pred_nd = trained_pipe_nd.process(x).result().to_numpy()

# Compare the MSE for both training types
import torch
mse_distributed = torch.nn.MSELoss()(torch.from_numpy(y_pred.copy()), torch.from_numpy(y_np))
mse_non_distributed = torch.nn.MSELoss()(torch.from_numpy(y_pred_nd.copy()), torch.from_numpy(y_np))
assert mse_distributed < mse_non_distributed
```