Catch leaderboard errors

This commit is contained in:
Marco van Dijk 2025-01-04 18:07:01 +01:00
parent 381d4a746f
commit d1cda160b2

View File

@ -74,8 +74,6 @@ let orchCache = {};
let jsonString = ""; let jsonString = "";
let lastLeaderboardCheck = 0; let lastLeaderboardCheck = 0;
let lastStringify = 0; let lastStringify = 0;
let isSynced = false;
let storageLock = false;
let staleENSCache = false; let staleENSCache = false;
let staleOrchCache = false; let staleOrchCache = false;
@ -90,30 +88,112 @@ function sleep(ms) {
}); });
} }
async function withStorageLock(fn) { // Recover persistent storage
while (storageLock) { async function recoverStorage(initPrometheus) {
await sleep(50); // Small delay before retry console.log("Retrieving state from storage.");
storedDomains = await storage.getItem("ensDomainCache");
if (storedDomains) {
ensDomainCache = storedDomains;
} }
storageLock = true; storedOrchs = await storage.getItem("orchCache");
if (storedOrchs) {
orchCache = storedOrchs;
}
console.log("Recovered state from storage.");
if (!initPrometheus) {
console.log("Skipping Prometheus initialisation.");
return;
}
// Re-init from storage
for (const [id, obj] of Object.entries(orchCache)) {
const thisName = obj.name;
const thisInstances = obj.instances;
// Latest leaderboard results observed
if (obj.leaderboardResults) {
for (const [region, res] of Object.entries(obj.leaderboardResults)) {
// Skip the lastTime accessor - only use last observed regional stats
if (res.latestRTR == null || res.latestSR == null) {
continue;
}
console.log(
"Re-init leaderboard scores for orch=" +
id +
", RTR=" +
res.latestRTR +
" and success rate of " +
res.latestSR * 100 +
"%, livepeer region " +
region
);
let latitude = null;
let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
thisInstances
)) {
if (instance.livepeer_regions[region]) {
latitude = instance.latitude;
longitude = instance.longitude;
}
}
promLatestRTR.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestRTR
);
promLatestSuccessRate.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestSR
);
}
}
}
console.log("Initialized Prometheus state.");
}
function writeToStorage(key, data) {
try { try {
await fn(); const serialized = JSON.stringify(data);
} finally { if (!serialized) throw new Error("Invalid JSON data");
storageLock = false; storage.setItem(key, JSON.parse(serialized));
console.log(`Stored data for key: ${key}`);
} catch (err) {
console.error(`Error writing to storage for key: ${key}`, err.message);
recoverStorage(false);
} }
} }
async function writeToStorage(key, data) { async function flushCachesToStorage() {
try { try {
const serialized = JSON.stringify(data); if (staleENSCache) {
if (!serialized) throw new Error("Serialization failed"); writeToStorage("ensDomainCache", ensDomainCache);
await withStorageLock(async () => { }
await storage.setItem(key, JSON.parse(serialized)); if (staleOrchCache) {
}); writeToStorage("orchCache", orchCache);
}
console.log("Flushed caches to storage.");
} catch (err) { } catch (err) {
console.error("Error writing to storage:", err.message); console.error("Error flushing caches to storage:", err.message);
} }
} }
process.on("SIGINT", async () => {
console.log("Shutting down worker...");
await flushCachesToStorage();
process.exit();
});
// Process the task queue continuously // Process the task queue continuously
async function processQueue() { async function processQueue() {
while (true) { while (true) {
@ -126,30 +206,21 @@ async function processQueue() {
} }
} else { } else {
if (staleENSCache) { if (staleENSCache) {
try { console.log("Writing ENS cache to storage...");
console.log("Writing ENS cache to disk"); writeToStorage("ensDomainCache", ensDomainCache);
await writeToStorage("ensDomainCache", ensDomainCache);
staleENSCache = false; staleENSCache = false;
} catch (err) {
console.log(err);
console.log("Error writing to storage...");
}
} }
if (staleOrchCache) { if (staleOrchCache) {
try { console.log("Writing orchestrator cache to storage...");
console.log("Writing Orchestrator cache to disk"); writeToStorage("orchCache", orchCache);
await writeToStorage("orchCache", orchCache);
staleOrchCache = false; staleOrchCache = false;
} catch (err) {
console.log(err);
console.log("Error writing to storage...");
}
} }
await sleep(1000); await sleep(1000);
} }
} }
} }
processQueue();
// ENS Domain Lookup // ENS Domain Lookup
async function getEnsDomain(addr) { async function getEnsDomain(addr) {
@ -467,11 +538,15 @@ async function updateScore(address) {
const thisName = orchCache[address.toLowerCase()].name; const thisName = orchCache[address.toLowerCase()].name;
const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime; const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime;
let url = const url = `https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=${address}`;
"https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" +
address; try {
const response = await fetch(url, { timeout: 10000 });
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const json = await response.json();
const json = await fetch(url).then((res) => res.json());
let hasEdited = false; let hasEdited = false;
for (const [region, results] of Object.entries(json)) { for (const [region, results] of Object.entries(json)) {
for (const instance of results) { for (const instance of results) {
@ -480,22 +555,18 @@ async function updateScore(address) {
const newRTR = instance.round_trip_time / instance.seg_duration; const newRTR = instance.round_trip_time / instance.seg_duration;
let latitude = null; let latitude = null;
let longitude = null; let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
thisInstances for (const [resolvedTarget, instance] of Object.entries(thisInstances)) {
)) {
if (instance.livepeer_regions[region]) { if (instance.livepeer_regions[region]) {
latitude = instance.latitude; latitude = instance.latitude;
longitude = instance.longitude; longitude = instance.longitude;
} }
} }
console.log( console.log(
"Found new RTR=" + `Found new RTR=${newRTR} and new success rate of ${newSR * 100}%, livepeer region ${instance.region}`
newRTR +
" and new success rate of " +
newSR * 100 +
"%, livepeer region " +
instance.region
); );
promLatestRTR.set( promLatestRTR.set(
{ {
livepeer_region: instance.region, livepeer_region: instance.region,
@ -514,27 +585,24 @@ async function updateScore(address) {
}, },
newSR newSR
); );
if (
!orchCache[address.toLowerCase()].leaderboardResults[instance.region] if (!orchCache[address.toLowerCase()].leaderboardResults[instance.region]) {
) { orchCache[address.toLowerCase()].leaderboardResults[instance.region] = {};
orchCache[address.toLowerCase()].leaderboardResults[instance.region] =
{};
} }
orchCache[address.toLowerCase()].leaderboardResults[ orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestRTR = newRTR;
instance.region orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestSR = newSR;
].latestRTR = newRTR;
orchCache[address.toLowerCase()].leaderboardResults[
instance.region
].latestSR = newSR;
hasEdited = true; hasEdited = true;
} }
} }
} }
if (hasEdited) { if (hasEdited) {
orchCache[address.toLowerCase()].leaderboardResults.lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime = new Date().getTime();
new Date().getTime();
staleOrchCache = true; staleOrchCache = true;
} }
} catch (error) {
console.error(`Error fetching data for address ${address}:`, error.message);
}
} }
// Parse leaderboard results // Parse leaderboard results
@ -544,81 +612,6 @@ async function updateOrchScores() {
} }
} }
// Recover persistent storage
async function recoverStorage() {
await storage.init({
stringify: JSON.stringify,
parse: JSON.parse,
encoding: "utf8",
logging: false,
ttl: false,
forgiveParseErrors: false,
});
storedDomains = await storage.getItem("ensDomainCache");
if (storedDomains) {
ensDomainCache = storedDomains;
}
storedOrchs = await storage.getItem("orchCache");
if (storedOrchs) {
orchCache = storedOrchs;
}
// Re-init from storage
for (const [id, obj] of Object.entries(orchCache)) {
const thisName = obj.name;
const thisInstances = obj.instances;
// Latest leaderboard results observed
if (obj.leaderboardResults) {
for (const [region, res] of Object.entries(obj.leaderboardResults)) {
// Skip the lastTime accessor - only use last observed regional stats
if (res.latestRTR == null || res.latestSR == null) {
continue;
}
console.log(
"Re-init leaderboard scores for orch=" +
id +
", RTR=" +
res.latestRTR +
" and success rate of " +
res.latestSR * 100 +
"%, livepeer region " +
region
);
let latitude = null;
let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
thisInstances
)) {
if (instance.livepeer_regions[region]) {
latitude = instance.latitude;
longitude = instance.longitude;
}
}
promLatestRTR.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestRTR
);
promLatestSuccessRate.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestSR
);
}
}
}
isSynced = true;
}
// Main loop - checks for leaderboard results and refreshes some caches // Main loop - checks for leaderboard results and refreshes some caches
async function runTests() { async function runTests() {
const now = Date.now(); const now = Date.now();
@ -668,8 +661,17 @@ parentPort.on("message", async (message) => {
// Initialize and Start Worker // Initialize and Start Worker
(async () => { (async () => {
console.log("Reading local storage..."); console.log("Opening storage.");
recoverStorage(); await storage.init({
stringify: JSON.stringify,
parse: JSON.parse,
encoding: "utf8",
logging: false,
ttl: false,
forgiveParseErrors: false,
});
recoverStorage(true);
console.log("Worker initialized."); console.log("Worker initialized.");
runTests(); runTests();
processQueue();
})(); })();