diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py index cff563693..18c5ba8df 100644 --- a/api/configs/middleware/__init__.py +++ b/api/configs/middleware/__init__.py @@ -1,7 +1,7 @@ from typing import Any, Optional from urllib.parse import quote_plus -from pydantic import Field, NonNegativeInt, PositiveInt, computed_field +from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt, computed_field from pydantic_settings import BaseSettings from configs.middleware.cache.redis_config import RedisConfig @@ -158,6 +158,21 @@ class CeleryConfig(DatabaseConfig): default=None, ) + CELERY_USE_SENTINEL: Optional[bool] = Field( + description="Whether to use Redis Sentinel mode", + default=False, + ) + + CELERY_SENTINEL_MASTER_NAME: Optional[str] = Field( + description="Redis Sentinel master name", + default=None, + ) + + CELERY_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field( + description="Redis Sentinel socket timeout", + default=0.1, + ) + @computed_field @property def CELERY_RESULT_BACKEND(self) -> str | None: diff --git a/api/configs/middleware/cache/redis_config.py b/api/configs/middleware/cache/redis_config.py index cacdaf6fb..4fcd52ddc 100644 --- a/api/configs/middleware/cache/redis_config.py +++ b/api/configs/middleware/cache/redis_config.py @@ -1,6 +1,6 @@ from typing import Optional -from pydantic import Field, NonNegativeInt, PositiveInt +from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt from pydantic_settings import BaseSettings @@ -38,3 +38,33 @@ class RedisConfig(BaseSettings): description="whether to use SSL for Redis connection", default=False, ) + + REDIS_USE_SENTINEL: Optional[bool] = Field( + description="Whether to use Redis Sentinel mode", + default=False, + ) + + REDIS_SENTINELS: Optional[str] = Field( + description="Redis Sentinel nodes", + default=None, + ) + + REDIS_SENTINEL_SERVICE_NAME: Optional[str] = Field( + description="Redis Sentinel service name", + default=None, + ) + + REDIS_SENTINEL_USERNAME: Optional[str] = Field( + description="Redis Sentinel username", + default=None, + ) + + REDIS_SENTINEL_PASSWORD: Optional[str] = Field( + description="Redis Sentinel password", + default=None, + ) + + REDIS_SENTINEL_SOCKET_TIMEOUT: Optional[PositiveFloat] = Field( + description="Redis Sentinel socket timeout", + default=0.1, + ) diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index f5ec7c175..0ff9f9084 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -10,11 +10,21 @@ def init_app(app: Flask) -> Celery: with app.app_context(): return self.run(*args, **kwargs) + broker_transport_options = {} + + if app.config.get("CELERY_USE_SENTINEL"): + broker_transport_options = { + "master_name": app.config.get("CELERY_SENTINEL_MASTER_NAME"), + "sentinel_kwargs": { + "socket_timeout": app.config.get("CELERY_SENTINEL_SOCKET_TIMEOUT", 0.1), + }, + } + celery_app = Celery( app.name, task_cls=FlaskTask, - broker=app.config["CELERY_BROKER_URL"], - backend=app.config["CELERY_BACKEND"], + broker=app.config.get("CELERY_BROKER_URL"), + backend=app.config.get("CELERY_BACKEND"), task_ignore_result=True, ) @@ -27,11 +37,12 @@ def init_app(app: Flask) -> Celery: } celery_app.conf.update( - result_backend=app.config["CELERY_RESULT_BACKEND"], + result_backend=app.config.get("CELERY_RESULT_BACKEND"), + broker_transport_options=broker_transport_options, broker_connection_retry_on_startup=True, ) - if app.config["BROKER_USE_SSL"]: + if app.config.get("BROKER_USE_SSL"): celery_app.conf.update( broker_use_ssl=ssl_options, # Add the SSL options to the broker configuration ) @@ -43,7 +54,7 @@ def init_app(app: Flask) -> Celery: "schedule.clean_embedding_cache_task", "schedule.clean_unused_datasets_task", ] - day = app.config["CELERY_BEAT_SCHEDULER_TIME"] + day = app.config.get("CELERY_BEAT_SCHEDULER_TIME") beat_schedule = { "clean_embedding_cache_task": { "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task", diff --git a/api/extensions/ext_redis.py b/api/extensions/ext_redis.py index d5fb162fd..054769e7f 100644 --- a/api/extensions/ext_redis.py +++ b/api/extensions/ext_redis.py @@ -1,26 +1,83 @@ import redis from redis.connection import Connection, SSLConnection +from redis.sentinel import Sentinel -redis_client = redis.Redis() + +class RedisClientWrapper(redis.Redis): + """ + A wrapper class for the Redis client that addresses the issue where the global + `redis_client` variable cannot be updated when a new Redis instance is returned + by Sentinel. + + This class allows for deferred initialization of the Redis client, enabling the + client to be re-initialized with a new instance when necessary. This is particularly + useful in scenarios where the Redis instance may change dynamically, such as during + a failover in a Sentinel-managed Redis setup. + + Attributes: + _client (redis.Redis): The actual Redis client instance. It remains None until + initialized with the `initialize` method. + + Methods: + initialize(client): Initializes the Redis client if it hasn't been initialized already. + __getattr__(item): Delegates attribute access to the Redis client, raising an error + if the client is not initialized. + """ + + def __init__(self): + self._client = None + + def initialize(self, client): + if self._client is None: + self._client = client + + def __getattr__(self, item): + if self._client is None: + raise RuntimeError("Redis client is not initialized. Call init_app first.") + return getattr(self._client, item) + + +redis_client = RedisClientWrapper() def init_app(app): + global redis_client connection_class = Connection if app.config.get("REDIS_USE_SSL"): connection_class = SSLConnection - redis_client.connection_pool = redis.ConnectionPool( - **{ - "host": app.config.get("REDIS_HOST"), - "port": app.config.get("REDIS_PORT"), - "username": app.config.get("REDIS_USERNAME"), - "password": app.config.get("REDIS_PASSWORD"), - "db": app.config.get("REDIS_DB"), - "encoding": "utf-8", - "encoding_errors": "strict", - "decode_responses": False, - }, - connection_class=connection_class, - ) + redis_params = { + "username": app.config.get("REDIS_USERNAME"), + "password": app.config.get("REDIS_PASSWORD"), + "db": app.config.get("REDIS_DB"), + "encoding": "utf-8", + "encoding_errors": "strict", + "decode_responses": False, + } + + if app.config.get("REDIS_USE_SENTINEL"): + sentinel_hosts = [ + (node.split(":")[0], int(node.split(":")[1])) for node in app.config.get("REDIS_SENTINELS").split(",") + ] + sentinel = Sentinel( + sentinel_hosts, + sentinel_kwargs={ + "socket_timeout": app.config.get("REDIS_SENTINEL_SOCKET_TIMEOUT", 0.1), + "username": app.config.get("REDIS_SENTINEL_USERNAME"), + "password": app.config.get("REDIS_SENTINEL_PASSWORD"), + }, + ) + master = sentinel.master_for(app.config.get("REDIS_SENTINEL_SERVICE_NAME"), **redis_params) + redis_client.initialize(master) + else: + redis_params.update( + { + "host": app.config.get("REDIS_HOST"), + "port": app.config.get("REDIS_PORT"), + "connection_class": connection_class, + } + ) + pool = redis.ConnectionPool(**redis_params) + redis_client.initialize(redis.Redis(connection_pool=pool)) app.extensions["redis"] = redis_client diff --git a/docker/.env.example b/docker/.env.example index 0a576de12..654313888 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -214,6 +214,18 @@ REDIS_USERNAME= REDIS_PASSWORD=difyai123456 REDIS_USE_SSL=false +# Whether to use Redis Sentinel mode. +# If set to true, the application will automatically discover and connect to the master node through Sentinel. +REDIS_USE_SENTINEL=false + +# List of Redis Sentinel nodes. If Sentinel mode is enabled, provide at least one Sentinel IP and port. +# Format: `:,:,:` +REDIS_SENTINELS= +REDIS_SENTINEL_SERVICE_NAME= +REDIS_SENTINEL_USERNAME= +REDIS_SENTINEL_PASSWORD= +REDIS_SENTINEL_SOCKET_TIMEOUT=0.1 + # ------------------------------ # Celery Configuration # ------------------------------ @@ -221,9 +233,16 @@ REDIS_USE_SSL=false # Use redis as the broker, and redis db 1 for celery broker. # Format as follows: `redis://:@:/` # Example: redis://:difyai123456@redis:6379/1 +# If use Redis Sentinel, format as follows: `sentinel://:@:/` +# Example: sentinel://localhost:26379/1;sentinel://localhost:26380/1;sentinel://localhost:26381/1 CELERY_BROKER_URL=redis://:difyai123456@redis:6379/1 BROKER_USE_SSL=false +# If you are using Redis Sentinel for high availability, configure the following settings. +CELERY_USE_SENTINEL=false +CELERY_SENTINEL_MASTER_NAME= +CELERY_SENTINEL_SOCKET_TIMEOUT=0.1 + # ------------------------------ # CORS Configuration # Used to set the front-end cross-domain access policy. diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 4793d4ef0..770a141c0 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -42,8 +42,17 @@ x-shared-env: &shared-api-worker-env REDIS_PASSWORD: ${REDIS_PASSWORD:-difyai123456} REDIS_USE_SSL: ${REDIS_USE_SSL:-false} REDIS_DB: 0 + REDIS_USE_SENTINEL: ${REDIS_USE_SENTINEL:-false} + REDIS_SENTINELS: ${REDIS_SENTINELS:-} + REDIS_SENTINEL_SERVICE_NAME: ${REDIS_SENTINEL_SERVICE_NAME:-} + REDIS_SENTINEL_USERNAME: ${REDIS_SENTINEL_USERNAME:-} + REDIS_SENTINEL_PASSWORD: ${REDIS_SENTINEL_PASSWORD:-} + REDIS_SENTINEL_SOCKET_TIMEOUT: ${REDIS_SENTINEL_SOCKET_TIMEOUT:-} CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1} BROKER_USE_SSL: ${BROKER_USE_SSL:-false} + CELERY_USE_SENTINEL: ${CELERY_USE_SENTINEL:-false} + CELERY_SENTINEL_MASTER_NAME: ${CELERY_SENTINEL_MASTER_NAME:-} + CELERY_SENTINEL_SOCKET_TIMEOUT: ${CELERY_SENTINEL_SOCKET_TIMEOUT:-} WEB_API_CORS_ALLOW_ORIGINS: ${WEB_API_CORS_ALLOW_ORIGINS:-*} CONSOLE_CORS_ALLOW_ORIGINS: ${CONSOLE_CORS_ALLOW_ORIGINS:-*} STORAGE_TYPE: ${STORAGE_TYPE:-local}