Skip to content

Pinecone Vector Examples

Practical examples for HeliosDB’s Pinecone vector protocol support.

Connection Examples

Python Client

from pinecone import Pinecone
# Connect to HeliosDB
pc = Pinecone(
api_key="your-api-key",
host="http://localhost:8080"
)
# Access index
index = pc.Index("my-vectors")

REST API

Terminal window
# Using curl
curl -X POST "http://localhost:8080/vectors/v1/query" \
-H "Api-Key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"vector": [0.1, 0.2, 0.3, ...],
"topK": 10
}'

Node.js Client

const { Pinecone } = require('@pinecone-database/pinecone');
const pc = new Pinecone({
apiKey: 'your-api-key',
environment: 'localhost:8080'
});
const index = pc.index('my-vectors');

Upsert Operations

Single Vector

# Upsert single vector
index.upsert(vectors=[
{
"id": "vec1",
"values": [0.1, 0.2, 0.3, 0.4, 0.5],
"metadata": {
"category": "electronics",
"price": 99.99,
"in_stock": True
}
}
])

Batch Upsert

# Upsert multiple vectors
vectors = [
{
"id": f"vec_{i}",
"values": [random.random() for _ in range(1536)],
"metadata": {"category": f"cat_{i % 10}"}
}
for i in range(1000)
]
# Upsert in batches
index.upsert(vectors=vectors, batch_size=100)

With Namespace

# Upsert to specific namespace
index.upsert(
vectors=[
{"id": "prod_1", "values": [...], "metadata": {...}}
],
namespace="production"
)
# Upsert to different namespace
index.upsert(
vectors=[
{"id": "test_1", "values": [...], "metadata": {...}}
],
namespace="testing"
)

Query Operations

Basic Query

# Simple similarity search
results = index.query(
vector=[0.1, 0.2, 0.3, ...],
top_k=10
)
for match in results.matches:
print(f"ID: {match.id}, Score: {match.score}")

Query with Metadata

# Include metadata in results
results = index.query(
vector=[0.1, 0.2, 0.3, ...],
top_k=10,
include_metadata=True,
include_values=True
)
for match in results.matches:
print(f"ID: {match.id}")
print(f"Score: {match.score}")
print(f"Metadata: {match.metadata}")
print(f"Values: {match.values[:5]}...")

Query with Filter

# Filter by metadata
results = index.query(
vector=[0.1, 0.2, 0.3, ...],
top_k=10,
filter={
"category": {"$eq": "electronics"},
"price": {"$lt": 100}
},
include_metadata=True
)

Complex Filters

# AND filter
results = index.query(
vector=[...],
filter={
"$and": [
{"category": {"$eq": "electronics"}},
{"price": {"$gte": 50}},
{"in_stock": {"$eq": True}}
]
}
)
# OR filter
results = index.query(
vector=[...],
filter={
"$or": [
{"category": {"$eq": "electronics"}},
{"category": {"$eq": "computers"}}
]
}
)
# IN filter
results = index.query(
vector=[...],
filter={
"category": {"$in": ["electronics", "computers", "phones"]}
}
)

Fetch Operations

Fetch by ID

# Fetch single vector
result = index.fetch(ids=["vec1"])
print(result.vectors["vec1"])

Fetch Multiple

# Fetch multiple vectors
result = index.fetch(ids=["vec1", "vec2", "vec3"])
for id, vector in result.vectors.items():
print(f"ID: {id}, Values: {vector.values[:5]}...")

Fetch with Namespace

# Fetch from specific namespace
result = index.fetch(
ids=["prod_1", "prod_2"],
namespace="production"
)

Delete Operations

Delete by ID

# Delete single vector
index.delete(ids=["vec1"])
# Delete multiple vectors
index.delete(ids=["vec1", "vec2", "vec3"])

Delete by Filter

# Delete by metadata filter
index.delete(
filter={"category": {"$eq": "outdated"}}
)

Delete All in Namespace

# Delete entire namespace
index.delete(delete_all=True, namespace="testing")

Update Operations

Update Metadata

# Update vector metadata
index.update(
id="vec1",
set_metadata={
"price": 89.99,
"on_sale": True
}
)

Update Values

# Update vector values
index.update(
id="vec1",
values=[0.2, 0.3, 0.4, ...] # New vector values
)

Hybrid Search (Sparse + Dense)

Upsert Hybrid Vectors

# Upsert with sparse and dense values
index.upsert(vectors=[
{
"id": "doc1",
"values": [0.1, 0.2, ...], # Dense embedding
"sparse_values": {
"indices": [15, 42, 156, 283], # Token indices
"values": [0.8, 0.5, 0.3, 0.2] # Token weights
},
"metadata": {"title": "Introduction to ML"}
}
])

Hybrid Query

# Query with both dense and sparse vectors
results = index.query(
vector=[0.1, 0.2, ...], # Dense query vector
sparse_vector={
"indices": [15, 42],
"values": [0.9, 0.5]
},
top_k=10,
include_metadata=True
)

Index Statistics

Get Index Stats

# Get index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats.total_vector_count}")
print(f"Dimension: {stats.dimension}")
print(f"Index fullness: {stats.index_fullness}")
# Per-namespace stats
for ns, ns_stats in stats.namespaces.items():
print(f"Namespace '{ns}': {ns_stats.vector_count} vectors")

