Skip to content

Security

Secure your Axon memory systems with best practices and hardening guidelines.


Overview

Security is critical for production deployments. This guide covers authentication, authorization, encryption, PII protection, and security best practices for Axon-based applications.

Key Topics: - ✓ Authentication & authorization - ✓ Encryption (at rest & in transit) - ✓ PII detection & protection - ✓ Network security - ✓ Secret management - ✓ Audit logging


Authentication

API Key Authentication

# auth.py
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os

app = FastAPI()
security = HTTPBearer()

# Load API keys from environment
VALID_API_KEYS = set(os.getenv("API_KEYS", "").split(","))

async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify API key."""
    if credentials.credentials not in VALID_API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return credentials.credentials

# Protected endpoints
@app.post("/store")
async def store(text: str, api_key: str = Security(verify_api_key)):
    """Store with API key auth."""
    await memory.store(text, user_id=api_key)
    return {"status": "success"}

JWT Authentication

# jwt_auth.py
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = os.getenv("JWT_SECRET_KEY")
ALGORITHM = "HS256"

security = HTTPBearer()

def create_access_token(user_id: str, expires_delta: timedelta = timedelta(hours=24)):
    """Create JWT token."""
    expire = datetime.utcnow() + expires_delta
    to_encode = {"sub": user_id, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(security)):
    """Verify JWT and extract user."""
    try:
        payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Use in endpoints
@app.post("/store")
async def store(text: str, user_id: str = Depends(get_current_user)):
    await memory.store(text, user_id=user_id)
    return {"status": "success"}

Authorization

Role-Based Access Control (RBAC)

# rbac.py
from enum import Enum
from typing import Set

class Role(Enum):
    """User roles."""
    VIEWER = "viewer"
    USER = "user"
    ADMIN = "admin"

class Permissions:
    """Role permissions."""
    VIEWER = {"recall", "get"}
    USER = {"recall", "get", "store", "delete"}
    ADMIN = {"recall", "get", "store", "delete", "export", "compact"}

def check_permission(user_role: Role, operation: str) -> bool:
    """Check if role has permission for operation."""
    role_perms = getattr(Permissions, user_role.value.upper())
    return operation in role_perms

# Enforce in application
class SecureMemorySystem:
    """Memory system with RBAC."""

    def __init__(self, memory, user_role: Role):
        self.memory = memory
        self.role = user_role

    async def store(self, text: str, **kwargs):
        """Store with permission check."""
        if not check_permission(self.role, "store"):
            raise PermissionError("User lacks 'store' permission")

        return await self.memory.store(text, **kwargs)

    async def recall(self, query: str, **kwargs):
        """Recall with permission check."""
        if not check_permission(self.role, "recall"):
            raise PermissionError("User lacks 'recall' permission")

        return await self.memory.recall(query, **kwargs)

# Usage
admin_memory = SecureMemorySystem(memory, Role.ADMIN)
await admin_memory.store("Admin data")  # ✓ Allowed

viewer_memory = SecureMemorySystem(memory, Role.VIEWER)
await viewer_memory.store("Data")  # ✗ PermissionError

Data Access Control

# access_control.py
from axon.models.filter import Filter

class DataAccessController:
    """Control data access by user."""

    def __init__(self, memory, user_id: str, user_roles: Set[str]):
        self.memory = memory
        self.user_id = user_id
        self.user_roles = user_roles

    async def recall(self, query: str, **kwargs):
        """Recall with access control."""
        # Add user filter
        user_filter = Filter(metadata={"owner": self.user_id})

        # Merge with provided filter
        if "filter" in kwargs:
            existing_filter = kwargs["filter"]
            # Combine filters
            user_filter.metadata.update(existing_filter.metadata)

        kwargs["filter"] = user_filter

        # Only see own data
        return await self.memory.recall(query, **kwargs)

# Usage
user_memory = DataAccessController(memory, "user_123", {"user"})
results = await user_memory.recall("query")
# Only returns entries where metadata.owner == "user_123"

Encryption

Encryption at Rest

# encryption.py
from cryptography.fernet import Fernet
import os

class EncryptedMemorySystem:
    """Memory system with encryption at rest."""

    def __init__(self, memory, encryption_key: bytes = None):
        self.memory = memory
        self.key = encryption_key or Fernet.generate_key()
        self.cipher = Fernet(self.key)

    async def store(self, text: str, **kwargs):
        """Store encrypted text."""
        # Encrypt text
        encrypted_text = self.cipher.encrypt(text.encode())
        encrypted_str = encrypted_text.decode()

        # Store encrypted
        entry_id = await self.memory.store(encrypted_str, **kwargs)
        return entry_id

    async def recall(self, query: str, **kwargs):
        """Recall and decrypt."""
        # Encrypt query for matching
        encrypted_query = self.cipher.encrypt(query.encode()).decode()

        # Recall encrypted entries
        results = await self.memory.recall(encrypted_query, **kwargs)

        # Decrypt results
        for entry in results:
            try:
                decrypted = self.cipher.decrypt(entry.text.encode())
                entry.text = decrypted.decode()
            except Exception:
                pass  # Entry not encrypted or wrong key

        return results

# Usage
# Load encryption key from secure storage
encryption_key = os.getenv("ENCRYPTION_KEY").encode()
encrypted_memory = EncryptedMemorySystem(memory, encryption_key)

await encrypted_memory.store("Sensitive data")

Encryption in Transit (TLS)

# docker-compose.yml with TLS
services:
  app:
    build: .
    ports:
      - "443:443"
    volumes:
      - ./certs:/certs:ro
    environment:
      - SSL_CERT_FILE=/certs/server.crt
      - SSL_KEY_FILE=/certs/server.key

  redis:
    image: redis:7-alpine
    command: >
      redis-server
      --tls-port 6380
      --port 0
      --tls-cert-file /tls/redis.crt
      --tls-key-file /tls/redis.key
      --tls-ca-cert-file /tls/ca.crt
    volumes:
      - ./certs:/tls:ro
# TLS configuration
import ssl

# Redis with TLS
config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": "rediss://redis:6380",  # Note: rediss://
            "ssl_cert_reqs": ssl.CERT_REQUIRED,
            "ssl_ca_certs": "/certs/ca.crt"
        }
    )
)

PII Protection

Automatic PII Detection

# pii_protection.py
from axon.core.privacy import PIIDetector
from axon.models.base import PrivacyLevel

class PIIProtectedMemorySystem:
    """Memory system with automatic PII detection."""

    def __init__(self, memory):
        self.memory = memory
        self.detector = PIIDetector()

    async def store(self, text: str, **kwargs):
        """Store with PII detection."""
        # Detect PII
        result = self.detector.detect(text)

        if result.has_pii:
            # Log PII detection
            logger.warning(f"PII detected: {result.detected_types}")

            # Set privacy level
            kwargs["privacy_level"] = result.recommended_privacy_level

            # Add metadata
            if "metadata" not in kwargs:
                kwargs["metadata"] = {}
            kwargs["metadata"]["pii_detected"] = True
            kwargs["metadata"]["pii_types"] = list(result.detected_types)

        return await self.memory.store(text, **kwargs)

# Usage
protected_memory = PIIProtectedMemorySystem(memory)
await protected_memory.store("My SSN is 123-45-6789")
# Automatically marked as RESTRICTED

PII Redaction

# pii_redaction.py
import re

class PIIRedactor:
    """Redact PII from text."""

    PATTERNS = {
        "ssn": (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
        "email": (re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"), "[EMAIL]"),
        "phone": (re.compile(r"\b\d{3}-\d{3}-\d{4}\b"), "[PHONE]"),
        "credit_card": (re.compile(r"\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b"), "[CARD]")
    }

    def redact(self, text: str) -> str:
        """Redact PII from text."""
        redacted = text

        for pii_type, (pattern, replacement) in self.PATTERNS.items():
            redacted = pattern.sub(replacement, redacted)

        return redacted

# Usage
redactor = PIIRedactor()

original = "Email: john@example.com, SSN: 123-45-6789"
redacted = redactor.redact(original)
print(redacted)
# Output: "Email: [EMAIL], SSN: [SSN]"

# Store redacted version
await memory.store(redacted)

Network Security

Firewall Rules

# Allow only necessary ports
sudo ufw default deny incoming
sudo ufw default allow outgoing

# Allow application
sudo ufw allow 8000/tcp

# Allow Redis (internal only)
sudo ufw allow from 10.0.0.0/8 to any port 6379

# Allow Qdrant (internal only)
sudo ufw allow from 10.0.0.0/8 to any port 6333

# Enable firewall
sudo ufw enable

Network Segmentation

# docker-compose.yml with networks
version: '3.8'

services:
  app:
    networks:
      - frontend
      - backend

  redis:
    networks:
      - backend  # Not exposed to frontend

  qdrant:
    networks:
      - backend  # Not exposed to frontend

networks:
  frontend:
    driver: bridge
  backend:
    driver: bridge
    internal: true  # No external access

Secret Management

Environment Variables

# .env (DO NOT commit to git)
REDIS_PASSWORD=your-secure-password
QDRANT_API_KEY=your-api-key
OPENAI_API_KEY=your-openai-key
JWT_SECRET_KEY=your-jwt-secret
ENCRYPTION_KEY=your-encryption-key

Secrets Management Services

# aws_secrets.py
import boto3
import json

def get_secret(secret_name: str, region: str = "us-east-1") -> dict:
    """Get secret from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name=region)

    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except Exception as e:
        logger.error(f"Failed to get secret: {e}")
        raise

