This content is currently WIP. Diagrams, content, and structure are subject to change.
The ML Pipeline is a core capability of the C3 Agentic AI Platform that enables you to develop, deploy, and manage machine learning models within your applications. By connecting machine learning workflows with your application’s data model and business logic, this integration framework turns your data into actionable insights.

What is ML Pipeline Integration?

ML Pipeline integration is the process of incorporating machine learning workflows into the C3 Agentic AI Platform. It provides a structured approach to developing, training, deploying, and monitoring ML models that work seamlessly with your application’s data model and business logic. The C3 Agentic AI Platform supports the entire machine learning lifecycle, from data preparation to model deployment and monitoring. This integration allows you to leverage the platform’s data management capabilities while incorporating advanced analytics and predictive capabilities into your applications.

ML Pipeline components

The ML Pipeline consists of several key components that work together to enable machine learning within the C3 Agentic AI Platform:

Data preparation

Tools for cleaning, transforming, and feature engineering that prepare your data for machine learning.
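
For illustration, a minimal cleaning and normalization step might look like the following pandas sketch (the column names are hypothetical):
# Example: Basic sensor-data cleaning with pandas (illustrative sketch;
# column names are hypothetical)
import pandas as pd

def prepare_readings(df: pd.DataFrame) -> pd.DataFrame:
    # Drop rows with missing sensor values
    df = df.dropna(subset=['temperature', 'vibration', 'rpm'])

    # Discard physically impossible readings
    df = df[df['rpm'] >= 0]

    # Standardize each sensor channel to zero mean and unit variance
    for col in ['temperature', 'vibration', 'rpm']:
        df[col] = (df[col] - df[col].mean()) / df[col].std()

    return df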

Model development

Support for developing models using popular frameworks like TensorFlow, PyTorch, and scikit-learn, as well as native C3 AI model types.

Model training

Infrastructure for training models on large datasets, with support for distributed training and GPU acceleration.
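
As one illustration of GPU-accelerated training, TensorFlow's built-in distribution strategies can replicate a model across available devices (a sketch independent of any platform-specific wiring):
# Example: Multi-GPU training with tf.distribute (illustrative sketch)
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()  # replicates the model across available GPUs

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(4,)),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')

# model.fit(...) then proceeds as usual; the strategy shards batches across devices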

Model deployment

Mechanisms for deploying trained models to production, making them available for inference within your applications.

Model monitoring

Tools for tracking model performance, detecting drift, and managing model versions over time.

How ML Pipeline Integration works

The ML Pipeline Integration leverages the C3 Agentic AI Platform’s model-driven architecture to create a seamless connection between your data model and machine learning workflows:
  1. Data access: ML models access data through the Type System, using the same data model as the rest of your application
  2. Feature engineering: Transform raw data into features suitable for machine learning, with support for both batch and real-time processing
  3. Model training: Train models using your preferred framework, with support for distributed training on large datasets
  4. Model deployment: Deploy models to production with a few clicks, making them available for inference within your application
  5. Model monitoring: Track model performance over time, detect drift, and manage model versions
This integration ensures that your ML models work with the same data model as the rest of your application, providing a consistent view of your data across all components.
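
Condensed into code, the workflow might look like this sketch, which reuses the illustrative c3.ml ModelTrainer API shown in the examples below:
# Example: The five steps in condensed form (illustrative sketch;
# reuses the c3.ml ModelTrainer API shown later on this page)
from sklearn.linear_model import LinearRegression
from c3.ml import ModelTrainer

model = LinearRegression()
trainer = ModelTrainer(model)
trainer.train(
    data_source='WindTurbineSensorData',           # 1. data access via the Type System
    features=['temperature', 'vibration', 'rpm'],  # 2. feature engineering output
    target='remaining_useful_life'                 # 3. model training
)
trainer.save('TurbineRULPredictor')                # 4. model deployment/registration
# 5. model monitoring is configured separately (see "Model monitoring" below)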

Supported model types

The C3 Agentic AI Platform supports multiple types of machine learning models:

Framework-based models

You can develop models using popular machine learning frameworks and deploy them within the C3 Agentic AI Platform:
# Example: Training a TensorFlow model
import tensorflow as tf
from c3.ml import ModelTrainer

# Define a simple neural network
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(4,)),  # four input features
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(1)
])

model.compile(optimizer='adam', loss='mse')

# Train the model using data from the C3 Agentic AI Platform
trainer = ModelTrainer(model)
trainer.train(
    data_source='WindTurbineSensorData',
    features=['temperature', 'vibration', 'rpm', 'power_output'],
    target='remaining_useful_life',
    batch_size=64,
    epochs=100
)

# Save the model to the C3 Agentic AI Platform
trainer.save('TurbineRULPredictor')

Native C3 AI models

The platform also provides native model types that are optimized for common machine learning tasks:
type TurbineFailurePredictor mixes MLModel {
  // Model configuration
  algorithm: String = "RandomForest";
  hyperparameters: Map<String, Object> = {
    "n_estimators": 100,
    "max_depth": 10,
    "min_samples_split": 5
  };
  
  // Input and output specifications
  inputs: [
    { name: "temperature", type: Double },
    { name: "vibration", type: Double },
    { name: "rpm", type: Double },
    { name: "power_output", type: Double }
  ];
  output: { name: "failure_probability", type: Double };
  
  // Training configuration
  trainingConfig: MLTrainingConfig = {
    dataSource: "TurbineSensorReadings",
    splitRatio: 0.8,
    evaluationMetrics: ["accuracy", "precision", "recall"]
  };
}

Pre-built AI services

The platform includes pre-built AI services for common tasks like time series forecasting, anomaly detection, and natural language processing:
// Example: Using a pre-built anomaly detection service
import { AnomalyDetectionService } from 'c3/ai/services';

// Create an anomaly detection service
const anomalyDetector = new AnomalyDetectionService({
  dataSource: 'TurbineSensorReadings',
  timeField: 'timestamp',
  valueField: 'vibration',
  algorithm: 'IsolationForest'
});

// Detect anomalies in recent data
const anomalies = anomalyDetector.detectAnomalies({
  startTime: '2023-01-01T00:00:00Z',
  endTime: '2023-01-31T23:59:59Z',
  turbineId: 'T-1001'
});

// Process detected anomalies
anomalies.forEach(anomaly => {
  console.log(`Anomaly detected at ${anomaly.timestamp} with score ${anomaly.score}`);
});

Data flows in ML pipelines

ML pipelines in the C3 Agentic AI Platform involve several data flows:

Training data flow

The training data flow prepares historical data for model training:
  1. Data extraction: Extract relevant data from the Type System or external sources
  2. Data transformation: Clean, normalize, and transform the data into features
  3. Feature engineering: Create new features that capture domain knowledge
  4. Training/validation split: Split the data into training and validation sets
  5. Model training: Train the model on the training set and evaluate on the validation set
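
A minimal sketch of the split-and-train steps (4 and 5) using scikit-learn, with stand-in data in place of platform-extracted features:
# Example: Training/validation split and evaluation (illustrative sketch;
# random stand-in data replaces platform-extracted features)
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 4))       # stand-in feature matrix
y = (X[:, 1] > 0.5).astype(int)      # stand-in failure labels

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, max_depth=10)
model.fit(X_train, y_train)

print(f"Validation accuracy: {accuracy_score(y_val, model.predict(X_val)):.3f}")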

Inference data flow

The inference data flow processes new data to generate predictions:
  1. Data ingestion: Ingest new data from sensors, transactions, or other sources
  2. Feature computation: Transform the raw data into the features expected by the model
  3. Model inference: Pass the features to the model to generate predictions
  4. Post-processing: Apply business rules or thresholds to the predictions
  5. Action generation: Generate actions or alerts based on the predictions
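
The same flow in a short Python sketch, where model is any trained binary classifier and compute_features stands in for a project-specific transform:
# Example: Inference flow (illustrative sketch; model and compute_features
# are assumed to be supplied by the application)
def run_inference(model, raw_readings, compute_features):
    features = compute_features(raw_readings)            # 2. feature computation
    probabilities = model.predict_proba(features)[:, 1]  # 3. model inference

    alerts = []
    for reading, p in zip(raw_readings, probabilities):
        if p > 0.7:                                      # 4. post-processing threshold
            alerts.append({                              # 5. action generation
                'turbine_id': reading['turbine_id'],
                'probability': p,
                'action': 'schedule inspection'
            })
    return alerts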

Feedback data flow

The feedback data flow captures the results of predictions to improve future models:
  1. Outcome recording: Record the actual outcomes corresponding to predictions
  2. Performance evaluation: Compare predictions to actual outcomes
  3. Model monitoring: Track model performance metrics over time
  4. Drift detection: Detect when model performance degrades
  5. Retraining trigger: Trigger model retraining when necessary
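
A sketch of the evaluation and drift-check steps, using scikit-learn metrics and an assumed baseline accuracy:
# Example: Evaluating recorded outcomes against predictions (illustrative sketch)
from sklearn.metrics import accuracy_score, precision_score, recall_score

def evaluate_feedback(predicted_labels, actual_outcomes, baseline_accuracy,
                      drift_threshold=0.05):
    metrics = {
        'accuracy': accuracy_score(actual_outcomes, predicted_labels),
        'precision': precision_score(actual_outcomes, predicted_labels),
        'recall': recall_score(actual_outcomes, predicted_labels),
    }
    # Trigger retraining when accuracy falls more than the threshold below baseline
    metrics['needs_retraining'] = (baseline_accuracy - metrics['accuracy']) > drift_threshold
    return metrics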

Practical application: Wind turbine predictive maintenance

Let’s explore how ML Pipeline Integration works in a wind turbine predictive maintenance application:

Data preparation

The application collects sensor data from wind turbines, including temperature, vibration, power output, and rotational speed. This data is stored in the C3 Agentic AI Platform’s Type System:
type TurbineSensorReading mixes Entity {
  turbine: -> WindTurbine;
  timestamp: DateTime;
  temperature: Double;
  vibration: Double;
  rpm: Double;
  powerOutput: Double;
}

Feature engineering

Raw sensor data is transformed into features that capture patterns relevant to turbine failures:
import pandas as pd

def create_features(readings):
    features = []
    for turbine_id, turbine_readings in readings.groupby('turbine_id'):
        # Ensure readings are in time order before computing rolling statistics
        turbine_readings = turbine_readings.sort_values('timestamp')
        # Calculate rolling statistics
        rolling_mean = turbine_readings['vibration'].rolling(window=24).mean()
        rolling_std = turbine_readings['vibration'].rolling(window=24).std()
        
        # Calculate rate of change
        temp_change = turbine_readings['temperature'].diff()
        
        # Create feature vector
        turbine_features = pd.DataFrame({
            'turbine_id': turbine_id,
            'timestamp': turbine_readings['timestamp'],
            'vibration_mean': rolling_mean,
            'vibration_std': rolling_std,
            'temp_change': temp_change,
            'power_efficiency': turbine_readings['powerOutput'] / turbine_readings['rpm'],
            'failure': turbine_readings['failure']  # Target variable
        })
        
        features.append(turbine_features)
    
    return pd.concat(features)
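
Called on a DataFrame of historical readings, the function returns one feature row per timestamped reading, for example:
# Example usage (illustrative; readings_df is assumed to contain the columns
# referenced in create_features)
feature_df = create_features(readings_df)
print(feature_df[['vibration_mean', 'vibration_std', 'power_efficiency']].describe())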

Model training

A machine learning model is trained to predict turbine failures based on the engineered features:
from sklearn.ensemble import RandomForestClassifier
from c3.ml import ModelTrainer

# Create and train a random forest classifier
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    min_samples_split=5
)

# Train the model using the C3 Agentic AI Platform's training infrastructure
trainer = ModelTrainer(model)
trainer.train(
    features=features,
    target='failure',
    evaluation_metrics=['accuracy', 'precision', 'recall', 'f1']
)

# Register the trained model with the C3 Agentic AI Platform
model_id = trainer.register(
    name='TurbineFailurePredictor',
    version='1.0',
    description='Predicts turbine failures based on sensor data'
)

Model deployment

The trained model is deployed to production, making it available for inference:
// Deploy the model to production
const deployment = ModelDeployment.make({
  model: ModelRegistry.fetch('TurbineFailurePredictor'),
  version: '1.0',
  environment: 'production',
  scalingConfig: {
    minInstances: 2,
    maxInstances: 10,
    targetCpuUtilization: 70
  }
});

deployment.activate();

Inference and action

The deployed model generates predictions that trigger maintenance actions:
// Function to process new sensor readings and generate maintenance alerts
function processSensorReadings(readings) {
  // Prepare features for the model
  const features = FeatureTransformer.transform(readings);
  
  // Get predictions from the model
  const predictions = ModelService.predict('TurbineFailurePredictor', features);
  
  // Process predictions and generate maintenance alerts
  predictions.forEach((prediction, index) => {
    const reading = readings[index];
    const turbineId = reading.turbine.id;
    
    if (prediction.failureProbability > 0.7) {
      // High risk of failure - create emergency maintenance alert
      MaintenanceAlert.make({
        turbine: reading.turbine,
        alertType: 'EMERGENCY',
        probability: prediction.failureProbability,
        recommendedAction: 'Immediate inspection required',
        estimatedTimeToFailure: prediction.timeToFailure
      }).save();
    } else if (prediction.failureProbability > 0.3) {
      // Moderate risk - schedule maintenance
      MaintenanceSchedule.make({
        turbine: reading.turbine,
        priority: 'MEDIUM',
        probability: prediction.failureProbability,
        recommendedAction: 'Schedule inspection within 2 weeks',
        estimatedTimeToFailure: prediction.timeToFailure
      }).save();
    }
  });
}

Model monitoring

The platform continuously monitors model performance and triggers retraining when necessary:
// Configure model monitoring
const monitor = ModelMonitor.make({
  model: ModelRegistry.fetch('TurbineFailurePredictor'),
  metrics: ['accuracy', 'precision', 'recall', 'f1'],
  dataSource: 'TurbineSensorReadings',
  evaluationFrequency: 'DAILY',
  driftDetection: {
    enabled: true,
    thresholds: {
      accuracy: 0.05,  // Alert if accuracy drops by more than 5%
      dataDrift: 0.1   // Alert if feature distribution changes by more than 10%
    }
  },
  alertConfig: {
    recipients: ['maintenance-team@example.com'],
    channels: ['email', 'slack']
  }
});

monitor.activate();

Benefits of ML Pipeline Integration

Integrating machine learning pipelines with the C3 Agentic AI Platform provides several key benefits:
  • Unified data model: ML models work with the same data model as the rest of your application
  • Simplified deployment: Deploy models to production with a few clicks
  • Automated monitoring: Track model performance and detect drift automatically
  • Scalable infrastructure: Train and serve models on distributed infrastructure
  • Version control: Manage model versions and promote models from development to production
  • Feedback loops: Capture outcomes to improve future models
These benefits enable you to incorporate machine learning into your applications more easily and effectively, turning data into actionable insights that drive business value.