Real-World Use Cases

Semantic Search

from sentence_transformers import SentenceTransformer
# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Index documents
documents = [
{"id": "doc1", "text": "Machine learning fundamentals"},
{"id": "doc2", "text": "Deep learning architectures"},
{"id": "doc3", "text": "Natural language processing"}
]
vectors = [
{
"id": doc["id"],
"values": model.encode(doc["text"]).tolist(),
"metadata": {"text": doc["text"]}
}
for doc in documents
]
index.upsert(vectors=vectors)
# Search
query = "neural networks"
query_embedding = model.encode(query).tolist()
results = index.query(
vector=query_embedding,
top_k=5,
include_metadata=True
)
for match in results.matches:
print(f"Score: {match.score:.4f} - {match.metadata['text']}")

RAG (Retrieval-Augmented Generation)

import openai
def rag_query(question, index, model, top_k=5):
    """Answer *question* with retrieval-augmented generation.

    Embeds the question, pulls the ``top_k`` most similar documents from
    the vector index, and asks GPT-4 to answer with their text supplied
    as context in the system prompt.
    """
    # Embed the question with the caller-provided embedding model.
    question_vector = model.encode(question).tolist()

    # Retrieve the most relevant documents from the index.
    matches = index.query(
        vector=question_vector,
        top_k=top_k,
        include_metadata=True,
    ).matches

    # Join the retrieved documents' text into one context block;
    # missing "text" metadata contributes an empty string.
    context_chunks = []
    for match in matches:
        context_chunks.append(match.metadata.get("text", ""))
    context = "\n".join(context_chunks)

    # Generate the answer, feeding the context through the system role.
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Context:\n{context}"},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content
# Use RAG
answer = rag_query(
"What is deep learning?",
index,
model
)
print(answer)

Image Similarity Search

from torchvision import models, transforms
from PIL import Image
import torch
# Load image model
model = models.resnet50(pretrained=True)
model.eval()
# Remove classification layer
model = torch.nn.Sequential(*list(model.children())[:-1])
# Transform for images
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
def get_image_embedding(image_path):
    """Return a feature vector (plain list of floats) for one image.

    Uses the module-level ``transform`` preprocessing pipeline and the
    truncated ``model`` (classification head removed) defined above.
    """
    image = Image.open(image_path).convert('RGB')
    # Preprocess and add a leading batch dimension for the model.
    batch = transform(image).unsqueeze(0)
    # Inference only — skip autograd bookkeeping.
    with torch.no_grad():
        features = model(batch)
    return features.squeeze().numpy().tolist()
# Index images
for image_path in image_paths:
embedding = get_image_embedding(image_path)
index.upsert(vectors=[{
"id": image_path,
"values": embedding,
"metadata": {"path": image_path}
}])
# Search similar images
query_embedding = get_image_embedding("query_image.jpg")
results = index.query(
vector=query_embedding,
top_k=10,
include_metadata=True
)

Recommendation System

# Index user preferences
user_vectors = [
{
"id": f"user_{user_id}",
"values": user_embedding,
"metadata": {
"user_id": user_id,
"preferences": preferences
}
}
for user_id, user_embedding, preferences in users
]
index.upsert(vectors=user_vectors, namespace="users")
# Index items
item_vectors = [
{
"id": f"item_{item_id}",
"values": item_embedding,
"metadata": {
"item_id": item_id,
"category": category,
"price": price
}
}
for item_id, item_embedding, category, price in items
]
index.upsert(vectors=item_vectors, namespace="items")
# Get recommendations for user
def recommend(user_id, top_k=10):
    """Return up to *top_k* item recommendations for *user_id*.

    Looks up the user's stored embedding in the "users" namespace, then
    ranks vectors from the "items" namespace by similarity to it.
    """
    # Fetch the user's preference vector from the users namespace.
    key = f"user_{user_id}"
    fetched = index.fetch(ids=[key], namespace="users")
    preference_vector = fetched.vectors[key].values

    # Rank items by similarity to the user's vector.
    response = index.query(
        vector=preference_vector,
        top_k=top_k,
        include_metadata=True,
        namespace="items",
    )

    # Flatten each match into a small result record.
    recommendations = []
    for match in response.matches:
        recommendations.append({
            "item_id": match.metadata["item_id"],
            "category": match.metadata["category"],
            "score": match.score,
        })
    return recommendations
recommendations = recommend("user_123")

Anomaly Detection

def detect_anomalies(vector, threshold=0.8):
    """Return ``(is_anomaly, avg_score)`` for *vector*.

    The vector counts as anomalous when the mean similarity score of its
    nearest neighbours in the index falls below *threshold*, or when the
    index returns no neighbours at all.
    """
    matches = index.query(vector=vector, top_k=5).matches

    # Nothing similar exists in the index: maximally anomalous.
    if not matches:
        return True, 0.0

    # Mean similarity against the closest existing vectors.
    total = 0.0
    for match in matches:
        total += match.score
    avg_score = total / len(matches)

    return avg_score < threshold, avg_score
# Check new data point
new_vector = [0.1, 0.2, ...]
is_anomaly, score = detect_anomalies(new_vector)
if is_anomaly:
print(f"Anomaly detected! Similarity score: {score}")

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md

Last Updated: December 2025