Skip to content

Document Store: Integration

Document Store: Integration

Part of: HeliosDB Document Store User Guide

MongoDB Drivers

Rust:

Cargo.toml
[dependencies]
mongodb = "2.8"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
use mongodb::{Client, options::ClientOptions};
use serde::{Deserialize, Serialize};
/// A user record, (de)serialized with serde for storage in the `users` collection.
#[derive(Debug, Serialize, Deserialize)]
struct User {
    /// The user's name.
    name: String,
    /// The user's age in years.
    age: i32,
    /// The user's e-mail address.
    email: String,
}
/// Connects with the MongoDB Rust driver and inserts a single `User` document
/// into the `users` collection of the `mydb` database.
///
/// # Errors
///
/// Propagates any driver error raised while parsing the connection string,
/// building the client, or performing the insert.
#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    // Build the client from a standard MongoDB connection string.
    let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
    let mongo = Client::with_options(client_options)?;

    // A typed collection handle: documents are serialized from / into `User`.
    let users = mongo.database("mydb").collection::<User>("users");

    let alice = User {
        name: String::from("Alice"),
        age: 30,
        email: String::from("alice@example.com"),
    };

    // `None` means "use the default insert options".
    users.insert_one(alice, None).await?;

    Ok(())
}

Python:

from pymongo import MongoClient

# Connect using a standard MongoDB connection string.
client = MongoClient("mongodb://localhost:27017")
db = client.mydb
collection = db.users

# Insert
user = {
    "name": "Alice",
    "age": 30,
    "email": "alice@example.com",
}
collection.insert_one(user)

# Find: every document whose age is >= 25.
users = collection.find({"age": {"$gte": 25}})
# The loop variable is deliberately not called `user`, so the dict inserted
# above is not overwritten while iterating.
for found in users:
    print(found)

# Update
collection.update_one(
    {"name": "Alice"},
    {"$set": {"age": 31}},
)

# Delete
collection.delete_one({"name": "Alice"})

# Release the client's connections once the example is done.
client.close()

JavaScript (Node.js):

const { MongoClient } = require('mongodb');
/**
 * Runs an insert / find / update / delete round-trip against the `users`
 * collection of the `mydb` database, then closes the client.
 *
 * The client is closed in `finally`, so it is released even when one of the
 * operations throws; the error itself still propagates to the caller.
 */
async function main() {
  const mongo = new MongoClient("mongodb://localhost:27017");

  try {
    await mongo.connect();
    const usersCollection = mongo.db("mydb").collection("users");

    // Insert
    const alice = {
      name: "Alice",
      age: 30,
      email: "alice@example.com"
    };
    await usersCollection.insertOne(alice);

    // Find
    const cursor = usersCollection.find({ age: { $gte: 25 } });
    const matches = await cursor.toArray();
    console.log(matches);

    // Update
    await usersCollection.updateOne({ name: "Alice" }, { $set: { age: 31 } });

    // Delete
    await usersCollection.deleteOne({ name: "Alice" });
  } finally {
    await mongo.close();
  }
}
// Run the example. Handle a rejected promise explicitly so that a connection
// or driver error is reported and reflected in the exit code instead of
// surfacing as an unhandled rejection.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

SQL JSONB Functions

Query Documents via SQL:

