Protocol Compatibility Test Suite
Protocol Compatibility Test Suite
Overview
This document defines the comprehensive protocol compatibility test suite for HeliosDB. The P0 (Must-Pass) tests MUST pass in CI before any merge to ensure zero-friction adoption for existing database clients; P1 and P2 tests are tracked against the Quality Gates at the end of this document.
Test Matrix
Based on `docs/01_PROTOCOL_TEST_MATRIX.md` (path relative to the repository root):
| Ecosystem | Protocol | Python Driver | Compatibility | Test Priority |
|---|---|---|---|---|
| PostgreSQL | libpq | psycopg2, asyncpg, SQLAlchemy | Gold | P0 (Must-Pass) |
| MySQL | v10 | mysql-connector, PyMySQL | Gold | P0 (Must-Pass) |
| Snowflake | HTTP/JSON | snowflake-connector-python | Silver | P1 |
| Databricks SQL | HTTP/Thrift | databricks-sql-connector | Silver | P1 |
| Pinecone | HTTP/gRPC | pinecone-client | Silver | P1 |
| SQL Server | TDS 7.4 | pyodbc, pymssql | Bronze | P2 |
| DB2 | DRDA | ibm_db | Bronze | P2 |
| Oracle/Tibero | Net8/TTC | oracledb (Thin) | Bronze | P2 |
Test Implementation
Directory Structure
tests/protocol/
├── conftest.py              # Pytest fixtures
├── test_postgresql.py       # PostgreSQL (Gold)
├── test_mysql.py            # MySQL (Gold)
├── test_snowflake.py        # Snowflake (Silver)
├── test_databricks.py       # Databricks (Silver)
├── test_pinecone.py         # Pinecone (Silver)
├── test_sqlserver.py        # SQL Server (Bronze)
├── test_db2.py              # DB2 (Bronze)
├── test_oracle.py           # Oracle (Bronze)
├── helpers/
│   ├── protocol_validator.py
│   └── test_data.py
└── docker/
    └── docker-compose.protocol.yml

Test Infrastructure
import time
from typing import Generator

import docker
import pytest
import requests
@pytest.fixture(scope="session")
def heliosdb_cluster():
    """Start a HeliosDB container with all protocol handlers enabled.

    Session-scoped, so a single container is shared by every protocol test.
    The container is stopped and removed on teardown, including when the
    readiness check below times out.

    Yields:
        The running docker container object.

    Raises:
        RuntimeError: if ``heliosdb-cli status`` has not exited with code 0
            after 30 polls spaced one second apart.
    """
    client = docker.from_env()

    # Start HeliosDB with all protocol handlers
    container = client.containers.run(
        "heliosdb:latest",
        detach=True,
        ports={
            '5432/tcp': 5432,    # PostgreSQL
            '3306/tcp': 3306,    # MySQL
            '1433/tcp': 1433,    # SQL Server
            '1521/tcp': 1521,    # Oracle
            '50000/tcp': 50000,  # DB2
            '443/tcp': 8443,     # HTTPS (Snowflake/Databricks/Pinecone)
            # The test_table fixture posts DDL to http://localhost:8080, so
            # the admin port has to be published too.
            # NOTE(review): assumes the admin API listens on 8080 inside the
            # container as well - confirm against the image.
            '8080/tcp': 8080,    # Admin HTTP API
        },
        environment={
            'HELIOS_ENABLE_ALL_PROTOCOLS': 'true',
            'HELIOS_TLS_CERT': '/certs/server.crt',
            'HELIOS_TLS_KEY': '/certs/server.key',
        },
    )

    try:
        # Wait for cluster to be ready
        for _ in range(30):
            try:
                # Health check
                exec_result = container.exec_run("heliosdb-cli status")
            except docker.errors.DockerException:
                # The daemon refused the exec (container still starting);
                # retry on the next poll instead of aborting the session.
                pass
            else:
                if exec_result.exit_code == 0:
                    break
            time.sleep(1)
        else:
            raise RuntimeError("HeliosDB cluster failed to start")

        yield container
    finally:
        # Teardown. Runs on startup failure as well, so a half-started
        # container never keeps the published ports bound for the next run.
        try:
            container.stop()
        finally:
            container.remove(force=True)
