recommender-service-flask/src/app.py

297 lines
8.5 KiB
Python

from flask import Flask, jsonify, request, Response
# from dotenv import load_dotenv
from services.embedding_service import Embedding
import logging
import json
import uuid_utils as uuid
from qdrant_client.http import models
from qdrant_client.models import PointStruct
from werkzeug.exceptions import BadRequest
from qdrant_services.qdrant import Qdrant
from config import LOGGING_LEVEL, LOGGING_FORMAT
logging.basicConfig(format=LOGGING_FORMAT, level=LOGGING_LEVEL)

# Module-level logger; its level and format come from the basicConfig call above.
logger = logging.getLogger(__name__)

# load_dotenv()

# WSGI application object that every route in this module registers on.
app = Flask(__name__)
@app.get("/")
def hello():
    """Return a fixed greeting; handy as a smoke test that the app is serving."""
    greeting = {"message": "Hello, World!"}
    return jsonify(greeting)
@app.get("/healthz")
def health_healthz():
    """Health probe: always answers with the JSON string "OK" and HTTP 200."""
    status_code = 200
    return jsonify("OK"), status_code
@app.post("/add_points")
def add_points():
    """Embed every clause of a document and upsert the vectors into Qdrant.

    Expected body (JSON, or form data with ``clauses`` as a JSON-encoded
    string)::

        {"doc_uuid": "...",
         "clauses": [{"clause_id": ..., "line_item": "...",
                      "line_number": ...}, ...]}

    Each vector returned by ``Embedding.call`` for a clause becomes its own
    point with a fresh UUIDv7 id. All points of one clause share a payload
    carrying ``doc_id``, ``clause_id``, ``line_item`` and ``line_number``.

    Returns:
        200 with a count message on success; 400 for an unparsable body or
        missing/malformed fields; 500 if embedding or the Qdrant upsert fails.
    """
    # is_json also accepts "application/json; charset=utf-8" (and
    # application/*+json), which an exact string comparison rejected.
    if request.is_json:
        try:
            data = request.get_json(force=True)
        except BadRequest:
            return jsonify({"error": "Invalid JSON"}), 400
    else:
        data = request.form.to_dict()
    # Covers a literal JSON `null` as well as arrays/scalars, which have no .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    doc_uuid = data.get("doc_uuid")
    clauses = data.get("clauses", [])
    if isinstance(clauses, str) and clauses:
        # Form fields are flat strings; accept a JSON-encoded clause list there
        # instead of iterating the string character by character.
        try:
            clauses = json.loads(clauses)
        except json.JSONDecodeError:
            return jsonify({"error": "'clauses' must be a JSON array of objects"}), 400
    if not doc_uuid or not clauses:
        return jsonify({"error": "Missing 'doc_uuid' or 'clauses' in request"}), 400
    if not isinstance(clauses, list) or not all(isinstance(c, dict) for c in clauses):
        return jsonify({"error": "'clauses' must be a JSON array of objects"}), 400

    vector_points = []
    for clause in clauses:
        clause_id = clause.get("clause_id")
        line_item = clause.get("line_item")
        line_number = clause.get("line_number")
        if not clause_id or not line_item:
            return jsonify({"error": "Missing fields in clause"}), 400
        try:
            clause_vectors = Embedding.call(line_item)
        except Exception as e:
            logger.exception("Embedding failed for clause %s", clause_id)
            return jsonify({"error": f"Embedding failed: {e}"}), 500
        payload = {
            "doc_id": doc_uuid,
            "clause_id": clause_id,
            "line_item": line_item,
            "line_number": line_number,
        }
        vector_points.extend(
            PointStruct(id=str(uuid.uuid7()), vector=vector.tolist(), payload=payload)
            for vector in clause_vectors
        )

    # Nothing is written until every clause has embedded successfully, so a
    # mid-way failure above leaves Qdrant untouched.
    try:
        Qdrant.get_client().upsert(points=vector_points)
    except Exception as e:
        logger.exception("Qdrant upsert failed for doc_uuid %s", doc_uuid)
        return jsonify({"error": f"Qdrant upsert failed: {e}"}), 500
    return jsonify({"message": f"{len(vector_points)} points added successfully"}), 200
