Plexe provides a flexible callback system that allows you to monitor and interact with the model building process. This reference documents all built-in callbacks and provides information on creating custom callbacks.
Base Callback Class
All callbacks inherit from the Callback
base class:
class Callback:
def on_build_start(self, info: BuildStateInfo) -> None:
"""Called once at the beginning of model.build()"""
pass
def on_iteration_start(self, info: BuildStateInfo) -> None:
"""Called at the start of each iteration"""
pass
def on_iteration_end(self, info: BuildStateInfo) -> None:
"""Called at the end of each iteration"""
pass
def on_build_end(self, info: BuildStateInfo) -> None:
"""Called once at the end of model.build()"""
pass
BuildStateInfo
The BuildStateInfo
dataclass provides context about the current state of the build process and is passed to all callback methods:
@dataclass
class BuildStateInfo:
# Common identification fields
intent: str
"""The natural language description of the model's intent."""
provider: str
"""The provider (LLM) used for generating the model."""
# Schema fields
input_schema: Optional[Type[BaseModel]] = None
"""The input schema for the model."""
output_schema: Optional[Type[BaseModel]] = None
"""The output schema for the model."""
run_timeout: Optional[int] = None
"""Maximum time in seconds for each individual training run."""
max_iterations: Optional[int] = None
"""Maximum number of iterations for the model building process."""
timeout: Optional[int] = None
"""Maximum total time in seconds for the entire model building process."""
# Iteration fields
iteration: int = 0
"""Current iteration number (0-indexed)."""
# Dataset fields
datasets: Optional[Dict[str, TabularConvertible]] = None
"""Dictionary of datasets used for training."""
# Current node being evaluated
node: Optional[Node] = None
"""The solution node being evaluated in the current iteration."""
Attribute | Type | Description |
---|
intent | str | The natural language description of the model’s intent |
provider | str | The provider (LLM) used for generating the model |
input_schema | Optional[Type[BaseModel]] | The input schema for the model |
output_schema | Optional[Type[BaseModel]] | The output schema for the model |
run_timeout | Optional[int] | Maximum time in seconds for each individual training run |
max_iterations | Optional[int] | Maximum number of iterations for the model building process |
timeout | Optional[int] | Maximum total time in seconds for the entire model building process |
iteration | int | Current iteration number (0-indexed) |
datasets | Optional[Dict[str, TabularConvertible]] | Dictionary of datasets used for training |
node | Optional[Node] | The solution node being evaluated in the current iteration |
Built-in Callbacks
ChainOfThoughtModelCallback
This callback logs detailed steps of the agent’s reasoning during the build process:
class ChainOfThoughtModelCallback(Callback):
def __init__(
self,
emitter: Optional[Emitter] = None,
include_code: bool = True
)
Parameter | Type | Description |
---|
emitter | Optional[Emitter] | Object that handles outputting the chain of thought logs. Default: ConsoleEmitter() |
include_code | bool | Whether to include generated code in the logs. Default: True |
This callback is automatically added when chain_of_thought=True
is set in model.build()
.
Example:
import plexe
from plexe.callbacks import ChainOfThoughtModelCallback, ConsoleEmitter
# Create custom emitter if needed (otherwise uses default)
emitter = ConsoleEmitter()
callback = ChainOfThoughtModelCallback(emitter=emitter, include_code=True)
model = plexe.Model(intent="Predict house prices")
model.build(
datasets=[df],
callbacks=[callback]
)
MLFlowCallback
Integrates with MLflow for experiment tracking:
class MLFlowCallback(Callback):
def __init__(
self,
tracking_uri: Optional[str] = None,
experiment_name: Optional[str] = None,
run_name_prefix: str = "plexe_",
log_code: bool = True,
log_artifacts: bool = True
)
Parameter | Type | Description |
---|
tracking_uri | Optional[str] | MLflow tracking server URI. Default: None (uses default MLflow URI) |
experiment_name | Optional[str] | MLflow experiment name. Default: None (uses/creates “Default”) |
run_name_prefix | str | Prefix for MLflow run names. Default: "plexe_" |
log_code | bool | Whether to log generated code as artifacts. Default: True |
log_artifacts | bool | Whether to log model artifacts. Default: True |
Example:
import plexe
from plexe.callbacks import MLFlowCallback
# Initialize MLflow callback
mlflow_callback = MLFlowCallback(
tracking_uri="http://localhost:5000",
experiment_name="Housing Price Models",
run_name_prefix="housing_"
)
model = plexe.Model(intent="Predict house prices")
model.build(
datasets=[df],
callbacks=[mlflow_callback]
)
Creating Custom Callbacks
You can create custom callbacks by subclassing Callback
and implementing the desired methods:
import plexe
from plexe.callbacks import Callback, BuildStateInfo
import time
class TimingCallback(Callback):
def __init__(self):
self.start_time = None
self.iteration_start_times = {}
def on_build_start(self, info: BuildStateInfo) -> None:
self.start_time = time.time()
print(f"Build started at {time.strftime('%H:%M:%S')}")
def on_iteration_start(self, info: BuildStateInfo) -> None:
iteration = info.iteration + 1 # 1-based for output
self.iteration_start_times[iteration] = time.time()
print(f"Iteration {iteration} started at {time.strftime('%H:%M:%S')}")
def on_iteration_end(self, info: BuildStateInfo) -> None:
iteration = info.iteration + 1 # 1-based for output
iteration_time = time.time() - self.iteration_start_times[iteration]
status = "succeeded" if not (info.node and info.node.exception_was_raised) else "failed"
print(f"Iteration {iteration} {status} in {iteration_time:.2f} seconds")
if info.node and info.node.performance:
print(f" Performance: {info.node.performance.name} = {info.node.performance.value:.4f}")
def on_build_end(self, info: BuildStateInfo) -> None:
total_time = time.time() - self.start_time
print(f"Build finished in {total_time:.2f} seconds")
print(f"Final state: {info.model.get_state().name}")
Using Multiple Callbacks
You can use multiple callbacks simultaneously:
import plexe
from plexe.callbacks import MLFlowCallback
# Create callback instances
mlflow_callback = MLFlowCallback(experiment_name="Housing Models")
timing_callback = TimingCallback() # Custom callback from above
# Use all callbacks together
model = plexe.Model(intent="Predict house prices")
model.build(
datasets=[df],
callbacks=[mlflow_callback, timing_callback],
chain_of_thought=True # This adds ChainOfThoughtModelCallback automatically
)
Callback Execution Order
When multiple callbacks are provided:
- All callbacks’
on_build_start
methods are called in the order they appear in the list
- For each iteration:
a. All callbacks’
on_iteration_start
methods are called in order
b. The iteration runs
c. All callbacks’ on_iteration_end
methods are called in order
- All callbacks’
on_build_end
methods are called in order
Emitters for Chain of Thought
The ChainOfThoughtModelCallback
uses a ChainOfThoughtEmitter
to output the chain of thought logs. Built-in emitters include:
ConsoleEmitter
Outputs logs to the console (stdout).
LoggingEmitter
Sends logs to the Python logging system.
MultiEmitter
Combines multiple emitters into one.
Creating Custom Emitters
You can create custom emitters by subclassing ChainOfThoughtEmitter
:
from plexe.internal.common.utils.chain_of_thought.emitters import ChainOfThoughtEmitter
class CustomEmitter(ChainOfThoughtEmitter):
def emit_thought(self, role: str, thought: str) -> None:
# Custom logic for emitting thoughts
formatted = f"[{role}] {thought}"
# Do something with formatted message
# e.g., send to logging service, web socket, etc.
def emit_code(self, role: str, code: str, language: str = None) -> None:
# Custom logic for emitting code
formatted = f"[{role} CODE ({language})] {code}"
# Handle code blocks specially
Best Practices
- Choose callbacks based on your needs: Use MLflow for experiment tracking, TensorBoard for visualization, or custom callbacks for specialized logging
- Limit callback overhead: Complex callbacks can slow down the build process
- Combine callbacks strategically: Multiple callbacks can provide different views of the same process
- Handle exceptions gracefully: Callbacks should catch their own exceptions to avoid disrupting the build process