The Event Processing System is a powerful core component of the C3 Agentic AI Platform that orchestrates your application’s dynamic behavior. This sophisticated system handles real-time and scheduled events, manages task execution, and coordinates complex data workflows—providing the essential infrastructure for automating processes, responding to changes in data, and orchestrating operations across your entire application.

What is the Event Processing System?

The Event Processing System is a processing layer that handles events, schedules jobs, and manages workflows within the C3 Agentic AI Platform. It provides the infrastructure for responding to real-time data changes, executing scheduled tasks, and orchestrating complex business processes. This system enables your application to:
  • React to events in real time (example: sensor readings exceeding thresholds)
  • Schedule and execute recurring tasks (example: daily data aggregation)
  • Coordinate multi-step workflows (example: maintenance approval processes)
  • Manage long-running operations (example: ML model training)
  • Handle failures and retries gracefully
The Event Processing System works closely with the Type System, allowing you to define event handlers and workflows that operate on your domain model.

Core components

The Event Processing System consists of several key components:

Event handlers

Functions that respond to specific events, such as data changes, system events, or custom application events.

Jobs

Discrete units of work that can be scheduled, executed, monitored, and managed through the platform.

Workflows

Sequences of steps that coordinate multiple operations, potentially involving human interaction and decision points.

Queues

Managed lists of pending tasks that ensure reliable processing, even under heavy load or during system disruptions.
These components work together to provide a comprehensive framework for event-driven processing and task orchestration.

Event handlers

Event handlers are functions that respond to specific events within the C3 Agentic AI Platform. They can be attached to Types to respond to data changes, or they can listen for system or custom events.

Type event handlers

Type event handlers respond to lifecycle events on Type instances:
type WindTurbine mixes Asset {
  // Fields...
  
  @onCreate
  initializeTurbine() ~ js-server {
    // Initialize the turbine when it's created
    this.status = "COMMISSIONED";
    this.lastMaintenanceDate = new Date();
    this.nextMaintenanceDate = addDays(new Date(), 90);
  }
  
  @onUpdate(["status"])
  handleStatusChange(oldValue, newValue) ~ js-server {
    // React to status changes
    if (oldValue !== "ALARM" && newValue === "ALARM") {
      // Create an alert when the turbine enters alarm state
      Alert.make({
        source: this,
        severity: "HIGH",
        message: `Turbine ${this.name} entered alarm state`,
        timestamp: new Date()
      }).save();
    }
  }
}

System event handlers

System event handlers respond to platform events:
// Register a handler for system startup
SystemEvents.onStartup(async () => {
  // Initialize application resources
  await initializeApplicationResources();
  
  // Start monitoring services
  await startMonitoringServices();
  
  console.log("Application initialized successfully");
});

// Register a handler for system shutdown
SystemEvents.onShutdown(async () => {
  // Clean up resources
  await cleanupResources();
  
  console.log("Application shutdown complete");
});

Custom event handlers

Custom event handlers respond to application-specific events:
// Define a custom event type
const MAINTENANCE_COMPLETED = "MAINTENANCE_COMPLETED";

// Register a handler for the custom event
EventBus.on(MAINTENANCE_COMPLETED, async (data) => {
  // Get the turbine that was maintained
  const turbine = await WindTurbine.fetch(data.turbineId);
  
  // Update the turbine's maintenance records
  turbine.lastMaintenanceDate = data.completionDate;
  turbine.nextMaintenanceDate = addDays(data.completionDate, 90);
  turbine.status = "OPERATIONAL";
  
  // Save the updated turbine
  await turbine.save();
  
  // Log the maintenance completion
  console.log(`Maintenance completed for turbine ${turbine.name}`);
});

// Emit a custom event
EventBus.emit(MAINTENANCE_COMPLETED, {
  turbineId: "T-1001",
  completionDate: new Date(),
  technician: "John Smith",
  notes: "Replaced bearings and performed routine inspection"
});

Jobs

Jobs are discrete units of work that can be scheduled, executed, monitored, and managed through the platform. They provide a way to perform tasks that may be long-running, resource-intensive, or need to run on a schedule.

