Async Contracts
Panda supports asynchronous smart contracts using Python's native async/await syntax. Async contracts can suspend execution at block boundaries and resume automatically after a specified number of blocks — enabling timelocks, recurring jobs, multi-phase workflows, and keeper-free automation.
How It Works
An async def method with await sleep(blocks=N) splits into phases:
- Phase 1 executes up to the
awaitand commits state - The harness schedules a timer for
Nblocks in the future - Phase 2 resumes automatically when the timer fires, with full access to the contract's current state
from panda import contract, call, query, event
@contract
class TimelockWithdrawal:
class State:
balances: dict = {}
pending_withdrawals: dict = {}
nonces: dict = {}
@call
def deposit(self, ctx, amount: int):
if amount <= 0:
raise ValueError("amount must be positive")
bal = dict(self.state.balances)
bal[ctx.sender] = bal.get(ctx.sender, 0) + amount
self.state.balances = bal
self.emit(event.Deposited(sender=ctx.sender, amount=amount))
@call
async def request_withdrawal(self, ctx, amount: int):
"""Request a timelocked withdrawal. Executes after 100 blocks."""
if amount <= 0 or amount > dict(self.state.balances).get(ctx.sender, 0):
raise ValueError("invalid withdrawal amount")
nonces = dict(self.state.nonces)
nonce = nonces.get(ctx.sender, 0) + 1
nonces[ctx.sender] = nonce
self.state.nonces = nonces
pending = dict(self.state.pending_withdrawals)
pending[f"{ctx.sender}:{nonce}"] = amount
self.state.pending_withdrawals = pending
self.emit(event.WithdrawalRequested(
sender=ctx.sender, amount=amount, nonce=nonce,
))
# --- Suspend here. Resume after 100 blocks. ---
await sleep(blocks=100)
# Phase 2: execute after timelock
key = f"{ctx.sender}:{nonce}"
pending = dict(self.state.pending_withdrawals)
if key not in pending or pending[key] != amount:
return # cancelled or stale
bal = dict(self.state.balances)
if bal.get(ctx.sender, 0) < amount:
return # insufficient balance
bal[ctx.sender] = bal[ctx.sender] - amount
self.state.balances = bal
del pending[key]
self.state.pending_withdrawals = pending
self.emit(event.WithdrawalExecuted(
to=ctx.sender, amount=amount, nonce=nonce,
))
@query
def get_balance(self, address: str) -> int:
return self.state.balances.get(address, 0)
This is a real, tested contract from the Panda repo (contracts/async/timelock_withdrawal.py). The await sleep(blocks=100) line is the boundary — everything above runs immediately, everything below runs 100 blocks later.
await sleep(blocks=N)
The sleep function is the core async primitive. It creates a timer that resumes the coroutine after N blocks:
# Relative: resume N blocks from now
await sleep(blocks=100)
# Absolute: resume at a specific block height
await sleep(until_block=50000)
sleep is a built-in — no import needed (though from panda import sleep also works). Under the hood, the harness converts await sleep() into a timer registration and CPS (continuation-passing style) transformation.
Block Boundaries: def vs async def
This is the most important concept in async contracts. The type of helper function determines whether a call creates a block boundary:
def helper — Same block, no boundary
A plain def helper runs synchronously in the same block as the caller. No state is committed between the caller and the helper.
@contract
class PlainDefHelper:
class State:
value: int = 0
block_at_start: int = 0
block_after_helper: int = 0
def _double(self, x: int):
return x * 2
@call
def set_doubled(self, ctx, x: int):
self.state.block_at_start = ctx.block_height
result = self._double(x)
self.state.block_after_helper = ctx.block_height
self.state.value = result
After calling set_doubled(x=21), block_at_start == block_after_helper — the helper ran in the same block.
async def helper — Block boundary on await
An async def helper creates a block boundary every time it's awaited. The block height advances by 1.
@contract
class AsyncDefYields:
class State:
value: int = 0
block_at_start: int = 0
block_after_helper: int = 0
async def _double(self, ctx, x: int):
return x * 2
@call
async def set_doubled(self, ctx, x: int):
self.state.block_at_start = ctx.block_height
result = await self._double(ctx, x)
self.state.block_after_helper = ctx.block_height
self.state.value = result
After calling set_doubled(x=21), block_after_helper == block_at_start + 1 — the await created a block boundary.
Mixing sync and async helpers
You can freely mix both in the same contract. Sync helpers don't advance the block; async helpers do:
@contract
class MixedHelpers:
class State:
value: int = 0
blocks_seen: list = []
def _sync_validate(self, x: int):
if x < 0:
raise ValueError("negative not allowed")
return x
async def _async_transform(self, ctx, x: int):
return x * 10
@call
async def process(self, ctx, x: int):
self.state.blocks_seen = [ctx.block_height] # block B
validated = self._sync_validate(x)
self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height] # still block B
transformed = await self._async_transform(ctx, validated)
self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height] # block B+1
self.state.value = transformed
Calling process(x=5) produces blocks_seen = [B, B, B+1] — the sync helper stays in block B, the async helper advances to B+1.
Sequential Awaits
Multiple await calls in sequence create a pipeline where each step runs in a successive block:
@contract
class MultiAwait:
class State:
a: int = 0
b: int = 0
c: int = 0
blocks_seen: list = []
async def _step_a(self, ctx, x: int):
return x + 1
async def _step_b(self, ctx, x: int):
return x * 2
async def _step_c(self, ctx, x: int):
return x - 3
@call
async def pipeline(self, ctx, x: int):
self.state.blocks_seen = [ctx.block_height] # block B
self.state.a = await self._step_a(ctx, x) # block B+1
self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]
self.state.b = await self._step_b(ctx, self.state.a) # block B+2
self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]
self.state.c = await self._step_c(ctx, self.state.b) # block B+3
self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]
With x=5: step_a returns 6, step_b returns 12, step_c returns 9. The blocks_seen list shows 4 distinct block heights, each one higher than the last.
Multiple await sleep() — Multi-Phase Workflows
A single method can have multiple await sleep() calls, creating a multi-phase workflow. This is the pattern used for governance timelocks:
from panda import contract, constructor, call, query, event
@contract
class GovernanceTimelock:
class State:
proposals: dict = {}
proposal_count: int = 0
votes: dict = {}
quorum: int = 100
timelock_blocks: int = 200
@constructor
def __init__(self, ctx, quorum: int = 100, timelock_blocks: int = 200):
self.state.quorum = quorum
self.state.timelock_blocks = timelock_blocks
@call
async def propose(self, ctx, description: str):
pid = self.state.proposal_count + 1
self.state.proposal_count = pid
props = dict(self.state.proposals)
props[str(pid)] = {
"description": description,
"proposer": ctx.sender,
"status": "voting",
"votes_for": 0,
"votes_against": 0,
}
self.state.proposals = props
self.state.votes = dict(self.state.votes) | {str(pid): {}}
self.emit(event.ProposalCreated(proposal_id=pid, proposer=ctx.sender))
# Phase 1: Voting period (500 blocks)
await sleep(blocks=500)
# Phase 2: Close voting
key = str(pid)
props = dict(self.state.proposals)
prop = props.get(key)
if not prop or prop["status"] != "voting":
return
total = prop["votes_for"] + prop["votes_against"]
passed = total >= self.state.quorum and prop["votes_for"] > prop["votes_against"]
prop["status"] = "approved" if passed else "failed"
props[key] = prop
self.state.proposals = props
if not passed:
return
# Phase 3: Timelock delay (200 blocks)
await sleep(blocks=200)
# Phase 4: Execute
props = dict(self.state.proposals)
prop = props.get(key)
if not prop or prop["status"] != "approved":
return
prop["status"] = "executed"
props[key] = prop
self.state.proposals = props
self.emit(event.ProposalExecuted(proposal_id=pid))
@call
def vote(self, ctx, proposal_id: int, support: bool):
key, props = str(proposal_id), dict(self.state.proposals)
prop = props.get(key)
if not prop or prop["status"] != "voting":
raise ValueError("proposal not in voting phase")
votes, pv = dict(self.state.votes), dict(dict(self.state.votes).get(key, {}))
if ctx.sender in pv:
raise ValueError("already voted")
pv[ctx.sender] = support
votes[key] = pv
self.state.votes = votes
prop["votes_for" if support else "votes_against"] += 1
props[key] = prop
self.state.proposals = props
@query
def get_proposal(self, proposal_id: int) -> dict:
return self.state.proposals.get(str(proposal_id), {})
The lifecycle: propose() creates a proposal, waits 500 blocks for voting, tallies votes, then waits another 200 blocks as a timelock before execution. Voters call vote() during the 500-block window — it's a sync method, so it executes immediately.
Recurring Patterns: while + await sleep()
For continuous, recurring operations, use a while loop with await sleep(). Each loop iteration runs in a future block:
Heartbeat Monitor
from panda import contract, constructor, call, query, event
@contract
class HeartbeatMonitor:
class State:
monitored_address: str = ""
interval: int = 50
last_seen: int = 0
is_alive: bool = True
missed_count: int = 0
max_missed: int = 3
monitoring: bool = False
@constructor
def __init__(self, ctx, monitored_address: str, interval: int = 50, max_missed: int = 3):
self.state.monitored_address = monitored_address
self.state.interval = interval
self.state.max_missed = max_missed
self.state.last_seen = ctx.block_height
@call
async def start_monitoring(self, ctx):
if self.state.monitoring:
raise ValueError("already monitoring")
self.state.monitoring = True
while self.state.monitoring and self.state.is_alive:
await sleep(blocks=self.state.interval)
if not self.state.monitoring or not self.state.is_alive:
break
if ctx.block_height - self.state.last_seen > self.state.interval:
self.state.missed_count = self.state.missed_count + 1
self.emit(event.HealthWarning(
address=self.state.monitored_address,
missed=self.state.missed_count,
))
if self.state.missed_count >= self.state.max_missed:
self.state.is_alive = False
self.emit(event.NodeDown(address=self.state.monitored_address))
else:
self.state.missed_count = 0
@call
def stop_monitoring(self, ctx):
if not self.state.monitoring:
raise ValueError("not monitoring")
self.state.monitoring = False
@call
def heartbeat(self, ctx):
if ctx.sender != self.state.monitored_address:
raise ValueError("only monitored address can send heartbeat")
self.state.last_seen = ctx.block_height
self.state.missed_count = 0
self.state.is_alive = True
@query
def status(self) -> dict:
return {
"address": self.state.monitored_address,
"alive": self.state.is_alive,
"missed": self.state.missed_count,
"last_seen": self.state.last_seen,
}
The while loop checks health every interval blocks. If the monitored node misses 3 consecutive heartbeats, it's declared down. The stop_monitoring() method is sync — calling it sets monitoring = False, and the next loop iteration breaks.
Auto-Compound DeFi
from panda import contract, constructor, call, query, event
@contract
class AutoCompound:
class State:
staked: dict = {}
rewards: dict = {}
compound_rate: float = 0.05
total_staked: int = 0
is_compounding: bool = False
@constructor
def __init__(self, ctx, compound_rate: float = 0.05):
self.state.compound_rate = compound_rate
@call
def stake(self, ctx, amount: int):
if amount <= 0:
raise ValueError("amount must be positive")
staked = dict(self.state.staked)
staked[ctx.sender] = staked.get(ctx.sender, 0) + amount
self.state.staked = staked
self.state.total_staked = self.state.total_staked + amount
@call
async def start_compounding(self, ctx):
if self.state.is_compounding:
raise ValueError("compounding already active")
self.state.is_compounding = True
while self.state.is_compounding:
await sleep(blocks=100)
if not self.state.is_compounding:
break
# Compound all stakers
staked = dict(self.state.staked)
rewards = dict(self.state.rewards)
total_added = 0
for addr, principal in staked.items():
reward = int(principal * self.state.compound_rate)
if reward > 0:
staked[addr] = principal + reward
rewards[addr] = rewards.get(addr, 0) + reward
total_added += reward
self.state.staked = staked
self.state.rewards = rewards
self.state.total_staked = self.state.total_staked + total_added
self.emit(event.CompoundExecuted(total_added=total_added))
@call
def stop_compounding(self, ctx):
if not self.state.is_compounding:
raise ValueError("compounding not active")
self.state.is_compounding = False
@query
def get_position(self, address: str) -> dict:
return {
"staked": self.state.staked.get(address, 0),
"rewards": self.state.rewards.get(address, 0),
}
Every 100 blocks, the compounder multiplies each staker's principal by 5%. With 1000 staked, after 3 compounds: 1000 -> 1050 -> 1102 -> 1157.
for Loop Vesting
The for loop pattern works naturally with await sleep() for fixed-count periodic operations:
from panda import contract, constructor, call, query, event
@contract
class VestingSchedule:
class State:
beneficiary: str = ""
total_amount: int = 0
released: int = 0
vesting_periods: int = 12
period_blocks: int = 100
current_period: int = 0
owner: str = ""
started: bool = False
@constructor
def __init__(self, ctx, beneficiary: str, total_amount: int,
vesting_periods: int = 12, period_blocks: int = 100):
if total_amount <= 0:
raise ValueError("total_amount must be positive")
self.state.beneficiary = beneficiary
self.state.total_amount = total_amount
self.state.vesting_periods = vesting_periods
self.state.period_blocks = period_blocks
self.state.owner = ctx.sender
@call
async def start_vesting(self, ctx):
if self.state.started:
raise ValueError("vesting already started")
if ctx.sender != self.state.owner:
raise ValueError("only owner can start vesting")
self.state.started = True
for i in range(self.state.vesting_periods):
await sleep(blocks=self.state.period_blocks)
per_period = self.state.total_amount // self.state.vesting_periods
self.state.current_period = self.state.current_period + 1
remaining = self.state.total_amount - self.state.released
# Last period gets any remainder from integer division
if self.state.current_period == self.state.vesting_periods:
amount = remaining
else:
amount = min(per_period, remaining)
self.state.released = self.state.released + amount
self.emit(event.VestingReleased(
beneficiary=self.state.beneficiary,
amount=amount,
period=self.state.current_period,
))
@query
def vested_amount(self) -> dict:
return {
"beneficiary": self.state.beneficiary,
"total": self.state.total_amount,
"released": self.state.released,
"period": self.state.current_period,
"periods": self.state.vesting_periods,
"started": self.state.started,
}
12 periods, each 100 blocks apart. The loop suspends 12 times, releasing tokens each time it resumes.
Scheduled Jobs with Cancellation
The scheduled job pattern demonstrates user-parametric delays and mid-flight cancellation:
from panda import contract, call, query, event
VALID_JOB_TYPES = ["compute", "notify", "cleanup"]
@contract
class ScheduledJob:
class State:
jobs: dict = {}
job_count: int = 0
results: dict = {}
@call
async def schedule_job(self, ctx, job_type: str, delay_blocks: int, params: dict = None):
if job_type not in VALID_JOB_TYPES:
raise ValueError(f"invalid job_type: must be one of {VALID_JOB_TYPES}")
if delay_blocks < 1:
raise ValueError("delay_blocks must be >= 1")
jid = self.state.job_count + 1
self.state.job_count = jid
jobs = dict(self.state.jobs)
jobs[str(jid)] = {
"job_type": job_type,
"params": params or {},
"creator": ctx.sender,
"status": "pending",
}
self.state.jobs = jobs
self.emit(event.JobScheduled(job_id=jid, job_type=job_type, delay=delay_blocks))
await sleep(blocks=delay_blocks)
# Check if cancelled during sleep
key = str(jid)
jobs = dict(self.state.jobs)
job = jobs.get(key)
if not job or job["status"] != "pending":
return # cancelled or removed
# Execute job
if job_type == "compute":
result = {"output": sum((params or {}).get("values", [])), "op": "sum"}
elif job_type == "notify":
result = {"message": (params or {}).get("message", ""), "sent": True}
else:
result = {"keys_removed": (params or {}).get("keys", []), "cleaned": True}
results = dict(self.state.results)
results[key] = result
self.state.results = results
job["status"] = "completed"
jobs[key] = job
self.state.jobs = jobs
self.emit(event.JobCompleted(job_id=jid, job_type=job_type))
@call
def cancel_job(self, ctx, job_id: int):
key, jobs = str(job_id), dict(self.state.jobs)
job = jobs.get(key)
if not job:
raise ValueError("job not found")
if job["creator"] != ctx.sender:
raise ValueError("only creator can cancel")
if job["status"] != "pending":
raise ValueError(f"job is {job['status']}, cannot cancel")
job["status"] = "cancelled"
jobs[key] = job
self.state.jobs = jobs
@query
def get_job(self, job_id: int) -> dict:
key = str(job_id)
return {
"job": self.state.jobs.get(key, {}),
"result": self.state.results.get(key, {}),
}
The key pattern: after await sleep(), the continuation re-reads state to check if the job was cancelled. If cancel_job() was called during the sleep window, the continuation sees status == "cancelled" and skips execution.
Auction Finalizer
The AuctionFinalizer contract uses await sleep(until_block=...) for absolute-block deadlines. After activation, the auction auto-settles when the end block arrives:
from panda import contract, constructor, call, query, event
@contract
class AuctionFinalizer:
"""Timed auction with automatic deadline-based settlement."""
class State:
highest_bid: int = 0
highest_bidder: str = ""
end_block: int = 0
settled: bool = False
active: bool = False
@constructor
def __init__(self, ctx, duration_blocks: int = 1000):
self.state.end_block = ctx.block_height + duration_blocks
print(f"Auction created by {ctx.sender}, ends at block {self.state.end_block}")
@call
async def activate(self, ctx):
"""Activate the auction. Auto-finalizes at end_block."""
if self.state.active:
raise ValueError("auction already activated")
if self.state.settled:
raise ValueError("auction already settled")
self.state.active = True
print(f"Auction activated by {ctx.sender}")
await sleep(until_block=self.state.end_block)
# Auto-finalize after deadline
if self.state.settled:
return # Already settled (shouldn't happen, but defensive)
self.state.settled = True
winner = self.state.highest_bidder
amount = self.state.highest_bid
status = "sold" if winner else "no_bids"
self.emit(event.AuctionSettled(winner=winner or "", amount=amount, status=status))
print(f"Auction settled: winner={winner or 'none'} amount={amount}")
@call
def bid(self, ctx, amount: int):
if self.state.settled:
raise ValueError("auction already settled")
if ctx.block_height >= self.state.end_block:
raise ValueError("auction has ended")
if amount <= self.state.highest_bid:
raise ValueError(f"bid must exceed current highest {self.state.highest_bid}")
self.state.highest_bid = amount
self.state.highest_bidder = ctx.sender
self.emit(event.BidPlaced(bidder=ctx.sender, amount=amount))
print(f"Bid: {ctx.sender} bid {amount}")
@query
def get_auction(self) -> dict:
return {
"highest_bid": self.state.highest_bid,
"highest_bidder": self.state.highest_bidder,
"end_block": self.state.end_block,
"settled": self.state.settled,
}
Key pattern: await sleep(until_block=N) suspends until absolute block N, unlike await sleep(blocks=N) which is relative. The constructor computes end_block = ctx.block_height + duration_blocks, and the activation method sleeps until that block. During the sleep window, users call bid() (a sync method) to place bids. When the block arrives, the continuation auto-settles.
Batch Processor
The BatchProcessor uses a while loop with await sleep(blocks=1) to process items one chunk per block:
from panda import contract, call, query, event
@contract
class BatchProcessor:
"""Process large item lists across multiple blocks."""
class State:
items: list = []
batch_size: int = 10
processed: int = 0
results: list = []
processing: bool = False
@call
async def submit_batch(self, ctx, items: list, batch_size: int = 10):
if self.state.processing:
raise ValueError("batch already in progress")
if not items:
raise ValueError("items list cannot be empty")
self.state.items = list(items)
self.state.batch_size = batch_size
self.state.processed = 0
self.state.results = []
self.state.processing = True
self.emit(event.BatchSubmitted(submitter=ctx.sender, total=len(items)))
print(f"Batch submitted: {len(items)} items, batch_size={batch_size}")
while self.state.processed < len(self.state.items):
await sleep(blocks=1)
if not self.state.processing:
break
# Process one chunk
items_list = list(self.state.items)
results = list(self.state.results)
start = self.state.processed
end = min(start + self.state.batch_size, len(items_list))
for i in range(start, end):
results.append({"index": i, "value": items_list[i], "status": "processed"})
self.state.results = results
self.state.processed = end
self.emit(event.BatchProcessed(start=start, end=end, count=end - start))
print(f"Processed items {start}-{end - 1} ({end - start} items)")
if end >= len(items_list):
self.state.processing = False
self.emit(event.BatchComplete(total=len(items_list)))
print(f"All {len(items_list)} items processed")
@query
def get_progress(self) -> dict:
total = len(self.state.items)
return {
"total": total,
"processed": self.state.processed,
"remaining": total - self.state.processed,
"processing": self.state.processing,
}
The while loop processes batch_size items per block. Each await sleep(blocks=1) advances to the next block, where the next chunk is processed. The processing flag enables external cancellation -- if another transaction sets self.state.processing = False during a sleep, the loop breaks on the next iteration.
Async Composition: Reusable Helpers
Async helpers with sleep can be composed. The caller suspends until the helper completes, including any sleeps inside it:
from panda import contract, call, query, event, sleep
@contract
class DataPipeline:
class State:
raw_data: list = []
validated_data: list = []
stage1_result: int = 0
final_result: int = 0
status: str = "idle"
_temp: int = 0
@call
def set_data(self, ctx, data: list):
self.state.raw_data = data
self.state.status = "data_loaded"
# Sync helper: plain def = same block, no await
def _validate(self, data: list):
cleaned = [x for x in data if isinstance(x, (int, float)) and x >= 0]
if len(cleaned) == 0:
raise ValueError("No valid data points after filtering")
return cleaned
# Async helper WITH sleep
async def _process_stage1(self, ctx, data: list):
self.state.stage1_result = sum(data)
self.state.status = "stage1_complete"
self.state.emit(event.Stage1Done(total=self.state.stage1_result))
await sleep(blocks=10)
return self.state.stage1_result
# Async helper WITH sleep
async def _process_stage2(self, ctx, stage1_val: int, multiplier: int):
self.state._temp = stage1_val * multiplier
self.state.status = "stage2_processing"
await sleep(blocks=5)
self.state.final_result = self.state._temp + 1
self.state.status = "complete"
return self.state.final_result
@call
async def run_pipeline(self, ctx, multiplier: int = 2):
self.state.status = "running"
# Step 1: validate (sync def -- runs in same block)
cleaned = self._validate(self.state.raw_data)
self.state.validated_data = cleaned
# Step 2: stage1 (async helper, suspends for 10 blocks)
s1 = await self._process_stage1(ctx, cleaned)
# Step 3: stage2 (async helper, suspends for 5 blocks)
result = await self._process_stage2(ctx, s1, multiplier)
self.state.emit(event.PipelineComplete(result=result))
With data=[1,2,3,-1,4] and multiplier=3: validate filters to [1,2,3,4], stage1 sums to 10 (suspends 10 blocks), stage2 computes 10*3+1=31 (suspends 5 blocks). Three phases total, 15 blocks of suspension.
Testing Async Contracts
The ContractTestRunner provides methods for testing async contracts:
from panda.testing import ContractTestRunner
from panda import timer as panda_timer
# Always reset timer state between tests
panda_timer._reset()
runner = ContractTestRunner()
addr = runner.deploy("async/timelock_withdrawal.py", sender="deployer")
# Phase 1: deposit and request withdrawal
runner.call(addr, "deposit", sender="alice", amount=1000)
runner.call(addr, "request_withdrawal", sender="alice", amount=500)
# State reflects Phase 1 (before sleep)
assert runner.query(addr, "get_balance", address="alice").return_value == 1000
# Advance blocks and fire the timer to resume Phase 2
runner.block_height += 100
runner.fire_pending_timers(addr, sender="alice")
# State reflects Phase 2 (after sleep)
assert runner.query(addr, "get_balance", address="alice").return_value == 500
Key testing methods
| Method | Description |
|---|---|
runner.fire_pending_timers(addr) | Fire all pending timers for a contract, resuming suspended coroutines |
runner.block_height += N | Advance the block height (timers fire when enough blocks have passed) |
runner.get_hibernation_snapshot(addr) | Inspect the suspended coroutine's frame chain (method, line, locals) |
panda_timer._reset() | Clear all timers between test cases |
panda_timer.get_pending() | List all pending timers (method name, delay, contract address) |
Inspecting suspended state
While a coroutine is suspended, you can inspect the hibernation snapshot:
runner.call(addr, "run_with_locals", sender="alice", x=9)
# Get the snapshot of the suspended coroutine
snapshot = runner.get_hibernation_snapshot(addr)
# Each frame in the chain has method name, line number, and locals
frame = snapshot[0]
assert frame["method"] == "run_with_locals"
assert frame["locals"]["x"] == 9
assert frame["locals"]["multiplier"] == 5
Gas Escrow
For contracts with timer chains (multiple sequential await sleep() calls), gas must be escrowed upfront to pay for all phases:
# Escrow gas for all phases of a multi-sleep contract
result = runner.call(
addr, "count_up", sender="alice",
gas_deposit=2_000_000, # covers all 3 phases
)
# Each fire_pending_timers draws from the escrow pool
runner.fire_pending_timers(addr) # phase 2
runner.fire_pending_timers(addr) # phase 3
Key properties:
gas_deposit=0is backward compatible (works like before)- Different contracts have independent escrow pools
- Deposit amount doesn't affect execution results (deterministic)
CallResultincludesgas_deposit_remainingandgas_deposit_refundfields
Hibernation Mode
Contracts can opt into hibernate mode for checkpoint-resume semantics where local variables are serialized into on-chain state at each suspension point:
@contract
class HibernatePipeline:
class Meta:
async_mode = "hibernate"
class State:
raw_data: list = []
stage1_result: int = 0
final_result: int = 0
status: str = "idle"
@call
async def run_pipeline(self, ctx, multiplier: int = 2):
self.state.status = "running"
cleaned = self._validate(self.state.raw_data)
self.state.stage1_result = sum(cleaned)
self.state.status = "stage1_complete"
await sleep(blocks=10) # checkpoint: locals serialized
# After resume, local variable 'multiplier' is restored from checkpoint
self.state.final_result = self.state.stage1_result * multiplier + 1
self.state.status = "complete"
With Meta.async_mode = "hibernate":
- Local variables (including function parameters) are captured at each
awaitpoint - State + locals are serialized to on-chain storage
- On resume, the frame is reconstructed with the exact same locals
ctx.senderis preserved across hibernation
Block-Based Time
Panda uses block height as its time primitive. The ctx object provides:
ctx.block_height— current block number (updated on each resume)ctx.block_time— block timestamp (deterministic, set by block proposer)
Converting Between Blocks and Time
On Panda EVM (Geth fork), blocks are produced approximately every 3 seconds:
| Duration | Approximate Blocks |
|---|---|
| 1 minute | ~20 blocks |
| 1 hour | ~1,200 blocks |
| 1 day | ~28,800 blocks |
| 1 week | ~201,600 blocks |
Try It
- Open the Playground and select the Vesting Schedule template
- See the Cross-Chain Messaging guide for
await chain.call()patterns - Read the SDK Reference for the full API