A Python SDK for GaussDB Multimodal Store (Vector Store / Full Text Search), based on SQLAlchemy and compatible with the Milvus API.
- git clone this repo, then install with:
poetry install
- install with pip (already released):
pip install pygsvector-0.1.0-py3-none-any.whl
You can build the documentation locally with sphinx:
mkdir build
make html
pygsvector supports two modes:
- Milvus compatible mode: use the MilvusLikeClient class to work with vector storage through an API similar to the Milvus API.
- SQLAlchemy hybrid mode: use the vector storage functions provided by the GsVecClient class and execute relational database statements with the SQLAlchemy library. In this mode, you can regard pygsvector as an extension of SQLAlchemy.
Refer to tests/test_milvus_like_client.py for more examples.
A simple workflow to perform ANN search with GaussDB Vector Store in Milvus compatible mode:
- set up a client:
from pygsvector import *
client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
- create a collection with a vector index:
test_collection_name = "ann_test"
# define the schema of the collection with optional partitions
range_part = GsRangePartition(False, range_part_infos=[
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define the field schema of the collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = client.prepare_index_params()
idx_params.add_index(
    field_name='embedding',
    index_type=VecIndexType.GSDISKANN,
    index_name='vidx',
    local_index=True,
    metric_type="L2",
    params={},
)
# create collection
client.create_collection(
    collection_name=test_collection_name,
    schema=schema,
    index_params=idx_params,
)
- insert data into your collection:
# prepare data
vector_value1 = [0.748479, 0.276979, 0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
- do ANN search:
res = client.search(
    collection_name=test_collection_name,
    data=[0, 0, 0],
    anns_field='embedding',
    limit=5,
    output_fields=['id'],
)
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]
Refer to tests/test_milvus_like_client.py for more examples.
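Both workflows in this README define two range partitions on id: p0 with bound 100 and p1 with bound 'maxvalue'. The sketch below illustrates how rows would be routed, assuming the bounds are exclusive upper limits (as in VALUES LESS THAN range partitioning). It is plain Python and does not call pygsvector; PARTITIONS and route are hypothetical names used only for illustration.

```python
from bisect import bisect_right

# Hypothetical stand-in for the partition bounds used in the examples:
# p0 holds id < 100, p1 holds every remaining id ('maxvalue').
PARTITIONS = [("p0", 100), ("p1", float("inf"))]


def route(row_id, partitions=PARTITIONS):
    """Return the first partition whose exclusive upper bound is above row_id."""
    bounds = [upper for _, upper in partitions]
    return partitions[bisect_right(bounds, row_id)][0]


# The ids inserted by the example workflows: 0..12, 111 and 112.
ids = list(range(13)) + [111, 112]
placement = {}
for row_id in ids:
    placement.setdefault(route(row_id), []).append(row_id)

print(placement)
# {'p0': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 'p1': [111, 112]}
```

Under this assumption, an id of exactly 100 would land in p1, not p0.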
The same workflow in SQLAlchemy hybrid mode, using GsVecClient:
- set up a client:
from pygsvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func
client = GsVecClient(uri="127.0.0.1:2881", user="test@test")
- create a partitioned table with a vector index:
# create partitioned table
range_part = GsRangePartition(False, range_part_infos=[
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
test_collection_name = "ann_search_test"
cols = [
    Column('id', Integer, primary_key=True, autoincrement=False),
    Column('embedding', FLOATVECTOR(3)),
    Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)
# create vector index
client.create_index(
    test_collection_name,
    is_vec_index=True,
    index_name="vidx13",
    column_names=["embedding"],
    index_type="GSDISKANN",
    metric_type="l2",
    local_index=True,
    vidx_params="enable_pq=false",
)
- insert data into your table:
# insert data
vector_value1 = [0.748479, 0.276979, 0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
- do ANN search:
# perform ANN search with basic column selection
res = client.ann_search(
    test_collection_name,
    vec_data=[0, 0, 0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_column_names=['id']  # legacy parameter
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]
# perform ANN search with SQLAlchemy expressions (recommended)
from sqlalchemy import Table, text, func
table = Table(test_collection_name, client.metadata_obj, autoload_with=client.engine)
res = client.ann_search(
    test_collection_name,
    vec_data=[0, 0, 0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_columns=[
        table.c.id,
        table.c.meta,
        (table.c.id + 1000).label('id_plus_1000'),
        text("JSON_EXTRACT(meta, '$.key') as extracted_key")
    ]
)
# For example, the result will be:
# [(112, '{"key": "value"}', 1112, 'value'), ...]
# perform ANN search with a distance threshold (filter results by distance)
res = client.ann_search(
    test_collection_name,
    vec_data=[0, 0, 0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    with_dist=True,
    topk=10,
    output_column_names=['id'],
    distance_threshold=0.5  # only return results where distance <= 0.5
)
# For example, the result will be:
# [(10, 0.0), (11, 0.0), ...]
The ann_search method supports flexible output column selection through the output_columns parameter, and result filtering through the distance_threshold parameter:
- output_columns (recommended): accepts SQLAlchemy Column objects, expressions, or a mix of both
  - Column objects: table.c.id, table.c.name
  - Expressions: (table.c.age + 10).label('age_plus_10')
  - String functions: func.concat(table.c.name, ' (', table.c.age, ')').label('name_age')
- output_column_names (legacy): accepts a list of column name strings
  - Example: ['id', 'name', 'meta']
- Parameter priority: output_columns takes precedence over output_column_names when both are provided
- distance_threshold (optional): filters results by a distance threshold
  - Type: Optional[float]
  - Only returns results where distance <= threshold
  - Example: distance_threshold=0.5 returns only results with distance <= 0.5
  - Use case: quality control for similarity search, returning only highly similar results
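To make the effect of distance_threshold concrete, here is a minimal brute-force sketch over the same sample data. It assumes l2_distance is plain Euclidean distance and that the threshold is applied as distance <= threshold, as described above. It is an exact search in plain Python, not the GSDISKANN index, and l2 and exact_search are hypothetical helper names, not part of pygsvector.

```python
import math


def l2(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def exact_search(rows, query, topk, distance_threshold=None):
    """Score every row, optionally drop rows beyond the threshold, keep the topk nearest."""
    scored = [(row["id"], l2(row["embedding"], query)) for row in rows]
    if distance_threshold is not None:
        scored = [(i, d) for i, d in scored if d <= distance_threshold]
    scored.sort(key=lambda pair: pair[1])  # stable sort: ties keep insertion order
    return scored[:topk]


# Same sample data as in the workflows above.
v1 = [0.748479, 0.276979, 0.555195]
v2 = [0, 0, 0]
rows = [{"id": i, "embedding": v1} for i in range(10)]
rows += [{"id": i, "embedding": v2} for i in range(10, 13)]
rows += [{"id": i, "embedding": v2} for i in range(111, 113)]

hits = exact_search(rows, [0, 0, 0], topk=10, distance_threshold=0.5)
print(hits)
# [(10, 0.0), (11, 0.0), (12, 0.0), (111, 0.0), (112, 0.0)]
```

Although topk is 10, only five rows come back: the ten rows holding vector_value1 sit at a distance of roughly 0.97 from the query and are removed by the threshold. An index-backed search may return tied rows in a different order.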
- If you want to use the pure SQLAlchemy API with the GaussDB dialect, you can get a SQLAlchemy engine via client.engine. The engine can also be created as follows:
import pygsvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("postgresql.gaussdb", "pygsvector", "GaussDBDialect")
connection_str = (
    f"postgresql+gaussdb://{user}:{password}@{uri}/{db_name}"
)
# extra create_engine keyword arguments (for example echo=True) can be passed here
engine = create_engine(connection_str)
- Async engine is also supported:
import pygsvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("postgresql.asyncgaussdb", "pygsvector", "AsyncGaussDBDialect")
connection_str = (
    f"postgresql+asyncgaussdb://{user}:{password}@{uri}/{db_name}"
)
engine = create_async_engine(connection_str)
- For further usage in pure SQLAlchemy mode, please refer to the SQLAlchemy documentation.
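The connection strings above interpolate user and password directly into the URL. If a password contains characters such as @ or /, the URL may be parsed incorrectly, and SQLAlchemy's documentation recommends percent-encoding such values. The sketch below uses only the standard library. build_connection_str is a hypothetical helper, not part of pygsvector, and whether a user name such as root@test should also be encoded for this dialect is an assumption to verify against your deployment.

```python
from urllib.parse import quote


def build_connection_str(user, password, uri, db_name, driver="postgresql+gaussdb"):
    """Build a SQLAlchemy-style URL with percent-encoded credentials."""
    # safe="" makes quote() also encode '/', which is left as-is by default.
    enc_user = quote(user, safe="")
    enc_password = quote(password, safe="")
    return f"{driver}://{enc_user}:{enc_password}@{uri}/{db_name}"


# Hypothetical credentials, chosen to contain URL-reserved characters.
connection_str = build_connection_str("root@test", "p@ss/word", "127.0.0.1:2881", "test")
print(connection_str)
# postgresql+gaussdb://root%40test:p%40ss%2Fword@127.0.0.1:2881/test
```

The resulting string can be passed to create_engine or, with driver="postgresql+asyncgaussdb", to create_async_engine in the same way as the hand-built strings above.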