The logseq-python library includes a comprehensive caching system designed to optimize pipeline processing performance by storing and reusing extracted content and analysis results.
The caching system provides:
- Multiple Backend Support: Memory, SQLite, and Redis backends
- Intelligent Key Generation: Content-based cache keys with hash-based deduplication
- TTL Support: Time-based cache expiration
- LRU Eviction: Least Recently Used eviction for memory-constrained environments
- Statistics: Hit rate tracking and cache performance metrics
- Thread Safety: Concurrent access support
- Pipeline Integration: Seamless integration with extraction and analysis components
`CacheEntry` represents a cached item with metadata:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, List, Optional

@dataclass
class CacheEntry:
    key: str
    value: Any
    created_at: datetime
    expires_at: Optional[datetime] = None
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    tags: Optional[List[str]] = None
```

The backend base class defines the interface for cache backends:
- `get(key)` - Retrieve a cache entry
- `set(key, value, ttl, tags)` - Store a cache entry
- `delete(key)` - Remove a cache entry
- `clear()` - Remove all entries
- `keys(pattern)` - List keys matching a pattern
- `stats()` - Get cache statistics
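As a concrete illustration, here is a minimal dict-based backend implementing these operations. It reuses the `CacheEntry` dataclass above; the library's actual base class, signatures, and `stats()` shape may differ, so treat this as a sketch only:

```python
import fnmatch
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

class DictBackend:
    """Illustrative in-memory backend matching the documented interface."""

    def __init__(self) -> None:
        self._entries: Dict[str, CacheEntry] = {}
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> Optional[CacheEntry]:
        entry = self._entries.get(key)
        # Treat missing or expired entries as misses
        if entry is None or (entry.expires_at and entry.expires_at < datetime.now()):
            self._entries.pop(key, None)
            self._misses += 1
            return None
        self._hits += 1
        entry.access_count += 1
        entry.last_accessed = datetime.now()
        return entry

    def set(self, key: str, value: Any, ttl: Optional[int] = None,
            tags: Optional[List[str]] = None) -> None:
        now = datetime.now()
        expires = now + timedelta(seconds=ttl) if ttl else None
        self._entries[key] = CacheEntry(key=key, value=value, created_at=now,
                                        expires_at=expires, tags=tags)

    def delete(self, key: str) -> bool:
        return self._entries.pop(key, None) is not None

    def clear(self) -> int:
        count = len(self._entries)
        self._entries.clear()
        return count

    def keys(self, pattern: str = "*") -> List[str]:
        return [k for k in self._entries if fnmatch.fnmatch(k, pattern)]

    def stats(self) -> Dict[str, Any]:
        total = self._hits + self._misses
        return {"size": len(self._entries), "total_requests": total,
                "hit_rate": self._hits / total if total else 0.0}
```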
`MemoryCache` is an in-memory cache with LRU eviction:
```python
cache = MemoryCache(max_size=10000)
```

Features:
- Fast access (O(1) for get/set operations)
- LRU eviction when at capacity
- TTL expiration support
- Pattern-based key matching
- No persistence between sessions
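A quick sketch of the LRU behavior, assuming `get` returns the `CacheEntry` (or `None`) as described in the backend interface above:

```python
cache = MemoryCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")           # "a" becomes the most recently used entry
cache.set("c", 3)        # at capacity: "b", the least recently used, is evicted
assert cache.get("b") is None
assert cache.get("a").value == 1
```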
`SQLiteCache` is a persistent cache backed by a SQLite database:
```python
cache = SQLiteCache(db_path="/path/to/cache.db")
```

Features:
- Persistent storage
- ACID transactions
- Efficient indexed lookups
- Cleanup operations for expired entries
- Cross-session data persistence
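For example, entries written in one session remain readable in the next. This sketch uses the `create_sqlite_cache` factory and `PipelineCache` methods shown elsewhere in this document; the path is illustrative:

```python
from logseq_py.pipeline.cache import create_sqlite_cache

# First run: populate the cache
cache = create_sqlite_cache("/tmp/pipeline-cache.db")
cache.cache_extracted_content("Some content", "url_extractor", {"urls": []})

# A later run opening the same path sees the persisted entry
cache = create_sqlite_cache("/tmp/pipeline-cache.db")
print(cache.get_extracted_content("Some content", "url_extractor"))
```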
`RedisCache` is a distributed cache built on Redis:
```python
cache = RedisCache(host="localhost", port=6379, db=0)
```

Features:
- Distributed caching
- Network-based storage
- Redis-native TTL handling
- High availability support
- Shared cache across multiple processes
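Wiring the Redis backend into the high-level `PipelineCache` might look like the following sketch; the import path is assumed to be the same `logseq_py.pipeline.cache` module used elsewhere in this document:

```python
# Assumed import path for both classes
from logseq_py.pipeline.cache import PipelineCache, RedisCache

# Multiple worker processes pointed at the same Redis instance share
# one cache, so extraction done by one process benefits all of them.
backend = RedisCache(host="localhost", port=6379, db=0)
cache = PipelineCache(backend, default_ttl=3600)
```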
`PipelineCache` is the high-level interface for pipeline processing:
```python
cache = PipelineCache(backend, default_ttl=3600)
```

Methods:
- `get_extracted_content(content, extractor)` - Get cached extraction results
- `cache_extracted_content(content, extractor, result, ttl)` - Cache extraction results
- `get_analysis_result(content, analyzer)` - Get cached analysis results
- `cache_analysis_result(content, analyzer, result, ttl)` - Cache analysis results
`CachedExtractor` and `CachedAnalyzer` are automatic caching wrappers for extractors and analyzers:
```python
cached_extractor = CachedExtractor(extractor, cache)
cached_analyzer = CachedAnalyzer(analyzer, cache)
```

Basic usage with a memory cache:

```python
from logseq_py.pipeline.cache import create_memory_cache
# Create memory cache
cache = create_memory_cache(max_size=5000)
# Cache extraction results
content = "Some content to extract from"
extractor_name = "url_extractor"
result = {"extracted": "data", "metadata": {"count": 5}}
cache.cache_extracted_content(content, extractor_name, result)
# Retrieve cached results
cached = cache.get_extracted_content(content, extractor_name)
```

Using a persistent SQLite cache:

```python
from logseq_py.pipeline.cache import create_sqlite_cache
# Create SQLite cache (persists between sessions)
cache = create_sqlite_cache("/path/to/cache.db")
# Cache analysis results with TTL
content = "Content to analyze"
analyzer_name = "sentiment_analyzer"
result = {"sentiment": "positive", "confidence": 0.85}
cache.cache_analysis_result(content, analyzer_name, result, ttl=3600)
```

Wrapping components for automatic caching:

```python
from logseq_py.pipeline.cache import create_memory_cache, CachedExtractor, CachedAnalyzer
# Create cache
cache = create_memory_cache()
# Create cached wrappers
cached_extractor = CachedExtractor(my_extractor, cache)
cached_analyzer = CachedAnalyzer(my_analyzer, cache)
# Use as normal - caching happens automatically
for block in blocks:
    # Check cache first, extract if not found, cache the result
    extracted = cached_extractor.extract(block)
    analyzed = cached_analyzer.analyze(block.content)
```

Inspecting cache statistics:

```python
# Get cache statistics
stats = cache.get_stats()
print(f"Hit rate: {stats['hit_rate']:.2%}")
print(f"Cache size: {stats['size']} entries")
print(f"Total requests: {stats['total_requests']}")from logseq_py.pipeline.cache import cached
@cached(cache, ttl=300)
def expensive_computation(data):
    # Expensive operation that benefits from caching
    return process_data(data)
# First call executes function and caches result
result1 = expensive_computation(data)
# Second call returns cached result
result2 = expensive_computation(data)  # Fast!
```

Controlling cache size:

```python
# Memory cache with size limit
cache = create_memory_cache(max_size=1000)
# SQLite cache (size managed by disk space)
cache = create_sqlite_cache("/path/to/cache.db")
# Clear cache when needed
cleared_count = cache.clear()
```

Configuring TTL:

```python
# Default TTL for all cache operations
cache = PipelineCache(backend, default_ttl=7200) # 2 hours
# Override TTL for specific operations
cache.cache_extracted_content(content, extractor, result, ttl=300)  # 5 minutes
```

Cache keys are generated automatically from:
- Content hash (SHA-256, first 16 characters)
- Extractor/analyzer name
- Operation type (extract/analyze)
Example key: `extract:url_extractor:a1b2c3d4e5f60789`
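A hypothetical helper mirroring this scheme (the library generates keys internally; `make_cache_key` is not part of its API):

```python
import hashlib

def make_cache_key(operation: str, component: str, content: str) -> str:
    # Operation type, component name, then the first 16 hex characters
    # of the content's SHA-256 digest.
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
    return f"{operation}:{component}:{digest}"

key = make_cache_key("extract", "url_extractor", "Some content to extract from")
# -> "extract:url_extractor:<16 hex chars>"; identical content always
#    produces the same key, which is what enables deduplication.
```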
Choose a backend to match your deployment:

- Memory: Fast, single-process; development and testing
- SQLite: Persistent, single-machine; production
- Redis: Distributed, multi-process; high-scale production
Match TTL to how quickly the underlying data changes:

```python
# Short TTL for frequently changing data
cache.cache_analysis_result(content, "news_analyzer", result, ttl=300) # 5 min
# Long TTL for stable data
cache.cache_extracted_content(content, "pdf_extractor", result, ttl=86400) # 24 hoursdef log_cache_stats(cache):
stats = cache.get_stats()
if stats['hit_rate'] < 0.5:
logger.warning(f"Low cache hit rate: {stats['hit_rate']:.2%}")try:
    result = cache.get_extracted_content(content, extractor)
    if result is None:
        result = extractor.extract(content)
        cache.cache_extracted_content(content, extractor, result)
except Exception as e:
    logger.warning(f"Cache operation failed: {e}")
    result = extractor.extract(content)  # Fall back to direct extraction
```

Tag entries so related items can be invalidated together:

```python
# Tag cache entries for easier management
cache.backend.set("key", "value", tags=["extraction", "youtube"])
# Invalidate by tag when needed
cache.invalidate_by_tag("youtube")
```

The caching system integrates seamlessly with the async pipeline:

```python
import asyncio
from logseq_py.pipeline.async_pipeline import AsyncBatchProcessor
from logseq_py.pipeline.cache import create_memory_cache, CachedExtractor
async def process_with_caching():
    cache = create_memory_cache()
    cached_extractor = CachedExtractor(my_extractor, cache)
    batch_processor = AsyncBatchProcessor(max_concurrent=10)

    async def process_batch(blocks):
        tasks = [cached_extractor.extract(block) for block in blocks]
        return await asyncio.gather(*tasks)

    async for results in batch_processor.process_batches(blocks, process_batch):
        # Process results with cached extraction
        handle_results(results)
```

Common issues and fixes:
- High Memory Usage
  - Reduce cache size: `create_memory_cache(max_size=1000)`
  - Use a shorter TTL, e.g. `ttl=300`
  - Switch to the SQLite backend
- Low Hit Rate
  - Check whether content is being processed multiple times
  - Verify cache keys are consistent
  - Consider longer TTL values
- SQLite Database Locks
  - Use connection pooling
  - Set appropriate timeout values
  - Consider WAL mode for better concurrency (see the sketch after this list)
- Redis Connection Issues
  - Check the Redis server status
  - Verify network connectivity
  - Implement connection retry logic
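For the SQLite lock issue, enabling WAL mode is a one-time pragma on the database file. This sketch uses the standard `sqlite3` module directly; the path is illustrative:

```python
import sqlite3

# WAL mode lets readers proceed while a single writer is active,
# which reduces "database is locked" errors under concurrency.
conn = sqlite3.connect("/path/to/cache.db", timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.close()
```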
Enable detailed logging:

```python
import logging

logging.getLogger("pipeline.cache").setLevel(logging.DEBUG)
```

Check cache statistics regularly:

```python
stats = cache.get_stats()
print(f"Cache performance: {stats}")- Pros: Fastest access, no I/O overhead
- Cons: Limited by memory, no persistence
- Best for: Development, single-process applications
SQLite backend:

- Pros: Persistent, ACID compliant, good performance
- Cons: File I/O overhead, single-writer limitation
- Best for: Single-machine production applications
Redis backend:

- Pros: Distributed, high availability, excellent performance
- Cons: Network overhead, additional infrastructure
- Best for: Multi-process, distributed applications
To migrate data from one backend to another:

```python
# Export from one cache
old_cache = create_memory_cache()
new_cache = create_sqlite_cache("/path/to/new.db")
# Migrate data (example)
for key in old_cache.backend.keys("*"):
    entry = old_cache.backend.get(key)
    if entry and not entry.is_expired():
        new_cache.backend.set(key, entry.value, tags=entry.tags)
```

Routine maintenance:

```python
# Clean up expired entries (SQLite)
if hasattr(cache.backend, 'cleanup_expired'):
    expired_count = cache.backend.cleanup_expired()
    print(f"Cleaned up {expired_count} expired entries")

# Monitor cache size
stats = cache.get_stats()
if stats.get('size', 0) > 10000:
    # Consider clearing old entries or increasing capacity
    pass
```

The caching system provides powerful performance optimization for Logseq pipeline processing while remaining flexible and easy to use.