Merge pull request #1899 from danielaskdd/kv-storage-workspace

Fix: workspace isolation problem for JSON KV storage
This commit is contained in:
Daniel.y 2025-08-02 11:36:20 +08:00 committed by GitHub
commit a417f7c168
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 26 deletions

View File

@ -45,21 +45,22 @@ class JsonDocStatusStorage(DocStatusStorage):
self._data = None
self._storage_lock = None
self.storage_updated = None
self.final_namespace = f"{self.workspace}_{self.namespace}"
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.namespace)
self.storage_updated = await get_update_flag(self.final_namespace)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(self.namespace)
self._data = await get_namespace_data(self.namespace)
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
self._data.update(loaded_data)
logger.info(
f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
f"Process {os.getpid()} doc status load {self.final_namespace} with {len(loaded_data)} records"
)
async def filter_keys(self, keys: set[str]) -> set[str]:
@ -145,10 +146,10 @@ class JsonDocStatusStorage(DocStatusStorage):
dict(self._data) if hasattr(self._data, "_getvalue") else self._data
)
logger.debug(
f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.final_namespace}"
)
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.namespace)
await clear_all_update_flags(self.final_namespace)
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""
@ -158,14 +159,14 @@ class JsonDocStatusStorage(DocStatusStorage):
"""
if not data:
return
logger.debug(f"Inserting {len(data)} records to {self.namespace}")
logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
async with self._storage_lock:
# Ensure chunks_list field exists for new documents
for doc_id, doc_data in data.items():
if "chunks_list" not in doc_data:
doc_data["chunks_list"] = []
self._data.update(data)
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
await self.index_done_callback()
@ -299,7 +300,7 @@ class JsonDocStatusStorage(DocStatusStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
async def drop(self) -> dict[str, str]:
"""Drop all document status data from storage and clean up resources
@ -317,11 +318,11 @@ class JsonDocStatusStorage(DocStatusStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
await self.index_done_callback()
logger.info(f"Process {os.getpid()} drop {self.namespace}")
logger.info(f"Process {os.getpid()} drop {self.final_namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping {self.namespace}: {e}")
logger.error(f"Error dropping {self.final_namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@ -41,15 +41,16 @@ class JsonKVStorage(BaseKVStorage):
self._data = None
self._storage_lock = None
self.storage_updated = None
self.final_namespace = f"{self.workspace}_{self.namespace}"
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.namespace)
self.storage_updated = await get_update_flag(self.final_namespace)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(self.namespace)
self._data = await get_namespace_data(self.namespace)
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
@ -63,7 +64,7 @@ class JsonKVStorage(BaseKVStorage):
data_count = len(loaded_data)
logger.info(
f"Process {os.getpid()} KV load {self.namespace} with {data_count} records"
f"Process {os.getpid()} KV load {self.final_namespace} with {data_count} records"
)
async def index_done_callback(self) -> None:
@ -77,10 +78,10 @@ class JsonKVStorage(BaseKVStorage):
data_count = len(data_dict)
logger.debug(
f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
f"Process {os.getpid()} KV writting {data_count} records to {self.final_namespace}"
)
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.namespace)
await clear_all_update_flags(self.final_namespace)
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
@ -150,7 +151,7 @@ class JsonKVStorage(BaseKVStorage):
current_time = int(time.time()) # Get current Unix timestamp
logger.debug(f"Inserting {len(data)} records to {self.namespace}")
logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
async with self._storage_lock:
# Add timestamps to data based on whether key exists
for k, v in data.items():
@ -169,7 +170,7 @@ class JsonKVStorage(BaseKVStorage):
v["_id"] = k
self._data.update(data)
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -192,7 +193,7 @@ class JsonKVStorage(BaseKVStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
@ -227,7 +228,7 @@ class JsonKVStorage(BaseKVStorage):
self._data.pop(key, None)
if keys_to_delete:
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
logger.info(
f"Dropped {len(keys_to_delete)} cache entries for modes: {modes}"
)
@ -276,7 +277,7 @@ class JsonKVStorage(BaseKVStorage):
# del self._data[mode_key]
# # Set update flags to notify persistence is needed
# await set_all_update_flags(self.namespace)
# await set_all_update_flags(self.final_namespace)
# logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
# return True
@ -301,13 +302,13 @@ class JsonKVStorage(BaseKVStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.namespace)
await set_all_update_flags(self.final_namespace)
await self.index_done_callback()
logger.info(f"Process {os.getpid()} drop {self.namespace}")
logger.info(f"Process {os.getpid()} drop {self.final_namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping {self.namespace}: {e}")
logger.error(f"Error dropping {self.final_namespace}: {e}")
return {"status": "error", "message": str(e)}
async def _migrate_legacy_cache_structure(self, data: dict) -> dict: