11. Async Operations

wpostgresql provides full async support for all database operations, enabling non-blocking I/O for high-performance applications.

11.1. Basic Async Operations

Initialize and perform async CRUD operations:

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

db_config = {
    "dbname": "wpostgresql",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": 5432,
}

class Person(BaseModel):
    id: int
    name: str
    age: int

async def main():
    db = WPostgreSQL(Person, db_config)

    # Insert
    await db.insert_async(Person(id=1, name="Alice", age=25))
    await db.insert_async(Person(id=2, name="Bob", age=30))

    # Read
    all_people = await db.get_all_async()
    print("All people:", all_people)

    # Filter
    alice = await db.get_by_field_async(name="Alice")
    print("Found Alice:", alice)

    # Update
    await db.update_async(1, Person(id=1, name="Alice Updated", age=26))

    # Delete
    await db.delete_async(2)

    remaining = await db.get_all_async()
    print("Remaining:", remaining)

asyncio.run(main())

Output:

All people: [Person(id=1, name='Alice', age=25), Person(id=2, name='Bob', age=30)]
Found Alice: [Person(id=1, name='Alice', age=25)]
Remaining: [Person(id=1, name='Alice Updated', age=26)]

11.2. Async Pagination

Retrieve paginated results asynchronously:

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Person(BaseModel):
    id: int
    name: str
    age: int

async def main():
    db = WPostgreSQL(Person, db_config)

    # Get page 1 with 10 items per page
    page1 = await db.get_page_async(page=1, per_page=10)
    print("Page 1:", page1)

    # Get page 2
    page2 = await db.get_page_async(page=2, per_page=10)
    print("Page 2:", page2)

    # Get with limit and offset
    paginated = await db.get_paginated_async(limit=5, offset=10)
    print("Paginated:", paginated)

    # With ordering
    ordered = await db.get_paginated_async(
        limit=10,
        offset=0,
        order_by="name",
        order_desc=False
    )
    print("Ordered by name:", ordered)

asyncio.run(main())

11.3. Async Bulk Operations

Perform bulk operations asynchronously:

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Person(BaseModel):
    id: int
    name: str
    age: int

async def main():
    db = WPostgreSQL(Person, db_config)

    # Bulk insert
    persons = [
        Person(id=i, name=f"Person {i}", age=20 + i)
        for i in range(1, 11)
    ]
    await db.insert_many_async(persons)
    print("Inserted 10 records")

    # Bulk update
    updates = [
        (Person(id=i, name=f"Updated Person {i}", age=30 + i), i)
        for i in range(1, 6)
    ]
    count = await db.update_many_async(updates)
    print(f"Updated {count} records")

    # Bulk delete
    deleted = await db.delete_many_async([6, 7, 8, 9, 10])
    print(f"Deleted {deleted} records")

    remaining = await db.get_all_async()
    print(f"Remaining: {len(remaining)} records")

asyncio.run(main())

11.4. Async Count

Count records asynchronously:

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Person(BaseModel):
    id: int
    name: str
    age: int

async def main():
    db = WPostgreSQL(Person, db_config)

    total = await db.count_async()
    print(f"Total records: {total}")

asyncio.run(main())

11.5. Async Transactions

Execute async transactions:

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Account(BaseModel):
    id: int
    name: str
    balance: float

async def main():
    db = WPostgreSQL(Account, db_config)

    # Setup initial data
    await db.insert_async(Account(id=1, name="Alice", balance=1000))
    await db.insert_async(Account(id=2, name="Bob", balance=500))

    # Execute transaction
    operations = [
        ("UPDATE account SET balance = balance - 100 WHERE id = 1", ()),
        ("UPDATE account SET balance = balance + 100 WHERE id = 2", ()),
    ]

    results = await db.execute_transaction_async(operations)
    print("Transaction completed")

    # Check final balances
    alice = await db.get_by_field_async(id=1)
    bob = await db.get_by_field_async(id=2)
    print(f"Alice: {alice[0].balance}, Bob: {bob[0].balance}")

asyncio.run(main())

11.6. FastAPI Integration

Use with FastAPI for async web applications:

from fastapi import FastAPI
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
import asyncio

app = FastAPI()

db_config = {
    "dbname": "wpostgresql",
    "user": "postgres",
    "password": "postgres",
}

class Item(BaseModel):
    id: int
    name: str
    price: float

db = WPostgreSQL(Item, db_config)

@app.post("/items/")
async def create_item(item: Item):
    await db.insert_async(item)
    return item

@app.get("/items/")
async def get_items():
    return await db.get_all_async()

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    items = await db.get_by_field_async(id=item_id)
    if not items:
        return {"error": "Item not found"}
    return items[0]

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
    await db.update_async(item_id, item)
    return item

@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
    await db.delete_async(item_id)
    return {"message": "Deleted"}