Mirror of https://github.com/stronk-dev/OrchestratorTracker.git (synced 2025-07-04 18:35:09 +02:00)
Refactor
Commit 6e2cdbbafa (parent abd90259ce)
.gitignore (vendored, 4 changed lines)
@@ -1,3 +1,3 @@
master/*.md
client/*.md
master/.node-persist/storage/*
notes.md
genSignature/*
README.md (10 changed lines)
@@ -1,11 +1,11 @@
This program consists of three components:
- A master which consolidates orchestrator statistics from various clients
  - Converts raw results of gRPC calls into Prometheus data points
  - Publishes GetOrchestrator responses and discovery time
  - Publishes orchestrator price info
  - Publishes Livepeer Inc performance leaderboard results to Prometheus
  - TODO integration with email / telegram
  - TODO an API to subscribe to notifications and cancel them
  - TODO read test stream scores
  - TODO allow custom names (configure via frontend; ENS done)
- A client which pushes stats to the master (see the example payload after this list)
  - Gets the list of active Orchestrators from the Livepeer subgraph
  - Does GetOrchestrator calls to all Orchestrators one by one
- A frontend where users can (un)subscribe to notifications
- Test each of them and upload results
- TODO A frontend where users can (un)subscribe to notifications
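For reference, the payload a prober uploads to the master (as built by `batchPostStats` in the client and consumed by the master's `collectStats` route further down in this diff) is shaped roughly like this; the orchestrator address key is a placeholder:

```
{
  batchResults: {
    "0x<orchestrator address>": {
      name: "0x<orchestrator address>",
      discovery: { transcoder, price_info, latency, err },
      resolv: { originalTarget, resolvedTarget, geoLookup, geoFrom, lastTime },
    },
  },
  tag: CONF_FRIENDLY_NAME,           // e.g. "Leiden"
  key: CONF_PRESHARED_MASTER_KEY,
  region: CONF_REGION,               // e.g. "Europe"
  livepeer_regions: CONF_LP_REGIONS, // e.g. ["FRA", "LON", "PRA"]
}
```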
client/.gitignore (vendored, 3 changed lines)
@@ -1,4 +1,3 @@
/node_modules
/src/config.js
genSig.js
src/config.js
package-lock.json
client/README.md
@@ -3,20 +3,28 @@ Requires a Broadcaster with some reserve amount set. TODO
### Config variables
```
{
CLIENT_PORT: what port the client accepts API requests on
MASTER_PORT: what port the master should be listening on
MASTER_DOMAIN: the domain on which the master is hosted
MASTER_PATH: the path to the collectStats endpoint. Should be `/api/master/collectStats` unless you are running a reverse proxy
FRIENDLY_NAME: `region` label which gets attached to collected data
PRESHARED_MASTER_KEY: must be the same as the `PRESHARED_MASTER_KEY` on the master
CONF_SLEEPTIME: time between testing Orchestrators in milliseconds
CONF_ORCHINFO_TIMEOUT: timeout for refreshing the list of active orchestrators in milliseconds
CONF_BROADCASTER: eth address of the broadcaster
CONT_SIG: the broadcaster's signature of their eth address - reach out on how to get this
CONF_MASTER_PORT: Port to upload results to
CONF_MASTER_DOMAIN: Domain to upload results to
CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy
CONF_FRIENDLY_NAME: Name to attach to the prober; the city name usually works
CONF_REGION: Continent the prober is located in
CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators
CONF_CONCURRENCY: Amount of orchestrators to test at the same time
CONF_SLEEPTIME: Time to sleep in the main loop
CONF_MAX_LATENCY: Max GetOrchestrator discovery time before we set the latency to 0
CONF_ROUNDTIME: Minimum amount of time between batch checks
CONF_ORCHINFO_TIMEOUT: Timeout between refreshes of the active Orchestrator list
CONF_BROADCASTER: Public address of the broadcaster to mimic
CONF_DNS_TIMEOUT: Timeout between DNS & GEO lookups for orchestrator instances
CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler
CONF_SIGNATURE: Broadcaster signature over a message containing its own public address
}
```

Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now is to add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature from the private key would be nice to have here...
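The `genSig.js` / `genSignature/*` entries in the .gitignore changes of this same commit suggest such a helper lives out-of-tree. A hedged sketch of what it could look like, using the ethers v6 API (ethers is already a dependency of the master in this diff): it assumes the broadcaster signs the keccak256 hash of its raw address bytes with no EIP-191 prefix, which is exactly what web3 wallets refuse to do. Verify the exact message against `genOrchestratorReq` before trusting the output.

```
// Hypothetical helper, not part of this repo.
// Assumption: the signature is over keccak256(addressBytes) directly,
// without the "\x19Ethereum Signed Message:" prefix that web3 wallets add.
import { SigningKey, keccak256, getBytes } from "ethers";

const privateKey = process.env.BROADCASTER_PRIVATE_KEY; // hypothetical env var, 0x-prefixed
const address = "0x847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e"; // CONF_BROADCASTER

const digest = keccak256(getBytes(address));          // hash of the 20 address bytes
const sig = new SigningKey(privateKey).sign(digest);  // raw secp256k1 signature, no prefix
// CONF_SIGNATURE / CONT_SIG expect an array of uint8 (65 bytes: r, s, v)
console.log(JSON.stringify(Array.from(getBytes(sig.serialized))));
```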

### Run production
Note: this folder has to be placed in `/orchTest/client`
Note: this folder has to be placed in `/orchTest/client`, or edit `ecosystem.config.js` to match the new location

npm install
nano src/config.js
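The `package.json` further down in this diff defines a `prod` script that launches the prober under pm2, so after filling in the config the production start is presumably:

npm run prod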
@@ -24,17 +32,21 @@ Note: this folder has to be placed in `/orchTest/client`
example `config.js`:
```
export const {
  CLIENT_PORT = 42068,
  MASTER_PORT = 443,
  MASTER_DOMAIN = "nframe.nl",
  MASTER_PATH = "/orch/collectStats",
  FRIENDLY_NAME = "Chicago",
  PRESHARED_MASTER_KEY = "koekjes",
  CONF_SLEEPTIME = 2000,
  CONF_ORCHINFO_TIMEOUT = 600000,
  CONF_MASTER_PORT = 443,
  CONF_MASTER_DOMAIN = "stronk.rocks",
  CONF_MASTER_PATH = "/orch/collectStats",
  CONF_FRIENDLY_NAME = "Michigan",
  CONF_REGION = "Europe",
  CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
  CONF_CONCURRENCY = 6,
  CONF_SLEEPTIME = 2000, //< 2 seconds
  CONF_MAX_LATENCY = 2000, //< 2 seconds
  CONF_ROUNDTIME = 60000, //< 1 minute
  CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
  CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
  CONT_SIG = Buffer [array of uint8]

  CONF_DNS_TIMEOUT = 600000, //< 10 minutes
  CONF_PRESHARED_MASTER_KEY = "koekjes",
  CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
} = process.env;
```
@@ -49,16 +61,21 @@ export const {
example `config.js`:
```
export const {
  CLIENT_PORT = 42068,
  MASTER_PORT = 42069,
  MASTER_DOMAIN = "127.0.0.1",
  MASTER_PATH = "/api/master/collectStats",
  FRIENDLY_NAME = "Leiden",
  PRESHARED_MASTER_KEY = "koekjes",
  CONF_SLEEPTIME = 5000,
  CONF_ORCHINFO_TIMEOUT = 100000,
  CONF_MASTER_PORT = 42069,
  CONF_MASTER_DOMAIN = "127.0.0.1",
  CONF_MASTER_PATH = "/api/master/collectStats",
  CONF_FRIENDLY_NAME = "Leiden",
  CONF_REGION = "Europe",
  CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
  CONF_CONCURRENCY = 6,
  CONF_SLEEPTIME = 2000, //< 2 seconds
  CONF_MAX_LATENCY = 2000, //< 2 seconds
  CONF_ROUNDTIME = 60000, //< 1 minute
  CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
  CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
  CONT_SIG = Buffer [array of uint8]
  CONF_DNS_TIMEOUT = 600000, //< 10 minutes
  CONF_PRESHARED_MASTER_KEY = "koekjes",
  CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
} = process.env;
```

client/ecosystem.config.js
@@ -1,8 +1,8 @@
module.exports = {
  apps: [
    {
      name: "orchProber",
      script: "./src/orchProber.js",
      name: "client-orchestrator-prober",
      script: "./src/client.js",
      cwd: "/orchTest/client",
      env_production: {
        NODE_ENV: "production"
client/package.json
@@ -1,14 +1,15 @@
{
  "name": "orchProber",
  "version": "0.0.1",
  "name": "client-orchestrator-prober",
  "version": "0.1.0",
  "description": "",
  "main": "./src/orchProber.js",
  "module": "./src/orchProber.js",
  "main": "./src/client.js",
  "type": "module",
  "module": "./src/client.js",
  "scripts": {
    "prod": "NODE_ENV=production pm2 start ecosystem.config.js",
    "start": "NODE_ENV=production node ./src/orchProber.js",
    "dev": "NODE_ENV=development nodemon ./src/orchProber.js",
    "local": "NODE_ENV=local nodemon ./src/orchProber.js"
    "start": "NODE_ENV=production node ./src/client.js",
    "dev": "NODE_ENV=development nodemon ./src/client.js",
    "local": "NODE_ENV=local nodemon ./src/client.js"
  },
  "keywords": [],
  "author": "Marco van Dijk",
@@ -16,13 +17,9 @@
  "dependencies": {
    "@grpc/grpc-js": "^1.6.8",
    "@grpc/proto-loader": "^0.7.0",
    "esm": "^3.2.20",
    "express": "^4.17.1",
    "geoip-lite": "^1.4.8",
    "graphql": "^16.5.0",
    "graphql-request": "^4.3.0",
    "npm": "^8.5.2",
    "web3": "^1.7.5"
    "graphql-request": "^4.3.0"
  },
  "devDependencies": {
    "nodemon": "^1.18.10"
client/src/client.js
@@ -1,33 +1,7 @@
import express from 'express';
import { orchTesterRouter } from './routes/index';
import { CLIENT_PORT } from "./config";
// Env variable which determines which DB to connect to
const { NODE_ENV: mode } = process.env;
import { runTests } from "./util/orchTester.js";

(async () => {
  try {
    const app = express();
    app.disable('x-powered-by');
    app.use(express.urlencoded({ extended: true }));
    app.use(express.json());

    const apiRouter = express.Router();
    app.use('/api', apiRouter);
    apiRouter.use('/tester', orchTesterRouter);
    // Error handler
    app.use(function (err, req, res, next) {
      res.locals.message = err.message;
      // Also log it to the console
      console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
      // Render the error page
      res.status(err.status || 500);
      res.render('error');
    });

    app.listen(CLIENT_PORT, "0.0.0.0", function () {
      console.log(`Listening on port ${CLIENT_PORT}`);
    });
  } catch (err) {
    console.log(err);
  }
})();
    // On first boot, kickstart the test loop
    console.log("Starting main loop...");
    runTests();
})();
@@ -1,2 +0,0 @@
require = require("esm")(module)
module.exports = require("./client.js")
@@ -1,2 +0,0 @@
import orchTesterRouter from './orchTester';
export { orchTesterRouter };
@ -1,22 +1,28 @@
|
||||
import express from "express";
|
||||
const dns = require("dns");
|
||||
var geoip = require("geoip-lite");
|
||||
import dns from "dns";
|
||||
import geoip from "geoip-lite";
|
||||
import { request, gql } from "graphql-request";
|
||||
import https from "https";
|
||||
import http from "http";
|
||||
import grpc from "@grpc/grpc-js";
|
||||
import protoLoader from "@grpc/proto-loader";
|
||||
|
||||
const orchTesterRouter = express.Router();
|
||||
import {
|
||||
MASTER_DOMAIN,
|
||||
MASTER_PORT,
|
||||
MASTER_PATH,
|
||||
FRIENDLY_NAME,
|
||||
PRESHARED_MASTER_KEY,
|
||||
CONF_MASTER_PORT,
|
||||
CONF_MASTER_DOMAIN,
|
||||
CONF_MASTER_PATH,
|
||||
CONF_FRIENDLY_NAME,
|
||||
CONF_REGION,
|
||||
CONF_LP_REGIONS,
|
||||
CONF_MAX_LATENCY,
|
||||
CONF_ROUNDTIME,
|
||||
CONF_CONCURRENCY,
|
||||
CONF_SLEEPTIME,
|
||||
CONT_SIG,
|
||||
CONF_ORCHINFO_TIMEOUT,
|
||||
CONF_BROADCASTER,
|
||||
CONF_DNS_TIMEOUT,
|
||||
} from "../config";
|
||||
|
||||
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
|
||||
CONF_PRESHARED_MASTER_KEY,
|
||||
CONF_SIGNATURE,
|
||||
} from "../config.js";
|
||||
|
||||
/*
|
||||
|
||||
@ -25,11 +31,6 @@ imported modules
|
||||
|
||||
*/
|
||||
|
||||
import { request, gql } from "graphql-request";
|
||||
const https = require("https");
|
||||
const http = require("http");
|
||||
var grpc = require("@grpc/grpc-js");
|
||||
var protoLoader = require("@grpc/proto-loader");
|
||||
var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", {
|
||||
keepCase: true,
|
||||
longs: String,
|
||||
@ -41,6 +42,23 @@ var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net;
|
||||
const ssl_creds = grpc.credentials.createSsl(null, null, null, {
|
||||
checkServerIdentity: () => undefined,
|
||||
});
|
||||
// go-livepeer orchestrators use self-signed TLS certificates, so skip certificate verification
|
||||
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
|
||||
|
||||
/*
|
||||
|
||||
Global variables
|
||||
|
||||
*/
|
||||
|
||||
var activeOrchestrators = []; //< List of all active orchs pulled from the graph
|
||||
var orchestratorsLastUpdated = 0; //< Used to refresh above list periodically
|
||||
var lastRoundStart = 0;
|
||||
let orchDNS = {}; //< Caches DNS and GEO lookups
|
||||
let currentPool = []; //< Current working set of Orchestrators to test
|
||||
let cycle = 0; //< Rounds of testing the script has done
|
||||
let batchResults = {}; //< Current cache of results to batch upload
|
||||
let currentPending = 0; //< Current outstanding requests
|
||||
|
||||
/*
|
||||
|
||||
@ -68,19 +86,213 @@ function shuffle(array) {
|
||||
return array;
|
||||
}
|
||||
|
||||
function hexToBytes(hex) {
|
||||
for (var bytes = [], c = 0; c < hex.length; c += 2)
|
||||
bytes.push(parseInt(hex.substr(c, 2), 16));
|
||||
return bytes;
|
||||
}
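// For clarity: hexToBytes("847791cB...") -> [0x84, 0x77, 0x91, 0xcB, ...].
// CONF_BROADCASTER is therefore expected *without* a 0x prefix; a leading "0x"
// would make the first parseInt() chunk come out as NaN and corrupt the byte array.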
|
||||
|
||||
/*
|
||||
|
||||
Doing grpc calls to an orchestrator and uploading to the hodler
|
||||
|
||||
*/
|
||||
|
||||
const batchPostStats = async function () {
|
||||
var postData = JSON.stringify({
|
||||
batchResults: batchResults,
|
||||
tag: CONF_FRIENDLY_NAME,
|
||||
key: CONF_PRESHARED_MASTER_KEY,
|
||||
region: CONF_REGION,
|
||||
livepeer_regions: CONF_LP_REGIONS,
|
||||
});
|
||||
var options = {
|
||||
hostname: CONF_MASTER_DOMAIN,
|
||||
port: CONF_MASTER_PORT,
|
||||
path: CONF_MASTER_PATH,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": postData.length,
|
||||
},
|
||||
};
|
||||
console.log("Uploading stats of " + postData.length + "B");
|
||||
var req;
|
||||
if (CONF_MASTER_DOMAIN == "127.0.0.1" || CONF_MASTER_DOMAIN == "localhost") {
|
||||
req = http.request(options, (res) => {
|
||||
res.on("data", (d) => {
|
||||
process.stdout.write(
|
||||
"Received response " + d + " from " + CONF_MASTER_DOMAIN
|
||||
);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
req = https.request(options, (res) => {
|
||||
res.on("data", (d) => {
|
||||
process.stdout.write(
|
||||
"Received response " + d + " from " + CONF_MASTER_DOMAIN
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
req.on("error", (e) => {
|
||||
console.error("err", e);
|
||||
});
|
||||
req.write(postData);
|
||||
req.end();
|
||||
};
|
||||
|
||||
const discoverOrchestrator = async function (target) {
|
||||
if (!target) {
|
||||
return;
|
||||
}
|
||||
// Try to prevent the connection from being reused
|
||||
var client = new livepeerProto.Orchestrator(target, ssl_creds, {
|
||||
GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5),
|
||||
});
|
||||
var receivedResults = false;
|
||||
var orchestratorInfo = {};
|
||||
const start = new Date().getTime();
|
||||
var elapsed = 0;
|
||||
await client.GetOrchestrator(
|
||||
{
|
||||
address: hexToBytes(CONF_BROADCASTER),
|
||||
sig: CONF_SIGNATURE,
|
||||
},
|
||||
function (err, res) {
|
||||
if (err) {
|
||||
console.log("Discovery error: ", err.details);
|
||||
orchestratorInfo.err = err.details;
|
||||
} else {
|
||||
orchestratorInfo = res;
|
||||
elapsed = new Date().getTime() - start;
|
||||
}
|
||||
receivedResults = true;
|
||||
}
|
||||
);
|
||||
// Wait for up to CONF_MAX_LATENCY ms for a callback from the gRPC call
|
||||
while (!receivedResults && new Date().getTime() - start < CONF_MAX_LATENCY) {
|
||||
await sleep(20);
|
||||
}
|
||||
grpc.closeClient(client);
|
||||
if (!orchestratorInfo) {
|
||||
return {
|
||||
discoveryResults: {
|
||||
transcoder: null,
|
||||
price_info: null,
|
||||
latency: 0,
|
||||
err: "Took too long to respond. Aborted test...",
|
||||
},
|
||||
};
|
||||
}
|
||||
return {
|
||||
discoveryResults: {
|
||||
transcoder: orchestratorInfo.transcoder,
|
||||
price_info: orchestratorInfo.price_info,
|
||||
latency: elapsed,
|
||||
err: orchestratorInfo.err,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
// Resolve hostname to IP
|
||||
async function getIP(hostname) {
|
||||
let obj = await dns.promises.lookup(hostname).catch((error) => {
|
||||
console.error(error);
|
||||
});
|
||||
if (obj) {
|
||||
return obj.address;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const testOrchestrator = async function (id, target) {
|
||||
if (!id.length || !target.length) {
|
||||
return;
|
||||
}
|
||||
const origTarget = new URL(target);
|
||||
target = target.replace(/^https?:\/\//, "");
|
||||
console.log("Testing orchestrator " + target);
|
||||
// Resolve DNS and GEO
|
||||
const now = new Date().getTime();
|
||||
if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) {
|
||||
const resolved = await getIP(origTarget.hostname);
|
||||
const geo = geoip.lookup(resolved);
|
||||
let geoObj = null;
|
||||
if (geo) {
|
||||
geoObj = {
|
||||
// country: geo.country, //< commented out these fields
|
||||
// region: geo.region, // since they're causing issues with JSON.stringify()
|
||||
// city: geo.city, // probably due to special characters
|
||||
latitude: geo.ll[0],
|
||||
longitude: geo.ll[1],
|
||||
};
|
||||
}
|
||||
orchDNS[id] = {
|
||||
originalTarget: origTarget.origin,
|
||||
resolvedTarget: resolved,
|
||||
geoLookup: geoObj,
|
||||
geoFrom: CONF_FRIENDLY_NAME,
|
||||
lastTime: now,
|
||||
};
|
||||
console.log("Updated DNS and GeoIP data for " + id);
|
||||
}
|
||||
// Test orch
|
||||
const { discoveryResults } = await discoverOrchestrator(target);
|
||||
if (
|
||||
discoveryResults &&
|
||||
discoveryResults.err == "insufficient sender reserve"
|
||||
) {
|
||||
console.log("Ignoring " + id + " due to insufficient sender reserve");
|
||||
return;
|
||||
}
|
||||
// Cache results
|
||||
batchResults[id] = {
|
||||
name: id,
|
||||
discovery: discoveryResults,
|
||||
resolv: orchDNS[id],
|
||||
};
|
||||
if (discoveryResults.err) {
|
||||
batchResults[id].discovery.latency = 0;
|
||||
}
|
||||
currentPending--;
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Batch test logic
|
||||
|
||||
*/
|
||||
|
||||
const batchTestOrchs = async function () {
|
||||
// Clear buff
|
||||
batchResults = {};
|
||||
currentPending = 0;
|
||||
// Keep going until we've got no more todo or pending
|
||||
while (currentPool.length || currentPending) {
|
||||
// Concurrent requests
|
||||
while (currentPending < CONF_CONCURRENCY && currentPool.length) {
|
||||
let currentOrch = currentPool.splice(0, 1)[0];
|
||||
if (!currentOrch.id || !currentOrch.target) {
|
||||
console.log("Skipping Orchestrator with malformed data: ", currentOrch);
|
||||
continue;
|
||||
}
|
||||
currentPending++;
|
||||
testOrchestrator(currentOrch.id, currentOrch.target);
|
||||
}
|
||||
await sleep(50);
|
||||
}
|
||||
batchPostStats();
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Refreshing active orchestrators
|
||||
Pulls this data from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql)
|
||||
We might want to switch to reading directly from the blockchain
|
||||
but this would require constant watching for uri updates which is a pain to implement
|
||||
|
||||
*/
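// The body of getOrchestrators() is elided from this hunk. Its query is presumably
// something along these lines -- the field names below are assumed from the public
// Livepeer subgraph schema, not taken from this repo:
//
//   const orchQuery = gql`{
//     transcoders(where: { active: true }, first: 1000) {
//       id
//       serviceURI
//     }
//   }`;
//
// request() with such a query resolves to { transcoders: [{ id, serviceURI }, ...] },
// which maps onto the { id, target } objects consumed by refreshPool().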
|
||||
|
||||
var activeOrchestrators = [];
|
||||
let lastUpdated = 0;
|
||||
let orchDNS = {};
|
||||
|
||||
/// Does a GQL query to the subgraph for orchestrator data
|
||||
const getOrchestrators = async function () {
|
||||
console.log("Getting orchestrator data from the subgraph...");
|
||||
@ -116,190 +328,16 @@ const getOrchestrators = async function () {
|
||||
const refreshOrchCache = async function () {
|
||||
const now = new Date().getTime();
|
||||
// Update cmc once their data has expired
|
||||
if (now - lastUpdated > CONF_ORCHINFO_TIMEOUT) {
|
||||
if (now - orchestratorsLastUpdated > CONF_ORCHINFO_TIMEOUT) {
|
||||
const data = await getOrchestrators();
|
||||
if (data) {
|
||||
activeOrchestrators = data;
|
||||
lastUpdated = now;
|
||||
orchestratorsLastUpdated = now;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Doing grpc calls to an orchestrator
|
||||
|
||||
*/
|
||||
|
||||
let currentPool = [];
|
||||
|
||||
const postStatistics = async function (
|
||||
id,
|
||||
discoveryResults,
|
||||
lookupResults,
|
||||
responseTime
|
||||
) {
|
||||
console.log("Posting stats for " + id + " (ping " + responseTime + " ms)");
|
||||
|
||||
// TODO look at response and log error?
|
||||
var postData = JSON.stringify({
|
||||
id,
|
||||
discoveryResults,
|
||||
responseTime,
|
||||
lookupResults,
|
||||
tag: FRIENDLY_NAME,
|
||||
key: PRESHARED_MASTER_KEY,
|
||||
});
|
||||
var options = {
|
||||
hostname: MASTER_DOMAIN,
|
||||
port: MASTER_PORT,
|
||||
path: MASTER_PATH,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": postData.length,
|
||||
},
|
||||
};
|
||||
var req;
|
||||
if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") {
|
||||
req = http.request(options, (res) => {
|
||||
// console.log('statusCode:', res.statusCode);
|
||||
// console.log('headers:', res.headers);
|
||||
|
||||
res.on("data", (d) => {
|
||||
process.stdout.write(
|
||||
"Received response " + d + " from " + MASTER_DOMAIN
|
||||
);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
req = https.request(options, (res) => {
|
||||
// console.log('statusCode:', res.statusCode);
|
||||
// console.log('headers:', res.headers);
|
||||
|
||||
res.on("data", (d) => {
|
||||
process.stdout.write(
|
||||
"Received response " + d + " from " + MASTER_DOMAIN
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
req.on("error", (e) => {
|
||||
console.error("err", e);
|
||||
});
|
||||
req.write(postData);
|
||||
req.end();
|
||||
};
|
||||
|
||||
function hexToBytes(hex) {
|
||||
for (var bytes = [], c = 0; c < hex.length; c += 2)
|
||||
bytes.push(parseInt(hex.substr(c, 2), 16));
|
||||
return bytes;
|
||||
}
|
||||
|
||||
const discoverOrchestrator = async function (target) {
|
||||
if (!target) {
|
||||
return;
|
||||
}
|
||||
var client = new livepeerProto.Orchestrator(target, ssl_creds, {
|
||||
GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5),
|
||||
});
|
||||
var receivedResults = false;
|
||||
var orchestratorInfo;
|
||||
const start = new Date().getTime();
|
||||
var elapsed = null;
|
||||
await client.GetOrchestrator(
|
||||
{
|
||||
address: hexToBytes(CONF_BROADCASTER),
|
||||
sig: CONT_SIG,
|
||||
},
|
||||
function (err, res) {
|
||||
if (err) {
|
||||
console.log("Discovery error: ", err.details);
|
||||
orchestratorInfo = err.details;
|
||||
elapsed = null;
|
||||
} else {
|
||||
orchestratorInfo = res;
|
||||
elapsed = new Date().getTime() - start;
|
||||
}
|
||||
receivedResults = true;
|
||||
}
|
||||
);
|
||||
while (!receivedResults && new Date().getTime() - start < 4000) {
|
||||
await sleep(20);
|
||||
}
|
||||
grpc.closeClient(client);
|
||||
return { discoveryResults: orchestratorInfo, elapsed };
|
||||
};
|
||||
|
||||
const pingOrchestrator = async function (target) {
|
||||
if (!target) {
|
||||
return;
|
||||
}
|
||||
var client = new livepeerProto.Orchestrator(target, ssl_creds);
|
||||
var receivedResults = false;
|
||||
var pingPong;
|
||||
const start = new Date().getTime();
|
||||
var elapsed = null;
|
||||
await client.Ping({ value: "koekjes" }, function (err, res) {
|
||||
if (err) {
|
||||
console.log("Ping err: ", err.details);
|
||||
pingPong = err.details;
|
||||
elapsed = null;
|
||||
} else {
|
||||
pingPong = res;
|
||||
elapsed = new Date().getTime() - start;
|
||||
}
|
||||
receivedResults = true;
|
||||
});
|
||||
while (!receivedResults && new Date().getTime() - start < 4000) {
|
||||
await sleep(20);
|
||||
}
|
||||
return { pingResults: pingPong, elapsed };
|
||||
};
|
||||
|
||||
async function getIP(hostname) {
|
||||
let obj = await dns.promises.lookup(hostname).catch((error) => {
|
||||
console.error(error);
|
||||
});
|
||||
if (obj) {
|
||||
return obj.address;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const testOrchestrator = async function (id, target) {
|
||||
if (!id.length || !target.length) {
|
||||
return;
|
||||
}
|
||||
const origTarget = new URL(target);
|
||||
target = target.replace(/^https?:\/\//, "");
|
||||
console.log("Target is " + target);
|
||||
// Resolve DNS
|
||||
const now = new Date().getTime();
|
||||
if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) {
|
||||
const resolved = await getIP(origTarget.hostname);
|
||||
orchDNS[id] = {
|
||||
originalTarget: origTarget.origin,
|
||||
resolvedTarget: resolved,
|
||||
geoLookup: geoip.lookup(resolved),
|
||||
geoFrom: FRIENDLY_NAME,
|
||||
lastTime: now,
|
||||
};
|
||||
console.log("Updated DNS and GeoIP data: ", orchDNS[id]);
|
||||
}
|
||||
// Test orch
|
||||
const { discoveryResults, elapsed } = await discoverOrchestrator(target);
|
||||
if (discoveryResults && discoveryResults == "insufficient sender reserve") {
|
||||
console.log(
|
||||
"Ignoring " + id + " for stats due to insufficient sender reserve"
|
||||
);
|
||||
return;
|
||||
}
|
||||
await postStatistics(id, discoveryResults, orchDNS[id], elapsed);
|
||||
};
|
||||
|
||||
// Creates a new working set of orchs to test
|
||||
const refreshPool = function () {
|
||||
currentPool = [];
|
||||
for (const thisObj of activeOrchestrators) {
|
||||
@ -308,51 +346,36 @@ const refreshPool = function () {
|
||||
shuffle(currentPool);
|
||||
};
|
||||
|
||||
const pingNextOrch = async function () {
|
||||
if (!currentPool.length) {
|
||||
refreshPool();
|
||||
}
|
||||
let currentOrch = currentPool.splice(0, 1)[0];
|
||||
if (!currentOrch.id || !currentOrch.target) {
|
||||
console.log("Skipping Orchestrator with malformed data: ", currentOrch);
|
||||
return;
|
||||
}
|
||||
await testOrchestrator(currentOrch.id, currentOrch.target);
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Main Loop
|
||||
Maybe we shouldn't use nodejs as a client...
|
||||
|
||||
*/
|
||||
|
||||
let cycle = 0;
|
||||
let isSyncing = false;
|
||||
// Does the actual looping over last parsed block -> latest block in chain
|
||||
const handleSync = async function () {
|
||||
const runTests = async function () {
|
||||
try {
|
||||
cycle++;
|
||||
console.log("Starting new cycle #" + cycle);
|
||||
isSyncing = true;
|
||||
await refreshOrchCache();
|
||||
await pingNextOrch();
|
||||
isSyncing = false;
|
||||
const now = new Date().getTime();
|
||||
if (!lastRoundStart || now - lastRoundStart > CONF_ROUNDTIME) {
|
||||
cycle++;
|
||||
console.log("Starting new cycle #" + cycle);
|
||||
// If stale, retrieve new set of active orchestrators
|
||||
await refreshOrchCache();
|
||||
// Create a new shuffled working set of orchs to test
|
||||
refreshPool();
|
||||
// Test all orchs in working set
|
||||
await batchTestOrchs();
|
||||
lastRoundStart = now;
|
||||
}
|
||||
setTimeout(() => {
|
||||
handleSync();
|
||||
runTests();
|
||||
}, CONF_SLEEPTIME);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
isSyncing = false;
|
||||
setTimeout(() => {
|
||||
handleSync();
|
||||
runTests();
|
||||
}, CONF_SLEEPTIME);
|
||||
}
|
||||
};
|
||||
if (!isSyncing) {
|
||||
console.log("Starting main loop");
|
||||
handleSync();
|
||||
}
|
||||
|
||||
export default orchTesterRouter;
|
||||
export { runTests };
|
master/README.md
@@ -2,9 +2,14 @@
### Config variables
```
{
MASTER_PORT: what port the master is listening on
PRESHARED_MASTER_KEY: clients must provide this value when uploading stats
CONF_ENS_TIMEOUT: timeout in ms after which it will pull new ENS domain names from nFrame
CONF_API_L1_HTTP: HTTP URI of the L1 RPC provider
CONF_API_L1_KEY: its API key
CONF_TIMEOUT_ENS_DOMAIN: timeout before refreshing an orch's ENS domain name
CONF_KEY_EXPIRY: timeout before clearing certain orch instance data
CONF_MASTER_PORT: port to accept uploads on from probers
CONF_PRESHARED_MASTER_KEY: password for uploading
CONF_SCORE_TIMEOUT: timeout for checking if some new test stream results came in
CONF_SLEEPTIME: Time to sleep in the main loop
}
```
### Run production
@@ -16,9 +21,14 @@ Note: this folder has to be placed in `/orchTest/master`
example `config.js`:
```
export const {
  MASTER_PORT = 42069,
  PRESHARED_MASTER_KEY = "koekjes",
  CONF_ENS_TIMEOUT = 360000
  CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/",
  CONF_API_L1_KEY = "koekjes",
  CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours
  CONF_KEY_EXPIRY = 3600000, //< 1 hour
  CONF_MASTER_PORT = 42069,
  CONF_PRESHARED_MASTER_KEY = "koekjes",
  CONF_SCORE_TIMEOUT = 300000, //< 5 minutes
  CONF_SLEEPTIME = 2000, //< 2 seconds
} = process.env;
```

@@ -32,11 +42,17 @@ export const {
example `config.js`:
```
export const {
  MASTER_PORT = 42069,
  PRESHARED_MASTER_KEY = "koekjes",
  CONF_ENS_TIMEOUT = 360000
  CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/",
  CONF_API_L1_KEY = "koekjes",
  CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours
  CONF_KEY_EXPIRY = 3600000, //< 1 hour
  CONF_MASTER_PORT = 42069,
  CONF_PRESHARED_MASTER_KEY = "koekjes",
  CONF_SCORE_TIMEOUT = 60000, //< 1 minute
  CONF_SLEEPTIME = 2000, //< 2 seconds
} = process.env;
```

npm run dev
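With either config in place, the master serves its collected data over the same Express app. A quick smoke test against a local instance could look like the snippet below; the paths come from `master.js` and `routes/hodler.js` in this commit, and Node 18+ is assumed for the global `fetch`.

```
// Hypothetical snippet, not part of this repo.
const base = "http://127.0.0.1:42069/api/master";

// Prometheus exposition text served by the /prometheus route
const metrics = await fetch(`${base}/prometheus`).then((res) => res.text());
console.log(metrics.split("\n").slice(0, 5).join("\n"));

// Raw orchestrator cache served by the /json route
const cache = await fetch(`${base}/json`).then((res) => res.json());
console.log(Object.keys(cache)); // orchestrator addresses currently tracked
```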
master/ecosystem.config.js
@@ -1,8 +1,8 @@
module.exports = {
  apps: [
    {
      name: "hodler",
      script: "./src/hodler.js",
      name: "master-hodler",
      script: "./src/master.js",
      cwd: "/orchTest/master",
      env_production: {
        NODE_ENV: "production"
master/package.json
@@ -1,22 +1,22 @@
{
  "name": "hodler",
  "version": "0.0.1",
  "name": "master-hodler",
  "version": "0.1.0",
  "description": "",
  "main": "./src/hodler.js",
  "module": "./src/hodler.js",
  "main": "./src/master.js",
  "type": "module",
  "module": "./src/master.js",
  "scripts": {
    "prod": "NODE_ENV=production pm2 start ecosystem.config.js",
    "start": "NODE_ENV=production node ./src/hodler.js",
    "dev": "NODE_ENV=development nodemon ./src/hodler.js",
    "local": "NODE_ENV=local nodemon ./src/hodler.js"
    "start": "NODE_ENV=production node ./src/master.js",
    "dev": "NODE_ENV=development nodemon ./src/master.js",
    "local": "NODE_ENV=local nodemon ./src/master.js"
  },
  "keywords": [],
  "author": "Marco van Dijk",
  "license": "WTFPL",
  "dependencies": {
    "esm": "^3.2.20",
    "ethers": "^6.8.1",
    "express": "^4.17.1",
    "node-persist": "^3.1.0",
    "npm": "^8.5.2",
    "prom-client": "^14.0.1"
  },
@@ -1,2 +0,0 @@
require = require("esm")(module)
module.exports = require("./server.js")
master/src/master.js (new file, 36 lines)
@@ -0,0 +1,36 @@
import express from "express";
import { masterRouter } from "./routes/index.js";
import { CONF_MASTER_PORT } from "./config.js";
// Env variable which determines which DB to connect to
const { NODE_ENV: mode } = process.env;

(async () => {
  try {
    const app = express();
    app.disable("x-powered-by");
    app.use(express.urlencoded({ extended: true, limit: "1000kb" }));
    app.use(express.json({ limit: "1000kb" }));
    const apiRouter = express.Router();
    app.use("/api", apiRouter);
    apiRouter.use("/master", masterRouter);
    // Error handler
    app.use(function (err, req, res, next) {
      res.locals.message = err.message;
      // Also log it to the console
      console.log(
        `${err.status || 500} - ${err.message} - ${req.originalUrl} - ${
          req.method
        } - ${req.ip}`
      );
      // Render the error page
      res.status(err.status || 500);
      res.render("error");
    });

    app.listen(CONF_MASTER_PORT, "0.0.0.0", function () {
      console.log(`Listening on port ${CONF_MASTER_PORT}`);
    });
  } catch (err) {
    console.log(err);
  }
})();
master/src/routes/hodler.js (new file, 490 lines)
@ -0,0 +1,490 @@
|
||||
import express from "express";
|
||||
import client from "prom-client";
|
||||
import { JsonRpcProvider } from "ethers";
|
||||
import {
|
||||
CONF_API_L1_HTTP,
|
||||
CONF_API_L1_KEY,
|
||||
CONF_PRESHARED_MASTER_KEY,
|
||||
CONF_TIMEOUT_ENS_DOMAIN,
|
||||
CONF_KEY_EXPIRY,
|
||||
CONF_SCORE_TIMEOUT,
|
||||
CONF_SLEEPTIME,
|
||||
} from "../config.js";
|
||||
|
||||
/*
|
||||
|
||||
Init
|
||||
|
||||
*/
|
||||
|
||||
const l1provider = new JsonRpcProvider(CONF_API_L1_HTTP + CONF_API_L1_KEY);
|
||||
const masterRouter = express.Router();
|
||||
const register = new client.Registry();
|
||||
|
||||
// Regional stats - prober instance specific
|
||||
const promLatestLatency = new client.Gauge({
|
||||
name: "orch_latest_latency",
|
||||
help: "Latest latency known for a given Orchestrator",
|
||||
labelNames: ["region", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promLatestLatency);
|
||||
const promLatency = new client.Summary({
|
||||
name: "orch_latency",
|
||||
help: "Summary of latency stats",
|
||||
percentiles: [0.01, 0.1, 0.9, 0.99],
|
||||
labelNames: ["region"],
|
||||
});
|
||||
register.registerMetric(promLatency);
|
||||
const promAverageLatency = new client.Gauge({
|
||||
name: "orch_average_latency",
|
||||
help: "Average latency for a given Orchestrator",
|
||||
labelNames: ["region", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promAverageLatency);
|
||||
|
||||
// Regional stats - Livepeer test stream specific
|
||||
const promLatestRTR = new client.Gauge({
|
||||
name: "orch_latest_rtr",
|
||||
help: "Latest realtime ratio as specified by Livepeer inc's regional performance leaderboards",
|
||||
labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promLatestRTR);
|
||||
const promLatestSuccessRate = new client.Gauge({
|
||||
name: "orch_latest_success_rate",
|
||||
help: "Latest success rate as specified by Livepeer inc's regional performance leaderboards",
|
||||
labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promLatestSuccessRate);
|
||||
|
||||
// Global stats - orchestrator instance specific
|
||||
const promLatestPPP = new client.Gauge({
|
||||
name: "orch_latest_ppp",
|
||||
help: "Latest price per pixel known for a given Orchestrator",
|
||||
labelNames: ["instance", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promLatestPPP);
|
||||
const promAUptimeScore = new client.Gauge({
|
||||
name: "orch_uptime_score",
|
||||
help: "Uptime score for a given orchestrator",
|
||||
labelNames: ["instance", "orchestrator", "latitude", "longitude"],
|
||||
});
|
||||
register.registerMetric(promAUptimeScore);
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Globals
|
||||
|
||||
*/
|
||||
|
||||
let ensDomainCache = {};
|
||||
let orchCache = {};
|
||||
let lastLeaderboardCheck = 0;
|
||||
|
||||
/*
|
||||
|
||||
ENS
|
||||
|
||||
*/
|
||||
|
||||
const getEnsDomain = async function (addr) {
|
||||
const now = new Date().getTime();
|
||||
const cached = ensDomainCache[addr];
|
||||
if (cached && now - cached.timestamp < CONF_TIMEOUT_ENS_DOMAIN) {
|
||||
return cached.domain || cached.address;
|
||||
}
|
||||
// Refresh cause not cached or stale
|
||||
let ensDomain;
|
||||
try {
|
||||
ensDomain = await l1provider.lookupAddress(addr.toLowerCase());
|
||||
} catch (err) {
|
||||
console.log("Error looking up ENS info, retrying...");
|
||||
await sleep(50);
|
||||
return null;
|
||||
}
|
||||
let ensObj;
|
||||
if (!ensDomain) {
|
||||
ensObj = {
|
||||
domain: null,
|
||||
address: addr,
|
||||
timestamp: now,
|
||||
};
|
||||
} else {
|
||||
ensObj = {
|
||||
domain: ensDomain,
|
||||
address: addr,
|
||||
timestamp: now,
|
||||
};
|
||||
}
|
||||
console.log(
|
||||
"Updated ENS domain " +
|
||||
ensObj.domain +
|
||||
" owned by " +
|
||||
ensObj.address +
|
||||
" @ " +
|
||||
ensObj.timestamp
|
||||
);
|
||||
ensDomainCache[addr] = ensObj;
|
||||
return ensObj.domain || ensObj.address;
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Update cache & prometheus
|
||||
|
||||
*/
|
||||
|
||||
const updatePrometheus = async function (tag, id, instance, orchInfo) {
|
||||
const thisInstance = orchInfo.instances[instance];
|
||||
const regionInfo = orchInfo.regionalStats[tag];
|
||||
if (regionInfo.latestDiscoveryTime) {
|
||||
promLatestLatency.set(
|
||||
{
|
||||
region: tag,
|
||||
orchestrator: orchInfo.name,
|
||||
latitude: thisInstance.latitude,
|
||||
longitude: thisInstance.longitude,
|
||||
},
|
||||
regionInfo.latestDiscoveryTime
|
||||
);
|
||||
}
|
||||
if (regionInfo.avgDiscoveryTime) {
|
||||
promAverageLatency.set(
|
||||
{
|
||||
region: tag,
|
||||
orchestrator: orchInfo.name,
|
||||
latitude: thisInstance.latitude,
|
||||
longitude: thisInstance.longitude,
|
||||
},
|
||||
regionInfo.avgDiscoveryTime
|
||||
);
|
||||
}
|
||||
promAUptimeScore.set(
|
||||
{
|
||||
instance: instance,
|
||||
orchestrator: orchInfo.name,
|
||||
latitude: thisInstance.latitude,
|
||||
longitude: thisInstance.longitude,
|
||||
},
|
||||
regionInfo.uptimePercentage
|
||||
);
|
||||
promLatestPPP.set(
|
||||
{
|
||||
instance: instance,
|
||||
orchestrator: orchInfo.name,
|
||||
latitude: thisInstance.latitude,
|
||||
longitude: thisInstance.longitude,
|
||||
},
|
||||
thisInstance.price
|
||||
);
|
||||
promLatency.observe({ region: tag }, regionInfo.latestDiscoveryTime);
|
||||
};
|
||||
|
||||
const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
|
||||
const now = new Date().getTime();
|
||||
// Overwrite name with ENS domain if set
|
||||
let ensDomain = null;
|
||||
while (!ensDomain) {
|
||||
ensDomain = await getEnsDomain(id);
|
||||
}
|
||||
obj.name = ensDomain;
|
||||
// Retrieve entry to update or init it
|
||||
let newObj = orchCache[id];
|
||||
if (!newObj) {
|
||||
newObj = {
|
||||
name: "",
|
||||
regionalStats: {},
|
||||
instances: {},
|
||||
leaderboardResults: { lastTime: now },
|
||||
};
|
||||
}
|
||||
// Find region entry or init it
|
||||
let newRegion = newObj.regionalStats[tag];
|
||||
if (!newRegion) {
|
||||
newRegion = {
|
||||
measurements: [],
|
||||
avgDiscoveryTime: -1,
|
||||
uptimePercentage: 1.0,
|
||||
latestDiscoveryTime: -1,
|
||||
};
|
||||
}
|
||||
|
||||
// Record measurement
|
||||
let measurement = {
|
||||
latency: obj.discovery.latency,
|
||||
timestamp: now,
|
||||
duration: 0,
|
||||
};
|
||||
if (newRegion.measurements.length) {
|
||||
measurement.duration =
|
||||
now - newRegion.measurements[newRegion.measurements.length - 1].timestamp;
|
||||
}
|
||||
newRegion.measurements.push(measurement);
|
||||
|
||||
// Recalc average && uptime
|
||||
let uptime = 0;
|
||||
let downtime = 0;
|
||||
let pingSum = 0;
|
||||
let pingEntries = 0;
|
||||
for (const measurement of newRegion.measurements) {
|
||||
if (measurement.latency && measurement.latency > 0) {
|
||||
if (measurement.duration) {
|
||||
uptime += measurement.duration;
|
||||
}
|
||||
pingSum += measurement.latency;
|
||||
pingEntries++;
|
||||
} else {
|
||||
if (measurement.duration) {
|
||||
downtime += measurement.duration;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pingEntries > 0) {
|
||||
newRegion.avgDiscoveryTime = pingSum / pingEntries;
|
||||
} else {
|
||||
newRegion.avgDiscoveryTime = measurement.latency;
|
||||
}
|
||||
newRegion.latestDiscoveryTime = measurement.latency;
|
||||
|
||||
if (
|
||||
downtime ||
|
||||
(!newRegion.avgDiscoveryTime && !newRegion.latestDiscoveryTime)
|
||||
) {
|
||||
if (!uptime) {
|
||||
newRegion.uptimePercentage = 0.0;
|
||||
} else {
|
||||
newRegion.uptimePercentage = uptime / (uptime + downtime);
|
||||
}
|
||||
}
|
||||
|
||||
// Find instance entry or init it
|
||||
let newInstance = newObj.instances[obj.resolv.resolvedTarget];
|
||||
if (!newInstance) {
|
||||
newInstance = {
|
||||
price: -1,
|
||||
latitude: -1,
|
||||
longitude: -1,
|
||||
probedFrom: {},
|
||||
regions: {},
|
||||
livepeer_regions: {},
|
||||
};
|
||||
}
|
||||
|
||||
// Remove expired stuff
|
||||
for (const [id, obj] of Object.entries(newInstance.probedFrom)) {
|
||||
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||
newInstance.probedFrom[id] = null;
|
||||
}
|
||||
}
|
||||
for (const [id, obj] of Object.entries(newInstance.regions)) {
|
||||
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||
newInstance.regions[id] = null;
|
||||
}
|
||||
}
|
||||
for (const [id, obj] of Object.entries(newInstance.livepeer_regions)) {
|
||||
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||
newInstance.livepeer_regions[id] = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Set last times for instance info
|
||||
newInstance.probedFrom[tag] = {
|
||||
lastTime: now,
|
||||
};
|
||||
newInstance.regions[region] = {
|
||||
lastTime: now,
|
||||
};
|
||||
for (const region of livepeer_regions) {
|
||||
newInstance.livepeer_regions[region] = {
|
||||
lastTime: now,
|
||||
};
|
||||
}
|
||||
|
||||
// Set location and price info
|
||||
if (obj.discovery.price_info) {
|
||||
newInstance.price =
|
||||
obj.discovery.price_info.pricePerUnit /
|
||||
obj.discovery.price_info.pixelsPerUnit;
|
||||
}
|
||||
if (obj.resolv.geoLookup) {
|
||||
newInstance.latitude = obj.resolv.geoLookup.latitude;
|
||||
newInstance.longitude = obj.resolv.geoLookup.longitude;
|
||||
}
|
||||
|
||||
// Update name
|
||||
newObj.name = obj.name;
|
||||
|
||||
// Finished updating
|
||||
newObj.instances[obj.resolv.resolvedTarget] = newInstance;
|
||||
newObj.regionalStats[tag] = newRegion;
|
||||
orchCache[id] = newObj;
|
||||
|
||||
// Update prometheus stats
|
||||
updatePrometheus(tag, id, obj.resolv.resolvedTarget, newObj);
|
||||
console.log("Handled results for " + id + " from prober " + tag);
|
||||
};
|
||||
|
||||
const updateCache = async function (
|
||||
batchResults,
|
||||
tag,
|
||||
region,
|
||||
livepeer_regions
|
||||
) {
|
||||
console.log("Parsing stats from " + tag + " (" + region + ")");
|
||||
|
||||
for (const [id, obj] of Object.entries(batchResults)) {
|
||||
onOrchUpdate(id, obj, tag, region, livepeer_regions);
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Public endpoints
|
||||
|
||||
*/
|
||||
|
||||
// Mutate state with new stats
|
||||
masterRouter.post("/collectStats", async (req, res) => {
|
||||
try {
|
||||
const { batchResults, tag, key, region, livepeer_regions } = req.body;
|
||||
if (!batchResults || !tag || !key || !region || !livepeer_regions) {
|
||||
console.log("Received malformed data. Aborting stats update...");
|
||||
console.log(batchResults, tag, key, region, livepeer_regions);
|
||||
res.send(false);
|
||||
return;
|
||||
}
|
||||
if (CONF_PRESHARED_MASTER_KEY != req.body.key) {
|
||||
console.log("Unauthorized");
|
||||
res.send(false);
|
||||
return;
|
||||
}
|
||||
updateCache(batchResults, tag, region, livepeer_regions);
|
||||
res.send(true);
|
||||
} catch (err) {
|
||||
console.log(err, req.body);
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
// Retrieve prom data
|
||||
masterRouter.get("/prometheus", async (req, res) => {
|
||||
try {
|
||||
res.set("Content-Type", register.contentType);
|
||||
const metrics = await register.metrics();
|
||||
res.end(metrics);
|
||||
} catch (err) {
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
// Retrieve cache as JSON
|
||||
masterRouter.get("/json", async (req, res) => {
|
||||
try {
|
||||
res.set("Content-Type", "application/json");
|
||||
res.end(JSON.stringify(orchCache));
|
||||
} catch (err) {
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
/*
|
||||
|
||||
Main Loop
|
||||
|
||||
*/
|
||||
|
||||
const updateScore = async function (address) {
|
||||
console.log("Checking for new scores for " + address);
|
||||
const lastTime = orchCache[address].leaderboardResults.lastTime;
|
||||
|
||||
let url =
|
||||
"https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" +
|
||||
address;
|
||||
|
||||
const json = await fetch(url).then((res) => res.json());
|
||||
let hasEdited = false;
|
||||
for (const [region, results] of Object.entries(json)) {
|
||||
for (const instance of results) {
|
||||
if (instance.timestamp > lastTime) {
|
||||
const newRTR = instance.success_rate;
|
||||
const newSR = instance.round_trip_time / instance.seg_duration;
|
||||
let latitude = null;
|
||||
let longitude = null;
|
||||
for (const instance of orchCache[address].instances) {
|
||||
if (instance.livepeer_regions[region]) {
|
||||
latitude = instance.livepeer_regions[region].latitude;
|
||||
longitude = instance.livepeer_regions[region].longitude;
|
||||
}
|
||||
}
|
||||
console.log(
|
||||
"Found new RTR=" +
|
||||
newRTR +
|
||||
" and new success rate of " +
|
||||
newSR * 100 +
|
||||
"%, livepeer region " +
|
||||
instance.region
|
||||
);
|
||||
promLatestRTR.set(
|
||||
{
|
||||
livepeer_region: instance.region,
|
||||
orchestrator: address,
|
||||
latitude: latitude,
|
||||
longitude: longitude,
|
||||
},
|
||||
newRTR
|
||||
);
|
||||
promLatestSuccessRate.set(
|
||||
{
|
||||
livepeer_region: instance.region,
|
||||
orchestrator: address,
|
||||
latitude: latitude,
|
||||
longitude: longitude,
|
||||
},
|
||||
newSR
|
||||
);
|
||||
hasEdited = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hasEdited) {
|
||||
orchCache[address].leaderboardResults.lastTime = new Date().getTime();
|
||||
}
|
||||
};
|
||||
|
||||
const updateOrchScores = async function () {
|
||||
for (const [id, obj] of Object.entries(orchCache)) {
|
||||
await updateScore(id);
|
||||
}
|
||||
};
|
||||
|
||||
const runTests = async function () {
|
||||
try {
|
||||
const now = new Date().getTime();
|
||||
if (
|
||||
!lastLeaderboardCheck ||
|
||||
now - lastLeaderboardCheck > CONF_SCORE_TIMEOUT
|
||||
) {
|
||||
await updateOrchScores();
|
||||
lastLeaderboardCheck = now;
|
||||
}
|
||||
setTimeout(() => {
|
||||
runTests();
|
||||
}, CONF_SLEEPTIME);
|
||||
return;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
setTimeout(() => {
|
||||
runTests();
|
||||
}, CONF_SLEEPTIME);
|
||||
}
|
||||
};
|
||||
console.log("Starting watcher for test stream results");
|
||||
runTests();
|
||||
|
||||
export default masterRouter;
|
master/src/routes/index.js
@@ -1,2 +1,2 @@
import masterRouter from './master';
import masterRouter from './hodler.js';
export { masterRouter };
@ -1,309 +0,0 @@
|
||||
import express, { response } from "express";
|
||||
import {
|
||||
PRESHARED_MASTER_KEY, CONF_ENS_TIMEOUT
|
||||
} from "../config";
|
||||
const https = require('https');
|
||||
const storage = require('node-persist');
|
||||
const masterRouter = express.Router();
|
||||
const client = require('prom-client');
|
||||
const register = new client.Registry();
|
||||
const collectDefaultMetrics = client.collectDefaultMetrics;
|
||||
collectDefaultMetrics({ timeout: 3000 });
|
||||
const promLatestLatency = new client.Gauge({
|
||||
name: 'orch_latest_latency',
|
||||
help: 'Latest latency known for a given Orchestrator',
|
||||
labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
|
||||
});
|
||||
register.registerMetric(promLatestLatency);
|
||||
const promLatestPPP = new client.Gauge({
|
||||
name: 'orch_latest_ppp',
|
||||
help: 'Latest price per pixel known for a given Orchestrator',
|
||||
labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
|
||||
});
|
||||
register.registerMetric(promLatestPPP);
|
||||
const promLatency = new client.Summary({
|
||||
name: 'orch_latency',
|
||||
help: 'Summary of latency stats',
|
||||
percentiles: [0.01, 0.1, 0.9, 0.99],
|
||||
labelNames: ['region']
|
||||
});
|
||||
register.registerMetric(promLatency);
|
||||
const promAverageLatency = new client.Gauge({
|
||||
name: 'orch_average_latency',
|
||||
help: 'Average latency for a given Orchestrator',
|
||||
labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
|
||||
});
|
||||
register.registerMetric(promAverageLatency);
|
||||
const promAUptimeScore = new client.Gauge({
|
||||
name: 'orch_uptime_score',
|
||||
help: 'Uptime score for a given orchestrator',
|
||||
labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
|
||||
});
|
||||
register.registerMetric(promAUptimeScore);
|
||||
|
||||
let isSynced = false;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
|
||||
ENS data from nFrame
|
||||
[{"domain":null,"address":"0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e","timestamp":1659963951567}]
|
||||
|
||||
*/
|
||||
|
||||
|
||||
let ensData = [];
|
||||
let lastUpdated = 0;
|
||||
|
||||
const updateEns = async function () {
|
||||
try {
|
||||
const url = "https://stronk.rocks/api/livepeer/getEnsDomains";
|
||||
console.log("Getting new ENS data from " + url);
|
||||
|
||||
https.get(url, (res2) => {
|
||||
let body = "";
|
||||
res2.on("data", (chunk) => {
|
||||
body += chunk;
|
||||
});
|
||||
res2.on("end", () => {
|
||||
try {
|
||||
const data = JSON.parse(body);
|
||||
for (const newOrchData of data) {
|
||||
if (!newOrchData) { continue; }
|
||||
if (!newOrchData.domain) { continue; }
|
||||
var found = false;
|
||||
for (var orchIdx = 0; orchIdx < ensData.length; orchIdx++) {
|
||||
if (ensData[orchIdx].address != newOrchData.address) { continue; }
|
||||
ensData[orchIdx] = newOrchData;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
if (!found) {
|
||||
ensData.push(newOrchData);
|
||||
}
|
||||
}
|
||||
storage.setItem('ensData', ensData);
|
||||
} catch (error) {
|
||||
console.error(error.message);
|
||||
};
|
||||
});
|
||||
}).on("error", (error) => {
|
||||
console.error(error.message);
|
||||
});
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
||||
Incoming stats parsing
|
||||
|
||||
*/
|
||||
|
||||
masterRouter.post("/collectStats", async (req, res) => {
|
||||
try {
|
||||
if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; }
|
||||
const { id, discoveryResults, responseTime, lookupResults, tag, key } = req.body;
|
||||
if (!id || !tag || !key) {
|
||||
console.log("Received malformed data. Aborting stats update...");
|
||||
console.log(id, discoveryResults, responseTime, lookupResults, tag, key);
|
||||
res.send(false);
|
||||
return;
|
||||
}
|
||||
if (PRESHARED_MASTER_KEY != key) {
|
||||
console.log("Unauthorized");
|
||||
res.send(false);
|
||||
return;
|
||||
}
|
||||
let latitude = null;
|
||||
let longitude = null;
|
||||
if (lookupResults && lookupResults.geoLookup && lookupResults.geoLookup.ll){
|
||||
latitude = lookupResults.geoLookup.ll[0];
|
||||
longitude = lookupResults.geoLookup.ll[1];
|
||||
}
|
||||
let thisId = id;
|
||||
if (responseTime) {
|
||||
for (const thisEns of ensData) {
|
||||
if (!thisEns || !thisEns.domain);
|
||||
if (thisEns.address != thisId) { continue; }
|
||||
thisId = thisEns.domain;
|
||||
}
|
||||
promLatestLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, responseTime);
|
||||
promLatency.observe({ region: tag }, responseTime);
|
||||
}
|
||||
if (discoveryResults && discoveryResults.price_info){
|
||||
promLatestPPP.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit);
|
||||
}
|
||||
console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)");
|
||||
// Save data point
|
||||
const now = new Date().getTime();
|
||||
// Update ENS from nframe if expired
|
||||
if (now - lastUpdated > CONF_ENS_TIMEOUT) {
|
||||
await updateEns();
|
||||
lastUpdated = now;
|
||||
}
|
||||
let thisPing = responseTime;
|
||||
if (!discoveryResults || !responseTime) { thisPing = null; }
|
||||
let currentDataList = [];
|
||||
let orchFound = false;
|
||||
let regionFound = false;
|
||||
for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) {
|
||||
if (orchScores[orchIdx].id != thisId) { continue; }
|
||||
orchFound = true;
|
||||
for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) {
|
||||
if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; }
|
||||
regionFound = true;
|
||||
if (orchScores[orchIdx].data[regionIdx].data.length > 60) {
|
||||
orchScores[orchIdx].data[regionIdx].data = orchScores[orchIdx].data[regionIdx].data.slice(1);
|
||||
}
|
||||
orchScores[orchIdx].data[regionIdx].data.push({ latency: thisPing, timestamp: now });
|
||||
currentDataList = orchScores[orchIdx].data[regionIdx].data;
|
||||
break;
|
||||
}
|
||||
if (!regionFound) {
|
||||
currentDataList = [{ latency: thisPing, timestamp: now }];
|
||||
orchScores[orchIdx].data.push({ tag, data: currentDataList });
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (!orchFound) {
|
||||
currentDataList = [{ latency: thisPing, timestamp: now }];
|
||||
orchScores.push({ id: thisId, data: [{ tag, data: currentDataList }] });
|
||||
}
|
||||
await storage.setItem('orchScores', orchScores);
|
||||
// Calc new scores
|
||||
let prevtime = null;
|
||||
let uptime = 0;
|
||||
let downtime = 0;
|
||||
let pingsum = 0;
|
||||
let pingpoints = 0;
|
||||
for (const thisData of currentDataList) {
|
||||
// Count ping* vars
|
||||
if (thisData.latency && thisData.latency != null) {
|
||||
pingsum += thisData.latency;
|
||||
pingpoints += 1;
|
||||
}
|
||||
// Only count *time vars if we have timestamps
|
||||
if (prevtime && thisData.timestamp) {
|
||||
if (thisData.latency && thisData.latency != null && thisData.latency < 1000) {
|
||||
uptime += thisData.timestamp - prevtime;
|
||||
} else {
|
||||
downtime += thisData.timestamp - prevtime;
|
||||
}
|
||||
}
|
||||
prevtime = thisData.timestamp;
|
||||
}
|
||||
if (pingpoints) {
|
||||
promAverageLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, pingsum / pingpoints);
|
||||
}
|
||||
if (uptime || downtime) {
|
||||
let score;
|
||||
if (!uptime) { score = 0; }
|
||||
else { score = uptime / (uptime + downtime); }
|
||||
promAUptimeScore.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, score);
|
||||
}
|
||||
res.send(true);
|
||||
} catch (err) {
|
||||
console.log(err, req.body);
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
/*
|
||||
|
||||
Public endpoints
|
||||
|
||||
*/
|
||||
|
||||
|
||||
masterRouter.get("/prometheus", async (req, res) => {
|
||||
try {
|
||||
res.set('Content-Type', register.contentType);
|
||||
const metrics = await register.metrics();
|
||||
res.end(metrics);
|
||||
} catch (err) {
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
masterRouter.get("/json", async (req, res) => {
|
||||
try {
|
||||
res.set('Content-Type', 'application/json');
|
||||
res.end(JSON.stringify(orchScores));
|
||||
} catch (err) {
|
||||
res.status(400).send(err);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
/*
|
||||
|
||||
Recover from storage
|
||||
|
||||
*/
|
||||
|
||||
|
||||
let orchScores;
|
||||
|
||||
const recoverStorage = async function () {
|
||||
await storage.init({
|
||||
stringify: JSON.stringify,
|
||||
parse: JSON.parse,
|
||||
encoding: 'utf8',
|
||||
logging: false,
|
||||
ttl: false,
|
||||
forgiveParseErrors: false
|
||||
});
|
||||
ensData = await storage.getItem('ensData');
|
||||
if (!ensData) { ensData = []; }
|
||||
orchScores = await storage.getItem('orchScores');
|
||||
if (!orchScores) { orchScores = []; }
|
||||
// Init prometheus from storage
|
||||
for (const thisOrch of orchScores) {
|
||||
console.log("recovering scores for " + thisOrch.id);
|
||||
for (const thisRegion of thisOrch.data) {
|
||||
let prevtime = null;
|
||||
let uptime = 0;
|
||||
let downtime = 0;
|
||||
let pingsum = 0;
|
||||
let pingpoints = 0;
|
||||
for (const thisData of thisRegion.data) {
|
||||
// Count ping* vars
|
||||
if (thisData.latency) {
|
||||
pingsum += thisData.latency;
|
||||
pingpoints += 1;
|
||||
promLatestLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, thisData.latency);
|
||||
promLatency.observe({ region: thisRegion.tag }, thisData.latency);
|
||||
}
|
||||
// Only count *time vars if we have timestamps
|
||||
if (prevtime && thisData.timestamp) {
|
||||
if (thisData.latency) {
|
||||
uptime += thisData.timestamp - prevtime;
|
||||
} else {
|
||||
downtime += thisData.timestamp - prevtime;
|
||||
}
|
||||
}
|
||||
prevtime = thisData.timestamp;
|
||||
}
|
||||
if (pingpoints) {
|
||||
promAverageLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, pingsum / pingpoints);
|
||||
}
|
||||
if (uptime || downtime) {
|
||||
const score = uptime / (uptime + downtime)
|
||||
promAUptimeScore.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, score);
|
||||
}
|
||||
}
|
||||
}
|
||||
isSynced = true;
|
||||
}
|
||||
recoverStorage();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
export default masterRouter;
|
@ -1,32 +0,0 @@
|
||||
import express from 'express';
|
||||
import { masterRouter } from './routes/index';
|
||||
import { MASTER_PORT } from "./config";
|
||||
// Env variable which determines which DB to connect to
|
||||
const { NODE_ENV: mode } = process.env;
|
||||
|
||||
(async () => {
|
||||
try {
|
||||
const app = express();
|
||||
app.disable('x-powered-by');
|
||||
app.use(express.urlencoded({ extended: true }));
|
||||
app.use(express.json());
|
||||
const apiRouter = express.Router();
|
||||
app.use('/api', apiRouter);
|
||||
apiRouter.use('/master', masterRouter);
|
||||
// Error handler
|
||||
app.use(function (err, req, res, next) {
|
||||
res.locals.message = err.message;
|
||||
// Also log it to the console
|
||||
console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
|
||||
// Render the error page
|
||||
res.status(err.status || 500);
|
||||
res.render('error');
|
||||
});
|
||||
|
||||
app.listen(MASTER_PORT, "0.0.0.0", function () {
|
||||
console.log(`Listening on port ${MASTER_PORT}`);
|
||||
});
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
}
|
||||
})();
|