
Feature Guidelines from Leading Databases

Exadata Smart Scan (Predicate Pushdown to Storage)

One of the flagship features of Oracle Exadata is Smart Scan, which offloads heavy query processing tasks from database (compute) servers to storage servers. In Smart Scan, the storage tier filters rows and columns as data is read from disk, so that only the relevant subset of data is sent over the network to the compute nodes[1]. This includes predicate filtering (applying the WHERE clause) and column projection (only returning columns needed by the query). By doing this, Exadata dramatically reduces data transfer and CPU load on the database servers. For example, scanning a 1 TB table for certain rows might result in only a few megabytes of data being sent back if most rows are filtered out at the storage tier[2]. The implementation guideline for a new database would be to adopt a two-tier architecture where storage nodes are “intelligent.” The storage layer should be capable of performing predicate evaluation and data filtering locally. This means pushing down WHERE conditions and even certain SQL functions to the storage servers, similar to Exadata’s approach of evaluating a wide range of operators (=, !=, <, >, BETWEEN, IN, etc.) and common functions directly in the storage layer[1]. By processing data close to where it’s stored, we minimize network I/O. This also implies using a high-bandwidth, low-latency interconnect (as Exadata uses InfiniBand/RDMA) so that any data that is sent to compute nodes arrives quickly and with minimal overhead. Additionally, implementing Bloom filter pushdown for join processing can be beneficial (Exadata storage can apply bloom filters for joins in star schemas)[3]. Overall, the storage nodes in the new design should act as query processing engines for scans: they read data in large blocks, apply filters/projections, perform partial aggregations if possible, and return only the reduced dataset to the compute layer.
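
To make the pushdown contract concrete, here is a minimal sketch (Python for readability only; hypothetical names such as ScanRequest and smart_scan) of a storage node applying a pushed-down predicate and column projection before anything crosses the network.

# Hypothetical sketch: a storage node evaluates a pushed-down scan request
# locally and returns only qualifying rows and requested columns.
from dataclasses import dataclass
from typing import Callable, Iterable

@dataclass
class ScanRequest:
    table: str
    columns: list          # column projection requested by the compute node
    predicate: Callable    # WHERE-clause filter, evaluated per row dict

def smart_scan(rows: Iterable[dict], req: ScanRequest):
    """Apply predicate filtering and column projection at the storage tier."""
    for row in rows:
        if req.predicate(row):                       # predicate pushdown
            yield {c: row[c] for c in req.columns}   # column projection

# Example: only matching rows (and only two columns) cross the network.
rows = [{"id": 1, "amount": 250, "country": "USA", "note": "..."},
        {"id": 2, "amount": 50,  "country": "DE",  "note": "..."}]
req = ScanRequest("sales", ["id", "amount"],
                  lambda r: r["amount"] > 100 and r["country"] == "USA")
print(list(smart_scan(rows, req)))   # -> [{'id': 1, 'amount': 250}]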

Exadata Hybrid Columnar Compression (HCC)

Another key Exadata feature is Hybrid Columnar Compression (HCC), which dramatically reduces storage usage and enhances scan performance. HCC uses a hybrid approach to store data: it groups table rows into compression units and stores columns together within each unit, achieving a columnar-like compression ratio while still being within a mainly row-based table structure[4]. Typical storage savings are on the order of 5x to 20x, with an average ~10x compression, which not only saves space but also reduces I/O for large scans[5]. The innovation in Exadata is that the storage servers handle compression and decompression, offloading this CPU work from the database servers[6]. In practice, for our new database, we should implement a compressed columnar storage format at the storage tier. The idea is to store data in a columnar fashion within each storage unit or partition (to get high compression and scan efficiency), but still allow the data to be shipped to compute nodes in a usable form. We can achieve something similar to HCC by grouping rows into large blocks (e.g. on the order of MBs) and compressing each column within the block. The storage node can then read compressed blocks, decompress or even operate on them if possible, and send the results. To maximize efficiency, the storage layer should also handle decompression and filtering on compressed data. Exadata’s Smart Scan is integrated with HCC so that it can apply predicates directly on compressed columns and only decompress the needed pieces[7]. We should follow that guideline: use vectorized decompression techniques in the storage nodes so that scans over compressed data run transparently fast – potentially faster than uncompressed, due to far less data movement[6]. We can offer different compression modes (e.g. a high-compression archival mode vs. a slightly lower compression mode for frequently accessed data), similar to HCC’s warehouse vs. archive compression levels[8]. Overall, hybrid columnar storage in the new system means we get the best of both worlds: excellent compression ratios (like a column store) and acceptable load/update performance (since data is still grouped in row-like chunks for modifications). This feature will significantly improve analytic query performance and reduce storage costs.
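
A rough sketch of such a compression unit, using hypothetical helpers and zlib purely as a stand-in codec: rows are grouped into one unit, pivoted into per-column arrays, and each column is compressed separately so a scan can decompress only the columns it needs.

# Hypothetical sketch of a hybrid columnar compression unit: rows are grouped
# into one unit, pivoted column-wise, and each column compressed independently.
import json, zlib

def build_compression_unit(rows):
    """rows: list of dicts sharing the same schema."""
    columns = {col: [r[col] for r in rows] for col in rows[0]}
    # Compress each column separately; similar values compress well together.
    return {col: zlib.compress(json.dumps(vals).encode()) for col, vals in columns.items()}

def read_column(unit, col):
    """Decompress only the requested column, leaving the others untouched."""
    return json.loads(zlib.decompress(unit[col]))

rows = [{"region": "EU", "amount": 10}, {"region": "EU", "amount": 20}]
unit = build_compression_unit(rows)
print(read_column(unit, "amount"))   # -> [10, 20]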

Oracle Partitioning and Sharding

Partitioning and Sharding are techniques to divide data into smaller chunks, which improve manageability and query performance, and allow scaling out across nodes. Oracle Database supports extensive table partitioning within a single instance (range, list, hash partitions, etc.), and it also offers Oracle Sharding for horizontal scaling across multiple database instances. In our new database, we should incorporate both levels: intra-node partitioning and cross-node sharding. Partitioning a large table (within one compute/storage group) means splitting it into segments by a key, so queries can prune partitions and only scan the necessary subset. This improves query performance (by reading less data) and also makes maintenance (like purging old data, or loading new data) more efficient, since you operate on one partition at a time. Oracle’s approach treats sharding as an extension of partitioning: shards are essentially partitions of the data spread across different physical databases in a shared-nothing architecture[9]. All shards together make up one logical database, transparent to the application[10][11]. We will adopt a similar approach.

Sharding design: The data will be horizontally partitioned by a shard key (for example, a tenant ID or geographic region or hash of a primary key) and distributed across independent nodes (each with its own storage). Each shard can be thought of as its own database that owns a subset of the data. The application will see a single database endpoint; behind the scenes, queries will be routed to the appropriate shard(s) based on the data needed. This shared-nothing design provides linear scalability and fault isolation: as Oracle notes, shards eliminate single points of failure and a slowdown or crash of one shard doesn’t affect others[12]. Our implementation will include a shard directory or coordinator that knows the mapping of data to shards (much like Oracle’s shard catalog). When we create a table, we can allow a DDL like CREATE TABLE … PARTITION BY … SHARD BY … to specify the partition key and shard key. Oracle Sharding automatically distributes partitions (shards) across shard nodes on creation[13], and our system can do similarly, possibly with the ability to add shards online and rebalance. We also plan to include replication within sharding: each shard will have one mirrored copy (a replica) on another node for high availability. This is akin to having Data Guard or active replicas for each shard in Oracle for fault tolerance[14], but in our case we’ll use an integrated replication mechanism (see the Sharding and Replication Strategy in the architecture section below for details).
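
One possible shape for the shard directory, sketched below with hypothetical names and an assumed four-node layout: a hash of the shard key selects a shard, and the directory maps each shard to its primary and replica storage nodes.

# Hypothetical sketch of a shard directory: a hash-based shard map that routes
# a key to its shard, and each shard to a primary node plus one replica.
import hashlib

NUM_SHARDS = 8
SHARD_PLACEMENT = {s: {"primary": f"storage-{s % 4}",        # assumed 4 storage nodes
                       "replica": f"storage-{(s + 1) % 4}"} for s in range(NUM_SHARDS)}

def shard_for(key: str) -> int:
    digest = hashlib.sha1(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_SHARDS

def route(key: str):
    shard = shard_for(key)
    return shard, SHARD_PLACEMENT[shard]

print(route("customer-42"))   # e.g. (3, {'primary': 'storage-3', 'replica': 'storage-0'})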

In summary, by combining partitioning and sharding, the new database can scale out data volume and throughput. A single large table might be partitioned by date (for manageability) and also sharded by a customer ID (for distribution across nodes). Queries will benefit from partition pruning (only scanning relevant partitions on each shard) and from parallel execution across shards. The system should automatically route queries to the needed shards and aggregate results. Also, metadata about shards (which shard holds which key ranges) must be maintained. Oracle’s sharding makes the distribution transparent to clients[10]; likewise, our DB’s client interface will handle sharding internally so the user can run a single SQL query and the system will figure out the distribution.

LeanXcale Online Aggregation

LeanXcale is an emerging NewSQL database known for innovations in real-time analytics on operational data. A standout feature is its Online Aggregation (Online Aggregates) capability. This feature creates specialized, automatically maintained aggregation structures that update in real-time as data is inserted or updated[15]. In effect, these are like materialized aggregate views (e.g., a precomputed SUM/COUNT grouped by some dimensions) that incur minimal overhead on each transaction. Unlike traditional materialized views, LeanXcale’s online aggregates avoid locking or heavy recomputation; they are designed to update without introducing contention in concurrent transactions[15]. For our new database, implementing a similar feature means providing a way for users to define aggregations that the system maintains incrementally. For example, a user could specify:

CREATE ONLINE AGGREGATE SalesByRegion AS
SELECT region, SUM(amount) AS total_sales
FROM sales
GROUP BY region;

Under the hood, the system would create an internal table or index (a derived table) keyed by the group-by columns (here, region) to store the aggregate results[16]. Every time a new row is inserted into sales or an existing row is updated/deleted, the database would immediately adjust the SUM for the corresponding region in the aggregate table. This can be done efficiently with a lightweight, lock-free mechanism (for instance, using an atomic addition to the stored sum, or a multiversion approach to avoid write conflicts). The query optimizer should be aware of these online aggregates: if a user query asks for SELECT region, SUM(amount) FROM sales GROUP BY region, the optimizer can substitute a scan of the aggregate table instead of scanning the raw sales table[17]. LeanXcale’s documentation shows that after creating an online aggregate, the query plan changes to read from a derived table with the pre-summarized results, dramatically speeding up the query[18][16]. We will mirror this behavior.

In terms of guidelines, this feature requires integrating the aggregate maintenance with the storage engine’s transaction logic. Each insertion or update on the base table should trigger an update to the aggregate’s stored state. We must ensure this is atomic and consistent (probably by treating the base row and aggregate update as part of the same transaction). LeanXcale achieves non-blocking updates by doing the aggregate calculation at insertion time (essentially computing the new aggregate partial result during the insert) and likely uses an internal structure to avoid write contention[15]. We will consider approaches such as CRDTs (commutative replicated data types) or lock-free data structures for maintaining sums and counts. The outcome is that clients can run aggregation queries in O(1) or O(log n) time (just reading the precomputed result) instead of scanning large tables, enabling real-time analytics on fresh data. This is extremely useful for operational dashboards (like live KPI tracking) where you need up-to-the-second totals. Our new database thus gains a unique selling point: the ability to handle OLTP and OLAP simultaneously, by maintaining analytics-friendly structures behind the scenes (similar in spirit to what LeanXcale and some time-series databases do).
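
The bookkeeping could look roughly like the sketch below (hypothetical, and ignoring concurrency control for brevity): every base-table write applies a delta to the derived aggregate in the same transaction, so the GROUP BY query becomes a simple lookup.

# Hypothetical sketch of online aggregate maintenance: each base-table write
# applies a delta to a derived aggregate keyed by the GROUP BY columns.
from collections import defaultdict

sales = []                                   # base table
sales_by_region = defaultdict(float)         # derived aggregate: region -> SUM(amount)

def insert_sale(region: str, amount: float):
    # In the real engine both writes would commit atomically in one transaction,
    # ideally via an atomic add to avoid write contention on hot groups.
    sales.append({"region": region, "amount": amount})
    sales_by_region[region] += amount

def delete_sale(idx: int):
    row = sales.pop(idx)
    sales_by_region[row["region"]] -= row["amount"]   # compensate the aggregate

insert_sale("EMEA", 100.0)
insert_sale("EMEA", 50.0)
print(sales_by_region["EMEA"])   # 150.0 -- answered without scanning the base table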

Citus (Distributed PostgreSQL Extension)

Citus is a scale-out solution for PostgreSQL that influences our design for distributed SQL execution. As an extension, Citus turns a single-node Postgres into a distributed database by introducing the concept of distributed tables (shards) and a coordinator node that routes queries. A key design principle from Citus is co-location of data to optimize joins. In a Citus cluster, all tables that need to be joined are typically sharded on the same key so that matching values end up on the same node[19]. For example, if you shard both an orders table and a customers table by customer_id, Citus will place all rows for a given customer_id on one shard (and ensure those shards reside on the same worker node). This means a join between orders and customers for a particular customer can be executed completely on one node without cross-network data transfer[19][20]. We will adopt this coordinated sharding strategy. When users create a distributed table in our system, they will choose a distribution key. The system will hash or range-partition by that key across shards. We’ll encourage (and possibly automatically detect) cases where foreign key relationships share the same distribution key, allowing what Citus calls referenced tables and co-located tables. This will enable local transactions and joins for those related sets of data, vastly improving performance for multi-table operations (since joins or foreign key checks won’t require pulling data from other nodes in those cases).
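
A minimal illustration of co-location, assuming both tables are distributed by customer_id with the same hash function (names below are hypothetical):

# Hypothetical sketch of co-location: orders and customers are both distributed
# by customer_id with the same hash function, so rows that join land on one node.
import hashlib

WORKERS = ["worker-0", "worker-1", "worker-2"]

def placement(customer_id: int) -> str:
    h = int.from_bytes(hashlib.sha1(str(customer_id).encode()).digest()[:4], "big")
    return WORKERS[h % len(WORKERS)]

order_row    = {"order_id": 9001, "customer_id": 42}
customer_row = {"customer_id": 42, "name": "Acme"}
# Both rows hash on customer_id, so they are stored on the same worker and a join
#   SELECT ... FROM orders JOIN customers USING (customer_id) WHERE customer_id = 42
# can execute entirely on that worker, with no cross-node data transfer.
print(placement(order_row["customer_id"]), placement(customer_row["customer_id"]))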

Citus also provides a single logical database view even though under the covers there are many PostgreSQL instances. Similarly, our database will present a unified SQL interface (a single connection endpoint per cluster) and behind it there will be a coordinator or “compute leader” that plans queries across the nodes. Each node in our design can run an instance of the storage engine and query processor working on its shard of data (just as each Citus worker is a full Postgres instance). The coordinator will break down an incoming SQL query into sub-queries on shards, dispatch them (in parallel when possible), and combine the results. We should borrow Citus’s approach to handle different table types: Citus has distributed tables (sharded across nodes), reference tables (small tables replicated to all nodes, useful for lookup data that joins with shards), and local tables[21]. Our implementation will have a similar concept: we can allow certain tables to be replicated to all shards (or a subset) for efficiency, especially dimension or lookup tables.

Finally, we note that Citus supports distributed transactions (two-phase commit for transactions touching multiple shards) and provides some level of consistency across nodes. Our new system should include a transaction coordinator for multi-shard writes – possibly using a lightweight two-phase commit or a consensus protocol if needed. However, whenever possible, if a transaction or query only involves one shard, it should be executed directly on that shard without cluster-wide overhead. This is aligned with Citus’s philosophy to maximize single-shard work (where full ACID is preserved locally) and only coordinate when absolutely necessary.
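
A minimal sketch of that coordinator logic, with a hypothetical shard API and none of the durable logging, timeouts, or recovery handling a production implementation would need:

# Minimal two-phase commit sketch for multi-shard writes (hypothetical API).
class Shard:
    def __init__(self, name):
        self.name, self.state = name, {}
    def prepare(self, txn_id):
        self.state[txn_id] = "prepared"   # flush redo, validate, then vote yes/no
        return True
    def commit(self, txn_id):
        self.state[txn_id] = "committed"
    def abort(self, txn_id):
        self.state[txn_id] = "aborted"

def two_phase_commit(participants, txn_id):
    if all(s.prepare(txn_id) for s in participants):   # phase 1: every shard must vote yes
        for s in participants:
            s.commit(txn_id)                           # phase 2: commit everywhere
        return True
    for s in participants:
        s.abort(txn_id)                                # any "no" vote aborts the transaction
    return False

print(two_phase_commit([Shard("orders-1"), Shard("orders-5")], txn_id=1001))  # True
# Single-shard transactions skip this path entirely and commit locally.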

In summary, from Citus we incorporate: the concept of a coordinator node for SQL parsing and global query planning, a mechanism to distribute tables across shards, co-location of related data by sharding key to enable efficient distributed joins[20], and support for reference tables and distributed transactions. This will give our database the ability to scale out while still supporting rich SQL semantics (including joins, foreign keys, and complex queries) across the cluster.

MySQL HeatWave (In-Memory Analytics & Mixed Workloads)

Oracle’s MySQL HeatWave is a MySQL-based system augmented with an in-memory, massively parallel query accelerator. It exemplifies how to combine OLTP and OLAP in one database engine. Key features of HeatWave that we plan to emulate include its in-memory columnar storage for analytics, vectorized execution, and real-time integration of operational data. HeatWave uses a hybrid columnar approach: when data from the MySQL InnoDB tables is loaded into the HeatWave cluster, it is stored in a compressed columnar format in memory, which is optimized for analytics[22]. The execution engine is highly parallel and vectorized, meaning it processes data in batches using CPU SIMD instructions for efficiency[23]. We will similarly design our query execution on the compute nodes to be vectorized and to operate on columnar data segments, especially for scans and aggregations. Our storage nodes can store data in a column-oriented way (for example, using something like Hybrid Columnar Compression or Snowflake-style micro-partitions), which the compute layer can then process with vectorized operators. The benefit is significant acceleration for analytical queries (scans, joins, group bys) due to better cache utilization and CPU throughput.
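
As a rough illustration of the batch-oriented style (NumPy standing in here for the SIMD kernels the native engine would use), a predicate and an aggregate are applied to whole column vectors rather than row by row:

# Sketch of vectorized, column-oriented execution: operate on whole column
# batches at once instead of looping over rows.
import numpy as np

amount = np.array([120.0, 15.0, 300.0, 42.0, 999.0])
region = np.array(["EU", "US", "EU", "US", "EU"])

mask = amount > 100.0                                # one vectorized predicate over the batch
total_eu = amount[mask & (region == "EU")].sum()     # filter + aggregate without per-row loops
print(total_eu)                                      # 1419.0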

Another aspect of HeatWave is massive parallel partitioning: it partitions data across many nodes and also within each node across cores[24]. An intelligent scheduler overlaps computation with communication to utilize thousands of cores efficiently[24]. In our architecture, we will likewise use a partitioned execution model: when a large query comes in, it can be broken into work on many partitions (shards) of data, processed in parallel by multiple storage nodes and multiple threads per node. We should design the compute node’s query planner to generate parallel execution plans (like exchange operators to gather results, etc.) and a scheduling mechanism that coordinates the tasks. The scheduler can attempt to pipeline data transfer with processing – for example, as soon as a storage node finishes scanning and filtering data, it can start sending partial results to the compute layer while continuing to process the next chunk (overlap network transfer with scan computation).

Crucially, HeatWave removes the need for a separate analytics database by replicating data from the transactional MySQL to the analytic engine in real-time[25]. Updates on the MySQL tables are propagated to the HeatWave in-memory store continuously, so queries always see up-to-date data without any manual ETL. In our design, since we unify compute and storage, we won’t have a separate copy exactly, but we want to ensure that fresh data is immediately available for analytical queries. This means our storage layer (where data is ingested for transactions) is the same place analytic queries will read (with columnar compression active). We might, however, implement something analogous to dual-format storage: newly inserted data might initially be in a row-based WAL or delta store for OLTP performance, and then transformed into columnar format in the background. This way OLTP writes are fast, and analytic reads eventually get the data in compressed columnar form. It’s similar to how some hybrid engines (like SingleStore or Oracle in-memory) handle data tiers. The main goal is zero ETL: the database should handle both workloads on the same data, possibly using background processes to reorganize data for analytics while keeping it transactionally consistent.
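
A simplified sketch of this dual-format idea, with hypothetical names and no durability or concurrency handling: writes land in a row-oriented delta store, a background job pivots them into columnar segments, and analytic scans read both so fresh data stays visible.

# Hypothetical sketch of dual-format storage: fast row-oriented delta store for
# OLTP writes, periodically folded into columnar segments for analytics.
class Table:
    def __init__(self):
        self.delta_rows = []                       # recent writes, row format
        self.columnar_segments = []                # older data, column format
    def insert(self, row: dict):
        self.delta_rows.append(row)                # cheap append on the OLTP path
    def compact(self):
        """Background job: pivot accumulated rows into one columnar segment."""
        if not self.delta_rows:
            return
        segment = {c: [r[c] for r in self.delta_rows] for c in self.delta_rows[0]}
        self.columnar_segments.append(segment)
        self.delta_rows = []
    def scan_column(self, col):
        # Analytic reads see both formats, so fresh data is immediately visible.
        for seg in self.columnar_segments:
            yield from seg[col]
        for row in self.delta_rows:
            yield row[col]

t = Table()
t.insert({"id": 1, "amount": 10}); t.compact(); t.insert({"id": 2, "amount": 20})
print(list(t.scan_column("amount")))   # [10, 20]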

We also note HeatWave’s new development of a Vector Store for AI/ML[26], which integrates vector similarity search into the database. In MySQL HeatWave, they announced the ability to store embeddings and perform similarity searches inside the database engine[26]. This aligns with our plan to have integrated vector data type support (see the separate Vector Data Storage and Index Module section). It validates the idea that a modern database should handle AI-oriented data types (vectors for machine learning) alongside traditional data.

Autopilot and automation: HeatWave includes a component called MySQL Autopilot which uses machine learning to automate indexing, provisioning, query plan improvements, etc.[27][28]. While implementation of such ML-driven features might be a future enhancement, our initial guideline is to design hooks for collecting statistics and possibly to include some smart tuning features (like automatic index suggestions or memory sizing). At minimum, our engine will gather detailed statistics on data distribution (as needed for partition pruning and query planning) and workload patterns. We may incorporate simplified versions of Autopilot features such as automatic secondary index creation for frequently queried columns (HeatWave can do this via ML predictions[29]). However, these are secondary; the primary takeaway from HeatWave is to achieve fast analytical query performance on fresh transactional data by using in-memory columnar, parallel execution, and eliminating the gap between OLTP and OLAP.

In summary, from MySQL HeatWave we derive the importance of a unified OLTP+OLAP engine with columnar in-memory processing and vectorization for speed[22]. Our database should allow users to run analytic queries (e.g. large scans, joins, aggregates on tables with millions of rows) directly on the live operational data, and get results fast – without offloading the data to an external warehouse. This will make the system suitable for mixed workloads and real-time analytics use cases.

Snowflake (Decoupled Storage & Compute, Micro-Partitioning)

Snowflake is a cloud data warehouse known for its separation of compute and storage and its automatic optimization through micro-partitioning. Adopting lessons from Snowflake will guide how we store data and scale compute independently in our design. First, Snowflake keeps all persistent data in a central storage layer (in their case, cloud object storage), and compute nodes (virtual warehouses) can be spun up or down on demand to query that data. We plan a similar separation: our storage tier will handle durable data storage (and heavy scanning/filtering as described earlier), while the compute tier will do query orchestration, final joins/aggregations, and present results to the user. They communicate over a network interface, meaning we can scale the two layers independently – for example, add more compute nodes to increase query throughput or concurrency without duplicating the data store, or scale out storage capacity without affecting the compute nodes. This design improves elasticity and resource utilization.

A specific Snowflake innovation is its micro-partitioning storage format. Instead of large, user-defined partitions, Snowflake automatically divides tables into small micro-partitions (50 MB to 500 MB of data each, before compression) and stores each in a compressed columnar format[30][31]. We will implement a storage format inspired by this. Concretely, each table (or each partition of a table) in our storage nodes could be broken into chunks of a certain size (say ~100 MB) stored back-to-back on disk (or in object storage if applicable), in a column-wise fashion. This fine granularity allows our storage servers to do very granular pruning of data during scans[32][33]. The trick is to store metadata for each chunk: Snowflake stores min and max values for each column in each micro-partition, and other stats like distinct count and bloom filters[34]. We will do the same; our storage layer will maintain a metadata index (a small in-memory or quickly accessible store) that maps to each data chunk’s key ranges or value ranges. When a query with a filter comes in, the storage node’s scan process can consult this metadata to skip entire chunks that do not qualify (this is sometimes called a “zone map” index or storage index). Snowflake’s documentation notes that if, for example, a query wants one hour of data out of a table with a year of data, ideally only 1/8760 of the micro-partitions would be scanned[33] – we should strive for that kind of efficiency. This means the new database doesn’t rely solely on user-defined partitions; it automatically partitions data as it is loaded, based on natural clustering of the insert order[35].
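
A minimal sketch of that pruning step, with hypothetical chunk metadata: the scan consults per-chunk min/max values and skips every micro-partition whose range cannot overlap the predicate.

# Sketch of zone-map pruning: per-chunk min/max metadata lets the storage node
# skip whole micro-partitions that cannot satisfy a range predicate.
chunks = [
    {"file": "part-001", "min_ts": 0,    "max_ts": 999},
    {"file": "part-002", "min_ts": 1000, "max_ts": 1999},
    {"file": "part-003", "min_ts": 2000, "max_ts": 2999},
]

def prune(chunks, lo, hi):
    """Return only chunks whose [min, max] range overlaps the query range."""
    return [c for c in chunks if c["max_ts"] >= lo and c["min_ts"] <= hi]

# A query for one narrow time window touches a single chunk instead of the whole table.
print([c["file"] for c in prune(chunks, 1200, 1300)])   # ['part-002']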

We should also consider Snowflake’s approach to table maintenance and transactions. Snowflake treats micro-partitions as immutable and implements operations like DELETE or UPDATE by creating new micro-partitions and marking old ones obsolete (an append-only storage approach). While our design might use a different update strategy (since we have a dedicated storage layer that can update in place or with MVCC), we can similarly optimize bulk operations. For instance, a TRUNCATE TABLE or dropping a partition can be a metadata-only operation (just drop references to micro-partitions)[36]. Also, dropping a column doesn’t rewrite data in Snowflake – it just updates metadata[37]; we might incorporate that idea to avoid expensive schema changes.

Another aspect is concurrency and workload isolation. Snowflake allows multiple compute clusters to query the same data without interference, which is achieved by no shared locks on storage (since storage is mostly read-only data files) and by scaling compute separately for different users/queries. In our architecture, we could similarly allow multiple compute nodes or compute clusters to serve different workloads (for example, one set of compute nodes for OLTP transactions, another for OLAP analytical queries, both accessing the same storage nodes). Since storage nodes are stateless in terms of transactions (they just serve data and do operations as requested), this is feasible. We will have to manage consistency (ensuring a transaction’s writes are visible to queries properly – likely via a form of MVCC or an epoch-based snapshot system, like Snowflake’s time-travel feature that keeps track of micro-partition snapshots over time).

In summary, implementing micro-partitioned, columnar storage with rich metadata indexing[34][33] will provide our database with self-tuning performance (automatic clustering and pruning) and easy scale-out. Combined with the separation of compute and storage, it means we can scale the compute layer (for more parallel query power or more concurrent users) without moving data, and scale storage (adding nodes for more data volume) without changing query logic – the two layers communicate via a well-defined protocol. This also aligns with cloud-friendly architecture: compute nodes could even be serverless or transient, spinning up to run a query workload and then spinning down, while the storage nodes (or storage service) persist the data. The decoupled design, learned from Snowflake, thus contributes to a highly flexible and efficient system.

Pinecone (Vector Similarity Search at Scale)

Pinecone is a specialized vector database, and it provides insight into how we should handle the new database’s vector data type and similarity search capability. One core principle is the use of sharding and replication to scale and make vector search fault-tolerant[38]. Pinecone (and vector databases in general) shard the vector index across multiple nodes: this means the set of all vectors is partitioned into subsets, each served by a node (or a pod)[39]. When a similarity query arrives (e.g. “find the nearest neighbors to vector X”), the system uses a scatter-gather approach: it sends the query to all shards (or a selected subset of shards), each shard finds the nearest vectors among its local subset, and then the results are merged to produce the final top-K nearest list[40]. We will adopt this pattern in our vector module. The vector data (embeddings stored in a vector column) will be partitioned across storage nodes. Each storage node (or a dedicated set of “vector index nodes”) holds an index structure (like an HNSW graph or IVF index) for only the vectors in its shard. A similarity search query will be executed in parallel on all shards’ indexes, and the compute layer will merge the results. This ensures we can scale to very large vector sets by adding more nodes (shards), and the query latency remains manageable as it’s processed in parallel.
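
The scatter-gather pattern could look like the sketch below (brute-force distance computation stands in for the per-shard ANN index, and all names are hypothetical): each shard returns its local top-k, and the compute layer merges them into the global top-k.

# Sketch of scatter-gather similarity search: each shard returns its local
# top-k candidates, and the compute node merges them into a global top-k.
import heapq
import numpy as np

def local_top_k(shard_vectors, shard_ids, query, k):
    """Brute-force stand-in for a per-shard ANN index (HNSW/IVF in practice)."""
    dists = np.linalg.norm(shard_vectors - query, axis=1)
    order = np.argsort(dists)[:k]
    return [(float(dists[i]), shard_ids[i]) for i in order]

def global_search(shards, query, k):
    candidates = []
    for vectors, ids in shards:                       # in practice: in parallel, one RPC per shard
        candidates.extend(local_top_k(vectors, ids, query, k))
    return heapq.nsmallest(k, candidates)             # merge per-shard results

rng = np.random.default_rng(0)
shards = [(rng.random((100, 8)), list(range(s * 100, s * 100 + 100))) for s in range(3)]
print(global_search(shards, rng.random(8), k=5))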

For replication, as mentioned, Pinecone keeps multiple replicas of each shard so that if one node fails, the data isn’t lost and queries can still be answered[41]. In our design, since we already plan one mirrored copy per shard for quorum/HA, the vector data will naturally be replicated. We must ensure that each vector shard’s replica also has an up-to-date copy of the vector index. This likely means that when vectors are inserted, we insert them on both the primary and replica. Depending on consistency needs, we might choose a strong consistency model (a vector is only considered inserted after both primary and replica have it) akin to Pinecone’s strong consistency option[41], or an eventual consistency if we prioritize write throughput. Given we aim for quorum, a safe approach is to have a consensus or two-phase commit on inserts: both replicas must acknowledge the insert. For query performance, we can designate one replica as the leader that handles all queries for that shard (the other can take over if the leader fails). Alternatively, we could query both replicas and merge results for even more parallelism, but that doubles the work and typically isn’t needed unless we want extra-low latency via redundancy.

An interesting concept from Pinecone’s latest architecture is separation of compute and storage even for vector indexes, and techniques like geometric partitioning of vector space[42][43]. This means instead of every query hitting every shard, you can partition the vector space (for example via clustering algorithms or graph partitions) so that each query only needs to search a few relevant shards, reducing search cost. We should consider allowing such an approach. Concretely, we could maintain a top-level index (like a coarse quantization or clustering of vectors) that directs a query to the most likely shards containing nearest neighbors. This could be a future optimization; initially, a simpler scatter-gather might suffice. But we will design the system in a modular way so that vector indexes can be partitioned hierarchically. For example, use a k-means to partition vectors into, say, 8 clusters; assign each cluster to a shard. Then a query vector is first mapped to its nearest cluster(s) and only those shards are searched. This idea ties into Pinecone’s notion of using “highly sophisticated geometric partitioning” to break an index into sub-indices, focusing search on relevant partitions[43]. It improves latency and cost by not involving all nodes for every search.

Another Pinecone feature is the freshness layer for new vectors[44][45]. Because building vector index structures (like graphs or quantization) can be slow, Pinecone uses a freshness buffer where newly inserted vectors reside (and are searchable, though perhaps via brute force) until they are incorporated into the main index. We can implement a similar mechanism: newly inserted vectors could be stored in a simple structure (like an array or flat list on each node, or a small incremental HNSW) that is always searched in addition to the main index, to ensure even recently added data is found. Then a background process can periodically batch these new vectors into the optimized index. This way, inserts are fast and recent data is queryable within a few seconds[46][45]. Once the background job adds them to the main index, the temp storage can be cleared. This design will provide a balance between query performance and data freshness.
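
A simplified sketch of such a freshness layer (hypothetical names, brute-force search standing in for the real index structures): new vectors are searchable immediately from a small buffer and are folded into the main index by a background job.

# Sketch of a freshness layer: new vectors land in a small brute-force buffer
# that is searched alongside the main index until a background job merges them.
import heapq
import numpy as np

class ShardVectorIndex:
    def __init__(self):
        self.main = {}     # id -> vector; stands in for the optimized ANN index
        self.fresh = {}    # id -> vector; recently inserted, searchable immediately
    def insert(self, vid, vec):
        self.fresh[vid] = vec              # cheap insert, no index rebuild needed
    def merge_fresh(self):
        self.main.update(self.fresh)       # periodic background job folds fresh
        self.fresh.clear()                 # vectors into the main index
    def search(self, query, k):
        cands = [(float(np.linalg.norm(v - query)), vid)
                 for vid, v in list(self.main.items()) + list(self.fresh.items())]
        return heapq.nsmallest(k, cands)

idx = ShardVectorIndex()
idx.insert("a", np.array([1.0, 0.0])); idx.insert("b", np.array([0.0, 1.0]))
print(idx.search(np.array([0.9, 0.1]), k=1))        # finds "a" even before merge_fresh()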

In terms of data model, a vector column in a table will likely be accompanied by some metadata columns (like an ID or associated attributes). We need to support hybrid queries: e.g., “find nearest neighbors to this vector among items where category = ‘electronics’”. Pinecone supports metadata filtering alongside vector search. To do this, our index querying on each shard can accept a metadata filter, and the shard will only consider vectors whose metadata meets the criteria (this could be done by storing a mapping from metadata values to vector IDs, or by post-filtering the candidate results). We should design the vector index so it can integrate with the SQL WHERE clause filtering. Possibly, we maintain an inverted index for common metadata filters or simply check each candidate result’s metadata against the filter.

Implementation considerations: We will likely use a proven vector index algorithm for the per-shard index. HNSW (Hierarchical Navigable Small World graph) is a good choice for its speed and ability to do incremental insertions. There is also IVF (an inverted-file index, often combined with product quantization), which scales well to very large datasets. The implementation could leverage existing libraries (like FAISS or Annoy) integrated into our storage engine, or we could implement one from scratch, tailored to our data structures. Because our database is to be written with performance in mind (low-level programming), integrating a C++ or Rust library for vector search is viable.

To sum up, from Pinecone we incorporate: sharded vector indices with parallel search across shards[40], replication of those indices for fault tolerance[41], possible intelligent routing of queries to relevant shards (vector space partitioning)[43], and a mechanism to handle real-time inserts without compromising query speed (freshness layer)[45]. These will enable our new database to natively support vector similarity queries on big datasets, at scale and with high performance. The vector functionality will be built as a separate module in the architecture (see below), but tightly integrated so that a user can, for example, create a table with a VECTOR column and index it for ANN (Approximate Nearest Neighbor), and then query it with SQL combined with a vector similarity function. This gives the database a cutting-edge capability in the era of AI applications.


High-Level Language Integration (Python Compatibility)

Implementation Language and Performance Considerations

While the core of a high-performance database often requires low-level programming (for fine control over memory, threads, and hardware), we intend to use a hybrid approach that leverages both low-level and high-level languages. The primary engine components – such as storage management, query execution, indexing, and network communication – will be implemented in a low-level language like C or C++ (or possibly Rust) to ensure we have the “lowest programming level” control for performance-critical paths. This is essential for tasks like memory management of buffer pools, implementing lock-free data structures, SIMD vectorized operations, etc. However, we are open to using a high-level language (with Python being the preference) for certain layers of the system to speed up development and provide flexibility.

One approach is to use Python for orchestration and extensibility, while keeping the heavy lifting in C/C++. For example, the query planner and optimizer could be implemented or scripted in Python, which would make it easier to evolve and incorporate AI-driven optimization in the future. Python’s rich ecosystem could allow us to prototype new optimization algorithms quickly. We could embed a Python interpreter in the database for things like stored procedures or user-defined functions as well, giving users a familiar language to script data transformations. However, we must be cautious: pure Python is much slower than C++ for data-intensive work, so any performance-critical inner loops must be native code. We can leverage tools like Cython or PyPy, or use Python only to call into efficient C/C++ libraries (for example, a Python function might call a C library that performs a vector search or a sorting operation). In effect, Python serves as “glue” while the engine core remains in C/C++. This hybrid model is used in some systems (for instance, EdgeDB implements much of its server layer in Python while delegating storage and performance-critical work to native code).

Given the preference for Python, another angle is client API compatibility and driver support. We want the compute server (the front-end of our DB) to be compatible with all major database client libraries in Python. This means speaking a protocol that these libraries understand. Two main candidates are the PostgreSQL wire protocol and the MySQL protocol, as those are widespread. By implementing, for example, the PostgreSQL wire protocol on our server, any existing Python library like Psycopg2 or SQLAlchemy (with a Postgres dialect) could connect to our database without needing a custom driver. This greatly enhances accessibility — users can use familiar tools and ORMs. Similarly, supporting the MySQL protocol would let MySQL Connector/Python or PyMySQL talk to our database. We have to choose one to start with; PostgreSQL’s protocol is often chosen by new systems (CockroachDB, Yugabyte, etc. all mimic Postgres) because it’s robust and open. We could also implement a custom Python DB-API 2.0 driver for our database, but that would require users to adopt our specific driver. To maximize adoption quickly, implementing a well-known protocol is better. So, a concrete guideline: Implement the PostgreSQL frontend protocol on the compute nodes so that any Postgres-compatible Python client can connect. This covers not just Python, but many other languages too (as they have Postgres drivers). Over time, we can expand to also support the MySQL protocol on a separate port, making our database a “drop-in replacement” for both ecosystems.
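
Assuming the compute node speaks the PostgreSQL wire protocol, an unmodified Python driver should be able to connect; the snippet below uses psycopg2 with placeholder host, credentials, and schema.

# If the compute node implements the PostgreSQL wire protocol, an off-the-shelf
# driver works unchanged (host/port/credentials below are placeholders).
import psycopg2

conn = psycopg2.connect(host="compute-node.example", port=5432,
                        dbname="appdb", user="app", password="secret")
with conn, conn.cursor() as cur:
    cur.execute("SELECT region, SUM(amount) FROM sales GROUP BY region;")
    for region, total in cur.fetchall():
        print(region, total)
conn.close()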

Using Python for parts of the system also means developers of the database can work at a higher abstraction level where performance is less critical. For instance, cluster management tasks, orchestration of node startup/shutdown, rebalancing shards, monitoring, etc., could be written in Python. Python’s extensive libraries for networking, concurrency (asyncio), and system automation would help build a robust management layer quickly. These tasks happen relatively infrequently and aren’t as latency-sensitive as query execution, so Python’s overhead is acceptable there. We might ship a Python-based administration tool or service that interacts with the cluster, making decisions (like moving a shard from one node to another, or restarting a failed node) based on policies. This aligns with the idea of a decoupled control plane (in Python) and data plane (in C++).

Python Integration for Data Science and AI

Since our database supports vectors and aims to attract modern data workloads, tight integration with Python (the lingua franca of data science) is highly beneficial. We will provide Python client libraries or bindings that let users easily run queries and get results as, say, Pandas DataFrames or NumPy arrays. Perhaps we’ll develop an ORM or a lightweight query interface in Python for convenience. Additionally, supporting embedded Python UDFs (User-Defined Functions) inside the database is a possible feature: e.g., allow a user to write a Python function and register it in the database to be applied to each row of a query. Some databases like PostgreSQL (with PL/Python) or Oracle (with Oracle Machine Learning for Python) allow running Python logic close to the data. We can provide a sandboxed way to execute Python code on the compute nodes for complex calculations that SQL cannot easily express, especially useful in the context of machine learning (for instance, a UDF that applies an ML model to a row). This again should be done carefully to avoid slowing down the system – maybe the Python UDF executes in parallel via multiple Python interpreter instances or using Python’s C API for efficient data exchange (e.g. use Apache Arrow to pass columnar data to a Python environment).

Lastly, documentation and examples will emphasize Python usage: we might ship a Python SDK with our database that wraps around the lower-level drivers, providing high-level functions to do vector similarity searches, aggregate queries, etc., in a pythonic way. The end goal is that a developer or data scientist can install our database, and within a Python Jupyter notebook, connect and run both SQL queries and vector searches with minimal friction.

To summarize, Python compatibility operates on two levels:

1. Driver/Protocol level: making sure our database can be accessed via Python’s database libraries (preferably by emulating an existing protocol like PostgreSQL’s). This provides out-of-the-box compatibility with Python ecosystem tools[47] (MySQL HeatWave did similarly – they kept compatibility with MySQL clients, so existing applications don’t need changes[48]).

2. Development/Integration level: possibly using Python to implement non-critical components of the DBMS and allowing Python to be used for extensibility (procedures, UDFs) and management. This hybrid implementation can speed up our development and make the system more flexible, without sacrificing the performance of core operations handled in C/C++.

In conclusion, Python will play a significant role in our system’s usability and development. We ensure that from day one, a user can connect with a Python client and issue queries. Internally, we leverage Python where it makes sense, but we carefully write the core in a performant systems language. This way, we align with modern trends (like using Python in cloud database orchestration, or for AI integration) while still meeting the demands of a “leading-edge” high-speed database engine.


High-Level System Design and Architecture

Two-Tier Decoupled Architecture Overview

The new database system is designed with a two-tier decoupled architecture, logically separating the Compute layer from the Storage layer. This is inspired by systems like Oracle Exadata, Snowflake, and others that leverage dedicated tiers for specialized tasks. In our architecture, Compute Servers (Compute Nodes) form the upper tier and Storage Servers (Storage Nodes) form the lower tier. Physically, these could be separate server processes (or even separate machines/instances in a cluster). The communication between compute and storage happens over a high-speed network interface using a custom protocol optimized for bulk data transfer and pushdown operations. The decoupling means we can scale each layer independently: if the workload has become more CPU-intensive (many concurrent queries), we can add more compute nodes; if it has more data to store or higher scan throughput needs, we add more storage nodes.

Each Compute Server is responsible for accepting client connections, parsing and planning SQL queries, and coordinating query execution across the cluster. Compute servers maintain the global metadata (like the schema, partition/shard map, and cluster configuration). They do not store persistent user data themselves (aside from perhaps caching). Instead, they pull data from storage nodes on-demand. Compute nodes also execute any parts of the query plan that can’t be pushed down (for example, final aggregations or joins that require data from multiple storage nodes, sorting results, etc.).

Each Storage Server, on the other hand, is in charge of storing partitions of the data (both relational and vector data) and performing local processing on that data when instructed. They manage local disk storage (or SSD/NVMe), handle indexes, and can execute scan operations, apply predicates, perform partial aggregations or join filtering (like Bloom filters) as pushed down from compute. The storage servers implement low-level data structures like B-trees, columnar storage files, vector indexes, etc., and ensure data is durable and replicated.

This separation is conceptually shown below (in textual form):

  • Clients (applications, using Python drivers, etc.) connect to a Compute Node (which may be behind a load balancer or a coordinator address).

  • The Compute Node parses the SQL query, consults the metadata to see which shards/partitions of data are needed, then dispatches sub-queries or scan requests to the relevant Storage Nodes in parallel.

  • Each Storage Node scans its local data (using its CPU for predicate filtering, etc.) and returns results (filtered rows, or intermediate aggregates) back to the Compute Node.

  • The Compute Node then assembles these results (e.g., merging sorted streams, computing a global aggregate, or joining results from different storage nodes) to produce the final answer, which is then sent to the client.

The network protocol between compute and storage will support sending operations to storage nodes in a batch manner. For instance, the compute node might send a message like “Scan Table A partition X with predicate (col1 > 100 and col2 = ‘USA’), return columns col1, col3, aggregate on col2 …”. The storage node will interpret and execute this, similar to how an Exadata storage cell executes a piece of a query. We will design this protocol to allow not just scans, but also pushing down other operations when possible (like a sort or an index lookup request).
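
One possible shape for such a request, shown as JSON purely for readability (the internal protocol would use a compact binary encoding, and the field names here are illustrative):

# A possible shape for a pushdown request from compute to storage.
import json

scan_request = {
    "op": "SCAN",
    "table": "A",
    "partition": "X",
    "predicate": {"and": [{"col": "col1", "op": ">", "value": 100},
                          {"col": "col2", "op": "=", "value": "USA"}]},
    "project": ["col1", "col3"],
    "partial_aggregate": {"group_by": ["col2"], "aggs": [{"fn": "COUNT", "col": "*"}]},
    "snapshot_ts": 172500,   # read snapshot so the storage node returns consistent data
}
print(json.dumps(scan_request, indent=2))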

Because the architecture is shared-nothing at the storage layer (each storage node largely works on its own data slice), we avoid centralized bottlenecks. Compute nodes do have to orchestrate and merge results, but those can also scale-out if we allow multiple compute nodes to share the load of different queries or even distribute the work of a single query (for instance, a multi-stage distributed execution plan).

Compute Nodes (SQL Processing Layer)

The Compute Nodes act as the brains of the database cluster. They run the SQL execution engine and the transaction coordinator. Important responsibilities of compute nodes include:

  • SQL Parser and Planner: When a query arrives, the compute node parses SQL into an internal query plan. It looks at the global catalog to understand which tables are involved, their partitioning/sharding, available indexes, and statistics. The planner will create an execution plan possibly with distributed operators. For example, a plan might include scanning partition 1 on StorageNodeA and partition 2 on StorageNodeB, then joining them. The planner will decide what can be pushed down. It will likely choose to push filters and projections to storage, maybe even push some partial aggregation or index usage. The plan is optimized considering data distribution (similar to a cost-based optimizer that is partition-aware).

  • Query Coordinator: The compute node breaks the plan into sub-tasks. It may create a query fragment for each storage node (like “StorageNodeA: scan table T partition X with these filters”). It then sends these requests out (potentially in parallel). If some operations need to happen in the compute tier (like assembling a hash join across partitions), the compute node will do that after receiving data from storages. We will use an MPP (massively parallel processing) style execution where possible: all storage nodes can execute concurrently, and the compute node can pipeline the merging of results.

  • Transaction Management: Compute nodes also coordinate transactions. If a transaction involves updates/inserts, the compute node will ensure those changes go to the correct storage shards. It might use a two-phase commit across multiple storage nodes if a transaction spans shards. Given we have replication (each shard with a mirror), the compute node’s transaction manager might integrate with a consensus protocol. One approach is using a separate consensus service (like etcd/Raft) to manage a global transaction log or to manage failover of shard primaries. But to keep it simpler: each transaction’s commit will require that all involved storage nodes (and their replicas) confirm the commit. The compute node sends commit messages and waits for acknowledgments (achieving quorum as needed). For strictly read-only transactions or single-shard transactions, the coordination is simpler (one node can commit locally).

  • Caching and Buffering: Compute nodes could cache frequently accessed data in memory to avoid repeated calls to storage nodes. For instance, small dimension tables or results of recent queries might be cached. We could also implement a result cache (Snowflake does this at the global level) where if the same query is repeated, the compute node can return the cached result immediately. Additionally, intermediate results for large joins might be buffered at compute side if needed (though ideally distributed joins will stream data as it arrives).

  • Interface/Protocol Handling: The compute node speaks client protocols (PostgreSQL protocol, etc.). It handles authentication, session management, and query parsing for each connection. It may also manage pooling of connections or threads to serve many clients efficiently.

  • Fault tolerance for compute: If a compute node fails mid-query, clients connected to it will fail, but no data is lost since storage has the data. We could run multiple compute nodes behind a router such that if one goes down, clients reconnect to another. Compute nodes are stateless in terms of data, though not stateless in terms of active transactions. We might need to transfer transaction coordination if a coordinator fails (this is complex – one might consider a protocol to have a standby coordinator take over transactions if needed, or simply ask the client to retry the transaction). Since compute nodes can be quickly restarted without data recovery (they don’t hold primary data), we focus more on storage node fault tolerance and treat compute more elastically.

Storage Nodes (Data Storage and Processing Layer)

The Storage Nodes are the workhorses handling data persistence and low-level query processing. Each storage node is responsible for a subset of the data, typically determined by shard assignments and partition ranges. Key aspects of storage nodes include:

  • Data Storage and Retrieval: Storage nodes manage on-disk (and in-memory) storage of tables. They handle writes (inserts/updates/deletes) to their portion of the data, ensuring they are written to durable storage (using techniques like write-ahead logging for recovery). We will likely use a storage engine that supports multi-model data: primarily relational tables (with rows, columns, indexes) and also vector indexes. Each storage node could use a combination of storage techniques: a row-oriented store for recent or small tables and a columnar store for large tables or partitions in analytic mode (like a hybrid store). The storage engine will maintain B+ tree indexes or LSM-trees for primary keys and secondary indexes as needed on that node, and also maintain the zone maps (min/max metadata) for columnar segments to speed up scans.

  • Smart Scan Execution: When the compute layer pushes down an operation, the storage node’s execution engine interprets it. For a scan request, the storage node will locate the relevant table partition files, then apply predicate filtering and column projection as it reads. This means reading each relevant block (micro-partition or compression unit), using metadata to skip blocks that don’t qualify (predicate pushdown at the storage level), and only extracting the needed columns. This was detailed in the Smart Scan section, and at runtime the storage node might break a request into reading multiple blocks in parallel (if it has multiple CPU cores, it can parallelize the scan of a single partition). The storage node can also perform partial aggregation: if the compute node requests an aggregate (SUM, COUNT, etc.) with a GROUP BY and that group key is within the partition, the storage node can compute a local aggregate. For example, if we’re grouping by “region” and each storage node has data for many regions, it can produce sum per region for its data. These local results will be sent to compute for final merging (compute will sum the sums from each node for the same region, for instance).

  • Local Join and Bloom Filters: If a query involves a join where one side of the join (e.g., a dimension table) is small enough, the compute node might ship a filtered list or a Bloom filter to the storage node to apply during the scan of the fact table. Exadata storage cells do this for star schema joins, sending Bloom filters generated from the dimension table keys to storage, so that storage can quickly discard rows that won’t join[49]. We plan similar functionality: the compute node can send a Bloom filter or a list of keys to a storage node along with the scan request. The storage node’s scan loop then checks the Bloom filter for the join key of each row and only returns those that pass, thus reducing the data sent over the network (a minimal sketch of this appears after this list).

  • Transactionality and Concurrency Control: Each storage node must implement a concurrency control mechanism (likely MVCC – Multi-Version Concurrency Control) for the data it handles. Since different transactions could be updating different shards concurrently, we isolate at the shard level. For any single storage node, it will handle multiple transactions (some initiated locally, some distributed). Using MVCC, we can allow reads to not block writes and vice versa by keeping multiple versions of data in case a transaction is reading an old snapshot. The storage node works with the compute node’s transaction coordinator to manage locks or version validations at commit time if needed. For distributed transactions, a two-phase commit will involve storage nodes preparing a transaction (locking necessary rows or ensuring redo logs flushed) and then committing when told. If using MVCC, perhaps a timestamp ordering or global transaction ID could help align visibility across nodes.

  • Replication and Failover: Each piece of data (shard/partition) is stored on two storage nodes (primary and its replica). Storage nodes thus also handle replication: when a write comes in to the primary, it will apply the change and also send the redo/log or data to the replica storage node. The replica applies the change, and acknowledgment flows back. This could be synchronous replication (for strong consistency) or semi-synchronous. We aim for at least one replica to acknowledge before commit (to satisfy the “quorum” idea that a transaction commit is safe if mirrored). If a storage node fails, its replica can take over. We would have a failover protocol: either an automated leader election (like Raft) per shard or via the compute node detecting failure and routing requests to the replica. The system should quickly remap shards to new nodes if needed. Re-replication (to restore a second copy) will happen in the background by copying data to a new node.

  • Vector Index Serving: Some storage nodes (or all, depending on configuration) will also manage vector data indexes. They might load the vector index structure in memory for fast search. On a similarity search request from compute, a storage node will execute the search on its local index (possibly using multiple threads, since vector search algorithms can themselves be parallelized by splitting the index) and return the top-K results (vectors or their IDs with distances) to the compute node.

  • Storage Node API: The storage nodes essentially expose an API to the compute nodes – not publicly to end users, but internally. They might listen on a certain port for incoming “query fragment” requests from compute nodes. This API might be implemented as an RPC protocol over TCP or even utilize something like gRPC if that doesn’t add too much overhead (given it’s all internal communication, a binary efficient protocol is preferred). The requests will include enough info for the storage node to do its job without further global knowledge (compute will, for example, include a transaction ID and snapshot timestamp for the read, so the storage node can ensure it reads a consistent snapshot for that transaction’s context).
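
To make the Bloom-filter pushdown from the list above concrete, here is a small sketch with a deliberately simple (and hypothetical) Bloom filter: the compute node builds it from the dimension table's join keys, and the storage node uses it to drop non-joining fact rows during the scan.

# Sketch of Bloom-filter join pushdown: the compute node builds a filter from the
# dimension table's join keys; the storage node drops fact rows that cannot join.
import hashlib

class BloomFilter:
    def __init__(self, size_bits=1024, hashes=3):
        self.size, self.hashes, self.bits = size_bits, hashes, 0
    def _positions(self, key):
        for i in range(self.hashes):
            h = hashlib.sha1(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:4], "big") % self.size
    def add(self, key):
        for p in self._positions(key):
            self.bits |= (1 << p)
    def might_contain(self, key):
        return all(self.bits & (1 << p) for p in self._positions(key))

# Compute node: build the filter from the (small) dimension table's keys.
bf = BloomFilter()
for customer_id in [7, 42, 99]:
    bf.add(customer_id)

# Storage node: apply it while scanning the fact table, before sending rows back.
fact_rows = [{"customer_id": 42, "amount": 10}, {"customer_id": 3, "amount": 5}]
print([r for r in fact_rows if bf.might_contain(r["customer_id"])])  # only customer 42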

In short, the Storage Layer provides distributed storage with local query processing and works under the direction of the Compute Layer. It focuses on data locality and throughput, utilizing techniques like predicate pushdown, compression, and indexing to serve the compute layer with exactly the data needed, as quickly as possible.

Sharding and Replication Strategy

Data in the database is divided by shards, with each shard being a unit of distribution and replication. A shard could correspond to a subset of the data based on a hash or range of a key (for example, customer ID hash modulo 8, or a date range). We will have a metadata service (possibly maintained by the compute nodes) that maps each table to its shards and knows which storage node is responsible for each shard (and which node is the replica).

Sharding (Horizontal Partitioning): As described earlier, each table can be sharded on a shard key. All rows with a given shard key value (or range) belong to the same shard and thus the same storage node (unless split further by partitions, in which case partitioning is a second-level division within a shard). We will likely choose hash-based sharding by default, because it balances load evenly and is simpler to manage. However, we may allow range sharding for scenarios where data locality by range is useful (like time-series). The number of shards can be high – even more than the number of storage nodes, because we might allocate multiple shards per node for flexibility in rebalancing. If we have 10 storage nodes, we might choose 40 shards so that each node holds ~4 shards on average; if we add an 11th node, we can move a few shards to it. This practice avoids having to split data during node addition; we just move whole shards.

Replication (Mirroring): Each shard will have two copies (to start with) – one primary and one secondary (mirror). The primary is the one that handles all write operations; the secondary is kept in sync. The term quorum suggests that we might require both copies to acknowledge a transaction (in a 2-replica setup, that essentially means synchronous replication). With just 2 copies, a quorum of 2 is needed at commit to avoid losing data if one copy fails right after commit, so we plan synchronous replication to the mirror: a transaction’s changes are not committed until they are on both the primary and replica. In case of a network partition or a node failure, if the primary fails, the system can promote the replica to become the new primary (and ideally bring up a new second replica somewhere to restore redundancy).
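
A tiny sketch of that commit rule, with hypothetical in-memory stand-ins for the primary and mirror copies: the client is acknowledged only after both copies have applied the change.

# Sketch of the two-copy commit rule: acknowledge a write only after both the
# primary and its mirror have durably applied it.
class StorageCopy:
    def __init__(self, name):
        self.name, self.wal = name, []
    def apply(self, txn):
        self.wal.append(txn)      # pretend the redo record was durably written (fsync)
        return True

def replicated_commit(primary, mirror, txn):
    primary.apply(txn)                        # durable on the primary first
    if not mirror.apply(txn):                 # shipped synchronously to the mirror
        raise RuntimeError("mirror unavailable: block commits or trigger failover")
    return "committed"                        # only now acknowledge to the client

print(replicated_commit(StorageCopy("shard3-primary"), StorageCopy("shard3-mirror"), {"txn": 1}))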

We might also consider a small consensus group per shard using an algorithm like Raft or Paxos with 3 replicas, which would allow a majority quorum (2 of 3) and tolerate one failure without stopping writes. However, the stated requirement is a single mirrored copy (two copies total), so we stick with two copies for now. This means that in failure scenarios we temporarily lose the ability to commit new writes for that shard until the surviving copy is recognized and a new replica is attached, or a decision is made to run single-copy (which is risky). Likely we will use a shard manager component (running as part of the compute tier, perhaps) to detect a down node via heartbeat and then promote the replicas of its shards to primaries. If the old primary returns, we have to reconcile (it would probably roll back any incomplete transactions and sync from the new primary). A minimal commit/failover sketch follows below.
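
A minimal sketch of the intended two-copy behavior (illustrative Python only; ShardReplicaSet is a hypothetical name, and the real implementation would be driven by the write-ahead log and the shard manager’s heartbeats):

```python
class ShardReplicaSet:
    """One shard with two copies: a primary and a mirror, committed synchronously."""

    def __init__(self, primary: str, mirror: str):
        self.primary = primary
        self.mirror = mirror

    def commit(self, txn_changes: bytes, apply) -> bool:
        # Both copies must durably acknowledge before the commit succeeds
        # (a quorum of 2 out of 2); apply() stands in for log shipping + fsync.
        acks = [apply(self.primary, txn_changes), apply(self.mirror, txn_changes)]
        return all(acks)

    def on_primary_failure(self) -> None:
        # Promote the surviving mirror; the shard runs single-copy (risky)
        # until a new mirror is seeded from the new primary.
        self.primary, self.mirror = self.mirror, None
```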

Consistency Model: We aim for strong consistency (linearizable, or at least sequential consistency for transactions) across replicas. That means that when a transaction commits, any subsequent read (even on another node) will see that data. This is ensured by the synchronous replication and by the compute node coordinating transactions. For reads, compute nodes will generally read from primaries (since primaries have the latest data). We could spread read load to replicas if they are synchronized and the transaction can tolerate slightly stale data, but that complicates things, so initially we will use the replicas mainly for failover, not for active read-scaling. (In the future, a follower-read mode could be allowed for certain analytics that can tolerate replication lag.)

Partitioning vs Sharding: The database supports both, and they can be hierarchical. For example, a table could be sharded by customer ID across nodes, but within each shard (customer range) the table could be partitioned by date. The storage node would then have multiple partitions of the table in its local storage. The compute node’s query planning will utilize both levels: first route to only the relevant shards (e.g., only the shard for customer X), then within that shard apply partition pruning (e.g., only scan the partition for 2024 data). Partition pruning is done by the storage node using local metadata (such as an index of min/max values per partition, similar to Snowflake’s micro-partitions or classic partition metadata). So effectively, sharding is coarse-grained distribution and partitioning is fine-grained segmentation within a shard. We use both to minimize work: only relevant shards are involved in a query, and those shards’ storage nodes read only the relevant partitions.
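
A sketch of the local pruning step (illustrative only; the per-partition min/max metadata and the PartitionMeta name are assumptions about what the storage node keeps):

```python
from dataclasses import dataclass
from datetime import date

@dataclass
class PartitionMeta:
    name: str
    min_date: date
    max_date: date

def prune_partitions(partitions: list[PartitionMeta], lo: date, hi: date) -> list[str]:
    """Keep only partitions whose [min, max] date range overlaps the query range."""
    return [p.name for p in partitions if p.max_date >= lo and p.min_date <= hi]

parts = [
    PartitionMeta("orders_2023", date(2023, 1, 1), date(2023, 12, 31)),
    PartitionMeta("orders_2024", date(2024, 1, 1), date(2024, 12, 31)),
    PartitionMeta("orders_2025", date(2025, 1, 1), date(2025, 12, 31)),
]
# A query restricted to 2024 scans only one partition within the chosen shard.
print(prune_partitions(parts, date(2024, 1, 1), date(2024, 12, 31)))  # ['orders_2024']
```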

Metadata Management: We will maintain a global catalog that includes information about shards, their key ranges or hash assignments, and their replica locations. Compute nodes will have a copy of this or access to it. Possibly we designate a particular service or one of the compute nodes as a “meta leader” that updates the shard map when rebalancing or failures happen. This map needs to be consistent for all compute nodes, so perhaps a small distributed consensus is needed for metadata. In initial design, we might centralize that role to a single node (with a backup) for simplicity, but ensure it’s highly available.

Rebalancing and Elasticity: When we add a new storage node, we can move some shards (their data) to it. This would involve copying one of a shard’s replicas to the new node and then promoting it to primary (or keeping it as the new secondary). Similarly, if a node is overloaded, we can migrate a shard to a less loaded node (live migration might be complex, but we can quiesce the shard or do a controlled handoff). The architecture should permit these operations without downtime on other shards.

Query Routing: Given a SQL query, the compute node will figure out which shards contain the relevant data. For a point lookup by primary key, it knows exactly which shard (via hashing or range lookup) and can send the request only to that storage node. For a broad query (e.g., full table scan without a limiting filter on the shard key), the compute node will dispatch work to all shards (scatter). For an intermediate case (e.g., a range query on a partition key that doesn’t align with the shard key), it may have to send to multiple shards but not all (for instance, a query for customer IDs between A and B might touch only some hash partitions or some range shards).
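A sketch of this routing decision (illustrative; the route helper is hypothetical, and a production router would consult the shard map described above rather than hash inline):

```python
import hashlib
from typing import Optional

NUM_SHARDS = 40

def shard_of(key: str) -> int:
    return int.from_bytes(hashlib.sha1(key.encode()).digest()[:8], "big") % NUM_SHARDS

def route(shard_key_values: Optional[list[str]]) -> set[int]:
    """Return the set of shards a query must touch.

    - Point lookup or small IN-list on the shard key -> only those shards.
    - No usable predicate on the shard key -> scatter to all shards.
    """
    if not shard_key_values:
        return set(range(NUM_SHARDS))
    return {shard_of(v) for v in shard_key_values}

print(route(["cust-101"]))               # a single shard
print(route(["cust-101", "cust-205"]))   # one or two shards
print(len(route(None)))                  # all 40 shards (scatter-gather)
```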

Failure Handling: In the event a storage node goes down, the compute nodes will detect the lost heartbeat. They will consult the shard map to see which shards were on that node, then mark its replicas as active. If the failed node was a primary for some shards, their secondaries become the new primaries. Compute nodes should then route future requests to those new primaries. If the failed node had secondaries for some shards, those shards are temporarily single-copy until we re-establish a replica (perhaps on a newly allocated node or an existing one with space). We’ll prioritize bringing back to two replicas to maintain fault tolerance.

Query Execution Flow (Put Together)

To illustrate the interplay of these components, consider an example query and walk through execution:

Example: SELECT c.name, SUM(o.amount) FROM orders o JOIN customers c ON o.cust_id = c.id WHERE o.order_date >= '2025-01-01' AND c.region = 'EUROPE' GROUP BY c.name;

Preparation: Assume customers and orders are sharded by cust_id. Also assume customers is a reference or replicated table (small enough to be on all nodes) or co-sharded with orders by cust_id. If co-sharded, then customers with a certain cust_id live on the same node/shard as that customer’s orders. This is ideal for local joins.

  • The client sends this SQL to a Compute Node.

  • The Compute Node parses and plans:

  • It recognizes a join between customers and orders on cust_id (the shard key). If co-sharded, it realizes it can perform the join per shard locally. It also sees the filter c.region = 'EUROPE', which can be applied on the customers side.

  • The planner might push the join down to storage if possible (some systems allow pushing joins, but it is often done in compute; alternatively, we can push down a semi-join filter).

  • It will definitely push down the order_date >= '2025-01-01' filter to the orders scan, and c.region = 'EUROPE' to the customers scan.

  • One strategy: since customers might be smaller, the compute node could retrieve the list of customer IDs in Europe first (by querying the customers table or using a metadata index if available). Suppose it finds customer IDs [101, 205, 333, …] belong to Europe. Then it could push those as a filter to orders (this is a form of Bloom filter or list).

  • Or if customers is replicated on all nodes, compute node can send the region filter to each storage node to only join with matching customers.

  • The plan likely ends up as: for each shard, do a local join of orders and customers (with region filter on customers and date filter on orders), produce partial group-by results (SUM per customer name). Then compute node will aggregate those partial sums across shards and output.

  • The Compute Node then issues subqueries to storage:

  • It identifies which shards have data from 2025 onward and European customers. If the shard key is cust_id, the region filter doesn’t map directly to shards (unless our system has partitioned by region secondarily). So it likely has to query all shards, but we can include the region filter in each subquery.

  • For each shard (or for each storage node), the compute node sends a request: “Perform a join of customers and orders where customers.region = 'EUROPE' and orders.date >= 2025-01-01 and customers.id = orders.cust_id, then compute SUM(o.amount) GROUP BY c.name”.

  • This is a complex pushdown; realistically we might break it up: fetch the qualifying customer IDs from customers, then use them to filter orders. But Exadata can actually offload some join work with Bloom filters, and we can do something similar: first, the compute node obtains a Bloom filter of all European customer IDs (perhaps by scanning the customers table globally, or, since customers is small, the compute node may already have it cached). It then includes that Bloom filter in the request to each storage node for orders.

  • The storage node receives the scan request for orders (for its shard’s portion of orders). It applies the date >= 2025 filter, then checks each order’s cust_id against the Bloom filter (to ensure the customer is European). At this point it has not joined to the customer rows, but the customer name is still needed for the result. If customers is replicated, the storage node can use its local copy to look up c.name; otherwise, the compute node can join the qualifying orders with customers afterward to get the names.

  • Let’s assume the customers table is small and replicated, so each storage node actually has a full copy (or at least an index from cust_id to name and region). Then the storage node can indeed perform the join: for each order that passes the date filter, it looks up the customer record (in memory, since the table is small) and checks region = 'EUROPE'. If it matches, it takes the customer name and the order amount.

  • The storage node then accumulates a running sum of amount per customer name (a local group-by). This uses very little memory if there are not many customers per shard; alternatively, it could output raw (cust_name, amount) pairs for the compute node to group, but grouping locally is better because it reduces data transfer when a customer has many orders.

  • After processing all relevant orders, the storage node sends back the result, e.g.: [(“Acme Corp”, 12345.67), (“Beta Ltd”, 8910.11), …] for that shard.

  • The Compute Node receives these partial results from all shards (maybe as they stream in). It then performs a final aggregation: it groups by customer name across all shards and sums the totals. Then it sorts or formats the output if needed.

  • Compute sends the final result to the client.

This flow highlights how the compute-storage cooperation works, with compute doing global tasks and storage doing partition-local tasks. It also shows the importance of co-locating data or replicating small tables to push down joins.
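The final merge step at the compute node is simple; here is a minimal sketch (illustrative Python, assuming each shard returns (customer_name, partial_sum) pairs as in the example above):

```python
from collections import defaultdict

def merge_partial_sums(per_shard_results: list[list[tuple[str, float]]]) -> dict[str, float]:
    """Combine per-shard GROUP BY partial sums into the global result."""
    totals: dict[str, float] = defaultdict(float)
    for shard_result in per_shard_results:
        for name, partial in shard_result:
            totals[name] += partial
    return dict(totals)

shard_a = [("Acme Corp", 12345.67), ("Beta Ltd", 8910.11)]
shard_b = [("Acme Corp", 1000.00), ("Gamma GmbH", 42.00)]
print(merge_partial_sums([shard_a, shard_b]))
# {'Acme Corp': 13345.67, 'Beta Ltd': 8910.11, 'Gamma GmbH': 42.0}
```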

Fault Tolerance and High Availability

The system’s high-availability design revolves around replication and failover at the storage layer, as well as statelessness (or easy state recovery) at the compute layer:

  • Storage Failover: If a storage node fails, the shards it was primary for will automatically fail over to their replicas. This could be coordinated by a small consensus algorithm or simply by the compute node noticing a timeout. Each shard pair (primary-secondary) might run a lightweight heartbeat; the secondary can detect primary failure and promote itself, or the compute’s metadata manager can decide promotion. Once a new primary is set, all compute nodes update their shard map so future requests go to the new primary. Meanwhile, a replacement for the down node can be brought up and it will receive data from the surviving node to re-establish a second copy (this might involve copying all data of that shard – which is heavy – but necessary for full redundancy).

  • Compute Node Failover: If a compute node goes down, any in-progress queries on it are lost (the client will retry or get an error). But the data remains safe. Clients can reconnect to another compute node in a pool. We might have a load balancer or a virtual IP to route connections to available compute nodes. We should ensure that all compute nodes have access to the same metadata and that transactions can be handled by any of them. Some transactions that were mid-flight may have partially executed on storage nodes; those storage changes would be under a transaction ID and if commit never comes, they will eventually be aborted (we might implement transaction timeout or a recovery process for orphaned prepared transactions).

  • Durability and Recovery: Each storage node will use a write-ahead log so that if it crashes and comes back, it can recover its in-memory state (or, if both primary and replica crash, we have logs to recover from at least one). In a replicated setting, we can also rely on the replica as an up-to-date copy. After a crash, the node should sync from its partner to catch up on missed writes. Our design will likely use logical or physical logs to simplify replication – possibly a consensus-log approach where the primary writes to a log that both it and the replica consume to apply changes.

  • Scaling: For scaling out, we add new storage nodes and redistribute shards. This can be done with minimal impact: we could copy one replica of an existing shard to the new node, then switch roles (so the new node becomes the secondary or primary for that shard). Compute nodes just need the updated shard map. For compute node scaling, we simply start another compute node process and register it with the cluster – since compute nodes share no persistent state except metadata, this is straightforward. They might need to warm up caches, but otherwise it is easy to add compute capacity.

  • Network Layer: We have to be mindful of network usage. The system will likely be deployed in a data center or cloud environment with high-speed networking. For large data flows (like full table scans), the network can become a bottleneck. We mitigate this by maximizing predicate pushdown (so less data goes over the network) and by compressing data in transit where beneficial. We may use a protocol like gRPC, or a custom binary protocol over TCP with efficient serialization (perhaps Arrow Flight for result sets, which is a columnar transfer format). Exadata uses InfiniBand with specialized protocols; in our case, standard Ethernet with these optimizations will do.

  • Security: Although not a primary focus of this design, we should note that separating compute and storage enables some useful security measures – e.g., data-at-rest encryption on storage nodes and encryption in transit between storage and compute. Multi-tenant isolation could also be achieved by assigning certain tables or shards to certain storage nodes.

The system architecture can be visualized as a set of compute nodes interacting with a set of storage nodes. Each data item belongs to a shard that lives on two storage nodes. Compute nodes are the SQL brains that know how to assemble the data. This architecture is very versatile: we can run it on-premises with dedicated machines or in the cloud where storage nodes might be implemented as a service (even potentially using something like AWS S3 for storage and stateless compute nodes for compute, somewhat like Snowflake’s approach). Initially, we’ll probably tie storage nodes to local NVMe storage for high performance and treat it as a tightly-coupled cluster.

By implementing this blueprint, the new database will achieve high performance (through parallel execution and pushdowns), scalability (through sharding and adding nodes), and fault tolerance (through replication and quick failover). It integrates the best ideas from the systems researched (Exadata’s two-tier, Snowflake’s separation, Oracle/Citus sharding, etc.) into a cohesive design.


Vector Data Storage and Index Module

Vector Data Type and Indexing

A core innovative aspect of the database is native support for vector embeddings as a first-class data type alongside traditional relational data. We introduce a column type, say VECTOR(D) for a D-dimensional vector, to store high-dimensional numerical data (common in AI applications, e.g. embeddings from neural networks). Storing vectors is one part; making them searchable by similarity is another. For the storage of vector data, each storage node will handle it much like other data, but likely using a specialized format. We can store vectors in a contiguous block (array of floats) within tuple storage for small dimensions, or separate them into their own storage structures for larger dimensions to optimize memory alignment and access.

Indexing: We will integrate an Approximate Nearest Neighbor (ANN) index for vectors. Popular ANN data structures include HNSW (graph-based) and IVF (quantization-based) with product quantization for compression[50][51]. Our plan is to use HNSW (Hierarchical Navigable Small World graph) as the default index due to its strong performance in many scenarios and its ability to handle dynamic inserts. Each vector column can have an index built on it (similar to how you’d build an index on a normal column). The syntax could be something like CREATE INDEX idx_vectors ON table(vector_col) USING HNSW;. Upon building such an index, the storage node will construct the graph (or other structure) in the background.

The index may be stored partially in memory and on disk (for large collections, perhaps a disk-based index is needed). We might adopt a strategy where a subset of the graph (like entry points or some layers of HNSW) is kept in memory for speed, and deeper parts can be loaded on demand. However, given that vector queries are typically latency-sensitive, we would encourage keeping the index in memory by default (with periodic persistence to disk for recovery). The index building process can be multi-threaded and will likely happen on the storage node when an admin or automated system triggers it. Alternatively, we maintain the index incrementally as data is inserted (HNSW supports insertion, though with some quality trade-offs, whereas something like k-means based IVF would typically require batch rebuilds for best results).
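As a sketch of the build-and-persist cycle on a storage node, here is what it could look like using the open-source hnswlib library (an assumption for illustration; the production engine may use its own implementation, and the file name is hypothetical):

```python
import numpy as np
import hnswlib

dim, n = 128, 10_000
vectors = np.random.rand(n, dim).astype(np.float32)
ids = np.arange(n)

# Build an HNSW index in memory (background job, or incrementally on insert).
index = hnswlib.Index(space="l2", dim=dim)
index.init_index(max_elements=n, ef_construction=200, M=16)
index.add_items(vectors, ids)

# Persist periodically so the index can be reloaded after a restart.
index.save_index("shard_7_embedding.hnsw")

# At query time: trade accuracy for speed via the ef search parameter.
index.set_ef(64)
labels, distances = index.knn_query(np.random.rand(1, dim).astype(np.float32), k=10)
```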

Distance Metrics: We will support common similarity metrics like cosine similarity, Euclidean distance (L2), and maybe inner product. The index can be configured for the metric. For example, cosine similarity search can be done by L2 normalization of vectors and using dot product, etc. The index structure might differ slightly by metric (HNSW and IVF can handle any metric if we supply a distance function).

Distributed Vector Storage and Sharding

For very large vector data (far beyond what one node’s memory can hold), we implement sharding for vectors as discussed. This goes hand-in-hand with the overall sharding of the table. If a table is sharded by a key, then the vector in each row naturally sits on the shard where that row sits. For example, if we have a Products table with a vector embedding for each product, and we shard by product ID, then the vector for product X is stored on the shard responsible for product X. With this layout, a similarity search typically needs to query all shards to find the true nearest neighbors globally (because any shard could hold the closest vector).

However, we can optionally support sharding of the vector space itself. This is more advanced: we could partition vectors by some approach such as assigning them to clusters. One method is to do a top-level clustering of the entire vector dataset (like K-means into M clusters) and assign each vector to a cluster ID; each cluster ID is then handled by a particular shard. This means each shard holds vectors that are roughly similar (belonging to the same cluster). A query vector would ideally only need to be sent to the shard(s) of the nearest cluster(s) to find good matches[43]. This approach reduces query load on all shards at the cost of more complex indexing and potentially uneven load if the data is skewed. We will likely implement the simpler version first: a global query to all shards by default (scatter-gather), and add an optional vector partitioning method in a future release.

So initially, any SELECT … FROM table WHERE SIMILARITY(vector_col, [query_vector]) < some_threshold … or a function like ORDER BY cosine_distance(vector_col, query_vec) LIMIT 10 will be implemented by the compute node taking the query vector and sending it to each storage node that has that table’s shard. This is done in parallel, so the latency is that of the slowest node’s search (which is okay if each node has roughly equal data). Each storage node searches its local ANN index for the top K closest vectors among its subset and returns those (with their distances). The compute node then merges these K-lists (like a multi-way merge in a top-K algorithm) to find the overall top K nearest vectors across the whole dataset. This merging is efficient if K is small (like 10 or 100). The amount of data returned from each storage node is K results, which is tiny, so network cost is low for the final stage. The heavy work (comparing the query vector to many candidates) is done in parallel on each storage node.
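
The compute-side merge of per-shard top-K lists is a standard multi-way merge; a minimal sketch (illustrative Python, assuming each storage node returns (distance, row_id) pairs already sorted by ascending distance):

```python
import heapq

def merge_topk(per_shard: list[list[tuple[float, int]]], k: int) -> list[tuple[float, int]]:
    """Merge per-shard candidate lists (sorted by distance) into the global top-K."""
    return heapq.nsmallest(k, heapq.merge(*per_shard))

shard_1 = [(0.12, 101), (0.35, 205)]
shard_2 = [(0.08, 333), (0.50, 777)]
shard_3 = [(0.20, 555)]
print(merge_topk([shard_1, shard_2, shard_3], k=3))
# [(0.08, 333), (0.12, 101), (0.2, 555)]
```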

Replication: Since each vector shard is replicated on two nodes, we should ensure only one of them actively processes a given similarity query to avoid duplicate work. The compute node can direct the query to only the current primary for each shard. The replica could be idle or doing other work. In some high-throughput scenarios, we could load-balance by sometimes querying replicas (if they’re not busy with writes) but that introduces consistency questions if a very recent insert hasn’t propagated (though we are synchronous, so in theory by commit time, replica has it). It’s simpler to use the primary. If a primary is down and a replica is now primary, the compute node just uses that transparently after failover.

One challenge is freshness: when a new vector is inserted into the table, it should be added to the index. If the index is updated in real-time, then the vector becomes searchable immediately. HNSW can handle inserts, so we might do that – each insert on storage node also inserts the vector into the HNSW graph structure. This might slow down inserts a bit (because maintaining the index has a cost), but ensures up-to-date results. Alternatively, as discussed, we can implement a two-tier approach: newly inserted vectors go into a small auxiliary index or buffer (freshness layer)[44][45]. Queries will search both the main ANN index and this buffer. The buffer could just be a brute-force list or a smaller HNSW with just recent items. Periodically (say every few minutes or hours, configurable), we merge the buffer into the main index in a batch process which is more optimized. This approach is beneficial if insertion rate is high and building incrementally is too inefficient or degrades query quality. We can allow configuration like INDEX MAINTENANCE = { REALTIME | BATCH } when creating the index.

If we do batch indexing, we must mark newly inserted vectors as “unindexed” so the query planner knows to query all shards’ main index + new vectors. Perhaps easier: treat the new vectors as part of an “overlay index” that always gets searched.
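
A sketch of the two-tier search path (main index plus a brute-force freshness buffer; illustrative only – VectorShard is a hypothetical name, and the main index’s search/add methods are assumed interfaces, with the merge step running as a background batch job):

```python
import numpy as np

class VectorShard:
    """Main ANN index plus a small brute-force buffer of recent inserts."""

    def __init__(self, main_index):
        self.main_index = main_index          # e.g. an HNSW index (assumed interface)
        self.buffer_vecs: list[np.ndarray] = []
        self.buffer_ids: list[int] = []

    def insert(self, vec: np.ndarray, row_id: int) -> None:
        # New vectors land in the buffer and are searchable immediately.
        self.buffer_vecs.append(vec)
        self.buffer_ids.append(row_id)

    def search(self, query: np.ndarray, k: int):
        hits = list(self.main_index.search(query, k))        # assumed: [(dist, id), ...]
        if self.buffer_vecs:
            dists = np.linalg.norm(np.stack(self.buffer_vecs) - query, axis=1)
            hits += list(zip(dists.tolist(), self.buffer_ids))
        return sorted(hits)[:k]

    def merge_buffer(self) -> None:
        # Periodic batch job: fold buffered vectors into the main index.
        for vec, rid in zip(self.buffer_vecs, self.buffer_ids):
            self.main_index.add(vec, rid)                     # assumed interface
        self.buffer_vecs.clear()
        self.buffer_ids.clear()
```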

We will extend SQL or provide functions for performing vector similarity queries. Two likely methods:

  • Overloading ORDER BY with a LIMIT: for example, SELECT * FROM table ORDER BY L2_DISTANCE(vector_col, query_vec) ASC LIMIT 5; would find the 5 nearest vectors to a given query_vec by Euclidean distance.

  • A table-valued function (or special clause) for nearest neighbors: for example, SELECT nn.*, nn.distance FROM table, NEAREST_NEIGHBORS(table vector_col, query_vec, 5) AS nn WHERE table.id = nn.id; where the NEAREST_NEIGHBORS function returns the top 5 IDs and distances, which are then joined back to the table.

We will decide on a user-friendly syntax; simply overloading ORDER BY with a LIMIT is probably the simplest option.

Under the hood, when the compute node sees such a query, it knows it’s a vector search. Instead of doing a full table scan sorted by distance (which is infeasible for large data), it will invoke the ANN index search on each storage node. The compute node will pass the query_vec (the actual numerical vector) in the request to the storage nodes. The storage nodes will run the ANN search algorithm:

  • For HNSW: traverse the graph starting from an entry point, using the query vector’s coordinates to calculate distances, and find the local top K nearest neighbors on that node.

  • For IVF: identify the relevant clusters via the coarse quantizer and then search inside them.

In either case, each storage node returns a small set of candidates (ID and distance, plus the vector or any other requested columns if needed). The compute node then merges these as described to find the global top K.

We must ensure the distances are comparable across shards. If using Euclidean or cosine, that’s fine – it’s the same metric globally. If using inner product, where we maximize similarity rather than minimize distance, we adapt the merge accordingly.

Filtering: Often one wants the nearest neighbors among a subset of vectors (e.g., those with a certain label or property). Our system supports adding a WHERE clause in addition to the vector search. For example, “find the 10 closest products in the same category as X” requires filtering by category as well. We can handle this by either:

  • applying the filter after retrieving neighbors (the index might return items that get filtered out, so we may need to fetch more than K from each shard to have enough left after filtering), or

  • building the filter into the search: on each storage node, when considering a candidate vector from the index, check its category; if it doesn’t match, skip it. Some vector libraries allow a filtering callback during search, and we can implement that since our storage node has the full row data (or at least the indexed metadata).

Pinecone’s approach is similar – it allows metadata filters alongside vector search. We will likely keep, for each vector in the index, a reference to its metadata, and the search will only count candidates that pass the filter.
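
A sketch of the “over-fetch then filter” fallback (illustrative only; search_fn, metadata, and predicate are assumed interfaces, not fixed APIs):

```python
def filtered_knn(search_fn, metadata, query, k: int, predicate, overfetch: int = 4):
    """Nearest-neighbour search with a metadata predicate.

    search_fn(query, n) -> list of (distance, row_id) from the ANN index (assumed).
    metadata[row_id]    -> dict of the row's filterable attributes (assumed).
    Over-fetch k * overfetch candidates so enough survive the filter.
    """
    candidates = search_fn(query, k * overfetch)
    kept = [(d, rid) for d, rid in candidates if predicate(metadata[rid])]
    return sorted(kept)[:k]

# e.g. "10 nearest products in the same category":
# filtered_knn(index.search, meta, qvec, 10, lambda m: m["category"] == "shoes")
```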

Output: The result of a similarity search might include the distance or similarity score. We should allow that (e.g., returning a computed distance column). We will compute it precisely for the final results (the ANN search gives an approximate, but usually very accurate, distance measure). If we use approximate methods, we might even refine the final candidates: e.g., for the top 100 found by ANN, we could fetch their exact vectors, compute exact distances, and then sort accurately. This re-ranking step can improve the precision of the result if needed.
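
The re-ranking step itself is straightforward; a minimal sketch (illustrative, assuming a fetch_vector callback that returns the exact stored vector for a row ID):

```python
import numpy as np

def rerank(candidates: list[int], fetch_vector, query: np.ndarray, k: int):
    """Recompute exact L2 distances for the ANN shortlist and return the best k."""
    exact = [(float(np.linalg.norm(fetch_vector(rid) - query)), rid) for rid in candidates]
    return sorted(exact)[:k]
```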

Performance: The vector search time depends on the index parameters. We will allow tuning of index quality vs. speed (such as the HNSW ef parameter). The user could specify accuracy/speed settings at index creation time or via query-time hints. Also, if the dataset of vectors is not huge, we might even do an exact search (scan all) below some size threshold, but that is generally not needed if the ANN index is configured well.

Because the vector search is in C++ (likely) on storage nodes using efficient math (with optional BLAS or SIMD acceleration for dot products), it will be quite fast per node. And because it’s parallel, the overall latency might be on the order of tens of milliseconds even for millions of vectors distributed, which is good for interactive use.

Integration with Relational Engine

One of the strengths of our approach is that vectors live alongside other relational data. This means users can do hybrid queries that combine traditional filters and joins with vector similarity. The system architecture supports this seamlessly: since storage nodes can handle both types of operations, a single query can push down both a vector search and a relational filter.

For example, consider a query: “Find the 5 most similar images to this example that were uploaded in the last week by users from Europe.” Suppose we have a table Images(image_id, user_id, upload_date, embedding VECTOR(128)) and a Users(user_id, region) table. This query involves a vector search on Images.embedding plus a join/filter on Users.region = 'Europe' and an upload_date filter. The planner might:

  • Use the vector index on Images.embedding to get candidate images (perhaps retrieving the top 50 most similar first).

  • Join those candidate image IDs with Users to filter by region, and apply the upload_date condition.

  • Take the top 5 of the remaining results.

Alternatively, we could push more down to storage:

  • If Images is sharded by user_id and co-sharded with Users, a storage node can check the region easily.

  • Compute could send the vector query to all shards with the added conditions user_region = 'Europe' and upload_date > X. If the storage node has a local reference copy of Users, or at least a mapping from user_id to region (via a pre-join or because Users is a reference table), it could filter non-matching candidates during the search or immediately after.

  • It might be simpler, though, for compute to get the neighbors first (unfiltered) and then do the join + filter on those few results, because filtering after retrieving 50 per shard (say 500 total) is trivial to do on compute. This is an example of a situation where not all filters are pushed down, because doing so for every distance computation could be wasteful and it is easier to filter the short list of candidates.

In any case, we will ensure that the compute layer can mix vector search plan nodes with normal plan nodes. This could involve adding new operators in the query plan, like a “Vector Index Scan” operator that returns approximate nearest neighbor IDs, which then feed into a “join” or “filter” operator.

SQL Extensions: We might add a function for distance computation that can be used in SELECT or ORDER BY. Also possibly a CREATE INDEX … TYPE IVFFLAT etc., for index creation. We will document these clearly so users know how to utilize the vector features.

Maintaining Consistency and Freshness

Ensuring that the vector indexes and data remain consistent is important. Because we treat vector columns as part of the table, any insert, update, or delete that affects vectors should be transactionally consistent with the rest of the data. That is, if a transaction inserts a new row with a vector and commits, then immediately a similarity search should be able to find that vector (assuming commit was successful). With synchronous replication, the vector will be on the primary and replica nodes upon commit.

If we do choose a batch indexing approach, then there’s a slight divergence: the vector exists but isn’t in the main index yet. We mitigate with the aforementioned freshness layer: the new vector is in the “recent additions” list that is searched (linearly or via a small index) for queries. So you still find it. It might be slightly slower to search recent ones (because maybe it’s a linear search among the last N additions), but if N is bounded (like last few thousand vectors) it’s not a problem.

For deletes, we need to delete vectors from the index. HNSW and others can remove vectors, though sometimes not as cleanly – we might just mark as deleted (lazy deletion) and skip them during search, and rebuild the index occasionally to purge deleted entries. For updates (vector value changes), it’s like a delete+insert in the index.

We will also ensure that if a transaction aborts, any index entries added for it are removed or never made visible. This usually means we don’t publish an insert to the ANN index until commit. Or if we do, we hide it behind a transaction visibility check (which is complicated inside an ANN search). More straightforward: do not insert into ANN until commit is confirmed; meanwhile, store uncommitted new vectors in a side buffer (which is anyway needed for freshness logic). On commit, move them to the main index (or to the committed buffer if we do periodic indexing). On abort, discard them.

Because our vector search is approximate, consistency also has a different facet: it’s not guaranteed to always find the absolute true nearest neighbor if using ANN. But that’s a trade-off the user accepts for speed. We will provide as high accuracy as feasible with proper tuning, but it’s not a transactional “consistency” issue per se, more of an algorithmic one.

Module Design for Vector Operations

Internally, we treat the Vector Index Module as a component of the storage engine. One could conceptualize it as a pluggable module: for example, a storage node could have a sub-system specifically for vector indexing that can be updated and queried independently. We might even isolate it in terms of threads – e.g., a set of worker threads or a process that handles vector search requests so that a heavy vector search doesn’t starve regular OLTP operations on the node. Alternatively, we integrate at the query execution level with proper scheduling (like given a vector query, allocate threads to it, but still leave some for other tasks).

The vector module could reuse existing libraries like Facebook’s FAISS for core algorithms, which is C++ and has optimized implementations. That could speed up development. FAISS can do HNSW, IVF, PQ, etc., and is well-tested. If integrated, we would feed it the data and it would handle search computations. We’d still need to handle distribution logic outside it.

The best storage distribution for vectors essentially boils down to:

  • Partitioning (sharding) vectors across nodes to scale horizontally.

  • Possibly partitioning within each index to focus the search and minimize work per query.

  • Replicating for fault tolerance.

  • Ensuring near-linear scalability in data size and parallel query capacity.

We also note that vector data can be large (each vector might be hundreds of floats, i.e., hundreds of bytes to a few kilobytes). Millions of such vectors can amount to several GB. We should compress vectors on disk if possible. Some techniques: storing them as INT8 (scalar quantization) if some precision loss is acceptable, or using product quantization inside the index. FAISS and others support compressing the vectors to bytes for faster distance computation (this is essentially what PQ does[51]). We might allow an index to be built with compression to save memory at the cost of some accuracy.
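
A sketch of simple symmetric INT8 scalar quantization for vectors at rest (illustrative; a real system would likely tune the scale per column or per block, and product quantization would compress further):

```python
import numpy as np

def quantize_int8(vectors: np.ndarray) -> tuple[np.ndarray, float]:
    """Compress float32 vectors to int8 with one shared scale (4x smaller)."""
    scale = float(np.abs(vectors).max()) / 127.0
    return np.round(vectors / scale).astype(np.int8), scale

def dequantize(q: np.ndarray, scale: float) -> np.ndarray:
    return q.astype(np.float32) * scale

vecs = np.random.randn(1000, 128).astype(np.float32)
q, scale = quantize_int8(vecs)
print(q.nbytes, vecs.nbytes)                               # 128000 vs 512000 bytes
print(float(np.abs(dequantize(q, scale) - vecs).max()))    # small reconstruction error
```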

Usage Example:

  • A user creates a table with a vector column and inserts data (vectors).

  • They issue a similarity search query; the system returns the nearest items quickly.

  • If they also filter by some metadata, it works; if they join with other tables, that works too.

  • All of this happens through the same SQL interface – no separate vector search system is needed.

By incorporating this module, our database becomes a multi-model system that can handle vector search (usually the domain of specialized engines like Pinecone or Weaviate) and relational queries in one platform. This is a strong differentiator: for instance, MySQL HeatWave is adding a vector store[26], and we are designing ours from the ground up to have it integrated. This means less ETL and complexity for developers (they don’t need to run a separate vector DB for AI features).

In conclusion, the vector storage and index module is carefully designed to complement the overall architecture:

  • It operates at the storage layer for speed, using efficient C++ routines.

  • It works in tandem with the sharding and replication strategies.

  • It exposes functionality to the compute layer so that SQL queries can invoke vector searches.

  • It remains consistent with the transactional semantics of the database.

  • And it leverages state-of-the-art algorithms for fast similarity search, ensuring our database is not only capable of traditional OLTP/OLAP tasks but is also AI-ready, able to power use cases like recommendation systems, semantic search, and more in real time.


[1] [2] [3] [7] [49] Offloading Data Search and Retrieval Processing

https://docs.oracle.com/en/engineered-systems/exadata-database-machine/sagug/offloading-data-search-and-retrieval-processing.html

[4] [5] [6] [8] Hybrid Columnar Compression | Oracle Exadata Database Machine | Oracle Technology

https://www.oracle.com/database/technologies/exadata/software/hcc/

[9] [10] [11] [12] [13] [14] Oracle Sharding Overview

https://docs.oracle.com/en/database/oracle/oracle-database/19/shard/sharding-overview.html

[15] [16] [17] [18] Guide to Read Data Efficiently in LeanXcale :: LeanXcale Documentation

https://docs.leanxcale.com/leanxcale/v2.5/developer/readinLX/index.html

[19] [20] Table Co-Location — Citus Docs 7.2 documentation

https://docs.citusdata.com/en/v7.2/sharding/colocation.html

[21] Database Table Types with Citus and Postgres

https://www.citusdata.com/blog/2017/07/27/database-table-types-with-citus-and-postgres/

[22] [23] [27] MySQL Analytics with HeatWave | NeST Digital

https://nestdigital.com/blog/mysql-analytics-with-heatwave/

[24] [25] [29] [47] [48] MySQL HeatWave Features | Oracle

https://www.oracle.com/heatwave/features/

[26] [28] Oracle’s MySQL HeatWave gets Vector Store, generative AI features | InfoWorld

https://www.infoworld.com/article/2334840/oracle-s-mysql-heatwave-gets-vector-store-generative-ai-features.html

[30] [31] [32] [33] [34] [35] [36] [37] Micro-partitions & Data Clustering | Snowflake Documentation

https://docs.snowflake.com/en/user-guide/tables-clustering-micropartitions

[38] [39] [40] [41] [42] [43] [44] [45] [46] [50] [51] What is a Vector Database & How Does it Work? Use Cases + Examples | Pinecone

https://www.pinecone.io/learn/vector-database/