Server Services
The Server's business logic is organized into modular service classes and functions.
Service Organization
Storage Services (/services/storage/)
| Service/Module | Purpose | Key Components |
|---|---|---|
| BaseStorageService | Abstract base class for storage operations | smart_chunk_text(), extract_metadata(), batch_process_with_progress() |
| storage_services.py | Contains DocumentStorageService class | upload_document(), store_documents(), process_document() |
| document_storage_service.py | Storage utility functions | add_documents_to_supabase(), add_documents_to_supabase_parallel() |
| code_storage_service.py | Code extraction utilities | extract_code_blocks(), generate_code_example_summary(), add_code_examples_to_supabase() |
The storage module has both service classes and utility functions:
- storage_services.py contains the DocumentStorageService class that extends BaseStorageService
- document_storage_service.py and code_storage_service.py contain standalone utility functions
- This separation allows both object-oriented and functional approaches, as sketched below
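For illustration, a minimal sketch of the two styles, assuming plausible constructor and call signatures (only the class and function names come from the tables above):

```python
# Hypothetical usage sketch - only the class and function names are
# documented; constructor and call signatures are assumptions.
from ..services.storage import DocumentStorageService
from ..services.storage.document_storage_service import add_documents_to_supabase

async def store_oo_style(supabase_client, documents: list[dict]) -> None:
    # Object-oriented path: the service class inherits chunking, metadata
    # extraction, and batch processing from BaseStorageService.
    service = DocumentStorageService(supabase_client)
    await service.store_documents(documents)

async def store_functional_style(supabase_client, documents: list[dict]) -> None:
    # Functional path: call the standalone utility directly.
    await add_documents_to_supabase(supabase_client, documents)
```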
Search Services (/services/search/)
| Service/Module | Purpose | Key Components |
|---|---|---|
| search_services.py | Contains SearchService class | perform_rag_query(), search_code_examples_service(), rerank_results() |
| vector_search_service.py | Vector search utilities | search_documents(), search_code_examples(), search_documents_async() |
- search_services.py contains the SearchService class for high-level search operations
- vector_search_service.py contains low-level vector search utility functions
- The SearchService uses the vector search utilities internally, as sketched below
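A hedged sketch of that layering, assuming perform_rag_query() is async and the utilities take the client and query directly (only the names come from the tables above):

```python
# Hypothetical sketch of the layering - not the actual implementation.
from ..services.search import SearchService
from ..services.search.vector_search_service import search_documents_async

async def run_query(supabase_client, query: str):
    # High-level path: SearchService adds reranking and result shaping
    # on top of the raw vector search.
    service = SearchService(supabase_client)
    results = await service.perform_rag_query(query)

    # Low-level path: call the vector search utility directly.
    raw_matches = await search_documents_async(supabase_client, query)
    return results, raw_matches
```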
RAG Services (/services/rag/)
| Service | Purpose | Key Methods |
|---|---|---|
| CrawlingService | Web crawling operations | crawl_single_page(), crawl_batch_with_progress(), crawl_recursive_with_progress() |
Knowledge Services (/services/knowledge/)
Recent Refactoring: The Knowledge API was refactored from a 1,076-line monolith to a 550-line API layer (49% reduction) by extracting business logic into specialized services.
| Service | Purpose | Key Methods |
|---|---|---|
| CrawlOrchestrationService | Orchestrates entire crawl workflow | orchestrate_crawl(), set_progress_id(), cancel(), is_cancelled() |
| KnowledgeItemService | CRUD operations for knowledge items | list_items(), update_item(), get_available_sources() |
| CodeExtractionService | Code block extraction and processing | extract_and_store_code_examples(), extract_code_blocks_from_html() |
| DatabaseMetricsService | Database statistics and metrics | get_metrics() |
Service Layer Benefits:
- Separation of Concerns: Business logic separated from API routing
- Testability: Services can be unit tested without Socket.IO dependencies
- Reusability: Services used by both API endpoints and MCP tools
- Progress Tracking: Integrated with ProgressTracker utility
Project Services (/services/projects/)
Recent Refactoring: The Projects API was refactored from a monolithic 1,690-line file to a clean service layer architecture, reducing the main API file to 868 lines (a 49% reduction) while extracting business logic into specialized services.
| Service | File | Purpose | Key Methods |
|---|---|---|---|
| ProjectService | project_service.py | Core project CRUD operations | create_project(), get_project(), update_project(), delete_project(), list_projects() |
| TaskService | task_service.py | Task lifecycle management | create_task(), update_task(), archive_task(), list_tasks() |
| ProjectCreationService | project_creation_service.py | AI-assisted project creation workflow | create_project_with_ai(), generate_features(), process_github_repo() |
| SourceLinkingService | source_linking_service.py | Project-knowledge source relationships | update_project_sources(), format_projects_with_sources() |
| ProgressService | progress_service.py | Real-time operation tracking via Socket.IO | start_operation(), update_progress(), complete_operation(), error_operation() |
| DocumentService | document_service.py | Project document management | add_document(), get_documents(), update_document() |
| VersioningService | versioning_service.py | JSONB field version control | create_version(), get_versions(), restore_version() |
Service Layer Benefits:
- Separation of Concerns: Business logic separated from API routing
- Reusability: Services can be used by multiple API endpoints and MCP tools
- Testability: Each service can be unit tested independently
- Socket.IO Integration: Real-time updates handled at the service layer
Example Service Coordination:
# projects_api.py - Thin API layer
import asyncio
import secrets

@router.post("/projects")
async def create_project(request: CreateProjectRequest):
    progress_id = secrets.token_hex(16)

    # Start progress tracking
    progress_service.start_operation(progress_id, 'project_creation')

    # Background task coordinates multiple services
    asyncio.create_task(_create_project_with_ai(progress_id, request))
    return {"progress_id": progress_id}

# Background workflow using multiple services
async def _create_project_with_ai(progress_id: str, request: CreateProjectRequest):
    creation_service = ProjectCreationService()

    # AI-assisted creation with real-time updates
    success, result = await creation_service.create_project_with_ai(
        progress_id=progress_id,
        title=request.title,
        github_repo=request.github_repo,
    )
    if success:
        # Broadcast the update to Socket.IO clients
        await broadcast_project_update()
        await progress_service.complete_operation(progress_id, result)
Core Services (/services/)
| Service | Purpose | Key Methods |
|---|---|---|
| SourceManagementService | Manage knowledge sources | get_available_sources(), delete_source(), update_source_metadata() |
| CredentialService | Secure credential storage | get_credential(), set_credential() |
| PromptService | AI prompt management | get_prompt(), reload_prompts() |
| ThreadingService | Threading & rate limiting | batch_process(), rate_limited_operation() |
| MCPServiceClient | HTTP client for MCP | crawl_url(), search(), delete_source() |
| ClientManager | Database client management | get_supabase_client() |
| MCPSessionManager | MCP session management | create_session(), validate_session(), cleanup_expired() |
| CrawlerManager | Global crawler instance management | get_crawler(), initialize(), cleanup() |
| LLMProviderService | Multi-provider LLM support | get_llm_client(), get_llm_client_sync(), get_embedding_model() |
Modular Architecture Pattern
The services follow a clear architectural pattern:
1. Utility Modules: Low-level functions for specific operations
   - vector_search_service.py - Vector database operations
   - document_storage_service.py - Document storage functions
   - code_storage_service.py - Code extraction functions
2. Service Classes: High-level orchestration of utilities
   - SearchService in search_services.py
   - DocumentStorageService in storage_services.py
   - Knowledge services in /services/knowledge/
   - Project services in /services/projects/
3. Base Classes: Abstract implementations for extensibility (see the sketch after this list)
   - BaseStorageService - Common storage patterns
4. Manager Classes: Singleton instances for global resources
   - CrawlerManager - Global crawler instance
   - MCPSessionManager - Session management
5. Single Responsibility: Each module has one focused purpose
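To illustrate point 3, a hypothetical subclass might look like this; smart_chunk_text() and extract_metadata() are the documented base-class helpers, while the constructor and method shapes are assumptions:

```python
# Hypothetical subclass - the real BaseStorageService interface may differ.
from ..services.storage.base_storage_service import BaseStorageService

class MarkdownFileStorageService(BaseStorageService):
    """Example of a new backend reusing the common storage patterns."""

    def __init__(self, output_dir: str):
        self.output_dir = output_dir

    async def store_documents(self, documents: list[dict]) -> None:
        for doc in documents:
            # Reuse the chunking and metadata helpers from the base class.
            chunks = self.smart_chunk_text(doc["content"])
            metadata = self.extract_metadata(doc)
            ...  # write chunks and metadata under self.output_dir
```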
Embedding Services (/services/embeddings/)
| Module | Purpose | Key Functions |
|---|---|---|
| embedding_service.py | OpenAI embedding creation | create_embeddings_batch(), create_embeddings_batch_async() |
| contextual_embedding_service.py | Context-aware embeddings | generate_contextual_embedding(), generate_contextual_embeddings_batch() |
Utility Classes (/utils/progress/)
| Utility | Purpose | Key Methods |
|---|---|---|
| ProgressTracker | Consolidated Socket.IO progress tracking | start(), update(), complete(), error(), update_batch_progress(), update_crawl_stats() |
The embedding service provides both sync and async versions:
- Use create_embeddings_batch_async() in async contexts (recommended)
- Sync versions exist for backward compatibility but return zero embeddings if called from an async context
- Always prefer the async versions for better performance and proper event loop handling, as sketched below
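A minimal sketch of the recommended async path (argument and return types are assumptions):

```python
# Hypothetical usage - only the function name comes from the table above.
from ..services.embeddings.embedding_service import create_embeddings_batch_async

async def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # Preferred in async contexts: avoids the zero-embedding fallback the
    # sync version returns when invoked from a running event loop.
    return await create_embeddings_batch_async(chunks)
```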
Service Usage Patterns
Direct Service Usage (FastAPI)
# Import from new locations
import asyncio

from ..services.source_management_service import SourceManagementService
from ..services.storage import DocumentStorageService
from ..services.search import SearchService
from ..services.knowledge import CrawlOrchestrationService, KnowledgeItemService
from ..services.crawler_manager import get_crawler
# Example: Knowledge item crawling
@router.post("/knowledge-items/crawl")
async def crawl_knowledge_item(request: KnowledgeItemRequest):
    crawler = await get_crawler()
    orchestration_service = CrawlOrchestrationService(crawler, get_supabase_client())
    result = await orchestration_service.orchestrate_crawl(request.dict())
    return result
# Example: Crawl cancellation
@router.post("/knowledge-items/stop/{progress_id}")
async def stop_crawl_task(progress_id: str):
    # Cancel the orchestration service
    orchestration = get_active_orchestration(progress_id)
    if orchestration:
        orchestration.cancel()

    # Cancel the asyncio task
    if progress_id in active_crawl_tasks:
        task = active_crawl_tasks[progress_id]
        if not task.done():
            task.cancel()
            try:
                await asyncio.wait_for(task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        del active_crawl_tasks[progress_id]

    # Emit Socket.IO events
    await sio.emit('crawl:stopped', {
        'progressId': progress_id,
        'status': 'cancelled',
        'message': 'Crawl cancelled by user'
    }, room=progress_id)

    return {'success': True}
# Example: Source deletion
@router.delete("/sources/{source_id}")
async def delete_source(source_id: str):
    service = SourceManagementService(get_supabase_client())
    success, result = service.delete_source(source_id)
    return {"success": success, **result}
HTTP Service Usage (MCP)
import httpx

async def delete_source(ctx: Context, source: str) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.delete(f"{API_URL}/api/sources/{source}")
        return response.json()
Simplified Socket.IO Emission Pattern (2025)
from ..socketio_app import get_socketio_instance

# Get Socket.IO instance
sio = get_socketio_instance()

async def crawl_with_progress(url: str, progress_id: str):
    crawling_service = CrawlingService(crawler, supabase_client)

    # Simple direct emission - no complex utils
    await sio.emit('progress_update', {
        'status': 'crawling',
        'percentage': 10,
        'log': f'Starting crawl of {url}'
    }, room=progress_id)

    results = await crawling_service.crawl_batch_with_progress(urls=[url])
    return results
ProgressTracker Utility Pattern
from ..utils.progress import ProgressTracker

# Create a tracker for an operation
tracker = ProgressTracker(sio, progress_id, 'crawl')

# Start tracking
await tracker.start({
    'currentUrl': url,
    'totalPages': 0,
    'processedPages': 0
})

# Update progress
await tracker.update('crawling', 25, 'Processing page 1 of 4')
await tracker.update_crawl_stats(1, 4, 'https://example.com/page1')

# Complete or error
await tracker.complete({
    'chunksStored': 150,
    'wordCount': 5000
})
# OR
await tracker.error('Failed to crawl: Network timeout')
Key Service Features
Simplified Real-Time Communication (2025 Pattern)
Architecture Changes:
- No database polling - Eliminated 2-second polling system that checked for task/project changes
- Simple @sio.event handlers - Uses official Socket.IO 2025 documentation pattern
- No namespace classes - Removed the complex TasksNamespace and ProjectNamespace classes
- Root namespace only - Everything runs on the / namespace for simplicity
- Direct room management - Simple sio.enter_room() and sio.emit() calls
Simplified Socket.IO Integration:
# Services import and use Socket.IO directly
from ..socketio_app import get_socketio_instance

sio = get_socketio_instance()

# Simple emission to rooms - no namespace complexity
await sio.emit('task_updated', task_data, room=project_id)
await sio.emit('progress_update', {'status': 'processing', 'percentage': 50}, room=progress_id)

# Event handlers use simple @sio.event decorators
@sio.event
async def join_project(sid, data):
    await sio.enter_room(sid, data['project_id'])
    tasks = get_tasks_for_project(data['project_id'])
    await sio.emit('initial_tasks', tasks, to=sid)
Threading Service
- Rate limiting for OpenAI API (200k tokens/min)
- Thread pools for CPU and I/O operations
- Socket.IO-safe batch processing
- System metrics monitoring
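A hedged sketch of how batch processing might be invoked; batch_process() is the documented method name, while the exact call shape is an assumption:

```python
# Hypothetical usage sketch - the real ThreadingService API may differ.
from ..services.threading_service import ThreadingService

async def process_items(items: list[str]):
    service = ThreadingService()

    async def worker(batch: list[str]):
        ...  # e.g., send this batch to the OpenAI API

    # Assumed call shape: runs batches on the I/O pool while enforcing
    # the 200k tokens/min OpenAI rate limit and staying Socket.IO-safe.
    return await service.batch_process(items, worker, batch_size=50)
```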
Credential Service
- Encrypted credential storage
- Category-based organization
- Runtime caching for performance
- Environment variable compatibility
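A sketch of typical credential access, assuming get_credential() is async and falls back to environment variables:

```python
# Hypothetical usage - method names come from the core services table.
from ..services.credential_service import CredentialService

async def get_openai_key() -> str | None:
    creds = CredentialService()
    # Served from the runtime cache when possible; compatible with the
    # OPENAI_API_KEY environment variable as a fallback.
    return await creds.get_credential("OPENAI_API_KEY")
```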
Document Storage
- Parallel document processing
- Progress reporting via callbacks
- Automatic embedding generation
- Batch operations for efficiency
Crawler Manager
- Singleton pattern for global crawler instance
- Automatic Docker environment detection
- Proper initialization and cleanup
- Prevents circular imports
LLM Provider Service
- Supports multiple providers: OpenAI, Google Gemini, Ollama
- OpenAI-compatible interface for all providers
- Automatic provider detection from environment
- Both async and sync client creation
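A hedged usage sketch; get_llm_client() is the documented method, and the OpenAI-compatible call that follows is an assumption based on the bullets above:

```python
# Hypothetical sketch - only the method names come from the table above.
from ..services.llm_provider_service import LLMProviderService

async def summarize(text: str) -> str:
    provider = LLMProviderService()
    # An OpenAI-compatible client is returned regardless of whether
    # LLM_PROVIDER is openai, google, or ollama.
    client = await provider.get_llm_client()
    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # assumed model name for illustration
        messages=[{"role": "user", "content": f"Summarize: {text}"}],
    )
    return response.choices[0].message.content
```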
Service Dependencies
Refactored Architecture (2025):
FastAPI Endpoints (projects_api.py)
↓
Socket.IO Handlers (socketio_handlers.py)
↓
Service Classes (ProjectService, TaskService, etc.)
↓
Service Functions (add_documents_to_supabase, etc.)
↓
Database (Supabase) / External APIs (OpenAI)
Key Architectural Changes:
- Extracted Socket.IO Handlers: Moved from embedded namespace classes to a dedicated socketio_handlers.py file (~225 lines)
- Service Layer Separation: Business logic now lives in separate service classes instead of being embedded in API routes
- Simplified Socket.IO: Eliminated the complex namespace hierarchy in favor of simple @sio.event decorators
- Progress Tracking: Centralized in ProgressService with real-time Socket.IO broadcasting
Architecture Benefits
Improved Code Organization
- Clear Separation: Utilities vs. service classes are clearly separated
- Single Responsibility: Each module has one focused purpose
- Reduced Duplication: Common functionality in base classes
- Better Maintainability: Easy to find and update specific functionality
- Service Layers: Business logic separated from API routing (knowledge, projects)
Enhanced Extensibility
- Base Classes: New storage services can extend BaseStorageService
- Modular Design: Easy to add new search algorithms or storage backends
- Service Composition: Services can be composed for complex operations
- Provider Flexibility: Easy to add new LLM providers via LLMProviderService
Testing & Development
- Isolated Testing: Each service can be unit tested independently
- Mock Injection: Services accept clients via dependency injection
- Clear Interfaces: Well-defined service contracts
- Progress Tracking: Centralized progress management via ProgressTracker
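As a sketch of mock injection, a unit test might stub the injected Supabase client; the constructor and internal call chain are assumptions:

```python
# Hypothetical test sketch - the KnowledgeItemService constructor and the
# Supabase call chain inside list_items() are assumptions.
from unittest.mock import MagicMock

from ..services.knowledge import KnowledgeItemService

def test_service_accepts_injected_client():
    mock_client = MagicMock()
    # Stub Supabase's fluent API: table(...).select(...).execute().data
    mock_client.table.return_value.select.return_value.execute.return_value.data = []

    service = KnowledgeItemService(mock_client)
    # Further assertions would exercise list_items() and verify the calls
    # recorded on mock_client - no live database or Socket.IO server needed.
    assert service is not None
```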
Service Directory Summary
Complete listing of all service modules organized by functionality:
services/
├── storage/ # Document and code storage
│ ├── base_storage_service.py # Abstract base class
│ ├── storage_services.py # DocumentStorageService class
│ ├── document_storage_service.py # Storage utilities
│ └── code_storage_service.py # Code extraction utilities
├── search/ # Search and retrieval
│ ├── search_services.py # SearchService class
│ └── vector_search_service.py # Vector search utilities
├── knowledge/ # Knowledge management (refactored)
│ ├── crawl_orchestration_service.py
│ ├── knowledge_item_service.py
│ ├── code_extraction_service.py
│ └── database_metrics_service.py
├── projects/ # Project management (refactored)
│ ├── project_service.py
│ ├── task_service.py
│ ├── project_creation_service.py
│ ├── source_linking_service.py
│ ├── progress_service.py
│ ├── document_service.py
│ └── versioning_service.py
├── rag/ # RAG operations
│ └── crawling_service.py # Web crawling
├── embeddings/ # Embedding generation
│ ├── embedding_service.py
│ └── contextual_embedding_service.py
└── (core services) # Infrastructure services
├── client_manager.py # Database client
├── crawler_manager.py # Crawler instance
├── credential_service.py # Credentials
├── prompt_service.py # Prompts
├── threading_service.py # Threading/rate limiting
├── mcp_service_client.py # MCP HTTP client
├── mcp_session_manager.py # MCP sessions
├── source_management_service.py # Sources
└── llm_provider_service.py # LLM providers
Configuration
Services are configured via environment variables:
- SUPABASE_URL - Database URL
- SUPABASE_SERVICE_KEY - Database service key
- OPENAI_API_KEY - For embeddings (or other provider keys)
- LLM_PROVIDER - LLM provider selection (openai, google, ollama)
- CODE_BLOCK_MIN_LENGTH - Minimum code block length in characters (default: 1000)
- USE_CONTEXTUAL_EMBEDDINGS - Enable contextual embeddings
- USE_HYBRID_SEARCH - Enable hybrid search mode
- USE_RERANKING - Enable search result reranking
All services use dependency injection for testability and flexibility.
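A minimal sketch of how these settings might be read, for orientation only; in practice services receive configuration through dependency injection and the CredentialService rather than module-level globals:

```python
import os

# Illustrative only - actual parsing and defaults may differ.
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai")
CODE_BLOCK_MIN_LENGTH = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))
USE_RERANKING = os.getenv("USE_RERANKING", "false").lower() == "true"
```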
Troubleshooting
Code Extraction Issues
If code blocks are not being extracted properly:
1. Check the minimum length: The default CODE_BLOCK_MIN_LENGTH is 1000 characters. This ensures only substantial code blocks are extracted, not small snippets.
2. Verify markdown format: Code blocks must be wrapped in triple backticks (```) to be recognized; a simplified extraction sketch follows this list.
3. HTML fallback: If the markdown doesn't contain code blocks, the system will try to extract from HTML using <pre> and <code> tags.
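The markdown pass in step 2 can be pictured as a fenced-block scan with the length filter applied (a simplified sketch, not the actual extractor):

```python
import os
import re

MIN_LENGTH = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))

def extract_fenced_blocks(markdown: str) -> list[str]:
    # Match triple-backtick fences with an optional language tag.
    pattern = re.compile(r"`{3}[\w+-]*\n(.*?)`{3}", re.DOTALL)
    blocks = [match.group(1) for match in pattern.finditer(markdown)]
    # Drop small snippets so only substantial examples are stored.
    return [block for block in blocks if len(block) >= MIN_LENGTH]
```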
Crawling Performance
The crawling service uses simple configurations for reliability:
- No complex JavaScript waiting by default
- No forced element clicking
- Straightforward markdown extraction
If you need advanced crawling features for specific sites:
- Consider implementing site-specific handlers
- Use the js_code parameter only when necessary
- Avoid wait_for selectors unless you're certain the elements exist
Progress Tracking
Progress updates during crawling:
- 0-45%: Crawling pages
- 45-50%: Processing crawl results
- 50-75%: Storing documents with real-time batch updates
- 75-90%: Extracting and storing code examples
- 90-100%: Finalizing and cleanup
If progress appears stuck, check the logs for batch processing updates.