C3 AI Documentation Home

WebSocket Integration Guide

WebSockets provide full-duplex communication channels over a single TCP connection, enabling real-time data exchange between clients and servers. This guide shows you how to create WebSocket handlers in the C3 Agentic AI Platform, to build real-time features like live data streaming, chat applications, and real-time notifications.

WebSocket Integration Overview

WebSocket integration in the C3 Agentic AI Platform allows you to:

  • Real-time Communication: Establish persistent connections for instant data exchange
  • Bidirectional Messaging: Send and receive messages in both directions simultaneously
  • Event-driven Architecture: Handle connection events, message events, and errors
  • Authentication Integration: Secure WebSocket connections using C3 AI's authentication system
  • Scalable Design: Support multiple concurrent connections with proper resource management

Creating a WebSocket Handler

To create a WebSocket handler, you need to:

  1. Create a C3 type that mixes WebSocketHandler
  2. Add the @webSocket annotation with configuration
  3. Implement the required message handling methods

Basic WebSocket Handler

Here's a simple echo WebSocket handler:

Type
@webSocket(endpoint="/echo")
type EchoWebSocketHandler mixes WebSocketHandler {
  onReceiveMessage: ~
  onReceiveBuffer: ~
  onCloseHandler: ~
  onExceptionHandler: ~
}

JavaScript Implementation

JavaScript
function onReceiveMessagemessage) {
  C3.logger("WebSocket").info("Received message: " + message);
  // Echo the received message back to the client
  this.sendMessage("Echo: " + message);
}

function onReceiveBuffer(buffer) {
  // Echo binary data back to the client
  this.sendBuffer(buffer);
}

function onCloseHandler() {
  C3.logger("WebSocket").info("WebSocket connection closed");
}

function onExceptionHandler(error) {
  C3.logger("WebSocket").error("WebSocket error:" + error);
}

WebSocket Annotation Configuration

The @webSocket annotation provides comprehensive configuration options:

Basic Configuration

Type
@webSocket(
  endpoint="/my-websocket",
  enableLogging=true,
  closeOnError=true
)
type MyWebSocketHandler mixes WebSocketHandler {
  // Handler implementation
}

Advanced Configuration

Type
@webSocket(
  endpoint="/stream",
  maxMessageSize=10485760,  // 10MB
  idleTimeout=300000,       // 5 minutes
  enableLogging=true,
  closeOnError=true,
  customHeaders={
    "X-Custom-Header": "CustomValue",
    "Access-Control-Allow-Origin": "*"
  }
)
type StreamingWebSocketHandler mixes WebSocketHandler {
  // Handler implementation
}

Configuration Options

OptionTypeDefaultDescription
endpointstringRequiredURL path for the WebSocket endpoint
maxMessageSizeint321048576 (1MB)Maximum message size in bytes
idleTimeoutint3260000 (60s)Connection timeout in milliseconds
enableLoggingbooleanfalseEnable debug logging for this endpoint
closeOnErrorbooleanfalseClose connection on message processing errors
customHeadersmap<string, string>{}Custom headers for handshake response

Message Handling Methods

WebSocket handlers support different types of message handling:

Text Message Handling

JavaScript
function onReceiveMessage(message) {
  
  // Process the message
  var response = processMessage(message);
  
  // Send response back to client
  this.sendMessage("Response: " + response);
}

Binary Message Handling

JavaScript
function onReceiveBuffer(buffer) {
  // Process binary data
  var processedData = processBinaryData(buffer);
  
  // Send processed data back
  this.sendBuffer(processedData);
}

Connection Event Handling

JavaScript
function onCloseHandler() {
  // Cleanup resources, update connection status, etc.
}

function onExceptionHandler(error) {
  // Handle errors, log details, notify administrators, etc.
}

Real-time Data Streaming Example

Here's a complete example of a real-time data streaming WebSocket handler:

C3 Type Definition

