190 lines
5.9 KiB
JavaScript
190 lines
5.9 KiB
JavaScript
"use strict";
|
|
/**
|
|
* retrieverService.js -- persistent Python daemon.
|
|
*
|
|
* Spawns retrieve.py ONCE when the Node server starts. The Python process
|
|
* loads the FAISS index and BM25 index once, then serves queries via
|
|
* newline-delimited JSON on stdin/stdout.
|
|
*
|
|
* Once the daemon is warm, a query takes ~150ms. The cold start (loading the
* indexes) runs in the background while the server boots; queries issued
* before the daemon reports ready are queued until then.
|
|
*
|
|
* inference.py is never modified.
|
|
*/
|
|
|
|
const { spawn } = require("child_process");
|
|
const path = require("path");
|
|
const readline = require("readline");
|
|
const { EventEmitter } = require("events");
|
|
|
|
/** @type {string} Absolute path of the Python bridge script, bridge/retrieve.py (one level above this file's directory). */
const BRIDGE = path.resolve(__dirname, "..", "bridge", "retrieve.py");

/** @type {string} Repository root, three directory levels above this file; used as the cwd of the Python subprocess. */
const ROOT = path.resolve(__dirname, "..", "..", "..");

/** @type {string} Python interpreter to launch. Set PYTHON_BIN to override; an unset or empty value falls back to "python". */
const PYTHON = process.env.PYTHON_BIN || "python";

/** @type {number} Timeout (ms) for a request made before the daemon is ready: covers waiting for the ready signal plus the query itself. */
const BOOT_TIMEOUT_MS = 90 * 1000;

/** @type {number} Timeout (ms) for a single query made once the daemon is ready. */
const QUERY_TIMEOUT_MS = 10 * 1000;
|
|
|
|
/**
 * Owns one long-lived retrieve.py process and multiplexes queries over its
 * stdin/stdout.
 *
 * Wire protocol, as used below:
 *  - Each request is one JSON line `{"query": ..., "top_n": ...}` written to stdin.
 *  - Each reply is one JSON-object line on stdout.
 *  - Before answering queries, the daemon prints `{"ready": true}`, or
 *    `{"error": ...}` if it failed to initialise.
 *
 * Replies carry no request id, so they are paired with requests strictly in
 * the order the requests were written.
 */
class PythonRetriever extends EventEmitter {
  constructor() {
    super();
    this._proc = null;   // current ChildProcess, or null after it exited
    this._rl = null;     // readline interface over the child's stdout
    this._ready = false; // true once {"ready": true} has been received
    this._error = null;  // spawn/init error message; retrieve() fails fast while set
    // Requests accepted before the daemon was ready:
    // [{query, top_n, resolve, reject, timer, timedOut}]
    this._queue = [];
    // Requests already written to Python, oldest first. A request whose caller
    // timed out stays here (timedOut = true) until its late reply arrives, so
    // the FIFO pairing of replies to requests is not thrown off.
    this._pending = [];
    this._start();
  }

  /**
   * Spawn retrieve.py and attach the stdout, stderr and lifecycle handlers.
   * Resets the ready flag and any previously stored error.
   */
  _start() {
    this._ready = false;
    this._error = null;

    console.log("[retriever] Starting Python daemon (first boot ~20s)...");

    this._proc = spawn(PYTHON, [BRIDGE], {
      cwd: ROOT,
      env: { ...process.env },
      stdio: ["pipe", "pipe", "pipe"],
    });

    // A write after Python has exited fails with EPIPE on stdin. A stream
    // 'error' event with no listener is thrown and would take down the whole
    // Node process. Just log it: the 'close' handler rejects outstanding work.
    this._proc.stdin.on("error", (err) => {
      console.error("[retriever] stdin error:", err.message);
    });

    this._rl = readline.createInterface({ input: this._proc.stdout, crlfDelay: Infinity });
    this._rl.on("line", (raw) => this._onLine(raw));

    this._proc.stderr.on("data", (d) => {
      const text = d.toString();
      // Suppress routine model-loading noise
      if (
        text.includes("Loading cached") ||
        text.includes("BM25 index") ||
        text.includes("Loaded ") ||
        text.includes("Building BM25")
      ) return;
      process.stderr.write("[py] " + text);
    });

    this._proc.on("close", (code) => {
      console.warn(`[retriever] Daemon exited (code ${code}), restarting on next query.`);
      this._ready = false;
      this._proc = null;
      this._rl = null;
      // Reject anything still queued or in flight
      this._rejectAll("Python retriever restarted unexpectedly.");
    });

    this._proc.on("error", (err) => {
      console.error("[retriever] Spawn error:", err.message);
      this._error = err.message;
      this._rejectAll(`Retriever spawn failed: ${err.message}`);
    });
  }

