diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index dcadd3f5a..1475c143c 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -11,6 +11,7 @@ from flask import current_app, Flask
 from flask_login import current_user
 from langchain.schema import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
+from sqlalchemy.orm.exc import ObjectDeletedError
 
 from core.data_loader.file_extractor import FileExtractor
 from core.data_loader.loader.notion import NotionLoader
@@ -79,6 +80,8 @@ class IndexingRunner:
                 dataset_document.error = str(e.description)
                 dataset_document.stopped_at = datetime.datetime.utcnow()
                 db.session.commit()
+            except ObjectDeletedError:
+                logging.warning('Document deleted, document id: {}'.format(dataset_document.id))
             except Exception as e:
                 logging.exception("consume document failed")
                 dataset_document.indexing_status = 'error'
@@ -276,7 +279,8 @@ class IndexingRunner:
             )
             if len(preview_texts) > 0:
                 # qa model document
-                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0], doc_language)
+                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
+                                                             doc_language)
                 document_qa_list = self.format_split_text(response)
                 return {
                     "total_segments": total_segments * 20,
@@ -372,7 +376,8 @@ class IndexingRunner:
             )
             if len(preview_texts) > 0:
                 # qa model document
-                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0], doc_language)
+                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
+                                                             doc_language)
                 document_qa_list = self.format_split_text(response)
                 return {
                     "total_segments": total_segments * 20,
@@ -582,7 +587,6 @@ class IndexingRunner:
 
                 all_qa_documents.extend(format_documents)
 
-
     def _split_to_documents_for_estimate(self, text_docs: List[Document], splitter: TextSplitter,
                                          processing_rule: DatasetProcessRule) -> List[Document]:
         """
@@ -734,6 +738,9 @@ class IndexingRunner:
         count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()
         if count > 0:
             raise DocumentIsPausedException()
+        document = DatasetDocument.query.filter_by(id=document_id).first()
+        if not document:
+            raise DocumentIsDeletedPausedException()
 
         update_params = {
             DatasetDocument.indexing_status: after_indexing_status
@@ -781,3 +788,7 @@ class IndexingRunner:
 
 class DocumentIsPausedException(Exception):
     pass
+
+
+class DocumentIsDeletedPausedException(Exception):
+    pass
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 6a5f592a8..ede312469 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -385,9 +385,6 @@ class DocumentService:
 
     @staticmethod
     def delete_document(document):
-        if document.indexing_status in ["parsing", "cleaning", "splitting", "indexing"]:
-            raise DocumentIndexingError()
-
         # trigger document_was_deleted signal
         document_was_deleted.send(document.id, dataset_id=document.dataset_id)
 
diff --git a/api/tasks/clean_document_task.py b/api/tasks/clean_document_task.py
index 2f37e5ff6..2738ede28 100644
--- a/api/tasks/clean_document_task.py
+++ b/api/tasks/clean_document_task.py
@@ -31,22 +31,24 @@ def clean_document_task(document_id: str, dataset_id: str):
         kw_index = IndexBuilder.get_index(dataset, 'economy')
 
         segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
-        index_node_ids = [segment.index_node_id for segment in segments]
+        # check if segments exist
+        if segments:
+            index_node_ids = [segment.index_node_id for segment in segments]
 
-        # delete from vector index
-        if vector_index:
-            vector_index.delete_by_document_id(document_id)
+            # delete from vector index
+            if vector_index:
+                vector_index.delete_by_document_id(document_id)
 
-        # delete from keyword index
-        if index_node_ids:
-            kw_index.delete_by_ids(index_node_ids)
+            # delete from keyword index
+            if index_node_ids:
+                kw_index.delete_by_ids(index_node_ids)
 
-        for segment in segments:
-            db.session.delete(segment)
+            for segment in segments:
+                db.session.delete(segment)
 
-        db.session.commit()
-        end_at = time.perf_counter()
-        logging.info(
-            click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green'))
+            db.session.commit()
+            end_at = time.perf_counter()
+            logging.info(
+                click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green'))
     except Exception:
         logging.exception("Cleaned document when document deleted failed")
diff --git a/api/tasks/document_indexing_task.py b/api/tasks/document_indexing_task.py
index 451e6f9b3..31d083aea 100644
--- a/api/tasks/document_indexing_task.py
+++ b/api/tasks/document_indexing_task.py
@@ -30,13 +30,11 @@ def document_indexing_task(dataset_id: str, document_ids: list):
             Document.dataset_id == dataset_id
         ).first()
 
-        if not document:
-            raise NotFound('Document not found')
-
-        document.indexing_status = 'parsing'
-        document.processing_started_at = datetime.datetime.utcnow()
-        documents.append(document)
-        db.session.add(document)
+        if document:
+            document.indexing_status = 'parsing'
+            document.processing_started_at = datetime.datetime.utcnow()
+            documents.append(document)
+            db.session.add(document)
     db.session.commit()
 
     try:
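
Note on the new except ObjectDeletedError branch in IndexingRunner.run: SQLAlchemy raises ObjectDeletedError when a refresh of an expired instance finds that its row no longer exists, which is exactly what happens when a user deletes a document while the async indexing task is still holding the ORM object. The following is a minimal, self-contained sketch of that failure mode, not code from this repository; it assumes SQLAlchemy 1.4+ and uses a throwaway in-memory SQLite database with a stand-in Document model.

# Illustrative sketch only, not part of the patch above.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.orm.exc import ObjectDeletedError

Base = declarative_base()


class Document(Base):  # stand-in model for the demo, not the project's Document
    __tablename__ = 'documents'
    id = Column(String, primary_key=True)
    indexing_status = Column(String)


# In-memory SQLite; connections opened in this thread share the same database.
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

session = Session(engine)
doc = Document(id='doc-1', indexing_status='parsing')
session.add(doc)
session.commit()  # the commit expires doc's loaded attributes

# The row disappears behind the session's back, e.g. a user deletes the
# document through the API while the worker task still holds `doc`.
with engine.begin() as conn:
    conn.execute(Document.__table__.delete())

try:
    print(doc.indexing_status)  # lazy refresh finds no row for doc's identity
except ObjectDeletedError:
    print('document was deleted while indexing was running')

Catching this at the task level, as the patch does, lets the worker log a warning and move on instead of failing the whole batch over a document that no longer exists.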