C3 AI Documentation Home

Orchestrate Workflows

Workflows are useful for automating a series of tasks. The primary role of creating a workflow in the context of machine learning (ML) operations is to provide an intermediate executable representation of a ML pipeline.

Workflow syntax

A workflow is a directed acyclic graph (DAG) that could be disconnected and is a representation of the FlowGraph Type where FlowGraph edges are attached to vertices at specific ports, represented by the FlowGraph.Port Type. As a port graph, multiple edges can be linked between any two vertices of a workflow representation with specific names to direct numerous inputs and outputs.

Workflow vertices

A workflow can perform complex tasks by breaking them into smaller discrete steps. These discrete steps are vertices chained together to form a workflow.

You can assemble workflow vertices to accomplish a complex task or action. The vertices are required to run in a particular order specified by the directed edges between vertices. A single vertex can use the output from previous vertices as input for its execution.

You can connect vertices by using directed edges between ports, and a single vertex can connect to more than one port.

Action-based vertex

Action-based vertices perform a single action.

Depending on your choice of WorkflowExecutionSpec#executor, you can configure the workflow execution to be synchronous or asynchronous. The default option is asynchronous, which means that when a workflow executes an action, it queues the action to the ActionQueue and waits for its completion. C3 AI currently supports only one asynchronous executor, which uses the invalidation queues for orchestration. If you use a synchronous workflow, it executes tasks in order, with one task finishing before moving on to the next.

Upon completion, the workflow advances to the next vertices, if any.

An example of an action-based vertex is the Type Workflow.Action, a Workflow.Vertex that represents an ActionRef.

Input and output ports

There are direct inputs and outputs, represented by input and output ports, respectively:

  • Use $success to represent an output port whose role is to send a Workflow.Signal. You can connect it to a $start input port of a receiving vertex to indicate a relationship.

  • Use $start to represent an input port whose role is to signal through a Workflow.Signal. It can be connected to a $success output port of the previous vertex, indicating an implicit relationship.

    • Use an error as an output port if the input data has as an Error. You can use the error output as input for subsequent steps for custom error handling.
  • Use the $ prefix naming convention to distinguish control flow related ports from data flow related ports.

    NOTE: Refer to the signal for more information.

Workflow execution

The Workflow#start method handles workflow execution. Each time you execute a workflow, it can create a different Workflow.Run. Refer to the Workflow.Run Type to learn more about the information recorded during workflow execution and the result of the execution.

WorkflowExecutorRun can call the status() function on the Workflow.Run to determine the status of the workflow.

JSON
{
  "type" : "Workflow.Status",
  "started" : "2023-03-28T19:57:30Z",
  "startedby" : "9d3b1c47b470a633385200e45d8ffff8e9da5700840ff82e15a39469c219b0d1",
  "status" : "running"
}

The above output is from the example Simple Sequence workflow example below. The life cycle of a Workflow.Run is the following:

  • Initial
  • Running
  • Completed
  • Failed
  • Canceled

Refer to the Workflow.Status Type for more details. The WorkflowExecutionSpec Type allows you to choose multiple executors that support the workflow. Selecting an executor happens by setting the executor field in WorkflowExecutionSpec.

Types of workflows

A workflow can consist of two types between vertices and ports: control flow (green) or data flow (yellow) .

Types of Workflows

Control flow

No data is passed from vertex to vertex in the control flow workflow, and a vertex can execute after all its parents have completed execution. Vertices link ports that pass a signal to notify the source vertex has succeeded, enabling the next vertex to start executing.

Data flow

A data flow workflow requires passing data between vertices by linking edges from the output port of the source vertex to an input port of its adjacent (destination) vertex. One vertex's output becomes another input in this type of workflow.

Control Flow workflow

In the above diagram, there are three vertices and two edges. The first edge contains an out port and an input port that accepts arguments as data input, args. The second port only passes data from one vertex to the final vertex in the chain.

A workflow can combine control flow and data flow even between the same two pairs of vertices. There are no restrictions on incorporating the control flow workflow with the data flow workflow within or even within the same set of workflow vertices. For example, how a single output port is connected does not affect the connections of any other output port (or other connections from that same port). Input ports are related because the vertex can start after all Workflow#signals are present on all the input ports.

Example creation of a simple workflow

The following examples demonstrate workflow concepts.

Simple workflow

