Implement Change Data Capture for SQL Source Collection

To capture data changes in a database and allow downstream systems to stay synchronized with source data, implement Change Data Capture (CDC) for source systems in the C3 Agentic AI Platform.

The platform supports monotonically increasing field-based CDC, which detects changes by periodically monitoring monotonically increasing fields (for example, timestamp and ID fields such as lastupdated and id) that are updated whenever a record is upserted. This approach requires the source table to have reliable monotonically increasing columns and uses a cron job to capture data changes at periodic intervals.

Monotonically increasing field-based CDC provides a reliable and efficient method for capturing data changes in SQL systems. You can build robust data synchronization pipelines that keep data consistent across systems while minimizing performance impact on source databases. CDC configuration in the platform supports the following use cases:

  • Real-time analytics: Sync operational data to analytics systems
  • Data warehousing: Load data incrementally to a data warehouse
  • System integration: Keep multiple systems synchronized
  • Backup and archival: Back up critical data incrementally
  • Event streaming: Feed change events to downstream systems

The CDC system in the platform completes the following processes:

  1. First processing cycle
    • Reads all records ordered by the CDC order fields, such as lastupdated, id
    • Creates a checkpoint with the latest processed CDC order values
    • Transforms and loads data into target system
  2. Change detection
    • In the subsequent processing cycles of the cron job, the system reads the last checkpoint and fetches, stages, and processes rows with column values greater than the checkpoint values.
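Conceptually, each cycle selects rows whose CDC order values come after the checkpoint, compared as a tuple, and processes them in deterministic order. The following standalone sketch illustrates this logic; it is not platform code, and the record fields simply mirror the example table used later in this topic:

```javascript
// Illustrative sketch of checkpoint-based change detection (not platform code).
// Rows newer than the checkpoint -- compared on (lastupdated, id) -- are
// selected and processed in deterministic order.

function afterCheckpoint(row, ckpt) {
  // Lexicographic comparison on the CDC order fields.
  if (row.lastupdated !== ckpt.lastupdated) return row.lastupdated > ckpt.lastupdated;
  return row.id > ckpt.id;
}

function detectChanges(rows, ckpt) {
  return rows
    .filter((r) => afterCheckpoint(r, ckpt))
    .sort((a, b) =>
      a.lastupdated === b.lastupdated
        ? a.id.localeCompare(b.id)
        : a.lastupdated.localeCompare(b.lastupdated)
    );
}

const checkpoint = { lastupdated: "2024-01-01T00:00:00", id: "5" };
const rows = [
  { id: "4", lastupdated: "2023-12-31T23:59:00" }, // before checkpoint: skipped
  { id: "6", lastupdated: "2024-01-01T00:00:00" }, // same timestamp, higher id: captured
  { id: "7", lastupdated: "2024-01-02T08:30:00" }, // newer timestamp: captured
];
const changes = detectChanges(rows, checkpoint);
console.log(changes.map((r) => r.id)); // ["6", "7"]
```

Comparing both fields as a tuple is what makes records sharing the same timestamp safe to process: the secondary field breaks the tie.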

Implement Change Data Capture (CDC)

Complete these steps to configure CDC in the platform:

  1. Configure the SQL source system
  2. Create a source entity Type
  3. Configure CDC SQL source collection
  4. Create a target entity Type
  5. Define data transformation between source entity and target entity
  6. Configure the file source collection where the CDC changes will be staged
  7. Enable staging mode
  8. Configure processing schedule
  9. Start CDC processing

The following sections demonstrate an example of how to configure monotonically increasing field-based CDC. Use the example as guidance on how to implement CDC to suit your needs.

Configure the SQL source system

Run the following code to configure the SQL source system and set database credentials:

JavaScript
sqlSourceSystem = SqlSourceSystem.make("cdcSys").upsert()
// Set database credentials
creds = JdbcCredentials.fromServerEndpoint("<serverEndpoint>", <port>, DatastoreType.<datastore>, "<database>", "<schemaName>", "<username>", "<password>")
sqlSourceSystem.setCredentials(creds)

Create a source entity Type

Create a source entity Type to represent the structure of the source database table:

Type
entity type CdcDemoSource mixes External, NoSystemCols, Source schema name 'CDC_DEMO_TABLE' {
    id: string schema name 'ID'
    name_: string schema name 'NAME'
    title: string schema name 'TITLE'
    lastupdated: datetime no tz schema name 'LAST_UPDATED'
}

Store this Type in your application package. This source entity Type must have the following properties:

  • The Type mixes the External, NoSystemCols, and Source Types. Here is why:
    • External: Indicates that the data associated with this entity is not stored in the C3 AI managed data store. Any fetch call on this entity reads the data from the external store linked to the entity.
    • NoSystemCols: Skips adding C3-specific metadata columns to queries when performing fetch calls on this entity.
    • Source: Indicates that this entity also represents the schema of the external datastore. This is used when defining transforms from this source to a given target.
  • schema name maps C3 AI fields to database columns
  • The Type includes the field (or fields) on which CDC is performed, such as the timestamp field lastupdated.

Configure CDC SQL source collection

Run the following code to define the CDC order and SQL source collection:

JavaScript
// Define CDC ordering fields
cdcOrder = "lastupdated, id"
// Create SQL source collection with CDC configuration
sqlSrcColl = SqlSourceCollection.builder()
                          .name("CdcDemoSource")
                          .sourceSystem(sqlSourceSystem)
                          .source(C3.type("CdcDemoSource"))
                          .cdcOrder(cdcOrder)  // Critical for CDC functionality
                          .build()
                          .upsert()

cdcOrder = "lastupdated, id" ensures deterministic processing order and defines the following:

  • lastupdated indicates the primary ordering
  • id indicates secondary ordering for records with the same timestamp

This example uses timestamp and incremental identifier fields as order indicators. Refer to the Best practices and requirements section at the end of this document to learn more about CDC order configuration.
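To see what this ordering implies at the database level, the following SQL fragment is a hand-written illustration of the kind of incremental read that a cdcOrder of "lastupdated, id" corresponds to; it is not generated by the platform, and the checkpoint parameters are hypothetical:

```sql
-- Illustrative only: an incremental read equivalent to cdcOrder = "lastupdated, id",
-- given a checkpoint of (:ckpt_lastupdated, :ckpt_id).
SELECT ID, NAME, TITLE, LAST_UPDATED
FROM CDC_DEMO_TABLE
WHERE LAST_UPDATED > :ckpt_lastupdated
   OR (LAST_UPDATED = :ckpt_lastupdated AND ID > :ckpt_id)
ORDER BY LAST_UPDATED, ID;
```

A composite index on (LAST_UPDATED, ID) lets this predicate and ordering be served efficiently, which is why the best practices below recommend indexing the CDC order columns.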

Create a target entity Type

Create a target entity Type to represent the processed and transformed data structure:

Type
entity type CdcDemoTarget {
    name: string
    title: string
    lastupdated: datetime no tz
}

Define data transformation between source entity and target entity

Create the file CdcDemoSource-CdcDemoTarget.json in your application package to define how the CDC process transforms source data into the target format:

JSON
{
  "name": "CdcDemoSource-CdcDemoTarget",
  "source": "CdcDemoSource",
  "target": "CdcDemoTarget",
  "projection": {
    "id": "id",
    "name": "name_",
    "title": "title",
    "lastupdated": "lastupdated"
  }
}

This example represents a simple one-to-one transform for demonstration purposes. Define transformations as suitable for your own use case.
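To make the effect of a projection concrete, the following standalone sketch applies the same field mapping to a record shaped like CdcDemoSource. This is illustrative only, not platform code:

```javascript
// Illustrative sketch (not platform code): apply a transform's projection to a
// record shaped like CdcDemoSource, producing a record shaped like CdcDemoTarget.
const projection = {
  id: "id",
  name: "name_",
  title: "title",
  lastupdated: "lastupdated",
};

function applyProjection(sourceRow, proj) {
  const target = {};
  // Each entry maps a target field name to the source field it reads from.
  for (const [targetField, sourceField] of Object.entries(proj)) {
    target[targetField] = sourceRow[sourceField];
  }
  return target;
}

const sourceRow = { id: "1", name_: "Ada", title: "Engineer", lastupdated: "2024-01-01T00:00:00" };
const targetRow = applyProjection(sourceRow, projection);
console.log(targetRow); // { id: "1", name: "Ada", title: "Engineer", lastupdated: "2024-01-01T00:00:00" }
```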

Configure the file source collection where the CDC changes will be staged

File staging consists of a FileSourceSystem and a FileSourceCollection that determines the format of captured data and where it is stored.

Run the following code to configure file staging:

JavaScript
// Create file source system for staging

fss = FileSourceSystem.make("cdcFss").upsert()

// Create staging file source collection

fsc = FileSourceCollection.builder(sqlSrcColl)
                          .sourceSystem(fss)
                          .name("stagingFileSourceCollection-" + sqlSrcColl.name)
                          .build()
                          .upsert()

// Configure staging settings

SourceCollection.Config.builder(fsc.config())
                       .contentTypeOverride(ContentType.parquet().mediaType)
                       .doNotArchiveSources(true)
                       .build()
                       .setConfig()

Note the following about this code:

  • doNotArchiveSources = true keeps the staging files for captured data, so you can reuse them for other transformations and data integration purposes.
  • contentTypeOverride(ContentType.parquet().mediaType) sets the staging format to Parquet for easier downstream processing with tools such as Spark or Delta Lake.

Enable staging mode

Run the following code to update the SQL source collection:

JavaScript
sqlSrcColl.config()
          .withStage(true)
          .withStagingMode(SourceCollection.StagingKind.FILE)
          .withStageFileSourceCollectionName(fsc.name)
          .setConfig()

This code enables staging and specifies the staging mode and the file source collection.

Configure processing schedule

The processing schedule defines, as a cron expression, how frequently data changes are captured.

Run the following code to configure the processing schedule:

JavaScript
sqlSrcColl.config().setConfigValue("processSchedule", "0/30 * * ? * * *")

In this example, the cron expression is set to a 30-second frequency. The default schedule is every day at 12:00 AM.
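The expression appears to follow a seven-field, seconds-first (Quartz-style) cron layout; that field breakdown is an assumption inferred from the example, so check your platform's cron reference. A small standalone sketch that labels each field:

```javascript
// Illustrative sketch: label the fields of a seven-field, seconds-first cron
// expression (assumed Quartz-style layout; verify against your cron reference).
const fieldNames = ["seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"];

function describeCron(expr) {
  const parts = expr.trim().split(/\s+/);
  return Object.fromEntries(fieldNames.map((name, i) => [name, parts[i]]));
}

const schedule = describeCron("0/30 * * ? * * *");
console.log(schedule.seconds); // "0/30" -> fire every 30 seconds, starting at second 0
```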

Start CDC processing

Run the following code to start the scheduled processing:

JavaScript
sqlSrcColl.startOnScheduleMode()

Fetch the CDC checkpoint

You can fetch the current checkpoint for your source collection.

Run the following code:

JavaScript
SourceCollection.Cdc.Checkpoint.make()
                               .withSourceCollectionName("<name>")
                               .safeGet();

Best practices and requirements for configuring CDC

Follow these best practices and requirements for configuring CDC:

  • Monotonically increasing fields requirements:

    • Ensure the set of fields contains unique values and increases globally and monotonically across the table when data is created or updated.
    • Typically, provide one field that increases globally and monotonically on data update and one that increases globally and monotonically on data creation. A field like lastupdated increases monotonically on both creation and update, but its values are not guaranteed to be unique; for this reason, the example in this topic also uses the id field to break ties.
    • Index the fields for better query performance
  • CDC order configuration:

    • Use a set of fields that increases globally and monotonically, per the requirements above. Example: cdcOrder = "lastupdated, id".
  • Processing schedule configuration:

    • Set a frequency that balances data freshness and system load
    • Consider reducing frequency during peak database usage hours
    • Monitor processing duration and compare to schedule frequency
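As a quick sanity check on the uniqueness and ordering requirements, the following standalone sketch (not platform code) verifies that a batch of rows is strictly increasing on the (lastupdated, id) tuple, meaning no two rows share the same CDC order values:

```javascript
// Illustrative sketch: verify that rows are strictly increasing on the CDC
// order fields (lastupdated, id), i.e., no duplicate order tuples exist.
function isStrictlyIncreasing(rows) {
  for (let i = 1; i < rows.length; i++) {
    const prev = rows[i - 1];
    const cur = rows[i];
    const later =
      cur.lastupdated > prev.lastupdated ||
      (cur.lastupdated === prev.lastupdated && cur.id > prev.id);
    if (!later) return false; // duplicate or out-of-order tuple found
  }
  return true;
}

console.log(isStrictlyIncreasing([
  { id: "1", lastupdated: "2024-01-01T00:00:00" },
  { id: "2", lastupdated: "2024-01-01T00:00:00" }, // same timestamp; id breaks the tie
  { id: "1", lastupdated: "2024-01-02T00:00:00" }, // newer timestamp
])); // true
```

Rows that fail this check would be skipped or reprocessed unpredictably by checkpoint-based CDC, so validating source data against it before enabling CDC can save debugging time.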