WebSocket Integration Guide
WebSockets provide full-duplex communication channels over a single TCP connection, enabling real-time data exchange between clients and servers. This guide shows how to create WebSocket handlers in the C3 Agentic AI Platform to build real-time features such as live data streaming, chat applications, and notifications.
WebSocket Integration Overview
WebSocket integration in the C3 Agentic AI Platform provides:
- Real-time Communication: Establish persistent connections for instant data exchange
- Bidirectional Messaging: Send and receive messages in both directions simultaneously
- Event-driven Architecture: Handle connection events, message events, and errors
- Authentication Integration: Secure WebSocket connections using C3 AI's authentication system
- Scalable Design: Support multiple concurrent connections with proper resource management
Creating a WebSocket Handler
To create a WebSocket handler, you need to:
- Create a C3 type that mixes WebSocketHandler
- Add the @webSocket annotation with configuration
- Implement the required message handling methods
Basic WebSocket Handler
Here's a simple echo WebSocket handler:
@webSocket(endpoint="/echo")
type EchoWebSocketHandler mixes WebSocketHandler {
onReceiveMessage: ~
onReceiveBuffer: ~
onCloseHandler: ~
onExceptionHandler: ~
}
JavaScript Implementation
function onReceiveMessage(message) {
C3.logger("WebSocket").info("Received message: " + message);
// Echo the received message back to the client
this.sendMessage("Echo: " + message);
}
function onReceiveBuffer(buffer) {
// Echo binary data back to the client
this.sendBuffer(buffer);
}
function onCloseHandler() {
C3.logger("WebSocket").info("WebSocket connection closed");
}
function onExceptionHandler(error) {
C3.logger("WebSocket").error("WebSocket error: " + error);
}
WebSocket Annotation Configuration
The @webSocket annotation provides comprehensive configuration options:
Basic Configuration
@webSocket(
endpoint="/my-websocket",
enableLogging=true,
closeOnError=true
)
type MyWebSocketHandler mixes WebSocketHandler {
// Handler implementation
}
Advanced Configuration
@webSocket(
endpoint="/stream",
maxMessageSize=10485760, // 10MB
idleTimeout=300000, // 5 minutes
enableLogging=true,
closeOnError=true,
customHeaders={
"X-Custom-Header": "CustomValue",
"Access-Control-Allow-Origin": "*"
}
)
type StreamingWebSocketHandler mixes WebSocketHandler {
// Handler implementation
}
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| endpoint | string | Required | URL path for the WebSocket endpoint |
| maxMessageSize | int32 | 1048576 (1MB) | Maximum message size in bytes |
| idleTimeout | int32 | 60000 (60s) | Connection timeout in milliseconds |
| enableLogging | boolean | false | Enable debug logging for this endpoint |
| closeOnError | boolean | false | Close connection on message processing errors |
| customHeaders | map<string, string> | {} | Custom headers for handshake response |
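The numeric options are raw byte and millisecond counts, which are easy to mistype. The following is a minimal sketch of helpers for deriving them. The names megabytes, seconds, and minutes are hypothetical and not part of the C3 API; the results are meant to be copied into the annotation as literals, since this guide does not state whether the annotation accepts expressions.

```javascript
// Hypothetical helpers for computing @webSocket option values.
// These are illustrative only and are not part of the C3 API.
var BYTES_PER_MEGABYTE = 1024 * 1024;

// Convert a size in megabytes to bytes (for maxMessageSize).
function megabytes(count) {
  return count * BYTES_PER_MEGABYTE;
}

// Convert a duration in seconds to milliseconds (for idleTimeout).
function seconds(count) {
  return count * 1000;
}

// Convert a duration in minutes to milliseconds (for idleTimeout).
function minutes(count) {
  return seconds(count * 60);
}

console.log(megabytes(10)); // 10485760, the maxMessageSize used for /stream
console.log(minutes(5));    // 300000, the idleTimeout used for /stream
console.log(seconds(60));   // 60000, the documented default idleTimeout
```

Computing the values this way makes it easier to confirm that a literal such as 1800000 really is 30 minutes.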
Message Handling Methods
WebSocket handlers support different types of message handling:
Text Message Handling
function onReceiveMessage(message) {
// Process the message
var response = processMessage(message);
// Send response back to client
this.sendMessage("Response: " + response);
}
Binary Message Handling
function onReceiveBuffer(buffer) {
// Process binary data
var processedData = processBinaryData(buffer);
// Send processed data back
this.sendBuffer(processedData);
}
Connection Event Handling
function onCloseHandler() {
// Cleanup resources, update connection status, etc.
}
function onExceptionHandler(error) {
// Handle errors, log details, notify administrators, etc.
}
Real-time Data Streaming Example
Here's a complete example of a real-time data streaming WebSocket handler:
C3 Type Definition
@webSocket(
endpoint="/data-stream",
maxMessageSize=5242880, // 5MB
idleTimeout=300000, // 5 minutes
enableLogging=true,
closeOnError=true
)
type DataStreamWebSocketHandler mixes WebSocketHandler {
onReceiveMessage: ~
onReceiveBuffer: ~
onCloseHandler: ~
onExceptionHandler: ~
}
JavaScript Implementation
var activeConnections = [];
var dataInterval;
function onReceiveMessage(message) {
console.log("Received message:", message);
try {
var request = JSON.parse(message);
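// The helpers below use `this` to reach the current connection,
// so invoke them with .call(this, ...) to pass the handler through.
switch (request.type) {
case "subscribe":
handleSubscription.call(this, request);
break;
case "unsubscribe":
handleUnsubscription.call(this, request);
break;
case "filter":
updateFilter.call(this, request);
break;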
default:
this.sendMessage(JSON.stringify({
type: "error",
message: "Unknown message type: " + request.type
}));
}
} catch (e) {
this.sendMessage(JSON.stringify({
type: "error",
message: "Invalid JSON: " + e.message
}));
}
}
function handleSubscription(request) {
// Add connection to active list
activeConnections.push(this);
// Start data streaming if not already started
if (!dataInterval) {
startDataStreaming();
}
// Send confirmation
this.sendMessage(JSON.stringify({
type: "subscribed",
message: "Successfully subscribed to data stream"
}));
}
function handleUnsubscription(request) {
// Remove connection from active list
var index = activeConnections.indexOf(this);
if (index > -1) {
activeConnections.splice(index, 1);
}
// Stop data streaming if no active connections
if (activeConnections.length === 0) {
stopDataStreaming();
}
this.sendMessage(JSON.stringify({
type: "unsubscribed",
message: "Successfully unsubscribed from data stream"
}));
}
function startDataStreaming() {
dataInterval = setInterval(function() {
// Generate or fetch real-time data
var data = generateRealTimeData();
// Send to all active connections
activeConnections.forEach(function(connection) {
if (connection && !connection.closed()) {
connection.sendMessage(JSON.stringify({
type: "data",
timestamp: new Date().toISOString(),
data: data
}));
}
});
}, 1000); // Send data every second
}
function stopDataStreaming() {
if (dataInterval) {
clearInterval(dataInterval);
dataInterval = null;
}
}
function onCloseHandler() {
// Remove this connection from active list
var index = activeConnections.indexOf(this);
if (index > -1) {
activeConnections.splice(index, 1);
}
// Stop streaming if no more connections
if (activeConnections.length === 0) {
stopDataStreaming();
}
}
function onExceptionHandler(error) {
console.error("Data stream WebSocket error:", error);
// Remove this connection from active list
var index = activeConnections.indexOf(this);
if (index > -1) {
activeConnections.splice(index, 1);
}
}
function generateRealTimeData() {
// This would typically fetch data from your data sources
return {
temperature: Math.random() * 100,
humidity: Math.random() * 100,
pressure: Math.random() * 1000,
timestamp: new Date().toISOString()
};
}
Chat Application Example
Here's a complete chat application WebSocket handler:
C3 Type Definition
@webSocket(
endpoint="/chat",
maxMessageSize=1048576, // 1MB
idleTimeout=1800000, // 30 minutes
enableLogging=true,
closeOnError=true
)
type ChatWebSocketHandler mixes WebSocketHandler {
onReceiveMessage: ~
onCloseHandler: ~
onExceptionHandler: ~
}
JavaScript Implementation
var chatRooms = {};
var userConnections = {};
function onReceiveMessage(message) {
try {
var chatMessage = JSON.parse(message);
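// The room helpers use `this` to reach the current connection,
// so invoke them with .call(this, ...) to pass the handler through.
switch (chatMessage.type) {
case "join":
handleJoinRoom.call(this, chatMessage);
break;
case "leave":
handleLeaveRoom.call(this, chatMessage);
break;
case "message":
handleChatMessage.call(this, chatMessage);
break;
case "typing":
handleTypingIndicator.call(this, chatMessage);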
break;
default:
this.sendMessage(JSON.stringify({
type: "error",
message: "Unknown message type"
}));
}
} catch (e) {
this.sendMessage(JSON.stringify({
type: "error",
message: "Invalid message format"
}));
}
}
function handleJoinRoom(chatMessage) {
var roomId = chatMessage.roomId;
var username = chatMessage.username;
// Initialize room if it doesn't exist
if (!chatRooms[roomId]) {
chatRooms[roomId] = [];
}
// Add user to room
chatRooms[roomId].push({
connection: this,
username: username,
joinedAt: new Date()
});
// Store user connection
userConnections[username] = this;
// Notify room about new user
broadcastToRoom(roomId, {
type: "user_joined",
username: username,
timestamp: new Date().toISOString()
});
// Send room history to new user
this.sendMessage(JSON.stringify({
type: "room_history",
messages: getRoomHistory(roomId)
}));
}
function handleLeaveRoom(chatMessage) {
var roomId = chatMessage.roomId;
var username = chatMessage.username;
// Remove user from room
if (chatRooms[roomId]) {
chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
return user.username !== username;
});
}
// Remove user connection
delete userConnections[username];
// Notify room about user leaving
broadcastToRoom(roomId, {
type: "user_left",
username: username,
timestamp: new Date().toISOString()
});
}
function handleChatMessage(chatMessage) {
var roomId = chatMessage.roomId;
var username = chatMessage.username;
var message = chatMessage.message;
// Create message object
var messageObj = {
type: "message",
roomId: roomId,
username: username,
message: message,
timestamp: new Date().toISOString()
};
// Store message in room history
storeMessage(roomId, messageObj);
// Broadcast to all users in room
broadcastToRoom(roomId, messageObj);
}
function handleTypingIndicator(chatMessage) {
var roomId = chatMessage.roomId;
var username = chatMessage.username;
var isTyping = chatMessage.isTyping;
// Broadcast typing indicator to room (excluding sender)
broadcastToRoom(roomId, {
type: "typing",
username: username,
isTyping: isTyping,
timestamp: new Date().toISOString()
}, username);
}
function broadcastToRoom(roomId, message, excludeUser) {
if (chatRooms[roomId]) {
chatRooms[roomId].forEach(function(user) {
if (user.connection && !user.connection.closed() && user.username !== excludeUser) {
user.connection.sendMessage(JSON.stringify(message));
}
});
}
}
function onCloseHandler() {
// Find and remove this connection from all rooms
for (var roomId in chatRooms) {
chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
if (user.connection === this) {
// Notify room about disconnection
broadcastToRoom(roomId, {
type: "user_disconnected",
username: user.username,
timestamp: new Date().toISOString()
});
return false;
}
return true;
}.bind(this));
}
}
function onExceptionHandler(error) {
console.error("Chat WebSocket error:", error);
// Remove this connection from all rooms
for (var roomId in chatRooms) {
chatRooms[roomId] = chatRooms[roomId].filter(function(user) {
return user.connection !== this;
}.bind(this));
}
}
// Helper functions
// Message history is kept separately from the member lists in chatRooms,
// because those lists are replaced whenever a user leaves or disconnects.
var roomMessages = {};
function getRoomHistory(roomId) {
// This would typically fetch from a database
return roomMessages[roomId] || [];
}
function storeMessage(roomId, message) {
if (!roomMessages[roomId]) {
roomMessages[roomId] = [];
}
roomMessages[roomId].push(message);
}
Client-Side Integration
JavaScript Client Example
// Connect to WebSocket
var ws = new WebSocket('ws://localhost:8080/myapp/echo');
ws.onopen = function(event) {
console.log('WebSocket connected');
// Send a test message
ws.send('Hello WebSocket!');
};
ws.onmessage = function(event) {
console.log('Received:', event.data);
// Handle different message types
try {
var message = JSON.parse(event.data);
handleMessage(message);
} catch (e) {
// Handle plain text messages
console.log('Text message:', event.data);
}
};
ws.onclose = function(event) {
console.log('WebSocket closed:', event.code, event.reason);
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
};
function handleMessage(message) {
switch (message.type) {
case 'data':
updateDataDisplay(message.data);
break;
case 'error':
showError(message.message);
break;
case 'user_joined':
showUserJoined(message.username);
break;
case 'message':
displayChatMessage(message);
break;
}
}
Authentication Headers
When connecting to WebSocket endpoints that require authentication:
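// For token-based authentication from a Node.js client (for example, the ws
// package, whose constructor accepts an options object with custom headers).
// The browser WebSocket API does not allow custom handshake headers.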
var ws = new WebSocket('ws://localhost:8080/myapp/secure-endpoint', [], {
headers: {
'Authorization': 'Bearer ' + authToken
}
});
// Or using the C3 test suite for testing
var request = new ClientUpgradeRequest();
request.setHeader("Authorization", TestSuite.testAuthHeader());
Best Practices
1. Error Handling
Always implement proper error handling in your WebSocket handlers:
function onReceiveMessage(message) {
try {
// Process message
processMessage(message);
} catch (error) {
console.error("Error processing message:", error);
this.sendMessage(JSON.stringify({
type: "error",
message: "Failed to process message: " + error.message
}));
}
}
2. Connection Management
Keep track of active connections and clean up properly:
var activeConnections = new Set();
function onReceiveMessage(message) {
// Add connection to active set
activeConnections.add(this);
// Process message
processMessage(message);
}
function onCloseHandler() {
// Remove from active connections
activeConnections.delete(this);
// Cleanup resources
cleanup();
}
3. Message Validation
Validate incoming messages before processing:
function onReceiveMessage(message) {
try {
var data = JSON.parse(message);
// Validate required fields
if (!data.type || !data.payload) {
throw new Error("Invalid message format");
}
// Process validated message
processMessage(data);
} catch (error) {
this.sendMessage(JSON.stringify({
type: "error",
message: "Invalid message: " + error.message
}));
}
}
4. Resource Cleanup
Always clean up resources when connections close:
function onCloseHandler() {
// Stop any timers or intervals
if (this.intervalId) {
clearInterval(this.intervalId);
}
// Remove from active connections
removeConnection(this);
// Cleanup any other resources
cleanup();
}
Troubleshooting
Common Issues
Connection Refused: Check that the WebSocket endpoint is correctly configured and the server is running.
Authentication Failures: Ensure proper authentication headers are included in the WebSocket handshake.
Message Size Limits: If sending large messages, increase the maxMessageSize configuration.
Connection Timeouts: Adjust the idleTimeout setting for long-running connections.
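Because maxMessageSize is expressed in bytes while a JavaScript string's length counts UTF-16 code units, a client can check the encoded size before sending. The following is a minimal sketch under stated assumptions: utf8ByteLength, fitsWithinLimit, and sendIfWithinLimit are hypothetical helper names, not part of the C3 API, and it assumes the server measures text messages by their UTF-8 encoded size.

```javascript
// Hypothetical client-side helpers; the names are illustrative only.
var DEFAULT_MAX_MESSAGE_SIZE = 1048576; // 1MB default from the configuration table

// Return the number of bytes a string occupies when encoded as UTF-8.
function utf8ByteLength(text) {
  return new TextEncoder().encode(text).length;
}

// Return true if the message fits within the limit (in bytes).
function fitsWithinLimit(text, maxMessageSize) {
  var limit = maxMessageSize === undefined ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize;
  return utf8ByteLength(text) <= limit;
}

// Send only when the payload fits; return false so the caller can
// split the payload or report the problem instead.
function sendIfWithinLimit(ws, text, maxMessageSize) {
  if (!fitsWithinLimit(text, maxMessageSize)) {
    return false;
  }
  ws.send(text);
  return true;
}
```

A caller would pass the same limit that the endpoint declares, for example sendIfWithinLimit(ws, payload, 5242880) for the /data-stream endpoint shown earlier.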
Debug Logging
Enable logging to debug WebSocket issues:
@webSocket(
endpoint="/debug",
enableLogging=true
)
type DebugWebSocketHandler mixes WebSocketHandler {
// Handler implementation
}
This guide provides a comprehensive foundation for building real-time WebSocket applications in the C3 Agentic AI Platform. For more advanced use cases and specific implementation details, refer to the WebSocket API documentation and examples.