Implement Change Data Capture for SQL Source Collection
To capture data changes in a database and allow downstream systems to stay synchronized with source data, implement Change Data Capture (CDC) for source systems in the C3 Agentic AI Platform.
The platform supports monotonically increasing field-based CDC, which detects changes by periodically monitoring monotonically increasing fields (for example, timestamp and ID fields such as `lastupdated` and `id`) that are updated whenever a record is upserted. This approach requires the source table to have reliable monotonically increasing columns and uses a cron job to capture data changes at periodic intervals.
Monotonically increasing field-based CDC provides a reliable and efficient method for capturing data changes in SQL systems. You can build robust data synchronization pipelines that ensure data consistency across systems while minimizing performance impact on source databases. CDC configuration in the platform can have the following use cases:
- Real-time analytics: Sync operational data to analytics systems
- Data warehousing: Load data incrementally to a data warehouse
- System integration: Keep multiple systems synchronized
- Backup and archival: Back up critical data incrementally
- Event streaming: Feed change events to downstream systems
The CDC system in the platform completes the following processes:
- First processing cycle
  - Reads all records ordered by the CDC order, such as `lastupdated, id`
  - Creates a checkpoint with the latest processed CDC order values
  - Transforms and loads the data into the target system
- Change detection
  - In subsequent processing cycles of the cron job, the system reads the last checkpoint and fetches, stages, and processes rows with column values greater than the checkpoint values.
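The checkpoint cycle above can be sketched in plain Python. This is an illustrative in-memory simulation, not platform code: the field names mirror the `lastupdated, id` example used later in this topic, and the real platform persists the checkpoint and queries the source database instead.

```python
from datetime import datetime

def fetch_changes(rows, checkpoint):
    """Return rows whose (lastupdated, id) tuple exceeds the checkpoint,
    ordered by the CDC order, plus the new checkpoint."""
    changed = sorted(
        (r for r in rows if (r["lastupdated"], r["id"]) > checkpoint),
        key=lambda r: (r["lastupdated"], r["id"]),
    )
    new_checkpoint = (
        (changed[-1]["lastupdated"], changed[-1]["id"]) if changed else checkpoint
    )
    return changed, new_checkpoint

rows = [
    {"id": "1", "lastupdated": datetime(2024, 1, 1, 10, 0)},
    {"id": "2", "lastupdated": datetime(2024, 1, 1, 10, 0)},
    {"id": "3", "lastupdated": datetime(2024, 1, 1, 11, 0)},
]

# First cycle: no checkpoint yet, so every row is processed.
first, ckpt = fetch_changes(rows, (datetime.min, ""))

# Subsequent cycle: only rows past the checkpoint are fetched.
rows.append({"id": "4", "lastupdated": datetime(2024, 1, 1, 12, 0)})
second, ckpt = fetch_changes(rows, ckpt)
```

Note that the comparison is over the whole `(lastupdated, id)` tuple, which is what makes records sharing a timestamp safe to process.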
Implement Change Data Capture (CDC)
Complete these steps to configure CDC in the platform:
- Configure the SQL source system
- Create a source entity Type
- Configure CDC SQL source collection
- Create a target entity Type
- Define data transformation between source entity and target entity
- Configure the file source collection where the CDC changes will be staged
- Enable staging mode
- Configure processing schedule
- Start CDC processing
The following sections demonstrate an example of how to configure monotonically increasing field-based CDC. Use the example as guidance on how to implement CDC to suit your needs.
Configure the SQL source system
Run the following code to configure the SQL source system and set database credentials:
sqlSourceSystem = SqlSourceSystem.make("cdcSys").upsert()
# Set database credentials
creds = JdbcCredentials.fromServerEndpoint("<serverEndpoint>", <port>, DatastoreType.<datastore>, "<database>", "<schemaName>", "<username>", "<password>")
sqlSourceSystem.setCredentials(creds)
Create a source entity Type
Create a source entity Type to represent the structure of the source database table:
entity type CdcDemoSource mixes External, NoSystemCols, Source schema name 'CDC_DEMO_TABLE' {
id: string schema name 'ID'
name_: string schema name 'NAME'
title: string schema name 'TITLE'
lastupdated: datetime no tz schema name 'LAST_UPDATED'
}
Store this Type in your application package. This source entity Type must have the following properties:
- The Type mixes the `External`, `NoSystemCols`, and `Source` Types. Here is why:
  - `External`: Indicates that the data associated with this entity is not stored in a C3 AI managed data store. Any `fetch` calls on this entity read the data from the external store linked to this entity.
  - `NoSystemCols`: Skips adding C3-specific metadata columns to queries when performing fetch calls on this entity.
  - `Source`: Indicates that this entity also represents the schema of the external datastore. This is used when defining transforms from this source to a given target.
- `schema name` maps C3 AI fields to database columns.
- The Type includes the field on which CDC is performed, for example, the timestamp field `lastupdated`.
Configure CDC SQL source collection
Run the following code to define the CDC order and SQL source collection:
// Define CDC ordering fields
cdcOrder = "lastupdated, id"
// Create SQL source collection with CDC configuration
sqlSrcColl = SqlSourceCollection.builder()
.name("CdcDemoSource")
.sourceSystem(sqlSourceSystem)
.source(C3.type("CdcDemoSource"))
.cdcOrder(cdcOrder) // Critical for CDC functionality
.build()
.upsert()
`cdcOrder = "lastupdated, id"` ensures deterministic processing order and defines the following:
- `lastupdated` indicates the primary ordering
- `id` indicates secondary ordering for records with the same timestamp
This example uses timestamp and incremental identifier fields as order indicators. Refer to the Best practices and requirements section at the end of this document to learn more about CDC order configuration.
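As an illustration, the following Python sketch shows one way a checkpoint over `(lastupdated, id)` could translate into an incremental query using a SQL row-value comparison. This is an assumption for demonstration only: the SQL the platform actually generates may differ, and some databases (for example, SQL Server) do not support row-value comparison and require the predicate to be expanded into AND/OR conditions.

```python
def incremental_query(table, cdc_fields):
    """Build an illustrative incremental-fetch query for a list of CDC
    ordering fields, using named bind parameters for the checkpoint."""
    cols = ", ".join(cdc_fields)
    params = ", ".join(f":{f}" for f in cdc_fields)
    return (
        f"SELECT * FROM {table} "
        f"WHERE ({cols}) > ({params}) "  # row-value comparison against checkpoint
        f"ORDER BY {cols}"               # deterministic processing order
    )

query = incremental_query("CDC_DEMO_TABLE", ["lastupdated", "id"])
```

The `ORDER BY` over the same fields as the checkpoint is what keeps the next checkpoint consistent with the rows already processed.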
Create a target entity Type
Create a target entity Type to represent the processed and transformed data structure:
entity type CdcDemoTarget {
name: string
title: string
lastupdated: datetime no tz
}
Define data transformation between source entity and target entity
Create the file CdcDemoSource-CdcDemoTarget.json in your application package to define how the CDC process transforms source data into the target format:
data = {
  name: "CdcDemoSource-CdcDemoTarget",
  source: "CdcDemoSource",
  target: "CdcDemoTarget",
  projection: {
    id: id,
    name: name_,
    title: title,
    lastupdated: lastupdated
  }
};
This example represents a simple one-to-one transform for demonstration purposes. Define transformations as suitable for your own use case.
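To make the mapping concrete, here is a Python sketch of what such a projection expresses: each target field is populated from the named source field. This is only a simulation of the field mapping, assuming the fields shown in the example Types; the platform evaluates the transform itself.

```python
# Target field -> source field, mirroring the example projection.
projection = {
    "id": "id",
    "name": "name_",
    "title": "title",
    "lastupdated": "lastupdated",
}

def apply_projection(source_record):
    """Build a target record by copying each mapped source field."""
    return {target: source_record[source] for target, source in projection.items()}

source_record = {
    "id": "42",
    "name_": "Widget",
    "title": "Example",
    "lastupdated": "2024-01-01T10:00:00",
}
target_record = apply_projection(source_record)
```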
Configure the file source collection where the CDC changes will be staged
File staging consists of a FileSourceSystem and a FileSourceCollection that determines the format of captured data and where it is stored.
Run the following code to configure file staging:
// Create file source system for staging
fss = FileSourceSystem.make("cdcFss").upsert()
// Create staging file source collection
fsc = FileSourceCollection.builder(sqlSrcColl)
.sourceSystem(fss)
.name("stagingFileSourceCollection-" + sqlSrcColl.name)
.build()
.upsert()
// Configure staging settings
SourceCollection.Config.builder(fsc.config())
.contentTypeOverride(ContentType.parquet().mediaType)
.doNotArchiveSources(true)
.build()
.setConfig()
Note the following about this code:
- `doNotArchiveSources(true)` keeps the staging files for captured data, so you can reuse the files for other transformations and data integration purposes.
- `contentTypeOverride(ContentType.parquet().mediaType)` sets the format to Parquet for easier data processing with Spark or Delta Lake.
Enable staging mode
Run the following code to update the SQL source collection:
sqlSrcColl.config()
.withStage(true)
.withStagingMode(SourceCollection.StagingKind.FILE)
.withStageFileSourceCollectionName(fsc.name)
.setConfig()
This code enables staging and specifies the staging mode and the file source collection.
Configure processing schedule
The processing schedule is the frequency at which you want to capture data changes using cron expressions.
Run the following code to configure the processing schedule:
sqlSrcColl.config().setConfigValue("processSchedule", "0/30 * * ? * * *")
In this example, the cron expression sets a 30-second frequency. The default value is every day at 12:00 AM.
Start CDC processing
Run the following code to start the scheduled processing:
sqlSrcColl.startOnScheduleMode()
Fetch the CDC checkpoint
You can fetch the current checkpoint for your source collection.
Run the following code:
SourceCollection.Cdc.Checkpoint.make()
.withSourceCollectionName(<name>)
.safeGet();
Best practices and requirements for configuring CDC
Follow these best practices and requirements for configuring CDC:
Monotonically increasing field requirements:
- Ensure the set of fields contains unique values and monotonically increases globally across the table when data is created or updated.
- Typically, provide one field that globally and monotonically increases on data update, and one field that globally and monotonically increases on data creation. Fields like `lastupdated` globally and monotonically increase on both data creation and update, but can contain duplicates, so the example in this topic also uses the `id` field for uniqueness.
- Index the fields for better query performance.
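For example, the following Python sketch checks, over sampled rows, whether a candidate set of CDC fields yields unique values. It is illustrative only, but shows why a timestamp alone can collide while adding `id` breaks ties.

```python
def is_unique_cdc_key(rows, fields):
    """Return True if the composite value of the given fields is unique
    across the sampled rows."""
    keys = [tuple(row[f] for f in fields) for row in rows]
    return len(keys) == len(set(keys))

rows = [
    {"id": "1", "lastupdated": "2024-01-01T10:00:00"},
    {"id": "2", "lastupdated": "2024-01-01T10:00:00"},  # duplicate timestamp
    {"id": "3", "lastupdated": "2024-01-01T11:00:00"},
]

timestamp_only = is_unique_cdc_key(rows, ["lastupdated"])        # collides
with_tiebreak = is_unique_cdc_key(rows, ["lastupdated", "id"])   # unique
```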
CDC order configuration:
- Use a set of fields that monotonically increase globally, per the above requirements. Example: `cdcOrder = "lastupdated, id"`.
Processing schedule configuration:
- Set a frequency that balances data freshness and system load
- Consider reducing frequency during peak database usage hours
- Monitor processing duration and compare it to the schedule frequency to ensure each cycle completes before the next one starts