Job definition

Jobs are defined with a name, description, and implementation:
// Define a job to aggregate turbine performance data
JobRegistry.register({
  name: "AggregatePerformanceData",
  description: "Aggregates hourly performance data for all turbines",
  implementation: async (params) => {
    // Get the time range for aggregation
    const startTime = params.startTime || subHours(new Date(), 1);
    const endTime = params.endTime || new Date();
    
    // Get all turbines
    const turbines = await WindTurbine.filter({}).fetch();
    
    // Process each turbine
    for (const turbine of turbines) {
      // Get sensor readings for the time range
      const readings = await SensorReading.filter({
        turbine: { id: turbine.id },
        timestamp: { $gte: startTime, $lt: endTime }
      }).fetch();
      
      // Calculate aggregated metrics
      const metrics = calculateAggregatedMetrics(readings);
      
      // Create a performance record
      await TurbinePerformance.make({
        turbine: turbine,
        timestamp: endTime,
        timeRange: "HOURLY",
        averagePowerOutput: metrics.avgPower,
        maxPowerOutput: metrics.maxPower,
        efficiency: metrics.efficiency,
        availability: metrics.availability
      }).save();
    }
    
    return {
      turbineCount: turbines.length,
      startTime,
      endTime,
      status: "COMPLETED"
    };
  }
});

Job scheduling

Jobs can be scheduled to run at specific times or intervals:
// Schedule a job to run every hour
JobScheduler.schedule({
  jobName: "AggregatePerformanceData",
  schedule: "0 * * * *", // Cron expression: run at the start of every hour
  params: {},
  enabled: true,
  description: "Hourly aggregation of turbine performance data"
});

// Schedule a job to run daily at midnight
JobScheduler.schedule({
  jobName: "GenerateDailyReport",
  schedule: "0 0 * * *", // Cron expression: run at midnight every day
  params: {
    reportType: "DAILY_SUMMARY"
  },
  enabled: true,
  description: "Generate daily summary report"
});

Job execution

Jobs can be executed manually or programmatically:
// Execute a job immediately
JobExecutor.execute({
  jobName: "AggregatePerformanceData",
  params: {
    startTime: subDays(new Date(), 1),
    endTime: new Date()
  }
}).then(result => {
  console.log(`Job completed with result: ${JSON.stringify(result)}`);
}).catch(error => {
  console.error(`Job failed: ${error.message}`);
});

// Execute a job and wait for completion
async function regenerateHistoricalData() {
  try {
    // Execute the job
    const result = await JobExecutor.execute({
      jobName: "RegenerateHistoricalData",
      params: {
        startDate: "2023-01-01",
        endDate: "2023-12-31"
      }
    });
    
    // Process the result
    console.log(`Regenerated data for ${result.recordCount} records`);
    
    return result;
  } catch (error) {
    console.error(`Failed to regenerate historical data: ${error.message}`);
    throw error;
  }
}

Workflows

Workflows are sequences of steps that coordinate multiple operations, potentially involving human interaction and decision points. They provide a way to model complex business processes that may span multiple systems and involve both automated and manual tasks.

Workflow definition