  /**
   * Reject every queued and in-flight request, then empty both lists.
   * Each request gets its own `new Error(message)`. Timed-out placeholders
   * were already rejected by their timer and are simply dropped.
   * @param {string} message
   */
  _rejectAll(message) {
    for (const item of [...this._queue, ...this._pending]) {
      if (item.timedOut) continue;
      clearTimeout(item.timer);
      item.reject(new Error(message));
    }
    this._queue.length = 0;
    this._pending.length = 0;
  }

  /**
   * Handle one stdout line from the daemon.
   * Before the daemon is ready, the line is treated as the ready/error
   * handshake. Afterwards it is the reply to the oldest in-flight request.
   * Blank lines and lines that are not a JSON object are ignored.
   * @param {string} raw
   */
  _onLine(raw) {
    const line = raw.trim();
    if (!line) return;

    let msg;
    try {
      msg = JSON.parse(line);
    } catch {
      return; // not JSON (e.g. sentence-transformers progress bars)
    }
    // `null` or a bare number/string is not a protocol message. Reading a
    // property of `null` would otherwise throw inside the stream handler.
    if (msg === null || typeof msg !== "object") return;

    // Startup handshake: wait for {"ready":true} before flushing the queue.
    if (!this._ready) {
      if (msg.ready) {
        this._ready = true;
        console.log(`[retriever] Ready -- flushing ${this._queue.length} queued request(s).`);
        // Send queued requests in arrival order.
        for (const item of this._queue) {
          this._pending.push(item);
          this._send(item);
        }
        this._queue.length = 0;
      } else if (msg.error) {
        this._error = msg.error;
        console.error("[retriever] Init failed:", msg.error);
        for (const p of this._queue) {
          clearTimeout(p.timer);
          p.reject(new Error(msg.error));
        }
        this._queue.length = 0;
      }
      return;
    }

    // Query reply: it belongs to the oldest in-flight request (FIFO).
    const item = this._pending.shift();
    if (!item) return;
    // That caller already received a timeout error. Discard the late reply so
    // the next reply still pairs with the next request.
    if (item.timedOut) return;
    clearTimeout(item.timer);

    if (msg.error) {
      item.reject(new Error(msg.error));
    } else {
      item.resolve({
        results: msg.results || [],
        latency_seconds: msg.latency_seconds ?? 0,
      });
    }
  }

  /**
   * Write one request line to the daemon's stdin.
   * Silently skipped when there is no live process.
   * @param {{ query: string, top_n: number }} item
   */
  _send(item) {
    if (!this._proc || this._proc.killed) return;
    this._proc.stdin.write(
      JSON.stringify({ query: item.query, top_n: item.top_n }) + "\n"
    );
  }

  /**
   * Run one retrieval query.
   *
   * If the daemon has exited, it is respawned first. Before the daemon is
   * ready, the request is queued with a BOOT_TIMEOUT_MS budget. Once it is
   * ready, the request is written immediately with a QUERY_TIMEOUT_MS budget.
   *
   * @param {string} query
   * @param {number} [topN=5] - Sent to Python as `top_n`.
   * @returns {Promise<{ results: Array, latency_seconds: number }>}
   *   Rejects with the stored spawn/init error, a daemon-reported error, a
   *   timeout, or when the daemon exits while the request is outstanding.
   */
  retrieve(query, topN = 5) {
    // Restart if crashed
    if (!this._proc) this._start();

    return new Promise((resolve, reject) => {
      if (this._error) {
        reject(new Error(this._error));
        return;
      }

      const timeoutMs = this._ready ? QUERY_TIMEOUT_MS : BOOT_TIMEOUT_MS;
      const item = { query, top_n: topN, resolve, reject, timer: null, timedOut: false };

      item.timer = setTimeout(() => {
        const queuedAt = this._queue.indexOf(item);
        if (queuedAt !== -1) {
          // Never sent to Python: simply forget it.
          this._queue.splice(queuedAt, 1);
        } else {
          // Already sent, so Python will still answer it. Keep it in _pending
          // as a placeholder so that reply is discarded instead of being
          // handed to the next caller.
          item.timedOut = true;
        }
        reject(new Error(`Retriever timed out after ${timeoutMs}ms`));
      }, timeoutMs);

      if (this._ready) {
        // Send immediately and wait for the response
        this._pending.push(item);
        this._send(item);
      } else {
        // Queue until the daemon signals ready
        this._queue.push(item);
      }
    });
  }
}
|
|
|
|
// One daemon per Node process. Constructing it here, when this module is first
// required, starts the Python index load right away so it overlaps with the
// rest of server boot.
const retriever = new PythonRetriever();

/**
 * Run a retrieval query against the shared Python daemon.
 * @param {string} q - Query text.
 * @param {number} [n] - Number of results; PythonRetriever#retrieve defaults it to 5 when omitted.
 * @returns {Promise<{ results: Array, latency_seconds: number }>}
 */
function retrieve(q, n) {
  return retriever.retrieve(q, n);
}

module.exports = { retrieve };
|