Plexe provides a flexible callback system that allows you to monitor and interact with the model building process. This reference documents all built-in callbacks and provides information on creating custom callbacks.

Base Callback Class

All callbacks inherit from the Callback base class. Every hook is a no-op by default, so a subclass only needs to override the methods it cares about:

class Callback:
    def on_build_start(self, info: BuildStateInfo) -> None:
        """Called once at the beginning of model.build()"""
        pass

    def on_iteration_start(self, info: BuildStateInfo) -> None:
        """Called at the start of each iteration"""
        pass

    def on_iteration_end(self, info: BuildStateInfo) -> None:
        """Called at the end of each iteration"""
        pass

    def on_build_end(self, info: BuildStateInfo) -> None:
        """Called once at the end of model.build()"""
        pass

BuildStateInfo

The BuildStateInfo dataclass provides context about the current state of the build process and is passed to all callback methods:

@dataclass
class BuildStateInfo:
    # Common identification fields
    intent: str
    """The natural language description of the model's intent."""

    provider: str
    """The provider (LLM) used for generating the model."""

    # Schema fields
    input_schema: Optional[Type[BaseModel]] = None
    """The input schema for the model."""

    output_schema: Optional[Type[BaseModel]] = None
    """The output schema for the model."""

    run_timeout: Optional[int] = None
    """Maximum time in seconds for each individual training run."""

    max_iterations: Optional[int] = None
    """Maximum number of iterations for the model building process."""

    timeout: Optional[int] = None
    """Maximum total time in seconds for the entire model building process."""

    # Iteration fields
    iteration: int = 0
    """Current iteration number (0-indexed)."""

    # Dataset fields
    datasets: Optional[Dict[str, TabularConvertible]] = None
    """Dictionary of datasets used for training."""

    # Current node being evaluated
    node: Optional[Node] = None
    """The solution node being evaluated in the current iteration."""
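
For example, a callback can read these fields in any hook to report progress. The snippet below is a minimal sketch that uses only the fields documented above:

from plexe.callbacks import Callback, BuildStateInfo

class ProgressPrinter(Callback):
    def on_iteration_start(self, info: BuildStateInfo) -> None:
        # max_iterations may be None if no limit was configured
        total = info.max_iterations or "?"
        print(f"[{info.provider}] starting iteration {info.iteration + 1} of {total}: {info.intent}")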

Built-in Callbacks

ChainOfThoughtModelCallback

This callback logs detailed steps of the agent’s reasoning during the build process:

class ChainOfThoughtModelCallback(Callback):
    def __init__(
        self,
        emitter: Optional[Emitter] = None,
        include_code: bool = True
    )

Parameters:

  • emitter (Optional[Emitter]): Object that handles outputting the chain of thought logs. Default: ConsoleEmitter()
  • include_code (bool): Whether to include generated code in the logs. Default: True

This callback is automatically added when chain_of_thought=True is set in model.build().

Example:

import plexe
from plexe.callbacks import ChainOfThoughtModelCallback, ConsoleEmitter

# Create an emitter explicitly (ConsoleEmitter is used by default)
emitter = ConsoleEmitter()
callback = ChainOfThoughtModelCallback(emitter=emitter, include_code=True)

model = plexe.Model(intent="Predict house prices")
model.build(
    datasets=[df],  # df: your training data, e.g. a pandas DataFrame
    callbacks=[callback]
)

MLFlowCallback

Integrates with MLflow for experiment tracking:

class MLFlowCallback(Callback):
    def __init__(
        self,
        tracking_uri: Optional[str] = None,
        experiment_name: Optional[str] = None,
        run_name_prefix: str = "plexe_",
        log_code: bool = True,
        log_artifacts: bool = True
    )

Parameters:

  • tracking_uri (Optional[str]): MLflow tracking server URI. Default: None (uses the default MLflow URI)
  • experiment_name (Optional[str]): MLflow experiment name. Default: None (uses or creates the “Default” experiment)
  • run_name_prefix (str): Prefix for MLflow run names. Default: "plexe_"
  • log_code (bool): Whether to log generated code as artifacts. Default: True
  • log_artifacts (bool): Whether to log model artifacts. Default: True

Example:

import plexe
from plexe.callbacks import MLFlowCallback

# Initialize MLflow callback
mlflow_callback = MLFlowCallback(
    tracking_uri="http://localhost:5000",
    experiment_name="Housing Price Models",
    run_name_prefix="housing_"
)

model = plexe.Model(intent="Predict house prices")
model.build(
    datasets=[df],
    callbacks=[mlflow_callback]
)

Creating Custom Callbacks

You can create custom callbacks by subclassing Callback and implementing the desired methods:

import plexe
from plexe.callbacks import Callback, BuildStateInfo
import time

class TimingCallback(Callback):
    def __init__(self):
        self.start_time = None
        self.iteration_start_times = {}
        
    def on_build_start(self, info: BuildStateInfo) -> None:
        self.start_time = time.time()
        print(f"Build started at {time.strftime('%H:%M:%S')}")
        
    def on_iteration_start(self, info: BuildStateInfo) -> None:
        iteration = info.iteration + 1  # 1-based for output
        self.iteration_start_times[iteration] = time.time()
        print(f"Iteration {iteration} started at {time.strftime('%H:%M:%S')}")
        
    def on_iteration_end(self, info: BuildStateInfo) -> None:
        iteration = info.iteration + 1  # 1-based for output
        iteration_time = time.time() - self.iteration_start_times[iteration]
        status = "succeeded" if not (info.node and info.node.exception_was_raised) else "failed"
        
        print(f"Iteration {iteration} {status} in {iteration_time:.2f} seconds")
        
        if info.node and info.node.performance:
            print(f"  Performance: {info.node.performance.name} = {info.node.performance.value:.4f}")
        
    def on_build_end(self, info: BuildStateInfo) -> None:
        total_time = time.time() - self.start_time
        print(f"Build finished in {total_time:.2f} seconds")
        print(f"Final state: {info.model.get_state().name}")

Using Multiple Callbacks

You can use multiple callbacks simultaneously:

import plexe
from plexe.callbacks import MLFlowCallback

# Create callback instances
mlflow_callback = MLFlowCallback(experiment_name="Housing Models")
timing_callback = TimingCallback()  # Custom callback from above

# Use all callbacks together
model = plexe.Model(intent="Predict house prices")
model.build(
    datasets=[df],
    callbacks=[mlflow_callback, timing_callback],
    chain_of_thought=True  # This adds ChainOfThoughtModelCallback automatically
)

Callback Execution Order

When multiple callbacks are provided:

  1. All callbacks’ on_build_start methods are called in the order they appear in the list
  2. For each iteration:
     a. All callbacks’ on_iteration_start methods are called in order
     b. The iteration runs
     c. All callbacks’ on_iteration_end methods are called in order
  3. All callbacks’ on_build_end methods are called in order
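
Conceptually, the dispatch order looks like the sketch below. This is illustrative pseudocode, not Plexe’s internal implementation:

# Illustrative sketch of the callback dispatch order
def dispatch_build(callbacks, max_iterations, info):
    for cb in callbacks:
        cb.on_build_start(info)
    for _ in range(max_iterations):
        for cb in callbacks:
            cb.on_iteration_start(info)
        # ... one build iteration runs here ...
        for cb in callbacks:
            cb.on_iteration_end(info)
    for cb in callbacks:
        cb.on_build_end(info)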

Emitters for Chain of Thought

The ChainOfThoughtModelCallback uses a ChainOfThoughtEmitter to output the chain of thought logs. Built-in emitters include:

ConsoleEmitter

Outputs logs to the console (stdout).

LoggingEmitter

Sends logs to the Python logging system.

MultiEmitter

Combines multiple emitters into one.
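
For example, console output can be combined with Python logging and passed to the callback. The import path for LoggingEmitter and MultiEmitter and the MultiEmitter constructor argument shown below are assumptions; check the package for the exact names:

from plexe.callbacks import ChainOfThoughtModelCallback, ConsoleEmitter
# Assumption: LoggingEmitter and MultiEmitter are exposed from the same module
from plexe.callbacks import LoggingEmitter, MultiEmitter

# Assumption: MultiEmitter takes the list of emitters to fan out to
emitter = MultiEmitter(emitters=[ConsoleEmitter(), LoggingEmitter()])
callback = ChainOfThoughtModelCallback(emitter=emitter)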

Creating Custom Emitters

You can create custom emitters by subclassing ChainOfThoughtEmitter:

from plexe.internal.common.utils.chain_of_thought.emitters import ChainOfThoughtEmitter

class CustomEmitter(ChainOfThoughtEmitter):
    def emit_thought(self, role: str, thought: str) -> None:
        # Custom logic for emitting thoughts, e.g. send the message to a
        # logging service, a websocket, a progress UI, etc.
        formatted = f"[{role}] {thought}"
        print(formatted)

    def emit_code(self, role: str, code: str, language: str = None) -> None:
        # Handle code blocks separately, e.g. apply syntax highlighting
        formatted = f"[{role} CODE ({language})] {code}"
        print(formatted)
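
A custom emitter is wired up the same way as the built-in ones, via the emitter parameter of ChainOfThoughtModelCallback (df stands in for your training data, as in the earlier examples):

import plexe
from plexe.callbacks import ChainOfThoughtModelCallback

callback = ChainOfThoughtModelCallback(emitter=CustomEmitter())

model = plexe.Model(intent="Predict house prices")
model.build(datasets=[df], callbacks=[callback])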

Best Practices

  • Choose callbacks based on your needs: Use MLFlowCallback for experiment tracking, ChainOfThoughtModelCallback for visibility into the agent’s reasoning, or custom callbacks for specialized logging and visualization
  • Limit callback overhead: Heavy work inside callback hooks slows down the build process
  • Combine callbacks strategically: Multiple callbacks can provide different views of the same build
  • Handle exceptions gracefully: Callbacks should catch their own exceptions so that a failing callback does not disrupt the build process (see the sketch below)
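
For example, a callback can wrap its own work in try/except so that a reporting failure never interrupts the build. This is a sketch of the pattern, not a Plexe requirement:

import logging

from plexe.callbacks import Callback, BuildStateInfo

logger = logging.getLogger(__name__)

class SafeReportingCallback(Callback):
    def on_iteration_end(self, info: BuildStateInfo) -> None:
        try:
            self._report(info)
        except Exception:
            # Never let a reporting failure abort the build
            logger.exception("Callback reporting failed; continuing build")

    def _report(self, info: BuildStateInfo) -> None:
        # Work that might fail: network calls, file I/O, etc.
        print(f"Finished iteration {info.iteration + 1}")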