@app.post("/add")
def add():
    """Embed a single clause and upsert its vectors into Qdrant.

    Body fields (JSON or form data):
        clause:  text to embed (required, non-empty).
        payload: optional value stored under ``user_payload``.
        meta:    optional value stored under ``user_meta``; anything other
                 than a dict (e.g. a form-data string) is JSON-decoded first.

    Returns:
        202 with an empty body once the upsert call returns; 400 for an
        unparsable body, a missing clause, or undecodable ``meta``; 500 if
        embedding or the Qdrant upsert fails.
    """
    # is_json also accepts "application/json; charset=utf-8" (and
    # application/*+json), which an exact string comparison rejected.
    if request.is_json:
        try:
            data = request.get_json(force=True)
        except BadRequest:
            return jsonify({"error": "Invalid JSON"}), 400
    else:
        data = request.form.to_dict()
    # Covers a literal JSON `null` as well as arrays/scalars, which have no .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    clause_text = data.get("clause", '')
    user_payload = data.get("payload")
    user_meta = data.get("meta")
    # `not` also rejects an explicit JSON null, which `== ''` let through.
    if not clause_text:
        return jsonify({"error": "clause is required"}), 400

    payload = {"clause": clause_text}
    if user_payload is not None:
        payload["user_payload"] = user_payload
    if user_meta is not None:
        if not isinstance(user_meta, dict):
            # Form fields arrive as strings, so decode the JSON-encoded meta.
            try:
                user_meta = json.loads(user_meta)
            except (TypeError, json.JSONDecodeError):
                return jsonify({"error": "meta must be valid JSON"}), 400
        payload["user_meta"] = user_meta

    try:
        clause_vectors = Embedding.call(clause_text)
    except Exception as e:
        logger.exception("Embedding failed for clause [%s]", clause_text)
        return jsonify({"error": f"Embedding failed: {e}"}), 500
    try:
        result = Qdrant.get_client().upsert(
            points=[
                PointStruct(id=str(uuid.uuid7()), vector=vector.tolist(), payload=payload)
                for vector in clause_vectors
            ]
        )
    except Exception as e:
        logger.exception("Qdrant upsert failed for clause [%s]", clause_text)
        return jsonify({"error": f"Qdrant upsert failed: {e}"}), 500
    logger.info("add clause [%s], result: %s", clause_text, result)
    return Response(status=202)
@app.post("/delete_points")
def delete_points():
    """Delete every Qdrant point whose payload ``doc_id`` equals ``doc_uuid``.

    Body field (JSON or form data): ``doc_uuid`` (required).

    The deletion is awaited (``wait=True``). The endpoint returns 200 when
    Qdrant reports status "completed", otherwise 500. Returns 400 for an
    unparsable body or a missing ``doc_uuid``.
    """
    # is_json also accepts "application/json; charset=utf-8" (and
    # application/*+json), which an exact string comparison rejected.
    if request.is_json:
        try:
            data = request.get_json(force=True)
        except BadRequest:
            return jsonify({"error": "Invalid JSON"}), 400
    else:
        data = request.form.to_dict()
    # Covers a literal JSON `null` as well as arrays/scalars, which have no .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    doc_uuid = data.get("doc_uuid")
    if not doc_uuid:
        return jsonify({"error": "Missing 'doc_uuid' in request"}), 400

    # /add_points stores the document id under the "doc_id" payload key.
    filter_condition = models.Filter(
        must=[
            models.FieldCondition(
                key="doc_id",
                match=models.MatchValue(value=doc_uuid),
            )
        ]
    )
    try:
        result = Qdrant.get_client().delete(
            points_selector=models.FilterSelector(filter=filter_condition),
            wait=True,
        )
        completed = bool(result) and result.status == "completed"
    except Exception as e:
        logger.exception("Qdrant deletion failed for doc_id %s", doc_uuid)
        return jsonify({"error": str(e)}), 500

    if completed:
        return jsonify({"message": f"Deleted all points with doc_id = {doc_uuid}"}), 200
    return jsonify({"warning": "Deletion request did not complete successfully"}), 500