ADMIN_DDL_URL = "http://localhost:8080/admin/ddl"
# Upper bound for each admin call so a wedged server fails the test instead
# of hanging the whole session.
ADMIN_TIMEOUT_SECONDS = 10


@pytest.fixture
def test_table(heliosdb_cluster):
    """Create the ``test_data`` table before each test and drop it afterwards.

    The table is created and dropped through the admin DDL endpoint.

    Yields:
        The table name, ``"test_data"``.
    """
    # Create via admin API
    response = requests.post(
        ADMIN_DDL_URL,
        json={
            "sql": """
                CREATE TABLE test_data (
                    id BIGINT PRIMARY KEY,
                    name VARCHAR(255),
                    value INTEGER,
                    created_at TIMESTAMP
                )
            """
        },
        timeout=ADMIN_TIMEOUT_SECONDS,
    )
    assert response.status_code == 200

    yield "test_data"

    # Cleanup. A failed drop is reported here rather than surfacing later as
    # an unrelated CREATE TABLE failure in the next test.
    cleanup = requests.post(
        ADMIN_DDL_URL,
        json={"sql": "DROP TABLE test_data"},
        timeout=ADMIN_TIMEOUT_SECONDS,
    )
    assert cleanup.status_code == 200, "failed to drop test_data during teardown"


## PostgreSQL Protocol Tests (Gold)
from contextlib import closing
from io import StringIO

import asyncpg
import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
from sqlalchemy import create_engine, text

DSN = "host=localhost port=5432 dbname=helios user=test password=test sslmode=require"


