Cross-Chain Messaging
Panda contracts can send and receive messages across chains -- Ethereum, Solana, Panda L2, and Panda Hub. The cross-chain SDK provides a Pythonic API with two interaction models: fire-and-forget (send a message and continue) and async/await (suspend until a response arrives from the other chain).
Getting Started
These three minimal contracts demonstrate the core cross-chain patterns. Each one is under 15 lines and does exactly one thing.
Hello World: Cross-Chain Call
The simplest possible cross-chain contract. It sends a single message to another chain using send_message():
from panda import contract, call, event, send_message
@contract
class HelloCrossChain:
class State:
sent_count: int = 0
@call
def send_greeting(self, ctx, dest_chain: str, recipient: str):
msg_id = send_message(dest_chain, recipient, {"greeting": "hello from Panda"})
self.state.sent_count += 1
self.emit(event.GreetingSent(to=dest_chain, msg_id=msg_id))
What is happening here:
send_message(dest_chain, recipient, payload)emits a raw cross-chain message. The payload is an arbitrary dict.- The function returns a hex string
msg_idfor tracking. - The message is fire-and-forget -- execution continues immediately.
Hello World: Cross-Chain Transfer
A minimal lock-and-bridge. It locks tokens on Panda and sends a mint instruction to Ethereum using the Chain class:
from panda import contract, call, query, event, Chain
@contract
class SimpleTokenBridge:
class State:
locked: dict = {}
@call
def bridge_to_ethereum(self, ctx, amount: int, recipient: str):
self.state.locked[ctx.sender] = self.state.locked.get(ctx.sender, 0) + amount
eth = Chain.ethereum()
eth.send(recipient, {"action": "mint", "amount": amount, "sender": ctx.sender})
self.emit(event.Bridged(sender=ctx.sender, amount=amount, recipient=recipient))
@query
def locked_balance(self, addr: str) -> int:
return self.state.locked.get(addr, 0)
What is happening here:
Chain.ethereum()creates a handle to the Ethereum chain.eth.send(recipient, payload)sends a message to a recipient address on Ethereum. This wrapssend_message()with a cleaner API.- The contract records locked balances locally and emits an event for indexers.
Hello World: @receiver
The simplest receiver. It accepts cross-chain messages from Ethereum and stores them:
from panda import contract, query, receiver, event
@contract
class SimpleReceiver:
class State:
last_message: dict = {}
@receiver(chains=["ethereum"])
def on_message(self, ctx, data: dict):
self.state.last_message = {"from": ctx.source_chain, "data": data}
self.emit(event.Received(source=ctx.source_chain))
@query
def get_last(self) -> dict:
return self.state.last_message
What is happening here:
@receiver(chains=["ethereum"])marks this method as a cross-chain message handler. Only messages originating from Ethereum are accepted -- Solana, Panda L2, etc. are rejected with aContractError.- Inside a
@receiver,ctx.source_chainandctx.message_idare available. - Direct calls from users (i.e. normal
@callinvocations) are rejected --@receivermethods can only be triggered by the cross-chain relay infrastructure.
Quick Example
This is the real BridgeV2 contract from the Panda repo. It demonstrates all three cross-chain patterns: await chain.call(), fire-and-forget, and @receiver:
from panda import contract, constructor, call, query, receiver, event, Chain
@contract
class BridgeV2:
class State:
owner: str = ""
total_bridged: int = 0
total_received: int = 0
pending_bridges: dict = {}
bridge_count: int = 0
@constructor
def deploy(self, ctx):
self.state.owner = ctx.sender
@call
async def bridge_tokens(self, ctx, amount: int, recipient: str = ""):
"""Bridge tokens to Ethereum. Awaits confirmation from Ethereum side."""
if amount <= 0:
raise ValueError("Amount must be positive")
target = recipient or ctx.sender
eth = Chain.ethereum()
# Lock tokens on this side
bridge_id = self.state.bridge_count
self.state.pending_bridges[str(bridge_id)] = {
"sender": ctx.sender,
"amount": amount,
"recipient": target,
"status": "pending",
}
self.state.bridge_count += 1
# Send cross-chain call and await confirmation
_result = await eth.call(
"0xEthBridge", "lock_and_mint",
amount=amount, recipient=target,
)
# This code runs when the cross-chain response arrives
self.state.pending_bridges[str(bridge_id)]["status"] = "confirmed"
self.state.total_bridged += amount
self.emit(event.BridgeConfirmed(bridge_id=bridge_id, amount=amount))
@call
def bridge_tokens_fire_and_forget(self, ctx, amount: int, recipient: str = ""):
"""Bridge tokens without waiting for confirmation (fire-and-forget)."""
if amount <= 0:
raise ValueError("Amount must be positive")
target = recipient or ctx.sender
eth = Chain.ethereum()
# Fire-and-forget -- msg_id is a string
msg_id = eth.call(
"0xEthBridge", "lock_and_mint",
amount=amount, recipient=target,
)
self.state.total_bridged += amount
self.emit(event.BridgeSent(msg_id=str(msg_id), amount=amount))
@receiver(chains=["ethereum"])
def on_tokens_locked(self, ctx, amount: int, sender: str):
"""Receive notification that tokens were locked on Ethereum.
Only accepts messages from Ethereum (chain whitelist enforced).
ctx.source_chain, ctx.message_id, ctx.is_cross_chain are available.
"""
self.state.total_received += amount
self.emit(event.TokensReceived(
amount=amount,
sender=sender,
source_chain=ctx.source_chain,
message_id=ctx.message_id,
))
@receiver(chains=["ethereum", "solana"])
def on_bridge_complete(self, ctx, bridge_id: int, status: str):
"""Receive bridge completion from Ethereum or Solana."""
key = str(bridge_id)
if key in self.state.pending_bridges:
self.state.pending_bridges[key]["status"] = status
self.emit(event.BridgeStatusUpdated(
bridge_id=bridge_id, status=status,
))
@receiver()
def on_any_chain_message(self, ctx, data: str):
"""Receive messages from any chain (no whitelist)."""
self.emit(event.AnyChainMessage(
data=data,
source_chain=ctx.source_chain,
))
@query
def get_bridge_status(self, bridge_id: int) -> dict:
return self.state.pending_bridges.get(str(bridge_id), {})
@query
def get_stats(self) -> dict:
return {
"total_bridged": self.state.total_bridged,
"total_received": self.state.total_received,
"bridge_count": self.state.bridge_count,
}
Chain Class
The Chain class represents a destination chain. Create one with a factory method or by name:
from panda import Chain
# Factory methods
eth = Chain.ethereum()
sol = Chain.solana()
l2 = Chain.l2()
hub = Chain.hub()
# By name
eth = Chain("ethereum")
sol = Chain("solana")
l2 = Chain("panda_l2")
hub = Chain("panda_hub")
Supported Chains
| Chain | Constructor | Constant |
|---|---|---|
| Ethereum | Chain.ethereum() | ETHEREUM |
| Solana | Chain.solana() | SOLANA |
| Panda L2 | Chain.l2() | PANDA_L2 |
| Panda Hub | Chain.hub() | PANDA_HUB |
You can also import the constants directly:
from panda import ETHEREUM, SOLANA, PANDA_L2, PANDA_HUB
Sending Cross-Chain Messages
There are three ways to send messages to another chain, from lowest-level to highest-level.
1. send_message() -- Low-Level
The lowest-level API. Emits a raw cross-chain message with arbitrary key-value payload. This is how the real TokenBridge contract sends bridge messages:
from panda import contract, constructor, call, query, event
from panda import send_message, Chain
@contract
class TokenBridge:
class State:
name: str = "PandaBridge"
owner: str = ""
locked_balances: dict = {}
total_locked: int = 0
total_released: int = 0
message_count: int = 0
@constructor
def deploy(self, ctx, name: str = "PandaBridge"):
self.state.name = name
self.state.owner = ctx.sender
@call
def lock_and_bridge(self, ctx, dest_chain: str, recipient: str, amount: int):
"""Lock tokens on this chain and emit a cross-chain mint message."""
if amount <= 0:
raise ValueError("Amount must be positive")
balance = self.state.locked_balances.get(ctx.sender, 0)
self.state.locked_balances[ctx.sender] = balance + amount
self.state.total_locked += amount
self.state.message_count += 1
msg_id = send_message(
dest_chain,
recipient,
action="mint",
amount=amount,
sender=ctx.sender,
)
self.emit(event.TokensLocked(
sender=ctx.sender,
dest_chain=dest_chain,
recipient=recipient,
amount=amount,
msg_id=msg_id,
))
send_message() returns a message ID (hex string) for tracking. If something goes wrong, it raises a RuntimeError.
For error-safe sending, use try_send_message() — as demonstrated in the real CrossChainMessenger contract:
from panda import send_message, try_send_message, Chain
@call
def send_safe(self, ctx, dest_chain: str, recipient: str, message: str = ""):
"""Send a message using try_send_message (never raises)."""
success, result = try_send_message(
dest_chain,
recipient,
message=message,
sender=ctx.sender,
)
if success:
self.state.sent_count += 1
self.state.last_sent_id = result
self.emit(event.SendAttempt(success=success, result=result))
2. Chain.call() / Chain.send() -- Mid-Level
The Chain class wraps send_message() with a cleaner API. The real TokenBridge uses both approaches:
@call
def bridge_to_ethereum(self, ctx, recipient: str, amount: int):
"""Convenience: bridge tokens to Ethereum using the Chain handle."""
if amount <= 0:
raise ValueError("Amount must be positive")
balance = self.state.locked_balances.get(ctx.sender, 0)
self.state.locked_balances[ctx.sender] = balance + amount
self.state.total_locked += amount
self.state.message_count += 1
eth = Chain.ethereum()
msg_id = eth.send(recipient, action="mint", amount=amount, sender=ctx.sender)
self.emit(event.TokensLocked(
sender=ctx.sender,
dest_chain="ethereum",
recipient=recipient,
amount=amount,
msg_id=msg_id,
))
The real CrossChainMessenger also demonstrates broadcasting to multiple chains:
@call
def broadcast(self, ctx, message: str = ""):
"""Broadcast a message to all supported chains."""
chains = ["ethereum", "solana", "panda_hub"]
for chain in chains:
dest = Chain(chain)
msg_id = dest.send(
"0x0000000000000000000000000000000000000000",
message=message,
sender=ctx.sender,
)
self.state.sent_count += 1
self.state.last_sent_id = msg_id
3. Typed Remote Handles -- High-Level
The most ergonomic API. Get a handle to a remote contract and call methods like normal Python. This is demonstrated in the real CrossChainSwap contract:
RemoteContract -- untyped proxy for any contract:
eth = Chain.ethereum()
remote = eth.contract("0xEthDex")
# Call methods directly -- they route through cross-chain messaging
msg_id = remote.deposit(amount=eth_amount, sender=ctx.sender)
msg_id = remote.swap(amount=amount, sender=ctx.sender)
RemotePRC20 -- typed proxy for PRC-20 tokens:
eth = Chain.ethereum()
usdc = eth.prc20(token_address)
# Typed methods with keyword arguments
msg_id = usdc.transfer(to=ctx.sender, value=amount)
msg_id = usdc.approve(spender="0xDex", value=5000)
msg_id = usdc.transfer_from(owner="0xAlice", to="0xBob", value=100)
msg_id = usdc.mint(to="0xBob", amount=500)
Fire-and-Forget vs Await
Every cross-chain call returns a dual-use object that works as both a string (message ID) and an awaitable.
Fire-and-Forget
Use the return value as a string. The message is emitted and execution continues immediately without waiting for a response. From the real BridgeV2:
@call
def bridge_tokens_fire_and_forget(self, ctx, amount: int, recipient: str = ""):
"""Bridge tokens without waiting for confirmation (fire-and-forget)."""
if amount <= 0:
raise ValueError("Amount must be positive")
target = recipient or ctx.sender
eth = Chain.ethereum()
# Fire-and-forget -- msg_id is a string
msg_id = eth.call(
"0xEthBridge", "lock_and_mint",
amount=amount, recipient=target,
)
self.state.total_bridged += amount
self.emit(event.BridgeSent(msg_id=str(msg_id), amount=amount))
And the real CrossChainSwap:
@call
def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
"""Fire-and-forget cross-chain swap (no await)."""
chain = Chain(dest_chain)
remote = chain.contract("0xDex")
msg_id = remote.swap(amount=amount, sender=ctx.sender)
self.state.swap_count += 1
self.state.total_volume += amount
self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))
Await (Async Completion)
Use await to suspend the contract until a cross-chain response arrives. The method must be async def. From the real CrossChainSwap:
@call
async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
"""Swap tokens cross-chain to Ethereum using typed remote handle."""
if amount <= 0:
raise ValueError("Amount must be positive")
eth = Chain.ethereum()
usdc = eth.prc20(token_address)
swap_id = self.state.swap_count
self.state.pending_swaps[str(swap_id)] = {
"sender": ctx.sender,
"amount": amount,
"token": token_address,
"status": "pending",
}
self.state.swap_count += 1
# Await cross-chain transfer
_result = await usdc.transfer(to=ctx.sender, value=amount)
# Runs when response arrives
self.state.pending_swaps[str(swap_id)]["status"] = "completed"
self.state.total_volume += amount
self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))
Sequential Multi-Chain Operations
You can await multiple cross-chain calls in sequence. Each one suspends until the previous completes. From the real CrossChainSwap:
@call
async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
"""Perform operations on multiple chains sequentially."""
eth = Chain.ethereum()
sol = Chain.solana()
# First, call Ethereum
eth_remote = eth.contract("0xEthDex")
_result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)
# Then, call Solana (runs after Ethereum response)
sol_remote = sol.contract("SolDex")
_result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)
self.state.total_volume += eth_amount + sol_amount
self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))
Receiving Cross-Chain Messages
The @receiver Decorator
Mark a method as a cross-chain message handler with @receiver. Receiver methods can only be invoked with a valid cross-chain context -- direct calls from users are rejected.
from panda import receiver
@receiver(chains=["ethereum"])
def on_tokens_locked(self, ctx, amount: int, sender: str):
"""Only accepts messages from Ethereum."""
self.state.total_received += amount
self.emit(event.TokensReceived(
amount=amount,
sender=sender,
source_chain=ctx.source_chain,
message_id=ctx.message_id,
))
Chain Whitelisting
The chains parameter restricts which source chains can deliver messages. These are real test contracts that verify whitelisting:
# Only Ethereum -- Solana messages are rejected with ContractError
@contract
class EthOnlyBridge:
class State:
total_received: int = 0
last_source: str = ""
last_msg_id: str = ""
@receiver(chains=["ethereum"])
def on_tokens(self, ctx, amount: int, from_addr: str = ""):
self.state.total_received += amount
self.state.last_source = ctx.source_chain
self.state.last_msg_id = ctx.message_id
# Ethereum or Solana -- rejects panda_l2, panda_hub, etc.
@contract
class MultiChainReceiver:
class State:
received: list = []
@receiver(chains=["ethereum", "solana"])
def on_message(self, ctx, data: str):
self.state.received.append({"data": data, "source": ctx.source_chain})
# No whitelist -- accepts from any chain
@receiver()
def on_any(self, ctx, data: str):
self.state.received.append({"data": data, "source": ctx.source_chain, "any": True})
Security: Direct Call Rejection
@receiver methods cannot be invoked via normal @call -- they require cross-chain context. And @call methods cannot be invoked via call_cross_chain():
@contract
class MixedContract:
class State:
value: int = 0
received: int = 0
@call
def set_value(self, ctx, value: int):
self.state.value = value
@receiver(chains=["ethereum"])
def on_update(self, ctx, value: int):
self.state.received += value
# In tests:
# runner.call(addr, "on_update", sender="attacker", value=100)
# → ContractError (direct call to @receiver rejected)
#
# runner.call_cross_chain(addr, "set_value", source_chain="ethereum", value=42)
# → ContractError ("set_value is not a @receiver")
Cross-Chain Context
Inside a @receiver method, the context object (ctx) includes additional fields:
@receiver(chains=["ethereum", "solana"])
def on_message(self, ctx, data: str):
ctx.sender # Address of the relayer that delivered the message
ctx.source_chain # Origin chain: "ethereum", "solana", etc.
ctx.message_id # Unique ID of this cross-chain message (hex string)
ctx.is_cross_chain # Always True inside a @receiver
ctx.block_height # Current block on this chain
ctx.block_time # Current block timestamp
Complete Example: Cross-Chain DeFi
The real CrossChainSwap contract combines typed handles, async/await, fire-and-forget, and @receiver in a single contract:
from panda import contract, constructor, call, query, receiver, event, Chain
@contract
class CrossChainSwap:
class State:
owner: str = ""
swap_count: int = 0
total_volume: int = 0
pending_swaps: dict = {}
@constructor
def deploy(self, ctx):
self.state.owner = ctx.sender
@call
async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
"""Swap tokens cross-chain to Ethereum using typed remote handle."""
if amount <= 0:
raise ValueError("Amount must be positive")
eth = Chain.ethereum()
usdc = eth.prc20(token_address)
swap_id = self.state.swap_count
self.state.pending_swaps[str(swap_id)] = {
"sender": ctx.sender,
"amount": amount,
"token": token_address,
"status": "pending",
}
self.state.swap_count += 1
# Await cross-chain transfer
_result = await usdc.transfer(to=ctx.sender, value=amount)
# Runs when response arrives
self.state.pending_swaps[str(swap_id)]["status"] = "completed"
self.state.total_volume += amount
self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))
@call
async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
"""Perform operations on multiple chains sequentially."""
eth = Chain.ethereum()
sol = Chain.solana()
eth_remote = eth.contract("0xEthDex")
_result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)
sol_remote = sol.contract("SolDex")
_result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)
self.state.total_volume += eth_amount + sol_amount
self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))
@call
def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
"""Fire-and-forget cross-chain swap (no await)."""
chain = Chain(dest_chain)
remote = chain.contract("0xDex")
msg_id = remote.swap(amount=amount, sender=ctx.sender)
self.state.swap_count += 1
self.state.total_volume += amount
self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))
@receiver(chains=["ethereum"])
def on_swap_result(self, ctx, swap_id: int, success: bool):
"""Receive swap result notification from Ethereum."""
key = str(swap_id)
if key in self.state.pending_swaps:
self.state.pending_swaps[key]["status"] = "success" if success else "failed"
self.emit(event.SwapResultReceived(
swap_id=swap_id,
success=success,
source_chain=ctx.source_chain,
))
@query
def get_swap(self, swap_id: int) -> dict:
return self.state.pending_swaps.get(str(swap_id), {})
@query
def get_volume(self) -> int:
return self.state.total_volume
Testing Cross-Chain Contracts
The ContractTestRunner provides methods to simulate cross-chain messaging locally.
Calling a @receiver Method
Use call_cross_chain() to simulate a relayer delivering a cross-chain message:
from panda.testing import ContractTestRunner
runner = ContractTestRunner()
addr = runner.deploy(EthOnlyBridge, sender="deployer")
# Simulate receiving a message from Ethereum
result = runner.call_cross_chain(
addr, "on_tokens",
source_chain="ethereum",
amount=100,
from_addr="alice",
)
# Verify the receiver processed the message
state = runner.get_state(addr)
assert state["total_received"] == 100
assert state["last_source"] == "ethereum"
assert state["last_msg_id"].startswith("0x")
Custom Message ID
runner.call_cross_chain(
addr, "on_tokens",
source_chain="ethereum",
message_id="0xcustom123",
amount=25,
)
state = runner.get_state(addr)
assert state["last_msg_id"] == "0xcustom123"
Testing Chain Whitelisting
# Correct chain -- accepted
runner.call_cross_chain(addr, "on_tokens", source_chain="ethereum", amount=100)
# Wrong chain -- rejected
with pytest.raises(ContractError, match="does not accept messages from chain"):
runner.call_cross_chain(addr, "on_tokens", source_chain="solana", amount=100)
Testing Direct Call Rejection
# Direct call to @receiver -- rejected
with pytest.raises(ContractError):
runner.call(addr, "on_tokens", sender="attacker", amount=100)
# Cross-chain call to @call method -- rejected
with pytest.raises(ContractError, match="not a @receiver"):
runner.call_cross_chain(addr, "set_value", source_chain="ethereum", value=42)
Delivering Responses to Awaited Calls
For async def methods that use await chain.call(), deliver the response with deliver_cross_chain_response():
runner = ContractTestRunner()
addr = runner.deploy(CrossChainSwap, sender="deployer")
# Start an async cross-chain call
runner.call(
addr, "swap_to_ethereum", sender="alice",
amount=100, token_address="0xUSDC",
)
# Check what cross-chain calls are pending
pending = runner.get_pending_cross_chain_calls()
assert len(pending) == 1
msg_id = pending[0]["msg_id"]
# Deliver the response -- this resumes the suspended coroutine
result = runner.deliver_cross_chain_response(msg_id, response={"success": True})
# The code after `await` has now executed
assert result.state["total_volume"] == 100
Inspecting Emitted Messages
All cross-chain messages emitted during test execution are captured:
runner = ContractTestRunner()
addr = runner.deploy(CrossChainMessenger, sender="deployer")
runner.call(
addr, "send_to", sender="alice",
dest_chain="ethereum", recipient="0xBob", message="hello",
)
messages = runner.get_cross_chain_messages()
assert len(messages) == 1
assert messages[0]["dest_chain"] == "ethereum"
assert messages[0]["recipient"] == "0xBob"
assert messages[0]["payload"]["message"] == "hello"
# Clear messages between test cases
runner.clear_cross_chain_messages()
Message Flow
- A contract calls
Chain.call(),send_message(), or aRemoteContractmethod - PandaVM captures the message intent in the execution output
- The host chain (Geth or rollup sequencer) picks up the message
- A bridge relayer delivers the message to the destination chain
- On the destination chain, the
@receivermethod or receiving contract processes it - If the sender used
await, the response is delivered back and the suspended coroutine resumes
API Reference
Functions
| Function | Description |
|---|---|
send_message(dest_chain, recipient, **payload) | Emit a raw cross-chain message. Returns msg_id (str). |
try_send_message(dest_chain, recipient, **payload) | Safe variant. Returns (success, msg_id_or_error). |
Chain Class
| Method | Description |
|---|---|
Chain(name) | Create a chain handle by name |
Chain.ethereum() | Factory for Ethereum |
Chain.solana() | Factory for Solana |
Chain.l2() | Factory for Panda L2 |
Chain.hub() | Factory for Panda Hub |
chain.call(addr, method, **kwargs) | Call a remote contract method |
chain.transfer(addr, amount, **kwargs) | Transfer value |
chain.send(addr, **payload) | Send raw message |
chain.contract(addr) | Get RemoteContract handle |
chain.prc20(addr) | Get RemotePRC20 handle |
RemoteContract
Untyped proxy. Call any method by name:
remote = eth.contract("0x1234...")
msg_id = remote.any_method(arg1=val1, arg2=val2)
result = await remote.any_method(arg1=val1, arg2=val2)
RemotePRC20
Typed PRC-20 proxy with named methods:
| Method | Type | Signature |
|---|---|---|
transfer | @call | transfer(to=, value=) |
approve | @call | approve(spender=, value=) |
transfer_from | @call | transfer_from(owner=, to=, value=) |
mint | @call | mint(to=, amount=) |
balance_of | @query | balance_of(owner=) |
total_supply | @query | total_supply() |
ContractTestRunner Methods
| Method | Description |
|---|---|
call_cross_chain(addr, method, source_chain=, message_id=, **kwargs) | Invoke a @receiver with cross-chain context |
deliver_cross_chain_response(msg_id, response=) | Resume a suspended await chain.call() |
get_cross_chain_messages() | List all emitted cross-chain messages |
clear_cross_chain_messages() | Clear the message log |
get_pending_cross_chain_calls() | List suspended async cross-chain calls |
Cross-Chain Messenger
The CrossChainMessenger is a complete messaging contract that demonstrates send_message(), try_send_message(), Chain.send(), and multi-chain broadcasting. This is the full contract from contracts/examples/cross_chain_messenger.py:
"""Simple cross-chain messenger -- sends and receives messages across chains.
Demonstrates the minimal cross-chain SDK usage:
- send_message() to emit messages
- try_send_message() for error-safe sending
- Chain class with factory methods
- Receiving messages via normal @call methods
Usage:
from panda import send_message, try_send_message, Chain
"""
from panda import contract, constructor, call, query, event
from panda import send_message, try_send_message, Chain
@contract
class CrossChainMessenger:
"""A simple cross-chain messenger that tracks sent and received messages."""
class State:
owner: str = ""
sent_count: int = 0
received_count: int = 0
last_sent_id: str = ""
last_received_from: str = ""
last_received_payload: dict = {}
message_log: list = []
@constructor
def deploy(self, ctx):
self.state.owner = ctx.sender
@call
def send_to(self, ctx, dest_chain: str, recipient: str, message: str = ""):
"""Send a text message to another chain."""
msg_id = send_message(
dest_chain,
recipient,
message=message,
sender=ctx.sender,
)
self.state.sent_count += 1
self.state.last_sent_id = msg_id
self.state.message_log.append(
{
"direction": "sent",
"chain": dest_chain,
"recipient": recipient,
"msg_id": msg_id,
}
)
self.emit(
event.MessageSent(
dest_chain=dest_chain,
recipient=recipient,
msg_id=msg_id,
)
)
@call
def send_safe(self, ctx, dest_chain: str, recipient: str, message: str = ""):
"""Send a message using try_send_message (never raises)."""
success, result = try_send_message(
dest_chain,
recipient,
message=message,
sender=ctx.sender,
)
if success:
self.state.sent_count += 1
self.state.last_sent_id = result
self.emit(event.SendAttempt(success=success, result=result))
@call
def broadcast(self, ctx, message: str = ""):
"""Broadcast a message to all supported chains."""
chains = ["ethereum", "solana", "panda_hub"]
for chain in chains:
dest = Chain(chain)
msg_id = dest.send(
"0x0000000000000000000000000000000000000000",
message=message,
sender=ctx.sender,
)
self.state.sent_count += 1
self.state.last_sent_id = msg_id
@call
def receive(self, ctx, sender: str = "", payload: dict = {}):
"""Receive a cross-chain message (called by relayer/sequencer)."""
self.state.received_count += 1
self.state.last_received_from = sender
self.state.last_received_payload = payload
self.state.message_log.append(
{
"direction": "received",
"sender": sender,
"payload": payload,
}
)
self.emit(event.MessageReceived(sender=sender))
@query
def stats(self) -> dict:
return {
"sent_count": self.state.sent_count,
"received_count": self.state.received_count,
"last_sent_id": self.state.last_sent_id,
"last_received_from": self.state.last_received_from,
}
@query
def get_log(self) -> list:
return self.state.message_log
Walkthrough
The CrossChainMessenger shows four distinct sending patterns:
-
send_to-- usessend_message()directly. This is the lowest-level API. The contract passes a destination chain name, a recipient address, and keyword arguments that become the message payload. The returnedmsg_idis a hex string for tracking. -
send_safe-- usestry_send_message(), which never raises. Instead it returns a(success, result)tuple. If the send fails (e.g. invalid chain name),successisFalseandresultcontains the error string. This is useful when you want to handle failures gracefully rather than reverting the entire transaction. -
broadcast-- iterates over multiple chain names and usesChain(name).send()to send to each. This demonstrates thatChainhandles can be created dynamically from strings, not just from factory methods likeChain.ethereum(). -
receive-- a plain@callmethod that acts as a message inbox. In production, only the authorized relayer would call this. For contracts that need stricter security, use@receiverinstead (see the BridgeV2 example above).
The contract also maintains a message_log list that records every sent and received message, making it easy to audit message history via the get_log() query.
Token Bridge
The TokenBridge demonstrates the lock-and-mint pattern -- the standard approach for bridging tokens between chains. Tokens are locked on the source chain and minted (or released) on the destination chain. This is the full contract from contracts/examples/token_bridge.py:
"""Cross-chain token bridge -- locks tokens on source chain, emits message to mint on destination.
Demonstrates the ergonomic cross-chain SDK:
- send_message() for low-level message emission
- Chain("ethereum") for handle-based messaging
- Receiving cross-chain messages via normal @call methods
Usage:
from panda import contract, constructor, call, query, event
from panda import send_message, Chain
"""
from panda import contract, constructor, call, query, event
from panda import send_message, Chain
@contract
class TokenBridge:
"""A cross-chain token bridge that locks tokens and emits bridge messages.
Tokens locked on this chain are released (or minted) on the destination
chain when the relayer delivers the cross-chain message.
"""
class State:
name: str = "PandaBridge"
owner: str = ""
locked_balances: dict = {}
total_locked: int = 0
total_released: int = 0
message_count: int = 0
@constructor
def deploy(self, ctx, name: str = "PandaBridge"):
self.state.name = name
self.state.owner = ctx.sender
@call
def lock_and_bridge(self, ctx, dest_chain: str, recipient: str, amount: int):
"""Lock tokens on this chain and emit a cross-chain mint message."""
if amount <= 0:
raise ValueError("Amount must be positive")
# Lock tokens
balance = self.state.locked_balances.get(ctx.sender, 0)
self.state.locked_balances[ctx.sender] = balance + amount
self.state.total_locked += amount
self.state.message_count += 1
# Emit cross-chain message to mint on destination
msg_id = send_message(
dest_chain,
recipient,
action="mint",
amount=amount,
sender=ctx.sender,
)
self.emit(
event.TokensLocked(
sender=ctx.sender,
dest_chain=dest_chain,
recipient=recipient,
amount=amount,
msg_id=msg_id,
)
)
@call
def bridge_to_ethereum(self, ctx, recipient: str, amount: int):
"""Convenience: bridge tokens to Ethereum using the Chain handle."""
if amount <= 0:
raise ValueError("Amount must be positive")
balance = self.state.locked_balances.get(ctx.sender, 0)
self.state.locked_balances[ctx.sender] = balance + amount
self.state.total_locked += amount
self.state.message_count += 1
eth = Chain.ethereum()
msg_id = eth.send(recipient, action="mint", amount=amount, sender=ctx.sender)
self.emit(
event.TokensLocked(
sender=ctx.sender,
dest_chain="ethereum",
recipient=recipient,
amount=amount,
msg_id=msg_id,
)
)
@call
def receive_and_release(self, ctx, recipient: str, amount: int, source_chain: str = ""):
"""Release tokens when a cross-chain message arrives from another chain.
In production, only the authorized relayer/sequencer can call this.
"""
if amount <= 0:
raise ValueError("Amount must be positive")
locked = self.state.locked_balances.get(recipient, 0)
if locked < amount:
raise ValueError(f"Insufficient locked balance: {locked} < {amount}")
self.state.locked_balances[recipient] = locked - amount
self.state.total_released += amount
self.emit(
event.TokensReleased(
recipient=recipient,
amount=amount,
source_chain=source_chain,
)
)
@query
def get_locked_balance(self, owner: str = "") -> int:
return self.state.locked_balances.get(owner, 0)
@query
def stats(self) -> dict:
return {
"name": self.state.name,
"total_locked": self.state.total_locked,
"total_released": self.state.total_released,
"message_count": self.state.message_count,
}
Walkthrough
The TokenBridge has three key methods that show the full lifecycle of a cross-chain bridge:
-
lock_and_bridge-- the generic entry point. It accepts any destination chain as a string ("ethereum","solana", etc.), locks the caller's tokens by incrementing their balance inlocked_balances, and emits a cross-chain message viasend_message(). The message payload includesaction="mint"so the receiving side knows what to do. -
bridge_to_ethereum-- a convenience wrapper that hardcodes the destination to Ethereum. Instead ofsend_message(), it usesChain.ethereum().send(), which is the mid-level API. Both approaches emit the same cross-chain message --Chain.send()just wrapssend_message()with a cleaner syntax. -
receive_and_release-- the reverse direction. When a relayer delivers a message from another chain indicating that tokens were locked there, this method releases the equivalent amount on this chain. It checks that the recipient has sufficient locked balance before releasing, preventing over-withdrawal.
The contract also provides two queries: get_locked_balance to check a specific address's locked amount, and stats to get aggregate bridge statistics.
Cross-Chain DeFi
The CrossChainSwap contract combines typed remote handles (RemoteContract, RemotePRC20), async/await, fire-and-forget messaging, and @receiver callbacks in a single contract. This is the full contract from contracts/examples/cross_chain_defi.py:
"""
cross_chain_defi -- Cross-chain DeFi swap using typed remote handles + await.
Demonstrates:
- Chain.contract() for untyped remote contract handles
- Chain.prc20() for typed PRC-20 remote token handles
- await for async cross-chain completion
- Combining @receiver and async workflows
"""
from panda import contract, constructor, call, query, receiver, event, Chain
@contract
class CrossChainSwap:
class State:
owner: str = ""
swap_count: int = 0
total_volume: int = 0
pending_swaps: dict = {}
@constructor
def deploy(self, ctx):
self.state.owner = ctx.sender
@call
async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
"""Swap tokens cross-chain to Ethereum using typed remote handle."""
if amount <= 0:
raise ValueError("Amount must be positive")
eth = Chain.ethereum()
usdc = eth.prc20(token_address)
swap_id = self.state.swap_count
self.state.pending_swaps[str(swap_id)] = {
"sender": ctx.sender,
"amount": amount,
"token": token_address,
"status": "pending",
}
self.state.swap_count += 1
# Await cross-chain transfer
_result = await usdc.transfer(to=ctx.sender, value=amount)
# Runs when response arrives
self.state.pending_swaps[str(swap_id)]["status"] = "completed"
self.state.total_volume += amount
self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))
@call
async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
"""Perform operations on multiple chains sequentially."""
eth = Chain.ethereum()
sol = Chain.solana()
# First, call Ethereum
eth_remote = eth.contract("0xEthDex")
_result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)
# Then, call Solana (runs after Ethereum response)
sol_remote = sol.contract("SolDex")
_result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)
self.state.total_volume += eth_amount + sol_amount
self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))
@call
def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
"""Fire-and-forget cross-chain swap (no await)."""
chain = Chain(dest_chain)
remote = chain.contract("0xDex")
msg_id = remote.swap(amount=amount, sender=ctx.sender)
self.state.swap_count += 1
self.state.total_volume += amount
self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))
@receiver(chains=["ethereum"])
def on_swap_result(self, ctx, swap_id: int, success: bool):
"""Receive swap result notification from Ethereum."""
key = str(swap_id)
if key in self.state.pending_swaps:
self.state.pending_swaps[key]["status"] = "success" if success else "failed"
self.emit(
event.SwapResultReceived(
swap_id=swap_id,
success=success,
source_chain=ctx.source_chain,
)
)
@query
def get_swap(self, swap_id: int) -> dict:
return self.state.pending_swaps.get(str(swap_id), {})
@query
def get_volume(self) -> int:
return self.state.total_volume
Walkthrough
The CrossChainSwap demonstrates the most advanced cross-chain patterns:
-
Typed remote handles --
swap_to_ethereumuseseth.prc20(token_address)to get aRemotePRC20handle. This gives you typed methods like.transfer(to=, value=)instead of raw message payloads. Themulti_chain_operationuseseth.contract("0xEthDex")for an untypedRemoteContracthandle where you call any method by name. -
async/await-- bothswap_to_ethereumandmulti_chain_operationareasync defmethods. Theawaitkeyword suspends contract execution until the cross-chain response arrives. Code afterawaitruns only when the destination chain confirms the operation. -
Sequential multi-chain --
multi_chain_operationawaits Ethereum first, then Solana. The Solana call only executes after the Ethereum response arrives. This guarantees ordering across chains. -
Fire-and-forget --
fire_and_forget_swapshows the alternative: call a remote contract without waiting. Themsg_idis returned as a string for tracking, but execution continues immediately. -
@receivercallback --on_swap_resulthandles asynchronous notifications from Ethereum. It updates the swap status inpending_swapsbased on the result. Thechains=["ethereum"]whitelist ensures only Ethereum can send these notifications.
Relay Patterns
These three contracts from the Panda repo demonstrate cross-contract call relaying -- where one contract calls another, which calls a third. This is useful for testing ctx.sender vs ctx.origin_sender in call chains.
SenderInspector
The innermost contract. It records who called it (ctx.sender) and who originated the transaction (ctx.origin_sender):
"""Records ctx.sender and ctx.origin_sender for cross-contract call tests."""
from panda import contract, call, query
@contract
class SenderInspector:
class State:
last_sender: str = ""
last_origin: str = ""
@call
def record(self, ctx):
self.state.last_sender = ctx.sender
self.state.last_origin = ctx.origin_sender
return ctx.sender
@query
def get_sender(self):
return self.state.last_sender
@query
def get_origin(self):
return self.state.last_origin
RelayMiddle
The middle layer. It holds a reference to a SenderInspector and relays calls to it via Contract():
"""Relays a cross-contract call to SenderInspector. Used for depth-2 tests."""
from panda import contract, call, Contract
@contract
class RelayMiddle:
class State:
inspector_address: str = ""
@call
def set_inspector(self, ctx, address: str):
self.state.inspector_address = address
@call
def relay_record(self, ctx):
"""Call inspector.record() — inspector should see this contract as sender."""
inspector = Contract(self.state.inspector_address)
return inspector.record()
RelayOuter
The outermost layer. It calls RelayMiddle, which calls SenderInspector:
"""Calls RelayMiddle which calls SenderInspector. Used for depth-2 tests."""
from panda import contract, call, Contract
@contract
class RelayOuter:
class State:
result: str = ""
@call
def call_through(self, ctx, middle_address: str):
"""Call middle.relay_record() which calls inspector.record()."""
middle = Contract(middle_address)
r = middle.relay_record()
self.state.result = str(r)
return r
Walkthrough
The relay pattern tests a fundamental property of cross-contract calls: ctx.sender is always the immediate caller, while ctx.origin_sender is the original transaction signer.
When a user calls RelayOuter.call_through():
RelayOutercallsRelayMiddle.relay_record()-- insideRelayMiddle,ctx.senderis theRelayOutercontract addressRelayMiddlecallsSenderInspector.record()-- insideSenderInspector,ctx.senderis theRelayMiddlecontract address- In all three contracts,
ctx.origin_senderis the original user's address
This is the same behavior as Ethereum's msg.sender vs tx.origin. The Contract() class (distinct from Chain()) is used for same-chain cross-contract calls, not cross-chain messaging.
Next Steps
- Try the Contract Playground to write and deploy cross-chain contracts
- Read the SDK Reference for the full API
- See the Async Contracts guide for
await sleep()and timer patterns - Learn about Rollup Architecture for how messages flow between chains