C3 AI Documentation Home

MlTemplate Pipe Tutorial

Introduction

This tutorial will cover the basics of developing MlTemplate pipes and is geared towards more development-focused Data Scientists. It will also cover some common scenarios and examples highlighting the power and expressivity of MlTemplate pipes.

What is an MlTemplate?

MlTemplate is a type namespace for key pipe types MlTemplate.AtomicPipe and MlTemplate.Pipeline. These types mirror their sibling types MlAtomicPipe and MlPipeline, respectively. In other words, the user of a MlTemplate.AtomicPipe can use it exactly like any other MlAtomicPipe. In fact, we have MlTemplate.AtomicPipe extends MlAtomicPipe. The same is true for the MlTemplate.Pipeline and MlPipeline pair. The "template" aspect allows for more flexibility, dynamism, and therefore expressability, as we will see in this tutorial.

When should I use an MlTemplate?

Here are just a few examples of situations where an MlTemplate would be advantageous over a non-template MlPipe:

  • explicitly model the hyperparameters of a native machine learning model or network architecture

  • support a curated experience for instantiating complicated and pre-built pipes

  • implement "use-case" pipes that can seamlessly switch between library implementations

  • implement ensemble patterns in pipelines

  • implement pipe customizations within pipelines

MlTemplate.AtomicPipe

Example 1 - Keras Convolution Filters

In this section, we will go over an example of implementing a basic CNN in keras. We provide two sample implementations - one referencing the platform-provided KerasPipe and one built from scratch.

Python
def random_training_data():
    import numpy as np
    x = np.random.randn(10, 7, 7, 1)
    y = np.random.randint(0, 10, size=(10, 1))
    return c3.Data.from_numpy(x), c3.Data.from_numpy(y)

train_x, train_y = random_training_data()

Sample Implementation 1

TutorialKerasConvPipe.c3typ

Type
/**
 * A basic Keras convolutional neural network for testing the tutorial `ML-Template-Pipe`.
 */
type TutorialKerasConvPipe extends MlTemplate.AtomicPipe<Data, Data, Data> type key "TKCP" {

  /**
   * Use this to tune the number of filters in the underlying CNN.
   */
  @ML(hyperparameter=true)
  numFilters: int = 16

  generatePipe: ~ py
}

TutorialKerasConvPipe.py

Python
def generatePipe(this):
    from tensorflow import keras
    from tensorflow.keras import layers

    model = keras.Sequential(
        [
            keras.Input(shape=(7, 7, 1), name="features"),
            layers.Conv2D(this.numFilters, kernel_size=(3, 3), activation="relu"),
            layers.Flatten(),
            layers.Dropout(0.5),
            layers.Dense(10, activation="softmax", name="target"),
        ]
    )
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

    train_args = c3.KerasFitArgs(epochs=5, batch_size=8).withDefaults()
    return c3.KerasPipe.convert(model, spec=c3.KerasConvertSpec(trainArgs=train_args))

This implementation represents the bare minimum for an MlTemplate.AtomicPipe - the developer creates a single field and provides the mandatory implementation for generatePipe. The API pipe returned by generatePipe is used by MlTemplate.AtomicPipe prior to train, process (etc...) in order to generate the actual pipe to call train, process (etc...) on. In this sense, a developer could view their MlTemplate.AtomicPipe as simply a proxy to another dynamically generated MlAtomicPipe.

In this particular example, the generatePipe function generates a different keras native model architecture depending on the user-providable field numFilters on TutorialKerasConvPipe.

Python
untrained_keras_10 = c3.TutorialKerasConvPipe(numFilters=10)
trained_keras_10 = untrained_keras_10.train(train_x, train_y).result()
type(trained_keras_10)
Python
trained_keras_10.nativeModel().summary()
Text
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 5, 5, 10)          100       
_________________________________________________________________
flatten (Flatten)            (None, 250)               0         
_________________________________________________________________
dropout (Dropout)            (None, 250)               0         
_________________________________________________________________
target (Dense)               (None, 10)                2510      
=================================================================
Total params: 2,610
Trainable params: 2,610
Non-trainable params: 0
_________________________________________________________________

