C3 AI Documentation Home

Azure Event Hubs Connector

The C3 Agentic AI Platform connector for Azure Event Hubs enables integration between C3 Agentic AI Platform applications and your Azure Event Hubs service.

When integrating with Azure Event Hubs in the context of data integration (consuming data to be persisted in a C3 AI Platform data store), it is recommended that you use the CloudMessageSourceSystem framework. The CloudMessageSourceSystem framework provides a codeless approach to consuming and ingesting messages from a variety of messaging systems including Azure Event Hubs, AWS Kinesis and Apache Kafka.

Prerequisites

Before using the Azure Event Hubs connector, ensure that you have access to an active Event Hubs namespace with one or more Hubs. See the Azure Event Hubs documentation for more information.

Register Azure Event Hubs credentials with the C3 Agentic AI Platform

Begin by defining the Event Hub cloud message broker name:

JavaScript
var ehResourceName = AzureEventHub.makeResourceName("EVENT_HUB_NAME", "EVENT_HUB_NAMESPACE", "RESOURCE_GROUP");

Generate the configuration key from an AzureEventHub Type instance constructed using the broker name. Then, set the broker name as the configuration value:

JavaScript
var configKey = AzureEventHub.make({resourceName: ehResourceName}).configKey();
AzureEventHub.setConfigOrSecretValue(configKey, "resourceName", ehResourceName);
AzureEventHub.setConfigOrSecretValue(configKey, "external", true);

Setting credentials using Service Principal

In this task, you configure authentication for your Azure resources using a service principal. This approach enhances security by allowing your application to authenticate without relying on user credentials.

Here's a step-by-step overview to guide you through the process:

Understand Service Principal authentication

A service principal is an identity created for use with applications, hosted services, and automated tools to access Azure resources. It uses a client ID and client secret for authentication.

Gather required information

Before you start, ensure you have the following details:

  1. Active Directory ID — Your Microsoft Entra ID Tenant ID
  2. Access Key — The Application (Client) ID for your registered application
  3. Secret Key — The client secret (application password) generated for your application
  4. Account ID — Your Azure subscription ID
  5. Region — The Azure region where your resources are located

Create the credentials object

Use the gathered information to create a credentials object in your code. Here’s a template to follow:

JavaScript
var credentials = {
  "type" : "AzureCredentials",
  "activeDirectoryId" : "< ACTIVE_DIRECTORY_ID >",
  "accessKey" : "< APPLICATION_ID >",
  "secretKey" : "< SECRET_ID >",
  "accountId" : "< SUBSCRIPTION_ID >",
  "region" : "< CLOUD_REGION >"
}

Set credentials for your resource

Use the credentials object to authenticate your application with Azure resources. This typically involves calling a method like AzureEventHub.setCredentialsForResourceName, passing in the resource name and the credentials object.

Test your configuration

After setting up the credentials, run your application to ensure that it can successfully authenticate and interact with Azure resources. Monitor for any errors and adjust your configuration as needed.

Conclusion

By following these steps, you securely set up service principal authentication for your Azure resources, allowing your application to operate efficiently and safely.

Note on Azure Event Hub client creation

When the following three fields are provided, the code always attempt to use the service principal to create the Azure Event Hub client:

  • activeDirectoryId
  • secretKey
  • accessKey

If you also intend to use a shared key while populating these fields, the code may still successfully create a client using the service principal. However, be aware that this client may have different roles compared to those assigned with the shared key.

Role assignment for Azure Event Hub access

After setting up credentials for users using a service principal in Azure, it is essential to assign the appropriate roles to ensure they can interact with Azure Event Hub effectively. Follow these steps:

  1. Access the Azure Portal: Log in to the Azure Portal and navigate to your subscription.

  2. Open Access Control (IAM): Select the subscription and go to the Access control (IAM) section.

  3. Add Role Assignment:

    a. Select + Add and choose Add role assignment.

    b. Select the Azure Event Hub Owner role to grant full access.

  4. Assign to Service Principal: Use the search option to find and select the service principal that requires access.

  5. Verify Assignment: Confirm that the service principal appears in the Role assignments section, indicating successful permission setup.

This process ensures that users can send and receive messages through Azure Event Hub, facilitating seamless integration and functionality.

Steps to test sending and receiving messages in static console

After configuring authentication for Azure resources using a service principal, follow these steps to test sending and receiving messages in the static console:

Define Azure Event Hub resource

Specify the name of your Azure Event Hub resource. If you don't have one, create it as needed. This variable holds the name of your Azure Event Hub resource. Ensure it is correctly specified to match your setup.

Python
eventHubResourceName = "<your_event_hub_resource_name>"

Create a service principal credential

Create a service principal credential to authenticate and authorize access to Azure resources.