class TestPostgreSQLProtocol:
    """Gold-tier PostgreSQL protocol checks via psycopg2, asyncpg and SQLAlchemy.

    Each test opens its own connection and releases it even when an assertion
    fails, so one failing test cannot leave sessions open for the next.
    """

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_connect(self, heliosdb_cluster):
        """PG-001: Connect and simple query via psycopg2"""
        with closing(psycopg2.connect(DSN)) as conn:
            cur = conn.cursor()
            cur.execute("SELECT 1 as value")
            result = cur.fetchone()
            assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_prepared_statement(self, heliosdb_cluster, test_table):
        """PG-002: Prepared statements with parameters"""
        with closing(psycopg2.connect(DSN)) as conn:
            cur = conn.cursor()

            # Insert with parameters
            cur.execute(
                "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
                (1, "Alice", 100),
            )
            conn.commit()

            # Query with parameters
            cur.execute(
                "SELECT name, value FROM test_data WHERE id = %s",
                (1,),
            )
            row = cur.fetchone()
            assert row == ("Alice", 100)

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_transaction(self, heliosdb_cluster, test_table):
        """PG-003: BEGIN/COMMIT/ROLLBACK semantics"""
        with closing(psycopg2.connect(DSN)) as conn:
            conn.autocommit = False
            cur = conn.cursor()

            # Insert and rollback
            cur.execute("INSERT INTO test_data (id, name) VALUES (100, 'Rollback')")
            conn.rollback()

            # Verify not present
            cur.execute("SELECT COUNT(*) FROM test_data WHERE id = 100")
            assert cur.fetchone()[0] == 0

            # Insert and commit
            cur.execute("INSERT INTO test_data (id, name) VALUES (101, 'Commit')")
            conn.commit()

            # Verify present
            cur.execute("SELECT COUNT(*) FROM test_data WHERE id = 101")
            assert cur.fetchone()[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_server_cursor(self, heliosdb_cluster, test_table):
        """PG-004: Server-side cursors for large result sets"""
        with closing(psycopg2.connect(DSN)) as conn:
            # Insert 1000 rows
            cur = conn.cursor()
            for i in range(1000):
                cur.execute(
                    "INSERT INTO test_data (id, value) VALUES (%s, %s)",
                    (i, i * 2),
                )
            conn.commit()

            # Use server-side cursor (a named cursor); the with-block closes it
            # even if one of the batch assertions fails.
            with conn.cursor(name='fetch_cursor') as server_cur:
                server_cur.execute("SELECT id, value FROM test_data ORDER BY id")

                # Fetch in batches
                batch1 = server_cur.fetchmany(100)
                assert len(batch1) == 100
                assert batch1[0][0] == 0

                batch2 = server_cur.fetchmany(100)
                assert len(batch2) == 100
                assert batch2[0][0] == 100

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_psycopg2_error_handling(self, heliosdb_cluster):
        """PG-005: PostgreSQL error codes and SQLSTATE"""
        with closing(psycopg2.connect(DSN)) as conn:
            cur = conn.cursor()

            # Syntax error (SQLSTATE 42601)
            with pytest.raises(psycopg2.errors.SyntaxError) as exc_info:
                cur.execute("SELCT 1")  # Typo
            assert exc_info.value.pgcode == "42601"
            # The failed statement leaves the open transaction aborted; without
            # this rollback every following statement on the connection is
            # rejected until the transaction ends.
            conn.rollback()

            # Unique violation (SQLSTATE 23505)
            cur.execute("CREATE TABLE dup_test (id INT PRIMARY KEY)")
            conn.commit()
            try:
                cur.execute("INSERT INTO dup_test VALUES (1)")
                conn.commit()

                with pytest.raises(psycopg2.errors.UniqueViolation) as exc_info:
                    cur.execute("INSERT INTO dup_test VALUES (1)")
                assert exc_info.value.pgcode == "23505"
            finally:
                # Leave no dup_test behind: the MySQL suite creates a table
                # with the same name and a rerun would otherwise fail on CREATE.
                conn.rollback()
                cur.execute("DROP TABLE dup_test")
                conn.commit()

    @pytest.mark.protocol
    @pytest.mark.p0
    @pytest.mark.asyncio
    async def test_asyncpg_connect(self, heliosdb_cluster):
        """PG-006: Connect via asyncpg (async Python driver)"""
        conn = await asyncpg.connect(
            host='localhost',
            port=5432,
            database='helios',
            user='test',
            password='test',
            ssl='require',
        )
        try:
            value = await conn.fetchval('SELECT 1')
            assert value == 1
        finally:
            await conn.close()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_sqlalchemy_postgresql(self, heliosdb_cluster, test_table):
        """PG-007: SQLAlchemy with PostgreSQL dialect"""
        engine = create_engine(
            "postgresql+psycopg2://test:test@localhost:5432/helios",
            connect_args={'sslmode': 'require'},
        )
        try:
            with engine.connect() as conn:
                # Insert
                conn.execute(
                    text("INSERT INTO test_data (id, name) VALUES (:id, :name)"),
                    {"id": 200, "name": "SQLAlchemy"},
                )
                conn.commit()

                # Query
                result = conn.execute(
                    text("SELECT name FROM test_data WHERE id = :id"),
                    {"id": 200},
                )
                row = result.fetchone()
                assert row[0] == "SQLAlchemy"
        finally:
            # Release pooled connections so the session does not outlive the test.
            engine.dispose()

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_postgresql_copy(self, heliosdb_cluster, test_table):
        """PG-008: COPY command for bulk load"""
        with closing(psycopg2.connect(DSN)) as conn:
            cur = conn.cursor()

            # COPY FROM STDIN: 10000 tab-separated rows of (id, name, value)
            data = StringIO(
                "".join(f"{i}\tBulkRow{i}\t{i*10}\n" for i in range(10000))
            )

            cur.copy_from(
                data,
                'test_data',
                columns=('id', 'name', 'value'),
                sep='\t',
            )
            conn.commit()

            # Verify
            cur.execute("SELECT COUNT(*) FROM test_data")
            assert cur.fetchone()[0] == 10000


## MySQL Protocol Tests (Gold)
from contextlib import closing

import mysql.connector
import pymysql
import pytest
from sqlalchemy import create_engine, text

MYSQL_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'test',
    'password': 'test',
    'database': 'helios',
    'ssl': {'ssl': True},
}