The most basic workflow contains one Workflow.Action Type on one Workflow.Const Type. This example builds a simple workflow and runs the actions in sequence.

  1. Create two lists to hold the vertices and edges:

    Python
    # hold the vertices
    vertices = []
    # hold the edges
    edges = []
  2. Construct two types of vertices: The Workflow.Const and the Workflow.Action.

    Python
    # fromConst is a helper method for creating a `Workflow.Const`
    const_vertex = c3.Workflow.Const.fromConst("constant").withId("msg")

    The Workflow.Const Type is a specialized Workflow.Vertex representing a vertex with a constant value, such as the integer 42. A Workflow.Const has no input ports but can have a single output port labeled out.

  3. Construct the Workflow.Action and the action vertex.

    You can construct three vertices: A Workflow.Const, and Workflow.Action, and a Workflow.Output.

    Python
    # The "out" output represents the return value of the `#action`, if any.
    action_ref = c3.ActionRef(typeName="Echo", actionName="echoText")
    action_vertex = c3.Workflow.Action.fromAction(action_ref,"echoText", ["msg"], ["out"])
    output = c3.Workflow.Output.fromName("out");
  4. Construction the edges.

    Vertices create edges between their ports. For specific naming of ports, refer to the documentation for the C3 AI Types of the vertices Workflow.Action, Workflow.Input, Workflow.Output, and Workflow.Const. For example, the Type Workflow.Action has ports start, success, and out.

    Python
    edge01 = c3.Workflow.makeEdge(const_vertex.port("out"), action_vertex.port("msg"));
    edge02 = c3.Workflow.makeEdge(action_vertex.port("out"), output.port("in"));

    FlowGraph#makeEdge returns a new edge connecting the two ports.

  5. Construct the workflow.

    Python
    # Append the ports to the vertices and edges
    vertices.append(const_vertex)
    vertices.append(action_vertex);
    vertices.append(output);
    edges.append(edge01);
    edges.append(edge02);
    
    workflow = c3.Workflow(vertices=vertices, edges=edges).create().get();
    x = workflow.start(None)

    A simple workflow is created. To visualize the workflow you can call:

    Python
    workflow.visualize()

The following workflow is created and visualized.

Simple workflow

The above workflow contains three vertices and two edges.

Input and output workflow

The following example illustrates the use of an input provided by a lambda function to update the vertex value.

  1. Construct a Workflow.Const:

    Python
    # fromConst is a helper method for creating a `Workflow.Const`
    const_vertex = c3.Workflow.Const.fromConst("8.3").withId("msg")

    A Workflow.Const has no input ports but can have a single output port labeled out.

  2. Using the Workflow.Input Type, create specialized Workflow.Vertex that serves as a source vertex to represent the unique name of a workflow input. Every Workflow.Input has a single disconnected input port whose name is the name of this vertex, and every Workflow.Input has a single output port out.

    Python
    input_lambda = c3.Workflow.Input.fromName("input_lambda")
  3. Next, create a specialized Workflow.Vertex that represents an ActionRef. Every Workflow.Action has multiple input ports: a port $start and a number of ports whose names match the arguments to the configured Workflow.Action#action. Every Workflow.Action has three output ports: an optional port out, a port $success, and a port error. The out output represents the return value of the Workflow.Action#action, if any.

    Python
    # fromActionRef - Helper method for creating a `Workflow.Action` Type from a `ActionRef`. 
    # Automatically infers fields from `FlowGraph.Vertex`.
    lambdaCaller = c3.Workflow.Action.fromActionRef(c3.ActionRef.fromTypeAction(type=c3.Lambda, action='apply')).withName("apply").withId("apply")
  4. Create a Workflow.Vertex that serves as a sink vertex to represent the unique name of a workflow output. Every Workflow.Output has a single input port in. Every Workflow.Output has a single disconnected output port whose name represents the workflow output name.

    Python
    output_ = c3.Workflow.Output.fromName("output")

    FlowGraph#makeEdge constructs an edge and returns a new edge connecting the two ports. Three edges are constructed for this workflow.

    Python
    # create an edge between the out port of the input_lambda vertix
    edge_one = c3.Workflow.makeEdge(input_lambda.port("out"), lambdaCaller.port("this"))
    
    # create an edge between the constant vertex
    edge_two = c3.Workflow.makeEdge(const_vertex.port("out"), lambdaCaller.port("args"))
    
    # create an edge between vertex representing an Workflow.Action and the sink vertex
    edge_three = c3.Workflow.makeEdge(lambdaCaller.port("out"), output_.port("in"))
  5. Build the workflow.

    Python
    vertices = [input_lambda, const_vertex, lambdaCaller, output_]
    edges = [edge_one, edge_two, edge_three]
    workflow = c3.Workflow(vertices=vertices, edges=edges)
    workflow = workflow.create(returnInclude='this')
  6. Implement a function to accept input and return a string. The returned string hydrates the value of the Workflow.Const vertex.

    Python
    def func(arg1=None):
      list_strings = ["version_"+arg1, "some_other_string"]
      return list_strings
    
    lambda_func = c3.Lambda.fromPyFunc(func)
    inputs = {}
    inputs['input_lambda'] = lambda_func
    run = workflow.start(inputs=inputs)

    Applying the lambda function to the Workflow.Const vertex updates the vertex value to const_8.3.

    Input and Output workflow

