Compare commits

..

No commits in common. "e779bbc3936e40970afe2c87d74af8af42547d91" and "381d4a746f729f51cb4f9543e333c1e213405c27" have entirely different histories.

2 changed files with 173 additions and 175 deletions

View File

@ -17,7 +17,7 @@
"dependencies": { "dependencies": {
"ethers": "5.7.2", "ethers": "5.7.2",
"express": "^4.17.1", "express": "^4.17.1",
"node-persist": "^4.0.3", "node-persist": "^3.1.3",
"npm": "^8.5.2", "npm": "^8.5.2",
"prom-client": "^14.0.1" "prom-client": "^14.0.1"
}, },

View File

@ -74,6 +74,8 @@ let orchCache = {};
let jsonString = ""; let jsonString = "";
let lastLeaderboardCheck = 0; let lastLeaderboardCheck = 0;
let lastStringify = 0; let lastStringify = 0;
let isSynced = false;
let storageLock = false;
let staleENSCache = false; let staleENSCache = false;
let staleOrchCache = false; let staleOrchCache = false;
@ -88,111 +90,29 @@ function sleep(ms) {
}); });
} }
// Recover persistent storage async function withStorageLock(fn) {
async function recoverStorage(initPrometheus) { while (storageLock) {
console.log("Retrieving state from storage."); await sleep(50); // Small delay before retry
storedDomains = await storage.getItem("ensDomainCache");
if (storedDomains) {
ensDomainCache = storedDomains;
} }
storedOrchs = await storage.getItem("orchCache"); storageLock = true;
if (storedOrchs) { try {
orchCache = storedOrchs; await fn();
} finally {
storageLock = false;
} }
console.log("Recovered state from storage.");
if (!initPrometheus) {
console.log("Skipping Prometheus initialisation.");
return;
} }
// Re-init from storage async function writeToStorage(key, data) {
for (const [id, obj] of Object.entries(orchCache)) {
const thisName = obj.name;
const thisInstances = obj.instances;
// Latest leaderboard results observed
if (obj.leaderboardResults) {
for (const [region, res] of Object.entries(obj.leaderboardResults)) {
// Skip the lastTime accessor - only use last observed regional stats
if (res.latestRTR == null || res.latestSR == null) {
continue;
}
console.log(
"Re-init leaderboard scores for orch=" +
id +
", RTR=" +
res.latestRTR +
" and success rate of " +
res.latestSR * 100 +
"%, livepeer region " +
region
);
let latitude = null;
let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
thisInstances
)) {
if (instance.livepeer_regions[region]) {
latitude = instance.latitude;
longitude = instance.longitude;
}
}
promLatestRTR.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestRTR
);
promLatestSuccessRate.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestSR
);
}
}
}
console.log("Initialized Prometheus state.");
}
function writeToStorage(key, data) {
try { try {
const serialized = JSON.stringify(data); const serialized = JSON.stringify(data);
if (!serialized) throw new Error("Invalid JSON data"); if (!serialized) throw new Error("Serialization failed");
storage.setItem(key, JSON.parse(serialized)); await withStorageLock(async () => {
console.log(`Stored data for key: ${key}`); await storage.setItem(key, JSON.parse(serialized));
} catch (err) {
console.error(`Error writing to storage for key: ${key}`, err.message);
recoverStorage(false);
}
}
async function flushCachesToStorage() {
try {
if (staleENSCache) {
writeToStorage("ensDomainCache", ensDomainCache);
}
if (staleOrchCache) {
writeToStorage("orchCache", orchCache);
}
console.log("Flushed caches to storage.");
} catch (err) {
console.error("Error flushing caches to storage:", err.message);
}
}
process.on("SIGINT", async () => {
console.log("Shutting down worker...");
await flushCachesToStorage();
process.exit();
}); });
} catch (err) {
console.error("Error writing to storage:", err.message);
}
}
// Process the task queue continuously // Process the task queue continuously
async function processQueue() { async function processQueue() {
@ -206,21 +126,30 @@ async function processQueue() {
} }
} else { } else {
if (staleENSCache) { if (staleENSCache) {
console.log("Writing ENS cache to storage..."); try {
writeToStorage("ensDomainCache", ensDomainCache); console.log("Writing ENS cache to disk");
await writeToStorage("ensDomainCache", ensDomainCache);
staleENSCache = false; staleENSCache = false;
} catch (err) {
console.log(err);
console.log("Error writing to storage...");
}
} }
if (staleOrchCache) { if (staleOrchCache) {
console.log("Writing orchestrator cache to storage..."); try {
writeToStorage("orchCache", orchCache); console.log("Writing Orchestrator cache to disk");
await writeToStorage("orchCache", orchCache);
staleOrchCache = false; staleOrchCache = false;
} catch (err) {
console.log(err);
console.log("Error writing to storage...");
}
} }
await sleep(1000); await sleep(1000);
} }
} }
} }
processQueue();
// ENS Domain Lookup // ENS Domain Lookup
async function getEnsDomain(addr) { async function getEnsDomain(addr) {
@ -538,15 +467,11 @@ async function updateScore(address) {
const thisName = orchCache[address.toLowerCase()].name; const thisName = orchCache[address.toLowerCase()].name;
const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime; const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime;
const url = `https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=${address}`; let url =
"https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" +
try { address;
const response = await fetch(url, { timeout: 10000 });
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const json = await response.json();
const json = await fetch(url).then((res) => res.json());
let hasEdited = false; let hasEdited = false;
for (const [region, results] of Object.entries(json)) { for (const [region, results] of Object.entries(json)) {
for (const instance of results) { for (const instance of results) {
@ -555,18 +480,22 @@ async function updateScore(address) {
const newRTR = instance.round_trip_time / instance.seg_duration; const newRTR = instance.round_trip_time / instance.seg_duration;
let latitude = null; let latitude = null;
let longitude = null; let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
for (const [resolvedTarget, instance] of Object.entries(thisInstances)) { thisInstances
)) {
if (instance.livepeer_regions[region]) { if (instance.livepeer_regions[region]) {
latitude = instance.latitude; latitude = instance.latitude;
longitude = instance.longitude; longitude = instance.longitude;
} }
} }
console.log( console.log(
`Found new RTR=${newRTR} and new success rate of ${newSR * 100}%, livepeer region ${instance.region}` "Found new RTR=" +
newRTR +
" and new success rate of " +
newSR * 100 +
"%, livepeer region " +
instance.region
); );
promLatestRTR.set( promLatestRTR.set(
{ {
livepeer_region: instance.region, livepeer_region: instance.region,
@ -585,24 +514,27 @@ async function updateScore(address) {
}, },
newSR newSR
); );
if (
if (!orchCache[address.toLowerCase()].leaderboardResults[instance.region]) { !orchCache[address.toLowerCase()].leaderboardResults[instance.region]
orchCache[address.toLowerCase()].leaderboardResults[instance.region] = {}; ) {
orchCache[address.toLowerCase()].leaderboardResults[instance.region] =
{};
} }
orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestRTR = newRTR; orchCache[address.toLowerCase()].leaderboardResults[
orchCache[address.toLowerCase()].leaderboardResults[instance.region].latestSR = newSR; instance.region
].latestRTR = newRTR;
orchCache[address.toLowerCase()].leaderboardResults[
instance.region
].latestSR = newSR;
hasEdited = true; hasEdited = true;
} }
} }
} }
if (hasEdited) { if (hasEdited) {
orchCache[address.toLowerCase()].leaderboardResults.lastTime = new Date().getTime(); orchCache[address.toLowerCase()].leaderboardResults.lastTime =
new Date().getTime();
staleOrchCache = true; staleOrchCache = true;
} }
} catch (error) {
console.error(`Error fetching data for address ${address}:`, error.message);
}
} }
// Parse leaderboard results // Parse leaderboard results
@ -612,6 +544,81 @@ async function updateOrchScores() {
} }
} }
// Recover persistent storage
async function recoverStorage() {
await storage.init({
stringify: JSON.stringify,
parse: JSON.parse,
encoding: "utf8",
logging: false,
ttl: false,
forgiveParseErrors: false,
});
storedDomains = await storage.getItem("ensDomainCache");
if (storedDomains) {
ensDomainCache = storedDomains;
}
storedOrchs = await storage.getItem("orchCache");
if (storedOrchs) {
orchCache = storedOrchs;
}
// Re-init from storage
for (const [id, obj] of Object.entries(orchCache)) {
const thisName = obj.name;
const thisInstances = obj.instances;
// Latest leaderboard results observed
if (obj.leaderboardResults) {
for (const [region, res] of Object.entries(obj.leaderboardResults)) {
// Skip the lastTime accessor - only use last observed regional stats
if (res.latestRTR == null || res.latestSR == null) {
continue;
}
console.log(
"Re-init leaderboard scores for orch=" +
id +
", RTR=" +
res.latestRTR +
" and success rate of " +
res.latestSR * 100 +
"%, livepeer region " +
region
);
let latitude = null;
let longitude = null;
for (const [resolvedTarget, instance] of Object.entries(
thisInstances
)) {
if (instance.livepeer_regions[region]) {
latitude = instance.latitude;
longitude = instance.longitude;
}
}
promLatestRTR.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestRTR
);
promLatestSuccessRate.set(
{
livepeer_region: region,
orchestrator: thisName,
latitude: latitude,
longitude: longitude,
},
res.latestSR
);
}
}
}
isSynced = true;
}
// Main loop - checks for leaderboard results and refreshes some caches // Main loop - checks for leaderboard results and refreshes some caches
async function runTests() { async function runTests() {
const now = Date.now(); const now = Date.now();
@ -661,17 +668,8 @@ parentPort.on("message", async (message) => {
// Initialize and Start Worker // Initialize and Start Worker
(async () => { (async () => {
console.log("Opening storage."); console.log("Reading local storage...");
await storage.init({ recoverStorage();
stringify: JSON.stringify,
parse: JSON.parse,
encoding: "utf8",
logging: false,
ttl: false,
forgiveParseErrors: false,
});
recoverStorage(true);
console.log("Worker initialized."); console.log("Worker initialized.");
runTests(); runTests();
processQueue();
})(); })();