Cross-Chain Messaging

Panda contracts can send and receive messages across chains -- Ethereum, Solana, Panda L2, and Panda Hub. The cross-chain SDK provides a Pythonic API with two interaction models: fire-and-forget (send a message and continue) and async/await (suspend until a response arrives from the other chain).

Getting Started

These three minimal contracts demonstrate the core cross-chain patterns. Each one is under 15 lines and does exactly one thing.

Hello World: Cross-Chain Call

The simplest possible cross-chain contract. It sends a single message to another chain using send_message():

from panda import contract, call, event, send_message

@contract
class HelloCrossChain:
    class State:
        sent_count: int = 0

    @call
    def send_greeting(self, ctx, dest_chain: str, recipient: str):
        msg_id = send_message(dest_chain, recipient, {"greeting": "hello from Panda"})
        self.state.sent_count += 1
        self.emit(event.GreetingSent(to=dest_chain, msg_id=msg_id))

What is happening here:

  • send_message(dest_chain, recipient, payload) emits a raw cross-chain message. The payload is an arbitrary dict.
  • The function returns a hex string msg_id for tracking.
  • The message is fire-and-forget -- execution continues immediately.

Hello World: Cross-Chain Transfer

A minimal lock-and-bridge. It locks tokens on Panda and sends a mint instruction to Ethereum using the Chain class:

from panda import contract, call, query, event, Chain

@contract
class SimpleTokenBridge:
    class State:
        locked: dict = {}

    @call
    def bridge_to_ethereum(self, ctx, amount: int, recipient: str):
        self.state.locked[ctx.sender] = self.state.locked.get(ctx.sender, 0) + amount
        eth = Chain.ethereum()
        eth.send(recipient, {"action": "mint", "amount": amount, "sender": ctx.sender})
        self.emit(event.Bridged(sender=ctx.sender, amount=amount, recipient=recipient))

    @query
    def locked_balance(self, addr: str) -> int:
        return self.state.locked.get(addr, 0)

What is happening here:

  • Chain.ethereum() creates a handle to the Ethereum chain.
  • eth.send(recipient, payload) sends a message to a recipient address on Ethereum. This wraps send_message() with a cleaner API.
  • The contract records locked balances locally and emits an event for indexers.

Hello World: @receiver

The simplest receiver. It accepts cross-chain messages from Ethereum and stores them:

from panda import contract, query, receiver, event

@contract
class SimpleReceiver:
    class State:
        last_message: dict = {}

    @receiver(chains=["ethereum"])
    def on_message(self, ctx, data: dict):
        self.state.last_message = {"from": ctx.source_chain, "data": data}
        self.emit(event.Received(source=ctx.source_chain))

    @query
    def get_last(self) -> dict:
        return self.state.last_message

What is happening here:

  • @receiver(chains=["ethereum"]) marks this method as a cross-chain message handler. Only messages originating from Ethereum are accepted -- Solana, Panda L2, etc. are rejected with a ContractError.
  • Inside a @receiver, ctx.source_chain and ctx.message_id are available.
  • Direct calls from users (i.e. normal @call invocations) are rejected -- @receiver methods can only be triggered by the cross-chain relay infrastructure.

Quick Example

This is the real BridgeV2 contract from the Panda repo. It demonstrates all three cross-chain patterns: await chain.call(), fire-and-forget, and @receiver:

from panda import contract, constructor, call, query, receiver, event, Chain

@contract
class BridgeV2:
    class State:
        owner: str = ""
        total_bridged: int = 0
        total_received: int = 0
        pending_bridges: dict = {}
        bridge_count: int = 0

    @constructor
    def deploy(self, ctx):
        self.state.owner = ctx.sender

    @call
    async def bridge_tokens(self, ctx, amount: int, recipient: str = ""):
        """Bridge tokens to Ethereum. Awaits confirmation from Ethereum side."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        target = recipient or ctx.sender
        eth = Chain.ethereum()

        # Lock tokens on this side
        bridge_id = self.state.bridge_count
        self.state.pending_bridges[str(bridge_id)] = {
            "sender": ctx.sender,
            "amount": amount,
            "recipient": target,
            "status": "pending",
        }
        self.state.bridge_count += 1

        # Send cross-chain call and await confirmation
        _result = await eth.call(
            "0xEthBridge", "lock_and_mint",
            amount=amount, recipient=target,
        )

        # This code runs when the cross-chain response arrives
        self.state.pending_bridges[str(bridge_id)]["status"] = "confirmed"
        self.state.total_bridged += amount
        self.emit(event.BridgeConfirmed(bridge_id=bridge_id, amount=amount))

    @call
    def bridge_tokens_fire_and_forget(self, ctx, amount: int, recipient: str = ""):
        """Bridge tokens without waiting for confirmation (fire-and-forget)."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        target = recipient or ctx.sender
        eth = Chain.ethereum()

        # Fire-and-forget -- msg_id is a string
        msg_id = eth.call(
            "0xEthBridge", "lock_and_mint",
            amount=amount, recipient=target,
        )
        self.state.total_bridged += amount
        self.emit(event.BridgeSent(msg_id=str(msg_id), amount=amount))

    @receiver(chains=["ethereum"])
    def on_tokens_locked(self, ctx, amount: int, sender: str):
        """Receive notification that tokens were locked on Ethereum.

        Only accepts messages from Ethereum (chain whitelist enforced).
        ctx.source_chain, ctx.message_id, ctx.is_cross_chain are available.
        """
        self.state.total_received += amount
        self.emit(event.TokensReceived(
            amount=amount,
            sender=sender,
            source_chain=ctx.source_chain,
            message_id=ctx.message_id,
        ))

    @receiver(chains=["ethereum", "solana"])
    def on_bridge_complete(self, ctx, bridge_id: int, status: str):
        """Receive bridge completion from Ethereum or Solana."""
        key = str(bridge_id)
        if key in self.state.pending_bridges:
            self.state.pending_bridges[key]["status"] = status
            self.emit(event.BridgeStatusUpdated(
                bridge_id=bridge_id, status=status,
            ))

    @receiver()
    def on_any_chain_message(self, ctx, data: str):
        """Receive messages from any chain (no whitelist)."""
        self.emit(event.AnyChainMessage(
            data=data,
            source_chain=ctx.source_chain,
        ))

    @query
    def get_bridge_status(self, bridge_id: int) -> dict:
        return self.state.pending_bridges.get(str(bridge_id), {})

    @query
    def get_stats(self) -> dict:
        return {
            "total_bridged": self.state.total_bridged,
            "total_received": self.state.total_received,
            "bridge_count": self.state.bridge_count,
        }