# Load secrets
secrets = get_secret("axon-production-secrets")

config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": secrets["redis_url"],
            "password": secrets["redis_password"]
        }
    )
)

Audit Logging

Security Event Logging

# security_audit.py
from axon.core.audit import AuditLogger
from axon.models.audit import OperationType, EventStatus

audit_logger = AuditLogger()

async def log_security_event(
    event_type: str,
    user_id: str,
    success: bool,
    details: dict = None
):
    """Log security event."""
    await audit_logger.log_event(
        operation=OperationType.CUSTOM,
        user_id=user_id,
        status=EventStatus.SUCCESS if success else EventStatus.FAILURE,
        metadata={
            "event_type": event_type,
            "details": details or {}
        }
    )

# Log authentication attempts
await log_security_event(
    "authentication",
    user_id="user_123",
    success=True,
    details={"method": "jwt", "ip": "192.168.1.100"}
)

# Log authorization failures
await log_security_event(
    "authorization",
    user_id="user_456",
    success=False,
    details={"operation": "delete", "reason": "insufficient_permissions"}
)

Security Best Practices

1. Principle of Least Privilege

# ✓ Good: Minimal permissions
user_role = Role.USER  # Can store and recall
admin_role = Role.ADMIN  # Full access

# ✗ Bad: Everyone is admin
all_admin = Role.ADMIN

