From 39961eda0c730121e178c75cbe3267842e4b3c35 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Tue, 9 Aug 2022 14:57:03 +0200 Subject: [PATCH] Added ENS --- README.md | 2 +- client/src/routes/orchTester.js | 4 +- master/README.md | 9 ++-- master/src/routes/master.js | 88 +++++++++++++++++++++++++++++---- 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index cd83e6d..cfdc878 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This program consists of three components: - TODO integration with email / telegram - TODO api to subscribe for notifications and cancel it - TODO read test stream scores - - TODO allow custom names (ENS, configure via frontend) to be used over orch address + - TODO allow custom names (configure via frontend, ENS done) - A client which pushes stats to master - Gets the list of active Orchestrators from the Livepeer subgraph - Does GetOrchestrator calls to all Orchestrators one by one diff --git a/client/src/routes/orchTester.js b/client/src/routes/orchTester.js index db6f2b7..08e7779 100644 --- a/client/src/routes/orchTester.js +++ b/client/src/routes/orchTester.js @@ -170,6 +170,7 @@ function hexToBytes(hex) { } const discoverOrchestrator = async function (target) { + if (!target) { return; } var client = new livepeerProto.Orchestrator(target, ssl_creds); const addr = CONF_BROADCASTER; var receivedResults = false; @@ -194,6 +195,7 @@ const discoverOrchestrator = async function (target) { } const pingOrchestrator = async function (target) { + if (!target) { return; } var client = new livepeerProto.Orchestrator(target, ssl_creds); var receivedResults = false; var pingPong; @@ -218,7 +220,7 @@ const testOrchestrator = async function (id, target) { target = target.replace(/^https?:\/\//, ''); console.log("Target is " + target); const { discoveryResults, elapsed } = await discoverOrchestrator(target); - if (discoveryResults && discoveryResults == "insufficient sender reserve"){ + if (discoveryResults && discoveryResults == "insufficient sender reserve") { console.log('Ignoring ' + id + ' for stats due to insufficient sender reserve'); return; } diff --git a/master/README.md b/master/README.md index c1bba43..04b2b96 100644 --- a/master/README.md +++ b/master/README.md @@ -3,7 +3,8 @@ ``` { MASTER_PORT: what port the master is listening on - PRESHARED_MASTER_KEY: clients must provide this value when uploading stats + PRESHARED_MASTER_KEY: clients must provide this value when uploading stats, + CONF_ENS_TIMEOUT: timeout in ms after which it will pull new ENS domain names from nFrame } ``` ### Run production @@ -16,7 +17,8 @@ example `config.js`: ``` export const { MASTER_PORT = 42069, - PRESHARED_MASTER_KEY = "koekjes" + PRESHARED_MASTER_KEY = "koekjes", + CONF_ENS_TIMEOUT = 360000 } = process.env; ``` @@ -31,7 +33,8 @@ example `config.js`: ``` export const { MASTER_PORT = 42069, - PRESHARED_MASTER_KEY = "koekjes" + PRESHARED_MASTER_KEY = "koekjes", + CONF_ENS_TIMEOUT = 360000 } = process.env; ``` diff --git a/master/src/routes/master.js b/master/src/routes/master.js index 10f339d..ef3c28c 100644 --- a/master/src/routes/master.js +++ b/master/src/routes/master.js @@ -1,7 +1,8 @@ import express, { response } from "express"; import { - PRESHARED_MASTER_KEY + PRESHARED_MASTER_KEY, CONF_ENS_TIMEOUT } from "../config"; +const https = require('https'); const storage = require('node-persist'); const masterRouter = express.Router(); const client = require('prom-client'); @@ -36,6 +37,60 @@ register.registerMetric(promAUptimeScore); let isSynced = false; + + +/* + +ENS data from nFrame +[{"domain":null,"address":"0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e","timestamp":1659963951567}] + +*/ + + +let ensData = []; +let lastUpdated = 0; + +const updateEns = async function () { + try { + const url = "https://nframe.nl/api/livepeer/getEnsDomains"; + console.log("Getting new ENS data from " + url); + + https.get(url, (res2) => { + let body = ""; + res2.on("data", (chunk) => { + body += chunk; + }); + res2.on("end", () => { + try { + const data = JSON.parse(body); + for (const newOrchData of data) { + if (!newOrchData) { continue; } + if (!newOrchData.domain) { continue; } + var found = false; + for (var orchIdx = 0; orchIdx < ensData.length; orchIdx++) { + if (ensData[orchIdx].address != newOrchData.address) { continue; } + ensData[orchIdx] = newOrchData; + found = true; + break; + } + if (!found) { + ensData.push(newOrchData); + } + } + storage.setItem('ensData', ensData); + } catch (error) { + console.error(error.message); + }; + }); + }).on("error", (error) => { + console.error(error.message); + }); + } catch (err) { + console.log(err); + } +} + + /* Incoming stats parsing @@ -44,7 +99,7 @@ Incoming stats parsing masterRouter.post("/collectStats", async (req, res) => { try { - if (!isSynced){ console.log ("waiting for sync"); res.end('busy'); return;} + if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; } const { id, discoveryResults, responseTime, tag, key } = req.body; if (!id || !tag || !key) { @@ -58,20 +113,31 @@ masterRouter.post("/collectStats", async (req, res) => { res.send(false); return; } - console.log('received data for ' + id + ' from ' + tag + ' (' + responseTime + " ms latency)"); - if (responseTime){ - promLatestLatency.set({ region: tag, orchestrator: id }, responseTime); + let thisId = id; + if (responseTime) { + for (const thisEns of ensData) { + if (!thisEns || !thisEns.domain); + if (thisEns.address != thisId) { continue; } + thisId = thisEns.domain; + } + promLatestLatency.set({ region: tag, orchestrator: thisId }, responseTime); promLatency.observe({ region: tag }, responseTime); } + console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)"); // Save data point const now = new Date().getTime(); + // Update ENS from nframe if expired + if (now - lastUpdated > CONF_ENS_TIMEOUT) { + await updateEns(); + lastUpdated = now; + } let thisPing = responseTime; if (!discoveryResults || !responseTime) { thisPing = null; } let currentDataList = []; let orchFound = false; let regionFound = false; for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) { - if (orchScores[orchIdx].id != id) { continue; } + if (orchScores[orchIdx].id != thisId) { continue; } orchFound = true; for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) { if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; } @@ -91,7 +157,7 @@ masterRouter.post("/collectStats", async (req, res) => { } if (!orchFound) { currentDataList = [{ tag, data: [{ latency: thisPing, timestamp: now }] }]; - orchScores.push({ id, data: currentDataList }); + orchScores.push({ id: thisId, data: currentDataList }); } await storage.setItem('orchScores', orchScores); // Calc new scores @@ -105,7 +171,7 @@ masterRouter.post("/collectStats", async (req, res) => { if (thisData.latency) { pingsum += thisData.latency; pingpoints += 1; - promLatestLatency.set({ region: tag, orchestrator: id }, thisData.latency); + promLatestLatency.set({ region: tag, orchestrator: thisId }, thisData.latency); promLatency.observe({ region: tag }, thisData.latency); } // Only count *time vars if we have timestamps @@ -119,11 +185,11 @@ masterRouter.post("/collectStats", async (req, res) => { prevtime = thisData.timestamp; } if (pingpoints) { - promAverageLatency.set({ region: tag, orchestrator: id }, pingsum / pingpoints); + promAverageLatency.set({ region: tag, orchestrator: thisId }, pingsum / pingpoints); } if (uptime || downtime) { const score = uptime / (uptime + downtime) - promAUptimeScore.set({ region: tag, orchestrator: id }, score); + promAUptimeScore.set({ region: tag, orchestrator: thisId }, score); } res.send(true); } catch (err) { @@ -178,6 +244,8 @@ const recoverStorage = async function () { ttl: false, forgiveParseErrors: false }); + ensData = await storage.getItem('ensData'); + if (!ensData) { ensData = []; } orchScores = await storage.getItem('orchScores'); if (!orchScores) { orchScores = []; } // Init prometheus from storage