Performance Optimization

Optimize Axon memory systems for maximum throughput and minimal latency.


Overview

This guide covers performance optimization strategies for Axon-based applications, including benchmarking, profiling, and tuning recommendations.

Key Topics:

- ✓ Performance benchmarks
- ✓ Bottleneck identification
- ✓ Optimization strategies
- ✓ Caching patterns
- ✓ Query optimization
- ✓ Resource tuning


Benchmarks

Typical Performance

Operation  InMemory  Redis    ChromaDB  Qdrant    Pinecone
store()    0.1-1ms   5-20ms   10-50ms   20-100ms  50-150ms
recall()   1-10ms    10-50ms  20-100ms  20-100ms  50-200ms
get()      0.1ms     2-10ms   5-20ms    10-50ms   20-100ms
delete()   0.1ms     2-10ms   5-20ms    10-50ms   20-100ms

Note: Latencies above exclude embedding generation, which adds roughly 50-200ms per call with the OpenAI embedder.

Throughput

Adapter   Reads/sec  Writes/sec  Scale
InMemory  50,000+    50,000+     Single node
Redis     10,000+    5,000+      Distributed
ChromaDB  1,000+     500+        Single node
Qdrant    5,000+     2,000+      Distributed
Pinecone  2,000+     1,000+      Serverless

Bottleneck Identification

Profiling

import time
import logging
from axon import MemorySystem

logger = logging.getLogger(__name__)

async def profile_operation(operation_name: str, func, *args, **kwargs):
    """Profile an async operation and log its duration."""
    start = time.perf_counter()  # monotonic clock, better suited to timing
    result = await func(*args, **kwargs)
    duration_ms = (time.perf_counter() - start) * 1000

    logger.info(f"{operation_name}: {duration_ms:.2f}ms")
    return result

# Profile store operation
memory = MemorySystem(config)
await profile_operation("store", memory.store, "Test data", importance=0.8)

# Profile recall operation
await profile_operation("recall", memory.recall, "query", k=10)

Common Bottlenecks

  1. Embedding Generation (50-200ms)
     - Cause: OpenAI API call latency
     - Solution: Batch embeddings, use local models

  2. Network Latency (10-100ms)
     - Cause: Redis/Qdrant round trips
     - Solution: Connection pooling, request batching

  3. Vector Search (10-100ms)
     - Cause: Similarity search over large datasets
     - Solution: Indexes, filtering, limiting k

  4. Serialization (1-10ms)
     - Cause: JSON encoding/decoding
     - Solution: MessagePack or Protocol Buffers (see the sketch after this list)
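
As an example of the serialization fix, the sketch below swaps JSON for MessagePack when encoding an entry payload. This is a minimal illustration assuming the third-party msgpack package; Axon's adapters handle serialization internally, so this applies where you control the payload format yourself.

import json
import msgpack  # third-party: pip install msgpack

entry = {"text": "Test data", "importance": 0.8, "tags": ["demo"]}

# MessagePack produces a compact binary encoding and is typically
# faster to encode/decode than JSON for dict-like payloads.
packed = msgpack.packb(entry)
restored = msgpack.unpackb(packed)
assert restored == entry

# JSON equivalent, for comparison:
as_json = json.dumps(entry).encode()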

Optimization Strategies

1. Batch Operations

# ❌ Slow: Individual operations
for text in texts:
    await memory.store(text)  # N round trips

# ✓ Fast: Batch operations
await memory.bulk_store(texts)  # 1 round trip
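
For very large inputs, a single giant batch can hit payload-size or timeout limits. A common middle ground, sketched below against the bulk_store call above, is to chunk the input first (the batch size of 100 is an arbitrary starting point):

def chunked(items, size=100):
    """Yield fixed-size slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

for batch in chunked(texts, size=100):
    await memory.bulk_store(batch)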

2. Connection Pooling

# ✓ Reuse connections
from axon.core.config import MemoryConfig
from axon.core.policies import SessionPolicy

config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": "redis://localhost:6379",
            "max_connections": 50,  # Connection pool
            "socket_timeout": 5,
            "socket_connect_timeout": 5
        }
    )
)

3. Query Optimization

# ❌ Slow: Large result set
results = await memory.recall(query, k=1000)  # Too many results

# ✓ Fast: Limit results
results = await memory.recall(query, k=10)  # Just what you need

# ✓ Fast: Add filters
results = await memory.recall(
    query,
    k=10,
    filter=Filter(tags=["specific"])  # Reduce search space
)

4. Caching

# Cache embeddings for frequently repeated text.
# Note: functools.lru_cache only works with synchronous functions,
# so an async embedder needs an explicit dict cache instead.
_embedding_cache: dict = {}

async def get_cached_embedding(text: str, embedder):
    """Generate an embedding once, then reuse it on later calls."""
    if text not in _embedding_cache:
        _embedding_cache[text] = await embedder.embed(text)
    return _embedding_cache[text]

# Use in store operations
embedding = await get_cached_embedding(text, embedder)
await memory.store(text, embedding=embedding)

5. Async Operations

import asyncio

# ❌ Slow: Sequential
for text in texts:
    await memory.store(text)

# ✓ Fast: Concurrent
tasks = [memory.store(text) for text in texts]
await asyncio.gather(*tasks)
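
An unbounded gather over thousands of items can exhaust the connection pool or overwhelm the backend. A bounded variant, sketched here with asyncio.Semaphore (the limit of 20 is an arbitrary starting point), keeps concurrency in check:

import asyncio

semaphore = asyncio.Semaphore(20)  # cap concurrent in-flight stores

async def bounded_store(text):
    async with semaphore:
        await memory.store(text)

await asyncio.gather(*(bounded_store(text) for text in texts))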

Adapter-Specific Tuning

InMemory Adapter

# Already optimized - no tuning needed
# Use for: Development, testing, ephemeral tier

Redis Adapter

config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": "redis://localhost:6379",

            # Connection pool
            "max_connections": 50,
            "socket_timeout": 5,
            "socket_connect_timeout": 5,

            # Decode bytes to strings automatically
            "decode_responses": True,

            # Namespace for isolation
            "namespace": "axon:session"
        }
    )
)

# Enable persistence
# redis.conf:
# appendonly yes
# appendfsync everysec  # Balance performance + durability
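
The adapter batches round trips for you, but if you ever drop down to redis-py directly, pipelining is the mechanism that lets many commands share one round trip. A minimal sketch, assuming the redis package with asyncio support:

import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")

async def write_many(items: dict):
    """Send all SETs in a single network round trip."""
    async with r.pipeline(transaction=False) as pipe:
        for key, value in items.items():
            pipe.set(key, value)  # queued locally, not yet sent
        await pipe.execute()      # one round trip for all commands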

Qdrant Adapter

config = MemoryConfig(
    persistent=PersistentPolicy(
        adapter_type="qdrant",
        adapter_config={
            "url": "http://localhost:6333",
            "collection_name": "memories",

            # Increase timeout for large queries
            "timeout": 60,

            # Prefer the gRPC transport (lower overhead than HTTP)
            "prefer_grpc": True
        }
    )
)

# Qdrant configuration (config.yaml):
# storage:
#   on_disk_payload: false  # Keep in RAM for speed
# service:
#   max_request_size_mb: 100

Pinecone Adapter

config = MemoryConfig(
    persistent=PersistentPolicy(
        adapter_type="pinecone",
        adapter_config={
            "api_key": "your-key",
            "index_name": "memories",
            "environment": "us-east1-gcp",

            # Use namespaces for isolation
            "namespace": "production"
        }
    )
)

# Pinecone automatically handles:
# - Auto-scaling
# - Load balancing
# - Replication

Embedding Optimization

Use Local Embedders

# ❌ Slow: OpenAI API (200ms per embedding)
from axon.embedders.openai import OpenAIEmbedder

embedder = OpenAIEmbedder()
# Each embedding: 200ms network latency

# ✓ Fast: Local sentence-transformers (10ms per embedding)
from axon.embedders.sentence_transformer import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder(
    model_name="all-MiniLM-L6-v2"  # Fast, local
)
# Each embedding: 10ms on CPU, 1ms on GPU
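
For reference, this is roughly what direct use of the underlying sentence-transformers library looks like (the device argument is optional, and "cuda" requires a GPU-enabled PyTorch build):

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # add device="cuda" for GPU
vectors = model.encode(
    ["some text", "more text"],
    batch_size=64,          # batching improves throughput
    convert_to_numpy=True,
)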

Batch Embeddings

# ❌ Slow: Individual embeddings
for text in texts:
    embedding = await embedder.embed(text)  # N API calls

# ✓ Fast: Batch embeddings
embeddings = await embedder.embed_batch(texts)  # 1 API call

Cache Embeddings

import hashlib
import json
from pathlib import Path

class EmbeddingCache:
    """Cache embeddings to disk."""

    def __init__(self, cache_dir: str = ".embedding_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def _get_key(self, text: str) -> str:
        """Generate cache key."""
        return hashlib.md5(text.encode()).hexdigest()

    async def get_embedding(self, text: str, embedder):
        """Get embedding from cache or generate."""
        key = self._get_key(text)
        cache_file = self.cache_dir / f"{key}.json"

        # Check cache
        if cache_file.exists():
            with open(cache_file) as f:
                return json.load(f)

        # Generate and cache
        embedding = await embedder.embed(text)
        with open(cache_file, 'w') as f:
            json.dump(embedding, f)

        return embedding

# Use cache
cache = EmbeddingCache()
embedding = await cache.get_embedding(text, embedder)

Memory Management

Set Compaction Thresholds

# Balance memory usage vs performance
config = MemoryConfig(
    ephemeral=EphemeralPolicy(
        compaction_threshold=1000,  # Compact at 1K entries
        compaction_batch_size=100
    ),
    session=SessionPolicy(
        compaction_threshold=10000,  # Compact at 10K
        compaction_batch_size=500
    ),
    persistent=PersistentPolicy(
        compaction_threshold=100000,  # Compact at 100K
        compaction_batch_size=1000
    )
)

Monitor Memory Usage

import psutil
import logging

logger = logging.getLogger(__name__)

def log_memory_usage():
    """Log current memory usage."""
    process = psutil.Process()
    mem_info = process.memory_info()

    logger.info(f"Memory usage: {mem_info.rss / 1024 / 1024:.2f} MB")

# Monitor periodically (the schedule library needs a run loop)
import schedule

schedule.every(5).minutes.do(log_memory_usage)

# ...then drive it from your main loop:
# while True:
#     schedule.run_pending()
#     time.sleep(1)
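
Since an Axon application is typically async, a background task may fit better than the synchronous schedule library. A sketch:

import asyncio

async def monitor_memory(interval_seconds: float = 300):
    """Log memory usage on a fixed interval."""
    while True:
        log_memory_usage()
        await asyncio.sleep(interval_seconds)

# Start alongside your application:
# monitor_task = asyncio.create_task(monitor_memory())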

Query Optimization

Use Filters

# ❌ Slow: Search all entries
results = await memory.recall(query, k=10)  # Searches millions

# ✓ Fast: Filter first
results = await memory.recall(
    query,
    k=10,
    filter=Filter(
        tags=["category:tech"],  # Narrow search space
        min_importance=0.7       # Only important entries
    )
)

Limit Result Count

# ❌ Slow: Too many results
results = await memory.recall(query, k=1000)  # Process 1000 results

# ✓ Fast: Reasonable limit
results = await memory.recall(query, k=10)  # Process 10 results

Use Appropriate Tier

# ❌ Slow: Search persistent for recent data
results = await memory.recall(query, tier="persistent")

# ✓ Fast: Search ephemeral for recent
results = await memory.recall(query, tier="ephemeral")

Load Testing

Benchmark Script

import asyncio
import time
from axon import MemorySystem

async def benchmark_store(memory, n: int = 1000):
    """Benchmark store operations."""
    start = time.time()

    tasks = [
        memory.store(f"Test entry {i}", importance=0.5)
        for i in range(n)
    ]

    await asyncio.gather(*tasks)

    duration = time.time() - start
    ops_per_sec = n / duration

    print(f"Store: {n} ops in {duration:.2f}s = {ops_per_sec:.0f} ops/sec")

async def benchmark_recall(memory, n: int = 1000):
    """Benchmark recall operations."""
    start = time.time()

    tasks = [
        memory.recall(f"query {i}", k=10)
        for i in range(n)
    ]

    await asyncio.gather(*tasks)

    duration = time.time() - start
    ops_per_sec = n / duration

    print(f"Recall: {n} ops in {duration:.2f}s = {ops_per_sec:.0f} ops/sec")

# Run benchmarks
memory = MemorySystem(config)
await benchmark_store(memory, 1000)
await benchmark_recall(memory, 1000)

Stress Testing

import asyncio
import time

async def stress_test(memory, duration_seconds: int = 60):
    """Stress test for duration."""
    start = time.time()
    operations = 0
    errors = 0

    async def worker():
        nonlocal operations, errors
        while time.time() - start < duration_seconds:
            try:
                await memory.store(f"Test {operations}", importance=0.5)
                operations += 1
            except Exception as e:
                errors += 1
                print(f"Error: {e}")

    # Run 10 concurrent workers
    await asyncio.gather(*[worker() for _ in range(10)])

    duration = time.time() - start
    print(f"Stress test: {operations} ops in {duration:.2f}s")
    print(f"Throughput: {operations / duration:.0f} ops/sec")
    print(f"Errors: {errors}")

# Run stress test
await stress_test(memory, duration_seconds=60)

Best Practices

1. Use Appropriate Adapters

# ✓ Good: Match adapter to use case
config = MemoryConfig(
    ephemeral=EphemeralPolicy(adapter_type="memory"),  # Fast
    session=SessionPolicy(adapter_type="redis"),       # Distributed
    persistent=PersistentPolicy(adapter_type="qdrant")  # Scalable
)

# ✗ Bad: Wrong adapter
config = MemoryConfig(
    ephemeral=EphemeralPolicy(adapter_type="pinecone")  # Overkill!
)

2. Enable Connection Pooling

# ✓ Good: Connection pool
adapter_config={"max_connections": 50}

# ✗ Bad: No pooling
adapter_config={}  # 1 connection per request

3. Monitor Performance

# ✓ Good: Track metrics
from axon.core.logging_config import log_performance

@log_performance
async def store_with_monitoring(memory, text):
    return await memory.store(text)

# ✗ Bad: No monitoring
