OxiDB's aggregation pipeline lets you transform and analyze data through a series of stages. Each stage takes documents as input, processes them, and passes results to the next stage.
Pipeline Stages
| Stage | Description |
|---|---|
| $match | Filter documents (same syntax as find queries) |
| $group | Group by field and compute aggregates (sum, avg, min, max, count) |
| $sort | Sort results |
| $skip | Skip N documents |
| $limit | Limit output to N documents |
| $project | Reshape documents (include/exclude/rename fields) |
| $count | Count documents and return as a single result |
| $unwind | Deconstruct an array field into separate documents |
| $addFields | Add new computed fields to documents |
| $lookup | Join with another collection |
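To show how stages chain, here is a minimal in-memory model of a few of them in plain Python. It is a sketch for building intuition, not OxiDB's implementation. `$match` here supports only equality, and `$count` is assumed to take the output field name as its argument (MongoDB-style). `run_pipeline` and the stage functions are hypothetical names.

```python
from operator import itemgetter
from typing import Any, Callable

Doc = dict[str, Any]


def match(docs: list[Doc], spec: Doc) -> list[Doc]:
    # Equality-only filter: keep documents whose fields equal every value in spec.
    return [d for d in docs if all(d.get(k) == v for k, v in spec.items())]


def sort(docs: list[Doc], spec: Doc) -> list[Doc]:
    # Sort by the last key first; because Python's sort is stable,
    # the first key in spec ends up with the highest precedence.
    out = list(docs)
    for field, direction in reversed(list(spec.items())):
        out.sort(key=itemgetter(field), reverse=(direction == -1))
    return out


def skip(docs: list[Doc], n: int) -> list[Doc]:
    return docs[n:]


def limit(docs: list[Doc], n: int) -> list[Doc]:
    return docs[:n]


def count(docs: list[Doc], field: str) -> list[Doc]:
    # Assumption: the spec names the output field; the stream collapses to one document.
    return [{field: len(docs)}]


STAGES: dict[str, Callable[[list[Doc], Any], list[Doc]]] = {
    "$match": match,
    "$sort": sort,
    "$skip": skip,
    "$limit": limit,
    "$count": count,
}


def run_pipeline(docs: list[Doc], pipeline: list[Doc]) -> list[Doc]:
    # Each stage consumes the previous stage's output.
    for stage in pipeline:
        (name, spec), = stage.items()
        docs = STAGES[name](docs, spec)
    return docs


orders = [
    {"id": 1, "status": "completed", "amount": 30},
    {"id": 2, "status": "pending", "amount": 50},
    {"id": 3, "status": "completed", "amount": 20},
    {"id": 4, "status": "completed", "amount": 40},
]

# Second-largest completed order.
second = run_pipeline(orders, [
    {"$match": {"status": "completed"}},
    {"$sort": {"amount": -1}},
    {"$skip": 1},
    {"$limit": 1},
])
total = run_pipeline(orders, [{"$match": {"status": "completed"}}, {"$count": "n"}])
```

Order matters: swapping `$skip` and `$limit` in the first pipeline would return a different (here empty) result.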
Example: Sales Analytics
Given an "orders" collection whose documents have status, category, and amount fields, let's analyze revenue from completed orders by category:
```python
from oxidb import OxiDbClient

db = OxiDbClient("127.0.0.1", 4444)

# Revenue by category, sorted highest first
results = db.aggregate("orders", [
    {"$match": {"status": "completed"}},
    {"$group": {
        "_id": "$category",
        "total_revenue": {"$sum": "$amount"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$amount"},
        "max_order": {"$max": "$amount"}
    }},
    {"$sort": {"total_revenue": -1}},
    {"$limit": 10}
])

for r in results:
    print(f"{r['_id']}: ${r['total_revenue']:.2f} ({r['order_count']} orders)")
```
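For reference, here is the same computation written in plain Python over an in-memory list, with each step annotated with the stage it mirrors. It is a sketch for checking expected results by hand, not how OxiDB executes the pipeline; `group_revenue` and the sample orders are hypothetical.

```python
from collections import defaultdict


def group_revenue(orders: list[dict]) -> list[dict]:
    buckets: dict[str, list[float]] = defaultdict(list)
    for o in orders:
        if o.get("status") == "completed":              # $match
            buckets[o["category"]].append(o["amount"])  # group key: "$category"
    results = [
        {
            "_id": category,
            "total_revenue": sum(amounts),              # {"$sum": "$amount"}
            "order_count": len(amounts),                # {"$sum": 1}
            "avg_order": sum(amounts) / len(amounts),   # {"$avg": "$amount"}
            "max_order": max(amounts),                  # {"$max": "$amount"}
        }
        for category, amounts in buckets.items()
    ]
    results.sort(key=lambda r: r["total_revenue"], reverse=True)  # $sort, -1 = descending
    return results[:10]                                           # $limit: 10


sample = [
    {"category": "books", "amount": 12.5, "status": "completed"},
    {"category": "books", "amount": 7.5, "status": "completed"},
    {"category": "toys", "amount": 30.0, "status": "completed"},
    {"category": "toys", "amount": 99.0, "status": "cancelled"},
]
for r in group_revenue(sample):
    print(f"{r['_id']}: ${r['total_revenue']:.2f} ({r['order_count']} orders)")
```

The cancelled toy order is filtered out before grouping, which is why "toys" reports one order.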
$unwind: Working with Arrays
Deconstruct array fields to analyze individual elements:
```python
# Count products by tag
results = db.aggregate("products", [
    {"$unwind": "$tags"},
    {"$group": {
        "_id": "$tags",
        "count": {"$sum": 1}
    }},
    {"$sort": {"count": -1}}
])
```
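`$unwind` emits one document per array element, copying the document's other fields. The plain-Python sketch below models that. Dropping documents whose array is missing or empty, and treating a non-array value as a one-element array, are assumptions borrowed from MongoDB's default behavior, not documented OxiDB semantics; `unwind` is a hypothetical helper.

```python
from collections import Counter


def unwind(docs: list[dict], field: str) -> list[dict]:
    out = []
    for doc in docs:
        value = doc.get(field)
        # Assumption: missing/None yields nothing; a scalar acts like a 1-element array.
        items = value if isinstance(value, list) else ([] if value is None else [value])
        for item in items:
            copy = dict(doc)
            copy[field] = item  # the array is replaced by one of its elements
            out.append(copy)
    return out


products = [
    {"name": "kettle", "tags": ["kitchen", "electric"]},
    {"name": "toaster", "tags": ["kitchen", "electric", "sale"]},
    {"name": "mug", "tags": ["kitchen"]},
    {"name": "gift card", "tags": []},  # empty array: produces no documents
]

rows = unwind(products, "tags")
# Mirrors $group by "$tags" with {"$sum": 1}, then $sort by count descending.
tag_counts = Counter(r["tags"] for r in rows).most_common()
```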
$lookup: Joining Collections
Perform left outer joins between collections:
```python
# Get orders with customer details
results = db.aggregate("orders", [
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer"
    }},
    {"$limit": 50}
])
```
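A left outer join keeps every document from the input collection, even when nothing in the other collection matches. The sketch below models `$lookup` as a hash join in plain Python. Attaching matches as a list under the `as` field (empty when there is no match) is an assumption based on MongoDB's `$lookup`, and `lookup` is a hypothetical helper.

```python
def lookup(docs: list[dict], foreign: list[dict], local_field: str,
           foreign_field: str, as_field: str) -> list[dict]:
    # Build a hash index over the foreign collection: join key -> matching docs.
    index: dict = {}
    for f in foreign:
        index.setdefault(f.get(foreign_field), []).append(f)
    # Left outer join: every input doc is kept; unmatched docs get an empty list.
    return [{**d, as_field: list(index.get(d.get(local_field), []))} for d in docs]


customers = [{"_id": 1, "name": "Ada"}, {"_id": 2, "name": "Lin"}]
orders = [{"_id": 10, "customer_id": 1}, {"_id": 11, "customer_id": 3}]

joined = lookup(orders, customers, "customer_id", "_id", "customer")
```

Indexing the foreign side first makes the join roughly linear in the size of both inputs, instead of rescanning `customers` for every order.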
$project: Reshaping Documents
```python
# Create a summary view
results = db.aggregate("users", [
    {"$project": {
        "full_name": 1,
        "email": 1,
        "role": 1,
        "_id": 0
    }}
])
```
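The explicit `"_id": 0` suggests that `_id` is included by default, as in MongoDB. The sketch below models an inclusion-only projection under that assumption; it does not cover the exclude or rename forms, and `project` is a hypothetical helper.

```python
def project(docs: list[dict], spec: dict) -> list[dict]:
    # Fields explicitly set to 1 are kept; everything else is dropped.
    keep = [f for f, v in spec.items() if v == 1 and f != "_id"]
    # Assumption (MongoDB convention): _id is kept unless spec sets it to 0.
    include_id = spec.get("_id", 1) != 0
    out = []
    for d in docs:
        row = {"_id": d["_id"]} if include_id and "_id" in d else {}
        row.update({f: d[f] for f in keep if f in d})
        out.append(row)
    return out


users = [{
    "_id": 7,
    "full_name": "Ada Lovelace",
    "email": "ada@example.com",
    "role": "admin",
    "password_hash": "secret",
}]

summary = project(users, {"full_name": 1, "email": 1, "role": 1, "_id": 0})
```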
$addFields: Computed Fields
```python
# Add a computed field
results = db.aggregate("products", [
    {"$addFields": {
        "discounted_price": {"$mul": ["$price", 0.9]}
    }},
    {"$sort": {"discounted_price": 1}}
])
```
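`$addFields` keeps every existing field and adds new ones; in the example above, `{"$mul": ["$price", 0.9]}` multiplies each document's `price` by 0.9. Here is a plain-Python model in which each computed field is written as a function of the document, a hypothetical stand-in for OxiDB's expression syntax; `add_fields` is likewise a hypothetical helper.

```python
from typing import Any, Callable


def add_fields(docs: list[dict], computed: dict[str, Callable[[dict], Any]]) -> list[dict]:
    # Copy each document and add one field per entry; existing fields are kept.
    return [{**d, **{name: fn(d) for name, fn in computed.items()}} for d in docs]


products = [{"name": "lamp", "price": 40.0}, {"name": "pen", "price": 2.0}]

# Mirrors {"$addFields": {"discounted_price": {"$mul": ["$price", 0.9]}}}
rows = add_fields(products, {"discounted_price": lambda d: d["price"] * 0.9})
# Mirrors {"$sort": {"discounted_price": 1}}
rows.sort(key=lambda d: d["discounted_price"])
```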
Performance
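OxiDB's aggregation pipeline runs the initial $match stage as a parallel segmented scan, distributing the work across multiple threads. Placing $match at the start of a pipeline therefore lets it use the parallel scan and also reduces the number of documents that later stages process. In benchmarks, the pipeline handles 1M documents in under 250 ms.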
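A rough sketch of the segmented-scan idea: split the input into contiguous segments, filter each on its own worker thread, and concatenate the results in segment order. This illustrates the partition, filter, and merge shape only and is not OxiDB's implementation; `parallel_match` and the default segment count are hypothetical. In CPython the GIL keeps pure-Python predicates from running truly in parallel, so this version shows the structure rather than the speedup.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def parallel_match(docs: list[dict], predicate: Callable[[dict], bool],
                   segments: int = 4) -> list[dict]:
    if not docs:
        return []
    size = -(-len(docs) // segments)  # ceiling division: documents per segment
    chunks = [docs[i:i + size] for i in range(0, len(docs), size)]

    def scan(chunk: list[dict]) -> list[dict]:
        return [d for d in chunk if predicate(d)]

    with ThreadPoolExecutor(max_workers=segments) as pool:
        # map() yields results in submission order, so output order matches input order.
        parts = list(pool.map(scan, chunks))
    return [d for part in parts for d in part]


docs = [{"i": i, "status": "completed" if i % 3 == 0 else "pending"} for i in range(10)]
completed = parallel_match(docs, lambda d: d["status"] == "completed")
```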