Skip to content

Protocol Compatibility Test Suite

Protocol Compatibility Test Suite

Overview

This document defines the comprehensive protocol compatibility test suite for HeliosDB. These tests MUST pass in CI before any merge to ensure zero-friction adoption for existing database clients.

Test Matrix

Based on /home/claude/DMD/docs/01_PROTOCOL_TEST_MATRIX.md:

| Ecosystem | Protocol | Python Driver | Compatibility | Test Priority |
|---|---|---|---|---|
| PostgreSQL | libpq | psycopg2, asyncpg, SQLAlchemy | Gold | P0 (Must-Pass) |
| MySQL | v10 | mysql-connector, PyMySQL | Gold | P0 (Must-Pass) |
| Snowflake | HTTP/JSON | snowflake-connector-python | Silver | P1 |
| Databricks SQL | HTTP/Thrift | databricks-sql-connector | Silver | P1 |
| Pinecone | HTTP/gRPC | pinecone-client | Silver | P1 |
| SQL Server | TDS 7.4 | pyodbc, pymssql | Bronze | P2 |
| DB2 | DRDA | ibm_db | Bronze | P2 |
| Oracle/Tibero | Net8/TTC | oracledb (Thin) | Bronze | P2 |

Test Implementation

Directory Structure

tests/protocol/
├── conftest.py # Pytest fixtures
├── test_postgresql.py # PostgreSQL (Gold)
├── test_mysql.py # MySQL (Gold)
├── test_snowflake.py # Snowflake (Silver)
├── test_databricks.py # Databricks (Silver)
├── test_pinecone.py # Pinecone (Silver)
├── test_sqlserver.py # SQL Server (Bronze)
├── test_db2.py # DB2 (Bronze)
├── test_oracle.py # Oracle (Bronze)
├── helpers/
│ ├── protocol_validator.py
│ └── test_data.py
└── docker/
    └── docker-compose.protocol.yml

Test Infrastructure

tests/protocol/conftest.py
import pytest
import docker
import time
from typing import Generator
@pytest.fixture(scope="session")
def heliosdb_cluster() -> Generator["docker.models.containers.Container", None, None]:
    """Start a HeliosDB container for the whole test session.

    The container is started with every protocol handler enabled and its
    ports published on localhost. The fixture polls ``heliosdb-cli status``
    once per second for up to 30 attempts before handing the container to
    the tests.

    Yields:
        The running Docker container object.

    Raises:
        RuntimeError: If the health check never succeeds within 30 attempts.
    """
    client = docker.from_env()
    # Start HeliosDB with all protocol handlers
    container = client.containers.run(
        "heliosdb:latest",
        detach=True,
        ports={
            '5432/tcp': 5432,    # PostgreSQL
            '3306/tcp': 3306,    # MySQL
            '1433/tcp': 1433,    # SQL Server
            '1521/tcp': 1521,    # Oracle
            '50000/tcp': 50000,  # DB2
            '443/tcp': 8443,     # HTTPS (Snowflake/Databricks/Pinecone)
            # The test_table fixture talks to http://localhost:8080/admin/ddl,
            # so the admin port must be published as well.
            # NOTE(review): assumes the admin API listens on 8080 inside the
            # container - confirm against the image.
            '8080/tcp': 8080,
        },
        environment={
            'HELIOS_ENABLE_ALL_PROTOCOLS': 'true',
            'HELIOS_TLS_CERT': '/certs/server.crt',
            'HELIOS_TLS_KEY': '/certs/server.key',
        },
    )
    # Everything after the container exists runs under try/finally so that a
    # failed start-up does not leave a container holding the published ports.
    try:
        # Wait for cluster to be ready
        for _ in range(30):
            try:
                exec_result = container.exec_run("heliosdb-cli status")
            except docker.errors.DockerException:
                # The container may not accept exec calls yet; retry.
                exec_result = None
            if exec_result is not None and exec_result.exit_code == 0:
                break
            time.sleep(1)
        else:
            raise RuntimeError("HeliosDB cluster failed to start")
        yield container
    finally:
        # Teardown
        container.stop()
        container.remove()