class TestMySQLProtocol:
    """Gold-tier MySQL protocol checks via PyMySQL, mysql-connector and SQLAlchemy.

    Each test opens its own connection and closes it even when an assertion
    fails.
    """

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_pymysql_connect(self, heliosdb_cluster):
        """MY-001: Connect and simple query via PyMySQL"""
        with closing(pymysql.connect(**MYSQL_CONFIG)) as conn:
            cur = conn.cursor()
            cur.execute("SELECT 1 as value")
            result = cur.fetchone()
            assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_pymysql_prepared_statement(self, heliosdb_cluster, test_table):
        """MY-002: Prepared statements with parameters"""
        with closing(pymysql.connect(**MYSQL_CONFIG)) as conn:
            cur = conn.cursor()

            # Insert with parameters
            cur.execute(
                "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
                (1, "Bob", 200),
            )
            conn.commit()

            # Query with parameters
            cur.execute(
                "SELECT name, value FROM test_data WHERE id = %s",
                (1,),
            )
            row = cur.fetchone()
            assert row == ("Bob", 200)

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_autocommit(self, heliosdb_cluster, test_table):
        """MY-003: Autocommit mode semantics"""
        with closing(pymysql.connect(**MYSQL_CONFIG)) as conn:
            # Default: autocommit off
            assert not conn.get_autocommit()

            cur = conn.cursor()
            cur.execute("INSERT INTO test_data (id, name) VALUES (50, 'Auto')")

            # Not visible in new connection yet
            with closing(pymysql.connect(**MYSQL_CONFIG)) as conn2:
                cur2 = conn2.cursor()
                cur2.execute("SELECT COUNT(*) FROM test_data WHERE id = 50")
                assert cur2.fetchone()[0] == 0

            # Commit
            conn.commit()

            # Now visible
            with closing(pymysql.connect(**MYSQL_CONFIG)) as conn3:
                cur3 = conn3.cursor()
                cur3.execute("SELECT COUNT(*) FROM test_data WHERE id = 50")
                assert cur3.fetchone()[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_connector(self, heliosdb_cluster, test_table):
        """MY-004: mysql-connector-python compatibility"""
        conn = mysql.connector.connect(
            host='localhost',
            port=3306,
            user='test',
            password='test',
            database='helios',
            ssl_ca=None,
            ssl_disabled=False,
        )
        with closing(conn):
            cur = conn.cursor()
            cur.execute("INSERT INTO test_data (id, name) VALUES (75, 'Connector')")
            conn.commit()

            cur.execute("SELECT name FROM test_data WHERE id = 75")
            assert cur.fetchone()[0] == 'Connector'

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_mysql_error_handling(self, heliosdb_cluster):
        """MY-005: MySQL error numbers and messages"""
        with closing(pymysql.connect(**MYSQL_CONFIG)) as conn:
            cur = conn.cursor()

            # Syntax error (Error 1064)
            with pytest.raises(pymysql.err.ProgrammingError) as exc_info:
                cur.execute("SELCT 1")  # Typo
            assert exc_info.value.args[0] == 1064

            # Duplicate key (Error 1062)
            cur.execute("CREATE TABLE dup_test (id INT PRIMARY KEY)")
            try:
                cur.execute("INSERT INTO dup_test VALUES (1)")
                conn.commit()

                with pytest.raises(pymysql.err.IntegrityError) as exc_info:
                    cur.execute("INSERT INTO dup_test VALUES (1)")
                assert exc_info.value.args[0] == 1062
            finally:
                # Leave no dup_test behind: the PostgreSQL suite creates a
                # table with the same name and a rerun would fail on CREATE.
                conn.rollback()
                cur.execute("DROP TABLE dup_test")

    @pytest.mark.protocol
    @pytest.mark.p0
    def test_sqlalchemy_mysql(self, heliosdb_cluster, test_table):
        """MY-006: SQLAlchemy with MySQL dialect"""
        engine = create_engine(
            "mysql+pymysql://test:test@localhost:3306/helios",
            connect_args={'ssl': {'ssl': True}},
        )
        try:
            with engine.connect() as conn:
                conn.execute(
                    text("INSERT INTO test_data (id, name) VALUES (:id, :name)"),
                    {"id": 300, "name": "MySQL+SQLAlchemy"},
                )
                conn.commit()

                result = conn.execute(
                    text("SELECT name FROM test_data WHERE id = :id"),
                    {"id": 300},
                )
                row = result.fetchone()
                assert row[0] == "MySQL+SQLAlchemy"
        finally:
            # Release pooled connections so the session does not outlive the test.
            engine.dispose()


## Snowflake Protocol Tests (Silver)
import threading
import time
from contextlib import closing

import pytest
import snowflake.connector

SNOWFLAKE_CONFIG = {
    'account': 'local',
    'user': 'test',
    'password': 'test',
    'host': 'localhost',
    'protocol': 'https',
    'port': 8443,
    'warehouse': 'default',
    'database': 'helios',
    'schema': 'public',
}


class TestSnowflakeProtocol:
    """Silver-tier Snowflake protocol checks via snowflake-connector-python.

    Bind parameters use ``%s`` placeholders: the connector's default
    paramstyle is ``pyformat``, and ``?`` placeholders are only honoured after
    ``snowflake.connector.paramstyle`` is switched to ``'qmark'``, which this
    module never does.
    """

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_connect(self, heliosdb_cluster):
        """SF-001: Connect via snowflake-connector-python"""
        with closing(snowflake.connector.connect(**SNOWFLAKE_CONFIG)) as conn:
            cur = conn.cursor()
            cur.execute("SELECT 1 as value")
            result = cur.fetchone()
            assert result[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_query_submit(self, heliosdb_cluster, test_table):
        """SF-002: Query submission and result fetch"""
        with closing(snowflake.connector.connect(**SNOWFLAKE_CONFIG)) as conn:
            cur = conn.cursor()

            # Insert data
            cur.execute(
                "INSERT INTO test_data (id, name, value) VALUES (%s, %s, %s)",
                (1, "Snowflake", 500),
            )

            # Query
            cur.execute("SELECT name, value FROM test_data WHERE id = %s", (1,))
            row = cur.fetchone()
            assert row == ("Snowflake", 500)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_fetchmany(self, heliosdb_cluster, test_table):
        """SF-003: Batch fetching"""
        with closing(snowflake.connector.connect(**SNOWFLAKE_CONFIG)) as conn:
            cur = conn.cursor()

            # Insert 500 rows
            for i in range(500):
                cur.execute(
                    "INSERT INTO test_data (id, value) VALUES (%s, %s)",
                    (i, i * 3),
                )

            # Fetch in batches
            cur.execute("SELECT id, value FROM test_data ORDER BY id")
            batch = cur.fetchmany(100)
            assert len(batch) == 100
            assert batch[0] == (0, 0)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_snowflake_cancel_query(self, heliosdb_cluster, test_table):
        """SF-004: Query cancellation"""
        with closing(snowflake.connector.connect(**SNOWFLAKE_CONFIG)) as conn:
            cur = conn.cursor()

            # Start long-running query on a worker thread so this thread is
            # free to issue the abort.
            # NOTE(review): no fixture in this document creates large_table;
            # if it is missing the worker ends immediately and the final
            # assertion passes without any cancellation taking place.
            def run_query():
                cur.execute("SELECT * FROM large_table")  # Assumes large_table exists

            thread = threading.Thread(target=run_query)
            thread.start()

            # Cancel after 1 second
            time.sleep(1)
            # NOTE(review): cur.sfqid may still be None while the blocking
            # execute() above is in flight - confirm the id is populated
            # before relying on this abort.
            cur.abort_query(cur.sfqid)

            thread.join(timeout=5)

            # Verify cancellation
            assert not thread.is_alive()


## Databricks SQL Protocol Tests (Silver)
import pytest
from databricks import sql

# NOTE(review): no port is given here, while the Snowflake and Pinecone
# configs in this document target localhost:8443 - confirm which port the
# connector ends up using.
DATABRICKS_CONFIG = {
    'server_hostname': 'localhost',
    'http_path': '/dbsql',
    'access_token': 'test_token'
}


class TestDatabricksProtocol:
    """Silver-tier Databricks SQL protocol checks via databricks-sql-connector."""

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_connect(self, heliosdb_cluster):
        """DB-001: Connect via databricks-sql-connector"""
        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            cursor.execute("SELECT 1 as value")
            first_row = cursor.fetchone()
            assert first_row[0] == 1

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_query(self, heliosdb_cluster, test_table):
        """DB-002: Query execution"""
        insert_sql = "INSERT INTO test_data (id, name) VALUES (?, ?)"
        select_sql = "SELECT name FROM test_data WHERE id = ?"

        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            cursor.execute(insert_sql, (100, "Databricks"))

            cursor.execute(select_sql, (100,))
            fetched = cursor.fetchone()
            assert fetched[0] == "Databricks"

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_databricks_fetchmany(self, heliosdb_cluster, test_table):
        """DB-003: Batch fetching"""
        insert_sql = "INSERT INTO test_data (id, value) VALUES (?, ?)"

        with sql.connect(**DATABRICKS_CONFIG) as connection, connection.cursor() as cursor:
            # Insert 300 rows, one statement per row
            for row_id in range(300):
                cursor.execute(insert_sql, (row_id, row_id * 5))

            cursor.execute("SELECT id FROM test_data ORDER BY id")
            first_batch = cursor.fetchmany(50)
            assert len(first_batch) == 50


## Pinecone Protocol Tests (Silver)
import pytest
from pinecone import Pinecone, ServerlessSpec

_TEST_INDEX = 'test_index'
_DIMENSION = 384


def _ensure_test_index(pc):
    """Create the shared test index if it is missing and return a handle to it.

    Makes each test runnable on its own and on reruns: creating an index
    unconditionally fails once it already exists.
    """
    if _TEST_INDEX not in pc.list_indexes().names():
        pc.create_index(
            name=_TEST_INDEX,
            dimension=_DIMENSION,
            metric='cosine',
            spec=ServerlessSpec(cloud='local', region='default'),
        )
    return pc.Index(_TEST_INDEX)


class TestPineconeProtocol:
    """Silver-tier Pinecone protocol checks via the Pinecone Python client."""

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_connect(self, heliosdb_cluster):
        """PC-001: Initialize Pinecone client"""
        pc = Pinecone(
            api_key='test_key',
            host='https://localhost:8443/pinecone',
        )

        # List indexes. The client returns an index-list wrapper rather than a
        # plain list, so the check is made on the list of index names.
        index_names = pc.list_indexes().names()
        assert isinstance(index_names, list)

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_upsert(self, heliosdb_cluster):
        """PC-002: Upsert vectors"""
        pc = Pinecone(api_key='test_key', host='https://localhost:8443/pinecone')

        # Create index (only if an earlier test or run has not already)
        index = _ensure_test_index(pc)

        # Upsert vectors
        vectors = [
            ("vec1", [0.1] * _DIMENSION, {"category": "A"}),
            ("vec2", [0.2] * _DIMENSION, {"category": "B"}),
            ("vec3", [0.3] * _DIMENSION, {"category": "A"}),
        ]

        index.upsert(vectors=vectors)

        # Query
        results = index.query(
            vector=[0.15] * _DIMENSION,
            top_k=2,
            filter={"category": "A"},
        )

        assert len(results['matches']) == 2
        assert results['matches'][0]['id'] in ['vec1', 'vec3']

    @pytest.mark.protocol
    @pytest.mark.p1
    def test_pinecone_filtered_query(self, heliosdb_cluster):
        """PC-003: Filtered vector search"""
        pc = Pinecone(api_key='test_key', host='https://localhost:8443/pinecone')
        # Does not rely on PC-002 having run first.
        index = _ensure_test_index(pc)

        # Insert with metadata. Components start at 0.01 rather than 0.0:
        # cosine similarity is undefined for an all-zero vector, which the
        # i == 0 entry would otherwise be. Ids and metadata are unchanged.
        vectors = [
            (
                f"id_{i}",
                [(i + 1) / 100.0] * _DIMENSION,
                {"price": i * 10, "category": f"cat_{i%3}"},
            )
            for i in range(100)
        ]
        index.upsert(vectors=vectors)

        # Query with filter. include_metadata must be requested explicitly;
        # matches carry no metadata by default, and the assertions below
        # read it.
        results = index.query(
            vector=[0.5] * _DIMENSION,
            top_k=5,
            filter={"price": {"$lt": 500}, "category": "cat_1"},
            include_metadata=True,
        )

        assert len(results['matches']) <= 5
        for match in results['matches']:
            assert match['metadata']['price'] < 500
            assert match['metadata']['category'] == "cat_1"


## CI/CD Integration
name: Protocol Compatibility Tests

on:
  push:
    branches: [main, develop]
  pull_request:

jobs:
  protocol-test:
    runs-on: ubuntu-latest
    # Only P0 legs gate the workflow. P1/P2 legs still run and upload their
    # results, but a failure there does not fail the run, matching the
    # Must-Pass / Success Criteria / Future Goals split under Quality Gates.
    continue-on-error: ${{ matrix.protocol.priority != 'p0' }}
    strategy:
      fail-fast: false
      matrix:
        protocol:
          - { name: postgresql, priority: p0 }
          - { name: mysql, priority: p0 }
          - { name: snowflake, priority: p1 }
          - { name: databricks, priority: p1 }
          - { name: pinecone, priority: p1 }
          - { name: sqlserver, priority: p2 }
          - { name: db2, priority: p2 }
          - { name: oracle, priority: p2 }

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pytest pytest-asyncio
          pip install psycopg2-binary asyncpg sqlalchemy
          pip install pymysql mysql-connector-python
          pip install snowflake-connector-python
          pip install databricks-sql-connector
          pip install pinecone-client
          pip install pyodbc pymssql
          pip install ibm_db
          pip install oracledb

      - name: Start HeliosDB cluster
        # Uses the Compose v2 plugin ("docker compose"); the standalone
        # docker-compose v1 binary is no longer shipped on ubuntu-latest.
        run: |
          docker compose -f tests/protocol/docker/docker-compose.protocol.yml up -d
          sleep 10  # Wait for startup

      - name: Run protocol tests
        run: |
          pytest tests/protocol/test_${{ matrix.protocol.name }}.py \
            -v \
            -m "protocol and ${{ matrix.protocol.priority }}" \
            --junitxml=results-${{ matrix.protocol.name }}.xml

      - name: Upload test results
        if: always()
        # upload-artifact v3 has been retired, and v4 rejects two uploads with
        # the same artifact name, so every matrix leg gets its own name.
        uses: actions/upload-artifact@v4
        with:
          name: protocol-test-results-${{ matrix.protocol.name }}
          path: results-*.xml

      - name: Fail on P0 failures
        if: matrix.protocol.priority == 'p0' && failure()
        run: exit 1

## Test Execution
Local Testing
# Run all protocol testspytest tests/protocol/ -v
# Run only P0 (must-pass) testspytest tests/protocol/ -v -m "protocol and p0"
# Run specific protocolpytest tests/protocol/test_postgresql.py -v
# Run with coveragepytest tests/protocol/ -v --cov=heliosdb --cov-report=htmlDocker-based Testing
# Start HeliosDB clusterdocker-compose -f tests/protocol/docker/docker-compose.protocol.yml up -d
# Run testspytest tests/protocol/ -v
# Teardowndocker-compose -f tests/protocol/docker/docker-compose.protocol.yml downQuality Gates
Must-Pass Criteria (P0)
- PostgreSQL: All Gold tests pass
- MySQL: All Gold tests pass
- Protocol autodetect works correctly
- TLS/SSL connections succeed
- Error codes match protocol specs
Success Criteria (P1)
- Snowflake: Silver tests pass
- Databricks: Silver tests pass
- Pinecone: Silver tests pass
Future Goals (P2)
- SQL Server: Bronze tests pass
- DB2: Bronze tests pass
- Oracle: Bronze tests pass