Pinecone Vector Examples

Practical examples for HeliosDB's Pinecone vector protocol support.
Connection Examples
Python Client
from pinecone import Pinecone
# Connect to HeliosDB
pc = Pinecone(
    api_key="your-api-key",
    host="http://localhost:8080"
)

# Access index
index = pc.Index("my-vectors")

REST API
# Using curl
curl -X POST "http://localhost:8080/vectors/v1/query" \
  -H "Api-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "vector": [0.1, 0.2, 0.3, ...],
    "topK": 10
  }'

Node.js Client
const { Pinecone } = require('@pinecone-database/pinecone');
const pc = new Pinecone({
  apiKey: 'your-api-key',
  environment: 'localhost:8080'
});

const index = pc.index('my-vectors');

Upsert Operations
Single Vector
# Upsert single vector
index.upsert(vectors=[
    {
        "id": "vec1",
        "values": [0.1, 0.2, 0.3, 0.4, 0.5],
        "metadata": {
            "category": "electronics",
            "price": 99.99,
            "in_stock": True
        }
    }
])

Batch Upsert
# Upsert multiple vectors
vectors = [
    {
        "id": f"vec_{i}",
        "values": [random.random() for _ in range(1536)],
        "metadata": {"category": f"cat_{i % 10}"}
    }
    for i in range(1000)
]

# Upsert in batches
index.upsert(vectors=vectors, batch_size=100)

With Namespace
# Upsert to specific namespace
index.upsert(
    vectors=[
        {"id": "prod_1", "values": [...], "metadata": {...}}
    ],
    namespace="production"
)

# Upsert to different namespace
index.upsert(
    vectors=[
        {"id": "test_1", "values": [...], "metadata": {...}}
    ],
    namespace="testing"
)

Query Operations
Basic Query
# Simple similarity search
results = index.query(
    vector=[0.1, 0.2, 0.3, ...],
    top_k=10
)

for match in results.matches:
    print(f"ID: {match.id}, Score: {match.score}")

Query with Metadata
# Include metadata in results
results = index.query(
    vector=[0.1, 0.2, 0.3, ...],
    top_k=10,
    include_metadata=True,
    include_values=True
)

for match in results.matches:
    print(f"ID: {match.id}")
    print(f"Score: {match.score}")
    print(f"Metadata: {match.metadata}")
    print(f"Values: {match.values[:5]}...")

Query with Filter
# Filter by metadata
results = index.query(
    vector=[0.1, 0.2, 0.3, ...],
    top_k=10,
    filter={
        "category": {"$eq": "electronics"},
        "price": {"$lt": 100}
    },
    include_metadata=True
)

Complex Filters
# AND filter
results = index.query(
    vector=[...],
    filter={
        "$and": [
            {"category": {"$eq": "electronics"}},
            {"price": {"$gte": 50}},
            {"in_stock": {"$eq": True}}
        ]
    }
)

# OR filter
results = index.query(
    vector=[...],
    filter={
        "$or": [
            {"category": {"$eq": "electronics"}},
            {"category": {"$eq": "computers"}}
        ]
    }
)

# IN filter
results = index.query(
    vector=[...],
    filter={
        "category": {"$in": ["electronics", "computers", "phones"]}
    }
)

Fetch Operations
Fetch by ID
# Fetch single vector
result = index.fetch(ids=["vec1"])
print(result.vectors["vec1"])

Fetch Multiple
# Fetch multiple vectors
result = index.fetch(ids=["vec1", "vec2", "vec3"])

for id, vector in result.vectors.items():
    print(f"ID: {id}, Values: {vector.values[:5]}...")

Fetch with Namespace
# Fetch from specific namespace
result = index.fetch(
    ids=["prod_1", "prod_2"],
    namespace="production"
)

Delete Operations
Delete by ID
# Delete single vector
index.delete(ids=["vec1"])

# Delete multiple vectors
index.delete(ids=["vec1", "vec2", "vec3"])

Delete by Filter
# Delete by metadata filter
index.delete(
    filter={"category": {"$eq": "outdated"}}
)

Delete All in Namespace
# Delete entire namespace
index.delete(delete_all=True, namespace="testing")

Update Operations
Update Metadata
# Update vector metadata
index.update(
    id="vec1",
    set_metadata={
        "price": 89.99,
        "on_sale": True
    }
)

Update Values
# Update vector values
index.update(
    id="vec1",
    values=[0.2, 0.3, 0.4, ...]  # New vector values
)

Hybrid Search (Sparse + Dense)
Upsert Hybrid Vectors
# Upsert with sparse and dense values
index.upsert(vectors=[
    {
        "id": "doc1",
        "values": [0.1, 0.2, ...],  # Dense embedding
        "sparse_values": {
            "indices": [15, 42, 156, 283],  # Token indices
            "values": [0.8, 0.5, 0.3, 0.2]  # Token weights
        },
        "metadata": {"title": "Introduction to ML"}
    }
])

