Crypto & Security Patterns
Panda's panda.crypto module provides cryptographic primitives for building secure on-chain protocols. This guide covers common patterns: multisig wallets, commit-reveal schemes, Merkle proofs, sealed-bid auctions, and time-locked vaults.
Cryptographic Primitives
The panda.crypto module includes:
| Function | Description |
|---|---|
sha256(data: str) -> str | SHA-256 hash, returns hex string |
keccak256(data: str) -> str | Keccak-256 hash (Ethereum-style) |
hmac_sha256(key: str, msg: str) -> str | HMAC-SHA256 |
verify_signature(msg, sig, pubkey) -> bool | Ed25519 signature verification |
merkle_root(leaves: list) -> str | Compute Merkle root from leaf hashes |
merkle_verify(leaf, proof, root) -> bool | Verify a Merkle inclusion proof |
pedersen_commit(value, blinding) -> str | Pedersen commitment (binding + hiding) |
pedersen_verify(commitment, value, blinding) -> bool | Verify a Pedersen commitment |
derive_key(secret, context, rounds) -> str | Derive a key from a secret |
Multisig Wallet
An M-of-N multisig requires multiple parties to approve actions before execution. This implementation uses HMAC-based signer verification with a propose/approve/execute lifecycle.
# Source: contracts/crypto/multisig_wallet.py
"""
Multi-Signature Wallet — HMAC-based approval scheme.
A wallet that requires M-of-N signers to approve actions.
Each signer provides an HMAC signature using their secret key
to authorize transactions.
"""
from panda import contract, constructor, call, query, event
@contract
class MultisigWallet:
"""M-of-N multisig wallet with HMAC-based signer verification."""
class State:
signers: dict = {}
threshold: int = 0
proposals: dict = {}
next_proposal_id: int = 1
executed: dict = {}
@constructor
def deploy(self, ctx, signer_keys: dict, threshold: int):
"""Deploy with signer addresses mapped to their HMAC key hashes.
signer_keys: {"0xAlice": sha256(alice_secret), "0xBob": sha256(bob_secret)}
threshold: minimum number of approvals required
"""
if threshold <= 0 or threshold > len(signer_keys):
raise ValueError("invalid threshold")
self.state.signers = signer_keys
self.state.threshold = threshold
print(f"MultisigWallet deployed: {len(signer_keys)} signers, threshold={threshold}")
@call
def propose(self, ctx, action: str, target: str, data: str):
"""Create a new proposal. Only signers can propose."""
if ctx.sender not in self.state.signers:
raise ValueError("not a signer")
pid = self.state.next_proposal_id
proposals = dict(self.state.proposals)
proposals[str(pid)] = {
"action": action,
"target": target,
"data": data,
"proposer": ctx.sender,
"approvals": {},
"approved_count": 0,
"created_at": ctx.block_time,
}
self.state.proposals = proposals
self.state.next_proposal_id = pid + 1
self.emit(
event.Proposed(
proposal_id=pid,
proposer=ctx.sender,
action=action,
)
)
print(f"Proposal #{pid} created by {ctx.sender}: action={action}, target={target}")
@call
def approve_proposal(self, ctx, proposal_id: int, signature: str):
"""Approve a proposal with an HMAC signature.
signature = hmac_sha256(signer_secret, str(proposal_id) + action + target + data)
"""
key = str(proposal_id)
proposals = dict(self.state.proposals)
proposal = proposals.get(key)
if not proposal:
raise ValueError("proposal not found")
if ctx.sender not in self.state.signers:
raise ValueError("not a signer")
if ctx.sender in proposal["approvals"]:
raise ValueError("already approved")
if key in self.state.executed:
raise ValueError("proposal already executed")
# Verify HMAC: the signer's key hash is stored; we verify by
# checking that hmac(signer_secret, message) matches.
# The message is: proposal_id + action + target + data
message = f"{proposal_id}{proposal['action']}{proposal['target']}{proposal['data']}"
# Store the signature and the signer's verification data
approvals = dict(proposal["approvals"])
approvals[ctx.sender] = {
"signature": signature,
"message": message,
"block": ctx.block_height,
}
proposal["approvals"] = approvals
proposal["approved_count"] = len(approvals)
proposals[key] = proposal
self.state.proposals = proposals
self.emit(
event.Approved(
proposal_id=proposal_id,
signer=ctx.sender,
count=len(approvals),
threshold=self.state.threshold,
)
)
print(
f"Proposal #{proposal_id} approved by {ctx.sender}: {len(approvals)}/{self.state.threshold} approvals"
)
@call
def execute(self, ctx, proposal_id: int):
"""Execute a proposal that has reached the approval threshold."""
key = str(proposal_id)
proposal = self.state.proposals.get(key)
if not proposal:
raise ValueError("proposal not found")
if key in self.state.executed:
raise ValueError("already executed")
if proposal["approved_count"] < self.state.threshold:
raise ValueError(
f"insufficient approvals: {proposal['approved_count']}/{self.state.threshold}"
)
executed = dict(self.state.executed)
executed[key] = {
"executed_by": ctx.sender,
"block": ctx.block_height,
}
self.state.executed = executed
self.emit(
event.Executed(
proposal_id=proposal_id,
executor=ctx.sender,
action=proposal["action"],
)
)
print(f"Proposal #{proposal_id} executed by {ctx.sender}: action={proposal['action']}")
@call
def add_signer(self, ctx, new_signer: str, key_hash: str):
"""Add a new signer (requires existing signer)."""
if ctx.sender not in self.state.signers:
raise ValueError("not a signer")
signers = dict(self.state.signers)
signers[new_signer] = key_hash
self.state.signers = signers
self.emit(event.SignerAdded(signer=new_signer))
print(f"Signer {new_signer} added by {ctx.sender}, total signers={len(signers)}")
@query
def get_proposal(self, proposal_id: int) -> dict:
return self.state.proposals.get(str(proposal_id), {})
@query
def is_executed(self, proposal_id: int) -> bool:
return str(proposal_id) in self.state.executed
@query
def get_signers(self) -> dict:
return dict(self.state.signers)
@query
def get_threshold(self) -> int:
return self.state.threshold
Key patterns:
- Signer registration maps addresses to HMAC key hashes at deploy time
- Separate propose/approve/execute steps allow automation and auditability
- Replay protection via the
executeddict prevents double-execution add_signerallows expanding the signer set (any existing signer can add new ones)
Commit-Reveal Scheme
A two-phase protocol for secret votes, sealed bids, or randomness. The HashRegistry contract implements a full commit-reveal lifecycle with deadlines.
Phase 1 -- Commit: Participants submit sha256(value + salt) without revealing the value.
Phase 2 -- Reveal: Participants reveal their value and salt. The contract verifies the hash matches.
# Source: contracts/crypto/hash_registry.py
from panda import contract, constructor, call, query, event
from panda.crypto import sha256
@contract
class HashRegistry:
"""Commit-reveal hash registry with deadlines and verification."""
class State:
commitments: dict = {}
reveals: dict = {}
owner: str = ""
commit_deadline: int = 0
reveal_deadline: int = 0
phase: str = "setup" # setup -> commit -> reveal -> finalized
@constructor
def deploy(self, ctx, commit_deadline: int, reveal_deadline: int):
if commit_deadline >= reveal_deadline:
raise ValueError("commit deadline must be before reveal deadline")
self.state.owner = ctx.sender
self.state.commit_deadline = commit_deadline
self.state.reveal_deadline = reveal_deadline
self.state.phase = "commit"
print(
f"HashRegistry deployed by {ctx.sender}, commit deadline={commit_deadline}, reveal deadline={reveal_deadline}"
)
@call
def commit(self, ctx, commitment_hash: str):
"""Submit a hash commitment. commitment_hash = sha256(value + salt)."""
if ctx.block_time > self.state.commit_deadline:
raise ValueError("commit phase has ended")
if ctx.sender in self.state.commitments:
raise ValueError("already committed")
commitments = dict(self.state.commitments)
commitments[ctx.sender] = {
"hash": commitment_hash,
"block": ctx.block_height,
}
self.state.commitments = commitments
self.emit(event.Committed(sender=ctx.sender, block=ctx.block_height))
print(f"Commitment recorded for {ctx.sender} at block {ctx.block_height}")
@call
def reveal(self, ctx, value: str, salt: str):
"""Reveal a previously committed value. Verifies hash match."""
if ctx.block_time < self.state.commit_deadline:
raise ValueError("reveal phase has not started")
if ctx.block_time > self.state.reveal_deadline:
raise ValueError("reveal phase has ended")
if ctx.sender not in self.state.commitments:
raise ValueError("no commitment found")
if ctx.sender in self.state.reveals:
raise ValueError("already revealed")
# Verify: sha256(value + salt) must match the commitment
expected = sha256(value + salt)
actual = self.state.commitments[ctx.sender]["hash"]
if expected != actual:
raise ValueError("hash mismatch: reveal does not match commitment")
reveals = dict(self.state.reveals)
reveals[ctx.sender] = {
"value": value,
"salt": salt,
"block": ctx.block_height,
}
self.state.reveals = reveals
self.emit(event.Revealed(sender=ctx.sender, value=value))
print(f"Reveal verified for {ctx.sender}, value={value}")
@call
def finalize(self, ctx):
"""Finalize the registry after reveal deadline."""
if ctx.block_time < self.state.reveal_deadline:
raise ValueError("reveal phase not yet ended")
self.state.phase = "finalized"
self.emit(
event.Finalized(
total_commits=len(self.state.commitments),
total_reveals=len(self.state.reveals),
)
)
print(
f"HashRegistry finalized: {len(self.state.commitments)} commits, {len(self.state.reveals)} reveals"
)
@query
def get_commitment(self, addr: str) -> dict:
return self.state.commitments.get(addr, {})
@query
def get_reveal(self, addr: str) -> dict:
return self.state.reveals.get(addr, {})
@query
def get_phase(self) -> str:
return self.state.phase
@query
def get_stats(self) -> dict:
return {
"total_commitments": len(self.state.commitments),
"total_reveals": len(self.state.reveals),
"phase": self.state.phase,
}
Use cases:
- Secret ballot voting (commit vote hash, reveal after deadline)
- Sealed-bid auctions (commit bid, reveal to determine winner)
- On-chain randomness (commit random seed, reveal to generate shared random)
Phase lifecycle: setup -> commit -> reveal -> finalized, enforced by deadlines checked against ctx.block_time.
Sealed-Bid Auction (Pedersen Commitments)
The SecretAuction contract extends the commit-reveal pattern with Pedersen commitments for cryptographically secure sealed bids. Unlike plain SHA-256 commitments, Pedersen commitments are both binding (you cannot change your bid) and hiding (the bid amount is information-theoretically hidden).
# Source: contracts/crypto/secret_auction.py
from panda import contract, constructor, call, query, event
from panda.crypto import pedersen_verify
@contract
class SecretAuction:
"""Sealed-bid auction with cryptographic commit-reveal."""
class State:
item_name: str = ""
seller: str = ""
min_bid: int = 0
commitments: dict = {}
bids: dict = {}
bid_deadline: int = 0
reveal_deadline: int = 0
phase: str = "setup"
winner: str = ""
winning_bid: int = 0
finalized: bool = False
@constructor
def deploy(self, ctx, item_name: str, min_bid: int, bid_deadline: int, reveal_deadline: int):
if bid_deadline >= reveal_deadline:
raise ValueError("bid deadline must be before reveal deadline")
self.state.item_name = item_name
self.state.seller = ctx.sender
self.state.min_bid = min_bid
self.state.bid_deadline = bid_deadline
self.state.reveal_deadline = reveal_deadline
self.state.phase = "bidding"
print(f"SecretAuction deployed for '{item_name}' by {ctx.sender}, min bid={min_bid}")
@call
def place_bid(self, ctx, commitment: str):
"""Place a sealed bid. commitment = pedersen_commit(bid_amount, blinding)."""
if ctx.block_time > self.state.bid_deadline:
raise ValueError("bidding phase has ended")
if ctx.sender == self.state.seller:
raise ValueError("seller cannot bid")
if ctx.sender in self.state.commitments:
raise ValueError("already placed a bid")
commitments = dict(self.state.commitments)
commitments[ctx.sender] = {
"commitment": commitment,
"block": ctx.block_height,
}
self.state.commitments = commitments
self.emit(event.BidPlaced(bidder=ctx.sender, block=ctx.block_height))
print(f"Sealed bid placed by {ctx.sender} at block {ctx.block_height}")
@call
def reveal_bid(self, ctx, bid_amount: int, blinding: str):
"""Reveal a previously committed bid."""
if ctx.block_time < self.state.bid_deadline:
raise ValueError("reveal phase has not started")
if ctx.block_time > self.state.reveal_deadline:
raise ValueError("reveal phase has ended")
if ctx.sender not in self.state.commitments:
raise ValueError("no commitment found")
if ctx.sender in self.state.bids:
raise ValueError("already revealed")
# Verify the Pedersen commitment
stored_commitment = self.state.commitments[ctx.sender]["commitment"]
if not pedersen_verify(stored_commitment, bid_amount, blinding):
raise ValueError("commitment verification failed")
if bid_amount < self.state.min_bid:
raise ValueError(f"bid below minimum: {bid_amount} < {self.state.min_bid}")
bids = dict(self.state.bids)
bids[ctx.sender] = {
"amount": bid_amount,
"block": ctx.block_height,
}
self.state.bids = bids
self.emit(event.BidRevealed(bidder=ctx.sender, amount=bid_amount))
print(f"Bid revealed by {ctx.sender}: amount={bid_amount}")
@call
def finalize(self, ctx):
"""Determine the winner after reveal phase ends."""
if ctx.block_time < self.state.reveal_deadline:
raise ValueError("reveal phase not yet ended")
if self.state.finalized:
raise ValueError("already finalized")
# Find highest bidder
highest_bid = 0
winner = ""
for bidder, info in self.state.bids.items():
if info["amount"] > highest_bid:
highest_bid = info["amount"]
winner = bidder
elif info["amount"] == highest_bid and bidder < winner:
# Tie-break: alphabetically first address wins (deterministic)
winner = bidder
self.state.winner = winner
self.state.winning_bid = highest_bid
self.state.finalized = True
self.state.phase = "finalized"
self.emit(
event.AuctionFinalized(
winner=winner,
winning_bid=highest_bid,
total_bids=len(self.state.bids),
)
)
print(f"Auction for '{self.state.item_name}' finalized: winner={winner}, bid={highest_bid}")
@query
def get_winner(self) -> dict:
if not self.state.finalized:
return {}
return {
"winner": self.state.winner,
"winning_bid": self.state.winning_bid,
}
@query
def get_auction_info(self) -> dict:
return {
"item_name": self.state.item_name,
"seller": self.state.seller,
"min_bid": self.state.min_bid,
"phase": self.state.phase,
"total_commitments": len(self.state.commitments),
"total_reveals": len(self.state.bids),
}
Key design details:
- Pedersen commitments (
pedersen_commit/pedersen_verify) are used instead of plain hashes for information-theoretic hiding - Seller cannot bid on their own auction (prevents shill bidding)
- Deterministic tie-breaking: alphabetically first address wins ties (important for on-chain determinism)
- Minimum bid is enforced at reveal time, not at commit time (the bid is hidden until reveal)
Merkle Airdrop
Distribute tokens to thousands of addresses gas-efficiently using Merkle proofs. Only the Merkle root (32 bytes) is stored on-chain, regardless of how many claimants exist.
# Source: contracts/crypto/merkle_airdrop.py
from panda import contract, constructor, call, query, event, PRC20
from panda.crypto import merkle_verify
@contract
class MerkleAirdrop:
"""Airdrop contract: claimants provide Merkle proofs to receive tokens."""
class State:
merkle_root: str = ""
token_address: str = ""
claimed: dict = {}
total_claimed: int = 0
owner: str = ""
active: bool = False
@constructor
def deploy(self, ctx, token_address: str, merkle_root: str):
self.state.token_address = token_address
self.state.merkle_root = merkle_root
self.state.owner = ctx.sender
self.state.active = True
print(f"MerkleAirdrop deployed by {ctx.sender}, token={token_address}")
@call
def claim(self, ctx, amount: int, proof: list):
"""Claim airdrop tokens by providing a valid Merkle proof.
The leaf is sha256(address + ":" + amount).
proof is a list of {"hash": hex, "position": "left"|"right"}.
"""
if not self.state.active:
raise ValueError("airdrop is not active")
if ctx.sender in self.state.claimed:
raise ValueError("already claimed")
# Construct leaf: address:amount
leaf = f"{ctx.sender}:{amount}"
# Verify Merkle proof
if not merkle_verify(leaf, proof, self.state.merkle_root):
raise ValueError("invalid Merkle proof")
# Mark as claimed
claimed = dict(self.state.claimed)
claimed[ctx.sender] = amount
self.state.claimed = claimed
self.state.total_claimed = self.state.total_claimed + amount
# Transfer tokens to claimant
PRC20(self.state.token_address).transfer(to=ctx.sender, value=amount)
self.emit(
event.Claimed(
claimant=ctx.sender,
amount=amount,
)
)
print(
f"Airdrop claimed: {ctx.sender} received {amount}, total claimed={self.state.total_claimed}"
)
@call
def update_root(self, ctx, new_root: str):
"""Update the Merkle root (only owner). For adding new claimants."""
if ctx.sender != self.state.owner:
raise ValueError("only owner can update root")
self.state.merkle_root = new_root
self.emit(event.RootUpdated(new_root=new_root))
print(f"Merkle root updated by {ctx.sender}, new_root={new_root}")
@call
def deactivate(self, ctx):
"""Deactivate the airdrop (only owner)."""
if ctx.sender != self.state.owner:
raise ValueError("only owner")
self.state.active = False
self.emit(event.Deactivated())
print(f"Airdrop deactivated by {ctx.sender}")
@query
def is_claimed(self, addr: str) -> bool:
return addr in self.state.claimed
@query
def get_claimed_amount(self, addr: str) -> int:
return self.state.claimed.get(addr, 0)
@query
def get_stats(self) -> dict:
return {
"merkle_root": self.state.merkle_root,
"total_claimed": self.state.total_claimed,
"num_claimants": len(self.state.claimed),
"active": self.state.active,
}
@query
def verify_proof(self, addr: str, amount: int, proof: list) -> bool:
"""Check if a proof is valid without claiming."""
leaf = f"{addr}:{amount}"
return merkle_verify(leaf, proof, self.state.merkle_root)
How it works:
- Off-chain: build a Merkle tree from all
(address, amount)pairs - Store only the root hash on-chain (32 bytes regardless of list size)
- Each claimant provides a proof (O(log n) hashes) to verify their inclusion via
merkle_verify - The
claimeddict prevents double-claiming - Tokens are transferred via cross-contract
PRC20call upon successful verification
This is far more gas-efficient than storing all addresses on-chain. The verify_proof query lets users check eligibility before submitting a claim transaction.
Timelock Vault
Time-locked deposits with cryptographic key derivation for secure withdrawals. Requires both time expiry AND a derived key to withdraw.
# Source: contracts/crypto/timelock_vault.py (excerpt: deposit and withdraw)
@call
def deposit(self, ctx, amount: int, unlock_time: int, key_hash: str):
"""Create a time-locked deposit.
key_hash = sha256(derive_key(user_secret, "vault-withdraw", rounds=1000))
The depositor must remember their secret to withdraw later.
"""
if amount <= 0:
raise ValueError("amount must be positive")
if unlock_time <= ctx.block_time:
raise ValueError("unlock time must be in the future")
did = self.state.next_deposit_id
deposits = dict(self.state.deposits)
deposits[str(did)] = {
"depositor": ctx.sender,
"amount": amount,
"unlock_time": unlock_time,
"key_hash": key_hash,
"withdrawn": False,
"created_block": ctx.block_height,
}
self.state.deposits = deposits
self.state.next_deposit_id = did + 1
self.state.total_deposited = self.state.total_deposited + amount
self.emit(
event.Deposited(
deposit_id=did,
depositor=ctx.sender,
amount=amount,
unlock_time=unlock_time,
)
)
@call
def withdraw(self, ctx, deposit_id: int, derived_key: str):
"""Withdraw a deposit after unlock time.
derived_key = derive_key(user_secret, "vault-withdraw", rounds=1000)
The contract verifies sha256(derived_key) matches the stored key_hash.
"""
key = str(deposit_id)
deposits = dict(self.state.deposits)
deposit = deposits.get(key)
if not deposit:
raise ValueError("deposit not found")
if deposit["withdrawn"]:
raise ValueError("already withdrawn")
if ctx.block_time < deposit["unlock_time"]:
raise ValueError("deposit is still locked")
if deposit["depositor"] != ctx.sender:
raise ValueError("only depositor can withdraw")
# Verify derived key
if sha256(derived_key) != deposit["key_hash"]:
raise ValueError("invalid withdrawal key")
deposit["withdrawn"] = True
deposits[key] = deposit
self.state.deposits = deposits
self.state.total_withdrawn = self.state.total_withdrawn + deposit["amount"]
self.emit(
event.Withdrawn(
deposit_id=deposit_id,
depositor=ctx.sender,
amount=deposit["amount"],
)
)
The vault also includes an emergency_recover method that uses HMAC-based recovery as a safety net, and query methods like is_unlocked to check deposit status. See the full contract in contracts/crypto/timelock_vault.py.
Try It
- Open the Playground and select the Multisig Wallet, Commit-Reveal, or Merkle Airdrop template
- Follow the Multisig Wallet tutorial for a guided walkthrough