mirror of
https://github.com/langgenius/dify.git
synced 2026-02-16 10:30:12 -05:00
Compare commits
91 Commits
refactor/q ... feature/ta
Commit SHA1s: a0aa8cdb45, ae8618877b, fad6fa141d, 30821fd26c, 1a9fdd9a65, de610cbf39, 1c55602445, a3f1220d23, d62e16b9bb, 13f2a43ccc,
6903c31b84, b2cc9b255d, e9f0e1e839, cd497a8c52, 7aab4529e6, 4bff0cd0ab, c98870c3f4, b06c7c8f33, 1a2fce7055, 2b021e8752,
4a197b9458, 772ff636ec, ab1c5a2027, 33e99f069b, 52af829f1f, 0ef8b5a0ca, 2bfc54314e, bdd8d5b470, 4955de5905, 3bee2ee067,
328897f81c, ab078380a3, a33ac77a22, d3923e7b56, 2f633de45e, 98c88cec34, c6999fb5be, f7f9a08fa5, 5008f5e89b, 1dd89a02ea,
5bf4114d6f, a56e94ba8e, 11f1782df0, 8cf5d9a6a1, 0ec2b12e65, f33b1a3332, 08026f7399, 18e051bd66, 42f991dbef, b1b2c9636f,
01f17b7ddc, 14b2e5bd0d, d095bd413b, 3473ff7ad1, 138c56bd6e, c327d0bb44, e4b97fba29, 7f9884e7a1, e389cd1665, 87f348a0de,
206706987d, 91da784f84, a129e684cc, fe07c810ba, a22cc5bc5e, 1fbdf6b465, 491e1fd6a4, 0e33dfb5c2, ea708e7a32, c09e29c3f8,
2d53ba8671, 9be863fefa, 8f43629cd8, 9ee71902c1, a012c87445, 450578d4c0, 837237aa6d, b63dfbf654, 51ea87ab85, 00698e41b7,
df938a4543, 9161936f41, f9a21b56ab, 220e1df847, 8cfdde594c, 31a8fd810c, 9fad97ec9b, 0c2729d9b3, a2e03b811e, 1e10bf525c,
8b1af36d94

1 .agent/skills (Symbolic link)

@@ -0,0 +1 @@
../.claude/skills

@@ -5,5 +5,18 @@
"typescript-lsp@claude-plugins-official": true,
"pyright-lsp@claude-plugins-official": true,
"ralph-loop@claude-plugins-official": true
},
"hooks": {
"PreToolUse": [
{
"matcher": "Bash",
"hooks": [
{
"type": "command",
"command": "npx -y block-no-verify@1.1.1"
}
]
}
]
}
}

46 .claude/skills/orpc-contract-first/SKILL.md (Normal file)

@@ -0,0 +1,46 @@
---
name: orpc-contract-first
description: Guide for implementing oRPC contract-first API patterns in Dify frontend. Triggers when creating new API contracts, adding service endpoints, integrating TanStack Query with typed contracts, or migrating legacy service calls to oRPC. Use for all API layer work in web/contract and web/service directories.
---

# oRPC Contract-First Development

## Project Structure

```
web/contract/
├── base.ts          # Base contract (inputStructure: 'detailed')
├── router.ts        # Router composition & type exports
├── marketplace.ts   # Marketplace contracts
└── console/         # Console contracts by domain
    ├── system.ts
    └── billing.ts
```

## Workflow

1. **Create contract** in `web/contract/console/{domain}.ts`
   - Import `base` from `../base` and `type` from `@orpc/contract`
   - Define route with `path`, `method`, `input`, `output`

2. **Register in router** at `web/contract/router.ts`
   - Import directly from domain file (no barrel files)
   - Nest by API prefix: `billing: { invoices, bindPartnerStack }`

3. **Create hooks** in `web/service/use-{domain}.ts`
   - Use `consoleQuery.{group}.{contract}.queryKey()` for query keys
   - Use `consoleClient.{group}.{contract}()` for API calls

## Key Rules

- **Input structure**: Always use `{ params, query?, body? }` format
- **Path params**: Use `{paramName}` in path, match in `params` object
- **Router nesting**: Group by API prefix (e.g., `/billing/*` → `billing: {}`)
- **No barrel files**: Import directly from specific files
- **Types**: Import from `@/types/`, use `type<T>()` helper

## Type Export

```typescript
export type ConsoleInputs = InferContractRouterInputs<typeof consoleRouterContract>
```

6 .github/workflows/api-tests.yml (vendored)

@@ -39,12 +39,6 @@ jobs:
- name: Install dependencies
run: uv sync --project api --dev

- name: Run pyrefly check
run: |
cd api
uv add --dev pyrefly
uv run pyrefly check || true

- name: Run dify config tests
run: uv run --project api dev/pytest/pytest_config_tests.py

4 .github/workflows/autofix.yml (vendored)

@@ -16,14 +16,14 @@ jobs:

- name: Check Docker Compose inputs
id: docker-compose-changes
uses: tj-actions/changed-files@v46
uses: tj-actions/changed-files@v47
with:
files: |
docker/generate_docker_compose
docker/.env.example
docker/docker-compose-template.yaml
docker/docker-compose.yaml
- uses: actions/setup-python@v5
- uses: actions/setup-python@v6
with:
python-version: "3.11"

2 .github/workflows/build-push.yml (vendored)

@@ -112,7 +112,7 @@ jobs:
context: "web"
steps:
- name: Download digests
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
path: /tmp/digests
pattern: digests-${{ matrix.context }}-*

2 .github/workflows/deploy-agent-dev.yml (vendored)

@@ -19,7 +19,7 @@ jobs:
github.event.workflow_run.head_branch == 'deploy/agent-dev'
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v0.1.8
uses: appleboy/ssh-action@v1
with:
host: ${{ secrets.AGENT_DEV_SSH_HOST }}
username: ${{ secrets.SSH_USER }}

2 .github/workflows/deploy-dev.yml (vendored)

@@ -16,7 +16,7 @@ jobs:
github.event.workflow_run.head_branch == 'deploy/dev'
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v0.1.8
uses: appleboy/ssh-action@v1
with:
host: ${{ secrets.SSH_HOST }}
username: ${{ secrets.SSH_USER }}

29 .github/workflows/deploy-hitl.yml (vendored, Normal file)

@@ -0,0 +1,29 @@
name: Deploy HITL

on:
  workflow_run:
    workflows: ["Build and Push API & Web"]
    branches:
      - "feat/hitl-frontend"
      - "feat/hitl-backend"
    types:
      - completed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: |
      github.event.workflow_run.conclusion == 'success' &&
      (
        github.event.workflow_run.head_branch == 'feat/hitl-frontend' ||
        github.event.workflow_run.head_branch == 'feat/hitl-backend'
      )
    steps:
      - name: Deploy to server
        uses: appleboy/ssh-action@v1
        with:
          host: ${{ secrets.HITL_SSH_HOST }}
          username: ${{ secrets.SSH_USER }}
          key: ${{ secrets.SSH_PRIVATE_KEY }}
          script: |
            ${{ vars.SSH_SCRIPT || secrets.SSH_SCRIPT }}

2 .github/workflows/stale.yml (vendored)

@@ -18,7 +18,7 @@ jobs:
pull-requests: write

steps:
- uses: actions/stale@v5
- uses: actions/stale@v10
with:
days-before-issue-stale: 15
days-before-issue-close: 3

15 .github/workflows/style.yml (vendored)

@@ -65,6 +65,9 @@ jobs:
defaults:
run:
working-directory: ./web
permissions:
checks: write
pull-requests: read

steps:
- name: Checkout code

@@ -90,7 +93,7 @@
uses: actions/setup-node@v6
if: steps.changed-files.outputs.any_changed == 'true'
with:
node-version: 22
node-version: 24
cache: pnpm
cache-dependency-path: ./web/pnpm-lock.yaml

@@ -103,7 +106,15 @@
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: |
pnpm run lint
pnpm run lint:report
continue-on-error: true

# - name: Annotate Code
# if: steps.changed-files.outputs.any_changed == 'true' && github.event_name == 'pull_request'
# uses: DerLev/eslint-annotations@51347b3a0abfb503fc8734d5ae31c4b151297fae
# with:
# eslint-report: web/eslint_report.json
# github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Web type check
if: steps.changed-files.outputs.any_changed == 'true'

8 .github/workflows/tool-test-sdks.yaml (vendored)

@@ -16,10 +16,6 @@ jobs:
name: unit test for Node.js SDK
runs-on: ubuntu-latest

strategy:
matrix:
node-version: [16, 18, 20, 22]

defaults:
run:
working-directory: sdks/nodejs-client

@@ -29,10 +25,10 @@
with:
persist-credentials: false

- name: Use Node.js ${{ matrix.node-version }}
- name: Use Node.js
uses: actions/setup-node@v6
with:
node-version: ${{ matrix.node-version }}
node-version: 24
cache: ''
cache-dependency-path: 'pnpm-lock.yaml'

2 .github/workflows/translate-i18n-claude.yml (vendored)

@@ -57,7 +57,7 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v6
with:
node-version: 'lts/*'
node-version: 24
cache: pnpm
cache-dependency-path: ./web/pnpm-lock.yaml

2 .github/workflows/trigger-i18n-sync.yml (vendored)

@@ -21,7 +21,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0

2 .github/workflows/web-tests.yml (vendored)

@@ -31,7 +31,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v6
with:
node-version: 22
node-version: 24
cache: pnpm
cache-dependency-path: ./web/pnpm-lock.yaml

@@ -12,12 +12,8 @@ The codebase is split into:

## Backend Workflow

- Read `api/AGENTS.md` for details
- Run backend CLI commands through `uv run --project api <command>`.

- Before submission, all backend modifications must pass local checks: `make lint`, `make type-check`, and `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`.

- Use Makefile targets for linting and formatting; `make lint` and `make type-check` cover the required checks.

- Integration tests are CI-only and are not expected to run in the local environment.

## Frontend Workflow

12 Makefile

@@ -61,7 +61,8 @@ check:

lint:
@echo "🔧 Running ruff format, check with fixes, import linter, and dotenv-linter..."
@uv run --project api --dev sh -c 'ruff format ./api && ruff check --fix ./api'
@uv run --project api --dev ruff format ./api
@uv run --project api --dev ruff check --fix ./api
@uv run --directory api --dev lint-imports
@uv run --project api --dev dotenv-linter ./api/.env.example ./web/.env.example
@echo "✅ Linting complete"

@@ -73,7 +74,12 @@ type-check:

test:
@echo "🧪 Running backend unit tests..."
@uv run --project api --dev dev/pytest/pytest_unit_tests.sh
@if [ -n "$(TARGET_TESTS)" ]; then \
echo "Target: $(TARGET_TESTS)"; \
uv run --project api --dev pytest $(TARGET_TESTS); \
else \
uv run --project api --dev dev/pytest/pytest_unit_tests.sh; \
fi
@echo "✅ Tests complete"

# Build Docker images

@@ -125,7 +131,7 @@ help:
@echo " make check - Check code with ruff"
@echo " make lint - Format, fix, and lint code (ruff, imports, dotenv)"
@echo " make type-check - Run type checking with basedpyright"
@echo " make test - Run backend unit tests"
@echo " make test - Run backend unit tests (or TARGET_TESTS=./api/tests/<target_tests>)"
@echo ""
@echo "Docker Build Targets:"
@echo " make build-web - Build web Docker image"

0 agent-notes/.gitkeep (Normal file)

@@ -417,6 +417,8 @@ SMTP_USERNAME=123
SMTP_PASSWORD=abc
SMTP_USE_TLS=true
SMTP_OPPORTUNISTIC_TLS=false
# Optional: override the local hostname used for SMTP HELO/EHLO
SMTP_LOCAL_HOSTNAME=
# Sendgid configuration
SENDGRID_API_KEY=
# Sentry configuration

@@ -589,6 +591,7 @@ ENABLE_CLEAN_UNUSED_DATASETS_TASK=false
ENABLE_CREATE_TIDB_SERVERLESS_TASK=false
ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK=false
ENABLE_CLEAN_MESSAGES=false
ENABLE_WORKFLOW_RUN_CLEANUP_TASK=false
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
ENABLE_DATASETS_QUEUE_MONITOR=false
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true

@@ -712,3 +715,4 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5
SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30

248 api/AGENTS.md

@@ -1,62 +1,236 @@
# Agent Skill Index
# API Agent Guide

## Agent Notes (must-check)

Before you start work on any backend file under `api/`, you MUST check whether a related note exists under:

- `agent-notes/<same-relative-path-as-target-file>.md`

Rules:

- **Path mapping**: for a target file `<path>/<name>.py`, the note must be `agent-notes/<path>/<name>.py.md` (same folder structure, same filename, plus `.md`).
- **Before working**:
- If the note exists, read it first and follow any constraints/decisions recorded there.
- If the note conflicts with the current code, or references an "origin" file/path that has been deleted, renamed, or migrated, treat the **code as the single source of truth** and update the note to match reality.
- If the note does not exist, create it with a short architecture/intent summary and any relevant invariants/edge cases.
- **During working**:
- Keep the note in sync as you discover constraints, make decisions, or change approach.
- If you move/rename a file, migrate its note to the new mapped path (and fix any outdated references inside the note).
- Record non-obvious edge cases, trade-offs, and the test/verification plan as you go (not just at the end).
- Keep notes **coherent**: integrate new findings into the relevant sections and rewrite for clarity; avoid append-only “recent fix” / changelog-style additions unless the note is explicitly intended to be a changelog.
- **When finishing work**:
- Update the related note(s) to reflect what changed, why, and any new edge cases/tests.
- If a file is deleted, remove or clearly deprecate the corresponding note so it cannot be mistaken as current guidance.
- Keep notes concise and accurate; they are meant to prevent repeated rediscovery.
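
The note-path mapping described above is mechanical, so it is easy to express as a helper. A minimal sketch, for illustration only; the `note_path` function is hypothetical and not part of this diff, and how the target path is made relative to the repository root is an assumption:

```python
from pathlib import Path


def note_path(target_file: str) -> Path:
    # <path>/<name>.py -> agent-notes/<path>/<name>.py.md
    # (same folder structure, same filename, plus ".md")
    return Path("agent-notes") / f"{target_file}.md"
```

For example, `note_path("services/example_service.py")` would yield `agent-notes/services/example_service.py.md` under that assumption.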

## Skill Index

Start with the section that best matches your need. Each entry lists the problems it solves plus key files/concepts so you know what to expect before opening it.

______________________________________________________________________
### Platform Foundations

## Platform Foundations

- **[Infrastructure Overview](agent_skills/infra.md)**\
When to read this:
#### [Infrastructure Overview](agent_skills/infra.md)

- **When to read this**
- You need to understand where a feature belongs in the architecture.
- You’re wiring storage, Redis, vector stores, or OTEL.
- You’re about to add CLI commands or async jobs.\
What it covers: configuration stack (`configs/app_config.py`, remote settings), storage entry points (`extensions/ext_storage.py`, `core/file/file_manager.py`), Redis conventions (`extensions/ext_redis.py`), plugin runtime topology, vector-store factory (`core/rag/datasource/vdb/*`), observability hooks, SSRF proxy usage, and core CLI commands.
- You’re about to add CLI commands or async jobs.
- **What it covers**
- Configuration stack (`configs/app_config.py`, remote settings)
- Storage entry points (`extensions/ext_storage.py`, `core/file/file_manager.py`)
- Redis conventions (`extensions/ext_redis.py`)
- Plugin runtime topology
- Vector-store factory (`core/rag/datasource/vdb/*`)
- Observability hooks
- SSRF proxy usage
- Core CLI commands

- **[Coding Style](agent_skills/coding_style.md)**\
When to read this:
### Plugin & Extension Development

- You’re writing or reviewing backend code and need the authoritative checklist.
- You’re unsure about Pydantic validators, SQLAlchemy session usage, or logging patterns.
- You want the exact lint/type/test commands used in PRs.\
Includes: Ruff & BasedPyright commands, no-annotation policy, session examples (`with Session(db.engine, ...)`), `@field_validator` usage, logging expectations, and the rule set for file size, helpers, and package management.

______________________________________________________________________

## Plugin & Extension Development

- **[Plugin Systems](agent_skills/plugin.md)**\
When to read this:
#### [Plugin Systems](agent_skills/plugin.md)

- **When to read this**
- You’re building or debugging a marketplace plugin.
- You need to know how manifests, providers, daemons, and migrations fit together.\
What it covers: plugin manifests (`core/plugin/entities/plugin.py`), installation/upgrade flows (`services/plugin/plugin_service.py`, CLI commands), runtime adapters (`core/plugin/impl/*` for tool/model/datasource/trigger/endpoint/agent), daemon coordination (`core/plugin/entities/plugin_daemon.py`), and how provider registries surface capabilities to the rest of the platform.
- You need to know how manifests, providers, daemons, and migrations fit together.
- **What it covers**
- Plugin manifests (`core/plugin/entities/plugin.py`)
- Installation/upgrade flows (`services/plugin/plugin_service.py`, CLI commands)
- Runtime adapters (`core/plugin/impl/*` for tool/model/datasource/trigger/endpoint/agent)
- Daemon coordination (`core/plugin/entities/plugin_daemon.py`)
- How provider registries surface capabilities to the rest of the platform

- **[Plugin OAuth](agent_skills/plugin_oauth.md)**\
When to read this:
#### [Plugin OAuth](agent_skills/plugin_oauth.md)

- **When to read this**
- You must integrate OAuth for a plugin or datasource.
- You’re handling credential encryption or refresh flows.\
Topics: credential storage, encryption helpers (`core/helper/provider_encryption.py`), OAuth client bootstrap (`services/plugin/oauth_service.py`, `services/plugin/plugin_parameter_service.py`), and how console/API layers expose the flows.
- You’re handling credential encryption or refresh flows.
- **Topics**
- Credential storage
- Encryption helpers (`core/helper/provider_encryption.py`)
- OAuth client bootstrap (`services/plugin/oauth_service.py`, `services/plugin/plugin_parameter_service.py`)
- How console/API layers expose the flows

______________________________________________________________________
### Workflow Entry & Execution

## Workflow Entry & Execution
#### [Trigger Concepts](agent_skills/trigger.md)

- **[Trigger Concepts](agent_skills/trigger.md)**\
When to read this:
- **When to read this**
- You’re debugging why a workflow didn’t start.
- You’re adding a new trigger type or hook.
- You need to trace async execution, draft debugging, or webhook/schedule pipelines.\
Details: Start-node taxonomy, webhook & schedule internals (`core/workflow/nodes/trigger_*`, `services/trigger/*`), async orchestration (`services/async_workflow_service.py`, Celery queues), debug event bus, and storage/logging interactions.
- You need to trace async execution, draft debugging, or webhook/schedule pipelines.
- **Details**
- Start-node taxonomy
- Webhook & schedule internals (`core/workflow/nodes/trigger_*`, `services/trigger/*`)
- Async orchestration (`services/async_workflow_service.py`, Celery queues)
- Debug event bus
- Storage/logging interactions

______________________________________________________________________
## General Reminders

## Additional Notes for Agents

- All skill docs assume you follow the coding style guide—run Ruff/BasedPyright/tests listed there before submitting changes.
- All skill docs assume you follow the coding style rules below—run the lint/type/test commands before submitting changes.
- When you cannot find an answer in these briefs, search the codebase using the paths referenced (e.g., `core/plugin/impl/tool.py`, `services/dataset_service.py`).
- If you run into cross-cutting concerns (tenancy, configuration, storage), check the infrastructure guide first; it links to most supporting modules.
- Keep multi-tenancy and configuration central: everything flows through `configs.dify_config` and `tenant_id`.
- When touching plugins or triggers, consult both the system overview and the specialised doc to ensure you adjust lifecycle, storage, and observability consistently.

## Coding Style

This is the default standard for backend code in this repo. Follow it for new code and use it as the checklist when reviewing changes.

### Linting & Formatting

- Use Ruff for formatting and linting (follow `.ruff.toml`).
- Keep each line under 120 characters (including spaces).

### Naming Conventions

- Use `snake_case` for variables and functions.
- Use `PascalCase` for classes.
- Use `UPPER_CASE` for constants.

### Typing & Class Layout

- Code should usually include type annotations that match the repo’s current Python version (avoid untyped public APIs and “mystery” values).
- Prefer modern typing forms (e.g. `list[str]`, `dict[str, int]`) and avoid `Any` unless there’s a strong reason.
- For classes, declare member variables at the top of the class body (before `__init__`) so the class shape is obvious at a glance:

```python
from datetime import datetime


class Example:
    user_id: str
    created_at: datetime

    def __init__(self, user_id: str, created_at: datetime) -> None:
        self.user_id = user_id
        self.created_at = created_at
```

### General Rules

- Use Pydantic v2 conventions.
- Use `uv` for Python package management in this repo (usually with `--project api`).
- Prefer simple functions over small “utility classes” for lightweight helpers.
- Avoid implementing dunder methods unless it’s clearly needed and matches existing patterns.
- Never start long-running services as part of agent work (`uv run app.py`, `flask run`, etc.); running tests is allowed.
- Keep files below ~800 lines; split when necessary.
- Keep code readable and explicit—avoid clever hacks.

### Architecture & Boundaries

- Mirror the layered architecture: controller → service → core/domain.
- Reuse existing helpers in `core/`, `services/`, and `libs/` before creating new abstractions.
- Optimise for observability: deterministic control flow, clear logging, actionable errors.

### Logging & Errors

- Never use `print`; use a module-level logger:
- `logger = logging.getLogger(__name__)`
- Include tenant/app/workflow identifiers in log context when relevant.
- Raise domain-specific exceptions (`services/errors`, `core/errors`) and translate them into HTTP responses in controllers.
- Log retryable events at `warning`, terminal failures at `error`.
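
As an illustration of the logging rules above, here is a minimal sketch (not part of the diff; the function, identifiers, and exception types are hypothetical):

```python
import logging

logger = logging.getLogger(__name__)  # module-level logger, never print()


class TransientStorageError(Exception):
    """Hypothetical retryable failure."""


class WorkflowMissingError(Exception):
    """Hypothetical terminal, domain-specific failure."""


def resume_workflow(tenant_id: str, app_id: str, workflow_id: str) -> None:
    try:
        ...  # domain logic would go here
    except TransientStorageError:
        # Retryable event: warning, with tenant/app/workflow identifiers in context.
        logger.warning("retrying resume tenant=%s app=%s workflow=%s", tenant_id, app_id, workflow_id)
        raise
    except WorkflowMissingError:
        # Terminal failure: error; the controller layer translates it into an HTTP response.
        logger.error("workflow not found tenant=%s app=%s workflow=%s", tenant_id, app_id, workflow_id)
        raise
```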

### SQLAlchemy Patterns

- Models inherit from `models.base.TypeBase`; do not create ad-hoc metadata or engines.
- Open sessions with context managers:

```python
from sqlalchemy.orm import Session

with Session(db.engine, expire_on_commit=False) as session:
    stmt = select(Workflow).where(
        Workflow.id == workflow_id,
        Workflow.tenant_id == tenant_id,
    )
    workflow = session.execute(stmt).scalar_one_or_none()
```

- Prefer SQLAlchemy expressions; avoid raw SQL unless necessary.
- Always scope queries by `tenant_id` and protect write paths with safeguards (`FOR UPDATE`, row counts, etc.).
- Introduce repository abstractions only for very large tables (e.g., workflow executions) or when alternative storage strategies are required.

### Storage & External I/O

- Access storage via `extensions.ext_storage.storage`.
- Use `core.helper.ssrf_proxy` for outbound HTTP fetches.
- Background tasks that touch storage must be idempotent, and should log relevant object identifiers.

### Pydantic Usage

- Define DTOs with Pydantic v2 models and forbid extras by default.
- Use `@field_validator` / `@model_validator` for domain rules.

Example:

```python
from pydantic import BaseModel, ConfigDict, HttpUrl, field_validator


class TriggerConfig(BaseModel):
    endpoint: HttpUrl
    secret: str

    model_config = ConfigDict(extra="forbid")

    @field_validator("secret")
    def ensure_secret_prefix(cls, value: str) -> str:
        if not value.startswith("dify_"):
            raise ValueError("secret must start with dify_")
        return value
```

### Generics & Protocols

- Use `typing.Protocol` to define behavioural contracts (e.g., cache interfaces).
- Apply generics (`TypeVar`, `Generic`) for reusable utilities like caches or providers.
- Validate dynamic inputs at runtime when generics cannot enforce safety alone.
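
A minimal sketch of the Protocol-plus-generics pattern described above (illustrative only; the cache protocol and the in-memory implementation are hypothetical, not code from the repository):

```python
from typing import Generic, Protocol, TypeVar

T = TypeVar("T")


class Cache(Protocol[T]):
    """Behavioural contract for a cache; implementations are interchangeable."""

    def get(self, key: str) -> T | None: ...

    def set(self, key: str, value: T) -> None: ...


class InMemoryCache(Generic[T]):
    def __init__(self) -> None:
        self._data: dict[str, T] = {}

    def get(self, key: str) -> T | None:
        return self._data.get(key)

    def set(self, key: str, value: T) -> None:
        self._data[key] = value


def warm(cache: Cache[str], pairs: dict[str, str]) -> None:
    # Depends only on the protocol, not on a concrete cache class.
    for key, value in pairs.items():
        cache.set(key, value)
```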

### Tooling & Checks

Quick checks while iterating:

- Format: `make format`
- Lint (includes auto-fix): `make lint`
- Type check: `make type-check`
- Targeted tests: `make test TARGET_TESTS=./api/tests/<target_tests>`

Before opening a PR / submitting:

- `make lint`
- `make type-check`
- `make test`

### Controllers & Services

- Controllers: parse input via Pydantic, invoke services, return serialised responses; no business logic.
- Services: coordinate repositories, providers, background tasks; keep side effects explicit.
- Document non-obvious behaviour with concise comments.
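
A compressed sketch of that controller/service split (illustrative only; the payload model, service, and function names are hypothetical, and the real console controllers in this diff are flask_restx resources rather than bare functions):

```python
from pydantic import BaseModel, ConfigDict


class ArchiveAppPayload(BaseModel):
    app_id: str
    reason: str

    model_config = ConfigDict(extra="forbid")


class AppArchiveService:
    def archive(self, tenant_id: str, payload: ArchiveAppPayload) -> dict[str, str]:
        # Coordinate repositories / background tasks here; keep side effects explicit.
        return {"app_id": payload.app_id, "status": "archived"}


def archive_app_controller(tenant_id: str, raw_body: dict) -> dict[str, str]:
    # Controller: validate input, delegate to the service, serialise the result. No business logic.
    payload = ArchiveAppPayload.model_validate(raw_body)
    return AppArchiveService().archive(tenant_id, payload)
```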

### Miscellaneous

- Use `configs.dify_config` for configuration—never read environment variables directly.
- Maintain tenant awareness end-to-end; `tenant_id` must flow through every layer touching shared resources.
- Queue async work through `services/async_workflow_service`; implement tasks under `tasks/` with explicit queue selection.
- Keep experimental scripts under `dev/`; do not ship them in production builds.

@@ -1,115 +0,0 @@
## Linter

- Always follow `.ruff.toml`.
- Run `uv run ruff check --fix --unsafe-fixes`.
- Keep each line under 100 characters (including spaces).

## Code Style

- `snake_case` for variables and functions.
- `PascalCase` for classes.
- `UPPER_CASE` for constants.

## Rules

- Use Pydantic v2 standard.
- Use `uv` for package management.
- Do not override dunder methods like `__init__`, `__iadd__`, etc.
- Never launch services (`uv run app.py`, `flask run`, etc.); running tests under `tests/` is allowed.
- Prefer simple functions over classes for lightweight helpers.
- Keep files below 800 lines; split when necessary.
- Keep code readable—no clever hacks.
- Never use `print`; log with `logger = logging.getLogger(__name__)`.

## Guiding Principles

- Mirror the project’s layered architecture: controller → service → core/domain.
- Reuse existing helpers in `core/`, `services/`, and `libs/` before creating new abstractions.
- Optimise for observability: deterministic control flow, clear logging, actionable errors.

## SQLAlchemy Patterns

- Models inherit from `models.base.Base`; never create ad-hoc metadata or engines.

- Open sessions with context managers:

```python
from sqlalchemy.orm import Session

with Session(db.engine, expire_on_commit=False) as session:
    stmt = select(Workflow).where(
        Workflow.id == workflow_id,
        Workflow.tenant_id == tenant_id,
    )
    workflow = session.execute(stmt).scalar_one_or_none()
```

- Use SQLAlchemy expressions; avoid raw SQL unless necessary.

- Introduce repository abstractions only for very large tables (e.g., workflow executions) to support alternative storage strategies.

- Always scope queries by `tenant_id` and protect write paths with safeguards (`FOR UPDATE`, row counts, etc.).

## Storage & External IO

- Access storage via `extensions.ext_storage.storage`.
- Use `core.helper.ssrf_proxy` for outbound HTTP fetches.
- Background tasks that touch storage must be idempotent and log the relevant object identifiers.

## Pydantic Usage

- Define DTOs with Pydantic v2 models and forbid extras by default.

- Use `@field_validator` / `@model_validator` for domain rules.

- Example:

```python
from pydantic import BaseModel, ConfigDict, HttpUrl, field_validator


class TriggerConfig(BaseModel):
    endpoint: HttpUrl
    secret: str

    model_config = ConfigDict(extra="forbid")

    @field_validator("secret")
    def ensure_secret_prefix(cls, value: str) -> str:
        if not value.startswith("dify_"):
            raise ValueError("secret must start with dify_")
        return value
```

## Generics & Protocols

- Use `typing.Protocol` to define behavioural contracts (e.g., cache interfaces).
- Apply generics (`TypeVar`, `Generic`) for reusable utilities like caches or providers.
- Validate dynamic inputs at runtime when generics cannot enforce safety alone.

## Error Handling & Logging

- Raise domain-specific exceptions (`services/errors`, `core/errors`) and translate to HTTP responses in controllers.
- Declare `logger = logging.getLogger(__name__)` at module top.
- Include tenant/app/workflow identifiers in log context.
- Log retryable events at `warning`, terminal failures at `error`.

## Tooling & Checks

- Format/lint: `uv run --project api --dev ruff format ./api` and `uv run --project api --dev ruff check --fix --unsafe-fixes ./api`.
- Type checks: `uv run --directory api --dev basedpyright`.
- Tests: `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`.
- Run all of the above before submitting your work.

## Controllers & Services

- Controllers: parse input via Pydantic, invoke services, return serialised responses; no business logic.
- Services: coordinate repositories, providers, background tasks; keep side effects explicit.
- Avoid repositories unless necessary; direct SQLAlchemy usage is preferred for typical tables.
- Document non-obvious behaviour with concise comments.

## Miscellaneous

- Use `configs.dify_config` for configuration—never read environment variables directly.
- Maintain tenant awareness end-to-end; `tenant_id` must flow through every layer touching shared resources.
- Queue async work through `services/async_workflow_service`; implement tasks under `tasks/` with explicit queue selection.
- Keep experimental scripts under `dev/`; do not ship them in production builds.

154 api/commands.py

@@ -1,7 +1,9 @@
import base64
import datetime
import json
import logging
import secrets
import time
from typing import Any

import click

@@ -34,7 +36,7 @@ from libs.rsa import generate_key_pair
from models import Tenant
from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile
from models.model import App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile
from models.oauth import DatasourceOauthParamConfig, DatasourceProvider
from models.provider import Provider, ProviderModel
from models.provider_ids import DatasourceProviderID, ToolProviderID

@@ -45,6 +47,9 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi
from services.plugin.data_migration import PluginDataMigration
from services.plugin.plugin_migration import PluginMigration
from services.plugin.plugin_service import PluginService
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
from services.retention.conversation.messages_clean_service import MessagesCleanService
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch

logger = logging.getLogger(__name__)

@@ -62,8 +67,10 @@ def reset_password(email, new_password, password_confirm):
if str(new_password).strip() != str(password_confirm).strip():
click.echo(click.style("Passwords do not match.", fg="red"))
return
normalized_email = email.strip().lower()

with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
account = session.query(Account).where(Account.email == email).one_or_none()
account = AccountService.get_account_by_email_with_case_fallback(email.strip(), session=session)

if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))

@@ -84,7 +91,7 @@ def reset_password(email, new_password, password_confirm):
base64_password_hashed = base64.b64encode(password_hashed).decode()
account.password = base64_password_hashed
account.password_salt = base64_salt
AccountService.reset_login_error_rate_limit(email)
AccountService.reset_login_error_rate_limit(normalized_email)
click.echo(click.style("Password reset successfully.", fg="green"))

@@ -100,20 +107,22 @@ def reset_email(email, new_email, email_confirm):
if str(new_email).strip() != str(email_confirm).strip():
click.echo(click.style("New emails do not match.", fg="red"))
return
normalized_new_email = new_email.strip().lower()

with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
account = session.query(Account).where(Account.email == email).one_or_none()
account = AccountService.get_account_by_email_with_case_fallback(email.strip(), session=session)

if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return

try:
email_validate(new_email)
email_validate(normalized_new_email)
except:
click.echo(click.style(f"Invalid email: {new_email}", fg="red"))
return

account.email = new_email
account.email = normalized_new_email
click.echo(click.style("Email updated successfully.", fg="green"))

@@ -658,7 +667,7 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
return

# Create account
email = email.strip()
email = email.strip().lower()

if "@" not in email:
click.echo(click.style("Invalid email address.", fg="red"))

@@ -852,6 +861,61 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
click.echo(click.style("Clear free plan tenant expired logs completed.", fg="green"))


@click.command("clean-workflow-runs", help="Clean expired workflow runs and related data for free tenants.")
@click.option("--days", default=30, show_default=True, help="Delete workflow runs created before N days ago.")
@click.option("--batch-size", default=200, show_default=True, help="Batch size for selecting workflow runs.")
@click.option(
"--start-from",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
default=None,
help="Optional lower bound (inclusive) for created_at; must be paired with --end-before.",
)
@click.option(
"--end-before",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
default=None,
help="Optional upper bound (exclusive) for created_at; must be paired with --start-from.",
)
@click.option(
"--dry-run",
is_flag=True,
help="Preview cleanup results without deleting any workflow run data.",
)
def clean_workflow_runs(
days: int,
batch_size: int,
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
):
"""
Clean workflow runs and related workflow data for free tenants.
"""
if (start_from is None) ^ (end_before is None):
raise click.UsageError("--start-from and --end-before must be provided together.")

start_time = datetime.datetime.now(datetime.UTC)
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))

WorkflowRunCleanup(
days=days,
batch_size=batch_size,
start_from=start_from,
end_before=end_before,
dry_run=dry_run,
).run()

end_time = datetime.datetime.now(datetime.UTC)
elapsed = end_time - start_time
click.echo(
click.style(
f"Workflow run cleanup completed. start={start_time.isoformat()} "
f"end={end_time.isoformat()} duration={elapsed}",
fg="green",
)
)


@click.option("-f", "--force", is_flag=True, help="Skip user confirmation and force the command to execute.")
@click.command("clear-orphaned-file-records", help="Clear orphaned file records.")
def clear_orphaned_file_records(force: bool):

@@ -2111,3 +2175,79 @@ def migrate_oss(
except Exception as e:
db.session.rollback()
click.echo(click.style(f"Failed to update DB storage_type: {str(e)}", fg="red"))


@click.command("clean-expired-messages", help="Clean expired messages.")
@click.option(
"--start-from",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
required=True,
help="Lower bound (inclusive) for created_at.",
)
@click.option(
"--end-before",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
required=True,
help="Upper bound (exclusive) for created_at.",
)
@click.option("--batch-size", default=1000, show_default=True, help="Batch size for selecting messages.")
@click.option(
"--graceful-period",
default=21,
show_default=True,
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
)
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
def clean_expired_messages(
batch_size: int,
graceful_period: int,
start_from: datetime.datetime,
end_before: datetime.datetime,
dry_run: bool,
):
"""
Clean expired messages and related data for tenants based on clean policy.
"""
click.echo(click.style("clean_messages: start clean messages.", fg="green"))

start_at = time.perf_counter()

try:
# Create policy based on billing configuration
# NOTE: graceful_period will be ignored when billing is disabled.
policy = create_message_clean_policy(graceful_period_days=graceful_period)

# Create and run the cleanup service
service = MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
batch_size=batch_size,
dry_run=dry_run,
)
stats = service.run()

end_at = time.perf_counter()
click.echo(
click.style(
f"clean_messages: completed successfully\n"
f" - Latency: {end_at - start_at:.2f}s\n"
f" - Batches processed: {stats['batches']}\n"
f" - Total messages scanned: {stats['total_messages']}\n"
f" - Messages filtered: {stats['filtered_messages']}\n"
f" - Messages deleted: {stats['total_deleted']}",
fg="green",
)
)
except Exception as e:
end_at = time.perf_counter()
logger.exception("clean_messages failed")
click.echo(
click.style(
f"clean_messages: failed after {end_at - start_at:.2f}s - {str(e)}",
fg="red",
)
)
raise

click.echo(click.style("messages cleanup completed.", fg="green"))
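
Several call sites above, and the controller hunks that follow, replace direct `Account.email` queries with `AccountService.get_account_by_email_with_case_fallback`. That helper's implementation is not included in this diff; the sketch below shows one way such a fallback could work and is hypothetical, not the actual service method:

```python
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from models.account import Account  # import path assumed for this sketch


def get_account_by_email_with_case_fallback_sketch(email: str, session: Session) -> Account | None:
    # Exact match first, so existing mixed-case rows keep resolving as before.
    account = session.execute(select(Account).where(Account.email == email)).scalar_one_or_none()
    if account is not None:
        return account
    # Fall back to a case-insensitive match on the normalized address.
    normalized = email.strip().lower()
    return session.execute(select(Account).where(func.lower(Account.email) == normalized)).scalar_one_or_none()
```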

@@ -949,6 +949,12 @@ class MailConfig(BaseSettings):
default=False,
)

SMTP_LOCAL_HOSTNAME: str | None = Field(
description="Override the local hostname used in SMTP HELO/EHLO. "
"Useful behind NAT or when the default hostname causes rejections.",
default=None,
)

EMAIL_SEND_IP_LIMIT_PER_MINUTE: PositiveInt = Field(
description="Maximum number of emails allowed to be sent from the same IP address in a minute",
default=50,

@@ -1101,6 +1107,10 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable clean messages task",
default=False,
)
ENABLE_WORKFLOW_RUN_CLEANUP_TASK: bool = Field(
description="Enable scheduled workflow run cleanup task",
default=False,
)
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field(
description="Enable mail clean document notify task",
default=False,

@@ -4,7 +4,7 @@ from pydantic_settings import BaseSettings

class VolcengineTOSStorageConfig(BaseSettings):
"""
Configuration settings for Volcengine Tinder Object Storage (TOS)
Configuration settings for Volcengine Torch Object Storage (TOS)
"""

VOLCENGINE_TOS_BUCKET_NAME: str | None = Field(

@@ -592,9 +592,12 @@ def _get_conversation(app_model, conversation_id):
if not conversation:
raise NotFound("Conversation Not Exists.")

if not conversation.read_at:
conversation.read_at = naive_utc_now()
conversation.read_account_id = current_user.id
db.session.commit()
db.session.execute(
sa.update(Conversation)
.where(Conversation.id == conversation_id, Conversation.read_at.is_(None))
.values(read_at=naive_utc_now(), read_account_id=current_user.id)
)
db.session.commit()
db.session.refresh(conversation)

return conversation

@@ -63,10 +63,9 @@ class ActivateCheckApi(Resource):
args = ActivateCheckQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore

workspaceId = args.workspace_id
reg_email = args.email
token = args.token

invitation = RegisterService.get_invitation_if_token_valid(workspaceId, reg_email, token)
invitation = RegisterService.get_invitation_with_case_fallback(workspaceId, args.email, token)
if invitation:
data = invitation.get("data", {})
tenant = invitation.get("tenant", None)

@@ -100,11 +99,12 @@ class ActivateApi(Resource):
def post(self):
args = ActivatePayload.model_validate(console_ns.payload)

invitation = RegisterService.get_invitation_if_token_valid(args.workspace_id, args.email, args.token)
normalized_request_email = args.email.lower() if args.email else None
invitation = RegisterService.get_invitation_with_case_fallback(args.workspace_id, args.email, args.token)
if invitation is None:
raise AlreadyActivateError()

RegisterService.revoke_token(args.workspace_id, args.email, args.token)
RegisterService.revoke_token(args.workspace_id, normalized_request_email, args.token)

account = invitation["account"]
account.name = args.name

@@ -1,7 +1,6 @@
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from sqlalchemy.orm import Session

from configs import dify_config

@@ -62,6 +61,7 @@ class EmailRegisterSendEmailApi(Resource):
@email_register_enabled
def post(self):
args = EmailRegisterSendPayload.model_validate(console_ns.payload)
normalized_email = args.email.lower()

ip_address = extract_remote_ip(request)
if AccountService.is_email_send_ip_limit(ip_address):

@@ -70,13 +70,12 @@ class EmailRegisterSendEmailApi(Resource):
if args.language in languages:
language = args.language

if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(args.email):
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(normalized_email):
raise AccountInFreezeError()

with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=args.email)).scalar_one_or_none()
token = None
token = AccountService.send_email_register_email(email=args.email, account=account, language=language)
account = AccountService.get_account_by_email_with_case_fallback(args.email, session=session)
token = AccountService.send_email_register_email(email=normalized_email, account=account, language=language)
return {"result": "success", "data": token}

@@ -88,9 +87,9 @@ class EmailRegisterCheckApi(Resource):
def post(self):
args = EmailRegisterValidityPayload.model_validate(console_ns.payload)

user_email = args.email
user_email = args.email.lower()

is_email_register_error_rate_limit = AccountService.is_email_register_error_rate_limit(args.email)
is_email_register_error_rate_limit = AccountService.is_email_register_error_rate_limit(user_email)
if is_email_register_error_rate_limit:
raise EmailRegisterLimitError()

@@ -98,11 +97,14 @@ class EmailRegisterCheckApi(Resource):
if token_data is None:
raise InvalidTokenError()

if user_email != token_data.get("email"):
token_email = token_data.get("email")
normalized_token_email = token_email.lower() if isinstance(token_email, str) else token_email

if user_email != normalized_token_email:
raise InvalidEmailError()

if args.code != token_data.get("code"):
AccountService.add_email_register_error_rate_limit(args.email)
AccountService.add_email_register_error_rate_limit(user_email)
raise EmailCodeError()

# Verified, revoke the first token

@@ -113,8 +115,8 @@ class EmailRegisterCheckApi(Resource):
user_email, code=args.code, additional_data={"phase": "register"}
)

AccountService.reset_email_register_error_rate_limit(args.email)
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
AccountService.reset_email_register_error_rate_limit(user_email)
return {"is_valid": True, "email": normalized_token_email, "token": new_token}


@console_ns.route("/email-register")

@@ -141,22 +143,23 @@ class EmailRegisterResetApi(Resource):
AccountService.revoke_email_register_token(args.token)

email = register_data.get("email", "")
normalized_email = email.lower()

with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=email)).scalar_one_or_none()
account = AccountService.get_account_by_email_with_case_fallback(email, session=session)

if account:
raise EmailAlreadyInUseError()
else:
account = self._create_new_account(email, args.password_confirm)
account = self._create_new_account(normalized_email, args.password_confirm)
if not account:
raise AccountNotFoundError()
token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))
AccountService.reset_login_error_rate_limit(email)
AccountService.reset_login_error_rate_limit(normalized_email)

return {"result": "success", "data": token_pair.model_dump()}

def _create_new_account(self, email, password) -> Account | None:
def _create_new_account(self, email: str, password: str) -> Account | None:
# Create new account if allowed
account = None
try:

@@ -4,7 +4,6 @@ import secrets
from flask import request
from flask_restx import Resource, fields
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from sqlalchemy.orm import Session

from controllers.console import console_ns

@@ -21,7 +20,6 @@ from events.tenant_event import tenant_was_created
from extensions.ext_database import db
from libs.helper import EmailStr, extract_remote_ip
from libs.password import hash_password, valid_password
from models import Account
from services.account_service import AccountService, TenantService
from services.feature_service import FeatureService

@@ -76,6 +74,7 @@ class ForgotPasswordSendEmailApi(Resource):
@email_password_login_enabled
def post(self):
args = ForgotPasswordSendPayload.model_validate(console_ns.payload)
normalized_email = args.email.lower()

ip_address = extract_remote_ip(request)
if AccountService.is_email_send_ip_limit(ip_address):

@@ -87,11 +86,11 @@ class ForgotPasswordSendEmailApi(Resource):
language = "en-US"

with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=args.email)).scalar_one_or_none()
account = AccountService.get_account_by_email_with_case_fallback(args.email, session=session)

token = AccountService.send_reset_password_email(
account=account,
email=args.email,
email=normalized_email,
language=language,
is_allow_register=FeatureService.get_system_features().is_allow_register,
)

@@ -122,9 +121,9 @@ class ForgotPasswordCheckApi(Resource):
def post(self):
args = ForgotPasswordCheckPayload.model_validate(console_ns.payload)

user_email = args.email
user_email = args.email.lower()

is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(args.email)
is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(user_email)
if is_forgot_password_error_rate_limit:
raise EmailPasswordResetLimitError()

@@ -132,11 +131,16 @@ class ForgotPasswordCheckApi(Resource):
if token_data is None:
raise InvalidTokenError()

if user_email != token_data.get("email"):
token_email = token_data.get("email")
if not isinstance(token_email, str):
raise InvalidEmailError()
normalized_token_email = token_email.lower()

if user_email != normalized_token_email:
raise InvalidEmailError()

if args.code != token_data.get("code"):
AccountService.add_forgot_password_error_rate_limit(args.email)
AccountService.add_forgot_password_error_rate_limit(user_email)
raise EmailCodeError()

# Verified, revoke the first token

@@ -144,11 +148,11 @@ class ForgotPasswordCheckApi(Resource):

# Refresh token data by generating a new token
_, new_token = AccountService.generate_reset_password_token(
user_email, code=args.code, additional_data={"phase": "reset"}
token_email, code=args.code, additional_data={"phase": "reset"}
)

AccountService.reset_forgot_password_error_rate_limit(args.email)
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
AccountService.reset_forgot_password_error_rate_limit(user_email)
return {"is_valid": True, "email": normalized_token_email, "token": new_token}


@console_ns.route("/forgot-password/resets")

@@ -187,9 +191,8 @@ class ForgotPasswordResetApi(Resource):
password_hashed = hash_password(args.new_password, salt)

email = reset_data.get("email", "")

with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=email)).scalar_one_or_none()
account = AccountService.get_account_by_email_with_case_fallback(email, session=session)

if account:
self._update_existing_account(account, password_hashed, salt, session)

@@ -90,32 +90,38 @@ class LoginApi(Resource):
def post(self):
"""Authenticate user and login."""
args = LoginPayload.model_validate(console_ns.payload)
request_email = args.email
normalized_email = request_email.lower()

if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(args.email):
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(normalized_email):
raise AccountInFreezeError()

is_login_error_rate_limit = AccountService.is_login_error_rate_limit(args.email)
is_login_error_rate_limit = AccountService.is_login_error_rate_limit(normalized_email)
if is_login_error_rate_limit:
raise EmailPasswordLoginLimitError()

invite_token = args.invite_token
invitation_data: dict[str, Any] | None = None
if args.invite_token:
invitation_data = RegisterService.get_invitation_if_token_valid(None, args.email, args.invite_token)
if invite_token:
invitation_data = RegisterService.get_invitation_with_case_fallback(None, request_email, invite_token)
if invitation_data is None:
invite_token = None

try:
if invitation_data:
data = invitation_data.get("data", {})
invitee_email = data.get("email") if data else None
if invitee_email != args.email:
invitee_email_normalized = invitee_email.lower() if isinstance(invitee_email, str) else invitee_email
if invitee_email_normalized != normalized_email:
raise InvalidEmailError()
account = AccountService.authenticate(args.email, args.password, args.invite_token)
else:
account = AccountService.authenticate(args.email, args.password)
account = _authenticate_account_with_case_fallback(
request_email, normalized_email, args.password, invite_token
)
except services.errors.account.AccountLoginError:
raise AccountBannedError()
except services.errors.account.AccountPasswordError:
AccountService.add_login_error_rate_limit(args.email)
raise AuthenticationFailedError()
except services.errors.account.AccountPasswordError as exc:
AccountService.add_login_error_rate_limit(normalized_email)
raise AuthenticationFailedError() from exc
# SELF_HOSTED only have one workspace
tenants = TenantService.get_join_tenants(account)
if len(tenants) == 0:

@@ -130,7 +136,7 @@ class LoginApi(Resource):
}

token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))
AccountService.reset_login_error_rate_limit(args.email)
AccountService.reset_login_error_rate_limit(normalized_email)

# Create response with cookies instead of returning tokens in body
response = make_response({"result": "success"})

@@ -170,18 +176,19 @@ class ResetPasswordSendEmailApi(Resource):
@console_ns.expect(console_ns.models[EmailPayload.__name__])
def post(self):
args = EmailPayload.model_validate(console_ns.payload)
normalized_email = args.email.lower()

if args.language is not None and args.language == "zh-Hans":
language = "zh-Hans"
else:
language = "en-US"
try:
account = AccountService.get_user_through_email(args.email)
account = _get_account_with_case_fallback(args.email)
except AccountRegisterError:
raise AccountInFreezeError()

token = AccountService.send_reset_password_email(
email=args.email,
email=normalized_email,
account=account,
language=language,
is_allow_register=FeatureService.get_system_features().is_allow_register,

@@ -196,6 +203,7 @@ class EmailCodeLoginSendEmailApi(Resource):
@console_ns.expect(console_ns.models[EmailPayload.__name__])
def post(self):
args = EmailPayload.model_validate(console_ns.payload)
normalized_email = args.email.lower()

ip_address = extract_remote_ip(request)
if AccountService.is_email_send_ip_limit(ip_address):

@@ -206,13 +214,13 @@ class EmailCodeLoginSendEmailApi(Resource):
else:
language = "en-US"
try:
account = AccountService.get_user_through_email(args.email)
account = _get_account_with_case_fallback(args.email)
except AccountRegisterError:
raise AccountInFreezeError()

if account is None:
if FeatureService.get_system_features().is_allow_register:
|
||||
token = AccountService.send_email_code_login_email(email=args.email, language=language)
|
||||
token = AccountService.send_email_code_login_email(email=normalized_email, language=language)
|
||||
else:
|
||||
raise AccountNotFound()
|
||||
else:
|
||||
@@ -229,14 +237,17 @@ class EmailCodeLoginApi(Resource):
|
||||
def post(self):
|
||||
args = EmailCodeLoginPayload.model_validate(console_ns.payload)
|
||||
|
||||
user_email = args.email
|
||||
original_email = args.email
|
||||
user_email = original_email.lower()
|
||||
language = args.language
|
||||
|
||||
token_data = AccountService.get_email_code_login_data(args.token)
|
||||
if token_data is None:
|
||||
raise InvalidTokenError()
|
||||
|
||||
if token_data["email"] != args.email:
|
||||
token_email = token_data.get("email")
|
||||
normalized_token_email = token_email.lower() if isinstance(token_email, str) else token_email
|
||||
if normalized_token_email != user_email:
|
||||
raise InvalidEmailError()
|
||||
|
||||
if token_data["code"] != args.code:
|
||||
@@ -244,7 +255,7 @@ class EmailCodeLoginApi(Resource):
|
||||
|
||||
AccountService.revoke_email_code_login_token(args.token)
|
||||
try:
|
||||
account = AccountService.get_user_through_email(user_email)
|
||||
account = _get_account_with_case_fallback(original_email)
|
||||
except AccountRegisterError:
|
||||
raise AccountInFreezeError()
|
||||
if account:
|
||||
@@ -275,7 +286,7 @@ class EmailCodeLoginApi(Resource):
|
||||
except WorkspacesLimitExceededError:
|
||||
raise WorkspacesLimitExceeded()
|
||||
token_pair = AccountService.login(account, ip_address=extract_remote_ip(request))
|
||||
AccountService.reset_login_error_rate_limit(args.email)
|
||||
AccountService.reset_login_error_rate_limit(user_email)
|
||||
|
||||
# Create response with cookies instead of returning tokens in body
|
||||
response = make_response({"result": "success"})
|
||||
@@ -309,3 +320,22 @@ class RefreshTokenApi(Resource):
return response
except Exception as e:
return {"result": "fail", "message": str(e)}, 401


def _get_account_with_case_fallback(email: str):
account = AccountService.get_user_through_email(email)
if account or email == email.lower():
return account

return AccountService.get_user_through_email(email.lower())


def _authenticate_account_with_case_fallback(
original_email: str, normalized_email: str, password: str, invite_token: str | None
):
try:
return AccountService.authenticate(original_email, password, invite_token)
except services.errors.account.AccountPasswordError:
if original_email == normalized_email:
raise
return AccountService.authenticate(normalized_email, password, invite_token)
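The two helpers above retry the lookup and the authentication with a lowercased address only when the original mixed-case value fails, so existing accounts stored with either casing keep working. A minimal standalone sketch of the same fallback pattern; `find_account` and the in-memory `store` are stand-ins for illustration, not Dify APIs:

```python
from typing import Callable, Optional

def lookup_with_case_fallback(
    email: str,
    find_account: Callable[[str], Optional[dict]],
) -> Optional[dict]:
    """Try the address as given; fall back to its lowercased form only if needed."""
    account = find_account(email)
    if account or email == email.lower():
        # Either we found it, or lowercasing would not change the query.
        return account
    return find_account(email.lower())

# Usage with an in-memory stand-in for the account store:
store = {"alice@example.com": {"id": 1}}
print(lookup_with_case_fallback("Alice@Example.com", store.get))  # {'id': 1}
```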
|
||||
|
||||
@@ -3,7 +3,6 @@ import logging
|
||||
import httpx
|
||||
from flask import current_app, redirect, request
|
||||
from flask_restx import Resource
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
@@ -118,7 +117,10 @@ class OAuthCallback(Resource):
|
||||
invitation = RegisterService.get_invitation_by_token(token=invite_token)
|
||||
if invitation:
|
||||
invitation_email = invitation.get("email", None)
|
||||
if invitation_email != user_info.email:
|
||||
invitation_email_normalized = (
|
||||
invitation_email.lower() if isinstance(invitation_email, str) else invitation_email
|
||||
)
|
||||
if invitation_email_normalized != user_info.email.lower():
|
||||
return redirect(f"{dify_config.CONSOLE_WEB_URL}/signin?message=Invalid invitation token.")
|
||||
|
||||
return redirect(f"{dify_config.CONSOLE_WEB_URL}/signin/invite-settings?invite_token={invite_token}")
|
||||
@@ -175,7 +177,7 @@ def _get_account_by_openid_or_email(provider: str, user_info: OAuthUserInfo) ->
|
||||
|
||||
if not account:
|
||||
with Session(db.engine) as session:
|
||||
account = session.execute(select(Account).filter_by(email=user_info.email)).scalar_one_or_none()
|
||||
account = AccountService.get_account_by_email_with_case_fallback(user_info.email, session=session)
|
||||
|
||||
return account
|
||||
|
||||
@@ -197,9 +199,10 @@ def _generate_account(provider: str, user_info: OAuthUserInfo) -> tuple[Account,
|
||||
tenant_was_created.send(new_tenant)
|
||||
|
||||
if not account:
|
||||
normalized_email = user_info.email.lower()
|
||||
oauth_new_user = True
|
||||
if not FeatureService.get_system_features().is_allow_register:
|
||||
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(user_info.email):
|
||||
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(normalized_email):
|
||||
raise AccountRegisterError(
|
||||
description=(
|
||||
"This email account has been deleted within the past "
|
||||
@@ -210,7 +213,11 @@ def _generate_account(provider: str, user_info: OAuthUserInfo) -> tuple[Account,
|
||||
raise AccountRegisterError(description=("Invalid email or password"))
|
||||
account_name = user_info.name or "Dify"
|
||||
account = RegisterService.register(
|
||||
email=user_info.email, name=account_name, password=None, open_id=user_info.id, provider=provider
|
||||
email=normalized_email,
|
||||
name=account_name,
|
||||
password=None,
|
||||
open_id=user_info.id,
|
||||
provider=provider,
|
||||
)
|
||||
|
||||
# Set interface language
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Literal, cast
|
||||
import sqlalchemy as sa
|
||||
from flask import request
|
||||
from flask_restx import Resource, fields, marshal, marshal_with
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy import asc, desc, select
|
||||
from werkzeug.exceptions import Forbidden, NotFound
|
||||
|
||||
@@ -104,6 +104,15 @@ class DocumentRenamePayload(BaseModel):
|
||||
name: str
|
||||
|
||||
|
||||
class DocumentDatasetListParam(BaseModel):
|
||||
page: int = Field(1, title="Page", description="Page number.")
|
||||
limit: int = Field(20, title="Limit", description="Page size.")
|
||||
search: str | None = Field(None, alias="keyword", title="Search", description="Search keyword.")
|
||||
sort_by: str = Field("-created_at", alias="sort", title="SortBy", description="Sort by field.")
|
||||
status: str | None = Field(None, title="Status", description="Document status.")
|
||||
fetch_val: str = Field("false", alias="fetch")
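The new `DocumentDatasetListParam` model replaces a row of `request.args.get(...)` calls; because `keyword`, `sort`, and `fetch` are declared as aliases, `model_validate(request.args.to_dict())` maps the raw query string straight onto typed fields. A small self-contained sketch of that idea (the field names here are illustrative):

```python
from pydantic import BaseModel, Field

class ListParam(BaseModel):
    page: int = 1
    limit: int = 20
    search: str | None = Field(None, alias="keyword")
    sort_by: str = Field("-created_at", alias="sort")

# Query-string values arrive as strings; pydantic coerces them to the declared types.
raw_args = {"page": "2", "keyword": "invoice", "sort": "created_at"}
param = ListParam.model_validate(raw_args)
print(param.page, param.search, param.sort_by)  # 2 invoice created_at
```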
|
||||
|
||||
|
||||
register_schema_models(
|
||||
console_ns,
|
||||
KnowledgeConfig,
|
||||
@@ -225,14 +234,16 @@ class DatasetDocumentListApi(Resource):
|
||||
def get(self, dataset_id):
|
||||
current_user, current_tenant_id = current_account_with_tenant()
|
||||
dataset_id = str(dataset_id)
|
||||
page = request.args.get("page", default=1, type=int)
|
||||
limit = request.args.get("limit", default=20, type=int)
|
||||
search = request.args.get("keyword", default=None, type=str)
|
||||
sort = request.args.get("sort", default="-created_at", type=str)
|
||||
status = request.args.get("status", default=None, type=str)
|
||||
raw_args = request.args.to_dict()
|
||||
param = DocumentDatasetListParam.model_validate(raw_args)
|
||||
page = param.page
|
||||
limit = param.limit
|
||||
search = param.search
|
||||
sort = param.sort_by
|
||||
status = param.status
|
||||
# "yes", "true", "t", "y", "1" convert to True, while others convert to False.
|
||||
try:
|
||||
fetch_val = request.args.get("fetch", default="false")
|
||||
fetch_val = param.fetch_val
|
||||
if isinstance(fetch_val, bool):
|
||||
fetch = fetch_val
|
||||
else:
|
||||
|
||||
@@ -81,7 +81,7 @@ class ExternalKnowledgeApiPayload(BaseModel):
|
||||
class ExternalDatasetCreatePayload(BaseModel):
|
||||
external_knowledge_api_id: str
|
||||
external_knowledge_id: str
|
||||
name: str = Field(..., min_length=1, max_length=40)
|
||||
name: str = Field(..., min_length=1, max_length=100)
|
||||
description: str | None = Field(None, max_length=400)
|
||||
external_retrieval_model: dict[str, object] | None = None
|
||||
|
||||
|
||||
@@ -84,10 +84,11 @@ class SetupApi(Resource):
|
||||
raise NotInitValidateError()
|
||||
|
||||
args = SetupRequestPayload.model_validate(console_ns.payload)
|
||||
normalized_email = args.email.lower()
|
||||
|
||||
# setup
|
||||
RegisterService.setup(
|
||||
email=args.email,
|
||||
email=normalized_email,
|
||||
name=args.name,
|
||||
password=args.password,
|
||||
ip_address=extract_remote_ip(request),
|
||||
|
||||
@@ -30,6 +30,11 @@ class TagBindingRemovePayload(BaseModel):
|
||||
type: Literal["knowledge", "app"] | None = Field(default=None, description="Tag type")
|
||||
|
||||
|
||||
class TagListQueryParam(BaseModel):
|
||||
type: Literal["knowledge", "app", ""] = Field("", description="Tag type filter")
|
||||
keyword: str | None = Field(None, description="Search keyword")
|
||||
|
||||
|
||||
register_schema_models(
|
||||
console_ns,
|
||||
TagBasePayload,
|
||||
@@ -43,12 +48,15 @@ class TagListApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@console_ns.doc(
|
||||
params={"type": 'Tag type filter. Can be "knowledge" or "app".', "keyword": "Search keyword for tag name."}
|
||||
)
|
||||
@marshal_with(dataset_tag_fields)
|
||||
def get(self):
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
tag_type = request.args.get("type", type=str, default="")
|
||||
keyword = request.args.get("keyword", default=None, type=str)
|
||||
tags = TagService.get_tags(tag_type, current_tenant_id, keyword)
|
||||
raw_args = request.args.to_dict()
|
||||
param = TagListQueryParam.model_validate(raw_args)
|
||||
tags = TagService.get_tags(param.type, current_tenant_id, param.keyword)
|
||||
|
||||
return tags, 200
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ from fields.member_fields import account_fields
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.helper import EmailStr, TimestampField, extract_remote_ip, timezone
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models import Account, AccountIntegrate, InvitationCode
|
||||
from models import AccountIntegrate, InvitationCode
|
||||
from services.account_service import AccountService
|
||||
from services.billing_service import BillingService
|
||||
from services.errors.account import CurrentPasswordIncorrectError as ServiceCurrentPasswordIncorrectError
|
||||
@@ -536,7 +536,8 @@ class ChangeEmailSendEmailApi(Resource):
|
||||
else:
|
||||
language = "en-US"
|
||||
account = None
|
||||
user_email = args.email
|
||||
user_email = None
|
||||
email_for_sending = args.email.lower()
|
||||
if args.phase is not None and args.phase == "new_email":
|
||||
if args.token is None:
|
||||
raise InvalidTokenError()
|
||||
@@ -546,16 +547,24 @@ class ChangeEmailSendEmailApi(Resource):
|
||||
raise InvalidTokenError()
|
||||
user_email = reset_data.get("email", "")
|
||||
|
||||
if user_email != current_user.email:
|
||||
if user_email.lower() != current_user.email.lower():
|
||||
raise InvalidEmailError()
|
||||
|
||||
user_email = current_user.email
|
||||
else:
|
||||
with Session(db.engine) as session:
|
||||
account = session.execute(select(Account).filter_by(email=args.email)).scalar_one_or_none()
|
||||
account = AccountService.get_account_by_email_with_case_fallback(args.email, session=session)
|
||||
if account is None:
|
||||
raise AccountNotFound()
|
||||
email_for_sending = account.email
|
||||
user_email = account.email
|
||||
|
||||
token = AccountService.send_change_email_email(
|
||||
account=account, email=args.email, old_email=user_email, language=language, phase=args.phase
|
||||
account=account,
|
||||
email=email_for_sending,
|
||||
old_email=user_email,
|
||||
language=language,
|
||||
phase=args.phase,
|
||||
)
|
||||
return {"result": "success", "data": token}
|
||||
|
||||
@@ -571,9 +580,9 @@ class ChangeEmailCheckApi(Resource):
|
||||
payload = console_ns.payload or {}
|
||||
args = ChangeEmailValidityPayload.model_validate(payload)
|
||||
|
||||
user_email = args.email
|
||||
user_email = args.email.lower()
|
||||
|
||||
is_change_email_error_rate_limit = AccountService.is_change_email_error_rate_limit(args.email)
|
||||
is_change_email_error_rate_limit = AccountService.is_change_email_error_rate_limit(user_email)
|
||||
if is_change_email_error_rate_limit:
|
||||
raise EmailChangeLimitError()
|
||||
|
||||
@@ -581,11 +590,13 @@ class ChangeEmailCheckApi(Resource):
|
||||
if token_data is None:
|
||||
raise InvalidTokenError()
|
||||
|
||||
if user_email != token_data.get("email"):
|
||||
token_email = token_data.get("email")
|
||||
normalized_token_email = token_email.lower() if isinstance(token_email, str) else token_email
|
||||
if user_email != normalized_token_email:
|
||||
raise InvalidEmailError()
|
||||
|
||||
if args.code != token_data.get("code"):
|
||||
AccountService.add_change_email_error_rate_limit(args.email)
|
||||
AccountService.add_change_email_error_rate_limit(user_email)
|
||||
raise EmailCodeError()
|
||||
|
||||
# Verified, revoke the first token
|
||||
@@ -596,8 +607,8 @@ class ChangeEmailCheckApi(Resource):
|
||||
user_email, code=args.code, old_email=token_data.get("old_email"), additional_data={}
|
||||
)
|
||||
|
||||
AccountService.reset_change_email_error_rate_limit(args.email)
|
||||
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
|
||||
AccountService.reset_change_email_error_rate_limit(user_email)
|
||||
return {"is_valid": True, "email": normalized_token_email, "token": new_token}
|
||||
|
||||
|
||||
@console_ns.route("/account/change-email/reset")
|
||||
@@ -611,11 +622,12 @@ class ChangeEmailResetApi(Resource):
|
||||
def post(self):
|
||||
payload = console_ns.payload or {}
|
||||
args = ChangeEmailResetPayload.model_validate(payload)
|
||||
normalized_new_email = args.new_email.lower()
|
||||
|
||||
if AccountService.is_account_in_freeze(args.new_email):
|
||||
if AccountService.is_account_in_freeze(normalized_new_email):
|
||||
raise AccountInFreezeError()
|
||||
|
||||
if not AccountService.check_email_unique(args.new_email):
|
||||
if not AccountService.check_email_unique(normalized_new_email):
|
||||
raise EmailAlreadyInUseError()
|
||||
|
||||
reset_data = AccountService.get_change_email_data(args.token)
|
||||
@@ -626,13 +638,13 @@ class ChangeEmailResetApi(Resource):
|
||||
|
||||
old_email = reset_data.get("old_email", "")
|
||||
current_user, _ = current_account_with_tenant()
|
||||
if current_user.email != old_email:
|
||||
if current_user.email.lower() != old_email.lower():
|
||||
raise AccountNotFound()
|
||||
|
||||
updated_account = AccountService.update_account_email(current_user, email=args.new_email)
|
||||
updated_account = AccountService.update_account_email(current_user, email=normalized_new_email)
|
||||
|
||||
AccountService.send_change_email_completed_notify_email(
|
||||
email=args.new_email,
|
||||
email=normalized_new_email,
|
||||
)
|
||||
|
||||
return updated_account
|
||||
@@ -645,8 +657,9 @@ class CheckEmailUnique(Resource):
|
||||
def post(self):
|
||||
payload = console_ns.payload or {}
|
||||
args = CheckEmailUniquePayload.model_validate(payload)
|
||||
if AccountService.is_account_in_freeze(args.email):
|
||||
normalized_email = args.email.lower()
|
||||
if AccountService.is_account_in_freeze(normalized_email):
|
||||
raise AccountInFreezeError()
|
||||
if not AccountService.check_email_unique(args.email):
|
||||
if not AccountService.check_email_unique(normalized_email):
|
||||
raise EmailAlreadyInUseError()
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -116,26 +116,31 @@ class MemberInviteEmailApi(Resource):
|
||||
raise WorkspaceMembersLimitExceeded()
|
||||
|
||||
for invitee_email in invitee_emails:
|
||||
normalized_invitee_email = invitee_email.lower()
|
||||
try:
|
||||
if not inviter.current_tenant:
|
||||
raise ValueError("No current tenant")
|
||||
token = RegisterService.invite_new_member(
|
||||
inviter.current_tenant, invitee_email, interface_language, role=invitee_role, inviter=inviter
|
||||
tenant=inviter.current_tenant,
|
||||
email=invitee_email,
|
||||
language=interface_language,
|
||||
role=invitee_role,
|
||||
inviter=inviter,
|
||||
)
|
||||
encoded_invitee_email = parse.quote(invitee_email)
|
||||
encoded_invitee_email = parse.quote(normalized_invitee_email)
|
||||
invitation_results.append(
|
||||
{
|
||||
"status": "success",
|
||||
"email": invitee_email,
|
||||
"email": normalized_invitee_email,
|
||||
"url": f"{console_web_url}/activate?email={encoded_invitee_email}&token={token}",
|
||||
}
|
||||
)
|
||||
except AccountAlreadyInTenantError:
|
||||
invitation_results.append(
|
||||
{"status": "success", "email": invitee_email, "url": f"{console_web_url}/signin"}
|
||||
{"status": "success", "email": normalized_invitee_email, "url": f"{console_web_url}/signin"}
|
||||
)
|
||||
except Exception as e:
|
||||
invitation_results.append({"status": "failed", "email": invitee_email, "message": str(e)})
|
||||
invitation_results.append({"status": "failed", "email": normalized_invitee_email, "message": str(e)})
|
||||
|
||||
return {
|
||||
"result": "success",
|
||||
|
||||
@@ -4,7 +4,6 @@ import secrets
|
||||
from flask import request
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from controllers.common.schema import register_schema_models
|
||||
@@ -22,7 +21,7 @@ from controllers.web import web_ns
|
||||
from extensions.ext_database import db
|
||||
from libs.helper import EmailStr, extract_remote_ip
|
||||
from libs.password import hash_password, valid_password
|
||||
from models import Account
|
||||
from models.account import Account
|
||||
from services.account_service import AccountService
|
||||
|
||||
|
||||
@@ -70,6 +69,9 @@ class ForgotPasswordSendEmailApi(Resource):
|
||||
def post(self):
|
||||
payload = ForgotPasswordSendPayload.model_validate(web_ns.payload or {})
|
||||
|
||||
request_email = payload.email
|
||||
normalized_email = request_email.lower()
|
||||
|
||||
ip_address = extract_remote_ip(request)
|
||||
if AccountService.is_email_send_ip_limit(ip_address):
|
||||
raise EmailSendIpLimitError()
|
||||
@@ -80,12 +82,12 @@ class ForgotPasswordSendEmailApi(Resource):
|
||||
language = "en-US"
|
||||
|
||||
with Session(db.engine) as session:
|
||||
account = session.execute(select(Account).filter_by(email=payload.email)).scalar_one_or_none()
|
||||
account = AccountService.get_account_by_email_with_case_fallback(request_email, session=session)
|
||||
token = None
|
||||
if account is None:
|
||||
raise AuthenticationFailedError()
|
||||
else:
|
||||
token = AccountService.send_reset_password_email(account=account, email=payload.email, language=language)
|
||||
token = AccountService.send_reset_password_email(account=account, email=normalized_email, language=language)
|
||||
|
||||
return {"result": "success", "data": token}
|
||||
|
||||
@@ -104,9 +106,9 @@ class ForgotPasswordCheckApi(Resource):
|
||||
def post(self):
|
||||
payload = ForgotPasswordCheckPayload.model_validate(web_ns.payload or {})
|
||||
|
||||
user_email = payload.email
|
||||
user_email = payload.email.lower()
|
||||
|
||||
is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(payload.email)
|
||||
is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(user_email)
|
||||
if is_forgot_password_error_rate_limit:
|
||||
raise EmailPasswordResetLimitError()
|
||||
|
||||
@@ -114,11 +116,16 @@ class ForgotPasswordCheckApi(Resource):
|
||||
if token_data is None:
|
||||
raise InvalidTokenError()
|
||||
|
||||
if user_email != token_data.get("email"):
|
||||
token_email = token_data.get("email")
|
||||
if not isinstance(token_email, str):
|
||||
raise InvalidEmailError()
|
||||
normalized_token_email = token_email.lower()
|
||||
|
||||
if user_email != normalized_token_email:
|
||||
raise InvalidEmailError()
|
||||
|
||||
if payload.code != token_data.get("code"):
|
||||
AccountService.add_forgot_password_error_rate_limit(payload.email)
|
||||
AccountService.add_forgot_password_error_rate_limit(user_email)
|
||||
raise EmailCodeError()
|
||||
|
||||
# Verified, revoke the first token
|
||||
@@ -126,11 +133,11 @@ class ForgotPasswordCheckApi(Resource):
|
||||
|
||||
# Refresh token data by generating a new token
|
||||
_, new_token = AccountService.generate_reset_password_token(
|
||||
user_email, code=payload.code, additional_data={"phase": "reset"}
|
||||
token_email, code=payload.code, additional_data={"phase": "reset"}
|
||||
)
|
||||
|
||||
AccountService.reset_forgot_password_error_rate_limit(payload.email)
|
||||
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
|
||||
AccountService.reset_forgot_password_error_rate_limit(user_email)
|
||||
return {"is_valid": True, "email": normalized_token_email, "token": new_token}
|
||||
|
||||
|
||||
@web_ns.route("/forgot-password/resets")
|
||||
@@ -174,7 +181,7 @@ class ForgotPasswordResetApi(Resource):
|
||||
email = reset_data.get("email", "")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
account = session.execute(select(Account).filter_by(email=email)).scalar_one_or_none()
|
||||
account = AccountService.get_account_by_email_with_case_fallback(email, session=session)
|
||||
|
||||
if account:
|
||||
self._update_existing_account(account, password_hashed, salt, session)
|
||||
|
||||
@@ -197,25 +197,29 @@ class EmailCodeLoginApi(Resource):
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
user_email = args["email"]
|
||||
user_email = args["email"].lower()
|
||||
|
||||
token_data = WebAppAuthService.get_email_code_login_data(args["token"])
|
||||
if token_data is None:
|
||||
raise InvalidTokenError()
|
||||
|
||||
if token_data["email"] != args["email"]:
|
||||
token_email = token_data.get("email")
|
||||
if not isinstance(token_email, str):
|
||||
raise InvalidEmailError()
|
||||
normalized_token_email = token_email.lower()
|
||||
if normalized_token_email != user_email:
|
||||
raise InvalidEmailError()
|
||||
|
||||
if token_data["code"] != args["code"]:
|
||||
raise EmailCodeError()
|
||||
|
||||
WebAppAuthService.revoke_email_code_login_token(args["token"])
|
||||
account = WebAppAuthService.get_user_through_email(user_email)
|
||||
account = WebAppAuthService.get_user_through_email(token_email)
|
||||
if not account:
|
||||
raise AuthenticationFailedError()
|
||||
|
||||
token = WebAppAuthService.login(account=account)
|
||||
AccountService.reset_login_error_rate_limit(args["email"])
|
||||
AccountService.reset_login_error_rate_limit(user_email)
|
||||
response = make_response({"result": "success", "data": {"access_token": token}})
|
||||
# set_access_token_to_cookie(request, response, token, samesite="None", httponly=False)
|
||||
return response
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from decimal import Decimal
|
||||
from typing import Union, cast
|
||||
|
||||
from sqlalchemy import select
|
||||
@@ -41,6 +42,7 @@ from core.tools.tool_manager import ToolManager
|
||||
from core.tools.utils.dataset_retriever_tool import DatasetRetrieverTool
|
||||
from extensions.ext_database import db
|
||||
from factories import file_factory
|
||||
from models.enums import CreatorUserRole
|
||||
from models.model import Conversation, Message, MessageAgentThought, MessageFile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -289,6 +291,7 @@ class BaseAgentRunner(AppRunner):
|
||||
thought = MessageAgentThought(
|
||||
message_id=message_id,
|
||||
message_chain_id=None,
|
||||
tool_process_data=None,
|
||||
thought="",
|
||||
tool=tool_name,
|
||||
tool_labels_str="{}",
|
||||
@@ -296,20 +299,20 @@ class BaseAgentRunner(AppRunner):
|
||||
tool_input=tool_input,
|
||||
message=message,
|
||||
message_token=0,
|
||||
message_unit_price=0,
|
||||
message_price_unit=0,
|
||||
message_unit_price=Decimal(0),
|
||||
message_price_unit=Decimal("0.001"),
|
||||
message_files=json.dumps(messages_ids) if messages_ids else "",
|
||||
answer="",
|
||||
observation="",
|
||||
answer_token=0,
|
||||
answer_unit_price=0,
|
||||
answer_price_unit=0,
|
||||
answer_unit_price=Decimal(0),
|
||||
answer_price_unit=Decimal("0.001"),
|
||||
tokens=0,
|
||||
total_price=0,
|
||||
total_price=Decimal(0),
|
||||
position=self.agent_thought_count + 1,
|
||||
currency="USD",
|
||||
latency=0,
|
||||
created_by_role="account",
|
||||
created_by_role=CreatorUserRole.ACCOUNT,
|
||||
created_by=self.user_id,
|
||||
)
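The pricing fields of `MessageAgentThought` now use `Decimal` instead of bare numeric literals, which avoids binary floating-point drift when unit prices are multiplied and summed. A quick illustration of why that matters:

```python
from decimal import Decimal

# Float arithmetic accumulates representation error:
print(0.1 + 0.1 + 0.1)                                   # 0.30000000000000004

# Decimal keeps exact decimal values, which is what you want for prices:
print(Decimal("0.1") + Decimal("0.1") + Decimal("0.1"))  # 0.3

# Decimal(0) is exact, but non-integer literals should be built from strings:
# Decimal(0.1) would inherit the float's error, Decimal("0.001") does not.
print(Decimal("0.001") * 1500)                           # 1.500
```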
|
||||
|
||||
@@ -342,7 +345,8 @@ class BaseAgentRunner(AppRunner):
|
||||
raise ValueError("agent thought not found")
|
||||
|
||||
if thought:
|
||||
agent_thought.thought += thought
|
||||
existing_thought = agent_thought.thought or ""
|
||||
agent_thought.thought = f"{existing_thought}{thought}"
|
||||
|
||||
if tool_name:
|
||||
agent_thought.tool = tool_name
|
||||
@@ -440,21 +444,30 @@ class BaseAgentRunner(AppRunner):
|
||||
agent_thoughts: list[MessageAgentThought] = message.agent_thoughts
|
||||
if agent_thoughts:
|
||||
for agent_thought in agent_thoughts:
|
||||
tools = agent_thought.tool
|
||||
if tools:
|
||||
tools = tools.split(";")
|
||||
tool_names_raw = agent_thought.tool
|
||||
if tool_names_raw:
|
||||
tool_names = tool_names_raw.split(";")
|
||||
tool_calls: list[AssistantPromptMessage.ToolCall] = []
|
||||
tool_call_response: list[ToolPromptMessage] = []
|
||||
try:
|
||||
tool_inputs = json.loads(agent_thought.tool_input)
|
||||
except Exception:
|
||||
tool_inputs = {tool: {} for tool in tools}
|
||||
try:
|
||||
tool_responses = json.loads(agent_thought.observation)
|
||||
except Exception:
|
||||
tool_responses = dict.fromkeys(tools, agent_thought.observation)
|
||||
tool_input_payload = agent_thought.tool_input
|
||||
if tool_input_payload:
|
||||
try:
|
||||
tool_inputs = json.loads(tool_input_payload)
|
||||
except Exception:
|
||||
tool_inputs = {tool: {} for tool in tool_names}
|
||||
else:
|
||||
tool_inputs = {tool: {} for tool in tool_names}
|
||||
|
||||
for tool in tools:
|
||||
observation_payload = agent_thought.observation
|
||||
if observation_payload:
|
||||
try:
|
||||
tool_responses = json.loads(observation_payload)
|
||||
except Exception:
|
||||
tool_responses = dict.fromkeys(tool_names, observation_payload)
|
||||
else:
|
||||
tool_responses = dict.fromkeys(tool_names, observation_payload)
|
||||
|
||||
for tool in tool_names:
|
||||
# generate a uuid for tool call
|
||||
tool_call_id = str(uuid.uuid4())
|
||||
tool_calls.append(
|
||||
@@ -484,7 +497,7 @@ class BaseAgentRunner(AppRunner):
|
||||
*tool_call_response,
|
||||
]
|
||||
)
|
||||
if not tools:
|
||||
if not tool_names_raw:
|
||||
result.append(AssistantPromptMessage(content=agent_thought.thought))
|
||||
else:
|
||||
if message.answer:
|
||||
|
||||
@@ -188,7 +188,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
|
||||
),
|
||||
)
|
||||
|
||||
assistant_message = AssistantPromptMessage(content="", tool_calls=[])
|
||||
assistant_message = AssistantPromptMessage(content=response, tool_calls=[])
|
||||
if tool_calls:
|
||||
assistant_message.tool_calls = [
|
||||
AssistantPromptMessage.ToolCall(
|
||||
@@ -200,8 +200,6 @@ class FunctionCallAgentRunner(BaseAgentRunner):
|
||||
)
|
||||
for tool_call in tool_calls
|
||||
]
|
||||
else:
|
||||
assistant_message.content = response
|
||||
|
||||
self._current_thoughts.append(assistant_message)
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ from core.app.layers.conversation_variable_persist_layer import ConversationVari
|
||||
from core.db.session_factory import session_factory
|
||||
from core.moderation.base import ModerationError
|
||||
from core.moderation.input_moderation import InputModeration
|
||||
from core.variables.variables import VariableUnion
|
||||
from core.variables.variables import Variable
|
||||
from core.workflow.enums import WorkflowType
|
||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
@@ -149,8 +149,8 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
system_variables=system_inputs,
|
||||
user_inputs=inputs,
|
||||
environment_variables=self._workflow.environment_variables,
|
||||
# Based on the definition of `VariableUnion`,
|
||||
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
|
||||
# Based on the definition of `Variable`,
|
||||
# `VariableBase` instances can be safely used as `Variable` since they are compatible.
|
||||
conversation_variables=conversation_variables,
|
||||
)
|
||||
|
||||
@@ -318,7 +318,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
trace_manager=app_generate_entity.trace_manager,
|
||||
)
|
||||
|
||||
def _initialize_conversation_variables(self) -> list[VariableUnion]:
|
||||
def _initialize_conversation_variables(self) -> list[Variable]:
|
||||
"""
|
||||
Initialize conversation variables for the current conversation.
|
||||
|
||||
@@ -343,7 +343,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
conversation_variables = [var.to_variable() for var in existing_variables]
|
||||
|
||||
session.commit()
|
||||
return cast(list[VariableUnion], conversation_variables)
|
||||
return cast(list[Variable], conversation_variables)
|
||||
|
||||
def _load_existing_conversation_variables(self, session: Session) -> list[ConversationVariable]:
|
||||
"""
|
||||
|
||||
@@ -189,7 +189,7 @@ class BaseAppGenerator:
|
||||
elif value == 0:
|
||||
value = False
|
||||
case VariableEntityType.JSON_OBJECT:
|
||||
if not isinstance(value, dict):
|
||||
if value and not isinstance(value, dict):
|
||||
raise ValueError(f"{variable_entity.variable} in input form must be a dict")
|
||||
case _:
|
||||
raise AssertionError("this statement should be unreachable.")
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import logging
|
||||
|
||||
from core.variables import Variable
|
||||
from core.variables import VariableBase
|
||||
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
|
||||
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
|
||||
from core.workflow.enums import NodeType
|
||||
@@ -44,7 +44,7 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer):
|
||||
if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
|
||||
continue
|
||||
variable = self.graph_runtime_state.variable_pool.get(selector)
|
||||
if not isinstance(variable, Variable):
|
||||
if not isinstance(variable, VariableBase):
|
||||
logger.warning(
|
||||
"Conversation variable not found in variable pool. selector=%s",
|
||||
selector,
|
||||
|
||||
@@ -33,6 +33,10 @@ class MaxRetriesExceededError(ValueError):
pass


request_error = httpx.RequestError
max_retries_exceeded_error = MaxRetriesExceededError
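Publishing the exception types as module attributes lets code that talks to `ssrf_proxy` through an injected client catch failures via the client itself instead of importing `httpx` directly (the HTTP request executor later in this diff does exactly that with `self._http_client.request_error`). A hedged sketch of the pattern with a stub client; the names here are illustrative, not the real `ssrf_proxy` module:

```python
class FakeHttpClient:
    """Stand-in for a module-level HTTP client; not the real ssrf_proxy."""
    request_error = ConnectionError           # exception types published as attributes
    max_retries_exceeded_error = TimeoutError

    def get(self, url: str, **kwargs):
        raise self.request_error(f"cannot reach {url}")

def fetch(client) -> str:
    try:
        return client.get("https://example.com")
    except (client.max_retries_exceeded_error, client.request_error) as e:
        # The caller never names httpx; it only knows the client's published types.
        return f"request failed: {e}"

print(fetch(FakeHttpClient()))
```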
|
||||
|
||||
|
||||
def _create_proxy_mounts() -> dict[str, httpx.HTTPTransport]:
|
||||
return {
|
||||
"http://": httpx.HTTPTransport(
|
||||
|
||||
@@ -71,8 +71,8 @@ class LLMGenerator:
|
||||
response: LLMResult = model_instance.invoke_llm(
|
||||
prompt_messages=list(prompts), model_parameters={"max_tokens": 500, "temperature": 1}, stream=False
|
||||
)
|
||||
answer = cast(str, response.message.content)
|
||||
if answer is None:
|
||||
answer = response.message.get_text_content()
|
||||
if answer == "":
|
||||
return ""
|
||||
try:
|
||||
result_dict = json.loads(answer)
|
||||
@@ -184,7 +184,7 @@ class LLMGenerator:
|
||||
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
|
||||
)
|
||||
|
||||
rule_config["prompt"] = cast(str, response.message.content)
|
||||
rule_config["prompt"] = response.message.get_text_content()
|
||||
|
||||
except InvokeError as e:
|
||||
error = str(e)
|
||||
@@ -237,13 +237,11 @@ class LLMGenerator:
|
||||
|
||||
return rule_config
|
||||
|
||||
rule_config["prompt"] = cast(str, prompt_content.message.content)
|
||||
rule_config["prompt"] = prompt_content.message.get_text_content()
|
||||
|
||||
if not isinstance(prompt_content.message.content, str):
|
||||
raise NotImplementedError("prompt content is not a string")
|
||||
parameter_generate_prompt = parameter_template.format(
|
||||
inputs={
|
||||
"INPUT_TEXT": prompt_content.message.content,
|
||||
"INPUT_TEXT": prompt_content.message.get_text_content(),
|
||||
},
|
||||
remove_template_variables=False,
|
||||
)
|
||||
@@ -253,7 +251,7 @@ class LLMGenerator:
|
||||
statement_generate_prompt = statement_template.format(
|
||||
inputs={
|
||||
"TASK_DESCRIPTION": instruction,
|
||||
"INPUT_TEXT": prompt_content.message.content,
|
||||
"INPUT_TEXT": prompt_content.message.get_text_content(),
|
||||
},
|
||||
remove_template_variables=False,
|
||||
)
|
||||
@@ -263,7 +261,7 @@ class LLMGenerator:
|
||||
parameter_content: LLMResult = model_instance.invoke_llm(
|
||||
prompt_messages=list(parameter_messages), model_parameters=model_parameters, stream=False
|
||||
)
|
||||
rule_config["variables"] = re.findall(r'"\s*([^"]+)\s*"', cast(str, parameter_content.message.content))
|
||||
rule_config["variables"] = re.findall(r'"\s*([^"]+)\s*"', parameter_content.message.get_text_content())
|
||||
except InvokeError as e:
|
||||
error = str(e)
|
||||
error_step = "generate variables"
|
||||
@@ -272,7 +270,7 @@ class LLMGenerator:
|
||||
statement_content: LLMResult = model_instance.invoke_llm(
|
||||
prompt_messages=list(statement_messages), model_parameters=model_parameters, stream=False
|
||||
)
|
||||
rule_config["opening_statement"] = cast(str, statement_content.message.content)
|
||||
rule_config["opening_statement"] = statement_content.message.get_text_content()
|
||||
except InvokeError as e:
|
||||
error = str(e)
|
||||
error_step = "generate conversation opener"
|
||||
@@ -315,7 +313,7 @@ class LLMGenerator:
|
||||
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
|
||||
)
|
||||
|
||||
generated_code = cast(str, response.message.content)
|
||||
generated_code = response.message.get_text_content()
|
||||
return {"code": generated_code, "language": code_language, "error": ""}
|
||||
|
||||
except InvokeError as e:
|
||||
@@ -351,7 +349,7 @@ class LLMGenerator:
|
||||
raise TypeError("Expected LLMResult when stream=False")
|
||||
response = result
|
||||
|
||||
answer = cast(str, response.message.content)
|
||||
answer = response.message.get_text_content()
|
||||
return answer.strip()
|
||||
|
||||
@classmethod
|
||||
@@ -375,10 +373,7 @@ class LLMGenerator:
|
||||
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
|
||||
)
|
||||
|
||||
raw_content = response.message.content
|
||||
|
||||
if not isinstance(raw_content, str):
|
||||
raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}")
|
||||
raw_content = response.message.get_text_content()
|
||||
|
||||
try:
|
||||
parsed_content = json.loads(raw_content)
|
||||
|
||||
@@ -251,10 +251,7 @@ class AssistantPromptMessage(PromptMessage):
|
||||
|
||||
:return: True if prompt message is empty, False otherwise
|
||||
"""
|
||||
if not super().is_empty() and not self.tool_calls:
|
||||
return False
|
||||
|
||||
return True
|
||||
return super().is_empty() and not self.tool_calls
|
||||
|
||||
|
||||
class SystemPromptMessage(PromptMessage):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
|
||||
from opentelemetry.trace import SpanKind
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from core.ops.aliyun_trace.data_exporter.traceclient import (
|
||||
@@ -54,7 +55,7 @@ from core.ops.entities.trace_entity import (
|
||||
ToolTraceInfo,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from core.workflow.entities import WorkflowNodeExecution
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from extensions.ext_database import db
|
||||
@@ -151,6 +152,7 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
),
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER,
|
||||
)
|
||||
self.trace_client.add_span(message_span)
|
||||
|
||||
@@ -273,7 +275,7 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
service_account = self.get_service_account_with_tenant(app_id)
|
||||
|
||||
session_factory = sessionmaker(bind=db.engine)
|
||||
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
|
||||
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
|
||||
session_factory=session_factory,
|
||||
user=service_account,
|
||||
app_id=app_id,
|
||||
@@ -456,6 +458,7 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
),
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER,
|
||||
)
|
||||
self.trace_client.add_span(message_span)
|
||||
|
||||
@@ -475,6 +478,7 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
),
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER if message_span_id is None else SpanKind.INTERNAL,
|
||||
)
|
||||
self.trace_client.add_span(workflow_span)
|
||||
|
||||
|
||||
@@ -166,7 +166,7 @@ class SpanBuilder:
|
||||
attributes=span_data.attributes,
|
||||
events=span_data.events,
|
||||
links=span_data.links,
|
||||
kind=trace_api.SpanKind.INTERNAL,
|
||||
kind=span_data.span_kind,
|
||||
status=span_data.status,
|
||||
start_time=span_data.start_time,
|
||||
end_time=span_data.end_time,
|
||||
|
||||
@@ -4,7 +4,7 @@ from typing import Any
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.sdk.trace import Event
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
from opentelemetry.trace import SpanKind, Status, StatusCode
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
@@ -34,3 +34,4 @@ class SpanData(BaseModel):
status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
start_time: int | None = Field(..., description="The start time of the span in nanoseconds.")
end_time: int | None = Field(..., description="The end time of the span in nanoseconds.")
span_kind: SpanKind = Field(default=SpanKind.INTERNAL, description="The OpenTelemetry SpanKind for this span.")
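With `span_kind` carried on `SpanData`, the span builder can forward the intended OpenTelemetry kind instead of hard-coding `INTERNAL`. A minimal sketch of a similar model; this is a cut-down stand-in, since the real `SpanData` has more fields than shown here:

```python
from opentelemetry.trace import SpanKind
from pydantic import BaseModel, Field

class MiniSpanData(BaseModel):
    """Simplified stand-in: only the fields needed to show the kind."""
    name: str
    span_kind: SpanKind = Field(default=SpanKind.INTERNAL)

# Top-level request spans can be marked SERVER, nested work stays INTERNAL.
root = MiniSpanData(name="message", span_kind=SpanKind.SERVER)
child = MiniSpanData(name="workflow")
print(root.span_kind, child.span_kind)  # SpanKind.SERVER SpanKind.INTERNAL
```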
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from core.plugin.entities.endpoint import EndpointEntityWithInstance
|
||||
from core.plugin.impl.base import BasePluginClient
|
||||
from core.plugin.impl.exc import PluginDaemonInternalServerError
|
||||
|
||||
|
||||
class PluginEndpointClient(BasePluginClient):
|
||||
@@ -70,18 +71,27 @@ class PluginEndpointClient(BasePluginClient):
|
||||
def delete_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str):
|
||||
"""
|
||||
Delete the given endpoint.
|
||||
|
||||
This operation is idempotent: if the endpoint is already deleted (record not found),
|
||||
it will return True instead of raising an error.
|
||||
"""
|
||||
return self._request_with_plugin_daemon_response(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/endpoint/remove",
|
||||
bool,
|
||||
data={
|
||||
"endpoint_id": endpoint_id,
|
||||
},
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
try:
|
||||
return self._request_with_plugin_daemon_response(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/endpoint/remove",
|
||||
bool,
|
||||
data={
|
||||
"endpoint_id": endpoint_id,
|
||||
},
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
except PluginDaemonInternalServerError as e:
# Make delete idempotent: if record is not found, consider it a success
if "record not found" in str(e.description).lower():
return True
raise
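Wrapping the daemon call and swallowing only the "record not found" failure makes `delete_endpoint` safe to retry. The same idempotent-delete shape, sketched with a generic backend error; the names here are illustrative, not the plugin daemon API:

```python
class BackendError(Exception):
    def __init__(self, description: str):
        super().__init__(description)
        self.description = description

def delete_record(backend_delete, record_id: str) -> bool:
    """Deleting something that is already gone counts as success."""
    try:
        return backend_delete(record_id)
    except BackendError as e:
        if "record not found" in str(e.description).lower():
            return True
        raise  # any other backend failure still surfaces

def _missing(_record_id: str) -> bool:
    raise BackendError("Record Not Found")

print(delete_record(_missing, "ep-123"))  # True: already deleted, treated as success
```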
|
||||
|
||||
def enable_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str):
|
||||
"""
|
||||
|
||||
@@ -7,8 +7,8 @@ from typing import Any, cast
|
||||
|
||||
from flask import has_request_context
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from core.file import FILE_MODEL_IDENTITY, File, FileTransferMethod
|
||||
from core.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
|
||||
from core.tools.__base.tool import Tool
|
||||
@@ -20,7 +20,6 @@ from core.tools.entities.tool_entities import (
|
||||
ToolProviderType,
|
||||
)
|
||||
from core.tools.errors import ToolInvokeError
|
||||
from extensions.ext_database import db
|
||||
from factories.file_factory import build_from_mapping
|
||||
from libs.login import current_user
|
||||
from models import Account, Tenant
|
||||
@@ -230,30 +229,32 @@ class WorkflowTool(Tool):
|
||||
"""
|
||||
Resolve user from database (worker/Celery context).
|
||||
"""
|
||||
with session_factory.create_session() as session:
|
||||
tenant_stmt = select(Tenant).where(Tenant.id == self.runtime.tenant_id)
|
||||
tenant = session.scalar(tenant_stmt)
|
||||
if not tenant:
|
||||
return None
|
||||
|
||||
user_stmt = select(Account).where(Account.id == user_id)
|
||||
user = session.scalar(user_stmt)
|
||||
if user:
|
||||
user.current_tenant = tenant
|
||||
session.expunge(user)
|
||||
return user
|
||||
|
||||
end_user_stmt = select(EndUser).where(EndUser.id == user_id, EndUser.tenant_id == tenant.id)
|
||||
end_user = session.scalar(end_user_stmt)
|
||||
if end_user:
|
||||
session.expunge(end_user)
|
||||
return end_user
|
||||
|
||||
tenant_stmt = select(Tenant).where(Tenant.id == self.runtime.tenant_id)
|
||||
tenant = db.session.scalar(tenant_stmt)
|
||||
if not tenant:
|
||||
return None
|
||||
|
||||
user_stmt = select(Account).where(Account.id == user_id)
|
||||
user = db.session.scalar(user_stmt)
|
||||
if user:
|
||||
user.current_tenant = tenant
|
||||
return user
|
||||
|
||||
end_user_stmt = select(EndUser).where(EndUser.id == user_id, EndUser.tenant_id == tenant.id)
|
||||
end_user = db.session.scalar(end_user_stmt)
|
||||
if end_user:
|
||||
return end_user
|
||||
|
||||
return None
|
||||
|
||||
def _get_workflow(self, app_id: str, version: str) -> Workflow:
|
||||
"""
|
||||
get the workflow by app id and version
|
||||
"""
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
with session_factory.create_session() as session, session.begin():
|
||||
if not version:
|
||||
stmt = (
|
||||
select(Workflow)
|
||||
@@ -265,22 +266,24 @@ class WorkflowTool(Tool):
|
||||
stmt = select(Workflow).where(Workflow.app_id == app_id, Workflow.version == version)
|
||||
workflow = session.scalar(stmt)
|
||||
|
||||
if not workflow:
|
||||
raise ValueError("workflow not found or not published")
|
||||
if not workflow:
|
||||
raise ValueError("workflow not found or not published")
|
||||
|
||||
return workflow
|
||||
session.expunge(workflow)
|
||||
return workflow
|
||||
|
||||
def _get_app(self, app_id: str) -> App:
|
||||
"""
|
||||
get the app by app id
|
||||
"""
|
||||
stmt = select(App).where(App.id == app_id)
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
with session_factory.create_session() as session, session.begin():
|
||||
app = session.scalar(stmt)
|
||||
if not app:
|
||||
raise ValueError("app not found")
|
||||
if not app:
|
||||
raise ValueError("app not found")
|
||||
|
||||
return app
|
||||
session.expunge(app)
|
||||
return app
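Both `_get_workflow` and `_get_app` now read through `session_factory.create_session()` and call `session.expunge(...)` before returning, so the ORM object is detached and remains usable after the session closes. A hedged, self-contained sketch of the same pattern with plain SQLAlchemy; the `Thing` model and in-memory engine are illustrative only:

```python
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class Thing(Base):
    __tablename__ = "thing"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session, session.begin():
    session.add(Thing(id=1, name="demo"))

def get_thing(thing_id: int) -> Thing:
    # expire_on_commit=False plus expunge() yields a detached object that is
    # still readable after the session is gone.
    with Session(engine, expire_on_commit=False) as session, session.begin():
        thing = session.scalar(select(Thing).where(Thing.id == thing_id))
        if not thing:
            raise ValueError("thing not found")
        session.expunge(thing)
        return thing

print(get_thing(1).name)  # "demo", accessed outside the session
```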
|
||||
|
||||
def _transform_args(self, tool_parameters: dict) -> tuple[dict, list[dict]]:
|
||||
"""
|
||||
|
||||
@@ -30,6 +30,7 @@ from .variables import (
|
||||
SecretVariable,
|
||||
StringVariable,
|
||||
Variable,
|
||||
VariableBase,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
@@ -62,4 +63,5 @@ __all__ = [
|
||||
"StringSegment",
|
||||
"StringVariable",
|
||||
"Variable",
|
||||
"VariableBase",
|
||||
]
|
||||
|
||||
@@ -232,7 +232,7 @@ def get_segment_discriminator(v: Any) -> SegmentType | None:
|
||||
# - All variants in `SegmentUnion` must inherit from the `Segment` class.
|
||||
# - The union must include all non-abstract subclasses of `Segment`, except:
|
||||
# - `SegmentGroup`, which is not added to the variable pool.
|
||||
# - `Variable` and its subclasses, which are handled by `VariableUnion`.
|
||||
# - `VariableBase` and its subclasses, which are handled by `Variable`.
|
||||
SegmentUnion: TypeAlias = Annotated[
|
||||
(
|
||||
Annotated[NoneSegment, Tag(SegmentType.NONE)]
|
||||
|
||||
@@ -27,7 +27,7 @@ from .segments import (
|
||||
from .types import SegmentType
|
||||
|
||||
|
||||
class Variable(Segment):
|
||||
class VariableBase(Segment):
|
||||
"""
|
||||
A variable is a segment that has a name.
|
||||
|
||||
@@ -45,23 +45,23 @@ class Variable(Segment):
|
||||
selector: Sequence[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class StringVariable(StringSegment, Variable):
|
||||
class StringVariable(StringSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
class FloatVariable(FloatSegment, Variable):
|
||||
class FloatVariable(FloatSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
class IntegerVariable(IntegerSegment, Variable):
|
||||
class IntegerVariable(IntegerSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
class ObjectVariable(ObjectSegment, Variable):
|
||||
class ObjectVariable(ObjectSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
class ArrayVariable(ArraySegment, Variable):
|
||||
class ArrayVariable(ArraySegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
@@ -89,16 +89,16 @@ class SecretVariable(StringVariable):
|
||||
return encrypter.obfuscated_token(self.value)
|
||||
|
||||
|
||||
class NoneVariable(NoneSegment, Variable):
|
||||
class NoneVariable(NoneSegment, VariableBase):
|
||||
value_type: SegmentType = SegmentType.NONE
|
||||
value: None = None
|
||||
|
||||
|
||||
class FileVariable(FileSegment, Variable):
|
||||
class FileVariable(FileSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
class BooleanVariable(BooleanSegment, Variable):
|
||||
class BooleanVariable(BooleanSegment, VariableBase):
|
||||
pass
|
||||
|
||||
|
||||
@@ -139,13 +139,13 @@ class RAGPipelineVariableInput(BaseModel):
|
||||
value: Any
|
||||
|
||||
|
||||
# The `VariableUnion`` type is used to enable serialization and deserialization with Pydantic.
|
||||
# Use `Variable` for type hinting when serialization is not required.
|
||||
# The `Variable` type is used to enable serialization and deserialization with Pydantic.
|
||||
# Use `VariableBase` for type hinting when serialization is not required.
|
||||
#
|
||||
# Note:
|
||||
# - All variants in `VariableUnion` must inherit from the `Variable` class.
|
||||
# - The union must include all non-abstract subclasses of `Segment`, except:
|
||||
VariableUnion: TypeAlias = Annotated[
|
||||
# - All variants in `Variable` must inherit from the `VariableBase` class.
|
||||
# - The union must include all non-abstract subclasses of `VariableBase`.
|
||||
Variable: TypeAlias = Annotated[
|
||||
(
|
||||
Annotated[NoneVariable, Tag(SegmentType.NONE)]
|
||||
| Annotated[StringVariable, Tag(SegmentType.STRING)]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import abc
|
||||
from typing import Protocol
|
||||
|
||||
from core.variables import Variable
|
||||
from core.variables import VariableBase
|
||||
|
||||
|
||||
class ConversationVariableUpdater(Protocol):
|
||||
@@ -20,12 +20,12 @@ class ConversationVariableUpdater(Protocol):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def update(self, conversation_id: str, variable: "Variable"):
|
||||
def update(self, conversation_id: str, variable: "VariableBase"):
|
||||
"""
|
||||
Updates the value of the specified conversation variable in the underlying storage.
|
||||
|
||||
:param conversation_id: The ID of the conversation to update. Typically references `ConversationVariable.id`.
|
||||
:param variable: The `Variable` instance containing the updated value.
|
||||
:param variable: The `VariableBase` instance containing the updated value.
|
||||
"""
|
||||
pass
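Since `ConversationVariableUpdater` is a `Protocol`, any class exposing a matching `update` method satisfies it structurally; the rename only changes the accepted variable type to `VariableBase`. A minimal sketch of a conforming in-memory implementation, using a simplified protocol of its own so it stays self-contained (hypothetical, e.g. for tests):

```python
from typing import Protocol

class Updater(Protocol):
    """Structural stand-in for the real protocol; only update() is shown in the diff."""
    def update(self, conversation_id: str, variable: object) -> None: ...

class InMemoryUpdater:
    """Satisfies Updater structurally, no inheritance needed."""
    def __init__(self) -> None:
        self.saved: dict[str, list[object]] = {}

    def update(self, conversation_id: str, variable: object) -> None:
        self.saved.setdefault(conversation_id, []).append(variable)

def persist(updater: Updater, conversation_id: str, variables: list[object]) -> None:
    for variable in variables:
        updater.update(conversation_id, variable)

memory = InMemoryUpdater()
persist(memory, "conv-1", ["var-a", "var-b"])
print(memory.saved)  # {'conv-1': ['var-a', 'var-b']}
```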
|
||||
|
||||
|
||||
@@ -211,6 +211,10 @@ class WorkflowExecutionStatus(StrEnum):
def is_ended(self) -> bool:
return self in _END_STATE

@classmethod
def ended_values(cls) -> list[str]:
return [status.value for status in _END_STATE]
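`ended_values()` exposes the terminal statuses as plain strings, which is convenient when the enum must be compared against raw database columns or serialized payloads. A small sketch of the shape with a cut-down enum; the real `_END_STATE` lists the module's actual terminal members:

```python
from enum import StrEnum

class RunStatus(StrEnum):
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

    def is_ended(self) -> bool:
        return self in _END_STATE

    @classmethod
    def ended_values(cls) -> list[str]:
        return [status.value for status in _END_STATE]

_END_STATE = frozenset([RunStatus.SUCCEEDED, RunStatus.FAILED])

print(RunStatus.RUNNING.is_ended())      # False
print(sorted(RunStatus.ended_values()))  # ['failed', 'succeeded']
# e.g. a query filter: WHERE status IN :ended, params={"ended": RunStatus.ended_values()}
```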
|
||||
|
||||
|
||||
_END_STATE = frozenset(
|
||||
[
|
||||
|
||||
@@ -11,7 +11,7 @@ from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.variables.variables import VariableUnion
|
||||
from core.variables.variables import Variable
|
||||
|
||||
|
||||
class CommandType(StrEnum):
|
||||
@@ -46,7 +46,7 @@ class PauseCommand(GraphEngineCommand):
|
||||
class VariableUpdate(BaseModel):
|
||||
"""Represents a single variable update instruction."""
|
||||
|
||||
value: VariableUnion = Field(description="New variable value")
|
||||
value: Variable = Field(description="New variable value")
|
||||
|
||||
|
||||
class UpdateVariablesCommand(GraphEngineCommand):
|
||||
|
||||
@@ -17,6 +17,7 @@ from core.helper import ssrf_proxy
|
||||
from core.variables.segments import ArrayFileSegment, FileSegment
|
||||
from core.workflow.runtime import VariablePool
|
||||
|
||||
from ..protocols import FileManagerProtocol, HttpClientProtocol
|
||||
from .entities import (
|
||||
HttpRequestNodeAuthorization,
|
||||
HttpRequestNodeData,
|
||||
@@ -78,6 +79,8 @@ class Executor:
|
||||
timeout: HttpRequestNodeTimeout,
|
||||
variable_pool: VariablePool,
|
||||
max_retries: int = dify_config.SSRF_DEFAULT_MAX_RETRIES,
|
||||
http_client: HttpClientProtocol = ssrf_proxy,
|
||||
file_manager: FileManagerProtocol = file_manager,
|
||||
):
|
||||
# If authorization API key is present, convert the API key using the variable pool
|
||||
if node_data.authorization.type == "api-key":
|
||||
@@ -104,6 +107,8 @@ class Executor:
|
||||
self.data = None
|
||||
self.json = None
|
||||
self.max_retries = max_retries
|
||||
self._http_client = http_client
|
||||
self._file_manager = file_manager
|
||||
|
||||
# init template
|
||||
self.variable_pool = variable_pool
|
||||
@@ -200,7 +205,7 @@ class Executor:
|
||||
if file_variable is None:
|
||||
raise FileFetchError(f"cannot fetch file with selector {file_selector}")
|
||||
file = file_variable.value
|
||||
self.content = file_manager.download(file)
|
||||
self.content = self._file_manager.download(file)
|
||||
case "x-www-form-urlencoded":
|
||||
form_data = {
|
||||
self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template(
|
||||
@@ -239,7 +244,7 @@ class Executor:
|
||||
):
|
||||
file_tuple = (
|
||||
file.filename,
|
||||
file_manager.download(file),
|
||||
self._file_manager.download(file),
|
||||
file.mime_type or "application/octet-stream",
|
||||
)
|
||||
if key not in files:
|
||||
@@ -332,19 +337,18 @@ class Executor:
|
||||
do http request depending on api bundle
|
||||
"""
|
||||
_METHOD_MAP = {
|
||||
"get": ssrf_proxy.get,
|
||||
"head": ssrf_proxy.head,
|
||||
"post": ssrf_proxy.post,
|
||||
"put": ssrf_proxy.put,
|
||||
"delete": ssrf_proxy.delete,
|
||||
"patch": ssrf_proxy.patch,
|
||||
"get": self._http_client.get,
|
||||
"head": self._http_client.head,
|
||||
"post": self._http_client.post,
|
||||
"put": self._http_client.put,
|
||||
"delete": self._http_client.delete,
|
||||
"patch": self._http_client.patch,
|
||||
}
|
||||
method_lc = self.method.lower()
|
||||
if method_lc not in _METHOD_MAP:
|
||||
raise InvalidHttpMethodError(f"Invalid http method {self.method}")
|
||||
|
||||
request_args = {
|
||||
"url": self.url,
|
||||
"data": self.data,
|
||||
"files": self.files,
|
||||
"json": self.json,
|
||||
@@ -357,8 +361,12 @@ class Executor:
|
||||
}
|
||||
# request_args = {k: v for k, v in request_args.items() if v is not None}
|
||||
try:
|
||||
response: httpx.Response = _METHOD_MAP[method_lc](**request_args, max_retries=self.max_retries)
|
||||
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
|
||||
response: httpx.Response = _METHOD_MAP[method_lc](
|
||||
url=self.url,
|
||||
**request_args,
|
||||
max_retries=self.max_retries,
|
||||
)
|
||||
except (self._http_client.max_retries_exceeded_error, self._http_client.request_error) as e:
|
||||
raise HttpRequestNodeError(str(e)) from e
|
||||
# FIXME: fix type ignore, this maybe httpx type issue
|
||||
return response
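Because the executor now resolves `get`/`post`/`put`/... and the exception types from `self._http_client`, tests can swap in a stub that records calls instead of going through `ssrf_proxy`. A hedged sketch of such a stub; this is not the real `HttpClientProtocol`, just the subset the method map needs:

```python
class RecordingHttpClient:
    """Test double exposing the surface the executor's _METHOD_MAP relies on."""
    request_error = ConnectionError
    max_retries_exceeded_error = TimeoutError

    def __init__(self, canned_body: str = "ok"):
        self.calls: list[tuple[str, str]] = []
        self.canned_body = canned_body

    def _record(self, method: str, url: str, **kwargs) -> str:
        self.calls.append((method, url))
        return self.canned_body

    def get(self, url: str, **kwargs): return self._record("get", url, **kwargs)
    def post(self, url: str, **kwargs): return self._record("post", url, **kwargs)
    def put(self, url: str, **kwargs): return self._record("put", url, **kwargs)
    def delete(self, url: str, **kwargs): return self._record("delete", url, **kwargs)
    def head(self, url: str, **kwargs): return self._record("head", url, **kwargs)
    def patch(self, url: str, **kwargs): return self._record("patch", url, **kwargs)

client = RecordingHttpClient()
method_map = {"get": client.get, "post": client.post}
print(method_map["get"](url="https://example.com", max_retries=0))  # "ok"
print(client.calls)  # [('get', 'https://example.com')]
```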
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import logging
|
||||
import mimetypes
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any
|
||||
from collections.abc import Callable, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from configs import dify_config
|
||||
from core.file import File, FileTransferMethod
|
||||
from core.file import File, FileTransferMethod, file_manager
|
||||
from core.helper import ssrf_proxy
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.variables.segments import ArrayFileSegment
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
@@ -13,6 +14,7 @@ from core.workflow.nodes.base import variable_template_parser
|
||||
from core.workflow.nodes.base.entities import VariableSelector
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.http_request.executor import Executor
|
||||
from core.workflow.nodes.protocols import FileManagerProtocol, HttpClientProtocol
|
||||
from factories import file_factory
|
||||
|
||||
from .entities import (
|
||||
@@ -30,10 +32,35 @@ HTTP_REQUEST_DEFAULT_TIMEOUT = HttpRequestNodeTimeout(
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.workflow.entities import GraphInitParams
|
||||
from core.workflow.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
class HttpRequestNode(Node[HttpRequestNodeData]):
|
||||
node_type = NodeType.HTTP_REQUEST
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
id: str,
|
||||
config: Mapping[str, Any],
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
*,
|
||||
http_client: HttpClientProtocol = ssrf_proxy,
|
||||
tool_file_manager_factory: Callable[[], ToolFileManager] = ToolFileManager,
|
||||
file_manager: FileManagerProtocol = file_manager,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
id=id,
|
||||
config=config,
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
self._http_client = http_client
|
||||
self._tool_file_manager_factory = tool_file_manager_factory
|
||||
self._file_manager = file_manager
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:
|
||||
return {
|
||||
@@ -71,6 +98,8 @@ class HttpRequestNode(Node[HttpRequestNodeData]):
|
||||
timeout=self._get_request_timeout(self.node_data),
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
max_retries=0,
|
||||
http_client=self._http_client,
|
||||
file_manager=self._file_manager,
|
||||
)
|
||||
process_data["request"] = http_executor.to_log()
|
||||
|
||||
@@ -199,7 +228,7 @@ class HttpRequestNode(Node[HttpRequestNodeData]):
|
||||
mime_type = (
|
||||
content_disposition_type or content_type or mimetypes.guess_type(filename)[0] or "application/octet-stream"
|
||||
)
|
||||
tool_file_manager = ToolFileManager()
|
||||
tool_file_manager = self._tool_file_manager_factory()
|
||||
|
||||
tool_file = tool_file_manager.create_file_by_raw(
|
||||
user_id=self.user_id,
|
||||
|
||||
@@ -11,7 +11,7 @@ from typing_extensions import TypeIs
|
||||
from core.model_runtime.entities.llm_entities import LLMUsage
|
||||
from core.variables import IntegerVariable, NoneSegment
|
||||
from core.variables.segments import ArrayAnySegment, ArraySegment
|
||||
from core.variables.variables import VariableUnion
|
||||
from core.variables.variables import Variable
|
||||
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
|
||||
from core.workflow.enums import (
|
||||
NodeExecutionType,
|
||||
@@ -240,7 +240,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
|
||||
datetime,
|
||||
list[GraphNodeEventBase],
|
||||
object | None,
|
||||
dict[str, VariableUnion],
|
||||
dict[str, Variable],
|
||||
LLMUsage,
|
||||
]
|
||||
],
|
||||
@@ -308,7 +308,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
|
||||
item: object,
|
||||
flask_app: Flask,
|
||||
context_vars: contextvars.Context,
|
||||
) -> tuple[datetime, list[GraphNodeEventBase], object | None, dict[str, VariableUnion], LLMUsage]:
|
||||
) -> tuple[datetime, list[GraphNodeEventBase], object | None, dict[str, Variable], LLMUsage]:
|
||||
"""Execute a single iteration in parallel mode and return results."""
|
||||
with preserve_flask_contexts(flask_app=flask_app, context_vars=context_vars):
|
||||
iter_start_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
@@ -515,11 +515,11 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
|
||||
|
||||
return variable_mapping
|
||||
|
||||
def _extract_conversation_variable_snapshot(self, *, variable_pool: VariablePool) -> dict[str, VariableUnion]:
|
||||
def _extract_conversation_variable_snapshot(self, *, variable_pool: VariablePool) -> dict[str, Variable]:
|
||||
conversation_variables = variable_pool.variable_dictionary.get(CONVERSATION_VARIABLE_NODE_ID, {})
|
||||
return {name: variable.model_copy(deep=True) for name, variable in conversation_variables.items()}
|
||||
|
||||
def _sync_conversation_variables_from_snapshot(self, snapshot: dict[str, VariableUnion]) -> None:
|
||||
def _sync_conversation_variables_from_snapshot(self, snapshot: dict[str, Variable]) -> None:
|
||||
parent_pool = self.graph_runtime_state.variable_pool
|
||||
parent_conversations = parent_pool.variable_dictionary.get(CONVERSATION_VARIABLE_NODE_ID, {})
|
||||
|
||||
|
||||
@@ -1,16 +1,21 @@
|
||||
from collections.abc import Sequence
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import TYPE_CHECKING, final
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from configs import dify_config
|
||||
from core.file import file_manager
|
||||
from core.helper import ssrf_proxy
|
||||
from core.helper.code_executor.code_executor import CodeExecutor
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.graph import NodeFactory
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.code.code_node import CodeNode
|
||||
from core.workflow.nodes.code.limits import CodeNodeLimits
|
||||
from core.workflow.nodes.http_request.node import HttpRequestNode
|
||||
from core.workflow.nodes.protocols import FileManagerProtocol, HttpClientProtocol
|
||||
from core.workflow.nodes.template_transform.template_renderer import (
|
||||
CodeExecutorJinja2TemplateRenderer,
|
||||
Jinja2TemplateRenderer,
|
||||
@@ -43,6 +48,9 @@ class DifyNodeFactory(NodeFactory):
|
||||
code_providers: Sequence[type[CodeNodeProvider]] | None = None,
|
||||
code_limits: CodeNodeLimits | None = None,
|
||||
template_renderer: Jinja2TemplateRenderer | None = None,
|
||||
http_request_http_client: HttpClientProtocol = ssrf_proxy,
|
||||
http_request_tool_file_manager_factory: Callable[[], ToolFileManager] = ToolFileManager,
|
||||
http_request_file_manager: FileManagerProtocol = file_manager,
|
||||
) -> None:
|
||||
self.graph_init_params = graph_init_params
|
||||
self.graph_runtime_state = graph_runtime_state
|
||||
@@ -61,6 +69,9 @@ class DifyNodeFactory(NodeFactory):
|
||||
max_object_array_length=dify_config.CODE_MAX_OBJECT_ARRAY_LENGTH,
|
||||
)
|
||||
self._template_renderer = template_renderer or CodeExecutorJinja2TemplateRenderer()
|
||||
self._http_request_http_client = http_request_http_client
|
||||
self._http_request_tool_file_manager_factory = http_request_tool_file_manager_factory
|
||||
self._http_request_file_manager = http_request_file_manager
|
||||
|
||||
@override
|
||||
def create_node(self, node_config: dict[str, object]) -> Node:
|
||||
@@ -113,6 +124,7 @@ class DifyNodeFactory(NodeFactory):
|
||||
code_providers=self._code_providers,
|
||||
code_limits=self._code_limits,
|
||||
)
|
||||
|
||||
if node_type == NodeType.TEMPLATE_TRANSFORM:
|
||||
return TemplateTransformNode(
|
||||
id=node_id,
|
||||
@@ -122,6 +134,17 @@ class DifyNodeFactory(NodeFactory):
|
||||
template_renderer=self._template_renderer,
|
||||
)
|
||||
|
||||
if node_type == NodeType.HTTP_REQUEST:
|
||||
return HttpRequestNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
http_client=self._http_request_http_client,
|
||||
tool_file_manager_factory=self._http_request_tool_file_manager_factory,
|
||||
file_manager=self._http_request_file_manager,
|
||||
)
|
||||
|
||||
return node_class(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
|
||||
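The factory wiring above follows the usual constructor-injection shape: production defaults are bound once in the factory and forwarded to every node it builds, so a test only overrides them in one place. A small self-contained sketch of that shape (all names illustrative, not Dify APIs):

from collections.abc import Callable
from dataclasses import dataclass


def default_fetch(url: str) -> str:  # stands in for the production SSRF-proxy client
    return f"GET {url}"


@dataclass
class HttpNode:
    fetch: Callable[[str], str]

    def run(self, url: str) -> str:
        return self.fetch(url)


class NodeFactory:
    def __init__(self, *, http_fetch: Callable[[str], str] = default_fetch) -> None:
        self._http_fetch = http_fetch

    def create_http_node(self) -> HttpNode:
        # The factory forwards its collaborator to every node it creates.
        return HttpNode(fetch=self._http_fetch)


if __name__ == "__main__":
    factory = NodeFactory(http_fetch=lambda url: "stubbed")
    print(factory.create_http_node().run("https://example.com"))  # -> stubbed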
29  api/core/workflow/nodes/protocols.py  Normal file
@@ -0,0 +1,29 @@
from typing import Protocol

import httpx

from core.file import File


class HttpClientProtocol(Protocol):
    @property
    def max_retries_exceeded_error(self) -> type[Exception]: ...

    @property
    def request_error(self) -> type[Exception]: ...

    def get(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...

    def head(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...

    def post(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...

    def put(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...

    def delete(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...

    def patch(self, url: str, max_retries: int = ..., **kwargs: object) -> httpx.Response: ...


class FileManagerProtocol(Protocol):
    def download(self, f: File, /) -> bytes: ...
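These are typing.Protocol definitions, so any object with the matching method shapes satisfies them without inheriting from anything; that is what allows a plain test stub to replace ssrf_proxy or file_manager. A reduced, self-contained sketch of the idea (Downloader and InMemoryFiles are illustrative, not part of the codebase):

from typing import Protocol, runtime_checkable


@runtime_checkable
class Downloader(Protocol):
    """Reduced mirror of FileManagerProtocol for this sketch."""

    def download(self, name: str, /) -> bytes: ...


class InMemoryFiles:
    """No inheritance needed: matching the method shape is enough."""

    def __init__(self) -> None:
        self._store = {"hello.txt": b"hello"}

    def download(self, name: str, /) -> bytes:
        return self._store[name]


def read_upper(files: Downloader, name: str) -> bytes:
    return files.download(name).upper()


if __name__ == "__main__":
    files = InMemoryFiles()
    assert isinstance(files, Downloader)  # runtime_checkable permits this structural check
    print(read_upper(files, "hello.txt"))  # b'HELLO'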
@@ -1,7 +1,7 @@
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.variables import SegmentType, Variable
|
||||
from core.variables import SegmentType, VariableBase
|
||||
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
|
||||
from core.workflow.entities import GraphInitParams
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
@@ -33,6 +33,15 @@ class VariableAssignerNode(Node[VariableAssignerData]):
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
|
||||
def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool:
|
||||
"""
|
||||
Check if this Variable Assigner node blocks the output of specific variables.
|
||||
|
||||
Returns True if this node updates any of the requested conversation variables.
|
||||
"""
|
||||
assigned_selector = tuple(self.node_data.assigned_variable_selector)
|
||||
return assigned_selector in variable_selectors
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
@@ -64,7 +73,7 @@ class VariableAssignerNode(Node[VariableAssignerData]):
|
||||
assigned_variable_selector = self.node_data.assigned_variable_selector
|
||||
# Should be String, Number, Object, ArrayString, ArrayNumber, ArrayObject
|
||||
original_variable = self.graph_runtime_state.variable_pool.get(assigned_variable_selector)
|
||||
if not isinstance(original_variable, Variable):
|
||||
if not isinstance(original_variable, VariableBase):
|
||||
raise VariableOperatorNodeError("assigned variable not found")
|
||||
|
||||
match self.node_data.write_mode:
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
from collections.abc import Mapping, MutableMapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.variables import SegmentType, Variable
|
||||
from core.variables import SegmentType, VariableBase
|
||||
from core.variables.consts import SELECTORS_LENGTH
|
||||
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
@@ -118,7 +118,7 @@ class VariableAssignerNode(Node[VariableAssignerNodeData]):
|
||||
# ==================== Validation Part
|
||||
|
||||
# Check if variable exists
|
||||
if not isinstance(variable, Variable):
|
||||
if not isinstance(variable, VariableBase):
|
||||
raise VariableNotFoundError(variable_selector=item.variable_selector)
|
||||
|
||||
# Check if operation is supported
|
||||
@@ -192,7 +192,7 @@ class VariableAssignerNode(Node[VariableAssignerNodeData]):
|
||||
|
||||
for selector in updated_variable_selectors:
|
||||
variable = self.graph_runtime_state.variable_pool.get(selector)
|
||||
if not isinstance(variable, Variable):
|
||||
if not isinstance(variable, VariableBase):
|
||||
raise VariableNotFoundError(variable_selector=selector)
|
||||
process_data[variable.name] = variable.value
|
||||
|
||||
@@ -213,7 +213,7 @@ class VariableAssignerNode(Node[VariableAssignerNodeData]):
|
||||
def _handle_item(
|
||||
self,
|
||||
*,
|
||||
variable: Variable,
|
||||
variable: VariableBase,
|
||||
operation: Operation,
|
||||
value: Any,
|
||||
):
|
||||
|
||||
@@ -9,10 +9,10 @@ from typing import Annotated, Any, Union, cast
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.file import File, FileAttribute, file_manager
|
||||
from core.variables import Segment, SegmentGroup, Variable
|
||||
from core.variables import Segment, SegmentGroup, VariableBase
|
||||
from core.variables.consts import SELECTORS_LENGTH
|
||||
from core.variables.segments import FileSegment, ObjectSegment
|
||||
from core.variables.variables import RAGPipelineVariableInput, VariableUnion
|
||||
from core.variables.variables import RAGPipelineVariableInput, Variable
|
||||
from core.workflow.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
ENVIRONMENT_VARIABLE_NODE_ID,
|
||||
@@ -32,7 +32,7 @@ class VariablePool(BaseModel):
|
||||
# The first element of the selector is the node id, it's the first-level key in the dictionary.
|
||||
# Other elements of the selector are the keys in the second-level dictionary. To get the key, we hash the
|
||||
# elements of the selector except the first one.
|
||||
variable_dictionary: defaultdict[str, Annotated[dict[str, VariableUnion], Field(default_factory=dict)]] = Field(
|
||||
variable_dictionary: defaultdict[str, Annotated[dict[str, Variable], Field(default_factory=dict)]] = Field(
|
||||
description="Variables mapping",
|
||||
default=defaultdict(dict),
|
||||
)
|
||||
@@ -46,13 +46,13 @@ class VariablePool(BaseModel):
|
||||
description="System variables",
|
||||
default_factory=SystemVariable.empty,
|
||||
)
|
||||
environment_variables: Sequence[VariableUnion] = Field(
|
||||
environment_variables: Sequence[Variable] = Field(
|
||||
description="Environment variables.",
|
||||
default_factory=list[VariableUnion],
|
||||
default_factory=list[Variable],
|
||||
)
|
||||
conversation_variables: Sequence[VariableUnion] = Field(
|
||||
conversation_variables: Sequence[Variable] = Field(
|
||||
description="Conversation variables.",
|
||||
default_factory=list[VariableUnion],
|
||||
default_factory=list[Variable],
|
||||
)
|
||||
rag_pipeline_variables: list[RAGPipelineVariableInput] = Field(
|
||||
description="RAG pipeline variables.",
|
||||
@@ -105,7 +105,7 @@ class VariablePool(BaseModel):
|
||||
f"got {len(selector)} elements"
|
||||
)
|
||||
|
||||
if isinstance(value, Variable):
|
||||
if isinstance(value, VariableBase):
|
||||
variable = value
|
||||
elif isinstance(value, Segment):
|
||||
variable = variable_factory.segment_to_variable(segment=value, selector=selector)
|
||||
@@ -114,9 +114,9 @@ class VariablePool(BaseModel):
|
||||
variable = variable_factory.segment_to_variable(segment=segment, selector=selector)
|
||||
|
||||
node_id, name = self._selector_to_keys(selector)
|
||||
# Based on the definition of `VariableUnion`,
|
||||
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
|
||||
self.variable_dictionary[node_id][name] = cast(VariableUnion, variable)
|
||||
# Based on the definition of `Variable`,
|
||||
# `VariableBase` instances can be safely used as `Variable` since they are compatible.
|
||||
self.variable_dictionary[node_id][name] = cast(Variable, variable)
|
||||
|
||||
@classmethod
|
||||
def _selector_to_keys(cls, selector: Sequence[str]) -> tuple[str, str]:
|
||||
|
||||
@@ -2,7 +2,7 @@ import abc
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any, Protocol
|
||||
|
||||
from core.variables import Variable
|
||||
from core.variables import VariableBase
|
||||
from core.variables.consts import SELECTORS_LENGTH
|
||||
from core.workflow.runtime import VariablePool
|
||||
|
||||
@@ -26,7 +26,7 @@ class VariableLoader(Protocol):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def load_variables(self, selectors: list[list[str]]) -> list[Variable]:
|
||||
def load_variables(self, selectors: list[list[str]]) -> list[VariableBase]:
|
||||
"""Load variables based on the provided selectors. If the selectors are empty,
|
||||
this method should return an empty list.
|
||||
|
||||
@@ -36,7 +36,7 @@ class VariableLoader(Protocol):
|
||||
:param: selectors: a list of string list, each inner list should have at least two elements:
|
||||
- the first element is the node ID,
|
||||
- the second element is the variable name.
|
||||
:return: a list of Variable objects that match the provided selectors.
|
||||
:return: a list of VariableBase objects that match the provided selectors.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -46,7 +46,7 @@ class _DummyVariableLoader(VariableLoader):
|
||||
Serves as a placeholder when no variable loading is needed.
|
||||
"""
|
||||
|
||||
def load_variables(self, selectors: list[list[str]]) -> list[Variable]:
|
||||
def load_variables(self, selectors: list[list[str]]) -> list[VariableBase]:
|
||||
return []
|
||||
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from core.workflow.graph_engine.protocols.command_channel import CommandChannel
|
||||
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
|
||||
from core.workflow.nodes import NodeType
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.node_factory import DifyNodeFactory
|
||||
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
|
||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||
from core.workflow.system_variable import SystemVariable
|
||||
@@ -136,13 +137,11 @@ class WorkflowEntry:
|
||||
:param user_inputs: user inputs
|
||||
:return:
|
||||
"""
|
||||
node_config = workflow.get_node_config_by_id(node_id)
|
||||
node_config = dict(workflow.get_node_config_by_id(node_id))
|
||||
node_config_data = node_config.get("data", {})
|
||||
|
||||
# Get node class
|
||||
# Get node type
|
||||
node_type = NodeType(node_config_data.get("type"))
|
||||
node_version = node_config_data.get("version", "1")
|
||||
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
|
||||
|
||||
# init graph init params and runtime state
|
||||
graph_init_params = GraphInitParams(
|
||||
@@ -158,12 +157,12 @@ class WorkflowEntry:
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
|
||||
# init workflow run state
|
||||
node = node_cls(
|
||||
id=str(uuid.uuid4()),
|
||||
config=node_config,
|
||||
node_factory = DifyNodeFactory(
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
node = node_factory.create_node(node_config)
|
||||
node_cls = type(node)
|
||||
|
||||
try:
|
||||
# variable selector to variable mapping
|
||||
@@ -190,8 +189,7 @@ class WorkflowEntry:
|
||||
)
|
||||
|
||||
try:
|
||||
# run node
|
||||
generator = node.run()
|
||||
generator = cls._traced_node_run(node)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"error while running node, workflow_id=%s, node_id=%s, node_type=%s, node_version=%s",
|
||||
@@ -324,8 +322,7 @@ class WorkflowEntry:
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
|
||||
# run node
|
||||
generator = node.run()
|
||||
generator = cls._traced_node_run(node)
|
||||
|
||||
return node, generator
|
||||
except Exception as e:
|
||||
@@ -431,3 +428,26 @@ class WorkflowEntry:
input_value = current_variable.value | input_value

variable_pool.add([variable_node_id] + variable_key_list, input_value)

@staticmethod
def _traced_node_run(node: Node) -> Generator[GraphNodeEventBase, None, None]:
"""
Wraps a node's run method with OpenTelemetry tracing and returns a generator.
"""
# Wrap node.run() with ObservabilityLayer hooks to produce node-level spans
layer = ObservabilityLayer()
layer.on_graph_start()
node.ensure_execution_id()

def _gen():
error: Exception | None = None
layer.on_node_run_start(node)
try:
yield from node.run()
except Exception as exc:
error = exc
raise
finally:
layer.on_node_run_end(node, error)

return _gen()
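The wrapper above defers the body into an inner generator so the start/end hooks fire around the node's actual event stream, with the finally block covering normal completion, exceptions, and early closure alike. A standalone sketch of the same pattern (Hooks and traced are illustrative names):

from collections.abc import Generator, Iterable


class Hooks:
    """Stand-in for the ObservabilityLayer used above: records span boundaries."""

    def on_start(self) -> None:
        print("span start")

    def on_end(self, error: Exception | None) -> None:
        print("span end:", "error" if error else "ok")


def traced(events: Iterable[int], hooks: Hooks) -> Generator[int, None, None]:
    hooks.on_start()  # fires eagerly, like layer.on_graph_start() above

    def _gen() -> Generator[int, None, None]:
        error: Exception | None = None
        try:
            yield from events
        except Exception as exc:
            error = exc
            raise
        finally:
            # Runs whether the stream finishes, raises, or is closed early by the consumer.
            hooks.on_end(error)

    return _gen()


if __name__ == "__main__":
    for value in traced([1, 2, 3], Hooks()):
        print(value)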
@@ -6,6 +6,7 @@ from .create_site_record_when_app_created import handle as handle_create_site_re
|
||||
from .delete_tool_parameters_cache_when_sync_draft_workflow import (
|
||||
handle as handle_delete_tool_parameters_cache_when_sync_draft_workflow,
|
||||
)
|
||||
from .queue_credential_sync_when_tenant_created import handle as handle_queue_credential_sync_when_tenant_created
|
||||
from .sync_plugin_trigger_when_app_created import handle as handle_sync_plugin_trigger_when_app_created
|
||||
from .sync_webhook_when_app_created import handle as handle_sync_webhook_when_app_created
|
||||
from .sync_workflow_schedule_when_app_published import handle as handle_sync_workflow_schedule_when_app_published
|
||||
@@ -30,6 +31,7 @@ __all__ = [
|
||||
"handle_create_installed_app_when_app_created",
|
||||
"handle_create_site_record_when_app_created",
|
||||
"handle_delete_tool_parameters_cache_when_sync_draft_workflow",
|
||||
"handle_queue_credential_sync_when_tenant_created",
|
||||
"handle_sync_plugin_trigger_when_app_created",
|
||||
"handle_sync_webhook_when_app_created",
|
||||
"handle_sync_workflow_schedule_when_app_published",
|
||||
|
||||
@@ -0,0 +1,19 @@
from configs import dify_config
from events.tenant_event import tenant_was_created
from services.enterprise.workspace_sync import WorkspaceSyncService


@tenant_was_created.connect
def handle(sender, **kwargs):
    """Queue credential sync when a tenant/workspace is created."""
    # Only queue sync tasks if plugin manager (enterprise feature) is enabled
    if not dify_config.ENTERPRISE_ENABLED:
        return

    tenant = sender

    # Determine source from kwargs if available, otherwise use generic
    source = kwargs.get("source", "tenant_created")

    # Queue credential sync task to Redis for enterprise backend to process
    WorkspaceSyncService.queue_credential_sync(tenant.id, source=source)
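The handler connects to a blinker-style signal, the same mechanism the other app/tenant event handlers use: connect registers the callback and send fires it with the sender plus a keyword payload. A minimal standalone sketch using the blinker package (the Tenant class and printed message are illustrative):

from blinker import signal

tenant_was_created = signal("tenant_was_created")


class Tenant:
    def __init__(self, tenant_id: str) -> None:
        self.id = tenant_id


@tenant_was_created.connect
def handle(sender: Tenant, **kwargs: object) -> None:
    source = kwargs.get("source", "tenant_created")
    print(f"queue credential sync for tenant={sender.id}, source={source}")


if __name__ == "__main__":
    tenant_was_created.send(Tenant("t-123"), source="signup")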
@@ -163,6 +163,13 @@ def init_app(app: DifyApp) -> Celery:
|
||||
"task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
|
||||
"schedule": crontab(minute="0", hour="2"),
|
||||
}
|
||||
if dify_config.ENABLE_WORKFLOW_RUN_CLEANUP_TASK:
|
||||
# for saas only
|
||||
imports.append("schedule.clean_workflow_runs_task")
|
||||
beat_schedule["clean_workflow_runs_task"] = {
|
||||
"task": "schedule.clean_workflow_runs_task.clean_workflow_runs_task",
|
||||
"schedule": crontab(minute="0", hour="0"),
|
||||
}
|
||||
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
|
||||
imports.append("schedule.workflow_schedule_task")
|
||||
beat_schedule["workflow_schedule_task"] = {
|
||||
|
||||
@@ -4,6 +4,8 @@ from dify_app import DifyApp
|
||||
def init_app(app: DifyApp):
|
||||
from commands import (
|
||||
add_qdrant_index,
|
||||
clean_expired_messages,
|
||||
clean_workflow_runs,
|
||||
cleanup_orphaned_draft_variables,
|
||||
clear_free_plan_tenant_expired_logs,
|
||||
clear_orphaned_file_records,
|
||||
@@ -56,6 +58,8 @@ def init_app(app: DifyApp):
|
||||
setup_datasource_oauth_client,
|
||||
transform_datasource_credentials,
|
||||
install_rag_pipeline_plugins,
|
||||
clean_workflow_runs,
|
||||
clean_expired_messages,
|
||||
]
|
||||
for cmd in cmds_to_register:
|
||||
app.cli.add_command(cmd)
|
||||
|
||||
@@ -10,6 +10,7 @@ import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from configs import dify_config
|
||||
from dify_app import DifyApp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -19,12 +20,17 @@ def is_enabled() -> bool:
|
||||
"""
|
||||
Check if logstore extension is enabled.
|
||||
|
||||
Logstore is considered enabled when:
|
||||
1. All required Aliyun SLS environment variables are set
|
||||
2. At least one repository configuration points to a logstore implementation
|
||||
|
||||
Returns:
|
||||
True if all required Aliyun SLS environment variables are set, False otherwise
|
||||
True if logstore should be initialized, False otherwise
|
||||
"""
|
||||
# Load environment variables from .env file
|
||||
load_dotenv()
|
||||
|
||||
# Check if Aliyun SLS connection parameters are configured
|
||||
required_vars = [
|
||||
"ALIYUN_SLS_ACCESS_KEY_ID",
|
||||
"ALIYUN_SLS_ACCESS_KEY_SECRET",
|
||||
@@ -33,24 +39,32 @@ def is_enabled() -> bool:
|
||||
"ALIYUN_SLS_PROJECT_NAME",
|
||||
]
|
||||
|
||||
all_set = all(os.environ.get(var) for var in required_vars)
|
||||
sls_vars_set = all(os.environ.get(var) for var in required_vars)
|
||||
|
||||
if not all_set:
|
||||
logger.info("Logstore extension disabled: required Aliyun SLS environment variables not set")
|
||||
if not sls_vars_set:
|
||||
return False
|
||||
|
||||
return all_set
|
||||
# Check if any repository configuration points to logstore implementation
|
||||
repository_configs = [
|
||||
dify_config.CORE_WORKFLOW_EXECUTION_REPOSITORY,
|
||||
dify_config.CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY,
|
||||
dify_config.API_WORKFLOW_NODE_EXECUTION_REPOSITORY,
|
||||
dify_config.API_WORKFLOW_RUN_REPOSITORY,
|
||||
]
|
||||
|
||||
uses_logstore = any("logstore" in config.lower() for config in repository_configs)
|
||||
|
||||
if not uses_logstore:
|
||||
return False
|
||||
|
||||
logger.info("Logstore extension enabled: SLS variables set and repository configured to use logstore")
|
||||
return True
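The revised check gates initialization on two independent conditions: the SLS credentials must be present and at least one repository setting must actually point at a logstore implementation. A condensed sketch of that gate (the trimmed variable list and repository strings are placeholders):

import os

REQUIRED_VARS = ["ALIYUN_SLS_ACCESS_KEY_ID", "ALIYUN_SLS_ACCESS_KEY_SECRET"]  # trimmed list
REPOSITORY_CONFIGS = [
    "core.repositories.sqlalchemy_workflow_execution_repository",   # placeholder values
    "extensions.logstore.repositories.workflow_run_repository",
]


def logstore_enabled() -> bool:
    # Both conditions must hold; missing credentials or an all-SQL repository setup disables it.
    if not all(os.environ.get(var) for var in REQUIRED_VARS):
        return False
    return any("logstore" in config.lower() for config in REPOSITORY_CONFIGS)


if __name__ == "__main__":
    print(logstore_enabled())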
|
||||
|
||||
|
||||
def init_app(app: DifyApp):
|
||||
"""
|
||||
Initialize logstore on application startup.
|
||||
|
||||
This function:
|
||||
1. Creates Aliyun SLS project if it doesn't exist
|
||||
2. Creates logstores (workflow_execution, workflow_node_execution) if they don't exist
|
||||
3. Creates indexes with field configurations based on PostgreSQL table structures
|
||||
|
||||
This operation is idempotent and only executes once during application startup.
|
||||
If initialization fails, the application continues running without logstore features.
|
||||
|
||||
Args:
|
||||
app: The Dify application instance
|
||||
@@ -58,17 +72,23 @@ def init_app(app: DifyApp):
|
||||
try:
|
||||
from extensions.logstore.aliyun_logstore import AliyunLogStore
|
||||
|
||||
logger.info("Initializing logstore...")
|
||||
logger.info("Initializing Aliyun SLS Logstore...")
|
||||
|
||||
# Create logstore client and initialize project/logstores/indexes
|
||||
# Create logstore client and initialize resources
|
||||
logstore_client = AliyunLogStore()
|
||||
logstore_client.init_project_logstore()
|
||||
|
||||
# Attach to app for potential later use
|
||||
app.extensions["logstore"] = logstore_client
|
||||
|
||||
logger.info("Logstore initialized successfully")
|
||||
|
||||
except Exception:
|
||||
logger.exception("Failed to initialize logstore")
|
||||
# Don't raise - allow application to continue even if logstore init fails
|
||||
# This ensures that the application can still run if logstore is misconfigured
|
||||
logger.exception(
|
||||
"Logstore initialization failed. Configuration: endpoint=%s, region=%s, project=%s, timeout=%ss. "
|
||||
"Application will continue but logstore features will NOT work.",
|
||||
os.environ.get("ALIYUN_SLS_ENDPOINT"),
|
||||
os.environ.get("ALIYUN_SLS_REGION"),
|
||||
os.environ.get("ALIYUN_SLS_PROJECT_NAME"),
|
||||
os.environ.get("ALIYUN_SLS_CHECK_CONNECTIVITY_TIMEOUT", "30"),
|
||||
)
|
||||
# Don't raise - allow application to continue even if logstore setup fails
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
@@ -179,9 +180,18 @@ class AliyunLogStore:
|
||||
self.region: str = os.environ.get("ALIYUN_SLS_REGION", "")
|
||||
self.project_name: str = os.environ.get("ALIYUN_SLS_PROJECT_NAME", "")
|
||||
self.logstore_ttl: int = int(os.environ.get("ALIYUN_SLS_LOGSTORE_TTL", 365))
|
||||
self.log_enabled: bool = os.environ.get("SQLALCHEMY_ECHO", "false").lower() == "true"
|
||||
self.log_enabled: bool = (
|
||||
os.environ.get("SQLALCHEMY_ECHO", "false").lower() == "true"
|
||||
or os.environ.get("LOGSTORE_SQL_ECHO", "false").lower() == "true"
|
||||
)
|
||||
self.pg_mode_enabled: bool = os.environ.get("LOGSTORE_PG_MODE_ENABLED", "true").lower() == "true"
|
||||
|
||||
# Get timeout configuration
|
||||
check_timeout = int(os.environ.get("ALIYUN_SLS_CHECK_CONNECTIVITY_TIMEOUT", 30))
|
||||
|
||||
# Pre-check endpoint connectivity to prevent indefinite hangs
|
||||
self._check_endpoint_connectivity(self.endpoint, check_timeout)
|
||||
|
||||
# Initialize SDK client
|
||||
self.client = LogClient(
|
||||
self.endpoint, self.access_key_id, self.access_key_secret, auth_version=AUTH_VERSION_4, region=self.region
|
||||
@@ -199,6 +209,49 @@ class AliyunLogStore:
|
||||
|
||||
self.__class__._initialized = True
|
||||
|
||||
@staticmethod
|
||||
def _check_endpoint_connectivity(endpoint: str, timeout: int) -> None:
|
||||
"""
|
||||
Check if the SLS endpoint is reachable before creating LogClient.
|
||||
Prevents indefinite hangs when the endpoint is unreachable.
|
||||
|
||||
Args:
|
||||
endpoint: SLS endpoint URL
|
||||
timeout: Connection timeout in seconds
|
||||
|
||||
Raises:
|
||||
ConnectionError: If endpoint is not reachable
|
||||
"""
|
||||
# Parse endpoint URL to extract hostname and port
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse(endpoint if "://" in endpoint else f"http://{endpoint}")
|
||||
hostname = parsed_url.hostname
|
||||
port = parsed_url.port or (443 if parsed_url.scheme == "https" else 80)
|
||||
|
||||
if not hostname:
|
||||
raise ConnectionError(f"Invalid endpoint URL: {endpoint}")
|
||||
|
||||
sock = None
|
||||
try:
|
||||
# Create socket and set timeout
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(timeout)
|
||||
sock.connect((hostname, port))
|
||||
except Exception as e:
|
||||
# Catch all exceptions and provide clear error message
|
||||
error_type = type(e).__name__
|
||||
raise ConnectionError(
|
||||
f"Cannot connect to {hostname}:{port} (timeout={timeout}s): [{error_type}] {e}"
|
||||
) from e
|
||||
finally:
|
||||
# Ensure socket is properly closed
|
||||
if sock:
|
||||
try:
|
||||
sock.close()
|
||||
except Exception: # noqa: S110
|
||||
pass # Ignore errors during cleanup
|
||||
|
||||
@property
|
||||
def supports_pg_protocol(self) -> bool:
|
||||
"""Check if PG protocol is supported and enabled."""
|
||||
@@ -220,19 +273,16 @@ class AliyunLogStore:
|
||||
try:
|
||||
self._use_pg_protocol = self._pg_client.init_connection()
|
||||
if self._use_pg_protocol:
|
||||
logger.info("Successfully connected to project %s using PG protocol", self.project_name)
|
||||
logger.info("Using PG protocol for project %s", self.project_name)
|
||||
# Check if scan_index is enabled for all logstores
|
||||
self._check_and_disable_pg_if_scan_index_disabled()
|
||||
return True
|
||||
else:
|
||||
logger.info("PG connection failed for project %s. Will use SDK mode.", self.project_name)
|
||||
logger.info("Using SDK mode for project %s", self.project_name)
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to establish PG connection for project %s: %s. Will use SDK mode.",
|
||||
self.project_name,
|
||||
str(e),
|
||||
)
|
||||
logger.info("Using SDK mode for project %s", self.project_name)
|
||||
logger.debug("PG connection details: %s", str(e))
|
||||
self._use_pg_protocol = False
|
||||
return False
|
||||
|
||||
@@ -246,10 +296,6 @@ class AliyunLogStore:
|
||||
if self._use_pg_protocol:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Attempting delayed PG connection for newly created project %s ...",
|
||||
self.project_name,
|
||||
)
|
||||
self._attempt_pg_connection_init()
|
||||
self.__class__._pg_connection_timer = None
|
||||
|
||||
@@ -284,11 +330,7 @@ class AliyunLogStore:
|
||||
if project_is_new:
|
||||
# For newly created projects, schedule delayed PG connection
|
||||
self._use_pg_protocol = False
|
||||
logger.info(
|
||||
"Project %s is newly created. Will use SDK mode and schedule PG connection attempt in %d seconds.",
|
||||
self.project_name,
|
||||
self.__class__._pg_connection_delay,
|
||||
)
|
||||
logger.info("Using SDK mode for project %s (newly created)", self.project_name)
|
||||
if self.__class__._pg_connection_timer is not None:
|
||||
self.__class__._pg_connection_timer.cancel()
|
||||
self.__class__._pg_connection_timer = threading.Timer(
|
||||
@@ -299,7 +341,6 @@ class AliyunLogStore:
|
||||
self.__class__._pg_connection_timer.start()
|
||||
else:
|
||||
# For existing projects, attempt PG connection immediately
|
||||
logger.info("Project %s already exists. Attempting PG connection...", self.project_name)
|
||||
self._attempt_pg_connection_init()
|
||||
|
||||
def _check_and_disable_pg_if_scan_index_disabled(self) -> None:
|
||||
@@ -318,9 +359,9 @@ class AliyunLogStore:
|
||||
existing_config = self.get_existing_index_config(logstore_name)
|
||||
if existing_config and not existing_config.scan_index:
|
||||
logger.info(
|
||||
"Logstore %s has scan_index=false, USE SDK mode for read/write operations. "
|
||||
"PG protocol requires scan_index to be enabled.",
|
||||
"Logstore %s requires scan_index enabled, using SDK mode for project %s",
|
||||
logstore_name,
|
||||
self.project_name,
|
||||
)
|
||||
self._use_pg_protocol = False
|
||||
# Close PG connection if it was initialized
|
||||
@@ -748,7 +789,6 @@ class AliyunLogStore:
|
||||
reverse=reverse,
|
||||
)
|
||||
|
||||
# Log query info if SQLALCHEMY_ECHO is enabled
|
||||
if self.log_enabled:
|
||||
logger.info(
|
||||
"[LogStore] GET_LOGS | logstore=%s | project=%s | query=%s | "
|
||||
@@ -770,7 +810,6 @@ class AliyunLogStore:
|
||||
for log in logs:
|
||||
result.append(log.get_contents())
|
||||
|
||||
# Log result count if SQLALCHEMY_ECHO is enabled
|
||||
if self.log_enabled:
|
||||
logger.info(
|
||||
"[LogStore] GET_LOGS RESULT | logstore=%s | returned_count=%d",
|
||||
@@ -845,7 +884,6 @@ class AliyunLogStore:
|
||||
query=full_query,
|
||||
)
|
||||
|
||||
# Log query info if SQLALCHEMY_ECHO is enabled
|
||||
if self.log_enabled:
|
||||
logger.info(
|
||||
"[LogStore-SDK] EXECUTE_SQL | logstore=%s | project=%s | from_time=%d | to_time=%d | full_query=%s",
|
||||
@@ -853,8 +891,7 @@ class AliyunLogStore:
|
||||
self.project_name,
|
||||
from_time,
|
||||
to_time,
|
||||
query,
|
||||
sql,
|
||||
full_query,
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -865,7 +902,6 @@ class AliyunLogStore:
|
||||
for log in logs:
|
||||
result.append(log.get_contents())
|
||||
|
||||
# Log result count if SQLALCHEMY_ECHO is enabled
|
||||
if self.log_enabled:
|
||||
logger.info(
|
||||
"[LogStore-SDK] EXECUTE_SQL RESULT | logstore=%s | returned_count=%d",
|
||||
|
||||
@@ -7,8 +7,7 @@ from contextlib import contextmanager
|
||||
from typing import Any
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.pool
|
||||
from psycopg2 import InterfaceError, OperationalError
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from configs import dify_config
|
||||
|
||||
@@ -16,11 +15,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AliyunLogStorePG:
|
||||
"""
|
||||
PostgreSQL protocol support for Aliyun SLS LogStore.
|
||||
|
||||
Handles PG connection pooling and operations for regions that support PG protocol.
|
||||
"""
|
||||
"""PostgreSQL protocol support for Aliyun SLS LogStore using SQLAlchemy connection pool."""
|
||||
|
||||
def __init__(self, access_key_id: str, access_key_secret: str, endpoint: str, project_name: str):
|
||||
"""
|
||||
@@ -36,24 +31,11 @@ class AliyunLogStorePG:
|
||||
self._access_key_secret = access_key_secret
|
||||
self._endpoint = endpoint
|
||||
self.project_name = project_name
|
||||
self._pg_pool: psycopg2.pool.SimpleConnectionPool | None = None
|
||||
self._engine: Any = None # SQLAlchemy Engine
|
||||
self._use_pg_protocol = False
|
||||
|
||||
def _check_port_connectivity(self, host: str, port: int, timeout: float = 2.0) -> bool:
|
||||
"""
|
||||
Check if a TCP port is reachable using socket connection.
|
||||
|
||||
This provides a fast check before attempting full database connection,
|
||||
preventing long waits when connecting to unsupported regions.
|
||||
|
||||
Args:
|
||||
host: Hostname or IP address
|
||||
port: Port number
|
||||
timeout: Connection timeout in seconds (default: 2.0)
|
||||
|
||||
Returns:
|
||||
True if port is reachable, False otherwise
|
||||
"""
|
||||
"""Fast TCP port check to avoid long waits on unsupported regions."""
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(timeout)
|
||||
@@ -65,166 +47,101 @@ class AliyunLogStorePG:
|
||||
return False
|
||||
|
||||
def init_connection(self) -> bool:
|
||||
"""
|
||||
Initialize PostgreSQL connection pool for SLS PG protocol support.
|
||||
|
||||
Attempts to connect to SLS using PostgreSQL protocol. If successful, sets
|
||||
_use_pg_protocol to True and creates a connection pool. If connection fails
|
||||
(region doesn't support PG protocol or other errors), returns False.
|
||||
|
||||
Returns:
|
||||
True if PG protocol is supported and initialized, False otherwise
|
||||
"""
|
||||
"""Initialize SQLAlchemy connection pool with pool_recycle and TCP keepalive support."""
|
||||
try:
|
||||
# Extract hostname from endpoint (remove protocol if present)
|
||||
pg_host = self._endpoint.replace("http://", "").replace("https://", "")
|
||||
|
||||
# Get pool configuration
|
||||
pg_max_connections = int(os.environ.get("ALIYUN_SLS_PG_MAX_CONNECTIONS", 10))
|
||||
# Pool configuration
|
||||
pool_size = int(os.environ.get("ALIYUN_SLS_PG_POOL_SIZE", 5))
|
||||
max_overflow = int(os.environ.get("ALIYUN_SLS_PG_MAX_OVERFLOW", 5))
|
||||
pool_recycle = int(os.environ.get("ALIYUN_SLS_PG_POOL_RECYCLE", 3600))
|
||||
pool_pre_ping = os.environ.get("ALIYUN_SLS_PG_POOL_PRE_PING", "false").lower() == "true"
|
||||
|
||||
logger.debug(
|
||||
"Check PG protocol connection to SLS: host=%s, project=%s",
|
||||
pg_host,
|
||||
self.project_name,
|
||||
)
|
||||
logger.debug("Check PG protocol connection to SLS: host=%s, project=%s", pg_host, self.project_name)
|
||||
|
||||
# Fast port connectivity check before attempting full connection
|
||||
# This prevents long waits when connecting to unsupported regions
|
||||
# Fast port check to avoid long waits
|
||||
if not self._check_port_connectivity(pg_host, 5432, timeout=1.0):
|
||||
logger.info(
|
||||
"USE SDK mode for read/write operations, host=%s",
|
||||
pg_host,
|
||||
)
|
||||
logger.debug("Using SDK mode for host=%s", pg_host)
|
||||
return False
|
||||
|
||||
# Create connection pool
|
||||
self._pg_pool = psycopg2.pool.SimpleConnectionPool(
|
||||
minconn=1,
|
||||
maxconn=pg_max_connections,
|
||||
host=pg_host,
|
||||
port=5432,
|
||||
database=self.project_name,
|
||||
user=self._access_key_id,
|
||||
password=self._access_key_secret,
|
||||
sslmode="require",
|
||||
connect_timeout=5,
|
||||
application_name=f"Dify-{dify_config.project.version}",
|
||||
# Build connection URL
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
username = quote_plus(self._access_key_id)
|
||||
password = quote_plus(self._access_key_secret)
|
||||
database_url = (
|
||||
f"postgresql+psycopg2://{username}:{password}@{pg_host}:5432/{self.project_name}?sslmode=require"
|
||||
)
|
||||
|
||||
# Note: Skip test query because SLS PG protocol only supports SELECT/INSERT on actual tables
|
||||
# Connection pool creation success already indicates connectivity
|
||||
# Create SQLAlchemy engine with connection pool
|
||||
self._engine = create_engine(
|
||||
database_url,
|
||||
pool_size=pool_size,
|
||||
max_overflow=max_overflow,
|
||||
pool_recycle=pool_recycle,
|
||||
pool_pre_ping=pool_pre_ping,
|
||||
pool_timeout=30,
|
||||
connect_args={
|
||||
"connect_timeout": 5,
|
||||
"application_name": f"Dify-{dify_config.project.version}-fixautocommit",
|
||||
"keepalives": 1,
|
||||
"keepalives_idle": 60,
|
||||
"keepalives_interval": 10,
|
||||
"keepalives_count": 5,
|
||||
},
|
||||
)
|
||||
|
||||
self._use_pg_protocol = True
|
||||
logger.info(
|
||||
"PG protocol initialized successfully for SLS project=%s. Will use PG for read/write operations.",
|
||||
"PG protocol initialized for SLS project=%s (pool_size=%d, pool_recycle=%ds)",
|
||||
self.project_name,
|
||||
pool_size,
|
||||
pool_recycle,
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
# PG connection failed - fallback to SDK mode
|
||||
self._use_pg_protocol = False
|
||||
if self._pg_pool:
|
||||
if self._engine:
|
||||
try:
|
||||
self._pg_pool.closeall()
|
||||
self._engine.dispose()
|
||||
except Exception:
|
||||
logger.debug("Failed to close PG connection pool during cleanup, ignoring")
|
||||
self._pg_pool = None
|
||||
logger.debug("Failed to dispose engine during cleanup, ignoring")
|
||||
self._engine = None
|
||||
|
||||
logger.info(
|
||||
"PG protocol connection failed (region may not support PG protocol): %s. "
|
||||
"Falling back to SDK mode for read/write operations.",
|
||||
str(e),
|
||||
)
|
||||
return False
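The switch from a psycopg2 SimpleConnectionPool to a SQLAlchemy engine buys pool_recycle, optional pre-ping, and libpq keepalive settings without hand-rolled validation. A sketch of an engine configured the same way (the DSN, credentials, and values are placeholders, and SLS-specific limits such as no parameterized queries still apply):

from sqlalchemy import create_engine

# Placeholder DSN: endpoint, project name, and credentials are illustrative.
engine = create_engine(
    "postgresql+psycopg2://AK_ID:AK_SECRET@sls.example.com:5432/my-project?sslmode=require",
    pool_size=5,          # persistent connections kept open
    max_overflow=5,       # extra connections allowed under burst load
    pool_recycle=3600,    # recycle connections older than an hour to dodge idle timeouts
    pool_pre_ping=False,  # optional liveness probe before each checkout
    pool_timeout=30,
    connect_args={
        "connect_timeout": 5,
        "keepalives": 1,             # libpq TCP keepalive knobs passed through psycopg2
        "keepalives_idle": 60,
        "keepalives_interval": 10,
        "keepalives_count": 5,
    },
)

# engine.raw_connection() hands out a pooled DBAPI connection; close() returns it to the
# pool instead of closing the socket, and the pool enforces recycle/pre-ping on checkout.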
|
||||
|
||||
def _is_connection_valid(self, conn: Any) -> bool:
|
||||
"""
|
||||
Check if a connection is still valid.
|
||||
|
||||
Args:
|
||||
conn: psycopg2 connection object
|
||||
|
||||
Returns:
|
||||
True if connection is valid, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Check if connection is closed
|
||||
if conn.closed:
|
||||
return False
|
||||
|
||||
# Quick ping test - execute a lightweight query
|
||||
# For SLS PG protocol, we can't use SELECT 1 without FROM,
|
||||
# so we just check the connection status
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("SELECT 1")
|
||||
cursor.fetchone()
|
||||
return True
|
||||
except Exception:
|
||||
logger.debug("Using SDK mode for region: %s", str(e))
|
||||
return False
|
||||
|
||||
@contextmanager
|
||||
def _get_connection(self):
|
||||
"""
|
||||
Context manager to get a PostgreSQL connection from the pool.
|
||||
"""Get connection from SQLAlchemy pool. Pool handles recycle, invalidation, and keepalive automatically."""
|
||||
if not self._engine:
|
||||
raise RuntimeError("SQLAlchemy engine is not initialized")
|
||||
|
||||
Automatically validates and refreshes stale connections.
|
||||
|
||||
Note: Aliyun SLS PG protocol does not support transactions, so we always
|
||||
use autocommit mode.
|
||||
|
||||
Yields:
|
||||
psycopg2 connection object
|
||||
|
||||
Raises:
|
||||
RuntimeError: If PG pool is not initialized
|
||||
"""
|
||||
if not self._pg_pool:
|
||||
raise RuntimeError("PG connection pool is not initialized")
|
||||
|
||||
conn = self._pg_pool.getconn()
|
||||
connection = self._engine.raw_connection()
|
||||
try:
|
||||
# Validate connection and get a fresh one if needed
|
||||
if not self._is_connection_valid(conn):
|
||||
logger.debug("Connection is stale, marking as bad and getting a new one")
|
||||
# Mark connection as bad and get a new one
|
||||
self._pg_pool.putconn(conn, close=True)
|
||||
conn = self._pg_pool.getconn()
|
||||
|
||||
# Aliyun SLS PG protocol does not support transactions, always use autocommit
|
||||
conn.autocommit = True
|
||||
yield conn
|
||||
connection.autocommit = True # SLS PG protocol does not support transactions
|
||||
yield connection
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
# Return connection to pool (or close if it's bad)
|
||||
if self._is_connection_valid(conn):
|
||||
self._pg_pool.putconn(conn)
|
||||
else:
|
||||
self._pg_pool.putconn(conn, close=True)
|
||||
connection.close()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the PostgreSQL connection pool."""
|
||||
if self._pg_pool:
|
||||
"""Dispose SQLAlchemy engine and close all connections."""
|
||||
if self._engine:
|
||||
try:
|
||||
self._pg_pool.closeall()
|
||||
logger.info("PG connection pool closed")
|
||||
self._engine.dispose()
|
||||
logger.info("SQLAlchemy engine disposed")
|
||||
except Exception:
|
||||
logger.exception("Failed to close PG connection pool")
|
||||
logger.exception("Failed to dispose engine")
|
||||
|
||||
def _is_retriable_error(self, error: Exception) -> bool:
|
||||
"""
|
||||
Check if an error is retriable (connection-related issues).
|
||||
|
||||
Args:
|
||||
error: Exception to check
|
||||
|
||||
Returns:
|
||||
True if the error is retriable, False otherwise
|
||||
"""
|
||||
# Retry on connection-related errors
|
||||
if isinstance(error, (OperationalError, InterfaceError)):
|
||||
"""Check if error is retriable (connection-related issues)."""
|
||||
# Check for psycopg2 connection errors directly
|
||||
if isinstance(error, (psycopg2.OperationalError, psycopg2.InterfaceError)):
|
||||
return True
|
||||
|
||||
# Check error message for specific connection issues
|
||||
error_msg = str(error).lower()
|
||||
retriable_patterns = [
|
||||
"connection",
|
||||
@@ -234,34 +151,18 @@ class AliyunLogStorePG:
|
||||
"reset by peer",
|
||||
"no route to host",
|
||||
"network",
|
||||
"operational error",
|
||||
"interface error",
|
||||
]
|
||||
return any(pattern in error_msg for pattern in retriable_patterns)
|
||||
|
||||
def put_log(self, logstore: str, contents: Sequence[tuple[str, str]], log_enabled: bool = False) -> None:
|
||||
"""
|
||||
Write log to SLS using PostgreSQL protocol with automatic retry.
|
||||
|
||||
Note: SLS PG protocol only supports INSERT (not UPDATE). This uses append-only
|
||||
writes with log_version field for versioning, same as SDK implementation.
|
||||
|
||||
Args:
|
||||
logstore: Name of the logstore table
|
||||
contents: List of (field_name, value) tuples
|
||||
log_enabled: Whether to enable logging
|
||||
|
||||
Raises:
|
||||
psycopg2.Error: If database operation fails after all retries
|
||||
"""
|
||||
"""Write log to SLS using INSERT with automatic retry (3 attempts with exponential backoff)."""
|
||||
if not contents:
|
||||
return
|
||||
|
||||
# Extract field names and values from contents
|
||||
fields = [field_name for field_name, _ in contents]
|
||||
values = [value for _, value in contents]
|
||||
|
||||
# Build INSERT statement with literal values
|
||||
# Note: Aliyun SLS PG protocol doesn't support parameterized queries,
|
||||
# so we need to use mogrify to safely create literal values
|
||||
field_list = ", ".join([f'"{field}"' for field in fields])
|
||||
|
||||
if log_enabled:
|
||||
@@ -272,67 +173,40 @@ class AliyunLogStorePG:
|
||||
len(contents),
|
||||
)
|
||||
|
||||
# Retry configuration
|
||||
max_retries = 3
|
||||
retry_delay = 0.1 # Start with 100ms
|
||||
retry_delay = 0.1
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
# Use mogrify to safely convert values to SQL literals
|
||||
placeholders = ", ".join(["%s"] * len(fields))
|
||||
values_literal = cursor.mogrify(f"({placeholders})", values).decode("utf-8")
|
||||
insert_sql = f'INSERT INTO "{logstore}" ({field_list}) VALUES {values_literal}'
|
||||
cursor.execute(insert_sql)
|
||||
# Success - exit retry loop
|
||||
return
|
||||
|
||||
except psycopg2.Error as e:
|
||||
# Check if error is retriable
|
||||
if not self._is_retriable_error(e):
|
||||
# Not a retriable error (e.g., data validation error), fail immediately
|
||||
logger.exception(
|
||||
"Failed to put logs to logstore %s via PG protocol (non-retriable error)",
|
||||
logstore,
|
||||
)
|
||||
logger.exception("Failed to put logs to logstore %s (non-retriable error)", logstore)
|
||||
raise
|
||||
|
||||
# Retriable error - log and retry if we have attempts left
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
"Failed to put logs to logstore %s via PG protocol (attempt %d/%d): %s. Retrying...",
|
||||
"Failed to put logs to logstore %s (attempt %d/%d): %s. Retrying...",
|
||||
logstore,
|
||||
attempt + 1,
|
||||
max_retries,
|
||||
str(e),
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
retry_delay *= 2
|
||||
else:
|
||||
# Last attempt failed
|
||||
logger.exception(
|
||||
"Failed to put logs to logstore %s via PG protocol after %d attempts",
|
||||
logstore,
|
||||
max_retries,
|
||||
)
|
||||
logger.exception("Failed to put logs to logstore %s after %d attempts", logstore, max_retries)
|
||||
raise
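The write path retries only connection-level failures, sleeping 0.1 s, then 0.2 s, then giving up on the third attempt. A self-contained sketch of that retry shape (TransientError and with_retries are illustrative names):

import time


class TransientError(Exception):
    """Stands in for connection-level errors worth retrying."""


def with_retries(operation, *, max_retries: int = 3, initial_delay: float = 0.1):
    """Run operation(), retrying transient failures with exponential backoff."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return operation()
        except TransientError:
            if attempt == max_retries - 1:
                raise  # last attempt: surface the error to the caller
            time.sleep(delay)
            delay *= 2  # 0.1s -> 0.2s -> 0.4s


if __name__ == "__main__":
    attempts = {"n": 0}

    def flaky() -> str:
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise TransientError("connection reset by peer")
        return "ok"

    print(with_retries(flaky))  # succeeds on the third attempt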
|
||||
|
||||
def execute_sql(self, sql: str, logstore: str, log_enabled: bool = False) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Execute SQL query using PostgreSQL protocol with automatic retry.
|
||||
|
||||
Args:
|
||||
sql: SQL query string
|
||||
logstore: Name of the logstore (for logging purposes)
|
||||
log_enabled: Whether to enable logging
|
||||
|
||||
Returns:
|
||||
List of result rows as dictionaries
|
||||
|
||||
Raises:
|
||||
psycopg2.Error: If database operation fails after all retries
|
||||
"""
|
||||
"""Execute SQL query with automatic retry (3 attempts with exponential backoff)."""
|
||||
if log_enabled:
|
||||
logger.info(
|
||||
"[LogStore-PG] EXECUTE_SQL | logstore=%s | project=%s | sql=%s",
|
||||
@@ -341,20 +215,16 @@ class AliyunLogStorePG:
|
||||
sql,
|
||||
)
|
||||
|
||||
# Retry configuration
|
||||
max_retries = 3
|
||||
retry_delay = 0.1 # Start with 100ms
|
||||
retry_delay = 0.1
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(sql)
|
||||
|
||||
# Get column names from cursor description
|
||||
columns = [desc[0] for desc in cursor.description]
|
||||
|
||||
# Fetch all results and convert to list of dicts
|
||||
result = []
|
||||
for row in cursor.fetchall():
|
||||
row_dict = {}
|
||||
@@ -372,36 +242,31 @@ class AliyunLogStorePG:
|
||||
return result
|
||||
|
||||
except psycopg2.Error as e:
|
||||
# Check if error is retriable
|
||||
if not self._is_retriable_error(e):
|
||||
# Not a retriable error (e.g., SQL syntax error), fail immediately
|
||||
logger.exception(
|
||||
"Failed to execute SQL query on logstore %s via PG protocol (non-retriable error): sql=%s",
|
||||
"Failed to execute SQL on logstore %s (non-retriable error): sql=%s",
|
||||
logstore,
|
||||
sql,
|
||||
)
|
||||
raise
|
||||
|
||||
# Retriable error - log and retry if we have attempts left
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
"Failed to execute SQL query on logstore %s via PG protocol (attempt %d/%d): %s. Retrying...",
|
||||
"Failed to execute SQL on logstore %s (attempt %d/%d): %s. Retrying...",
|
||||
logstore,
|
||||
attempt + 1,
|
||||
max_retries,
|
||||
str(e),
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
retry_delay *= 2
|
||||
else:
|
||||
# Last attempt failed
|
||||
logger.exception(
|
||||
"Failed to execute SQL query on logstore %s via PG protocol after %d attempts: sql=%s",
|
||||
"Failed to execute SQL on logstore %s after %d attempts: sql=%s",
|
||||
logstore,
|
||||
max_retries,
|
||||
sql,
|
||||
)
|
||||
raise
|
||||
|
||||
# This line should never be reached due to raise above, but makes type checker happy
|
||||
return []
|
||||
|
||||
@@ -0,0 +1,29 @@
"""
LogStore repository utilities.
"""

from typing import Any


def safe_float(value: Any, default: float = 0.0) -> float:
    """
    Safely convert a value to float, handling 'null' strings and None.
    """
    if value is None or value in {"null", ""}:
        return default
    try:
        return float(value)
    except (ValueError, TypeError):
        return default


def safe_int(value: Any, default: int = 0) -> int:
    """
    Safely convert a value to int, handling 'null' strings and None.
    """
    if value is None or value in {"null", ""}:
        return default
    try:
        return int(float(value))
    except (ValueError, TypeError):
        return default
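Quick usage of the helpers above; logstore rows can carry the literal string "null" or empty strings for numeric columns, which plain float()/int() would reject:

from extensions.logstore.repositories import safe_float, safe_int

print(safe_float("null"))          # 0.0
print(safe_float("1.5"))           # 1.5
print(safe_int(None, default=-1))  # -1
print(safe_int("7.9"))             # 7, parsed via float first, then truncated
print(safe_int("not a number"))    # 0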
@@ -14,6 +14,8 @@ from typing import Any
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from extensions.logstore.aliyun_logstore import AliyunLogStore
|
||||
from extensions.logstore.repositories import safe_float, safe_int
|
||||
from extensions.logstore.sql_escape import escape_identifier, escape_logstore_query_value
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository
|
||||
|
||||
@@ -52,9 +54,8 @@ def _dict_to_workflow_node_execution_model(data: dict[str, Any]) -> WorkflowNode
|
||||
model.created_by_role = data.get("created_by_role") or ""
|
||||
model.created_by = data.get("created_by") or ""
|
||||
|
||||
# Numeric fields with defaults
|
||||
model.index = int(data.get("index", 0))
|
||||
model.elapsed_time = float(data.get("elapsed_time", 0))
|
||||
model.index = safe_int(data.get("index", 0))
|
||||
model.elapsed_time = safe_float(data.get("elapsed_time", 0))
|
||||
|
||||
# Optional fields
|
||||
model.workflow_run_id = data.get("workflow_run_id")
|
||||
@@ -130,6 +131,12 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
node_id,
|
||||
)
|
||||
try:
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_workflow_id = escape_identifier(workflow_id)
|
||||
escaped_node_id = escape_identifier(node_id)
|
||||
|
||||
# Check if PG protocol is supported
|
||||
if self.logstore_client.supports_pg_protocol:
|
||||
# Use PG protocol with SQL query (get latest version of each record)
|
||||
@@ -138,10 +145,10 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
|
||||
FROM "{AliyunLogStore.workflow_node_execution_logstore}"
|
||||
WHERE tenant_id = '{tenant_id}'
|
||||
AND app_id = '{app_id}'
|
||||
AND workflow_id = '{workflow_id}'
|
||||
AND node_id = '{node_id}'
|
||||
WHERE tenant_id = '{escaped_tenant_id}'
|
||||
AND app_id = '{escaped_app_id}'
|
||||
AND workflow_id = '{escaped_workflow_id}'
|
||||
AND node_id = '{escaped_node_id}'
|
||||
AND __time__ > 0
|
||||
) AS subquery WHERE rn = 1
|
||||
LIMIT 100
|
||||
@@ -153,7 +160,8 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
else:
|
||||
# Use SDK with LogStore query syntax
|
||||
query = (
|
||||
f"tenant_id: {tenant_id} and app_id: {app_id} and workflow_id: {workflow_id} and node_id: {node_id}"
|
||||
f"tenant_id: {escaped_tenant_id} and app_id: {escaped_app_id} "
|
||||
f"and workflow_id: {escaped_workflow_id} and node_id: {escaped_node_id}"
|
||||
)
|
||||
from_time = 0
|
||||
to_time = int(time.time()) # now
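Because the SLS PG protocol reportedly does not support parameterized queries, values are escaped and inlined into the SQL text instead. The exact behavior of escape_identifier and escape_logstore_query_value lives in extensions.logstore.sql_escape and is not shown here; a minimal stand-in that only doubles single quotes illustrates the idea (escape_sql_literal is hypothetical):

def escape_sql_literal(value: str) -> str:
    # Hypothetical minimal escaper; the real helpers may strip or reject more characters.
    return value.replace("'", "''")


tenant_id = "t-1' OR '1'='1"
sql = f"SELECT * FROM runs WHERE tenant_id = '{escape_sql_literal(tenant_id)}' LIMIT 1"
print(sql)
# SELECT * FROM runs WHERE tenant_id = 't-1'' OR ''1''=''1' LIMIT 1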
|
||||
@@ -227,6 +235,11 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
workflow_run_id,
|
||||
)
|
||||
try:
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_workflow_run_id = escape_identifier(workflow_run_id)
|
||||
|
||||
# Check if PG protocol is supported
|
||||
if self.logstore_client.supports_pg_protocol:
|
||||
# Use PG protocol with SQL query (get latest version of each record)
|
||||
@@ -235,9 +248,9 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
|
||||
FROM "{AliyunLogStore.workflow_node_execution_logstore}"
|
||||
WHERE tenant_id = '{tenant_id}'
|
||||
AND app_id = '{app_id}'
|
||||
AND workflow_run_id = '{workflow_run_id}'
|
||||
WHERE tenant_id = '{escaped_tenant_id}'
|
||||
AND app_id = '{escaped_app_id}'
|
||||
AND workflow_run_id = '{escaped_workflow_run_id}'
|
||||
AND __time__ > 0
|
||||
) AS subquery WHERE rn = 1
|
||||
LIMIT 1000
|
||||
@@ -248,7 +261,10 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
)
|
||||
else:
|
||||
# Use SDK with LogStore query syntax
|
||||
query = f"tenant_id: {tenant_id} and app_id: {app_id} and workflow_run_id: {workflow_run_id}"
|
||||
query = (
|
||||
f"tenant_id: {escaped_tenant_id} and app_id: {escaped_app_id} "
|
||||
f"and workflow_run_id: {escaped_workflow_run_id}"
|
||||
)
|
||||
from_time = 0
|
||||
to_time = int(time.time()) # now
|
||||
|
||||
@@ -313,16 +329,24 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
"""
|
||||
logger.debug("get_execution_by_id: execution_id=%s, tenant_id=%s", execution_id, tenant_id)
|
||||
try:
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_execution_id = escape_identifier(execution_id)
|
||||
|
||||
# Check if PG protocol is supported
|
||||
if self.logstore_client.supports_pg_protocol:
|
||||
# Use PG protocol with SQL query (get latest version of record)
|
||||
tenant_filter = f"AND tenant_id = '{tenant_id}'" if tenant_id else ""
|
||||
if tenant_id:
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
tenant_filter = f"AND tenant_id = '{escaped_tenant_id}'"
|
||||
else:
|
||||
tenant_filter = ""
|
||||
|
||||
sql_query = f"""
|
||||
SELECT * FROM (
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
|
||||
FROM "{AliyunLogStore.workflow_node_execution_logstore}"
|
||||
WHERE id = '{execution_id}' {tenant_filter} AND __time__ > 0
|
||||
WHERE id = '{escaped_execution_id}' {tenant_filter} AND __time__ > 0
|
||||
) AS subquery WHERE rn = 1
|
||||
LIMIT 1
|
||||
"""
|
||||
@@ -332,10 +356,14 @@ class LogstoreAPIWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecutionRep
|
||||
)
|
||||
else:
|
||||
# Use SDK with LogStore query syntax
|
||||
# Note: Values must be quoted in LogStore query syntax to prevent injection
|
||||
if tenant_id:
|
||||
query = f"id: {execution_id} and tenant_id: {tenant_id}"
|
||||
query = (
|
||||
f"id:{escape_logstore_query_value(execution_id)} "
|
||||
f"and tenant_id:{escape_logstore_query_value(tenant_id)}"
|
||||
)
|
||||
else:
|
||||
query = f"id: {execution_id}"
|
||||
query = f"id:{escape_logstore_query_value(execution_id)}"
|
||||
|
||||
from_time = 0
|
||||
to_time = int(time.time()) # now
|
||||
|
||||
@@ -10,6 +10,7 @@ Key Features:
|
||||
- Optimized deduplication using finished_at IS NOT NULL filter
|
||||
- Window functions only when necessary (running status queries)
|
||||
- Multi-tenant data isolation and security
|
||||
- SQL injection prevention via parameter escaping
|
||||
"""
|
||||
|
||||
import logging
|
||||
@@ -22,6 +23,8 @@ from typing import Any, cast
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from extensions.logstore.aliyun_logstore import AliyunLogStore
|
||||
from extensions.logstore.repositories import safe_float, safe_int
|
||||
from extensions.logstore.sql_escape import escape_identifier, escape_logstore_query_value, escape_sql_string
|
||||
from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.workflow import WorkflowRun
|
||||
@@ -63,10 +66,9 @@ def _dict_to_workflow_run(data: dict[str, Any]) -> WorkflowRun:
|
||||
model.created_by_role = data.get("created_by_role") or ""
|
||||
model.created_by = data.get("created_by") or ""
|
||||
|
||||
# Numeric fields with defaults
|
||||
model.total_tokens = int(data.get("total_tokens", 0))
|
||||
model.total_steps = int(data.get("total_steps", 0))
|
||||
model.exceptions_count = int(data.get("exceptions_count", 0))
|
||||
model.total_tokens = safe_int(data.get("total_tokens", 0))
|
||||
model.total_steps = safe_int(data.get("total_steps", 0))
|
||||
model.exceptions_count = safe_int(data.get("exceptions_count", 0))
|
||||
|
||||
# Optional fields
|
||||
model.graph = data.get("graph")
|
||||
@@ -101,7 +103,8 @@ def _dict_to_workflow_run(data: dict[str, Any]) -> WorkflowRun:
|
||||
if model.finished_at and model.created_at:
|
||||
model.elapsed_time = (model.finished_at - model.created_at).total_seconds()
|
||||
else:
|
||||
model.elapsed_time = float(data.get("elapsed_time", 0))
|
||||
# Use safe conversion to handle 'null' strings and None values
|
||||
model.elapsed_time = safe_float(data.get("elapsed_time", 0))
|
||||
|
||||
return model
|
||||
|
||||
@@ -165,16 +168,26 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
status,
|
||||
)
|
||||
# Convert triggered_from to list if needed
|
||||
if isinstance(triggered_from, WorkflowRunTriggeredFrom):
|
||||
if isinstance(triggered_from, (WorkflowRunTriggeredFrom, str)):
|
||||
triggered_from_list = [triggered_from]
|
||||
else:
|
||||
triggered_from_list = list(triggered_from)
|
||||
|
||||
# Build triggered_from filter
|
||||
triggered_from_filter = " OR ".join([f"triggered_from='{tf.value}'" for tf in triggered_from_list])
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
|
||||
# Build status filter
|
||||
status_filter = f"AND status='{status}'" if status else ""
|
||||
# Build triggered_from filter with escaped values
|
||||
# Support both enum and string values for triggered_from
|
||||
triggered_from_filter = " OR ".join(
|
||||
[
|
||||
f"triggered_from='{escape_sql_string(tf.value if isinstance(tf, WorkflowRunTriggeredFrom) else tf)}'"
|
||||
for tf in triggered_from_list
|
||||
]
|
||||
)
|
||||
|
||||
# Build status filter with escaped value
|
||||
status_filter = f"AND status='{escape_sql_string(status)}'" if status else ""
|
||||
|
||||
# Build last_id filter for pagination
|
||||
# Note: This is simplified. In production, you'd need to track created_at from the last record
|
||||
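The normalization above accepts a single WorkflowRunTriggeredFrom, a plain string, or an iterable, and then builds an OR filter from escaped literals. A compact sketch of the same logic, using the module's own escape_sql_string helper:

from extensions.logstore.sql_escape import escape_sql_string
from models.enums import WorkflowRunTriggeredFrom

def build_triggered_from_filter(triggered_from) -> str:
    # Accept a single enum/string or an iterable of them.
    if isinstance(triggered_from, (WorkflowRunTriggeredFrom, str)):
        triggered_from_list = [triggered_from]
    else:
        triggered_from_list = list(triggered_from)
    # Escape each value before embedding it in the OR filter.
    return " OR ".join(
        f"triggered_from='{escape_sql_string(tf.value if isinstance(tf, WorkflowRunTriggeredFrom) else tf)}'"
        for tf in triggered_from_list
    )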
@@ -188,8 +201,8 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
SELECT * FROM (
|
||||
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) AS rn
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND ({triggered_from_filter})
|
||||
{status_filter}
|
||||
{last_id_filter}
|
||||
@@ -232,6 +245,11 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
logger.debug("get_workflow_run_by_id: tenant_id=%s, app_id=%s, run_id=%s", tenant_id, app_id, run_id)
|
||||
|
||||
try:
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_run_id = escape_identifier(run_id)
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
|
||||
# Check if PG protocol is supported
|
||||
if self.logstore_client.supports_pg_protocol:
|
||||
# Use PG protocol with SQL query (get latest version of record)
|
||||
@@ -240,7 +258,10 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
|
||||
FROM "{AliyunLogStore.workflow_execution_logstore}"
|
||||
WHERE id = '{run_id}' AND tenant_id = '{tenant_id}' AND app_id = '{app_id}' AND __time__ > 0
|
||||
WHERE id = '{escaped_run_id}'
|
||||
AND tenant_id = '{escaped_tenant_id}'
|
||||
AND app_id = '{escaped_app_id}'
|
||||
AND __time__ > 0
|
||||
) AS subquery WHERE rn = 1
|
||||
LIMIT 100
|
||||
"""
|
||||
@@ -250,7 +271,12 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
)
|
||||
else:
|
||||
# Use SDK with LogStore query syntax
|
||||
query = f"id: {run_id} and tenant_id: {tenant_id} and app_id: {app_id}"
|
||||
# Note: Values must be quoted in LogStore query syntax to prevent injection
|
||||
query = (
|
||||
f"id:{escape_logstore_query_value(run_id)} "
|
||||
f"and tenant_id:{escape_logstore_query_value(tenant_id)} "
|
||||
f"and app_id:{escape_logstore_query_value(app_id)}"
|
||||
)
|
||||
from_time = 0
|
||||
to_time = int(time.time()) # now
|
||||
|
||||
@@ -323,6 +349,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
logger.debug("get_workflow_run_by_id_without_tenant: run_id=%s", run_id)
|
||||
|
||||
try:
|
||||
# Escape parameter to prevent SQL injection
|
||||
escaped_run_id = escape_identifier(run_id)
|
||||
|
||||
# Check if PG protocol is supported
|
||||
if self.logstore_client.supports_pg_protocol:
|
||||
# Use PG protocol with SQL query (get latest version of record)
|
||||
@@ -331,7 +360,7 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
|
||||
FROM "{AliyunLogStore.workflow_execution_logstore}"
|
||||
WHERE id = '{run_id}' AND __time__ > 0
|
||||
WHERE id = '{escaped_run_id}' AND __time__ > 0
|
||||
) AS subquery WHERE rn = 1
|
||||
LIMIT 100
|
||||
"""
|
||||
@@ -341,7 +370,8 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
)
|
||||
else:
|
||||
# Use SDK with LogStore query syntax
|
||||
query = f"id: {run_id}"
|
||||
# Note: Values must be quoted in LogStore query syntax
|
||||
query = f"id:{escape_logstore_query_value(run_id)}"
|
||||
from_time = 0
|
||||
to_time = int(time.time()) # now
|
||||
|
||||
@@ -410,6 +440,11 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
triggered_from,
|
||||
status,
|
||||
)
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_triggered_from = escape_sql_string(triggered_from)
|
||||
|
||||
# Build time range filter
|
||||
time_filter = ""
|
||||
if time_range:
|
||||
@@ -418,6 +453,8 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
|
||||
# If status is provided, simple count
|
||||
if status:
|
||||
escaped_status = escape_sql_string(status)
|
||||
|
||||
if status == "running":
|
||||
# Running status requires window function
|
||||
sql = f"""
|
||||
@@ -425,9 +462,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
FROM (
|
||||
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) AS rn
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND status='running'
|
||||
{time_filter}
|
||||
) t
|
||||
@@ -438,10 +475,10 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
sql = f"""
|
||||
SELECT COUNT(DISTINCT id) as count
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
AND status='{status}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND status='{escaped_status}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
"""
|
||||
@@ -467,13 +504,14 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
# No status filter - get counts grouped by status
|
||||
# Use optimized query for finished runs, separate query for running
|
||||
try:
|
||||
# Escape parameters (already escaped above, reuse variables)
|
||||
# Count finished runs grouped by status
|
||||
finished_sql = f"""
|
||||
SELECT status, COUNT(DISTINCT id) as count
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
GROUP BY status
|
||||
@@ -485,9 +523,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
FROM (
|
||||
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) AS rn
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND status='running'
|
||||
{time_filter}
|
||||
) t
|
||||
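As the surrounding comments note, the two count shapes differ: finished runs are terminal, so finished_at IS NOT NULL plus COUNT(DISTINCT id) avoids the window function, while "running" must first reduce to the latest version per id. A hedged sketch of both shapes, with placeholder table and filter values:

# Hedged sketch; the logstore name and filter values are placeholders.
finished_counts_sql = """
    SELECT status, COUNT(DISTINCT id) AS count
    FROM workflow_execution_logstore
    WHERE tenant_id = '...' AND app_id = '...' AND finished_at IS NOT NULL
    GROUP BY status
"""
running_count_sql = """
    SELECT COUNT(*) AS count FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) AS rn
        FROM workflow_execution_logstore
        WHERE tenant_id = '...' AND app_id = '...' AND status = 'running'
    ) t
    WHERE rn = 1
"""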
@@ -546,7 +584,13 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
logger.debug(
|
||||
"get_daily_runs_statistics: tenant_id=%s, app_id=%s, triggered_from=%s", tenant_id, app_id, triggered_from
|
||||
)
|
||||
# Build time range filter
|
||||
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_triggered_from = escape_sql_string(triggered_from)
|
||||
|
||||
# Build time range filter (datetime.isoformat() is safe)
|
||||
time_filter = ""
|
||||
if start_date:
|
||||
time_filter += f" AND __time__ >= to_unixtime(from_iso8601_timestamp('{start_date.isoformat()}'))"
|
||||
@@ -557,9 +601,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
sql = f"""
|
||||
SELECT DATE(from_unixtime(__time__)) as date, COUNT(DISTINCT id) as runs
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
GROUP BY date
|
||||
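The time filter is built from datetime objects rather than user-supplied strings, which is why the comment treats isoformat() as safe to interpolate. A minimal sketch (the end_date branch is assumed to mirror the start_date one shown in the diff):

from datetime import datetime

def build_time_filter(start_date: datetime | None, end_date: datetime | None) -> str:
    # datetime.isoformat() output contains no quotes, so direct interpolation is safe here.
    time_filter = ""
    if start_date:
        time_filter += f" AND __time__ >= to_unixtime(from_iso8601_timestamp('{start_date.isoformat()}'))"
    if end_date:
        time_filter += f" AND __time__ <= to_unixtime(from_iso8601_timestamp('{end_date.isoformat()}'))"
    return time_filter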
@@ -601,7 +645,13 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
app_id,
|
||||
triggered_from,
|
||||
)
|
||||
# Build time range filter
|
||||
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_triggered_from = escape_sql_string(triggered_from)
|
||||
|
||||
# Build time range filter (datetime.isoformat() is safe)
|
||||
time_filter = ""
|
||||
if start_date:
|
||||
time_filter += f" AND __time__ >= to_unixtime(from_iso8601_timestamp('{start_date.isoformat()}'))"
|
||||
@@ -611,9 +661,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
sql = f"""
|
||||
SELECT DATE(from_unixtime(__time__)) as date, COUNT(DISTINCT created_by) as terminal_count
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
GROUP BY date
|
||||
@@ -655,7 +705,13 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
app_id,
|
||||
triggered_from,
|
||||
)
|
||||
# Build time range filter
|
||||
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_triggered_from = escape_sql_string(triggered_from)
|
||||
|
||||
# Build time range filter (datetime.isoformat() is safe)
|
||||
time_filter = ""
|
||||
if start_date:
|
||||
time_filter += f" AND __time__ >= to_unixtime(from_iso8601_timestamp('{start_date.isoformat()}'))"
|
||||
@@ -665,9 +721,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
sql = f"""
|
||||
SELECT DATE(from_unixtime(__time__)) as date, SUM(total_tokens) as token_count
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
GROUP BY date
|
||||
@@ -709,7 +765,13 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
app_id,
|
||||
triggered_from,
|
||||
)
|
||||
# Build time range filter
|
||||
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_tenant_id = escape_identifier(tenant_id)
|
||||
escaped_app_id = escape_identifier(app_id)
|
||||
escaped_triggered_from = escape_sql_string(triggered_from)
|
||||
|
||||
# Build time range filter (datetime.isoformat() is safe)
|
||||
time_filter = ""
|
||||
if start_date:
|
||||
time_filter += f" AND __time__ >= to_unixtime(from_iso8601_timestamp('{start_date.isoformat()}'))"
|
||||
@@ -726,9 +788,9 @@ class LogstoreAPIWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
created_by,
|
||||
COUNT(DISTINCT id) AS interactions
|
||||
FROM {AliyunLogStore.workflow_execution_logstore}
|
||||
WHERE tenant_id='{tenant_id}'
|
||||
AND app_id='{app_id}'
|
||||
AND triggered_from='{triggered_from}'
|
||||
WHERE tenant_id='{escaped_tenant_id}'
|
||||
AND app_id='{escaped_app_id}'
|
||||
AND triggered_from='{escaped_triggered_from}'
|
||||
AND finished_at IS NOT NULL
|
||||
{time_filter}
|
||||
GROUP BY date, created_by
|
||||
|
||||
@@ -10,6 +10,7 @@ from sqlalchemy.orm import sessionmaker
|
||||
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
|
||||
from core.workflow.entities import WorkflowExecution
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
|
||||
from extensions.logstore.aliyun_logstore import AliyunLogStore
|
||||
from libs.helper import extract_tenant_id
|
||||
from models import (
|
||||
@@ -22,18 +23,6 @@ from models.enums import WorkflowRunTriggeredFrom
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def to_serializable(obj):
|
||||
"""
|
||||
Convert non-JSON-serializable objects into JSON-compatible formats.
|
||||
|
||||
- Uses `to_dict()` if it's a callable method.
|
||||
- Falls back to string representation.
|
||||
"""
|
||||
if hasattr(obj, "to_dict") and callable(obj.to_dict):
|
||||
return obj.to_dict()
|
||||
return str(obj)
|
||||
|
||||
|
||||
class LogstoreWorkflowExecutionRepository(WorkflowExecutionRepository):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -79,7 +68,7 @@ class LogstoreWorkflowExecutionRepository(WorkflowExecutionRepository):
|
||||
|
||||
# Control flag for dual-write (write to both LogStore and SQL database)
|
||||
# Set to True to enable dual-write for safe migration, False to use LogStore only
|
||||
self._enable_dual_write = os.environ.get("LOGSTORE_DUAL_WRITE_ENABLED", "true").lower() == "true"
|
||||
self._enable_dual_write = os.environ.get("LOGSTORE_DUAL_WRITE_ENABLED", "false").lower() == "true"
|
||||
|
||||
# Control flag for whether to write the `graph` field to LogStore.
|
||||
# If LOGSTORE_ENABLE_PUT_GRAPH_FIELD is "true", write the full `graph` field;
|
||||
@@ -113,6 +102,9 @@ class LogstoreWorkflowExecutionRepository(WorkflowExecutionRepository):
|
||||
# Generate log_version as nanosecond timestamp for record versioning
|
||||
log_version = str(time.time_ns())
|
||||
|
||||
# Use WorkflowRuntimeTypeConverter to handle complex types (Segment, File, etc.)
|
||||
json_converter = WorkflowRuntimeTypeConverter()
|
||||
|
||||
logstore_model = [
|
||||
("id", domain_model.id_),
|
||||
("log_version", log_version), # Add log_version field for append-only writes
|
||||
@@ -127,19 +119,19 @@ class LogstoreWorkflowExecutionRepository(WorkflowExecutionRepository):
|
||||
("version", domain_model.workflow_version),
|
||||
(
|
||||
"graph",
|
||||
json.dumps(domain_model.graph, ensure_ascii=False, default=to_serializable)
|
||||
json.dumps(json_converter.to_json_encodable(domain_model.graph), ensure_ascii=False)
|
||||
if domain_model.graph and self._enable_put_graph_field
|
||||
else "{}",
|
||||
),
|
||||
(
|
||||
"inputs",
|
||||
json.dumps(domain_model.inputs, ensure_ascii=False, default=to_serializable)
|
||||
json.dumps(json_converter.to_json_encodable(domain_model.inputs), ensure_ascii=False)
|
||||
if domain_model.inputs
|
||||
else "{}",
|
||||
),
|
||||
(
|
||||
"outputs",
|
||||
json.dumps(domain_model.outputs, ensure_ascii=False, default=to_serializable)
|
||||
json.dumps(json_converter.to_json_encodable(domain_model.outputs), ensure_ascii=False)
|
||||
if domain_model.outputs
|
||||
else "{}",
|
||||
),
|
||||
|
||||
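The serialization change replaces the generic default=to_serializable fallback with WorkflowRuntimeTypeConverter, so workflow-specific values (Segment, File, etc.) are converted to JSON-encodable structures before json.dumps. A short sketch of the call pattern used above (domain_model is assumed in scope, as in the repository method):

import json

from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter

json_converter = WorkflowRuntimeTypeConverter()
# domain_model.inputs may contain Segment/File values; convert first, then serialize.
inputs_json = (
    json.dumps(json_converter.to_json_encodable(domain_model.inputs), ensure_ascii=False)
    if domain_model.inputs
    else "{}"
)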
@@ -24,6 +24,8 @@ from core.workflow.enums import NodeType
|
||||
from core.workflow.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
|
||||
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
|
||||
from extensions.logstore.aliyun_logstore import AliyunLogStore
|
||||
from extensions.logstore.repositories import safe_float, safe_int
|
||||
from extensions.logstore.sql_escape import escape_identifier
|
||||
from libs.helper import extract_tenant_id
|
||||
from models import (
|
||||
Account,
|
||||
@@ -73,7 +75,7 @@ def _dict_to_workflow_node_execution(data: dict[str, Any]) -> WorkflowNodeExecut
|
||||
node_execution_id=data.get("node_execution_id"),
|
||||
workflow_id=data.get("workflow_id", ""),
|
||||
workflow_execution_id=data.get("workflow_run_id"),
|
||||
index=int(data.get("index", 0)),
|
||||
index=safe_int(data.get("index", 0)),
|
||||
predecessor_node_id=data.get("predecessor_node_id"),
|
||||
node_id=data.get("node_id", ""),
|
||||
node_type=NodeType(data.get("node_type", "start")),
|
||||
@@ -83,7 +85,7 @@ def _dict_to_workflow_node_execution(data: dict[str, Any]) -> WorkflowNodeExecut
|
||||
outputs=outputs,
|
||||
status=status,
|
||||
error=data.get("error"),
|
||||
elapsed_time=float(data.get("elapsed_time", 0.0)),
|
||||
elapsed_time=safe_float(data.get("elapsed_time", 0.0)),
|
||||
metadata=domain_metadata,
|
||||
created_at=created_at,
|
||||
finished_at=finished_at,
|
||||
@@ -147,7 +149,7 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
||||
|
||||
# Control flag for dual-write (write to both LogStore and SQL database)
|
||||
# Set to True to enable dual-write for safe migration, False to use LogStore only
|
||||
self._enable_dual_write = os.environ.get("LOGSTORE_DUAL_WRITE_ENABLED", "true").lower() == "true"
|
||||
self._enable_dual_write = os.environ.get("LOGSTORE_DUAL_WRITE_ENABLED", "false").lower() == "true"
|
||||
|
||||
def _to_logstore_model(self, domain_model: WorkflowNodeExecution) -> Sequence[tuple[str, str]]:
|
||||
logger.debug(
|
||||
@@ -274,16 +276,34 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
||||
Save or update the inputs, process_data, or outputs associated with a specific
|
||||
node_execution record.
|
||||
|
||||
For LogStore implementation, this is similar to save() since we always write
|
||||
complete records. We append a new record with updated data fields.
|
||||
For LogStore implementation, this is a no-op for the LogStore write because save()
|
||||
already writes all fields including inputs, process_data, and outputs. The caller
|
||||
typically calls save() first to persist status/metadata, then calls save_execution_data()
|
||||
to persist data fields. Since LogStore writes complete records atomically, we don't
|
||||
need a separate write here to avoid duplicate records.
|
||||
|
||||
However, if dual-write is enabled, we still need to call the SQL repository's
|
||||
save_execution_data() method to properly update the SQL database.
|
||||
|
||||
Args:
|
||||
execution: The NodeExecution instance with data to save
|
||||
"""
|
||||
logger.debug("save_execution_data: id=%s, node_execution_id=%s", execution.id, execution.node_execution_id)
|
||||
# In LogStore, we simply write a new complete record with the data
|
||||
# The log_version timestamp will ensure this is treated as the latest version
|
||||
self.save(execution)
|
||||
logger.debug(
|
||||
"save_execution_data: no-op for LogStore (data already saved by save()): id=%s, node_execution_id=%s",
|
||||
execution.id,
|
||||
execution.node_execution_id,
|
||||
)
|
||||
# No-op for LogStore: save() already writes all fields including inputs, process_data, and outputs
|
||||
# Calling save() again would create a duplicate record in the append-only LogStore
|
||||
|
||||
# Dual-write to SQL database if enabled (for safe migration)
|
||||
if self._enable_dual_write:
|
||||
try:
|
||||
self.sql_repository.save_execution_data(execution)
|
||||
logger.debug("Dual-write: saved node execution data to SQL database: id=%s", execution.id)
|
||||
except Exception:
|
||||
logger.exception("Failed to dual-write node execution data to SQL database: id=%s", execution.id)
|
||||
# Don't raise - LogStore write succeeded, SQL is just a backup
|
||||
|
||||
def get_by_workflow_run(
|
||||
self,
|
||||
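The no-op follows from the append-only model: every save() writes a complete record with a fresh log_version, and readers keep only the newest version per id, so a second write from save_execution_data() would only add a duplicate. A toy illustration of that read model (plain Python, not repository code):

# Two appended records for the same execution; reads keep the highest log_version.
records = [
    {"id": "n1", "log_version": "100", "status": "running"},
    {"id": "n1", "log_version": "200", "status": "succeeded"},
]
latest: dict[str, dict] = {}
for record in records:
    current = latest.get(record["id"])
    if current is None or int(record["log_version"]) > int(current["log_version"]):
        latest[record["id"]] = record
assert latest["n1"]["status"] == "succeeded"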
@@ -292,8 +312,8 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
||||
) -> Sequence[WorkflowNodeExecution]:
|
||||
"""
|
||||
Retrieve all NodeExecution instances for a specific workflow run.
|
||||
Uses LogStore SQL query with finished_at IS NOT NULL filter for deduplication.
|
||||
This ensures we only get the final version of each node execution.
|
||||
Uses LogStore SQL query with window function to get the latest version of each node execution.
|
||||
This ensures we only get the most recent version of each node execution record.
|
||||
Args:
|
||||
workflow_run_id: The workflow run ID
|
||||
order_config: Optional configuration for ordering results
|
||||
@@ -304,16 +324,19 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
||||
A list of NodeExecution instances
|
||||
|
||||
Note:
|
||||
This method filters by finished_at IS NOT NULL to avoid duplicates from
|
||||
version updates. For complete history including intermediate states,
|
||||
a different query strategy would be needed.
|
||||
This method uses ROW_NUMBER() window function partitioned by node_execution_id
|
||||
to get the latest version (highest log_version) of each node execution.
|
||||
"""
|
||||
logger.debug("get_by_workflow_run: workflow_run_id=%s, order_config=%s", workflow_run_id, order_config)
|
||||
# Build SQL query with deduplication using finished_at IS NOT NULL
|
||||
# This optimization avoids window functions for common case where we only
|
||||
# want the final state of each node execution
|
||||
# Build SQL query with deduplication using window function
|
||||
# ROW_NUMBER() OVER (PARTITION BY node_execution_id ORDER BY log_version DESC)
|
||||
# ensures we get the latest version of each node execution
|
||||
|
||||
# Build ORDER BY clause
|
||||
# Escape parameters to prevent SQL injection
|
||||
escaped_workflow_run_id = escape_identifier(workflow_run_id)
|
||||
escaped_tenant_id = escape_identifier(self._tenant_id)
|
||||
|
||||
# Build ORDER BY clause for outer query
|
||||
order_clause = ""
|
||||
if order_config and order_config.order_by:
|
||||
order_fields = []
|
||||
@@ -327,16 +350,23 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
||||
if order_fields:
|
||||
order_clause = "ORDER BY " + ", ".join(order_fields)
|
||||
|
||||
sql = f"""
|
||||
SELECT *
|
||||
FROM {AliyunLogStore.workflow_node_execution_logstore}
|
||||
WHERE workflow_run_id='{workflow_run_id}'
|
||||
AND tenant_id='{self._tenant_id}'
|
||||
AND finished_at IS NOT NULL
|
||||
"""
|
||||
|
||||
# Build app_id filter for subquery
|
||||
app_id_filter = ""
|
||||
if self._app_id:
|
||||
sql += f" AND app_id='{self._app_id}'"
|
||||
escaped_app_id = escape_identifier(self._app_id)
|
||||
app_id_filter = f" AND app_id='{escaped_app_id}'"
|
||||
|
||||
# Use window function to get latest version of each node execution
|
||||
sql = f"""
|
||||
SELECT * FROM (
|
||||
SELECT *, ROW_NUMBER() OVER (PARTITION BY node_execution_id ORDER BY log_version DESC) AS rn
|
||||
FROM {AliyunLogStore.workflow_node_execution_logstore}
|
||||
WHERE workflow_run_id='{escaped_workflow_run_id}'
|
||||
AND tenant_id='{escaped_tenant_id}'
|
||||
{app_id_filter}
|
||||
) t
|
||||
WHERE rn = 1
|
||||
"""
|
||||
|
||||
if order_clause:
|
||||
sql += f" {order_clause}"
|
||||
|
||||
134 api/extensions/logstore/sql_escape.py Normal file
@@ -0,0 +1,134 @@
|
||||
"""
|
||||
SQL Escape Utility for LogStore Queries
|
||||
|
||||
This module provides escaping utilities to prevent injection attacks in LogStore queries.
|
||||
|
||||
LogStore supports two query modes:
|
||||
1. PG Protocol Mode: Uses SQL syntax with single quotes for strings
|
||||
2. SDK Mode: Uses LogStore query syntax (key: value) with double quotes
|
||||
|
||||
Key Security Concerns:
|
||||
- Prevent tenant A from accessing tenant B's data via injection
|
||||
- SLS queries are read-only, so we focus on data access control
|
||||
- Different escaping strategies for SQL vs LogStore query syntax
|
||||
"""
|
||||
|
||||
|
||||
def escape_sql_string(value: str) -> str:
|
||||
"""
|
||||
Escape a string value for safe use in SQL queries.
|
||||
|
||||
This function escapes single quotes by doubling them, which is the standard
|
||||
SQL escaping method. This prevents SQL injection by ensuring that user input
|
||||
cannot break out of string literals.
|
||||
|
||||
Args:
|
||||
value: The string value to escape
|
||||
|
||||
Returns:
|
||||
Escaped string safe for use in SQL queries
|
||||
|
||||
Examples:
|
||||
>>> escape_sql_string("normal_value")
|
||||
"normal_value"
|
||||
>>> escape_sql_string("value' OR '1'='1")
|
||||
"value'' OR ''1''=''1"
|
||||
>>> escape_sql_string("tenant's_id")
|
||||
"tenant''s_id"
|
||||
|
||||
Security:
|
||||
- Prevents breaking out of string literals
|
||||
- Stops injection attacks like: ' OR '1'='1
|
||||
- Protects against cross-tenant data access
|
||||
"""
|
||||
if not value:
|
||||
return value
|
||||
|
||||
# Escape single quotes by doubling them (standard SQL escaping)
|
||||
# This prevents breaking out of string literals in SQL queries
|
||||
return value.replace("'", "''")
|
||||
|
||||
|
||||
def escape_identifier(value: str) -> str:
|
||||
"""
|
||||
Escape an identifier (tenant_id, app_id, run_id, etc.) for safe SQL use.
|
||||
|
||||
This function is for PG protocol mode (SQL syntax).
|
||||
For SDK mode, use escape_logstore_query_value() instead.
|
||||
|
||||
Args:
|
||||
value: The identifier value to escape
|
||||
|
||||
Returns:
|
||||
Escaped identifier safe for use in SQL queries
|
||||
|
||||
Examples:
|
||||
>>> escape_identifier("550e8400-e29b-41d4-a716-446655440000")
|
||||
"550e8400-e29b-41d4-a716-446655440000"
|
||||
>>> escape_identifier("tenant_id' OR '1'='1")
|
||||
"tenant_id'' OR ''1''=''1"
|
||||
|
||||
Security:
|
||||
- Prevents SQL injection via identifiers
|
||||
- Stops cross-tenant access attempts
|
||||
- Works for UUIDs, alphanumeric IDs, and similar identifiers
|
||||
"""
|
||||
# For identifiers, use the same escaping as strings
|
||||
# This is simple and effective for preventing injection
|
||||
return escape_sql_string(value)
|
||||
|
||||
|
||||
def escape_logstore_query_value(value: str) -> str:
|
||||
"""
|
||||
Escape value for LogStore query syntax (SDK mode).
|
||||
|
||||
LogStore query syntax rules:
|
||||
1. Keywords (and/or/not) are case-insensitive
|
||||
2. Single quotes are ordinary characters (no special meaning)
|
||||
3. Double quotes wrap values: key:"value"
|
||||
4. Backslash is the escape character:
|
||||
- \" for double quote inside value
|
||||
- \\ for backslash itself
|
||||
5. Parentheses can change query structure
|
||||
|
||||
To prevent injection:
|
||||
- Wrap value in double quotes to treat special chars as literals
|
||||
- Escape backslashes and double quotes using backslash
|
||||
|
||||
Args:
|
||||
value: The value to escape for LogStore query syntax
|
||||
|
||||
Returns:
|
||||
Quoted and escaped value safe for LogStore query syntax (includes the quotes)
|
||||
|
||||
Examples:
|
||||
>>> escape_logstore_query_value("normal_value")
|
||||
'"normal_value"'
|
||||
>>> escape_logstore_query_value("value or field:evil")
|
||||
'"value or field:evil"' # 'or' and ':' are now literals
|
||||
>>> escape_logstore_query_value('value"test')
|
||||
'"value\\"test"' # Internal double quote escaped
|
||||
>>> escape_logstore_query_value('value\\test')
|
||||
'"value\\\\test"' # Backslash escaped
|
||||
|
||||
Security:
|
||||
- Prevents injection via and/or/not keywords
|
||||
- Prevents injection via colons (:)
|
||||
- Prevents injection via parentheses
|
||||
- Protects against cross-tenant data access
|
||||
|
||||
Note:
|
||||
Escape order is critical: backslash first, then double quotes.
|
||||
Otherwise, we'd double-escape the escape character itself.
|
||||
"""
|
||||
if not value:
|
||||
return '""'
|
||||
|
||||
# IMPORTANT: Escape backslashes FIRST, then double quotes
|
||||
# This prevents double-escaping (e.g., " -> \" -> \\" incorrectly)
|
||||
escaped = value.replace("\\", "\\\\") # \ -> \\
|
||||
escaped = escaped.replace('"', '\\"') # " -> \"
|
||||
|
||||
# Wrap in double quotes to treat as literal string
|
||||
# This prevents and/or/not/:/() from being interpreted as operators
|
||||
return f'"{escaped}"'
|
||||
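A short usage sketch of the two escape helpers, mirroring how the repositories above call them; quote-doubling and value-quoting are mitigations on read-only queries, and parameterized queries remain preferable wherever the client supports them:

from extensions.logstore.sql_escape import escape_identifier, escape_logstore_query_value

hostile = "tenant' OR '1'='1"

# PG protocol mode: the escaped value stays inside the SQL string literal.
sql_fragment = f"WHERE tenant_id = '{escape_identifier(hostile)}'"
# -> WHERE tenant_id = 'tenant'' OR ''1''=''1'

# SDK mode: the value is quoted, so and/or/not, colons and parentheses become literals.
query_fragment = f"tenant_id:{escape_logstore_query_value(hostile)}"
# -> tenant_id:"tenant' OR '1'='1"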
@@ -115,7 +115,18 @@ def build_from_mappings(
|
||||
# TODO(QuantumGhost): Performance concern - each mapping triggers a separate database query.
|
||||
# Implement batch processing to reduce database load when handling multiple files.
|
||||
# Filter out None/empty mappings to avoid errors
|
||||
valid_mappings = [m for m in mappings if m and m.get("transfer_method")]
|
||||
def is_valid_mapping(m: Mapping[str, Any]) -> bool:
|
||||
if not m or not m.get("transfer_method"):
|
||||
return False
|
||||
# For REMOTE_URL transfer method, ensure url or remote_url is provided and not None
|
||||
transfer_method = m.get("transfer_method")
|
||||
if transfer_method == FileTransferMethod.REMOTE_URL:
|
||||
url = m.get("url") or m.get("remote_url")
|
||||
if not url:
|
||||
return False
|
||||
return True
|
||||
|
||||
valid_mappings = [m for m in mappings if is_valid_mapping(m)]
|
||||
files = [
|
||||
build_from_mapping(
|
||||
mapping=mapping,
|
||||
|
||||
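For illustration, a few hypothetical mappings and how the stricter filter treats them; the key change is that a REMOTE_URL entry without a usable url/remote_url is now dropped instead of failing later in build_from_mapping:

# Hypothetical inputs to build_from_mappings (values are illustrative only).
mappings = [
    {"transfer_method": "local_file", "upload_file_id": "file-123"},        # kept
    {"transfer_method": "remote_url", "url": "https://example.com/a.png"},  # kept
    {"transfer_method": "remote_url", "url": None, "remote_url": None},     # dropped: no URL
    {},                                                                      # dropped: no transfer_method
]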
@@ -38,7 +38,7 @@ from core.variables.variables import (
|
||||
ObjectVariable,
|
||||
SecretVariable,
|
||||
StringVariable,
|
||||
Variable,
|
||||
VariableBase,
|
||||
)
|
||||
from core.workflow.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
@@ -72,25 +72,25 @@ SEGMENT_TO_VARIABLE_MAP = {
|
||||
}
|
||||
|
||||
|
||||
def build_conversation_variable_from_mapping(mapping: Mapping[str, Any], /) -> Variable:
|
||||
def build_conversation_variable_from_mapping(mapping: Mapping[str, Any], /) -> VariableBase:
|
||||
if not mapping.get("name"):
|
||||
raise VariableError("missing name")
|
||||
return _build_variable_from_mapping(mapping=mapping, selector=[CONVERSATION_VARIABLE_NODE_ID, mapping["name"]])
|
||||
|
||||
|
||||
def build_environment_variable_from_mapping(mapping: Mapping[str, Any], /) -> Variable:
|
||||
def build_environment_variable_from_mapping(mapping: Mapping[str, Any], /) -> VariableBase:
|
||||
if not mapping.get("name"):
|
||||
raise VariableError("missing name")
|
||||
return _build_variable_from_mapping(mapping=mapping, selector=[ENVIRONMENT_VARIABLE_NODE_ID, mapping["name"]])
|
||||
|
||||
|
||||
def build_pipeline_variable_from_mapping(mapping: Mapping[str, Any], /) -> Variable:
|
||||
def build_pipeline_variable_from_mapping(mapping: Mapping[str, Any], /) -> VariableBase:
|
||||
if not mapping.get("variable"):
|
||||
raise VariableError("missing variable")
|
||||
return mapping["variable"]
|
||||
|
||||
|
||||
def _build_variable_from_mapping(*, mapping: Mapping[str, Any], selector: Sequence[str]) -> Variable:
|
||||
def _build_variable_from_mapping(*, mapping: Mapping[str, Any], selector: Sequence[str]) -> VariableBase:
|
||||
"""
|
||||
This factory function is used to create the environment variable or the conversation variable,
|
||||
not support the File type.
|
||||
@@ -100,7 +100,7 @@ def _build_variable_from_mapping(*, mapping: Mapping[str, Any], selector: Sequen
|
||||
if (value := mapping.get("value")) is None:
|
||||
raise VariableError("missing value")
|
||||
|
||||
result: Variable
|
||||
result: VariableBase
|
||||
match value_type:
|
||||
case SegmentType.STRING:
|
||||
result = StringVariable.model_validate(mapping)
|
||||
@@ -134,7 +134,7 @@ def _build_variable_from_mapping(*, mapping: Mapping[str, Any], selector: Sequen
|
||||
raise VariableError(f"variable size {result.size} exceeds limit {dify_config.MAX_VARIABLE_SIZE}")
|
||||
if not result.selector:
|
||||
result = result.model_copy(update={"selector": selector})
|
||||
return cast(Variable, result)
|
||||
return cast(VariableBase, result)
|
||||
|
||||
|
||||
def build_segment(value: Any, /) -> Segment:
|
||||
@@ -285,8 +285,8 @@ def segment_to_variable(
|
||||
id: str | None = None,
|
||||
name: str | None = None,
|
||||
description: str = "",
|
||||
) -> Variable:
|
||||
if isinstance(segment, Variable):
|
||||
) -> VariableBase:
|
||||
if isinstance(segment, VariableBase):
|
||||
return segment
|
||||
name = name or selector[-1]
|
||||
id = id or str(uuid4())
|
||||
@@ -297,7 +297,7 @@ def segment_to_variable(
|
||||
|
||||
variable_class = SEGMENT_TO_VARIABLE_MAP[segment_type]
|
||||
return cast(
|
||||
Variable,
|
||||
VariableBase,
|
||||
variable_class(
|
||||
id=id,
|
||||
name=name,
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations

from datetime import datetime
from typing import TypeAlias
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field, field_validator

@@ -20,8 +21,8 @@ class SimpleFeedback(ResponseModel):


class RetrieverResource(ResponseModel):
id: str
message_id: str
id: str = Field(default_factory=lambda: str(uuid4()))
message_id: str = Field(default_factory=lambda: str(uuid4()))
position: int
dataset_id: str | None = None
dataset_name: str | None = None

@@ -1,7 +1,7 @@
|
||||
from flask_restx import fields
|
||||
|
||||
from core.helper import encrypter
|
||||
from core.variables import SecretVariable, SegmentType, Variable
|
||||
from core.variables import SecretVariable, SegmentType, VariableBase
|
||||
from fields.member_fields import simple_account_fields
|
||||
from libs.helper import TimestampField
|
||||
|
||||
@@ -21,7 +21,7 @@ class EnvironmentVariableField(fields.Raw):
|
||||
"value_type": value.value_type.value,
|
||||
"description": value.description,
|
||||
}
|
||||
if isinstance(value, Variable):
|
||||
if isinstance(value, VariableBase):
|
||||
return {
|
||||
"id": value.id,
|
||||
"name": value.name,
|
||||
|
||||
@@ -3,6 +3,8 @@ import smtplib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
from configs import dify_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -19,20 +21,21 @@ class SMTPClient:
|
||||
self.opportunistic_tls = opportunistic_tls
|
||||
|
||||
def send(self, mail: dict):
|
||||
smtp = None
|
||||
smtp: smtplib.SMTP | None = None
|
||||
local_host = dify_config.SMTP_LOCAL_HOSTNAME
|
||||
try:
|
||||
if self.use_tls:
|
||||
if self.opportunistic_tls:
|
||||
smtp = smtplib.SMTP(self.server, self.port, timeout=10)
|
||||
# Send EHLO command with the HELO domain name as the server address
|
||||
smtp.ehlo(self.server)
|
||||
smtp.starttls()
|
||||
# Resend EHLO command to identify the TLS session
|
||||
smtp.ehlo(self.server)
|
||||
else:
|
||||
smtp = smtplib.SMTP_SSL(self.server, self.port, timeout=10)
|
||||
if self.use_tls and not self.opportunistic_tls:
|
||||
# SMTP with SSL (implicit TLS)
|
||||
smtp = smtplib.SMTP_SSL(self.server, self.port, timeout=10, local_hostname=local_host)
|
||||
else:
|
||||
smtp = smtplib.SMTP(self.server, self.port, timeout=10)
|
||||
# Plain SMTP or SMTP with STARTTLS (explicit TLS)
|
||||
smtp = smtplib.SMTP(self.server, self.port, timeout=10, local_hostname=local_host)
|
||||
|
||||
assert smtp is not None
|
||||
if self.use_tls and self.opportunistic_tls:
|
||||
smtp.ehlo(self.server)
|
||||
smtp.starttls()
|
||||
smtp.ehlo(self.server)
|
||||
|
||||
# Only authenticate if both username and password are non-empty
|
||||
if self.username and self.password and self.username.strip() and self.password.strip():
|
||||
|
||||
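The rewrite separates implicit TLS (SMTP_SSL on a TLS port) from plain connections that may upgrade via STARTTLS, and threads local_hostname through so the EHLO/HELO name is configurable. A minimal standalone smtplib sketch of the same branching (host, port, and flag values are placeholders):

import smtplib

def connect(server: str, port: int, use_tls: bool, opportunistic_tls: bool, local_hostname: str | None):
    if use_tls and not opportunistic_tls:
        # Implicit TLS: the socket is encrypted from the start.
        smtp = smtplib.SMTP_SSL(server, port, timeout=10, local_hostname=local_hostname)
    else:
        # Plain SMTP, optionally upgraded with STARTTLS (explicit TLS).
        smtp = smtplib.SMTP(server, port, timeout=10, local_hostname=local_hostname)
        if use_tls and opportunistic_tls:
            smtp.ehlo(server)
            smtp.starttls()
            smtp.ehlo(server)  # re-identify over the encrypted session
    return smtp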
@@ -0,0 +1,30 @@
|
||||
"""add workflow_run_created_at_id_idx
|
||||
|
||||
Revision ID: 905527cc8fd3
|
||||
Revises: 7df29de0f6be
|
||||
Create Date: 2025-01-09 16:30:02.462084
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '905527cc8fd3'
|
||||
down_revision = '7df29de0f6be'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_runs', schema=None) as batch_op:
|
||||
batch_op.create_index('workflow_run_created_at_id_idx', ['created_at', 'id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_runs', schema=None) as batch_op:
|
||||
batch_op.drop_index('workflow_run_created_at_id_idx')
|
||||
# ### end Alembic commands ###
|
||||
@@ -0,0 +1,33 @@
|
||||
"""feat: add created_at id index to messages
|
||||
|
||||
Revision ID: 3334862ee907
|
||||
Revises: 905527cc8fd3
|
||||
Create Date: 2026-01-12 17:29:44.846544
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3334862ee907'
|
||||
down_revision = '905527cc8fd3'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('messages', schema=None) as batch_op:
|
||||
batch_op.create_index('message_created_at_id_idx', ['created_at', 'id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('messages', schema=None) as batch_op:
|
||||
batch_op.drop_index('message_created_at_id_idx')
|
||||
|
||||
# ### end Alembic commands ###
|
||||
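The composite (created_at, id) indexes on workflow_runs and messages presumably back keyset-style pagination, where the cursor is the (created_at, id) pair of the last row returned. A hedged sketch of the query shape such an index serves (table and parameter names are placeholders; row-value comparison as in PostgreSQL):

# Keyset pagination over messages, newest first.
page_sql = """
    SELECT *
    FROM messages
    WHERE (created_at, id) < (:last_created_at, :last_id)
    ORDER BY created_at DESC, id DESC
    LIMIT 20
"""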
@@ -1149,7 +1149,7 @@ class DatasetCollectionBinding(TypeBase):
|
||||
)
|
||||
|
||||
|
||||
class TidbAuthBinding(Base):
|
||||
class TidbAuthBinding(TypeBase):
|
||||
__tablename__ = "tidb_auth_bindings"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="tidb_auth_bindings_pkey"),
|
||||
@@ -1158,7 +1158,13 @@ class TidbAuthBinding(Base):
|
||||
sa.Index("tidb_auth_bindings_created_at_idx", "created_at"),
|
||||
sa.Index("tidb_auth_bindings_status_idx", "status"),
|
||||
)
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, default=lambda: str(uuid4()))
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID,
|
||||
primary_key=True,
|
||||
insert_default=lambda: str(uuid4()),
|
||||
default_factory=lambda: str(uuid4()),
|
||||
init=False,
|
||||
)
|
||||
tenant_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
cluster_id: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
cluster_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
@@ -1166,7 +1172,9 @@ class TidbAuthBinding(Base):
|
||||
status: Mapped[str] = mapped_column(sa.String(255), nullable=False, server_default=sa.text("'CREATING'"))
|
||||
account: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
password: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
|
||||
|
||||
class Whitelist(TypeBase):
|
||||
|
||||
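The Base-to-TypeBase moves in these model hunks follow the SQLAlchemy 2.0 dataclass-style mapping pattern (an assumption based on the init=False, default_factory, and insert_default arguments), where each column declares how it participates in the generated __init__. A minimal sketch of the pattern, with TypeBase assumed to be a MappedAsDataclass declarative base:

from datetime import datetime
from uuid import uuid4

from sqlalchemy import DateTime, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column


class TypeBase(MappedAsDataclass, DeclarativeBase):
    pass


class Example(TypeBase):
    __tablename__ = "examples"

    # Generated ids and server-side timestamps are excluded from __init__.
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default_factory=lambda: str(uuid4()), init=False
    )
    name: Mapped[str] = mapped_column(String(255), nullable=False)  # required constructor argument
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), init=False
    )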
@@ -968,6 +968,7 @@ class Message(Base):
|
||||
Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
|
||||
Index("message_created_at_idx", "created_at"),
|
||||
Index("message_app_mode_idx", "app_mode"),
|
||||
Index("message_created_at_id_idx", "created_at", "id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
@@ -1447,7 +1448,7 @@ class MessageAnnotation(Base):
|
||||
return account
|
||||
|
||||
|
||||
class AppAnnotationHitHistory(Base):
|
||||
class AppAnnotationHitHistory(TypeBase):
|
||||
__tablename__ = "app_annotation_hit_histories"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="app_annotation_hit_histories_pkey"),
|
||||
@@ -1457,17 +1458,19 @@ class AppAnnotationHitHistory(Base):
|
||||
sa.Index("app_annotation_hit_histories_message_idx", "message_id"),
|
||||
)
|
||||
|
||||
id = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
app_id = mapped_column(StringUUID, nullable=False)
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()), init=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
annotation_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
source = mapped_column(LongText, nullable=False)
|
||||
question = mapped_column(LongText, nullable=False)
|
||||
account_id = mapped_column(StringUUID, nullable=False)
|
||||
created_at = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
score = mapped_column(Float, nullable=False, server_default=sa.text("0"))
|
||||
message_id = mapped_column(StringUUID, nullable=False)
|
||||
annotation_question = mapped_column(LongText, nullable=False)
|
||||
annotation_content = mapped_column(LongText, nullable=False)
|
||||
source: Mapped[str] = mapped_column(LongText, nullable=False)
|
||||
question: Mapped[str] = mapped_column(LongText, nullable=False)
|
||||
account_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
sa.DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
score: Mapped[float] = mapped_column(Float, nullable=False, server_default=sa.text("0"))
|
||||
message_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
annotation_question: Mapped[str] = mapped_column(LongText, nullable=False)
|
||||
annotation_content: Mapped[str] = mapped_column(LongText, nullable=False)
|
||||
|
||||
@property
|
||||
def account(self):
|
||||
@@ -1843,7 +1846,7 @@ class MessageChain(TypeBase):
|
||||
)
|
||||
|
||||
|
||||
class MessageAgentThought(Base):
|
||||
class MessageAgentThought(TypeBase):
|
||||
__tablename__ = "message_agent_thoughts"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="message_agent_thought_pkey"),
|
||||
@@ -1851,34 +1854,42 @@ class MessageAgentThought(Base):
|
||||
sa.Index("message_agent_thought_message_chain_id_idx", "message_chain_id"),
|
||||
)
|
||||
|
||||
id = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
message_id = mapped_column(StringUUID, nullable=False)
|
||||
message_chain_id = mapped_column(StringUUID, nullable=True)
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID, insert_default=lambda: str(uuid4()), default_factory=lambda: str(uuid4()), init=False
|
||||
)
|
||||
message_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
position: Mapped[int] = mapped_column(sa.Integer, nullable=False)
|
||||
thought = mapped_column(LongText, nullable=True)
|
||||
tool = mapped_column(LongText, nullable=True)
|
||||
tool_labels_str = mapped_column(LongText, nullable=False, default=sa.text("'{}'"))
|
||||
tool_meta_str = mapped_column(LongText, nullable=False, default=sa.text("'{}'"))
|
||||
tool_input = mapped_column(LongText, nullable=True)
|
||||
observation = mapped_column(LongText, nullable=True)
|
||||
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
message_chain_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
|
||||
thought: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
tool: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
tool_labels_str: Mapped[str] = mapped_column(LongText, nullable=False, default=sa.text("'{}'"))
|
||||
tool_meta_str: Mapped[str] = mapped_column(LongText, nullable=False, default=sa.text("'{}'"))
|
||||
tool_input: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
observation: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
# plugin_id = mapped_column(StringUUID, nullable=True) ## for future design
|
||||
tool_process_data = mapped_column(LongText, nullable=True)
|
||||
message = mapped_column(LongText, nullable=True)
|
||||
message_token: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)
|
||||
message_unit_price = mapped_column(sa.Numeric, nullable=True)
|
||||
message_price_unit = mapped_column(sa.Numeric(10, 7), nullable=False, server_default=sa.text("0.001"))
|
||||
message_files = mapped_column(LongText, nullable=True)
|
||||
answer = mapped_column(LongText, nullable=True)
|
||||
answer_token: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)
|
||||
answer_unit_price = mapped_column(sa.Numeric, nullable=True)
|
||||
answer_price_unit = mapped_column(sa.Numeric(10, 7), nullable=False, server_default=sa.text("0.001"))
|
||||
tokens: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)
|
||||
total_price = mapped_column(sa.Numeric, nullable=True)
|
||||
currency = mapped_column(String(255), nullable=True)
|
||||
latency: Mapped[float | None] = mapped_column(sa.Float, nullable=True)
|
||||
created_by_role = mapped_column(String(255), nullable=False)
|
||||
created_by = mapped_column(StringUUID, nullable=False)
|
||||
created_at = mapped_column(sa.DateTime, nullable=False, server_default=sa.func.current_timestamp())
|
||||
tool_process_data: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
message: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
message_token: Mapped[int | None] = mapped_column(sa.Integer, nullable=True, default=None)
|
||||
message_unit_price: Mapped[Decimal | None] = mapped_column(sa.Numeric, nullable=True, default=None)
|
||||
message_price_unit: Mapped[Decimal] = mapped_column(
|
||||
sa.Numeric(10, 7), nullable=False, default=Decimal("0.001"), server_default=sa.text("0.001")
|
||||
)
|
||||
message_files: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
answer: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
|
||||
answer_token: Mapped[int | None] = mapped_column(sa.Integer, nullable=True, default=None)
|
||||
answer_unit_price: Mapped[Decimal | None] = mapped_column(sa.Numeric, nullable=True, default=None)
|
||||
answer_price_unit: Mapped[Decimal] = mapped_column(
|
||||
sa.Numeric(10, 7), nullable=False, default=Decimal("0.001"), server_default=sa.text("0.001")
|
||||
)
|
||||
tokens: Mapped[int | None] = mapped_column(sa.Integer, nullable=True, default=None)
|
||||
total_price: Mapped[Decimal | None] = mapped_column(sa.Numeric, nullable=True, default=None)
|
||||
currency: Mapped[str | None] = mapped_column(String(255), nullable=True, default=None)
|
||||
latency: Mapped[float | None] = mapped_column(sa.Float, nullable=True, default=None)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
sa.DateTime, nullable=False, init=False, server_default=sa.func.current_timestamp()
|
||||
)
|
||||
|
||||
@property
|
||||
def files(self) -> list[Any]:
|
||||
@@ -2075,7 +2086,7 @@ class TraceAppConfig(TypeBase):
|
||||
}
|
||||
|
||||
|
||||
class TenantCreditPool(Base):
|
||||
class TenantCreditPool(TypeBase):
|
||||
__tablename__ = "tenant_credit_pools"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="tenant_credit_pool_pkey"),
|
||||
@@ -2083,14 +2094,20 @@ class TenantCreditPool(Base):
|
||||
sa.Index("tenant_credit_pool_pool_type_idx", "pool_type"),
|
||||
)
|
||||
|
||||
id = mapped_column(StringUUID, primary_key=True, server_default=text("uuid_generate_v4()"))
|
||||
tenant_id = mapped_column(StringUUID, nullable=False)
|
||||
pool_type = mapped_column(String(40), nullable=False, default="trial", server_default="trial")
|
||||
quota_limit = mapped_column(BigInteger, nullable=False, default=0)
|
||||
quota_used = mapped_column(BigInteger, nullable=False, default=0)
|
||||
created_at = mapped_column(sa.DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
|
||||
updated_at = mapped_column(
|
||||
sa.DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=text("uuid_generate_v4()"), init=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
pool_type: Mapped[str] = mapped_column(String(40), nullable=False, default="trial", server_default="trial")
|
||||
quota_limit: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
|
||||
quota_used: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
sa.DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"), init=False
|
||||
)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
sa.DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
onupdate=func.current_timestamp(),
|
||||
init=False,
|
||||
)
|
||||
|
||||
@property
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Union, cast
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union, cast
|
||||
from uuid import uuid4
|
||||
|
||||
import sqlalchemy as sa
|
||||
@@ -46,7 +44,7 @@ if TYPE_CHECKING:
|
||||
|
||||
from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE
|
||||
from core.helper import encrypter
|
||||
from core.variables import SecretVariable, Segment, SegmentType, Variable
|
||||
from core.variables import SecretVariable, Segment, SegmentType, VariableBase
|
||||
from factories import variable_factory
|
||||
from libs import helper
|
||||
|
||||
@@ -69,7 +67,7 @@ class WorkflowType(StrEnum):
|
||||
RAG_PIPELINE = "rag-pipeline"
|
||||
|
||||
@classmethod
|
||||
def value_of(cls, value: str) -> WorkflowType:
|
||||
def value_of(cls, value: str) -> "WorkflowType":
|
||||
"""
|
||||
Get value of given mode.
|
||||
|
||||
@@ -82,7 +80,7 @@ class WorkflowType(StrEnum):
|
||||
raise ValueError(f"invalid workflow type value {value}")
|
||||
|
||||
@classmethod
|
||||
def from_app_mode(cls, app_mode: Union[str, AppMode]) -> WorkflowType:
|
||||
def from_app_mode(cls, app_mode: Union[str, "AppMode"]) -> "WorkflowType":
|
||||
"""
|
||||
Get workflow type from app mode.
|
||||
|
||||
@@ -178,12 +176,12 @@ class Workflow(Base): # bug
graph: str,
features: str,
created_by: str,
environment_variables: Sequence[Variable],
conversation_variables: Sequence[Variable],
environment_variables: Sequence[VariableBase],
conversation_variables: Sequence[VariableBase],
rag_pipeline_variables: list[dict],
marked_name: str = "",
marked_comment: str = "",
) -> Workflow:
) -> "Workflow":
workflow = Workflow()
workflow.id = str(uuid4())
workflow.tenant_id = tenant_id
@@ -447,7 +445,7 @@ class Workflow(Base): # bug

# decrypt secret variables value
def decrypt_func(
var: Variable,
var: VariableBase,
) -> StringVariable | IntegerVariable | FloatVariable | SecretVariable:
if isinstance(var, SecretVariable):
return var.model_copy(update={"value": encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)})
@@ -463,7 +461,7 @@ class Workflow(Base): # bug
return decrypted_results

@environment_variables.setter
def environment_variables(self, value: Sequence[Variable]):
def environment_variables(self, value: Sequence[VariableBase]):
if not value:
self._environment_variables = "{}"
return
@@ -487,7 +485,7 @@ class Workflow(Base): # bug
value[i] = origin_variables_dictionary[variable.id].model_copy(update={"name": variable.name})

# encrypt secret variables value
def encrypt_func(var: Variable) -> Variable:
def encrypt_func(var: VariableBase) -> VariableBase:
if isinstance(var, SecretVariable):
return var.model_copy(update={"value": encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)})
else:
@@ -517,7 +515,7 @@ class Workflow(Base): # bug
return result

@property
def conversation_variables(self) -> Sequence[Variable]:
def conversation_variables(self) -> Sequence[VariableBase]:
# TODO: find some way to init `self._conversation_variables` when instance created.
if self._conversation_variables is None:
self._conversation_variables = "{}"
@@ -527,7 +525,7 @@ class Workflow(Base): # bug
return results

@conversation_variables.setter
def conversation_variables(self, value: Sequence[Variable]):
def conversation_variables(self, value: Sequence[VariableBase]):
self._conversation_variables = json.dumps(
{var.name: var.model_dump() for var in value},
ensure_ascii=False,
@@ -597,6 +595,7 @@ class WorkflowRun(Base):
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
sa.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
sa.Index("workflow_run_created_at_id_idx", "created_at", "id"),
)

id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
@@ -621,7 +620,7 @@ class WorkflowRun(Base):
finished_at: Mapped[datetime | None] = mapped_column(DateTime)
exceptions_count: Mapped[int] = mapped_column(sa.Integer, server_default=sa.text("0"), nullable=True)

pause: Mapped[WorkflowPause | None] = orm.relationship(
pause: Mapped[Optional["WorkflowPause"]] = orm.relationship(
"WorkflowPause",
primaryjoin="WorkflowRun.id == foreign(WorkflowPause.workflow_run_id)",
uselist=False,
@@ -691,7 +690,7 @@ class WorkflowRun(Base):
}

@classmethod
def from_dict(cls, data: dict[str, Any]) -> WorkflowRun:
def from_dict(cls, data: dict[str, Any]) -> "WorkflowRun":
return cls(
id=data.get("id"),
tenant_id=data.get("tenant_id"),
@@ -843,7 +842,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
created_by: Mapped[str] = mapped_column(StringUUID)
finished_at: Mapped[datetime | None] = mapped_column(DateTime)

offload_data: Mapped[list[WorkflowNodeExecutionOffload]] = orm.relationship(
offload_data: Mapped[list["WorkflowNodeExecutionOffload"]] = orm.relationship(
"WorkflowNodeExecutionOffload",
primaryjoin="WorkflowNodeExecutionModel.id == foreign(WorkflowNodeExecutionOffload.node_execution_id)",
uselist=True,
@@ -853,13 +852,13 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo

@staticmethod
def preload_offload_data(
query: Select[tuple[WorkflowNodeExecutionModel]] | orm.Query[WorkflowNodeExecutionModel],
query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
):
return query.options(orm.selectinload(WorkflowNodeExecutionModel.offload_data))

@staticmethod
def preload_offload_data_and_files(
query: Select[tuple[WorkflowNodeExecutionModel]] | orm.Query[WorkflowNodeExecutionModel],
query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
):
return query.options(
orm.selectinload(WorkflowNodeExecutionModel.offload_data).options(
@@ -934,7 +933,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
)
return extras

def _get_offload_by_type(self, type_: ExecutionOffLoadType) -> WorkflowNodeExecutionOffload | None:
def _get_offload_by_type(self, type_: ExecutionOffLoadType) -> Optional["WorkflowNodeExecutionOffload"]:
return next(iter([i for i in self.offload_data if i.type_ == type_]), None)

@property
@@ -1048,7 +1047,7 @@ class WorkflowNodeExecutionOffload(Base):
back_populates="offload_data",
)

file: Mapped[UploadFile | None] = orm.relationship(
file: Mapped[Optional["UploadFile"]] = orm.relationship(
foreign_keys=[file_id],
lazy="raise",
uselist=False,
@@ -1066,7 +1065,7 @@ class WorkflowAppLogCreatedFrom(StrEnum):
INSTALLED_APP = "installed-app"

@classmethod
def value_of(cls, value: str) -> WorkflowAppLogCreatedFrom:
def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":
"""
Get value of given mode.

@@ -1183,7 +1182,7 @@ class ConversationVariable(TypeBase):
)

@classmethod
def from_variable(cls, *, app_id: str, conversation_id: str, variable: Variable) -> ConversationVariable:
def from_variable(cls, *, app_id: str, conversation_id: str, variable: VariableBase) -> "ConversationVariable":
obj = cls(
id=variable.id,
app_id=app_id,
@@ -1192,7 +1191,7 @@ class ConversationVariable(TypeBase):
)
return obj

def to_variable(self) -> Variable:
def to_variable(self) -> VariableBase:
mapping = json.loads(self.data)
return variable_factory.build_conversation_variable_from_mapping(mapping)

@@ -1336,7 +1335,7 @@ class WorkflowDraftVariable(Base):
)

# Relationship to WorkflowDraftVariableFile
variable_file: Mapped[WorkflowDraftVariableFile | None] = orm.relationship(
variable_file: Mapped[Optional["WorkflowDraftVariableFile"]] = orm.relationship(
foreign_keys=[file_id],
lazy="raise",
uselist=False,
@@ -1506,7 +1505,7 @@ class WorkflowDraftVariable(Base):
node_execution_id: str | None,
description: str = "",
file_id: str | None = None,
) -> WorkflowDraftVariable:
) -> "WorkflowDraftVariable":
variable = WorkflowDraftVariable()
variable.id = str(uuid4())
variable.created_at = naive_utc_now()
@@ -1529,7 +1528,7 @@ class WorkflowDraftVariable(Base):
name: str,
value: Segment,
description: str = "",
) -> WorkflowDraftVariable:
) -> "WorkflowDraftVariable":
variable = cls._new(
app_id=app_id,
node_id=CONVERSATION_VARIABLE_NODE_ID,
@@ -1550,7 +1549,7 @@ class WorkflowDraftVariable(Base):
value: Segment,
node_execution_id: str,
editable: bool = False,
) -> WorkflowDraftVariable:
) -> "WorkflowDraftVariable":
variable = cls._new(
app_id=app_id,
node_id=SYSTEM_VARIABLE_NODE_ID,
@@ -1573,7 +1572,7 @@ class WorkflowDraftVariable(Base):
visible: bool = True,
editable: bool = True,
file_id: str | None = None,
) -> WorkflowDraftVariable:
) -> "WorkflowDraftVariable":
variable = cls._new(
app_id=app_id,
node_id=node_id,
@@ -1669,7 +1668,7 @@ class WorkflowDraftVariableFile(Base):
)

# Relationship to UploadFile
upload_file: Mapped[UploadFile] = orm.relationship(
upload_file: Mapped["UploadFile"] = orm.relationship(
foreign_keys=[upload_file_id],
lazy="raise",
uselist=False,
@@ -1736,7 +1735,7 @@ class WorkflowPause(DefaultFieldsMixin, Base):
state_object_key: Mapped[str] = mapped_column(String(length=255), nullable=False)

# Relationship to WorkflowRun
workflow_run: Mapped[WorkflowRun] = orm.relationship(
workflow_run: Mapped["WorkflowRun"] = orm.relationship(
foreign_keys=[workflow_run_id],
# require explicit preloading.
lazy="raise",
@@ -1792,7 +1791,7 @@ class WorkflowPauseReason(DefaultFieldsMixin, Base):
)

@classmethod
def from_entity(cls, pause_reason: PauseReason) -> WorkflowPauseReason:
def from_entity(cls, pause_reason: PauseReason) -> "WorkflowPauseReason":
if isinstance(pause_reason, HumanInputRequired):
return cls(
type_=PauseReasonType.HUMAN_INPUT_REQUIRED, form_id=pause_reason.form_id, node_id=pause_reason.node_id

@@ -1,6 +1,6 @@
[project]
name = "dify-api"
version = "1.11.2"
version = "1.11.4"
requires-python = ">=3.11,<3.13"

dependencies = [
@@ -189,7 +189,7 @@ storage = [
"opendal~=0.46.0",
"oss2==2.18.5",
"supabase~=2.18.1",
"tos~=2.7.1",
"tos~=2.9.0",
]

############################################################

@@ -34,11 +34,14 @@ Example:
```
"""

from collections.abc import Sequence
from collections.abc import Callable, Sequence
from datetime import datetime
from typing import Protocol

from sqlalchemy.orm import Session

from core.workflow.entities.pause_reason import PauseReason
from core.workflow.enums import WorkflowType
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.enums import WorkflowRunTriggeredFrom
@@ -253,6 +256,44 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
"""
...

def get_runs_batch_by_time_range(
self,
start_from: datetime | None,
end_before: datetime,
last_seen: tuple[datetime, str] | None,
batch_size: int,
run_types: Sequence[WorkflowType] | None = None,
tenant_ids: Sequence[str] | None = None,
) -> Sequence[WorkflowRun]:
"""
Fetch ended workflow runs in a time window for archival and clean batching.
"""
...

def delete_runs_with_related(
self,
runs: Sequence[WorkflowRun],
delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
"""
Delete workflow runs and their related records (node executions, offloads, app logs,
trigger logs, pauses, pause reasons).
"""
...

def count_runs_with_related(
self,
runs: Sequence[WorkflowRun],
count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
"""
Count workflow runs and their related records (node executions, offloads, app logs,
trigger logs, pauses, pause reasons) without deleting data.
"""
...
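Taken together, the three new protocol methods let a retention job preview the blast radius of a cleanup before committing to it: fetch one batch of ended runs, count the related records, then delete. A minimal sketch of that flow follows; `repo` stands for any `APIWorkflowRunRepository` implementation, the cutoff is passed in by the caller, and the node-execution/trigger-log callbacks are left at their `None` defaults — all assumptions rather than code from this changeset.

```python
from datetime import datetime


def preview_then_delete(repo, end_before: datetime) -> None:
    """Count what one batch of a cleanup would remove, then delete it."""
    runs = repo.get_runs_batch_by_time_range(
        start_from=None, end_before=end_before, last_seen=None, batch_size=100
    )
    if not runs:
        return
    # Both calls report the same keys: runs, node_executions, offloads,
    # app_logs, trigger_logs, pauses, pause_reasons.
    preview = repo.count_runs_with_related(runs)
    deleted = repo.delete_runs_with_related(runs)
    print(f"previewed {preview['runs']} runs, deleted {deleted['runs']}")
```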

def create_workflow_pause(
self,
workflow_run_id: str,

@@ -7,13 +7,18 @@ using SQLAlchemy 2.0 style queries for WorkflowNodeExecutionModel operations.

from collections.abc import Sequence
from datetime import datetime
from typing import cast
from typing import TypedDict, cast

from sqlalchemy import asc, delete, desc, select
from sqlalchemy import asc, delete, desc, func, select, tuple_
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker

from models.workflow import WorkflowNodeExecutionModel
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import (
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload,
WorkflowNodeExecutionTriggeredFrom,
)
from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository


@@ -44,6 +49,26 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
"""
self._session_maker = session_maker

@staticmethod
def _map_run_triggered_from_to_node_triggered_from(triggered_from: str) -> str:
"""
Map workflow run triggered_from values to workflow node execution triggered_from values.
"""
if triggered_from in {
WorkflowRunTriggeredFrom.APP_RUN.value,
WorkflowRunTriggeredFrom.DEBUGGING.value,
WorkflowRunTriggeredFrom.SCHEDULE.value,
WorkflowRunTriggeredFrom.PLUGIN.value,
WorkflowRunTriggeredFrom.WEBHOOK.value,
}:
return WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
if triggered_from in {
WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value,
WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value,
}:
return WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN.value
return ""
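The helper collapses the run-level trigger sources into the two node-level ones, and anything unrecognized maps to an empty string so the tuple filter used below simply matches no rows for it. A quick illustration using the enum members that appear in the diff (a hypothetical script, not part of the change; the repository's module path is assumed):

```python
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import WorkflowNodeExecutionTriggeredFrom
# Module path assumed for illustration; it is not shown in this diff.
from repositories.sqlalchemy_api_workflow_node_execution_repository import (
    DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
)

_map = DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from

# App, debugging, schedule, plugin, and webhook runs all collapse to WORKFLOW_RUN.
assert _map(WorkflowRunTriggeredFrom.APP_RUN.value) == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
# RAG pipeline runs (and debugging) map to RAG_PIPELINE_RUN.
assert _map(WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value) == WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN.value
# Unknown sources fall through to an empty string.
assert _map("some-unknown-source") == ""
```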

def get_node_last_execution(
self,
tenant_id: str,
@@ -290,3 +315,119 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
result = cast(CursorResult, session.execute(stmt))
session.commit()
return result.rowcount

class RunContext(TypedDict):
run_id: str
tenant_id: str
app_id: str
workflow_id: str
triggered_from: str

@staticmethod
def delete_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
"""
Delete node executions (and offloads) for the given workflow runs using indexed columns.

Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
by filtering on those columns with tuple IN.
"""
if not runs:
return 0, 0

tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]

node_execution_ids = session.scalars(
select(WorkflowNodeExecutionModel.id).where(
tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)
)
).all()

if not node_execution_ids:
return 0, 0

offloads_deleted = (
cast(
CursorResult,
session.execute(
delete(WorkflowNodeExecutionOffload).where(
WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids)
)
),
).rowcount
or 0
)

node_executions_deleted = (
cast(
CursorResult,
session.execute(
delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
),
).rowcount
or 0
)

return node_executions_deleted, offloads_deleted

@staticmethod
def count_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
"""
Count node executions (and offloads) for the given workflow runs using indexed columns.
"""
if not runs:
return 0, 0

tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]
tuple_filter = tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)

node_executions_count = (
session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
)
offloads_count = (
session.scalar(
select(func.count())
.select_from(WorkflowNodeExecutionOffload)
.join(
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
)
.where(tuple_filter)
)
or 0
)

return int(node_executions_count), int(offloads_count)
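`delete_by_runs` and `count_by_runs` take the lightweight `RunContext` mapping rather than ORM rows, so before they can serve as the `delete_node_executions` / `count_node_executions` callbacks of the run repository they need a thin adapter from `WorkflowRun` rows. The sketch below shows the intended shape only; it assumes `WorkflowRun` exposes `id`, `tenant_id`, `app_id`, `workflow_id`, and `triggered_from` attributes, and neither the adapter nor its wiring is part of this changeset.

```python
from collections.abc import Sequence

from sqlalchemy.orm import Session

# Import of DifyAPISQLAlchemyWorkflowNodeExecutionRepository omitted here;
# its module path is not shown in this diff.


def _to_run_contexts(runs: Sequence) -> list[dict[str, str]]:
    # Keys mirror the RunContext TypedDict defined above.
    return [
        {
            "run_id": run.id,
            "tenant_id": run.tenant_id,
            "app_id": run.app_id,
            "workflow_id": run.workflow_id,
            "triggered_from": run.triggered_from,
        }
        for run in runs
    ]


def delete_node_executions(session: Session, runs: Sequence) -> tuple[int, int]:
    # Shape matches Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]]
    # expected by delete_runs_with_related / count_runs_with_related.
    return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.delete_by_runs(
        session, _to_run_contexts(runs)
    )
```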

@@ -21,7 +21,7 @@ Implementation Notes:

import logging
import uuid
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from datetime import datetime
from decimal import Decimal
from typing import Any, cast
@@ -32,7 +32,7 @@ from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, selectinload, sessionmaker

from core.workflow.entities.pause_reason import HumanInputRequired, PauseReason, SchedulingPause
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.enums import WorkflowExecutionStatus, WorkflowType
from extensions.ext_storage import storage
from libs.datetime_utils import naive_utc_now
from libs.helper import convert_datetime_to_date
@@ -40,8 +40,14 @@ from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.time_parser import get_time_threshold
from libs.uuid_utils import uuidv7
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import WorkflowPause as WorkflowPauseModel
from models.workflow import WorkflowPauseReason, WorkflowRun
from models.workflow import (
WorkflowAppLog,
WorkflowPauseReason,
WorkflowRun,
)
from models.workflow import (
WorkflowPause as WorkflowPauseModel,
)
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.entities.workflow_pause import WorkflowPauseEntity
from repositories.types import (
@@ -314,6 +320,171 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
logger.info("Total deleted %s workflow runs for app %s", total_deleted, app_id)
return total_deleted

def get_runs_batch_by_time_range(
self,
start_from: datetime | None,
end_before: datetime,
last_seen: tuple[datetime, str] | None,
batch_size: int,
run_types: Sequence[WorkflowType] | None = None,
tenant_ids: Sequence[str] | None = None,
) -> Sequence[WorkflowRun]:
"""
Fetch ended workflow runs in a time window for archival and clean batching.

Query scope:
- created_at in [start_from, end_before)
- type in run_types (when provided)
- status is an ended state
- optional tenant_id filter and cursor (last_seen) for pagination
"""
with self._session_maker() as session:
stmt = (
select(WorkflowRun)
.where(
WorkflowRun.created_at < end_before,
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
)
.order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc())
.limit(batch_size)
)
if run_types is not None:
if not run_types:
return []
stmt = stmt.where(WorkflowRun.type.in_(run_types))

if start_from:
stmt = stmt.where(WorkflowRun.created_at >= start_from)

if tenant_ids:
stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids))

if last_seen:
stmt = stmt.where(
or_(
WorkflowRun.created_at > last_seen[0],
and_(WorkflowRun.created_at == last_seen[0], WorkflowRun.id > last_seen[1]),
)
)

return session.scalars(stmt).all()
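The `(created_at, id)` ordering lines up with the new `workflow_run_created_at_id_idx` index added to the `WorkflowRun` model and with the `last_seen` predicate, giving keyset pagination that neither skips nor repeats rows between batches. A caller advances the cursor from the last row of each batch, roughly like the hypothetical loop below (the repository instance and the cutoff are assumptions, not code from this diff):

```python
from collections.abc import Iterator
from datetime import datetime


def iter_ended_runs(repo, end_before: datetime, batch_size: int = 1000) -> Iterator:
    """Yield ended workflow runs in (created_at, id) order, one keyset batch at a time."""
    last_seen = None
    while True:
        batch = repo.get_runs_batch_by_time_range(
            start_from=None,
            end_before=end_before,
            last_seen=last_seen,
            batch_size=batch_size,
        )
        if not batch:
            return
        yield from batch
        # Start the next batch strictly after the last row seen.
        last_seen = (batch[-1].created_at, batch[-1].id)
```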

def delete_runs_with_related(
self,
runs: Sequence[WorkflowRun],
delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
if not runs:
return {
"runs": 0,
"node_executions": 0,
"offloads": 0,
"app_logs": 0,
"trigger_logs": 0,
"pauses": 0,
"pause_reasons": 0,
}

with self._session_maker() as session:
run_ids = [run.id for run in runs]
if delete_node_executions:
node_executions_deleted, offloads_deleted = delete_node_executions(session, runs)
else:
node_executions_deleted, offloads_deleted = 0, 0

app_logs_result = session.execute(delete(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids)))
app_logs_deleted = cast(CursorResult, app_logs_result).rowcount or 0

pause_ids = session.scalars(
select(WorkflowPauseModel.id).where(WorkflowPauseModel.workflow_run_id.in_(run_ids))
).all()
pause_reasons_deleted = 0
pauses_deleted = 0

if pause_ids:
pause_reasons_result = session.execute(
delete(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids))
)
pause_reasons_deleted = cast(CursorResult, pause_reasons_result).rowcount or 0
pauses_result = session.execute(delete(WorkflowPauseModel).where(WorkflowPauseModel.id.in_(pause_ids)))
pauses_deleted = cast(CursorResult, pauses_result).rowcount or 0

trigger_logs_deleted = delete_trigger_logs(session, run_ids) if delete_trigger_logs else 0

runs_result = session.execute(delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids)))
runs_deleted = cast(CursorResult, runs_result).rowcount or 0

session.commit()

return {
"runs": runs_deleted,
"node_executions": node_executions_deleted,
"offloads": offloads_deleted,
"app_logs": app_logs_deleted,
"trigger_logs": trigger_logs_deleted,
"pauses": pauses_deleted,
"pause_reasons": pause_reasons_deleted,
}

def count_runs_with_related(
self,
runs: Sequence[WorkflowRun],
count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
if not runs:
return {
"runs": 0,
"node_executions": 0,
"offloads": 0,
"app_logs": 0,
"trigger_logs": 0,
"pauses": 0,
"pause_reasons": 0,
}

with self._session_maker() as session:
run_ids = [run.id for run in runs]
if count_node_executions:
node_executions_count, offloads_count = count_node_executions(session, runs)
else:
node_executions_count, offloads_count = 0, 0

app_logs_count = (
session.scalar(
select(func.count()).select_from(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids))
)
or 0
)

pause_ids = session.scalars(
select(WorkflowPauseModel.id).where(WorkflowPauseModel.workflow_run_id.in_(run_ids))
).all()
pauses_count = len(pause_ids)
pause_reasons_count = 0
if pause_ids:
pause_reasons_count = (
session.scalar(
select(func.count())
.select_from(WorkflowPauseReason)
.where(WorkflowPauseReason.pause_id.in_(pause_ids))
)
or 0
)

trigger_logs_count = count_trigger_logs(session, run_ids) if count_trigger_logs else 0

return {
"runs": len(runs),
"node_executions": node_executions_count,
"offloads": offloads_count,
"app_logs": int(app_logs_count),
"trigger_logs": trigger_logs_count,
"pauses": pauses_count,
"pause_reasons": int(pause_reasons_count),
}

def create_workflow_pause(
self,
workflow_run_id: str,

@@ -4,8 +4,10 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository.

from collections.abc import Sequence
from datetime import UTC, datetime, timedelta
from typing import cast

from sqlalchemy import and_, select
from sqlalchemy import and_, delete, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session

from models.enums import WorkflowTriggerStatus
@@ -84,3 +86,37 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
)

return list(self.session.scalars(query).all())

def delete_by_run_ids(self, run_ids: Sequence[str]) -> int:
"""
Delete trigger logs associated with the given workflow run ids.

Args:
run_ids: Collection of workflow run identifiers.

Returns:
Number of rows deleted.
"""
if not run_ids:
return 0

result = self.session.execute(delete(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids)))
return cast(CursorResult, result).rowcount or 0

def count_by_run_ids(self, run_ids: Sequence[str]) -> int:
"""
Count trigger logs associated with the given workflow run ids.

Args:
run_ids: Collection of workflow run identifiers.

Returns:
Number of rows matched.
"""
if not run_ids:
return 0

count = self.session.scalar(
select(func.count()).select_from(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids))
)
return int(count or 0)

@@ -109,3 +109,15 @@ class WorkflowTriggerLogRepository(Protocol):
A sequence of recent WorkflowTriggerLog instances
"""
...

def delete_by_run_ids(self, run_ids: Sequence[str]) -> int:
"""
Delete trigger logs for workflow run IDs.

Args:
run_ids: Workflow run IDs to delete

Returns:
Number of rows deleted
"""
...

@@ -1,90 +1,62 @@
import datetime
import logging
import time

import click
from sqlalchemy.exc import SQLAlchemyError

import app
from configs import dify_config
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
App,
Message,
MessageAgentThought,
MessageAnnotation,
MessageChain,
MessageFeedback,
MessageFile,
)
from models.web import SavedMessage
from services.feature_service import FeatureService
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
from services.retention.conversation.messages_clean_service import MessagesCleanService

logger = logging.getLogger(__name__)


@app.celery.task(queue="dataset")
@app.celery.task(queue="retention")
def clean_messages():
click.echo(click.style("Start clean messages.", fg="green"))
start_at = time.perf_counter()
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
)
while True:
try:
# Main query with join and filter
messages = (
db.session.query(Message)
.where(Message.created_at < plan_sandbox_clean_message_day)
.order_by(Message.created_at.desc())
.limit(100)
.all()
)
"""
Clean expired messages based on clean policy.

except SQLAlchemyError:
raise
if not messages:
break
for message in messages:
app = db.session.query(App).filter_by(id=message.app_id).first()
if not app:
logger.warning(
"Expected App record to exist, but none was found, app_id=%s, message_id=%s",
message.app_id,
message.id,
)
continue
features_cache_key = f"features:{app.tenant_id}"
plan_cache = redis_client.get(features_cache_key)
if plan_cache is None:
features = FeatureService.get_features(app.tenant_id)
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
plan = features.billing.subscription.plan
else:
plan = plan_cache.decode()
if plan == CloudPlan.SANDBOX:
# clean related message
db.session.query(MessageFeedback).where(MessageFeedback.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAnnotation).where(MessageAnnotation.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageChain).where(MessageChain.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAgentThought).where(MessageAgentThought.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageFile).where(MessageFile.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(SavedMessage).where(SavedMessage.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(Message).where(Message.id == message.id).delete()
db.session.commit()
end_at = time.perf_counter()
click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))
This task uses MessagesCleanService to efficiently clean messages in batches.
The behavior depends on BILLING_ENABLED configuration:
- BILLING_ENABLED=True: only delete messages from sandbox tenants (with whitelist/grace period)
- BILLING_ENABLED=False: delete all messages within the time range
"""
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
start_at = time.perf_counter()

try:
# Create policy based on billing configuration
policy = create_message_clean_policy(
graceful_period_days=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD,
)

# Create and run the cleanup service
service = MessagesCleanService.from_days(
policy=policy,
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
)
stats = service.run()

end_at = time.perf_counter()
click.echo(
click.style(
f"clean_messages: completed successfully\n"
f" - Latency: {end_at - start_at:.2f}s\n"
f" - Batches processed: {stats['batches']}\n"
f" - Total messages scanned: {stats['total_messages']}\n"
f" - Messages filtered: {stats['filtered_messages']}\n"
f" - Messages deleted: {stats['total_deleted']}",
fg="green",
)
)
except Exception as e:
end_at = time.perf_counter()
logger.exception("clean_messages failed")
click.echo(
click.style(
f"clean_messages: failed after {end_at - start_at:.2f}s - {str(e)}",
fg="red",
)
)
raise
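Because the retention window, batch size, and grace period now all come from `dify_config`, a one-off run outside the beat schedule can drive the same service directly, for example after tightening the retention window. A sketch follows, assuming it executes inside the app context with the same modules the task imports; the explicit `days` and `batch_size` values are illustrative only.

```python
from configs import dify_config
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
from services.retention.conversation.messages_clean_service import MessagesCleanService

policy = create_message_clean_policy(
    graceful_period_days=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD,
)
service = MessagesCleanService.from_days(
    policy=policy,
    days=7,  # hypothetical shorter window for a manual backfill
    batch_size=500,  # hypothetical batch size
)
stats = service.run()
print(f"{stats['total_deleted']} messages deleted across {stats['batches']} batches")
```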
43
api/schedule/clean_workflow_runs_task.py
Normal file
@@ -0,0 +1,43 @@
from datetime import UTC, datetime

import click

import app
from configs import dify_config
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup


@app.celery.task(queue="retention")
def clean_workflow_runs_task() -> None:
"""
Scheduled cleanup for workflow runs and related records (sandbox tenants only).
"""
click.echo(
click.style(
(
"Scheduled workflow run cleanup starting: "
f"cutoff={dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS} days, "
f"batch={dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE}"
),
fg="green",
)
)

start_time = datetime.now(UTC)

WorkflowRunCleanup(
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
start_from=None,
end_before=None,
).run()

end_time = datetime.now(UTC)
elapsed = end_time - start_time
click.echo(
click.style(
f"Scheduled workflow run cleanup finished. start={start_time.isoformat()} "
f"end={end_time.isoformat()} duration={elapsed}",
fg="green",
)
)
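The task itself only defines the work; it still needs a beat entry and a worker consuming the `retention` queue. How Dify registers the schedule is outside this diff, but a generic Celery sketch looks like the following, where the dotted task path and the daily cadence are assumptions.

```python
from celery.schedules import crontab

import app

# Hypothetical beat entry; the real schedule lives in Dify's Celery configuration.
app.celery.conf.beat_schedule.update(
    {
        "clean-workflow-runs-daily": {
            # Assumed dotted path derived from api/schedule/clean_workflow_runs_task.py
            "task": "schedule.clean_workflow_runs_task.clean_workflow_runs_task",
            "schedule": crontab(hour=3, minute=0),  # assumed 03:00 UTC daily cadence
        },
    }
)
```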
@@ -50,10 +50,13 @@ def create_clusters(batch_size):
)
for new_cluster in new_clusters:
tidb_auth_binding = TidbAuthBinding(
tenant_id=None,
cluster_id=new_cluster["cluster_id"],
cluster_name=new_cluster["cluster_name"],
account=new_cluster["account"],
password=new_cluster["password"],
active=False,
status="CREATING",
)
db.session.add(tidb_auth_binding)
db.session.commit()