Note that the native keras model indeed has 10 filters, as we specified, instead of the default 16! Since we annotated numFilters as a hyperparameter, we can change it using hyperparameter APIs, as well as use it in hyperparameter tuning. Note also that from the user's perspective, they are still interacting with a TutorialKerasConvPipe, even after training. They need not be concerned with the details of how the developer specialized generatePipe.

Python
untrained_keras_8 = untrained_keras_10.withHyperparams({'numFilters': 8})
trained_keras_8 = untrained_keras_8.train(train_x, train_y).result()
type(trained_keras_8)
Text
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
INFO:tensorflow:Assets written to: /tmp/tmp26flvb0i/assets


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))





TutorialKerasConvPipe
Python
trained_keras_8.nativeModel().summary()
Text
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 5, 5, 8)           80        
_________________________________________________________________
flatten (Flatten)            (None, 200)               0         
_________________________________________________________________
dropout (Dropout)            (None, 200)               0         
_________________________________________________________________
target (Dense)               (None, 10)                2010      
=================================================================
Total params: 2,090
Trainable params: 2,090
Non-trainable params: 0
_________________________________________________________________

Sample Implementation 2

TutorialKerasConvPipe2.c3typ

Type
/**
 * A basic Keras convolutional neural network for testing the tutorial `ML-Template-Pipe`.
 */
type TutorialKerasConvPipe2 extends MlTemplate.AtomicPipe<Data, Data, Data> type key "TKCPP" {

  /**
   * Use this to tune the number of filters in the underlying CNN.
   */
  @ML(hyperparameter=true)
  numFilters: int = 16

  generatePipe: ~ py

  doTrain: ~ py
  doProcess: ~ py
  saveModel: ~ py
  loadModel: ~ py
  nativeInputFormats: ~ py
}

TutorialKerasConvPipe2.py

Python
def generatePipe(this):
    from tensorflow import keras
    from tensorflow.keras import layers

    model = keras.Sequential(
        [
            keras.Input(shape=(7, 7, 1), name="features"),
            layers.Conv2D(this.numFilters, kernel_size=(3, 3), activation="relu"),
            layers.Flatten(),
            layers.Dropout(0.5),
            layers.Dense(10, activation="softmax", name="target"),
        ]
    )
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

    return this.withModelFiles(this.saveNativeModel(model))


def doTrain(this, beforeTrainResults, spec=None):
    model, x, y = beforeTrainResults['model'], beforeTrainResults['x'], beforeTrainResults['y']
    model.fit(x, y)
    return {'model': model}


def doProcess(this, beforeProcessOutputs, spec=None):
    model, x = beforeProcessOutputs['model'], beforeProcessOutputs['x']
    return {'out': model.predict(x)}


def saveModel(this, modelDir, model):
    model.save(modelDir)


def loadModel(this, modelDir):
    import tensorflow as tf
    return tf.keras.models.load_model(modelDir)


def nativeInputFormats(this):
    return [c3.MlNativeDataFormat.NUMPY]

This alternate implementation shows an example of implementing an MlTemplate.AtomicPipe from scratch. One subtle but very important difference to note is that the generatePipe function in this implementation returns an instance of self (i.e. TutorialKerasConvPipe2), instead of a different pipe KerasPipe as in the first implementation. This means that the developer must implement their own doProcess at a minimum and optionally implement doTrain (for trainable pipes), save/loadModel (for custom model serialization logic), nativeInputFormats (pandas is the default). From the user's perspective, both of these keras examples have the exact same behavior.

Also note the saveNativeModel helper function for uploading the native model as files. This is the recommended way to save the native model during generatePipe.

Python
untrained_keras2_10 = c3.TutorialKerasConvPipe2(numFilters=10)
type(untrained_keras2_10)
Text
TutorialKerasConvPipe2
Python
untrained_keras2_10.nativeModel().summary()
Text
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
INFO:tensorflow:Assets written to: /tmp/tmp32ytk39u/assets
Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_2 (Conv2D)            (None, 5, 5, 10)          100       
_________________________________________________________________
flatten_2 (Flatten)          (None, 250)               0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 250)               0         
_________________________________________________________________
target (Dense)               (None, 10)                2510      
=================================================================
Total params: 2,610
Trainable params: 2,610
Non-trainable params: 0
_________________________________________________________________
Python
untrained_keras2_8 = untrained_keras2_10.withHyperparams({'numFilters': 8})
type(untrained_keras2_8)
Text
TutorialKerasConvPipe2
Python
untrained_keras2_8.nativeModel().summary()
Text
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
INFO:tensorflow:Assets written to: /tmp/tmpmw1zm3ri/assets
Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_3 (Conv2D)            (None, 5, 5, 8)           80        
_________________________________________________________________
flatten_3 (Flatten)          (None, 200)               0         
_________________________________________________________________
dropout_3 (Dropout)          (None, 200)               0         
_________________________________________________________________
target (Dense)               (None, 10)                2010      
=================================================================
Total params: 2,090
Trainable params: 2,090
Non-trainable params: 0
_________________________________________________________________

