Plexe can utilize Ray to distribute parts of the model building process, potentially speeding up execution, especially when exploring multiple solution candidates (max_iterations > 1) or when individual training runs are computationally intensive.

Prerequisites

  1. Install Ray: If you haven’t already, install Ray. You might need specific dependencies depending on your environment (e.g., for cluster support).
    pip install ray
    # Or, for the dashboard and cluster utilities (quotes keep shells like zsh happy):
    # pip install "ray[default]"
    
  2. Install Plexe with Ray support: Ensure you have the necessary Plexe dependencies.
    pip install "plexe[all]"
    # Or ensure 'smolagents[ray]' is included if installing manually
    
  3. (Optional) Ray Cluster: For distribution across multiple machines, you need a running Ray cluster; you can start one locally or connect to a remote cluster (see the Ray documentation for setup instructions, and the CLI sketch below). If no cluster is running, Ray will simply use the available cores on your local machine.
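
As a reference sketch using Ray's standard CLI (replace <head_node_ip> and the port with values for your environment):

# On the head node (or your laptop, for a single-machine cluster):
ray start --head --port=6379
# On each additional worker node, pointing at the head node's address:
ray start --address='<head_node_ip>:6379'
# To stop Ray on a node when you are done:
ray stop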

Enabling Distributed Execution

To enable distributed execution with Ray, simply set the distributed=True flag when initializing the plexe.Model.

import plexe
import pandas as pd
# Optional: Import Ray if you need to manually initialize or connect
# import ray

# --- Prepare Data (as in Quickstart) ---
try:
    df = pd.read_csv("housing_data.csv")
except FileNotFoundError:
    df = pd.DataFrame({ # Dummy data
        'square_footage': [1500, 2100, 1800, 2500, 1200], 'bedrooms': [3, 4, 3, 5, 2],
        'bathrooms': [2, 2.5, 2, 3, 1.5], 'price': [300000, 450000, 380000, 550000, 250000]
    })
datasets = [df]
# -----------------------------------------

# --- Optional: Initialize Ray manually ---
# If you need to connect to a specific cluster or configure local resources
# if not ray.is_initialized():
#    ray.init(address='auto') # Connects to running cluster or starts local one
#    # Or ray.init(address='ray://<head_node_ip>:10001') for remote cluster
#    print(f"Ray initialized: {ray.is_initialized()}")
# -----------------------------------------

# Define the model and enable distributed execution
model = plexe.Model(
    intent="Predict house prices based on property features.",
    input_schema={"square_footage": float, "bedrooms": int, "bathrooms": float},
    output_schema={"price": float},
    distributed=True # Enable Ray integration
)

print("Model defined with distributed=True. Ray will be used if available.")

# Build the model - Plexe will now attempt to use Ray for execution
print("Starting distributed model build...")
model.build(
    datasets=datasets,
    provider="openai/gpt-4o-mini",
    max_iterations=4, # Try multiple iterations to see potential parallelism
    chain_of_thought=True
)

print(f"Distributed model build finished. Model state: {model.get_state()}")

# Prediction and other methods work the same way
if model.get_state() == plexe.internal.common.utils.model_state.ModelState.READY:
    prediction = model.predict({
        "square_footage": 1900.0, "bedrooms": 3, "bathrooms": 2.0
    })
    print(f"Prediction: {prediction}")

# --- Optional: Shutdown Ray manually ---
# if ray.is_initialized():
#    ray.shutdown()
#    print("Ray shut down.")
# ---------------------------------------

How it Works

When distributed=True is set:

  1. Executor Selection: Plexe checks whether Ray is available and initialized. If so, it uses the RayExecutor to run the generated training code; otherwise, it falls back to the default ProcessExecutor.
  2. Task Distribution: The RayExecutor submits the training code execution as a remote task (@ray.remote) to the Ray cluster (or local Ray instance).
  3. Parallelism: If max_iterations in model.build() is greater than 1 and the Ray cluster has sufficient resources (CPUs/GPUs), Ray can execute multiple training iterations in parallel, significantly speeding up the exploration of different model candidates (see the conceptual sketch after this list).
  4. Results: Results (metrics, artifacts, logs, exceptions) are collected from the Ray tasks and integrated back into the Plexe model building process.
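
Conceptually, the pattern looks like the sketch below. This is an illustrative example of Ray's task API, not Plexe's actual executor code; the function name and returned values are made up for the example.

import ray

# Toy stand-in for one training iteration; Plexe's real executor runs
# generated training code rather than this placeholder function.
@ray.remote
def run_training_iteration(iteration: int) -> dict:
    # ... train one candidate model and compute its metric here ...
    return {"iteration": iteration, "score": 0.90 - 0.01 * iteration}

if not ray.is_initialized():
    ray.init()  # Uses local cores; pass address='auto' to join a running cluster

# Submit all iterations up front; Ray schedules them across available resources
futures = [run_training_iteration.remote(i) for i in range(4)]
results = ray.get(futures)  # Blocks until every remote task has completed
print(results)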

The degree of speedup depends on the number of iterations, the computational cost of each training run, and the resources available in your Ray cluster (or on your local machine). Tasks involving LLM calls for planning or code generation are typically not distributed via Ray in the current implementation.

Configuration (Advanced)

While distributed=True is the primary switch, advanced Ray configuration (such as the cluster address or resource limits) is managed through Ray’s standard initialization methods (ray.init(...)) or configuration files before the plexe.Model is initialized. Plexe also reads some Ray settings from its internal config (plexe.config.config.ray), which may be relevant in specific deployment scenarios, but calling ray.init() directly is the most common way to configure the connection.
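
As a hedged sketch (the cluster address and resource values are placeholders for your environment, not defaults), manual initialization before building might look like this:

import ray
import plexe

# Connect to an existing cluster via the Ray Client, or start a local
# instance, before constructing the model.
if not ray.is_initialized():
    ray.init(
        address="ray://<head_node_ip>:10001",  # Omit to start a local instance
        # num_cpus=8,  # Resource limits apply only when starting locally
    )

# Plexe picks up the already-initialized Ray runtime when distributed=True
model = plexe.Model(
    intent="Predict house prices based on property features.",
    input_schema={"square_footage": float, "bedrooms": int, "bathrooms": float},
    output_schema={"price": float},
    distributed=True,
)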