最近、私たちはカタログにAIで駆動された意味論的検索を実装するタスクを与えられました。すでにSnowflakeを使用しているため、この機能はCortex Search Serviceを使用して実装することにしました。
Cortex Searchが何であるか知らない場合、概要を確認するためにこのリンクを迅速に確認してください。 基本的には、Cortex Searchは既にSnowflakeに格納されているデータの上に直接、低遅延の意味論的およびフルテキスト検索を構築することができます。
この記事では、私がCortex Search Serviceを私たちのPythonバックエンドアプリケーションに統合する経験を共有し、私たちが直面した落とし穴についても触れます。
検索列に焦点を当てる
SEARCH_TEXT 列、より正確にはCortex Searchの「検索列」は、Cortex Searchがインデックス化して検索取得に使用する列です。
最初に犯した間違いの一つは、ほぼすべての利用可能なフィールドをSEARCH_TEXT列に入れようとしたことです。
最初は論理が合理的に見えました:Cortex Searchが見るフィールドが多いほど、文脈が豊富になります。しかし実際には、これが検索可能なテキストにノイズを混ぜることがあります。
例えば、ID、ステータスフラグ、テナント/会社ID、通貨ID、内部カテゴリIDなどのフィールドは、意味論的検索には通常役に立たない:
'id:' || COALESCE(id::STRING, ''),
'warehouse_id:' || COALESCE(warehouse_id::STRING, ''),
'project_id:' || COALESCE(project_id::STRING, ''),
'is_active:' || COALESCE(is_active::STRING, ''),
'status:' || COALESCE(status, ''),
'currency_id:' || COALESCE(currency_id::STRING, '')
これらのフィールドは、ユーザーが検索している内容の一部では通常ない。ATTRIBUTESとして公開し、フィルタリングのために使用すべきです。次のセクションで詳しく説明します。
テヘSEARCH_TEXT項目の実際の意味を説明するフィールドに焦点を当てるべきです:
CONCAT_WS(
' ',
title,
brand,
origin,
category_name,
subcategory_name,
short_description
) AS SEARCH_TEXT
エンドユーザーの検索クエリに有用なフィールドを含める必要があります。
フィルタリング可能なフィールドを公開するATTRIBUTES
Cortex Search Serviceの設定におけるもう一つの小さいしかし重要な詳細は、ATTRIBUTES プロパティ.
The ON 列は検索可能なテキスト列です。これは Cortex Search がユーザーのクエリをマッチングするために使用します。しかし、組織、ステータス、カテゴリ、ブランド、地域、または利用可能性などのメタデータで結果をフィルタリングする必要がある場合、それらの列はサービスが作成される際に ATTRIBUTES に追加する必要があります。
属性は主に検索されるテキストではありません。検索結果と共に返される列であり、フィルタリングや表示に利用できます。Snowflakeドキュメントにおいて、Cortex SearchがフィルタリングするのはCREATE CORTEX SEARCH SERVICEコマンドで指定されたATTRIBUTES列に基づいています。
CREATE OR REPLACE CORTEX SEARCH SERVICE DEMO_DB.SEARCH.PRODUCT_SEARCH_SERVICE ON SEARCH_TEXT
ATTRIBUTES (
CITY,
COUNTRY,
CURRENCY,
IS_ACTIVE
)
WAREHOUSE = WH_SEARCH_DEMO
TARGET_LAG = '1 hour'
AS
SELECT
ID,
CITY,
COUNTRY,
CURRENCY,
IS_ACTIVE
FROM DEMO_DB.SEARCH.PRODUCT_SEARCH_SOURCE;
覚えておくべきこと:後でフィルタリングしたいすべての列は属性として利用可能でなければならない。また、SnowflakeはATTRIBUTES内の列はサービスを作成するために使用されるソースクエリに含まれている必要があると指摘している。
その後、Pythonコードでこれらのフィルタリングを適用できる:
from typing import Any, Dict, List
COUNTRY_ATTRIBUTE = "COUNTRY"
CITY_ATTRIBUTE = "CITY"
IS_ACTIVE_ATTRIBUTE = "IS_ACTIVE"
filters: List[Dict[str, Any]] = [
{
"@or": [
{
"@eq": {
COUNTRY_ATTRIBUTE: "UK",
}
},
{
"@eq": {
CITY_ATTRIBUTE: "London",
}
},
],
},
{
"@eq": {
IS_ACTIVE_ATTRIBUTE: True,
}
},
]
response = search_service.search(
query=query,
columns=[
"ID",
"COUNTRY",
"CITY",
"IS_ACTIVE",
],
filter={
"@and": filters,
},
limit=20,
)
フィルターは選択的で、理解しやすいように保とう。良いフィルターは、Cortex Searchが結果をランク付けして返す前に検索空間を減少させるべきです。フィルターデータペイロードがあまりにも大きくなったり、あまりにもネスト的になったりすると、検索ソーステーブルを調整すべきで、あまりにもアプリケーションロジックを検索クエリに移すべきではない兆候かもしれません。
TARGET_LAG
に注意してください。もう一つ重要な議論があります。CREATE OR REPLACE CORTEX SEARCH SERVICEコマンドはTARGET_LAG.
簡単な言葉で言えば、TARGET_LAGはCortex Searchインデックスがソーステーブルと比較してどの程度新鮮であるかを制御します
例えば:
TARGET_LAG = '10 minutes'
これは新しい行がすぐに検索可能になることを意味するわけではありません。Cortex Searchはまだ内部インデックスを更新する必要があります。したがって、ソーステーブルに行を挿入または更新すると、その変更は次の更新が発生した後のみ検索結果に表示されます。
これは、管理された埋め込みを使用する場合に特に重要です。Cortex Searchは、ユーザーが意味論的検索を通じてそのレコードを見つける前に、更新されたソースデータを処理し、埋め込みを作成または更新し、検索インデックスをリフレッシュする必要があります。
したがって、TARGET_LAGが長すぎると、検索結果が古くなることがあります。新しく公開されたアイテムはすでにソーステーブルに存在しているかもしれませんが、しばらくの間、ユーザーは検索でそれを見つけることができません。
同時に、TARGET_LAGをあまりに低く設定することも常に最善の答えではありません。より頻繁なリフレッシュは、バックグラウンドでSnowflakeの作業を増やす可能性があり、それがクレジットの使用量を増加させることがあります。Snowflakeはまた、目標のレイグがあまりに低すぎると、必要以上にインデックスを頻繁にリフレッシュする可能性があると指摘しています.
したがって、適切な値は、検索結果が実際にどれだけ新鮮である必要があるかによって決まります.
CREATE OR REPLACE CORTEX SEARCH SERVICE DEMO_DB.SEARCH.PRODUCT_SEARCH_SERVICE
ON SEARCH_TEXT
ATTRIBUTES
(
ITEM_ID,
STATUS,
IS_ACTIVE,
ACCOUNT_ID
)
WAREHOUSE = WH_SEARCH_DEMO
TARGET_LAG = '30 minutes'
AS
SELECT
ITEM_ID,
SEARCH_TEXT,
STATUS,
IS_ACTIVE,
ACCOUNT_ID
FROM DEMO_DB.SEARCH.PRODUCT_SEARCH_SOURCE;
ユーザー向けカタログ検索では、新しく公開されたアイテムが迅速に表示されるように、15〜20分程度が適切かもしれません
内部の知識ベース、ドキュメント検索、または頻繁に変更されないデータの場合は、1時間以上で問題ありません
あなたの使用例に適した埋め込みモデルを選択してください
Cortex Searchはベクトル検索段階で埋め込みモデルを使用します。簡単に言えば、このモデルはあなたの検索列とユーザーのクエリをベクトルに変換し、Cortex Searchが意味的に似ているレコードだけでなく、同じキーワードを含むレコードを見つけることができます。SnowflakeはCortex Searchサービスを作成する際にEMBEDDING_MODELパラメータでモデルを選択することができます.
CREATE OR REPLACE CORTEX SEARCH SERVICE DEMO_DB.SEARCH.PRODUCT_SEARCH_SERVICE
ON SEARCH_TEXT
ATTRIBUTES
(
ITEM_ID,
STATUS,
IS_ACTIVE,
ACCOUNT_ID
)
WAREHOUSE = WH_SEARCH_DEMO
TARGET_LAG = '30 minutes'
EMBEDDING_MODEL = 'snowflake-arctic-embed-m-v1.5' # custom EMBEDDING_MODEL
AS
SELECT
ITEM_ID,
SEARCH_TEXT,
STATUS,
IS_ACTIVE,
ACCOUNT_ID
FROM DEMO_DB.SEARCH.PRODUCT_SEARCH_SOURCE;
Snowflakeリストsnowflake-arctic-embed-m-v1.5をデフォルトのCortex Search埋め込みモデルとして設定しています。768の出力次元、512トークンのコンテキストウィンドウ、英語のみの言語サポートを備えています。Snowflakeは、利用可能なCortex Searchモデルの中で最も高速なインデックスングオプションとしても説明しています。これにより、英語のみのカタログやインデックスング速度が重要な内部検索の良いスタートポイントになります。
カタログが多言語の場合、デフォルトの英語のみのモデルがあなたには適さない可能性があります。その場合、snowflake-arctic-embed-l-v2.0、snowflake-arctic-embed-l-v2.0-8k または voyage-multilingual-2 のような多言語モデルを確認してください。サポートされているモデル、次元、コンテキストウィンドウ、言語サポートの完全なリストについては、公式のSnowflakeエンコーディングモデル表を参照してください。
SnowflakeのCREATE CORTEX SEARCH SERVICE のドキュメントでは、EMBEDDING_MODEL をサービス定義の一部として示しており、変更することは通常、ランタイムパラメータの調整ではなく、サービスの再作成を意味します。したがって、これは早期にテストする価値があります。特に、製品が多言語データを持っているか、または異なる言語で検索を行うユーザーがいる場合にはなおさらです.
Cortex接続のウォームアップ
コアスーチェックを統合した主な理由の一つは、大量のアイテムがあるため、検索クエリの遅延を減らすことでした。
最初のボトルネックは接続処理から来ました。私はSnowflake Pythonライブラリを使用しました。、そして各新しい検索リクエストには新しい接続を開く必要がありました。その接続設定自体が約1~1.5秒かかり、実際の検索クエリはたった500~600ミリ秒でした。そのため、検索自体を最適化する前に、リクエストパスから接続設定のコストを削除することに焦点を当てました。
最適化は、Snowflakeの接続設定をリクエストパスから移動することでした。私はwarmup()メソッドを追加し、Cortex Searchサービスを一度解決し、Snowflakeの接続、Rootオブジェクト、サービス参照をワーカープロセスレベルでキャッシュしました。キャッシュはロックで保護されているため、複数のリクエストが同時に到着した場合でも初期化は安全です。
def warmup(self) -> None:
self._get_service()
def _get_service(self):
cache_key = (self.database, self.schema, self.service_name)
service = self.__class__._service_cache.get(cache_key)
if service:
return service
with self.__class__._lock:
service = self.__class__._service_cache.get(cache_key)
if service:
return service
root = self._get_root()
service = (
root.databases[self.database]
.schemas[self.schema]
.cortex_search_services[self.service_name]
)
self.__class__._service_cache[cache_key] = service
return service
def _get_root(self):
root = self.__class__._root
connection = self.__class__._connection
if root and connection and not connection.is_closed():
return root
with self.__class__._lock:
root = self.__class__._root
connection = self.__class__._connection
if root and connection and not connection.is_closed():
return root
if connection and not connection.is_closed():
try:
connection.close()
except Exception:
pass
self.__class__._connection = None
self.__class__._root = None
self.__class__._service_cache = {}
try:
connection = snowflake.connector.connect(**self._connection_parameters)
except Exception as exc:
raise CortexSearchCatalogServiceError(
f"Failed to create Snowflake connection for Cortex search: {exc}"
)
self.__class__._connection = connection
root = Root(connection)
self.__class__._root = root
return root
その後、Gunicornのpost_forkフックからこのウォームアップメソッドを呼び出しました。Gunicornのワーカーは別プロセスであるため、それぞれのワーカーはフォークした後に独自のSnowflake接続を作成する必要があります。この変更により、接続はワーカースタートアップ時に開かれる代わりに、最初の検索リクエスト時に開かれるため、ユーザーフェース経路から1–1.5秒の接続設定コストが削除されます.
def post_fork(server, worker):
# Warm the Snowflake Cortex client in each worker process so the first user
# search does not pay the connection/session setup cost on the request path.
try:
if not _cortex_warmup_is_configured():
server.log.debug(
"Skipping Cortex warmup for worker pid=%s because Snowflake settings are incomplete.",
worker.pid,
)
return
from apps.db.cortex_search_services.cortex_search_catalog_service import CortexSearchCatalogService
CortexSearchCatalogService().warmup()
server.log.info("Cortex warmup completed for worker pid=%s", worker.pid)
except Exception:
server.log.exception("Cortex warmup failed for worker pid=%s", worker.pid)
コアな検索スコアリングの重みを調整
次に重要な最適化は遅延ではなく、結果の品質についてでした
Cortex Searchはハイブリッドランキングを使用しています。意味ベースの/ベクトル類似性、キーワード/テキストマッチング、ニューラルリランクイングを組み合わせることができます。Snowflakeはscoring_config.weightsを通じてこれを公開しており、ベクトル、テキスト、リランクイングが各スコアリングコンポーネントの相対的な寄与を制御します。デフォルトでは、これらのコンポーネントは均等な重みを持っていますが、使用例に応じてクエリごとに調整することができます。
例えば、私たちのケースでは、純粋なキーワードマッチングが、たまに一致する単語を含んでいるだけで、間違ったアイテムを上位にランク付けすることがありました。実際の例としては、「手袋保護」のサインが実際の手袋よりも上位にランク付けされたことがあります。テキストの一致は強かったですが、意味的にはユーザーが期待する製品ではありませんでした。
これを修正するために、ベクトルスコアの重みをテキストスコアに対して増やしました:
# Tune these weights to adjust Cortex ranking before results reach application code.
# vectors = semantic similarity, texts = keyword match, reranker = neural reranker.
# Raise vectors relative to texts to prevent keyword-heavy but semantically irrelevant
# items from outranking semantically correct ones.
DEFAULT_SCORING_CONFIG = {
"weights": {
"vectors": 2,
"texts": 1,
"reranker": 1,
}
}
これは重要なパラメータです。Cortex Searchがアプリケーションコードに結果を送る前に結果を並べ替える方法を直接変えます。検索が従来のフルテキスト検索に近い場合は、テキストの重みを増やすことをお勧めします。ユーザーが意味、同義語、説明、自然言語で検索する場合は、ベクトルの重みを増やすことでより良い結果が得られます。
テストする価値のある別のパラメータはリランカーです。Cortex Searchはデフォルトで意味ベースのリランキングを使用して関連性を向上させますが、リランキングはクエリの遅延を増加させることもあります。Snowflakeでは、追加の品質改善よりも低い遅延がより重要な場合にリランキングを無効にすることができます.
Cortex Searchから必要なデータのみを返してください
本番コードにCortex Searchを組み込む前に知っておくべき実用的な制限はペイロードサイズです
Cortex Searchのクエリに対するSnowflakeドキュメントのレスポンスサイズ制限:REST APIおよびPython APIのレスポンスペイロードは10MBを超えないこと
これは、Cortex Searchに送信するものと、Cortex Searchに返すように要求するものの両方に対して注意を払う必要があることを意味します
フィルタでは、本当に必要なものだけを送ることを試みてください。巨大なネストは、本番環境での静かなバグの最短の方法です。もし大きいOR同じフィールドに対する条件であれば、可能であればよりコンパクトな演算子を選ぶ。例えば、長いものを構築する代わりにorリスト:
filter={
"@or": [
{"@eq": {"STATUS": "published"}},
{"@eq": {"STATUS": "scheduled"}},
{"@eq": {"STATUS": "archived"}},
]
}
使うことを試みるin もしあなたのケースに合致する場合:
filter={
"@in": {
"STATUS": ["published", "scheduled", "archived"]
}
}
同じことはあなたのデータモデルにも当てはまります。すべての検索リクエストに大量のフィルタロジックが必要な場合、検索用モデルが十分に形成されていない可能性があります。時には、Cortex Searchリクエストにあまり多くのアプリケーションロジックを押し込む代わりに、フィルタリングが容易なフィールドを持つクリーンな検索ソーステーブルやビューを準備するのが良い場合があります。
返信の場合、私はCortex Searchの結果列を小さく保ちたいです。ほとんどのアプリケーション検索フローでは、Cortex Searchは完全なオブジェクトペイロードを返す必要はありません。内部IDのみを返すことができ、ランク付けやデバッグに必要な軽量フィールドをいくつか返すだけです.
response = search_service.search(
query=query,
columns=[
"ID",
],
filter=filter,
limit=50,
)
その後、アプリケーションはそれらのIDを取得し、メインアプリケーションデータベースから完全なレコードを取得できます:
item_ids = [row["ID"] for row in response.results]
items = (
CatalogItem.objects
.filter(id__in=item_ids)
.select_related("brand", "category")
.prefetch_related("tags")
)
これにより、Cortex Searchは最も得意とするものに集中します:関連するレコードを見つけること。あなたのアプリケーションデータベースは、すべてのドメインオブジェクトをロードすることに責任があり、またクエリコストを節約します。
結論
Cortex Searchは、Snowflakeにデータが既にある場合に、別の検索パイプラインをゼロから構築せずに低遅延の意味論的およびフルテキスト検索を行う強力なオプションです.
しかし、重要なのはそれが完全に「設定して忘れる」ものではないということです。品質とパフォーマンスは、サービスをどのように設定し、バックエンドアプリケーションからどのように使用するかによります.
私の経験から得られた最大の教訓は:
- 初めてのユーザーリクエスト前にSnowflake接続をウォームアップする;
- 検索列を実際にユーザーが検索するフィールドに集中させる;
- データと言語の要件に基づいて埋め込みモデルを選択する;
- 使用例に基づいて意味的またはキーワードベースのマッチングが必要かどうかによってスコアリングの重みを調整する;
- フィルタリングが必要な場合は、ATTRIBUTESに必要なフィールドを追加する;
- は、検索結果がどれだけ新鮮である必要があるかに基づいてTARGET_LAGを設定します;
- は、Cortex Searchから必要なデータのみを返し、主要なアプリケーションデータベースから完全なオブジェクトをロードします。
全体として、Cortex Searchはアプリケーションレベルの検索において非常にうまく機能することができ、特にカタログ検索、ドキュメント検索、Snowflakeに既に保存されている他のデータに対してです。












