Async Contracts

Panda supports asynchronous smart contracts using Python's native async/await syntax. Async contracts can suspend execution at block boundaries and resume automatically after a specified number of blocks — enabling timelocks, recurring jobs, multi-phase workflows, and keeper-free automation.

How It Works

An async def method with await sleep(blocks=N) splits into phases:

  1. Phase 1 executes up to the await and commits state
  2. The harness schedules a timer for N blocks in the future
  3. Phase 2 resumes automatically when the timer fires, with full access to the contract's current state

The TimelockWithdrawal contract below shows this split:
from panda import contract, call, query, event

@contract
class TimelockWithdrawal:
    class State:
        balances: dict = {}
        pending_withdrawals: dict = {}
        nonces: dict = {}

    @call
    def deposit(self, ctx, amount: int):
        if amount <= 0:
            raise ValueError("amount must be positive")
        bal = dict(self.state.balances)
        bal[ctx.sender] = bal.get(ctx.sender, 0) + amount
        self.state.balances = bal
        self.emit(event.Deposited(sender=ctx.sender, amount=amount))

    @call
    async def request_withdrawal(self, ctx, amount: int):
        """Request a timelocked withdrawal. Executes after 100 blocks."""
        if amount <= 0 or amount > dict(self.state.balances).get(ctx.sender, 0):
            raise ValueError("invalid withdrawal amount")
        nonces = dict(self.state.nonces)
        nonce = nonces.get(ctx.sender, 0) + 1
        nonces[ctx.sender] = nonce
        self.state.nonces = nonces

        pending = dict(self.state.pending_withdrawals)
        pending[f"{ctx.sender}:{nonce}"] = amount
        self.state.pending_withdrawals = pending
        self.emit(event.WithdrawalRequested(
            sender=ctx.sender, amount=amount, nonce=nonce,
        ))

        # --- Suspend here. Resume after 100 blocks. ---
        await sleep(blocks=100)

        # Phase 2: execute after timelock
        key = f"{ctx.sender}:{nonce}"
        pending = dict(self.state.pending_withdrawals)
        if key not in pending or pending[key] != amount:
            return  # cancelled or stale
        bal = dict(self.state.balances)
        if bal.get(ctx.sender, 0) < amount:
            return  # insufficient balance
        bal[ctx.sender] = bal[ctx.sender] - amount
        self.state.balances = bal
        del pending[key]
        self.state.pending_withdrawals = pending
        self.emit(event.WithdrawalExecuted(
            to=ctx.sender, amount=amount, nonce=nonce,
        ))

    @query
    def get_balance(self, address: str) -> int:
        return self.state.balances.get(address, 0)

This is a real, tested contract from the Panda repo (contracts/async/timelock_withdrawal.py). The await sleep(blocks=100) line is the boundary — everything above runs immediately, everything below runs 100 blocks later.

await sleep(blocks=N)

The sleep function is the core async primitive. It registers a timer that resumes the coroutine either a relative number of blocks from now or at an absolute block height:

# Relative: resume N blocks from now
await sleep(blocks=100)

# Absolute: resume at a specific block height
await sleep(until_block=50000)

sleep is a built-in — no import needed (though from panda import sleep also works). Under the hood, the harness converts await sleep() into a timer registration and CPS (continuation-passing style) transformation.
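To make the timer-and-resume idea concrete, here is a minimal pure-Python sketch of a scheduler that parks a coroutine at each sleep and resumes it once enough blocks have passed. ToyHarness, its call and advance methods, and the local sleep defined here are hypothetical stand-ins written for this example. Panda's real harness is not shown in this document and may work quite differently.

```python
import heapq


class _Sleep:
    """Awaitable that hands the requested delay (in blocks) to the scheduler."""

    def __init__(self, blocks: int):
        self.blocks = blocks

    def __await__(self):
        # Yielding suspends the coroutine; the scheduler receives the delay.
        yield self.blocks


def sleep(blocks: int) -> _Sleep:
    # Hypothetical stand-in for Panda's built-in sleep(blocks=N).
    return _Sleep(blocks)


class ToyHarness:
    """Hypothetical scheduler: one timer per suspended coroutine."""

    def __init__(self):
        self.block_height = 0
        self._timers = []  # min-heap of (wake_block, sequence, coroutine)
        self._seq = 0      # tie-breaker so coroutines are never compared

    def call(self, coro):
        """Run the first phase of a coroutine."""
        self._run_phase(coro)

    def _run_phase(self, coro):
        try:
            delay = coro.send(None)  # runs until the next await sleep(...)
        except StopIteration:
            return  # coroutine finished, nothing left to schedule
        heapq.heappush(self._timers, (self.block_height + delay, self._seq, coro))
        self._seq += 1

    def advance(self, blocks: int):
        """Advance block by block, resuming coroutines whose timers are due."""
        for _ in range(blocks):
            self.block_height += 1
            while self._timers and self._timers[0][0] <= self.block_height:
                _, _, coro = heapq.heappop(self._timers)
                self._run_phase(coro)


harness = ToyHarness()
log = []


async def request_withdrawal():
    log.append(("phase 1", harness.block_height))
    await sleep(blocks=100)
    log.append(("phase 2", harness.block_height))


harness.call(request_withdrawal())
harness.advance(99)
after_99 = list(log)   # only phase 1 has run so far
harness.advance(1)     # block 100: the timer fires and phase 2 runs
```

In this toy model the code before the await runs inside call(), and the code after it runs only when advance() reaches the wake block, which mirrors the phase split described above.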

Block Boundaries: def vs async def

This is the most important concept in async contracts. The type of helper function determines whether a call creates a block boundary:

def helper — Same block, no boundary

A plain def helper runs synchronously in the same block as the caller. No state is committed between the caller and the helper.

@contract
class PlainDefHelper:
    class State:
        value: int = 0
        block_at_start: int = 0
        block_after_helper: int = 0

    def _double(self, x: int):
        return x * 2

    @call
    def set_doubled(self, ctx, x: int):
        self.state.block_at_start = ctx.block_height
        result = self._double(x)
        self.state.block_after_helper = ctx.block_height
        self.state.value = result

After calling set_doubled(x=21), block_at_start == block_after_helper — the helper ran in the same block.

async def helper — Block boundary on await

An async def helper creates a block boundary every time it's awaited. The block height advances by 1.

@contract
class AsyncDefYields:
    class State:
        value: int = 0
        block_at_start: int = 0
        block_after_helper: int = 0

    async def _double(self, ctx, x: int):
        return x * 2

    @call
    async def set_doubled(self, ctx, x: int):
        self.state.block_at_start = ctx.block_height
        result = await self._double(ctx, x)
        self.state.block_after_helper = ctx.block_height
        self.state.value = result

After calling set_doubled(x=21), block_after_helper == block_at_start + 1 — the await created a block boundary.

Mixing sync and async helpers

You can freely mix both in the same contract. Sync helpers don't advance the block; async helpers do:

@contract
class MixedHelpers:
    class State:
        value: int = 0
        blocks_seen: list = []

    def _sync_validate(self, x: int):
        if x < 0:
            raise ValueError("negative not allowed")
        return x

    async def _async_transform(self, ctx, x: int):
        return x * 10

    @call
    async def process(self, ctx, x: int):
        self.state.blocks_seen = [ctx.block_height]        # block B
        validated = self._sync_validate(x)
        self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]  # still block B
        transformed = await self._async_transform(ctx, validated)
        self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]  # block B+1
        self.state.value = transformed

Calling process(x=5) produces blocks_seen = [B, B, B+1] — the sync helper stays in block B, the async helper advances to B+1.

Sequential Awaits

Multiple await calls in sequence create a pipeline where each step runs in a successive block:

@contract
class MultiAwait:
    class State:
        a: int = 0
        b: int = 0
        c: int = 0
        blocks_seen: list = []

    async def _step_a(self, ctx, x: int):
        return x + 1

    async def _step_b(self, ctx, x: int):
        return x * 2

    async def _step_c(self, ctx, x: int):
        return x - 3

    @call
    async def pipeline(self, ctx, x: int):
        self.state.blocks_seen = [ctx.block_height]     # block B
        self.state.a = await self._step_a(ctx, x)       # block B+1
        self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]
        self.state.b = await self._step_b(ctx, self.state.a)  # block B+2
        self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]
        self.state.c = await self._step_c(ctx, self.state.b)  # block B+3
        self.state.blocks_seen = self.state.blocks_seen + [ctx.block_height]

With x=5: step_a returns 6, step_b returns 12, step_c returns 9. The blocks_seen list shows 4 distinct block heights, each one higher than the last.

Multiple await sleep() — Multi-Phase Workflows

A single method can have multiple await sleep() calls, creating a multi-phase workflow. This is the pattern used for governance timelocks:

from panda import contract, constructor, call, query, event

@contract
class GovernanceTimelock:
    class State:
        proposals: dict = {}
        proposal_count: int = 0
        votes: dict = {}
        quorum: int = 100
        timelock_blocks: int = 200

    @constructor
    def __init__(self, ctx, quorum: int = 100, timelock_blocks: int = 200):
        self.state.quorum = quorum
        self.state.timelock_blocks = timelock_blocks

    @call
    async def propose(self, ctx, description: str):
        pid = self.state.proposal_count + 1
        self.state.proposal_count = pid
        props = dict(self.state.proposals)
        props[str(pid)] = {
            "description": description,
            "proposer": ctx.sender,
            "status": "voting",
            "votes_for": 0,
            "votes_against": 0,
        }
        self.state.proposals = props
        self.state.votes = dict(self.state.votes) | {str(pid): {}}
        self.emit(event.ProposalCreated(proposal_id=pid, proposer=ctx.sender))

        # Phase 1: Voting period (500 blocks)
        await sleep(blocks=500)

        # Phase 2: Close voting
        key = str(pid)
        props = dict(self.state.proposals)
        prop = props.get(key)
        if not prop or prop["status"] != "voting":
            return
        total = prop["votes_for"] + prop["votes_against"]
        passed = total >= self.state.quorum and prop["votes_for"] > prop["votes_against"]
        prop["status"] = "approved" if passed else "failed"
        props[key] = prop
        self.state.proposals = props

        if not passed:
            return

        # Phase 3: Timelock delay (timelock_blocks from the constructor, 200 by default)
        await sleep(blocks=self.state.timelock_blocks)

        # Phase 4: Execute
        props = dict(self.state.proposals)
        prop = props.get(key)
        if not prop or prop["status"] != "approved":
            return
        prop["status"] = "executed"
        props[key] = prop
        self.state.proposals = props
        self.emit(event.ProposalExecuted(proposal_id=pid))

    @call
    def vote(self, ctx, proposal_id: int, support: bool):
        key, props = str(proposal_id), dict(self.state.proposals)
        prop = props.get(key)
        if not prop or prop["status"] != "voting":
            raise ValueError("proposal not in voting phase")
        votes, pv = dict(self.state.votes), dict(dict(self.state.votes).get(key, {}))
        if ctx.sender in pv:
            raise ValueError("already voted")
        pv[ctx.sender] = support
        votes[key] = pv
        self.state.votes = votes
        prop["votes_for" if support else "votes_against"] += 1
        props[key] = prop
        self.state.proposals = props

    @query
    def get_proposal(self, proposal_id: int) -> dict:
        return self.state.proposals.get(str(proposal_id), {})

The lifecycle: propose() creates a proposal, waits 500 blocks for voting, tallies votes, then waits another 200 blocks as a timelock before execution. Voters call vote() during the 500-block window — it's a sync method, so it executes immediately.
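The tally rule and the phase timing can be checked outside the contract. The sketch below is plain Python rather than contract code. The names tally and timeline are hypothetical, and the block heights assume each timer fires exactly at its due block.

```python
def tally(votes_for: int, votes_against: int, quorum: int) -> str:
    """Mirror the Phase 2 rule: quorum met and strictly more for than against."""
    total = votes_for + votes_against
    passed = total >= quorum and votes_for > votes_against
    return "approved" if passed else "failed"


def timeline(proposed_at: int, voting_blocks: int = 500,
             timelock_blocks: int = 200) -> dict:
    """Block heights at which each later phase resumes (assuming punctual timers)."""
    voting_closes = proposed_at + voting_blocks
    return {
        "voting_closes": voting_closes,
        "executes": voting_closes + timelock_blocks,
    }


examples = {
    "clear majority": tally(60, 40, quorum=100),
    "below quorum": tally(60, 30, quorum=100),
    "tie": tally(50, 50, quorum=100),
}
schedule = timeline(proposed_at=1000)
```

A tie fails because the rule requires votes_for to be strictly greater than votes_against, and a proposal below quorum fails regardless of the margin.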

Recurring Patterns: while + await sleep()

For continuous, recurring operations, use a while loop with await sleep(). Each loop iteration runs in a future block:

Heartbeat Monitor

from panda import contract, constructor, call, query, event

@contract
class HeartbeatMonitor:
    class State:
        monitored_address: str = ""
        interval: int = 50
        last_seen: int = 0
        is_alive: bool = True
        missed_count: int = 0
        max_missed: int = 3
        monitoring: bool = False

    @constructor
    def __init__(self, ctx, monitored_address: str, interval: int = 50, max_missed: int = 3):
        self.state.monitored_address = monitored_address
        self.state.interval = interval
        self.state.max_missed = max_missed
        self.state.last_seen = ctx.block_height

    @call
    async def start_monitoring(self, ctx):
        if self.state.monitoring:
            raise ValueError("already monitoring")
        self.state.monitoring = True

        while self.state.monitoring and self.state.is_alive:
            await sleep(blocks=self.state.interval)

            if not self.state.monitoring or not self.state.is_alive:
                break

            if ctx.block_height - self.state.last_seen > self.state.interval:
                self.state.missed_count = self.state.missed_count + 1
                self.emit(event.HealthWarning(
                    address=self.state.monitored_address,
                    missed=self.state.missed_count,
                ))
                if self.state.missed_count >= self.state.max_missed:
                    self.state.is_alive = False
                    self.emit(event.NodeDown(address=self.state.monitored_address))
            else:
                self.state.missed_count = 0

    @call
    def stop_monitoring(self, ctx):
        if not self.state.monitoring:
            raise ValueError("not monitoring")
        self.state.monitoring = False

    @call
    def heartbeat(self, ctx):
        if ctx.sender != self.state.monitored_address:
            raise ValueError("only monitored address can send heartbeat")
        self.state.last_seen = ctx.block_height
        self.state.missed_count = 0
        self.state.is_alive = True

    @query
    def status(self) -> dict:
        return {
            "address": self.state.monitored_address,
            "alive": self.state.is_alive,
            "missed": self.state.missed_count,
            "last_seen": self.state.last_seen,
        }

The while loop checks health every interval blocks. If the monitored address misses max_missed consecutive checks (3 by default), it is declared down. stop_monitoring() is a sync method: calling it sets monitoring = False, and the loop exits the next time it resumes.

Auto-Compound DeFi

from panda import contract, constructor, call, query, event

@contract
class AutoCompound:
    class State:
        staked: dict = {}
        rewards: dict = {}
        compound_rate: float = 0.05
        total_staked: int = 0
        is_compounding: bool = False

    @constructor
    def __init__(self, ctx, compound_rate: float = 0.05):
        self.state.compound_rate = compound_rate

    @call
    def stake(self, ctx, amount: int):
        if amount <= 0:
            raise ValueError("amount must be positive")
        staked = dict(self.state.staked)
        staked[ctx.sender] = staked.get(ctx.sender, 0) + amount
        self.state.staked = staked
        self.state.total_staked = self.state.total_staked + amount

    @call
    async def start_compounding(self, ctx):
        if self.state.is_compounding:
            raise ValueError("compounding already active")
        self.state.is_compounding = True

        while self.state.is_compounding:
            await sleep(blocks=100)

            if not self.state.is_compounding:
                break

            # Compound all stakers
            staked = dict(self.state.staked)
            rewards = dict(self.state.rewards)
            total_added = 0
            for addr, principal in staked.items():
                reward = int(principal * self.state.compound_rate)
                if reward > 0:
                    staked[addr] = principal + reward
                    rewards[addr] = rewards.get(addr, 0) + reward
                    total_added += reward
            self.state.staked = staked
            self.state.rewards = rewards
            self.state.total_staked = self.state.total_staked + total_added
            self.emit(event.CompoundExecuted(total_added=total_added))

    @call
    def stop_compounding(self, ctx):
        if not self.state.is_compounding:
            raise ValueError("compounding not active")
        self.state.is_compounding = False

    @query
    def get_position(self, address: str) -> dict:
        return {
            "staked": self.state.staked.get(address, 0),
            "rewards": self.state.rewards.get(address, 0),
        }

Every 100 blocks, the compounder adds 5% of each staker's principal (truncated to an integer) to their stake. With 1000 staked, three compounds give 1000 -> 1050 -> 1102 -> 1157.
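The compounding arithmetic can be reproduced in a few lines of plain Python. Both helper names below are hypothetical. The first mirrors the contract's int(principal * rate) step, and the second is an integer-only variant using basis points, shown only as an alternative that avoids floating-point rounding; nothing here says Panda requires it.

```python
def compound_history(principal: int, rate: float, rounds: int) -> list:
    """Apply int(principal * rate) per round, as the contract loop does."""
    history = [principal]
    for _ in range(rounds):
        principal += int(principal * rate)  # reward is truncated toward zero
        history.append(principal)
    return history


def compound_history_bps(principal: int, rate_bps: int, rounds: int) -> list:
    """Integer-only variant: rate in basis points (500 = 5%)."""
    history = [principal]
    for _ in range(rounds):
        principal += principal * rate_bps // 10_000
        history.append(principal)
    return history


float_version = compound_history(1000, 0.05, 3)
integer_version = compound_history_bps(1000, 500, 3)
```

The truncation is why the second step adds 52 rather than 52.5, and the third adds 55 rather than 55.1.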

for Loop Vesting

The for loop pattern works naturally with await sleep() for fixed-count periodic operations:

from panda import contract, constructor, call, query, event

@contract
class VestingSchedule:
    class State:
        beneficiary: str = ""
        total_amount: int = 0
        released: int = 0
        vesting_periods: int = 12
        period_blocks: int = 100
        current_period: int = 0
        owner: str = ""
        started: bool = False

    @constructor
    def __init__(self, ctx, beneficiary: str, total_amount: int,
                 vesting_periods: int = 12, period_blocks: int = 100):
        if total_amount <= 0:
            raise ValueError("total_amount must be positive")
        self.state.beneficiary = beneficiary
        self.state.total_amount = total_amount
        self.state.vesting_periods = vesting_periods
        self.state.period_blocks = period_blocks
        self.state.owner = ctx.sender

    @call
    async def start_vesting(self, ctx):
        if self.state.started:
            raise ValueError("vesting already started")
        if ctx.sender != self.state.owner:
            raise ValueError("only owner can start vesting")
        self.state.started = True

        for i in range(self.state.vesting_periods):
            await sleep(blocks=self.state.period_blocks)

            per_period = self.state.total_amount // self.state.vesting_periods
            self.state.current_period = self.state.current_period + 1
            remaining = self.state.total_amount - self.state.released

            # Last period gets any remainder from integer division
            if self.state.current_period == self.state.vesting_periods:
                amount = remaining
            else:
                amount = min(per_period, remaining)

            self.state.released = self.state.released + amount
            self.emit(event.VestingReleased(
                beneficiary=self.state.beneficiary,
                amount=amount,
                period=self.state.current_period,
            ))

    @query
    def vested_amount(self) -> dict:
        return {
            "beneficiary": self.state.beneficiary,
            "total": self.state.total_amount,
            "released": self.state.released,
            "period": self.state.current_period,
            "periods": self.state.vesting_periods,
            "started": self.state.started,
        }

12 periods, each 100 blocks apart. The loop suspends 12 times, releasing tokens each time it resumes.
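The per-period amounts, including the remainder handling in the last period, can be checked in isolation. This is a plain-Python sketch, not contract code, and vesting_releases is a hypothetical helper that repeats the loop's arithmetic.

```python
def vesting_releases(total_amount: int, vesting_periods: int) -> list:
    """Amount released in each period, following the loop in start_vesting."""
    per_period = total_amount // vesting_periods
    released = 0
    amounts = []
    for period in range(1, vesting_periods + 1):
        remaining = total_amount - released
        if period == vesting_periods:
            amount = remaining  # last period absorbs the division remainder
        else:
            amount = min(per_period, remaining)
        released += amount
        amounts.append(amount)
    return amounts


releases = vesting_releases(1000, 12)
```

With 1000 tokens over 12 periods, integer division gives 83 per period, so the first eleven periods release 913 in total and the last releases the remaining 87.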

Scheduled Jobs with Cancellation

The scheduled job pattern demonstrates user-parametric delays and mid-flight cancellation:

from panda import contract, call, query, event

VALID_JOB_TYPES = ["compute", "notify", "cleanup"]

@contract
class ScheduledJob:
    class State:
        jobs: dict = {}
        job_count: int = 0
        results: dict = {}

    @call
    async def schedule_job(self, ctx, job_type: str, delay_blocks: int, params: dict = None):
        if job_type not in VALID_JOB_TYPES:
            raise ValueError(f"invalid job_type: must be one of {VALID_JOB_TYPES}")
        if delay_blocks < 1:
            raise ValueError("delay_blocks must be >= 1")

        jid = self.state.job_count + 1
        self.state.job_count = jid
        jobs = dict(self.state.jobs)
        jobs[str(jid)] = {
            "job_type": job_type,
            "params": params or {},
            "creator": ctx.sender,
            "status": "pending",
        }
        self.state.jobs = jobs
        self.emit(event.JobScheduled(job_id=jid, job_type=job_type, delay=delay_blocks))

        await sleep(blocks=delay_blocks)

        # Check if cancelled during sleep
        key = str(jid)
        jobs = dict(self.state.jobs)
        job = jobs.get(key)
        if not job or job["status"] != "pending":
            return  # cancelled or removed

        # Execute job
        if job_type == "compute":
            result = {"output": sum((params or {}).get("values", [])), "op": "sum"}
        elif job_type == "notify":
            result = {"message": (params or {}).get("message", ""), "sent": True}
        else:
            result = {"keys_removed": (params or {}).get("keys", []), "cleaned": True}

        results = dict(self.state.results)
        results[key] = result
        self.state.results = results
        job["status"] = "completed"
        jobs[key] = job
        self.state.jobs = jobs
        self.emit(event.JobCompleted(job_id=jid, job_type=job_type))

    @call
    def cancel_job(self, ctx, job_id: int):
        key, jobs = str(job_id), dict(self.state.jobs)
        job = jobs.get(key)
        if not job:
            raise ValueError("job not found")
        if job["creator"] != ctx.sender:
            raise ValueError("only creator can cancel")
        if job["status"] != "pending":
            raise ValueError(f"job is {job['status']}, cannot cancel")
        job["status"] = "cancelled"
        jobs[key] = job
        self.state.jobs = jobs

    @query
    def get_job(self, job_id: int) -> dict:
        key = str(job_id)
        return {
            "job": self.state.jobs.get(key, {}),
            "result": self.state.results.get(key, {}),
        }

The key pattern: after await sleep(), the continuation re-reads state to check if the job was cancelled. If cancel_job() was called during the sleep window, the continuation sees status == "cancelled" and skips execution.
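The re-read-after-resume pattern can be illustrated without any framework by using a plain generator, where yield stands in for await sleep(). The function names and the simplified jobs dictionary below are hypothetical and exist only for this sketch.

```python
def schedule_job(jobs: dict, job_id: int, delay_blocks: int):
    """Two-phase job: the yield stands in for await sleep(blocks=delay_blocks)."""
    jobs[job_id] = "pending"
    yield delay_blocks
    # Continuation: re-read state instead of trusting what phase 1 saw.
    if jobs.get(job_id) != "pending":
        return  # cancelled or removed while suspended
    jobs[job_id] = "completed"


def run(cancel_during_sleep: bool) -> str:
    jobs = {}
    job = schedule_job(jobs, 1, delay_blocks=50)
    next(job)                  # phase 1 runs up to the suspension point
    if cancel_during_sleep:
        jobs[1] = "cancelled"  # what cancel_job() would do mid-sleep
    next(job, None)            # timer fires: resume the continuation
    return jobs[1]


cancelled_outcome = run(cancel_during_sleep=True)
normal_outcome = run(cancel_during_sleep=False)
```

The continuation never overwrites a cancelled status because it checks the stored status first, which is the same guard the contract applies after its sleep.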

Auction Finalizer

The AuctionFinalizer contract uses await sleep(until_block=...) for absolute-block deadlines. After activation, the auction auto-settles when the end block arrives:

# Source: contracts/async/auction_finalizer.py
from panda import contract, constructor, call, query, event


@contract
class AuctionFinalizer:
    """Timed auction with automatic deadline-based settlement."""

    class State:
        highest_bid: int = 0
        highest_bidder: str = ""
        end_block: int = 0
        settled: bool = False
        active: bool = False

    @constructor
    def __init__(self, ctx, duration_blocks: int = 1000):
        self.state.end_block = ctx.block_height + duration_blocks
        print(f"Auction created by {ctx.sender}, ends at block {self.state.end_block}")

    @call
    async def activate(self, ctx):
        """Activate the auction. Auto-finalizes at end_block."""
        if self.state.active:
            raise ValueError("auction already activated")
        if self.state.settled:
            raise ValueError("auction already settled")
        self.state.active = True
        print(f"Auction activated by {ctx.sender}")

        await sleep(until_block=self.state.end_block)

        # Auto-finalize after deadline
        if self.state.settled:
            return  # Already settled (shouldn't happen, but defensive)
        self.state.settled = True
        winner = self.state.highest_bidder
        amount = self.state.highest_bid
        status = "sold" if winner else "no_bids"
        self.emit(event.AuctionSettled(winner=winner or "", amount=amount, status=status))
        print(f"Auction settled: winner={winner or 'none'} amount={amount}")

    @call
    def bid(self, ctx, amount: int):
        if self.state.settled:
            raise ValueError("auction already settled")
        if ctx.block_height >= self.state.end_block:
            raise ValueError("auction has ended")
        if amount <= self.state.highest_bid:
            raise ValueError(f"bid must exceed current highest {self.state.highest_bid}")
        self.state.highest_bid = amount
        self.state.highest_bidder = ctx.sender
        self.emit(event.BidPlaced(bidder=ctx.sender, amount=amount))
        print(f"Bid: {ctx.sender} bid {amount}")

    @query
    def get_auction(self) -> dict:
        return {
            "highest_bid": self.state.highest_bid,
            "highest_bidder": self.state.highest_bidder,
            "end_block": self.state.end_block,
            "settled": self.state.settled,
        }

Key pattern: await sleep(until_block=N) suspends until absolute block N, unlike await sleep(blocks=N) which is relative. The constructor computes end_block = ctx.block_height + duration_blocks, and the activation method sleeps until that block. During the sleep window, users call bid() (a sync method) to place bids. When the block arrives, the continuation auto-settles.

Batch Processor

The BatchProcessor uses a while loop with await sleep(blocks=1) to process items one chunk per block:

# Source: contracts/async/batch_processor.py
from panda import contract, call, query, event


@contract
class BatchProcessor:
    """Process large item lists across multiple blocks."""

    class State:
        items: list = []
        batch_size: int = 10
        processed: int = 0
        results: list = []
        processing: bool = False

    @call
    async def submit_batch(self, ctx, items: list, batch_size: int = 10):
        if self.state.processing:
            raise ValueError("batch already in progress")
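        if not items:
            raise ValueError("items list cannot be empty")
        # Guard against a loop that never advances `processed`
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")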
        self.state.items = list(items)
        self.state.batch_size = batch_size
        self.state.processed = 0
        self.state.results = []
        self.state.processing = True
        self.emit(event.BatchSubmitted(submitter=ctx.sender, total=len(items)))
        print(f"Batch submitted: {len(items)} items, batch_size={batch_size}")

        while self.state.processed < len(self.state.items):
            await sleep(blocks=1)

            if not self.state.processing:
                break

            # Process one chunk
            items_list = list(self.state.items)
            results = list(self.state.results)
            start = self.state.processed
            end = min(start + self.state.batch_size, len(items_list))
            for i in range(start, end):
                results.append({"index": i, "value": items_list[i], "status": "processed"})
            self.state.results = results
            self.state.processed = end
            self.emit(event.BatchProcessed(start=start, end=end, count=end - start))
            print(f"Processed items {start}-{end - 1} ({end - start} items)")

            if end >= len(items_list):
                self.state.processing = False
                self.emit(event.BatchComplete(total=len(items_list)))
                print(f"All {len(items_list)} items processed")

    @query
    def get_progress(self) -> dict:
        total = len(self.state.items)
        return {
            "total": total,
            "processed": self.state.processed,
            "remaining": total - self.state.processed,
            "processing": self.state.processing,
        }

The while loop processes batch_size items per block. Each await sleep(blocks=1) advances to the next block, where the next chunk is processed. The processing flag leaves room for external cancellation: if another transaction sets self.state.processing = False during a sleep, the loop breaks on the next iteration. The contract as shown does not expose a method that does this.
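The chunk boundaries the loop walks through can be computed ahead of time, which is useful for estimating how many blocks a batch will take. The helper below is a hypothetical plain-Python sketch of the same start/end arithmetic. It rejects a batch_size below 1, since such a value would never advance the processed counter.

```python
def chunk_ranges(total: int, batch_size: int) -> list:
    """Return the (start, end) index pairs processed on successive blocks."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    ranges = []
    start = 0
    while start < total:
        end = min(start + batch_size, total)  # last chunk may be shorter
        ranges.append((start, end))
        start = end
    return ranges


ranges = chunk_ranges(total=25, batch_size=10)
blocks_needed = len(ranges)  # one sleep(blocks=1) per chunk
```

For 25 items in chunks of 10, the loop resumes three times, and the final chunk covers only the last five items.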

Async Composition: Reusable Helpers

Async helpers with sleep can be composed. The caller suspends until the helper completes, including any sleeps inside it:

from panda import contract, call, query, event, sleep

@contract
class DataPipeline:
    class State:
        raw_data: list = []
        validated_data: list = []
        stage1_result: int = 0
        final_result: int = 0
        status: str = "idle"
        _temp: int = 0

    @call
    def set_data(self, ctx, data: list):
        self.state.raw_data = data
        self.state.status = "data_loaded"

    # Sync helper: plain def = same block, no await
    def _validate(self, data: list):
        cleaned = [x for x in data if isinstance(x, (int, float)) and x >= 0]
        if len(cleaned) == 0:
            raise ValueError("No valid data points after filtering")
        return cleaned

    # Async helper WITH sleep
    async def _process_stage1(self, ctx, data: list):
        self.state.stage1_result = sum(data)
        self.state.status = "stage1_complete"
        self.emit(event.Stage1Done(total=self.state.stage1_result))
        await sleep(blocks=10)
        return self.state.stage1_result

    # Async helper WITH sleep
    async def _process_stage2(self, ctx, stage1_val: int, multiplier: int):
        self.state._temp = stage1_val * multiplier
        self.state.status = "stage2_processing"
        await sleep(blocks=5)
        self.state.final_result = self.state._temp + 1
        self.state.status = "complete"
        return self.state.final_result

    @call
    async def run_pipeline(self, ctx, multiplier: int = 2):
        self.state.status = "running"

        # Step 1: validate (sync def -- runs in same block)
        cleaned = self._validate(self.state.raw_data)
        self.state.validated_data = cleaned

        # Step 2: stage1 (async helper, suspends for 10 blocks)
        s1 = await self._process_stage1(ctx, cleaned)

        # Step 3: stage2 (async helper, suspends for 5 blocks)
        result = await self._process_stage2(ctx, s1, multiplier)

        self.emit(event.PipelineComplete(result=result))

With data=[1,2,3,-1,4] and multiplier=3: validate filters to [1,2,3,4], stage1 sums to 10 (suspends 10 blocks), stage2 computes 10*3+1=31 (suspends 5 blocks). Three phases total, 15 blocks of suspension.

Testing Async Contracts

The ContractTestRunner provides methods for testing async contracts:

from panda.testing import ContractTestRunner
from panda import timer as panda_timer

# Always reset timer state between tests
panda_timer._reset()
runner = ContractTestRunner()
addr = runner.deploy("async/timelock_withdrawal.py", sender="deployer")

# Phase 1: deposit and request withdrawal
runner.call(addr, "deposit", sender="alice", amount=1000)
runner.call(addr, "request_withdrawal", sender="alice", amount=500)

# State reflects Phase 1 (before sleep)
assert runner.query(addr, "get_balance", address="alice").return_value == 1000

# Advance blocks and fire the timer to resume Phase 2
runner.block_height += 100
runner.fire_pending_timers(addr, sender="alice")

# State reflects Phase 2 (after sleep)
assert runner.query(addr, "get_balance", address="alice").return_value == 500

Key testing methods

Method                                 Description
runner.fire_pending_timers(addr)       Fire all pending timers for a contract, resuming suspended coroutines
runner.block_height += N               Advance the block height (timers fire when enough blocks have passed)
runner.get_hibernation_snapshot(addr)  Inspect the suspended coroutine's frame chain (method, line, locals)
panda_timer._reset()                   Clear all timers between test cases
panda_timer.get_pending()              List all pending timers (method name, delay, contract address)

Inspecting suspended state

While a coroutine is suspended, you can inspect the hibernation snapshot:

runner.call(addr, "run_with_locals", sender="alice", x=9)

# Get the snapshot of the suspended coroutine
snapshot = runner.get_hibernation_snapshot(addr)

# Each frame in the chain has method name, line number, and locals
frame = snapshot[0]
assert frame["method"] == "run_with_locals"
assert frame["locals"]["x"] == 9
assert frame["locals"]["multiplier"] == 5

Gas Escrow

For contracts with timer chains (multiple sequential await sleep() calls), gas must be escrowed upfront to pay for all phases:

# Escrow gas for all phases of a multi-sleep contract
result = runner.call(
    addr, "count_up", sender="alice",
    gas_deposit=2_000_000,  # covers all 3 phases
)

# Each fire_pending_timers draws from the escrow pool
runner.fire_pending_timers(addr)  # phase 2
runner.fire_pending_timers(addr)  # phase 3

Key properties:

  • gas_deposit=0 is backward compatible: a call without a deposit behaves as it did before escrow was available
  • Different contracts have independent escrow pools
  • Deposit amount doesn't affect execution results (deterministic)
  • CallResult includes gas_deposit_remaining and gas_deposit_refund fields

Hibernation Mode

Contracts can opt into hibernate mode for checkpoint-resume semantics where local variables are serialized into on-chain state at each suspension point:

@contract
class HibernatePipeline:
    class Meta:
        async_mode = "hibernate"

    class State:
        raw_data: list = []
        stage1_result: int = 0
        final_result: int = 0
        status: str = "idle"

    @call
    async def run_pipeline(self, ctx, multiplier: int = 2):
        self.state.status = "running"
        # _validate is assumed to be the same sync helper as in DataPipeline above (not shown here)
        cleaned = self._validate(self.state.raw_data)

        self.state.stage1_result = sum(cleaned)
        self.state.status = "stage1_complete"
        await sleep(blocks=10)  # checkpoint: locals serialized

        # After resume, local variable 'multiplier' is restored from checkpoint
        self.state.final_result = self.state.stage1_result * multiplier + 1
        self.state.status = "complete"

With Meta.async_mode = "hibernate":

  • Local variables (including function parameters) are captured at each await point
  • State + locals are serialized to on-chain storage
  • On resume, the frame is reconstructed with the exact same locals
  • ctx.sender is preserved across hibernation
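The checkpoint-resume idea can be sketched in plain Python as two functions that communicate only through contract state and a serialized blob of locals. The function names, the checkpoint layout, and the use of JSON are all assumptions made for this illustration. The document does not describe Panda's actual serialization format.

```python
import json


def start(state: dict, multiplier: int) -> str:
    """Phase 1: run up to the suspension point and return a serialized checkpoint."""
    state["stage1_result"] = sum(state["raw_data"])
    state["status"] = "stage1_complete"
    # Hypothetical layout: where to resume, plus the locals needed afterwards.
    checkpoint = {"resume_at": "after_sleep", "locals": {"multiplier": multiplier}}
    return json.dumps(checkpoint)


def resume(state: dict, checkpoint_json: str) -> None:
    """Phase 2: rebuild the locals from the checkpoint and finish the work."""
    checkpoint = json.loads(checkpoint_json)
    multiplier = checkpoint["locals"]["multiplier"]
    state["final_result"] = state["stage1_result"] * multiplier + 1
    state["status"] = "complete"


state = {"raw_data": [1, 2, 3, 4], "status": "idle"}
blob = start(state, multiplier=3)  # would be stored on-chain during the sleep
resume(state, blob)                # would run when the timer fires
```

Because the second phase reads multiplier from the blob rather than from a live Python frame, it could run in a different process from the first, which is the property that distinguishes checkpointing from simply keeping a coroutine in memory.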

Block-Based Time

Panda uses block height as its time primitive. The ctx object provides:

  • ctx.block_height — current block number (updated on each resume)
  • ctx.block_time — block timestamp (deterministic, set by block proposer)

Converting Between Blocks and Time

On Panda EVM (Geth fork), blocks are produced approximately every 3 seconds:

Duration   Approximate Blocks
1 minute   ~20 blocks
1 hour     ~1,200 blocks
1 day      ~28,800 blocks
1 week     ~201,600 blocks
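The table values follow from dividing the duration in seconds by the block time. A small helper makes the conversion explicit. blocks_for is a hypothetical name, and the 3-second block time is the approximate figure quoted above, so real elapsed time will vary.

```python
BLOCK_TIME_SECONDS = 3  # approximate block interval quoted in the text


def blocks_for(seconds: int, block_time: int = BLOCK_TIME_SECONDS) -> int:
    """Blocks needed to cover `seconds`, rounded up so the delay is never short."""
    return -(-seconds // block_time)  # ceiling division on integers


table = {
    "1 minute": blocks_for(60),
    "1 hour": blocks_for(60 * 60),
    "1 day": blocks_for(24 * 60 * 60),
    "1 week": blocks_for(7 * 24 * 60 * 60),
}
```

The result can be passed as the blocks argument to sleep, for example a one-day delay would use blocks_for(86400).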

Try It