Added ENS

This commit is contained in:
Marco van Dijk 2022-08-09 14:57:03 +02:00
parent 32e398d4d0
commit 39961eda0c
4 changed files with 88 additions and 15 deletions

View File

@ -4,7 +4,7 @@ This program consists of three components:
- TODO integration with email / telegram - TODO integration with email / telegram
- TODO api to subscribe for notifications and cancel it - TODO api to subscribe for notifications and cancel it
- TODO read test stream scores - TODO read test stream scores
- TODO allow custom names (ENS, configure via frontend) to be used over orch address - TODO allow custom names (configure via frontend, ENS done)
- A client which pushes stats to master - A client which pushes stats to master
- Gets the list of active Orchestrators from the Livepeer subgraph - Gets the list of active Orchestrators from the Livepeer subgraph
- Does GetOrchestrator calls to all Orchestrators one by one - Does GetOrchestrator calls to all Orchestrators one by one

View File

@ -170,6 +170,7 @@ function hexToBytes(hex) {
} }
const discoverOrchestrator = async function (target) { const discoverOrchestrator = async function (target) {
if (!target) { return; }
var client = new livepeerProto.Orchestrator(target, ssl_creds); var client = new livepeerProto.Orchestrator(target, ssl_creds);
const addr = CONF_BROADCASTER; const addr = CONF_BROADCASTER;
var receivedResults = false; var receivedResults = false;
@ -194,6 +195,7 @@ const discoverOrchestrator = async function (target) {
} }
const pingOrchestrator = async function (target) { const pingOrchestrator = async function (target) {
if (!target) { return; }
var client = new livepeerProto.Orchestrator(target, ssl_creds); var client = new livepeerProto.Orchestrator(target, ssl_creds);
var receivedResults = false; var receivedResults = false;
var pingPong; var pingPong;
@ -218,7 +220,7 @@ const testOrchestrator = async function (id, target) {
target = target.replace(/^https?:\/\//, ''); target = target.replace(/^https?:\/\//, '');
console.log("Target is " + target); console.log("Target is " + target);
const { discoveryResults, elapsed } = await discoverOrchestrator(target); const { discoveryResults, elapsed } = await discoverOrchestrator(target);
if (discoveryResults && discoveryResults == "insufficient sender reserve"){ if (discoveryResults && discoveryResults == "insufficient sender reserve") {
console.log('Ignoring ' + id + ' for stats due to insufficient sender reserve'); console.log('Ignoring ' + id + ' for stats due to insufficient sender reserve');
return; return;
} }

View File

@ -3,7 +3,8 @@
``` ```
{ {
MASTER_PORT: what port the master is listening on MASTER_PORT: what port the master is listening on
PRESHARED_MASTER_KEY: clients must provide this value when uploading stats PRESHARED_MASTER_KEY: clients must provide this value when uploading stats,
CONF_ENS_TIMEOUT: timeout in ms after which it will pull new ENS domain names from nFrame
} }
``` ```
### Run production ### Run production
@ -16,7 +17,8 @@ example `config.js`:
``` ```
export const { export const {
MASTER_PORT = 42069, MASTER_PORT = 42069,
PRESHARED_MASTER_KEY = "koekjes" PRESHARED_MASTER_KEY = "koekjes",
CONF_ENS_TIMEOUT = 360000
} = process.env; } = process.env;
``` ```
@ -31,7 +33,8 @@ example `config.js`:
``` ```
export const { export const {
MASTER_PORT = 42069, MASTER_PORT = 42069,
PRESHARED_MASTER_KEY = "koekjes" PRESHARED_MASTER_KEY = "koekjes",
CONF_ENS_TIMEOUT = 360000
} = process.env; } = process.env;
``` ```

View File

@ -1,7 +1,8 @@
import express, { response } from "express"; import express, { response } from "express";
import { import {
PRESHARED_MASTER_KEY PRESHARED_MASTER_KEY, CONF_ENS_TIMEOUT
} from "../config"; } from "../config";
const https = require('https');
const storage = require('node-persist'); const storage = require('node-persist');
const masterRouter = express.Router(); const masterRouter = express.Router();
const client = require('prom-client'); const client = require('prom-client');
@ -36,6 +37,60 @@ register.registerMetric(promAUptimeScore);
let isSynced = false; let isSynced = false;
/*
ENS data from nFrame
[{"domain":null,"address":"0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e","timestamp":1659963951567}]
*/
let ensData = [];
let lastUpdated = 0;
const updateEns = async function () {
try {
const url = "https://nframe.nl/api/livepeer/getEnsDomains";
console.log("Getting new ENS data from " + url);
https.get(url, (res2) => {
let body = "";
res2.on("data", (chunk) => {
body += chunk;
});
res2.on("end", () => {
try {
const data = JSON.parse(body);
for (const newOrchData of data) {
if (!newOrchData) { continue; }
if (!newOrchData.domain) { continue; }
var found = false;
for (var orchIdx = 0; orchIdx < ensData.length; orchIdx++) {
if (ensData[orchIdx].address != newOrchData.address) { continue; }
ensData[orchIdx] = newOrchData;
found = true;
break;
}
if (!found) {
ensData.push(newOrchData);
}
}
storage.setItem('ensData', ensData);
} catch (error) {
console.error(error.message);
};
});
}).on("error", (error) => {
console.error(error.message);
});
} catch (err) {
console.log(err);
}
}
/* /*
Incoming stats parsing Incoming stats parsing
@ -44,7 +99,7 @@ Incoming stats parsing
masterRouter.post("/collectStats", async (req, res) => { masterRouter.post("/collectStats", async (req, res) => {
try { try {
if (!isSynced){ console.log ("waiting for sync"); res.end('busy'); return;} if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; }
const { id, discoveryResults, const { id, discoveryResults,
responseTime, tag, key } = req.body; responseTime, tag, key } = req.body;
if (!id || !tag || !key) { if (!id || !tag || !key) {
@ -58,20 +113,31 @@ masterRouter.post("/collectStats", async (req, res) => {
res.send(false); res.send(false);
return; return;
} }
console.log('received data for ' + id + ' from ' + tag + ' (' + responseTime + " ms latency)"); let thisId = id;
if (responseTime){ if (responseTime) {
promLatestLatency.set({ region: tag, orchestrator: id }, responseTime); for (const thisEns of ensData) {
if (!thisEns || !thisEns.domain);
if (thisEns.address != thisId) { continue; }
thisId = thisEns.domain;
}
promLatestLatency.set({ region: tag, orchestrator: thisId }, responseTime);
promLatency.observe({ region: tag }, responseTime); promLatency.observe({ region: tag }, responseTime);
} }
console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)");
// Save data point // Save data point
const now = new Date().getTime(); const now = new Date().getTime();
// Update ENS from nframe if expired
if (now - lastUpdated > CONF_ENS_TIMEOUT) {
await updateEns();
lastUpdated = now;
}
let thisPing = responseTime; let thisPing = responseTime;
if (!discoveryResults || !responseTime) { thisPing = null; } if (!discoveryResults || !responseTime) { thisPing = null; }
let currentDataList = []; let currentDataList = [];
let orchFound = false; let orchFound = false;
let regionFound = false; let regionFound = false;
for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) { for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) {
if (orchScores[orchIdx].id != id) { continue; } if (orchScores[orchIdx].id != thisId) { continue; }
orchFound = true; orchFound = true;
for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) { for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) {
if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; } if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; }
@ -91,7 +157,7 @@ masterRouter.post("/collectStats", async (req, res) => {
} }
if (!orchFound) { if (!orchFound) {
currentDataList = [{ tag, data: [{ latency: thisPing, timestamp: now }] }]; currentDataList = [{ tag, data: [{ latency: thisPing, timestamp: now }] }];
orchScores.push({ id, data: currentDataList }); orchScores.push({ id: thisId, data: currentDataList });
} }
await storage.setItem('orchScores', orchScores); await storage.setItem('orchScores', orchScores);
// Calc new scores // Calc new scores
@ -105,7 +171,7 @@ masterRouter.post("/collectStats", async (req, res) => {
if (thisData.latency) { if (thisData.latency) {
pingsum += thisData.latency; pingsum += thisData.latency;
pingpoints += 1; pingpoints += 1;
promLatestLatency.set({ region: tag, orchestrator: id }, thisData.latency); promLatestLatency.set({ region: tag, orchestrator: thisId }, thisData.latency);
promLatency.observe({ region: tag }, thisData.latency); promLatency.observe({ region: tag }, thisData.latency);
} }
// Only count *time vars if we have timestamps // Only count *time vars if we have timestamps
@ -119,11 +185,11 @@ masterRouter.post("/collectStats", async (req, res) => {
prevtime = thisData.timestamp; prevtime = thisData.timestamp;
} }
if (pingpoints) { if (pingpoints) {
promAverageLatency.set({ region: tag, orchestrator: id }, pingsum / pingpoints); promAverageLatency.set({ region: tag, orchestrator: thisId }, pingsum / pingpoints);
} }
if (uptime || downtime) { if (uptime || downtime) {
const score = uptime / (uptime + downtime) const score = uptime / (uptime + downtime)
promAUptimeScore.set({ region: tag, orchestrator: id }, score); promAUptimeScore.set({ region: tag, orchestrator: thisId }, score);
} }
res.send(true); res.send(true);
} catch (err) { } catch (err) {
@ -178,6 +244,8 @@ const recoverStorage = async function () {
ttl: false, ttl: false,
forgiveParseErrors: false forgiveParseErrors: false
}); });
ensData = await storage.getItem('ensData');
if (!ensData) { ensData = []; }
orchScores = await storage.getItem('orchScores'); orchScores = await storage.getItem('orchScores');
if (!orchScores) { orchScores = []; } if (!orchScores) { orchScores = []; }
// Init prometheus from storage // Init prometheus from storage