Create Features from Python Functions using Lambda Feature Sets
In the examples below, we create features using the WindTurbine application data model. We first write a Python function that retrieves measurement data using the WindTurbineVibration.eval() API and then computes feature values via Pandas APIs. This function returns a Pandas time series DataFrame with columns for all the features, as well as the subject ID and timestamp. After testing the function interactively, we create a Feature.Set, wrapping our Python function in a C3 AI Lambda and specifying it as the src parameter. This flavor of feature set is called a Lambda Feature Set. After creating the feature set, we materialize and then retrieve the feature data.
We then repeat this process, with a few differences:

- We will create a lambda function that accepts a list of WindTurbines and returns the computed feature data for all those turbines. This will be specified as the srcBatch parameter to the Feature.Set.
- We will query data and create features from WindTurbineMeasurement, which involves a more complex data model.
Alternative approach
You can also create features from C3 AI Metrics. See the tutorial Create Features Using C3 Metrics or Metric Expressions for details. It uses the same application data model as this tutorial, but uses the expression language provided by C3 AI Metrics for defining the individual features.
Review the Application Data
We review the application data. We have four types of interest:
- WindTurbine - has a row for each wind turbine and is the subject type on which we will create features.
- WindTurbineVibration - time series vibration data for each wind turbine. It is a direct child of WindTurbine and is stored in PostgreSQL rather than Cassandra. This makes it easier to query in the Python function. We will use this entity in our first example.
- WindTurbineMeasurementSeries - a "time series" header type which holds a time series for each wind turbine. This level of the data model is not needed for time series features created via Python functions. We include it here to enable features created via metrics in the same application.
- WindTurbineMeasurement - contains the actual time series data measurements under WindTurbineMeasurementSeries. We will use this in our second example.
We can use the eval() method to see the data at each level of our data model. Note that eval() can be used to retrieve entity fields, features, and metrics. By default, it returns the fields on the entity.
c3.WindTurbine.eval()

| ID | name | location | coordinates | power | manufacturer | unplannedMaintenanceEvents | plannedMaintenanceEvents |
|---|---|---|---|---|---|---|---|
| demo_TURBINE-1 | TURBINE-1 | York | York | 150 | demo_Siemens | 0 | 0 |
| demo_TURBINE-2 | TURBINE-2 | York | York | 50 | demo_Siemens | 0 | 0 |
(truncated rightmost 1 column)
c3.WindTurbineVibration.eval(limit=10)

| ID | asset | energyNear2xRpm | energyNear1xRpm | measurementDate |
|---|---|---|---|---|
| 01477819-03d7-4892-a1b5-73ae8e19edb1 | demo_TURBINE-2 | 39.43 | 40.18 | 2022-01-09 01:06:00 |
| 032503df-e518-46f3-92ab-72e1c1f3aec4 | demo_TURBINE-1 | 49.43 | 45.33 | 2022-05-27 01:06:00 |
| 0351df2a-9e74-41c7-b6b1-45fed87b7861 | demo_TURBINE-2 | 60.56 | 36.47 | 2022-05-25 00:56:00 |
| 03a31a2c-77c9-4484-9d2f-6823726a6228 | demo_TURBINE-2 | 57.26 | 35.07 | 2022-02-17 00:23:00 |
| 04263649-edf9-4657-938b-8ceb0689374a | demo_TURBINE-1 | 71.31 | 36.52 | 2022-03-18 00:37:00 |
| 04406e2b-8dfa-4871-a213-9c4e49aaf9eb | demo_TURBINE-2 | 55.74 | 47.82 | 2022-04-08 01:38:00 |
| 0483239e-2f89-43a0-9454-e0c2aceda03e | demo_TURBINE-2 | 51.19 | 41.67 | 2022-04-25 00:08:00 |
| 06e1b7c8-8c72-49d8-a330-edc7f97afb5f | demo_TURBINE-1 | 72.7 | 35.24 | 2022-03-06 01:28:00 |
| 086330d3-c3cd-4f79-880a-0ca71d326d40 | demo_TURBINE-2 | 45.22 | 56.24 | 2022-05-09 00:41:00 |
| 09a6fa06-6853-4693-a061-3c6c455fae33 | demo_TURBINE-2 | 45.81 | 18.62 | 2022-05-22 01:25:00 |
Let's see the time range of the underlying raw vibration data.
df = c3.WindTurbineVibration.eval(projection='measurementDate').to_pandas()
(df.min(), df.max())

(measurementDate   2022-01-01 00:39:00
dtype: datetime64[ns],
 measurementDate   2022-06-01 01:23:00
dtype: datetime64[ns])

Thus, we have vibration data from January 1, 2022 through June 1, 2022.
c3.WindTurbineMeasurementSeries.eval()

| ID | windTurbine | treatment |
|---|---|---|
| demo_TURBINE-1_series | demo_TURBINE-1 | rate |
| demo_TURBINE-2_series | demo_TURBINE-2 | rate |
c3.WindTurbineMeasurement.eval(limit=10)

| start | parent | ID | gearOilTemperature | generatorRotationSpeed | activePower |
|---|---|---|---|---|---|
| 2022-01-01 00:45:16 | demo_TURBINE-1_series | demo_TURBINE-1_series#1 | 36.4 | 1758 | 5018 |
| 2022-01-01 02:27:19 | demo_TURBINE-1_series | demo_TURBINE-1_series#3 | 46.8 | 2446 | 5474 |
| 2022-01-01 05:42:09 | demo_TURBINE-1_series | demo_TURBINE-1_series#7 | 31.9 | 2184 | 4989 |
| 2022-01-01 08:47:58 | demo_TURBINE-1_series | demo_TURBINE-1_series#6 | 33.8 | 1846 | 4568 |
| 2022-01-01 11:21:47 | demo_TURBINE-1_series | demo_TURBINE-1_series#9 | 14.5 | 2068 | 5688 |
| 2022-01-01 13:38:38 | demo_TURBINE-1_series | demo_TURBINE-1_series#4 | 17.5 | 2566 | 4707 |
| 2022-01-01 14:54:16 | demo_TURBINE-1_series | demo_TURBINE-1_series#8 | 18.6 | 2681 | 4772 |
| 2022-01-01 16:49:09 | demo_TURBINE-1_series | demo_TURBINE-1_series#5 | 23.9 | 2838 | 4849 |
| 2022-01-01 18:01:37 | demo_TURBINE-1_series | demo_TURBINE-1_series#14 | 14.1 | 2305 | 5259 |
| 2022-01-01 18:17:51 | demo_TURBINE-1_series | demo_TURBINE-1_series#13 | 12.3 | 2446 | 4735 |
Write a Python function to compute the feature values
To create features, we write a Python function that takes as inputs a specific subject, an evaluation spec, and a feature set. It returns a DataFrame containing columns for the subject ID and timestamp as well as columns for each feature.
Retrieving the raw data
To do this, we first need to retrieve the raw time series data for the fields we want (the energyNear1xRpm and energyNear2xRpm fields of WindTurbineVibration in this case). In the Wind Turbine data model, the WindTurbineVibration is a direct child of WindTurbine and its asset field is a reference to the parent.
We can use WindTurbineVibration.eval() to retrieve our data, and then convert it to Pandas. When determining the set of rows to retrieve, we need to consider the following:
- We only want rows for the requested WindTurbine.
- The passed-in spec includes start and end times. We only want to return rows beginning at spec.start and before spec.end.
- To compute these rows, we may need additional data, e.g. for windowed/rolling functions and for forward filling values. Otherwise, we may incorrectly return NaN values. This is particularly important for incremental materialization, where we are computing limited time slices of the feature data.
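To see why the extra lookback data matters, here is a small standalone Pandas sketch. The data, window size, and start date are made up for illustration; the point is that without extra rows before the requested window, the first rolling values are computed from too little history.

```python
import pandas as pd

# Hypothetical daily values; the requested window starts at 2022-04-03.
ts = pd.DataFrame({
    'timestamp': pd.date_range('2022-04-01', periods=5, freq='D'),
    'value': [1.0, 2.0, 3.0, 4.0, 5.0],
})
window_start = pd.Timestamp('2022-04-03')

# Without lookback: rows before the window are missing, so the first
# rolling mean inside the window sees only one value.
no_lookback = ts[ts['timestamp'] >= window_start].copy()
no_lookback['rolling'] = no_lookback['value'].rolling(window=3, min_periods=1).mean()

# With lookback: query extra rows before the window, compute, then trim.
with_lookback = ts.copy()
with_lookback['rolling'] = with_lookback['value'].rolling(window=3, min_periods=1).mean()
with_lookback = with_lookback[with_lookback['timestamp'] >= window_start]

print(no_lookback['rolling'].iloc[0])    # 3.0 (computed from a single row)
print(with_lookback['rolling'].iloc[0])  # 2.0 (mean of 1, 2, 3)
```

The same reasoning applies to forward filling: without lookback rows, there is nothing to fill from at the start of the window.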
Handling no data for a subject
Note that it is possible for a query to return no matching rows. For example, a given subject might have a row in WindTurbine, but no vibration data. Since the feature store does not know where you are getting this data from, you need to handle this case. Often, some of the Pandas operations will fail if the DataFrame is empty. Thus, it is often easiest to check for no rows returned from your queries and return early with an empty DataFrame.
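The early-return pattern can be sketched as follows. The function name `safe_feature_frame` and its columns `a` and `b` are hypothetical, and it returns a plain DataFrame here for simplicity (the lambda would wrap it in a CustomMaterializationResult):

```python
import pandas as pd

def safe_feature_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Return early with an empty frame when the query produced no rows."""
    if df.empty:
        # Some Pandas operations (e.g. resampling an empty frame) can fail,
        # so we short-circuit before any feature computation.
        return pd.DataFrame()
    df = df.copy()
    df['ratio'] = df['a'] / df['b']
    return df

print(safe_feature_frame(pd.DataFrame()).empty)                       # True
print(safe_feature_frame(pd.DataFrame({'a': [2.0], 'b': [4.0]})))     # one row, ratio 0.5
```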
Resampling
We want to resample to a daily interval. First, we sort the DataFrame rows by timestamp to ensure that the resample works correctly and that the final result returned will be sorted. Next, we perform the actual resample. We then fill missing data with the most recent value so that rolling feature calculations work correctly.
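These three steps (sort, resample, forward-fill) can be sketched with a tiny standalone example. The readings below are made up: they are out of order, and there is a gap on 2022-01-03 that the forward fill closes (`.ffill()` is equivalent to `fillna(method='ffill')`):

```python
import pandas as pd

# Irregular raw readings: out of order, with no data on 2022-01-03.
raw = pd.DataFrame({
    'timestamp': pd.to_datetime(['2022-01-02 06:00', '2022-01-01 09:00',
                                 '2022-01-01 18:00', '2022-01-04 12:00']),
    'value': [20.0, 10.0, 14.0, 40.0],
})

# Sort by timestamp, resample to a daily interval, then forward-fill the gap.
daily = (raw.sort_values('timestamp')
            .resample('1D', on='timestamp').mean()
            .ffill()
            .reset_index())

print(daily['value'].tolist())  # [12.0, 20.0, 20.0, 40.0]
```

The two readings on 2022-01-01 average to 12.0, and the missing 2022-01-03 value is filled with the most recent daily value, 20.0.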
Computing feature values
We can now compute our features based on this resampled time series data. For this case, we simply compute the ratio of energyNear1xRpm and energyNear2xRpm.
Returning the final DataFrame
After computing the feature data, we add a subject column to our DataFrame. Since we are querying one subject at a time, we can simply create a constant string column and insert it into our DataFrame. Finally, we wrap our DataFrame in a CustomMaterializationResult and return it.
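Adding the constant subject column can be sketched in isolation (the frame contents and subject ID below are made up):

```python
import pandas as pd

# A small feature frame as it might look after the feature computation.
df = pd.DataFrame({'timestamp': pd.date_range('2022-04-01', periods=3, freq='D'),
                   'energyRatio': [1.1, 0.9, 1.4]})

# One subject per call, so a constant string column suffices.
subject_id = 'demo_TURBINE-1'
df.insert(0, 'subject', pd.Series([subject_id] * len(df)).astype('string'))

print(df.columns.tolist())  # ['subject', 'timestamp', 'energyRatio']
```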
def compute_vibration_features(subject, spec, fs):
    import pandas as pd
    # Get the time range from the spec.
    start = spec.start
    end = spec.end
    # We adjust the start time of our query to account for forward filling (used below) and rolling/windowed
    # computations (not used in this case, but common). This is needed for incremental materialization,
    # where you may only want to materialize data for a specific time slice and not the complete time horizon.
    lookback_days = 7
    lookback_start = start.minusDays(lookback_days)
    # The "fs" parameter will contain the feature set definition. We don't need it here, so it can be safely ignored.
    # Build a filter that will retrieve vibration data for the specified subject and time range.
    filter = c3.Filter.eq('asset', subject.id)
    filter = filter.and_(c3.Filter.ge('measurementDate', lookback_start))
    filter = filter.and_(c3.Filter.lt('measurementDate', end))
    # Retrieve the raw data, convert to Pandas, and rename the timestamp column.
    df = c3.WindTurbineVibration.eval(filter=filter, projection="measurementDate, energyNear2xRpm, energyNear1xRpm",
                                      order='measurementDate',
                                      limit=-1  # make sure that you get back all of the data, not just the first 2000 rows
                                      )
    df = df.to_pandas()  # for the rest of the feature engineering, we use Pandas directly
    if len(df) == 0:
        print("No data for batch")
        return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())  # handle case of no data for this subject
    df = df.rename(columns={'measurementDate': 'timestamp'})
    # Aggregate to a daily interval and fill missing values.
    df = df.resample('1D', on='timestamp').mean().fillna(method='ffill').reset_index()
    # Compute a new feature: the ratio between the two energy values.
    df['energyRatio'] = df['energyNear2xRpm']/df['energyNear1xRpm']
    # We queried for lookback_days extra days before the start. We now reduce the range to the original requested range.
    df = df.loc[df['timestamp'] >= start].reset_index(drop=True)
    # Every feature store DataFrame needs a subject column. In this case, we only have one subject, so we just repeat it.
    subject_col = pd.Series([subject.id]*len(df)).astype('string')
    df.insert(0, 'subject', subject_col)
    return c3.Feature.CustomMaterializationResult(data=df)

We can interactively call this function with a specific turbine to verify that it gives the expected results.
df = compute_vibration_features(c3.WindTurbine(id='demo_TURBINE-1').get(),
c3.EvalMetricsSpec(start=c3.DateTime.fromString('2022-04-01'),
end=c3.DateTime.fromString('2022-04-05')), None)
df.data

| subject | timestamp | energyNear2xRpm | energyNear1xRpm | energyRatio |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-04-01 | 83.82 | 45.81 | 1.82973 |
| demo_TURBINE-1 | 2022-04-02 | 67.62 | 28.6 | 2.36434 |
| demo_TURBINE-1 | 2022-04-03 | 57.07 | 41.2 | 1.38519 |
| demo_TURBINE-1 | 2022-04-04 | 66.73 | 49.56 | 1.34645 |
Let's look at the Pandas data types of the columns we returned.
df.data.dtypes

subject                    string
timestamp datetime64[ns]
energyNear2xRpm float64
energyNear1xRpm float64
energyRatio float64
dtype: object

Create the Feature.Set
To create a Feature.Set from this function, we first build a mapping of the columns returned by the function to their (C3) types. We then construct the Feature.Set object, passing the compute_vibration_features function (wrapped in a c3.Lambda) and our dict of columns. Finally, we use merge() to save it to the database.
columns = {
'subject': 'string',
'timestamp': 'datetime',
'energyNear2xRpm': 'double',
'energyNear1xRpm': 'double',
'energyRatio': 'double'
}
vibration_fs = c3.Feature.Set(id='WindTurbine#vibrationFeatureSet',
name="vibrationFeatureSet",
subjectType=c3.WindTurbine,
columns=columns,
src=c3.Lambda.fromPyFunc(compute_vibration_features) # if you have special library requirements, specify an action requirement here as well
).merge().get()
vibration_fs

{
"type" : "Feature.Set",
"id" : "WindTurbine#vibrationFeatureSet",
"name" : "vibrationFeatureSet",
"meta" : {
"appCode" : 1796811345014444398,
"env" : "plat88236",
"app" : "dsquickstart",
"created" : "2024-04-20T01:22:58Z",
"createdBy" : "Jeff.Fischer@c3.ai",
"updated" : "2024-04-20T01:22:58Z",
"updatedBy" : "Jeff.Fischer@c3.ai",
"timestamp" : "2024-04-20T01:22:58Z",
"fetchInclude" : "[]",
"fetchType" : "Feature.Set"
},
"version" : 1,
"subjectType" : "WindTurbine",
"src" : {
"type" : "Lambda<function(subject: any, spec: any, fs: any): any>",
"language" : "Python",
"implementation" : "def compute_vibration_features(subject, spec, fs):\n import pandas as pd\n start = spec.start\n end = spec.end\n lookback_days = 7\n lookback_start = start.minusDays(lookback_days)\n filter = c3.Filter.eq('asset', subject.id)\n filter = filter.and_(c3.Filter.ge('measurementDate', lookback_start))\n filter = filter.and_(c3.Filter.lt('measurementDate', end))\n df = c3.WindTurbineVibration.eval(filter=filter, projection='measurementDate, energyNear2xRpm, energyNear1xRpm', order='measurementDate', limit=-1)\n df = df.to_pandas()\n if len(df) == 0:\n print('No data for batch')\n return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())\n df = df.rename(columns={'measurementDate': 'timestamp'})\n df = df.resample('1D', on='timestamp').mean().fillna(method='ffill').reset_index()\n df['energyRatio'] = df['energyNear2xRpm'] / df['energyNear1xRpm']\n df = df.loc[df['timestamp'] >= start].reset_index(drop=True)\n subject_col = pd.Series([subject.id] * len(df)).astype('string')\n df.insert(0, 'subject', subject_col)\n return c3.Feature.CustomMaterializationResult(data=df)"
},
"columns" : {
"subject" : "string",
"timestamp" : "datetime",
"energyNear2xRpm" : "double",
"energyNear1xRpm" : "double",
"energyRatio" : "double"
}
}

Materialize the Feature.Set
We will now materialize the feature set, pre-computing the values and storing them in the Feature Store for fast retrieval. Let's materialize data for a specific time range -- from January 1, 2022 up to (but not including) April 1, 2022.
We pass sync=True, indicating that we want to wait for the result. The actual materialization will run as an asynchronous batch job on the server.
vibration_fs.materialize(sync=True, start="2022-01-01", end="2022-04-01")

{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "635ebc74-77a5-4bcc-a4ab-8172195df6ef",
"meta" : {
"created" : "2024-04-20T01:22:59Z",
"updated" : "2024-04-20T01:22:59Z",
"timestamp" : "2024-04-20T01:22:59Z"
},
"version" : 1
}

Evaluate feature data
To retrieve feature data from the feature store, you can use WindTurbine.evalFeatureSetBatch().
c3.WindTurbine.evalFeatureSetBatch(filter="id=='demo_TURBINE-1'",
                                featureSet=vibration_fs)

| subject | timestamp | energyNear2xRpm | energyNear1xRpm | energyRatio |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 | 43.30 | 57.96 | 0.747067 |
| demo_TURBINE-1 | 2022-01-02 | 59.44 | 36.77 | 1.616535 |
| demo_TURBINE-1 | 2022-01-03 | 59.49 | 53.17 | 1.118864 |
| demo_TURBINE-1 | 2022-01-04 | 54.67 | 25.52 | 2.142241 |
| demo_TURBINE-1 | 2022-01-05 | 62.59 | 43.97 | 1.423471 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2022-03-27 | 61.36 | 41.83 | 1.466890 |
| demo_TURBINE-1 | 2022-03-28 | 68.64 | 79.79 | 0.860258 |
| demo_TURBINE-1 | 2022-03-29 | 57.49 | 43.79 | 1.312857 |
| demo_TURBINE-1 | 2022-03-30 | 54.08 | 33.87 | 1.596693 |
| demo_TURBINE-1 | 2022-03-31 | 57.90 | 49.88 | 1.160786 |
We will now materialize a subsequent time range from April 1, 2022 up to (but not including) June 2, 2022.
vibration_fs.materialize(sync=True, start="2022-04-01", end="2022-06-02")

{
"type" : "Feature.Store.MaterializationJob<WindTurbine>",
"typeIdent" : "MAPRJ:FMRJ",
"id" : "6ff41978-098a-44ce-85f8-42a75cc6917f",
"meta" : {
"created" : "2024-04-20T01:23:12Z",
"updated" : "2024-04-20T01:23:12Z",
"timestamp" : "2024-04-20T01:23:12Z"
},
"version" : 1
}

A call to evalFeatureSetBatch will now show us the full time range.
c3.WindTurbine.evalFeatureSetBatch(filter="id=='demo_TURBINE-1'",
                                featureSet=vibration_fs)

| subject | timestamp | energyNear2xRpm | energyNear1xRpm | energyRatio |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 | 43.30 | 57.96 | 0.747067 |
| demo_TURBINE-1 | 2022-01-02 | 59.44 | 36.77 | 1.616535 |
| demo_TURBINE-1 | 2022-01-03 | 59.49 | 53.17 | 1.118864 |
| demo_TURBINE-1 | 2022-01-04 | 54.67 | 25.52 | 2.142241 |
| demo_TURBINE-1 | 2022-01-05 | 62.59 | 43.97 | 1.423471 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2022-05-28 | 66.05 | 37.97 | 1.739531 |
| demo_TURBINE-1 | 2022-05-29 | 74.18 | 38.77 | 1.913335 |
| demo_TURBINE-1 | 2022-05-30 | 64.93 | 56.02 | 1.159050 |
| demo_TURBINE-1 | 2022-05-31 | 41.27 | 35.88 | 1.150223 |
| demo_TURBINE-1 | 2022-06-01 | 43.48 | 31.71 | 1.371176 |
Write a lambda Feature.Set using srcBatch
Now, we show an example of writing a lambda Feature.Set by supplying the srcBatch parameter to the Feature.Set rather than the src parameter. This variant accepts as its first parameter a list of subjects rather than a single subject. Both approaches will produce the exact same results, but srcBatch may be more efficient, due to the reduction in total queries to the database achieved by batching multiple subjects.
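Because a batch frame holds several subjects, the resampling step must be done per subject rather than over the whole frame. A minimal standalone sketch of that groupby + resample pattern (the data below is made up):

```python
import pandas as pd

# Made-up readings for two subjects.
raw = pd.DataFrame({
    'subject': ['A', 'A', 'A', 'B', 'B'],
    'timestamp': pd.to_datetime(['2022-01-01 03:00', '2022-01-01 15:00',
                                 '2022-01-02 09:00', '2022-01-01 12:00',
                                 '2022-01-02 18:00']),
    'value': [10.0, 14.0, 20.0, 100.0, 200.0],
})

# Resample each subject's series to a daily interval independently, so
# one subject's readings never bleed into another subject's bins.
daily = (raw.groupby('subject')
            .resample('1D', on='timestamp')
            .mean()
            .reset_index())

print(daily['value'].tolist())  # [12.0, 20.0, 100.0, 200.0]
```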
def compute_vibration_features_batch(batch, spec, fs):
    import pandas as pd
    # We don't want the batch to be too large, as it would bring too much data into memory and create a large
    # SQL statement covering all of the subjects. You should adjust this based on the expected amount of data per subject.
    MAX_BATCH_SIZE = 10
    if len(batch) > MAX_BATCH_SIZE:
        raise Exception(f"Should not use this approach for large batches; reduce the batch size from {len(batch)} subjects to a value below {MAX_BATCH_SIZE}")
    # Get the time range from the spec.
    start = spec.start
    end = spec.end
    # We adjust the start time of our query to account for forward filling (used below) and rolling/windowed
    # computations (not used in this case, but common). This is needed for incremental materialization,
    # where we will only be materializing a time slice of the data.
    lookback_days = 7
    lookback_start = start.minusDays(lookback_days)
    # Build a filter that will retrieve vibration data for the specified batch of subjects and time range.
    # The main difference from the previous case is that we use intersects here instead of comparing against a single subject ID.
    filter = c3.Filter.intersects('asset', [subject.id for subject in batch])
    filter = filter.and_(c3.Filter.ge('measurementDate', lookback_start))
    filter = filter.and_(c3.Filter.lt('measurementDate', end))
    # Retrieve the raw data, convert to Pandas, and rename the subject and timestamp columns.
    # Note that we need to get the subject ID this time (asset.id) since we have more than one subject.
    df = c3.WindTurbineVibration.eval(filter=filter, projection="asset.id, measurementDate, energyNear2xRpm, energyNear1xRpm",
                                      order='asset.id, measurementDate',
                                      limit=-1  # make sure that you get back all of the data, not just the first 2000 rows
                                      )
    df = df.to_pandas()  # for the rest of the feature engineering, we use Pandas directly
    if len(df) == 0:
        print("No data for batch")
        return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())  # handle case of no data for this batch
    df = df.rename(columns={'asset_id': 'subject', 'measurementDate': 'timestamp'})
    # Aggregate to a daily interval per subject and fill missing values.
    df = df.groupby('subject').resample('1D', on='timestamp').mean().fillna(method='ffill').reset_index()
    # Compute a new feature: the ratio between the two energy values.
    df['energyRatio'] = df['energyNear2xRpm']/df['energyNear1xRpm']
    # We queried for lookback_days extra days before the start. We now reduce the range to the original requested range.
    df = df[df['timestamp'] >= start]
    return c3.Feature.CustomMaterializationResult(data=df)

We can interactively call this function with a list of wind turbines to verify that it gives the expected results.
batch = c3.WindTurbine.fetch().objs  # get a batch of turbines
df = compute_vibration_features_batch(
batch,
c3.EvalMetricsSpec(start=c3.DateTime.fromString('2022-04-01'),
end=c3.DateTime.fromString('2022-04-05')),
None)
df.data

| subject | timestamp | energyNear2xRpm | energyNear1xRpm | energyRatio |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-04-01 | 83.82 | 45.81 | 1.82973 |
| demo_TURBINE-1 | 2022-04-02 | 67.62 | 28.6 | 2.36434 |
| demo_TURBINE-1 | 2022-04-03 | 57.07 | 41.2 | 1.38519 |
| demo_TURBINE-1 | 2022-04-04 | 66.73 | 49.56 | 1.34645 |
| demo_TURBINE-2 | 2022-04-01 | 53.94 | 35.19 | 1.53282 |
| demo_TURBINE-2 | 2022-04-02 | 57.29 | 30.14 | 1.9008 |
| demo_TURBINE-2 | 2022-04-03 | 67.9 | 51.07 | 1.32955 |
| demo_TURBINE-2 | 2022-04-04 | 63.97 | 51.46 | 1.2431 |
We now define our feature set, using the srcBatch parameter for the lambda.
columns = {
'subject': 'string',
'timestamp': 'datetime',
'energyNear2xRpm': 'double',
'energyNear1xRpm': 'double',
'energyRatio': 'double'
}
vibration_batch_fs = c3.Feature.Set(id='WindTurbine#vibrationBatchFeatureSet',
name="vibrationBatchFeatureSet",
subjectType=c3.WindTurbine,
columns=columns,
srcBatch=c3.Lambda.fromPyFunc(compute_vibration_features_batch, actionRequirement="py-data")
).merge().get()
vibration_batch_fs

{
"type" : "Feature.Set",
"id" : "WindTurbine#vibrationBatchFeatureSet",
"name" : "vibrationBatchFeatureSet",
"meta" : {
"appCode" : 1796811345014444398,
"env" : "plat88236",
"app" : "dsquickstart",
"created" : "2024-04-20T01:23:16Z",
"createdBy" : "Jeff.Fischer@c3.ai",
"updated" : "2024-04-20T01:23:16Z",
"updatedBy" : "Jeff.Fischer@c3.ai",
"timestamp" : "2024-04-20T01:23:16Z",
"fetchInclude" : "[]",
"fetchType" : "Feature.Set"
},
"version" : 1,
"subjectType" : "WindTurbine",
"srcBatch" : {
"type" : "Lambda<function(batch: any, spec: any, fs: any): any>",
"language" : "Python",
"implementation" : "def compute_vibration_features_batch(batch, spec, fs):\n import pandas as pd\n MAX_BATCH_SIZE = 10\n if len(batch) > MAX_BATCH_SIZE:\n raise Exception(f'Should not use this approach for large batches; reduce the batch size from {len(batch)} subjects to a value below {MAX_BATCH_SIZE}')\n start = spec.start\n end = spec.end\n lookback_days = 7\n lookback_start = start.minusDays(lookback_days)\n filter = c3.Filter.intersects('asset', [subject.id for subject in batch])\n filter = filter.and_(c3.Filter.ge('measurementDate', lookback_start))\n filter = filter.and_(c3.Filter.lt('measurementDate', end))\n df = c3.WindTurbineVibration.eval(filter=filter, projection='asset.id, measurementDate, energyNear2xRpm, energyNear1xRpm', order='asset.id, measurementDate', limit=-1)\n df = df.to_pandas()\n if len(df) == 0:\n print('No data for batch')\n return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())\n df = df.rename(columns={'asset_id': 'subject', 'measurementDate': 'timestamp'})\n df = df.groupby('subject').resample('1D', on='timestamp').mean().fillna(method='ffill').reset_index()\n df['energyRatio'] = df['energyNear2xRpm'] / df['energyNear1xRpm']\n df = df[df['timestamp'] >= start]\n return c3.Feature.CustomMaterializationResult(data=df)",
"actionRequirement" : "py-data"
},
"columns" : {
"subject" : "string",
"timestamp" : "datetime",
"energyNear2xRpm" : "double",
"energyNear1xRpm" : "double",
"energyRatio" : "double"
}
}

We can now materialize and evaluate our feature set. We will do a full materialization of this feature set (no time range specified).
vibration_batch_fs.materialize(sync=True)
c3.WindTurbine.evalFeatureSetBatch(filter="id=='demo_TURBINE-1'",
                                featureSet=vibration_batch_fs)

| subject | timestamp | energyNear2xRpm | energyNear1xRpm | energyRatio |
|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 | 43.30 | 57.96 | 0.747067 |
| demo_TURBINE-1 | 2022-01-02 | 59.44 | 36.77 | 1.616535 |
| demo_TURBINE-1 | 2022-01-03 | 59.49 | 53.17 | 1.118864 |
| demo_TURBINE-1 | 2022-01-04 | 54.67 | 25.52 | 2.142241 |
| demo_TURBINE-1 | 2022-01-05 | 62.59 | 43.97 | 1.423471 |
| ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2022-05-28 | 66.05 | 37.97 | 1.739531 |
| demo_TURBINE-1 | 2022-05-29 | 74.18 | 38.77 | 1.913335 |
| demo_TURBINE-1 | 2022-05-30 | 64.93 | 56.02 | 1.159050 |
| demo_TURBINE-1 | 2022-05-31 | 41.27 | 35.88 | 1.150223 |
| demo_TURBINE-1 | 2022-06-01 | 43.48 | 31.71 | 1.371176 |
Querying from a partitioned table
For C3 data models built for metrics, there is usually a "time series header" entity between the subject type and the actual time series data. For example, wind turbine sensor measurement data is stored in WindTurbineMeasurement. It is not a direct child of WindTurbine. Instead, there is a WindTurbineMeasurementSeries entity in between. Furthermore, the WindTurbineMeasurement table is partitioned by the associated WindTurbineMeasurementSeries. This means that, unless we are retrieving all of the entries in WindTurbineMeasurement, we need to include the parent's ID in our filter.
We know that there is only one measurement series row per turbine, so we just need to find the ID of that measurement series associated with the requested turbine. From there, we can call WindTurbineMeasurement.eval() with the specified parent series.
def compute_turbine_features(subject, spec, fs):
    import pandas as pd
    # First, get the series ID.
    series_list = c3.WindTurbineMeasurementSeries.fetch(include='id', filter=c3.Filter.eq("windTurbine.id", subject.id)).objs
    if len(series_list) == 0:
        print(f"No measurement series for subject {subject.id}")
        return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())
    assert len(series_list) == 1, f"Expecting exactly one time series for wind turbine {subject.id}, but found {len(series_list)}"
    series_id = series_list[0].id
    # Get the time range from the spec.
    start = spec.start
    end = spec.end
    # We adjust the start time of our query to account for forward filling (used below) and rolling/windowed
    # computations (also used below). This is needed for incremental materialization,
    # where we will only be materializing a time slice of the data.
    lookback_hours = 96  # the longest window we will use in our feature calculations below
    lookback_start = start.minusHours(lookback_hours)
    # Build a filter that will retrieve measurement data for the specified turbine and time range.
    # Note that we filter on parent, which is the header ID.
    filter = c3.Filter.eq('parent', series_id)
    filter = filter.and_(c3.Filter.ge('start', lookback_start))
    filter = filter.and_(c3.Filter.lt('start', end))
    # Retrieve the raw data, convert to Pandas, and rename the timestamp column.
    df = c3.WindTurbineMeasurement.eval(filter=filter,
                                        projection='start, activePower, gearOilTemperature',
                                        limit=-1  # make sure we don't stop at the first 2000 records!
                                        )
    df = df.to_pandas()  # for the rest of the feature engineering, we use Pandas directly
    if len(df) == 0:
        print(f"No data for subject {subject.id}")
        return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())  # handle case of no data for this subject
    df = df.rename(columns={'start': 'timestamp'})
    # Sort by timestamp, then aggregate to an hourly interval and fill missing values.
    df.sort_values(by='timestamp', axis='index', ignore_index=True, inplace=True)
    df = df.resample(rule='H', on='timestamp').mean().reset_index().fillna(method='ffill')
    # Compute the actual features.
    sensors = ["activePower", "gearOilTemperature"]
    for sensor in sensors:
        # Create a diff column smoothed by a rolling max.
        df[sensor + '_diff'] = df[sensor].diff().rolling(window=24, min_periods=1).max()
        # Create a rolling standard deviation column.
        df[sensor + '_rolling_std'] = df[sensor].rolling(window=24, min_periods=1).std()
        # Create a rolling mean column.
        df[sensor + '_rolling_mean'] = df[sensor].rolling(window=24, min_periods=1).mean()
        # Create a deviation-from-rolling-median column.
        df[sensor + '_median_deviation'] = (df[sensor] - df[sensor].rolling(window=96, min_periods=1).median()).abs()
    # We queried for lookback_hours extra hours before the start. We now reduce the range to the original requested range.
    df = df.loc[df['timestamp'] >= start].reset_index(drop=True)
    # Add a subject column.
    subjects = pd.Series([subject.id]*len(df)).astype('string')
    df.insert(0, 'subject', subjects)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return c3.Feature.CustomMaterializationResult(data=df)

Now we can define a lambda feature set, as before.
columns = {
'subject': 'string',
'timestamp': 'datetime',
'activePower': 'double',
'gearOilTemperature': 'double',
'activePower_diff': 'double',
'activePower_rolling_std': 'double',
'activePower_rolling_mean': 'double',
'activePower_median_deviation': 'double',
'gearOilTemperature_diff': 'double',
'gearOilTemperature_rolling_std': 'double',
'gearOilTemperature_rolling_mean': 'double',
'gearOilTemperature_median_deviation': 'double'
}
measurement_fs = c3.Feature.Set(id='WindTurbine#measurementFeatureSet',
name="measurementFeatureSet",
subjectType=c3.WindTurbine,
columns=columns,
src=c3.Lambda.fromPyFunc(compute_turbine_features, actionRequirement="py-data")
).merge().get()
measurement_fs

{
"type" : "Feature.Set",
"id" : "WindTurbine#measurementFeatureSet",
"name" : "measurementFeatureSet",
"meta" : {
"appCode" : 1796811345014444398,
"env" : "plat88236",
"app" : "dsquickstart",
"created" : "2024-04-20T01:23:19Z",
"createdBy" : "Jeff.Fischer@c3.ai",
"updated" : "2024-04-20T01:23:19Z",
"updatedBy" : "Jeff.Fischer@c3.ai",
"timestamp" : "2024-04-20T01:23:19Z",
"fetchInclude" : "[]",
"fetchType" : "Feature.Set"
},
"version" : 1,
"subjectType" : "WindTurbine",
"src" : {
"type" : "Lambda<function(subject: any, spec: any, fs: any): any>",
"language" : "Python",
"implementation" : "def compute_turbine_features(subject, spec, fs):\n import pandas as pd\n series_list = c3.WindTurbineMeasurementSeries.fetch(include='id', filter=c3.Filter.eq('windTurbine.id', subject.id)).objs\n if len(series_list) == 0:\n print(f'No measurement series for subject {subject.id}')\n return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())\n assert len(series_list) == 1, f'Expecting exactly one time series for wind turbine {subject.id}, but found {len(series_list)}'\n series_id = series_list[0].id\n start = spec.start\n end = spec.end\n lookback_hours = 96\n lookback_start = start.minusHours(lookback_hours)\n filter = c3.Filter.eq('parent', series_id)\n filter = filter.and_(c3.Filter.ge('start', lookback_start))\n filter = filter.and_(c3.Filter.lt('start', end))\n df = c3.WindTurbineMeasurement.eval(filter=filter, projection='start, activePower, gearOilTemperature', limit=-1)\n df = df.to_pandas()\n if len(df) == 0:\n print(f'No data for subject {subject.id}')\n return c3.Feature.CustomMaterializationResult(data=pd.DataFrame())\n df = df.rename(columns={'start': 'timestamp'})\n df.sort_values(by='timestamp', axis='index', ignore_index=True, inplace=True)\n df = df.resample(rule='H', on='timestamp').mean().reset_index().fillna(method='ffill')\n sensors = ['activePower', 'gearOilTemperature']\n for sensor in sensors:\n df[sensor + '_diff'] = df[sensor].diff().rolling(window=24, min_periods=1).max()\n df[sensor + '_rolling_std'] = df[sensor].rolling(window=24, min_periods=1).std()\n df[sensor + '_rolling_mean'] = df[sensor].rolling(window=24, min_periods=1).mean()\n df[sensor + '_median_deviation'] = (df[sensor] - df[sensor].rolling(window=96, min_periods=1).median()).abs()\n df = df.loc[df['timestamp'] >= start].reset_index(drop=True)\n subjects = pd.Series([subject.id] * len(df)).astype('string')\n df.insert(0, 'subject', subjects)\n df['timestamp'] = pd.to_datetime(df['timestamp'])\n return c3.Feature.CustomMaterializationResult(data=df)",
"actionRequirement" : "py-data"
},
"columns" : {
"subject" : "string",
"timestamp" : "datetime",
"activePower" : "double",
"gearOilTemperature" : "double",
"activePower_diff" : "double",
"activePower_rolling_std" : "double",
"activePower_rolling_mean" : "double",
"activePower_median_deviation" : "double",
"gearOilTemperature_diff" : "double",
"gearOilTemperature_rolling_std" : "double",
"gearOilTemperature_rolling_mean" : "double",
"gearOilTemperature_median_deviation" : "double"
}
}

Finally, we materialize and evaluate the features.
measurement_fs.materialize(sync=True)
c3.WindTurbine.evalFeatureSetBatch(filter="id=='demo_TURBINE-1'",
                                featureSet=measurement_fs)

| subject | timestamp | activePower | gearOilTemperature | activePower_diff | activePower_rolling_std | activePower_rolling_mean | activePower_median_deviation |
|---|---|---|---|---|---|---|---|
| demo_TURBINE-1 | 2022-01-01 00:00:00 | 5018.0 | 36.4 | nan | nan | 5018.000000 | 0.00 |
| demo_TURBINE-1 | 2022-01-01 01:00:00 | 5018.0 | 36.4 | 0.0 | 0.000000 | 5018.000000 | 0.00 |
| demo_TURBINE-1 | 2022-01-01 02:00:00 | 5474.0 | 46.8 | 456.0 | 263.271723 | 5170.000000 | 456.00 |
| demo_TURBINE-1 | 2022-01-01 03:00:00 | 5474.0 | 46.8 | 456.0 | 263.271723 | 5246.000000 | 228.00 |
| demo_TURBINE-1 | 2022-01-01 04:00:00 | 5474.0 | 46.8 | 456.0 | 249.761486 | 5291.600000 | 0.00 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| demo_TURBINE-1 | 2022-05-31 17:00:00 | 6981.0 | 57.3 | 1515.0 | 412.317641 | 5749.326389 | 1427.00 |
| demo_TURBINE-1 | 2022-05-31 18:00:00 | 6981.0 | 57.3 | 1515.0 | 471.142840 | 5819.451389 | 1427.00 |
| demo_TURBINE-1 | 2022-05-31 19:00:00 | 6981.0 | 57.3 | 1515.0 | 527.284374 | 5869.076389 | 1427.00 |
| demo_TURBINE-1 | 2022-05-31 20:00:00 | 6096.0 | 36.2 | 1515.0 | 529.083267 | 5881.243056 | 525.25 |
| demo_TURBINE-1 | 2022-05-31 21:00:00 | 6033.0 | 45.4 | 1515.0 | 521.487772 | 5905.826389 | 437.75 |
(truncated rightmost 4 columns)