mirror of
https://github.com/langgenius/dify.git
synced 2026-02-09 23:20:12 -05:00
Merge remote-tracking branch 'origin/feat/support-agent-sandbox' into pre-align-hitl-frontend
This commit is contained in:
@@ -120,6 +120,30 @@ class WorkflowCollaborationService:
|
||||
|
||||
return {"msg": "skill_file_active_updated"}, 200
|
||||
|
||||
if event_type == "sync_request":
|
||||
leader_sid = self._repository.get_current_leader(workflow_id)
|
||||
if leader_sid and (
|
||||
self.is_session_active(workflow_id, leader_sid)
|
||||
and self._repository.is_graph_active(workflow_id, leader_sid)
|
||||
):
|
||||
target_sid = leader_sid
|
||||
else:
|
||||
if leader_sid:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
target_sid = self._select_graph_leader(workflow_id, preferred_sid=sid)
|
||||
if target_sid:
|
||||
self._repository.set_leader(workflow_id, target_sid)
|
||||
self.broadcast_leader_change(workflow_id, target_sid)
|
||||
if not target_sid:
|
||||
return {"msg": "no_active_leader"}, 200
|
||||
|
||||
self._socketio.emit(
|
||||
"collaboration_update",
|
||||
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
|
||||
room=target_sid,
|
||||
)
|
||||
return {"msg": "sync_request_forwarded"}, 200
|
||||
|
||||
self._socketio.emit(
|
||||
"collaboration_update",
|
||||
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
|
||||
@@ -232,11 +256,44 @@ class WorkflowCollaborationService:
|
||||
def get_current_leader(self, workflow_id: str) -> str | None:
|
||||
return self._repository.get_current_leader(workflow_id)
|
||||
|
||||
def _prune_inactive_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]:
|
||||
"""Remove inactive sessions from storage and return active sessions only."""
|
||||
sessions = self._repository.list_sessions(workflow_id)
|
||||
if not sessions:
|
||||
return []
|
||||
|
||||
active_sessions: list[WorkflowSessionInfo] = []
|
||||
stale_sids: list[str] = []
|
||||
for session in sessions:
|
||||
sid = session["sid"]
|
||||
if self.is_session_active(workflow_id, sid):
|
||||
active_sessions.append(session)
|
||||
else:
|
||||
stale_sids.append(sid)
|
||||
|
||||
for sid in stale_sids:
|
||||
self._repository.delete_session(workflow_id, sid)
|
||||
|
||||
return active_sessions
|
||||
|
||||
def broadcast_online_users(self, workflow_id: str) -> None:
|
||||
users = self._repository.list_sessions(workflow_id)
|
||||
users = self._prune_inactive_sessions(workflow_id)
|
||||
users.sort(key=lambda x: x.get("connected_at") or 0)
|
||||
|
||||
leader_sid = self.get_current_leader(workflow_id)
|
||||
previous_leader = leader_sid
|
||||
active_sids = {user["sid"] for user in users}
|
||||
if leader_sid and leader_sid not in active_sids:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
leader_sid = None
|
||||
|
||||
if not leader_sid and users:
|
||||
leader_sid = self._select_graph_leader(workflow_id)
|
||||
if leader_sid:
|
||||
self._repository.set_leader(workflow_id, leader_sid)
|
||||
|
||||
if leader_sid != previous_leader:
|
||||
self.broadcast_leader_change(workflow_id, leader_sid)
|
||||
|
||||
self._socketio.emit(
|
||||
"online_users",
|
||||
@@ -293,7 +350,9 @@ class WorkflowCollaborationService:
|
||||
|
||||
def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
|
||||
session_sids = [
|
||||
session["sid"] for session in self._repository.list_sessions(workflow_id) if session.get("graph_active")
|
||||
session["sid"]
|
||||
for session in self._repository.list_sessions(workflow_id)
|
||||
if session.get("graph_active") and self.is_session_active(workflow_id, session["sid"])
|
||||
]
|
||||
if not session_sids:
|
||||
return None
|
||||
|
||||
@@ -140,9 +140,19 @@ class TestWorkflowCollaborationService:
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.set_leader_if_absent.return_value = True
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=False),
|
||||
patch.object(collaboration_service, "is_session_active", side_effect=lambda _wf, sid: sid != "sid-1"),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
@@ -161,6 +171,16 @@ class TestWorkflowCollaborationService:
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.side_effect = [None, "sid-3"]
|
||||
repository.set_leader_if_absent.return_value = False
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
# Act
|
||||
result = collaboration_service.get_or_set_leader("wf-1", "sid-2")
|
||||
@@ -174,9 +194,21 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.get_session_sids.return_value = ["sid-2"]
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=True),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
|
||||
|
||||
@@ -190,7 +222,7 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.get_session_sids.return_value = []
|
||||
repository.list_sessions.return_value = []
|
||||
|
||||
# Act
|
||||
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
|
||||
@@ -209,8 +241,9 @@ class TestWorkflowCollaborationService:
|
||||
]
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
|
||||
# Act
|
||||
collaboration_service.broadcast_online_users("wf-1")
|
||||
with patch.object(collaboration_service, "is_session_active", return_value=True):
|
||||
# Act
|
||||
collaboration_service.broadcast_online_users("wf-1")
|
||||
|
||||
# Assert
|
||||
socketio.emit.assert_called_once_with(
|
||||
@@ -248,8 +281,21 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = None
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=True),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
collaboration_service.refresh_session_state("wf-1", "sid-2")
|
||||
|
||||
|
||||
@@ -56,6 +56,28 @@ type LoroContainer = {
|
||||
getAttached?: () => unknown
|
||||
}
|
||||
|
||||
type GraphImportLogEntry = {
|
||||
timestamp: number
|
||||
appId: string | null
|
||||
sources: Array<'nodes' | 'edges'>
|
||||
before: {
|
||||
nodes: Node[]
|
||||
edges: Edge[]
|
||||
}
|
||||
after: {
|
||||
nodes: Node[]
|
||||
edges: Edge[]
|
||||
}
|
||||
meta: {
|
||||
leaderId: string | null
|
||||
isLeader: boolean
|
||||
graphViewActive: boolean | null
|
||||
pendingInitialSync: boolean
|
||||
}
|
||||
}
|
||||
|
||||
const GRAPH_IMPORT_LOG_LIMIT = 20
|
||||
|
||||
const toLoroValue = (value: unknown): Value => cloneDeep(value) as Value
|
||||
const toLoroRecord = (value: unknown): Record<string, Value> => cloneDeep(value) as Record<string, Value>
|
||||
export class CollaborationManager {
|
||||
@@ -78,6 +100,15 @@ export class CollaborationManager {
|
||||
private rejoinInProgress = false
|
||||
private pendingGraphImportEmit = false
|
||||
private graphViewActive: boolean | null = null
|
||||
private graphImportLogs: GraphImportLogEntry[] = []
|
||||
private pendingImportLog: {
|
||||
timestamp: number
|
||||
sources: Set<'nodes' | 'edges'>
|
||||
before: {
|
||||
nodes: Node[]
|
||||
edges: Edge[]
|
||||
}
|
||||
} | null = null
|
||||
|
||||
private getActiveSocket(): Socket | null {
|
||||
if (!this.currentAppId)
|
||||
@@ -504,6 +535,7 @@ export class CollaborationManager {
|
||||
this.onlineUsers = []
|
||||
this.isUndoRedoInProgress = false
|
||||
this.rejoinInProgress = false
|
||||
this.clearGraphImportLog()
|
||||
|
||||
// Only reset leader status when actually disconnecting
|
||||
const wasLeader = this.isLeader
|
||||
@@ -908,6 +940,7 @@ export class CollaborationManager {
|
||||
requestAnimationFrame(() => {
|
||||
const state = reactFlowStore.getState()
|
||||
const previousNodes: Node[] = state.getNodes()
|
||||
this.startImportLog('nodes', { nodes: previousNodes, edges: state.getEdges() })
|
||||
const previousNodeMap = new Map(previousNodes.map(node => [node.id, node]))
|
||||
const selectedIds = new Set(
|
||||
previousNodes
|
||||
@@ -964,6 +997,7 @@ export class CollaborationManager {
|
||||
requestAnimationFrame(() => {
|
||||
// Get ReactFlow's native setters, not the collaborative ones
|
||||
const state = reactFlowStore.getState()
|
||||
this.startImportLog('edges', { nodes: state.getNodes(), edges: state.getEdges() })
|
||||
const updatedEdges = Array.from(this.edgesMap?.values() || []) as Edge[]
|
||||
|
||||
this.pendingInitialSync = false
|
||||
@@ -984,6 +1018,7 @@ export class CollaborationManager {
|
||||
this.pendingGraphImportEmit = true
|
||||
requestAnimationFrame(() => {
|
||||
this.pendingGraphImportEmit = false
|
||||
this.finalizeImportLog()
|
||||
const mergedNodes = this.mergeLocalNodeState(this.getNodes())
|
||||
this.eventEmitter.emit('graphImport', {
|
||||
nodes: mergedNodes,
|
||||
@@ -1034,6 +1069,98 @@ export class CollaborationManager {
|
||||
})
|
||||
}
|
||||
|
||||
getGraphImportLog(): GraphImportLogEntry[] {
|
||||
return cloneDeep(this.graphImportLogs)
|
||||
}
|
||||
|
||||
clearGraphImportLog(): void {
|
||||
this.graphImportLogs = []
|
||||
this.pendingImportLog = null
|
||||
}
|
||||
|
||||
downloadGraphImportLog(): void {
|
||||
if (this.graphImportLogs.length === 0)
|
||||
return
|
||||
|
||||
const payload = {
|
||||
appId: this.currentAppId,
|
||||
generatedAt: new Date().toISOString(),
|
||||
entries: this.graphImportLogs,
|
||||
}
|
||||
const stamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||
const appSuffix = this.currentAppId ?? 'unknown'
|
||||
const fileName = `workflow-graph-import-log-${appSuffix}-${stamp}.json`
|
||||
const blob = new Blob([JSON.stringify(payload, null, 2)], { type: 'application/json' })
|
||||
const url = URL.createObjectURL(blob)
|
||||
const link = document.createElement('a')
|
||||
link.href = url
|
||||
link.download = fileName
|
||||
link.click()
|
||||
URL.revokeObjectURL(url)
|
||||
}
|
||||
|
||||
private snapshotReactFlowGraph(): { nodes: Node[], edges: Edge[] } {
|
||||
if (!this.reactFlowStore) {
|
||||
return {
|
||||
nodes: this.getNodes(),
|
||||
edges: this.getEdges(),
|
||||
}
|
||||
}
|
||||
|
||||
const state = this.reactFlowStore.getState()
|
||||
return {
|
||||
nodes: cloneDeep(state.getNodes()),
|
||||
edges: cloneDeep(state.getEdges()),
|
||||
}
|
||||
}
|
||||
|
||||
private startImportLog(source: 'nodes' | 'edges', before?: { nodes: Node[], edges: Edge[] }): void {
|
||||
if (!this.pendingImportLog) {
|
||||
const snapshot = before ?? this.snapshotReactFlowGraph()
|
||||
this.pendingImportLog = {
|
||||
timestamp: Date.now(),
|
||||
sources: new Set([source]),
|
||||
before: {
|
||||
nodes: cloneDeep(snapshot.nodes),
|
||||
edges: cloneDeep(snapshot.edges),
|
||||
},
|
||||
}
|
||||
return
|
||||
}
|
||||
this.pendingImportLog.sources.add(source)
|
||||
}
|
||||
|
||||
private finalizeImportLog(): void {
|
||||
if (!this.pendingImportLog)
|
||||
return
|
||||
|
||||
const afterSnapshot = this.snapshotReactFlowGraph()
|
||||
const entry: GraphImportLogEntry = {
|
||||
timestamp: this.pendingImportLog.timestamp,
|
||||
appId: this.currentAppId,
|
||||
sources: Array.from(this.pendingImportLog.sources),
|
||||
before: {
|
||||
nodes: this.pendingImportLog.before.nodes,
|
||||
edges: this.pendingImportLog.before.edges,
|
||||
},
|
||||
after: {
|
||||
nodes: cloneDeep(afterSnapshot.nodes),
|
||||
edges: cloneDeep(afterSnapshot.edges),
|
||||
},
|
||||
meta: {
|
||||
leaderId: this.leaderId,
|
||||
isLeader: this.isLeader,
|
||||
graphViewActive: this.graphViewActive,
|
||||
pendingInitialSync: this.pendingInitialSync,
|
||||
},
|
||||
}
|
||||
|
||||
this.graphImportLogs.push(entry)
|
||||
if (this.graphImportLogs.length > GRAPH_IMPORT_LOG_LIMIT)
|
||||
this.graphImportLogs.splice(0, this.graphImportLogs.length - GRAPH_IMPORT_LOG_LIMIT)
|
||||
this.pendingImportLog = null
|
||||
}
|
||||
|
||||
private setupSocketEventListeners(socket: Socket): void {
|
||||
socket.on('collaboration_update', (update: CollaborationUpdate) => {
|
||||
if (update.type === 'mouse_move') {
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
useWorkflowMoveMode,
|
||||
useWorkflowOrganize,
|
||||
} from '.'
|
||||
import { collaborationManager } from '../collaboration/core/collaboration-manager'
|
||||
import { useWorkflowStore } from '../store'
|
||||
import {
|
||||
getKeyboardKeyCodeBySystem,
|
||||
@@ -266,6 +267,13 @@ export const useShortcuts = (enabled = true): void => {
|
||||
useCapture: true,
|
||||
})
|
||||
|
||||
useKeyPress(`${getKeyboardKeyCodeBySystem('ctrl')}.shift.l`, (e) => {
|
||||
if (shouldHandleShortcut(e)) {
|
||||
e.preventDefault()
|
||||
collaborationManager.downloadGraphImportLog()
|
||||
}
|
||||
}, { exactMatch: true, useCapture: true })
|
||||
|
||||
// Shift ↓
|
||||
useKeyPress(
|
||||
'shift',
|
||||
|
||||
Reference in New Issue
Block a user