JavaScript
cred=AzureCredentials.make({
  activeDirectoryId: "<your_active_directory_id>",
  secretKey: "<your_client_secret>",
  description: "<description>",
  accountId: "<your_subscription_id>",
  accessKey: "<your_application_or_tenant_id>",
  region: "<your_region>",
})

Set the credential

Assign the service principal credential to the CloudMessageSourceSystem to enable authenticated access to Azure resources. Use the setCredentials() method on the CloudMessageSourceSystem instance to assign the previously created service principal credential.

JavaScript
// create an instance of CloudMessageSourceSystem with the name "EventHub" using the make function
cmss=CloudMessageSourceSystem.make({
  "name": "EventHub"
})
//set the credentials
cmss.setCredentials(cred)

Create a cloud message

Create a cloud message to be sent to the Azure Event Hub.

Use the CloudStreamMessage.make() method to create a new message with the specified parameters.

JavaScript
msg = CloudStreamMessage.make({
  id: "123",
})

By creating a cloud message with these parameters, you prepare a message that can be sent to the Azure Event Hub for processing.

Send the message

By using the DataIntegrator.sendObjs() method, you send the message to the Azure Event Hub, ensuring it is processed and delivered according to the specified parameters.

JavaScript
DataIntegrator.sendObjs({'type': 'AzureEventHub', 'resourceName': '<your_event_hur_resource_name>'}, [msg], {'batchSize': 100, 'contentType': "application/json"})

Receive and verify the message on Azure portal

Receive and verify the messages sent to the Azure Event Hub, ensuring that the entire process from sending to receiving works correctly.

JavaScript
msgs=DataIntegrator.receiveMessages({'type': 'AzureEventHub', 'resourceName': '<your_event_hur_resource_name>'}, {'type': 'CloudStreamReceiveMessagesSpec', 'partition': "1", 'timeoutMillis': 1108, 'batchSize': 100})

Setting credentials using accessKey

In this task, you configure authentication for accessing Azure Event Hubs using the access key method. This approach utilizes a general Azure credentials setup, which can encompass various authentication methods. Follow these steps to ensure a successful configuration:

Understand accessKey authentication

The access key method allows your application to authenticate with Azure resources using a key that grants access to the associated services.

Gather necessary information

Before proceeding, make sure you have the following details:

  1. Active Directory ID — Your Microsoft Entra ID Tenant ID

  2. Access Key — The key used for authentication, which may be a primary or secondary key

  3. Secret Key — If applicable, this may refer to a client secret for additional security

  4. Account ID — Your Azure subscription ID

  5. Region — The Azure region where your resources are located

    a. Shared Access Policy Name and Key: If using SAS, gather these details as well.

Create the credentials object

Use the information collected to create a credentials object in your code. Here's a template to follow:

JavaScript
var credentials = {
  "type": "AzureCredentials",
  "activeDirectoryId": "< ACTIVE_DIRECTORY_ID >", // Tenant ID
  "accessKey": "< ACCESS_KEY >", // Access key
  "secretKey": "< SECRET_ID >", // Optional client secret
  "accountId": "< SUBSCRIPTION_ID >", // Subscription ID
  "region": "< CLOUD_REGION >", // Azure region
  "sharedAccessPolicyName": "< SAS_POLICY_NAME >", // Required for SAS
  "sharedAccessPolicyKey": "< SAS_KEY >" // Required for SAS
}

Set credentials for your resource

Use the credentials object to authenticate your application with Azure Event Hubs. This typically involves calling a method like AzureEventHub.setCredentialsForResourceName, passing in the resource name and the credentials object.

JavaScript
AzureEventHub.setCredentialsForResourceName(ehResourceName, credentials, ConfigOverride.APP);

This is crucial for establishing a secure connection to an Azure Event Hub by applying the necessary credentials. It allows your application to authenticate and interact with the Event Hub, enabling you to manage and process events effectively.

Test your configuration

After setting up the credentials, run your application to ensure it can successfully authenticate and interact with Azure Event Hubs. Monitor for any errors and adjust your configuration as needed.

Conclusion

By following these steps, you securely set up authentication for Azure Event Hubs using the access key method. This approach allows your application to access resources efficiently while maintaining security.

Configuring an Azure Event Hub consumer

Configuring the C3 Agentic AI Platform to subscribe to an Azure Event Hub requires the following 3 steps:

  1. Configure the Types to model the messages consumed from the Event Hub.
  2. Configure CloudMessageSourceSystem and CloudMessageSourceCollection seed data.
  3. Enable the CloudMessageSourceCollection to receive messages.

Additionally, the CloudInboundMessage Type can be configured to explicitly model an Event Hub consumer. CloudInboundMessage is a parametric Type requiring you to specify a Type that represents the messages consumed from the topic.

The following example defines and applies the configuration required by the C3 Agentic AI Platform to subscribe to the eh1 Event Hub available from Azure Event Hubs.

