from collections.abc import Generator, Mapping
from typing import Any, Union

from openai._exceptions import RateLimitError

from configs import dify_config
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
from core.app.apps.chat.app_generator import ChatAppGenerator
from core.app.apps.completion.app_generator import CompletionAppGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.features.rate_limiting import RateLimit
from models.model import Account, App, AppMode, EndUser
from models.workflow import Workflow
from services.errors.llm import InvokeRateLimitError
from services.workflow_service import WorkflowService


class AppGenerateService:
    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        """
        Generate app content for the given app mode.
        :param app_model: app model
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke source
        :param streaming: whether to stream the response
        :return:
        """
        # Enforce the per-app limit on concurrent active requests.
        max_active_request = AppGenerateService._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        request_id = RateLimit.gen_request_key()
        try:
            request_id = rate_limit.enter(request_id)
            if app_model.mode == AppMode.COMPLETION.value:
                return rate_limit.generate(
                    CompletionAppGenerator().generate(
                        app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                return rate_limit.generate(
                    AgentChatAppGenerator().generate(
                        app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.CHAT.value:
                return rate_limit.generate(
                    ChatAppGenerator().generate(
                        app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.ADVANCED_CHAT.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                return rate_limit.generate(
                    AdvancedChatAppGenerator().generate(
                        app_model=app_model,
                        workflow=workflow,
                        user=user,
                        args=args,
                        invoke_from=invoke_from,
                        stream=streaming,
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.WORKFLOW.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                return rate_limit.generate(
                    WorkflowAppGenerator().generate(
                        app_model=app_model,
                        workflow=workflow,
                        user=user,
                        args=args,
                        invoke_from=invoke_from,
                        stream=streaming,
                    ),
                    request_id,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e))
        finally:
            # Non-streaming responses are complete at this point, so release the rate limit slot immediately.
            if not streaming:
                rate_limit.exit(request_id)

    @staticmethod
    def _get_max_active_requests(app_model: App) -> int:
        max_active_requests = app_model.max_active_requests
        if app_model.max_active_requests is None:
            max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS)
        return max_active_requests

    @classmethod
    def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
        # Single-iteration runs are debug-only, so they always use the draft workflow.
        if app_model.mode == AppMode.ADVANCED_CHAT.value:
            workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
            return AdvancedChatAppGenerator().single_iteration_generate(
                app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
            )
        elif app_model.mode == AppMode.WORKFLOW.value:
            workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
            return WorkflowAppGenerator().single_iteration_generate(
                app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
            )
        else:
            raise ValueError(f"Invalid app mode {app_model.mode}")

    @classmethod
    def generate_more_like_this(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        message_id: str,
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[dict, Generator]:
        """
        Generate a "more like this" completion for an existing message.
        :param app_model: app model
        :param user: account or end user
        :param message_id: message id
        :param invoke_from: invoke source
        :param streaming: whether to stream the response
        :return:
        """
        return CompletionAppGenerator().generate_more_like_this(
            app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
        )

    @classmethod
    def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
        """
        Get the workflow for the app, depending on the invoke source.
        :param app_model: app model
        :param invoke_from: invoke source
        :return:
        """
        workflow_service = WorkflowService()
        if invoke_from == InvokeFrom.DEBUGGER:
            # fetch draft workflow by app_model
            workflow = workflow_service.get_draft_workflow(app_model=app_model)

            if not workflow:
                raise ValueError("Workflow not initialized")
        else:
            # fetch published workflow by app_model
            workflow = workflow_service.get_published_workflow(app_model=app_model)

            if not workflow:
                raise ValueError("Workflow not published")

        return workflow
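

# Usage sketch (hypothetical, for illustration only; not part of the module):
# a caller such as an API controller would pass a loaded App model, the current
# Account/EndUser, and the request args, then consume the returned response
# (a dict for blocking calls, a generator of chunks when streaming=True).
# The args keys shown here are illustrative placeholders.
#
#     response = AppGenerateService.generate(
#         app_model=app_model,            # models.model.App instance
#         user=current_user,              # Account or EndUser
#         args={"inputs": {}, "query": "Hello"},
#         invoke_from=InvokeFrom.DEBUGGER,
#         streaming=True,
#     )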