12. Transactions

Transactions ensure data integrity by grouping multiple operations into a single atomic unit.

12.1. Basic Transaction

Execute multiple operations in a single transaction by passing execute_transaction() a list of (SQL, parameters) tuples:

from pydantic import BaseModel
from wpostgresql import WPostgreSQL

# Connection settings handed to WPostgreSQL(...) as its second argument.
# The values point at a local server (host "localhost", port 5432); adjust
# them for your own environment.
db_config = {
    "dbname": "wpostgresql",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": 5432,
}

class Person(BaseModel):
    """Pydantic model passed to WPostgreSQL in this example.

    The raw SQL below addresses a `person` table with `id` and `balance`
    columns; presumably WPostgreSQL derives that table from this model
    (confirm against the library documentation).
    """
    id: int  # matched by the WHERE clauses of the UPDATE statements below
    name: str
    balance: float  # amount debited and credited by the example transaction

db = WPostgreSQL(Person, db_config)

# Setup initial data
db.insert(Person(id=1, name="Alice", balance=1000))
db.insert(Person(id=2, name="Bob", balance=500))

# Transfer 100 from Alice to Bob.
# Each operation is a (sql, params) tuple; params is None here because the
# statements contain no placeholders.
transfer = [
    ("UPDATE person SET balance = balance - 100 WHERE id = 1", None),
    ("UPDATE person SET balance = balance + 100 WHERE id = 2", None),
]

try:
    # Only the call that can fail is guarded.
    db.execute_transaction(transfer)
except Exception as e:
    # The concrete exception type raised by execute_transaction() is not
    # shown in this chapter, so the demo catches Exception and reports it.
    print(f"Transaction failed: {e}")
else:
    # Runs only when the transaction raised nothing.
    print("Transaction successful")

# Verify the transfer
alice = db.get_by_field(id=1)[0]
bob = db.get_by_field(id=2)[0]
print(f"Alice balance: {alice.balance}")
print(f"Bob balance: {bob.balance}")

Output:

Transaction successful
Alice balance: 900.0
Bob balance: 600.0

12.2. Transaction Context Manager

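Use with_transaction() for more complex operations. It takes a callback that receives a Transaction object; the statements issued through that object run inside the same transaction: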

from pydantic import BaseModel
from wpostgresql import WPostgreSQL, Transaction

class Account(BaseModel):
    """Pydantic model passed to WPostgreSQL in this example.

    The SQL in transfer_funds() addresses an `account` table with `id` and
    `balance` columns; presumably WPostgreSQL derives that table from this
    model (confirm against the library documentation).
    """
    id: int  # matched by the WHERE clauses in transfer_funds()
    name: str
    balance: float  # amount moved between rows by transfer_funds()

def transfer_funds(db: WPostgreSQL, from_id: int, to_id: int, amount: float):
    """Move `amount` from one account row to another via db.with_transaction().

    A callback is handed to ``db.with_transaction``; it receives the
    Transaction object and issues three statements through it, in order:
    debit the source row, credit the destination row, and record the
    transfer in ``transaction_log``.

    Args:
        db: Repository whose ``with_transaction`` method runs the callback.
        from_id: ``id`` of the account that is debited.
        to_id: ``id`` of the account that is credited.
        amount: Value subtracted from the source and added to the destination.

    Returns:
        Whatever ``db.with_transaction`` returns.
    """
    # (sql, params) pairs in the exact order they must be executed.
    statements = (
        # Deduct from source
        ("UPDATE account SET balance = balance - %s WHERE id = %s",
         (amount, from_id)),
        # Add to destination
        ("UPDATE account SET balance = balance + %s WHERE id = %s",
         (amount, to_id)),
        # Log the transaction
        ("INSERT INTO transaction_log (from_id, to_id, amount) VALUES (%s, %s, %s)",
         (from_id, to_id, amount)),
    )

    def run_statements(txn: Transaction):
        for sql, params in statements:
            txn.execute(sql, params)

    return db.with_transaction(run_statements)

# Usage
# db_config is the connection dictionary defined in section 12.1.
db = WPostgreSQL(Account, db_config)
db.insert(Account(id=1, name="Checking", balance=1000))
db.insert(Account(id=2, name="Savings", balance=500))

# NOTE(review): transfer_funds() also inserts into a transaction_log table
# that this snippet never creates -- presumably it must already exist; confirm.
transfer_funds(db, from_id=1, to_id=2, amount=250)

# Read both rows back to show the effect of the transfer.
print("Transfer complete")
print(f"Checking: {db.get_by_field(id=1)[0].balance}")
print(f"Savings: {db.get_by_field(id=2)[0].balance}")

Output:

Transfer complete
Checking: 750.0
Savings: 750.0

12.3. Transaction with Multiple Inserts

Insert multiple related records atomically:

from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Order(BaseModel):
    """Order header; its fields supply the values for the `order_table` INSERT below."""
    id: int  # referenced by OrderItem.order_id in the usage example
    customer_id: int
    total: float

class OrderItem(BaseModel):
    """One line of an order; its fields supply the values for the `order_item` INSERT below."""
    id: int
    order_id: int  # id of the Order this line belongs to
    product_name: str
    quantity: int
    price: float

def create_order_with_items(db: WPostgreSQL, order: Order, items: list[OrderItem]):
    """Insert an order row and all of its item rows with one execute_transaction() call.

    Args:
        db: Repository whose ``execute_transaction`` method receives the batch.
        order: Header record written to ``order_table``.
        items: Line records written to ``order_item``, in the given order.

    Returns:
        Whatever ``db.execute_transaction`` returns.
    """
    order_sql = "INSERT INTO order_table (id, customer_id, total) VALUES (%s, %s, %s)"
    item_sql = "INSERT INTO order_item (id, order_id, product_name, quantity, price) VALUES (%s, %s, %s, %s, %s)"

    order_op = (order_sql, (order.id, order.customer_id, order.total))
    item_ops = [
        (item_sql, (line.id, line.order_id, line.product_name, line.quantity, line.price))
        for line in items
    ]

    # The order row goes first, followed by its items.
    return db.execute_transaction([order_op, *item_ops])

# Usage
# db_config is the connection dictionary defined in section 12.1.
# NOTE(review): only a repository for Order is constructed here; the
# order_table and order_item tables targeted by the SQL above are presumably
# created elsewhere -- confirm.
db = WPostgreSQL(Order, db_config)

# total matches the items below: 2 * 25.00 + 1 * 100.00 = 150.00
order = Order(id=1, customer_id=100, total=150.00)
items = [
    OrderItem(id=1, order_id=1, product_name="Widget A", quantity=2, price=25.00),
    OrderItem(id=2, order_id=1, product_name="Widget B", quantity=1, price=100.00),
]

create_order_with_items(db, order, items)
print("Order created with items")

12.4. Async Transactions

Run a transaction from a coroutine by awaiting execute_transaction_async():

import asyncio
from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Account(BaseModel):
    """Pydantic model passed to WPostgreSQL in main(); the SQL in async_transfer() updates `account.balance` by `id`."""
    id: int  # matched by the WHERE clauses in async_transfer()
    name: str
    balance: float

async def async_transfer(db: WPostgreSQL, from_id: int, to_id: int, amount: float):
    """Debit one account and credit another by awaiting execute_transaction_async().

    Args:
        db: Repository whose ``execute_transaction_async`` method is awaited.
        from_id: ``id`` of the account that is debited.
        to_id: ``id`` of the account that is credited.
        amount: Value subtracted from the source and added to the destination.
    """
    debit = (
        "UPDATE account SET balance = balance - %s WHERE id = %s",
        (amount, from_id),
    )
    credit = (
        "UPDATE account SET balance = balance + %s WHERE id = %s",
        (amount, to_id),
    )

    # Debit first, then credit, submitted together as one batch.
    await db.execute_transaction_async([debit, credit])

async def main():
    """Seed two accounts, transfer 300 between them, and print both balances."""
    # db_config is the connection dictionary defined in section 12.1.
    db = WPostgreSQL(Account, db_config)

    # NOTE(review): insert() and get_by_field() are called without await, so
    # they presumably run synchronously inside this coroutine -- confirm
    # whether the library offers awaitable equivalents.
    db.insert(Account(id=1, name="Checking", balance=1000))
    db.insert(Account(id=2, name="Savings", balance=500))

    await async_transfer(db, from_id=1, to_id=2, amount=300)

    print(f"Checking: {db.get_by_field(id=1)[0].balance}")
    print(f"Savings: {db.get_by_field(id=2)[0].balance}")

# Start an event loop and run main() to completion.
asyncio.run(main())

Output:

Checking: 700.0
Savings: 800.0

12.5. Transaction with Rollback

If any operation fails, all changes are automatically rolled back:

from pydantic import BaseModel
from wpostgresql import WPostgreSQL

class Person(BaseModel):
    """Pydantic model passed to WPostgreSQL in this example; the SQL below updates `person.age` by `id`."""
    id: int  # matched by the WHERE clause of the UPDATE below
    name: str
    age: int  # value the failing transaction tries to change

db = WPostgreSQL(Person, db_config)

# Seed one row so the transaction has something to update
db.insert(Person(id=1, name="Test", age=25))

# The first statement is valid; the second targets a table that does not
# exist, so the batch as a whole is expected to fail.
failing_batch = [
    ("UPDATE person SET age = 30 WHERE id = 1", ()),
    ("INSERT INTO nonexistent_table VALUES (1)", ()),  # This will fail
]

try:
    db.execute_transaction(failing_batch)
except Exception as e:
    print(f"Transaction rolled back: {e}")

# Read the row back: the age set by the first statement must not have persisted
person = db.get_by_field(id=1)[0]
print(f"Person age after rollback: {person.age}")

Output:

Transaction rolled back: ...
Person age after rollback: 25