Chain Class

The Chain class represents a destination chain. Create one with a factory method or by name:

from panda import Chain

# Factory methods
eth = Chain.ethereum()
sol = Chain.solana()
l2  = Chain.l2()
hub = Chain.hub()

# By name
eth = Chain("ethereum")
sol = Chain("solana")
l2  = Chain("panda_l2")
hub = Chain("panda_hub")

Supported Chains

ChainConstructorConstant
EthereumChain.ethereum()ETHEREUM
SolanaChain.solana()SOLANA
Panda L2Chain.l2()PANDA_L2
Panda HubChain.hub()PANDA_HUB

You can also import the constants directly:

from panda import ETHEREUM, SOLANA, PANDA_L2, PANDA_HUB

Sending Cross-Chain Messages

There are three ways to send messages to another chain, from lowest-level to highest-level.

1. send_message() -- Low-Level

The lowest-level API. Emits a raw cross-chain message with arbitrary key-value payload. This is how the real TokenBridge contract sends bridge messages:

from panda import contract, constructor, call, query, event
from panda import send_message, Chain

@contract
class TokenBridge:
    class State:
        name: str = "PandaBridge"
        owner: str = ""
        locked_balances: dict = {}
        total_locked: int = 0
        total_released: int = 0
        message_count: int = 0

    @constructor
    def deploy(self, ctx, name: str = "PandaBridge"):
        self.state.name = name
        self.state.owner = ctx.sender

    @call
    def lock_and_bridge(self, ctx, dest_chain: str, recipient: str, amount: int):
        """Lock tokens on this chain and emit a cross-chain mint message."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        balance = self.state.locked_balances.get(ctx.sender, 0)
        self.state.locked_balances[ctx.sender] = balance + amount
        self.state.total_locked += amount
        self.state.message_count += 1

        msg_id = send_message(
            dest_chain,
            recipient,
            action="mint",
            amount=amount,
            sender=ctx.sender,
        )
        self.emit(event.TokensLocked(
            sender=ctx.sender,
            dest_chain=dest_chain,
            recipient=recipient,
            amount=amount,
            msg_id=msg_id,
        ))

send_message() returns a message ID (hex string) for tracking. If something goes wrong, it raises a RuntimeError.

For error-safe sending, use try_send_message() — as demonstrated in the real CrossChainMessenger contract:

from panda import send_message, try_send_message, Chain

@call
def send_safe(self, ctx, dest_chain: str, recipient: str, message: str = ""):
    """Send a message using try_send_message (never raises)."""
    success, result = try_send_message(
        dest_chain,
        recipient,
        message=message,
        sender=ctx.sender,
    )
    if success:
        self.state.sent_count += 1
        self.state.last_sent_id = result
    self.emit(event.SendAttempt(success=success, result=result))

2. Chain.call() / Chain.send() -- Mid-Level

The Chain class wraps send_message() with a cleaner API. The real TokenBridge uses both approaches:

@call
def bridge_to_ethereum(self, ctx, recipient: str, amount: int):
    """Convenience: bridge tokens to Ethereum using the Chain handle."""
    if amount <= 0:
        raise ValueError("Amount must be positive")

    balance = self.state.locked_balances.get(ctx.sender, 0)
    self.state.locked_balances[ctx.sender] = balance + amount
    self.state.total_locked += amount
    self.state.message_count += 1

    eth = Chain.ethereum()
    msg_id = eth.send(recipient, action="mint", amount=amount, sender=ctx.sender)
    self.emit(event.TokensLocked(
        sender=ctx.sender,
        dest_chain="ethereum",
        recipient=recipient,
        amount=amount,
        msg_id=msg_id,
    ))

The real CrossChainMessenger also demonstrates broadcasting to multiple chains:

@call
def broadcast(self, ctx, message: str = ""):
    """Broadcast a message to all supported chains."""
    chains = ["ethereum", "solana", "panda_hub"]
    for chain in chains:
        dest = Chain(chain)
        msg_id = dest.send(
            "0x0000000000000000000000000000000000000000",
            message=message,
            sender=ctx.sender,
        )
        self.state.sent_count += 1
        self.state.last_sent_id = msg_id

3. Typed Remote Handles -- High-Level

The most ergonomic API. Get a handle to a remote contract and call methods like normal Python. This is demonstrated in the real CrossChainSwap contract:

RemoteContract -- untyped proxy for any contract:

eth = Chain.ethereum()
remote = eth.contract("0xEthDex")

# Call methods directly -- they route through cross-chain messaging
msg_id = remote.deposit(amount=eth_amount, sender=ctx.sender)
msg_id = remote.swap(amount=amount, sender=ctx.sender)

RemotePRC20 -- typed proxy for PRC-20 tokens:

eth = Chain.ethereum()
usdc = eth.prc20(token_address)

# Typed methods with keyword arguments
msg_id = usdc.transfer(to=ctx.sender, value=amount)
msg_id = usdc.approve(spender="0xDex", value=5000)
msg_id = usdc.transfer_from(owner="0xAlice", to="0xBob", value=100)
msg_id = usdc.mint(to="0xBob", amount=500)

Fire-and-Forget vs Await

Every cross-chain call returns a dual-use object that works as both a string (message ID) and an awaitable.

Fire-and-Forget

Use the return value as a string. The message is emitted and execution continues immediately without waiting for a response. From the real BridgeV2:

@call
def bridge_tokens_fire_and_forget(self, ctx, amount: int, recipient: str = ""):
    """Bridge tokens without waiting for confirmation (fire-and-forget)."""
    if amount <= 0:
        raise ValueError("Amount must be positive")

    target = recipient or ctx.sender
    eth = Chain.ethereum()

    # Fire-and-forget -- msg_id is a string
    msg_id = eth.call(
        "0xEthBridge", "lock_and_mint",
        amount=amount, recipient=target,
    )
    self.state.total_bridged += amount
    self.emit(event.BridgeSent(msg_id=str(msg_id), amount=amount))

And the real CrossChainSwap:

@call
def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
    """Fire-and-forget cross-chain swap (no await)."""
    chain = Chain(dest_chain)
    remote = chain.contract("0xDex")
    msg_id = remote.swap(amount=amount, sender=ctx.sender)

    self.state.swap_count += 1
    self.state.total_volume += amount
    self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))

Await (Async Completion)

Use await to suspend the contract until a cross-chain response arrives. The method must be async def. From the real CrossChainSwap:

@call
async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
    """Swap tokens cross-chain to Ethereum using typed remote handle."""
    if amount <= 0:
        raise ValueError("Amount must be positive")

    eth = Chain.ethereum()
    usdc = eth.prc20(token_address)

    swap_id = self.state.swap_count
    self.state.pending_swaps[str(swap_id)] = {
        "sender": ctx.sender,
        "amount": amount,
        "token": token_address,
        "status": "pending",
    }
    self.state.swap_count += 1

    # Await cross-chain transfer
    _result = await usdc.transfer(to=ctx.sender, value=amount)

    # Runs when response arrives
    self.state.pending_swaps[str(swap_id)]["status"] = "completed"
    self.state.total_volume += amount
    self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))

Sequential Multi-Chain Operations

You can await multiple cross-chain calls in sequence. Each one suspends until the previous completes. From the real CrossChainSwap:

@call
async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
    """Perform operations on multiple chains sequentially."""
    eth = Chain.ethereum()
    sol = Chain.solana()

    # First, call Ethereum
    eth_remote = eth.contract("0xEthDex")
    _result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)

    # Then, call Solana (runs after Ethereum response)
    sol_remote = sol.contract("SolDex")
    _result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)

    self.state.total_volume += eth_amount + sol_amount
    self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))

Receiving Cross-Chain Messages

The @receiver Decorator

Mark a method as a cross-chain message handler with @receiver. Receiver methods can only be invoked with a valid cross-chain context -- direct calls from users are rejected.

from panda import receiver

@receiver(chains=["ethereum"])
def on_tokens_locked(self, ctx, amount: int, sender: str):
    """Only accepts messages from Ethereum."""
    self.state.total_received += amount
    self.emit(event.TokensReceived(
        amount=amount,
        sender=sender,
        source_chain=ctx.source_chain,
        message_id=ctx.message_id,
    ))

Chain Whitelisting

The chains parameter restricts which source chains can deliver messages. These are real test contracts that verify whitelisting:

# Only Ethereum -- Solana messages are rejected with ContractError
@contract
class EthOnlyBridge:
    class State:
        total_received: int = 0
        last_source: str = ""
        last_msg_id: str = ""

    @receiver(chains=["ethereum"])
    def on_tokens(self, ctx, amount: int, from_addr: str = ""):
        self.state.total_received += amount
        self.state.last_source = ctx.source_chain
        self.state.last_msg_id = ctx.message_id

# Ethereum or Solana -- rejects panda_l2, panda_hub, etc.
@contract
class MultiChainReceiver:
    class State:
        received: list = []

    @receiver(chains=["ethereum", "solana"])
    def on_message(self, ctx, data: str):
        self.state.received.append({"data": data, "source": ctx.source_chain})

    # No whitelist -- accepts from any chain
    @receiver()
    def on_any(self, ctx, data: str):
        self.state.received.append({"data": data, "source": ctx.source_chain, "any": True})

Security: Direct Call Rejection

@receiver methods cannot be invoked via normal @call -- they require cross-chain context. And @call methods cannot be invoked via call_cross_chain():

@contract
class MixedContract:
    class State:
        value: int = 0
        received: int = 0

    @call
    def set_value(self, ctx, value: int):
        self.state.value = value

    @receiver(chains=["ethereum"])
    def on_update(self, ctx, value: int):
        self.state.received += value

# In tests:
# runner.call(addr, "on_update", sender="attacker", value=100)
#   → ContractError (direct call to @receiver rejected)
#
# runner.call_cross_chain(addr, "set_value", source_chain="ethereum", value=42)
#   → ContractError ("set_value is not a @receiver")

Cross-Chain Context

Inside a @receiver method, the context object (ctx) includes additional fields:

@receiver(chains=["ethereum", "solana"])
def on_message(self, ctx, data: str):
    ctx.sender            # Address of the relayer that delivered the message
    ctx.source_chain      # Origin chain: "ethereum", "solana", etc.
    ctx.message_id        # Unique ID of this cross-chain message (hex string)
    ctx.is_cross_chain    # Always True inside a @receiver
    ctx.block_height      # Current block on this chain
    ctx.block_time        # Current block timestamp

Complete Example: Cross-Chain DeFi

The real CrossChainSwap contract combines typed handles, async/await, fire-and-forget, and @receiver in a single contract:

from panda import contract, constructor, call, query, receiver, event, Chain

@contract
class CrossChainSwap:
    class State:
        owner: str = ""
        swap_count: int = 0
        total_volume: int = 0
        pending_swaps: dict = {}

    @constructor
    def deploy(self, ctx):
        self.state.owner = ctx.sender

    @call
    async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
        """Swap tokens cross-chain to Ethereum using typed remote handle."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        eth = Chain.ethereum()
        usdc = eth.prc20(token_address)

        swap_id = self.state.swap_count
        self.state.pending_swaps[str(swap_id)] = {
            "sender": ctx.sender,
            "amount": amount,
            "token": token_address,
            "status": "pending",
        }
        self.state.swap_count += 1

        # Await cross-chain transfer
        _result = await usdc.transfer(to=ctx.sender, value=amount)

        # Runs when response arrives
        self.state.pending_swaps[str(swap_id)]["status"] = "completed"
        self.state.total_volume += amount
        self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))

    @call
    async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
        """Perform operations on multiple chains sequentially."""
        eth = Chain.ethereum()
        sol = Chain.solana()

        eth_remote = eth.contract("0xEthDex")
        _result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)

        sol_remote = sol.contract("SolDex")
        _result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)

        self.state.total_volume += eth_amount + sol_amount
        self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))

    @call
    def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
        """Fire-and-forget cross-chain swap (no await)."""
        chain = Chain(dest_chain)
        remote = chain.contract("0xDex")
        msg_id = remote.swap(amount=amount, sender=ctx.sender)

        self.state.swap_count += 1
        self.state.total_volume += amount
        self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))

    @receiver(chains=["ethereum"])
    def on_swap_result(self, ctx, swap_id: int, success: bool):
        """Receive swap result notification from Ethereum."""
        key = str(swap_id)
        if key in self.state.pending_swaps:
            self.state.pending_swaps[key]["status"] = "success" if success else "failed"
            self.emit(event.SwapResultReceived(
                swap_id=swap_id,
                success=success,
                source_chain=ctx.source_chain,
            ))

    @query
    def get_swap(self, swap_id: int) -> dict:
        return self.state.pending_swaps.get(str(swap_id), {})

    @query
    def get_volume(self) -> int:
        return self.state.total_volume

