diff --git a/.gitignore b/.gitignore index fb9a937..e9efd7c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -master/*.md -client/*.md master/.node-persist/storage/* +notes.md +genSignature/* \ No newline at end of file diff --git a/README.md b/README.md index cfdc878..9465260 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ This program consists of three components: - A master which consolidates orchestrator statistics from various clients - - Converts raw results of grpc calls into prometheus data points + - Publishes GetOrchestrator responses and discovery time + - Publishes orchestrator price info + - Observes Livepeer Inc performance leaderboards results to Prometheus - TODO integration with email / telegram - TODO api to subscribe for notifications and cancel it - - TODO read test stream scores - - TODO allow custom names (configure via frontend, ENS done) - A client which pushes stats to master - Gets the list of active Orchestrators from the Livepeer subgraph - - Does GetOrchestrator calls to all Orchestrators one by one - - A frontend where users can (un)subscribe to notifications \ No newline at end of file + - Test each of them and upload results + - TODO A frontend where users can (un)subscribe to notifications \ No newline at end of file diff --git a/client/.gitignore b/client/.gitignore index fae52e8..ccda033 100644 --- a/client/.gitignore +++ b/client/.gitignore @@ -1,4 +1,3 @@ /node_modules -/src/config.js -genSig.js +src/config.js package-lock.json \ No newline at end of file diff --git a/client/README.md b/client/README.md index 963d32a..41d8633 100644 --- a/client/README.md +++ b/client/README.md @@ -3,20 +3,28 @@ Requires a Broadcaster with some reserve amount set. TODO ### Config variables ``` { - CLIENT_PORT: what port the client accepts API requests on - MASTER_PORT: what port the master should be listening on - MASTER_DOMAIN: the domain on which the master is hosted - MASTER_PATH: the path to the collectStats endpoint. Should be `/api/master/collectStats` unless you are running a reverse proxy - FRIENDLY_NAME: `region` label which gets attached to collected data - PRESHARED_MASTER_KEY: must be the same as the `PRESHARED_MASTER_KEY` on the master - CONF_SLEEPTIME: time between testing Orchestrators in milliseconds - CONF_ORCHINFO_TIMEOUT: timeout for refreshing the list of active orchestrators in milliseconds - CONF_BROADCASTER: eth address of the broadcaster - CONT_SIG = broadcasters' signature of their eth address - reach out on how to get this + CONF_MASTER_PORT: Port to upload results to + CONF_MASTER_DOMAIN: Domain to upload results to + CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy + CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works + CONF_REGION: Continent the prober is located in + CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators + CONF_CONCURRENCY: Amount of orchestrators to test at the same time + CONF_SLEEPTIME: Time to sleep in the main loop + CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0 + CONF_ROUNDTIME: Minimum amount of time between batch-checks + CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list + CONF_BROADCASTER: Public address of the broadcaster to mimic + CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances + CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler + CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address } ``` + +Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now it add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature from the private key would be nice to have here... + ### Run production -Note: this folder has to be placed in `/orchTest/client` +Note: this folder has to be placed in `/orchTest/client`, or edit `ecosystem.config.js` to match the new location npm install nano src/config.js @@ -24,17 +32,21 @@ Note: this folder has to be placed in `/orchTest/client` example `config.js`: ``` export const { - CLIENT_PORT = 42068, - MASTER_PORT = 443, - MASTER_DOMAIN = "nframe.nl", - MASTER_PATH = "/orch/collectStats", - FRIENDLY_NAME = "Chicago", - PRESHARED_MASTER_KEY = "koekjes", - CONF_SLEEPTIME = 2000, - CONF_ORCHINFO_TIMEOUT = 600000, + CONF_MASTER_PORT = 443, + CONF_MASTER_DOMAIN = "stronk.rocks", + CONF_MASTER_PATH = "/orch/collectStats", + CONF_FRIENDLY_NAME = "Michigan", + CONF_REGION = "Europe", + CONF_LP_REGIONS = ["FRA", "LON", "PRA"], + CONF_CONCURRENCY = 6, + CONF_SLEEPTIME = 2000, //< 2 seconds + CONF_MAX_LATENCY = 2000, //< 2 seconds + CONF_ROUNDTIME = 60000, //< 1 minute + CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", - CONT_SIG = Buffer [array of uint8] - + CONF_DNS_TIMEOUT = 600000, //< 10 minutes + CONF_PRESHARED_MASTER_KEY = "koekjes", + CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], } = process.env; ``` @@ -49,16 +61,21 @@ export const { example `config.js`: ``` export const { - CLIENT_PORT = 42068, - MASTER_PORT = 42069, - MASTER_DOMAIN = "127.0.0.1", - MASTER_PATH = "/api/master/collectStats", - FRIENDLY_NAME = "Leiden", - PRESHARED_MASTER_KEY = "koekjes", - CONF_SLEEPTIME = 5000, - CONF_ORCHINFO_TIMEOUT = 100000, + CONF_MASTER_PORT = 42069, + CONF_MASTER_DOMAIN = "127.0.0.1", + CONF_MASTER_PATH = "/api/master/collectStats", + CONF_FRIENDLY_NAME = "Leiden", + CONF_REGION = "Europe", + CONF_LP_REGIONS = ["FRA", "LON", "PRA"], + CONF_CONCURRENCY = 6, + CONF_SLEEPTIME = 2000, //< 2 seconds + CONF_MAX_LATENCY = 2000, //< 2 seconds + CONF_ROUNDTIME = 60000, //< 1 minute + CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", - CONT_SIG = Buffer [array of uint8] + CONF_DNS_TIMEOUT = 600000, //< 10 minutes + CONF_PRESHARED_MASTER_KEY = "koekjes", + CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], } = process.env; ``` diff --git a/client/ecosystem.config.js b/client/ecosystem.config.js index 06a1bde..7548289 100644 --- a/client/ecosystem.config.js +++ b/client/ecosystem.config.js @@ -1,8 +1,8 @@ module.exports = { apps: [ { - name: "orchProber", - script: "./src/orchProber.js", + name: "client-orchestrator-prober", + script: "./src/client.js", cwd: "/orchTest/client", env_production: { NODE_ENV: "production" diff --git a/client/package.json b/client/package.json index 9f5bd60..72c31db 100644 --- a/client/package.json +++ b/client/package.json @@ -1,14 +1,15 @@ { - "name": "orchProber", - "version": "0.0.1", + "name": "client-orchestrator-prober", + "version": "0.1.0", "description": "", - "main": "./src/orchProber.js", - "module": "./src/orchProber.js", + "main": "./src/client.js", + "type": "module", + "module": "./src/client.js", "scripts": { "prod": "NODE_ENV=production pm2 start ecosystem.config.js", - "start": "NODE_ENV=production node ./src/orchProber.js", - "dev": "NODE_ENV=development nodemon ./src/orchProber.js", - "local": "NODE_ENV=local nodemon ./src/orchProber.js" + "start": "NODE_ENV=production node ./src/client.js", + "dev": "NODE_ENV=development nodemon ./src/client.js", + "local": "NODE_ENV=local nodemon ./src/client.js" }, "keywords": [], "author": "Marco van Dijk", @@ -16,13 +17,9 @@ "dependencies": { "@grpc/grpc-js": "^1.6.8", "@grpc/proto-loader": "^0.7.0", - "esm": "^3.2.20", - "express": "^4.17.1", "geoip-lite": "^1.4.8", "graphql": "^16.5.0", - "graphql-request": "^4.3.0", - "npm": "^8.5.2", - "web3": "^1.7.5" + "graphql-request": "^4.3.0" }, "devDependencies": { "nodemon": "^1.18.10" diff --git a/client/src/client.js b/client/src/client.js index 97024df..485ed3c 100644 --- a/client/src/client.js +++ b/client/src/client.js @@ -1,33 +1,7 @@ -import express from 'express'; -import { orchTesterRouter } from './routes/index'; -import { CLIENT_PORT } from "./config"; -// Env variable which determines which DB to connect to -const { NODE_ENV: mode } = process.env; +import { runTests } from "./util/orchTester.js"; (async () => { - try { - const app = express(); - app.disable('x-powered-by'); - app.use(express.urlencoded({ extended: true })); - app.use(express.json()); - - const apiRouter = express.Router(); - app.use('/api', apiRouter); - apiRouter.use('/tester', orchTesterRouter); - // Error handler - app.use(function (err, req, res, next) { - res.locals.message = err.message; - // Also log it to the console - console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`); - // Render the error page - res.status(err.status || 500); - res.render('error'); - }); - - app.listen(CLIENT_PORT, "0.0.0.0", function () { - console.log(`Listening on port ${CLIENT_PORT}`); - }); - } catch (err) { - console.log(err); - } -})(); \ No newline at end of file + // On first boot, kickstart the test loop + console.log("Starting main loop..."); + runTests(); +})(); diff --git a/client/src/orchProber.js b/client/src/orchProber.js deleted file mode 100644 index 4d5caa6..0000000 --- a/client/src/orchProber.js +++ /dev/null @@ -1,2 +0,0 @@ -require = require("esm")(module) -module.exports = require("./client.js") \ No newline at end of file diff --git a/client/src/routes/index.js b/client/src/routes/index.js deleted file mode 100644 index 28a407e..0000000 --- a/client/src/routes/index.js +++ /dev/null @@ -1,2 +0,0 @@ -import orchTesterRouter from './orchTester'; -export { orchTesterRouter }; \ No newline at end of file diff --git a/client/src/routes/orchTester.js b/client/src/util/orchTester.js similarity index 50% rename from client/src/routes/orchTester.js rename to client/src/util/orchTester.js index aed6068..6d13102 100644 --- a/client/src/routes/orchTester.js +++ b/client/src/util/orchTester.js @@ -1,22 +1,28 @@ -import express from "express"; -const dns = require("dns"); -var geoip = require("geoip-lite"); +import dns from "dns"; +import geoip from "geoip-lite"; +import { request, gql } from "graphql-request"; +import https from "https"; +import http from "http"; +import grpc from "@grpc/grpc-js"; +import protoLoader from "@grpc/proto-loader"; -const orchTesterRouter = express.Router(); import { - MASTER_DOMAIN, - MASTER_PORT, - MASTER_PATH, - FRIENDLY_NAME, - PRESHARED_MASTER_KEY, + CONF_MASTER_PORT, + CONF_MASTER_DOMAIN, + CONF_MASTER_PATH, + CONF_FRIENDLY_NAME, + CONF_REGION, + CONF_LP_REGIONS, + CONF_MAX_LATENCY, + CONF_ROUNDTIME, + CONF_CONCURRENCY, CONF_SLEEPTIME, - CONT_SIG, CONF_ORCHINFO_TIMEOUT, CONF_BROADCASTER, CONF_DNS_TIMEOUT, -} from "../config"; - -process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; + CONF_PRESHARED_MASTER_KEY, + CONF_SIGNATURE, +} from "../config.js"; /* @@ -25,11 +31,6 @@ imported modules */ -import { request, gql } from "graphql-request"; -const https = require("https"); -const http = require("http"); -var grpc = require("@grpc/grpc-js"); -var protoLoader = require("@grpc/proto-loader"); var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", { keepCase: true, longs: String, @@ -41,6 +42,23 @@ var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net; const ssl_creds = grpc.credentials.createSsl(null, null, null, { checkServerIdentity: () => undefined, }); +// Since go-livepeer uses self-signed certificates or something +process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; + +/* + +Global variables + +*/ + +var activeOrchestrators = []; //< List of all active orchs pulled from the graph +var orchestratorsLastUpdated = 0; //< Used to refresh above list periodically +var lastRoundStart = 0; +let orchDNS = {}; //< Caches DNS and GEO lookups +let currentPool = []; //< Current working set of Orchestrators to test +let cycle = 0; //< Rounds of testing the script has done +let batchResults = {}; //< Current cache of results to batch upload +let currentPending = 0; //< Current outstanding requests /* @@ -68,19 +86,213 @@ function shuffle(array) { return array; } +function hexToBytes(hex) { + for (var bytes = [], c = 0; c < hex.length; c += 2) + bytes.push(parseInt(hex.substr(c, 2), 16)); + return bytes; +} + +/* + +Doing grpc calls to an orchestrator and uploading to the hodler + +*/ + +const batchPostStats = async function () { + var postData = JSON.stringify({ + batchResults: batchResults, + tag: CONF_FRIENDLY_NAME, + key: CONF_PRESHARED_MASTER_KEY, + region: CONF_REGION, + livepeer_regions: CONF_LP_REGIONS, + }); + var options = { + hostname: CONF_MASTER_DOMAIN, + port: CONF_MASTER_PORT, + path: CONF_MASTER_PATH, + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": postData.length, + }, + }; + console.log("Uploading stats of " + postData.length + "B"); + var req; + if (CONF_MASTER_DOMAIN == "127.0.0.1" || CONF_MASTER_DOMAIN == "localhost") { + req = http.request(options, (res) => { + res.on("data", (d) => { + process.stdout.write( + "Received response " + d + " from " + CONF_MASTER_DOMAIN + ); + }); + }); + } else { + req = https.request(options, (res) => { + res.on("data", (d) => { + process.stdout.write( + "Received response " + d + " from " + CONF_MASTER_DOMAIN + ); + }); + }); + } + req.on("error", (e) => { + console.error("err", e); + }); + req.write(postData); + req.end(); +}; + +const discoverOrchestrator = async function (target) { + if (!target) { + return; + } + // Try to prevent the connection from being reused + var client = new livepeerProto.Orchestrator(target, ssl_creds, { + GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5), + }); + var receivedResults = false; + var orchestratorInfo = {}; + const start = new Date().getTime(); + var elapsed = 0; + await client.GetOrchestrator( + { + address: hexToBytes(CONF_BROADCASTER), + sig: CONF_SIGNATURE, + }, + function (err, res) { + if (err) { + console.log("Discovery error: ", err.details); + orchestratorInfo.err = err.details; + } else { + orchestratorInfo = res; + elapsed = new Date().getTime() - start; + } + receivedResults = true; + } + ); + // Wait for a max of 4 seconds for a callback from the GRPC call + while (!receivedResults && new Date().getTime() - start < CONF_MAX_LATENCY) { + await sleep(20); + } + grpc.closeClient(client); + if (!orchestratorInfo) { + return { + discoveryResults: { + transcoder: null, + price_info: null, + latency: 0, + err: "Took too long to respond. Aborted test...", + }, + }; + } + return { + discoveryResults: { + transcoder: orchestratorInfo.transcoder, + price_info: orchestratorInfo.price_info, + latency: elapsed, + err: orchestratorInfo.err, + }, + }; +}; + +// Resolve hostname to IP +async function getIP(hostname) { + let obj = await dns.promises.lookup(hostname).catch((error) => { + console.error(error); + }); + if (obj) { + return obj.address; + } else { + return null; + } +} + +const testOrchestrator = async function (id, target) { + if (!id.length || !target.length) { + return; + } + const origTarget = new URL(target); + target = target.replace(/^https?:\/\//, ""); + console.log("Testing orchestrator " + target); + // Resolve DNS and GEO + const now = new Date().getTime(); + if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) { + const resolved = await getIP(origTarget.hostname); + const geo = geoip.lookup(resolved); + let geoObj = null; + if (geo) { + geoObj = { + // country: geo.country, //< commented out these fields + // region: geo.region, // since they're causing issues with JSON.stringify() + // city: geo.city, // prob due to special characters + latitude: geo.ll[0], + longitude: geo.ll[1], + }; + } + orchDNS[id] = { + originalTarget: origTarget.origin, + resolvedTarget: resolved, + geoLookup: geoObj, + geoFrom: CONF_FRIENDLY_NAME, + lastTime: now, + }; + console.log("Updated DNS and GeoIP data for " + id); + } + // Test orch + const { discoveryResults } = await discoverOrchestrator(target); + if ( + discoveryResults && + discoveryResults.err == "insufficient sender reserve" + ) { + console.log("Ignoring " + id + " due to insufficient sender reserve"); + return; + } + // Cache results + batchResults[id] = { + name: id, + discovery: discoveryResults, + resolv: orchDNS[id], + }; + if (discoveryResults.err) { + batchResults[id].discovery.latency = 0; + } + currentPending--; +}; + +/* + + Batch test logic + +*/ + +const batchTestOrchs = async function () { + // Clear buff + batchResults = {}; + currentPending = 0; + // Keep going until we've got no more todo or pending + while (currentPool.length || currentPending) { + // Concurrent requests + while (currentPending < CONF_CONCURRENCY && currentPool.length) { + let currentOrch = currentPool.splice(0, 1)[0]; + if (!currentOrch.id || !currentOrch.target) { + console.log("Skipping Orchestrator with malformed data: ", currentOrch); + continue; + } + currentPending++; + testOrchestrator(currentOrch.id, currentOrch.target); + } + await sleep(50); + } + batchPostStats(); +}; + /* Refreshing active orchestrators Pulls this data from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql) -We might want to switch to reading directly from the blockchain -but this would require constant watching for uri updates which is a pain to implement */ -var activeOrchestrators = []; -let lastUpdated = 0; -let orchDNS = {}; - /// Does a GQL query to the subgraph for orchestrator data const getOrchestrators = async function () { console.log("Getting orchestrator data from the subgraph..."); @@ -116,190 +328,16 @@ const getOrchestrators = async function () { const refreshOrchCache = async function () { const now = new Date().getTime(); // Update cmc once their data has expired - if (now - lastUpdated > CONF_ORCHINFO_TIMEOUT) { + if (now - orchestratorsLastUpdated > CONF_ORCHINFO_TIMEOUT) { const data = await getOrchestrators(); if (data) { activeOrchestrators = data; - lastUpdated = now; + orchestratorsLastUpdated = now; } } }; -/* - -Doing grpc calls to an orchestrator - -*/ - -let currentPool = []; - -const postStatistics = async function ( - id, - discoveryResults, - lookupResults, - responseTime -) { - console.log("Posting stats for " + id + " (ping " + responseTime + " ms)"); - - // TODO look at response and log error? - var postData = JSON.stringify({ - id, - discoveryResults, - responseTime, - lookupResults, - tag: FRIENDLY_NAME, - key: PRESHARED_MASTER_KEY, - }); - var options = { - hostname: MASTER_DOMAIN, - port: MASTER_PORT, - path: MASTER_PATH, - method: "POST", - headers: { - "Content-Type": "application/json", - "Content-Length": postData.length, - }, - }; - var req; - if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") { - req = http.request(options, (res) => { - // console.log('statusCode:', res.statusCode); - // console.log('headers:', res.headers); - - res.on("data", (d) => { - process.stdout.write( - "Received response " + d + " from " + MASTER_DOMAIN - ); - }); - }); - } else { - req = https.request(options, (res) => { - // console.log('statusCode:', res.statusCode); - // console.log('headers:', res.headers); - - res.on("data", (d) => { - process.stdout.write( - "Received response " + d + " from " + MASTER_DOMAIN - ); - }); - }); - } - req.on("error", (e) => { - console.error("err", e); - }); - req.write(postData); - req.end(); -}; - -function hexToBytes(hex) { - for (var bytes = [], c = 0; c < hex.length; c += 2) - bytes.push(parseInt(hex.substr(c, 2), 16)); - return bytes; -} - -const discoverOrchestrator = async function (target) { - if (!target) { - return; - } - var client = new livepeerProto.Orchestrator(target, ssl_creds, { - GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5), - }); - var receivedResults = false; - var orchestratorInfo; - const start = new Date().getTime(); - var elapsed = null; - await client.GetOrchestrator( - { - address: hexToBytes(CONF_BROADCASTER), - sig: CONT_SIG, - }, - function (err, res) { - if (err) { - console.log("Discovery error: ", err.details); - orchestratorInfo = err.details; - elapsed = null; - } else { - orchestratorInfo = res; - elapsed = new Date().getTime() - start; - } - receivedResults = true; - } - ); - while (!receivedResults && new Date().getTime() - start < 4000) { - await sleep(20); - } - grpc.closeClient(client); - return { discoveryResults: orchestratorInfo, elapsed }; -}; - -const pingOrchestrator = async function (target) { - if (!target) { - return; - } - var client = new livepeerProto.Orchestrator(target, ssl_creds); - var receivedResults = false; - var pingPong; - const start = new Date().getTime(); - var elapsed = null; - await client.Ping({ value: "koekjes" }, function (err, res) { - if (err) { - console.log("Ping err: ", err.details); - pingPong = err.details; - elapsed = null; - } else { - pingPong = res; - elapsed = new Date().getTime() - start; - } - receivedResults = true; - }); - while (!receivedResults && new Date().getTime() - start < 4000) { - await sleep(20); - } - return { pingResults: pingPong, elapsed }; -}; - -async function getIP(hostname) { - let obj = await dns.promises.lookup(hostname).catch((error) => { - console.error(error); - }); - if (obj) { - return obj.address; - } else { - return null; - } -} - -const testOrchestrator = async function (id, target) { - if (!id.length || !target.length) { - return; - } - const origTarget = new URL(target); - target = target.replace(/^https?:\/\//, ""); - console.log("Target is " + target); - // Resolve DNS - const now = new Date().getTime(); - if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) { - const resolved = await getIP(origTarget.hostname); - orchDNS[id] = { - originalTarget: origTarget.origin, - resolvedTarget: resolved, - geoLookup: geoip.lookup(resolved), - geoFrom: FRIENDLY_NAME, - lastTime: now, - }; - console.log("Updated DNS and GeoIP data: ", orchDNS[id]); - } - // Test orch - const { discoveryResults, elapsed } = await discoverOrchestrator(target); - if (discoveryResults && discoveryResults == "insufficient sender reserve") { - console.log( - "Ignoring " + id + " for stats due to insufficient sender reserve" - ); - return; - } - await postStatistics(id, discoveryResults, orchDNS[id], elapsed); -}; - +// Creates a new working set of orchs to test const refreshPool = function () { currentPool = []; for (const thisObj of activeOrchestrators) { @@ -308,51 +346,36 @@ const refreshPool = function () { shuffle(currentPool); }; -const pingNextOrch = async function () { - if (!currentPool.length) { - refreshPool(); - } - let currentOrch = currentPool.splice(0, 1)[0]; - if (!currentOrch.id || !currentOrch.target) { - console.log("Skipping Orchestrator with malformed data: ", currentOrch); - return; - } - await testOrchestrator(currentOrch.id, currentOrch.target); -}; - /* Main Loop -Maybe we shouldn't use nodejs as a client... */ -let cycle = 0; -let isSyncing = false; -// Does the actual looping over last parsed block -> latest block in chain -const handleSync = async function () { +const runTests = async function () { try { - cycle++; - console.log("Starting new cycle #" + cycle); - isSyncing = true; - await refreshOrchCache(); - await pingNextOrch(); - isSyncing = false; + const now = new Date().getTime(); + if (!lastRoundStart || now - lastRoundStart > CONF_ROUNDTIME) { + cycle++; + console.log("Starting new cycle #" + cycle); + // If stale, retrieve new set of active orchestrators + await refreshOrchCache(); + // Create a new shuffled working set of orchs to test + refreshPool(); + // Test all orchs in working set + await batchTestOrchs(); + lastRoundStart = now; + } setTimeout(() => { - handleSync(); + runTests(); }, CONF_SLEEPTIME); return; } catch (err) { console.log(err); - isSyncing = false; setTimeout(() => { - handleSync(); + runTests(); }, CONF_SLEEPTIME); } }; -if (!isSyncing) { - console.log("Starting main loop"); - handleSync(); -} -export default orchTesterRouter; +export { runTests }; diff --git a/master/README.md b/master/README.md index 04b2b96..bfea9e9 100644 --- a/master/README.md +++ b/master/README.md @@ -2,9 +2,14 @@ ### Config variables ``` { - MASTER_PORT: what port the master is listening on - PRESHARED_MASTER_KEY: clients must provide this value when uploading stats, - CONF_ENS_TIMEOUT: timeout in ms after which it will pull new ENS domain names from nFrame + CONF_API_L1_HTTP: HTTP Uri to L1 RPC provider + CONF_API_L1_KEY: and it's api key, + CONF_TIMEOUT_ENS_DOMAIN: timeout before refreshing an orchs ENS domain name + CONF_KEY_EXPIRY: timeout before clearing certain orch instance data + CONF_MASTER_PORT: port to accept uploads on from probers + CONF_PRESHARED_MASTER_KEY: password for uploading + CONF_SCORE_TIMEOUT: timeout for checking if some new test stream results came in + CONF_SLEEPTIME: Time to sleep in the main loop } ``` ### Run production @@ -16,9 +21,14 @@ Note: this folder has to be placed in `/orchTest/master` example `config.js`: ``` export const { - MASTER_PORT = 42069, - PRESHARED_MASTER_KEY = "koekjes", - CONF_ENS_TIMEOUT = 360000 + CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/", + CONF_API_L1_KEY = "koekjes", + CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours + CONF_KEY_EXPIRY = 3600000, //< 1 hour + CONF_MASTER_PORT = 42069, + CONF_PRESHARED_MASTER_KEY = "koekjes", + CONF_SCORE_TIMEOUT = 300000, //< 5 minutes + CONF_SLEEPTIME = 2000, //< 2 seconds } = process.env; ``` @@ -32,11 +42,17 @@ export const { example `config.js`: ``` export const { - MASTER_PORT = 42069, - PRESHARED_MASTER_KEY = "koekjes", - CONF_ENS_TIMEOUT = 360000 + CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/", + CONF_API_L1_KEY = "koekjes", + CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours + CONF_KEY_EXPIRY = 3600000, //< 1 hour + CONF_MASTER_PORT = 42069, + CONF_PRESHARED_MASTER_KEY = "koekjes", + CONF_SCORE_TIMEOUT = 60000, //< 1 minute + CONF_SLEEPTIME = 2000, //< 2 seconds } = process.env; ``` npm run dev + diff --git a/master/ecosystem.config.js b/master/ecosystem.config.js index 0466b2f..4053ffc 100644 --- a/master/ecosystem.config.js +++ b/master/ecosystem.config.js @@ -1,8 +1,8 @@ module.exports = { apps: [ { - name: "hodler", - script: "./src/hodler.js", + name: "master-hodler", + script: "./src/master.js", cwd: "/orchTest/master", env_production: { NODE_ENV: "production" diff --git a/master/package.json b/master/package.json index 98ed400..844396f 100644 --- a/master/package.json +++ b/master/package.json @@ -1,22 +1,22 @@ { - "name": "hodler", - "version": "0.0.1", + "name": "master-hodler", + "version": "0.1.0", "description": "", - "main": "./src/hodler.js", - "module": "./src/hodler.js", + "main": "./src/master.js", + "type": "module", + "module": "./src/master.js", "scripts": { "prod": "NODE_ENV=production pm2 start ecosystem.config.js", - "start": "NODE_ENV=production node ./src/hodler.js", - "dev": "NODE_ENV=development nodemon ./src/hodler.js", - "local": "NODE_ENV=local nodemon ./src/hodler.js" + "start": "NODE_ENV=production node ./src/master.js", + "dev": "NODE_ENV=development nodemon ./src/master.js", + "local": "NODE_ENV=local nodemon ./src/master.js" }, "keywords": [], "author": "Marco van Dijk", "license": "WTFPL", "dependencies": { - "esm": "^3.2.20", + "ethers": "^6.8.1", "express": "^4.17.1", - "node-persist": "^3.1.0", "npm": "^8.5.2", "prom-client": "^14.0.1" }, diff --git a/master/src/hodler.js b/master/src/hodler.js deleted file mode 100644 index 77efc98..0000000 --- a/master/src/hodler.js +++ /dev/null @@ -1,2 +0,0 @@ -require = require("esm")(module) -module.exports = require("./server.js") \ No newline at end of file diff --git a/master/src/master.js b/master/src/master.js new file mode 100644 index 0000000..a92fad5 --- /dev/null +++ b/master/src/master.js @@ -0,0 +1,36 @@ +import express from "express"; +import { masterRouter } from "./routes/index.js"; +import { CONF_MASTER_PORT } from "./config.js"; +// Env variable which determines which DB to connect to +const { NODE_ENV: mode } = process.env; + +(async () => { + try { + const app = express(); + app.disable("x-powered-by"); + app.use(express.urlencoded({ extended: true, limit: "1000kb" })); + app.use(express.json({ limit: "1000kb" })); + const apiRouter = express.Router(); + app.use("/api", apiRouter); + apiRouter.use("/master", masterRouter); + // Error handler + app.use(function (err, req, res, next) { + res.locals.message = err.message; + // Also log it to the console + console.log( + `${err.status || 500} - ${err.message} - ${req.originalUrl} - ${ + req.method + } - ${req.ip}` + ); + // Render the error page + res.status(err.status || 500); + res.render("error"); + }); + + app.listen(CONF_MASTER_PORT, "0.0.0.0", function () { + console.log(`Listening on port ${CONF_MASTER_PORT}`); + }); + } catch (err) { + console.log(err); + } +})(); diff --git a/master/src/routes/hodler.js b/master/src/routes/hodler.js new file mode 100644 index 0000000..018dda9 --- /dev/null +++ b/master/src/routes/hodler.js @@ -0,0 +1,490 @@ +import express from "express"; +import client from "prom-client"; +import { JsonRpcProvider } from "ethers"; +import { + CONF_API_L1_HTTP, + CONF_API_L1_KEY, + CONF_PRESHARED_MASTER_KEY, + CONF_TIMEOUT_ENS_DOMAIN, + CONF_KEY_EXPIRY, + CONF_SCORE_TIMEOUT, + CONF_SLEEPTIME, +} from "../config.js"; + +/* + + Init + +*/ + +const l1provider = new JsonRpcProvider(CONF_API_L1_HTTP + CONF_API_L1_KEY); +const masterRouter = express.Router(); +const register = new client.Registry(); + +// Regional stats - prober instance specific +const promLatestLatency = new client.Gauge({ + name: "orch_latest_latency", + help: "Latest latency known for a given Orchestrator", + labelNames: ["region", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promLatestLatency); +const promLatency = new client.Summary({ + name: "orch_latency", + help: "Summary of latency stats", + percentiles: [0.01, 0.1, 0.9, 0.99], + labelNames: ["region"], +}); +register.registerMetric(promLatency); +const promAverageLatency = new client.Gauge({ + name: "orch_average_latency", + help: "Average latency for a given Orchestrator", + labelNames: ["region", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promAverageLatency); + +// Regional stats - Livepeer test stream specific +const promLatestRTR = new client.Gauge({ + name: "orch_latest_rtr", + help: "Latest realtime ratio as specified by Livepeer inc's regional performance leaderboards", + labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promLatestRTR); +const promLatestSuccessRate = new client.Gauge({ + name: "orch_latest_success_rate", + help: "Latest success rate as specified by Livepeer inc's regional performance leaderboards", + labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promLatestSuccessRate); + +// Global stats - orchestrator instance specific +const promLatestPPP = new client.Gauge({ + name: "orch_latest_ppp", + help: "Latest price per pixel known for a given Orchestrator", + labelNames: ["instance", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promLatestPPP); +const promAUptimeScore = new client.Gauge({ + name: "orch_uptime_score", + help: "Uptime score for a given orchestrator", + labelNames: ["instance", "orchestrator", "latitude", "longitude"], +}); +register.registerMetric(promAUptimeScore); + +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +/* + + Globals + +*/ + +let ensDomainCache = {}; +let orchCache = {}; +let lastLeaderboardCheck = 0; + +/* + +ENS + +*/ + +const getEnsDomain = async function (addr) { + const now = new Date().getTime(); + const cached = ensDomainCache[addr]; + if (cached && now - cached.timestamp < CONF_TIMEOUT_ENS_DOMAIN) { + return cached.domain || cached.address; + } + // Refresh cause not cached or stale + let ensDomain; + try { + ensDomain = await l1provider.lookupAddress(addr.toLowerCase()); + } catch (err) { + console.log("Error looking up ENS info, retrying..."); + await sleep(50); + return null; + } + let ensObj; + if (!ensDomain) { + ensObj = { + domain: null, + address: addr, + timestamp: now, + }; + } else { + ensObj = { + domain: ensDomain, + address: addr, + timestamp: now, + }; + } + console.log( + "Updated ENS domain " + + ensObj.domain + + " owned by " + + ensObj.address + + " @ " + + ensObj.timestamp + ); + ensDomainCache[addr] = ensObj; + return ensObj.domain || ensObj.address; +}; + +/* + + Update cache & prometheus + +*/ + +const updatePrometheus = async function (tag, id, instance, orchInfo) { + const thisInstance = orchInfo.instances[instance]; + const regionInfo = orchInfo.regionalStats[tag]; + if (regionInfo.latestDiscoveryTime) { + promLatestLatency.set( + { + region: tag, + orchestrator: orchInfo.name, + latitude: thisInstance.latitude, + longitude: thisInstance.longitude, + }, + regionInfo.latestDiscoveryTime + ); + } + if (regionInfo.avgDiscoveryTime) { + promAverageLatency.set( + { + region: tag, + orchestrator: orchInfo.name, + latitude: thisInstance.latitude, + longitude: thisInstance.longitude, + }, + regionInfo.avgDiscoveryTime + ); + } + promAUptimeScore.set( + { + instance: instance, + orchestrator: orchInfo.name, + latitude: thisInstance.latitude, + longitude: thisInstance.longitude, + }, + regionInfo.uptimePercentage + ); + promLatestPPP.set( + { + instance: instance, + orchestrator: orchInfo.name, + latitude: thisInstance.latitude, + longitude: thisInstance.longitude, + }, + thisInstance.price + ); + promLatency.observe({ region: tag }, regionInfo.latestDiscoveryTime); +}; + +const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) { + const now = new Date().getTime(); + // Overwrite name with ENS domain if set + let ensDomain = null; + while (!ensDomain) { + ensDomain = await getEnsDomain(id); + } + obj.name = ensDomain; + // Retrieve entry to update or init it + let newObj = orchCache[id]; + if (!newObj) { + newObj = { + name: "", + regionalStats: {}, + instances: {}, + leaderboardResults: { lastTime: now }, + }; + } + // Find region entry or init it + let newRegion = newObj.regionalStats[tag]; + if (!newRegion) { + newRegion = { + measurements: [], + avgDiscoveryTime: -1, + uptimePercentage: 1.0, + latestDiscoveryTime: -1, + }; + } + + // Record measurement + let measurement = { + latency: obj.discovery.latency, + timestamp: now, + duration: 0, + }; + if (newRegion.measurements.length) { + measurement.duration = + now - newRegion.measurements[newRegion.measurements.length - 1].timestamp; + } + newRegion.measurements.push(measurement); + + // Recalc average && uptime + let uptime = 0; + let downtime = 0; + let pingSum = 0; + let pingEntries = 0; + for (const measurement of newRegion.measurements) { + if (measurement.latency && measurement.latency > 0) { + if (measurement.duration) { + uptime += measurement.duration; + } + pingSum += measurement.latency; + pingEntries++; + } else { + if (measurement.duration) { + downtime += measurement.duration; + } + } + } + + if (pingEntries > 0) { + newRegion.avgDiscoveryTime = pingSum / pingEntries; + } else { + newRegion.avgDiscoveryTime = measurement.latency; + } + newRegion.latestDiscoveryTime = measurement.latency; + + if ( + downtime || + (!newRegion.avgDiscoveryTime && !newRegion.latestDiscoveryTime) + ) { + if (!uptime) { + newRegion.uptimePercentage = 0.0; + } else { + newRegion.uptimePercentage = uptime / (uptime + downtime); + } + } + + // Find instance entry or init it + let newInstance = newObj.instances[obj.resolv.resolvedTarget]; + if (!newInstance) { + newInstance = { + price: -1, + latitude: -1, + longitude: -1, + probedFrom: {}, + regions: {}, + livepeer_regions: {}, + }; + } + + // Remove expired stuff + for (const [id, obj] of Object.entries(newInstance.probedFrom)) { + if (now - obj.lastTime > CONF_KEY_EXPIRY) { + newInstance.probedFrom[id] = null; + } + } + for (const [id, obj] of Object.entries(newInstance.regions)) { + if (now - obj.lastTime > CONF_KEY_EXPIRY) { + newInstance.regions[id] = null; + } + } + for (const [id, obj] of Object.entries(newInstance.livepeer_regions)) { + if (now - obj.lastTime > CONF_KEY_EXPIRY) { + newInstance.livepeer_regions[id] = null; + } + } + + // Set last times for instance info + newInstance.probedFrom[tag] = { + lastTime: now, + }; + newInstance.regions[region] = { + lastTime: now, + }; + for (const region of livepeer_regions) { + newInstance.livepeer_regions[region] = { + lastTime: now, + }; + } + + // Set location and price info + if (obj.discovery.price_info) { + newInstance.price = + obj.discovery.price_info.pricePerUnit / + obj.discovery.price_info.pixelsPerUnit; + } + if (obj.resolv.geoLookup) { + newInstance.latitude = obj.resolv.geoLookup.latitude; + newInstance.longitude = obj.resolv.geoLookup.longitude; + } + + // Update name + newObj.name = obj.name; + + // Finished updating + newObj.instances[obj.resolv.resolvedTarget] = newInstance; + newObj.regionalStats[tag] = newRegion; + orchCache[id] = newObj; + + // Update prometheus stats + updatePrometheus(tag, id, obj.resolv.resolvedTarget, newObj); + console.log("Handled results for " + id + " from prober " + tag); +}; + +const updateCache = async function ( + batchResults, + tag, + region, + livepeer_regions +) { + console.log("Parsing stats from " + tag + " (" + region + ")"); + + for (const [id, obj] of Object.entries(batchResults)) { + onOrchUpdate(id, obj, tag, region, livepeer_regions); + } +}; + +/* + +Public endpoints + +*/ + +// Mutate state with new stats +masterRouter.post("/collectStats", async (req, res) => { + try { + const { batchResults, tag, key, region, livepeer_regions } = req.body; + if (!batchResults || !tag || !key || !region || !livepeer_regions) { + console.log("Received malformed data. Aborting stats update..."); + console.log(batchResults, tag, key, region, livepeer_regions); + res.send(false); + return; + } + if (CONF_PRESHARED_MASTER_KEY != req.body.key) { + console.log("Unauthorized"); + res.send(false); + return; + } + updateCache(batchResults, tag, region, livepeer_regions); + res.send(true); + } catch (err) { + console.log(err, req.body); + res.status(400).send(err); + } +}); + +// Retrieve prom data +masterRouter.get("/prometheus", async (req, res) => { + try { + res.set("Content-Type", register.contentType); + const metrics = await register.metrics(); + res.end(metrics); + } catch (err) { + res.status(400).send(err); + } +}); + +// Retrieve cache as JSON +masterRouter.get("/json", async (req, res) => { + try { + res.set("Content-Type", "application/json"); + res.end(JSON.stringify(orchCache)); + } catch (err) { + res.status(400).send(err); + } +}); + +/* + +Main Loop + +*/ + +const updateScore = async function (address) { + console.log("Checking for new scores for " + address); + const lastTime = orchCache[address].leaderboardResults.lastTime; + + let url = + "https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" + + address; + + const json = await fetch(url).then((res) => res.json()); + let hasEdited = false; + for (const [region, results] of Object.entries(json)) { + for (const instance of results) { + if (instance.timestamp > lastTime) { + const newRTR = instance.success_rate; + const newSR = instance.round_trip_time / instance.seg_duration; + let latitude = null; + let longitude = null; + for (const instance of orchCache[address].instances) { + if (instance.livepeer_regions[region]) { + latitude = instance.livepeer_regions[region].latitude; + longitude = instance.livepeer_regions[region].longitude; + } + } + console.log( + "Found new RTR=" + + newRTR + + " and new success rate of " + + newSR * 100 + + "%, livepeer region " + + instance.region + ); + promLatestRTR.set( + { + livepeer_region: instance.region, + orchestrator: address, + latitude: latitude, + longitude: longitude, + }, + newRTR + ); + promLatestSuccessRate.set( + { + livepeer_region: instance.region, + orchestrator: address, + latitude: latitude, + longitude: longitude, + }, + newSR + ); + hasEdited = true; + } + } + } + if (hasEdited) { + orchCache[address].leaderboardResults.lastTime = new Date().getTime(); + } +}; + +const updateOrchScores = async function () { + for (const [id, obj] of Object.entries(orchCache)) { + await updateScore(id); + } +}; + +const runTests = async function () { + try { + const now = new Date().getTime(); + if ( + !lastLeaderboardCheck || + now - lastLeaderboardCheck > CONF_SCORE_TIMEOUT + ) { + await updateOrchScores(); + lastLeaderboardCheck = now; + } + setTimeout(() => { + runTests(); + }, CONF_SLEEPTIME); + return; + } catch (err) { + console.log(err); + setTimeout(() => { + runTests(); + }, CONF_SLEEPTIME); + } +}; +console.log("Starting watcher for test stream results"); +runTests(); + +export default masterRouter; diff --git a/master/src/routes/index.js b/master/src/routes/index.js index 4591eae..8de5a1d 100644 --- a/master/src/routes/index.js +++ b/master/src/routes/index.js @@ -1,2 +1,2 @@ -import masterRouter from './master'; +import masterRouter from './hodler.js'; export { masterRouter }; \ No newline at end of file diff --git a/master/src/routes/master.js b/master/src/routes/master.js deleted file mode 100644 index ce7f2a6..0000000 --- a/master/src/routes/master.js +++ /dev/null @@ -1,309 +0,0 @@ -import express, { response } from "express"; -import { - PRESHARED_MASTER_KEY, CONF_ENS_TIMEOUT -} from "../config"; -const https = require('https'); -const storage = require('node-persist'); -const masterRouter = express.Router(); -const client = require('prom-client'); -const register = new client.Registry(); -const collectDefaultMetrics = client.collectDefaultMetrics; -collectDefaultMetrics({ timeout: 3000 }); -const promLatestLatency = new client.Gauge({ - name: 'orch_latest_latency', - help: 'Latest latency known for a given Orchestrator', - labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] -}); -register.registerMetric(promLatestLatency); -const promLatestPPP = new client.Gauge({ - name: 'orch_latest_ppp', - help: 'Latest price per pixel known for a given Orchestrator', - labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] -}); -register.registerMetric(promLatestPPP); -const promLatency = new client.Summary({ - name: 'orch_latency', - help: 'Summary of latency stats', - percentiles: [0.01, 0.1, 0.9, 0.99], - labelNames: ['region'] -}); -register.registerMetric(promLatency); -const promAverageLatency = new client.Gauge({ - name: 'orch_average_latency', - help: 'Average latency for a given Orchestrator', - labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] -}); -register.registerMetric(promAverageLatency); -const promAUptimeScore = new client.Gauge({ - name: 'orch_uptime_score', - help: 'Uptime score for a given orchestrator', - labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] -}); -register.registerMetric(promAUptimeScore); - -let isSynced = false; - - - -/* - -ENS data from nFrame -[{"domain":null,"address":"0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e","timestamp":1659963951567}] - -*/ - - -let ensData = []; -let lastUpdated = 0; - -const updateEns = async function () { - try { - const url = "https://stronk.rocks/api/livepeer/getEnsDomains"; - console.log("Getting new ENS data from " + url); - - https.get(url, (res2) => { - let body = ""; - res2.on("data", (chunk) => { - body += chunk; - }); - res2.on("end", () => { - try { - const data = JSON.parse(body); - for (const newOrchData of data) { - if (!newOrchData) { continue; } - if (!newOrchData.domain) { continue; } - var found = false; - for (var orchIdx = 0; orchIdx < ensData.length; orchIdx++) { - if (ensData[orchIdx].address != newOrchData.address) { continue; } - ensData[orchIdx] = newOrchData; - found = true; - break; - } - if (!found) { - ensData.push(newOrchData); - } - } - storage.setItem('ensData', ensData); - } catch (error) { - console.error(error.message); - }; - }); - }).on("error", (error) => { - console.error(error.message); - }); - } catch (err) { - console.log(err); - } -} - - -/* - -Incoming stats parsing - -*/ - -masterRouter.post("/collectStats", async (req, res) => { - try { - if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; } - const { id, discoveryResults, responseTime, lookupResults, tag, key } = req.body; - if (!id || !tag || !key) { - console.log("Received malformed data. Aborting stats update..."); - console.log(id, discoveryResults, responseTime, lookupResults, tag, key); - res.send(false); - return; - } - if (PRESHARED_MASTER_KEY != key) { - console.log("Unauthorized"); - res.send(false); - return; - } - let latitude = null; - let longitude = null; - if (lookupResults && lookupResults.geoLookup && lookupResults.geoLookup.ll){ - latitude = lookupResults.geoLookup.ll[0]; - longitude = lookupResults.geoLookup.ll[1]; - } - let thisId = id; - if (responseTime) { - for (const thisEns of ensData) { - if (!thisEns || !thisEns.domain); - if (thisEns.address != thisId) { continue; } - thisId = thisEns.domain; - } - promLatestLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, responseTime); - promLatency.observe({ region: tag }, responseTime); - } - if (discoveryResults && discoveryResults.price_info){ - promLatestPPP.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit); - } - console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)"); - // Save data point - const now = new Date().getTime(); - // Update ENS from nframe if expired - if (now - lastUpdated > CONF_ENS_TIMEOUT) { - await updateEns(); - lastUpdated = now; - } - let thisPing = responseTime; - if (!discoveryResults || !responseTime) { thisPing = null; } - let currentDataList = []; - let orchFound = false; - let regionFound = false; - for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) { - if (orchScores[orchIdx].id != thisId) { continue; } - orchFound = true; - for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) { - if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; } - regionFound = true; - if (orchScores[orchIdx].data[regionIdx].data.length > 60) { - orchScores[orchIdx].data[regionIdx].data = orchScores[orchIdx].data[regionIdx].data.slice(1); - } - orchScores[orchIdx].data[regionIdx].data.push({ latency: thisPing, timestamp: now }); - currentDataList = orchScores[orchIdx].data[regionIdx].data; - break; - } - if (!regionFound) { - currentDataList = [{ latency: thisPing, timestamp: now }]; - orchScores[orchIdx].data.push({ tag, data: currentDataList }); - } - break; - } - if (!orchFound) { - currentDataList = [{ latency: thisPing, timestamp: now }]; - orchScores.push({ id: thisId, data: [{ tag, data: currentDataList }] }); - } - await storage.setItem('orchScores', orchScores); - // Calc new scores - let prevtime = null; - let uptime = 0; - let downtime = 0; - let pingsum = 0; - let pingpoints = 0; - for (const thisData of currentDataList) { - // Count ping* vars - if (thisData.latency && thisData.latency != null) { - pingsum += thisData.latency; - pingpoints += 1; - } - // Only count *time vars if we have timestamps - if (prevtime && thisData.timestamp) { - if (thisData.latency && thisData.latency != null && thisData.latency < 1000) { - uptime += thisData.timestamp - prevtime; - } else { - downtime += thisData.timestamp - prevtime; - } - } - prevtime = thisData.timestamp; - } - if (pingpoints) { - promAverageLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, pingsum / pingpoints); - } - if (uptime || downtime) { - let score; - if (!uptime) { score = 0; } - else { score = uptime / (uptime + downtime); } - promAUptimeScore.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, score); - } - res.send(true); - } catch (err) { - console.log(err, req.body); - res.status(400).send(err); - } -}); - - -/* - -Public endpoints - -*/ - - -masterRouter.get("/prometheus", async (req, res) => { - try { - res.set('Content-Type', register.contentType); - const metrics = await register.metrics(); - res.end(metrics); - } catch (err) { - res.status(400).send(err); - } -}); - -masterRouter.get("/json", async (req, res) => { - try { - res.set('Content-Type', 'application/json'); - res.end(JSON.stringify(orchScores)); - } catch (err) { - res.status(400).send(err); - } -}); - - -/* - -Recover from storage - -*/ - - -let orchScores; - -const recoverStorage = async function () { - await storage.init({ - stringify: JSON.stringify, - parse: JSON.parse, - encoding: 'utf8', - logging: false, - ttl: false, - forgiveParseErrors: false - }); - ensData = await storage.getItem('ensData'); - if (!ensData) { ensData = []; } - orchScores = await storage.getItem('orchScores'); - if (!orchScores) { orchScores = []; } - // Init prometheus from storage - for (const thisOrch of orchScores) { - console.log("recovering scores for " + thisOrch.id); - for (const thisRegion of thisOrch.data) { - let prevtime = null; - let uptime = 0; - let downtime = 0; - let pingsum = 0; - let pingpoints = 0; - for (const thisData of thisRegion.data) { - // Count ping* vars - if (thisData.latency) { - pingsum += thisData.latency; - pingpoints += 1; - promLatestLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, thisData.latency); - promLatency.observe({ region: thisRegion.tag }, thisData.latency); - } - // Only count *time vars if we have timestamps - if (prevtime && thisData.timestamp) { - if (thisData.latency) { - uptime += thisData.timestamp - prevtime; - } else { - downtime += thisData.timestamp - prevtime; - } - } - prevtime = thisData.timestamp; - } - if (pingpoints) { - promAverageLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, pingsum / pingpoints); - } - if (uptime || downtime) { - const score = uptime / (uptime + downtime) - promAUptimeScore.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, score); - } - } - } - isSynced = true; -} -recoverStorage(); - - - - - -export default masterRouter; diff --git a/master/src/server.js b/master/src/server.js deleted file mode 100644 index 9bf6e37..0000000 --- a/master/src/server.js +++ /dev/null @@ -1,32 +0,0 @@ -import express from 'express'; -import { masterRouter } from './routes/index'; -import { MASTER_PORT } from "./config"; -// Env variable which determines which DB to connect to -const { NODE_ENV: mode } = process.env; - -(async () => { - try { - const app = express(); - app.disable('x-powered-by'); - app.use(express.urlencoded({ extended: true })); - app.use(express.json()); - const apiRouter = express.Router(); - app.use('/api', apiRouter); - apiRouter.use('/master', masterRouter); - // Error handler - app.use(function (err, req, res, next) { - res.locals.message = err.message; - // Also log it to the console - console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`); - // Render the error page - res.status(err.status || 500); - res.render('error'); - }); - - app.listen(MASTER_PORT, "0.0.0.0", function () { - console.log(`Listening on port ${MASTER_PORT}`); - }); - } catch (err) { - console.log(err); - } -})(); \ No newline at end of file