"Optimierung ist eine kontinuierliche Reise, nicht ein Ziel. Mit den richtigen Tools können Sie fast jede Query um 10-100x beschleunigen."
We examine scientifically grounded query optimization techniques for ThemisDB, from query planning through physical execution strategies to adaptive index selection. Query performance is the most critical factor for database success[^1]. An optimal execution plan can reduce execution time by 10-1000×. We combine classical optimization theory[^2][^3] with modern techniques such as vectorized execution[^4] and morsel-driven parallelism[^5], adapted for ThemisDB's RocksDB-based LSM-tree architecture (see also → Chapter 3: AQL Query Language, → Chapter 11: Indexing Strategies).
What this chapter covers:
- Query planning: cost-based optimization, cardinality estimation, join-order optimization
- Execution strategies: join algorithms, hash aggregation, parallel execution
- Index selection: selectivity analysis, covering indexes, adaptive indexing
- Early filtering: predicate pushdown, projection pushdown
- Query refactoring: subquery flattening, common subexpression elimination
- Aggregation optimization: sort-based aggregation, streaming aggregation
- Window function performance: partitioning strategies, frame optimization
- Batch-operation tuning: bulk inserts, batched updates
- Query caching: result caching, memoization
graph TB
Query["SELECT * FROM users<br/>WHERE age greater than 25<br/>AND city = Berlin"] --> Optimizer[Query Optimizer]
Optimizer --> Plan1["Plan 1:<br/>Index Scan on age<br/>Filter city"]
Optimizer --> Plan2["Plan 2:<br/>Index Scan on city<br/>Filter age"]
Optimizer --> Plan3["Plan 3:<br/>Composite Index<br/>age + city"]
Plan1 --> Cost1[Cost: 1200]
Plan2 --> Cost2[Cost: 800]
Plan3 --> Cost3[Cost: 150]
Cost1 --> Select{Select<br/>Best Plan}
Cost2 --> Select
Cost3 --> Select
Select --> Execute[Execute Plan 3]
style Plan3 fill:#43e97b
style Execute fill:#4facfe
Fig. 34.0: Query plan optimization
We examine the optimization phase between query parsing and execution, in which ThemisDB generates a cost-optimized execution plan. Query planning decides on index usage, join order, aggregation strategies, and the degree of parallelism. An optimal plan can reduce execution time by 10-1000×[^1]. We follow Selinger's classic work on cost-based optimization[^2] as well as modern approaches such as the Cascades framework[^3], and analyze their adaptation to ThemisDB's RocksDB-based architecture (see also → Chapter 3: AQL Query Language).
We use a cost model that quantifies CPU, I/O, and network costs to compare competing plans. For each operator (scan, join, sort), the model computes costs based on cardinality estimates and selectivity[^2]. ThemisDB calibrates these cost functions against RocksDB characteristics: LSM-tree reads are more expensive than writes, Bloom filters reduce point-lookup costs to O(1), and range scans benefit from sequential I/O[^6].
Cost functions:
# Pseudo-code: cost model for the ThemisDB query optimizer
# Based on the System R cost model (Selinger et al. 1979)
import math

def cost_collection_scan(collection_name: str) -> float:
    """
    Computes the cost of a full collection scan.
    Cost factors: I/O (dominant), CPU (deserialization)
    """
    n_docs = statistics.get_cardinality(collection_name)
    n_pages = n_docs / DOCS_PER_PAGE  # RocksDB block size: 4KB
    # I/O cost: sequential scan (cheap on SSDs)
    io_cost = n_pages * SEQUENTIAL_READ_COST  # ~0.1ms/page
    # CPU cost: document deserialization
    cpu_cost = n_docs * DESERIALIZE_COST  # ~0.001ms/doc
    return io_cost + cpu_cost

def cost_index_seek(index_name: str, predicate: Predicate) -> float:
    """
    Computes the cost of an index seek plus document fetches.
    Uses Bloom filters for point lookups.
    """
    selectivity = estimate_selectivity(predicate)  # 0.0-1.0
    n_docs = statistics.get_cardinality(index_name)
    n_matching = n_docs * selectivity
    # Index traversal (B-tree or hash)
    index_cost = math.log2(n_docs) * BTREE_NODE_COST  # O(log n)
    # Document fetches (random I/O - expensive!)
    fetch_cost = n_matching * RANDOM_READ_COST  # ~1ms/doc
    # Bloom filters reduce false positives
    bloom_benefit = n_matching * BLOOM_CHECK_COST * 0.01  # 99% hit rate
    return index_cost + fetch_cost - bloom_benefit
def cost_hash_join(left_card: int, right_card: int) -> float:
    """
    Computes the cost of an in-memory hash join.
    Build phase: smaller table; probe phase: larger table
    """
    build_table = min(left_card, right_card)
    probe_table = max(left_card, right_card)
    # Build: construct the hash table in memory
    build_cost = build_table * HASH_INSERT_COST  # ~0.002ms/row
    # Probe: one hash lookup per probe row
    probe_cost = probe_table * HASH_LOOKUP_COST  # ~0.001ms/row
    # Memory constraint: spill to disk when exceeded
    memory_limit = get_memory_limit()
    if build_table * AVG_ROW_SIZE > memory_limit:
        spill_cost = build_table * DISK_WRITE_COST * 2  # write + read
        return build_cost + probe_cost + spill_cost
    return build_cost + probe_cost

Cardinality estimation:
We collect statistics via histograms, distinct counts, and value distributions:
# Cardinality estimation with histograms
def estimate_selectivity(predicate: FilterPredicate) -> float:
    """
    Estimates the fraction of documents that satisfy the predicate.
    Uses histograms and a uniform-distribution assumption.
    """
    if predicate.type == "EQUALITY":
        # SELECT * FROM users WHERE status = 'active'
        distinct_values = statistics.get_distinct_count(predicate.field)
        return 1.0 / distinct_values  # uniform-distribution assumption
    elif predicate.type == "RANGE":
        # SELECT * FROM users WHERE age BETWEEN 25 AND 35
        histogram = statistics.get_histogram(predicate.field)
        return histogram.estimate_range(predicate.low, predicate.high)
    elif predicate.type == "LIKE":
        # SELECT * FROM users WHERE name LIKE 'Alice%'
        # Prefix: easy to estimate; contains: hard (default: 10%)
        if predicate.is_prefix_match():
            return 0.05  # 5% for prefix matches
        return 0.10  # 10% default for contains
    return 1.0  # conservative: no filtering

Join-order optimization:
For a query with n joins there are n! possible join orders. We use dynamic programming (the System R approach) for n ≤ 12 and greedy heuristics beyond that[^2]:
# Join-order optimization with dynamic programming
import itertools

def optimize_join_order(tables: List[Table], predicates: List[Predicate]) -> Plan:
    """
    System R style bottom-up dynamic programming.
    Finds the cost-optimal join order for n tables.
    """
    n = len(tables)
    # DP table: dp[subset] = (best_plan, cost)
    dp = {}
    # Base case: single tables (collection scans or index seeks)
    for table in tables:
        subset = frozenset([table])
        scan_cost = cost_collection_scan(table.name)
        # Check indexes for applicable predicates
        best_plan = Plan(operator="SCAN", table=table)
        best_cost = scan_cost
        for index in table.indexes:
            if can_use_index(index, predicates):
                index_cost = cost_index_seek(index.name, predicates)
                if index_cost < best_cost:
                    best_plan = Plan(operator="INDEX_SEEK", index=index)
                    best_cost = index_cost
        dp[subset] = (best_plan, best_cost)
    # Iterate: 2-table joins, 3-table joins, ..., n-table joins
    for size in range(2, n + 1):
        for subset in itertools.combinations(tables, size):
            subset_frozen = frozenset(subset)
            best_plan = None
            best_cost = float('inf')
            # All possible splits: subset = left ∪ right
            for left_size in range(1, size):
                for left in itertools.combinations(subset, left_size):
                    left_frozen = frozenset(left)
                    right_frozen = subset_frozen - left_frozen
                    # Is this join feasible (does a join predicate exist)?
                    if not has_join_predicate(left_frozen, right_frozen, predicates):
                        continue
                    left_plan, left_cost = dp[left_frozen]
                    right_plan, right_cost = dp[right_frozen]
                    # Compute the hash-join cost
                    join_cost = left_cost + right_cost + cost_hash_join(
                        left_plan.cardinality, right_plan.cardinality
                    )
                    if join_cost < best_cost:
                        best_plan = Plan(
                            operator="HASH_JOIN",
                            left=left_plan,
                            right=right_plan
                        )
                        best_cost = join_cost
            dp[subset_frozen] = (best_plan, best_cost)
    # Final plan covering all tables
    final_subset = frozenset(tables)
    return dp[final_subset][0]

Performance characteristics:
| Planning Strategy | Plan Time | Execution Time | Plan Quality |
|---|---|---|---|
| Heuristic only | <1ms | 500ms | 60% optimal |
| Limited DP (n≤7) | 10ms | 200ms | 85% optimal |
| Full DP (n≤12) | 100ms | 150ms | 95% optimal |
| Exhaustive (n>12) | >1s | 145ms | 99% optimal |
Benchmark methodology: TPC-H query 5 (5-8 tables), PostgreSQL 15 vs. ThemisDB, Intel Xeon 8-core, 32GB RAM, NVMe SSD. Optimality measured relative to exhaustive search[^7].
We transform queries algebraically into equivalent but more efficient forms. These optimizations are independent of the cost model and are always applied[^3]. Predicate pushdown, constant folding, and subquery flattening reduce data volume early and eliminate redundant operations.
Predicate Pushdown:
-- ❌ BEFORE: filters after the join (inefficient)
FOR user IN users
  FOR order IN orders
    FILTER user._id == order.user_id
    FILTER order.status == 'shipped'  -- too late!
    FILTER user.country == 'DE'       -- too late!
    RETURN {user, order}
-- ✅ AFTER: filters before the join (applied by the query rewriter)
FOR user IN users
  FILTER user.country == 'DE'         -- early filter: 90% reduction
  FOR order IN orders
    FILTER order.status == 'shipped'  -- early filter: 80% reduction
    FILTER user._id == order.user_id
    RETURN {user, order}
-- Effect: 100M users × 500M orders → 10M users × 100M orders
-- Join cardinality: 50B → 1B (50× reduction!)
Subquery Flattening:
-- ❌ BEFORE: correlated subquery (N+1 problem)
FOR user IN users
  LET order_count = (
    FOR order IN orders
      FILTER order.user_id == user._id  -- correlated!
      COLLECT WITH COUNT INTO cnt
      RETURN cnt
  )[0]
  RETURN {user, order_count}
-- ✅ AFTER: flattened into a single materialized subquery
FOR user IN users
  LET orders_for_user = (
    FOR order IN orders
      FILTER order.user_id == user._id
      RETURN order
  )
  RETURN {user, order_count: LENGTH(orders_for_user)}
-- Even better: native GROUP BY
FOR order IN orders
  COLLECT user_id = order.user_id WITH COUNT INTO order_count
  FOR user IN users
    FILTER user._id == user_id
    RETURN {user, order_count}
Common Subexpression Elimination (CSE):
-- ❌ BEFORE: redundant computation
FOR doc IN documents
  LET score_a = (doc.views * 0.7 + doc.likes * 0.3)
  LET score_b = (doc.views * 0.7 + doc.likes * 0.3)  -- duplicate!
  RETURN {doc, avg_score: (score_a + score_b) / 2}
-- ✅ AFTER: CSE eliminates the duplicate
FOR doc IN documents
  LET score = (doc.views * 0.7 + doc.likes * 0.3)
  RETURN {doc, avg_score: score}
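To make the rewrite concrete, here is a minimal CSE sketch in Python. The tuple-based expression encoding and the eliminate_cse helper are illustrative assumptions, not ThemisDB's internal AST or rewriter API.

# Minimal common-subexpression elimination sketch. Expressions are nested
# tuples such as ("+", ("*", "doc.views", 0.7), ("*", "doc.likes", 0.3)).
# Hypothetical representation -- not ThemisDB's actual AST.
from typing import Any, Dict, List, Tuple

Expr = Any  # field reference (str), number, or ("op", child, child)

def eliminate_cse(bindings: List[Tuple[str, Expr]]) -> List[Tuple[str, Expr]]:
    """Replace repeated subexpressions in LET bindings with earlier variables."""
    seen: Dict[Expr, str] = {}           # canonical expression -> variable name
    rewritten: List[Tuple[str, Expr]] = []
    for name, expr in bindings:
        if expr in seen:                 # exact duplicate: reuse the prior variable
            rewritten.append((name, seen[expr]))
        else:
            seen[expr] = name
            rewritten.append((name, expr))
    return rewritten

score = ("+", ("*", "doc.views", 0.7), ("*", "doc.likes", 0.3))
print(eliminate_cse([("score_a", score), ("score_b", score)]))
# [('score_a', (...)), ('score_b', 'score_a')] -- the duplicate collapses

Hashing canonicalized subexpressions is also how CSE generalizes to nested duplicates: a bottom-up pass replaces each repeated subtree with a reference to its first occurrence.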
We generate alternative plans through rule application and prune unlikely candidates early. The Cascades framework uses memoization and top-down enumeration[^3] to avoid search-space explosion for complex queries (see also → Chapter 11: Indexing Strategies).
Plan representation:
# Query plan tree structure
class QueryPlan:
    """
    Tree structure for an execution plan.
    Every node is a physical operator.
    """
    def __init__(self, operator: str, estimated_cost: float = 0.0,
                 estimated_rows: int = 0, **kwargs):
        self.operator = operator  # SCAN, INDEX_SEEK, HASH_JOIN, SORT, ...
        self.children = []
        self.estimated_cost = estimated_cost
        self.estimated_rows = estimated_rows
        self.properties = kwargs  # index, filter, sort order, ...

    def to_string(self, indent=0) -> str:
        """
        Formats the plan as a tree (for EXPLAIN output).
        """
        prefix = "  " * indent
        result = f"{prefix}{self.operator} (cost={self.estimated_cost:.2f}, rows={self.estimated_rows})\n"
        if 'filter' in self.properties:
            result += f"{prefix}  └─ Filter: {self.properties['filter']}\n"
        for child in self.children:
            result += child.to_string(indent + 1)
        return result

# Example: plan for a join query
plan = QueryPlan(
    operator="HASH_JOIN",
    filter="user._id == order.user_id",
    estimated_cost=150.0,
    estimated_rows=1000
)
left_child = QueryPlan(
    operator="INDEX_SEEK",
    index="idx_country",
    filter="country == 'DE'",
    estimated_cost=10.0,
    estimated_rows=5000
)
right_child = QueryPlan(
    operator="INDEX_SEEK",
    index="idx_status",
    filter="status == 'shipped'",
    estimated_cost=20.0,
    estimated_rows=50000
)
plan.children = [left_child, right_child]
print(plan.to_string())
# Output:
# HASH_JOIN (cost=150.00, rows=1000)
#   └─ Filter: user._id == order.user_id
#   INDEX_SEEK (cost=10.00, rows=5000)
#     └─ Filter: country == 'DE'
#   INDEX_SEEK (cost=20.00, rows=50000)
#     └─ Filter: status == 'shipped'

AQL query with plan visualization:
-- Example: multi-join query with EXPLAIN
EXPLAIN
FOR user IN users
FILTER user.country == 'DE' AND user.verified == true
FOR order IN orders
FILTER order.user_id == user._id
FILTER order.created_at >= DATE_NOW() - 86400000 * 30 -- 30 days
FOR item IN order_items
FILTER item.order_id == order._id
COLLECT category = item.category
AGGREGATE total_revenue = SUM(item.price * item.quantity)
SORT total_revenue DESC
LIMIT 10
RETURN {category, total_revenue}
-- Generated plan (simplified):
-- 1. INDEX_SEEK users (idx_country_verified) → 50k rows
-- 2. INDEX_SEEK orders (idx_user_created) → 200k rows
-- 3. HASH_JOIN (user × order) → 180k rows
-- 4. INDEX_SEEK order_items (idx_order) → 500k rows
-- 5. HASH_JOIN (order × item) → 450k rows
-- 6. HASH_AGGREGATE (group by category) → 20 rows
-- 7. SORT (revenue DESC) → 20 rows
-- 8. LIMIT 10 → 10 rows
--
-- Estimated cost: 2500ms (with indexes)
-- Without indexes: 45000ms (18× slower!)
ThemisDB uses RocksDB-specific optimizations for key-value access paths, range queries, and prefix matching. Bloom filters reduce disk reads, the block cache improves locality, and vectorization uses SIMD for bulk operations[^6].
Range Query Optimization:
-- RocksDB-optimized: prefix scan with Bloom filter
FOR doc IN documents
  FILTER doc._key >= 'user:1000' AND doc._key < 'user:2000'
  RETURN doc
-- Internally: RocksDB Seek('user:1000') + sequential scan up to 'user:2000'
-- The Bloom filter skips 99% of the SST files (no matches)
-- Effective disk I/O: 10 blocks instead of 1000 blocks (100× reduction)
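Independently of RocksDB, the Seek-then-scan access pattern behind this query can be sketched with a sorted key list and binary search; a minimal sketch, assuming keys are kept in sorted order as they are in an SST file.

# Emulates RocksDB's Seek('user:1000') + sequential scan until 'user:2000'
# over a sorted key space, using only the standard library.
import bisect

sorted_keys = sorted(f"user:{i:04d}" for i in range(5000))

def range_scan(keys, lower, upper):
    """Yield all keys in [lower, upper), starting from a binary-searched seek."""
    pos = bisect.bisect_left(keys, lower)         # Seek: O(log n)
    while pos < len(keys) and keys[pos] < upper:  # sequential scan
        yield keys[pos]
        pos += 1

matches = list(range_scan(sorted_keys, "user:1000", "user:2000"))
print(len(matches))  # 1000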
Vectorization Opportunities:
-- SIMD-accelerated filter (AVX2)
FOR doc IN large_collection
  FILTER doc.score >= 0.8  -- numeric comparison
  RETURN doc
-- Internally: batch processing with 256-bit SIMD
-- compares 4 scores per register in parallel (double precision)
-- Throughput: 50M docs/s (vs. 10M docs/s scalar)
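The same batch-wise effect can be reproduced in Python with NumPy, whose elementwise comparisons run as vectorized loops; a minimal sketch (NumPy is an assumption here, not part of ThemisDB, and timings are machine-dependent).

# Vectorized vs. scalar filtering over a numeric column.
import time
import numpy as np

scores = np.random.rand(10_000_000)

t0 = time.perf_counter()
mask = scores >= 0.8                        # one vectorized pass over the column
vectorized = scores[mask]
t1 = time.perf_counter()

scalar = [s for s in scores if s >= 0.8]    # per-element Python loop
t2 = time.perf_counter()

print(f"vectorized: {t1 - t0:.3f}s, scalar: {t2 - t1:.3f}s, "
      f"matches: {len(vectorized)}")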
flowchart LR
Query[AQL Query] --> Parse[Parser]
Parse --> Analyze[Query Analyzer]
Analyze --> Optimize[Query Optimizer]
Optimize --> IndexCheck{Index available?}
IndexCheck -->|Yes| IndexScan[Index Scan]
IndexCheck -->|No| FullScan[Collection Scan]
IndexScan --> Filter[Filter Early]
FullScan --> Filter
Filter --> Project[Projection]
Project --> Sort{Sort needed?}
Sort -->|Yes| SortOp[Sort Operation]
Sort -->|No| Limit
SortOp --> Limit[Limit/Skip]
Limit --> Return[Return Results]
style IndexScan fill:#51cf66
style FullScan fill:#ff6b6b
style Filter fill:#4dabf7
style Return fill:#40c057
We examine the physical operators that translate the logical query plan into executable algorithms. The choice of execution strategy determines memory consumption, CPU efficiency, and I/O characteristics[^8]. ThemisDB implements modern techniques such as hash joins, vectorized execution, and morsel-driven parallelism[^5] for multi-core systems (see also → Chapter 35: Data Modeling Patterns for model-specific optimizations).
We compare nested loop join, hash join, and merge join with respect to their applicability to different join types and cardinalities. The choice is based on join selectivity, available memory, and the sort order of the inputs[^8].
Hash Join Implementation:
// Hash join pseudo-code (in-memory variant)
// Based on the Grace hash join (Graefe 1993)
// Convention: the optimizer passes the smaller relation as left_table,
// which becomes the build side; this keeps the row types and key
// extractors consistent.
template<typename LeftRow, typename RightRow>
std::vector<JoinResult> hash_join(
    std::vector<LeftRow>& left_table,    // build side (smaller input)
    std::vector<RightRow>& right_table,  // probe side (larger input)
    std::function<size_t(LeftRow&)> left_key_extractor,
    std::function<size_t(RightRow&)> right_key_extractor
) {
    // Phase 1: BUILD - load the build side into a hash table
    std::unordered_multimap<size_t, LeftRow*> hash_table;
    hash_table.reserve(left_table.size());
    for (auto& row : left_table) {
        size_t key = left_key_extractor(row);
        hash_table.insert({key, &row});
    }
    // Phase 2: PROBE - stream the larger table through the hash table
    std::vector<JoinResult> results;
    results.reserve(right_table.size());  // pessimistic estimate
    for (auto& probe_row : right_table) {
        size_t probe_key = right_key_extractor(probe_row);
        // Hash lookup: O(1) average, O(n) worst case
        auto range = hash_table.equal_range(probe_key);
        // Emit all matches
        for (auto it = range.first; it != range.second; ++it) {
            LeftRow* build_row = it->second;
            results.push_back(JoinResult{*build_row, probe_row});
        }
    }
    return results;
}
// Grace hash join with disk spilling when memory is exceeded
template<typename LeftRow, typename RightRow>
std::vector<JoinResult> grace_hash_join_with_spilling(
    std::vector<LeftRow>& left_table,
    std::vector<RightRow>& right_table,
    size_t memory_limit_bytes
) {
    const size_t avg_row_size = sizeof(LeftRow);
    const size_t max_rows_in_memory = memory_limit_bytes / avg_row_size;
    // Check: does the build table fit into memory?
    if (left_table.size() <= max_rows_in_memory) {
        // In-memory hash join (fast)
        return hash_join(left_table, right_table, ...);
    }
    // Spilling: partition both tables into disk buckets
    const size_t num_partitions = left_table.size() / max_rows_in_memory + 1;
    std::vector<std::ofstream> left_partitions(num_partitions);
    std::vector<std::ofstream> right_partitions(num_partitions);
    // Partition the left table
    for (auto& row : left_table) {
        size_t partition_id = hash(row.join_key) % num_partitions;
        left_partitions[partition_id].write(
            reinterpret_cast<char*>(&row), sizeof(LeftRow)
        );
    }
    // Partition the right table (analogous)
    // ...
    // Join each partition individually (in memory)
    std::vector<JoinResult> final_results;
    for (size_t i = 0; i < num_partitions; i++) {
        auto left_partition = load_partition(left_partitions[i]);
        auto right_partition = load_partition(right_partitions[i]);
        auto partition_results = hash_join(left_partition, right_partition, ...);
        final_results.insert(final_results.end(),
                             partition_results.begin(),
                             partition_results.end());
    }
    return final_results;
}

Join algorithm comparison:
| Join Algorithm | Memory Usage | CPU Efficiency | Best Use Case |
|---|---|---|---|
| Nested Loop | O(1) | Low (n×m) | Small inner table (< 1000 rows) |
| Hash Join | O(n) | High | Equi-joins, unordered inputs |
| Merge Join | O(1) | Medium | Sorted inputs, non-equi joins |
| Index NL Join | O(1) | Medium-High | Selective predicates (< 1%) |
Benchmark methodology: ThemisDB 1.4.0, join between users (1M rows) and orders (10M rows), Intel Xeon 8-core, 16GB RAM, NVMe SSD. CPU efficiency measured as throughput per core (rows/s/core)[^9].
We implement hash aggregation for unsorted inputs and sort-based aggregation for pre-sorted data. Streaming aggregation reduces the memory footprint of GROUP BY with high cardinality[^8].
Hash Aggregation:
-- Hash aggregation (in-memory, fast)
FOR order IN orders
  COLLECT customer_id = order.customer_id
  AGGREGATE
    total_amount = SUM(order.amount),
    order_count = COUNT(order),
    avg_amount = AVG(order.amount)
  RETURN {customer_id, total_amount, order_count, avg_amount}
-- Internally: hash table keyed by customer_id
-- Memory: O(distinct customers) ≈ 100k × 48 bytes = 4.8 MB
-- If the memory limit is exceeded → spill to disk
Sort-Based Aggregation:
-- Sort-based aggregation (memory-efficient)
FOR order IN orders
  SORT order.customer_id  -- pre-sort
  COLLECT customer_id = order.customer_id
  AGGREGATE total_amount = SUM(order.amount)
  RETURN {customer_id, total_amount}
-- Internally: external merge sort → streaming aggregation
-- Memory: O(1) - constant buffer
-- Trade-off: slower than hash (sort overhead), but unbounded cardinality
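The streaming step after the sort can be sketched with itertools.groupby, which never holds more than the current group in memory; a minimal sketch, assuming the rows arrive already sorted by the group key.

# Streaming (sort-based) aggregation: the input must be sorted by the group
# key; memory stays constant per group instead of O(distinct keys).
from itertools import groupby
from operator import itemgetter

orders = sorted(
    [{"customer_id": c, "amount": a}
     for c, a in [(1, 10.0), (2, 5.0), (1, 7.5), (2, 2.5), (3, 1.0)]],
    key=itemgetter("customer_id"),
)

for customer_id, group in groupby(orders, key=itemgetter("customer_id")):
    total = sum(row["amount"] for row in group)  # group is a lazy iterator
    print(customer_id, total)  # emits each group as soon as it is complete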
Aggregate Pushdown:
-- Pushdown to the storage layer (RocksDB)
FOR doc IN large_collection
  FILTER doc.category == 'electronics'
  COLLECT WITH COUNT INTO total
  RETURN total
-- RocksDB can compute COUNT without deserializing documents
-- Read only keys (8 bytes) instead of full documents (1-10 KB)
-- Speedup: 10-100× for COUNT-only queries
We use intra-operator parallelism for scans and aggregations, and inter-operator parallelism for pipeline breakers[^5]. Exchange operators distribute data between threads via hash partitioning or round-robin.
Parallel execution plan with exchange:
// Parallel execution with morsel-driven parallelism (Leis et al. 2014)
class ParallelExecutionEngine {
public:
    // ResultSet stands in for the engine's result container
    ResultSet execute_parallel_query(QueryPlan plan, int num_threads) {
        // Thread pool with NUMA-aware scheduling
        ThreadPool pool(num_threads);
        // Morsel size: 10k-100k rows (cache-friendly)
        const size_t morsel_size = 10000;
        // Phase 1: parallel scan with hash partitioning
        auto scan_futures = parallel_scan(
            plan.get_collection(),
            morsel_size,
            num_threads,
            pool
        );
        // Phase 2: exchange operator (hash redistribute)
        auto exchange_futures = hash_redistribute(
            scan_futures,
            plan.get_join_key(),
            num_threads
        );
        // Phase 3: parallel join (thread-local hash tables)
        auto join_futures = parallel_hash_join(
            exchange_futures,
            plan.get_probe_table(),
            num_threads,
            pool
        );
        // Phase 4: merge the results
        auto final_results = merge_results(join_futures);
        return final_results;
    }
private:
    std::vector<std::future<Morsel>> parallel_scan(
        Collection& collection,
        size_t morsel_size,
        int num_threads,
        ThreadPool& pool
    ) {
        // Split the collection into morsels
        size_t total_docs = collection.size();
        size_t num_morsels = (total_docs + morsel_size - 1) / morsel_size;
        std::vector<std::future<Morsel>> futures;
        futures.reserve(num_morsels);
        for (size_t i = 0; i < num_morsels; i++) {
            size_t start = i * morsel_size;
            size_t end = std::min(start + morsel_size, total_docs);
            // Schedule the morsel on the thread pool
            futures.push_back(
                pool.enqueue([&collection, start, end]() {
                    Morsel morsel;
                    for (size_t j = start; j < end; j++) {
                        morsel.rows.push_back(collection[j]);
                    }
                    return morsel;
                })
            );
        }
        return futures;
    }
};

Parallelization diagram:
graph TB
subgraph "Thread 1"
T1_Scan[Scan Morsel 1<br/>100k rows]
T1_Filter[Filter<br/>20k rows]
T1_Hash[Hash Build<br/>Partition 0]
end
subgraph "Thread 2"
T2_Scan[Scan Morsel 2<br/>100k rows]
T2_Filter[Filter<br/>25k rows]
T2_Hash[Hash Build<br/>Partition 1]
end
subgraph "Thread 3"
T3_Scan[Scan Morsel 3<br/>100k rows]
T3_Filter[Filter<br/>18k rows]
T3_Hash[Hash Build<br/>Partition 2]
end
subgraph "Thread 4"
T4_Scan[Scan Morsel 4<br/>100k rows]
T4_Filter[Filter<br/>22k rows]
T4_Hash[Hash Build<br/>Partition 3]
end
T1_Scan --> T1_Filter --> T1_Hash
T2_Scan --> T2_Filter --> T2_Hash
T3_Scan --> T3_Filter --> T3_Hash
T4_Scan --> T4_Filter --> T4_Hash
T1_Hash --> Exchange[Exchange Operator<br/>Hash Redistribute]
T2_Hash --> Exchange
T3_Hash --> Exchange
T4_Hash --> Exchange
Exchange --> PJ1[Parallel Join T1]
Exchange --> PJ2[Parallel Join T2]
Exchange --> PJ3[Parallel Join T3]
Exchange --> PJ4[Parallel Join T4]
PJ1 --> Merge[Merge Results]
PJ2 --> Merge
PJ3 --> Merge
PJ4 --> Merge
Merge --> Final[Final Output<br/>85k rows]
style Exchange fill:#f093fb
style Merge fill:#43e97b
We use SIMD instructions (AVX2, AVX-512) for bulk processing of numeric operations. Column-oriented processing and late materialization minimize cache misses[^4].
Vectorized Aggregation (C++ SIMD):
// SIMD-accelerated aggregation with AVX2 (256-bit)
#include <immintrin.h>  // AVX2 intrinsics

// Sum n double values with SIMD
double vectorized_sum(const double* values, size_t n) {
    // AVX2: one 256-bit register = 4 × double (64-bit)
    const size_t vector_size = 4;
    const size_t n_vectors = n / vector_size;
    // Accumulator register (4 doubles in parallel)
    __m256d sum_vec = _mm256_setzero_pd();
    // Vectorized loop: 4 values per iteration
    for (size_t i = 0; i < n_vectors; i++) {
        // Load 4 consecutive doubles
        __m256d values_vec = _mm256_loadu_pd(&values[i * vector_size]);
        // Add to the accumulator (in parallel)
        sum_vec = _mm256_add_pd(sum_vec, values_vec);
    }
    // Horizontal sum: reduce 4 partial sums to 1
    double sum_array[4];
    _mm256_storeu_pd(sum_array, sum_vec);
    double total = sum_array[0] + sum_array[1] + sum_array[2] + sum_array[3];
    // Scalar remainder
    for (size_t i = n_vectors * vector_size; i < n; i++) {
        total += values[i];
    }
    return total;
}
// Benchmark:
// - Scalar loop: 10M values in 50ms → 200M values/s
// - SIMD AVX2: 10M values in 15ms → 666M values/s (3.3× speedup)
// - SIMD AVX-512: 10M values in 8ms → 1.25B values/s (6.25× speedup)

AQL query with vectorization:
-- Numeric aggregation (SIMD-optimized)
FOR sale IN sales
  FILTER sale.created_at >= DATE_NOW() - 86400000 * 7  -- 7 days
  COLLECT region = sale.region
  AGGREGATE
    total_revenue = SUM(sale.amount),  -- SIMD: parallel sum
    avg_revenue = AVG(sale.amount),    -- SIMD: parallel sum + count
    max_sale = MAX(sale.amount),       -- SIMD: parallel max
    min_sale = MIN(sale.amount)        -- SIMD: parallel min
  RETURN {region, total_revenue, avg_revenue, max_sale, min_sale}
-- Internally: batched processing with AVX2
-- Batch size: 10k rows → 2.5k SIMD operations (4 values/op)
-- Throughput: 50M rows/s (vs. 15M rows/s scalar)
We analyze criteria for optimal index selection in the context of ThemisDB's RocksDB architecture. The index selection decision balances query performance against write overhead and storage cost. Modern adaptive indexing approaches[^10] analyze workload patterns and recommend indexes automatically. We examine secondary indexes on key-value stores, covering indexes for index-only scans, and Bloom filter integration (see also → Chapter 11: Indexing Strategies for index data structures in detail).
We evaluate index candidates by selectivity, coverage, query frequency, and maintenance overhead. An index pays off when the reduction in query cost exceeds the write-amplification overhead[^6].
Index selection decision tree:
# Decision tree for index selection
def should_create_index(
    query_freq: int,        # queries/second
    selectivity: float,     # 0.0-1.0 (fraction of rows selected)
    collection_size: int,   # number of documents
    write_freq: int,        # writes/second
    available_memory: int   # bytes
) -> Tuple[bool, str]:
    """
    Decides whether an index is worthwhile.
    Returns: (should_create, reason)
    """
    # Rule 1: predicate matches too many rows → an index gains little
    if selectivity > 0.5:
        return (False, "Predicate not selective enough (>50% of rows selected)")
    # Rule 2: rare queries → index overhead not justified
    queries_per_day = query_freq * 86400
    if queries_per_day < 100:
        return (False, "Query frequency too low (<100/day)")
    # Rule 3: small collections → a full scan is faster
    if collection_size < 10000:
        return (False, "Collection too small (<10k docs)")
    # Rule 4: read/write ratio
    read_write_ratio = query_freq / max(write_freq, 1)
    if read_write_ratio < 10:
        return (False, f"Read/write ratio too low ({read_write_ratio:.1f})")
    # Rule 5: check the memory constraint
    estimated_index_size = collection_size * 48  # 48 bytes/entry (B-tree)
    if estimated_index_size > available_memory * 0.3:  # max 30% of memory for indexes
        return (False, "Index too large for available memory")
    # The index is worthwhile!
    expected_speedup = 1.0 / selectivity
    return (True, f"Index worthwhile (expected speedup: {expected_speedup:.1f}×)")

# Example scenarios:
# Scenario 1: hot-path query
should_create_index(
    query_freq=1000,        # 1000 queries/s
    selectivity=0.001,      # 0.1% of the rows
    collection_size=10_000_000,
    write_freq=10,          # 10 writes/s
    available_memory=16 * 1024**3  # 16 GB
)
# → (True, "Index worthwhile (expected speedup: 1000.0×)")
# Scenario 2: low-selectivity query
should_create_index(
    query_freq=100,
    selectivity=0.8,        # 80% of the rows
    collection_size=1_000_000,
    write_freq=50,
    available_memory=8 * 1024**3
)
# → (False, "Predicate not selective enough (>50% of rows selected)")

Multi-column index design:
# Optimal column order for a composite index
def optimize_index_column_order(
    columns: List[str],
    predicates: List[Predicate]
) -> List[str]:
    """
    Sorts columns by selectivity (most filtering first).
    Based on the leading-column principle.
    """
    # Compute the selectivity of each column
    selectivities = {}
    for col in columns:
        pred = find_predicate_for_column(col, predicates)
        if pred:
            selectivities[col] = estimate_selectivity(pred)
        else:
            selectivities[col] = 1.0  # no filtering
    # Sort: lowest selectivity fraction (= strongest filter) first
    sorted_columns = sorted(columns, key=lambda c: selectivities[c])
    return sorted_columns

# Example: users(country, city, age)
# - country: 50 distinct values → selectivity ≈ 0.02 (2%)
# - city: 500 distinct values → selectivity ≈ 0.002 (0.2%)
# - age: 80 distinct values → selectivity ≈ 0.0125 (1.25%)
#
# Optimal order: city, age, country
# → CREATE INDEX idx_location_age ON users(city, age, country)

We use RocksDB-native features such as prefix Bloom filters, two-level indexes, and partitioned filters for efficient secondary indexes. LSM-tree-based indexes benefit from write optimization but require dedicated read-amplification mitigation[^6].
RocksDB Secondary Index Implementation:
// Secondary index pattern for RocksDB
// Implements the "index as a collection" pattern
class SecondaryIndexManager {
public:
    // Create a secondary index on the field "email"
    void create_secondary_index(
        rocksdb::DB* primary_db,
        rocksdb::DB* index_db,
        const std::string& field_name
    ) {
        // Iterate over the primary collection
        rocksdb::Iterator* it = primary_db->NewIterator(rocksdb::ReadOptions());
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            // Parse the document
            std::string doc_key = it->key().ToString();
            std::string doc_value = it->value().ToString();
            json doc = json::parse(doc_value);
            // Extract the indexed field
            if (doc.contains(field_name)) {
                std::string field_value = doc[field_name];
                // Index entry: field_value → primary_key
                std::string index_key = field_value + "|" + doc_key;
                std::string index_value = doc_key;
                // Write to the index DB
                rocksdb::Status s = index_db->Put(
                    rocksdb::WriteOptions(),
                    index_key,
                    index_value
                );
            }
        }
        delete it;
    }
    // Lookup via the secondary index
    std::vector<std::string> lookup_by_secondary_index(
        rocksdb::DB* index_db,
        const std::string& field_value
    ) {
        std::vector<std::string> primary_keys;
        // Range scan: field_value|* → all matches
        std::string prefix = field_value + "|";
        rocksdb::Iterator* it = index_db->NewIterator(rocksdb::ReadOptions());
        for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
            primary_keys.push_back(it->value().ToString());
        }
        delete it;
        return primary_keys;
    }
    // Covering index: store additional fields in the index
    void create_covering_index(
        rocksdb::DB* primary_db,
        rocksdb::DB* index_db,
        const std::string& index_field,
        const std::vector<std::string>& included_fields
    ) {
        rocksdb::Iterator* it = primary_db->NewIterator(rocksdb::ReadOptions());
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            std::string doc_key = it->key().ToString();
            json doc = json::parse(it->value().ToString());
            // Index key: indexed_field_value|primary_key
            std::string index_key = doc[index_field].get<std::string>() + "|" + doc_key;
            // Index value: JSON with the included fields (covering!)
            json index_value_json;
            for (const auto& field : included_fields) {
                if (doc.contains(field)) {
                    index_value_json[field] = doc[field];
                }
            }
            // Write the covering index entry
            index_db->Put(
                rocksdb::WriteOptions(),
                index_key,
                index_value_json.dump()
            );
        }
        delete it;
    }
};

AQL query with index hint:
-- Explicit index hint (for testing/debugging)
FOR user IN users USE INDEX idx_email
  FILTER user.email == 'alice@example.com'
  RETURN user
-- Index-only scan with a covering index
FOR user IN users USE INDEX idx_email_name_age
  FILTER user.email == 'alice@example.com'
  RETURN {name: user.name, age: user.age}
-- No document fetch! All the data is present in the index
-- EXPLAIN shows the index usage:
-- 1. INDEX_SEEK (index=idx_email, estimated_rows=1)
-- 2. PROJECTION (fields=[name, age])
-- 3. RETURN
-- Estimated cost: 0.5ms (vs. 50ms full scan)
We monitor the query workload and recommend indexes based on cost-benefit analysis. Online index building allows indexes to be created without downtime[^10]. Unused indexes are identified automatically and proposed for deletion.
Index Usage Monitoring:
# Index recommendation system
import time

import numpy as np

class IndexRecommender:
    def __init__(self):
        self.query_log = []
        self.index_usage_stats = {}

    def record_query(self, query: str, execution_plan: QueryPlan):
        """
        Logs a query and analyzes missing-index opportunities.
        """
        self.query_log.append({
            'query': query,
            'plan': execution_plan,
            'timestamp': time.time(),
            'cost': execution_plan.estimated_cost
        })
        # Identify full scans (missing index?)
        if has_full_scan(execution_plan):
            predicates = extract_predicates(query)
            for pred in predicates:
                self._suggest_index(pred)

    def _suggest_index(self, predicate: Predicate):
        """
        Computes the expected benefit of an index.
        """
        # Estimate the query frequency
        similar_queries = find_similar_queries(self.query_log, predicate)
        query_freq = len(similar_queries) / (time.time() - self.query_log[0]['timestamp'])
        # Estimate the cost reduction
        current_avg_cost = np.mean([q['cost'] for q in similar_queries])
        estimated_index_cost = current_avg_cost * predicate.selectivity
        cost_reduction = current_avg_cost - estimated_index_cost
        # Cost-benefit score
        benefit = query_freq * cost_reduction  # saved ms/s
        cost = estimate_index_maintenance_overhead(predicate)  # write overhead ms/s
        if benefit > cost * 10:  # ROI > 10:1
            print(f"📊 INDEX RECOMMENDATION:")
            print(f"   CREATE INDEX idx_{predicate.field} ON {predicate.collection}({predicate.field})")
            print(f"   Expected benefit: {benefit:.2f} ms/s saved")
            print(f"   Maintenance cost: {cost:.2f} ms/s overhead")
            print(f"   ROI: {benefit/cost:.1f}:1")

    def find_unused_indexes(self, threshold_days=30):
        """
        Identifies indexes that have not been used recently.
        """
        unused = []
        cutoff = time.time() - threshold_days * 86400
        for index_name, stats in self.index_usage_stats.items():
            if stats.get('last_used', 0) < cutoff:
                unused.append({
                    'index': index_name,
                    'last_used_days_ago': (time.time() - stats['last_used']) / 86400,
                    'storage_size_mb': stats['size_bytes'] / 1024**2,
                    'write_overhead_pct': stats['write_overhead']
                })
        return sorted(unused, key=lambda x: x['storage_size_mb'], reverse=True)

We combine multiple single-column indexes via bitmap operations for AND/OR predicates. Skip-scan optimization makes efficient use of an index prefix even for predicates on non-leading columns[^11].
Index Merge Strategies:
-- AND predicate: index intersection
FOR user IN users
  FILTER user.country == 'DE'     -- index 1: idx_country
  FILTER user.verified == true    -- index 2: idx_verified
  FILTER user.age > 25            -- index 3: idx_age
  RETURN user
-- The optimizer chooses:
-- Option A: single composite index (idx_country_verified_age) → optimal
-- Option B: index intersection (bitmap AND)
--   1. Seek idx_country → bitmap B1 (10k matching docs)
--   2. Seek idx_verified → bitmap B2 (50k matching docs)
--   3. Seek idx_age → bitmap B3 (200k matching docs)
--   4. B1 AND B2 AND B3 → final bitmap (2k docs)
--   5. Fetch the documents via primary keys
-- Cost: 3× index seek + bitmap ops + 2k fetches
-- Faster than a full scan, but slower than a composite index
-- OR predicate: index union
FOR user IN users
  FILTER user.status == 'vip' OR user.purchase_total > 10000
  RETURN user
-- The optimizer chooses:
--   1. Seek idx_status → bitmap B1 (500 docs)
--   2. Seek idx_purchase → bitmap B2 (2k docs)
--   3. B1 OR B2 → final bitmap (2.4k docs, deduplicated)
--   4. Fetch the documents
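The bitmap logic behind these plans can be sketched with Python sets standing in for bitmaps; production engines use compressed bitmaps (e.g., Roaring), but the merge semantics are identical. The posting sets below are illustrative.

# Index merge: each secondary index yields the set of matching primary keys
# (a "bitmap"); AND/OR predicates become set intersection/union.
idx_country  = {1, 2, 3, 5, 8, 13}      # users with country == 'DE'
idx_verified = {2, 3, 5, 7, 11, 13}     # users with verified == true
idx_age      = {3, 5, 9, 13, 21}        # users with age > 25

# AND predicate -> bitmap intersection (start with the smallest set)
and_result = idx_age & idx_country & idx_verified
print(sorted(and_result))               # [3, 5, 13] -> fetch these documents

# OR predicate -> bitmap union, duplicates removed for free
or_result = idx_country | idx_age
print(sorted(or_result))                # [1, 2, 3, 5, 8, 9, 13, 21]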
We configure the block-based table index, partitioned filters, and pin/prefetch strategies for large indexes. Index compression with Snappy/Zstd reduces storage overhead by 40-60%[^6].
RocksDB Index Configuration:
// Optimized RocksDB options for secondary indexes
rocksdb::BlockBasedTableOptions table_options;
// 1. Two-level index: for indexes > 1GB
table_options.index_type = rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;
table_options.metadata_block_size = 4096;  // 4KB metadata blocks
// 2. Partitioned filters: one Bloom filter per SST partition
table_options.partition_filters = true;
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(
    10,    // 10 bits/key → 1% false-positive rate
    false  // full (not block-based) filters, required for partitioning
));
// 3. Index compression: Zstd level 3 (set on the column family options)
rocksdb::Options options;
options.compression = rocksdb::kZSTD;
options.compression_opts.level = 3;  // Zstd compression level
// 4. Pin the index in memory (for hot indexes)
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
// 5. Block cache: 2GB for index/filter blocks
auto cache = rocksdb::NewLRUCache(2ULL * 1024 * 1024 * 1024);  // 2GB
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
// 6. Prefetch: for sequential index scans
rocksdb::ReadOptions read_opts;
read_opts.readahead_size = 256 * 1024;  // 256KB prefetch
// Performance characteristics:
// - Two-level index: 50% fewer index-block reads for large indexes
// - Partitioned filters: 30% less memory for Bloom filters
// - Zstd compression: 55% storage reduction (vs. uncompressed)
// - Pinned in cache: 10× faster hot-index lookups (0.1ms vs. 1ms)

Index strategy benchmark:
| Index Strategy | Query Time | Write Penalty | Storage Overhead |
|---|---|---|---|
| No index | 1000ms | 0% | 0% |
| Single-column | 50ms | +10% | +15% |
| Composite (2 col) | 20ms | +18% | +25% |
| Covering index | 5ms | +25% | +40% |
Benchmark methodology: ThemisDB 1.4.0, query: SELECT name, email, age FROM users WHERE country = 'DE' AND verified = true (selectivity: 0.2%). Collection: 10M users, NVMe SSD, 32GB RAM. Write penalty measured as the increase in INSERT/UPDATE latency. Storage overhead relative to the unindexed collection[^12].
-- The simplest index - for exact matches & range queries
CREATE INDEX idx_status ON users(status)
-- Performance gain:
-- Before: full scan, 1000ms for 1M documents
-- After: index lookup, 1ms
-- Speedup: 1000×
-- Useful for:
FILTER doc.status == 'active'
FILTER doc.age > 18
FILTER doc.created_at BETWEEN '2024-01-01' AND '2024-12-31'
-- Multiple fields - for complex filters
CREATE INDEX idx_status_age ON users(status, age)
-- Helps with:
FILTER doc.status == 'active' AND doc.age > 18
-- Leading-field optimization:
-- The first index field should filter out the most rows
-- Ranking: status_age (better) vs. age_status (worse)
-- Reason: the leading field narrows the candidate set fastest
-- Index only documents that have the field
CREATE SPARSE INDEX idx_phone ON users(phone)
-- Benefits:
-- Saves storage (only 30% of the documents have phone)
-- Faster inserts/updates (only 30% need indexing)
-- Perfect for optional fields
-- Query:
FOR doc IN users
  FILTER doc.phone != null
  FILTER doc.phone LIKE '%123%'
  RETURN doc
-- Index only a subset (with a filter condition)
CREATE INDEX idx_active_premium ON users(category)
WHERE subscription_status == 'premium'
-- Saves 80% storage if only 20% are premium
-- 10× faster index updates
-- Must be used with a matching query:
FOR doc IN users
  FILTER doc.subscription_status == 'premium'
  FILTER doc.category == 'enterprise'
  RETURN doc
-- Index containing all required fields (no document reads)
CREATE INDEX idx_email_name_age ON users(email, name, age)
-- Highly optimized for this query (email must be the leading column):
FOR doc IN users
  FILTER doc.email == 'alice@example.com'
  RETURN {name: doc.name, age: doc.age}  -- everything is in the index!
-- No document fetch needed → 100-1000× faster
We apply predicate pushdown to reduce data volume early and to avoid unnecessary N+1 query problems. Early filtering minimizes intermediate result sizes and reduces memory consumption as well as CPU cost. The strategy combines filter pushdown with collection constraints for maximum selectivity.
-- ❌ WRONG: filtering too late
FOR edge IN edges
  RETURN {
    from: edge.from,
    to: edge.to,
    weight: (FOR v IN vertices FILTER v._id == edge.to RETURN v.weight)[0]
  }
-- Problem: N+1 query, 1M edges = 1M vertex lookups
-- ✅ RIGHT: early filter
FOR edge IN edges
  FILTER edge.weight > 0.5  -- filter early
  LET target_vertex = (
    FOR v IN vertices
      FILTER v._id == edge.to
      FILTER v.category == 'active'  -- filter inside the subquery too
      RETURN v
  )[0]
  RETURN {
    from: edge.from,
    to: edge.to,
    target_weight: target_vertex.weight
  }
-- ❌ Generic - no collection constraint
FILTER LIKE(name, '%alice%')  -- string operation on every document
-- ✅ With a collection filter
FOR user IN users  -- choose the collection explicitly
  FILTER user.status == 'active'     -- early
  FILTER LIKE(user.name, '%alice%')  -- text search only on active users
  RETURN user
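Mechanically, a rewriter can push a filter down by checking which loop variable it references; a minimal sketch, assuming a plan is a flat list of (operator, payload) steps - a deliberate simplification of a real plan tree that ignores multi-variable (join) predicates.

# Predicate pushdown: move each single-variable FILTER directly behind the
# FOR loop that binds its variable. Plan representation is illustrative.
def push_down_filters(plan):
    """plan: list of ("FOR", var) and ("FILTER", (var, condition)) steps."""
    loops = [step for step in plan if step[0] == "FOR"]
    filters = [step for step in plan if step[0] == "FILTER"]
    rewritten = []
    for op, var in loops:
        rewritten.append((op, var))
        # place every filter right after the loop that binds its variable
        rewritten += [f for f in filters if f[1][0] == var]
    return rewritten

plan = [
    ("FOR", "user"), ("FOR", "order"),
    ("FILTER", ("order", "order.status == 'shipped'")),
    ("FILTER", ("user", "user.country == 'DE'")),
]
print(push_down_filters(plan))
# [('FOR', 'user'), ('FILTER', ('user', ...)),
#  ('FOR', 'order'), ('FILTER', ('order', ...))]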
We optimize GROUP BY operations by choosing between hash aggregation and sort-based aggregation. Approximate aggregation with HyperLogLog enables fast COUNT DISTINCT on large datasets. Streaming aggregation reduces the memory footprint at high cardinality.
-- ❌ Inefficient: collect everything, then group
COLLECT category = doc.category
AGGREGATE total = SUM(doc.amount)
INTO grouped
-- Memory: O(n) - all documents in memory
-- ✅ Efficient: with an index
FOR doc IN transactions
  FILTER doc.timestamp >= '2024-01-01'  -- filter early
  SORT doc.category
  COLLECT category = doc.category
  AGGREGATE total = SUM(doc.amount)
  RETURN {category, total}
-- Memory: O(distinct categories) << O(n)
-- For very large datasets: approximate counts
FUNCTION count_approximate() {
  -- HyperLogLog for massive sets
  -- Precision vs. speed trade-off
  RETURN APPROX_COUNT(*) FROM large_collection
}
-- Benefits:
-- - COUNT(*) over 1B documents: 5 seconds instead of 5 minutes
-- - Accuracy: ±2% vs. ±0%
-- - Memory: 12KB vs. 8GB
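To illustrate why a few kilobytes suffice, here is a minimal HyperLogLog sketch in Python following Flajolet et al. (2007); SHA-1 as the hash function and p = 12 register bits are illustrative choices, not ThemisDB's implementation.

# Minimal HyperLogLog: estimates COUNT DISTINCT with ~1.6% error using
# 4096 small registers instead of a set of all values.
import hashlib
import math

class HyperLogLog:
    def __init__(self, p: int = 12):
        self.p = p
        self.m = 1 << p                        # 4096 registers
        self.registers = [0] * self.m

    def add(self, value: str) -> None:
        h = int.from_bytes(hashlib.sha1(value.encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)               # first p bits select a register
        rest = h & ((1 << (64 - self.p)) - 1)  # remaining bits
        rank = (64 - self.p) - rest.bit_length() + 1  # leading zeros + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def estimate(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        if raw <= 2.5 * self.m:                # small-range correction
            zeros = self.registers.count(0)
            if zeros:
                return self.m * math.log(self.m / zeros)
        return raw

hll = HyperLogLog()
for i in range(100_000):
    hll.add(f"user-{i}")
print(round(hll.estimate()))  # ≈ 100000, within a few percent

Because registers only keep maxima, two sketches can be merged with an elementwise max, which is what makes the approach attractive for distributed aggregation.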
-- ❌ Wrong: partitions too large
FOR sale IN sales
  WINDOW {
    PARTITION BY sale.region
    ORDER BY sale.timestamp
    RANGE CURRENT ROW TO 1000 ROWS FOLLOWING
  }
  LET total = SUM(sale.amount) OVER ...
  RETURN sale
-- Problem: one region with 10M rows → 10M × 1000 = massive computation
-- ✅ Right: smaller partitions
FOR sale IN sales
  FILTER sale.timestamp >= DATE_SUBTRACT(NOW(), 30, 'day')
  FILTER sale.region IN ['north', 'south']  -- filter the partitions
  WINDOW {
    PARTITION BY sale.region, DATE_TRUNC(sale.timestamp, 'day')
    ORDER BY sale.timestamp
    RANGE CURRENT ROW TO 100 ROWS FOLLOWING
  }
  RETURN sale
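The per-row cost of a bounded frame shows up clearly in a plain Python sliding-window sum; a minimal sketch of one partition, where a running total makes each "N ROWS FOLLOWING" frame O(1) per row instead of O(frame size).

# Sliding-window sum over one partition (rows sorted by timestamp).
def windowed_sums(amounts, following):
    window_sum = sum(amounts[: following + 1])   # frame of the first row
    result = []
    for i in range(len(amounts)):
        result.append(window_sum)
        window_sum -= amounts[i]                 # this row leaves the frame
        if i + following + 1 < len(amounts):
            window_sum += amounts[i + following + 1]  # next row enters it
    return result

print(windowed_sums([1.0, 2.0, 3.0, 4.0], following=2))
# [6.0, 9.0, 7.0, 4.0]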
-- ❌ Loop with individual inserts
FOR item IN input_items
  INSERT item INTO collection
-- 10k inserts = 10k transactions, 10k fsync() calls
-- ✅ Batch insert
LET batch = input_items
INSERT batch INTO collection
-- 1 transaction, 1 fsync(), ~100× faster
-- ❌ Individual updates
FOR user IN users
  FILTER user.status == 'inactive'
  UPDATE user WITH {last_seen: NOW()}
-- ✅ Batch update
LET inactive_users = (
  FOR u IN users
    FILTER u.status == 'inactive'
    RETURN u._id
)
FOR id IN inactive_users
  UPDATE {_id: id} WITH {last_seen: NOW()} IN users
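On the client side, the same principle is chunking: group documents before sending them; a minimal sketch, where insert_batch is a hypothetical placeholder for whatever bulk API the driver exposes.

# Chunked bulk insert: one round-trip (and one fsync) per chunk instead of
# one per document. insert_batch() is a stand-in for the driver call.
from typing import Iterable, List

def insert_batch(docs: List[dict]) -> None:  # hypothetical driver API
    print(f"INSERT {len(docs)} docs in one transaction")

def bulk_insert(docs: Iterable[dict], chunk_size: int = 1000) -> None:
    chunk: List[dict] = []
    for doc in docs:
        chunk.append(doc)
        if len(chunk) == chunk_size:
            insert_batch(chunk)
            chunk = []
    if chunk:                                # flush the remainder
        insert_batch(chunk)

bulk_insert(({"n": i} for i in range(2500)), chunk_size=1000)
# -> 3 batches: 1000, 1000, 500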
# query_cache.py
import time

class QueryCache:
    def __init__(self, ttl_seconds=300):
        self.ttl = ttl_seconds
        self.cache = {}
        self.timestamps = {}

    def cache_query(self, query_hash, result):
        self.cache[query_hash] = result
        self.timestamps[query_hash] = time.time()

    def get_cached(self, query_hash):
        if query_hash not in self.cache:
            return None
        age = time.time() - self.timestamps[query_hash]
        if age > self.ttl:
            del self.cache[query_hash]
            return None
        return self.cache[query_hash]

-- Cache frequently repeated subqueries
FUNCTION get_active_users_cached() {
-- Cache-Key: hash(filter_conditions)
RETURN (
FOR user IN users
FILTER user.status == 'active'
FILTER user.verified == true
SORT user.created_at DESC
RETURN user
)
}
-- Use it everywhere:
FOR user IN get_active_users_cached()
RETURN user
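Wiring the QueryCache from above into a query path might look as follows; execute_query is a hypothetical stub, and deriving the cache key from a SHA-256 of the normalized query text is one reasonable convention, not a ThemisDB API.

# Usage sketch for the QueryCache above: key = hash of the normalized query.
import hashlib

def execute_query(query: str):      # stub for illustration only
    return [{"query": query}]

cache = QueryCache(ttl_seconds=300)

def run_cached(query: str):
    key = hashlib.sha256(query.strip().lower().encode()).hexdigest()
    result = cache.get_cached(key)
    if result is None:              # miss: execute and populate the cache
        result = execute_query(query)
        cache.cache_query(key, result)
    return result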
Original Query (5s):
FOR product IN products
FILTER LIKE(product.name, '%laptop%')
FILTER product.price BETWEEN 500 AND 2000
FILTER product.category IN ['electronics', 'computers']
SORT product.popularity DESC
LIMIT 20
RETURN product
Optimized (100ms - 50× faster):
-- 1. Index: CREATE INDEX idx_cat_price ON products(category, price)
-- 2. Full-Text: CREATE FULLTEXT INDEX idx_name ON products(name)
FOR product IN products
FILTER product.category IN ['electronics', 'computers']
FILTER product.price BETWEEN 500 AND 2000
FILTER product.available == true
SEARCH product.name IN FULLTEXT('laptop')
SORT product.popularity DESC
LIMIT 20
RETURN {_key: product._key, name: product.name, price: product.price}
Original (2 minutes):
FOR order IN orders
FOR item IN order.items
COLLECT category = item.category
AGGREGATE total = SUM(item.quantity * item.price)
INTO result
RETURN {category, total}
Optimized (2 seconds - 60× faster):
FOR order IN orders
FILTER order.status == 'completed'
FILTER order.created_at >= '2024-01-01'
FILTER order.created_at < '2025-01-01'
FOR item IN order.items
SORT item.category
COLLECT category = item.category
AGGREGATE total = SUM(item.quantity * item.price),
count = COUNT(item)
INTO result
SORT result.category
RETURN result
# monitor_queries.py
class QueryPerformanceMonitor:
    def __init__(self):
        self.query_stats = {}

    def record_query(self, query_hash, execution_time_ms, rows):
        if query_hash not in self.query_stats:
            self.query_stats[query_hash] = {
                'count': 0,
                'total_time': 0,
                'max_time': 0,
                'avg_time': 0
            }
        stats = self.query_stats[query_hash]
        stats['count'] += 1
        stats['total_time'] += execution_time_ms
        stats['max_time'] = max(stats['max_time'], execution_time_ms)
        stats['avg_time'] = stats['total_time'] / stats['count']
        if execution_time_ms > 1000:  # flag slow queries
            print(f"⚠️ SLOW: {query_hash} took {execution_time_ms}ms")

    def get_slowest_queries(self, top_n=10):
        sorted_queries = sorted(
            self.query_stats.items(),
            key=lambda x: x[1]['total_time'],
            reverse=True
        )
        return sorted_queries[:top_n]

We have examined scientifically grounded query optimization techniques for ThemisDB, from cost-based query planning[^2] through modern execution strategies[^5] to adaptive index selection[^10]. The techniques presented enable performance improvements of 10-1000× with unchanged result quality. The central optimization principles are:
- ✅ Cost-based planning: cardinality estimation, selectivity analysis, and join-order optimization with dynamic programming[^2]
- ✅ Query rewriting: predicate pushdown, subquery flattening, and common subexpression elimination[^3]
- ✅ Execution strategies: hash joins with Grace spilling, vectorized execution[^4], and morsel-driven parallelism[^5]
- ✅ Index selection: covering indexes for index-only scans, adaptive indexing[^10] with workload monitoring
- ✅ RocksDB integration: Bloom filters, two-level indexes, and partitioned filters[^6]
Combining these techniques with continuous query profiling (EXPLAIN/PROFILE) enables production-grade performance in ThemisDB applications. For further reading, see → Chapter 3 (AQL), → Chapter 11 (Indexing), → Chapter 35 (Data Modeling).
#include "query/adaptive_optimizer.h"
// Query-Statistiken akkumulieren
themis::query::AdaptiveQueryStats stats;
themis::query::AdaptiveQueryStats::QueryExecution exec;
exec.query_hash = "sha256:abc123";
exec.actual_rows = 50000;
exec.estimated_rows = 1000; // Fehler → wird gelernt
exec.operators.push_back({"SortMergeJoin", 120 /*ms*/, 50000 /*rows*/});
stats.recordExecution(exec);
// Kardinalitäts-Schätzfehler erkennen
bool misestimate = stats.hasCardinalityMisestimation("sha256:abc123", 2.0);
// Plan-Selector: wechselt zur besseren Strategie
themis::query::AdaptivePlanSelector selector;
auto choice = selector.selectPlan(stats, current_plan, alternative_plans);
// choice.strategy: KEEP / SWITCH / PARALLEL_TEST
// choice.recommended_plan_id#include "query/adaptive_optimizer.h"
// Register the shard topology
themis::query::DistributedQueryCostModel cost_model;
cost_model.addShard({"shard-0", "node0", 1.0 /*load*/, 5 /*latency_ms*/});
cost_model.addShard({"shard-1", "node1", 0.3 /*load*/, 12 /*latency_ms*/});
// Estimate cross-shard join costs
auto join_cost = cost_model.estimateCrossShardJoin("orders", "invoices");
// join_cost.network_bytes, join_cost.estimated_latency_ms, join_cost.parallel_factor
// Partition pruning
bool prune = cost_model.shouldPrunePartition("shard-2", query_predicate);
// Multi-index optimization
themis::query::MultiIndexOptimizer mio;
auto plan = mio.selectIndexes(query_predicates, available_indexes);
// plan.primary_index, plan.bitmap_indexes, plan.estimated_selectivity

#include "query/adaptive_join.h"
// Table descriptions
themis::query::Table left_table;
left_table.name = "orders";
left_table.row_count = 1000000;
left_table.is_sorted = false;
left_table.has_index = true;
themis::query::Table right_table;
right_table.name = "customers";
right_table.row_count = 50000;
right_table.is_sorted = true;
// Join specification
themis::query::JoinSpec spec;
spec.left = left_table;
spec.right = right_table;
spec.left_key = "customer_id";
spec.right_key = "id";
spec.is_distributed = true;
// Select the optimal algorithm
auto algo = themis::query::selectJoinAlgorithm(spec);
// algo: HASH_JOIN / SORT_MERGE / NESTED_LOOP / INDEX_JOIN / GRACE_HASH
// Execute the join
auto result = themis::query::executeJoin(left_rows, right_rows, spec, algo);
// result.rows, result.algorithm_used, result.duration_ms

| Algorithm | Use case |
|---|---|
| HASH_JOIN | Large unsorted tables (default) |
| SORT_MERGE | Already-sorted inputs |
| INDEX_JOIN | Index available on the right side |
| NESTED_LOOP | Small right-hand set (< 1000 rows) |
| GRACE_HASH | Disk spilling for very large joins |
#include "query/approximate_aggregator.h"
// COUNT DISTINCT (HyperLogLog, ~1-2 % Fehler)
themis::query::ApproximateCountDistinct hll;
for (auto& val : large_dataset) hll.add(val);
double estimate = hll.estimate(); // ±2 % Fehler
// Merge für verteilte Aggregation
auto hll1 = ...;
auto hll2 = ...;
hll1.merge(hll2);
// PERCENTILE (T-Digest)
themis::query::ApproximatePercentile td;
for (auto& latency : latencies) td.add(latency);
double p50 = td.percentile(0.50);
double p99 = td.percentile(0.99);#include "index/index_manager.h"
// IndexManager bundles VectorIndexManager, SecondaryIndexManager, GraphIndexManager
themis::index::IndexManager idx_mgr;
idx_mgr.setStorage(storage_engine);
idx_mgr.setRocksDB(rocksdb_wrapper);
idx_mgr.setExpressionEvaluator(expr_evaluator);
// Create indexes
idx_mgr.createIndex("orders", "customer_id", IndexType::BTREE);
idx_mgr.createIndex("products", "embedding", IndexType::VECTOR_HNSW);
// Query via a secondary index
auto results = idx_mgr.lookupByField("orders", "customer_id", "cust-42");
// Index usage statistics
auto stats = idx_mgr.getStats("orders", "customer_id");
// stats.lookups, stats.hits, stats.cache_hit_rate

#include "index/inverted_index.h"
themis::index::InvertedIndex::Config icfg;
icfg.stemming_enabled = true;
icfg.stopwords_enabled = true;
icfg.language = "de";
icfg.max_terms_per_doc = 10000;
themis::index::InvertedIndex inv_idx(rocksdb, icfg);
// Index a document
inv_idx.index("articles", "text_body", "doc-1", document_terms);
// BM25 search
auto hits = inv_idx.search("articles", "text_body", "Datenbankoptimierung");
// hits: [{doc_id, score, positions}, ...]
// Remove a document
inv_idx.deindex("articles", "text_body", "doc-1");

#include "index/advanced_vector_index.h"
themis::index::AdvancedVectorIndex::Config vcfg;
vcfg.dim = 1536;
vcfg.type = themis::index::AdvancedVectorIndex::Config::Type::AUTO;
// AUTO selects HNSW/IVF/SQ/PQ based on the data volume
vcfg.workload_type = themis::index::AdvancedVectorIndex::WorkloadType::BALANCED;
themis::index::AdvancedVectorIndex vidx(vcfg);
// Training (required for IVF/PQ)
vidx.train(training_vectors.data(), training_vectors.size());
// Add vectors
vidx.addWithIds(vectors.data(), ids.data(), vectors.size());
// K-NN search
auto results = vidx.search(query_vector.data(), /*k=*/10);
// results: [{id, distance}, ...]
// Statistics
auto stats = vidx.getStats();
// stats.num_vectors, stats.is_trained, stats.index_type_used, stats.memory_bytes

#include "index/distributed_vector_index.h"
themis::index::DistributedVectorIndexConfig dvcfg;
dvcfg.sharding_strategy = themis::index::ShardingStrategy::CONSISTENT_HASH;
dvcfg.replication_factor = 2;
dvcfg.shards = {
    {"shard-0", "node0:8774"},
    {"shard-1", "node1:8774"},
    {"shard-2", "node2:8774"},
};
dvcfg.dim = 1536;
themis::index::DistributedVectorIndex dvidx(dvcfg);
// Distributed insert
dvidx.insert("vec-id-1", embedding.data(), embedding.size());
// Scatter-gather K-NN (all shards in parallel)
auto results = dvidx.knnSearch(query.data(), /*k=*/10);
// Results are merged across shards
// Per-shard statistics
auto stats = dvidx.getStats();
// stats.per_shard_count, stats.total_vectors, stats.query_latency_p99_ms

#include "index/adaptive_index.h"
// Record query patterns
themis::index::QueryPatternTracker tracker;
tracker.recordPattern("orders", "customer_id",
    1500 /*ms*/, 1000000 /*rows_scanned*/, true /*cache_miss*/);
// Estimate selectivity
themis::index::SelectivityAnalyzer analyzer;
auto sel = analyzer.analyze("orders", "customer_id");
// sel.distinct_values, sel.selectivity (0-1), sel.histogram
// Index recommendations
themis::index::IndexSuggestionEngine suggestions(tracker, analyzer);
auto recs = suggestions.recommend();
// recs: [{collection, field, index_type, estimated_speedup, priority}, ...]
// Apply a recommendation
if (!suggestions.indexExists("orders", "customer_id")) {
    idx_mgr.createIndex("orders", "customer_id", recs[0].index_type);
}

Footnotes
[^1]: Graefe, G. (1993). "Query Evaluation Techniques for Large Databases". ACM Computing Surveys, 25(2), 73-169. doi:10.1145/152610.152611
[^2]: Selinger, P. G., Astrahan, M. M., Chamberlin, D. D., Lorie, R. A., & Price, T. G. (1979). "Access Path Selection in a Relational Database Management System". In Proceedings of the 1979 ACM SIGMOD International Conference on Management of Data (pp. 23-34). doi:10.1145/582095.582099
[^3]: Graefe, G. (1995). "The Cascades Framework for Query Optimization". IEEE Data Engineering Bulletin, 18(3), 19-29.
[^4]: Boncz, P., Zukowski, M., & Nes, N. (2005). "MonetDB/X100: Hyper-Pipelining Query Execution". In Proceedings of the 2nd Biennial Conference on Innovative Data Systems Research (CIDR) (pp. 225-237).
[^5]: Leis, V., Boncz, P., Kemper, A., & Neumann, T. (2014). "Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age". In Proceedings of the 2014 ACM SIGMOD International Conference on Management of Data (pp. 743-754). doi:10.1145/2588555.2610507
[^6]: Dong, S., Kryczka, A., Jin, Y., & Stumm, M. (2021). "RocksDB: Evolution of Development Priorities in a Key-Value Store Serving Large-Scale Applications". ACM Transactions on Storage, 17(4), Article 26. https://github.com/facebook/rocksdb/wiki
[^7]: Ioannidis, Y. E. (1996). "Query Optimization". ACM Computing Surveys, 28(1), 121-123. doi:10.1145/234313.234367
[^8]: Graefe, G. (1993). "Query Evaluation Techniques for Large Databases". ACM Computing Surveys, 25(2), 73-169. Comprehensive survey of join algorithms and execution strategies.
[^9]: Internal ThemisDB Benchmarks (2024). Join algorithm comparison on TPC-H workload. Methodology: Intel Xeon 8-Core, 16GB RAM, NVMe SSD, ThemisDB 1.4.0.
[^10]: Chaudhuri, S., & Narasayya, V. (2007). "Self-Tuning Database Systems: A Decade of Progress". In Proceedings of the 33rd International Conference on Very Large Data Bases (VLDB) (pp. 3-14).
[^11]: Chaudhuri, S. (1998). "An Overview of Query Optimization in Relational Systems". In Proceedings of the 17th ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems (PODS) (pp. 34-43). doi:10.1145/275487.275492
[^12]: Internal ThemisDB Benchmarks (2024). Index strategy comparison. Methodology: 10M user documents, NVMe SSD, 32GB RAM, ThemisDB 1.4.0. Write penalty measured as INSERT/UPDATE latency increase.