Testing Cross-Chain Contracts

The ContractTestRunner provides methods to simulate cross-chain messaging locally.

Calling a @receiver Method

Use call_cross_chain() to simulate a relayer delivering a cross-chain message:

from panda.testing import ContractTestRunner

runner = ContractTestRunner()
addr = runner.deploy(EthOnlyBridge, sender="deployer")

# Simulate receiving a message from Ethereum
result = runner.call_cross_chain(
    addr, "on_tokens",
    source_chain="ethereum",
    amount=100,
    from_addr="alice",
)

# Verify the receiver processed the message
state = runner.get_state(addr)
assert state["total_received"] == 100
assert state["last_source"] == "ethereum"
assert state["last_msg_id"].startswith("0x")

Custom Message ID

runner.call_cross_chain(
    addr, "on_tokens",
    source_chain="ethereum",
    message_id="0xcustom123",
    amount=25,
)

state = runner.get_state(addr)
assert state["last_msg_id"] == "0xcustom123"

Testing Chain Whitelisting

# Correct chain -- accepted
runner.call_cross_chain(addr, "on_tokens", source_chain="ethereum", amount=100)

# Wrong chain -- rejected
with pytest.raises(ContractError, match="does not accept messages from chain"):
    runner.call_cross_chain(addr, "on_tokens", source_chain="solana", amount=100)