@pytest.fixture
def test_table(heliosdb_cluster) -> Generator[str, None, None]:
    """Create the ``test_data`` table for one test and drop it afterwards.

    The table is created and dropped through the HTTP admin DDL endpoint on
    localhost:8080.

    Yields:
        The table name, ``"test_data"``.
    """
    import warnings

    import requests

    ddl_url = "http://localhost:8080/admin/ddl"
    # A timeout keeps an unreachable admin API from hanging the whole run.
    timeout_s = 30

    response = requests.post(
        ddl_url,
        json={
            "sql": """
                CREATE TABLE test_data (
                    id BIGINT PRIMARY KEY,
                    name VARCHAR(255),
                    value INTEGER,
                    created_at TIMESTAMP
                )
            """
        },
        timeout=timeout_s,
    )
    assert response.status_code == 200, (
        f"CREATE TABLE failed: {response.status_code} {response.text}"
    )

    yield "test_data"

    # Cleanup stays best-effort (it must not mask the test's own outcome),
    # but a failed drop is reported because it will break the next test.
    drop_response = requests.post(
        ddl_url,
        json={"sql": "DROP TABLE test_data"},
        timeout=timeout_s,
    )
    if drop_response.status_code != 200:
        warnings.warn(
            f"DROP TABLE test_data failed: {drop_response.status_code} "
            f"{drop_response.text}"
        )

PostgreSQL Protocol Tests (Gold)

tests/protocol/test_postgresql.py
import psycopg2
import psycopg2.extras
import asyncpg
import pytest
from sqlalchemy import create_engine, text
DSN = "host=localhost port=5432 dbname=helios user=test password=test sslmode=require"


