feat: add web server backend.

This commit is contained in:
K
2026-04-28 23:55:41 +05:30
parent 3065a0adce
commit 3a0c32ea8f
8 changed files with 1705 additions and 0 deletions
+184
View File
@@ -0,0 +1,184 @@
"use strict";
/**
* retrieverService.js — persistent Python daemon.
*
* Spawns retrieve.py ONCE when the Node server starts. The Python process
* loads the FAISS index and BM25 index once, then serves queries via
* newline-delimited JSON on stdin/stdout.
*
* First query: ~150ms (index already warm from startup).
* Cold start happens in the background while the server is booting.
*
* inference.py is never modified.
*/
const { spawn } = require("child_process");
const path = require("path");
const readline = require("readline");
const { EventEmitter } = require("events");
const BRIDGE = path.join(__dirname, "../bridge/retrieve.py");
const ROOT = path.join(__dirname, "../../..");
const PYTHON = process.env.PYTHON_BIN || "python";
const BOOT_TIMEOUT_MS = 90_000; // Python cold-start budget
const QUERY_TIMEOUT_MS = 10_000; // per-query budget once warm
class PythonRetriever extends EventEmitter {
constructor() {
super();
this._proc = null;
this._rl = null;
this._ready = false;
this._error = null;
this._queue = []; // requests queued before ready: [{query,top_n,resolve,reject,timer}]
this._pending = []; // in-flight requests sent to Python: [{resolve,reject,timer}]
this._start();
}
_start() {
this._ready = false;
this._error = null;
console.log("[retriever] Starting Python daemon (first boot ~20s)…");
this._proc = spawn(PYTHON, [BRIDGE], {
cwd: ROOT,
env: { ...process.env },
stdio: ["pipe", "pipe", "pipe"],
});
this._rl = readline.createInterface({ input: this._proc.stdout, crlfDelay: Infinity });
this._rl.on("line", (raw) => this._onLine(raw));
this._proc.stderr.on("data", (d) => {
const text = d.toString();
// Suppress routine model-loading noise
if (
text.includes("Loading cached") ||
text.includes("BM25 index") ||
text.includes("Loaded ") ||
text.includes("Building BM25")
) return;
process.stderr.write("[py] " + text);
});
this._proc.on("close", (code) => {
console.warn(`[retriever] Daemon exited (code ${code}), restarting on next query.`);
this._ready = false;
this._proc = null;
this._rl = null;
// Reject anything still in flight
for (const p of [...this._queue, ...this._pending]) {
clearTimeout(p.timer);
p.reject(new Error("Python retriever restarted unexpectedly."));
}
this._queue.length = 0;
this._pending.length = 0;
});
this._proc.on("error", (err) => {
console.error("[retriever] Spawn error:", err.message);
this._error = err.message;
for (const p of [...this._queue, ...this._pending]) {
clearTimeout(p.timer);
p.reject(new Error(`Retriever spawn failed: ${err.message}`));
}
this._queue.length = 0;
this._pending.length = 0;
});
}
_onLine(raw) {
raw = raw.trim();
if (!raw) return;
let msg;
try { msg = JSON.parse(raw); }
catch { return; } // ignore non-JSON (e.g. sentence-transformers progress bars)
// ── Startup handshake ──
if (!this._ready) {
if (msg.ready) {
this._ready = true;
console.log(`[retriever] Ready — flushing ${this._queue.length} queued request(s).`);
// Send all queued requests in order
for (const item of this._queue) {
this._pending.push(item);
this._send(item);
}
this._queue.length = 0;
} else if (msg.error) {
this._error = msg.error;
console.error("[retriever] Init failed:", msg.error);
for (const p of this._queue) { clearTimeout(p.timer); p.reject(new Error(msg.error)); }
this._queue.length = 0;
}
return;
}
// ── Query response — FIFO ──
const item = this._pending.shift();
if (!item) return;
clearTimeout(item.timer);
if (msg.error) {
item.reject(new Error(msg.error));
} else {
item.resolve({
results: msg.results || [],
latency_seconds: msg.latency_seconds ?? 0,
});
}
}
_send(item) {
if (!this._proc || this._proc.killed) return;
this._proc.stdin.write(
JSON.stringify({ query: item.query, top_n: item.top_n }) + "\n"
);
}
/**
* @param {string} query
* @param {number} topN
* @returns {Promise<{ results: Array, latency_seconds: number }>}
*/
retrieve(query, topN = 5) {
// Restart if crashed
if (!this._proc) this._start();
return new Promise((resolve, reject) => {
if (this._error) {
return reject(new Error(this._error));
}
const timeoutMs = this._ready ? QUERY_TIMEOUT_MS : BOOT_TIMEOUT_MS;
const item = { query, top_n: topN, resolve, reject, timer: null };
item.timer = setTimeout(() => {
// Remove from whichever queue it's in
let idx = this._queue.indexOf(item);
if (idx !== -1) this._queue.splice(idx, 1);
idx = this._pending.indexOf(item);
if (idx !== -1) this._pending.splice(idx, 1);
reject(new Error(`Retriever timed out after ${timeoutMs}ms`));
}, timeoutMs);
if (this._ready) {
// Send immediately and wait for response
this._pending.push(item);
this._send(item);
} else {
// Queue until daemon signals ready
this._queue.push(item);
}
});
}
}
// Singleton — one daemon for the lifetime of the Node process
const retriever = new PythonRetriever();
module.exports = { retrieve: (q, n) => retriever.retrieve(q, n) };