Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions apps/application/flow/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,33 +248,56 @@ def is_valid_start_node(self):
def is_valid_model_params(self):
node_list = [node for node in self.nodes if (
node.type == 'ai-chat-node' or node.type == 'question-node' or node.type == 'parameter-extraction-node')]

model_ids = []
nodes_requiring_validation = []
for node in node_list:
if (node.properties.get('node_data', {}).get('model_id_type') or 'custom') == 'reference':
continue
model = QuerySet(Model).filter(id=node.properties.get('node_data', {}).get('model_id')).first()
if model is None:
raise ValidationError(ErrorDetail(
_('The node {node} model does not exist').format(node=node.properties.get("stepName"))))
credential = get_model_credential(model.provider, model.model_type, model.model_name)
model_params_setting = node.properties.get('node_data', {}).get('model_params_setting')
model_params_setting_form = credential.get_model_params_setting_form(
model.model_name)
if model_params_setting is None:
model_params_setting = model_params_setting_form.get_default_form_data()
node.properties.get('node_data', {})['model_params_setting'] = model_params_setting
if node.properties.get('status', 200) != 200:
raise ValidationError(
ErrorDetail(_("Node {node} is unavailable").format(node=node.properties.get("stepName"))))
model_id = node.properties.get('node_data', {}).get('model_id')
if model_id:
model_ids.append(model_id)
nodes_requiring_validation.append((node, model_id))
if model_ids:
# 先批量获取数据
models_map = {str(model.id): model for model in QuerySet(Model).filter(id__in=model_ids)}
# 再循环从map中取数据进行处理
for node, model_id in nodes_requiring_validation:
model = models_map.get(str(model_id))
if model is None:
raise ValidationError(ErrorDetail(
_('The node {node} model does not exist').format(node=node.properties.get("stepName"))))
credential = get_model_credential(model.provider, model.model_type, model.model_name)
model_params_setting = node.properties.get('node_data', {}).get('model_params_setting')
model_params_setting_form = credential.get_model_params_setting_form(
model.model_name)
if model_params_setting is None:
model_params_setting = model_params_setting_form.get_default_form_data()
node.properties.get('node_data', {})['model_params_setting'] = model_params_setting
if node.properties.get('status', 200) != 200:
raise ValidationError(
ErrorDetail(_("Node {node} is unavailable").format(node=node.properties.get("stepName"))))

node_list = [node for node in self.nodes if (node.type == 'function-lib-node')]

function_lib_ids = []
nodes_requiring_tool_validation = []
for node in node_list:
function_lib_id = node.properties.get('node_data', {}).get('function_lib_id')
if function_lib_id is None:
raise ValidationError(ErrorDetail(
_('The library ID of node {node} cannot be empty').format(node=node.properties.get("stepName"))))
f_lib = QuerySet(Tool).filter(id=function_lib_id).first()
if f_lib is None:
raise ValidationError(ErrorDetail(_("The function library for node {node} is not available").format(
node=node.properties.get("stepName"))))
function_lib_ids.append(function_lib_id)
nodes_requiring_tool_validation.append((node, function_lib_id))
if function_lib_ids:
# 先批量获取数据
tools_map = {str(tool.id): tool for tool in QuerySet(Tool).filter(id__in=function_lib_ids)}
# 再循环从map中取数据进行处理
for node, function_lib_id in nodes_requiring_tool_validation:
f_lib = tools_map.get(str(function_lib_id))
if f_lib is None:
raise ValidationError(ErrorDetail(_("The function library for node {node} is not available").format(
node=node.properties.get("stepName"))))