[Advanced Tip] Comparing the Implementations

The first approach is simpler for developers, since it abstracts much of the keras integration to the platform-provided KerasPipe. Generally, if there exists an MlAtomicPipe that implements your desired library, it is recommended to reference that in your MlTemplate.AtomicPipe. However, if there does not exist such an implementation or the implementation is insufficient, then the second implementation approach should be used. When considering the second implementation approach, you should still consider whether it would be cleaner for your application to abstract the library integration from the template interface. You could accomplish this by creating your own MlAtomicPipe for the library integration and referencing that within a separate MlTemplate.AtomicPipe.

Generally, a developer should create an MlAtomicPipe when they intend for the user to directly author the native model of a library (via the convert API) and use MlTemplate.AtomicPipe when curating a strongly typed C3 interface for the user (via the generatePipe API). One situation to curate a strongly typed interface is when the developer wants to restrict the model architecture.

In this example, KerasPipe represents the MlAtomicPipe and supports convert directly on a user-created native compiled keras model. In contrast, TutorialKerasConvPipe represents an MlTemplate.AtomicPipe that a user only needs to specify numFilters on.

Example 2 - Linear Regression, with Multiple Libraries

In this section, we show an example of implementing a "use-case" pipe. Here, we pretend that we have users that care about linear regression as an ML task, but either do not care about the library implementation or would like the flexibility to try several.

Python
def random_lr_data():
    import pandas as pd
    import numpy as np
    x = pd.DataFrame({"a": np.random.randint(0, 10, size=(10)), "b": np.random.randint(0, 10, size=(10))})
    y = pd.DataFrame({"c": np.random.randint(0, 10, size=(10))})
    return c3.Data.from_pandas(x), c3.Data.from_pandas(y)

lr_x, lr_y = random_lr_data()

TutorialLinearRegPipe.c3typ

Type
/**
 * A basic linear regression "Use Case" pipe for testing the tutorial `ML-Template-Pipe`.
 */
type TutorialLinearRegPipe extends MlTemplate.AtomicPipe<Data, Data, Data> type key "TLRP" {

  /**
   * Set this enumeration to change the library implementation for linear regression.
   */
  @ML(hyperparameter=true)
  library: string enum("sklearn", "keras") = "sklearn"

  generatePipe: ~ py
}

TutorialLinearRegPipe.py

Python
def generatePipe(this):
    if this.library == "sklearn":
        from sklearn.linear_model import LinearRegression
        lr = LinearRegression()
        return c3.SklearnPipe.convert(lr)
    elif this.library == "keras":
        from tensorflow import keras
        from tensorflow.keras import layers

        model = keras.Sequential([
            layers.Dense(1, use_bias=True, input_shape=(2,))
        ])
        model.compile(loss="mse", optimizer="adam", metrics=["mse"])

        train_args = c3.KerasFitArgs(epochs=5, batch_size=8).withDefaults()
        return c3.KerasPipe.convert(model, spec=c3.KerasConvertSpec(trainArgs=train_args))
    raise ValueError(f"Unrecognized library {this.library}")

Note that users of this pipe have a simplified experience when specifying how they want to do linear regression - they need only change the library field. In addition, a developer can add more libraries in the future! In this scenario, TutorialLinearRegPipe could be a great entry point for more hands-off data scientists. Note that, similar to Example 1, we could have implemented this in a number of ways. For simplicity, we chose to reference SklearnPipe and KerasPipe instead of reimplementing the library integration.

Python
untrained_lr_sklearn = c3.TutorialLinearRegPipe(library="sklearn")
trained_lr_sklearn = untrained_lr_sklearn.train(lr_x, lr_y).result()
type(trained_lr_sklearn)
Text
Default process function for LinearRegression set to predict()


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))