This example contains messages published to the eh1 hub with the following format:

JSON
{
  "Country": "USA",
  "City": "New York",
  "Date": "2023-05-11",
  "Temperature": 20,
  "Humidity": 60,
  "Wind Speed": 10,
  "Wind Direction": "NW",
  "Precipitation": 0,
  "Cloud Cover": 20,
  "Visibility": 10,
  "Pressure": 1013,
  "Dew Point": 10,
  "UV Index": 5,
  "Sunrise": "05:30",
  "Sunset": "20:15",
  "Moonrise": "22:00",
  "Moonset": "08:00",
  "Moon Phase": "Waning Gibbous",
  "Conditions": "Partly Cloudy",
  "Icon": "partly-cloudy-day"
}

Configure the Types to model the messages consumed from the Event Hub

First, you must define your C3 AI Types to model the JSON format of the messages to be consumed from the Event Hub. This example has one Type, SourceWeatherDataMeasurement, to model according to the JSON message above.

Type
/**
 * This type represents the raw data that will represent `WeatherDataMeasurement` information.
 */
type SourceWeatherDataMeasurement mixes Source  {

  Country: string

  City: string

  Date: datetime

  Temperature: int

  Humidity: int

  @ser(name="Wind Speed")
  WindSpeed: int

  @ser(name="Wind Direction")
  WindDirection: string

  Precipitation: int

  @ser(name="Cloud Cover")
  CloudCover: int

  Visibility: int

  Pressure: int

  @ser(name="Dew Point")
  DewPoint: int

  @ser(name="UV Index")
  UvIndex: int

  Sunrise: string

  Moonrise: string

  Moonset: string

  @ser(name="Moon Phase")
  MoonPhase: string

  Conditions: string

  Icon: string

}

Configure the CloudMessageSourceSystem and CloudMessageSourceCollection Types to register the Event Hub

A CloudMessageSourceSystem is a means of organizing related CloudMessageSourceCollections.

A CloudMessageSourceSystem is created by specifying its name. For example, you can add the following EventHub.json to the \metadata\CloudMessageSourceSystem directory of your package:

JSON
{
    "name": "EventHub"
}

A CloudMessageSourceCollection represents a source collection that references a CloudMessageBroker and its related queue, topic, stream, etc.

The CloudMessageSourceCollection references a CloudMessageSourceSystem and contains information about how to process data received from that topic, as well as information required to connect to the topic.

For example, to model a CloudMessageSourceCollection called WeatherDataMeasurementSource, you can add the following WeatherDataMeasurementEventHub.json to the \metadata\CloudMessageSourceCollection directory of your package:

JSON
{
    "name": "WeatherDataMeasurementSource",
    "sourceSystem": {
        "type" : "CloudMessageSourceSystem", 
        "name" : "EventHub"
    },
    "source": "SourceWeatherDataMeasurement",
    "external": true,
    "cloudMessageBrokerName": "eh1/c3-eventhub-testing/azv8qa-rsgp-c3-01",
    "cloudMessageBrokerType": "AzureEventHub",
    "batchSize": 10,
    "timeoutMillis": 5000
}

You can use the console to further configure CloudMessageSourceCollection using the SourceCollection.Config mixin. The following example specifies the content type, preferences for archiving records and the SourceCollectionProcessMode.

JavaScript
var src = CloudMessageSourceCollection.forName("WeatherDataMeasurementSource");
src.config()
.withContentTypeOverride("application/json")
.withDoNotChunk(true)
.withDoNotArchiveStatus(false)
.withDoNotArchiveSources(true)
.withDoNotArchiveFailedSources(false)
.withProcessMode("ON_ARRIVAL")
.setConfig()

Enable the CloudMessageSourceCollection to receive messages

The following commands tell the platform to subscribe to the Event Hub and process messages as they arrive into the broker in real time:

JavaScript
src.register()
src.startOnArrivalMode()

To verify that messages are being received, you can create an instance of the AzureEventHub Type using your resource name and call the receiveMessages() method. Be sure to include the Partition of the Event Hub that you want to read messages from. For more information, see the Partition section of the Features and terminology in Azure Event Hubs topic in the Azure Event Hubs documentation.

JavaScript
broker = AzureEventHub.fromResourceName(ehResourceName);
messages = broker.receiveMessages({partition: 0})

Messages are stored as a JavaScript ArrayBuffer, which can be read as objects by running the following command:

JavaScript
Sources.fromCloudMessages(messages, src).readSources().collect();

Output

To model an Event Hub consumer and configure downstream processing, you can create a CloudInboundMessage Type. Mixing in CloudInboundMessage indicates that the Type is an Event Hub consumer.

As messages are consumed from the eh1 topic, they are dispatched to the SourceOrdersInboundQueue.process() method for processing.