Type
@webSocket(
  endpoint="/data-stream",
  maxMessageSize=5242880,  // 5MB
  idleTimeout=300000,      // 5 minutes
  enableLogging=true,
  closeOnError=true
)
type DataStreamWebSocketHandler mixes WebSocketHandler {
  onReceiveMessage: ~
  onReceiveBuffer: ~
  onCloseHandler: ~
  onExceptionHandler: ~
}

JavaScript Implementation

JavaScript
var activeConnections = [];
var dataInterval;

function onReceiveMessage(message) {
  console.log("Received message:", message);
  
  try {
    var request = JSON.parse(message);
    
    switch (request.type) {
      case "subscribe":
        handleSubscription(request);
        break;
      case "unsubscribe":
        handleUnsubscription(request);
        break;
      case "filter":
        updateFilter(request);
        break;
      default:
        this.sendMessage(JSON.stringify({
          type: "error",
          message: "Unknown message type: " + request.type
        }));
    }
  } catch (e) {
    this.sendMessage(JSON.stringify({
      type: "error",
      message: "Invalid JSON: " + e.message
    }));
  }
}

function handleSubscription(request) {
  // Add connection to active list
  activeConnections.push(this);
  
  // Start data streaming if not already started
  if (!dataInterval) {
    startDataStreaming();
  }
  
  // Send confirmation
  this.sendMessage(JSON.stringify({
    type: "subscribed",
    message: "Successfully subscribed to data stream"
  }));
}

function handleUnsubscription(request) {
  // Remove connection from active list
  var index = activeConnections.indexOf(this);
  if (index > -1) {
    activeConnections.splice(index, 1);
  }
  
  // Stop data streaming if no active connections
  if (activeConnections.length === 0) {
    stopDataStreaming();
  }
  
  this.sendMessage(JSON.stringify({
    type: "unsubscribed",
    message: "Successfully unsubscribed from data stream"
  }));
}

function startDataStreaming() {
  dataInterval = setInterval(function() {
    // Generate or fetch real-time data
    var data = generateRealTimeData();
    
    // Send to all active connections
    activeConnections.forEach(function(connection) {
      if (connection && !connection.closed()) {
        connection.send(JSON.stringify({
          type: "data",
          timestamp: new Date().toISOString(),
          data: data
        }));
      }
    });
  }, 1000); // Send data every second
}

function stopDataStreaming() {
  if (dataInterval) {
    clearInterval(dataInterval);
    dataInterval = null;
  }
}

function onCloseHandler() {
  // Remove this connection from active list
  var index = activeConnections.indexOf(this);
  if (index > -1) {
    activeConnections.splice(index, 1);
  }
  
  // Stop streaming if no more connections
  if (activeConnections.length === 0) {
    stopDataStreaming();
  }
}

function onExceptionHandler(error) {
  console.error("Data stream WebSocket error:", error);
  
  // Remove this connection from active list
  var index = activeConnections.indexOf(this);
  if (index > -1) {
    activeConnections.splice(index, 1);
  }
}

function generateRealTimeData() {
  // This would typically fetch data from your data sources
  return {
    temperature: Math.random() * 100,
    humidity: Math.random() * 100,
    pressure: Math.random() * 1000,
    timestamp: new Date().toISOString()
  };
}

Chat Application Example

Here's a complete chat application WebSocket handler:

C3 Type Definition

Type
@webSocket(
  endpoint="/chat",
  maxMessageSize=1048576,  // 1MB
  idleTimeout=1800000,     // 30 minutes
  enableLogging=true,
  closeOnError=true
)
type ChatWebSocketHandler mixes WebSocketHandler {
  onReceiveMessage: ~
  onCloseHandler: ~
  onExceptionHandler: ~
}

JavaScript Implementation

JavaScript
var chatRooms = {};
var userConnections = {};