class TestPostgreSQLProtocol:
    """Gold-tier (P0) PostgreSQL protocol tests using psycopg2, asyncpg and SQLAlchemy."""

    @pytest.fixture
    def pg_conn(self, heliosdb_cluster):
        """Yield a psycopg2 connection that is closed even when the test fails."""
        conn = psycopg2.connect(DSN)
        yield conn
        conn.close()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_connect(self, heliosdb_cluster, pg_conn):
        """PG-001: Connect and simple query via psycopg2"""
        cur = pg_conn.cursor()
        cur.execute("SELECT 1 as value")
        result = cur.fetchone()
        assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_prepared_statement(self, heliosdb_cluster, test_table, pg_conn):
        """PG-002: Prepared statements with parameters"""
        cur = pg_conn.cursor()
        # Insert with parameters
        cur.execute(
            "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
            (1, "Alice", 100),
        )
        pg_conn.commit()
        # Query with parameters
        cur.execute(
            "SELECT name, value FROM test_data WHERE id = %s",
            (1,),
        )
        row = cur.fetchone()
        assert row == ("Alice", 100)

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_transaction(self, heliosdb_cluster, test_table, pg_conn):
        """PG-003: BEGIN/COMMIT/ROLLBACK semantics"""
        pg_conn.autocommit = False
        cur = pg_conn.cursor()
        # Insert and rollback
        cur.execute("INSERT INTO test_data (id, name) VALUES (100, 'Rollback')")
        pg_conn.rollback()
        # Verify not present
        cur.execute("SELECT COUNT(*) FROM test_data WHERE id = 100")
        assert cur.fetchone()[0] == 0
        # Insert and commit
        cur.execute("INSERT INTO test_data (id, name) VALUES (101, 'Commit')")
        pg_conn.commit()
        # Verify present
        cur.execute("SELECT COUNT(*) FROM test_data WHERE id = 101")
        assert cur.fetchone()[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_server_cursor(self, heliosdb_cluster, test_table, pg_conn):
        """PG-004: Server-side cursors for large result sets"""
        # Insert 1000 rows
        cur = pg_conn.cursor()
        cur.executemany(
            "INSERT INTO test_data (id, value) VALUES (%s, %s)",
            [(i, i * 2) for i in range(1000)],
        )
        pg_conn.commit()
        # A named cursor is a server-side cursor; the context manager closes
        # it even if one of the assertions below fails.
        with pg_conn.cursor(name='fetch_cursor') as server_cur:
            server_cur.execute("SELECT id, value FROM test_data ORDER BY id")
            # Fetch in batches
            batch1 = server_cur.fetchmany(100)
            assert len(batch1) == 100
            assert batch1[0][0] == 0
            batch2 = server_cur.fetchmany(100)
            assert len(batch2) == 100
            assert batch2[0][0] == 100

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_error_handling(self, heliosdb_cluster, pg_conn):
        """PG-005: PostgreSQL error codes and SQLSTATE"""
        cur = pg_conn.cursor()
        # Syntax error
        with pytest.raises(psycopg2.errors.SyntaxError):
            cur.execute("SELCT 1")  # Typo
        # A failed statement leaves the transaction aborted; every later
        # statement would be rejected until the transaction is rolled back.
        pg_conn.rollback()
        try:
            # Unique violation
            cur.execute("CREATE TABLE dup_test (id INT PRIMARY KEY)")
            cur.execute("INSERT INTO dup_test VALUES (1)")
            pg_conn.commit()
            with pytest.raises(psycopg2.errors.UniqueViolation):
                cur.execute("INSERT INTO dup_test VALUES (1)")
        finally:
            # Drop the scratch table so re-runs, and other tests that create
            # a table with the same name, are not affected.
            pg_conn.rollback()
            cur.execute("DROP TABLE IF EXISTS dup_test")
            pg_conn.commit()

    @pytest.mark.protocol
    @pytest.mark.p0
    @pytest.mark.asyncio
    async def test_asyncpg_connect(self, heliosdb_cluster):
        """PG-006: Connect via asyncpg (async Python driver)"""
        conn = await asyncpg.connect(
            host='localhost',
            port=5432,
            database='helios',
            user='test',
            password='test',
            ssl='require',
        )
        try:
            value = await conn.fetchval('SELECT 1')
            assert value == 1
        finally:
            await conn.close()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_sqlalchemy_postgresql(self, heliosdb_cluster, test_table):
        """PG-007: SQLAlchemy with PostgreSQL dialect"""
        engine = create_engine(
            "postgresql+psycopg2://test:test@localhost:5432/helios",
            connect_args={'sslmode': 'require'},
        )
        try:
            with engine.connect() as conn:
                # Insert
                conn.execute(
                    text("INSERT INTO test_data (id, name) VALUES (:id, :name)"),
                    {"id": 200, "name": "SQLAlchemy"},
                )
                conn.commit()
                # Query
                result = conn.execute(
                    text("SELECT name FROM test_data WHERE id = :id"),
                    {"id": 200},
                )
                row = result.fetchone()
                assert row[0] == "SQLAlchemy"
        finally:
            # Release pooled connections so they do not outlive the test.
            engine.dispose()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_postgresql_copy(self, heliosdb_cluster, test_table, pg_conn):
        """PG-008: COPY command for bulk load"""
        from io import StringIO

        cur = pg_conn.cursor()
        # COPY FROM STDIN: 10000 tab-separated rows of (id, name, value)
        data = StringIO(
            "".join(f"{i}\tBulkRow{i}\t{i*10}\n" for i in range(10000))
        )
        cur.copy_from(
            data,
            'test_data',
            columns=('id', 'name', 'value'),
            sep='\t',
        )
        pg_conn.commit()
        # Verify
        cur.execute("SELECT COUNT(*) FROM test_data")
        assert cur.fetchone()[0] == 10000

MySQL Protocol Tests (Gold)

tests/protocol/test_mysql.py
import pymysql
import mysql.connector
import pytest
from sqlalchemy import create_engine, text
MYSQL_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'test',
    'password': 'test',
    'database': 'helios',
    'ssl': {'ssl': True},
}