Using multi-input lambdas in a workflow

The following workflow demonstrates how to convert a standard Python function, func, into a workflow-compatible component. It illustrates the process of creating constants, manipulating and combining them, and then feeding the results into a function.

At a high level:

  1. You define a function, func, that takes an integer and a string as inputs, and converts the string to an integer, and then returns their sum.
  2. The workflow contains various vertices:
    • Two constants with values 1 (an integer) and "1" (a string).
    • A definition of a Python function func.
    • A combiner that creates a tuple from the two constants.
    • An action that calls the function using the combined inputs.
    • A final output node.
  3. Edges that direct data flow between these vertices.
  4. Upon defining all vertices and edges, the workflow is instantiated and named DummyWorkflow.
  5. In the final step, the workflow is started, executing the defined operations in sequence and routing data through the established paths.

The Python func expects two inputs: an integer a and a string b. It returns the sum of a and the integer representation of b.

Python
def func(a, b):
     """a should be int and b should be string"""
    return a + int(b)

The following vertices are created:

Python
# Represents a constant integer value of `1`.
vertex_a = c3.Workflow.Const.fromConst(const=1)
# Represents a constant string value of `"1"`.
vertex_b = c3.Workflow.Const.fromConst(const="1")

The Lambda#fromPyFunc method converts the given Python function func into a Lambda function implemented in Python.

Python
func_lambda = c3.Lambda.fromPyFunc(func)

The func Python function is wrapped or transformed into a format (func_lambda) that can be used as a vertex in the workflow. This allows for the workflow to later use this converted function as an executable step, preserving its logic and behavior.

Next, create a new constant vertex in the workflow using the previously converted func_lambda.

Python
# Represents the Python function `func`
vertex_lambda = c3.Workflow.Const.fromConst(const=func_lambda)

c3.Workflow.Const.fromConst(...) is used to create a constant vertex, vertex_lambda, for the workflow.

const=func_lambda sets the constant value of the vertex to the func_lambda object.

The resulting object, vertex_lambda, represents a constant vertex in the workflow that holds the Python function. This line is integrating your Python function into the Workflow by making it a constant vertex, so it can be used as a reference for other workflow operations.

Now, create a vertex, vertex_combine, in the workflow that is used to structure or combine data into a specific format. vertex_combine takes two inputs: int and string and then produces an output structured as { a: int, b: string }.

The inputs for this vertex are be provided from vertex_a and vertex_b, and the output is passed along to the subsequent vertices in the workflow.

Python
# Combines the outputs of `vertex_a` and `vertex_b` into a tuple with type { a: int, b: string }
vertex_combine = c3.Workflow.Make.fromTargetType(
    targetType=c3.TupleType.fromString("{ a: int, b: string }"),
    inTypes={"a": c3.IntType(), "b": c3.StringType()}
)

c3.Workflow.Make.fromTargetType(...) is a method call that creates a vertex in the workflow and can take multiple inputs and combine them to form a specific type. The next line targetType=c3.TupleType.fromString("{ a: int, b: string }") specifies the desired output format or structure of the combined data.

It suggests that the output should be a tuple or an object with two fields:

  • a with type int.
  • b with type string.

The TupleType defines the schema or shape of the resulting data after combining the inputs.

inTypes={"a": c3.IntType(), "b": c3.StringType()} provides a mapping for the expected input types:

  • Input for the field a should be of type int.
  • Input for the field b should be of type string.

Create edges between vertices in the workflow.

Python
e1 = c3.Workflow.makeEdge(vertex_a.port("out"), vertex_combine.port("a"))
e2 = c3.Workflow.makeEdge(vertex_b.port("out"), vertex_combine.port("b"))

In the lines above, e1 is an edge connecting the output of vertex_a to the a input port of vertex_combine.

vertex_a.port("out") retrieves the out port of vertex_a.

vertex_combine.port("a") retrieves the a input port of vertex_combine.

Similarly, e2 is an edge connecting the output of vertex_b to the b input port of vertex_combine.

The output data from vertex_a flows into the a input of vertex_combine.

And, the output data from vertex_b flows into the b input of vertex_combine.

These lines set up the data flow structure, ensuring that vertex_combine receives its inputs from vertex_a and vertex_b.

The following code snippet below creates a vertex, specifically an action vertex, in the workflow. Recall, an action refers to a function or method call. This vertex executes a specific action, in this case, a Lambda function.