-- PostgreSQL JSONB operators work directly
-- (the examples below read from the users_doc table and its JSONB column `data`)
-- Get document field (->> returns the field's value as text)
SELECT data->>'name' AS name FROM users_doc;
-- Filter by field; ->> yields text, so the value is compared with the string '30'
SELECT * FROM users_doc WHERE data->>'age' = '30';
-- Filter with JSONB operators (@> is containment: data must include this key/value pair)
SELECT * FROM users_doc WHERE data @> '{"role": "developer"}';
-- Nested field access (-> keeps JSONB for the intermediate step, ->> extracts the final value as text)
SELECT data->'address'->>'city' AS city FROM users_doc;
-- Array contains (? tests whether the string is an element of the skills array)
SELECT * FROM users_doc WHERE data->'skills' ? 'Rust';
-- JSON path query (@@ evaluates a jsonpath predicate against the document)
SELECT * FROM users_doc WHERE data @@ '$.age > 25';
-- Aggregation: user count and average age per role
-- (age is extracted as text, so it is cast to int before averaging)
SELECT
data->>'role' AS role,
COUNT(*) AS count,
AVG((data->>'age')::int) AS avg_age
FROM users_doc
GROUP BY data->>'role';

HeliosDB SQL Extensions:

-- Create document collection (table)
CREATE TABLE users_doc (
    id TEXT PRIMARY KEY,
    data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Create an index on a JSONB field
-- data->>'email' is a text expression. Stock PostgreSQL has no default GIN
-- operator class for text, so an expression index on a single extracted field
-- uses the default B-tree method (which serves equality and range lookups).
CREATE INDEX idx_users_email ON users_doc ((data->>'email'));

-- Full-text search on JSONB
-- GIN is the right method here because the indexed expression is a tsvector.
CREATE INDEX idx_users_search ON users_doc USING GIN (to_tsvector('english', data->>'bio'));

-- The query repeats the indexed expression verbatim so the index can be used,
-- and builds the tsquery with the same 'english' configuration so that both
-- sides are stemmed identically regardless of the server's default config.
SELECT * FROM users_doc
WHERE to_tsvector('english', data->>'bio') @@ to_tsquery('english', 'database & programming');

REST API

Expose Document Store via REST:

use axum::{
extract::{Path, Query, State},
http::StatusCode,
routing::{get, post, put, delete},
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Shared state made available to the REST handlers through axum's `State` extractor.
struct AppState {
    /// Reference-counted handle to the document store the handlers operate on.
    store: Arc<DocumentStore>,
}
/// JSON request body accepted by `POST /api/documents`.
#[derive(Deserialize)]
struct CreateDocumentRequest {
    /// Name of the collection the new document is inserted into.
    collection: String,
    /// Arbitrary JSON payload passed to the store as the document's data.
    data: serde_json::Value,
}
/// `POST /api/documents` — inserts a new document into the requested collection.
///
/// A fresh id of the form `doc_<uuid-v4>` is generated for every request.
///
/// # Returns
///
/// A JSON object `{"id": <generated id>, "data": <stored data>}`.
///
/// # Errors
///
/// Any store failure is reported as `500 Internal Server Error`; the
/// underlying error is discarded.
async fn create_document(
    State(state): State<Arc<AppState>>,
    Json(req): Json<CreateDocumentRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let id = DocumentId::new(&format!("doc_{}", uuid::Uuid::new_v4()));

    let doc = state
        .store
        .insert(&Collection::new(&req.collection), &id, req.data)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // `json!` is path-qualified because this snippet's `use` lines never bring
    // the macro into scope.
    Ok(Json(serde_json::json!({
        "id": id.as_str(),
        "data": doc.data
    })))
}
/// `GET /api/documents/:collection/:id` — returns the stored data of one document.
///
/// # Errors
///
/// * `404 Not Found` when the store holds no document under that id.
/// * `500 Internal Server Error` when the store lookup itself fails.
async fn get_document(
    State(state): State<Arc<AppState>>,
    Path((collection, id)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let target = Collection::new(&collection);
    let key = DocumentId::new(&id);

    let found = state
        .store
        .get(&target, &key)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // A missing document is not a store failure: it maps to 404 rather than 500.
    found
        .map(|document| Json(document.data))
        .ok_or(StatusCode::NOT_FOUND)
}
/// `PUT /api/documents/:collection/:id` — applies the JSON body as an update
/// to the addressed document.
///
/// Responds with `200 OK` when the store accepts the update and with
/// `500 Internal Server Error` when the store reports any failure.
async fn update_document(
    State(state): State<Arc<AppState>>,
    Path((collection, id)): Path<(String, String)>,
    Json(update): Json<serde_json::Value>,
) -> Result<StatusCode, StatusCode> {
    let target = Collection::new(&collection);
    let key = DocumentId::new(&id);

    // Whatever the store returns on success is not part of the response.
    match state.store.update(&target, &key, update) {
        Ok(_) => Ok(StatusCode::OK),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}
/// `DELETE /api/documents/:collection/:id` — removes the addressed document.
///
/// Responds with `204 No Content` when the store call succeeds and with
/// `500 Internal Server Error` when the store reports any failure.
async fn delete_document(
    State(state): State<Arc<AppState>>,
    Path((collection, id)): Path<(String, String)>,
) -> Result<StatusCode, StatusCode> {
    let target = Collection::new(&collection);
    let key = DocumentId::new(&id);

    state
        .store
        .delete(&target, &key)
        .map(|_| StatusCode::NO_CONTENT)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
/// Builds the document store, wires the REST routes and serves them on port 3000.
///
/// # Panics
///
/// Panics with a descriptive message when the store cannot be created from
/// `./data` or when the HTTP server terminates with an error.
#[tokio::main]
async fn main() {
    // Start-up failures are unrecoverable for this example, so fail fast, but
    // say which step failed instead of a bare `unwrap` panic.
    let store = DocumentStore::new("./data")
        .expect("failed to create the document store from ./data");
    let state = Arc::new(AppState {
        store: Arc::new(store),
    });

    let app = Router::new()
        .route("/api/documents", post(create_document))
        .route("/api/documents/:collection/:id", get(get_document))
        .route("/api/documents/:collection/:id", put(update_document))
        .route("/api/documents/:collection/:id", delete(delete_document))
        .with_state(state);

    // Build the listen address directly (all interfaces, port 3000) rather
    // than parsing a constant string at run time, which needs its own `unwrap`.
    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .expect("HTTP server terminated with an error");
}

GraphQL Integration

use async_graphql::{Context, Object, Schema, EmptyMutation, EmptySubscription};
struct Query;
#[Object]
impl Query {
async fn user(&self, ctx: &Context<'_>, id: String) -> Result<User, Error> {
let store = ctx.data::<DocumentStore>()?;
let doc = store.get(
&Collection::new("users"),
&DocumentId::new(&id)
)?;
match doc {
Some(d) => Ok(serde_json::from_value(d.data)?),
None => Err("User not found".into()),
}
}
async fn users(
&self,
ctx: &Context<'_>,
role: Option<String>,
) -> Result<Vec<User>, Error> {
let store = ctx.data::<DocumentStore>()?;
let filter = if let Some(r) = role {
Filter::eq("role", json!(r))
} else {
Filter { op: FilterOp::All }
};
let docs = store.find(&Collection::new("users"), filter)?;
let users = docs.into_iter()
.map(|d| serde_json::from_value(d.data))
.collect::<Result<Vec<User>, _>>()?;
Ok(users)
}
}
/// Schema with `Query` as its root and neither mutations nor subscriptions.
type MySchema = Schema<Query, EmptyMutation, EmptySubscription>;

/// HTTP handler that executes an incoming GraphQL request against the schema
/// supplied through the `Extension` extractor and converts the result into an
/// HTTP response.
async fn graphql_handler(
    schema: Extension<MySchema>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    let request = req.into_inner();
    let response = schema.execute(request).await;
    response.into()
}


Navigation: ← Previous: Performance Optimization | Back to Index | Next: Migration from MongoDB →