From d1cda160b2232f0be120df79cc1222b3d3161aa8 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Sat, 4 Jan 2025 18:07:01 +0100 Subject: [PATCH] Catch leaderboard errors --- master/worker.js | 346 ++++++++++++++++++++++++----------------------- 1 file changed, 174 insertions(+), 172 deletions(-) diff --git a/master/worker.js b/master/worker.js index 117da5b..b92a88f 100644 --- a/master/worker.js +++ b/master/worker.js @@ -74,8 +74,6 @@ let orchCache = {}; let jsonString = ""; let lastLeaderboardCheck = 0; let lastStringify = 0; -let isSynced = false; -let storageLock = false; let staleENSCache = false; let staleOrchCache = false; @@ -90,30 +88,112 @@ function sleep(ms) { }); } -async function withStorageLock(fn) { - while (storageLock) { - await sleep(50); // Small delay before retry +// Recover persistent storage +async function recoverStorage(initPrometheus) { + console.log("Retrieving state from storage."); + storedDomains = await storage.getItem("ensDomainCache"); + if (storedDomains) { + ensDomainCache = storedDomains; } - storageLock = true; + storedOrchs = await storage.getItem("orchCache"); + if (storedOrchs) { + orchCache = storedOrchs; + } + console.log("Recovered state from storage."); + + if (!initPrometheus) { + console.log("Skipping Prometheus initialisation."); + return; + } + + // Re-init from storage + for (const [id, obj] of Object.entries(orchCache)) { + const thisName = obj.name; + const thisInstances = obj.instances; + + // Latest leaderboard results observed + if (obj.leaderboardResults) { + for (const [region, res] of Object.entries(obj.leaderboardResults)) { + // Skip the lastTime accessor - only use last observed regional stats + if (res.latestRTR == null || res.latestSR == null) { + continue; + } + console.log( + "Re-init leaderboard scores for orch=" + + id + + ", RTR=" + + res.latestRTR + + " and success rate of " + + res.latestSR * 100 + + "%, livepeer region " + + region + ); + let latitude = null; + let longitude = null; + for (const [resolvedTarget, instance] of Object.entries( + thisInstances + )) { + if (instance.livepeer_regions[region]) { + latitude = instance.latitude; + longitude = instance.longitude; + } + } + promLatestRTR.set( + { + livepeer_region: region, + orchestrator: thisName, + latitude: latitude, + longitude: longitude, + }, + res.latestRTR + ); + promLatestSuccessRate.set( + { + livepeer_region: region, + orchestrator: thisName, + latitude: latitude, + longitude: longitude, + }, + res.latestSR + ); + } + } + } + console.log("Initialized Prometheus state."); +} + +function writeToStorage(key, data) { try { - await fn(); - } finally { - storageLock = false; + const serialized = JSON.stringify(data); + if (!serialized) throw new Error("Invalid JSON data"); + storage.setItem(key, JSON.parse(serialized)); + console.log(`Stored data for key: ${key}`); + } catch (err) { + console.error(`Error writing to storage for key: ${key}`, err.message); + recoverStorage(false); } } -async function writeToStorage(key, data) { +async function flushCachesToStorage() { try { - const serialized = JSON.stringify(data); - if (!serialized) throw new Error("Serialization failed"); - await withStorageLock(async () => { - await storage.setItem(key, JSON.parse(serialized)); - }); + if (staleENSCache) { + writeToStorage("ensDomainCache", ensDomainCache); + } + if (staleOrchCache) { + writeToStorage("orchCache", orchCache); + } + console.log("Flushed caches to storage."); } catch (err) { - console.error("Error writing to storage:", err.message); + console.error("Error flushing caches to storage:", err.message); } } +process.on("SIGINT", async () => { + console.log("Shutting down worker..."); + await flushCachesToStorage(); + process.exit(); +}); + // Process the task queue continuously async function processQueue() { while (true) { @@ -126,30 +206,21 @@ async function processQueue() { } } else { if (staleENSCache) { - try { - console.log("Writing ENS cache to disk"); - await writeToStorage("ensDomainCache", ensDomainCache); - staleENSCache = false; - } catch (err) { - console.log(err); - console.log("Error writing to storage..."); - } + console.log("Writing ENS cache to storage..."); + writeToStorage("ensDomainCache", ensDomainCache); + staleENSCache = false; } + if (staleOrchCache) { - try { - console.log("Writing Orchestrator cache to disk"); - await writeToStorage("orchCache", orchCache); - staleOrchCache = false; - } catch (err) { - console.log(err); - console.log("Error writing to storage..."); - } + console.log("Writing orchestrator cache to storage..."); + writeToStorage("orchCache", orchCache); + staleOrchCache = false; } + await sleep(1000); } } } -processQueue(); // ENS Domain Lookup async function getEnsDomain(addr) { @@ -467,73 +538,70 @@ async function updateScore(address) { const thisName = orchCache[address.toLowerCase()].name; const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime; - let url = - "https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" + - address; + const url = `https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=${address}`; - const json = await fetch(url).then((res) => res.json()); - let hasEdited = false; - for (const [region, results] of Object.entries(json)) { - for (const instance of results) { - if (instance.timestamp * 1000 > lastTime) { - const newSR = instance.success_rate; - const newRTR = instance.round_trip_time / instance.seg_duration; - let latitude = null; - let longitude = null; - for (const [resolvedTarget, instance] of Object.entries( - thisInstances - )) { - if (instance.livepeer_regions[region]) { - latitude = instance.latitude; - longitude = instance.longitude; + try { + const response = await fetch(url, { timeout: 10000 }); + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + const json = await response.json(); + + let hasEdited = false; + for (const [region, results] of Object.entries(json)) { + for (const instance of results) { + if (instance.timestamp * 1000 > lastTime) { + const newSR = instance.success_rate; + const newRTR = instance.round_trip_time / instance.seg_duration; + let latitude = null; + let longitude = null; + + for (const [resolvedTarget, instance] of Object.entries(thisInstances)) { + if (instance.livepeer_regions[region]) { + latitude = instance.latitude; + longitude = instance.longitude; + } } + + console.log( + `Found new RTR=${newRTR} and new success rate of ${newSR * 100}%, livepeer region ${instance.region}` + ); + + promLatestRTR.set( + { + livepeer_region: instance.region, + orchestrator: thisName, + latitude: latitude, + longitude: longitude, + }, + newRTR + ); + promLatestSuccessRate.set( + { + livepeer_region: instance.region, + orchestrator: thisName, + latitude: latitude, + longitude: longitude, + }, + newSR + ); + + if (!orchCache[address.toLowerCase()].leaderboardResults[instance.region]) { + orchCache[address.toLowerCase()].leaderboardResults[instance.region] = {}; + } + orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestRTR = newRTR; + orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestSR = newSR; + hasEdited = true; } - console.log( - "Found new RTR=" + - newRTR + - " and new success rate of " + - newSR * 100 + - "%, livepeer region " + - instance.region - ); - promLatestRTR.set( - { - livepeer_region: instance.region, - orchestrator: thisName, - latitude: latitude, - longitude: longitude, - }, - newRTR - ); - promLatestSuccessRate.set( - { - livepeer_region: instance.region, - orchestrator: thisName, - latitude: latitude, - longitude: longitude, - }, - newSR - ); - if ( - !orchCache[address.toLowerCase()].leaderboardResults[instance.region] - ) { - orchCache[address.toLowerCase()].leaderboardResults[instance.region] = - {}; - } - orchCache[address.toLowerCase()].leaderboardResults[ - instance.region - ].latestRTR = newRTR; - orchCache[address.toLowerCase()].leaderboardResults[ - instance.region - ].latestSR = newSR; - hasEdited = true; } } - } - if (hasEdited) { - orchCache[address.toLowerCase()].leaderboardResults.lastTime = - new Date().getTime(); - staleOrchCache = true; + + if (hasEdited) { + orchCache[address.toLowerCase()].leaderboardResults.lastTime = new Date().getTime(); + staleOrchCache = true; + } + } catch (error) { + console.error(`Error fetching data for address ${address}:`, error.message); } } @@ -544,81 +612,6 @@ async function updateOrchScores() { } } -// Recover persistent storage -async function recoverStorage() { - await storage.init({ - stringify: JSON.stringify, - parse: JSON.parse, - encoding: "utf8", - logging: false, - ttl: false, - forgiveParseErrors: false, - }); - storedDomains = await storage.getItem("ensDomainCache"); - if (storedDomains) { - ensDomainCache = storedDomains; - } - storedOrchs = await storage.getItem("orchCache"); - if (storedOrchs) { - orchCache = storedOrchs; - } - - // Re-init from storage - for (const [id, obj] of Object.entries(orchCache)) { - const thisName = obj.name; - const thisInstances = obj.instances; - - // Latest leaderboard results observed - if (obj.leaderboardResults) { - for (const [region, res] of Object.entries(obj.leaderboardResults)) { - // Skip the lastTime accessor - only use last observed regional stats - if (res.latestRTR == null || res.latestSR == null) { - continue; - } - console.log( - "Re-init leaderboard scores for orch=" + - id + - ", RTR=" + - res.latestRTR + - " and success rate of " + - res.latestSR * 100 + - "%, livepeer region " + - region - ); - let latitude = null; - let longitude = null; - for (const [resolvedTarget, instance] of Object.entries( - thisInstances - )) { - if (instance.livepeer_regions[region]) { - latitude = instance.latitude; - longitude = instance.longitude; - } - } - promLatestRTR.set( - { - livepeer_region: region, - orchestrator: thisName, - latitude: latitude, - longitude: longitude, - }, - res.latestRTR - ); - promLatestSuccessRate.set( - { - livepeer_region: region, - orchestrator: thisName, - latitude: latitude, - longitude: longitude, - }, - res.latestSR - ); - } - } - } - isSynced = true; -} - // Main loop - checks for leaderboard results and refreshes some caches async function runTests() { const now = Date.now(); @@ -668,8 +661,17 @@ parentPort.on("message", async (message) => { // Initialize and Start Worker (async () => { - console.log("Reading local storage..."); - recoverStorage(); + console.log("Opening storage."); + await storage.init({ + stringify: JSON.stringify, + parse: JSON.parse, + encoding: "utf8", + logging: false, + ttl: false, + forgiveParseErrors: false, + }); + recoverStorage(true); console.log("Worker initialized."); runTests(); + processQueue(); })();