OxiDB's aggregation pipeline lets you transform and analyze data through a series of stages. Each stage takes documents as input, processes them, and passes results to the next stage.

Pipeline Stages

StageDescription
$matchFilter documents (same syntax as find queries)
$groupGroup by field and compute aggregates (sum, avg, min, max, count)
$sortSort results
$skipSkip N documents
$limitLimit output to N documents
$projectReshape documents (include/exclude/rename fields)
$countCount documents and return as a single result
$unwindDeconstruct an array field into separate documents
$addFieldsAdd new computed fields to documents
$lookupJoin with another collection

Example: Sales Analytics

Given an "orders" collection, let's analyze sales by category:

from oxidb import OxiDbClient
db = OxiDbClient("127.0.0.1", 4444)

# Revenue by category, sorted highest first
results = db.aggregate("orders", [
    {"$match": {"status": "completed"}},
    {"$group": {
        "_id": "$category",
        "total_revenue": {"$sum": "$amount"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$amount"},
        "max_order": {"$max": "$amount"}
    }},
    {"$sort": {"total_revenue": -1}},
    {"$limit": 10}
])

for r in results:
    print(f"{r['_id']}: ${r['total_revenue']:.2f} ({r['order_count']} orders)")

$unwind: Working with Arrays

Deconstruct array fields to analyze individual elements:

# Count products by tag
results = db.aggregate("products", [
    {"$unwind": "$tags"},
    {"$group": {
        "_id": "$tags",
        "count": {"$sum": 1}
    }},
    {"$sort": {"count": -1}}
])

$lookup: Joining Collections

Perform left outer joins between collections:

# Get orders with customer details
results = db.aggregate("orders", [
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer"
    }},
    {"$limit": 50}
])

$project: Reshaping Documents

# Create a summary view
results = db.aggregate("users", [
    {"$project": {
        "full_name": 1,
        "email": 1,
        "role": 1,
        "_id": 0
    }}
])

$addFields: Computed Fields

# Add a computed field
results = db.aggregate("products", [
    {"$addFields": {
        "discounted_price": {"$mul": ["$price", 0.9]}
    }},
    {"$sort": {"discounted_price": 1}}
])

Performance

OxiDB's aggregation pipeline uses parallel segmented scanning for the initial $match stage, distributing work across multiple threads. In benchmarks, the pipeline handles 1M documents in under 250ms.