Testing Direct Call Rejection

# Direct call to @receiver -- rejected
with pytest.raises(ContractError):
    runner.call(addr, "on_tokens", sender="attacker", amount=100)

# Cross-chain call to @call method -- rejected
with pytest.raises(ContractError, match="not a @receiver"):
    runner.call_cross_chain(addr, "set_value", source_chain="ethereum", value=42)

Delivering Responses to Awaited Calls

For async def methods that use await chain.call(), deliver the response with deliver_cross_chain_response():

runner = ContractTestRunner()
addr = runner.deploy(CrossChainSwap, sender="deployer")

# Start an async cross-chain call
runner.call(
    addr, "swap_to_ethereum", sender="alice",
    amount=100, token_address="0xUSDC",
)

# Check what cross-chain calls are pending
pending = runner.get_pending_cross_chain_calls()
assert len(pending) == 1
msg_id = pending[0]["msg_id"]

# Deliver the response -- this resumes the suspended coroutine
result = runner.deliver_cross_chain_response(msg_id, response={"success": True})

# The code after `await` has now executed
assert result.state["total_volume"] == 100

Inspecting Emitted Messages

All cross-chain messages emitted during test execution are captured:

runner = ContractTestRunner()
addr = runner.deploy(CrossChainMessenger, sender="deployer")

