Feature Materialization and Snapshots
This tutorial shows examples of materializing features and creating snapshots. To demonstrate these capabilities, we use the Features and Feature.Sets that are seeded as part of the windTurbine package. To become more familiar with the Features created in this package, see Create Features Using C3 Metrics or Metric Expressions.
Note that these examples work for both Lambda Feature Sets (created from Python functions) and Composite Feature Sets (created from C3 Metrics).
Setup
Let's review the available Features and Feature.Sets in our application.
c3.Feature.eval(projection='id,name,description')
| subject | name | description |
|---|---|---|
WindTurbine#activePowerDiffFeature | activePowerDiffFeature | Metric-backed feature, seeded in application. |
WindTurbine#gearOilTemperatureDiffFeature | gearOilTemperatureDiffFeature | Metric-backed feature, seeded in application. ... |
WindTurbine#generatorRotationSpeedDiffFeature | generatorRotationSpeedDiffFeature | Metric-backed feature, seeded in application. ... |
WindTurbine#willFailNextDayFeature | willFailNextDayFeature | A seeded feature that can be used as a label. ... |
We can retrieve the definition of individual features.
f1 = c3.Feature(id='WindTurbine#activePowerDiffFeature').get()
print('name:', f1.name)
print('id:', f1.id)
print('underlying metric:', f1.legacy)
name: activePowerDiffFeature
id: WindTurbine#activePowerDiffFeature
underlying metric: {
"type" : "LegacyMetric",
"metric" : {
"type" : "SimpleMetric",
"name" : "ActivePowerDiff",
"expression" : "window(\"MAX\", rollingDiff(avg(normalized.data.activePower)), -23, 24, 1)",
"id" : "WindTurbine_ActivePowerDiff",
"srcType" : "WindTurbine",
"path" : "turbineMeasurements"
},
"interval" : "HOUR"
}
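The metric expression above averages activePower per hour, takes a point-to-point rolling difference, and then applies a MAX window spanning 23 intervals back and 24 forward. As a rough, hypothetical illustration of the rollingDiff and window steps in plain Python (the real C3 metric engine also handles normalization, gaps, and intervals):

```python
def rolling_diff(values):
    """Point-to-point differences of a series; the first point has no prior value."""
    return [None] + [b - a for a, b in zip(values, values[1:])]

def window_max(values, before, after):
    """Max over a sliding window from `before` points back to `after` points
    forward, skipping missing (None) values -- a loose analogue of
    window("MAX", ..., -23, 24, 1)."""
    out = []
    for i in range(len(values)):
        lo, hi = max(0, i + before), min(len(values), i + after)
        window = [v for v in values[lo:hi] if v is not None]
        out.append(max(window) if window else None)
    return out

hourly_avg_power = [5000, 5200, 5100, 5600]   # hourly avg(activePower), made up
diffs = rolling_diff(hourly_avg_power)         # [None, 200, -100, 500]
print(window_max(diffs, -23, 24))              # [500, 500, 500, 500]
```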
c3.Feature.Set.eval(projection='id,name,description')
| subject | name | description |
|---|---|---|
WindTurbine#labelFeatureSet | labelFeatureSet | Feature is True if a failure should be predict... |
WindTurbine#metricsFeatureSet2 | metricsFeatureSet2 | These are metrics-backed features, created via... |
WindTurbine#sensorVolitalityLambda | sensorVolitalityLambda | None |
WindTurbine#turbinePropertyLambda | turbinePropertyLambda | None |
WindTurbine#windTurbineFeatures_ml | windTurbineFeatures_ml | This is a lambda feature set for the ML Pipeli... |
The feature set metricsFeatureSet2 has three features computed from the WindTurbineMeasurement entity, related to computing rolling differences.
fs = c3.Feature.Set(id='WindTurbine#metricsFeatureSet2').get()
print("name:", fs.name)
print('description:', fs.description)
print('interval:', fs.interval)
print('features:', fs.features)
name: metricsFeatureSet2
description: These are metrics-backed features, created via seed.
interval: HOUR
features: [activePowerDiffFeature, generatorRotationSpeedDiffFeature, gearOilTemperatureDiffFeature]
Review the loaded data
We first take a look at the three entities we will be using in this demo -- WindTurbine, WindTurbineMeasurementSeries, and WindTurbineMeasurement.
c3.WindTurbine.eval()
| ID | name | location | coordinates | power | manufacturer | unplannedMaintenanceEvents | plannedMaintenanceEvents | rebootEvents |
|---|---|---|---|---|---|---|---|---|
| demo_TURBINE-1 | TURBINE-1 | York | York | 150 | demo_Siemens | 0 | 0 | 0 |
| demo_TURBINE-2 | TURBINE-2 | York | York | 50 | demo_Siemens | 0 | 0 | 0 |
c3.WindTurbineMeasurementSeries.eval()
| ID | windTurbine | treatment |
|---|---|---|
| demo_TURBINE-1_series | demo_TURBINE-1 | rate |
| demo_TURBINE-2_series | demo_TURBINE-2 | rate |
c3.WindTurbineMeasurement.eval().head()
| start | parent | ID | gearOilTemperature | generatorRotationSpeed | activePower |
|---|---|---|---|---|---|
| 2022-01-01 00:30:05 | demo_TURBINE-2_series | demo_TURBINE-2_series#13 | 81.9 | 3138 | 5556 |
| 2022-01-01 04:28:45 | demo_TURBINE-2_series | demo_TURBINE-2_series#8 | 59.4 | 3266 | 4673 |
| 2022-01-01 04:50:20 | demo_TURBINE-2_series | demo_TURBINE-2_series#12 | 67.6 | 2384 | 4727 |
| 2022-01-01 07:43:51 | demo_TURBINE-2_series | demo_TURBINE-2_series#1 | 59.6 | 2489 | 5310 |
| 2022-01-01 07:45:32 | demo_TURBINE-2_series | demo_TURBINE-2_series#14 | 66 | 2557 | 5129 |
Materialization
Materialization computes feature values from the underlying raw data and saves the computed values in the feature store. The feature store can also compute feature values and return them directly. Materialization can be run on feature sets or individual features. Feature set materialization is most commonly used, as feature sets become the inputs to MlModels.
Here, we will focus on feature set materialization, but briefly cover feature materialization.
Before materialization
We can run WindTurbine.evalFeatureSetBatch() to read feature data. By default, the feature store looks for materialized data. Thus, running evalFeatureSetBatch() before materializing will return an empty DataFrame.
c3.WindTurbine.evalFeatureSetBatch(filter=True, featureSet=fs)
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|
If we want to dynamically compute our features, avoiding materialization, we can set the skipMaterialized parameter as follows.
c3.WindTurbine.evalFeatureSetBatch(filter=True, featureSet=fs, skipMaterialized=True).head()
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2019-04-20 01:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 02:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 03:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 04:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 05:00:00 | 0 | 0 | 0 |
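Conceptually, the read path behaves like a cache lookup: by default, evalFeatureSetBatch serves only materialized rows, and skipMaterialized=True bypasses the store and recomputes from raw data. A toy sketch of that logic (hypothetical names and values, not the C3 implementation):

```python
# Toy feature store: materialized values keyed by (subject, timestamp).
store = {}

def compute_feature(subject, ts):
    # Stand-in for evaluating the underlying metric on raw measurement data.
    return 42.0

def eval_feature(subject, ts, skip_materialized=False):
    if not skip_materialized:
        # Default path: return only what has been materialized (here: nothing yet).
        return store.get((subject, ts))
    # skipMaterialized=True: compute dynamically, ignoring the store.
    return compute_feature(subject, ts)

print(eval_feature("demo_TURBINE-1", "2022-01-01T00:00:00"))                          # None
print(eval_feature("demo_TURBINE-1", "2022-01-01T00:00:00", skip_materialized=True))  # 42.0
```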
Full Materialization
Now, we will materialize our feature set, using Feature.Set#materialize. We will first use full materialization, where no time range is specified. This will overwrite any existing data and materialize features for the entire time range of data. This time range is limited by the time horizon or the materialization time range, which we will discuss later.
Materialization is run as an asynchronous batch job on the server. If you specify sync=True on the materialize call, it will start the job and wait for it to complete. Otherwise, it will return immediately. Either way, the materialize call returns an instance of Feature.Store.MaterializationJob.
job = fs.materialize(sync=True)
job
{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "3cb206bd-4ae8-4cd9-bf6f-33a38f9f6409",
"meta" : {
"created" : "2024-04-20T01:43:24Z",
"updated" : "2024-04-20T01:43:24Z",
"timestamp" : "2024-04-20T01:43:24Z"
},
"version" : 1
}
Now, if we run evalFeatureSetBatch without skipMaterialized=True, we see data:
c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'", featureSet=fs,
                                   start='2022-01-01', end='2022-01-05')
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 00:00:00 | 5018.0 | 1758.0 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 01:00:00 | 5018.0 | 1758.0 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 02:00:00 | 5018.0 | 1758.0 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 03:00:00 | 5018.0 | 1758.0 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 04:00:00 | 5018.0 | 1758.0 | 36.4 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2022-01-04 19:00:00 | 1246.0 | 520.0 | 16.0 |
| demo_TURBINE-1 | 2022-01-04 20:00:00 | 1246.0 | 520.0 | 16.0 |
| demo_TURBINE-1 | 2022-01-04 21:00:00 | 1246.0 | 520.0 | 16.0 |
| demo_TURBINE-1 | 2022-01-04 22:00:00 | 1246.0 | 520.0 | 16.0 |
| demo_TURBINE-1 | 2022-01-04 23:00:00 | 1246.0 | 520.0 | 16.0 |
Incremental Materialization
Now, we use incremental materialization, where we materialize the feature set for a specific time range. This does not overwrite existing data for other subject and time ranges. Thus, incremental materialization can be used to update the feature store with newly arrived data.
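The difference between the two modes can be sketched as upsert vs. rebuild over a store keyed by subject and timestamp (a hypothetical toy model, not the actual feature store):

```python
# Toy model: full materialization rebuilds the store for the whole horizon,
# incremental materialization upserts only the requested (subject, day) keys.
store = {}

def materialize(store, compute, keys, full=False):
    if full:
        store.clear()              # full run: discard previously materialized data
    for key in keys:
        store[key] = compute(key)  # upsert freshly computed values

compute = lambda key: 1.0  # stand-in for evaluating the underlying metrics

# Incremental: materialize Jan 1, then Jan 2 -- the Jan 1 values survive.
materialize(store, compute, [("demo_TURBINE-1", "2022-01-01")])
materialize(store, compute, [("demo_TURBINE-1", "2022-01-02")])
assert len(store) == 2

# Full: only the freshly computed range remains afterwards.
materialize(store, compute, [("demo_TURBINE-1", "2022-01-03")], full=True)
assert list(store) == [("demo_TURBINE-1", "2022-01-03")]
```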
# we will first delete the materialized data from the previous section, so that we can see the incremental changes.
fs.deleteMaterializedData(confirm=True).waitForCompletion()
{
"type" : "MapReduceStatus",
"started" : "2024-04-20T01:43:28Z",
"startedby" : "Jeff.Fischer@c3.ai",
"completed" : "2024-04-20T01:43:28Z",
"status" : "completed"
}
We will first materialize data for January 1, 2022.
fs.materialize(sync=True, start='2022-01-01', end='2022-01-02')
{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "eda6d550-5d65-454c-ab62-8f8fe9d98c8c",
"meta" : {
"created" : "2024-04-20T01:43:29Z",
"updated" : "2024-04-20T01:43:29Z",
"timestamp" : "2024-04-20T01:43:29Z"
},
"version" : 1
}
When we evaluate, we only see data for that day.
c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'", featureSet=fs,
                                   start='2022-01-01', end='2022-01-05')
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 00:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 01:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 02:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 03:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 04:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 05:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 06:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 07:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 08:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 09:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 10:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 11:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 12:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 13:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 14:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 15:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 16:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 17:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 18:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 19:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 20:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 21:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 22:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 23:00:00 | 5018 | 1758 | 36.4 |
Now, let's materialize a second day's worth of data.
fs.materialize(sync=True, start='2022-01-02', end='2022-01-03')
{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "e89a8637-aa5a-4940-a13f-51db6f2a9f4e",
"meta" : {
"created" : "2024-04-20T01:43:30Z",
"updated" : "2024-04-20T01:43:30Z",
"timestamp" : "2024-04-20T01:43:30Z"
},
"version" : 1
}
When evaluating, we now see both days of data.
c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'", featureSet=fs,
                                   start='2022-01-01', end='2022-01-05')
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 00:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 01:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 02:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 03:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 04:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 05:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 06:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 07:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 08:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 09:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 10:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 11:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 12:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 13:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 14:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 15:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 16:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 17:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 18:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 19:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 20:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 21:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 22:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-01 23:00:00 | 5018 | 1758 | 36.4 |
| demo_TURBINE-1 | 2022-01-02 00:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 01:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 02:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 03:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 04:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 05:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 06:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 07:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 08:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 09:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 10:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 11:00:00 | 560 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 12:00:00 | 498.333 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 13:00:00 | 498.333 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 14:00:00 | 785.667 | 701 | 10.3333 |
| demo_TURBINE-1 | 2022-01-02 15:00:00 | 785.667 | 701 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 16:00:00 | 785.667 | 701 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 17:00:00 | 785.667 | 701 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 18:00:00 | 785.667 | 701 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 19:00:00 | 785.667 | 701 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 20:00:00 | 785.667 | 550.667 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 21:00:00 | 785.667 | 550.667 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 22:00:00 | 785.667 | 550.667 | 14.1 |
| demo_TURBINE-1 | 2022-01-02 23:00:00 | 785.667 | 550.667 | 14.1 |
Materialization jobs
Let's look at some other ways to run materialization.
Asynchronous materialize call
First, we can materialize asynchronously and wait for completion.
job = fs.materialize(start='2022-01-03', end='2022-01-04')
print(f"Started job: {job}...")
result = job.waitForCompletion()
print(f"Done!\nResult: {result}")
Started job: 886245d7-491b-4d37-9bf9-e07120ceef5e...
Done!
Result: {
"type" : "MapReduceStatus",
"started" : "2024-04-20T01:43:32Z",
"startedby" : "Jeff.Fischer@c3.ai",
"completed" : "2024-04-20T01:43:32Z",
"status" : "completed"
}
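The sync/async distinction follows a common job-handle pattern: the call returns a handle immediately, and waitForCompletion blocks until the background work finishes. A minimal sketch using a thread (hypothetical, not the C3 job framework):

```python
import threading
import time

# Hypothetical sketch of the job-handle pattern: constructing the job starts
# work in the background and returns immediately; waitForCompletion blocks.
class ToyJob:
    def __init__(self, work):
        self.status = "running"
        self._thread = threading.Thread(target=self._run, args=(work,))
        self._thread.start()

    def _run(self, work):
        work()
        self.status = "completed"

    def waitForCompletion(self):
        self._thread.join()
        return self.status

job = ToyJob(lambda: time.sleep(0.1))  # returns right away, like sync=False
print(job.waitForCompletion())         # blocks, then prints "completed"
```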
Directly creating a materialization job
Rather than call materialize() on the Feature.Set, you can also directly create and run a MaterializationJob. There is a helper function on the feature set, makeMaterializationJob that returns a materialization job instance for your specific feature set. From there, you can set any additional parameters on the job, save it, and start it.
This gives you more flexibility in how the materialization is run on the C3 AI application. In particular, we can set:
- the batch size for our job, which will determine the number of subjects processed at once
- a filter over the subjects to be materialized
Best practices
When setting these, consider the following:
- The larger the batch size, the fewer queries need to be made to obtain the data (at least when using srcBatch).
- Consider the expected number of records per subject. An entire batch is read into memory at once, so too large a batch size will run out of memory.
- The filter is used to generate a SQL query against the subject table (e.g., WindTurbine). The filter should not refer to individual subjects directly (e.g., via intersects()), unless you reference only a limited number of subjects (no more than a few hundred). In most cases, the filter query will fail if you pass in more than 32k subjects directly. Passing a large number of subject IDs into a query is also likely to be inefficient and slow.
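The batch-size trade-off can be made concrete with a small sketch: fewer, larger batches mean fewer queries but higher peak memory, since each batch is read into memory at once (hypothetical helper, not a C3 API):

```python
def batches(subjects, batch_size):
    """Split subject ids into batches; each batch is read into memory at once,
    so batch_size * records_per_subject roughly bounds peak memory per batch."""
    for i in range(0, len(subjects), batch_size):
        yield subjects[i:i + batch_size]

subjects = [f"demo_TURBINE-{n}" for n in range(1, 8)]
print(list(batches(subjects, 2)))
# Seven subjects with batch_size=2 -> four batches (three pairs and a singleton),
# i.e. four reads instead of seven single-subject queries.
```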
# note that withFeatures() takes either a list of features or a feature set.
matjob = fs.makeMaterializationJob(sync=False)\
.withBatchSize(2) \
.withFilter("location=='York'")
matjob = matjob.upsert().get()
matjob.start()
{
"type" : "MapReduceStatus",
"started" : "2024-04-20T01:43:34Z",
"startedby" : "Jeff.Fischer@c3.ai",
"status" : "running",
"step" : "initial"
}
matjob.waitForCompletion()
{
"type" : "MapReduceStatus",
"started" : "2024-04-20T01:43:34Z",
"startedby" : "Jeff.Fischer@c3.ai",
"completed" : "2024-04-20T01:43:37Z",
"status" : "completed"
}
Individual Feature Materialization
We can also materialize features individually. Note that materializing a feature set does not automatically materialize data for its individual features. Likewise, materializing features individually does not automatically materialize feature sets that use those features. This behavior is needed because features may belong to multiple feature sets, each of which could have different data lifecycles and time horizons. If we try to evaluate an individual feature from our feature set before materialization, we will not get any data:
c3.WindTurbine.evalFeaturesBatch(features=['activePowerDiffFeature'], filter="name=='TURBINE-1'",
                              start='2022-01-02', end='2022-01-03')
Of course, we can specify skipMaterialized=True to dynamically compute the feature values.
c3.WindTurbine.evalFeaturesBatch(features=['activePowerDiffFeature'], filter="name=='TURBINE-1'",
                              start='2022-01-02', end='2022-01-03', skipMaterialized=True)
| subject | timestamp | activePowerDiffFeature |
|---|---|---|
| demo_TURBINE-1 | 2022-01-02 00:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 01:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 02:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 03:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 04:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 05:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 06:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 07:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 08:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 09:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 10:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 11:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 12:00:00 | 498.333 |
| demo_TURBINE-1 | 2022-01-02 13:00:00 | 498.333 |
| demo_TURBINE-1 | 2022-01-02 14:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 15:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 16:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 17:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 18:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 19:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 20:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 21:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 22:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 23:00:00 | 785.667 |
Let's materialize our feature...
f = c3.Feature(id="WindTurbine#activePowerDiffFeature").get()
f.materialize(sync=True)
{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "ed5ce871-af36-4615-bdf4-0e67801988cb",
"meta" : {
"created" : "2024-04-20T01:43:39Z",
"updated" : "2024-04-20T01:43:39Z",
"timestamp" : "2024-04-20T01:43:39Z"
},
"version" : 1
}
Now, we can evaluate our feature from the materialized data.
c3.WindTurbine.evalFeaturesBatch(features=['activePowerDiffFeature'], filter="name=='TURBINE-1'",
                              start='2022-01-02', end='2022-01-03')
| subject | timestamp | activePowerDiffFeature |
|---|---|---|
| demo_TURBINE-1 | 2022-01-02 00:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 01:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 02:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 03:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 04:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 05:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 06:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 07:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 08:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 09:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 10:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 11:00:00 | 560 |
| demo_TURBINE-1 | 2022-01-02 12:00:00 | 498.333 |
| demo_TURBINE-1 | 2022-01-02 13:00:00 | 498.333 |
| demo_TURBINE-1 | 2022-01-02 14:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 15:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 16:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 17:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 18:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 19:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 20:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 21:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 22:00:00 | 785.667 |
| demo_TURBINE-1 | 2022-01-02 23:00:00 | 785.667 |
Time Horizon and Materialization Time Range
The feature store needs "outer bounds" (earliest and latest) on time series data. This is for the following reasons:
- For certain calculations, independent of data source, the feature store needs to know the maximum distance back in time to compute values (e.g. finding the first value in a series or a windowed function).
- Some calculations need to consider data in the future (e.g., forecasting).
- When performing a full materialization, the feature store needs to know the time range to materialize. Particularly for lambda feature sets, the feature store will have no way of knowing what underlying data is available.
- Companies may have data retention policies mandating deletion of data beyond a certain age.
There are two ways this has been addressed in the feature store: materialization time range and time horizon.
Materialization Time Range
This is deprecated, but still the default approach for 8.4. There are two configuration parameters:
- Feature.Store.Config.defaultMaterializeTimeRange provides a global default. If it is not set, the time range will be [now() - 5 years, now).
- You can override this for individual feature sets using materializeTimeRange on the Feature.Set definition.
If you try to read or materialize outside of this time range, the operation will succeed, but you will not see any data outside of this range. That behavior can be confusing if you know raw data is available for a time range, but that range is outside the limits.
Time Horizon
Starting with 8.8, the behavior where attempting to read or materialize outside of the time range results in an error is enabled by default. In this case, the materialization time range is ignored, and there are two associated configuration parameters:
- Feature.Store.Config.defaultTimeHorizon provides a global default. If it is not set, the time range will be [now() - 5 years, now).
- You can override this for individual feature sets using timeHorizon on the Feature.Set definition.
If you try to read or materialize outside of this time range, the operation will fail.
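The enforcement described above amounts to a bounds check on the requested time range. A minimal sketch of that behavior (hypothetical; the horizon values mirror the configuration used later in this tutorial):

```python
from datetime import datetime

# Hypothetical sketch of time-horizon enforcement (8.8 behavior): requests
# outside the configured horizon fail instead of silently returning no data.
HORIZON_START = datetime(2022, 2, 1)
HORIZON_END = datetime(2025, 3, 12)

def check_time_range(start, end):
    if start < HORIZON_START or end > HORIZON_END:
        raise ValueError(
            f"timeRange {start}..{end} is out of bound of the time horizon "
            f"{HORIZON_START}..{HORIZON_END}")

check_time_range(datetime(2022, 3, 1), datetime(2022, 6, 1))      # within horizon: ok
try:
    check_time_range(datetime(2022, 1, 1), datetime(2022, 6, 2))  # starts before horizon
except ValueError as e:
    print(e)
```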
Recommendation
We recommend, in most cases, setting the time horizon to a wide range: from the earliest data you expect to have to the present. Unless you have a specific data retention policy for your feature data, it is usually most convenient to set the start of the range to an absolute value (e.g., "2022-02-01") instead of a relative value (e.g., now() - 5 years).
The examples below follow these recommendations. First, let's look at the current feature store configuration. If values are not returned, they are at their defaults (e.g., useOldMaterializedTimeRange = False).
c3.Feature.Store.config()
{
"type" : "Feature.Store.Config",
"configOverride" : "APP",
"materializeBatchSize" : 100,
"recommendedBlockSizeMb" : 16.0,
"defaultTimeHorizon" : {
"type" : "Pair<string, string>",
"fst" : "dateTime('2022-02-01')",
"snd" : "now()"
}
}
Now, let's set the default time horizon. We will set the start point to an absolute date and the end point to the present. For the purposes of this tutorial, we set the start point after the earliest raw data, but real applications would set it to before the earliest raw data.
horizon_start = "dateTime('2022-02-01')" # a month after the start of the data
horizon_end = "now()"
time_horizon = c3.Pair.ofStr(horizon_start, horizon_end)
config = c3.Feature.Store.config().withDefaultTimeHorizon(time_horizon)
config.setConfig()
c3.Feature.Store.config()
{
"type" : "Feature.Store.Config",
"configOverride" : "APP",
"materializeBatchSize" : 100,
"defaultTimeHorizon" : {
"type" : "Pair<string, string>",
"fst" : "dateTime('2022-02-01')",
"snd" : "now()"
}
}
If we try to materialize outside the time horizon, we get an error.
try:
fs.materialize(sync=True, start='2022-01-01', end='2022-06-02')
except RuntimeError as e:
err_msg = str(e).split('"errorMsg" : ')[1][:352]
print(f'\033[91m {err_msg}')
Error invoking Java method Feature.Store.MaterializationJob<WindTurbine>#map:
Invalid value {\"start\":\"2022-01-01T00:00:00\",\"end\":\"2022-06-02T00:00:00\"} for Input timeRange {\"start\":\"2022-01-01T00:00:00\",\"end\":\"2022-06-02T00:00:00\"} is out of bound of the time horizon {\"start\":\"2022-02-01T00:00:00\",\"end\":\"2025-03-12T22:00:00\"}
Now, let's do a full materialization. This will use the time horizon as its limits.
fs.materialize(sync=True)
{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "3331fa9c-93c5-4050-958a-914126fd9563",
"meta" : {
"created" : "2024-04-20T01:43:44Z",
"updated" : "2024-04-20T01:43:44Z",
"timestamp" : "2024-04-20T01:43:44Z"
},
"version" : 1
}
The earliest data returned is now February 2022, even though the raw data starts in January 2022.
c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'", featureSet=fs).to_pandas()
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-02-01 00:00:00 | 0.0 | 0.0 | 7.1 |
| demo_TURBINE-1 | 2022-02-01 01:00:00 | nan | nan | 7.1 |
| demo_TURBINE-1 | 2022-02-01 02:00:00 | nan | nan | 14.7 |
| demo_TURBINE-1 | 2022-02-01 03:00:00 | nan | nan | 14.7 |
| demo_TURBINE-1 | 2022-02-01 04:00:00 | 474.5 | 625.0 | 14.7 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2024-04-19 21:00:00 | 0.0 | 0.0 | 0.0 |
| demo_TURBINE-1 | 2024-04-19 22:00:00 | 0.0 | 0.0 | 0.0 |
| demo_TURBINE-1 | 2024-04-19 23:00:00 | 0.0 | 0.0 | 0.0 |
| demo_TURBINE-1 | 2024-04-20 00:00:00 | 0.0 | 0.0 | 0.0 |
| demo_TURBINE-1 | 2024-04-20 01:00:00 | 0.0 | 0.0 | 0.0 |
If we try to evaluate outside the time horizon, we get an error.
try:
c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'", featureSet=fs,
start='2022-01-01', end='2022-02-05').to_pandas()
except Exception as e:
err_msg = str(e).split("Error invoking Java method Feature.Store#readFeatureSet:")[1][:250]
print(f'\033[91m {err_msg}')
Invalid value {"start":"2022-01-01T00:00:00","end":"2022-02-05T00:00:00"} for Input timeRange {"start":"2022-01-01T00:00:00","end":"2022-02-05T00:00:00"} is out of bound of the time horizon {"start":"2022-02-01T00:00:00","end":"2025-03-12T22:00:00"}
# reset the time horizon back to the default.
c3.Feature.Store.config().withDefaultTimeHorizon(None).setConfig()
You can also change the time horizon for a specific feature set by including a timeHorizon field in the Feature.Set definition. Here is an example of how this can be done:
horizon_start = "dateTime('2022-03-01')" # a month after the start of the data
horizon_end = "now()"
time_horizon = c3.Pair.ofStr(horizon_start, horizon_end)
new_fs = c3.Feature.Set(
id='WindTurbine#limitedTimeHorizon',
name='limitedTimeHorizon',
subjectType=c3.WindTurbine,
interval='HOUR',
features = ['activePowerDiffFeature', "generatorRotationSpeedDiffFeature","gearOilTemperatureDiffFeature"],
timeHorizon = time_horizon)
new_fs.merge()
{
"type" : "Feature.Set",
"id" : "WindTurbine#limitedTimeHorizon",
"name" : "limitedTimeHorizon",
"meta" : {
"created" : "2024-04-20T01:43:46Z",
"updated" : "2024-04-20T01:43:46Z",
"timestamp" : "2024-04-20T01:43:46Z"
},
"version" : 1
}
If we evaluate from this feature set, we see data starting in March 2022.
c3.WindTurbine.evalFeatureSetBatch(batch=['demo_TURBINE-2'], featureSet=new_fs, skipMaterialized=True)
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-2 | 2022-03-01 00:00:00 | 803.4 | 385.5 | 12.833333 |
| demo_TURBINE-2 | 2022-03-01 01:00:00 | 803.4 | 385.5 | 12.833333 |
| demo_TURBINE-2 | 2022-03-01 02:00:00 | 803.4 | 385.5 | 7.100000 |
| demo_TURBINE-2 | 2022-03-01 03:00:00 | 803.4 | 385.5 | 7.100000 |
| demo_TURBINE-2 | 2022-03-01 04:00:00 | 803.4 | 385.5 | 7.100000 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-2 | 2024-04-19 21:00:00 | 0.0 | 0.0 | 0.000000 |
| demo_TURBINE-2 | 2024-04-19 22:00:00 | 0.0 | 0.0 | 0.000000 |
| demo_TURBINE-2 | 2024-04-19 23:00:00 | 0.0 | 0.0 | 0.000000 |
| demo_TURBINE-2 | 2024-04-20 00:00:00 | 0.0 | 0.0 | 0.000000 |
| demo_TURBINE-2 | 2024-04-20 01:00:00 | 0.0 | 0.0 | 0.000000 |
Snapshots
Over time, the underlying source data (e.g., database entries) that feed into a Feature can change. This could be the result of correcting erroneous data points that were initially loaded for a WindTurbine. As a result, the values of the same Feature for the same timestamp can change over time. In many applications, we want to persist a snapshot of our Feature data at a given point in time. Among other reasons, this is important when we would like to inspect the exact training data that was used to train a specific ML model. You can create a snapshot of a Feature.Set by providing a snapshotId and either a filter or a batch of source objects to the Feature.Set.createSnapshot API. Here, we create an array with both of our wind turbines.
wind_turbines = c3.WindTurbine.fetch(include='name, id').objs
for wt in wind_turbines:
print(wt.name)
TURBINE-1
TURBINE-2
Let's see what we've materialized for these subjects...
fs = c3.Feature.Set(id='WindTurbine#metricsFeatureSet2').get()
eval_result = c3.WindTurbine.evalFeatureSetBatch(batch=wind_turbines,
featureSet=fs)
eval_result.head()
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2019-04-20 01:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 02:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 03:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 04:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 05:00:00 | 0 | 0 | 0 |
A snapshot is created by the createSnapshot method on the feature set. It takes a snapshot ID and the specific subjects (via a list or a filter). The snapshot process runs as a background job, so we wait for its completion.
snapshot_id = 'demo-data-snapshot'
job = fs.createSnapshot({'snapshotId': snapshot_id, 'subjects': wind_turbines})
job.waitForCompletion()
{
"type" : "MapReduceStatus",
"started" : "2024-04-20T01:43:49Z",
"startedby" : "Jeff.Fischer@c3.ai",
"completed" : "2024-04-20T01:43:52Z",
"status" : "completed"
}
To see the snapshot data, we run the same eval as before, but include our snapshot ID.
snapshot_eval_result = c3.WindTurbine.evalFeatureSetBatch(batch=wind_turbines,
featureSet=fs,
snapshotId=snapshot_id)
snapshot_eval_result.head()
| subject | timestamp | activePowerDiffFeature | generatorRotationSpeedDiffFeature | gearOilTemperatureDiffFeature |
|---|---|---|---|---|
| demo_TURBINE-1 | 2019-04-20 01:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 02:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 03:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 04:00:00 | 0 | 0 | 0 |
| demo_TURBINE-1 | 2019-04-20 05:00:00 | 0 | 0 | 0 |
We can see that a new Feature.Set.Snapshot record has been created:
c3.Feature.Set.Snapshot.fetch()
| ID | snapshotId | subjectType | subjectFilter | featureSet | featureFingerprint | start | end | meta | version |
|---|---|---|---|---|---|---|---|---|---|
WindTurbine#demo-data-snapshot | demo-data-snapshot | WindTurbine | intersects(id, ["demo_TURBINE-1", "demo_TURBINE-2"]) | WindTurbine#metricsFeatureSet2 | -718903540006309613 | 2019-04-20T01:00:00 | 2024-04-20T02:00:00 | {…} | 1 |
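The key property of a snapshot is that it is a frozen copy: rematerializing the feature set after source data changes does not alter previously snapshotted values. A toy model of these semantics (hypothetical, not the C3 internals):

```python
# Toy model of snapshot semantics: createSnapshot freezes a copy of the
# current feature values under a snapshot id; later changes to the live
# feature store do not affect the copy.
feature_store = {("demo_TURBINE-1", "2022-01-02T00:00:00"): 560.0}
snapshots = {}

def create_snapshot(snapshot_id):
    snapshots[snapshot_id] = dict(feature_store)  # frozen copy

create_snapshot("demo-data-snapshot")

# Source data changes and the feature set is rematerialized...
feature_store[("demo_TURBINE-1", "2022-01-02T00:00:00")] = 0.0

# ...but reads against the snapshot still see the original value.
print(snapshots["demo-data-snapshot"][("demo_TURBINE-1", "2022-01-02T00:00:00")])  # 560.0
print(feature_store[("demo_TURBINE-1", "2022-01-02T00:00:00")])                    # 0.0
```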
We can inspect the snapshot to see the subjects that were included.
[subject.id for subject in c3.Feature.Set.Snapshot(id=f'WindTurbine#{snapshot_id}').get().fetchSubjects().collect()]

['demo_TURBINE-1', 'demo_TURBINE-2']

To illustrate how snapshots work, we will change the underlying data for activePowerDiffFeature. Let's fetch some of the WindTurbineMeasurement records for TURBINE-1 and update them.
turbine_1_measurements = c3.WindTurbineMeasurement.fetch(filter="parent=='demo_TURBINE-1_series' && start<'2022-03-01'")
# Set the activePower field for these measurements to one.
new_entries = []
for point in turbine_1_measurements.objs:
    new_point = point.toJson()
    new_point['activePower'] = 1
    # Drop fields that should not be resent when merging updates.
    for field in ['start', 'version']:
        del new_point[field]
    new_entries.append(new_point)
_ = c3.WindTurbineMeasurement.mergeBatch(new_entries)
# check that the data has been updated as expected (activePower set to 1)
c3.WindTurbineMeasurement.eval(filter="parent=='demo_TURBINE-1_series' && start>='2022-02-01'").head()

| start | isEstimated | parent | ID | gearOilTemperature | generatorRotationSpeed | activePower |
|---|---|---|---|---|---|---|
| 2022-02-01 00:06:19 | False | demo_TURBINE-1_series | demo_TURBINE-1_series#474 | 21.3 | 2290 | 1 |
| 2022-02-01 00:38:54 | False | demo_TURBINE-1_series | demo_TURBINE-1_series#479 | 11.7 | 2248 | 1 |
| 2022-02-01 01:45:15 | False | demo_TURBINE-1_series | demo_TURBINE-1_series#469 | 11.6 | 1766 | 1 |
| 2022-02-01 02:03:00 | False | demo_TURBINE-1_series | demo_TURBINE-1_series#475 | 27.1 | 2339 | 1 |
| 2022-02-01 02:40:21 | False | demo_TURBINE-1_series | demo_TURBINE-1_series#468 | 25.5 | 2443 | 1 |
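The update pattern above (copy each fetched record, drop server-managed fields, overwrite one value, then merge) can be sketched in plain Python. This is an illustrative stand-in, not the C3 API: `prepare_merge_entries` and the sample records are hypothetical, and `dict(record)` plays the role of `point.toJson()`.

```python
# Pure-Python sketch of the merge-update pattern: copy each record,
# strip server-managed fields, and overwrite a single value.
# All names here are illustrative, not real C3 types or calls.

def prepare_merge_entries(records, field, value, managed_fields=('start', 'version')):
    """Return copies of `records` with `field` overwritten and managed fields removed."""
    entries = []
    for record in records:
        entry = dict(record)          # stands in for point.toJson()
        entry[field] = value
        for managed in managed_fields:
            entry.pop(managed, None)  # do not resend server-managed fields
        entries.append(entry)
    return entries

# Hypothetical measurement records shaped like the table above.
measurements = [
    {'id': 'demo_TURBINE-1_series#474', 'start': '2022-02-01T00:06:19',
     'version': 3, 'activePower': 1480.2},
    {'id': 'demo_TURBINE-1_series#479', 'start': '2022-02-01T00:38:54',
     'version': 3, 'activePower': 1511.7},
]

entries = prepare_merge_entries(measurements, 'activePower', 1)
```

The originals are left untouched; only the copies handed to the merge carry the new value.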
Now that we have updated the underlying data, let's materialize our feature set and evaluate the feature values. Since activePowerDiffFeature computes a rolling diff of activePower, we expect the feature value to be zero wherever the time series is constant.
fs.materialize(sync=True)

{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "595b7268-1c27-4e6e-88a2-72aadccc482f",
"meta" : {
"created" : "2024-04-20T01:44:00Z",
"updated" : "2024-04-20T01:44:00Z",
"timestamp" : "2024-04-20T01:44:00Z"
},
"version" : 1
}

eval_fs_result = c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'",
featureSet=fs,
start='2022-02-01',
end='2022-03-01')
eval_fs_result[['subject', 'timestamp', 'activePowerDiffFeature']].head()

| subject | timestamp | activePowerDiffFeature |
|---|---|---|
| demo_TURBINE-1 | 2022-02-01 00:00:00 | 0 |
| demo_TURBINE-1 | 2022-02-01 01:00:00 | 0 |
| demo_TURBINE-1 | 2022-02-01 02:00:00 | 0 |
| demo_TURBINE-1 | 2022-02-01 03:00:00 | 0 |
| demo_TURBINE-1 | 2022-02-01 04:00:00 | 0 |
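The zeros above match what a simple rolling diff of a constant series gives. As a sanity check, here is a tiny pure-Python sketch; `rolling_diff` is an illustrative stand-in for the metric's `rollingDiff`, not the platform's implementation.

```python
# Sketch: the rolling diff of a constant series is zero everywhere,
# so a windowed MAX over those diffs is zero as well.

def rolling_diff(values):
    """Change in value from one interval to the next (stand-in for rollingDiff)."""
    return [b - a for a, b in zip(values, values[1:])]

hourly_active_power = [1] * 24          # activePower forced to 1 for the period
diffs = rolling_diff(hourly_active_power)

# The metric takes a MAX over a window of these diffs; over all zeros it is zero.
windowed_max = max(diffs)
```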
As expected, the feature values have been updated. Now let's run the same evaluation with the snapshot_id that we defined above:
fs = c3.Feature.Set(id='WindTurbine#metricsFeatureSet2').get()
snapshot_id = 'demo-data-snapshot'
eval_fs_result = c3.WindTurbine.evalFeatureSetBatch(filter="name=='TURBINE-1'",
featureSet=fs,
snapshotId=snapshot_id,
start='2022-02-01',
end='2022-03-01')
eval_fs_result[['subject', 'timestamp', 'activePowerDiffFeature']].head()

| subject | timestamp | activePowerDiffFeature |
|---|---|---|
| demo_TURBINE-1 | 2022-02-01 00:00:00 | nan |
| demo_TURBINE-1 | 2022-02-01 01:00:00 | nan |
| demo_TURBINE-1 | 2022-02-01 02:00:00 | nan |
| demo_TURBINE-1 | 2022-02-01 03:00:00 | nan |
| demo_TURBINE-1 | 2022-02-01 04:00:00 | 474.5 |
As expected, the snapshot returns the feature values as they were before our changes to the WindTurbineMeasurement data.
Snapshots are immutable: you cannot create another snapshot with the same snapshotId under a given subject type unless you delete the existing one first.
# Try to create a snapshot using the same ID we already used
try:
job = fs.createSnapshot({'snapshotId': snapshot_id, 'subjects': wind_turbines})
except RuntimeError as e:
    print(f'\033[91m {e}')

Error invoking Java method Feature.Set#createSnapshot: Duplicate value 'Snapshot may already exists. Please use a different `snapshotId` or delete the existing snapshot (using Feature.Set#deleteSnapshot) before attempting again.'.
Error calling /api/8/Feature.Set/createSnapshot with [{"type":"Feature.Set","id":"WindTurbine#metricsFeatureSet2","name":"metricsFeatureSet2","meta":{"type":"Meta","appCode":1796811345014444398,"env":"plat88236","app":"dsquickstart","created":"2024-04-20T00:27:35+00:00","createdBy":"worker","updated":"2024-04-20T00:27:35+00:00","updatedBy":"worker","timestamp":"2024-04-20T00:27:35+00:00","fetchInclude":"[]","fetchType":"Feature.Set"},"version":1,"subjectType":"WindTurbine","summary":"Three max difference engineered features.","description":"These are metrics-backed features, created via seed.","interval":"HOUR","features":{"type":"@indexed set<!string serialized Feature>","value":["activePowerDiffFeature","generatorRotationSpeedDiffFeature","gearOilTemperatureDiffFeature"]}},{"snapshotId":"demo-data-snapshot","subjects":{"type":"[mixing WindTurbine]","value":[{"type":"WindTurbine","id":"demo_TURBINE-1","name":"TURBINE-1","meta":{"type":"Meta","fetchInclude":"[name,id,version]","fetchType":"WindTurbine"},"version":131073},{"type":"WindTurbine","id":"demo_TURBINE-2","name":"TURBINE-2","meta":{"type":"Meta","fetchInclude":"[name,id,version]","fetchType":"WindTurbine"},"version":131073}]}}]
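The immutability guard behaves like a registry keyed by subject type and snapshot ID that refuses duplicates until the old entry is deleted. The following is a toy sketch of that behavior; the class and its names are illustrative, not part of the Feature Store.

```python
# Toy registry mimicking snapshot immutability: creating a second snapshot
# under the same (subject type, snapshot id) key fails until the first
# one is deleted. Names are illustrative, not real C3 types.

class SnapshotRegistry:
    def __init__(self):
        self._snapshots = {}

    def create(self, subject_type, snapshot_id, payload):
        key = (subject_type, snapshot_id)
        if key in self._snapshots:
            raise RuntimeError(
                f"Snapshot {snapshot_id!r} already exists for {subject_type}; "
                "delete it before creating it again.")
        self._snapshots[key] = payload

    def delete(self, subject_type, snapshot_id):
        self._snapshots.pop((subject_type, snapshot_id), None)

registry = SnapshotRegistry()
registry.create('WindTurbine', 'demo-data-snapshot', {'subjects': ['demo_TURBINE-1']})
try:
    registry.create('WindTurbine', 'demo-data-snapshot', {})   # duplicate: rejected
except RuntimeError as err:
    duplicate_error = str(err)

registry.delete('WindTurbine', 'demo-data-snapshot')
registry.create('WindTurbine', 'demo-data-snapshot', {})       # allowed after delete
```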
Clean up
The Feature Store provides several APIs for cleaning up data and metadata. These are particularly useful in testing and development or if you need to get the feature store back to a clean state.
# Remove the created snapshot
c3.Feature.Set(id='WindTurbine#metricsFeatureSet2').get().deleteSnapshot('demo-data-snapshot', True)

# There is an API to remove all materialized data for a Feature or Feature.Set. Since that could be a lot of data, it runs as a batch job.
c3.Feature(id='WindTurbine#activePowerDiffFeature').deleteMaterializedData(confirm=True).waitForCompletion()
c3.Feature.Set(id='WindTurbine#metricsFeatureSet2').deleteMaterializedData(confirm=True).waitForCompletion()

# Remove the feature set that we had created.
c3.Feature.Set(id='WindTurbine#limitedTimeHorizon').get().removeSeedData()

True

# Reset the time horizon on the feature store to put it back to its default state.
c3.Feature.Store.config().withDefaultTimeHorizon(None).setConfig()