class TestMySQLProtocol:
    """Gold-tier (P0) MySQL protocol tests using PyMySQL, mysql-connector and SQLAlchemy."""

    @pytest.fixture
    def mysql_conn(self, heliosdb_cluster):
        """Yield a PyMySQL connection that is closed even when the test fails."""
        conn = pymysql.connect(**MYSQL_CONFIG)
        yield conn
        conn.close()

    @staticmethod
    def _count_rows_from_new_connection(row_id):
        """Count ``test_data`` rows with the given id using a fresh connection.

        A separate connection is used so the count reflects what other
        sessions can see, not the caller's uncommitted work.
        """
        conn = pymysql.connect(**MYSQL_CONFIG)
        try:
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM test_data WHERE id = %s", (row_id,))
            return cur.fetchone()[0]
        finally:
            conn.close()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_pymysql_connect(self, heliosdb_cluster, mysql_conn):
        """MY-001: Connect and simple query via PyMySQL"""
        cur = mysql_conn.cursor()
        cur.execute("SELECT 1 as value")
        result = cur.fetchone()
        assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_pymysql_prepared_statement(self, heliosdb_cluster, test_table, mysql_conn):
        """MY-002: Prepared statements with parameters"""
        cur = mysql_conn.cursor()
        # Insert with parameters
        cur.execute(
            "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
            (1, "Bob", 200),
        )
        mysql_conn.commit()
        # Query with parameters
        cur.execute(
            "SELECT name, value FROM test_data WHERE id = %s",
            (1,),
        )
        row = cur.fetchone()
        assert row == ("Bob", 200)

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_autocommit(self, heliosdb_cluster, test_table, mysql_conn):
        """MY-003: Autocommit mode semantics"""
        # Default: autocommit off
        assert not mysql_conn.get_autocommit()
        cur = mysql_conn.cursor()
        cur.execute("INSERT INTO test_data (id, name) VALUES (50, 'Auto')")
        # Not visible in new connection yet
        assert self._count_rows_from_new_connection(50) == 0
        # Commit
        mysql_conn.commit()
        # Now visible
        assert self._count_rows_from_new_connection(50) == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_connector(self, heliosdb_cluster, test_table):
        """MY-004: mysql-connector-python compatibility"""
        conn = mysql.connector.connect(
            host='localhost',
            port=3306,
            user='test',
            password='test',
            database='helios',
            ssl_ca=None,
            ssl_disabled=False,
        )
        try:
            cur = conn.cursor()
            cur.execute("INSERT INTO test_data (id, name) VALUES (75, 'Connector')")
            conn.commit()
            cur.execute("SELECT name FROM test_data WHERE id = 75")
            assert cur.fetchone()[0] == 'Connector'
        finally:
            conn.close()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_error_handling(self, heliosdb_cluster, mysql_conn):
        """MY-005: MySQL error numbers and messages"""
        cur = mysql_conn.cursor()
        # Syntax error (Error 1064)
        with pytest.raises(pymysql.err.ProgrammingError) as exc_info:
            cur.execute("SELCT 1")  # Typo
        assert exc_info.value.args[0] == 1064
        try:
            # Duplicate key (Error 1062)
            cur.execute("CREATE TABLE dup_test (id INT PRIMARY KEY)")
            cur.execute("INSERT INTO dup_test VALUES (1)")
            mysql_conn.commit()
            with pytest.raises(pymysql.err.IntegrityError) as exc_info:
                cur.execute("INSERT INTO dup_test VALUES (1)")
            assert exc_info.value.args[0] == 1062
        finally:
            # Drop the scratch table so re-runs, and other tests that create
            # a table with the same name, are not affected.
            mysql_conn.rollback()
            cur.execute("DROP TABLE IF EXISTS dup_test")
            mysql_conn.commit()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_sqlalchemy_mysql(self, heliosdb_cluster, test_table):
        """MY-006: SQLAlchemy with MySQL dialect"""
        engine = create_engine(
            "mysql+pymysql://test:test@localhost:3306/helios",
            connect_args={'ssl': {'ssl': True}},
        )
        try:
            with engine.connect() as conn:
                conn.execute(
                    text("INSERT INTO test_data (id, name) VALUES (:id, :name)"),
                    {"id": 300, "name": "MySQL+SQLAlchemy"},
                )
                conn.commit()
                result = conn.execute(
                    text("SELECT name FROM test_data WHERE id = :id"),
                    {"id": 300},
                )
                row = result.fetchone()
                assert row[0] == "MySQL+SQLAlchemy"
        finally:
            # Release pooled connections so they do not outlive the test.
            engine.dispose()

