17. Advanced Features¶
17.1. SQL Injection Prevention¶
All identifiers are automatically validated to prevent SQL injection:
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
from wpostgresql.exceptions import SQLInjectionError
db_config = {
"dbname": "wpostgresql",
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": 5432,
}
class Person(BaseModel):
id: int
name: str
age: int
db = WPostgreSQL(Person, db_config)
# This will be blocked
try:
db.get_by_field(**{"invalid; DROP TABLE users;--": "value"})
except SQLInjectionError as e:
print(f"Blocked SQL injection attempt: {e}")
# Valid field names work normally
valid_users = db.get_by_field(name="John")
17.2. Custom Field Handling with Pydantic¶
Leverage Pydantic’s validation features:
from pydantic import BaseModel, Field, field_validator
from typing import Optional
class User(BaseModel):
id: int
name: str = Field(max_length=100)
email: str = Field(pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
age: Optional[int] = Field(ge=0, le=150)
is_active: bool = True
created_at: Optional[str] = None
@field_validator('name')
@classmethod
def name_must_be_capitalized(cls, v):
if not v[0].isupper():
raise ValueError('Name must start with capital letter')
return v
db = WPostgreSQL(User, db_config)
# This will validate before inserting
valid_user = User(id=1, name="John", email="john@example.com", age=25)
db.insert(valid_user)
# This will fail validation
try:
invalid_user = User(id=2, name="john", email="invalid", age=25)
except Exception as e:
print(f"Validation error: {e}")
17.3. Connection Pool Management¶
wpostgresql uses automatic connection pooling. Here’s how to manage it:
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
from wpostgresql.core.connection import (
get_connection,
get_async_connection,
close_global_pools,
)
db_config = {
"dbname": "wpostgresql",
"user": "postgres",
"password": "postgres",
"host": "localhost",
}
class Person(BaseModel):
id: int
name: str
age: int
# Automatic pooling (default)
db = WPostgreSQL(Person, db_config)
# Use get_connection directly for raw SQL
with get_connection(db_config) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM person WHERE age > %s", (25,))
results = cursor.fetchall()
for row in results:
print(f" {row}")
# Clean up pools when done (e.g., at application shutdown)
close_global_pools()
print("Pools closed")
17.4. Connection Pooling - Advanced¶
Custom pool configuration for high-performance scenarios:
from wpostgresql import WPostgreSQL
from wpostgresql.core.connection import ConnectionManager, close_global_pools
from pydantic import BaseModel
db_config = {
"dbname": "wpostgresql",
"user": "postgres",
"password": "postgres",
"host": "localhost",
}
class Person(BaseModel):
id: int
name: str
age: int
# Create custom pool with specific settings
pool = ConnectionManager(
db_config,
min_connections=5,
max_connections=50
)
# Use pool directly
conn = pool.get_connection()
print(f"Got connection from pool: {conn}")
# Release back to pool
pool.release_connection(conn)
# Close pool when done
pool.close_all()
# Or close all global pools
close_global_pools()
17.5. Raw SQL Execution¶
Execute raw SQL when you need more control:
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
from wpostgresql.core.connection import get_connection
db_config = {
"dbname": "wpostgresql",
"user": "postgres",
"password": "postgres",
"host": "localhost",
}
class Person(BaseModel):
id: int
name: str
age: int
db = WPostgreSQL(Person, db_config)
# Simple query
with get_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM person")
rows = cursor.fetchall()
print("All records:")
for row in rows:
print(f" {row}")
# Query with parameters
with get_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM person WHERE age > %s", (26,))
rows = cursor.fetchall()
print("\nPeople older than 26:")
for row in rows:
print(f" {row}")
# INSERT with RETURNING
with get_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO person (name, age) VALUES (%s, %s) RETURNING id, name",
("Charlie", 35)
)
row = cursor.fetchone()
print(f"\nInserted: id={row[0]}, name={row[1]}")
conn.commit()
# UPDATE with RETURNING
with get_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute(
"UPDATE person SET age = %s WHERE name = %s RETURNING id, name, age",
(36, "Charlie")
)
row = cursor.fetchone()
print(f"Updated: {row}")
conn.commit()
# DELETE with RETURNING
with get_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM person WHERE name = %s RETURNING id, name", ("Bob",))
row = cursor.fetchone()
print(f"Deleted: {row}")
conn.commit()
17.6. Logging¶
Enable detailed logging for debugging:
import logging
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("wpostgresql")
logger.setLevel(logging.DEBUG)
# Also set psycopg logging
psycopg_logger = logging.getLogger("psycopg")
psycopg_logger.setLevel(logging.DEBUG)
class Person(BaseModel):
id: int
name: str
age: int
db = WPostgreSQL(Person, db_config)
# Operations will now be logged
db.insert(Person(id=1, name="Test", age=25))
people = db.get_all()
# Check logs for:
# - Connection pool creation
# - SQL queries executed
# - Transaction commits/rollbacks
17.7. Logging Configuration with Loguru¶
Use Loguru for more modern logging:
from loguru import logger
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
# Configure Loguru
logger.add("wpostgresql.log", rotation="500 MB", retention="10 days")
class Person(BaseModel):
id: int
name: str
age: int
db = WPostgreSQL(Person, db_config)
logger.info("Starting database operations")
db.insert(Person(id=1, name="John", age=30))
logger.info("Insert complete")
people = db.get_all()
logger.info(f"Retrieved {len(people)} people")
17.8. CLI Usage¶
The package includes a CLI for common operations:
# Show help
wpostgresql --help
# Initialize a new database with a model
wpostgresql init --config db_config.json
# Sync schema
wpostgresql sync --model myapp.models
# Show version
wpostgresql --version
17.9. Error Handling¶
Proper error handling for production applications:
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
from wpostgresql.exceptions import (
WPostgreSQLError,
ConnectionError,
OperationError,
TransactionError,
SQLInjectionError,
)
db_config = {
"dbname": "wpostgresql",
"user": "postgres",
"password": "postgres",
"host": "localhost",
}
class Person(BaseModel):
id: int
name: str
age: int
try:
db = WPostgreSQL(Person, db_config)
except ConnectionError as e:
print(f"Failed to connect: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
try:
db.insert(Person(id=1, name="John", age=30))
except OperationError as e:
print(f"Operation failed: {e}")
try:
db.execute_transaction([
("INVALID SQL", None),
])
except TransactionError as e:
print(f"Transaction failed: {e}")
17.10. Type Hints and IDE Support¶
wpostgresql is fully typed for IDE support:
from pydantic import BaseModel
from wpostgresql import WPostgreSQL
class Person(BaseModel):
id: int
name: str
age: int
db: WPostgreSQL = WPostgreSQL(Person, db_config)
# IDE will suggest methods
# - insert()
# - get_all()
# - get_by_field()
# - update()
# - delete()
# - etc.
# Return types are clear
people: list[Person] = db.get_all()
person: list[Person] = db.get_by_field(name="John")
count: int = db.count()
# Async methods
async def get_people() -> list[Person]:
return await db.get_all_async()