Workflows are defined with a name, description, and a sequence of steps:
// Define a maintenance workflow
WorkflowRegistry.register({
  name: "TurbineMaintenanceWorkflow",
  description: "Coordinates the maintenance process for a wind turbine",
  steps: [
    {
      id: "initiate",
      type: "start",
      next: "assessUrgency"
    },
    {
      id: "assessUrgency",
      type: "decision",
      decision: async (context) => {
        // Get the turbine
        const turbine = await WindTurbine.fetch(context.turbineId);
        
        // Get the latest sensor readings
        const readings = await SensorReading.filter({
          turbine: { id: turbine.id }
        }).orderBy({ timestamp: "DESC" }).limit(1).fetch();
        
        // Assess urgency based on sensor readings
        if (readings.length > 0 && readings[0].vibration > 10) {
          return "urgent";
        } else {
          return "routine";
        }
      },
      branches: {
        urgent: "scheduleEmergencyMaintenance",
        routine: "scheduleRoutineMaintenance"
      }
    },
    {
      id: "scheduleEmergencyMaintenance",
      type: "task",
      task: async (context) => {
        // Create an emergency maintenance record
        const record = await MaintenanceRecord.make({
          turbine: { id: context.turbineId },
          type: "EMERGENCY",
          status: "SCHEDULED",
          priority: "HIGH",
          scheduledDate: new Date(), // Immediate
          description: "Emergency maintenance required due to high vibration"
        }).save();
        
        // Update the context with the record ID
        context.maintenanceRecordId = record.id;
        
        return context;
      },
      next: "assignTechnician"
    },
    {
      id: "scheduleRoutineMaintenance",
      type: "task",
      task: async (context) => {
        // Create a routine maintenance record
        const record = await MaintenanceRecord.make({
          turbine: { id: context.turbineId },
          type: "ROUTINE",
          status: "SCHEDULED",
          priority: "NORMAL",
          scheduledDate: addDays(new Date(), 7), // Schedule for next week
          description: "Routine maintenance"
        }).save();
        
        // Update the context with the record ID
        context.maintenanceRecordId = record.id;
        
        return context;
      },
      next: "assignTechnician"
    },
    {
      id: "assignTechnician",
      type: "human_task",
      assignee: "maintenance_manager",
      form: "TechnicianAssignmentForm",
      timeout: 86400, // 24 hours in seconds
      next: "performMaintenance"
    },
    {
      id: "performMaintenance",
      type: "human_task",
      assignee: (context) => context.technicianId,
      form: "MaintenanceChecklistForm",
      timeout: 259200, // 3 days in seconds
      next: "verifyMaintenance"
    },
    {
      id: "verifyMaintenance",
      type: "task",
      task: async (context) => {
        // Get the maintenance record
        const record = await MaintenanceRecord.fetch(context.maintenanceRecordId);
        
        // Update the record status
        record.status = "COMPLETED";
        record.completionDate = new Date();
        record.completionNotes = context.maintenanceNotes;
        
        // Save the updated record
        await record.save();
        
        // Get the turbine
        const turbine = await WindTurbine.fetch(context.turbineId);
        
        // Update the turbine status
        turbine.status = "OPERATIONAL";
        turbine.lastMaintenanceDate = new Date();
        
        // Save the updated turbine
        await turbine.save();
        
        // Emit a maintenance completed event
        EventBus.emit("MAINTENANCE_COMPLETED", {
          turbineId: context.turbineId,
          maintenanceRecordId: context.maintenanceRecordId,
          completionDate: new Date()
        });
        
        return context;
      },
      next: "complete"
    },
    {
      id: "complete",
      type: "end"
    }
  ]
});

Workflow execution

Workflows can be started manually or in response to events:
// Start a maintenance workflow
WorkflowExecutor.start({
  workflowName: "TurbineMaintenanceWorkflow",
  context: {
    turbineId: "T-1001",
    initiatedBy: "system",
    reason: "Scheduled maintenance"
  }
}).then(instance => {
  console.log(`Started workflow instance: ${instance.id}`);
}).catch(error => {
  console.error(`Failed to start workflow: ${error.message}`);
});

// Start a workflow in response to an event
EventBus.on("TURBINE_ALARM", async (data) => {
  try {
    // Start a maintenance workflow
    const instance = await WorkflowExecutor.start({
      workflowName: "TurbineMaintenanceWorkflow",
      context: {
        turbineId: data.turbineId,
        initiatedBy: "alarm",
        reason: data.alarmMessage
      }
    });
    
    console.log(`Started maintenance workflow for turbine ${data.turbineId}`);
    
    return instance;
  } catch (error) {
    console.error(`Failed to start maintenance workflow: ${error.message}`);
    throw error;
  }
});

Human tasks