function onReceiveMessagemessage) {
  try {
    var chatMessage = JSON.parse(message);
    
    switch (chatMessage.type) {
      case "join":
        handleJoinRoom(chatMessage);
        break;
      case "leave":
        handleLeaveRoom(chatMessage);
        break;
      case "message":
        handleChatMessage(chatMessage);
        break;
      case "typing":
        handleTypingIndicator(chatMessage);
        break;
      default:
        this.sendMessage(JSON.stringify({
          type: "error",
          message: "Unknown message type"
        }));
    }
  } catch (e) {
    this.sendMessage(JSON.stringify({
      type: "error",
      message: "Invalid message format"
    }));
  }
}

function handleJoinRoom(chatMessage) {
  var roomId = chatMessage.roomId;
  var username = chatMessage.username;
  
  // Initialize room if it doesn't exist
  if (!chatRooms[roomId]) {
    chatRooms[roomId] = [];
  }
  
  // Add user to room
  chatRooms[roomId].push({
    connection: this,
    username: username,
    joinedAt: new Date()
  });
  
  // Store user connection
  userConnections[username] = this;
  
  // Notify room about new user
  broadcastToRoom(roomId, {
    type: "user_joined",
    username: username,
    timestamp: new Date().toISOString()
  });
  
  // Send room history to new user
  this.sendMessage(JSON.stringify({
    type: "room_history",
    messages: getRoomHistory(roomId)
  }));
}

function handleLeaveRoom(chatMessage) {
  var roomId = chatMessage.roomId;
  var username = chatMessage.username;
  
  // Remove user from room
  if (chatRooms[roomId]) {
    chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
      return user.username !== username;
    });
  }
  
  // Remove user connection
  delete userConnections[username];
  
  // Notify room about user leaving
  broadcastToRoom(roomId, {
    type: "user_left",
    username: username,
    timestamp: new Date().toISOString()
  });
}

function handleChatMessage(chatMessage) {
  var roomId = chatMessage.roomId;
  var username = chatMessage.username;
  var message = chatMessage.message;
  
  // Create message object
  var messageObj = {
    type: "message",
    roomId: roomId,
    username: username,
    message: message,
    timestamp: new Date().toISOString()
  };
  
  // Store message in room history
  storeMessage(roomId, messageObj);
  
  // Broadcast to all users in room
  broadcastToRoom(roomId, messageObj);
}

function handleTypingIndicator(chatMessage) {
  var roomId = chatMessage.roomId;
  var username = chatMessage.username;
  var isTyping = chatMessage.isTyping;
  
  // Broadcast typing indicator to room (excluding sender)
  broadcastToRoom(roomId, {
    type: "typing",
    username: username,
    isTyping: isTyping,
    timestamp: new Date().toISOString()
  }, username);
}

function broadcastToRoom(roomId, message, excludeUser) {
  if (chatRooms[roomId]) {
    chatRooms[roomId].forEach(function(user) {
      if (user.connection && !user.connection.closed() && user.username !== excludeUser) {
        user.connection.send(JSON.stringify(message));
      }
    });
  }
}

function onCloseHandler() {
  // Find and remove this connection from all rooms
  for (var roomId in chatRooms) {
    chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
      if (user.connection === this) {
        // Notify room about disconnection
        broadcastToRoom(roomId, {
          type: "user_disconnected",
          username: user.username,
          timestamp: new Date().toISOString()
        });
        return false;
      }
      return true;
    }.bind(this));
  }
}

function onExceptionHandler(error) {
  console.error("Chat WebSocket error:", error);
  
  // Remove this connection from all rooms
  for (var roomId in chatRooms) {
    chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
      return user.connection !== this;
    }.bind(this));
  }
}

// Helper functions
function getRoomHistory(roomId) {
  // This would typically fetch from a database
  return chatRooms[roomId] ? chatRooms[roomId].messages || [] : [];
}

function storeMessage(roomId, message) {
  if (!chatRooms[roomId]) {
    chatRooms[roomId] = [];
  }
  if (!chatRooms[roomId].messages) {
    chatRooms[roomId].messages = [];
  }
  chatRooms[roomId].messages.push(message);
}