runner.call(
    addr, "send_to", sender="alice",
    dest_chain="ethereum", recipient="0xBob", message="hello",
)

messages = runner.get_cross_chain_messages()
assert len(messages) == 1
assert messages[0]["dest_chain"] == "ethereum"
assert messages[0]["recipient"] == "0xBob"
assert messages[0]["payload"]["message"] == "hello"

# Clear messages between test cases
runner.clear_cross_chain_messages()

Message Flow

  1. A contract calls Chain.call(), send_message(), or a RemoteContract method
  2. PandaVM captures the message intent in the execution output
  3. The host chain (Geth or rollup sequencer) picks up the message
  4. A bridge relayer delivers the message to the destination chain
  5. On the destination chain, the @receiver method or receiving contract processes it
  6. If the sender used await, the response is delivered back and the suspended coroutine resumes

API Reference

Functions

FunctionDescription
send_message(dest_chain, recipient, **payload)Emit a raw cross-chain message. Returns msg_id (str).
try_send_message(dest_chain, recipient, **payload)Safe variant. Returns (success, msg_id_or_error).

Chain Class

MethodDescription
Chain(name)Create a chain handle by name
Chain.ethereum()Factory for Ethereum
Chain.solana()Factory for Solana
Chain.l2()Factory for Panda L2
Chain.hub()Factory for Panda Hub
chain.call(addr, method, **kwargs)Call a remote contract method
chain.transfer(addr, amount, **kwargs)Transfer value
chain.send(addr, **payload)Send raw message
chain.contract(addr)Get RemoteContract handle
chain.prc20(addr)Get RemotePRC20 handle

RemoteContract

Untyped proxy. Call any method by name:

remote = eth.contract("0x1234...")
msg_id = remote.any_method(arg1=val1, arg2=val2)
result = await remote.any_method(arg1=val1, arg2=val2)

RemotePRC20

Typed PRC-20 proxy with named methods:

MethodTypeSignature
transfer@calltransfer(to=, value=)
approve@callapprove(spender=, value=)
transfer_from@calltransfer_from(owner=, to=, value=)
mint@callmint(to=, amount=)
balance_of@querybalance_of(owner=)
total_supply@querytotal_supply()

ContractTestRunner Methods

MethodDescription
call_cross_chain(addr, method, source_chain=, message_id=, **kwargs)Invoke a @receiver with cross-chain context
deliver_cross_chain_response(msg_id, response=)Resume a suspended await chain.call()
get_cross_chain_messages()List all emitted cross-chain messages
clear_cross_chain_messages()Clear the message log
get_pending_cross_chain_calls()List suspended async cross-chain calls

Cross-Chain Messenger

The CrossChainMessenger is a complete messaging contract that demonstrates send_message(), try_send_message(), Chain.send(), and multi-chain broadcasting. This is the full contract from contracts/examples/cross_chain_messenger.py:

"""Simple cross-chain messenger -- sends and receives messages across chains.

Demonstrates the minimal cross-chain SDK usage:
  - send_message() to emit messages
  - try_send_message() for error-safe sending
  - Chain class with factory methods
  - Receiving messages via normal @call methods

Usage:
    from panda import send_message, try_send_message, Chain
"""

from panda import contract, constructor, call, query, event
from panda import send_message, try_send_message, Chain


