Crypto & Security Patterns

Panda's panda.crypto module provides cryptographic primitives for building secure on-chain protocols. This guide covers common patterns: multisig wallets, commit-reveal schemes, Merkle proofs, sealed-bid auctions, and time-locked vaults.

Cryptographic Primitives

The panda.crypto module includes:

| Function | Description |
| --- | --- |
| sha256(data: str) -> str | SHA-256 hash, returns hex string |
| keccak256(data: str) -> str | Keccak-256 hash (Ethereum-style) |
| hmac_sha256(key: str, msg: str) -> str | HMAC-SHA256 |
| verify_signature(msg, sig, pubkey) -> bool | Ed25519 signature verification |
| merkle_root(leaves: list) -> str | Compute Merkle root from leaf hashes |
| merkle_verify(leaf, proof, root) -> bool | Verify a Merkle inclusion proof |
| pedersen_commit(value, blinding) -> str | Pedersen commitment (binding + hiding) |
| pedersen_verify(commitment, value, blinding) -> bool | Verify a Pedersen commitment |
| derive_key(secret, context, rounds) -> str | Derive a key from a secret |

Multisig Wallet

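An M-of-N multisig requires multiple parties to approve actions before execution. This implementation follows a propose/approve/execute lifecycle: callers are authorized by address (ctx.sender must be a registered signer), and each approval records an HMAC signature supplied by the signer. Because the contract holds only the hash of each signer's key, it stores these signatures for off-chain auditing rather than verifying them on-chain.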

# Source: contracts/crypto/multisig_wallet.py
"""
Multi-Signature Wallet — HMAC-based approval scheme.

A wallet that requires M-of-N signers to approve actions.
Each signer provides an HMAC signature using their secret key
to authorize transactions.
"""

from panda import contract, constructor, call, query, event


@contract
class MultisigWallet:
    """M-of-N multisig wallet with a propose/approve/execute lifecycle.

    Callers are authorized by address: ``ctx.sender`` must be a registered
    signer to propose, approve, or add signers.  Each approval also records
    the HMAC signature the signer supplies; the contract stores it but does
    not verify it (see ``approve_proposal``).
    """

    class State:
        signers: dict = {}  # signer address -> key hash given at registration
        threshold: int = 0  # approvals required before execute() succeeds
        proposals: dict = {}  # str(proposal id) -> proposal record
        next_proposal_id: int = 1  # id handed to the next proposal
        executed: dict = {}  # str(proposal id) -> {"executed_by", "block"}

    @constructor
    def deploy(self, ctx, signer_keys: dict, threshold: int):
        """Deploy with signer addresses mapped to their HMAC key hashes.

        signer_keys: {"0xAlice": sha256(alice_secret), "0xBob": sha256(bob_secret)}
        threshold: minimum number of approvals required

        Raises ValueError if threshold is not within 1..len(signer_keys).
        """
        if threshold <= 0 or threshold > len(signer_keys):
            raise ValueError("invalid threshold")
        # Copy so the wallet never shares a dict with the caller, matching the
        # copy-then-assign pattern used by every other state write here.
        self.state.signers = dict(signer_keys)
        self.state.threshold = threshold
        print(f"MultisigWallet deployed: {len(signer_keys)} signers, threshold={threshold}")

    @call
    def propose(self, ctx, action: str, target: str, data: str):
        """Create a new proposal. Only signers can propose.

        Stores the proposal under the next free id, advances the id counter
        and emits ``Proposed``.

        Raises ValueError if the caller is not a registered signer.
        """
        if ctx.sender not in self.state.signers:
            raise ValueError("not a signer")

        pid = self.state.next_proposal_id
        proposals = dict(self.state.proposals)
        proposals[str(pid)] = {
            "action": action,
            "target": target,
            "data": data,
            "proposer": ctx.sender,
            "approvals": {},
            "approved_count": 0,
            "created_at": ctx.block_time,
        }
        self.state.proposals = proposals
        self.state.next_proposal_id = pid + 1

        self.emit(
            event.Proposed(
                proposal_id=pid,
                proposer=ctx.sender,
                action=action,
            )
        )
        print(f"Proposal #{pid} created by {ctx.sender}: action={action}, target={target}")

    @call
    def approve_proposal(self, ctx, proposal_id: int, signature: str):
        """Record the caller's approval of a proposal.

        signature = hmac_sha256(signer_secret, str(proposal_id) + action + target + data)

        The signature is stored next to the message it is expected to cover;
        it is NOT checked here.  Authorization rests on ``ctx.sender`` being a
        registered signer.

        Raises ValueError if the proposal does not exist, the caller is not a
        signer, the caller already approved, or the proposal was executed.
        """
        key = str(proposal_id)
        proposals = dict(self.state.proposals)
        stored = proposals.get(key)
        if not stored:
            raise ValueError("proposal not found")
        if ctx.sender not in self.state.signers:
            raise ValueError("not a signer")
        if ctx.sender in stored["approvals"]:
            raise ValueError("already approved")
        if key in self.state.executed:
            raise ValueError("proposal already executed")

        # The message a signer is expected to have signed:
        # proposal_id + action + target + data.
        # NOTE: per deploy(), only a hash of each signer's key is registered,
        # so the HMAC cannot be recomputed on-chain.  The signature and the
        # message are kept so the approval can be audited off-chain.
        message = f"{proposal_id}{stored['action']}{stored['target']}{stored['data']}"

        # Work on copies: dict(self.state.proposals) is shallow, so writing
        # into `stored` directly would mutate the record still held in state.
        proposal = dict(stored)
        approvals = dict(stored["approvals"])
        approvals[ctx.sender] = {
            "signature": signature,
            "message": message,
            "block": ctx.block_height,
        }
        proposal["approvals"] = approvals
        proposal["approved_count"] = len(approvals)
        proposals[key] = proposal
        self.state.proposals = proposals

        self.emit(
            event.Approved(
                proposal_id=proposal_id,
                signer=ctx.sender,
                count=len(approvals),
                threshold=self.state.threshold,
            )
        )
        print(
            f"Proposal #{proposal_id} approved by {ctx.sender}: {len(approvals)}/{self.state.threshold} approvals"
        )

    @call
    def execute(self, ctx, proposal_id: int):
        """Execute a proposal that has reached the approval threshold.

        This method marks the proposal as executed and emits ``Executed``; it
        performs no signer check, so any caller may trigger it once enough
        approvals exist.  The ``executed`` entry is what blocks a second run.

        Raises ValueError if the proposal does not exist, was already
        executed, or has fewer approvals than the threshold.
        """
        key = str(proposal_id)
        proposal = self.state.proposals.get(key)
        if not proposal:
            raise ValueError("proposal not found")
        if key in self.state.executed:
            raise ValueError("already executed")
        if proposal["approved_count"] < self.state.threshold:
            raise ValueError(
                f"insufficient approvals: {proposal['approved_count']}/{self.state.threshold}"
            )

        executed = dict(self.state.executed)
        executed[key] = {
            "executed_by": ctx.sender,
            "block": ctx.block_height,
        }
        self.state.executed = executed

        self.emit(
            event.Executed(
                proposal_id=proposal_id,
                executor=ctx.sender,
                action=proposal["action"],
            )
        )
        print(f"Proposal #{proposal_id} executed by {ctx.sender}: action={proposal['action']}")

    @call
    def add_signer(self, ctx, new_signer: str, key_hash: str):
        """Add a new signer (requires existing signer).

        Raises ValueError if the caller is not a signer, or if ``new_signer``
        is already registered.
        """
        if ctx.sender not in self.state.signers:
            raise ValueError("not a signer")
        # Without this guard any signer could overwrite another signer's
        # registered key hash simply by "adding" them again.
        if new_signer in self.state.signers:
            raise ValueError("signer already exists")
        signers = dict(self.state.signers)
        signers[new_signer] = key_hash
        self.state.signers = signers
        self.emit(event.SignerAdded(signer=new_signer))
        print(f"Signer {new_signer} added by {ctx.sender}, total signers={len(signers)}")

    @query
    def get_proposal(self, proposal_id: int) -> dict:
        """Return a copy of the proposal record, or {} if the id is unknown."""
        # Shallow copy, like get_signers(), so callers do not receive the
        # dict object held in contract state.
        return dict(self.state.proposals.get(str(proposal_id), {}))

    @query
    def is_executed(self, proposal_id: int) -> bool:
        """Return True if the proposal has an entry in the executed dict."""
        return str(proposal_id) in self.state.executed

    @query
    def get_signers(self) -> dict:
        """Return a copy of the signer address -> key hash mapping."""
        return dict(self.state.signers)

    @query
    def get_threshold(self) -> int:
        """Return the number of approvals required to execute."""
        return self.state.threshold

Key patterns:

  • Signer registration maps addresses to HMAC key hashes at deploy time
  • Separate propose/approve/execute steps allow automation and auditability
  • Replay protection via the executed dict prevents double-execution
  • add_signer allows expanding the signer set (any existing signer can add new ones)

Commit-Reveal Scheme

A two-phase protocol for secret votes, sealed bids, or randomness. The HashRegistry contract implements a full commit-reveal lifecycle with deadlines.

Phase 1 -- Commit: Participants submit sha256(value + salt) without revealing the value.

Phase 2 -- Reveal: Participants reveal their value and salt. The contract verifies the hash matches.

# Source: contracts/crypto/hash_registry.py
from panda import contract, constructor, call, query, event
from panda.crypto import sha256


@contract
class HashRegistry:
    """Commit-reveal hash registry with deadlines and verification.

    Time windows (all compared against ``ctx.block_time``):
      * commit:   block_time <= commit_deadline
      * reveal:   commit_deadline < block_time <= reveal_deadline
      * finalize: block_time > reveal_deadline
    The windows are disjoint, so transactions sharing one block time can
    never mix two phases.
    """

    class State:
        commitments: dict = {}  # sender -> {"hash", "block"}
        reveals: dict = {}  # sender -> {"value", "salt", "block"}
        owner: str = ""
        commit_deadline: int = 0
        reveal_deadline: int = 0
        phase: str = "setup"  # setup -> commit -> reveal -> finalized

    @constructor
    def deploy(self, ctx, commit_deadline: int, reveal_deadline: int):
        """Record the deployer as owner, store both deadlines, open the commit phase.

        Raises ValueError unless commit_deadline < reveal_deadline.
        """
        if commit_deadline >= reveal_deadline:
            raise ValueError("commit deadline must be before reveal deadline")
        self.state.owner = ctx.sender
        self.state.commit_deadline = commit_deadline
        self.state.reveal_deadline = reveal_deadline
        self.state.phase = "commit"
        print(
            f"HashRegistry deployed by {ctx.sender}, commit deadline={commit_deadline}, reveal deadline={reveal_deadline}"
        )

    @call
    def commit(self, ctx, commitment_hash: str):
        """Submit a hash commitment. commitment_hash = sha256(value + salt).

        Accepted while block_time <= commit_deadline; one commitment per sender.

        Raises ValueError if the commit phase is over or the sender already
        committed.
        """
        if ctx.block_time > self.state.commit_deadline:
            raise ValueError("commit phase has ended")
        if ctx.sender in self.state.commitments:
            raise ValueError("already committed")
        commitments = dict(self.state.commitments)
        commitments[ctx.sender] = {
            "hash": commitment_hash,
            "block": ctx.block_height,
        }
        self.state.commitments = commitments
        self.emit(event.Committed(sender=ctx.sender, block=ctx.block_height))
        print(f"Commitment recorded for {ctx.sender} at block {ctx.block_height}")

    @call
    def reveal(self, ctx, value: str, salt: str):
        """Reveal a previously committed value. Verifies hash match.

        Accepted while commit_deadline < block_time <= reveal_deadline.

        Raises ValueError if called outside the reveal window, if the sender
        has no commitment or already revealed, or if sha256(value + salt)
        differs from the stored commitment.
        """
        # Strictly after the commit deadline: commit() still accepts
        # block_time == commit_deadline, and a reveal at that same block time
        # would let a later committer see revealed values first.
        if ctx.block_time <= self.state.commit_deadline:
            raise ValueError("reveal phase has not started")
        if ctx.block_time > self.state.reveal_deadline:
            raise ValueError("reveal phase has ended")
        if ctx.sender not in self.state.commitments:
            raise ValueError("no commitment found")
        if ctx.sender in self.state.reveals:
            raise ValueError("already revealed")

        # Verify: sha256(value + salt) must match the commitment.
        # NOTE(review): plain concatenation is ambiguous ("ab"+"c" == "a"+"bc"),
        # so one commitment can open to more than one (value, salt) split.
        # Changing the encoding would change the protocol clients use, so it
        # is only flagged here.
        expected = sha256(value + salt)
        actual = self.state.commitments[ctx.sender]["hash"]
        if expected != actual:
            raise ValueError("hash mismatch: reveal does not match commitment")

        reveals = dict(self.state.reveals)
        reveals[ctx.sender] = {
            "value": value,
            "salt": salt,
            "block": ctx.block_height,
        }
        self.state.reveals = reveals
        # Queries receive no ctx and cannot derive the phase from time, so the
        # stored phase advances on the first successful reveal.
        if self.state.phase == "commit":
            self.state.phase = "reveal"
        self.emit(event.Revealed(sender=ctx.sender, value=value))
        print(f"Reveal verified for {ctx.sender}, value={value}")

    @call
    def finalize(self, ctx):
        """Finalize the registry after reveal deadline.

        Accepted once block_time > reveal_deadline, and only once.

        Raises ValueError if the reveal window is still open or the registry
        is already finalized.
        """
        # Strictly after the deadline: reveal() still accepts
        # block_time == reveal_deadline.
        if ctx.block_time <= self.state.reveal_deadline:
            raise ValueError("reveal phase not yet ended")
        # Prevent repeated Finalized events.
        if self.state.phase == "finalized":
            raise ValueError("already finalized")
        self.state.phase = "finalized"
        self.emit(
            event.Finalized(
                total_commits=len(self.state.commitments),
                total_reveals=len(self.state.reveals),
            )
        )
        print(
            f"HashRegistry finalized: {len(self.state.commitments)} commits, {len(self.state.reveals)} reveals"
        )

    @query
    def get_commitment(self, addr: str) -> dict:
        """Return the commitment record for addr, or {} if none."""
        return self.state.commitments.get(addr, {})

    @query
    def get_reveal(self, addr: str) -> dict:
        """Return the reveal record for addr, or {} if none."""
        return self.state.reveals.get(addr, {})

    @query
    def get_phase(self) -> str:
        """Return the stored phase string."""
        return self.state.phase

    @query
    def get_stats(self) -> dict:
        """Return commitment/reveal counts and the stored phase."""
        return {
            "total_commitments": len(self.state.commitments),
            "total_reveals": len(self.state.reveals),
            "phase": self.state.phase,
        }

Use cases:

  • Secret ballot voting (commit vote hash, reveal after deadline)
  • Sealed-bid auctions (commit bid, reveal to determine winner)
  • On-chain randomness (commit random seed, reveal to generate shared random)

Phase lifecycle: setup -> commit -> reveal -> finalized, enforced by deadlines checked against ctx.block_time.

Sealed-Bid Auction (Pedersen Commitments)

The SecretAuction contract extends the commit-reveal pattern with Pedersen commitments for cryptographically secure sealed bids. A Pedersen commitment is computationally binding (you cannot change your bid after committing) and perfectly hiding (the bid amount is information-theoretically hidden, even from an unbounded adversary), whereas a salted SHA-256 commitment hides the value only computationally.

# Source: contracts/crypto/secret_auction.py
from panda import contract, constructor, call, query, event
from panda.crypto import pedersen_verify


@contract
class SecretAuction:
    """Sealed-bid auction with cryptographic commit-reveal.

    Time windows (all compared against ``ctx.block_time``):
      * bidding:  block_time <= bid_deadline
      * reveal:   bid_deadline < block_time <= reveal_deadline
      * finalize: block_time > reveal_deadline
    The windows are disjoint, so transactions sharing one block time can
    never mix two phases.
    """

    class State:
        item_name: str = ""
        seller: str = ""
        min_bid: int = 0
        commitments: dict = {}  # bidder -> {"commitment", "block"}
        bids: dict = {}  # bidder -> {"amount", "block"} (revealed bids only)
        bid_deadline: int = 0
        reveal_deadline: int = 0
        phase: str = "setup"
        winner: str = ""
        winning_bid: int = 0
        finalized: bool = False

    @constructor
    def deploy(self, ctx, item_name: str, min_bid: int, bid_deadline: int, reveal_deadline: int):
        """Create the auction with the deployer as seller and open bidding.

        Raises ValueError unless bid_deadline < reveal_deadline, or if
        min_bid is negative.
        """
        if bid_deadline >= reveal_deadline:
            raise ValueError("bid deadline must be before reveal deadline")
        # Revealed amounts are only checked against min_bid, so a negative
        # minimum would admit negative bids.
        if min_bid < 0:
            raise ValueError("min bid must be non-negative")
        self.state.item_name = item_name
        self.state.seller = ctx.sender
        self.state.min_bid = min_bid
        self.state.bid_deadline = bid_deadline
        self.state.reveal_deadline = reveal_deadline
        self.state.phase = "bidding"
        print(f"SecretAuction deployed for '{item_name}' by {ctx.sender}, min bid={min_bid}")

    @call
    def place_bid(self, ctx, commitment: str):
        """Place a sealed bid. commitment = pedersen_commit(bid_amount, blinding).

        Accepted while block_time <= bid_deadline; one bid per address.

        Raises ValueError if bidding is over, the caller is the seller, or the
        caller already placed a bid.
        """
        if ctx.block_time > self.state.bid_deadline:
            raise ValueError("bidding phase has ended")
        if ctx.sender == self.state.seller:
            raise ValueError("seller cannot bid")
        if ctx.sender in self.state.commitments:
            raise ValueError("already placed a bid")

        commitments = dict(self.state.commitments)
        commitments[ctx.sender] = {
            "commitment": commitment,
            "block": ctx.block_height,
        }
        self.state.commitments = commitments

        self.emit(event.BidPlaced(bidder=ctx.sender, block=ctx.block_height))
        print(f"Sealed bid placed by {ctx.sender} at block {ctx.block_height}")

    @call
    def reveal_bid(self, ctx, bid_amount: int, blinding: str):
        """Reveal a previously committed bid.

        Accepted while bid_deadline < block_time <= reveal_deadline.

        Raises ValueError if called outside the reveal window, if the caller
        has no commitment or already revealed, if pedersen_verify rejects the
        opening, or if the amount is below min_bid.
        """
        # Strictly after the bid deadline: place_bid() still accepts
        # block_time == bid_deadline, and a reveal at that same block time
        # would expose an amount while bids can still be placed.
        if ctx.block_time <= self.state.bid_deadline:
            raise ValueError("reveal phase has not started")
        if ctx.block_time > self.state.reveal_deadline:
            raise ValueError("reveal phase has ended")
        if ctx.sender not in self.state.commitments:
            raise ValueError("no commitment found")
        if ctx.sender in self.state.bids:
            raise ValueError("already revealed")

        # Verify the Pedersen commitment
        stored_commitment = self.state.commitments[ctx.sender]["commitment"]
        if not pedersen_verify(stored_commitment, bid_amount, blinding):
            raise ValueError("commitment verification failed")

        # The minimum can only be enforced now; the amount was hidden before.
        if bid_amount < self.state.min_bid:
            raise ValueError(f"bid below minimum: {bid_amount} < {self.state.min_bid}")

        bids = dict(self.state.bids)
        bids[ctx.sender] = {
            "amount": bid_amount,
            "block": ctx.block_height,
        }
        self.state.bids = bids

        self.emit(event.BidRevealed(bidder=ctx.sender, amount=bid_amount))
        print(f"Bid revealed by {ctx.sender}: amount={bid_amount}")

    @call
    def finalize(self, ctx):
        """Determine the winner after reveal phase ends.

        Accepted once block_time > reveal_deadline, and only once.  If no bid
        was revealed, winner is "" and winning_bid is 0.

        Raises ValueError if the reveal window is still open or the auction is
        already finalized.
        """
        # Strictly after the deadline: reveal_bid() still accepts
        # block_time == reveal_deadline, and such a bid must be counted.
        if ctx.block_time <= self.state.reveal_deadline:
            raise ValueError("reveal phase not yet ended")
        if self.state.finalized:
            raise ValueError("already finalized")

        # Highest amount wins; ties go to the lexicographically smallest
        # address.  Selecting with a (-amount, address) key is independent of
        # dict iteration order and lets a valid bid of 0 win when it is the
        # best (or only) revealed bid.
        bids = self.state.bids
        if bids:
            winner = min(bids, key=lambda addr: (-bids[addr]["amount"], addr))
            highest_bid = bids[winner]["amount"]
        else:
            winner = ""
            highest_bid = 0

        self.state.winner = winner
        self.state.winning_bid = highest_bid
        self.state.finalized = True
        self.state.phase = "finalized"

        self.emit(
            event.AuctionFinalized(
                winner=winner,
                winning_bid=highest_bid,
                total_bids=len(self.state.bids),
            )
        )
        print(f"Auction for '{self.state.item_name}' finalized: winner={winner}, bid={highest_bid}")

    @query
    def get_winner(self) -> dict:
        """Return {"winner", "winning_bid"} once finalized, otherwise {}."""
        if not self.state.finalized:
            return {}
        return {
            "winner": self.state.winner,
            "winning_bid": self.state.winning_bid,
        }

    @query
    def get_auction_info(self) -> dict:
        """Return static auction data plus commitment/reveal counts."""
        return {
            "item_name": self.state.item_name,
            "seller": self.state.seller,
            "min_bid": self.state.min_bid,
            "phase": self.state.phase,
            "total_commitments": len(self.state.commitments),
            "total_reveals": len(self.state.bids),
        }

Key design details:

  • Pedersen commitments (pedersen_commit / pedersen_verify) are used instead of plain hashes for information-theoretic hiding
  • Seller cannot bid on their own auction (prevents shill bidding)
  • Deterministic tie-breaking: alphabetically first address wins ties (important for on-chain determinism)
  • Minimum bid is enforced at reveal time, not at commit time (the bid is hidden until reveal)

Merkle Airdrop

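Distribute tokens to thousands of addresses gas-efficiently using Merkle proofs. The full eligibility list is represented on-chain by the Merkle root alone (32 bytes), regardless of how many claimants exist; the contract adds a per-address entry only when that address actually claims.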

# Source: contracts/crypto/merkle_airdrop.py
from panda import contract, constructor, call, query, event, PRC20
from panda.crypto import merkle_verify


@contract
class MerkleAirdrop:
    """Token airdrop gated by Merkle inclusion proofs.

    Eligibility is checked against a stored Merkle root; each address may
    claim at most once.
    """

    class State:
        merkle_root: str = ""  # root that claims are verified against
        token_address: str = ""  # PRC20 token paid out on claim
        claimed: dict = {}  # claimant address -> amount claimed
        total_claimed: int = 0  # running sum of all claimed amounts
        owner: str = ""  # deployer; may update the root or deactivate
        active: bool = False  # claims are rejected while False

    @constructor
    def deploy(self, ctx, token_address: str, merkle_root: str):
        """Store the token and root, make the deployer owner, and activate."""
        deployer = ctx.sender
        self.state.token_address = token_address
        self.state.merkle_root = merkle_root
        self.state.owner = deployer
        self.state.active = True
        print(f"MerkleAirdrop deployed by {ctx.sender}, token={token_address}")

    @call
    def claim(self, ctx, amount: int, proof: list):
        """Pay ``amount`` tokens to the caller if the Merkle proof checks out.

        The leaf handed to merkle_verify is the string "<address>:<amount>".
        NOTE(review): whether merkle_verify hashes that leaf internally is not
        visible here -- confirm against the tree-building code.
        proof is a list of {"hash": hex, "position": "left"|"right"}.

        Raises ValueError if the airdrop is inactive, the caller already
        claimed, or the proof is rejected.
        """
        claimant = ctx.sender
        if not self.state.active:
            raise ValueError("airdrop is not active")
        if claimant in self.state.claimed:
            raise ValueError("already claimed")

        # The leaf binds the claimant's address to the amount claimed.
        candidate_leaf = f"{ctx.sender}:{amount}"
        proof_ok = merkle_verify(candidate_leaf, proof, self.state.merkle_root)
        if not proof_ok:
            raise ValueError("invalid Merkle proof")

        # Book the claim before the token call is made.
        ledger = dict(self.state.claimed)
        ledger[claimant] = amount
        self.state.claimed = ledger
        self.state.total_claimed += amount

        # Cross-contract payout to the claimant.
        token = PRC20(self.state.token_address)
        token.transfer(to=claimant, value=amount)

        claim_event = event.Claimed(
            claimant=claimant,
            amount=amount,
        )
        self.emit(claim_event)
        print(
            f"Airdrop claimed: {ctx.sender} received {amount}, total claimed={self.state.total_claimed}"
        )

    @call
    def update_root(self, ctx, new_root: str):
        """Replace the Merkle root; owner only.

        Raises ValueError if the caller is not the owner.
        """
        caller_is_owner = ctx.sender == self.state.owner
        if not caller_is_owner:
            raise ValueError("only owner can update root")
        self.state.merkle_root = new_root
        root_event = event.RootUpdated(new_root=new_root)
        self.emit(root_event)
        print(f"Merkle root updated by {ctx.sender}, new_root={new_root}")

    @call
    def deactivate(self, ctx):
        """Switch the airdrop off; owner only.

        Raises ValueError if the caller is not the owner.
        """
        caller_is_owner = ctx.sender == self.state.owner
        if not caller_is_owner:
            raise ValueError("only owner")
        self.state.active = False
        self.emit(event.Deactivated())
        print(f"Airdrop deactivated by {ctx.sender}")

    @query
    def is_claimed(self, addr: str) -> bool:
        """Return True if addr has an entry in the claimed dict."""
        ledger = self.state.claimed
        return addr in ledger

    @query
    def get_claimed_amount(self, addr: str) -> int:
        """Return the amount addr claimed, or 0 if it has not claimed."""
        ledger = self.state.claimed
        return ledger.get(addr, 0)

    @query
    def get_stats(self) -> dict:
        """Return the root, claim totals, claimant count and active flag."""
        stats = {
            "merkle_root": self.state.merkle_root,
            "total_claimed": self.state.total_claimed,
            "num_claimants": len(self.state.claimed),
            "active": self.state.active,
        }
        return stats

    @query
    def verify_proof(self, addr: str, amount: int, proof: list) -> bool:
        """Check a proof against the current root without claiming."""
        candidate_leaf = f"{addr}:{amount}"
        current_root = self.state.merkle_root
        return merkle_verify(candidate_leaf, proof, current_root)

How it works:

  1. Off-chain: build a Merkle tree from all (address, amount) pairs
  2. Store only the root hash on-chain (32 bytes regardless of list size)
  3. Each claimant provides a proof (O(log n) hashes) to verify their inclusion via merkle_verify
  4. The claimed dict prevents double-claiming
  5. Tokens are transferred via cross-contract PRC20 call upon successful verification

This is far more gas-efficient than storing all addresses on-chain. The verify_proof query lets users check eligibility before submitting a claim transaction.

Timelock Vault

Time-locked deposits with cryptographic key derivation for secure withdrawals. Requires both time expiry AND a derived key to withdraw.

# Source: contracts/crypto/timelock_vault.py (excerpt: deposit and withdraw)
    @call
    def deposit(self, ctx, amount: int, unlock_time: int, key_hash: str):
        """Record a new deposit locked until ``unlock_time``.

        key_hash = sha256(derive_key(user_secret, "vault-withdraw", rounds=1000))
        Withdrawing later requires presenting the derived key that hashes to
        ``key_hash``, so the depositor must keep their secret.

        Raises ValueError if amount is not positive or unlock_time is not
        later than the current block time.
        """
        if amount <= 0:
            raise ValueError("amount must be positive")
        if unlock_time <= ctx.block_time:
            raise ValueError("unlock time must be in the future")

        deposit_id = self.state.next_deposit_id
        record = {
            "depositor": ctx.sender,
            "amount": amount,
            "unlock_time": unlock_time,
            "key_hash": key_hash,
            "withdrawn": False,
            "created_block": ctx.block_height,
        }

        # Copy, insert, reassign: state is replaced rather than edited in place.
        ledger = dict(self.state.deposits)
        ledger[str(deposit_id)] = record
        self.state.deposits = ledger
        self.state.next_deposit_id = deposit_id + 1
        self.state.total_deposited += amount

        deposited = event.Deposited(
            deposit_id=deposit_id,
            depositor=ctx.sender,
            amount=amount,
            unlock_time=unlock_time,
        )
        self.emit(deposited)

    @call
    def withdraw(self, ctx, deposit_id: int, derived_key: str):
        """Release a deposit once its unlock time has passed.

        derived_key = derive_key(user_secret, "vault-withdraw", rounds=1000)
        The key is accepted only if sha256(derived_key) equals the key_hash
        stored with the deposit.

        Raises ValueError if the deposit is unknown, already withdrawn, still
        locked, owned by another address, or the key does not match.
        """
        slot = str(deposit_id)
        ledger = dict(self.state.deposits)
        record = ledger.get(slot)

        # Guards run in a fixed order; the first failing one decides the error.
        if not record:
            raise ValueError("deposit not found")
        if record["withdrawn"]:
            raise ValueError("already withdrawn")
        if ctx.block_time < record["unlock_time"]:
            raise ValueError("deposit is still locked")
        if record["depositor"] != ctx.sender:
            raise ValueError("only depositor can withdraw")

        # The caller must present the pre-image of the stored key hash.
        presented_hash = sha256(derived_key)
        if presented_hash != record["key_hash"]:
            raise ValueError("invalid withdrawal key")

        # Flag the deposit as spent and write the ledger back to state.
        record["withdrawn"] = True
        ledger[slot] = record
        self.state.deposits = ledger
        self.state.total_withdrawn = self.state.total_withdrawn + record["amount"]

        withdrawn = event.Withdrawn(
            deposit_id=deposit_id,
            depositor=ctx.sender,
            amount=record["amount"],
        )
        self.emit(withdrawn)

The vault also includes an emergency_recover method that uses HMAC-based recovery as a safety net, and query methods like is_unlocked to check deposit status. See the full contract in contracts/crypto/timelock_vault.py.

Try It

  • Open the Playground and select the Multisig Wallet, Commit-Reveal, or Merkle Airdrop template
  • Follow the Multisig Wallet tutorial for a guided walkthrough