ML Contracts
Introduction
panda.ml is a built-in module that provides machine learning primitives for Panda smart contracts. Models are trained on-chain via @call methods, serialized to contract state, and served for inference via @query methods. The module uses a dual-backend approach: real sklearn when available, with a pure-Python fallback that produces identical results.
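The dual-backend idea resembles a common Python pattern: try to import sklearn, and fall back to a pure-Python code path if the import fails. The sketch below is a generic illustration, not PandaVM's actual loader. `BACKEND` and `fit_line` are hypothetical names, and the two paths agree only up to floating-point rounding.

```python
# Hypothetical dual-backend sketch; not the PandaVM implementation.
try:
    from sklearn.linear_model import LinearRegression as _SkLinearRegression
    BACKEND = "sklearn"
except ImportError:
    _SkLinearRegression = None
    BACKEND = "pure-python"


def fit_line(x, y):
    """Return (slope, intercept) for 1D least squares using whichever backend loaded."""
    if BACKEND == "sklearn":
        model = _SkLinearRegression()
        model.fit([[xi] for xi in x], y)  # sklearn expects a 2D feature matrix
        return float(model.coef_[0]), float(model.intercept_)
    # Pure-Python fallback: closed-form ordinary least squares.
    n = len(x)
    sx, sy = sum(x), sum(y)
    sxy = sum(a * b for a, b in zip(x, y))
    sxx = sum(a * a for a in x)
    slope = (n * sxy - sx * sy) / (n * sxx - sx * sx)
    return slope, (sy - slope * sx) / n
```

Calling `fit_line([0, 1, 2, 3], [1, 3, 5, 7])` returns a slope of about 2.0 and an intercept of about 1.0 on either backend.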
Available Algorithms
- LinearRegression -- Linear least-squares regression
- LogisticRegression -- Logistic regression classifier
- DecisionTreeClassifier -- Decision tree (CART algorithm)
- KMeans -- K-Means clustering
- MLPClassifier -- Multi-layer perceptron neural network
- RandomForestClassifier -- Random forest ensemble
- SGDRegressor -- Stochastic gradient descent regressor (supports incremental learning)
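Incremental learning means the stored weights can be updated with each new batch instead of being refit from scratch. The sketch below shows the idea using plain SGD on squared loss for a linear model. `sgd_partial_fit`, the fixed learning rate, and the lack of regularization are simplifying assumptions. This is not the panda.ml or sklearn implementation; sklearn exposes the feature as `SGDRegressor.partial_fit`, with its own learning-rate schedule and penalty.

```python
def sgd_partial_fit(weights, bias, x_batch, y_batch, lr=0.01):
    """One SGD pass over a batch for squared loss; returns updated (weights, bias)."""
    w = list(weights)
    b = bias
    for row, target in zip(x_batch, y_batch):
        pred = sum(wi * xi for wi, xi in zip(w, row)) + b
        err = pred - target  # derivative of 0.5 * (pred - target) ** 2 w.r.t. pred
        w = [wi - lr * err * xi for wi, xi in zip(w, row)]
        b -= lr * err
    return w, b


# Incremental use: keep (w, b) between calls and update them as batches arrive.
w, b = [0.0], 0.0
batch_x = [[0.0], [1.0], [2.0], [3.0]]
batch_y = [1.0, 3.0, 5.0, 7.0]  # generated from y = 2x + 1
for _ in range(2000):
    w, b = sgd_partial_fit(w, b, batch_x, batch_y, lr=0.05)
print(w, b)  # approaches [2.0] and 1.0
```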
Key Functions
- save_model(model) -- Serialize a trained model to a dict suitable for state storage
- load_model(dict) -- Deserialize a model from a state dict back into a usable model object
- r2_score(y_true, y_pred) -- R-squared coefficient of determination
- accuracy_score(y_true, y_pred) -- Classification accuracy
- mean_squared_error(y_true, y_pred) -- Mean squared error
- confusion_matrix(y_true, y_pred) -- Confusion matrix
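For reference, `mean_squared_error` and `accuracy_score` follow the standard definitions sketched below in plain Python. The names `mse` and `accuracy` are illustrative reimplementations, not the panda.ml source.

```python
def mse(y_true, y_pred):
    """Mean of squared differences between targets and predictions."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def accuracy(y_true, y_pred):
    """Fraction of predictions that exactly match the true label."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and the same length")
    return sum(1 for t, p in zip(y_true, y_pred) if t == p) / len(y_true)


print(mse([3.0, 5.0], [2.0, 7.0]))           # 2.5  -> (1 + 4) / 2
print(accuracy([0, 1, 1, 0], [0, 1, 0, 0]))  # 0.75 -> 3 of 4 correct
```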
Import Syntax
from panda.ml import LinearRegression, save_model, load_model, r2_score
All panda.ml classes and functions are injected by the PandaVM runtime. They are not pip-installable packages -- they exist only inside the on-chain execution environment (and in the SDK stubs for local testing).
Getting Started: Your First ML Contract
The simplest possible ML contract fits a linear regression model and serves predictions. This example shows the core pattern without validation or error handling:
from panda import contract, call, query
from panda.ml import LinearRegression, save_model, load_model
@contract
class SimplePredictor:
    class State:
        model: dict = {}
    @call
    def train(self, ctx, x: list, y: list):
        model = LinearRegression()
        model.fit(x, y)
        self.state.model = save_model(model)
    @query
    def predict(self, x: list) -> list:
        return load_model(self.state.model).predict(x)
The pattern shared by the panda.ml-based contracts in this guide:
- Train: Create a model, call model.fit(x, y), then save_model(model) to serialize it into a dict.
- Store: Assign the serialized dict to self.state.model. This persists it on-chain.
- Load: On query, call load_model(self.state.model) to reconstruct the model from the stored dict.
- Predict: Call model.predict(x) on the reconstructed model and return the results.
The save_model() function produces a plain Python dict with a __panda_ml_model__ type tag. This dict is JSON-serializable and stored in the contract's state. The load_model() function reads this tag and reconstructs the correct model type.
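The tag-based round trip can be sketched in plain Python. Only the `__panda_ml_model__` key comes from the description above. The registry, `ToyLinearModel`, and the `coef`/`intercept` field names are hypothetical, and the real save_model()/load_model() format may differ.

```python
import json

_REGISTRY = {}


def register(cls):
    """Map a class name to the class so load_model_sketch can rebuild it."""
    _REGISTRY[cls.__name__] = cls
    return cls


@register
class ToyLinearModel:
    def __init__(self, coef=None, intercept=0.0):
        self.coef = coef or []
        self.intercept = intercept

    def predict(self, x):
        return [sum(c * v for c, v in zip(self.coef, row)) + self.intercept for row in x]

    def to_dict(self):
        return {"coef": list(self.coef), "intercept": self.intercept}


def save_model_sketch(model):
    """Serialize to a JSON-friendly dict tagged with the model's type name."""
    data = model.to_dict()
    data["__panda_ml_model__"] = type(model).__name__
    return data


def load_model_sketch(data):
    """Read the type tag, look up the class, and rebuild it from the remaining fields."""
    fields = dict(data)  # copy so the stored dict is left untouched
    cls = _REGISTRY[fields.pop("__panda_ml_model__")]
    return cls(**fields)


# Round-trip through JSON, as contract state would be stored.
stored = json.loads(json.dumps(save_model_sketch(ToyLinearModel([2.0], 1.0))))
print(load_model_sketch(stored).predict([[3.0]]))  # [7.0]
```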
Linear Regression
The LinearPredictor contract implements linear regression from scratch in pure Python, with no sklearn dependency. For one-dimensional input, it computes the slope and intercept using the ordinary least-squares closed-form solution. This is useful for understanding the math, and it works even when sklearn is not available.
"""
LinearPredictor -- A simple linear regression model that trains on-chain.
Example Panda smart contract demonstrating @contract, @call, @query decorators.
"""
from panda import contract, constructor, call, query, event
@contract
class LinearPredictor:
"""A simple linear regression model that trains on-chain."""
class State:
coefficients: list = []
intercept: float = 0.0
is_trained: bool = False
training_samples: int = 0
owner: str = ""
@constructor
def deploy(self, ctx):
"""Called once on deployment."""
self.state.owner = ctx.sender
print(f"LinearPredictor deployed by {ctx.sender}")
@call
def train(self, ctx, x: list, y: list):
"""Train the model with new data."""
# Simple least-squares linear regression (no sklearn needed for basic case)
n = len(x)
if n == 0:
return
# Validate coefficient dimensions
if not isinstance(x[0], (int, float)):
num_features = len(x[0])
if num_features > 10000:
raise ValueError(f"number of features must be <= 10000, got {num_features}")
# For 1D input
if isinstance(x[0], (int, float)):
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(xi * yi for xi, yi in zip(x, y))
sum_x2 = sum(xi * xi for xi in x)
denom = n * sum_x2 - sum_x * sum_x
if denom == 0:
return
slope = (n * sum_xy - sum_x * sum_y) / denom
intercept = (sum_y - slope * sum_x) / n
self.state.coefficients = [slope]
self.state.intercept = intercept
else:
# Multi-dimensional: store raw for now
self.state.coefficients = [1.0] * len(x[0])
self.state.intercept = 0.0
self.state.is_trained = True
self.state.training_samples = self.state.training_samples + n
self.emit(
event.ModelTrained(
samples=n,
total_samples=self.state.training_samples,
)
)
print(
f"LinearPredictor trained on {n} samples by {ctx.sender}, total samples: {self.state.training_samples}"
)
@query
def predict(self, x: list) -> list:
"""Run inference. Read-only operation."""
if not self.state.is_trained:
return []
coeffs = self.state.coefficients
intercept = self.state.intercept
if isinstance(x[0], (int, float)):
# 1D prediction
results = [xi * coeffs[0] + intercept for xi in x]
else:
# Multi-dimensional
results = []
for row in x:
pred = sum(c * v for c, v in zip(coeffs, row)) + intercept
results.append(pred)
print(f"LinearPredictor predicted {len(results)} samples")
return results
@query
def info(self) -> dict:
"""Return model metadata."""
return {
"trained": self.state.is_trained,
"samples": self.state.training_samples,
"coefficients": self.state.coefficients,
"intercept": self.state.intercept,
}
How it works
The train method implements the closed-form solution for simple linear regression. For 1D input (scalar x values), it computes the slope and intercept using the standard formulas:
$$\text{slope} = \frac{n \sum x_i y_i - \sum x_i \sum y_i}{n \sum x_i^2 - \left(\sum x_i\right)^2}$$
$$\text{intercept} = \frac{\sum y_i - \text{slope} \cdot \sum x_i}{n}$$
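For multi-dimensional input, it does not fit anything. It stores placeholder coefficients (1.0 for every feature) and an intercept of 0.0, so multi-dimensional predictions are simply the sum of the input features. The model state is stored as plain Python lists and floats -- no save_model() call is needed since the math is done manually.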
The predict method reconstructs predictions from the stored coefficients. For 1D: y = slope * x + intercept. For multi-dimensional: y = sum(coeff_i * x_i) + intercept.
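The multi-dimensional branch stores placeholder weights rather than fitting them. For comparison, the sketch below shows what a genuine multi-feature least-squares fit computes. It appends a constant column for the intercept and solves the normal equations (X^T X) beta = X^T y by Gauss-Jordan elimination. `fit_ols` is a hypothetical helper, not part of LinearPredictor, and it assumes the features are not collinear; panda.ml's LinearRegression is the more robust option in practice.

```python
def fit_ols(x, y):
    """Least squares via the normal equations; returns (coefficients, intercept)."""
    rows = [list(r) + [1.0] for r in x]  # constant column models the intercept
    k = len(rows[0])
    # Augmented matrix [X^T X | X^T y].
    a = [
        [sum(r[i] * r[j] for r in rows) for j in range(k)]
        + [sum(r[i] * t for r, t in zip(rows, y))]
        for i in range(k)
    ]
    # Gauss-Jordan elimination with partial pivoting.
    for col in range(k):
        pivot = max(range(col, k), key=lambda r: abs(a[r][col]))
        if abs(a[pivot][col]) < 1e-12:
            raise ValueError("X^T X is singular; features are collinear")
        a[col], a[pivot] = a[pivot], a[col]
        for r in range(k):
            if r != col:
                factor = a[r][col] / a[col][col]
                a[r] = [v - factor * p for v, p in zip(a[r], a[col])]
    beta = [a[i][k] / a[i][i] for i in range(k)]
    return beta[:-1], beta[-1]


# Sample data generated from y = 2*x1 + 3*x2 + 1.
coef, intercept = fit_ols([[1, 2], [2, 0], [3, 1], [0, 4]], [9, 5, 10, 13])
print(coef, intercept)
```

With this data, `fit_ols` recovers coefficients of about [2.0, 3.0] and an intercept of about 1.0, which predict would then combine as y = sum(coeff_i * x_i) + intercept.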
The @constructor decorator marks deploy as the method called once at deployment time. It records the deployer's address as the contract owner. The @query decorator marks read-only methods that do not modify state.
Regression with sklearn
The RegressionTrainer contract uses panda.ml.LinearRegression (backed by sklearn when available) and supports both linear and polynomial regression via Pipeline and PolynomialFeatures. This is the recommended approach for production regression contracts.
"""
RegressionTrainer -- Linear & polynomial regression with real sklearn.
Demonstrates:
- Real sklearn LinearRegression and PolynomialFeatures
- Model serialization via panda.ml save_model/load_model
- Constructor pattern for model configuration
- R2 score, coefficients inspection
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
LinearRegression,
PolynomialFeatures,
Pipeline,
save_model,
load_model,
r2_score,
)
@contract
class RegressionTrainer:
"""Trains and serves linear/polynomial regression models on-chain."""
class State:
model: dict = {}
model_type: str = "linear"
degree: int = 1
is_trained: bool = False
sample_count: int = 0
owner: str = ""
@constructor
def create(self, ctx, model_type: str = "linear", degree: int = 2):
self.state.owner = ctx.sender
self.state.model_type = model_type
self.state.degree = degree
print(f"RegressionTrainer deployed by {ctx.sender}, type={model_type}, degree={degree}")
@call
def train(self, ctx, x: list, y: list):
"""Train the model with feature matrix x and target vector y."""
if len(x) != len(y):
raise ValueError("x and y must have the same length")
if len(x) < 2:
raise ValueError("Need at least 2 samples")
if self.state.model_type == "polynomial" and self.state.degree > 1:
pipe = Pipeline(
[
("poly", PolynomialFeatures(degree=self.state.degree)),
("lr", LinearRegression()),
]
)
pipe.fit(x, y)
self.state.model = save_model(pipe)
else:
model = LinearRegression()
model.fit(x, y)
self.state.model = save_model(model)
self.state.is_trained = True
self.state.sample_count = self.state.sample_count + len(x)
self.emit(
event.ModelTrained(
model_type=self.state.model_type,
samples=len(x),
total_samples=self.state.sample_count,
trainer=ctx.sender,
)
)
print(
f"RegressionTrainer ({self.state.model_type}) trained on {len(x)} samples by {ctx.sender}, total: {self.state.sample_count}"
)
@query
def predict(self, x: list) -> list:
"""Run inference on input features."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
return model.predict(x)
@query
def metrics(self) -> dict:
"""Return model metadata and coefficients."""
info = {
"model_type": self.state.model_type,
"degree": self.state.degree,
"is_trained": self.state.is_trained,
"sample_count": self.state.sample_count,
}
if self.state.is_trained:
model_data = self.state.model
if model_data.get("__panda_ml_model__") == "Pipeline":
lr_data = model_data["steps"][-1][1]
info["coefficients"] = lr_data.get("coef", [])
info["intercept"] = lr_data.get("intercept", 0)
else:
info["coefficients"] = model_data.get("coef", [])
info["intercept"] = model_data.get("intercept", 0)
return info
@query
def evaluate(self, x: list, y: list) -> dict:
"""Evaluate model on test data, returning R2 score."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
preds = model.predict(x)
return {
"r2": r2_score(y, preds),
"n_samples": len(x),
}
How it works
The constructor accepts model_type ("linear" or "polynomial") and degree parameters, which configure the model at deploy time. These are stored in state and used when train is called.
For polynomial regression, the contract builds a Pipeline that chains PolynomialFeatures (which expands [x] into [1, x, x^2, ..., x^degree]) with a LinearRegression estimator. The entire pipeline is serialized as one unit via save_model(pipe).
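For a single input feature, the expansion fits in one line. This sketch covers only that case (with several features, sklearn's PolynomialFeatures also generates cross terms such as x1*x2), and `poly_expand` is a hypothetical name.

```python
def poly_expand(x, degree):
    """Map a scalar x to [1, x, x^2, ..., x^degree]."""
    return [x ** p for p in range(degree + 1)]


# A degree-2 model is then ordinary linear regression on the expanded rows:
# y = w0 * 1 + w1 * x + w2 * x^2
rows = [poly_expand(v, 2) for v in [0.0, 1.0, 2.0, 3.0]]
print(rows)  # [[1.0, 0.0, 0.0], [1.0, 1.0, 1.0], [1.0, 2.0, 4.0], [1.0, 3.0, 9.0]]
```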
The metrics method inspects the serialized model dict directly. For pipelines, it navigates to the last step (the LinearRegression) to extract coefficients. The __panda_ml_model__ tag distinguishes pipeline models from plain models.
The evaluate method computes the R-squared score by running predictions on test data and comparing against known targets. R-squared ranges from negative infinity to 1.0, where 1.0 means perfect prediction.
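The definition R^2 = 1 - SS_res / SS_tot explains that range. A perfect fit leaves no residual, predicting the mean of y everywhere scores exactly 0, and anything worse goes negative without bound. A plain-Python sketch of the formula (not the panda.ml source):

```python
def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot  # this sketch divides by zero if y_true is constant


print(r2([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))  # 1.0   perfect fit
print(r2([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))  # 0.0   same as predicting the mean
print(r2([1.0, 2.0, 3.0], [3.0, 2.0, 1.0]))  # -3.0  worse than the mean
```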
Price Prediction
The PricePredictor contract applies linear regression to a real-world scenario: predicting asset prices from feature vectors. It tracks the R-squared score of the most recent training run as a measure of model quality.
"""
PricePredictor -- ML-powered price prediction contract.
Demonstrates:
- Training a linear regression model on historical price data
- Running inference to predict future prices
- Storing training history for auditability
- Using panda.ml LinearRegression + save_model/load_model
"""
from panda import contract, constructor, call, query, event
from panda.ml import LinearRegression, save_model, load_model, r2_score
@contract
class PricePredictor:
"""Linear regression model for on-chain price prediction."""
class State:
owner: str = ""
asset_name: str = ""
model: dict = {}
is_trained: bool = False
sample_count: int = 0
last_r2: float = 0.0
@constructor
def deploy(self, ctx, asset_name: str = "ETH"):
"""Deploy the price predictor for a specific asset.
Args:
asset_name: Name of the asset being predicted (e.g. "ETH", "BTC").
"""
self.state.owner = ctx.sender
self.state.asset_name = asset_name
print(f"PricePredictor deployed for {asset_name} by {ctx.sender}")
@call
def train(self, ctx, features: list, prices: list):
"""Train the model on historical data.
Args:
features: Feature matrix (e.g. [[volume, market_cap], ...])
prices: Target price values
"""
if len(features) != len(prices):
raise ValueError("features and prices must have same length")
if len(features) < 2:
raise ValueError("Need at least 2 data points")
model = LinearRegression()
model.fit(features, prices)
preds = model.predict(features)
r2 = r2_score(prices, preds)
self.state.model = save_model(model)
self.state.is_trained = True
self.state.sample_count = self.state.sample_count + len(features)
self.state.last_r2 = round(r2, 6)
self.emit(
event.ModelTrained(
asset=self.state.asset_name,
samples=len(features),
r2_score=self.state.last_r2,
trainer=ctx.sender,
)
)
@query
def predict(self, features: list) -> list:
"""Predict prices for given feature vectors.
Args:
features: Feature matrix, same shape as training data.
Returns:
List of predicted prices.
"""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
predictions = model.predict(features)
return [round(p, 2) for p in predictions]
@query
def info(self) -> dict:
"""Return model metadata."""
return {
"asset_name": self.state.asset_name,
"owner": self.state.owner,
"is_trained": self.state.is_trained,
"sample_count": self.state.sample_count,
"last_r2": self.state.last_r2,
}
How it works
The constructor takes an asset_name parameter (defaulting to "ETH") that labels which asset this predictor is for. This is stored in state and included in ModelTrained events, making it easy to filter training events by asset on the explorer.
The train method fits a LinearRegression model and immediately evaluates it on the training data to compute the R-squared score. This score is stored in self.state.last_r2 so callers can check model quality via the info query without re-evaluating. Because it is computed on the same data the model was fit to, it measures in-sample fit and tends to overstate accuracy on new data; a held-out evaluation gives a fairer estimate.
The predict method rounds output prices to 2 decimal places. This is a practical choice for financial predictions -- it avoids floating-point noise in the returned values while preserving enough precision for assets quoted to the cent. Low-priced assets may need more decimal places.
Each training event includes the R-squared score, so the training history can be audited on-chain. This is a pattern worth following: emit metrics alongside training events so that model quality is transparent.
Classification
Classification contracts predict discrete labels (categories) rather than continuous values. The simplest classification contract uses LogisticRegression:
from panda import contract, call, query
from panda.ml import LogisticRegression, save_model, load_model
@contract
class SimpleClassifier:
    class State:
        model: dict = {}
    @call
    def train(self, ctx, x: list, y: list):
        model = LogisticRegression()
        model.fit(x, y)
        self.state.model = save_model(model)
    @query
    def classify(self, x: list) -> list:
        return load_model(self.state.model).predict(x)
The full LogisticClassifier contract adds constructor configuration, probability outputs, evaluation metrics, and event emission:
"""
LogisticClassifier -- Multiclass logistic regression on-chain.
Demonstrates:
- Real sklearn LogisticRegression
- Probability predictions (predict_proba)
- Event emission for training and classification
- Model persistence across transactions
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
LogisticRegression,
save_model,
load_model,
accuracy_score,
confusion_matrix,
)
@contract
class LogisticClassifier:
"""On-chain logistic regression classifier."""
class State:
model: dict = {}
classes: list = []
is_trained: bool = False
sample_count: int = 0
owner: str = ""
@constructor
def create(self, ctx, max_iter: int = 200):
self.state.owner = ctx.sender
self.state.model = {"max_iter": max_iter}
print(f"LogisticClassifier deployed by {ctx.sender}, max_iter={max_iter}")
@call
def train(self, ctx, x: list, y: list):
"""Train the classifier on labeled data."""
if len(x) != len(y):
raise ValueError("x and y must have the same length")
if len(x) < 2:
raise ValueError("Need at least 2 samples")
max_iter = self.state.model.get("max_iter", 200)
model = LogisticRegression(max_iter=max_iter)
model.fit(x, y)
self.state.model = save_model(model)
self.state.classes = sorted(set(y))
self.state.is_trained = True
self.state.sample_count = self.state.sample_count + len(x)
self.emit(
event.ModelTrained(
classes=self.state.classes,
samples=len(x),
trainer=ctx.sender,
)
)
print(
f"LogisticClassifier trained on {len(x)} samples by {ctx.sender}, classes: {self.state.classes}"
)
@query
def classify(self, x: list) -> list:
"""Predict class labels for input samples."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
return model.predict(x)
@query
def predict_proba(self, x: list) -> list:
"""Predict class probabilities for input samples."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
return model.predict_proba(x)
@query
def evaluate(self, x: list, y: list) -> dict:
"""Evaluate model accuracy on test data."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
preds = model.predict(x)
return {
"accuracy": accuracy_score(y, preds),
"confusion_matrix": confusion_matrix(y, preds),
"classes": self.state.classes,
}
@query
def info(self) -> dict:
"""Return model metadata."""
return {
"is_trained": self.state.is_trained,
"classes": self.state.classes,
"sample_count": self.state.sample_count,
}
How it works
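The constructor stores max_iter in the model dict before training, and train reads it back with self.state.model.get("max_iter", 200). This works for the first training run. Once save_model() overwrites self.state.model, however, the hyperparameter survives only if the serialized dict happens to contain a max_iter key; otherwise retraining silently falls back to 200. Storing hyperparameters in dedicated state fields, as DecisionTreeContract does with max_depth, avoids this dependency on the serialization format.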
The classify method returns discrete class labels (e.g., [0, 1, 0]). The predict_proba method returns probability distributions over all classes for each input (e.g., [[0.9, 0.1], [0.2, 0.8]]). Probability outputs are useful when the caller needs confidence scores rather than hard decisions.
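For a binary model, the relationship between the two methods can be sketched directly. Logistic regression passes a linear score through the sigmoid to get P(class 1), and the hard label is whichever class has the higher probability. The weights below are made-up illustration values rather than a trained model, the function names are hypothetical, and multiclass models typically use a softmax over per-class scores instead.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict_proba_binary(weights, bias, x):
    """Return [[p_class0, p_class1], ...] for a binary logistic model."""
    out = []
    for row in x:
        p1 = sigmoid(sum(w * v for w, v in zip(weights, row)) + bias)
        out.append([1.0 - p1, p1])
    return out


def predict_binary(weights, bias, x):
    """Hard labels: the class with the larger probability."""
    return [1 if p[1] > p[0] else 0 for p in predict_proba_binary(weights, bias, x)]


probs = predict_proba_binary([2.0], -1.0, [[0.0], [1.0]])
labels = predict_binary([2.0], -1.0, [[0.0], [1.0]])
print(probs)   # scores -1 and +1 give P(class 1) of about 0.269 and 0.731
print(labels)  # [0, 1]
```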
The evaluate method returns both accuracy and a confusion matrix. The confusion matrix is a 2D list where matrix[i][j] is the number of samples whose true label is the i-th class and whose predicted label is the j-th class (with sklearn, classes are taken in sorted order). This enables detailed error analysis beyond a single accuracy number.
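The definition translates into a few lines of Python. This sketch orders rows and columns by the sorted set of labels appearing in either input, which matches sklearn's default when no `labels` argument is passed. `confusion` is an illustrative name, not the panda.ml function.

```python
def confusion(y_true, y_pred):
    """Return (labels, matrix) where matrix[i][j] counts true labels[i] predicted as labels[j]."""
    labels = sorted(set(y_true) | set(y_pred))
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return labels, matrix


labels, matrix = confusion([0, 0, 1, 1, 1], [0, 1, 1, 1, 0])
print(labels, matrix)  # [0, 1] [[1, 1], [1, 2]]
```

Reading the result: one true 0 was predicted correctly and one was misclassified as 1; two true 1s were correct and one was misclassified as 0.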
The classes field in state tracks the set of labels seen during training. This is important for multi-class problems where the caller needs to know which labels the model can predict.
Fraud Detection
The FraudDetector contract applies logistic regression to transaction fraud detection. It demonstrates role-based access control (only the owner can train), threshold-based flagging, model versioning, and prediction statistics tracking.
"""
FraudDetector -- On-chain fraud detection using panda.ml.
Demonstrates:
- Training a classifier on transaction features
- Running inference to flag suspicious transactions
- Model versioning (retrain with new data)
- Role-based access (only owner can train, anyone can query)
- panda.ml LogisticRegression + save_model/load_model
"""
from panda import contract, constructor, call, query, event
from panda.ml import LogisticRegression, save_model, load_model
@contract
class FraudDetector:
"""Logistic regression classifier for on-chain fraud detection."""
class State:
owner: str = ""
model: dict = {}
is_trained: bool = False
model_version: int = 0
total_predictions: int = 0
flagged_count: int = 0
threshold: float = 0.5
@constructor
def deploy(self, ctx, threshold: float = 0.5):
"""Deploy the fraud detector.
Args:
threshold: Probability threshold above which a tx is flagged (0.0-1.0).
"""
if threshold <= 0.0 or threshold >= 1.0:
raise ValueError("Threshold must be between 0 and 1")
self.state.owner = ctx.sender
self.state.threshold = threshold
print(f"FraudDetector deployed by {ctx.sender}, threshold={threshold}")
@call
def train(self, ctx, features: list, labels: list):
"""Train the fraud detection model.
Args:
features: List of feature vectors, e.g. [[amount, freq, age], ...]
labels: List of 0 (legit) or 1 (fraud) labels
"""
if ctx.sender != self.state.owner:
raise ValueError("Only owner can train")
if len(features) != len(labels):
raise ValueError("features and labels must have same length")
if len(features) < 2:
raise ValueError("Need at least 2 samples")
for label in labels:
if label not in (0, 1):
raise ValueError("Labels must be 0 or 1")
model = LogisticRegression()
model.fit(features, labels)
self.state.model = save_model(model)
self.state.is_trained = True
self.state.model_version = self.state.model_version + 1
self.emit(
event.ModelTrained(
version=self.state.model_version,
sample_count=len(features),
fraud_count=sum(labels),
trainer=ctx.sender,
)
)
@query
def predict(self, features: list) -> list:
"""Run fraud prediction on a batch of transactions.
Args:
features: List of feature vectors to classify.
Returns:
List of dicts with 'flagged' (bool) and 'score' (float) for each input.
"""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
probas = model.predict_proba(features)
results = []
for proba in probas:
# proba is [p_legit, p_fraud]
fraud_score = round(proba[1], 6) if len(proba) > 1 else round(proba[0], 6)
results.append(
{
"flagged": fraud_score >= self.state.threshold,
"score": fraud_score,
}
)
return results
@call
def check_transaction(self, ctx, features: list):
"""Check a single transaction and record the result.
Args:
features: Feature vector for one transaction, e.g. [amount, freq, age].
"""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
probas = model.predict_proba([features])
proba = probas[0]
fraud_score = round(proba[1], 6) if len(proba) > 1 else round(proba[0], 6)
flagged = fraud_score >= self.state.threshold
self.state.total_predictions = self.state.total_predictions + 1
if flagged:
self.state.flagged_count = self.state.flagged_count + 1
self.emit(
event.TransactionChecked(
checker=ctx.sender,
flagged=flagged,
score=fraud_score,
)
)
return {"flagged": flagged, "score": fraud_score}
@query
def stats(self) -> dict:
"""Return model and detection statistics."""
return {
"is_trained": self.state.is_trained,
"model_version": self.state.model_version,
"threshold": self.state.threshold,
"total_predictions": self.state.total_predictions,
"flagged_count": self.state.flagged_count,
}
How it works
The contract separates two prediction methods: predict (a @query, read-only) for batch scoring, and check_transaction (a @call, state-mutating) for individual transaction checks that update the prediction counter.
Role-based access: The train method checks ctx.sender != self.state.owner and rejects non-owner callers. This ensures only the contract deployer can update the model, while anyone can run predictions.
Threshold-based flagging: The constructor accepts a threshold parameter (default 0.5). The predict and check_transaction methods use predict_proba to get the fraud probability, then flag the transaction when that probability is at or above the threshold. Each result includes both the boolean flagged field and the raw score, so callers can apply their own thresholds.
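The scoring step shared by predict and check_transaction reduces to a small pure function. This sketch assumes, as the contract's comment does, that each probability row is [p_legit, p_fraud]. `flag_transactions` is a hypothetical helper, not part of the contract.

```python
def flag_transactions(probas, threshold=0.5):
    """Turn [[p_legit, p_fraud], ...] rows into {'flagged', 'score'} dicts."""
    if not 0.0 < threshold < 1.0:
        raise ValueError("Threshold must be strictly between 0 and 1")
    results = []
    for proba in probas:
        score = round(proba[1], 6)  # probability of the fraud class (label 1)
        results.append({"flagged": score >= threshold, "score": score})
    return results


results = flag_transactions([[0.9, 0.1], [0.3, 0.7]], threshold=0.6)
print(results)  # [{'flagged': False, 'score': 0.1}, {'flagged': True, 'score': 0.7}]
```

Because each result keeps the raw score, a caller can re-threshold locally, for example `[r["score"] >= 0.9 for r in results]`, without another query.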
Model versioning: Each call to train increments model_version. This allows auditing which version of the model produced which predictions. The version number is emitted in the ModelTrained event.
Statistics tracking: check_transaction increments total_predictions on every call and flagged_count whenever the transaction is flagged. The batch predict query is read-only and does not update these counters. The stats query exposes them, providing an on-chain record of how many transactions have been checked and how many were flagged.
Decision Trees
The DecisionTreeContract uses DecisionTreeClassifier from panda.ml, which implements the CART algorithm. Decision trees are interpretable models -- you can inspect which features matter most via feature importances.
"""
DecisionTreeContract -- Decision tree classifier on-chain.
Demonstrates:
- Real sklearn DecisionTreeClassifier
- Tree depth control, feature importances
- Interpretable classification
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
DecisionTreeClassifier,
save_model,
load_model,
accuracy_score,
)
@contract
class DecisionTreeContract:
"""On-chain decision tree classifier."""
class State:
model: dict = {}
classes: list = []
is_trained: bool = False
max_depth: int = 5
sample_count: int = 0
owner: str = ""
@constructor
def create(self, ctx, max_depth: int = 5):
self.state.owner = ctx.sender
self.state.max_depth = max_depth
print(f"DecisionTreeContract deployed by {ctx.sender}, max_depth={max_depth}")
@call
def train(self, ctx, x: list, y: list):
"""Train the decision tree on labeled data."""
if len(x) != len(y):
raise ValueError("x and y must have the same length")
if len(x) < 2:
raise ValueError("Need at least 2 samples")
model = DecisionTreeClassifier(max_depth=self.state.max_depth)
model.fit(x, y)
self.state.model = save_model(model)
self.state.classes = sorted(set(y))
self.state.is_trained = True
self.state.sample_count = self.state.sample_count + len(x)
self.emit(
event.TreeTrained(
max_depth=self.state.max_depth,
classes=self.state.classes,
samples=len(x),
trainer=ctx.sender,
)
)
print(
f"DecisionTree trained on {len(x)} samples by {ctx.sender}, max_depth={self.state.max_depth}, classes: {self.state.classes}"
)
@query
def classify(self, x: list) -> list:
"""Classify input samples."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
return model.predict(x)
@query
def tree_info(self) -> dict:
"""Return tree metadata."""
info = {
"is_trained": self.state.is_trained,
"max_depth": self.state.max_depth,
"classes": self.state.classes,
"sample_count": self.state.sample_count,
}
if self.state.is_trained:
info["feature_importances"] = self.state.model.get("feature_importances", [])
return info
@query
def evaluate(self, x: list, y: list) -> dict:
"""Evaluate accuracy on test data."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
preds = model.predict(x)
return {
"accuracy": accuracy_score(y, preds),
"n_samples": len(x),
}
How it works
The max_depth parameter controls how deep the tree can grow. Shallow trees (depth 2-3) are simple and interpretable but may underfit. Deep trees (depth 10+) can memorize training data but may overfit. The default of 5 is a reasonable middle ground.
The tree_info query reads feature_importances directly from the serialized model dict. Feature importances are a list of floats (one per input feature) that sum to 1.0 (sklearn reports all zeros if the tree makes no splits), indicating how much each feature contributed to the tree's decisions. This is a key advantage of tree-based models over black-box approaches.
The save_model() function serializes the entire tree structure (split points, thresholds, leaf values) as a nested dict. The load_model() function reconstructs the tree from this dict. The serialized form includes feature_importances as a top-level field for direct inspection without reconstructing the model.
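To make the nested-dict idea concrete, here is a sketch of a serialized tree and the traversal a prediction performs. The layout (`feature`, `threshold`, `left`, `right`, `value` keys) is hypothetical; this guide does not specify the schema save_model() actually emits. Samples go left when the feature value is <= the threshold, the convention sklearn's trees use.

```python
# Hypothetical serialized tree of depth 2: internal nodes test one feature
# against a threshold, and leaves hold the predicted class.
tree = {
    "feature": 0, "threshold": 2.5,
    "left": {"value": 0},
    "right": {
        "feature": 1, "threshold": 1.0,
        "left": {"value": 1},
        "right": {"value": 2},
    },
}


def predict_one(node, row):
    """Follow split tests from the root until a leaf is reached."""
    while "value" not in node:
        node = node["left"] if row[node["feature"]] <= node["threshold"] else node["right"]
    return node["value"]


def predict_tree(node, x):
    return [predict_one(node, row) for row in x]


print(predict_tree(tree, [[1.0, 5.0], [3.0, 0.5], [3.0, 4.0]]))  # [0, 1, 2]
```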
Random Forest (Ensemble)
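The EnsembleClassifier uses RandomForestClassifier, which trains multiple decision trees, each on a bootstrap sample of the data with a random subset of features considered at each split, and combines their predictions. The classic formulation combines trees by majority voting; sklearn's implementation instead averages the trees' predicted class probabilities. Either way, this typically produces more robust predictions than a single decision tree.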
"""
EnsembleClassifier -- Random forest classifier on-chain.
Demonstrates:
- Real sklearn RandomForestClassifier
- Ensemble of decision trees for robust classification
- Feature importance inspection
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
RandomForestClassifier,
save_model,
load_model,
accuracy_score,
)
@contract
class EnsembleClassifier:
"""On-chain random forest classifier."""
class State:
model: dict = {}
classes: list = []
is_trained: bool = False
n_trees: int = 10
max_depth: int = 5
sample_count: int = 0
owner: str = ""
@constructor
def create(self, ctx, n_trees: int = 10, max_depth: int = 5):
self.state.owner = ctx.sender
self.state.n_trees = n_trees
self.state.max_depth = max_depth
print(
f"EnsembleClassifier deployed by {ctx.sender}, n_trees={n_trees}, max_depth={max_depth}"
)
@call
def train(self, ctx, x: list, y: list):
"""Train the random forest on labeled data."""
if len(x) != len(y):
raise ValueError("x and y must have the same length")
if len(x) < 2:
raise ValueError("Need at least 2 samples")
model = RandomForestClassifier(
n_estimators=self.state.n_trees,
max_depth=self.state.max_depth,
)
model.fit(x, y)
self.state.model = save_model(model)
self.state.classes = sorted(set(y))
self.state.is_trained = True
self.state.sample_count = self.state.sample_count + len(x)
self.emit(
event.ForestTrained(
n_trees=self.state.n_trees,
max_depth=self.state.max_depth,
classes=self.state.classes,
samples=len(x),
trainer=ctx.sender,
)
)
print(
f"RandomForest trained on {len(x)} samples by {ctx.sender}, {self.state.n_trees} trees, classes: {self.state.classes}"
)
@query
def classify(self, x: list) -> list:
"""Classify input samples via majority vote."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
return model.predict(x)
@query
def forest_info(self) -> dict:
"""Return forest metadata and feature importances."""
info = {
"is_trained": self.state.is_trained,
"n_trees": self.state.n_trees,
"max_depth": self.state.max_depth,
"classes": self.state.classes,
"sample_count": self.state.sample_count,
}
if self.state.is_trained:
info["feature_importances"] = self.state.model.get("feature_importances", [])
return info
@query
def evaluate(self, x: list, y: list) -> dict:
"""Evaluate accuracy on test data."""
if not self.state.is_trained:
raise ValueError("Model not trained yet")
model = load_model(self.state.model)
preds = model.predict(x)
return {
"accuracy": accuracy_score(y, preds),
"n_samples": len(x),
}
How it works
The constructor accepts two hyperparameters: n_trees (number of decision trees in the forest, default 10) and max_depth (maximum depth per tree, default 5). More trees generally improve accuracy at the cost of larger serialized state and more gas for inference.
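The classify method runs each input through all trees and combines their outputs into one label (with the sklearn backend, by averaging the trees' class probabilities and returning the most probable class). This is more robust than a single tree because individual tree errors tend to cancel out when aggregated.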
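Hard voting (each tree casts one vote) and soft voting (average the trees' class probabilities) can disagree when trees differ in confidence. The per-tree probabilities below are made-up inputs chosen so that the two rules give different answers. This guide does not say which rule panda.ml's pure-Python fallback uses, and the function names are illustrative.

```python
from collections import Counter


def hard_vote(tree_labels):
    """Most common label across trees (ties go to the label seen first)."""
    return Counter(tree_labels).most_common(1)[0][0]


def soft_vote(tree_probas, classes):
    """Average per-tree probability rows, then return the class with the highest mean."""
    n = len(tree_probas)
    mean = [sum(p[i] for p in tree_probas) / n for i in range(len(classes))]
    return classes[max(range(len(classes)), key=lambda i: mean[i])]


classes = [0, 1]
probas = [[0.4, 0.6], [0.45, 0.55], [0.95, 0.05]]  # three trees, one sample
labels = [classes[max(range(2), key=lambda i: p[i])] for p in probas]
print(hard_vote(labels))           # 1: two of three trees prefer class 1
print(soft_vote(probas, classes))  # 0: mean probabilities are about [0.6, 0.4]
```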
Feature importances in a random forest are averaged across all trees. They represent how useful each feature is across the entire ensemble, not just one tree. This provides a more stable importance ranking than a single decision tree.
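The serialized model size grows linearly with n_trees and, in the worst case, exponentially with max_depth, since each additional level can double the number of nodes per tree. For on-chain contracts, keep n_trees reasonable (10-50) and max_depth modest to avoid hitting the 10MB state size limit.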
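A back-of-the-envelope bound helps when sizing a forest. A binary tree of depth d has at most 2^(d+1) - 1 nodes, so a forest has at most n_trees * (2^(max_depth+1) - 1). The sketch multiplies that bound by the JSON size of one example node. The node layout is a hypothetical stand-in for the real serialization format, so treat the result as an order-of-magnitude estimate and measure the actual save_model() output before relying on it.

```python
import json


def max_nodes(n_trees, max_depth):
    """Upper bound on total nodes in a forest of binary trees of the given depth."""
    return n_trees * (2 ** (max_depth + 1) - 1)


def serialized_size(obj):
    """Size in bytes of the object's JSON encoding, a proxy for state size."""
    return len(json.dumps(obj).encode("utf-8"))


# Hypothetical node layout, used only to get a per-node byte count.
example_node = {"feature": 3, "threshold": 0.75, "left": 1, "right": 2, "value": [0.0, 1.0]}
per_node = serialized_size(example_node)

for n_trees, depth in [(10, 5), (50, 5), (50, 10)]:
    bound = max_nodes(n_trees, depth)
    print(f"n_trees={n_trees}, max_depth={depth}: <= {bound} nodes, ~{bound * per_node / 1e6:.2f} MB")
```

Doubling max_depth from 5 to 10 multiplies the node bound by about 32, while going from 10 to 50 trees only multiplies it by 5.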
Neural Networks (MLP)
The NeuralNetworkContract uses MLPClassifier to train a multi-layer perceptron. This is the most expressive classifier in panda.ml, capable of learning non-linear decision boundaries.
"""
NeuralNetworkContract -- MLP classifier on-chain.
Demonstrates:
- Real sklearn MLPClassifier
- Configurable hidden layers and learning rate
- Multi-class classification with probability outputs
- Full training + inference lifecycle
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
    MLPClassifier,
    save_model,
    load_model,
    accuracy_score,
)
@contract
class NeuralNetworkContract:
    """On-chain multi-layer perceptron classifier."""
    class State:
        model: dict = {}
        classes: list = []
        is_trained: bool = False
        hidden_layers: list = [64, 32]
        max_iter: int = 200
        learning_rate: float = 0.001
        sample_count: int = 0
        owner: str = ""
    @constructor
    def create(
        self,
        ctx,
        hidden_layers: list = None,
        max_iter: int = 200,
        learning_rate: float = 0.001,
    ):
        self.state.owner = ctx.sender
        if hidden_layers is not None:
            if len(hidden_layers) > 20:
                raise ValueError(
                    f"hidden_layers must have at most 20 layers, got {len(hidden_layers)}"
                )
            for i, size in enumerate(hidden_layers):
                if not isinstance(size, int) or size < 1 or size > 10000:
                    raise ValueError(
                        f"hidden_layers[{i}] must be an integer between 1 and 10000, got {size}"
                    )
            self.state.hidden_layers = hidden_layers
        self.state.max_iter = max_iter
        self.state.learning_rate = learning_rate
        print(
            f"NeuralNetwork deployed by {ctx.sender}, layers={self.state.hidden_layers}, max_iter={max_iter}"
        )
    @call
    def train(self, ctx, x: list, y: list):
        """Train the neural network on labeled data."""
        if len(x) != len(y):
            raise ValueError("x and y must have the same length")
        if len(x) < 2:
            raise ValueError("Need at least 2 samples")
        model = MLPClassifier(
            hidden_layer_sizes=tuple(self.state.hidden_layers),
            max_iter=self.state.max_iter,
            learning_rate_init=self.state.learning_rate,
        )
        model.fit(x, y)
        self.state.model = save_model(model)
        self.state.classes = sorted(set(y))
        self.state.is_trained = True
        self.state.sample_count = self.state.sample_count + len(x)
        self.emit(
            event.NetworkTrained(
                hidden_layers=self.state.hidden_layers,
                classes=self.state.classes,
                samples=len(x),
                trainer=ctx.sender,
            )
        )
        print(
            f"MLP trained on {len(x)} samples by {ctx.sender}, layers={self.state.hidden_layers}, classes: {self.state.classes}"
        )
    @query
    def predict(self, x: list) -> list:
        """Predict class labels."""
        if not self.state.is_trained:
            raise ValueError("Model not trained yet")
        model = load_model(self.state.model)
        return model.predict(x)
    @query
    def predict_proba(self, x: list) -> list:
        """Predict class probabilities."""
        if not self.state.is_trained:
            raise ValueError("Model not trained yet")
        model = load_model(self.state.model)
        return model.predict_proba(x)
    @query
    def architecture(self) -> dict:
        """Return network architecture details."""
        info = {
            "hidden_layers": self.state.hidden_layers,
            "max_iter": self.state.max_iter,
            "learning_rate": self.state.learning_rate,
            "is_trained": self.state.is_trained,
            "classes": self.state.classes,
            "sample_count": self.state.sample_count,
        }
        if self.state.is_trained and self.state.model:
            weights = self.state.model.get("weights", [])
            total_params = 0
            for w in weights:
                if isinstance(w, list) and w and isinstance(w[0], list):
                    total_params += len(w) * len(w[0])
                elif isinstance(w, list):
                    total_params += len(w)
            biases = self.state.model.get("biases", [])
            for b in biases:
                if isinstance(b, list):
                    total_params += len(b)
            info["total_parameters"] = total_params
        return info
    @query
    def evaluate(self, x: list, y: list) -> dict:
        """Evaluate accuracy on test data."""
        if not self.state.is_trained:
            raise ValueError("Model not trained yet")
        model = load_model(self.state.model)
        preds = model.predict(x)
        return {
            "accuracy": accuracy_score(y, preds),
            "n_samples": len(x),
        }
How it works
The constructor validates hidden layer configuration: at most 20 layers, each with 1 to 10,000 neurons. The default architecture is [64, 32] -- two hidden layers with 64 and 32 neurons respectively. This is sufficient for many classification tasks while keeping the serialized state manageable.
MLPClassifier takes hidden_layer_sizes as a tuple (not a list), so the contract converts with tuple(self.state.hidden_layers). The learning_rate_init parameter sets the initial step size for the Adam optimizer used during training.
The architecture query computes total_parameters by iterating over the serialized weight matrices and bias vectors. For a [64, 32] network with 4-dimensional input and 2 output units, the parameter count is (4*64 + 64) + (64*32 + 32) + (32*2 + 2) = 2,466. With the sklearn backend, a binary classifier uses a single logistic output unit, which gives 2,433 instead. This number helps estimate gas cost and state size.
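The same arithmetic works for any layer list. Each fully connected layer contributes a fan_in * fan_out weight matrix plus a fan_out bias vector. mlp_param_count is a hypothetical helper for checking architecture() output by hand; it is not part of panda.ml.

```python
def mlp_param_count(n_features, hidden_layers, n_outputs):
    """Weights plus biases of a fully connected network."""
    sizes = [n_features] + list(hidden_layers) + [n_outputs]
    total = 0
    for fan_in, fan_out in zip(sizes[:-1], sizes[1:]):
        total += fan_in * fan_out + fan_out  # weight matrix + bias vector
    return total


print(mlp_param_count(4, [64, 32], 2))  # 2466
print(mlp_param_count(4, [64, 32], 1))  # 2433: single output unit, as in sklearn's binary case
```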
The predict_proba method returns probability distributions over classes, which is useful for soft decisions. For example, if the model outputs [0.3, 0.7] for two classes, the caller can use the 0.7 confidence score rather than just the hard label.
Unsupervised Learning: K-Means
Clustering is unsupervised -- it groups data points by similarity without labeled training data. The simplest clustering contract:
from panda import contract, call, query
from panda.ml import KMeans, save_model, load_model
@contract
class SimpleClustering:
    class State:
        model: dict = {}
    @call
    def fit(self, ctx, data: list):
        model = KMeans(n_clusters=3)
        model.fit(data)
        self.state.model = save_model(model)
    @query
    def predict(self, data: list) -> list:
        return load_model(self.state.model).predict(data)
The full ClusteringContract adds configurable cluster count, centroid inspection, and event emission:
"""
ClusteringContract -- K-Means clustering on-chain.
Demonstrates:
- Unsupervised learning (no labels needed)
- Centroid inspection and cluster assignment
- Real sklearn KMeans
"""
from panda import contract, constructor, call, query, event
from panda.ml import KMeans, save_model, load_model
@contract
class ClusteringContract:
    """On-chain K-Means clustering."""
    class State:
        model: dict = {}
        is_fitted: bool = False
        n_clusters: int = 3
        sample_count: int = 0
        owner: str = ""
    @constructor
    def create(self, ctx, n_clusters: int = 3):
        self.state.owner = ctx.sender
        self.state.n_clusters = n_clusters
        print(f"ClusteringContract deployed by {ctx.sender}, n_clusters={n_clusters}")
    @call
    def fit(self, ctx, data: list):
        """Fit K-Means on the provided data points."""
        if len(data) < self.state.n_clusters:
            raise ValueError(
                f"Need at least {self.state.n_clusters} samples for "
                f"{self.state.n_clusters} clusters"
            )
        model = KMeans(n_clusters=self.state.n_clusters)
        model.fit(data)
        self.state.model = save_model(model)
        self.state.is_fitted = True
        self.state.sample_count = self.state.sample_count + len(data)
        self.emit(
            event.ClustersFitted(
                n_clusters=self.state.n_clusters,
                samples=len(data),
                fitter=ctx.sender,
            )
        )
        print(
            f"ClusteringContract fitted {self.state.n_clusters} clusters on {len(data)} samples by {ctx.sender}"
        )
    @query
    def predict_cluster(self, data: list) -> list:
        """Assign data points to their nearest cluster."""
        if not self.state.is_fitted:
            raise ValueError("Model not fitted yet")
        model = load_model(self.state.model)
        return model.predict(data)
    @query
    def get_centroids(self) -> list:
        """Return the cluster centroid coordinates."""
        if not self.state.is_fitted:
            raise ValueError("Model not fitted yet")
        return self.state.model.get("centroids", [])
    @query
    def info(self) -> dict:
        """Return clustering metadata."""
        return {
            "is_fitted": self.state.is_fitted,
            "n_clusters": self.state.n_clusters,
            "sample_count": self.state.sample_count,
        }
How it works
K-Means uses fit instead of train because there are no labels -- the algorithm discovers structure in the data. The fit method validates that the number of data points is at least n_clusters (you cannot form 3 clusters from 2 points).
The predict_cluster method assigns each input point to its nearest centroid, returning cluster indices (e.g., [0, 2, 1, 0]). The get_centroids method reads the centroid coordinates directly from the serialized model dict without reconstructing the model. Centroids are stored as a list of coordinate vectors (e.g., [[1.0, 2.0], [5.0, 6.0], [9.0, 10.0]] for 3 clusters in 2D space).
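Note the naming convention. ClusteringContract uses fit and is_fitted, echoing sklearn's fit(), while the supervised contracts above use train and is_trained. These names are choices made by the example contracts, not panda.ml requirements: every panda.ml model is trained through its own fit() (or partial_fit()) method whichever name the contract method has.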
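The nearest-centroid assignment behind predict_cluster can be sketched in plain Python. The sketch assumes squared Euclidean distance, as in standard K-Means. The update function shows the other half of fitting: one iteration of Lloyd's algorithm moves each centroid to the mean of the points assigned to it. Both functions are illustrative and are not the panda.ml implementation.

```python
def assign(points, centroids):
    """Index of the nearest centroid (squared Euclidean distance) for each point."""
    labels = []
    for p in points:
        dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centroids]
        labels.append(dists.index(min(dists)))
    return labels


def update(points, labels, centroids):
    """Move each centroid to the mean of its points; keep it if it has none."""
    new = []
    for k, c in enumerate(centroids):
        members = [p for p, lab in zip(points, labels) if lab == k]
        if members:
            new.append([sum(col) / len(members) for col in zip(*members)])
        else:
            new.append(list(c))
    return new


centroids = [[1.0, 2.0], [5.0, 6.0], [9.0, 10.0]]
points = [[0.0, 1.0], [9.5, 9.0], [5.0, 5.0], [2.0, 3.0]]
labels = assign(points, centroids)
print(labels)  # [0, 2, 1, 0]
print(update(points, labels, centroids))  # [[1.0, 2.0], [5.0, 5.0], [9.5, 9.0]]
```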
Online Learning
The OnlineLearner contract uses SGDRegressor with partial_fit() for incremental learning. Unlike batch models that retrain from scratch, this model accumulates knowledge across multiple transactions. Each call to update adjusts the existing weights with the new data instead of discarding them and starting over.
"""
OnlineLearner -- Incremental SGD regression on-chain.
Demonstrates:
- Online/incremental learning across multiple transactions
- Model improves with each call to update()
- Training history tracking
- Real sklearn SGDRegressor partial_fit
"""
from panda import contract, constructor, call, query, event
from panda.ml import (
    SGDRegressor,
    save_model,
    load_model,
    r2_score,
    mean_squared_error,
)
@contract
class OnlineLearner:
    """Incrementally-trained regression model that improves each transaction."""
    class State:
        model: dict = {}
        is_fitted: bool = False
        update_count: int = 0
        total_samples: int = 0
        history: list = []
        owner: str = ""
    @constructor
    def create(self, ctx, learning_rate: float = 0.01):
        self.state.owner = ctx.sender
        self.state.model = {"learning_rate": learning_rate}
        print(f"OnlineLearner deployed by {ctx.sender}, learning_rate={learning_rate}")
    @call
    def update(self, ctx, x: list, y: list):
        """Incrementally update the model with new data.
        Each call improves the model by running one SGD pass over the
        new batch. The model state is persisted between calls, so it
        accumulates knowledge across multiple transactions.
        """
        if len(x) != len(y):
            raise ValueError("x and y must have the same length")
        if len(x) == 0:
            raise ValueError("Need at least 1 sample")
        lr = self.state.model.get("learning_rate", 0.01)
        if self.state.is_fitted:
            # Load existing model and continue training
            model = load_model(self.state.model)
        else:
            # First update: create new model
            model = SGDRegressor(learning_rate=lr)
        model.partial_fit(x, y)
        # Compute loss on this batch for history
        preds = model.predict(x)
        mse = mean_squared_error(y, preds)
        self.state.model = save_model(model)
        self.state.is_fitted = True
        self.state.update_count = self.state.update_count + 1
        self.state.total_samples = self.state.total_samples + len(x)
        # Track training history (keep last 50 entries)
        history = self.state.history
        history.append(
            {
                "update": self.state.update_count,
                "batch_size": len(x),
                "mse": mse,
            }
        )
        if len(history) > 50:
            history = history[-50:]
        self.state.history = history
        self.emit(
            event.ModelUpdated(
                update_number=self.state.update_count,
                batch_size=len(x),
                mse=mse,
                updater=ctx.sender,
            )
        )
        print(
            f"OnlineLearner update #{self.state.update_count} by {ctx.sender}, batch_size={len(x)}, mse={mse:.6f}, total_samples={self.state.total_samples}"
        )
    @query
    def predict(self, x: list) -> list:
        """Run inference with the current model."""
        if not self.state.is_fitted:
            raise ValueError("Model not fitted yet. Call update() first.")
        model = load_model(self.state.model)
        return model.predict(x)
    @query
    def evaluate(self, x: list, y: list) -> dict:
        """Evaluate model on test data."""
        if not self.state.is_fitted:
            raise ValueError("Model not fitted yet")
        model = load_model(self.state.model)
        preds = model.predict(x)
        return {
            "r2": r2_score(y, preds),
            "mse": mean_squared_error(y, preds),
            "n_samples": len(x),
        }
    @query
    def training_history(self) -> dict:
        """Return the training history."""
        return {
            "update_count": self.state.update_count,
            "total_samples": self.state.total_samples,
            "history": self.state.history,
        }
    @query
    def model_params(self) -> dict:
        """Return current model parameters."""
        if not self.state.is_fitted:
            return {"fitted": False}
        return {
            "fitted": True,
            "coef": self.state.model.get("coef", []),
            "intercept": self.state.model.get("intercept", 0),
            "n_updates": self.state.model.get("n_updates", 0),
        }
How it works
The key method is update, not train, and it calls partial_fit() instead of fit(). The difference is critical. fit() re-initializes the model and retrains from scratch on each call. partial_fit() starts from the current weights and makes a single pass of stochastic gradient descent over the new data batch.
The first call to update creates a fresh SGDRegressor. Subsequent calls load the existing model via load_model() and continue training, so each transaction refines the same model rather than replacing it. A contract can receive data from many different senders over time, and the model incorporates all of it.
Training history is capped at 50 entries (if len(history) > 50: history = history[-50:]) to prevent unbounded state growth. Each entry records the update number, the batch size, and the MSE on that batch measured after the update. Together these form a loss curve that callers can use to monitor convergence.
The model_params query exposes the raw coefficients and intercept for transparency. Combined with training_history, this gives full visibility into how the model has evolved over time.
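The fit() versus partial_fit() distinction can be made concrete with a plain-Python sketch. Each partial_fit call runs one SGD epoch over the batch for squared-error linear regression. The weights are round-tripped through a dict between calls, the way save_model()/load_model() persist them between transactions. The sketch assumes a constant step size and no regularization. scikit-learn's SGDRegressor defaults to an L2 penalty and an 'invscaling' learning-rate schedule, so its numbers will differ. The class and its to_dict/from_dict methods are hypothetical.

```python
class OnlineLinearSGD:
    """Hypothetical minimal online linear regressor (constant step, no penalty)."""

    def __init__(self, n_features, learning_rate=0.05):
        self.coef = [0.0] * n_features
        self.intercept = 0.0
        self.learning_rate = learning_rate

    def predict(self, x):
        return [sum(w * v for w, v in zip(self.coef, row)) + self.intercept for row in x]

    def partial_fit(self, x, y):
        """One pass over the batch, updating the weights after every sample."""
        lr = self.learning_rate
        for row, target in zip(x, y):
            err = sum(w * v for w, v in zip(self.coef, row)) + self.intercept - target
            self.coef = [w - lr * err * v for w, v in zip(self.coef, row)]
            self.intercept -= lr * err
        return self

    def to_dict(self):
        return {"coef": list(self.coef), "intercept": self.intercept,
                "learning_rate": self.learning_rate}

    @classmethod
    def from_dict(cls, d):
        model = cls(len(d["coef"]), d["learning_rate"])
        model.coef = list(d["coef"])
        model.intercept = d["intercept"]
        return model


# Each loop iteration stands in for one update() transaction
x = [[0.0], [1.0], [2.0], [3.0]]
y = [1.0, 3.0, 5.0, 7.0]  # y = 2x + 1
state = OnlineLinearSGD(n_features=1).to_dict()
for _ in range(500):
    state = OnlineLinearSGD.from_dict(state).partial_fit(x, y).to_dict()
print(state["coef"], state["intercept"])  # approaches [2.0] and 1.0
```

Because the weights survive in state between calls, no single transaction has to see the whole dataset.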
Model Marketplace
The ModelMarketplace contract is a registry for pre-trained models. Users train models off-chain using panda.ml, serialize them with save_model(), then upload the serialized dicts to the marketplace. Anyone can then run inference on any registered model.
"""
ModelMarketplace -- Multi-model registry and serving on-chain.
Demonstrates:
- Storing multiple models in a single contract
- Model registration, lookup, and inference
- Using a dict state field as the model registry
- Any model type supported via panda.ml.load_model
- Users can train offline, serialize with save_model(), upload, and serve
"""
from panda import contract, constructor, call, query, event
from panda.ml import load_model
@contract
class ModelMarketplace:
    """On-chain model registry: register, discover, and run inference."""
    class State:
        models: dict = {}
        model_count: int = 0
        owner: str = ""
    @constructor
    def create(self, ctx):
        self.state.owner = ctx.sender
        print(f"ModelMarketplace deployed by {ctx.sender}")
    @call
    def register_model(self, ctx, model_name: str, model_dict: dict, description: str = ""):
        """Register a pre-trained model for on-chain serving.
        The model_dict should be produced by panda.ml.save_model() and
        must contain a '__panda_ml_model__' key. Re-registering an existing
        name replaces that entry, but only for the address that registered it.
        """
        if not model_name:
            raise ValueError("model_name cannot be empty")
        if not isinstance(model_dict, dict):
            raise ValueError("model_dict must be a dict")
        if "__panda_ml_model__" not in model_dict:
            raise ValueError(
                "model_dict must have '__panda_ml_model__' key. "
                "Use panda.ml.save_model() to create it."
            )
        # Approximate the serialized size by the length of the dict's string form
        model_str_len = len(str(model_dict))
        if model_str_len > 1_000_000:
            raise ValueError(
                f"model_dict serialized size ({model_str_len}) exceeds limit of 1,000,000 characters"
            )
        # Verify the model can be loaded
        load_model(model_dict)
        models = self.state.models
        is_new = model_name not in models
        # Only the original registrant may overwrite an existing entry
        if not is_new and models[model_name]["owner"] != ctx.sender:
            raise ValueError(f"Model '{model_name}' is registered by another address")
        models[model_name] = {
            "model": model_dict,
            "description": description,
            "model_type": model_dict["__panda_ml_model__"],
            "owner": ctx.sender,
            "block_height": ctx.block_height,
        }
        self.state.models = models
        if is_new:
            self.state.model_count = self.state.model_count + 1
        self.emit(
            event.ModelRegistered(
                model_name=model_name,
                model_type=model_dict["__panda_ml_model__"],
                registrant=ctx.sender,
            )
        )
        print(
            f"Model '{model_name}' ({model_dict['__panda_ml_model__']}) registered by {ctx.sender}, total models: {self.state.model_count}"
        )
    @call
    def remove_model(self, ctx, model_name: str):
        """Remove a model from the registry. Only the model's registrant or the contract owner can remove it."""
        models = self.state.models
        if model_name not in models:
            raise ValueError(f"Model '{model_name}' not found")
        if models[model_name]["owner"] != ctx.sender and ctx.sender != self.state.owner:
            raise ValueError("Only the model owner or contract owner can remove")
        del models[model_name]
        self.state.models = models
        self.state.model_count = self.state.model_count - 1
        self.emit(
            event.ModelRemoved(
                model_name=model_name,
                remover=ctx.sender,
            )
        )
        print(
            f"Model '{model_name}' removed by {ctx.sender}, remaining models: {self.state.model_count}"
        )
    @query
    def predict_with(self, model_name: str, x: list) -> list:
        """Run inference using a named model."""
        models = self.state.models
        if model_name not in models:
            raise ValueError(f"Model '{model_name}' not found")
        model_dict = models[model_name]["model"]
        model = load_model(model_dict)
        return model.predict(x)
    @query
    def list_models(self) -> list:
        """List all registered models with metadata."""
        result = []
        for name, entry in sorted(self.state.models.items()):
            result.append(
                {
                    "name": name,
                    "model_type": entry["model_type"],
                    "description": entry["description"],
                    "owner": entry["owner"],
                    "block_height": entry["block_height"],
                }
            )
        return result
    @query
    def get_model(self, model_name: str) -> dict:
        """Get full model details including serialized weights."""
        models = self.state.models
        if model_name not in models:
            raise ValueError(f"Model '{model_name}' not found")
        entry = models[model_name]
        return {
            "name": model_name,
            "model_type": entry["model_type"],
            "description": entry["description"],
            "owner": entry["owner"],
            "block_height": entry["block_height"],
            "model": entry["model"],
        }
    @query
    def info(self) -> dict:
        """Return marketplace metadata."""
        return {
            "model_count": self.state.model_count,
            "owner": self.state.owner,
        }
How it works
The marketplace contract only imports load_model -- it does not import any model classes. This is intentional: the contract does not create models, it only stores and serves them. Any model type that panda.ml supports can be registered.
Registration validation: before accepting a model dict, register_model checks three things.
- The dict has a __panda_ml_model__ key, the marker that save_model() writes. This is a presence check, not proof of provenance.
- Its size, measured as len(str(model_dict)), is at most 1,000,000 characters, which limits state bloat.
- load_model(model_dict) succeeds, which confirms the dict can be deserialized into a usable model.
Access control for removal: remove_model allows either the model's original registrant (models[model_name]["owner"]) or the contract owner (self.state.owner) to remove a model. This provides both user autonomy and admin oversight.
The predict_with query is the core serving method. It takes a model name and input data, loads the named model, and runs inference. This means a single marketplace contract can serve predictions from many different models of different types (regression, classification, clustering, etc.).
Each model entry records the block_height at registration time. This provides an on-chain timestamp for when the model was uploaded, which is useful for provenance tracking.
Language Models: NanoGPT
Proof of Concept: The NanoGPT series (NanoGPT, NanoGPTAsync, FederatedNanoGPT, NanoTokenizer) is a proof of concept demonstrating that transformer training and inference can run entirely on-chain. On-chain language models are an active area of research -- challenges around gas costs at scale, model size limits, training data quality, and inference latency are being actively investigated. These contracts are functional and fully tested, but should be treated as experimental building blocks rather than production-ready LLM infrastructure.
The NanoGPT contract is a complete GPT-2-style transformer implemented entirely in NumPy, deployed as a Panda smart contract. It supports on-chain training via manual backpropagation and autoregressive text generation. The default micro configuration (d_model=64, n_layers=2, n_heads=2, vocab_size=256, block_size=64) produces a model with approximately 33,000 parameters.
The full source is approximately 800 lines, including helper functions for softmax, GELU activation, layer normalization, multi-head causal self-attention, MLP blocks, cross-entropy loss, and complete forward/backward passes. The contract class itself is shown below:
@contract
class NanoGPT:
    """On-chain GPT-2-style transformer with manual backpropagation."""
    class State:
        weights: dict = {}
        config: dict = {}
        is_trained: bool = False
        total_steps: int = 0
        total_tokens_seen: int = 0
        last_loss: float = 0.0
        training_history: list = []
        owner: str = ""
    @constructor
    def create(
        self,
        ctx,
        d_model: int = 64,
        n_layers: int = 2,
        n_heads: int = 2,
        vocab_size: int = 256,
        block_size: int = 64,
    ):
        """Initialize the GPT model with random weights."""
        # Validates config bounds, then initializes weights with Xavier-scale init
        # using a deterministic RNG (np.random.RandomState(42)).
        # Creates: token embeddings, position embeddings, N transformer blocks
        # (each with LayerNorm + MultiHeadAttention + LayerNorm + MLP),
        # final LayerNorm, and LM head.
        # ~33K parameters for default config.
        ...
    @call
    def train(self, ctx, token_ids: list, learning_rate: float = 0.001):
        """
        Train the model on a sequence of token ids using next-token prediction.
        Creates a single training sequence using a sliding window of block_size+1
        tokens. The input X is the first block_size tokens, and target Y is the
        last block_size tokens (shifted by 1). Performs forward pass, computes
        cross-entropy loss, backward pass, and SGD weight update.
        """
        ...
    @query
    def generate(
        self, prompt_tokens: list, max_new_tokens: int = 50, temperature: float = 1.0
    ) -> list:
        """
        Autoregressively generate tokens given a prompt.
        Uses greedy decoding (argmax) after applying temperature scaling.
        """
        ...
    @query
    def get_loss(self, token_ids: list) -> float:
        """Compute cross-entropy loss on a sequence without updating weights."""
        ...
    @query
    def model_info(self) -> dict:
        """Return model configuration and training statistics."""
        ...
Architecture
The transformer architecture follows GPT-2:
- Token embedding: Lookup table mapping token IDs to d_model-dimensional vectors.
- Position embedding: Learned position vectors added to token embeddings.
- N transformer blocks, each containing:
  - Layer normalization (pre-norm)
  - Multi-head causal self-attention (with causal mask preventing attention to future positions)
  - Residual connection
  - Layer normalization (pre-norm)
  - MLP (Linear -> GELU -> Linear, with 4x expansion)
  - Residual connection
- Final layer normalization
- LM head: Linear projection from d_model to vocab_size, producing logits.
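One way to sanity-check model size is to count the weights of the architecture listed above. The helper below is a hypothetical sketch built on several assumptions:
- a fused Q/K/V projection;
- a bias on every linear layer except the LM head;
- a gain and a bias per LayerNorm;
- 4x MLP expansion;
- no weight tying between the token embedding and the LM head.

n_heads does not appear because heads only split d_model. Under these assumptions the default configuration comes to 136,960 parameters, which is larger than the approximately 33,000 figure given for the contract. The exact total depends on layout details (bias terms, weight tying, MLP width) not shown here.

```python
def gpt_param_count(d_model, n_layers, vocab_size, block_size, mlp_ratio=4):
    """Parameter count for a GPT-2-style model under the assumptions stated above."""
    layer_norm = 2 * d_model  # gain + bias
    embeddings = vocab_size * d_model + block_size * d_model  # token + position tables
    # Fused QKV projection plus attention output projection, each with a bias
    attention = (d_model * 3 * d_model + 3 * d_model) + (d_model * d_model + d_model)
    hidden = mlp_ratio * d_model
    mlp = (d_model * hidden + hidden) + (hidden * d_model + d_model)  # expand, project back
    block = 2 * layer_norm + attention + mlp
    lm_head = d_model * vocab_size  # no bias, not tied to the token embedding
    return embeddings + n_layers * block + layer_norm + lm_head


print(gpt_param_count(d_model=64, n_layers=2, vocab_size=256, block_size=64))  # 136960
```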
All operations are implemented in pure NumPy. The forward pass produces logits for each position, and the backward pass computes gradients through every layer using manual chain-rule differentiation (no autograd framework). Weight updates use simple SGD.
Training
The train method accepts a list of token IDs and a learning rate. It creates input/target pairs by shifting the sequence by one position (standard next-token prediction). The training loop:
- Forward pass through all layers, caching intermediate values.
- Cross-entropy loss between predicted logits and target tokens.
- Backward pass computing gradients for every weight matrix, bias vector, and normalization parameter.
- SGD update: weight = weight - learning_rate * gradient.
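The input/target shift and the loss from the first two training steps can be sketched in plain Python. make_xy and cross_entropy are illustrative names, not the contract's NumPy helpers. The loss is the mean negative log-probability of each target token, computed with the usual max-subtraction for numerical stability.

```python
import math


def make_xy(token_ids, block_size):
    """Take block_size + 1 tokens; X drops the last one, Y drops the first."""
    window = token_ids[: block_size + 1]
    return window[:-1], window[1:]


def cross_entropy(logits, targets):
    """Mean negative log-softmax probability of the target at each position."""
    total = 0.0
    for row, t in zip(logits, targets):
        m = max(row)
        log_z = m + math.log(sum(math.exp(v - m) for v in row))
        total += log_z - row[t]
    return total / len(targets)


X, Y = make_xy([5, 9, 2, 7, 3, 8], block_size=4)
print(X, Y)  # [5, 9, 2, 7] [9, 2, 7, 3]
# Uniform logits over a 256-token vocabulary give a loss of ln(256), about 5.545
print(cross_entropy([[0.0] * 256] * 4, Y))
```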
Training history is capped at 50 entries, and each entry records the step number, loss, token count, and trainer address.
Generation
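The generate method performs autoregressive decoding. At each step, it runs a forward pass on the current context (cropped to block_size), divides the last position's logits by temperature, and selects the next token via argmax (greedy decoding). The generated token is appended to the context and the process repeats. Dividing logits by a positive temperature never changes which logit is largest. As a result, temperature has no effect on the output under greedy decoding; it would only matter if the next token were sampled from the softmax distribution.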
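The sketch below compares greedy selection with sampling at several temperatures. softmax, greedy_token, and sample_token are hypothetical helpers, not part of the contract. Sampling uses a seeded random.Random so that runs are reproducible.

```python
import math
import random


def softmax(logits, temperature=1.0):
    """Temperature-scaled softmax, shifted by the max for numerical stability."""
    scaled = [v / temperature for v in logits]
    m = max(scaled)
    exps = [math.exp(v - m) for v in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def greedy_token(logits, temperature=1.0):
    probs = softmax(logits, temperature)
    return probs.index(max(probs))


def sample_token(logits, temperature, rng):
    """Draw a token id from the temperature-scaled distribution."""
    probs = softmax(logits, temperature)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


logits = [2.0, 1.0, 0.5, -1.0]
print([greedy_token(logits, t) for t in (0.5, 1.0, 2.0)])  # [0, 0, 0]
# Lower temperature sharpens the distribution, higher temperature flattens it
print(round(softmax(logits, 0.5)[0], 3), round(softmax(logits, 2.0)[0], 3))  # 0.842 0.434
rng = random.Random(0)
print([sample_token(logits, 2.0, rng) for _ in range(10)])
```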
The full source including all helper functions is located at contracts/ml/nano_gpt.py.
NanoGPT Async -- Multi-Block Training
NanoGPTAsync extends NanoGPT with async multi-block training. Instead of running the entire training loop in a single transaction (which hits gas limits for larger datasets), it uses await sleep(blocks=1) to spread training across multiple blocks -- one mini-batch per block.
How async training works:
- train_async() is called with the full dataset, epoch count, batch size, and learning rate.
- The contract stores the training data in state and enters a loop.
- Each iteration calls await sleep(blocks=1), then processes one mini-batch (forward + backward + SGD).
- The VM checkpoints the contract state and resumes execution in the next block.
- The owner can cancel mid-flight with cancel_training().
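The checkpoint-and-resume loop can be simulated outside the VM with a plain Python generator. Each yield stands in for await sleep(blocks=1), and the driver resumes the generator once per simulated block. Block 0 is the transaction that starts training, and each later block processes one mini-batch. This hypothetical model covers the control flow only; the batch work is a placeholder. It shows how the batch and epoch counters advance and how clearing the in-progress flag stops the run at the next resume.

```python
def training_loop(state):
    """Generator stand-in for the train_async loop: one mini-batch per resume."""
    while state["current_epoch"] < state["target_epochs"]:
        yield  # stands in for: await sleep(blocks=1)
        if not state["training_in_progress"]:
            break  # cancel_training() cleared the flag
        state["total_steps"] += 1  # placeholder for forward/backward/SGD on one batch
        state["current_batch_idx"] += 1
        if state["current_batch_idx"] >= state["total_batches"]:
            state["current_batch_idx"] = 0
            state["current_epoch"] += 1
    state["training_in_progress"] = False


def run_blocks(state, n_blocks, cancel_at=None):
    """Resume the loop once per simulated block; optionally cancel before block cancel_at."""
    loop = training_loop(state)
    for block in range(n_blocks):
        if block == cancel_at:
            state["training_in_progress"] = False  # what cancel_training() does
        try:
            next(loop)
        except StopIteration:
            return


def new_state(epochs, batches):
    return {"current_epoch": 0, "target_epochs": epochs, "current_batch_idx": 0,
            "total_batches": batches, "total_steps": 0, "training_in_progress": True}


done = new_state(epochs=2, batches=3)
run_blocks(done, n_blocks=100)
print(done["total_steps"], done["current_epoch"])  # 6 2

cancelled = new_state(epochs=2, batches=3)
run_blocks(cancelled, n_blocks=100, cancel_at=3)
print(cancelled["total_steps"], cancelled["current_epoch"])  # 2 0
```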
Key async-specific state fields:
class State:
    # ... (same base fields as NanoGPT) ...
    training_data: list = []  # stored for multi-block access
    training_in_progress: bool = False  # lock to prevent concurrent training
    current_epoch: int = 0
    target_epochs: int = 1
    current_batch_idx: int = 0
    total_batches: int = 0
    batch_size: int = 4
    epoch_losses: list = []
    checkpoint_epoch: int = 0
    learning_rate: float = 0.001
The async training loop (core pattern):
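@call
async def train_async(self, ctx, token_ids: list, epochs: int = 1,
                      batch_size: int = 4, learning_rate: float = 0.001):
    # np, sleep, and the _-prefixed helpers come from nano_gpt_async.py's module scope.
    # Sketch: each training sequence is a stride-1 window of block_size + 1 tokens.
    if self.state.training_in_progress:
        raise ValueError("Training already in progress")
    config = self.state.config
    block_size = config["block_size"]
    n_seqs = len(token_ids) - block_size
    if n_seqs < 1:
        raise ValueError(f"Need at least {block_size + 1} tokens")
    # Store training data and schedule in state for multi-block access
    self.state.training_data = list(token_ids)
    self.state.training_in_progress = True
    self.state.target_epochs = epochs
    self.state.batch_size = batch_size
    self.state.learning_rate = learning_rate
    self.state.current_epoch = 0
    self.state.current_batch_idx = 0
    self.state.total_batches = (n_seqs + batch_size - 1) // batch_size
    while self.state.current_epoch < self.state.target_epochs:
        await sleep(blocks=1)  # yield to next block
        if not self.state.training_in_progress:
            break  # cancelled
        # Process one mini-batch
        seq_start = self.state.current_batch_idx * batch_size
        seq_end = min(seq_start + batch_size, n_seqs)
        weights = _deserialize_weights(self.state.weights)
        lr = self.state.learning_rate
        for s in range(seq_start, seq_end):
            seq = self.state.training_data[s : s + block_size + 1]
            X = np.array(seq[:-1], dtype=np.int64)
            Y = np.array(seq[1:], dtype=np.int64)
            logits, caches = _forward_pass(weights, config, X)
            loss, dlogits = _cross_entropy_loss(logits, Y)
            grads = _backward_pass(dlogits, weights, config, caches)
            for name in weights:
                if name in grads:
                    weights[name] = weights[name] - lr * grads[name]
        self.state.weights = _serialize_weights(weights)
        self.state.last_loss = float(loss)
        self.state.total_steps = self.state.total_steps + 1
        # Advance batch/epoch counters
        self.state.current_batch_idx = self.state.current_batch_idx + 1
        if self.state.current_batch_idx >= self.state.total_batches:
            self.state.current_batch_idx = 0
            self.state.current_epoch = self.state.current_epoch + 1
    self.state.training_in_progress = False
    self.state.training_data = []  # free state space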
Cancel mid-training:
@call
def cancel_training(self, ctx):
    """Cancel an in-progress async training run. Owner only."""
    if ctx.sender != self.state.owner:
        raise ValueError("Only the contract owner can cancel training")
    if not self.state.training_in_progress:
        raise ValueError("No training in progress")
    self.state.training_in_progress = False
    self.state.training_data = []  # Free state space
    self.emit(event.TrainingCancelled(
        cancelled_by=ctx.sender,
        epoch=self.state.current_epoch,
        batch=self.state.current_batch_idx,
    ))
Monitor progress:
@query
def training_progress(self) -> dict:
    return {
        "training_in_progress": self.state.training_in_progress,
        "current_epoch": self.state.current_epoch,
        "target_epochs": self.state.target_epochs,
        "current_batch": self.state.current_batch_idx,
        "total_batches": self.state.total_batches,
        "total_steps": self.state.total_steps,
        "last_loss": self.state.last_loss,
        "checkpoint_epoch": self.state.checkpoint_epoch,
        "epoch_losses": self.state.epoch_losses,
    }
The full source is at contracts/ml/nano_gpt_async.py (~1000 lines including all helper functions).
FederatedNanoGPT -- Decentralized Training
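FederatedNanoGPT implements Federated Stochastic Gradient Descent (FedSGD) for collaborative model training. Multiple participants contribute gradients to a shared transformer, and the contract averages them into one update per round. Classic federated learning has clients compute gradients locally and upload only those. Here, contribute_gradients receives the raw token_ids and computes the gradients inside the contract, so the training tokens travel in each contribution transaction rather than staying private.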
How federated training works:
- The contract owner deploys with federation parameters: min_contributions, max_grad_norm, learning_rate.
- Contributors call contribute_gradients(token_ids) with their training data. The contract runs forward + backward, clips gradients by L2 norm, and accumulates them into a buffer.
- Once enough contributions are collected, anyone calls aggregate_and_update() to average the gradients and apply one SGD step.
- The gradient buffer is cleared, the round advances, and the process repeats.
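The clip-accumulate-average cycle above can be sketched with gradients stored as plain name-to-list dicts. clip_by_global_norm and fedsgd_round are hypothetical helpers that mirror the listed steps using only the standard library. The example also shows why clipping matters: without it, one oversized contribution would dominate the average.

```python
import math


def clip_by_global_norm(grads, max_norm):
    """Scale all gradients together so their combined L2 norm is at most max_norm."""
    norm = math.sqrt(sum(v * v for g in grads.values() for v in g))
    if norm <= max_norm:
        return grads
    scale = max_norm / norm
    return {name: [v * scale for v in g] for name, g in grads.items()}


def fedsgd_round(weights, contributions, max_norm, learning_rate):
    """Clip each contribution, sum into a buffer, average, and take one SGD step."""
    buffer = {}
    for grads in contributions:
        for name, g in clip_by_global_norm(grads, max_norm).items():
            prev = buffer.get(name, [0.0] * len(g))
            buffer[name] = [a + b for a, b in zip(prev, g)]
    n = len(contributions)
    updated = dict(weights)
    for name, total in buffer.items():
        updated[name] = [w - learning_rate * t / n for w, t in zip(weights[name], total)]
    return updated


weights = {"w": [0.0, 0.0]}
typical = {"w": [0.3, 0.4]}    # norm 0.5: passes through unchanged
outlier = {"w": [30.0, 40.0]}  # norm 50: scaled down to norm 1.0
print(fedsgd_round(weights, [typical, outlier], max_norm=1.0, learning_rate=0.1))
# approximately {'w': [-0.045, -0.06]}; without clipping it would be [-1.515, -2.02]
```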
Federated-specific state:
class State:
    # ... (same base model fields) ...
    round: int = 0
    contributions_this_round: int = 0
    min_contributions_per_round: int = 1
    contributors: list = []
    round_contributors: list = []
    gradient_buffer: dict = {}
    total_contributions: int = 0
    max_grad_norm: float = 1.0
    learning_rate: float = 0.001
Gradient contribution with L2 clipping:
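@call
def contribute_gradients(self, ctx, token_ids: list):
    """Compute forward + backward on the contributor's tokens, submit clipped gradients."""
    # np and the _-prefixed helpers come from the module scope of federated_nano_gpt.py.
    config = self.state.config
    block_size = config["block_size"]
    if len(token_ids) < block_size + 1:
        raise ValueError(f"Need at least {block_size + 1} tokens")
    seq = token_ids[: block_size + 1]  # one next-token window, as in NanoGPT.train
    X = np.array(seq[:-1], dtype=np.int64)
    Y = np.array(seq[1:], dtype=np.int64)
    weights = _deserialize_weights(self.state.weights)
    # Forward + backward on contributor's data
    logits, caches = _forward_pass(weights, config, X)
    loss, dlogits = _cross_entropy_loss(logits, Y)
    grads = _backward_pass(dlogits, weights, config, caches)
    # L2 gradient norm clipping
    global_norm = float(np.sqrt(sum(np.sum(g**2) for g in grads.values())))
    if global_norm > self.state.max_grad_norm:
        clip_factor = self.state.max_grad_norm / global_norm
        for name in grads:
            grads[name] = grads[name] * clip_factor
    # Accumulate into round buffer
    buffer = dict(self.state.gradient_buffer)
    for name, grad_arr in grads.items():
        if name in buffer:
            buffer[name] = (np.array(buffer[name]) + grad_arr).tolist()
        else:
            buffer[name] = grad_arr.tolist()
    self.state.gradient_buffer = buffer
    # Count this contribution toward the current round
    self.state.contributions_this_round = self.state.contributions_this_round + 1
    self.state.total_contributions = self.state.total_contributions + 1
    self.state.round_contributors = self.state.round_contributors + [ctx.sender]
    if ctx.sender not in self.state.contributors:
        self.state.contributors = self.state.contributors + [ctx.sender]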
Aggregation and weight update:
@call
def aggregate_and_update(self, ctx):
    """Average accumulated gradients and apply one SGD step."""
    n = self.state.contributions_this_round
    if n == 0 or n < self.state.min_contributions_per_round:
        raise ValueError(f"Not enough contributions: {n}")
    weights = _deserialize_weights(self.state.weights)
    buffer = self.state.gradient_buffer
    lr = self.state.learning_rate
    for name in buffer:
        avg_grad = np.array(buffer[name]) / n
        weights[name] = weights[name] - lr * avg_grad
    self.state.weights = _serialize_weights(weights)
    # Reset the round so the next one starts from an empty buffer
    self.state.gradient_buffer = {}
    self.state.contributions_this_round = 0
    self.state.round_contributors = []
    self.state.round = self.state.round + 1
ZK validity proof for contributions:
@call
@proof(type="validity")
def contribute_and_aggregate(self, ctx, token_ids: list):
    """Contribute gradients and auto-aggregate if threshold met."""
    self.contribute_gradients(ctx, token_ids)
    if self.state.contributions_this_round >= self.state.min_contributions_per_round:
        self.aggregate_and_update(ctx)
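The @proof(type="validity") decorator generates a ZK proof that the gradient computation was performed correctly on the submitted tokens. That guarantees the gradients match the submitted data. It does not prevent poisoning through deliberately adversarial training data, because a correctly computed gradient on bad data is still bad. The L2 norm clipping in contribute_gradients bounds how far any single contribution can move the weights.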
The full source is at contracts/ml/federated_nano_gpt.py (~970 lines including all helper functions).
NanoTokenizer -- On-Chain BPE
NanoTokenizer implements Byte-Pair Encoding (BPE) -- the same tokenization algorithm used by GPT-2, GPT-3, and GPT-4. It converts text to token IDs and back, producing a reusable vocabulary for NanoGPT contracts.
How BPE works:
- Start with a base vocabulary of 256 byte values (0-255).
- Scan the corpus for the most frequent adjacent byte pair.
- Merge that pair into a new token (ID 256, 257, ...).
- Repeat for num_merges iterations.
Full contract source:
@contract
class NanoTokenizer:
    """Byte-pair encoding tokenizer trained and served entirely on-chain."""
    class State:
        vocab: dict = {}
        merges: list = []
        vocab_size: int = 256
        id_to_token: dict = {}
        is_trained: bool = False
        corpus_size: int = 0
        owner: str = ""
    @constructor
    def create(self, ctx, vocab_size: int = 256):
        self.state.owner = ctx.sender
        self.state.vocab_size = vocab_size
        # Initialize base vocabulary: each byte value 0-255 maps to itself
        vocab = {}
        id_to_token = {}
        for i in range(256):
            token_str = str(i)
            vocab[token_str] = i
            id_to_token[str(i)] = [i]
        self.state.vocab = vocab
        self.state.id_to_token = id_to_token
    @call
    def train_tokenizer(self, ctx, text: str, num_merges: int = 100):
        """Train BPE on a text corpus. Additive: extends existing merge table."""
        tokens = list(text.encode("utf-8"))
        # Apply existing merges first
        for merge in self.state.merges:
            pair_a, pair_b, new_id = merge[0], merge[1], merge[2]
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and tokens[i] == pair_a and tokens[i + 1] == pair_b:
                    new_tokens.append(new_id)
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
        merges = self.state.merges
        id_to_token = self.state.id_to_token
        for _ in range(num_merges):
            # Count adjacent pairs
            pairs = {}
            for i in range(len(tokens) - 1):
                pair_key = str(tokens[i]) + "," + str(tokens[i + 1])
                pairs[pair_key] = pairs.get(pair_key, 0) + 1
            if not pairs:
                break
            # Merge the most frequent pair
            best_key = max(pairs, key=pairs.get)
            parts = best_key.split(",")
            best_a, best_b = int(parts[0]), int(parts[1])
            new_id = 256 + len(merges)
            # Replace all occurrences
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and tokens[i] == best_a and tokens[i + 1] == best_b:
                    new_tokens.append(new_id)
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
            merges.append([best_a, best_b, new_id])
            # Record the new token's byte expansion so decode() can invert it
            id_to_token[str(new_id)] = id_to_token[str(best_a)] + id_to_token[str(best_b)]
        self.state.merges = merges
        self.state.id_to_token = id_to_token
        self.state.corpus_size = self.state.corpus_size + len(text.encode("utf-8"))  # bytes seen
        self.state.is_trained = True
    @query
    def encode(self, text: str) -> list:
        """Encode text to token IDs by replaying merge rules."""
        tokens = list(text.encode("utf-8"))
        for merge in self.state.merges:
            pair_a, pair_b, new_id = merge[0], merge[1], merge[2]
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and tokens[i] == pair_a and tokens[i + 1] == pair_b:
                    new_tokens.append(new_id)
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
        return tokens
    @query
    def decode(self, tokens: list) -> str:
        """Decode token IDs back to text."""
        id_to_token = self.state.id_to_token
        byte_values = []
        for token_id in tokens:
            token_key = str(token_id)
            if token_key in id_to_token:
                byte_values.extend(id_to_token[token_key])
            elif token_id < 256:
                byte_values.append(token_id)
        return bytes(byte_values).decode("utf-8", errors="replace")
    @query
    def vocab_info(self) -> dict:
        return {
            "base_vocab": 256,
            "num_merges": len(self.state.merges),
            "current_vocab_size": 256 + len(self.state.merges),
            "target_vocab_size": self.state.vocab_size,
            "is_trained": self.state.is_trained,
            "corpus_size": self.state.corpus_size,
        }
Key properties:
- Additive training: Call train_tokenizer multiple times -- new merges build on existing ones.
- Deterministic roundtrip: decode(encode(text)) == text always holds.
- Reusable vocabulary: Deploy one tokenizer, use its encode() output as token_ids for NanoGPT.
The full source is at contracts/ml/nano_tokenizer.py (234 lines).
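The roundtrip property depends on one detail: every merged id must remember the bytes it expands to. The sketch below is a standalone, plain-Python mirror of the contract's BPE logic and makes that detail explicit. It is not the contract itself. The function names (`apply_merge`, `train_bpe`, `encode`, `decode`) and the `expansion` table are illustrative assumptions. Ties between equally frequent pairs are broken by first occurrence, which matches `max()` over an insertion-ordered dict.

```python
# Standalone sketch of byte-level BPE (illustrative, not the contract code).
# Merges are [a, b, new_id] triples replayed in order; `expansion` maps
# every token id to the raw bytes it stands for, so decoding is exact.

def apply_merge(tokens, a, b, new_id):
    """Replace every non-overlapping (a, b) pair, scanning left to right."""
    out = []
    i = 0
    while i < len(tokens):
        if i < len(tokens) - 1 and tokens[i] == a and tokens[i + 1] == b:
            out.append(new_id)
            i += 2
        else:
            out.append(tokens[i])
            i += 1
    return out


def train_bpe(text, num_merges):
    tokens = list(text.encode("utf-8"))
    merges = []
    expansion = {i: [i] for i in range(256)}  # token id -> byte values
    for _ in range(num_merges):
        counts = {}
        for pair in zip(tokens, tokens[1:]):
            counts[pair] = counts.get(pair, 0) + 1
        if not counts:
            break
        a, b = max(counts, key=counts.get)  # most frequent adjacent pair
        new_id = 256 + len(merges)
        tokens = apply_merge(tokens, a, b, new_id)
        merges.append([a, b, new_id])
        expansion[new_id] = expansion[a] + expansion[b]
    return merges, expansion


def encode(text, merges):
    tokens = list(text.encode("utf-8"))
    for a, b, new_id in merges:  # replay merges in the order they were learned
        tokens = apply_merge(tokens, a, b, new_id)
    return tokens


def decode(tokens, expansion):
    return bytes(b for t in tokens for b in expansion[t]).decode("utf-8")


merges, expansion = train_bpe("low lower lowest", num_merges=5)
ids = encode("lowest low", merges)
restored = decode(ids, expansion)  # "lowest low"
```

Text containing bytes that never took part in a merge (for example non-ASCII characters) still round-trips, because unmerged byte ids expand to themselves.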
panda.ml API Reference
| Class / Function | Type | Description |
|---|---|---|
| LinearRegression | Model | Linear least-squares regression. Methods: fit(x, y), predict(x). |
| LogisticRegression | Model | Logistic regression classifier. Methods: fit(x, y), predict(x), predict_proba(x). Constructor param: max_iter. |
| DecisionTreeClassifier | Model | CART decision tree. Methods: fit(x, y), predict(x). Constructor param: max_depth. Exposes feature_importances in serialized form. |
| KMeans | Model | K-Means clustering. Methods: fit(data), predict(data). Constructor param: n_clusters. Exposes centroids in serialized form. |
| MLPClassifier | Model | Multi-layer perceptron neural network. Methods: fit(x, y), predict(x), predict_proba(x). Constructor params: hidden_layer_sizes, max_iter, learning_rate_init. |
| RandomForestClassifier | Model | Random forest ensemble of decision trees. Methods: fit(x, y), predict(x). Constructor params: n_estimators, max_depth. Exposes feature_importances in serialized form. |
| SGDRegressor | Model | Stochastic gradient descent regressor with incremental learning. Methods: fit(x, y), partial_fit(x, y), predict(x). Constructor param: learning_rate. |
| StandardScaler | Transformer | Feature normalization (zero mean, unit variance). Methods: fit(x), transform(x), fit_transform(x). |
| PolynomialFeatures | Transformer | Polynomial feature expansion. Methods: fit(x), transform(x), fit_transform(x). Constructor param: degree. |
| Pipeline | Meta | Chains transformers and a final model. Constructor takes a list of (name, estimator) tuples. Methods: fit(x, y), predict(x). |
| save_model(model) | Function | Serialize a trained model to a JSON-compatible dict with a __panda_ml_model__ type tag. The returned dict can be stored in contract state. |
| load_model(dict) | Function | Deserialize a model from a dict produced by save_model(). Returns a model object with predict() and its other methods. |
| r2_score(y_true, y_pred) | Function | R-squared (coefficient of determination). Returns a float in (-inf, 1.0], where 1.0 is perfect prediction. |
| accuracy_score(y_true, y_pred) | Function | Classification accuracy. Returns a float in [0.0, 1.0] representing the fraction of correct predictions. |
| mean_squared_error(y_true, y_pred) | Function | Mean squared error. Returns a non-negative float. Lower is better. |
| confusion_matrix(y_true, y_pred) | Function | Confusion matrix as a 2D list. matrix[i][j] is the count of samples with true label i predicted as label j. |
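The four metric functions follow standard textbook definitions. The plain-Python reference versions below can help sanity-check values in local tests. They are illustrative only: the `ref_*` names are hypothetical, and they are not the panda.ml implementations. `ref_r2` assumes `y_true` is not constant, since the denominator is zero otherwise. `ref_confusion` assumes integer labels `0..n_classes-1`.

```python
# Reference metric definitions (illustrative; not the panda.ml source).

def ref_accuracy(y_true, y_pred):
    """Fraction of positions where the prediction equals the label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def ref_mse(y_true, y_pred):
    """Mean of squared residuals."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def ref_r2(y_true, y_pred):
    """1 - SS_res / SS_tot; assumes y_true is not constant."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


def ref_confusion(y_true, y_pred, n_classes):
    """matrix[i][j] counts samples with true label i predicted as j."""
    matrix = [[0] * n_classes for _ in range(n_classes)]
    for t, p in zip(y_true, y_pred):
        matrix[t][p] += 1
    return matrix


print(ref_confusion([0, 1, 1, 0], [0, 1, 0, 0], 2))  # [[2, 0], [1, 1]]
```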
Distributed & Large-Scale Training
The monolithic NanoGPT contract stores all ~33K parameters in a single contract. That works for small models, but real-world LLMs have billions of parameters. Panda solves this with distributed weight sharding -- splitting model weights across multiple contracts and coordinating training via cross-contract calls.
This section covers the patterns that make large-scale on-chain ML feasible: weight sharding, pipelined training, LoRA fine-tuning, gradient accumulation, and quantized storage.
Distributed Weight Storage
The core insight: during the forward pass, a transformer applies its layers one after another, and each layer needs only its own weights plus the previous layer's output. No layer reads another layer's weights, so we can store each layer's weights in a separate contract and fetch them on demand.
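A small NumPy sketch shows why fetching one layer at a time changes nothing numerically. A dict of per-layer arrays stands in for separate shard contracts. `fetch()` stands in for a cross-contract `get_weights` query, and a `tanh` layer stands in for a full transformer block. All three are simplifying assumptions made only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
d = 4
n_layers = 3

# Hypothetical "shards": each layer's weights live under their own key,
# standing in for separate WeightShard contracts.
shards = {f"block_{i}": rng.standard_normal((d, d)) * 0.5 for i in range(n_layers)}


def fetch(shard_id):
    """Stand-in for query_contract(addr, "get_weights")."""
    return shards[shard_id]


def layer(w, h):
    """Toy layer (placeholder for attention + MLP)."""
    return np.tanh(h @ w)


def forward_streamed(x, n_layers):
    h = x
    for i in range(n_layers):
        w = fetch(f"block_{i}")  # only this layer's weights are loaded
        h = layer(w, h)
    return h


def forward_monolithic(x, all_weights):
    h = x
    for w in all_weights:
        h = layer(w, h)
    return h


x = rng.standard_normal((2, d))
streamed = forward_streamed(x, n_layers)
mono = forward_monolithic(x, [shards[f"block_{i}"] for i in range(n_layers)])
```

Both functions perform the same operations in the same order, so `streamed` and `mono` agree. Only the place the weights come from differs.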
Minimal example -- a WeightShard contract:
from panda import contract, constructor, call, query, event
@contract
class WeightShard:
    """Stores one chunk of model weights on-chain."""
    class State:
        shard_id: str = ""
        weights: dict = {}
        owner: str = ""
    @constructor
    def create(self, ctx, shard_id: str):
        self.state.shard_id = shard_id
        self.state.owner = ctx.sender
    @call
    def store_weights(self, ctx, weights: dict):
        if ctx.sender != self.state.owner:
            raise RuntimeError("Not authorized")
        self.state.weights = weights
        self.emit(event.WeightsStored(shard_id=self.state.shard_id))
    @query
    def get_weights(self) -> dict:
        return self.state.weights
The full WeightShard contract adds shape validation, versioning, and size reporting. Each shard stores up to 1MB of weight data (the per-contract state limit is 10MB, but keeping shards small improves gas costs).
The coordinator pattern -- DistributedNanoGPT:
A coordinator contract holds the model config and shard addresses, then orchestrates training by gathering weights from shards, running forward/backward passes, and scattering gradients back:
from panda import contract, constructor, call, query, event
from panda import call_contract, query_contract
# forward_pass, cross_entropy_loss, backward_pass, get_targets and
# extract_shard_grads are NanoGPT math helpers not shown in this excerpt.
@contract
class DistributedNanoGPT:
    """Coordinates training across weight shard contracts."""
    class State:
        config: dict = {}
        shard_addresses: dict = {}
        total_steps: int = 0
        last_loss: float = 0.0
        owner: str = ""
    @constructor
    def create(self, ctx, config: dict, shard_addresses: dict):
        self.state.config = config
        self.state.shard_addresses = shard_addresses
        self.state.owner = ctx.sender
    @call
    def train(self, ctx, token_ids: list, learning_rate: float = 0.001):
        # 1. Gather weights from all shards
        weights = {}
        for shard_id, addr in self.state.shard_addresses.items():
            shard_weights = query_contract(addr, "get_weights")
            weights.update(shard_weights)
        # 2. Forward + backward pass (same math as monolithic)
        logits, caches = forward_pass(weights, self.state.config, token_ids)
        loss, dlogits = cross_entropy_loss(logits, get_targets(token_ids))
        grads = backward_pass(dlogits, weights, self.state.config, caches)
        # 3. Scatter each shard's gradients back; the shard applies its own update
        for shard_id, addr in self.state.shard_addresses.items():
            shard_grads = extract_shard_grads(grads, shard_id)
            call_contract(addr, "apply_gradients",
                          gradients=shard_grads, learning_rate=learning_rate)
        self.state.total_steps += 1
        self.state.last_loss = float(loss)
The distributed version matches the monolithic NanoGPT to within floating-point tolerance (1e-10). This is because the math is the same -- we are simply changing where the weights are stored, not how they are used.
Deployment flow:
- Deploy 4 WeightShard contracts (embed, block_0, block_1, head)
- Deploy 1 DistributedNanoGPT coordinator, passing shard addresses
- Call initialize_weights on the coordinator -- it generates weights and scatters them to shards
- Call train on the coordinator -- it gathers, trains, and scatters automatically
Pipelined Training
The distributed pattern above still runs the entire forward/backward pass in a single transaction. For larger models, even gathering weights can exceed the gas limit. Pipelined training breaks the pass across multiple transactions -- one per layer.
Minimal example:
from panda import contract, call, query, event
from panda import call_contract, query_contract
from panda.timer import schedule
# embed_forward, block_forward and self_address are helpers not shown in this excerpt.
@contract
class PipelinedTrainer:
    """Breaks forward/backward across transactions, one per layer."""
    class State:
        config: dict = {}
        shard_addresses: dict = {}
        pipeline_state: dict = {}
        current_stage: int = -1
    @call
    def start_forward(self, ctx, token_ids: list):
        # Process embedding layer only
        embed_w = query_contract(
            self.state.shard_addresses["embed"], "get_weights")
        activations = embed_forward(embed_w, token_ids)
        self.state.pipeline_state = {"activations": activations,
                                     "token_ids": token_ids}
        self.state.current_stage = -1  # embedding done; block stages start at 0
        # Auto-trigger the first transformer block (block_0) in the next block
        schedule(self_address(), "forward_stage",
                 {"stage": 0}, delay_blocks=1)
    @call
    def forward_stage(self, ctx, stage: int):
        shard_id = f"block_{stage}"
        w = query_contract(
            self.state.shard_addresses[shard_id], "get_weights")
        activations = block_forward(
            w, self.state.pipeline_state["activations"])
        # Copy, modify, and reassign so the state change is persisted
        ps = dict(self.state.pipeline_state)
        ps["activations"] = activations
        self.state.pipeline_state = ps
        self.state.current_stage = stage
        # Schedule next stage or start backward pass
        next_stage = stage + 1
        if next_stage < self.state.config["n_layers"]:
            schedule(self_address(), "forward_stage",
                     {"stage": next_stage}, delay_blocks=1)
        else:
            schedule(self_address(), "start_backward",
                     {}, delay_blocks=1)
Each transaction touches only 1-2 weight shards, keeping gas costs bounded. The panda.timer.schedule() call automatically triggers the next pipeline stage in the next block, so the training proceeds without manual intervention.
Pipeline stages for a 2-layer model:
| Block | Transaction | Shards Touched | Description |
|---|---|---|---|
| N | start_forward | embed | Embedding lookup + position encoding |
| N+1 | forward_stage(0) | block_0 | Attention + MLP for layer 0 |
| N+2 | forward_stage(1) | block_1 | Attention + MLP for layer 1 |
| N+3 | start_backward | head | LM head + loss + begin backward |
| N+4 | backward_stage(1) | block_1 | Backward through layer 1 |
| N+5 | backward_stage(0) | block_0 | Backward through layer 0 |
| N+6 | apply_updates | all | SGD weight update across all shards |
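The table follows a fixed pattern: embed, blocks 0..L-1 forward, head, blocks L-1..0 backward, then one update. One step therefore takes 2L + 3 blocks for an L-layer model. The helper below generates that schedule. `pipeline_schedule` is a hypothetical planning function, not part of panda, and the `backward_stage`/`apply_updates` names are taken from the table rather than from code shown in this document.

```python
def pipeline_schedule(n_layers):
    """Return (block_offset, transaction, shards_touched) rows for one step."""
    rows = [(0, "start_forward", "embed")]
    for stage in range(n_layers):  # forward through block_0 .. block_{L-1}
        rows.append((len(rows), f"forward_stage({stage})", f"block_{stage}"))
    rows.append((len(rows), "start_backward", "head"))
    for stage in reversed(range(n_layers)):  # backward in reverse order
        rows.append((len(rows), f"backward_stage({stage})", f"block_{stage}"))
    rows.append((len(rows), "apply_updates", "all"))
    return rows


for offset, tx, shard in pipeline_schedule(2):
    print(f"N+{offset}: {tx} -> {shard}")
```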
LoRA Fine-Tuning
Full fine-tuning requires storing and training all model weights. For an 8B-parameter model, that means 8,000 weight shards. LoRA (Low-Rank Adaptation) dramatically reduces this by freezing the base weights and only training tiny adapter matrices.
The math: Instead of training the full weight matrix W, LoRA decomposes the update as:
W_effective = W_base + (alpha / r) * A @ B
Where:
- W_base is the frozen pretrained weight (stored in shards, never updated)
- A is a small matrix of shape [d_model, r] (r is typically 2-8)
- B is a small matrix of shape [r, d_model]
- alpha is a scaling factor
- Only A and B are trained -- a tiny fraction of the total parameters
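The NumPy sketch below checks the decomposition numerically, under the same conventions (`A` is `[d_model, r]`, `B` is `[r, d_model]`, scale `alpha / r`). It uses toy sizes chosen for illustration. It shows three things. First, with `B` initialized to zeros, the adapted layer starts identical to the frozen base. Second, computing `(x @ A) @ B` gives the same result as using the merged matrix. Third, it gives the trainable-parameter count. `lora_forward` and `trainable_params` are hypothetical helper names.

```python
import numpy as np

d_model, r, alpha = 8, 2, 1.0
rng = np.random.default_rng(42)

W_base = rng.standard_normal((d_model, d_model))  # frozen pretrained weight
A = rng.standard_normal((d_model, r)) * 0.01      # trainable, [d_model, r]
B = np.zeros((r, d_model))                        # trainable, [r, d_model]


def lora_forward(x, W_base, A, B, alpha, r):
    """x @ W_effective, without materializing the d_model x d_model update."""
    return x @ W_base + (alpha / r) * ((x @ A) @ B)


def trainable_params(d_model, r):
    """Parameters in one A/B adapter pair."""
    return d_model * r + r * d_model


x = rng.standard_normal((3, d_model))

# B starts at zero, so the adapter contributes nothing initially.
assert np.allclose(lora_forward(x, W_base, A, B, alpha, r), x @ W_base)

# Once B is non-zero, the factored form equals the merged weight matrix.
B = rng.standard_normal((r, d_model))
W_effective = W_base + (alpha / r) * (A @ B)
assert np.allclose(lora_forward(x, W_base, A, B, alpha, r), x @ W_effective)
```

With `d_model=4096` and `r=4`, `trainable_params` gives 32,768 per projection, versus 16,777,216 for the full matrix.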
Minimal example:
from panda import contract, constructor, call, query, event
from panda import query_contract
import numpy as np
@contract
class LoRANanoGPT:
    """Fine-tunes a frozen GPT with Low-Rank Adapters."""
    class State:
        config: dict = {}
        shard_addresses: dict = {}
        adapters: dict = {}  # Only these are trained
        lora_rank: int = 4
        lora_alpha: float = 1.0
    @constructor
    def create(self, ctx, config: dict, shard_addresses: dict,
               lora_rank: int = 4, lora_alpha: float = 1.0):
        self.state.config = config
        self.state.shard_addresses = shard_addresses
        self.state.lora_rank = lora_rank
        self.state.lora_alpha = lora_alpha
        # Initialize adapters: A small random, B zero, so A @ B starts at exactly zero
        adapters = {}
        d = config["d_model"]
        r = lora_rank
        rng = np.random.RandomState(42)
        for i in range(config["n_layers"]):
            prefix = f"blocks.{i}"
            for proj in ["Wq", "Wk", "Wv", "Wo"]:
                key = f"{prefix}.attn.{proj}"
                adapters[f"{key}.A"] = (rng.randn(d, r) * 0.01).tolist()
                adapters[f"{key}.B"] = np.zeros((r, d)).tolist()
        self.state.adapters = adapters
        self.emit(event.LoRAInitialized(rank=r, alpha=lora_alpha))
Why this is a game-changer for on-chain ML:
For an 8B-parameter model with d_model=4096 and LoRA rank r=4:
- Full weight matrix: 4096 x 4096 = 16.7M parameters per projection
- LoRA adapters: (4096 x 4) + (4 x 4096) = 32,768 parameters per projection
- 512x reduction in trainable parameters per projection (16,777,216 / 32,768)
Instead of 8,000 weight shards for full fine-tuning, you need ~17 adapter shards. The frozen base weights are read-only references that never need gradient updates.
Gradient Accumulation
Even with sharding, a single training step may process too many tokens for one transaction's gas limit. Gradient accumulation splits training into micro-batches: each transaction computes gradients on a small batch and accumulates them, then a final transaction applies the summed gradients.
Minimal example:
from panda import contract, call, query, event
# compute_gradients, add_arrays, scale_array and subtract_arrays are
# array helpers not shown in this excerpt.
@contract
class GradientAccumulator:
    """Accumulates gradients across micro-batch transactions."""
    class State:
        accumulated_grads: dict = {}
        micro_batch_count: int = 0
        target_batches: int = 4
        weights: dict = {}
    @call
    def accumulate(self, ctx, token_ids: list):
        """Process one micro-batch and accumulate gradients."""
        grads = compute_gradients(self.state.weights, token_ids)
        acc = self.state.accumulated_grads
        for key in grads:
            if key in acc:
                acc[key] = add_arrays(acc[key], grads[key])
            else:
                acc[key] = grads[key]
        self.state.accumulated_grads = acc
        self.state.micro_batch_count += 1
        self.emit(event.MicroBatchProcessed(
            batch=self.state.micro_batch_count))
    @call
    def apply_gradients(self, ctx, learning_rate: float = 0.001):
        """Average accumulated gradients and update weights."""
        n = self.state.micro_batch_count
        if n == 0:
            raise RuntimeError("No gradients accumulated")
        weights = self.state.weights
        for key in self.state.accumulated_grads:
            avg_grad = scale_array(
                self.state.accumulated_grads[key], 1.0 / n)
            weights[key] = subtract_arrays(
                weights[key], scale_array(avg_grad, learning_rate))
        self.state.weights = weights
        self.state.accumulated_grads = {}
        self.state.micro_batch_count = 0
        self.emit(event.GradientsApplied(micro_batches=n))
This pattern keeps per-transaction gas costs bounded regardless of total batch size. A typical flow:
- accumulate(batch_1) -- 256 tokens, stores gradients
- accumulate(batch_2) -- 256 tokens, adds to accumulated gradients
- accumulate(batch_3) -- 256 tokens
- accumulate(batch_4) -- 256 tokens
- apply_gradients(learning_rate=0.001) -- averages and applies (effective batch = 1024 tokens)
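Averaging micro-batch gradients reproduces the full-batch gradient exactly when the micro-batches are the same size and the loss is a per-sample mean. The NumPy check below demonstrates this. It uses a linear model with MSE loss as an assumed stand-in for the transformer loss.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.standard_normal((16, 3))  # 16 samples, 3 features
y = rng.standard_normal(16)
w = np.zeros(3)


def mse_grad(w, X, y):
    """Gradient of mean((X @ w - y) ** 2) with respect to w."""
    return (2.0 / len(X)) * (X.T @ (X @ w - y))


full = mse_grad(w, X, y)  # one big batch

# Four equal micro-batches: accumulate, then average (as apply_gradients does)
n_micro = 4
acc = np.zeros_like(w)
for chunk_X, chunk_y in zip(np.split(X, n_micro), np.split(y, n_micro)):
    acc += mse_grad(w, chunk_X, chunk_y)
averaged = acc / n_micro
```

If micro-batches differ in size, a plain average weights small batches too heavily. Weighting each gradient by its batch size restores the equivalence.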
Quantized Storage
Full-precision weights (float64) consume 8 bytes per parameter. For large models, storage becomes the bottleneck. Quantized weight shards store weights as int8 values with a per-tensor scale and zero point. As the comparison table below shows, this reduces storage by roughly 8x relative to float64 (about 4x relative to float32) while preserving model quality.
Minimal example:
from panda import contract, call, query, event
@contract
class QuantizedWeightShard:
    """Stores weights in int8 with scale/zero-point metadata."""
    class State:
        shard_id: str = ""
        quantized_weights: dict = {}  # int8 values as lists
        scales: dict = {}  # per-tensor scale factors
        zero_points: dict = {}  # per-tensor zero points
    @call
    def store_quantized(self, ctx, name: str, weights: list,
                        scale: float, zero_point: int):
        """Store a weight tensor in int8 format."""
        qw = dict(self.state.quantized_weights)
        qw[name] = weights  # int8 values [-128, 127]
        self.state.quantized_weights = qw
        sc = dict(self.state.scales)
        sc[name] = scale
        self.state.scales = sc
        zp = dict(self.state.zero_points)
        zp[name] = zero_point
        self.state.zero_points = zp
    @query
    def dequantize(self, name: str) -> list:
        """Reconstruct float weights: w = scale * (q - zero_point)."""
        q = self.state.quantized_weights[name]
        s = self.state.scales[name]
        zp = self.state.zero_points[name]
        return [s * (v - zp) for v in q]
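The contract only stores tensors that were already quantized. The values passed to `store_quantized` (`weights`, `scale`, `zero_point`) have to come from a quantizer that inverts `dequantize`'s formula `w = scale * (q - zero_point)`. Below is a minimal sketch of such a quantizer, using asymmetric per-tensor min/max quantization. The range choice (always including 0.0) and round-to-nearest are assumptions, and `quantize_int8` is a hypothetical helper. With this scheme, each reconstructed value is within `scale / 2` of the original.

```python
def quantize_int8(values):
    """Asymmetric per-tensor int8 quantization.

    Returns (q, scale, zero_point) such that scale * (q - zero_point)
    approximates each input value, with q in [-128, 127].
    """
    lo = min(min(values), 0.0)  # include 0 so it is exactly representable
    hi = max(max(values), 0.0)
    if hi == lo:  # all-zero tensor
        return [0] * len(values), 1.0, 0
    scale = (hi - lo) / 255.0  # 256 int8 levels span [lo, hi]
    zero_point = max(-128, min(127, round(-128 - lo / scale)))
    q = [max(-128, min(127, round(v / scale) + zero_point)) for v in values]
    return q, scale, zero_point


def dequantize(q, scale, zero_point):
    """Same formula as QuantizedWeightShard.dequantize."""
    return [scale * (v - zero_point) for v in q]


weights = [-1.0, -0.25, 0.0, 0.5, 2.0]
q, scale, zero_point = quantize_int8(weights)
restored = dequantize(q, scale, zero_point)
```

Here `q`, `scale`, and `zero_point` correspond to the `weights`, `scale`, and `zero_point` arguments of `store_quantized`.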
Storage savings comparison:
| Format | Bytes/Param | 33K Model | 124M Model | 8B Model |
|---|---|---|---|---|
| float64 | 8 | 264 KB | 992 MB | 64 GB |
| float32 | 4 | 132 KB | 496 MB | 32 GB |
| int8 | 1 + metadata | 34 KB | 126 MB | 8.1 GB |
| int4 | 0.5 + metadata | 18 KB | 64 MB | 4.1 GB |
Quantization adds a small accuracy cost (typically < 1% degradation on perplexity benchmarks). For on-chain ML, the storage savings often make the difference between "fits in shards" and "impossible."
Scaling Analysis
How many contracts does it take to train real-world models on Panda? Assuming 1MB of weight data per shard contract:
| Model | Params | Shards (fp64) | Shards (int4) | With LoRA (trainable) |
|---|---|---|---|---|
| NanoGPT (ours) | 33K | 1 | 1 | N/A |
| GPT-2 Small | 124M | 124 | 7 | 2 |
| LLaMA 8B | 8B | 8,000 | 400 | 17 |
| LLaMA 70B | 70.6B | 70,600 | 3,530 | 150 |
Key takeaways:
- int4 quantization reduces shard count by ~20x vs fp64
- LoRA reduces trainable shards by another ~25x vs quantized full training
- A LLaMA 8B fine-tune with LoRA needs only 17 adapter shards + 400 frozen base shards (read-only, deployed once)
- Pipelined training makes this feasible: each transaction touches only 1-2 shards, bounded gas per tx
The combination of quantized storage + LoRA adapters + pipelined training creates a practical path to fine-tuning billion-parameter models on-chain.
The panda.distributed SDK
The panda.distributed module provides a developer-friendly API for managing weight shards. Instead of manually deploying and calling shard contracts, use the WeightManager:
from panda.distributed import WeightManager
# Initialize with shard addresses
wm = WeightManager(shard_addresses={
    "embed": "0xabc...",
    "block_0": "0xdef...",
    "block_1": "0x123...",
    "head": "0x456...",
})
# Gather all weights into a single dict (cross-contract queries)
all_weights = wm.gather()
# After computing gradients, scatter them back to shards
wm.scatter_gradients(gradients, learning_rate=0.001)
# Store new weights to a specific shard
wm.store_weights("block_0", updated_block0_weights)
# Get storage stats across all shards
stats = wm.stats()
# {"total_params": 33024, "total_bytes": 264192, "shard_count": 4}
Full coordinator example using WeightManager:
from panda import contract, constructor, call, query, event
from panda.distributed import WeightManager
import numpy as np
# forward_pass, cross_entropy_loss, backward_pass, get_targets and
# autoregressive_generate are NanoGPT helpers not shown in this excerpt.
@contract
class DistributedTrainer:
    """Simplified distributed training using panda.distributed SDK."""
    class State:
        config: dict = {}
        shard_addresses: dict = {}
        total_steps: int = 0
        last_loss: float = 0.0
    @constructor
    def create(self, ctx, config: dict, shard_addresses: dict):
        self.state.config = config
        self.state.shard_addresses = shard_addresses
    @call
    def train(self, ctx, token_ids: list, learning_rate: float = 0.001):
        wm = WeightManager(self.state.shard_addresses)
        # One line to gather all weights
        weights = wm.gather()
        # Standard forward/backward (same as monolithic)
        logits, caches = forward_pass(weights, self.state.config, token_ids)
        loss, dlogits = cross_entropy_loss(logits, get_targets(token_ids))
        grads = backward_pass(dlogits, weights, self.state.config, caches)
        # One line to scatter gradients back
        wm.scatter_gradients(grads, learning_rate=learning_rate)
        self.state.total_steps += 1
        self.state.last_loss = float(loss)
        self.emit(event.TrainingStep(step=self.state.total_steps, loss=float(loss)))
    @query
    def generate(self, prompt_tokens: list, max_new_tokens: int = 50) -> list:
        wm = WeightManager(self.state.shard_addresses)
        weights = wm.gather()
        return autoregressive_generate(weights, self.state.config,
                                       prompt_tokens, max_new_tokens)
The SDK handles shard-to-weight-key mapping, cross-contract serialization, and gradient partitioning automatically. You write the same forward/backward math as a monolithic contract -- the distribution is transparent.
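To test coordinator logic locally, it can help to stand in for WeightManager with an in-memory object that has the same `gather()` / `scatter_gradients()` shape. The class below is a hypothetical stand-in, not the panda.distributed implementation. It assumes each shard holds a flat dict of 1-D float lists and that `scatter_gradients` performs a plain SGD step on whichever shard owns each key.

```python
class InMemoryWeightManager:
    """Hypothetical local stand-in for panda.distributed.WeightManager.

    `shards` maps shard_id -> {weight_key: list of floats}. The real SDK
    talks to shard contracts; this class only mimics gather/scatter.
    """

    def __init__(self, shards):
        self.shards = shards

    def gather(self):
        # Merge every shard's weight dict into one flat dict
        weights = {}
        for shard_weights in self.shards.values():
            weights.update(shard_weights)
        return weights

    def scatter_gradients(self, gradients, learning_rate):
        # Route each gradient to the shard that owns the key, then apply SGD
        for shard_weights in self.shards.values():
            for key, w in list(shard_weights.items()):
                if key in gradients:
                    shard_weights[key] = [
                        wi - learning_rate * gi
                        for wi, gi in zip(w, gradients[key])
                    ]


wm = InMemoryWeightManager({
    "embed": {"tok_emb": [1.0, 2.0]},
    "head": {"lm_head": [0.5]},
})
wm.scatter_gradients({"tok_emb": [1.0, 1.0], "lm_head": [0.5]}, learning_rate=0.1)
```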
Automatic Training Orchestration
The previous section covered how to distribute model weights across contracts manually. But what about the training loop itself? Real training runs involve splitting data into batches, processing them across blocks, checkpointing progress, and distributing work across parallel workers. Automatic training orchestration handles all of this: the user provides data and hyperparameters, and the contract manages the rest.
This section covers four contracts that implement the major orchestration patterns, plus the panda.training SDK module that provides reusable primitives for building your own orchestrators.
AutoCheckpointTrainer
The AutoCheckpointTrainer contract wraps a linear regression model with fully automatic multi-block training. Single-block training hits gas limits for large datasets. This contract splits the work across as many blocks as needed, processing exactly one mini-batch per block. Model weights are checkpointed between blocks, so a crash at any point loses at most one batch of work.
Key properties:
- Multi-block async training: Uses await sleep(blocks=1) to yield after each mini-batch
- Automatic checkpointing: Weights saved to state after every batch
- Resumable: resume_training() picks up from the last checkpoint
- Cancellable: cancel_training() stops training mid-flight (owner only)
- Progress queryable: training_progress() returns percentage, epoch, batch, and loss history
Bare-Bones Example
from panda import contract, constructor, call, query
import numpy as np
@contract
class AutoCheckpointTrainer:
    class State:
        weights: list = []
        bias: float = 0.0
        training_in_progress: bool = False
        current_epoch: int = 0
        current_batch: int = 0
    @constructor
    def create(self, ctx, input_dim: int = 1):
        self.state.weights = [0.0] * input_dim
    @call
    async def train(self, ctx, x: list, y: list, epochs: int = 1, batch_size: int = 4):
        n = len(x)
        n_batches = max(1, (n + batch_size - 1) // batch_size)
        self.state.training_in_progress = True
        for epoch in range(epochs):
            for b in range(n_batches):
                await sleep(blocks=1)  # one batch per block
                self.state.current_epoch = epoch
                self.state.current_batch = b
                start = b * batch_size
                end = min(start + batch_size, n)
                batch_x = np.array(x[start:end])
                batch_y = np.array(y[start:end])
                w = np.array(self.state.weights)
                y_pred = batch_x @ w + self.state.bias
                residuals = y_pred - batch_y
                dw = (2.0 / len(batch_x)) * (batch_x.T @ residuals)
                db = (2.0 / len(batch_x)) * np.sum(residuals)
                self.state.weights = (w - 0.01 * dw).tolist()
                self.state.bias -= 0.01 * float(db)
        self.state.training_in_progress = False
    @query
    def predict(self, x: list) -> list:
        w = np.array(self.state.weights)
        return (np.array(x) @ w + self.state.bias).tolist()
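Resumability works because everything needed to continue (weights, the epoch/batch cursor, and the data) is written to state after each batch. The plain-Python sketch below models that pattern off-chain. A dict plays the role of contract state, and each `step()` call plays the role of one block. It shows that stopping after a few steps and continuing from a copy of the saved state ends with the same weights as an uninterrupted run. `CheckpointedTrainer` and `run` are hypothetical names, and the model is simplified to a single scalar feature.

```python
class CheckpointedTrainer:
    """Off-chain sketch of the one-batch-per-block checkpoint pattern."""

    def __init__(self, state):
        self.state = state  # stands in for contract state

    @staticmethod
    def start(x, y, epochs, batch_size, lr):
        n_batches = max(1, (len(x) + batch_size - 1) // batch_size)
        return {"w": 0.0, "b": 0.0, "x": x, "y": y, "epoch": 0, "batch": 0,
                "epochs": epochs, "batch_size": batch_size, "lr": lr,
                "n_batches": n_batches}

    def step(self):
        """Process exactly one mini-batch. Returns False when training is done."""
        s = self.state
        if s["epoch"] >= s["epochs"]:
            return False
        start = s["batch"] * s["batch_size"]
        xs = s["x"][start:start + s["batch_size"]]
        ys = s["y"][start:start + s["batch_size"]]
        n = len(xs)
        residuals = [s["w"] * xi + s["b"] - yi for xi, yi in zip(xs, ys)]
        dw = 2.0 / n * sum(r * xi for r, xi in zip(residuals, xs))
        db = 2.0 / n * sum(residuals)
        s["w"] -= s["lr"] * dw
        s["b"] -= s["lr"] * db
        # Advance the cursor: together with w and b, this is the checkpoint
        s["batch"] += 1
        if s["batch"] >= s["n_batches"]:
            s["epoch"] += 1
            s["batch"] = 0
        return True


def run(trainer):
    """Step until done; return the number of batches processed."""
    steps = 0
    while trainer.step():
        steps += 1
    return steps


x = [0.0, 1.0, 2.0, 3.0, 4.0]
y = [1.0, 3.0, 5.0, 7.0, 9.0]
full = CheckpointedTrainer(CheckpointedTrainer.start(x, y, epochs=3, batch_size=2, lr=0.01))
total_steps = run(full)  # 3 batches per epoch x 3 epochs

interrupted = CheckpointedTrainer(CheckpointedTrainer.start(x, y, epochs=3, batch_size=2, lr=0.01))
for _ in range(4):  # "crash" after 4 batches
    interrupted.step()
resumed = CheckpointedTrainer(dict(interrupted.state))  # reload saved state
run(resumed)
```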
Full Contract
"""
AutoCheckpointTrainer -- Automatic multi-block training with checkpointing.
Wraps a linear regression model with automatic training orchestration:
- Splits training data into mini-batches
- Processes one batch per block via ``await sleep(blocks=1)``
- Checkpoints model weights between blocks (crash-safe)
- Resumable: if interrupted, call ``resume_training()``
- Cancellable: owner can call ``cancel_training()`` mid-flight
- Progress queryable mid-flight via ``training_progress()``
This demonstrates the auto-checkpoint pattern: the user provides data
and hyperparameters, and the contract handles all orchestration.
Why this matters:
Single-block training hits gas limits for large datasets. This contract
automatically splits the work across as many blocks as needed, with each
block processing exactly one mini-batch. The model state is saved between
blocks so a crash at any point loses at most one batch of work.
"""
from panda import contract, constructor, call, query, event
import numpy as np
@contract
class AutoCheckpointTrainer:
    """Linear model with automatic multi-block training and checkpointing."""
    class State:
        # Model weights (linear regression: y = X @ weights + bias)
        weights: list = []
        bias: float = 0.0
        input_dim: int = 0
        is_initialized: bool = False
        # Training orchestration
        training_data_x: list = []
        training_data_y: list = []
        training_in_progress: bool = False
        current_epoch: int = 0
        target_epochs: int = 1
        current_batch: int = 0
        total_batches: int = 0
        batch_size: int = 4
        learning_rate: float = 0.01
        # Checkpoint / metrics
        total_steps: int = 0
        total_samples: int = 0
        last_loss: float = 0.0
        epoch_losses: list = []
        training_history: list = []
        checkpoint_epoch: int = 0
        owner: str = ""
    @constructor
    def create(self, ctx, input_dim: int = 1):
        """Initialize the model with zero weights."""
        if input_dim < 1 or input_dim > 1000:
            raise ValueError(f"input_dim must be in [1, 1000], got {input_dim}")
        self.state.owner = ctx.sender
        self.state.input_dim = input_dim
        self.state.weights = [0.0] * input_dim
        self.state.bias = 0.0
        self.state.is_initialized = True
    @call
    async def train(
        self,
        ctx,
        x: list,
        y: list,
        epochs: int = 1,
        batch_size: int = 4,
        learning_rate: float = 0.01,
    ):
        """Start automatic multi-block training.
        Data is split into mini-batches, one batch processed per block.
        Model checkpoints automatically between blocks.
        Args:
            x: Training inputs, list of lists [[f1,f2,...], ...].
            y: Training targets, list of floats.
            epochs: Number of full passes over the data.
            batch_size: Samples per mini-batch.
            learning_rate: SGD step size.
        """
        if self.state.training_in_progress:
            raise ValueError("Training already in progress -- cancel or wait")
        if not self.state.is_initialized:
            raise ValueError("Model not initialized")
        if len(x) == 0 or len(y) == 0:
            raise ValueError("Training data must not be empty")
        if len(x) != len(y):
            raise ValueError(f"x and y must have same length ({len(x)} != {len(y)})")
        if epochs < 1:
            raise ValueError(f"epochs must be >= 1, got {epochs}")
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        if learning_rate <= 0.0 or learning_rate > 10.0:
            raise ValueError(f"learning_rate must be in (0, 10], got {learning_rate}")
        n_samples = len(x)
        n_batches = max(1, (n_samples + batch_size - 1) // batch_size)
        # Store training state
        self.state.training_data_x = x
        self.state.training_data_y = y
        self.state.target_epochs = epochs
        self.state.batch_size = batch_size
        self.state.learning_rate = learning_rate
        self.state.current_epoch = 0
        self.state.current_batch = 0
        self.state.total_batches = n_batches
        self.state.training_in_progress = True
        self.state.epoch_losses = []
        # Main training loop: one batch per block
        while self.state.current_epoch < self.state.target_epochs:
            await sleep(blocks=1)
            if not self.state.training_in_progress:
                # Cancelled: keep data and cursor so resume_training() can continue
                return
            # Reload from state (state may have changed between blocks)
            data_x = self.state.training_data_x
            data_y = self.state.training_data_y
            bs = self.state.batch_size
            lr = self.state.learning_rate
            batch_idx = self.state.current_batch
            n = len(data_x)
            total_b = self.state.total_batches
            # Slice this batch
            start = batch_idx * bs
            end = min(start + bs, n)
            batch_x = np.array(data_x[start:end], dtype=np.float64)
            batch_y = np.array(data_y[start:end], dtype=np.float64)
            # Forward pass: y_pred = X @ w + b
            w = np.array(self.state.weights, dtype=np.float64)
            b = float(self.state.bias)
            y_pred = batch_x @ w + b
            # MSE loss
            residuals = y_pred - batch_y
            loss = float(np.mean(residuals**2))
            n_batch = len(batch_x)
            # Backward pass: gradients for MSE
            dw = (2.0 / n_batch) * (batch_x.T @ residuals)
            db = (2.0 / n_batch) * np.sum(residuals)
            # SGD update
            w = w - lr * dw
            b = b - lr * float(db)
            # Checkpoint: save updated weights to state
            self.state.weights = w.tolist()
            self.state.bias = float(b)
            self.state.last_loss = loss
            self.state.total_steps += 1
            self.state.total_samples += n_batch
            # History (keep last 50)
            entry = {
                "step": self.state.total_steps,
                "loss": loss,
                "epoch": self.state.current_epoch,
                "batch": batch_idx,
                "samples": n_batch,
            }
            history = self.state.training_history
            history.append(entry)
            if len(history) > 50:
                history = history[-50:]
            self.state.training_history = history
            # Advance batch
            self.state.current_batch = batch_idx + 1
            # Check epoch completion
            if self.state.current_batch >= total_b:
                epoch_losses = list(self.state.epoch_losses)
                epoch_losses.append(loss)
                self.state.epoch_losses = epoch_losses
                self.state.current_epoch += 1
                self.state.current_batch = 0
                self.state.checkpoint_epoch = self.state.current_epoch
        # Training complete
        self.state.training_in_progress = False
        self.state.training_data_x = []
        self.state.training_data_y = []
    @call
    async def resume_training(self, ctx):
        """Resume training from last checkpoint after interruption.
        Picks up from the saved epoch/batch position. Requires that
        training data is still in state (not cleared by completion).
        """
        if self.state.training_in_progress:
            raise ValueError("Training is already running")
        if not self.state.training_data_x:
            raise ValueError(
                "No training data to resume from (training completed or never started)"
            )
        self.state.training_in_progress = True
        while self.state.current_epoch < self.state.target_epochs:
            await sleep(blocks=1)
            if not self.state.training_in_progress:
                # Cancelled again: keep data and cursor for a later resume
                return
            data_x = self.state.training_data_x
            data_y = self.state.training_data_y
            bs = self.state.batch_size
            lr = self.state.learning_rate
            batch_idx = self.state.current_batch
            n = len(data_x)
            total_b = self.state.total_batches
            start = batch_idx * bs
            end = min(start + bs, n)
            batch_x = np.array(data_x[start:end], dtype=np.float64)
            batch_y = np.array(data_y[start:end], dtype=np.float64)
            w = np.array(self.state.weights, dtype=np.float64)
            b = float(self.state.bias)
            y_pred = batch_x @ w + b
            residuals = y_pred - batch_y
            loss = float(np.mean(residuals**2))
            n_batch = len(batch_x)
            dw = (2.0 / n_batch) * (batch_x.T @ residuals)
            db = (2.0 / n_batch) * np.sum(residuals)
            w = w - lr * dw
            b = b - lr * float(db)
            self.state.weights = w.tolist()
            self.state.bias = float(b)
            self.state.last_loss = loss
            self.state.total_steps += 1
            self.state.total_samples += n_batch
            self.state.current_batch = batch_idx + 1
            if self.state.current_batch >= total_b:
                epoch_losses = list(self.state.epoch_losses)
                epoch_losses.append(loss)
                self.state.epoch_losses = epoch_losses
                self.state.current_epoch += 1
                self.state.current_batch = 0
                self.state.checkpoint_epoch = self.state.current_epoch
        self.state.training_in_progress = False
        self.state.training_data_x = []
        self.state.training_data_y = []
    @call
    def cancel_training(self, ctx):
        """Cancel in-progress training. Owner only."""
        if ctx.sender != self.state.owner:
            raise ValueError("Only owner can cancel training")
        if not self.state.training_in_progress:
            raise ValueError("No training in progress")
        self.state.training_in_progress = False
        # Keep training data for potential resume
        self.emit(event.TrainingCancelled(
            epoch=self.state.current_epoch,
            batch=self.state.current_batch,
            steps=self.state.total_steps,
        ))
    @call
    def train_single_batch(self, ctx, x: list, y: list, learning_rate: float = 0.01):
        """Train on a single batch synchronously (no multi-block).
        Useful for online learning or when data arrives one batch at a time.
        """
        if not self.state.is_initialized:
            raise ValueError("Model not initialized")
        if len(x) == 0 or len(y) == 0:
            raise ValueError("Batch data must not be empty")
        if len(x) != len(y):
            raise ValueError(f"x and y length mismatch ({len(x)} != {len(y)})")
        if learning_rate <= 0.0:
            raise ValueError("learning_rate must be positive")
        batch_x = np.array(x, dtype=np.float64)
        batch_y = np.array(y, dtype=np.float64)
        w = np.array(self.state.weights, dtype=np.float64)
        b = float(self.state.bias)
        y_pred = batch_x @ w + b
        residuals = y_pred - batch_y
        loss = float(np.mean(residuals**2))
        n_batch = len(batch_x)
        dw = (2.0 / n_batch) * (batch_x.T @ residuals)
        db = (2.0 / n_batch) * np.sum(residuals)
        w = w - learning_rate * dw
        b = b - learning_rate * float(db)
        self.state.weights = w.tolist()
        self.state.bias = float(b)
        self.state.last_loss = loss
        self.state.total_steps += 1
        self.state.total_samples += n_batch
        return {"loss": loss, "step": self.state.total_steps}
    @query
    def predict(self, x: list) -> list:
        """Predict targets for input data."""
        if not self.state.is_initialized:
            raise ValueError("Model not initialized")
        X = np.array(x, dtype=np.float64)
        w = np.array(self.state.weights, dtype=np.float64)
        b = float(self.state.bias)
        preds = X @ w + b
        return preds.tolist()
    @query
    def training_progress(self) -> dict:
        """Return current training progress and checkpoint state."""
        total_batches = self.state.target_epochs * self.state.total_batches
        done_batches = (
            self.state.current_epoch * self.state.total_batches + self.state.current_batch
        )
        progress_pct = (done_batches / total_batches * 100.0) if total_batches > 0 else 0.0
        return {
            "training_in_progress": self.state.training_in_progress,
            "current_epoch": self.state.current_epoch,
            "target_epochs": self.state.target_epochs,
            "current_batch": self.state.current_batch,
            "total_batches": self.state.total_batches,
            "total_steps": self.state.total_steps,
            "total_samples": self.state.total_samples,
            "last_loss": self.state.last_loss,
            "checkpoint_epoch": self.state.checkpoint_epoch,
            "progress_pct": round(progress_pct, 1),
            "epoch_losses": self.state.epoch_losses,
        }
    @query
    def model_info(self) -> dict:
        """Return model configuration and weights."""
        return {
            "input_dim": self.state.input_dim,
            "is_initialized": self.state.is_initialized,
            "weights": self.state.weights,
            "bias": self.state.bias,
            "total_steps": self.state.total_steps,
            "total_samples": self.state.total_samples,
            "last_loss": self.state.last_loss,
            "owner": self.state.owner,
        }
State Fields
| Field | Type | Description |
|---|---|---|
| weights | list | Model weight vector (one per input feature) |
| bias | float | Model bias term |
| input_dim | int | Number of input features |
| is_initialized | bool | Whether create() has been called |
| training_data_x | list | Stored training inputs (cleared on completion) |
| training_data_y | list | Stored training targets (cleared on completion) |
| training_in_progress | bool | Whether an async training loop is active |
| current_epoch | int | Current epoch index |
| target_epochs | int | Total epochs requested |
| current_batch | int | Current batch index within the epoch |
| total_batches | int | Batches per epoch |
| batch_size | int | Samples per mini-batch |
| learning_rate | float | SGD step size |
| total_steps | int | Total gradient steps taken (across all training runs) |
| total_samples | int | Total samples processed |
| last_loss | float | MSE loss from the most recent batch |
| epoch_losses | list | Loss at the end of each epoch |
| training_history | list | Last 50 training step records |
| checkpoint_epoch | int | Last fully completed epoch |
| owner | str | Deployer address |
Methods
| Method | Decorator | Description |
|---|---|---|
| create(input_dim) | @constructor | Initialize zero-weight model with given input dimension |
| train(x, y, epochs, batch_size, learning_rate) | @call async | Start multi-block training -- one batch per block with automatic checkpointing |
| resume_training() | @call async | Resume from last checkpoint after interruption or crash |
| cancel_training() | @call | Stop training mid-flight (owner only). Emits TrainingCancelled event |
| train_single_batch(x, y, learning_rate) | @call | Synchronous single-batch training (no multi-block) |
| predict(x) | @query | Run inference: returns X @ weights + bias |
| training_progress() | @query | Returns progress percentage, epoch/batch counters, loss history |
| model_info() | @query | Returns weights, bias, training stats, owner address |
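The multi-block schedule behind train() and resume_training() can be pictured as an (epoch, batch) cursor that advances by one batch per block. The sketch below is plain Python, not the contract's code. The TrainingCursor class, its step() method, and the initial checkpoint_epoch of -1 are hypothetical. It also assumes that resuming simply continues from the stored cursor, whereas the real resume_training() may restart from checkpoint_epoch instead. The progress arithmetic is the same as in training_progress().

```python
from dataclasses import dataclass, field


@dataclass
class TrainingCursor:
    """Hypothetical (epoch, batch) cursor; each step() call stands in for one block."""

    target_epochs: int
    total_batches: int  # batches per epoch
    current_epoch: int = 0
    current_batch: int = 0
    checkpoint_epoch: int = -1  # assumption: -1 means no epoch has finished yet
    epoch_losses: list = field(default_factory=list)

    def done(self) -> bool:
        return self.current_epoch >= self.target_epochs

    def step(self, loss: float) -> None:
        """Process one batch and roll over to the next epoch when this one ends."""
        self.current_batch += 1
        if self.current_batch == self.total_batches:
            self.epoch_losses.append(loss)
            self.checkpoint_epoch = self.current_epoch  # last fully completed epoch
            self.current_epoch += 1
            self.current_batch = 0

    def progress_pct(self) -> float:
        """Same arithmetic as training_progress()."""
        total = self.target_epochs * self.total_batches
        finished = self.current_epoch * self.total_batches + self.current_batch
        return round(finished / total * 100.0, 1) if total > 0 else 0.0


cursor = TrainingCursor(target_epochs=2, total_batches=3)
for _ in range(4):  # four blocks, then an interruption
    cursor.step(loss=0.5)
print(cursor.progress_pct(), cursor.checkpoint_epoch)
while not cursor.done():  # resume by continuing from the stored cursor
    cursor.step(loss=0.25)
print(cursor.progress_pct(), cursor.epoch_losses)
```

Because every counter lives in contract state, an interruption between blocks loses nothing except the batch that was in flight. That is what makes one-batch-per-block training resumable.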
DataParallelTrainer
The DataParallelTrainer contract implements the standard data-parallel training pattern from distributed ML, adapted for on-chain execution. A coordinator contract holds the model weights, splits each training batch across registered TrainingWorker contracts, collects their gradients, averages them (all-reduce mean), and applies a single SGD update.
Key properties:
- Data-parallel distribution: Splits data into equal shards across workers (the last worker also takes any remainder); each worker computes gradients on its shard
- All-reduce gradient averaging: The coordinator gathers the gradients from all workers and averages them
- Local fallback: When no workers are registered, trains locally in a single transaction
- Owner-controlled: Only the owner can register/remove workers and trigger training
- Bounded worker pool: Supports up to max_workers workers (set at construction, 1 to 32); each added worker shrinks every shard
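Averaging works because, for MSE, the full-batch gradient equals the sample-weighted mean of the per-shard gradients. With equal shards this reduces to the plain mean that train_round computes. When the last worker takes a larger remainder shard, the plain mean gives its samples slightly less weight. The pure-Python sketch below checks both cases on single-feature data. The helper names mse_grads and combine are hypothetical, and the weighted variant is an illustration, not what the contract does.

```python
def mse_grads(xs, ys, w, b):
    """MSE gradients (dw, db) for the single-feature model y_hat = w * x + b."""
    n = len(xs)
    residuals = [w * x + b - y for x, y in zip(xs, ys)]
    dw = 2.0 / n * sum(r * x for r, x in zip(residuals, xs))
    db = 2.0 / n * sum(residuals)
    return dw, db


def combine(shards, w, b, weighted):
    """Average per-shard gradients: plain mean (weighted=False) or by shard size."""
    grads = [mse_grads(sx, sy, w, b) for sx, sy in shards]
    weights = [len(sx) if weighted else 1 for sx, _ in shards]
    total = sum(weights)
    dw = sum(g[0] * k for g, k in zip(grads, weights)) / total
    db = sum(g[1] * k for g, k in zip(grads, weights)) / total
    return dw, db


xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [3.0, 5.0, 7.0, 9.0, 11.0]  # y = 2x + 1
w, b = 0.0, 0.0

# Equal shards (2 + 2 samples): the plain mean equals the full-batch gradient
equal = [(xs[:2], ys[:2]), (xs[2:4], ys[2:4])]
print(combine(equal, w, b, weighted=False), mse_grads(xs[:4], ys[:4], w, b))

# Uneven shards (2 + 3 samples): only the size-weighted mean matches
uneven = [(xs[:2], ys[:2]), (xs[2:], ys[2:])]
print(combine(uneven, w, b, weighted=False))
print(combine(uneven, w, b, weighted=True), mse_grads(xs, ys, w, b))
```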
Bare-Bones Example
from panda import contract, constructor, call, query, call_contract
import numpy as np
@contract
class DataParallelTrainer:
    class State:
        weights: list = []
        bias: float = 0.0
        workers: list = []
        owner: str = ""
    @constructor
    def create(self, ctx, input_dim: int = 1):
        self.state.owner = ctx.sender
        self.state.weights = [0.0] * input_dim
    @call
    def register_worker(self, ctx, worker_address: str):
        if ctx.sender != self.state.owner:
            raise ValueError("Only owner can register workers")
        workers = list(self.state.workers)
        workers.append(worker_address)
        self.state.workers = workers
    @call
    def train_round(self, ctx, x: list, y: list, learning_rate: float = 0.01):
        n_workers = len(self.state.workers)
        if n_workers == 0:
            raise ValueError("Register at least one worker first")
        shard_size = max(1, len(x) // n_workers)
        all_dw, all_db = [], []
        for i, addr in enumerate(self.state.workers):
            start = i * shard_size
            if start >= len(x):
                break  # more workers than samples: never send an empty shard
            end = len(x) if i == n_workers - 1 else start + shard_size
            result = call_contract(addr, "compute_gradients",
                                   x=x[start:end], y=y[start:end],
                                   weights=self.state.weights,
                                   bias=self.state.bias)
            all_dw.append(result["dw"])
            all_db.append(result["db"])
        # All-reduce mean, then one SGD step on the shared weights
        avg_dw = np.mean(np.array(all_dw), axis=0)
        avg_db = float(np.mean(all_db))
        w = np.array(self.state.weights) - learning_rate * avg_dw
        self.state.weights = w.tolist()
        self.state.bias -= learning_rate * avg_db
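The shard boundaries that train_round computes in the full contract can be checked in isolation. The contract splits the data evenly, gives the remainder to the last worker, and stops once the data runs out. shard_bounds is a hypothetical helper that mirrors that loop and returns the (start, end) slice for each worker that receives data.

```python
def shard_bounds(n, n_workers):
    """Mirror the full contract's slicing: [(start, end), ...] for each active worker."""
    if n_workers < 1:
        raise ValueError("need at least one worker")
    shard_size = max(1, n // n_workers)
    bounds = []
    for i in range(n_workers):
        start = i * shard_size
        if start >= n:
            break  # more workers than data points
        end = n if i == n_workers - 1 else min(start + shard_size, n)
        bounds.append((start, end))
    return bounds


print(shard_bounds(10, 3))  # [(0, 3), (3, 6), (6, 10)]
print(shard_bounds(3, 5))   # [(0, 1), (1, 2), (2, 3)]
print(shard_bounds(7, 5))   # last worker takes 3 samples
```

The last case shows how uneven the remainder rule can get: with 7 samples and 5 workers, four workers get one sample each and the last gets three.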
Full Contract
"""
DataParallelTrainer -- Automatic data-parallel distributed training.
Splits training data across multiple worker contracts, each computing
gradients independently. The coordinator averages gradients from all
workers and applies a single SGD update to the shared model.
This is the standard data-parallel training pattern from distributed ML,
implemented on-chain via cross-contract calls:
1. Coordinator holds the model weights
2. Each ``register_worker()`` call adds a worker address
3. ``train_round(x, y, learning_rate)`` splits data evenly across workers
4. Each worker computes gradients on its data shard
5. Coordinator gathers gradients, averages them, applies SGD
6. Repeat for each epoch
Benefits over single-contract training:
- Each worker processes less data per transaction (lower gas)
- Workers can run in parallel across different blocks
- Scales linearly with number of workers
- Natural fit for multi-party training where each party has private data
Security model:
- Only the owner can register/remove workers
- Only the owner can trigger training rounds
- Workers must be pre-registered (prevents gradient poisoning by strangers)
"""
from panda import contract, constructor, call, query, call_contract
import numpy as np
@contract
class DataParallelTrainer:
"""Coordinator for data-parallel distributed training."""
class State:
# Model (linear regression for simplicity)
weights: list = []
bias: float = 0.0
input_dim: int = 0
is_initialized: bool = False
# Workers
workers: list = []
max_workers: int = 8
# Training state
total_rounds: int = 0
total_samples: int = 0
last_loss: float = 0.0
training_history: list = []
owner: str = ""
@constructor
def create(self, ctx, input_dim: int = 1, max_workers: int = 8):
"""Initialize the coordinator with a linear model."""
if input_dim < 1 or input_dim > 1000:
raise ValueError(f"input_dim must be in [1, 1000], got {input_dim}")
if max_workers < 1 or max_workers > 32:
raise ValueError(f"max_workers must be in [1, 32], got {max_workers}")
self.state.owner = ctx.sender
self.state.input_dim = input_dim
self.state.max_workers = max_workers
self.state.weights = [0.0] * input_dim
self.state.bias = 0.0
self.state.is_initialized = True
@call
def register_worker(self, ctx, worker_address: str):
"""Register a worker contract for data-parallel training."""
if ctx.sender != self.state.owner:
raise ValueError("Only owner can register workers")
worker_address = worker_address.strip()
if not worker_address:
raise ValueError("Worker address must not be empty")
if worker_address in self.state.workers:
raise ValueError(f"Worker {worker_address} already registered")
if len(self.state.workers) >= self.state.max_workers:
raise ValueError(f"Max workers ({self.state.max_workers}) reached")
workers = list(self.state.workers)
workers.append(worker_address)
self.state.workers = workers
@call
def remove_worker(self, ctx, worker_address: str):
"""Remove a worker from the pool."""
if ctx.sender != self.state.owner:
raise ValueError("Only owner can remove workers")
workers = list(self.state.workers)
if worker_address not in workers:
raise ValueError(f"Worker {worker_address} not registered")
workers.remove(worker_address)
self.state.workers = workers
@call
def train_round(self, ctx, x: list, y: list, learning_rate: float = 0.01):
"""Run one round of data-parallel training.
Splits data evenly across registered workers, sends each worker
its shard with the current model weights, collects gradients,
averages them, and applies a single SGD update.
Args:
x: Training inputs [[f1,f2,...], ...].
y: Training targets [t1, t2, ...].
learning_rate: SGD step size.
"""
if ctx.sender != self.state.owner:
raise ValueError("Only owner can trigger training")
if not self.state.is_initialized:
raise ValueError("Model not initialized")
if len(x) == 0 or len(y) == 0:
raise ValueError("Training data must not be empty")
if len(x) != len(y):
raise ValueError(f"x and y length mismatch ({len(x)} != {len(y)})")
if learning_rate <= 0.0:
raise ValueError("learning_rate must be positive")
workers = self.state.workers
n_workers = len(workers)
if n_workers == 0:
# No workers -- train locally (single-process fallback)
return self._train_local(x, y, learning_rate)
n = len(x)
shard_size = max(1, n // n_workers)
# Current model weights
current_weights = list(self.state.weights)
current_bias = float(self.state.bias)
# Distribute data to workers and collect gradients
all_dw = []
all_db = []
total_loss = 0.0
total_n = 0
for i, worker_addr in enumerate(workers):
start = i * shard_size
if i == n_workers - 1:
end = n # Last worker gets remainder
else:
end = min(start + shard_size, n)
if start >= n:
break # More workers than data points
shard_x = x[start:end]
shard_y = y[start:end]
# Send data shard + current weights to worker
result = call_contract(
worker_addr,
"compute_gradients",
x=shard_x,
y=shard_y,
weights=current_weights,
bias=current_bias,
)
if result and isinstance(result, dict):
all_dw.append(result.get("dw", [0.0] * self.state.input_dim))
all_db.append(result.get("db", 0.0))
total_loss += result.get("loss", 0.0) * len(shard_x)
total_n += len(shard_x)
if total_n == 0:
return {"loss": 0.0, "round": self.state.total_rounds}
# Average gradients across workers (all-reduce mean)
avg_dw = np.mean(np.array(all_dw), axis=0)
avg_db = float(np.mean(all_db))
avg_loss = total_loss / total_n
# SGD update
w = np.array(self.state.weights, dtype=np.float64)
w = w - learning_rate * avg_dw
b = current_bias - learning_rate * avg_db
self.state.weights = w.tolist()
self.state.bias = float(b)
self.state.last_loss = avg_loss
self.state.total_rounds += 1
self.state.total_samples += total_n
# History
entry = {
"round": self.state.total_rounds,
"loss": avg_loss,
"samples": total_n,
"workers_used": min(n_workers, n),
}
history = self.state.training_history
history.append(entry)
if len(history) > 50:
history = history[-50:]
self.state.training_history = history
return {"loss": avg_loss, "round": self.state.total_rounds}
def _train_local(self, x, y, learning_rate):
"""Fallback: train locally without workers."""
X = np.array(x, dtype=np.float64)
Y = np.array(y, dtype=np.float64)
w = np.array(self.state.weights, dtype=np.float64)
b = float(self.state.bias)
y_pred = X @ w + b
residuals = y_pred - Y
loss = float(np.mean(residuals**2))
n = len(X)
dw = (2.0 / n) * (X.T @ residuals)
db = (2.0 / n) * float(np.sum(residuals))
w = w - learning_rate * dw
b = b - learning_rate * db
self.state.weights = w.tolist()
self.state.bias = float(b)
self.state.last_loss = loss
self.state.total_rounds += 1
self.state.total_samples += n
entry = {"round": self.state.total_rounds, "loss": loss, "samples": n, "workers_used": 0}
history = self.state.training_history
history.append(entry)
if len(history) > 50:
history = history[-50:]
self.state.training_history = history
return {"loss": loss, "round": self.state.total_rounds}
@query
def predict(self, x: list) -> list:
"""Predict targets for input data."""
X = np.array(x, dtype=np.float64)
w = np.array(self.state.weights, dtype=np.float64)
preds = X @ w + float(self.state.bias)
return preds.tolist()
@query
def training_status(self) -> dict:
"""Return training status."""
return {
"total_rounds": self.state.total_rounds,
"total_samples": self.state.total_samples,
"last_loss": self.state.last_loss,
"num_workers": len(self.state.workers),
"workers": list(self.state.workers),
"weights": self.state.weights,
"bias": self.state.bias,
}
@query
def model_info(self) -> dict:
"""Return model info."""
return {
"input_dim": self.state.input_dim,
"is_initialized": self.state.is_initialized,
"total_rounds": self.state.total_rounds,
"total_samples": self.state.total_samples,
"last_loss": self.state.last_loss,
"num_workers": len(self.state.workers),
"max_workers": self.state.max_workers,
"owner": self.state.owner,
}
State Fields
| Field | Type | Description |
|---|---|---|
| weights | list | Model weight vector |
| bias | float | Model bias term |
| input_dim | int | Number of input features |
| is_initialized | bool | Whether the model has been initialized |
| workers | list | Registered worker contract addresses |
| max_workers | int | Maximum number of workers allowed |
| total_rounds | int | Total training rounds completed |
| total_samples | int | Total training samples processed |
| last_loss | float | Loss from the most recent round |
| training_history | list | Last 50 training round records |
| owner | str | Deployer address |
Methods
| Method | Decorator | Description |
|---|---|---|
| create(input_dim, max_workers) | @constructor | Initialize coordinator with a linear model |
| register_worker(worker_address) | @call | Add a worker contract to the pool (owner only) |
| remove_worker(worker_address) | @call | Remove a worker from the pool (owner only) |
| train_round(x, y, learning_rate) | @call | Run one round of data-parallel training across all workers |
| predict(x) | @query | Run inference with current weights |
| training_status() | @query | Returns round count, loss, worker list, current weights |
| model_info() | @query | Returns model config, training stats, owner |
TrainingWorker
The TrainingWorker contract is a stateless gradient computation worker used by DataParallelTrainer. It receives a data shard and current model weights from the coordinator, computes MSE gradients locally, and returns them. The worker holds no persistent model -- the coordinator maintains the canonical weights.
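For reference, the values that compute_gradients returns come from differentiating the mean squared error of the linear model over a shard of n samples. Here r is the residual vector (the code's residuals = y_pred - Y), loss is L, dw is the weight gradient, and db is the bias derivative:

$$
r = Xw + b\mathbf{1} - y,\qquad
L = \frac{1}{n}\lVert r\rVert^2,\qquad
\nabla_w L = \frac{2}{n}X^\top r,\qquad
\frac{\partial L}{\partial b} = \frac{2}{n}\sum_{i=1}^{n} r_i
$$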
Bare-Bones Example
from panda import contract, constructor, call, query
import numpy as np
@contract
class TrainingWorker:
    class State:
        coordinator: str = ""
        total_computations: int = 0
    @constructor
    def create(self, ctx, coordinator: str = ""):
        self.state.coordinator = coordinator
    @call
    def compute_gradients(self, ctx, x: list, y: list, weights: list, bias: float = 0.0):
        X = np.array(x, dtype=np.float64)
        Y = np.array(y, dtype=np.float64)
        w = np.array(weights, dtype=np.float64)
        y_pred = X @ w + bias
        residuals = y_pred - Y
        n = len(X)
        dw = (2.0 / n) * (X.T @ residuals)
        db = (2.0 / n) * float(np.sum(residuals))
        loss = float(np.mean(residuals**2))
        self.state.total_computations += 1
        return {"dw": dw.tolist(), "db": db, "loss": loss}
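One way to trust the analytic gradients in compute_gradients is a central finite-difference check against the loss. The sketch ports the worker's formulas to plain Python lists (no numpy), so it runs outside PandaVM. The function names and the sample data are made up for illustration. Because the loss is quadratic in the parameters, the two estimates should agree to within floating-point rounding.

```python
def loss_and_grads(x, y, weights, bias):
    """compute_gradients' formulas on plain lists: returns (loss, dw, db)."""
    n = len(x)
    preds = [sum(f * w for f, w in zip(row, weights)) + bias for row in x]
    residuals = [p - t for p, t in zip(preds, y)]
    loss = sum(r * r for r in residuals) / n
    dw = [2.0 / n * sum(r * row[j] for r, row in zip(residuals, x))
          for j in range(len(weights))]
    db = 2.0 / n * sum(residuals)
    return loss, dw, db


def numeric_grads(x, y, weights, bias, eps=1e-6):
    """Central finite differences of the loss w.r.t. each weight and the bias."""
    def loss_at(ws, b):
        return loss_and_grads(x, y, ws, b)[0]

    dw = []
    for j in range(len(weights)):
        up = list(weights)
        down = list(weights)
        up[j] += eps
        down[j] -= eps
        dw.append((loss_at(up, bias) - loss_at(down, bias)) / (2 * eps))
    db = (loss_at(weights, bias + eps) - loss_at(weights, bias - eps)) / (2 * eps)
    return dw, db


x = [[1.0, 2.0], [3.0, -1.0], [0.5, 4.0]]
y = [1.0, 2.0, -0.5]
weights, bias = [0.3, -0.2], 0.1
_, dw, db = loss_and_grads(x, y, weights, bias)
num_dw, num_db = numeric_grads(x, y, weights, bias)
print(dw, num_dw)
print(db, num_db)
```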
Full Contract
"""
TrainingWorker -- Worker contract for data-parallel distributed training.
Receives a data shard and current model weights from the coordinator,
computes gradients locally, and returns them. Stateless per training
round (no persistent model -- the coordinator holds the canonical model).
Used by DataParallelTrainer as a compute node in the data-parallel topology.
"""
from panda import contract, constructor, call, query
import numpy as np
@contract
class TrainingWorker:
"""Stateless gradient computation worker for data-parallel training."""
class State:
coordinator: str = ""
worker_id: str = ""
total_computations: int = 0
owner: str = ""
@constructor
def create(self, ctx, coordinator: str = "", worker_id: str = ""):
"""Initialize the worker with its coordinator address."""
self.state.owner = ctx.sender
self.state.coordinator = coordinator.strip()
self.state.worker_id = worker_id.strip() or ctx.sender
@call
def compute_gradients(self, ctx, x: list, y: list, weights: list, bias: float = 0.0):
"""Compute gradients on a data shard.
Args:
x: Input features [[f1,f2,...], ...].
y: Target values [t1, t2, ...].
weights: Current model weights from coordinator.
bias: Current model bias from coordinator.
Returns:
Dict with dw (weight gradients), db (bias gradient), loss.
"""
if len(x) == 0 or len(y) == 0:
raise ValueError("Data shard must not be empty")
if len(x) != len(y):
raise ValueError(f"x and y length mismatch ({len(x)} != {len(y)})")
X = np.array(x, dtype=np.float64)
Y = np.array(y, dtype=np.float64)
w = np.array(weights, dtype=np.float64)
b = float(bias)
# Forward pass
y_pred = X @ w + b
residuals = y_pred - Y
loss = float(np.mean(residuals**2))
n = len(X)
# Backward pass (MSE gradients)
dw = (2.0 / n) * (X.T @ residuals)
db = (2.0 / n) * float(np.sum(residuals))
self.state.total_computations += 1
return {"dw": dw.tolist(), "db": db, "loss": loss, "samples": n}
@query
def worker_info(self) -> dict:
"""Return worker metadata."""
return {
"worker_id": self.state.worker_id,
"coordinator": self.state.coordinator,
"total_computations": self.state.total_computations,
"owner": self.state.owner,
}
State Fields
| Field | Type | Description |
|---|---|---|
| coordinator | str | Address of the coordinator contract |
| worker_id | str | Identifier for this worker |
| total_computations | int | Number of gradient computations performed |
| owner | str | Deployer address |
Methods
| Method | Decorator | Description |
|---|---|---|
| create(coordinator, worker_id) | @constructor | Initialize worker with coordinator address and ID |
| compute_gradients(x, y, weights, bias) | @call | Compute MSE gradients on a data shard and return {dw, db, loss, samples} |
| worker_info() | @query | Return worker metadata (ID, coordinator, computation count) |
Deployment flow for data-parallel training:
- Deploy one DataParallelTrainer coordinator
- Deploy N TrainingWorker contracts, passing the coordinator's address
- Call register_worker(worker_addr) for each worker
- Call train_round(x, y, learning_rate=0.01) -- the coordinator splits the data, dispatches shards to the workers, gathers their gradients, and updates the model
- Repeat train_round for each epoch
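The flow above can be rehearsed off-chain before anything is deployed. In the sketch below, the hypothetical LocalWorker and LocalCoordinator classes (not part of panda) replace call_contract with direct method calls. The sketch keeps the full contract's slicing, its plain gradient mean, and its sample-weighted loss, and fits y = 2x + 1 on single-feature data with two workers.

```python
class LocalWorker:
    """Hypothetical off-chain stand-in for TrainingWorker (single-feature inputs only)."""

    def compute_gradients(self, x, y, weights, bias):
        n = len(x)
        residuals = [row[0] * weights[0] + bias - t for row, t in zip(x, y)]
        dw = 2.0 / n * sum(r * row[0] for r, row in zip(residuals, x))
        db = 2.0 / n * sum(residuals)
        loss = sum(r * r for r in residuals) / n
        return {"dw": [dw], "db": db, "loss": loss}


class LocalCoordinator:
    """Hypothetical stand-in for DataParallelTrainer: direct calls replace call_contract."""

    def __init__(self):
        self.weights = [0.0]
        self.bias = 0.0
        self.workers = []

    def register_worker(self, worker):
        self.workers.append(worker)

    def train_round(self, x, y, learning_rate=0.01):
        n, n_workers = len(x), len(self.workers)
        shard_size = max(1, n // n_workers)
        results, total_loss, total_n = [], 0.0, 0
        for i, worker in enumerate(self.workers):
            start = i * shard_size
            if start >= n:
                break  # more workers than samples
            end = n if i == n_workers - 1 else min(start + shard_size, n)
            result = worker.compute_gradients(x[start:end], y[start:end],
                                              self.weights, self.bias)
            results.append(result)
            total_loss += result["loss"] * (end - start)
            total_n += end - start
        # Plain mean over workers (all-reduce mean), then one SGD step
        avg_dw = sum(r["dw"][0] for r in results) / len(results)
        avg_db = sum(r["db"] for r in results) / len(results)
        self.weights = [self.weights[0] - learning_rate * avg_dw]
        self.bias -= learning_rate * avg_db
        return total_loss / total_n  # sample-weighted loss, as in the contract


coordinator = LocalCoordinator()
for _ in range(2):
    coordinator.register_worker(LocalWorker())
xs = [[float(i)] for i in range(8)]
ys = [2.0 * row[0] + 1.0 for row in xs]  # target: y = 2x + 1
for _ in range(2000):
    loss = coordinator.train_round(xs, ys, learning_rate=0.02)
print(coordinator.weights, coordinator.bias, loss)
```

With two equal shards of four samples, each round is an ordinary full-batch gradient step, so the weights approach w = 2 and b = 1. The 2000 rounds here correspond to 2000 train_round calls on-chain.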
AutoShardTrainer
The AutoShardTrainer contract implements a 2-layer MLP neural network whose weights live in separate WeightShard contracts, one per layer. The user provides the model dimensions and the shard addresses (at construction or later via register_shard(); the shards themselves can be deployed with auto_shard() from panda.training). The contract then initializes the weights (Xavier/He), gathers them for each forward pass, computes gradients, and scatters the updates back to the shards.
Key properties:
- 2-layer MLP: Input -> Linear -> ReLU -> Linear -> Output
- Automatic weight sharding: Weights are stored in external WeightShard contracts and accessed via cross-contract calls
- Xavier/He initialization: Layer 0 uses Xavier init, layer 1 uses He init
- Gather/scatter pattern: Weights are gathered before the forward pass, and gradients are scattered after the backward pass
- Transparent distribution: train(x, y) looks the same as on a monolithic contract -- the sharding is invisible to the caller
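The gather/scatter cycle can be modeled with in-memory objects. InMemoryShard below is a hypothetical stand-in for WeightShard. Its store_weights, get_weights, and apply_gradients methods are named after the calls AutoShardTrainer makes, but apply_gradients is assumed to perform a plain SGD update (param -= learning_rate * grad). The real WeightShard contract may differ. The gradients in the example are placeholders, not the output of a real backward pass.

```python
import copy


class InMemoryShard:
    """Hypothetical stand-in for a WeightShard contract: named parameters as nested lists."""

    def __init__(self):
        self._weights = {}

    def store_weights(self, weights):
        self._weights = copy.deepcopy(weights)

    def get_weights(self):
        # Return a copy, like a cross-contract query returning serialized data
        return copy.deepcopy(self._weights)

    def apply_gradients(self, gradients, learning_rate):
        # Assumed update rule: plain SGD, param -= learning_rate * grad
        for name, grad in gradients.items():
            self._weights[name] = sgd_update(self._weights[name], grad, learning_rate)


def sgd_update(param, grad, learning_rate):
    """Apply param - learning_rate * grad elementwise to scalars or nested lists."""
    if isinstance(param, list):
        return [sgd_update(p, g, learning_rate) for p, g in zip(param, grad)]
    return param - learning_rate * grad


shard_map = {"layer_0": InMemoryShard(), "layer_1": InMemoryShard()}
shard_map["layer_0"].store_weights({"W0": [[0.5, -0.5]], "b0": [0.0, 0.0]})
shard_map["layer_1"].store_weights({"W1": [[1.0], [1.0]], "b1": [0.0]})

# Gather: snapshot every layer's parameters before the forward pass
gathered = {name: shard.get_weights() for name, shard in shard_map.items()}
# (forward/backward would run on `gathered`; these gradients are placeholders)
shard_map["layer_1"].apply_gradients({"W1": [[0.2], [-0.2]], "b1": [0.1]},
                                     learning_rate=0.5)
print(gathered["layer_1"])                 # snapshot is unchanged
print(shard_map["layer_1"].get_weights())  # updated in the shard
```

The design point is that the trainer only ever holds a copy of the weights: the shard stays the source of truth, and updates reach it only through apply_gradients.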
Bare-Bones Example
from panda import contract, constructor, call, query, call_contract, query_contract
import numpy as np
@contract
class AutoShardTrainer:
    class State:
        config: dict = {}
        shard_map: dict = {}
        is_initialized: bool = False
    @constructor
    def create(self, ctx, input_dim: int = 4, hidden_dim: int = 8, output_dim: int = 1,
               shard_addresses: dict = None):
        self.state.config = {"input_dim": input_dim, "hidden_dim": hidden_dim,
                             "output_dim": output_dim}
        self.state.shard_map = shard_addresses or {}
    @call
    def train(self, ctx, x: list, y: list, learning_rate: float = 0.01):
        shard_map = self.state.shard_map
        # Gather weights from shards
        l0 = query_contract(shard_map["layer_0"], "get_weights")
        l1 = query_contract(shard_map["layer_1"], "get_weights")
        W0, b0 = np.array(l0["W0"]), np.array(l0["b0"])
        W1, b1 = np.array(l1["W1"]), np.array(l1["b1"])
        X, Y = np.array(x), np.array(y).reshape(-1, 1)
        # Forward: Linear -> ReLU -> Linear
        z0 = X @ W0 + b0
        a0 = np.maximum(0, z0)
        z1 = a0 @ W1 + b1
        # Backward
        residuals = z1 - Y
        dz1 = (2.0 / len(X)) * residuals
        dW1, db1 = a0.T @ dz1, np.sum(dz1, axis=0)
        dz0 = (dz1 @ W1.T) * (z0 > 0).astype(float)
        dW0, db0 = X.T @ dz0, np.sum(dz0, axis=0)
        # Scatter gradients back
        call_contract(shard_map["layer_0"], "apply_gradients",
                      gradients={"W0": dW0.tolist(), "b0": db0.tolist()},
                      learning_rate=learning_rate)
        call_contract(shard_map["layer_1"], "apply_gradients",
                      gradients={"W1": dW1.tolist(), "b1": db1.tolist()},
                      learning_rate=learning_rate)
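The backward pass above applies the chain rule through Linear -> ReLU -> Linear. The pure-Python sketch below checks the dW0 chain (dz1 -> da0 -> dz0 -> dW0, with the same 2/n MSE scaling) against central finite differences on a tiny single-output network. The function names and all numeric values are made up for illustration. The inputs are chosen so that no hidden pre-activation sits near zero, the ReLU kink where finite differences would be unreliable.

```python
def forward(X, W0, b0, W1, b1):
    """Linear -> ReLU -> Linear with a single output; returns (z0, a0, z1)."""
    z0 = [[sum(x[i] * W0[i][h] for i in range(len(x))) + b0[h]
           for h in range(len(b0))] for x in X]
    a0 = [[max(0.0, v) for v in row] for row in z0]
    z1 = [sum(a[h] * W1[h][0] for h in range(len(a))) + b1[0] for a in a0]
    return z0, a0, z1


def mse(X, Y, W0, b0, W1, b1):
    """Mean squared error of the network's single output against Y."""
    _, _, z1 = forward(X, W0, b0, W1, b1)
    return sum((p - t) ** 2 for p, t in zip(z1, Y)) / len(Y)


def analytic_dW0(X, Y, W0, b0, W1, b1):
    """dL/dW0 via dz1 -> da0 -> dz0 -> dW0, the same chain train() computes."""
    n = len(X)
    z0, _, z1 = forward(X, W0, b0, W1, b1)
    dz1 = [2.0 / n * (p - t) for p, t in zip(z1, Y)]
    dz0 = [[dz1[k] * W1[h][0] * (1.0 if z0[k][h] > 0 else 0.0)  # ReLU backward
            for h in range(len(b0))] for k in range(n)]
    return [[sum(X[k][i] * dz0[k][h] for k in range(n))  # X.T @ dz0
             for h in range(len(b0))] for i in range(len(W0))]


def numeric_dW0(X, Y, W0, b0, W1, b1, eps=1e-6):
    """Central finite differences of the loss w.r.t. every entry of W0."""
    grads = []
    for i in range(len(W0)):
        row = []
        for h in range(len(W0[i])):
            up = [r[:] for r in W0]
            down = [r[:] for r in W0]
            up[i][h] += eps
            down[i][h] -= eps
            diff = mse(X, Y, up, b0, W1, b1) - mse(X, Y, down, b0, W1, b1)
            row.append(diff / (2 * eps))
        grads.append(row)
    return grads


X = [[1.0, 2.0], [-1.0, 0.5], [0.5, -1.5]]
Y = [1.0, -0.5, 0.2]
W0 = [[0.4, -0.3, 0.2], [0.1, 0.5, -0.6]]
b0 = [0.1, 0.0, -0.1]
W1 = [[0.7], [-0.4], [0.3]]
b1 = [0.05]
print(analytic_dW0(X, Y, W0, b0, W1, b1))
print(numeric_dW0(X, Y, W0, b0, W1, b1))
```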
Full Contract
"""
AutoShardTrainer -- Automatic weight sharding for large model training.
Manages a multi-layer neural network where weights are automatically
distributed across WeightShard contracts. The user provides model config
and training data plus the addresses of pre-deployed shards; the contract handles:
1. Tracking one weight shard contract per layer in ``shard_map``
2. Initializing weights with Xavier/He initialization
3. Gathering weights for forward pass
4. Computing gradients
5. Scattering gradients back to the correct shards
This is the auto-sharding pattern: given a model config and a shard map,
the contract distributes weights across the shards and manages all
gather/scatter operations transparently.
The user just calls ``train(x, y)`` and the orchestration is invisible.
"""
from panda import contract, constructor, call, query, call_contract, query_contract
import numpy as np
@contract
class AutoShardTrainer:
"""Neural network with automatic weight sharding across contracts."""
class State:
config: dict = {}
shard_map: dict = {}
is_initialized: bool = False
total_steps: int = 0
total_samples: int = 0
last_loss: float = 0.0
training_history: list = []
owner: str = ""
@constructor
def create(
self,
ctx,
input_dim: int = 4,
hidden_dim: int = 8,
output_dim: int = 1,
shard_addresses: dict = None,
):
"""Initialize the auto-sharded model.
Args:
input_dim: Number of input features.
hidden_dim: Hidden layer size.
output_dim: Number of outputs.
shard_addresses: Pre-deployed shard addresses
{"layer_0": addr, "layer_1": addr}.
"""
if input_dim < 1 or input_dim > 1000:
raise ValueError(f"input_dim must be in [1, 1000], got {input_dim}")
if hidden_dim < 1 or hidden_dim > 1000:
raise ValueError(f"hidden_dim must be in [1, 1000], got {hidden_dim}")
if output_dim < 1 or output_dim > 100:
raise ValueError(f"output_dim must be in [1, 100], got {output_dim}")
self.state.owner = ctx.sender
self.state.config = {
"input_dim": input_dim,
"hidden_dim": hidden_dim,
"output_dim": output_dim,
}
if shard_addresses is None:
shard_addresses = {}
self.state.shard_map = shard_addresses
@call
def register_shard(self, ctx, shard_name: str, shard_address: str):
"""Register a weight shard contract."""
if ctx.sender != self.state.owner:
raise ValueError("Only owner can register shards")
shard_map = dict(self.state.shard_map)
shard_map[shard_name] = shard_address.strip()
self.state.shard_map = shard_map
@call
def initialize_weights(self, ctx):
"""Generate and distribute initial weights to shards.
Uses Xavier initialization for the first layer and He initialization
for the output layer.
"""
if self.state.is_initialized:
raise ValueError("Already initialized")
shard_map = self.state.shard_map
if "layer_0" not in shard_map or "layer_1" not in shard_map:
raise ValueError("Must register layer_0 and layer_1 shards first")
config = self.state.config
input_dim = config["input_dim"]
hidden_dim = config["hidden_dim"]
output_dim = config["output_dim"]
rng = np.random.RandomState(42)
# Layer 0: input -> hidden (Xavier init)
scale_0 = np.sqrt(2.0 / (input_dim + hidden_dim))
W0 = (rng.randn(input_dim, hidden_dim) * scale_0).tolist()
b0 = np.zeros(hidden_dim).tolist()
call_contract(
shard_map["layer_0"],
"store_weights",
weights={"W0": W0, "b0": b0},
)
# Layer 1: hidden -> output (He init)
scale_1 = np.sqrt(2.0 / hidden_dim)
W1 = (rng.randn(hidden_dim, output_dim) * scale_1).tolist()
b1 = np.zeros(output_dim).tolist()
call_contract(
shard_map["layer_1"],
"store_weights",
weights={"W1": W1, "b1": b1},
)
self.state.is_initialized = True
@call
def train(self, ctx, x: list, y: list, learning_rate: float = 0.01):
"""Train on a batch. Weights are gathered from shards, gradients scattered back.
Args:
x: Input data [[f1,f2,...], ...].
y: Target data [[t1,...], ...] or [t1, t2, ...].
learning_rate: SGD step size.
Returns:
dict with loss and step count.
"""
if not self.state.is_initialized:
raise ValueError("Model not initialized")
if len(x) == 0 or len(y) == 0:
raise ValueError("Training data must not be empty")
if len(x) != len(y):
raise ValueError(f"x and y length mismatch ({len(x)} != {len(y)})")
if learning_rate <= 0.0:
raise ValueError("learning_rate must be positive")
shard_map = self.state.shard_map
config = self.state.config
output_dim = config["output_dim"]
# Gather weights from shards
layer0_weights = query_contract(shard_map["layer_0"], "get_weights")
layer1_weights = query_contract(shard_map["layer_1"], "get_weights")
W0 = np.array(layer0_weights["W0"])
b0 = np.array(layer0_weights["b0"])
W1 = np.array(layer1_weights["W1"])
b1 = np.array(layer1_weights["b1"])
X = np.array(x, dtype=np.float64)
Y = np.array(y, dtype=np.float64)
if Y.ndim == 1 and output_dim == 1:
Y = Y.reshape(-1, 1)
n = len(X)
# Forward pass: X -> Linear -> ReLU -> Linear
z0 = X @ W0 + b0 # [n, hidden]
a0 = np.maximum(0, z0) # ReLU
z1 = a0 @ W1 + b1 # [n, output]
# MSE loss
residuals = z1 - Y
loss = float(np.mean(residuals**2))
# Backward pass
dz1 = (2.0 / n) * residuals # [n, output]
dW1 = a0.T @ dz1 # [hidden, output]
db1 = np.sum(dz1, axis=0) # [output]
da0 = dz1 @ W1.T # [n, hidden]
dz0 = da0 * (z0 > 0).astype(float) # ReLU backward
dW0 = X.T @ dz0 # [input, hidden]
db0 = np.sum(dz0, axis=0) # [hidden]
# Scatter gradients back to shards
call_contract(
shard_map["layer_0"],
"apply_gradients",
gradients={"W0": dW0.tolist(), "b0": db0.tolist()},
learning_rate=learning_rate,
)
call_contract(
shard_map["layer_1"],
"apply_gradients",
gradients={"W1": dW1.tolist(), "b1": db1.tolist()},
learning_rate=learning_rate,
)
# Update stats
self.state.total_steps += 1
self.state.total_samples += n
self.state.last_loss = loss
entry = {"step": self.state.total_steps, "loss": loss, "samples": n}
history = self.state.training_history
history.append(entry)
if len(history) > 50:
history = history[-50:]
self.state.training_history = history
return {"loss": loss, "step": self.state.total_steps}
@query
def predict(self, x: list) -> list:
"""Forward pass for inference (gathers weights from shards)."""
if not self.state.is_initialized:
raise ValueError("Model not initialized")
shard_map = self.state.shard_map
layer0 = query_contract(shard_map["layer_0"], "get_weights")
layer1 = query_contract(shard_map["layer_1"], "get_weights")
W0 = np.array(layer0["W0"])
b0 = np.array(layer0["b0"])
W1 = np.array(layer1["W1"])
b1 = np.array(layer1["b1"])
X = np.array(x, dtype=np.float64)
z0 = X @ W0 + b0
a0 = np.maximum(0, z0)
z1 = a0 @ W1 + b1
return z1.tolist()
@query
def model_info(self) -> dict:
"""Return model configuration and training stats."""
return {
"config": self.state.config,
"shard_map": self.state.shard_map,
"is_initialized": self.state.is_initialized,
"total_steps": self.state.total_steps,
"total_samples": self.state.total_samples,
"last_loss": self.state.last_loss,
"owner": self.state.owner,
}
State Fields
| Field | Type | Description |
|---|---|---|
| config | dict | Model dimensions: {input_dim, hidden_dim, output_dim} |
| shard_map | dict | Maps shard names ("layer_0", "layer_1") to contract addresses |
| is_initialized | bool | Whether weights have been generated and stored in shards |
| total_steps | int | Total training steps completed |
| total_samples | int | Total samples processed |
| last_loss | float | MSE loss from the most recent training step |
| training_history | list | Last 50 training step records |
| owner | str | Deployer address |
Methods
| Method | Decorator | Description |
|---|---|---|
| create(input_dim, hidden_dim, output_dim, shard_addresses) | @constructor | Initialize model config and optional pre-deployed shard addresses |
| register_shard(shard_name, shard_address) | @call | Register a WeightShard contract for a layer (owner only) |
| initialize_weights() | @call | Generate Xavier/He weights and scatter to shard contracts |
| train(x, y, learning_rate) | @call | Gather weights, forward/backward pass, scatter gradients |
| predict(x) | @query | Forward pass for inference (gathers weights from shards) |
| model_info() | @query | Returns config, shard map, training stats, owner |
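As a worked example of initialize_weights() with the default dimensions (input_dim=4, hidden_dim=8, output_dim=1), the helpers below reproduce the two standard deviations it multiplies its standard-normal draws by. They also count how many floats each shard ends up holding. The helper names are hypothetical, and the arithmetic comes directly from the contract's code.

```python
import math


def init_scales(input_dim, hidden_dim):
    """Standard deviations initialize_weights() multiplies its N(0, 1) draws by."""
    scale_0 = math.sqrt(2.0 / (input_dim + hidden_dim))  # Xavier/Glorot: fan_in + fan_out
    scale_1 = math.sqrt(2.0 / hidden_dim)                # He: fan_in of layer 1
    return scale_0, scale_1


def shard_param_counts(input_dim, hidden_dim, output_dim):
    """Number of floats each WeightShard holds (weights plus biases)."""
    layer_0 = input_dim * hidden_dim + hidden_dim   # W0 + b0
    layer_1 = hidden_dim * output_dim + output_dim  # W1 + b1
    return {"layer_0": layer_0, "layer_1": layer_1}


print(init_scales(4, 8))            # about (0.408, 0.5)
print(shard_param_counts(4, 8, 1))  # {'layer_0': 40, 'layer_1': 9}
```

The per-shard counts show why one shard per layer is a natural split: the input-to-hidden shard grows with input_dim * hidden_dim, while the output shard stays small for single-output models.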
Deployment flow for auto-sharded training:
- Deploy 2 WeightShard contracts (one per layer)
- Deploy 1 AutoShardTrainer, passing the shard addresses (or register them afterwards with register_shard())
- Call initialize_weights() -- generates Xavier/He weights and scatters them to the shards
- Call train(x, y) -- gathers weights, runs the forward/backward pass, and scatters gradients
- Call predict(x) -- gathers weights and runs the forward pass for inference
The panda.training SDK
The panda.training module provides reusable primitives for building training orchestration into any contract. Instead of implementing batching, checkpointing, and sharding logic from scratch, import the SDK utilities.
Available classes and functions:
- DataLoader: Sliding-window mini-batch iterator with epoch management
- TrainingConfig: Hyperparameter configuration dataclass
- CheckpointState: Resumable training progress tracker
- estimate_model_size(): Check if a model fits within the 10MB state limit before deploying
- partition_epochs(): Compute how many blocks a training run needs
- auto_shard(): Deploy WeightShard contracts automatically and return a shard map
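The question partition_epochs() answers, how many blocks a run will take, reduces to a ceiling division. Its real signature is not shown in this excerpt. The sketch below therefore uses a hypothetical blocks_needed() built only from fields documented in the TrainingConfig docstring: epochs, batches per epoch (len(loader)), and max_batches_per_block, where 0 means unlimited and 1 is the default.

```python
def blocks_needed(epochs, batches_per_epoch, max_batches_per_block=1):
    """Blocks a run spans if each block processes at most max_batches_per_block batches.

    max_batches_per_block == 0 means "unlimited" (as in TrainingConfig), i.e. one block.
    """
    if epochs < 1 or batches_per_epoch < 1 or max_batches_per_block < 0:
        raise ValueError("invalid training schedule")
    total_batches = epochs * batches_per_epoch
    if max_batches_per_block == 0:
        return 1
    return -(-total_batches // max_batches_per_block)  # ceiling division


print(blocks_needed(10, 3))     # 30 blocks at the default of one batch per block
print(blocks_needed(10, 3, 4))  # 8 blocks
print(blocks_needed(10, 3, 0))  # 1 block (the whole run at once)
```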
Quick usage example:
from panda.training import DataLoader, TrainingConfig, CheckpointState
# Configure training
config = TrainingConfig(epochs=10, batch_size=4, learning_rate=0.001)
# Create a data loader from token IDs (token_ids: your flat list of integer token IDs)
loader = DataLoader(token_ids, block_size=8, batch_size=config.batch_size)
# Track checkpoint state
checkpoint = CheckpointState(total_epochs=config.epochs,
                             batches_per_epoch=len(loader))
for epoch in range(config.epochs):
    for batch_x, batch_y in loader:
        # train_step: your own forward/backward/update on one batch
        loss = train_step(batch_x, batch_y)
        checkpoint.advance_batch(loss, tokens_processed=len(batch_x))
    loader.reset()
print(f"Done: {checkpoint.progress_pct}%")
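The sketch below re-derives the sliding windows for the DataLoader docstring's example (tokens 0 to 9, block_size=4, batch_size=2). It follows the visible DataLoader code for the window count, the batch count, and the inputs (seq[:-1]). The targets y = seq[1:] (a next-token shift) are an assumption, because that line falls outside this excerpt. sliding_batches is a hypothetical helper, not part of panda.training.

```python
def sliding_batches(token_ids, block_size, batch_size):
    """Group next-token (x, y) windows into batches, like DataLoader's iteration."""
    n_sequences = len(token_ids) - block_size
    n_batches = max(1, (n_sequences + batch_size - 1) // batch_size)
    batches = []
    for b in range(n_batches):
        start = b * batch_size
        end = min(start + batch_size, n_sequences)
        xs, ys = [], []
        for s in range(start, end):
            seq = token_ids[s : s + block_size + 1]
            xs.append(seq[:-1])
            ys.append(seq[1:])  # assumption: targets are the inputs shifted by one
        batches.append((xs, ys))
    return batches


for batch_x, batch_y in sliding_batches(list(range(10)), block_size=4, batch_size=2):
    print(batch_x, batch_y)
```

Ten tokens with block_size=4 yield six windows, grouped into three batches of two, so len(loader) would be 3 for this input.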
Full SDK Source
"""
panda.training -- Training orchestration utilities for Panda smart contracts.
Provides higher-level abstractions for splitting long-running model training
across multiple blockchain blocks/transactions. Handles data batching,
checkpoint management, and automatic weight sharding.
Available utilities:
- ``DataLoader``: Splits training data into mini-batches with epoch management.
- ``TrainingConfig``: Configuration dataclass for training hyperparameters.
- ``CheckpointState``: Tracks checkpoint progress for resumable training.
- ``auto_shard``: Deploys WeightShard contracts and returns shard map.
- ``estimate_model_size``: Estimates state size for a given model config.
- ``partition_epochs``: Splits epochs into per-block work units.
Usage::
from panda.training import DataLoader, TrainingConfig, auto_shard
config = TrainingConfig(epochs=10, batch_size=4, learning_rate=0.001)
loader = DataLoader(token_ids, block_size=8, batch_size=4)
for epoch in range(config.epochs):
for batch_x, batch_y in loader:
# ... train on batch ...
pass
loader.reset()
"""
class TrainingConfig:
    """Configuration for a training run.
    Attributes:
        epochs: Number of full passes over the training data.
        batch_size: Number of sequences per mini-batch.
        learning_rate: SGD step size.
        checkpoint_interval: Save checkpoint every N batches (0 = every batch).
        max_batches_per_block: Max batches to process per block (0 = unlimited).
        grad_accumulation_steps: Accumulate gradients over N steps before update.
    """
    def __init__(
        self,
        epochs=1,
        batch_size=4,
        learning_rate=0.001,
        checkpoint_interval=0,
        max_batches_per_block=1,
        grad_accumulation_steps=1,
    ):
        if epochs < 1:
            raise ValueError(f"epochs must be >= 1, got {epochs}")
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        if learning_rate <= 0.0 or learning_rate > 10.0:
            raise ValueError(f"learning_rate must be in (0, 10], got {learning_rate}")
        if checkpoint_interval < 0:
            raise ValueError(f"checkpoint_interval must be >= 0, got {checkpoint_interval}")
        if max_batches_per_block < 0:
            raise ValueError(f"max_batches_per_block must be >= 0, got {max_batches_per_block}")
        if grad_accumulation_steps < 1:
            raise ValueError(f"grad_accumulation_steps must be >= 1, got {grad_accumulation_steps}")
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.checkpoint_interval = checkpoint_interval
        self.max_batches_per_block = max_batches_per_block
        self.grad_accumulation_steps = grad_accumulation_steps
    def to_dict(self):
        return {
            "epochs": self.epochs,
            "batch_size": self.batch_size,
            "learning_rate": self.learning_rate,
            "checkpoint_interval": self.checkpoint_interval,
            "max_batches_per_block": self.max_batches_per_block,
            "grad_accumulation_steps": self.grad_accumulation_steps,
        }
    @classmethod
    def from_dict(cls, d):
        # Keep only keys that are constructor parameters (skipping "self"),
        # so extra fields stored alongside the config are ignored.
        code = cls.__init__.__code__
        params = code.co_varnames[1:code.co_argcount]
        return cls(**{k: v for k, v in d.items() if k in params})
class DataLoader:
    """Splits training data into mini-batches for sequential processing.
    Creates sliding-window sequences from a flat token list, groups them
    into batches, and provides iteration with epoch tracking.
    Args:
        token_ids: Flat list of integer token IDs.
        block_size: Context window size (sequence length).
        batch_size: Number of sequences per batch.
    Usage::
        loader = DataLoader([0,1,2,3,4,5,6,7,8,9], block_size=4, batch_size=2)
        for batch_x, batch_y in loader:
            # batch_x: list of input sequences
            # batch_y: list of target sequences
            pass
    """
    def __init__(self, token_ids, block_size, batch_size=1):
        if len(token_ids) < block_size + 1:
            raise ValueError(
                f"Need at least block_size+1 ({block_size + 1}) tokens, got {len(token_ids)}"
            )
        if block_size < 1:
            raise ValueError(f"block_size must be >= 1, got {block_size}")
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.token_ids = list(token_ids)
        self.block_size = block_size
        self.batch_size = batch_size
        # One sequence per start index s with s + block_size still in range
        self.n_sequences = len(token_ids) - block_size
        # Ceiling division: the final batch may hold fewer than batch_size sequences
        self.n_batches = max(1, (self.n_sequences + batch_size - 1) // batch_size)
        self._batch_idx = 0
    def __len__(self):
        return self.n_batches
    def __iter__(self):
        # Every new iteration rewinds to the first batch
        self._batch_idx = 0
        return self
    def __next__(self):
        if self._batch_idx >= self.n_batches:
            raise StopIteration
        start = self._batch_idx * self.batch_size
        end = min(start + self.batch_size, self.n_sequences)
        batch_x = []
        batch_y = []
        for s in range(start, end):
            seq = self.token_ids[s : s + self.block_size + 1]
            batch_x.append(seq[:-1])  # inputs
            batch_y.append(seq[1:])   # targets, shifted by one token
        self._batch_idx += 1
        return batch_x, batch_y
    def reset(self):
        """Reset to beginning of data (for new epoch)."""
        self._batch_idx = 0
    @property
    def current_batch(self):
        return self._batch_idx
    @property
    def progress(self):
        """Return progress as fraction [0, 1]."""
        if self.n_batches == 0:
            return 1.0
        return self._batch_idx / self.n_batches
class CheckpointState:
    """Tracks training checkpoint state for resumable training.
    Stores epoch/batch progress, accumulated metrics, and supports
    save/restore from contract state dicts.
    Args:
        total_epochs: Target number of epochs.
        batches_per_epoch: Number of batches per epoch.
    """
    def __init__(self, total_epochs=1, batches_per_epoch=1):
        self.total_epochs = total_epochs
        self.batches_per_epoch = batches_per_epoch
        self.current_epoch = 0
        self.current_batch = 0
        self.total_steps = 0
        self.total_tokens = 0
        self.epoch_losses = []
        self.last_loss = 0.0
        self.is_complete = False
        self.is_cancelled = False  # never set by this class; the caller sets it
    def advance_batch(self, loss, tokens_processed):
        """Record a completed batch and advance counters."""
        self.total_steps += 1
        self.total_tokens += tokens_processed
        self.last_loss = loss
        self.current_batch += 1
        if self.current_batch >= self.batches_per_epoch:
            # Epoch finished: store the loss of its final batch (not an epoch mean)
            self.epoch_losses.append(loss)
            self.current_epoch += 1
            self.current_batch = 0
            if self.current_epoch >= self.total_epochs:
                self.is_complete = True
    @property
    def progress_pct(self):
        """Overall training progress as percentage."""
        total_batches = self.total_epochs * self.batches_per_epoch
        done = self.current_epoch * self.batches_per_epoch + self.current_batch
        if total_batches == 0:
            return 100.0
        return (done / total_batches) * 100.0
    def to_dict(self):
        return {
            "total_epochs": self.total_epochs,
            "batches_per_epoch": self.batches_per_epoch,
            "current_epoch": self.current_epoch,
            "current_batch": self.current_batch,
            "total_steps": self.total_steps,
            "total_tokens": self.total_tokens,
            "epoch_losses": list(self.epoch_losses),
            "last_loss": self.last_loss,
            "is_complete": self.is_complete,
            "is_cancelled": self.is_cancelled,
        }
    @classmethod
    def from_dict(cls, d):
        # Missing keys fall back to the same defaults the constructor uses
        obj = cls(d.get("total_epochs", 1), d.get("batches_per_epoch", 1))
        obj.current_epoch = d.get("current_epoch", 0)
        obj.current_batch = d.get("current_batch", 0)
        obj.total_steps = d.get("total_steps", 0)
        obj.total_tokens = d.get("total_tokens", 0)
        obj.epoch_losses = list(d.get("epoch_losses", []))
        obj.last_loss = d.get("last_loss", 0.0)
        obj.is_complete = d.get("is_complete", False)
        obj.is_cancelled = d.get("is_cancelled", False)
        return obj
def estimate_model_size(d_model, n_layers, vocab_size, block_size, bytes_per_param=8):
    """Estimate the state size (bytes) for a transformer model config.
    Useful for checking whether a model fits within the 10MB state limit
    before deploying.
    Args:
        d_model: Model dimension.
        n_layers: Number of transformer layers.
        vocab_size: Vocabulary size.
        block_size: Maximum sequence length.
        bytes_per_param: Bytes per parameter in serialized form (default 8 for float64).
    Returns:
        dict with param_count, estimated_bytes, estimated_mb (MiB, rounded to
        2 places), fits_in_state, and recommended_shards.
    """
    # Embeddings
    params = vocab_size * d_model  # tok_emb
    params += block_size * d_model  # pos_emb
    # Each transformer block: 2 LayerNorms + attention (4 projections) + MLP (2 layers)
    per_layer = (
        2 * d_model  # ln1 gamma + beta
        + 4 * d_model * d_model  # Wq, Wk, Wv, Wo
        + 4 * d_model  # bq, bk, bv, bo
        + 2 * d_model  # ln2 gamma + beta
        + d_model * 4 * d_model  # mlp fc1 W
        + 4 * d_model  # mlp fc1 b
        + 4 * d_model * d_model  # mlp fc2 W
        + d_model  # mlp fc2 b
    )
    params += n_layers * per_layer
    # Head: final ln + lm_head
    params += 2 * d_model  # ln_f gamma + beta
    params += d_model * vocab_size  # lm_head W
    params += vocab_size  # lm_head b
    est_bytes = params * bytes_per_param
    est_mb = est_bytes / (1024 * 1024)
    return {
        "param_count": params,
        "estimated_bytes": est_bytes,
        "estimated_mb": round(est_mb, 2),
        "fits_in_state": est_mb < 10.0,
        # Roughly one shard per 2 MB once the model exceeds 2 MB
        "recommended_shards": max(1, int(est_mb / 2.0) + 1) if est_mb > 2.0 else 1,
    }
def partition_epochs(total_epochs, batches_per_epoch, max_batches_per_block=1):
    """Compute how many blocks are needed for a training run.
    Args:
        total_epochs: Number of training epochs.
        batches_per_epoch: Batches per epoch.
        max_batches_per_block: Max batches to process in one block (<= 0 = unlimited).
    Returns:
        dict with total_batches, total_blocks, blocks_per_epoch, and
        max_batches_per_block.
    """
    total_batches = total_epochs * batches_per_epoch
    if max_batches_per_block <= 0:
        total_blocks = 1  # unlimited: the whole run fits in one block
    else:
        total_blocks = (total_batches + max_batches_per_block - 1) // max_batches_per_block
    blocks_per_epoch = (
        (batches_per_epoch + max_batches_per_block - 1) // max_batches_per_block
        if max_batches_per_block > 0
        else 1
    )
    return {
        "total_batches": total_batches,
        "total_blocks": total_blocks,
        "blocks_per_epoch": blocks_per_epoch,
        "max_batches_per_block": max_batches_per_block,
    }
def auto_shard(runner, model_config, owner="deployer"):
    """Deploy WeightShard contracts and return a shard map.
    Automatically creates the right number of shard contracts for a
    given model configuration.
    Args:
        runner: A ContractTestRunner instance.
        model_config: Dict describing the model; only n_layers is read (default 2).
        owner: Sender address for deployment.
    Returns:
        Dict mapping shard names ("embed", "block_0", ..., "head") to contract addresses.
    """
    n_layers = model_config.get("n_layers", 2)
    shard_names = ["embed"] + [f"block_{i}" for i in range(n_layers)] + ["head"]
    shard_map = {}
    for name in shard_names:
        addr = runner.deploy("ml/weight_shard.py", sender=owner, shard_id=name)
        shard_map[name] = addr
    return shard_map
SDK Reference
TrainingConfig(epochs, batch_size, learning_rate, checkpoint_interval, max_batches_per_block, grad_accumulation_steps)
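Configuration object for training hyperparameters. The constructor raises ValueError for out-of-range values (for example, a learning_rate outside (0, 10]). Supports to_dict() and from_dict() for state serialization; from_dict() ignores keys that are not constructor parameters.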
| Parameter | Default | Description |
|---|---|---|
| epochs | 1 | Number of full passes over the training data |
| batch_size | 4 | Sequences per mini-batch |
| learning_rate | 0.001 | SGD step size (must be in (0, 10]) |
| checkpoint_interval | 0 | Save checkpoint every N batches (0 = every batch) |
| max_batches_per_block | 1 | Max batches to process in one blockchain block (0 = unlimited) |
| grad_accumulation_steps | 1 | Accumulate gradients over N steps before applying update |
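The reference above does not say how the runtime applies grad_accumulation_steps. As a rough illustration only, the sketch below shows generic gradient accumulation on a single scalar weight. It assumes that gradients are averaged over each group of N steps before one SGD update. The helper name and the averaging choice are assumptions, not documented panda.training behavior.

```python
def sgd_with_accumulation(grads, learning_rate, accumulation_steps, w0=0.0):
    """Hypothetical helper: SGD on one scalar weight with gradient accumulation.

    grads holds one gradient per mini-batch. Every `accumulation_steps`
    gradients are averaged (an assumption) and applied as a single update.
    A trailing partial group is discarded in this sketch.
    Returns the weight after each applied update.
    """
    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be >= 1")
    w = w0
    buffer = 0.0
    history = []
    for step, g in enumerate(grads, start=1):
        buffer += g  # accumulate instead of updating immediately
        if step % accumulation_steps == 0:
            w -= learning_rate * (buffer / accumulation_steps)  # one SGD step
            buffer = 0.0
            history.append(w)
    return history


print(sgd_with_accumulation([1.0, 3.0, 2.0, 2.0], learning_rate=0.5, accumulation_steps=2))
# [-1.0, -2.0]
print(sgd_with_accumulation([1.0, 3.0, 2.0, 2.0], learning_rate=0.5, accumulation_steps=1))
# [-0.5, -2.0, -3.0, -4.0]
```

With accumulation_steps=2, four batches produce two updates, each using the mean of two gradients. With 1, every batch updates the weight directly.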
DataLoader(token_ids, block_size, batch_size)
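Splits a flat list of token IDs into sliding-window input/target pairs, grouped into mini-batches. The final batch may hold fewer than batch_size sequences. Requires at least block_size + 1 tokens. Implements Python's iterator protocol (__iter__, __next__). Because __iter__ rewinds to the first batch, each new for loop starts a fresh epoch. reset() does the same explicitly, so calling it at the end of each epoch is harmless, and it is needed when stepping through batches with next().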
| Property/Method | Description |
|---|---|
| len(loader) | Number of batches per epoch |
| for batch_x, batch_y in loader | Iterate over batches (input and target sequence pairs) |
| loader.reset() | Reset to beginning of data for a new epoch |
| loader.current_batch | Current batch index |
| loader.progress | Progress as a fraction in [0, 1] |
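To see exactly which windows the DataLoader produces, here is a standalone generator that mirrors its indexing. It builds one window per start index s, with inputs token_ids[s:s+block_size] and targets shifted by one token. The name sliding_window_batches is hypothetical and is not part of panda.training.

```python
def sliding_window_batches(token_ids, block_size, batch_size=1):
    """Hypothetical generator mirroring the DataLoader's batching logic."""
    if block_size < 1 or batch_size < 1:
        raise ValueError("block_size and batch_size must be >= 1")
    if len(token_ids) < block_size + 1:
        raise ValueError("need at least block_size + 1 tokens")
    n_sequences = len(token_ids) - block_size  # one window per valid start index
    for start in range(0, n_sequences, batch_size):
        end = min(start + batch_size, n_sequences)
        batch_x, batch_y = [], []
        for s in range(start, end):
            window = token_ids[s : s + block_size + 1]
            batch_x.append(window[:-1])  # inputs
            batch_y.append(window[1:])   # targets shifted by one token
        yield batch_x, batch_y


tokens = list(range(10))
batches = list(sliding_window_batches(tokens, block_size=4, batch_size=4))
print(len(batches))    # 2
print(batches[1][0])   # [[4, 5, 6, 7], [5, 6, 7, 8]]
```

Ten tokens with block_size=4 give six windows. With batch_size=4, that yields one full batch of four and a short final batch of two. The docstring's batch_size=2 gives three even batches.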
CheckpointState(total_epochs, batches_per_epoch)
Tracks training progress for resumable multi-block training. Call advance_batch(loss, tokens_processed) after each batch. Supports to_dict() and from_dict() for persisting in contract state.
| Property/Method | Description |
|---|---|
| advance_batch(loss, tokens_processed) | Record a completed batch and advance the epoch/batch counters |
| progress_pct | Overall training progress as a percentage (0 to 100) |
| is_complete | Whether all epochs have finished |
| is_cancelled | Cancellation flag; the class never sets it, so the contract must set it itself |
| to_dict() / from_dict(d) | Serialize/deserialize for contract state storage |
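The following sketch shows how a resumable training @call might use a checkpoint across blocks. It makes two assumptions: the checkpoint is stored as a JSON string in contract state, and a positive max_batches_per_block bounds the work done per call. Plain dicts and the hypothetical helpers new_checkpoint, advance, and train_block stand in for CheckpointState and a real contract. The counter logic mirrors advance_batch without the loss and token tracking.

```python
import json


def new_checkpoint(total_epochs, batches_per_epoch):
    """Hypothetical plain-dict checkpoint holding CheckpointState's counters."""
    return {
        "total_epochs": total_epochs,
        "batches_per_epoch": batches_per_epoch,
        "current_epoch": 0,
        "current_batch": 0,
        "total_steps": 0,
        "is_complete": False,
    }


def advance(ckpt):
    # Counter logic of CheckpointState.advance_batch, minus loss/token tracking
    ckpt["total_steps"] += 1
    ckpt["current_batch"] += 1
    if ckpt["current_batch"] >= ckpt["batches_per_epoch"]:
        ckpt["current_epoch"] += 1
        ckpt["current_batch"] = 0
        if ckpt["current_epoch"] >= ckpt["total_epochs"]:
            ckpt["is_complete"] = True


def train_block(stored, max_batches_per_block):
    """One hypothetical @call: restore, do bounded work, persist (limit must be > 0)."""
    ckpt = json.loads(stored)  # restore from contract state
    done = 0
    while not ckpt["is_complete"] and done < max_batches_per_block:
        # ... forward/backward pass on the next batch would go here ...
        advance(ckpt)
        done += 1
    return json.dumps(ckpt)  # persist back to contract state


state = json.dumps(new_checkpoint(total_epochs=3, batches_per_epoch=5))
calls = 0
while not json.loads(state)["is_complete"]:
    state = train_block(state, max_batches_per_block=4)
    calls += 1
print(calls, json.loads(state)["total_steps"])  # 4 15
```

Fifteen batches at four per call need four calls, which is ceil(15 / 4). The second and third calls each cross an epoch boundary without any special handling.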
estimate_model_size(d_model, n_layers, vocab_size, block_size, bytes_per_param=8)
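Estimates the serialized state size for a transformer model. Returns a dict with the following keys:
- param_count: the estimated number of parameters.
- estimated_bytes: the estimated serialized size in bytes.
- estimated_mb: the size in MiB (1024 × 1024 bytes), rounded to two places.
- fits_in_state: whether the estimate is under the 10MB limit.
- recommended_shards: 1 up to 2 MB, then int(estimated_mb / 2) + 1.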
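The per-layer sum in estimate_model_size collapses to a closed form with d = d_model. The four d×d attention projections plus the d×4d and 4d×d MLP matrices contribute 12·d² weights. The biases and the two LayerNorms contribute 13·d more. The hypothetical helper below uses that closed form for a quick hand check at the default 8 bytes per parameter.

```python
def transformer_param_count(d_model, n_layers, vocab_size, block_size):
    """Hypothetical helper: closed form of the count in estimate_model_size."""
    per_layer = 12 * d_model**2 + 13 * d_model  # weights, biases, 2 LayerNorms
    return (
        vocab_size * d_model      # token embedding
        + block_size * d_model    # position embedding
        + n_layers * per_layer
        + 2 * d_model             # final LayerNorm gamma + beta
        + d_model * vocab_size    # lm_head weights
        + vocab_size              # lm_head bias
    )


params = transformer_param_count(d_model=64, n_layers=2, vocab_size=65, block_size=32)
est_bytes = params * 8  # default bytes_per_param
print(params, est_bytes, round(est_bytes / (1024 * 1024), 2))
# 110529 884232 0.84
```

For comparison, a model with d_model=256, 4 layers, a 1,000-token vocabulary, and block_size=128 has 3,705,320 parameters. That is about 28.27 MB at 8 bytes each, well over the 10MB limit, so estimate_model_size would report recommended_shards = 15.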
partition_epochs(total_epochs, batches_per_epoch, max_batches_per_block=1)
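Computes how many blockchain blocks a training run requires. Returns total_batches, total_blocks, blocks_per_epoch, and the max_batches_per_block it was given. A max_batches_per_block of 0 or less is treated as unlimited, so the whole run fits in one block.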
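Here is the partition_epochs arithmetic worked through with the same integer ceiling division the source uses. The helper name ceil_div is only for illustration. Note that total_blocks packs batches across epoch boundaries, so it can be smaller than blocks_per_epoch × total_epochs.

```python
def ceil_div(a, b):
    # Same integer ceiling division partition_epochs uses: (a + b - 1) // b
    return (a + b - 1) // b


total_epochs, batches_per_epoch, max_batches_per_block = 10, 3, 2
total_batches = total_epochs * batches_per_epoch                        # 30
total_blocks = ceil_div(total_batches, max_batches_per_block)           # 15
blocks_per_epoch = ceil_div(batches_per_epoch, max_batches_per_block)   # 2
print(total_batches, total_blocks, blocks_per_epoch, blocks_per_epoch * total_epochs)
# 30 15 2 20
```

The gap between 15 and 20 comes from two different assumptions. total_blocks lets one block finish an epoch and start the next. blocks_per_epoch counts each epoch as if it started on a fresh block.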
auto_shard(runner, model_config, owner="deployer")
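A test helper that uses a ContractTestRunner to deploy one WeightShard contract (ml/weight_shard.py) per shard. It returns a map from shard name to contract address. Only model_config["n_layers"] is read (default 2). The shards are named "embed", "block_0" through "block_{n-1}", and "head", so n_layers + 2 contracts are deployed.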
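auto_shard only calls runner.deploy, so its behavior can be checked without a chain by substituting a test double. StubRunner below is hypothetical and is not the SDK's ContractTestRunner. The loop reproduces what auto_shard does against it, using a copy of auto_shard's naming scheme.

```python
class StubRunner:
    """Hypothetical test double: records deploy() calls and returns fake addresses."""

    def __init__(self):
        self.calls = []

    def deploy(self, path, sender, **kwargs):
        self.calls.append((path, sender, kwargs))
        return f"0xshard{len(self.calls)}"


def shard_names(n_layers):
    # Same naming scheme as auto_shard: embed, block_0 .. block_{n-1}, head
    return ["embed"] + [f"block_{i}" for i in range(n_layers)] + ["head"]


runner = StubRunner()
model_config = {"d_model": 64, "n_layers": 2}
shard_map = {}
for name in shard_names(model_config.get("n_layers", 2)):
    shard_map[name] = runner.deploy("ml/weight_shard.py", sender="deployer", shard_id=name)
print(shard_map)
# {'embed': '0xshard1', 'block_0': '0xshard2', 'block_1': '0xshard3', 'head': '0xshard4'}
```

The shard count depends only on n_layers. auto_shard does not consult the recommended_shards value from estimate_model_size.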
How It Works
- Deploy sends the Python contract to the chain. The PandaVM compiles and stores it.
- Call (train/fit/update) mutates on-chain state. Trained model weights are serialized to JSON-compatible dicts and stored in the contract's state.
- Query (predict/classify/info) reads state without modifying it. The model is reconstructed from stored weights and runs inference.
All training happens inside the PandaVM sandbox with deterministic execution. Gas metering enforces resource limits. The same contract produces identical results on every validator.
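The deploy/call/query split can be imitated in plain Python to make the data flow concrete. In this sketch, the hypothetical ToyRegressionContract uses a closed-form single-feature least-squares fit in place of panda.ml's LinearRegression. It keeps only JSON-compatible values in its state dict and rebuilds the model from that dict on every query. It does not model gas metering or consensus.

```python
import json


class ToyRegressionContract:
    """Hypothetical stand-in for a contract whose state is a JSON-compatible dict."""

    def __init__(self):
        self.state = {"model": {}}  # state as initialized at deployment

    def train(self, x, y):
        """Plays the role of a @call method: it mutates state."""
        n = len(x)
        mean_x, mean_y = sum(x) / n, sum(y) / n
        cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
        var = sum((a - mean_x) ** 2 for a in x)
        coef = cov / var
        weights = {"coef": coef, "intercept": mean_y - coef * mean_x}
        # Round-trip through JSON to mimic serialization into contract state
        self.state["model"] = json.loads(json.dumps(weights))

    def predict(self, x):
        """Plays the role of a @query method: it only reads state."""
        m = self.state["model"]
        return [m["coef"] * v + m["intercept"] for v in x]


c = ToyRegressionContract()
c.train([0, 1, 2, 3], [1, 3, 5, 7])
print(c.predict([10]))  # [21.0]
```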