Server Services

The Server's business logic is organized into modular service classes and functions.

Service Organization

Storage Services (/services/storage/)

| Service/Module | Purpose | Key Components |
| --- | --- | --- |
| BaseStorageService | Abstract base class for storage operations | smart_chunk_text(), extract_metadata(), batch_process_with_progress() |
| storage_services.py | Contains the DocumentStorageService class | upload_document(), store_documents(), process_document() |
| document_storage_service.py | Storage utility functions | add_documents_to_supabase(), add_documents_to_supabase_parallel() |
| code_storage_service.py | Code extraction utilities | extract_code_blocks(), generate_code_example_summary(), add_code_examples_to_supabase() |
Storage Module Organization

The storage module has both service classes and utility functions:

  • storage_services.py contains the DocumentStorageService class that extends BaseStorageService
  • document_storage_service.py and code_storage_service.py contain standalone utility functions
  • This separation allows both object-oriented and functional approaches as needed
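
For example, a caller can use either style. This is a sketch: only the class, module, and function names come from the table above; constructor and argument shapes are assumptions.

# Object-oriented style: the service class wraps the full pipeline
from ..services.storage import DocumentStorageService

service = DocumentStorageService(get_supabase_client())  # constructor args assumed
await service.store_documents(documents)                 # method listed above

# Functional style: call the standalone utility directly
from ..services.storage.document_storage_service import add_documents_to_supabase

await add_documents_to_supabase(get_supabase_client(), documents)  # args assumed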

Search Services (/services/search/)

| Service/Module | Purpose | Key Components |
| --- | --- | --- |
| search_services.py | Contains the SearchService class | perform_rag_query(), search_code_examples_service(), rerank_results() |
| vector_search_service.py | Vector search utilities | search_documents(), search_code_examples(), search_documents_async() |

Search Module Organization

  • search_services.py contains the SearchService class for high-level search operations
  • vector_search_service.py contains low-level vector search utility functions
  • The SearchService uses vector search utilities internally
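
A short sketch of the high-level path (only SearchService and perform_rag_query() come from the table above; constructor and parameter names are assumptions):

from ..services.search import SearchService

search_service = SearchService(get_supabase_client())  # constructor args assumed
# perform_rag_query() delegates to the vector_search_service utilities internally
results = await search_service.perform_rag_query(
    query="How do I configure hybrid search?",
    match_count=5,  # parameter name assumed
)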

RAG Services (/services/rag/)

| Service | Purpose | Key Methods |
| --- | --- | --- |
| CrawlingService | Web crawling operations | crawl_single_page(), crawl_batch_with_progress(), crawl_recursive_with_progress() |

Knowledge Services (/services/knowledge/)

Recent Refactoring: The Knowledge API was refactored from a 1,076-line monolith to a 550-line API layer (49% reduction) by extracting business logic into specialized services.

| Service | Purpose | Key Methods |
| --- | --- | --- |
| CrawlOrchestrationService | Orchestrates the entire crawl workflow | orchestrate_crawl(), set_progress_id(), cancel(), is_cancelled() |
| KnowledgeItemService | CRUD operations for knowledge items | list_items(), update_item(), get_available_sources() |
| CodeExtractionService | Code block extraction and processing | extract_and_store_code_examples(), extract_code_blocks_from_html() |
| DatabaseMetricsService | Database statistics and metrics | get_metrics() |

Service Layer Benefits:

  • Separation of Concerns: Business logic separated from API routing
  • Testability: Services can be unit tested without Socket.IO dependencies
  • Reusability: Services used by both API endpoints and MCP tools
  • Progress Tracking: Integrated with ProgressTracker utility

Project Services (/services/projects/)

Recent Refactoring: The Projects API was refactored from a monolithic 1,690-line file to a clean service layer architecture, reducing the main API file to 868 lines (a 49% reduction) by extracting business logic into specialized services.

| Service | File | Purpose | Key Methods |
| --- | --- | --- | --- |
| ProjectService | project_service.py | Core project CRUD operations | create_project(), get_project(), update_project(), delete_project(), list_projects() |
| TaskService | task_service.py | Task lifecycle management | create_task(), update_task(), archive_task(), list_tasks() |
| ProjectCreationService | project_creation_service.py | AI-assisted project creation workflow | create_project_with_ai(), generate_features(), process_github_repo() |
| SourceLinkingService | source_linking_service.py | Project-knowledge source relationships | update_project_sources(), format_projects_with_sources() |
| ProgressService | progress_service.py | Real-time operation tracking via Socket.IO | start_operation(), update_progress(), complete_operation(), error_operation() |
| DocumentService | document_service.py | Project document management | add_document(), get_documents(), update_document() |
| VersioningService | versioning_service.py | JSONB field version control | create_version(), get_versions(), restore_version() |

Service Layer Benefits:

  • Separation of Concerns: Business logic separated from API routing
  • Reusability: Services can be used by multiple API endpoints and MCP tools
  • Testability: Each service can be unit tested independently
  • Socket.IO Integration: Real-time updates handled at the service layer

Example Service Coordination:

# projects_api.py - Thin API layer
import asyncio
import secrets

@router.post("/projects")
async def create_project(request: CreateProjectRequest):
    progress_id = secrets.token_hex(16)

    # Start progress tracking
    progress_service.start_operation(progress_id, 'project_creation')

    # Background task coordinates multiple services
    asyncio.create_task(_create_project_with_ai(progress_id, request))

    return {"progress_id": progress_id}

# Background workflow using multiple services
async def _create_project_with_ai(progress_id: str, request: CreateProjectRequest):
    creation_service = ProjectCreationService()

    # AI-assisted creation with real-time updates
    success, result = await creation_service.create_project_with_ai(
        progress_id=progress_id,
        title=request.title,
        github_repo=request.github_repo,
    )

    if success:
        # Broadcast update to Socket.IO clients
        await broadcast_project_update()
        await progress_service.complete_operation(progress_id, result)

Core Services (/services/)

| Service | Purpose | Key Methods |
| --- | --- | --- |
| SourceManagementService | Manage knowledge sources | get_available_sources(), delete_source(), update_source_metadata() |
| CredentialService | Secure credential storage | get_credential(), set_credential() |
| PromptService | AI prompt management | get_prompt(), reload_prompts() |
| ThreadingService | Threading & rate limiting | batch_process(), rate_limited_operation() |
| MCPServiceClient | HTTP client for MCP | crawl_url(), search(), delete_source() |
| ClientManager | Database client management | get_supabase_client() |
| MCPSessionManager | MCP session management | create_session(), validate_session(), cleanup_expired() |
| CrawlerManager | Global crawler instance management | get_crawler(), initialize(), cleanup() |
| LLMProviderService | Multi-provider LLM support | get_llm_client(), get_llm_client_sync(), get_embedding_model() |

Modular Architecture Pattern

The services follow a clear architectural pattern:

  • Utility Modules: Low-level functions for specific operations

    • vector_search_service.py - Vector database operations
    • document_storage_service.py - Document storage functions
    • code_storage_service.py - Code extraction functions
  • Service Classes: High-level orchestration of utilities

    • SearchService in search_services.py
    • DocumentStorageService in storage_services.py
    • Knowledge services in /services/knowledge/
    • Project services in /services/projects/
  • Base Classes: Abstract implementations for extensibility

    • BaseStorageService - Common storage patterns
  • Manager Classes: Singleton instances for global resources

    • CrawlerManager - Global crawler instance
    • MCPSessionManager - Session management
  • Single Responsibility: Each module has one focused purpose
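
To illustrate the base-class pattern, a hypothetical subclass might reuse the BaseStorageService helpers like this. The class and its store() method are invented for illustration; only the helper names come from the storage table above.

from ..services.storage.base_storage_service import BaseStorageService

class MarkdownStorageService(BaseStorageService):
    """Hypothetical storage service showing the extension pattern."""

    async def store(self, content: str) -> None:
        # Reuse the common helpers inherited from BaseStorageService
        chunks = self.smart_chunk_text(content)       # helper listed above
        metadata = self.extract_metadata(content)     # helper listed above
        await self.batch_process_with_progress(chunks, metadata)  # signature assumed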

Embedding Services (/services/embeddings/)

| Module | Purpose | Key Functions |
| --- | --- | --- |
| embedding_service.py | OpenAI embedding creation | create_embeddings_batch(), create_embeddings_batch_async() |
| contextual_embedding_service.py | Context-aware embeddings | generate_contextual_embedding(), generate_contextual_embeddings_batch() |

Utility Classes (/utils/progress/)

| Utility | Purpose | Key Methods |
| --- | --- | --- |
| ProgressTracker | Consolidated Socket.IO progress tracking | start(), update(), complete(), error(), update_batch_progress(), update_crawl_stats() |

Async/Sync Boundaries

The embedding service provides both sync and async versions:

  • Use create_embeddings_batch_async() in async contexts (recommended)
  • Sync versions exist for backward compatibility but will return zero embeddings if called from an async context
  • Always prefer async versions for better performance and proper event loop handling
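
For example (import path and signature assumed from the table above):

from ..services.embeddings.embedding_service import create_embeddings_batch_async

async def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # Await the async version inside async code; calling the sync variant
    # here would return zero embeddings, as noted above.
    return await create_embeddings_batch_async(chunks)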

Service Usage Patterns

Direct Service Usage (FastAPI)

# Import from new locations
from ..services.source_management_service import SourceManagementService
from ..services.storage import DocumentStorageService
from ..services.search import SearchService
from ..services.knowledge import CrawlOrchestrationService, KnowledgeItemService
from ..services.crawler_manager import get_crawler

# Example: Knowledge item crawling
@router.post("/knowledge-items/crawl")
async def crawl_knowledge_item(request: KnowledgeItemRequest):
    crawler = await get_crawler()
    orchestration_service = CrawlOrchestrationService(crawler, get_supabase_client())
    result = await orchestration_service.orchestrate_crawl(request.dict())
    return result

# Example: Crawl cancellation
@router.post("/knowledge-items/stop/{progress_id}")
async def stop_crawl_task(progress_id: str):
    # Cancel orchestration service
    orchestration = get_active_orchestration(progress_id)
    if orchestration:
        orchestration.cancel()

    # Cancel asyncio task
    if progress_id in active_crawl_tasks:
        task = active_crawl_tasks[progress_id]
        if not task.done():
            task.cancel()
            try:
                await asyncio.wait_for(task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        del active_crawl_tasks[progress_id]

    # Emit Socket.IO events
    await sio.emit('crawl:stopped', {
        'progressId': progress_id,
        'status': 'cancelled',
        'message': 'Crawl cancelled by user'
    }, room=progress_id)

    return {'success': True}

# Example: Source deletion
@router.delete("/sources/{source_id}")
async def delete_source(source_id: str):
    service = SourceManagementService(get_supabase_client())
    success, result = service.delete_source(source_id)
    return {"success": success, **result}

HTTP Service Usage (MCP)

import json

import httpx

async def delete_source(ctx: Context, source: str) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.delete(f"{API_URL}/api/sources/{source}")
        # The tool signature returns a string, so serialize the JSON payload
        return json.dumps(response.json())

Simplified Socket.IO Emission Pattern (2025)

from ..socketio_app import get_socketio_instance

# Get Socket.IO instance
sio = get_socketio_instance()

async def crawl_with_progress(url: str, progress_id: str):
    crawling_service = CrawlingService(crawler, supabase_client)

    # Simple direct emission - no complex utils
    await sio.emit('progress_update', {
        'status': 'crawling',
        'percentage': 10,
        'log': f'Starting crawl of {url}'
    }, room=progress_id)

    results = await crawling_service.crawl_batch_with_progress(urls=[url])
    return results

ProgressTracker Utility Pattern

from ..utils.progress import ProgressTracker

# Create tracker for an operation
tracker = ProgressTracker(sio, progress_id, 'crawl')

# Start tracking
await tracker.start({
    'currentUrl': url,
    'totalPages': 0,
    'processedPages': 0
})

# Update progress
await tracker.update('crawling', 25, 'Processing page 1 of 4')
await tracker.update_crawl_stats(1, 4, 'https://example.com/page1')

# Complete or error
await tracker.complete({
    'chunksStored': 150,
    'wordCount': 5000
})
# OR
await tracker.error('Failed to crawl: Network timeout')

Key Service Features

Simplified Real-Time Communication (2025 Pattern)

Architecture Changes:

  • No database polling - Eliminated the 2-second polling system that checked for task/project changes
  • Simple @sio.event handlers - Uses official Socket.IO 2025 documentation pattern
  • No namespace classes - Removed complex TasksNamespace and ProjectNamespace classes
  • Root namespace only - Everything runs on / namespace for simplicity
  • Direct room management - Simple sio.enter_room() and sio.emit() calls

Simplified Socket.IO Integration:

# Services import and use Socket.IO directly
from ..socketio_app import get_socketio_instance

sio = get_socketio_instance()

# Simple emission to rooms - no namespace complexity
await sio.emit('task_updated', task_data, room=project_id)
await sio.emit('progress_update', {'status': 'processing', 'percentage': 50}, room=progress_id)

# Event handlers use simple @sio.event decorators
@sio.event
async def join_project(sid, data):
    await sio.enter_room(sid, data['project_id'])
    tasks = get_tasks_for_project(data['project_id'])
    await sio.emit('initial_tasks', tasks, to=sid)

Threading Service

  • Rate limiting for OpenAI API (200k tokens/min)
  • Thread pools for CPU and I/O operations
  • Socket.IO-safe batch processing
  • System metrics monitoring
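
A sketch of rate-limited batch processing. Only batch_process() appears in the core services table; its parameters, and the pairing with the embedding function, are assumptions.

from ..services.threading_service import ThreadingService

threading_service = ThreadingService()

# Process chunks in Socket.IO-safe batches while staying under the
# OpenAI rate limit (200k tokens/min); parameter names are assumed.
results = await threading_service.batch_process(
    items=chunks,
    process_func=create_embeddings_batch_async,
)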

Credential Service

  • Encrypted credential storage
  • Category-based organization
  • Runtime caching for performance
  • Environment variable compatibility
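
Typical usage might look like this. Only get_credential() and set_credential() are listed above; the constructor arguments and async-ness are assumptions.

from ..services.credential_service import CredentialService

creds = CredentialService(get_supabase_client())  # constructor args assumed

# Reads the encrypted store (with runtime caching), falling back to the environment
api_key = await creds.get_credential("OPENAI_API_KEY")
await creds.set_credential("OPENAI_API_KEY", "sk-new-key")  # stored encrypted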

Document Storage

  • Parallel document processing
  • Progress reporting via callbacks
  • Automatic embedding generation
  • Batch operations for efficiency
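
A sketch of the callback-driven progress reporting. add_documents_to_supabase_parallel() is listed in the storage table; the callback signature and parameter name are assumptions.

from ..services.storage.document_storage_service import add_documents_to_supabase_parallel

async def on_progress(completed: int, total: int) -> None:
    # Invoked per batch; signature is an assumption
    print(f"Stored batch {completed}/{total}")

await add_documents_to_supabase_parallel(
    get_supabase_client(), documents,
    progress_callback=on_progress,  # parameter name assumed
)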

Crawler Manager

  • Singleton pattern for global crawler instance
  • Automatic Docker environment detection
  • Proper initialization and cleanup
  • Prevents circular imports

LLM Provider Service

  • Supports multiple providers: OpenAI, Google Gemini, Ollama
  • OpenAI-compatible interface for all providers
  • Automatic provider detection from environment
  • Both async and sync client creation
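
A sketch of provider-agnostic usage. get_llm_client() is listed above, and the returned client is OpenAI-compatible per the notes; the exact call shape and model name are assumptions.

from ..services.llm_provider_service import get_llm_client

# Provider (openai, google, ollama) is detected from LLM_PROVIDER
client = await get_llm_client()

response = await client.chat.completions.create(
    model="gpt-4o-mini",  # model name is illustrative
    messages=[{"role": "user", "content": "Summarize this document"}],
)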

Service Dependencies

Refactored Architecture (2025):

FastAPI Endpoints (projects_api.py)
        ↓
Socket.IO Handlers (socketio_handlers.py)
        ↓
Service Classes (ProjectService, TaskService, etc.)
        ↓
Service Functions (add_documents_to_supabase, etc.)
        ↓
Database (Supabase) / External APIs (OpenAI)

Key Architectural Changes:

  • Extracted Socket.IO Handlers: Moved from embedded namespace classes to dedicated socketio_handlers.py file (~225 lines)
  • Service Layer Separation: Business logic now in separate service classes instead of embedded in API routes
  • Simplified Socket.IO: Eliminated complex namespace hierarchy in favor of simple @sio.event decorators
  • Progress Tracking: Centralized in ProgressService with real-time Socket.IO broadcasting

Architecture Benefits

Improved Code Organization

  • Clear Separation: Utilities vs. service classes are clearly separated
  • Single Responsibility: Each module has one focused purpose
  • Reduced Duplication: Common functionality in base classes
  • Better Maintainability: Easy to find and update specific functionality
  • Service Layers: Business logic separated from API routing (knowledge, projects)

Enhanced Extensibility

  • Base Classes: New storage services can extend BaseStorageService
  • Modular Design: Easy to add new search algorithms or storage backends
  • Service Composition: Services can be composed for complex operations
  • Provider Flexibility: Easy to add new LLM providers via LLMProviderService

Testing & Development

  • Isolated Testing: Each service can be unit tested independently
  • Mock Injection: Services accept clients via dependency injection
  • Clear Interfaces: Well-defined service contracts
  • Progress Tracking: Centralized progress management via ProgressTracker

Service Directory Summary

Complete listing of all service modules organized by functionality:

services/
├── storage/                            # Document and code storage
│   ├── base_storage_service.py         # Abstract base class
│   ├── storage_services.py             # DocumentStorageService class
│   ├── document_storage_service.py     # Storage utilities
│   └── code_storage_service.py         # Code extraction utilities
├── search/                             # Search and retrieval
│   ├── search_services.py              # SearchService class
│   └── vector_search_service.py        # Vector search utilities
├── knowledge/                          # Knowledge management (refactored)
│   ├── crawl_orchestration_service.py
│   ├── knowledge_item_service.py
│   ├── code_extraction_service.py
│   └── database_metrics_service.py
├── projects/                           # Project management (refactored)
│   ├── project_service.py
│   ├── task_service.py
│   ├── project_creation_service.py
│   ├── source_linking_service.py
│   ├── progress_service.py
│   ├── document_service.py
│   └── versioning_service.py
├── rag/                                # RAG operations
│   └── crawling_service.py             # Web crawling
├── embeddings/                         # Embedding generation
│   ├── embedding_service.py
│   └── contextual_embedding_service.py
└── (core services)                     # Infrastructure services, directly in services/
    ├── client_manager.py               # Database client
    ├── crawler_manager.py              # Crawler instance
    ├── credential_service.py           # Credentials
    ├── prompt_service.py               # Prompts
    ├── threading_service.py            # Threading/rate limiting
    ├── mcp_service_client.py           # MCP HTTP client
    ├── mcp_session_manager.py          # MCP sessions
    ├── source_management_service.py    # Sources
    └── llm_provider_service.py         # LLM providers

Configuration

Services are configured via environment variables:

  • SUPABASE_URL - Database URL
  • SUPABASE_SERVICE_KEY - Database service key
  • OPENAI_API_KEY - For embeddings (or other provider keys)
  • LLM_PROVIDER - LLM provider selection (openai, google, ollama)
  • CODE_BLOCK_MIN_LENGTH - Minimum code block length in characters (default: 1000)
  • USE_CONTEXTUAL_EMBEDDINGS - Enable contextual embeddings
  • USE_HYBRID_SEARCH - Enable hybrid search mode
  • USE_RERANKING - Enable search result reranking
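
A minimal sketch of reading these settings (illustrative only; in practice services may load them through CredentialService rather than raw environment access):

import os

SUPABASE_URL = os.environ["SUPABASE_URL"]
SUPABASE_SERVICE_KEY = os.environ["SUPABASE_SERVICE_KEY"]
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai")
CODE_BLOCK_MIN_LENGTH = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))
USE_CONTEXTUAL_EMBEDDINGS = os.getenv("USE_CONTEXTUAL_EMBEDDINGS", "false").lower() == "true"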

All services use dependency injection for testability and flexibility.

Troubleshooting

Code Extraction Issues

If code blocks are not being extracted properly:

  1. Check the minimum length: The default CODE_BLOCK_MIN_LENGTH is 1000 characters. This ensures only substantial code blocks are extracted, not small snippets.

  2. Verify markdown format: Code blocks must be wrapped in triple backticks (```) to be recognized.

  3. HTML fallback: If markdown doesn't contain code blocks, the system will try to extract from HTML using <pre> and <code> tags.
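
For illustration, a minimal sketch of fence-based extraction that honors the minimum length. This is a regex approach under stated assumptions; the actual extract_code_blocks() implementation may differ.

import os
import re

MIN_LENGTH = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))

def extract_fenced_blocks(markdown: str) -> list[str]:
    # Capture the body of each triple-backtick fence, keeping only
    # blocks that meet the configured minimum length.
    blocks = re.findall(r"```[^\n]*\n(.*?)```", markdown, flags=re.DOTALL)
    return [b for b in blocks if len(b) >= MIN_LENGTH]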

Crawling Performance

The crawling service uses simple configurations for reliability:

  • No complex JavaScript waiting by default
  • No forced element clicking
  • Straightforward markdown extraction

If you need advanced crawling features for specific sites:

  1. Consider implementing site-specific handlers
  2. Use the js_code parameter only when necessary
  3. Avoid wait_for selectors unless you're certain the elements exist
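
For example, a hedged sketch of a site-specific call, assuming a Crawl4AI-style crawler.arun() interface; only the js_code and wait_for parameter names come from the list above.

result = await crawler.arun(
    url="https://example.com/docs",
    js_code="window.scrollTo(0, document.body.scrollHeight);",
    # wait_for="css:.content",  # enable only if the element is guaranteed to exist
)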

Progress Tracking

Progress updates during crawling:

  • 0-45%: Crawling pages
  • 45-50%: Processing crawl results
  • 50-75%: Storing documents with real-time batch updates
  • 75-90%: Extracting and storing code examples
  • 90-100%: Finalizing and cleanup

If progress appears stuck, check the logs for batch processing updates.