Client-Side Integration

JavaScript Client Example

JavaScript
// Connect to WebSocket
var ws = new WebSocket('ws://localhost:8080/myapp/echo');

ws.onopen = function(event) {
  console.log('WebSocket connected');
  
  // Send a test message
  ws.send('Hello WebSocket!');
};

ws.onmessage = function(event) {
  console.log('Received:', event.data);
  
  // Handle different message types
  try {
    var message = JSON.parse(event.data);
    handleMessage(message);
  } catch (e) {
    // Handle plain text messages
    console.log('Text message:', event.data);
  }
};

ws.onclose = function(event) {
  console.log('WebSocket closed:', event.code, event.reason);
};

ws.onerror = function(error) {
  console.error('WebSocket error:', error);
};

function handleMessage(message) {
  switch (message.type) {
    case 'data':
      updateDataDisplay(message.data);
      break;
    case 'error':
      showError(message.message);
      break;
    case 'user_joined':
      showUserJoined(message.username);
      break;
    case 'message':
      displayChatMessage(message);
      break;
  }
}

Authentication Headers

When connecting to WebSocket endpoints that require authentication:

JavaScript
// For basic authentication
var ws = new WebSocket('ws://localhost:8080/myapp/secure-endpoint', [], {
  headers: {
    'Authorization': 'Bearer ' + authToken
  }
});

// Or using the C3 test suite for testing
var request = new ClientUpgradeRequest();
request.setHeader("Authorization", TestSuite.testAuthHeader());

Best Practices

1. Error Handling

Always implement proper error handling in your WebSocket handlers:

JavaScript
function onReceiveMessage(message) {
  try {
    // Process message
    processMessage(message);
  } catch (error) {
    console.error("Error processing message:", error);
    this.sendMessage(JSON.stringify({
      type: "error",
      message: "Failed to process message: " + error.message
    }));
  }
}

2. Connection Management

Keep track of active connections and clean up properly:

JavaScript
var activeConnections = new Set();

function onReceiveMessage(message) {
  // Add connection to active set
  activeConnections.add(this);
  
  // Process message
  processMessage(message);
}

function onCloseHandler() {
  // Remove from active connections
  activeConnections.delete(this);
  
  // Cleanup resources
  cleanup();
}

3. Message Validation

Validate incoming messages before processing:

JavaScript
function onReceiveMessage(message) {
  try {
    var data = JSON.parse(message);
    
    // Validate required fields
    if (!data.type || !data.payload) {
      throw new Error("Invalid message format");
    }
    
    // Process validated message
    processMessage(data);
  } catch (error) {
    this.sendMessage(JSON.stringify({
      type: "error",
      message: "Invalid message: " + error.message
    }));
  }
}

4. Resource Cleanup

Always clean up resources when connections close:

JavaScript
function onCloseHandler() {
  // Stop any timers or intervals
  if (this.intervalId) {
    clearInterval(this.intervalId);
  }
  
  // Remove from active connections
  removeConnection(this);
  
  // Cleanup any other resources
  cleanup();
}

Troubleshooting

Common Issues

  1. Connection Refused: Check that the WebSocket endpoint is correctly configured and the server is running.

  2. Authentication Failures: Ensure proper authentication headers are included in the WebSocket handshake.

  3. Message Size Limits: If sending large messages, increase the maxMessageSize configuration.

  4. Connection Timeouts: Adjust the idleTimeout setting for long-running connections.

Debug Logging

Enable logging to debug WebSocket issues:

Type
@webSocket(
  endpoint="/debug",
  enableLogging=true
)
type DebugWebSocketHandler mixes WebSocketHandler {
  // Handler implementation
}

This guide provides a comprehensive foundation for building real-time WebSocket applications in the C3 Agentic AI Platform. For more advanced use cases and specific implementation details, refer to the WebSocket API documentation and examples.

Was this page helpful?