TutorialLinearRegPipe
Python
type(trained_lr_sklearn.nativeModel())
Text
sklearn.linear_model._base.LinearRegression
Python
untrained_lr_keras = c3.TutorialLinearRegPipe(library="keras")
trained_lr_keras = untrained_lr_keras.train(lr_x, lr_y).result()
type(trained_lr_keras)
Text
INFO:tensorflow:Assets written to: /tmp/tmpy9s2mgrp/assets


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))





TutorialLinearRegPipe
Python
type(trained_lr_keras.nativeModel())
Text
tensorflow.python.keras.engine.sequential.Sequential

[Advanced Tip] Pretrained Models for an ML task

For some ML tasks, there could exist a number of pretrained models for that task. When integrating a pretrained model, there is a step to download the pretrained model from a publicly available repository's endpoint. While it may be tempting to implement the model download in the doProcess of a pipe, it is better to implement the model download as part of doTrain. This will save the downloaded model to C3 and ensure that the pipe's process uses the exact same model each time. In other words, it guards against changes in the model hosted at the public endpoint. Similarly, this would also protect against any outages of the public repository, when the pipe is deployed in a production setting.

MlTemplate.Pipeline

Example 3 - Ensemble Pipeline

In this section, we will go over an example of implementing a complex ensemble pipeline, using a simple "voter" pattern.

TutorialEnsemblePipeline.c3typ

Type
/**
 * A basic ensemble pipeline for testing the tutorial `ML-Template-Pipe`. Implements a voter pattern across the user-provided
 * voter pipes and aggregator pipe.
 */
type TutorialEnsemblePipeline<DO> extends MlTemplate.Pipeline<Data, Data, void, DO, void> type key 'TENP' {

  /**
   * Specify a list of pipes to use as voters in the ensemble. The result will be aggregated by `#aggregator`.
   * This pipeline will return, as output, the output of each voter, as well as an additional output representing the
   * aggregation.
   */
  voters: ![MlPipe]

  /**
   * Specify a pipe to aggregate the votes from the voters.
   */
  aggregator: !MlPipe

  generatePipeline: ~ py
  generateTypeBindings: ~ py
}

TutorialEnsemblePipeline.py

Python
def generatePipeline(this):
    this = this.getMissing({'include': 'this, voters.this, aggregator.this'})

    mla = c3.MlPipeline.Authoring
    x, y = mla.var(), mla.var()

    voter_outputs = {}
    for i, voter in enumerate(this.voters):
        voter = voter.getMissing({'include': 'this'})
        voter_outputs[f"voter_out_{i}"] = voter(x, y)

    aggregator = this.aggregator.getMissing({'include': 'this'})
    aggregated = aggregator(voter_outputs)

    return mla.pipeline(x=x, y=y, out={**voter_outputs, "aggregated": aggregated})


def generateTypeBindings(this):
    do_str = f'{{{", ".join([f"voter_out_{i}: Data" for i in range(len(this.voters))])}, aggregated: Data}}'
    return c3.MlPipe.Bindings(do=c3.ValueType.fromString(do_str))

There are a couple of things to note in the MlTemplate.Pipeline implementation. First, the pipeline is highly customizable, in ways that are more difficult to achieve with a vanilla MlPipeline. The voters field allows users to fully specify which sub-models (i.e. MlPipe) to use for the ensemble. Note this can be variably sized! The aggregators field allows users to fully customize their aggregator. Second, the pipeline implements the optional method generateTypeBindings. This API exists for both MlTemplate.Pipeline and MlTemplate.AtomicPipe, and it must be overridden when the input or output bindings of the MlTemplate is dynamically determined. In Examples 1 and 2, the inputs and outputs were all Data, so there was no need to override generateTypeBindings. In contrast, for this example, note how we have the DO (i.e. the output) binding unbound on the type declaration. This binding is populated in the generateTypeBindings implementation, and it is necessary because the number of outputs is indeterminate until the user provides the list of voters.

To demonstrate this pipe, we will use the different flavors of linear regression implemented in Example 2 as the voters and an MlCustomPipe for the aggregator.