Configuring the C3 Agentic AI Platform to publish to an Event Hub requires the following 4 steps:

Type
type SourceWeatherDataMeasurementInboundQueue mixes CloudInboundMessage<SourceWeatherDataMeasurement> {
  
  process: ~ js-server
  
}

Implement the process method in SourceWeatherDataMeasurementInboundQueue.js, which persists data to the WeatherDataMeasurement Type.

JavaScript
function process(messages) {

    var array = WeatherDataMeasurement.array();
    var meas;
  
    messages.forEach(function (msg) {
  
      meas = WeatherDataMeasurement.make({
        country: msg.Country,
        city: msg.City,
        date: msg.Date,
        temperature: msg.Temperature,
        humidity: msg.Humidity,
        windSpeed: msg.WindSpeed,
        windDirection: msg.WindDirection,
        precipitation: msg.Precipitation,
        cloudCover: msg.CloudCover,
        visibility: msg.Visibility,
        pressure: msg.Pressure,
        dewPoint: msg.DewPoint,
        uvIndex: msg.UvIndex,
        sunrise: msg.Sunrise,
        moonrise: msg.Moonrise,
        moonset: msg.Moonset,
        moonPhase: msg.MoonPhase,
        conditions: msg.Conditions,
        icon: msg.Icon
      });
  
      array.push(meas);
  
    })
  
    try {
      WeatherDataMeasurement.createBatch(array);
    } catch (e) {
      log.info(e.message);
    }
  }

Next, associate the SourceWeatherDataMeasurementInboundQueue Type with the eh1 Event Hub.

JavaScript
SourceWeatherDataMeasurementInboundQueue.setBrokerName(ehResourceName, "AzureEventHub");
SourceWeatherDataMeasurementInboundQueue.setBatchSize(10);
SourceWeatherDataMeasurementInboundQueue.setTimeoutMillis(5000);

CloudMessageDispatcherConfig.clearCache();
SourceWeatherDataMeasurementInboundQueue.register();

You should now see data populate in the WeatherDataMeasurement Type.

Event Hub weather measurement data

Configure an Azure Event Hubs Producer

The steps to configure the C3 Agentic AI Platform to publish to an Event Hub are nearly identical to those for consuming messages from an Event Hub.

You can configure a C3 AI CloudOutboundMessage Type to explicitly model an Event Hub producer. Additionally, you can publish messages with the same structure as the ones consumed from the topic. This lets you reuse the SourceWeatherDataMeasurement Type in the CloudOutboundMessage parametric Type.

Mixing in CloudOutboundMessage indicates that the Type is an Event Hub producer.

Type
type SourceWeatherDataMeasurementOutboundQueue mixes CloudOutboundMessage<SourceWeatherDataMeasurement>

Next, associate the SourceWeatherDataMeasurementOutboundQueue Type with the eh1 Event Hub.

JavaScript
SourceWeatherDataMeasurementOutboundQueue.setBrokerName("eh1/c3-eventhub-testing/azv8qa-rsgp-c3-01", "AzureEventHub");
SourceWeatherDataMeasurementOutboundQueue.setBatchSize(10);

Lastly, publish messages to the eh1 Event Hub by invoking the SourceWeatherDataMeasurementOutboundQueue.sendBatch() method. The following code generates sample data and publishes it to the eh1 Event Hub.

JavaScript
function createMeasurements(city, country, date) {
  var cAry = C3.ArrayBuilder.fromJson({type: 'ArrayBuilder<SourceWeatherDataMeasurement>'});
  var partition = 1;
  var meas;
  var t = DateTime.fromString(date);

  for (var i=0; i < 10; i++) {
    t = t.plusDays(1);

    meas = SourceWeatherDataMeasurement.make({
        Country: country,
        City: city,
        Date: t,
        Temperature: randomBetween(18, 28),
        Humidity: randomBetween(20, 90),
        WindSpeed: randomBetween(2, 20),
        WindDirection: "SW",
        Precipitation: randomBetween(0, 15),
        CloudCover: randomBetween(0, 10),
        Visibility: randomBetween(25, 50),
        Pressure: randomBetween(900, 1200),
        DewPoint: randomBetween(0, 40),
        UvIndex: randomBetween(0, 10),
        Sunrise: "05:25",
        Moonrise: "21:00",
        Moonset: "09:00",
        MoonPhase: "Waning Gibbous",
        Conditions: "Sunny",
        Icon: "clear-day"
      });

    cAry.push(meas);   
  }

  SourceWeatherDataMeasurementOutboundQueue.sendBatch(cAry.build(), partition);
}

function randomBetween(min,max) {
    return Math.floor(Math.random()*(max-min+1)+min);
}

createMeasurements("Los Angeles", "USA", "2023-06-01");

You can verify that the messages were published to the Event Hub.

Event Hub send messages

See also

Was this page helpful?