Transactions¶
Atomic operations across multiple storage tiers using Two-Phase Commit (2PC).
Overview¶
The Transaction System provides atomic guarantees for operations spanning multiple storage tiers. Using the Two-Phase Commit (2PC) protocol, you can ensure that either all operations succeed or all are rolled back.
Key Features:

- ✓ Two-Phase Commit (2PC) protocol
- ✓ Atomic operations across tiers
- ✓ Automatic rollback on failure
- ✓ Context manager for easy use
- ✓ Isolation level support
- ✓ Timeout protection
Why Transactions?¶
Without Transactions¶
# ❌ Risk: Partial failure leaves inconsistent state
await memory.store("Important data", tier="ephemeral") # Success
await memory.store("Important data", tier="session") # Success
await memory.store("Important data", tier="persistent") # FAILS!
# Result: Data in ephemeral and session, but not persistent
# State is inconsistent!
With Transactions¶
# ✓ All-or-nothing guarantee
async with memory.transaction() as txn:
await txn.store("Important data", tier="ephemeral")
await txn.store("Important data", tier="session")
await txn.store("Important data", tier="persistent")
# If any fails, ALL are rolled back
Basic Usage¶
Context Manager (Recommended)¶
from axon import MemorySystem
memory = MemorySystem(config)
# Transactional store
async with memory.transaction() as txn:
await txn.store("Entry 1", tier="ephemeral")
await txn.store("Entry 2", tier="persistent")
# Commits automatically on exit
# Rolls back on exception
Manual Transaction Control¶
# Begin transaction
txn_id = await memory.begin_transaction()
try:
    # Add operations
    await memory.store("Entry 1", tier="ephemeral", transaction_id=txn_id)
    await memory.store("Entry 2", tier="persistent", transaction_id=txn_id)

    # Commit
    success = await memory.commit_transaction(txn_id)
    if not success:
        raise RuntimeError("Transaction commit failed")
except Exception as e:
    # Rollback on error
    await memory.abort_transaction(txn_id)
    raise
Two-Phase Commit Protocol¶
Phase 1: Prepare¶
All participants (adapters) prepare to commit:
# 1. Coordinator asks: "Can you commit?"
prepare_success = await adapter.prepare_transaction(txn_id)
# Each adapter:
# - Validates all operations
# - Reserves resources
# - Writes to transaction log
# - Responds: YES or NO
Phase 2: Commit/Abort¶
Based on Phase 1 responses:
if all(prepare_responses):
    # All said YES → commit all
    for adapter in adapters:
        await adapter.commit_transaction(txn_id)
else:
    # Any said NO → abort all
    for adapter in adapters:
        await adapter.abort_transaction(txn_id)
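Putting both phases together, the coordinator's decision logic looks roughly like the sketch below. Only the prepare_transaction, commit_transaction, and abort_transaction adapter hooks shown above are assumed; run_two_phase_commit is an illustrative name, not part of the library's API.

async def run_two_phase_commit(adapters, txn_id) -> bool:
    """Illustrative 2PC flow: prepare everywhere, commit only if all agreed."""
    # Phase 1: ask every participant whether it can commit
    prepare_responses = [await adapter.prepare_transaction(txn_id) for adapter in adapters]

    if all(prepare_responses):
        # Phase 2a: every participant said YES → make the changes durable
        for adapter in adapters:
            await adapter.commit_transaction(txn_id)
        return True

    # Phase 2b: at least one participant said NO → undo everywhere
    for adapter in adapters:
        await adapter.abort_transaction(txn_id)
    return False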
Isolation Levels¶
Available Levels¶
from axon.core.transaction import IsolationLevel
# Four standard isolation levels
IsolationLevel.READ_UNCOMMITTED # Lowest isolation (fast, risky)
IsolationLevel.READ_COMMITTED # Default (balanced)
IsolationLevel.REPEATABLE_READ # Higher consistency
IsolationLevel.SERIALIZABLE # Highest isolation (slow, safe)
Configure Isolation¶
from axon.core.transaction import IsolationLevel, TransactionCoordinator

coordinator = TransactionCoordinator(
    adapters=adapters,
    isolation_level=IsolationLevel.READ_COMMITTED,
    timeout_seconds=30.0
)
memory = MemorySystem(config, transaction_coordinator=coordinator)
Operations¶
Store Operations¶
async with memory.transaction() as txn:
    # Store across multiple tiers atomically
    id1 = await txn.store("Entry 1", tier="ephemeral", importance=0.5)
    id2 = await txn.store("Entry 2", tier="session", importance=0.7)
    id3 = await txn.store("Entry 3", tier="persistent", importance=0.9)

# All three succeed or all three fail
Update Operations¶
async with memory.transaction() as txn:
    # Update multiple entries atomically
    entry1 = await memory.get("entry_1")
    entry1.metadata.importance = 0.9
    await txn.update(entry1, tier="persistent")

    entry2 = await memory.get("entry_2")
    entry2.metadata.tags.append("verified")
    await txn.update(entry2, tier="persistent")
Delete Operations¶
async with memory.transaction() as txn:
    # Delete from multiple tiers atomically
    await txn.forget("entry_1", tier="ephemeral")
    await txn.forget("entry_1", tier="session")
    await txn.forget("entry_1", tier="persistent")

# Entry deleted from all tiers or none
Error Handling¶
Automatic Rollback¶
async with memory.transaction() as txn:
await txn.store("Entry 1", tier="ephemeral")
await txn.store("Entry 2", tier="persistent")
# Simulated error
if some_condition:
raise ValueError("Validation failed")
# Transaction automatically rolls back
# Neither entry is saved
Explicit Abort¶
txn_id = await memory.begin_transaction()
try:
await memory.store("Entry 1", tier="ephemeral", transaction_id=txn_id)
# Check business logic
if not business_logic_valid():
# Explicit abort
await memory.abort_transaction(txn_id)
return
await memory.store("Entry 2", tier="persistent", transaction_id=txn_id)
await memory.commit_transaction(txn_id)
except Exception as e:
await memory.abort_transaction(txn_id)
raise
Examples¶
Cross-Tier Consistency¶
async def store_with_consistency(memory, text: str):
"""Store in multiple tiers with consistency guarantee."""
async with memory.transaction() as txn:
# Store in ephemeral for fast access
await txn.store(
text,
tier="ephemeral",
importance=0.5,
tags=["recent"]
)
# Store in persistent for durability
await txn.store(
text,
tier="persistent",
importance=0.8,
tags=["important", "verified"]
)
# Both succeed or both fail
# No partial state
Batch Updates¶
async def update_importance_batch(memory, entry_ids: list[str], new_importance: float):
"""Update importance for multiple entries atomically."""
async with memory.transaction() as txn:
for entry_id in entry_ids:
entry = await memory.get(entry_id)
entry.metadata.importance = new_importance
await txn.update(entry, tier="persistent")
# All updates succeed or all fail
Data Migration¶
async def migrate_tier(memory, from_tier: str, to_tier: str):
"""Migrate all entries from one tier to another atomically."""
# Get all entries from source tier
entries = await memory.export(tier=from_tier)
async with memory.transaction() as txn:
# Copy to destination tier
for entry in entries:
await txn.store(
entry.text,
tier=to_tier,
importance=entry.metadata.importance,
tags=entry.metadata.tags
)
# Delete from source tier
for entry in entries:
await txn.forget(entry.id, tier=from_tier)
# Migration is atomic: all succeed or all fail
Performance¶
Overhead¶
| Aspect | Cost | Notes |
|---|---|---|
| Latency | +50-200 ms | 2PC coordination overhead |
| Throughput | -30% to -50% | Sequential prepare + commit |
| Memory | +1 KB per txn | Transaction state tracking |
Optimization Tips¶
# 1. Batch operations in single transaction
async with memory.transaction() as txn:
    for entry in entries:  # Batch in one txn
        await txn.store(entry)

# vs multiple single-entry transactions (slower)
for entry in entries:
    async with memory.transaction() as txn:
        await txn.store(entry)

# 2. Use lower isolation levels for better performance
coordinator = TransactionCoordinator(
    isolation_level=IsolationLevel.READ_COMMITTED  # Faster than SERIALIZABLE
)

# 3. Set appropriate timeouts
coordinator = TransactionCoordinator(
    timeout_seconds=10.0  # Fail fast on slow operations
)
Best Practices¶
1. Use for Critical Operations Only¶
# ✓ Good: Critical multi-tier operation
async with memory.transaction() as txn:
await txn.store("Payment record", tier="persistent")
await txn.store("Audit log", tier="session")
# ✗ Bad: Single-tier, non-critical operation
async with memory.transaction() as txn:
await txn.store("Temp cache", tier="ephemeral") # Overkill!
2. Keep Transactions Short¶
# ✓ Good: Quick transaction
async with memory.transaction() as txn:
await txn.store("Entry 1")
await txn.store("Entry 2")
# ✗ Bad: Long-running transaction
async with memory.transaction() as txn:
for i in range(10000): # Too many operations!
await txn.store(f"Entry {i}")
# Holds locks too long
3. Handle Errors Explicitly¶
try:
    async with memory.transaction() as txn:
        await txn.store("Critical data")
except Exception as e:
    logger.error(f"Transaction failed: {e}")
    # Notify user/admin
    # Retry with exponential backoff (sketched below)
    # Or fail gracefully
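The retry mentioned in the comments can be a thin wrapper around the context manager. Here is a minimal sketch, assuming only the memory.transaction() API shown above; store_with_retry, the attempt count, and the backoff base are illustrative choices, not library defaults.

import asyncio
import logging

logger = logging.getLogger(__name__)

async def store_with_retry(memory, text: str, attempts: int = 3) -> bool:
    """Retry a failed transaction with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(attempts):
        try:
            async with memory.transaction() as txn:
                await txn.store(text, tier="persistent")
            return True
        except Exception as exc:
            logger.error(f"Transaction failed (attempt {attempt + 1}/{attempts}): {exc}")
            await asyncio.sleep(2 ** attempt)  # back off before the next attempt
    return False  # fail gracefully after exhausting retries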
Adapter Support¶
Check Support¶
# Check if adapter supports transactions
supports_txn = await adapter.supports_transactions()
if supports_txn:
    # Use transactional operations
    async with memory.transaction() as txn:
        await txn.store("data")
else:
    # Fallback to non-transactional
    await memory.store("data")
Adapter Implementation¶
| Adapter | Transaction Support | Notes |
|---|---|---|
| InMemory | ❌ No | Not needed (single-process) |
| Redis | ✅ Yes | MULTI/EXEC |
| ChromaDB | ❌ No | No transaction support |
| Qdrant | ❌ No | No transaction support |
| Pinecone | ❌ No | No transaction support |
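As an illustration of the Redis row above, a Redis-backed adapter can map the 2PC hooks onto MULTI/EXEC. The following is a minimal sketch, not the library's actual adapter: RedisTxnAdapter, its _pending buffer, and the key/value write model are assumptions for illustration only.

import redis.asyncio as redis

class RedisTxnAdapter:
    """Illustrative adapter that buffers writes per transaction and commits via MULTI/EXEC."""

    def __init__(self, client: redis.Redis):
        self.client = client
        self._pending: dict[str, list[tuple[str, str]]] = {}  # txn_id -> queued (key, value) writes

    async def prepare_transaction(self, txn_id: str) -> bool:
        # Phase 1: validate the queued operations; nothing is written to Redis yet
        return txn_id in self._pending

    async def commit_transaction(self, txn_id: str) -> None:
        # Phase 2: apply all queued writes atomically inside one MULTI/EXEC pipeline
        pipe = self.client.pipeline(transaction=True)
        for key, value in self._pending.pop(txn_id, []):
            pipe.set(key, value)
        await pipe.execute()

    async def abort_transaction(self, txn_id: str) -> None:
        # Drop the queued writes; Redis was never touched
        self._pending.pop(txn_id, None)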
Limitations¶
1. Performance Overhead¶
2PC adds latency due to coordination (a rough estimate follows the list):

- Prepare phase: N adapter calls
- Commit phase: N adapter calls
- Total: 2N round trips
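A back-of-envelope example of that cost; the adapter count and per-call round-trip time below are assumed, not measured:

# Rough 2PC coordination overhead; inputs are illustrative assumptions
participants = 3                          # adapters taking part in the transaction
rtt_ms = 25                               # average round trip per adapter call
overhead_ms = 2 * participants * rtt_ms   # prepare phase + commit phase
print(overhead_ms)                        # 150 ms, within the +50-200 ms range above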
2. Adapter Support Required¶
Not all adapters support transactions:

- Vector databases often do not support 2PC
- For those adapters, the system falls back to best-effort consistency
3. Distributed Failures¶
Edge cases in distributed systems:

- Network partitions
- Coordinator crashes
- Participant timeouts

Mitigation (a minimal sketch follows the list):

- Use timeouts
- Implement transaction recovery
- Monitor transaction failures
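A minimal sketch of the timeout and monitoring mitigations, using only the begin/commit/abort API shown earlier; guarded_store, the failure counter, and the 10-second bound are illustrative, and recovery of in-doubt transactions remains adapter-specific.

import asyncio
import logging

logger = logging.getLogger("axon.transactions")
txn_failures = 0  # simple failure counter for monitoring/alerting (illustrative)

async def guarded_store(memory, text: str) -> bool:
    """Bound the transaction with a timeout and count failures for monitoring."""
    global txn_failures
    txn_id = await memory.begin_transaction()
    try:
        # Bound the whole transaction so a hung participant cannot block forever (Python 3.11+)
        async with asyncio.timeout(10):
            await memory.store(text, tier="persistent", transaction_id=txn_id)
            return await memory.commit_transaction(txn_id)
    except Exception as exc:
        txn_failures += 1
        logger.warning("transaction %s failed (%d failures so far): %s", txn_id, txn_failures, exc)
        await memory.abort_transaction(txn_id)
        return False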
Next Steps¶
- Audit Logging: Audit transaction operations.
- Compaction: Compact entries with consistency.
- Production Deployment: Deploy with transactions enabled.