Python
voter_template_sklearn = c3.TutorialLinearRegPipe(library="sklearn").upsert()
voter_template_keras = c3.TutorialLinearRegPipe(library="keras").upsert()
Python
def average(x, spec):
    import pandas as pd
    votes = pd.concat([data.reset_index(drop=True) for data in x.values()], axis='columns')
    return pd.DataFrame(votes.mean(axis='columns'))

aggregator_avg = c3.MlCustomPipe.from_py(average, xType="map<string, Data>", name="Average")
Python
untrained_pipeline_2v = c3.TutorialEnsemblePipeline(voters=[voter_template_sklearn, voter_template_keras], aggregator=aggregator_avg)
trained_pipeline_2v = untrained_pipeline_2v.train(lr_x, lr_y).result()
type(trained_pipeline_2v)
Text
Default process function for LinearRegression set to predict()
INFO:tensorflow:Assets written to: /tmp/tmp91vcs7v4/assets


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))





TutorialEnsemblePipeline<{ voter_out_0: Data, voter_out_1: Data, aggregated: Data }>

Note: aggregated is the average of the two voter results, as we specified

Python
trained_pipeline_2v.visualize(static=True)

Note: be careful to always upsert your pipes before associating them to another pipe!

Python
process_result_2v = trained_pipeline_2v.process(lr_x).result()
Python
import pandas as pd
result_df = pd.concat([x.to_pandas() for x in process_result_2v], axis='columns')
result_df.columns = process_result_2v.fieldNames()
result_df
voter_out_0voter_out_1aggregated
04.720610-4.6110270.054792
16.254921-4.6612690.796826
25.938139-5.7210100.108565
35.938139-5.7210100.108565
41.3466350.3914100.869022
52.716841-3.169550-0.226355
65.506850-4.9425200.282165
74.720610-4.6110270.054792
86.331259-5.8867570.222251
92.525995-0.1058301.210082

Here is the same thing, except with more voters and a different aggregation function. Note the difference in the visualization, as well as the difference in the type bindings.

Python
from sklearn.linear_model import LinearRegression
lr = LinearRegression()
voter_sklearn = c3.SklearnPipe.convert(lr).upsert()
Text
Default process function for LinearRegression set to predict()
Python
def maximum(x, spec):
    import pandas as pd
    votes = pd.concat([data.reset_index(drop=True) for data in x.values()], axis='columns')
    return pd.DataFrame(votes.max(axis='columns'))

aggregator_max = c3.MlCustomPipe.from_py(maximum, xType="map<string, Data>", name="Maximum")
Python
untrained_pipeline_3v = c3.TutorialEnsemblePipeline(voters=[voter_template_sklearn, voter_template_keras, voter_sklearn], aggregator=aggregator_max)
type(untrained_pipeline_3v)
Text
TutorialEnsemblePipeline<{ voter_out_0: Data, voter_out_1: Data, voter_out_2: Data, aggregated: Data }>
Python
untrained_pipeline_3v.visualize(static=True)
Text
Default process function for LinearRegression set to predict()
INFO:tensorflow:Assets written to: /tmp/tmpc4omk2f8/assets


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))
Python
trained_pipeline_3v = untrained_pipeline_3v.train(lr_x, lr_y).result()
process_result_3v = trained_pipeline_3v.process(lr_x).result()
Text
Default process function for LinearRegression set to predict()
INFO:tensorflow:Assets written to: /tmp/tmp_7nv74yd/assets


/c3/platform/src/data/graph/FlowGraph.Authoring.py:76: UserWarning: Vertex with no name found. Auto-generating unique name.
  warnings.warn(UserWarning("Vertex with no name found. Auto-generating unique name."))
Python
import pandas as pd
result_df = pd.concat([x.to_pandas() for x in process_result_3v], axis='columns')
result_df.columns = process_result_3v.fieldNames()
result_df
voter_out_0voter_out_1voter_out_2aggregated
04.72061-4.6110274.720614.72061
16.254921-4.6612696.2549216.254921
25.938139-5.721015.9381395.938139
35.938139-5.721015.9381395.938139
41.3466350.391411.3466351.346635
52.716841-3.169552.7168412.716841
65.50685-4.942525.506855.50685
74.72061-4.6110274.720614.72061
86.331259-5.8867576.3312596.331259
92.525995-0.105832.5259952.525995

See also

Was this page helpful?