2. Defense in Depth

# ✓ Good: Multiple security layers
# - Authentication (JWT)
# - Authorization (RBAC)
# - Encryption (at rest + in transit)
# - PII detection
# - Audit logging
# - Network segmentation

# ✗ Bad: Single security layer
# - Only authentication

3. Regular Security Updates

# ✓ Good: Keep dependencies updated
pip install --upgrade axon-sdk
pip install --upgrade redis
pip install --upgrade qdrant-client

# Check for vulnerabilities
pip-audit

4. Secure Configuration

# ✓ Good: Secure defaults
config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": "rediss://redis:6380",  # TLS
            "password": get_secret("redis_password"),
            "ssl_cert_reqs": ssl.CERT_REQUIRED
        }
    )
)

# ✗ Bad: Insecure configuration
config = MemoryConfig(
    session=SessionPolicy(
        adapter_type="redis",
        adapter_config={
            "url": "redis://redis:6379",  # No TLS
            "password": "password123"  # Hardcoded
        }
    )
)

Security Checklist

  • Authentication enabled (API keys or JWT)
  • Authorization implemented (RBAC)
  • Encryption at rest configured
  • TLS/SSL enabled for all connections
  • PII detection active
  • Secrets in environment variables or secrets manager
  • Firewall rules configured
  • Network segmentation implemented
  • Audit logging enabled
  • Regular security updates scheduled
  • Monitoring and alerting configured
  • Backup encryption enabled

Next Steps