Snowflake Protocol Tests (Silver)

tests/protocol/test_snowflake.py
import pytest
import snowflake.connector
SNOWFLAKE_CONFIG = {
    'account': 'local',
    'user': 'test',
    'password': 'test',
    'host': 'localhost',
    'protocol': 'https',
    'port': 8443,
    'warehouse': 'default',
    'database': 'helios',
    'schema': 'public',
}


class TestSnowflakeProtocol:
    """Silver-tier (P1) Snowflake protocol tests using snowflake-connector-python."""

    @pytest.fixture
    def sf_conn(self, heliosdb_cluster):
        """Yield a Snowflake connection that is closed even when the test fails."""
        conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
        yield conn
        conn.close()

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_connect(self, heliosdb_cluster, sf_conn):
        """SF-001: Connect via snowflake-connector-python"""
        cur = sf_conn.cursor()
        cur.execute("SELECT 1 as value")
        result = cur.fetchone()
        assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_query_submit(self, heliosdb_cluster, test_table, sf_conn):
        """SF-002: Query submission and result fetch"""
        cur = sf_conn.cursor()
        # The connector's default paramstyle is "pyformat", so placeholders
        # are written as %s; "?" only works after switching
        # snowflake.connector.paramstyle to "qmark" before connecting.
        cur.execute(
            "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
            (1, "Snowflake", 500),
        )
        # Query
        cur.execute("SELECT name, value FROM test_data WHERE id = %s", (1,))
        row = cur.fetchone()
        assert row == ("Snowflake", 500)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_fetchmany(self, heliosdb_cluster, test_table, sf_conn):
        """SF-003: Batch fetching"""
        cur = sf_conn.cursor()
        # Insert 500 rows
        for i in range(500):
            cur.execute(
                "INSERT INTO test_data (id, value) VALUES (%s, %s)",
                (i, i * 3),
            )
        # Fetch in batches
        cur.execute("SELECT id, value FROM test_data ORDER BY id")
        batch = cur.fetchmany(100)
        assert len(batch) == 100
        assert batch[0] == (0, 0)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_cancel_query(self, heliosdb_cluster, test_table, sf_conn):
        """SF-004: Query cancellation"""
        import time

        cur = sf_conn.cursor()
        # Submit without blocking so the query id is available while the
        # query is still running. A blocking execute() in a helper thread
        # does not populate cur.sfqid until it returns, and a cursor should
        # not be shared between threads.
        cur.execute_async("SELECT * FROM large_table")  # Assumes large_table exists
        query_id = cur.sfqid
        assert query_id is not None
        # Cancel after 1 second
        time.sleep(1)
        cur.abort_query(query_id)
        # Verify cancellation: the query must stop running within 5 seconds.
        # NOTE(review): relies on the server implementing the query-status
        # endpoint used by get_query_status - confirm for HeliosDB.
        deadline = time.monotonic() + 5
        while sf_conn.is_still_running(sf_conn.get_query_status(query_id)):
            assert time.monotonic() < deadline, "query still running after abort"
            time.sleep(0.1)

Databricks SQL Protocol Tests (Silver)