Hybrid Query
# Query with both dense and sparse vectors
results = index.query(
    vector=[0.1, 0.2, ...],  # Dense query vector
    sparse_vector={
        "indices": [15, 42],
        "values": [0.9, 0.5]
    },
    top_k=10,
    include_metadata=True
)

Index Statistics
Get Index Stats
# Get index statistics
stats = index.describe_index_stats()

print(f"Total vectors: {stats.total_vector_count}")
print(f"Dimension: {stats.dimension}")
print(f"Index fullness: {stats.index_fullness}")

# Per-namespace stats
for ns, ns_stats in stats.namespaces.items():
    print(f"Namespace '{ns}': {ns_stats.vector_count} vectors")

Real-World Use Cases
Semantic Search
from sentence_transformers import SentenceTransformer
# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Index documents
documents = [
    {"id": "doc1", "text": "Machine learning fundamentals"},
    {"id": "doc2", "text": "Deep learning architectures"},
    {"id": "doc3", "text": "Natural language processing"}
]

vectors = [
    {
        "id": doc["id"],
        "values": model.encode(doc["text"]).tolist(),
        "metadata": {"text": doc["text"]}
    }
    for doc in documents
]
index.upsert(vectors=vectors)
# Search
query = "neural networks"
query_embedding = model.encode(query).tolist()

results = index.query(
    vector=query_embedding,
    top_k=5,
    include_metadata=True
)

for match in results.matches:
    print(f"Score: {match.score:.4f} - {match.metadata['text']}")

RAG (Retrieval-Augmented Generation)
import openai
def rag_query(question, index, model, top_k=5):
    # Embed question
    question_embedding = model.encode(question).tolist()

    # Retrieve relevant documents
    results = index.query(
        vector=question_embedding,
        top_k=top_k,
        include_metadata=True
    )

    # Build context from retrieved documents
    context = "\n".join([
        m.metadata.get("text", "")
        for m in results.matches
    ])

    # Generate answer with LLM
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Context:\n{context}"},
            {"role": "user", "content": question}
        ]
    )

    return response.choices[0].message.content

# Use RAG
answer = rag_query(
    "What is deep learning?",
    index,
    model
)
print(answer)

Image Similarity Search
from torchvision import models, transforms
from PIL import Image
import torch

# Load image model
model = models.resnet50(pretrained=True)
model.eval()

# Remove classification layer
model = torch.nn.Sequential(*list(model.children())[:-1])

# Transform for images
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

def get_image_embedding(image_path):
    img = Image.open(image_path).convert('RGB')
    img_tensor = transform(img).unsqueeze(0)
    with torch.no_grad():
        embedding = model(img_tensor)
    return embedding.squeeze().numpy().tolist()

# Index images
for image_path in image_paths:
    embedding = get_image_embedding(image_path)
    index.upsert(vectors=[{
        "id": image_path,
        "values": embedding,
        "metadata": {"path": image_path}
    }])

# Search similar images
query_embedding = get_image_embedding("query_image.jpg")
results = index.query(
    vector=query_embedding,
    top_k=10,
    include_metadata=True
)

Recommendation System
# Index user preferences
user_vectors = [
    {
        "id": f"user_{user_id}",
        "values": user_embedding,
        "metadata": {
            "user_id": user_id,
            "preferences": preferences
        }
    }
    for user_id, user_embedding, preferences in users
]

index.upsert(vectors=user_vectors, namespace="users")

# Index items
item_vectors = [
    {
        "id": f"item_{item_id}",
        "values": item_embedding,
        "metadata": {
            "item_id": item_id,
            "category": category,
            "price": price
        }
    }
    for item_id, item_embedding, category, price in items
]

index.upsert(vectors=item_vectors, namespace="items")

# Get recommendations for user
def recommend(user_id, top_k=10):
    # Get user vector
    user = index.fetch(
        ids=[f"user_{user_id}"],
        namespace="users"
    )
    user_vector = user.vectors[f"user_{user_id}"].values

    # Find similar items
    results = index.query(
        vector=user_vector,
        top_k=top_k,
        include_metadata=True,
        namespace="items"
    )

    return [
        {
            "item_id": m.metadata["item_id"],
            "category": m.metadata["category"],
            "score": m.score
        }
        for m in results.matches
    ]

recommendations = recommend("user_123")

Anomaly Detection
def detect_anomalies(vector, threshold=0.8):
    """
    Detect if a vector is anomalous by comparing
    to existing vectors in the index.
    """
    results = index.query(
        vector=vector,
        top_k=5
    )

    # If no similar vectors found, it's anomalous
    if not results.matches:
        return True, 0.0

    # Average similarity score
    avg_score = sum(m.score for m in results.matches) / len(results.matches)

    # If average similarity is below threshold, it's anomalous
    is_anomaly = avg_score < threshold

    return is_anomaly, avg_score

# Check new data point
new_vector = [0.1, 0.2, ...]
is_anomaly, score = detect_anomalies(new_vector)

if is_anomaly:
    print(f"Anomaly detected! Similarity score: {score}")

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025