From aead4fe65cdac36b6022e2bf4ae9c8041b42e246 Mon Sep 17 00:00:00 2001 From: wangxiaolei Date: Mon, 9 Feb 2026 10:49:23 +0800 Subject: [PATCH] refactor: document_indexing_update_task split database session (#32105) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/pyproject.toml | 2 +- api/tasks/document_indexing_update_task.py | 56 +++++++++++----------- api/uv.lock | 11 +++-- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/api/pyproject.toml b/api/pyproject.toml index 4be7afff26..2a7c946e6e 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -81,7 +81,7 @@ dependencies = [ "starlette==0.49.1", "tiktoken~=0.9.0", "transformers~=4.56.1", - "unstructured[docx,epub,md,ppt,pptx]~=0.16.1", + "unstructured[docx,epub,md,ppt,pptx]~=0.18.18", "yarl~=1.18.3", "webvtt-py~=0.5.1", "sseclient-py~=1.8.0", diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index 45d58c92ec..c7508c6d05 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -36,25 +36,19 @@ def document_indexing_update_task(dataset_id: str, document_id: str): document.indexing_status = "parsing" document.processing_started_at = naive_utc_now() - # delete all document segment and index - try: - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() - if not dataset: - raise Exception("Dataset not found") + dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + if not dataset: + return - index_type = document.doc_form - index_processor = IndexProcessorFactory(index_type).init_index_processor() - - segments = session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document_id)).all() - if segments: - index_node_ids = [segment.index_node_id for segment in segments] - - # delete from vector index - index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True) - segment_ids = [segment.id for segment in segments] - segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.id.in_(segment_ids)) - session.execute(segment_delete_stmt) + index_type = document.doc_form + segments = session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document_id)).all() + index_node_ids = [segment.index_node_id for segment in segments] + clean_success = False + try: + index_processor = IndexProcessorFactory(index_type).init_index_processor() + if index_node_ids: + index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True) end_at = time.perf_counter() logger.info( click.style( @@ -64,15 +58,21 @@ def document_indexing_update_task(dataset_id: str, document_id: str): fg="green", ) ) - except Exception: - logger.exception("Cleaned document when document update data source or process rule failed") + clean_success = True + except Exception: + logger.exception("Failed to clean document index during update, document_id: %s", document_id) - try: - indexing_runner = IndexingRunner() - indexing_runner.run([document]) - end_at = time.perf_counter() - logger.info(click.style(f"update document: {document.id} latency: {end_at - start_at}", fg="green")) - except DocumentIsPausedError as ex: - logger.info(click.style(str(ex), fg="yellow")) - except Exception: - logger.exception("document_indexing_update_task failed, document_id: %s", document_id) + if clean_success: + with session_factory.create_session() as session, session.begin(): + segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id == document_id) + session.execute(segment_delete_stmt) + + try: + indexing_runner = IndexingRunner() + indexing_runner.run([document]) + end_at = time.perf_counter() + logger.info(click.style(f"update document: {document.id} latency: {end_at - start_at}", fg="green")) + except DocumentIsPausedError as ex: + logger.info(click.style(str(ex), fg="yellow")) + except Exception: + logger.exception("document_indexing_update_task failed, document_id: %s", document_id) diff --git a/api/uv.lock b/api/uv.lock index 0a17741f9a..4eb5c42659 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1653,7 +1653,7 @@ requires-dist = [ { name = "starlette", specifier = "==0.49.1" }, { name = "tiktoken", specifier = "~=0.9.0" }, { name = "transformers", specifier = "~=4.56.1" }, - { name = "unstructured", extras = ["docx", "epub", "md", "ppt", "pptx"], specifier = "~=0.16.1" }, + { name = "unstructured", extras = ["docx", "epub", "md", "ppt", "pptx"], specifier = "~=0.18.18" }, { name = "weave", specifier = ">=0.52.16" }, { name = "weaviate-client", specifier = "==4.17.0" }, { name = "webvtt-py", specifier = "~=0.5.1" }, @@ -6814,12 +6814,12 @@ wheels = [ [[package]] name = "unstructured" -version = "0.16.25" +version = "0.18.31" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "backoff" }, { name = "beautifulsoup4" }, - { name = "chardet" }, + { name = "charset-normalizer" }, { name = "dataclasses-json" }, { name = "emoji" }, { name = "filetype" }, @@ -6827,6 +6827,7 @@ dependencies = [ { name = "langdetect" }, { name = "lxml" }, { name = "nltk" }, + { name = "numba" }, { name = "numpy" }, { name = "psutil" }, { name = "python-iso639" }, @@ -6839,9 +6840,9 @@ dependencies = [ { name = "unstructured-client" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/64/31/98c4c78e305d1294888adf87fd5ee30577a4c393951341ca32b43f167f1e/unstructured-0.16.25.tar.gz", hash = "sha256:73b9b0f51dbb687af572ecdb849a6811710b9cac797ddeab8ee80fa07d8aa5e6", size = 1683097, upload-time = "2025-03-07T11:19:39.507Z" } +sdist = { url = "https://files.pythonhosted.org/packages/a9/5f/64285bd69a538bc28753f1423fcaa9d64cd79a9e7c097171b1f0d27e9cdb/unstructured-0.18.31.tar.gz", hash = "sha256:af4bbe32d1894ae6e755f0da6fc0dd307a1d0adeebe0e7cc6278f6cf744339ca", size = 1707700, upload-time = "2026-01-27T15:33:05.378Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/12/4f/ad08585b5c8a33c82ea119494c4d3023f4796958c56e668b15cc282ec0a0/unstructured-0.16.25-py3-none-any.whl", hash = "sha256:14719ccef2830216cf1c5bf654f75e2bf07b17ca5dcee9da5ac74618130fd337", size = 1769286, upload-time = "2025-03-07T11:19:37.299Z" }, + { url = "https://files.pythonhosted.org/packages/c8/4a/9c43f39d9e443c9bc3f2e379b305bca27110adc653b071221b3132c18de5/unstructured-0.18.31-py3-none-any.whl", hash = "sha256:fab4641176cb9b192ed38048758aa0d9843121d03626d18f42275afb31e5b2d3", size = 1794889, upload-time = "2026-01-27T15:33:03.136Z" }, ] [package.optional-dependencies]