Workflows can include human tasks that require user interaction:
// Define a form for a human task
FormRegistry.register({
  name: "TechnicianAssignmentForm",
  fields: [
    {
      name: "technicianId",
      type: "select",
      label: "Assign Technician",
      options: async () => {
        // Get all technicians
        const technicians = await Technician.filter({
          status: "AVAILABLE"
        }).fetch();
        
        // Map technicians to options
        return technicians.map(tech => ({
          value: tech.id,
          label: tech.name
        }));
      },
      required: true
    },
    {
      name: "priority",
      type: "select",
      label: "Priority",
      options: [
        { value: "LOW", label: "Low" },
        { value: "MEDIUM", label: "Medium" },
        { value: "HIGH", label: "High" },
        { value: "CRITICAL", label: "Critical" }
      ],
      required: true
    },
    {
      name: "notes",
      type: "textarea",
      label: "Notes for Technician"
    }
  ],
  onSubmit: async (formData, context) => {
    // Update the workflow context with form data
    context.technicianId = formData.technicianId;
    context.priority = formData.priority;
    context.assignmentNotes = formData.notes;
    
    // Get the maintenance record
    const record = await MaintenanceRecord.fetch(context.maintenanceRecordId);
    
    // Update the record with the assigned technician
    record.technicianId = formData.technicianId;
    record.priority = formData.priority;
    
    // Save the updated record
    await record.save();
    
    return context;
  }
});

Queues

Queues are managed lists of pending tasks that ensure reliable processing, even under heavy load or during system disruptions. They provide a way to handle asynchronous operations, distribute work across multiple processors, and ensure that tasks are completed even if failures occur.

Queue definition

Queues are defined with a name, description, and handler function:
// Define a queue for processing sensor readings
QueueRegistry.register({
  name: "SensorReadingProcessingQueue",
  description: "Processes incoming sensor readings from wind turbines",
  concurrency: 10, // Process up to 10 items concurrently
  handler: async (item) => {
    try {
      // Get the turbine
      const turbine = await WindTurbine.fetch(item.turbineId);
      
      // Create a sensor reading
      const reading = await SensorReading.make({
        turbine: turbine,
        timestamp: new Date(item.timestamp),
        temperature: item.temperature,
        vibration: item.vibration,
        rpm: item.rpm,
        powerOutput: item.powerOutput
      }).save();
      
      // Check for anomalies
      const anomalyDetected = await AnomalyDetectionService.checkReading(reading);
      
      if (anomalyDetected) {
        // Create an alert
        await Alert.make({
          source: turbine,
          relatedTo: reading,
          severity: "MEDIUM",
          message: `Anomaly detected in sensor reading for turbine ${turbine.name}`,
          timestamp: new Date()
        }).save();
        
        // Update turbine status
        turbine.status = "WARNING";
        await turbine.save();
      }
      
      return {
        readingId: reading.id,
        anomalyDetected
      };
    } catch (error) {
      console.error(`Failed to process sensor reading: ${error.message}`);
      throw error; // Rethrow to trigger retry
    }
  },
  retryStrategy: {
    maxRetries: 3,
    backoff: "exponential",
    initialDelay: 1000 // 1 second
  }
});

Queue operations

Items can be added to queues and their status can be monitored:
// Add an item to a queue
QueueManager.enqueue({
  queueName: "SensorReadingProcessingQueue",
  item: {
    turbineId: "T-1001",
    timestamp: new Date().toISOString(),
    temperature: 42.5,
    vibration: 3.2,
    rpm: 15.7,
    powerOutput: 1.8
  }
}).then(result => {
  console.log(`Enqueued item with ID: ${result.itemId}`);
}).catch(error => {
  console.error(`Failed to enqueue item: ${error.message}`);
});

// Add multiple items to a queue
QueueManager.enqueueBatch({
  queueName: "SensorReadingProcessingQueue",
  items: readings.map(reading => ({
    turbineId: reading.turbineId,
    timestamp: reading.timestamp,
    temperature: reading.temperature,
    vibration: reading.vibration,
    rpm: reading.rpm,
    powerOutput: reading.powerOutput
  }))
}).then(result => {
  console.log(`Enqueued ${result.itemIds.length} items`);
}).catch(error => {
  console.error(`Failed to enqueue batch: ${error.message}`);
});