@contract
class CrossChainMessenger:
    """A simple cross-chain messenger that tracks sent and received messages."""

    class State:
        owner: str = ""
        sent_count: int = 0
        received_count: int = 0
        last_sent_id: str = ""
        last_received_from: str = ""
        last_received_payload: dict = {}
        message_log: list = []

    @constructor
    def deploy(self, ctx):
        self.state.owner = ctx.sender

    @call
    def send_to(self, ctx, dest_chain: str, recipient: str, message: str = ""):
        """Send a text message to another chain."""
        msg_id = send_message(
            dest_chain,
            recipient,
            message=message,
            sender=ctx.sender,
        )
        self.state.sent_count += 1
        self.state.last_sent_id = msg_id
        self.state.message_log.append(
            {
                "direction": "sent",
                "chain": dest_chain,
                "recipient": recipient,
                "msg_id": msg_id,
            }
        )
        self.emit(
            event.MessageSent(
                dest_chain=dest_chain,
                recipient=recipient,
                msg_id=msg_id,
            )
        )

    @call
    def send_safe(self, ctx, dest_chain: str, recipient: str, message: str = ""):
        """Send a message using try_send_message (never raises)."""
        success, result = try_send_message(
            dest_chain,
            recipient,
            message=message,
            sender=ctx.sender,
        )
        if success:
            self.state.sent_count += 1
            self.state.last_sent_id = result
        self.emit(event.SendAttempt(success=success, result=result))

    @call
    def broadcast(self, ctx, message: str = ""):
        """Broadcast a message to all supported chains."""
        chains = ["ethereum", "solana", "panda_hub"]
        for chain in chains:
            dest = Chain(chain)
            msg_id = dest.send(
                "0x0000000000000000000000000000000000000000",
                message=message,
                sender=ctx.sender,
            )
            self.state.sent_count += 1
            self.state.last_sent_id = msg_id

    @call
    def receive(self, ctx, sender: str = "", payload: dict = {}):
        """Receive a cross-chain message (called by relayer/sequencer)."""
        self.state.received_count += 1
        self.state.last_received_from = sender
        self.state.last_received_payload = payload
        self.state.message_log.append(
            {
                "direction": "received",
                "sender": sender,
                "payload": payload,
            }
        )
        self.emit(event.MessageReceived(sender=sender))

    @query
    def stats(self) -> dict:
        return {
            "sent_count": self.state.sent_count,
            "received_count": self.state.received_count,
            "last_sent_id": self.state.last_sent_id,
            "last_received_from": self.state.last_received_from,
        }

    @query
    def get_log(self) -> list:
        return self.state.message_log

Walkthrough

The CrossChainMessenger shows four distinct sending patterns:

  1. send_to -- uses send_message() directly. This is the lowest-level API. The contract passes a destination chain name, a recipient address, and keyword arguments that become the message payload. The returned msg_id is a hex string for tracking.

  2. send_safe -- uses try_send_message(), which never raises. Instead it returns a (success, result) tuple. If the send fails (e.g. invalid chain name), success is False and result contains the error string. This is useful when you want to handle failures gracefully rather than reverting the entire transaction.

  3. broadcast -- iterates over multiple chain names and uses Chain(name).send() to send to each. This demonstrates that Chain handles can be created dynamically from strings, not just from factory methods like Chain.ethereum().

  4. receive -- a plain @call method that acts as a message inbox. In production, only the authorized relayer would call this. For contracts that need stricter security, use @receiver instead (see the BridgeV2 example above).

The contract also maintains a message_log list that records every sent and received message, making it easy to audit message history via the get_log() query.

Token Bridge

The TokenBridge demonstrates the lock-and-mint pattern -- the standard approach for bridging tokens between chains. Tokens are locked on the source chain and minted (or released) on the destination chain. This is the full contract from contracts/examples/token_bridge.py:

"""Cross-chain token bridge -- locks tokens on source chain, emits message to mint on destination.

Demonstrates the ergonomic cross-chain SDK:
  - send_message() for low-level message emission
  - Chain("ethereum") for handle-based messaging
  - Receiving cross-chain messages via normal @call methods

Usage:
    from panda import contract, constructor, call, query, event
    from panda import send_message, Chain
"""

from panda import contract, constructor, call, query, event
from panda import send_message, Chain