Python
# Represents the action of calling the `func` lambda function
vertex_sum = c3.Workflow.Action.fromAction(
    action=c3.ActionRef.fromTypeAction(
        type=c3.Lambda,
        action="call"     
      ),
    inNames=["this", "args"],
    outNames=["out"],
    name="dummy_lambda"
)

Breaking down the above code snippet:

  • c3.Workflow.Action.fromAction(...) constructs a new action vertex for the workflow.

  • action=c3.ActionRef.fromTypeAction(...) specifies what action the vertex performs:

    • type=c3.Lambda - The action belongs to the Lambda Type.

    • action="call" - Specifies the method or function of the Lambda Type that should be executed. The action should calls the lambda function (or executes it).

  • inNames=["this", "args"] are the names of the input ports for this vertex. The "this" port expects the actual lambda function to be executed, and the "args" port expects the arguments to be passed to the lambda function.

  • outNames=["out"] are the names of the output ports for this vertex. After the lambda function is executed, the result is available at this "out" port.

  • name="dummy_lambda" provides a name to the action vertex, for identification or logging purposes.

The following code snippet creates edges between vertices in the workflow.

Python
# creates an edge between the out port of the `vertex_lamba` and the input port labeled "this" of the vertex_sum. This
# edge is effectively passing the lambda function to the action vertex, which will call it
e3 = c3.Workflow.makeEdge(vertex_lambda.port("out"), vertex_sum.port("this"))
e4 = c3.Workflow.makeEdge(vertex_combine.port("out"), vertex_sum.port("args"))

These lines are ensuring that:

  • The lambda function, func, is passed to the vertex_sum action vertex.

  • The combined values from vertex_a and vertex_b are also passed as arguments to the same action vertex, ensuring that when the action is executed, it can successfully call the lambda function with the provided arguments.

This next code snippet defines the final steps of the workflow.

Python
vertex_output = c3.Workflow.Output.fromName("output")
e5 = c3.Workflow.makeEdge(vertex_sum.port("out"), vertex_output.port("in"))

vertices = [vertex_a, vertex_b, vertex_lambda, vertex_combine, vertex_sum, vertex_output]
edges = [e1, e2, e3, e4, e5]
workflow = c3.Workflow.make({
    "vertices": vertices,
    "edges": edges,
    "id": str(uuid.uuid4()),
    "name": "DummyWorkflow"
}).create()

workflow.start()

Let's break down its functionalities step-by-step:

  • Create an output vertex named output for the workflow.

    Python
    vertex_output = c3.Workflow.Output.fromName("output")
  • Create an edge e5 that connects the out of the vertex_sum (which represents the action of calling the lambda function func on the inputs a and b) to the input port of vertex_output. It routes the result of the lambda function to the final output node.

    Python
    e5 = c3.Workflow.makeEdge(vertex_sum.port("out"), vertex_output.port("in"))
  • The vertices and edges defined in the workflow are assigned into two separate lists, representing all the vertices and edges in the workflow.

    Python
    vertices = [vertex_a, vertex_b, vertex_lambda, vertex_combine, vertex_sum, vertex_output]
    edges = [e1, e2, e3, e4, e5]
  • Using the Workflow.make method, a new workflow is instantiated with the following properties:

    • vertices: List of all vertices in the Workflow.
    • edges: List of all edges in the Workflow.
    • id: A unique identifier for the Workflow, generated using the uuid library.
    • name: A string name for the Workflow, in this case, DummyWorkflow.

Calling create() finalizes the creation of the workflow, ensuring all the provided vertices and edges are valid.

Python
workflow = c3.Workflow.make({
    "vertices": vertices,
    "edges": edges,
    "id": str(uuid.uuid4()),
    "name": "DummyWorkflow"
}).create()
  • After the workflow is created, it can be started using the start() method to execute the workflow, passing data from the input vertices, through the defined operations, and finally to the output vertex.

    Python
    workflow.start()

The final steps of the multi-input lambda workflow, link together all previously defined operations and data points, and then begins its execution.

Troubleshooting tips for workflow issues

This section covers the most common issues that can occur with workflows and how to troubleshoot them.

Structural issues

  • A common cause of workflow errors results from improper design of the workflow. For example, a workflow could contain an improperly designed cycle by creating an edge that links back to the same vertex. Taking advantage of the visualize() method in the Workflow Type can enable you to visualize the workflow output.

  • Mixing ports that are not compatible is another common issue. An example is trying to connect an out port to a $start port.

  • A final common issue is if your workflow is missing edges where you intended to have edges. Calling the visualize() method can help you visualize your design or triage any structural issues with your workflow.

Execution issues

  • An issue you might encounter when executing workflows is attempting to inspect the output before execution it has completed. Workflow.status() can be used to view the status of a workflow. You should not visualize the output until the workflow status is complete.

See also

Was this page helpful?