// Get queue statistics
QueueManager.getStats("SensorReadingProcessingQueue").then(stats => {
  console.log(`Queue stats: ${JSON.stringify(stats)}`);
  // Example stats: { pending: 42, processing: 8, completed: 1024, failed: 3 }
}).catch(error => {
  console.error(`Failed to get queue stats: ${error.message}`);
});

Practical application: Wind farm monitoring

Let’s explore how the Event Processing System is used in a wind farm monitoring application:

Real-time sensor data processing

The application processes sensor data in real time using queues and event handlers:
// Define a handler for incoming sensor data
EventBus.on("SENSOR_DATA_RECEIVED", async (data) => {
  try {
    // Enqueue the data for processing
    await QueueManager.enqueue({
      queueName: "SensorReadingProcessingQueue",
      item: {
        turbineId: data.turbineId,
        timestamp: data.timestamp,
        temperature: data.temperature,
        vibration: data.vibration,
        rpm: data.rpm,
        powerOutput: data.powerOutput
      }
    });
    
    return { status: "enqueued" };
  } catch (error) {
    console.error(`Failed to process sensor data: ${error.message}`);
    throw error;
  }
});

// Define a Type event handler for high vibration readings
type SensorReading mixes Entity {
  // Fields...
  
  @onUpdate(["vibration"])
  checkVibrationLevel(oldValue, newValue) ~ js-server {
    // Check if vibration exceeds threshold
    if (newValue > 15) {
      // Get the turbine
      const turbine = this.turbine.fetch();
      
      // Create a high-priority alert
      Alert.make({
        source: turbine,
        relatedTo: this,
        severity: "HIGH",
        message: `High vibration detected (${newValue}) for turbine ${turbine.name}`,
        timestamp: new Date()
      }).save();
      
      // Update turbine status
      turbine.status = "ALARM";
      turbine.save();
      
      // Emit an event for the high vibration
      EventBus.emit("TURBINE_ALARM", {
        turbineId: turbine.id,
        readingId: this.id,
        alarmType: "HIGH_VIBRATION",
        alarmMessage: `High vibration detected: ${newValue}`,
        timestamp: new Date()
      });
    }
  }
}

Scheduled maintenance planning

The application uses jobs to schedule and plan maintenance activities:
// Define a job to identify turbines needing maintenance
JobRegistry.register({
  name: "IdentifyMaintenanceCandidates",
  description: "Identifies turbines that need maintenance based on performance and sensor data",
  implementation: async (params) => {
    // Get all operational turbines
    const turbines = await WindTurbine.filter({
      status: "OPERATIONAL"
    }).fetch();
    
    const maintenanceCandidates = [];
    
    // Process each turbine
    for (const turbine of turbines) {
      // Check if maintenance is due based on last maintenance date
      const daysSinceLastMaintenance = daysBetween(turbine.lastMaintenanceDate, new Date());
      if (daysSinceLastMaintenance > 90) {
        maintenanceCandidates.push({
          turbine: turbine,
          reason: "SCHEDULED_MAINTENANCE_DUE",
          priority: "NORMAL"
        });
        continue;
      }
      
      // Check recent sensor readings for warning signs
      const recentReadings = await SensorReading.filter({
        turbine: { id: turbine.id },
        timestamp: { $gte: subDays(new Date(), 7) }
      }).fetch();
      
      // Analyze readings for maintenance indicators
      const maintenanceIndicators = analyzeReadingsForMaintenanceIndicators(recentReadings);
      
      if (maintenanceIndicators.length > 0) {
        maintenanceCandidates.push({
          turbine: turbine,
          reason: "PERFORMANCE_DEGRADATION",
          indicators: maintenanceIndicators,
          priority: "HIGH"
        });
      }
    }
    
    // Create maintenance recommendations
    for (const candidate of maintenanceCandidates) {
      await MaintenanceRecommendation.make({
        turbine: candidate.turbine,
        reason: candidate.reason,
        priority: candidate.priority,
        indicators: JSON.stringify(candidate.indicators || []),
        status: "PENDING_REVIEW",
        createdDate: new Date()
      }).save();
    }
    
    return {
      turbinesAnalyzed: turbines.length,
      maintenanceCandidatesIdentified: maintenanceCandidates.length
    };
  }
});