@contract
class TokenBridge:
    """A cross-chain token bridge that locks tokens and emits bridge messages.

    Tokens locked on this chain are released (or minted) on the destination
    chain when the relayer delivers the cross-chain message.
    """

    class State:
        name: str = "PandaBridge"
        owner: str = ""
        locked_balances: dict = {}
        total_locked: int = 0
        total_released: int = 0
        message_count: int = 0

    @constructor
    def deploy(self, ctx, name: str = "PandaBridge"):
        self.state.name = name
        self.state.owner = ctx.sender

    @call
    def lock_and_bridge(self, ctx, dest_chain: str, recipient: str, amount: int):
        """Lock tokens on this chain and emit a cross-chain mint message."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        # Lock tokens
        balance = self.state.locked_balances.get(ctx.sender, 0)
        self.state.locked_balances[ctx.sender] = balance + amount
        self.state.total_locked += amount
        self.state.message_count += 1

        # Emit cross-chain message to mint on destination
        msg_id = send_message(
            dest_chain,
            recipient,
            action="mint",
            amount=amount,
            sender=ctx.sender,
        )

        self.emit(
            event.TokensLocked(
                sender=ctx.sender,
                dest_chain=dest_chain,
                recipient=recipient,
                amount=amount,
                msg_id=msg_id,
            )
        )

    @call
    def bridge_to_ethereum(self, ctx, recipient: str, amount: int):
        """Convenience: bridge tokens to Ethereum using the Chain handle."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        balance = self.state.locked_balances.get(ctx.sender, 0)
        self.state.locked_balances[ctx.sender] = balance + amount
        self.state.total_locked += amount
        self.state.message_count += 1

        eth = Chain.ethereum()
        msg_id = eth.send(recipient, action="mint", amount=amount, sender=ctx.sender)

        self.emit(
            event.TokensLocked(
                sender=ctx.sender,
                dest_chain="ethereum",
                recipient=recipient,
                amount=amount,
                msg_id=msg_id,
            )
        )

    @call
    def receive_and_release(self, ctx, recipient: str, amount: int, source_chain: str = ""):
        """Release tokens when a cross-chain message arrives from another chain.

        In production, only the authorized relayer/sequencer can call this.
        """
        if amount <= 0:
            raise ValueError("Amount must be positive")

        locked = self.state.locked_balances.get(recipient, 0)
        if locked < amount:
            raise ValueError(f"Insufficient locked balance: {locked} < {amount}")

        self.state.locked_balances[recipient] = locked - amount
        self.state.total_released += amount

        self.emit(
            event.TokensReleased(
                recipient=recipient,
                amount=amount,
                source_chain=source_chain,
            )
        )

    @query
    def get_locked_balance(self, owner: str = "") -> int:
        return self.state.locked_balances.get(owner, 0)

    @query
    def stats(self) -> dict:
        return {
            "name": self.state.name,
            "total_locked": self.state.total_locked,
            "total_released": self.state.total_released,
            "message_count": self.state.message_count,
        }

Walkthrough

The TokenBridge has three key methods that show the full lifecycle of a cross-chain bridge:

  1. lock_and_bridge -- the generic entry point. It accepts any destination chain as a string ("ethereum", "solana", etc.), locks the caller's tokens by incrementing their balance in locked_balances, and emits a cross-chain message via send_message(). The message payload includes action="mint" so the receiving side knows what to do.

  2. bridge_to_ethereum -- a convenience wrapper that hardcodes the destination to Ethereum. Instead of send_message(), it uses Chain.ethereum().send(), which is the mid-level API. Both approaches emit the same cross-chain message -- Chain.send() just wraps send_message() with a cleaner syntax.

  3. receive_and_release -- the reverse direction. When a relayer delivers a message from another chain indicating that tokens were locked there, this method releases the equivalent amount on this chain. It checks that the recipient has sufficient locked balance before releasing, preventing over-withdrawal.

The contract also provides two queries: get_locked_balance to check a specific address's locked amount, and stats to get aggregate bridge statistics.

Cross-Chain DeFi

The CrossChainSwap contract combines typed remote handles (RemoteContract, RemotePRC20), async/await, fire-and-forget messaging, and @receiver callbacks in a single contract. This is the full contract from contracts/examples/cross_chain_defi.py:

"""
cross_chain_defi -- Cross-chain DeFi swap using typed remote handles + await.

Demonstrates:
  - Chain.contract() for untyped remote contract handles
  - Chain.prc20() for typed PRC-20 remote token handles
  - await for async cross-chain completion
  - Combining @receiver and async workflows
"""

from panda import contract, constructor, call, query, receiver, event, Chain


@contract
class CrossChainSwap:
    class State:
        owner: str = ""
        swap_count: int = 0
        total_volume: int = 0
        pending_swaps: dict = {}

    @constructor
    def deploy(self, ctx):
        self.state.owner = ctx.sender

    @call
    async def swap_to_ethereum(self, ctx, amount: int, token_address: str):
        """Swap tokens cross-chain to Ethereum using typed remote handle."""
        if amount <= 0:
            raise ValueError("Amount must be positive")

        eth = Chain.ethereum()
        usdc = eth.prc20(token_address)

        swap_id = self.state.swap_count
        self.state.pending_swaps[str(swap_id)] = {
            "sender": ctx.sender,
            "amount": amount,
            "token": token_address,
            "status": "pending",
        }
        self.state.swap_count += 1

        # Await cross-chain transfer
        _result = await usdc.transfer(to=ctx.sender, value=amount)

        # Runs when response arrives
        self.state.pending_swaps[str(swap_id)]["status"] = "completed"
        self.state.total_volume += amount
        self.emit(event.SwapCompleted(swap_id=swap_id, amount=amount))

    @call
    async def multi_chain_operation(self, ctx, eth_amount: int, sol_amount: int):
        """Perform operations on multiple chains sequentially."""
        eth = Chain.ethereum()
        sol = Chain.solana()

        # First, call Ethereum
        eth_remote = eth.contract("0xEthDex")
        _result1 = await eth_remote.deposit(amount=eth_amount, sender=ctx.sender)

        # Then, call Solana (runs after Ethereum response)
        sol_remote = sol.contract("SolDex")
        _result2 = await sol_remote.deposit(amount=sol_amount, sender=ctx.sender)

        self.state.total_volume += eth_amount + sol_amount
        self.emit(event.MultiChainOp(eth_amount=eth_amount, sol_amount=sol_amount))

    @call
    def fire_and_forget_swap(self, ctx, amount: int, dest_chain: str):
        """Fire-and-forget cross-chain swap (no await)."""
        chain = Chain(dest_chain)
        remote = chain.contract("0xDex")
        msg_id = remote.swap(amount=amount, sender=ctx.sender)

        self.state.swap_count += 1
        self.state.total_volume += amount
        self.emit(event.SwapSent(msg_id=str(msg_id), amount=amount, dest=dest_chain))

    @receiver(chains=["ethereum"])
    def on_swap_result(self, ctx, swap_id: int, success: bool):
        """Receive swap result notification from Ethereum."""
        key = str(swap_id)
        if key in self.state.pending_swaps:
            self.state.pending_swaps[key]["status"] = "success" if success else "failed"
            self.emit(
                event.SwapResultReceived(
                    swap_id=swap_id,
                    success=success,
                    source_chain=ctx.source_chain,
                )
            )

    @query
    def get_swap(self, swap_id: int) -> dict:
        return self.state.pending_swaps.get(str(swap_id), {})

    @query
    def get_volume(self) -> int:
        return self.state.total_volume