@app.post("/delete")
def delete():
    """Delete Qdrant points either by a single point id or by one match filter.

    Body fields (JSON or form data):
        id:     a single point id to delete.
        filter: a JSON array (or a JSON-encoded string of one); only its first
                element is used, which must be
                ``{"key": ..., "value": ..., "type": "match"}``.
        wait:   whether to wait for Qdrant to apply the deletion (default
                true). String values "", "0", "false", "no" and "off"
                (case-insensitive) mean false, since form fields are strings.

    Exactly one of ``id`` / ``filter`` must be supplied.

    Returns:
        200 when the deletion was awaited, 202 otherwise; 400 for invalid
        input; 500 if building the request or the Qdrant call fails.
    """
    # is_json also accepts "application/json; charset=utf-8" (and
    # application/*+json), which an exact string comparison rejected.
    if request.is_json:
        try:
            data = request.get_json(force=True)
        except BadRequest:
            return jsonify({"error": "Invalid JSON"}), 400
    else:
        data = request.form.to_dict()
    # Covers a literal JSON `null` as well as arrays/scalars, which have no .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    point_id = data.get("id")
    filter_ = data.get("filter")
    wait = data.get("wait", True)
    if isinstance(wait, str):
        # Any non-empty string (even "false") is truthy, so parse it explicitly.
        wait = wait.strip().lower() not in ("", "0", "false", "no", "off")
    else:
        wait = bool(wait)

    if point_id is None and filter_ is None:
        return jsonify({"error": "id or filter is required"}), 400
    if point_id is not None and filter_ is not None:
        return jsonify({"error": "id and filter cannot be used together"}), 400

    condition = None
    if filter_ is not None:
        # JSON bodies may carry the filter already parsed; form data carries a string.
        if isinstance(filter_, str):
            try:
                filter_ = json.loads(filter_)
            except json.JSONDecodeError:
                return jsonify({"error": "invalid filter"}), 400
        if not isinstance(filter_, list) or not filter_ or not isinstance(filter_[0], dict):
            return jsonify({"error": "invalid filter"}), 400
        condition = filter_[0]
        if condition.get("type") != "match":
            return jsonify({"error": "unsupported filter type"}), 400
        if "key" not in condition or "value" not in condition:
            return jsonify({"error": "invalid filter"}), 400

    try:
        # `is not None` (not truthiness) so a valid id such as 0 is honoured
        # instead of silently skipping the delete and reporting success.
        if point_id is not None:
            selector = models.PointIdsList(points=[point_id])
        else:
            selector = models.FilterSelector(
                filter=models.Filter(
                    must=[
                        models.FieldCondition(
                            key=condition["key"],
                            match=models.MatchValue(value=condition["value"]),
                        ),
                    ],
                ),
            )
        Qdrant.get_client().delete(points_selector=selector, wait=wait)
    except Exception as e:
        logger.exception("Qdrant deletion failed: %s", e)
        return jsonify({"error": "Failed to delete from Qdrant"}), 500

    if wait:
        return jsonify({"message": "Success"}), 200
    return jsonify({"message": "Accepted"}), 202
@app.get("/recommend")
def recommend():
    """Return recommendations for a clause, optionally narrowed by one filter.

    Query parameters:
        clause: text to find recommendations for. Double quotes are stripped
                before it is passed to ``Recommend.call``.
        filter: optional JSON array. Each element is a dict with keys
                ``key`` (field name), ``value`` (field value) and ``type``
                (operator; "match" is the only supported one), e.g.
                ``[{"key": "meta.source", "value": "km", "type": "match"}]``.
                Only the first element is used for now.

    Returns:
        200 with ``{"data": [...]}``; 400 for a malformed or unsupported
        filter; 500 for any other failure.
    """
    try:
        # Imported lazily, inside the try, so an import failure is reported
        # as a 500 by this endpoint.
        from services.recommend_service import Recommend

        clause = request.args.get("clause", '')
        filter_ = request.args.get("filter", None)
        logger.info("recommend request received: [%s] with filter [%s]", clause, filter_)
        updated_clause = clause.replace('"', '')

        if filter_:
            # Bad filters are client errors: answer 400 instead of letting the
            # exception fall through to the generic 500 handler below.
            try:
                condition = json.loads(filter_)[0]
            except (ValueError, TypeError, KeyError, IndexError):
                condition = None
            if not isinstance(condition, dict):
                return jsonify({"error": "invalid filter"}), 400
            if condition.get("type") != "match":
                return jsonify({"error": "unsupported filter type"}), 400
            if "key" not in condition or "value" not in condition:
                return jsonify({"error": "invalid filter"}), 400
            filter_ = models.Filter(
                must=[
                    models.FieldCondition(
                        key=condition["key"],
                        match=models.MatchValue(value=condition["value"]),
                    ),
                ],
            )

        result = Recommend.call(updated_clause, filter=filter_)
        data = [] if result is None else [i.dict() for i in result]
        logger.info("recommender returns %d results", len(data))
        return jsonify({"data": data}), 200
    except Exception as e:
        logger.exception('recommend error: %s', type(e).__name__)
        return jsonify({"error": "Internal Server Error"}), 500
if __name__ == "__main__":
    # Local development server only. Debug mode enables the Werkzeug
    # interactive debugger, which allows arbitrary code execution from the
    # browser, so it is opt-in rather than hard-coded: Flask.run() turns it
    # on when the FLASK_DEBUG environment variable is set (e.g. FLASK_DEBUG=1).
    app.run(port=8000)