// Schedule the job to run weekly
JobScheduler.schedule({
  jobName: "IdentifyMaintenanceCandidates",
  schedule: "0 0 * * 1", // Run at midnight every Monday
  params: {},
  enabled: true,
  description: "Weekly identification of turbines needing maintenance"
});

Maintenance workflow

The application uses workflows to coordinate maintenance activities:
// Define a workflow for maintenance approval
WorkflowRegistry.register({
  name: "MaintenanceApprovalWorkflow",
  description: "Coordinates the approval and execution of maintenance activities",
  steps: [
    {
      id: "initiate",
      type: "start",
      next: "reviewRecommendation"
    },
    {
      id: "reviewRecommendation",
      type: "human_task",
      assignee: "maintenance_manager",
      form: "MaintenanceReviewForm",
      timeout: 86400, // 24 hours
      next: "checkApproval"
    },
    {
      id: "checkApproval",
      type: "decision",
      decision: (context) => context.approved ? "approved" : "rejected",
      branches: {
        approved: "scheduleMaintenance",
        rejected: "recordRejection"
      }
    },
    {
      id: "scheduleMaintenance",
      type: "task",
      task: async (context) => {
        // Create a maintenance record
        const record = await MaintenanceRecord.make({
          turbine: { id: context.turbineId },
          type: context.maintenanceType,
          status: "SCHEDULED",
          priority: context.priority,
          scheduledDate: context.scheduledDate,
          description: context.description
        }).save();
        
        // Update the recommendation status
        const recommendation = await MaintenanceRecommendation.fetch(context.recommendationId);
        recommendation.status = "APPROVED";
        recommendation.resolution = "SCHEDULED_MAINTENANCE";
        recommendation.resolutionDate = new Date();
        await recommendation.save();
        
        // Update the context with the record ID
        context.maintenanceRecordId = record.id;
        
        return context;
      },
      next: "notifyTechnicians"
    },
    {
      id: "recordRejection",
      type: "task",
      task: async (context) => {
        // Update the recommendation status
        const recommendation = await MaintenanceRecommendation.fetch(context.recommendationId);
        recommendation.status = "REJECTED";
        recommendation.resolution = context.rejectionReason;
        recommendation.resolutionDate = new Date();
        await recommendation.save();
        
        return context;
      },
      next: "complete"
    },
    {
      id: "notifyTechnicians",
      type: "task",
      task: async (context) => {
        // Send notifications to technicians
        await NotificationService.sendNotification({
          type: "MAINTENANCE_SCHEDULED",
          recipients: ["technician_team"],
          subject: `Maintenance scheduled for turbine ${context.turbineId}`,
          body: `Maintenance has been scheduled for turbine ${context.turbineId} on ${context.scheduledDate}. Priority: ${context.priority}`,
          data: {
            turbineId: context.turbineId,
            maintenanceRecordId: context.maintenanceRecordId,
            scheduledDate: context.scheduledDate,
            priority: context.priority
          }
        });
        
        return context;
      },
      next: "complete"
    },
    {
      id: "complete",
      type: "end"
    }
  ]
});

// Start the workflow when a recommendation is created
type MaintenanceRecommendation mixes Entity {
  // Fields...
  
  @onCreate
  startApprovalWorkflow() ~ js-server {
    // Start the approval workflow
    WorkflowExecutor.start({
      workflowName: "MaintenanceApprovalWorkflow",
      context: {
        recommendationId: this.id,
        turbineId: this.turbine.id,
        priority: this.priority,
        reason: this.reason
      }
    });
  }
}

Benefits of the Event Processing System

The Event Processing System provides several key benefits:
  • Automation: Automate routine tasks and complex workflows
  • Responsiveness: React to events in real time
  • Reliability: Ensure tasks are completed even if failures occur
  • Scalability: Distribute work across multiple processors
  • Visibility: Monitor and manage tasks and workflows
  • Flexibility: Adapt to changing business requirements
These benefits enable you to build applications that can handle complex processes, respond to events in real time, and scale to meet the needs of your business.