Walkthrough

The CrossChainSwap demonstrates the most advanced cross-chain patterns:

  1. Typed remote handles -- swap_to_ethereum uses eth.prc20(token_address) to get a RemotePRC20 handle. This gives you typed methods like .transfer(to=, value=) instead of raw message payloads. The multi_chain_operation uses eth.contract("0xEthDex") for an untyped RemoteContract handle where you call any method by name.

  2. async/await -- both swap_to_ethereum and multi_chain_operation are async def methods. The await keyword suspends contract execution until the cross-chain response arrives. Code after await runs only when the destination chain confirms the operation.

  3. Sequential multi-chain -- multi_chain_operation awaits Ethereum first, then Solana. The Solana call only executes after the Ethereum response arrives. This guarantees ordering across chains.

  4. Fire-and-forget -- fire_and_forget_swap shows the alternative: call a remote contract without waiting. The msg_id is returned as a string for tracking, but execution continues immediately.

  5. @receiver callback -- on_swap_result handles asynchronous notifications from Ethereum. It updates the swap status in pending_swaps based on the result. The chains=["ethereum"] whitelist ensures only Ethereum can send these notifications.

Relay Patterns

These three contracts from the Panda repo demonstrate cross-contract call relaying -- where one contract calls another, which calls a third. This is useful for testing ctx.sender vs ctx.origin_sender in call chains.

SenderInspector

The innermost contract. It records who called it (ctx.sender) and who originated the transaction (ctx.origin_sender):

"""Records ctx.sender and ctx.origin_sender for cross-contract call tests."""

from panda import contract, call, query


@contract
class SenderInspector:
    class State:
        last_sender: str = ""
        last_origin: str = ""

    @call
    def record(self, ctx):
        self.state.last_sender = ctx.sender
        self.state.last_origin = ctx.origin_sender
        return ctx.sender

    @query
    def get_sender(self):
        return self.state.last_sender

    @query
    def get_origin(self):
        return self.state.last_origin

RelayMiddle

The middle layer. It holds a reference to a SenderInspector and relays calls to it via Contract():

"""Relays a cross-contract call to SenderInspector. Used for depth-2 tests."""

from panda import contract, call, Contract


@contract
class RelayMiddle:
    class State:
        inspector_address: str = ""

    @call
    def set_inspector(self, ctx, address: str):
        self.state.inspector_address = address

    @call
    def relay_record(self, ctx):
        """Call inspector.record() — inspector should see this contract as sender."""
        inspector = Contract(self.state.inspector_address)
        return inspector.record()

RelayOuter

The outermost layer. It calls RelayMiddle, which calls SenderInspector:

"""Calls RelayMiddle which calls SenderInspector. Used for depth-2 tests."""

from panda import contract, call, Contract


@contract
class RelayOuter:
    class State:
        result: str = ""

    @call
    def call_through(self, ctx, middle_address: str):
        """Call middle.relay_record() which calls inspector.record()."""
        middle = Contract(middle_address)
        r = middle.relay_record()
        self.state.result = str(r)
        return r

Walkthrough

The relay pattern tests a fundamental property of cross-contract calls: ctx.sender is always the immediate caller, while ctx.origin_sender is the original transaction signer.

When a user calls RelayOuter.call_through():

  1. RelayOuter calls RelayMiddle.relay_record() -- inside RelayMiddle, ctx.sender is the RelayOuter contract address
  2. RelayMiddle calls SenderInspector.record() -- inside SenderInspector, ctx.sender is the RelayMiddle contract address
  3. In all three contracts, ctx.origin_sender is the original user's address

This is the same behavior as Ethereum's msg.sender vs tx.origin. The Contract() class (distinct from Chain()) is used for same-chain cross-contract calls, not cross-chain messaging.

Next Steps