AI in CareerForge
Deep Dive
A complete technical breakdown of how artificial intelligence is embedded in CareerForge — architecture, implementation code, guardrails, current strengths and weaknesses, and the path toward a self-improving, simulation-driven platform.
Last updated: 23 April 2026 · Internal use only
What AI Does in CareerForge
AI in CareerForge is not a single feature — it is an intelligence layer that runs across all four portals, generating personalised career guidance, synthesising market data, and building structured learning paths. Every AI call is grounded in real business context (the employee's profile, skill inventory, anonymised coaching relationship) and constrained to JSON-structured output.
AI Architecture
The AI layer is a dedicated microservice (ai-connector-service, port 8001) plus two Celery workers (celery-worker, celery-beat). All other backend services import directly from the shared RAG module — there is no inter-service HTTP call to the AI connector for inference; only for connector management (CRUD) and async task status polling.
Component Map
| Component | File | Port / Trigger | Role |
|---|---|---|---|
| ai-connector-service | services/ai_connector/main.py | :8001 | FastAPI: connector CRUD, task status polling, Prometheus metrics |
| rag.py | services/ai_connector/rag.py | imported | LLM fallback chain, caching, text search, all public AI functions |
| embeddings.py | services/ai_connector/embeddings.py | imported | Text chunking + embedding generation (Ollama / OpenAI) |
| celery-worker | tasks/ingestion.py, tasks/pipeline.py | Redis queue | Connector sync, learning path generation, intelligence digest |
| celery-beat | services/ai_connector/celery_app.py | Cron (hourly) | Schedules sync_all_connectors every hour |
| PostgreSQL / pgvector | shared/models/ai.py | DB | Stores ai_documents, vectors (1536 dims), connector configs, ingestion history |
| Redis | celery_app.py | :6379 | Celery broker + result backend |
LLM Provider Fallback Chain
```python
# rag.py:72 — try Claude first (more capable), fall back to Perplexity (has live web search)
async def _ai_chat(prompt: str, system: str = "") -> str:
    if ANTHROPIC_API_KEY:
        result = await _anthropic_chat(prompt, system)
        if result:
            return result
    return await _perplexity_chat(prompt, system)
```
Embedding Provider Fallback
Both providers produce 1536-dimensional vectors, stored in PostgreSQL with the pgvector extension.
The RAG Pattern
RAG — Retrieval-Augmented Generation — grounds LLM responses in real documents. In CareerForge the knowledge base is the ai_documents table, populated by connector ingestion. Every AI function first searches this table, prepends relevant chunks to the prompt, then calls the LLM.
Request Flow
Portal request (employee / coach)
  → employee/coach svc
  → cache lookup
  → ai_documents LIKE search (context retrieval)
  → prompt assembly (context + task)
  → Claude / Perplexity
  → parse JSON + cache result
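The retrieve-augment-generate sequence can be reduced to a few lines. This is a minimal sketch, not the rag.py implementation: `search` and `llm` are injected stand-ins for the ai_documents lookup and the provider chain, so the flow is testable without a database or API key:

```python
import asyncio

async def rag_answer(question: str, search, llm, top_k: int = 3) -> str:
    """Minimal RAG skeleton: retrieve chunks, prepend them, call the LLM."""
    chunks = await search(question, top_k)
    context = "\n\n".join(chunks)
    prefix = f"Connector market data:\n{context}\n\n" if context else ""
    return await llm(prefix + question)

# Hypothetical stubs standing in for the real search and _ai_chat:
async def fake_search(q, k):
    return ["Rust adoption growing 34% YoY"]

async def fake_llm(prompt):
    return prompt  # echo back so the assembled prompt can be inspected

assembled = asyncio.run(rag_answer("Which skills are rising?", fake_search, fake_llm))
print(assembled.startswith("Connector market data:"))  # True
```

When the search returns nothing, the prefix is dropped entirely and the LLM sees only the task, which matches the optional `context_block` pattern used in the real prompt assembly.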
Public AI Functions
| Function | Consumer | Output Schema | Cache TTL |
|---|---|---|---|
| get_role_suggestions_for_profile() (rag.py:236) | Employee portal | [{role_title, match_score, description, required_skills[]}] | 24 hours |
| get_role_skill_requirements() (rag.py:272) | Employee portal | [{skill, skill_ids[], required_level, market_demand}] | 12 hours |
| get_coaching_advice() (rag.py:301) | Coach portal | {answer, sources[], model_used, generated_at} | No cache |
| get_emerging_roles() (rag.py:340) | Coach portal (Catalogues) | [{role_title, demand_level, avg_salary_range, required_skills[], growth_rate_percent}] | 2 hours |
| get_market_trends() (rag.py:373) | Coach portal (Analytics) | [{topic, trend_direction, summary, confidence, sources[]}] | 6 hours |
Caching Strategy
AI responses are expensive — both in latency (1–5 seconds) and API cost. CareerForge uses the ai_documents PostgreSQL table as a key-value cache. This gives the cache an automatic audit trail: every cache entry is a document with an ingestion timestamp.
TTL Tiers (by volatility)
- 2 hours: emerging roles (fastest-moving market data)
- 6 hours: market trends
- 12 hours: role skill requirements
- 24 hours: per-profile role suggestions
- 7 days: intelligence digest (regenerated by the weekly Beat schedule)
Cache Key Design
| Key Pattern | Scoped By | Example |
|---|---|---|
| __cache_roles_{pseudo_id}__ | Individual employee | __cache_roles_a3f7b2__ |
| __cache_roles_{spec}_{seniority}__ | Specialisation + level | __cache_roles_backend_senior__ |
| __cache_roles_catalogue_{spec}__ | Specialisation (coach view) | __cache_roles_catalogue_general__ |
| __cache_gap_{safe_role}__ | Target role | __cache_gap_data_engineer__ |
| __cache_trends_{topic}__ | Topic keyword | __cache_trends_cloud__ |
| __digest__ | Global singleton | __digest__ |
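The key patterns above can be produced by one small builder. `cache_key()` is a hypothetical helper for illustration, not a function in rag.py; the important property is that every variable part is reduced to a filesystem-safe token:

```python
import re

def cache_key(kind: str, *parts: str) -> str:
    """Build a __cache_{kind}_…__ key following the patterns in the table.

    Each part is lowercased and reduced to [a-z0-9_], so a role such as
    "Data Engineer" becomes the safe token "data_engineer".
    """
    safe = [re.sub(r"[^a-z0-9]+", "_", p.lower()).strip("_") for p in parts]
    return f"__cache_{kind}_" + "_".join(safe) + "__"

print(cache_key("gap", "Data Engineer"))        # __cache_gap_data_engineer__
print(cache_key("roles", "backend", "senior"))  # __cache_roles_backend_senior__
```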
```python
# rag.py:120 — cache lookup with TTL check
async def _get_cached_response(cache_key: str, ttl_hours: int) -> list | dict | None:
    async with AsyncSessionLocal() as session:
        row = (await session.execute(
            sql_text(
                "SELECT content, ingested_at FROM ai_documents "
                "WHERE source_name = :key ORDER BY ingested_at DESC LIMIT 1"
            ),
            {"key": cache_key},
        )).fetchone()
    if not row:
        return None
    gen_at = datetime.fromisoformat(row.ingested_at)
    if (datetime.now(UTC) - gen_at) > timedelta(hours=ttl_hours):
        return None  # expired — caller will regenerate
    return json.loads(row.content)
```
Connector Ingestion
Connectors are configurable data sources — REST APIs, RSS feeds, JSON endpoints — that feed real-world market intelligence into CareerForge. The Admin portal provides CRUD for connectors. Ingestion runs hourly via Celery Beat.
Data Models
| Table | Purpose | Key Fields |
|---|---|---|
| connector_configs | Registry of data sources | name, source_type, config_json_encrypted, fetch_interval_minutes, trend_score |
| connector_sources | Individual URLs per connector | connector_id, url, label, last_status_code |
| ai_documents | Ingested content + AI cache | source_name, content, topics[], trend_score, metadata_json |
| ai_document_vectors | Chunk embeddings | document_id, chunk_index, chunk_text, embedding vector(1536) |
| ingestion_history | Audit trail per sync | connector_id, chunks_ingested, duration_seconds, success, error_message |
Ingestion Pipeline (tasks/ingestion.py)
Response normalisation handles multiple API schemas automatically: items[], data[], results[], or bare dict. Field aliases resolve content/body/description/text and url/link transparently.
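A minimal sketch of that normalisation logic. `normalise_items()` is a hypothetical name; the real implementation lives in tasks/ingestion.py and may differ in detail:

```python
def normalise_items(payload) -> list[dict]:
    """Coerce common API response shapes into a flat list of records.

    Unwraps items[]/data[]/results[] containers or a bare dict, then resolves
    the content/body/description/text and url/link field aliases.
    """
    if isinstance(payload, list):
        records = payload
    elif isinstance(payload, dict):
        for wrapper in ("items", "data", "results"):
            if isinstance(payload.get(wrapper), list):
                records = payload[wrapper]
                break
        else:
            records = [payload]  # bare dict: treat it as a single record
    else:
        return []

    out = []
    for rec in records:
        if not isinstance(rec, dict):
            continue
        content = next(
            (rec[k] for k in ("content", "body", "description", "text") if rec.get(k)),
            None,
        )
        url = rec.get("url") or rec.get("link")
        if content:
            out.append({"content": content, "url": url})
    return out

print(normalise_items({"results": [{"body": "Kubernetes 1.30 released", "link": "https://example.com"}]}))
# [{'content': 'Kubernetes 1.30 released', 'url': 'https://example.com'}]
```

Records with no resolvable content field are dropped rather than ingested empty, which keeps junk chunks out of the embedding pipeline.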
Celery Pipeline Tasks
Long-running AI generation — learning paths and the weekly digest — runs asynchronously in Celery workers, so API responses remain fast. The client polls GET /api/v1/tasks/{task_id} for status.
| Task | Trigger | LLM Budget | Output |
|---|---|---|---|
| generate_initial_profile (pipeline.py:19) | Employee registration (fire-and-forget) | Claude (extended thinking, 2048 tokens) | LearningPath node graph stored in DB |
| generate_learning_path (pipeline.py:125) | Manual refresh by employee | Claude (extended thinking, 2048 tokens) | Updated LearningPath; task_id returned for polling |
| generate_intelligence_digest (pipeline.py:227) | Weekly (Beat) or coach-requested refresh | Claude (extended thinking, 4096 tokens) | JSON digest: rising skills, role movements, recommended actions |
| sync_all_connectors (ingestion.py:249) | Hourly Beat schedule | No LLM — embedding model only | New documents + vectors in ai_documents |
Learning Path Output Schema
```json
{
  "target_role": "Senior Data Engineer",
  "summary": "12-month path targeting 8 skills across 4 phases...",
  "nodes": [
    {
      "id": "n1",
      "title": "Apache Kafka Fundamentals",
      "type": "skill", // skill | cert | project | milestone
      "description": "...",
      "estimated_hours": 40,
      "prerequisites": [],
      "resources": [{ "type": "course", "title": "...", "url": "..." }]
    }
  ]
}
```
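Because `prerequisites` reference other node ids, the node graph is only usable if every reference resolves and the graph is acyclic. A sketch validator (hypothetical, not part of pipeline.py) using Kahn's topological sort:

```python
def validate_path(nodes: list[dict]) -> list[str]:
    """Return node ids in a valid learning order, or raise ValueError."""
    ids = {n["id"] for n in nodes}
    indeg = {n["id"]: 0 for n in nodes}
    dependents: dict[str, list[str]] = {n["id"]: [] for n in nodes}
    for n in nodes:
        for pre in n.get("prerequisites", []):
            if pre not in ids:
                raise ValueError(f"unknown prerequisite: {pre}")
            indeg[n["id"]] += 1
            dependents[pre].append(n["id"])
    # Kahn's algorithm: repeatedly emit nodes with no unmet prerequisites.
    order = []
    ready = [i for i, d in indeg.items() if d == 0]
    while ready:
        nid = ready.pop()
        order.append(nid)
        for dep in dependents[nid]:
            indeg[dep] -= 1
            if indeg[dep] == 0:
                ready.append(dep)
    if len(order) != len(nodes):
        raise ValueError("cycle in prerequisites")
    return order

order = validate_path([
    {"id": "n1", "prerequisites": []},
    {"id": "n2", "prerequisites": ["n1"]},
])
print(order)  # ['n1', 'n2']
```

Running this check before persisting a generated LearningPath would reject hallucinated node references instead of storing an unrenderable graph.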
Intelligence Digest Schema
```json
{
  "week_summary": "...",
  "top_skills_rising": [{ "skill": "Rust", "trend": "growing 34% YoY" }],
  "role_movements": [{ "role": "AI Engineer", "direction": "rising", "insight": "..." }],
  "recommended_actions": ["Upskill Python ML team in LangGraph", "..."],
  "generated_at": "2026-04-21T08:00:00Z"
}
```
Code Walkthrough
The following are the key implementation files with their roles and the most important code patterns.
ai_connector/
rag.py ← LLM orchestration, caching, all public AI functions (401 lines)
embeddings.py ← Ollama/OpenAI embedding + chunking (67 lines)
main.py ← FastAPI app: connector CRUD + task status API (52 lines)
celery_app.py ← Celery config + Beat schedule (39 lines)
db.py ← Async SQLAlchemy session (46 lines)
tasks/
ingestion.py ← Connector sync: HTTP fetch → chunk → embed → store (273 lines)
pipeline.py ← Learning path + digest generation with extended thinking (333 lines)
routers/
tasks.py ← GET /tasks/{id} status polling (35 lines)
employee/routers/
learning.py ← /learning-path, /skill-gap, /role-suggestions (214 lines)
registration.py ← triggers generate_initial_profile.delay() (175 lines)
coach/routers/
advice.py ← POST /employees/{pseudo_id}/ai-advice (79 lines)
catalogues.py ← GET /catalogues/roles (194 lines)
analytics.py ← GET /analytics/market-trend, /intelligence-digest (288 lines)
shared/
models/
ai.py ← ConnectorConfig, AiDocument, AiDocumentVector, IngestionHistory (103 lines)
auth/
keycloak.py ← RS256 JWT verify, JWKS caching 300s, realm-based verifiers (169 lines)
Prompt Engineering Pattern
All prompts follow a consistent 3-part template: identity declaration → context injection → JSON-only output constraint.
```python
# rag.py:253–265 — role suggestions prompt assembly
system = (
    "You are a career intelligence system specialising in IT career paths. "
    "Suggest emerging roles based on current 2025 market data. "
    "Respond ONLY with a valid JSON array of objects with keys: "
    "role_title (string), match_score (0.0-1.0), description (string), "
    "required_skills (array of strings)."
)
context_block = ("Connector market data:\n" + context + "\n\n") if context else ""
prompt = (
    context_block
    + f"Suggest 5 emerging IT career roles for a {seniority} {specialization} professional "
    + "in 2025. Focus on high-demand, growing roles. Return JSON array only."
)
```
JSON Parsing with Graceful Fallback
````python
# rag.py:81 — robust JSON extraction from LLM output
def _parse_json(text: str) -> list:
    if not text:
        return []
    text = text.strip()
    if "```" in text:  # strip markdown code fences
        for part in text.split("```"):
            p = part.strip()
            if p.startswith("json"):
                p = p[4:].strip()
            if p.startswith("[") or p.startswith("{"):
                text = p
                break
    try:
        result = json.loads(text)
        return result if isinstance(result, list) else [result]
    except json.JSONDecodeError:
        m = re.search(r"\[.*\]", text, re.DOTALL)  # last-chance regex
        if m:
            try:
                return json.loads(m.group())
            except json.JSONDecodeError:
                pass
    return []  # graceful empty on failure
````
Safety & Guardrails
Guardrails in CareerForge operate at three levels: prompt-level constraints, input validation, and infrastructure-level access control. The goal is to prevent PII leaks, hallucinated identities, runaway API spend, and prompt injection from malicious connector data.
| Guardrail | Layer | Implementation |
|---|---|---|
| PII never sent to LLM | Prompt | Coaching prompts use pseudo_id + aggregated profile only. System prompt: "Never reveal or speculate about personal identity or PII." |
| JSON-only output | Prompt | Every system prompt ends with "Respond ONLY with a valid JSON array". _parse_json() strips markdown and falls back to empty list — never returns free text to the UI. |
| Input length cap | API | Coach advice question: max 2000 chars (FastAPI validator, advice.py:42). 422 returned if exceeded. |
| LLM timeout | HTTP | httpx.AsyncClient(timeout=45.0) for all LLM calls. Connector fetches use 60s timeout. Prevents stuck requests blocking workers. |
| Celery task isolation | Infra | task_acks_late=True, worker_prefetch_multiplier=1: tasks only ACK on success, preventing duplicate processing on crash. |
| Connector dedup | Ingestion | SHA-256 content hash checked before insertion. Prevents prompt poisoning from re-injecting modified documents at the same URL. |
| Encrypted connector config | DB | Connector API keys stored in config_json_encrypted (AES-256-GCM via Vault-backed key). Never logged. |
| API key injection | Infra | LLM keys come from Kubernetes Sealed Secrets → env vars. Never in code or ConfigMaps. |
| Retry cap | Celery | max_retries=3 on connector sync tasks. Failed ingestion writes to IngestionHistory.error_message and stops retrying after 3 attempts. |
| CORS allow-list | Network | Only the four portal domains are allowed; wildcard CORS is explicitly not configured. |
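The connector dedup guardrail reduces to a content hash checked before insert. A minimal sketch, with an in-memory set standing in for the lookup against ai_documents (the function names here are illustrative):

```python
import hashlib

def content_hash(text: str) -> str:
    """SHA-256 over normalised content — the dedup key checked before insert."""
    return hashlib.sha256(text.strip().encode("utf-8")).hexdigest()

seen: set[str] = set()

def should_ingest(text: str) -> bool:
    """Return False when identical content was already stored.

    Real code checks the hash against the ai_documents table; a set stands
    in here so the behaviour is easy to demonstrate.
    """
    h = content_hash(text)
    if h in seen:
        return False
    seen.add(h)
    return True

print(should_ingest("Kafka 4.0 released"))  # True
print(should_ingest("Kafka 4.0 released"))  # False, duplicate blocked
```

Hashing the content rather than the URL is what blocks the poisoning scenario: a document served at the same URL with modified text gets a new hash and is ingested as a new version, while byte-identical re-fetches are silently skipped.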
Role-Based AI Access
Keycloak enforces which user roles can access which AI features. Each portal authenticates against its own realm; the token's realm_access.roles claim is verified on every request.
| AI Feature | Required Role | Endpoint |
|---|---|---|
| Learning path generation | employee | GET /learning-path |
| Skill gap analysis | employee | GET /skill-gap |
| Role suggestions | employee | GET /role-suggestions |
| AI coaching advice | coach | POST /employees/{pseudo_id}/ai-advice |
| Emerging roles catalogue | coach | GET /catalogues/roles |
| Market trends | coach | GET /analytics/market-trend |
| Intelligence digest | coach | GET /analytics/intelligence-digest |
| Connector management | SUPER_ADMIN / APP_ADMIN | POST/PUT/DELETE /connectors |
| Trigger digest refresh | coach | GET /analytics/intelligence-digest?refresh=true |
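The per-request role check amounts to one claim lookup on the already-verified token. A sketch (not the keycloak.py implementation) assuming the payload has passed the RS256 signature and issuer checks upstream:

```python
def has_role(claims: dict, required: str) -> bool:
    """Check the decoded JWT's realm_access.roles claim for a required role."""
    return required in claims.get("realm_access", {}).get("roles", [])

claims = {"sub": "abc", "realm_access": {"roles": ["coach"]}}
print(has_role(claims, "coach"))     # True
print(has_role(claims, "employee"))  # False
```

In FastAPI this check typically sits in a dependency that raises a 403 when it fails, so every AI endpoint declares its required role once.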
PII Protection
Personal data is protected at two levels: pseudonymisation of the employee identity before it reaches any AI prompt, and AES-256-GCM encryption of the stored PII fields.
Pseudonymisation Flow
employee id (real user ID)
  → HMAC-SHA256 with HMAC_KEY_HEX
  → pseudo_id (opaque token)
  → coach view / LLM prompt (no name/email)
Coaches see only the pseudo_id, the employee's specialisation, seniority, experience years, and region (not city). The HMAC key is injected from Sealed Secrets — not derivable from any data in the LLM prompt.
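The derivation can be sketched in a few lines. This is illustrative, not the production code: the truncation length and the demo key below are assumptions, and the real key is injected from Sealed Secrets:

```python
import hashlib
import hmac

def pseudonymise(user_id: str, key_hex: str, length: int = 12) -> str:
    """Derive a stable, opaque pseudo_id from the real user ID.

    Keyed HMAC-SHA256: deterministic (same user always maps to the same
    pseudo_id, so caches and coach views stay consistent) but not reversible
    without the key.
    """
    key = bytes.fromhex(key_hex)
    digest = hmac.new(key, user_id.encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:length]

demo_key = "00" * 32  # placeholder only — never a real HMAC_KEY_HEX value
p1 = pseudonymise("user-42", demo_key)
p2 = pseudonymise("user-42", demo_key)
print(p1 == p2)  # True — stable across calls
```

Determinism is the point: a plain random token would break the `__cache_roles_{pseudo_id}__` keys, while an unkeyed hash would let anyone who knows a user ID recompute the pseudo_id.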
At-rest Encryption
Employee PII fields (name, email, exact location) are stored encrypted in employee.encrypted_pii using AES-256-GCM. The key is injected at runtime from DEV_PII_KEY_HEX (Vault-backed in production, Sealed Secret in the demo cluster).
Strengths & Weaknesses
An honest assessment of what the current AI implementation does well and where it falls short.
Strengths
- Graceful degradation at every layer: the Claude → Perplexity fallback chain and the empty-list JSON parser mean a provider outage never surfaces an error or free text in the UI
- Aggressive, tiered caching (2h/6h/24h/7d) keeps latency and API spend low while leaving an automatic audit trail in ai_documents
- Every prompt is grounded in connector data and constrained to a JSON schema, limiting the hallucination surface
- PII never reaches an LLM: prompts carry only the pseudo_id and aggregated profile fields

Weaknesses
- Retrieval still uses SQL LIKE text search; the stored pgvector embeddings are not yet queried at inference time (roadmap #1)
- No per-user rate limiting on LLM calls, so a single coach can generate runaway API spend (roadmap #2)
- Coaching advice is neither cached nor scored, so no quality signal flows back from coaches (roadmap #3, #5)
- The reconciliation loops are designed but not closed; recommendations are never compared against realised outcomes
Company Simulation
CareerForge is seeded with 18 realistic personas across 8 engineering practices, spanning 5 seniority levels and 4 user roles. This creates a miniature simulation of a real outsourcing company's internal dynamics — skill distributions, coaching relationships, cohort readiness, and management visibility.
Simulated Population
| Role | Count | Practices Covered | AI Interactions |
|---|---|---|---|
| Employees | 7 | Backend, Frontend, Data, Cloud, ML, QA, Security, PMO | Learning paths, role suggestions, skill gap |
| Coaches | 6 | Backend+Frontend, Data+ML, Cloud+Security, Mobile, QA+PMO, Backend | AI coaching advice, market trends, digest |
| Managers | 1 | Cross-practice | Team readiness dashboard (AI-scored) |
| Admins | 4 | HR, Platform, Org, L&D | Connector management, system config |
Dynamics the Simulation Captures
- Skill distribution heterogeneity — a junior QA engineer has a different AI-generated learning path than a senior Data Engineer, even within the same company
- Coach caseload variation — some coaches cover two practices; their AI advice calls span different specialisations, stress-testing the system prompt's generalism
- Temporal dynamics — caches expire at different rates; the snapshot of "current" AI output shifts over the 2h/6h/24h/7d cycle even with no user action
- Commitment and sign-off workflow — forms create durable commitments that accumulate; over time the DB grows a history of what was promised vs delivered
- Manager visibility lag — the manager sees readiness scores that reflect cached AI output; there is a natural delay between a skill being updated and the manager's dashboard updating
- Practice-level cohort patterns — employees in the same practice tend to receive similar role suggestions (same specialisation cache key), surfacing whether the LLM's recommendations are varied enough
Reconciliation Loops
A reconciliation loop is a closed feedback cycle: the platform observes the outcome of its AI recommendations, compares them to what actually happened, and adjusts future recommendations accordingly. CareerForge has the infrastructure to support several such loops — some partially built, some requiring additional work.
The Current Data Signals
AI generates recommendation
(learning path, role suggestion)
Employee accepts & acts
(commits, completes skill)
Coach signs off
(IDP form, verified skill)
Outcome measured
(readiness Δ, role movement)
Signal fed back
(update prompt, weights)
Loop 1 — Connector Quality Feedback
Current state: fully implemented infrastructure, loop not closed.
Every connector source has a trend_score field and last_status_code. The ingestion task records IngestionHistory.chunks_ingested and success. A reconciliation job could:
- Correlate connector sources with AI output quality (do documents from source X lead to higher-confidence market trends?)
- Automatically lower trend_score for sources that repeatedly 404 or return empty content
- Promote sources whose topics appear frequently in accepted learning paths
Loop 2 — Learning Path Acceptance Rate
Current state: data exists, loop not implemented.
The learning_path_nodes table records which nodes employees mark complete. A weekly digest job could compute:
```sql
-- Proposed reconciliation query
SELECT node_type,
       skill_name,
       COUNT(*) FILTER (WHERE status = 'completed') AS completed,
       COUNT(*) FILTER (WHERE status = 'skipped')   AS skipped,
       COUNT(*) AS total
FROM learning_path_nodes
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY node_type, skill_name
ORDER BY (skipped::float / NULLIF(total, 0)) DESC;
```
Nodes with high skip rates could feed back into the system prompt: "Avoid recommending X for specialisation at seniority level — historically low completion rate."
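Turning those rows into a prompt suffix is mechanical. A sketch under assumed names (`skip_rate_hints` and the 50% threshold are illustrative, not implemented):

```python
def skip_rate_hints(rows: list[dict], threshold: float = 0.5) -> str:
    """Convert reconciliation rows into a system-prompt suffix.

    `rows` mirrors the query's output: {"skill_name", "skipped", "total"}.
    Skills skipped more often than `threshold` become explicit avoid-hints.
    """
    flagged = [
        r["skill_name"]
        for r in rows
        if r["total"] and r["skipped"] / r["total"] > threshold
    ]
    if not flagged:
        return ""
    return (
        "Avoid recommending these historically low-completion skills unless "
        "essential: " + ", ".join(sorted(flagged)) + "."
    )

hint = skip_rate_hints([
    {"skill_name": "COBOL", "skipped": 8, "total": 10},
    {"skill_name": "Kafka", "skipped": 1, "total": 10},
])
print(hint)  # mentions COBOL, not Kafka
```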
Loop 3 — Role Suggestion Relevance
Current state: data partially exists.
If an employee clicks through a role suggestion and sets it as their target role, that is an acceptance signal. The ratio of suggested-to-accepted roles, broken down by specialisation and seniority, tells us whether the LLM is recommending roles people actually want. This can be used to:
- Adjust the role suggestion prompt to reflect which roles in which practices actually get accepted
- Build a fine-tuning dataset of (profile → accepted role) pairs
- Invalidate the 24h cache early for a sub-segment if the acceptance rate drops below a threshold
Loop 4 — Coaching Advice Quality Score
Current state: not implemented. Requires UI addition.
Add a thumbs-up/down widget after each AI coaching response. Store the score in a new ai_feedback table keyed by (pseudo_id, question_hash, model_used, score). The reconciliation job then:
- Computes per-model quality scores over rolling 30-day windows
- Routes to the higher-scoring model when both are available
- Feeds low-scored (question, answer) pairs back to the prompt as negative examples
Loop 5 — Market Trend Accuracy
Current state: infrastructure exists for trend_direction tracking. Loop not closed.
Perplexity sonar-pro returns real-time web search context. Store the predicted trend_direction at time T. At time T+30d, re-run the trend query and compare. If the direction is consistent, increase the confidence score for that topic. If it reversed, lower it and flag for human review. Over time this builds a calibration curve for the AI's trend forecasting ability.
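The comparison step at T+30d can be sketched as a tiny update rule. The function name and the step sizes (+0.1 on a hit, -0.2 on a reversal) are illustrative assumptions, not tuned values:

```python
def update_confidence(predicted: str, observed: str,
                      confidence: float) -> tuple[float, bool]:
    """Compare a stored trend_direction with the re-run at T+30d.

    Returns (new_confidence, needs_review). A confirmed direction nudges
    confidence up; a reversal cuts it harder and flags human review.
    """
    if predicted == observed:
        return round(min(1.0, confidence + 0.1), 2), False
    return round(max(0.0, confidence - 0.2), 2), True  # reversal: flag it

print(update_confidence("rising", "rising", 0.7))   # (0.8, False)
print(update_confidence("rising", "falling", 0.7))  # (0.5, True)
```

The asymmetry (slow gain, fast loss) is a deliberate design choice: a calibration signal should be hard to earn and easy to lose, so one lucky prediction cannot mask a noisy topic.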
Self-Improvement Architecture (Target State)
```python
# Proposed: weekly reconciliation task
@celery_app.task(name="reconcile_ai_quality")
async def reconcile_ai_quality():
    signals = {
        "path_skip_rates": await compute_node_skip_rates(),
        "role_accept_rates": await compute_role_acceptance(),
        "advice_scores": await aggregate_feedback_scores(),
        "connector_quality": await score_connector_sources(),
    }
    # Write signals as a new ai_document for RAG context injection
    await store_cached_response("__reconciliation_signals__", signals)
    # Update connector trend_scores based on quality signals
    await update_connector_scores(signals["connector_quality"])
    # Invalidate stale caches for sub-segments with poor outcomes
    await invalidate_poor_performing_caches(signals)
```
Future Roadmap
The following developments are ordered by impact and implementation complexity. Each builds on the existing infrastructure without requiring architectural rewrites.
| # | Feature | Impact | Complexity | Depends On |
|---|---|---|---|---|
| 1 | Vector similarity search (replace LIKE): swap _cached_text_search() LIKE queries for pgvector cosine similarity over ai_document_vectors | High | Low | pgvector already installed; embeddings already stored |
| 2 | LLM rate limiter per user/hour: Redis counter keyed by pseudo_id:hour; return 429 after N coaching calls | Medium | Low | Redis already available as Celery backend |
| 3 | Cache coaching advice by question hash: SHA-256(pseudo_id + question) as cache key, 1 h TTL | Medium | Low | Existing cache infrastructure |
| 4 | Cache pruning task: nightly Celery Beat job deletes ai_documents entries older than 2× their TTL | Low-med | Low | Celery Beat already running |
| 5 | Coaching advice feedback widget: 👍/👎 on each AI advice response, stored in a new ai_feedback table | High | Medium | Frontend change + new DB table |
| 6 | Reconciliation Celery task: weekly job computing skip rates, acceptance rates, and feedback scores; updates connector trend_score; injects signals into the next prompt | High | Medium | Feedback widget (#5) + path completion tracking |
| 7 | Anthropic prompt caching: add cache_control: {"type": "ephemeral"} to the system prompt block; reduces latency and cost by 60–80% on repeated similar queries | High | Low | Anthropic API key available |
| 8 | Fine-tuning dataset pipeline: export (profile, question) → (accepted advice) pairs as JSONL for fine-tuning Claude or a smaller local model | High | High | Feedback loop (#6) + 3+ months of data |
| 9 | Local LLM option (Ollama Mistral/Llama): add Ollama as a third fallback in _ai_chat() for air-gapped environments; already used for embeddings | Low-med | Medium | Ollama service already in cluster for embeddings |
| 10 | Anomaly detection on skill-gap evolution: alert when an employee's readiness score drops unexpectedly fast — possible skill obsolescence or coaching-relationship issue | Medium | High | Reconciliation task (#6) + Alertmanager integration |
CareerForge AI Deep Dive — Internal use only