def is_valid_base_node(self):
base_node_list = [node for node in self.nodes if node.id == 'base-node']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
@param workflow: 工作流管理器
"""
response = node_variable.get("result")
answer = ""
reasoning_content = ""
answer_chunks = []
reasoning_content_chunks = []
model_setting = node.context.get(
"model_setting",
{"reasoning_content_enable": False, "reasoning_content_end": "</think>", "reasoning_content_start": "<think>"},
Expand All @@ -81,10 +81,10 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
reasoning_content_chunk = chunk.additional_kwargs.get("reasoning_content", "")
else:
reasoning_content_chunk = reasoning_chunk.get("reasoning_content")
answer += content_chunk
answer_chunks.append(content_chunk)
if reasoning_content_chunk is None:
reasoning_content_chunk = ""
reasoning_content += reasoning_content_chunk
reasoning_content_chunks.append(reasoning_content_chunk)
yield {
"content": content_chunk,
"reasoning_content": reasoning_content_chunk
Expand All @@ -93,14 +93,16 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
}

reasoning_chunk = reasoning.get_end_reasoning_content()
answer += reasoning_chunk.get("content")
answer_chunks.append(reasoning_chunk.get("content"))
reasoning_content_chunk = ""
if not response_reasoning_content:
reasoning_content_chunk = reasoning_chunk.get("reasoning_content")
yield {
"content": reasoning_chunk.get("content"),
"reasoning_content": reasoning_content_chunk if model_setting.get("reasoning_content_enable", False) else "",
}
answer = "".join(answer_chunks)
reasoning_content = "".join(reasoning_content_chunks)
_write_context(node_variable, workflow_variable, node, workflow, answer, reasoning_content)


Expand Down Expand Up @@ -525,12 +527,20 @@ def _process_videos(self, image, video_model):
if isinstance(image, str) and image.startswith("http"):
videos.append({"type": "video_url", "video_url": {"url": image}})
elif image is not None and len(image) > 0:
# 先批量获取数据
file_ids = [img["file_id"] for img in image if "file_id" in img]
if file_ids:
files_map = {str(f.id): f for f in QuerySet(File).filter(id__in=file_ids)}
else:
files_map = {}
# 再循环从map中取数据进行处理
for img in image:
if "file_id" in img:
file_id = img["file_id"]
file = QuerySet(File).filter(id=file_id).first()
url = video_model.upload_file_and_get_url(file.get_bytes(), file.file_name)
videos.append({"type": "video_url", "video_url": {"url": url}})
file = files_map.get(str(file_id))
if file:
url = video_model.upload_file_and_get_url(file.get_bytes(), file.file_name)
videos.append({"type": "video_url", "video_url": {"url": url}})
elif "url" in img and img["url"].startswith("http"):
videos.append({"type": "video_url", "video_url": {"url": img["url"]}})
return videos
Expand All @@ -543,16 +553,24 @@ def _process_images(self, image):
if isinstance(image, str) and image.startswith("http"):
images.append({"type": "image_url", "image_url": {"url": image}})
elif image is not None and len(image) > 0:
# 先批量获取数据
file_ids = [img["file_id"] for img in image if "file_id" in img]
if file_ids:
files_map = {str(f.id): f for f in QuerySet(File).filter(id__in=file_ids)}
else:
files_map = {}
# 再循环从map中取数据进行处理
for img in image:
if "file_id" in img:
file_id = img["file_id"]
file = QuerySet(File).filter(id=file_id).first()
image_bytes = file.get_bytes()
base64_image = base64.b64encode(image_bytes).decode("utf-8")
image_format = what(None, image_bytes)
images.append(
{"type": "image_url", "image_url": {"url": f"data:image/{image_format};base64,{base64_image}"}}
)
file = files_map.get(str(file_id))
if file:
image_bytes = file.get_bytes()
base64_image = base64.b64encode(image_bytes).decode("utf-8")
image_format = what(None, image_bytes)
images.append(
{"type": "image_url", "image_url": {"url": f"data:image/{image_format};base64,{base64_image}"}}
)
elif "url" in img and img["url"].startswith("http"):
images.append({"type": "image_url", "image_url": {"url": img["url"]}})
return images
Expand Down
Loading