tests/protocol/test_databricks.py
import pytest
from databricks import sql
DATABRICKS_CONFIG = dict(
    server_hostname='localhost',
    http_path='/dbsql',
    access_token='test_token',
)


class TestDatabricksProtocol:
    """Silver-tier (P1) Databricks SQL protocol tests using databricks-sql-connector."""

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_connect(self, heliosdb_cluster):
        """DB-001: Connect via databricks-sql-connector"""
        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            cursor.execute("SELECT 1 as value")
            first_row = cursor.fetchone()
            assert first_row[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_query(self, heliosdb_cluster, test_table):
        """DB-002: Query execution"""
        insert_sql = "INSERT INTO test_data (id, name) VALUES (?, ?)"
        select_sql = "SELECT name FROM test_data WHERE id = ?"
        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            # Write one row, then read it back by primary key.
            cursor.execute(insert_sql, (100, "Databricks"))
            cursor.execute(select_sql, (100,))
            fetched = cursor.fetchone()
            assert fetched[0] == "Databricks"

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_fetchmany(self, heliosdb_cluster, test_table):
        """DB-003: Batch fetching"""
        insert_sql = "INSERT INTO test_data (id, value) VALUES (?, ?)"
        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            # Seed 300 rows, one statement per row.
            for row_id in range(300):
                cursor.execute(insert_sql, (row_id, row_id * 5))
            cursor.execute("SELECT id FROM test_data ORDER BY id")
            first_batch = cursor.fetchmany(50)
            assert len(first_batch) == 50

Pinecone Protocol Tests (Silver)

tests/protocol/test_pinecone.py
import pytest
from pinecone import Pinecone, ServerlessSpec
class TestPineconeProtocol:
    """Silver-tier (P1) Pinecone protocol tests using the Pinecone Python client."""

    INDEX_NAME = 'test_index'

    @staticmethod
    def _client():
        """Build a Pinecone client pointed at the local HTTPS endpoint."""
        return Pinecone(api_key='test_key', host='https://localhost:8443/pinecone')

    @classmethod
    def _ensure_index(cls, pc):
        """Return a handle to the test index, creating the index if missing.

        Creating only when absent keeps each test independent of execution
        order and lets the suite be re-run against a cluster that already
        holds the index.
        """
        if cls.INDEX_NAME not in pc.list_indexes().names():
            pc.create_index(
                name=cls.INDEX_NAME,
                dimension=384,
                metric='cosine',
                spec=ServerlessSpec(cloud='local', region='default'),
            )
        return pc.Index(cls.INDEX_NAME)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_connect(self, heliosdb_cluster):
        """PC-001: Initialize Pinecone client"""
        pc = self._client()
        # list_indexes() returns an IndexList wrapper rather than a plain
        # list in the client version that exposes `Pinecone`; names() gives
        # the index names as a list.
        index_names = pc.list_indexes().names()
        assert isinstance(index_names, list)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_upsert(self, heliosdb_cluster):
        """PC-002: Upsert vectors"""
        pc = self._client()
        index = self._ensure_index(pc)
        # Upsert vectors
        vectors = [
            ("vec1", [0.1] * 384, {"category": "A"}),
            ("vec2", [0.2] * 384, {"category": "B"}),
            ("vec3", [0.3] * 384, {"category": "A"}),
        ]
        index.upsert(vectors=vectors)
        # Query
        results = index.query(
            vector=[0.15] * 384,
            top_k=2,
            filter={"category": "A"},
        )
        assert len(results['matches']) == 2
        assert results['matches'][0]['id'] in ['vec1', 'vec3']

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_filtered_query(self, heliosdb_cluster):
        """PC-003: Filtered vector search"""
        pc = self._client()
        index = self._ensure_index(pc)
        # Insert with metadata
        vectors = [
            (f"id_{i}", [i/100.0] * 384, {"price": i * 10, "category": f"cat_{i%3}"})
            for i in range(100)
        ]
        index.upsert(vectors=vectors)
        # Query with filter. Metadata is only returned when explicitly
        # requested, and the assertions below read it.
        results = index.query(
            vector=[0.5] * 384,
            top_k=5,
            filter={"price": {"$lt": 500}, "category": "cat_1"},
            include_metadata=True,
        )
        assert len(results['matches']) <= 5
        for match in results['matches']:
            assert match['metadata']['price'] < 500
            assert match['metadata']['category'] == "cat_1"

CI/CD Integration

.github/workflows/protocol-tests.yml
# Runs each protocol's compatibility tests as its own matrix job.
name: Protocol Compatibility Tests

on:
  push:
    branches: [main, develop]
  pull_request:

jobs:
  protocol-test:
    runs-on: ubuntu-latest
    # Only P0 protocols gate the workflow: P1/P2 jobs still run and report
    # results, but their failures do not fail the run. This replaces the
    # former "Fail on P0 failures" step, which had no effect because a
    # failed test step already fails the job for every priority.
    continue-on-error: ${{ matrix.protocol.priority != 'p0' }}
    strategy:
      fail-fast: false
      matrix:
        protocol:
          - { name: postgresql, priority: p0 }
          - { name: mysql, priority: p0 }
          - { name: snowflake, priority: p1 }
          - { name: databricks, priority: p1 }
          - { name: pinecone, priority: p1 }
          - { name: sqlserver, priority: p2 }
          - { name: db2, priority: p2 }
          - { name: oracle, priority: p2 }
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pytest pytest-asyncio
          # conftest.py imports docker and requests
          pip install docker requests
          pip install psycopg2-binary asyncpg sqlalchemy
          pip install pymysql mysql-connector-python
          pip install snowflake-connector-python
          pip install databricks-sql-connector
          pip install pinecone-client
          pip install pyodbc pymssql
          pip install ibm_db
          pip install oracledb

      - name: Start HeliosDB cluster
        run: |
          # Compose v2 ("docker compose") is the variant on hosted runners.
          docker compose -f tests/protocol/docker/docker-compose.protocol.yml up -d
          sleep 10 # Wait for startup

      - name: Run protocol tests
        run: |
          pytest tests/protocol/test_${{ matrix.protocol.name }}.py \
            -v \
            -m "protocol and ${{ matrix.protocol.priority }}" \
            --junitxml=results-${{ matrix.protocol.name }}.xml

      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          # Artifact names must be unique per job with upload-artifact v4.
          name: protocol-test-results-${{ matrix.protocol.name }}
          path: results-*.xml

Test Execution

Local Testing

Terminal window
# Run all protocol tests (every priority) with verbose output
pytest tests/protocol/ -v
# Run only P0 (must-pass) tests: selects tests carrying both the
# "protocol" and "p0" markers
pytest tests/protocol/ -v -m "protocol and p0"
# Run specific protocol (here: the PostgreSQL test module only)
pytest tests/protocol/test_postgresql.py -v
# Run with coverage and write an HTML report
# (the --cov options are provided by the pytest-cov plugin)
pytest tests/protocol/ -v --cov=heliosdb --cov-report=html

Docker-based Testing

Terminal window
# Start HeliosDB cluster in the background (-d) from the protocol compose file
docker-compose -f tests/protocol/docker/docker-compose.protocol.yml up -d
# Run tests against the running cluster
pytest tests/protocol/ -v
# Teardown: stop and remove what the compose file started
docker-compose -f tests/protocol/docker/docker-compose.protocol.yml down

Quality Gates

Must-Pass Criteria (P0)

  • PostgreSQL: All Gold tests pass
  • MySQL: All Gold tests pass
  • Protocol autodetect works correctly
  • TLS/SSL connections succeed
  • Error codes match protocol specs

Success Criteria (P1)

  • Snowflake: Silver tests pass
  • Databricks: Silver tests pass
  • Pinecone: Silver tests pass

Future Goals (P2)

  • SQL Server: Bronze tests pass
  • DB2: Bronze tests pass
  • Oracle: Bronze tests pass