Mirror of https://github.com/stronk-dev/OrchestratorTracker.git (synced 2025-07-06 19:35:10 +02:00)

Compare commits: abd90259ce...d3afd739ae (15 commits)
d3afd739ae, 25db78d58f, ab7c8e4510, 488c3cff64, f7d1ee5cdd, e51fb26db5, 4720e88b46, b2ae0eeee5, df401322fc, e8be381ee6, 9b63d72b09, 045b18a973, dd6021fe49, 0858d0b83d, 6e2cdbbafa
.gitignore (vendored): 4 changes
@@ -1,3 +1,3 @@
-master/*.md
-client/*.md
 master/.node-persist/storage/*
+notes.md
+genSignature/*
README.md: 10 changes
@@ -1,11 +1,11 @@
 This program consists of three components:
 - A master which consolidates orchestrator statistics from various clients
-- Converts raw results of grpc calls into prometheus data points
+- Publishes GetOrchestrator responses and discovery time
+- Publishes orchestrator price info
+- Observes Livepeer Inc performance leaderboards results to Prometheus
 - TODO integration with email / telegram
 - TODO api to subscribe for notifications and cancel it
-- TODO read test stream scores
-- TODO allow custom names (configure via frontend, ENS done)
 - A client which pushes stats to master
 - Gets the list of active Orchestrators from the Livepeer subgraph
-- Does GetOrchestrator calls to all Orchestrators one by one
-- A frontend where users can (un)subscribe to notifications
+- Test each of them and upload results
+- TODO A frontend where users can (un)subscribe to notifications
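For reference, the batch payload which the client POSTs to the master's `collectStats` endpoint (built by `batchPostStats` in the orchTester.js diff further down) is shaped roughly like this. The field names come straight from that diff; the values below are made up for illustration:

```js
// Illustrative only: the JSON body uploaded to the master, per batchPostStats.
// Keys match the diff below; addresses, URLs and numbers here are invented.
const examplePayload = {
  batchResults: {
    "0x847791cbf03be716a7fe9dc8c9affe17bd49ae5e": {
      name: "0x847791cbf03be716a7fe9dc8c9affe17bd49ae5e",
      discovery: {
        transcoder: "https://orch.example.com:8935",
        price_info: { pricePerUnit: 1200, pixelsPerUnit: 1 },
        latency: 312, //< GetOrchestrator discovery time in ms, 0 on error
        err: null,
      },
      resolv: {
        originalTarget: "https://orch.example.com:8935",
        resolvedTarget: "203.0.113.10",
        geoLookup: { latitude: 52.1, longitude: 4.5 },
        geoFrom: "Leiden",
        lastTime: 1659963951567,
      },
    },
  },
  tag: "Leiden",                            //< CONF_FRIENDLY_NAME
  key: "koekjes",                           //< CONF_PRESHARED_MASTER_KEY
  region: "Europe",                         //< CONF_REGION
  livepeer_regions: ["FRA", "LON", "PRA"],  //< CONF_LP_REGIONS
};
```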
client/.gitignore (vendored): 3 changes
@@ -1,4 +1,3 @@
 /node_modules
-/src/config.js
-genSig.js
+src/config.js
 package-lock.json
client/README.md
@@ -3,20 +3,28 @@ Requires a Broadcaster with some reserve amount set. TODO
 ### Config variables
 ```
 {
-CLIENT_PORT: what port the client accepts API requests on
-MASTER_PORT: what port the master should be listening on
-MASTER_DOMAIN: the domain on which the master is hosted
-MASTER_PATH: the path to the collectStats endpoint. Should be `/api/master/collectStats` unless you are running a reverse proxy
-FRIENDLY_NAME: `region` label which gets attached to collected data
-PRESHARED_MASTER_KEY: must be the same as the `PRESHARED_MASTER_KEY` on the master
-CONF_SLEEPTIME: time between testing Orchestrators in milliseconds
-CONF_ORCHINFO_TIMEOUT: timeout for refreshing the list of active orchestrators in milliseconds
-CONF_BROADCASTER: eth address of the broadcaster
-CONT_SIG = broadcasters' signature of their eth address - reach out on how to get this
+CONF_MASTER_PORT: Port to upload results to
+CONF_MASTER_DOMAIN: Domain to upload results to
+CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy
+CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works
+CONF_REGION: Continent the prober is located in
+CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators
+CONF_CONCURRENCY: Amount of orchestrators to test at the same time
+CONF_SLEEPTIME: Time to sleep in the main loop
+CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0
+CONF_ROUNDTIME: Minimum amount of time between batch-checks
+CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list
+CONF_BROADCASTER: Public address of the broadcaster to mimic
+CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances
+CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler
+CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address
 }
 ```
Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now is to add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature straight from the private key would be nice to have here; a rough sketch of such a script follows.

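A minimal sketch of such a script, assuming go-livepeer signs keccak256 of the raw broadcaster address bytes with the broadcaster's key and without the usual EIP-191 message prefix (verify the output against the print statement in `genOrchestratorReq` before trusting it). It uses ethers v6, which is not a dependency of this package:

```js
// genSig.js (hypothetical helper, not part of this repo's package.json)
// Assumption: the signature is a raw ECDSA signature over keccak256(addressBytes),
// i.e. no "\x19Ethereum Signed Message:\n" prefix. Double-check against go-livepeer.
const { Wallet, SigningKey, keccak256, getBytes } = require("ethers"); // ethers v6

const PRIVATE_KEY = process.env.BROADCASTER_PK; //< broadcaster's private key, hex string
const wallet = new Wallet(PRIVATE_KEY);

const digest = keccak256(getBytes(wallet.address)); // hash the 20 raw address bytes
const sig = new SigningKey(PRIVATE_KEY).sign(digest); // sign the digest directly, no prefix

// CONF_SIGNATURE expects an array of uint8, so print the 65-byte r||s||v signature that way
console.log(JSON.stringify(Array.from(getBytes(sig.serialized))));
```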
 ### Run production
-Note: this folder has to be placed in `/orchTest/client`
+Note: this folder has to be placed in `/orchTest/client`, or edit `ecosystem.config.js` to match the new location
 
 npm install
 nano src/config.js
@@ -24,22 +32,26 @@ Note: this folder has to be placed in `/orchTest/client`
 example `config.js`:
 ```
 export const {
-CLIENT_PORT = 42068,
-MASTER_PORT = 443,
-MASTER_DOMAIN = "nframe.nl",
-MASTER_PATH = "/orch/collectStats",
-FRIENDLY_NAME = "Chicago",
-PRESHARED_MASTER_KEY = "koekjes",
-CONF_SLEEPTIME = 2000,
-CONF_ORCHINFO_TIMEOUT = 600000,
+CONF_MASTER_PORT = 443,
+CONF_MASTER_DOMAIN = "stronk.rocks",
+CONF_MASTER_PATH = "/orch/collectStats",
+CONF_FRIENDLY_NAME = "Michigan",
+CONF_REGION = "Europe",
+CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
+CONF_CONCURRENCY = 6,
+CONF_SLEEPTIME = 2000, //< 2 seconds
+CONF_MAX_LATENCY = 2000, //< 2 seconds
+CONF_ROUNDTIME = 60000, //< 1 minute
+CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
 CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
-CONT_SIG = Buffer [array of uint8]
+CONF_DNS_TIMEOUT = 600000, //< 10 minutes
+CONF_PRESHARED_MASTER_KEY = "koekjes",
+CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 } = process.env;
 ```
 
-pm2 start ecosystem.config.js
+pm2 start ecosystem.config.cjs
 
 ### Run development
 
@@ -49,16 +61,21 @@ export const {
 example `config.js`:
 ```
 export const {
-CLIENT_PORT = 42068,
-MASTER_PORT = 42069,
-MASTER_DOMAIN = "127.0.0.1",
-MASTER_PATH = "/api/master/collectStats",
-FRIENDLY_NAME = "Leiden",
-PRESHARED_MASTER_KEY = "koekjes",
-CONF_SLEEPTIME = 5000,
-CONF_ORCHINFO_TIMEOUT = 100000,
+CONF_MASTER_PORT = 42069,
+CONF_MASTER_DOMAIN = "127.0.0.1",
+CONF_MASTER_PATH = "/api/master/collectStats",
+CONF_FRIENDLY_NAME = "Leiden",
+CONF_REGION = "Europe",
+CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
+CONF_CONCURRENCY = 6,
+CONF_SLEEPTIME = 2000, //< 2 seconds
+CONF_MAX_LATENCY = 2000, //< 2 seconds
+CONF_ROUNDTIME = 60000, //< 1 minute
+CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
 CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
-CONT_SIG = Buffer [array of uint8]
+CONF_DNS_TIMEOUT = 600000, //< 10 minutes
+CONF_PRESHARED_MASTER_KEY = "koekjes",
+CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 } = process.env;
 ```
 
client/ecosystem.config.js
@@ -1,8 +1,8 @@
 module.exports = {
   apps: [
     {
-      name: "orchProber",
-      script: "./src/orchProber.js",
+      name: "client-orchestrator-prober",
+      script: "./src/client.js",
       cwd: "/orchTest/client",
       env_production: {
         NODE_ENV: "production"
client/package.json
@@ -1,14 +1,15 @@
 {
-  "name": "orchProber",
-  "version": "0.0.1",
+  "name": "client-orchestrator-prober",
+  "version": "0.1.0",
   "description": "",
-  "main": "./src/orchProber.js",
-  "module": "./src/orchProber.js",
+  "main": "./src/client.js",
+  "type": "commonjs",
+  "module": "./src/client.js",
   "scripts": {
     "prod": "NODE_ENV=production pm2 start ecosystem.config.js",
-    "start": "NODE_ENV=production node ./src/orchProber.js",
-    "dev": "NODE_ENV=development nodemon ./src/orchProber.js",
-    "local": "NODE_ENV=local nodemon ./src/orchProber.js"
+    "start": "NODE_ENV=production node ./src/client.js",
+    "dev": "NODE_ENV=development nodemon ./src/client.js",
+    "local": "NODE_ENV=local nodemon ./src/client.js"
   },
   "keywords": [],
   "author": "Marco van Dijk",
@@ -16,13 +17,9 @@
   "dependencies": {
     "@grpc/grpc-js": "^1.6.8",
     "@grpc/proto-loader": "^0.7.0",
-    "esm": "^3.2.20",
-    "express": "^4.17.1",
     "geoip-lite": "^1.4.8",
     "graphql": "^16.5.0",
-    "graphql-request": "^4.3.0",
-    "npm": "^8.5.2",
-    "web3": "^1.7.5"
+    "graphql-request": "^4.3.0"
   },
   "devDependencies": {
     "nodemon": "^1.18.10"
client/src/client.js
@@ -1,33 +1,7 @@
-import express from 'express';
-import { orchTesterRouter } from './routes/index';
-import { CLIENT_PORT } from "./config";
-// Env variable which determines which DB to connect to
-const { NODE_ENV: mode } = process.env;
+const { runTests } = require("./util/orchTester.js");
 
 (async () => {
-  try {
-    const app = express();
-    app.disable('x-powered-by');
-    app.use(express.urlencoded({ extended: true }));
-    app.use(express.json());
-
-    const apiRouter = express.Router();
-    app.use('/api', apiRouter);
-    apiRouter.use('/tester', orchTesterRouter);
-    // Error handler
-    app.use(function (err, req, res, next) {
-      res.locals.message = err.message;
-      // Also log it to the console
-      console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
-      // Render the error page
-      res.status(err.status || 500);
-      res.render('error');
-    });
-
-    app.listen(CLIENT_PORT, "0.0.0.0", function () {
-      console.log(`Listening on port ${CLIENT_PORT}`);
-    });
-  } catch (err) {
-    console.log(err);
-  }
+  // On first boot, kickstart the test loop
+  console.log("Starting main loop...");
+  runTests();
 })();

Deleted (the old esm entry point shim):
@@ -1,2 +0,0 @@
-require = require("esm")(module)
-module.exports = require("./client.js")

Deleted client/src/routes/index.js:
@@ -1,2 +0,0 @@
-import orchTesterRouter from './orchTester';
-export { orchTesterRouter };
@ -1,22 +1,28 @@
|
|||||||
import express from "express";
|
|
||||||
const dns = require("dns");
|
const dns = require("dns");
|
||||||
var geoip = require("geoip-lite");
|
const geoip = require("geoip-lite");
|
||||||
|
const { request, gql } = require("graphql-request");
|
||||||
|
const https = require("https");
|
||||||
|
const http = require("http");
|
||||||
|
const grpc = require("@grpc/grpc-js");
|
||||||
|
const protoLoader = require("@grpc/proto-loader");
|
||||||
|
|
||||||
const orchTesterRouter = express.Router();
|
const {
|
||||||
import {
|
CONF_MASTER_PORT,
|
||||||
MASTER_DOMAIN,
|
CONF_MASTER_DOMAIN,
|
||||||
MASTER_PORT,
|
CONF_MASTER_PATH,
|
||||||
MASTER_PATH,
|
CONF_FRIENDLY_NAME,
|
||||||
FRIENDLY_NAME,
|
CONF_REGION,
|
||||||
PRESHARED_MASTER_KEY,
|
CONF_LP_REGIONS,
|
||||||
|
CONF_MAX_LATENCY,
|
||||||
|
CONF_ROUNDTIME,
|
||||||
|
CONF_CONCURRENCY,
|
||||||
CONF_SLEEPTIME,
|
CONF_SLEEPTIME,
|
||||||
CONT_SIG,
|
|
||||||
CONF_ORCHINFO_TIMEOUT,
|
CONF_ORCHINFO_TIMEOUT,
|
||||||
CONF_BROADCASTER,
|
CONF_BROADCASTER,
|
||||||
CONF_DNS_TIMEOUT,
|
CONF_DNS_TIMEOUT,
|
||||||
} from "../config";
|
CONF_PRESHARED_MASTER_KEY,
|
||||||
|
CONF_SIGNATURE,
|
||||||
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
|
} = require("../config.js");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
@ -25,11 +31,6 @@ imported modules
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { request, gql } from "graphql-request";
|
|
||||||
const https = require("https");
|
|
||||||
const http = require("http");
|
|
||||||
var grpc = require("@grpc/grpc-js");
|
|
||||||
var protoLoader = require("@grpc/proto-loader");
|
|
||||||
var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", {
|
var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", {
|
||||||
keepCase: true,
|
keepCase: true,
|
||||||
longs: String,
|
longs: String,
|
||||||
@ -41,6 +42,23 @@ var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net;
|
|||||||
const ssl_creds = grpc.credentials.createSsl(null, null, null, {
|
const ssl_creds = grpc.credentials.createSsl(null, null, null, {
|
||||||
checkServerIdentity: () => undefined,
|
checkServerIdentity: () => undefined,
|
||||||
});
|
});
|
||||||
|
// Since go-livepeer uses self-signed certificates or something
|
||||||
|
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Global variables
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
var activeOrchestrators = []; //< List of all active orchs pulled from the graph
|
||||||
|
var orchestratorsLastUpdated = 0; //< Used to refresh above list periodically
|
||||||
|
var lastRoundStart = 0;
|
||||||
|
let orchDNS = {}; //< Caches DNS and GEO lookups
|
||||||
|
let currentPool = []; //< Current working set of Orchestrators to test
|
||||||
|
let cycle = 0; //< Rounds of testing the script has done
|
||||||
|
let batchResults = {}; //< Current cache of results to batch upload
|
||||||
|
let currentPending = 0; //< Current outstanding requests
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
@ -68,19 +86,213 @@ function shuffle(array) {
|
|||||||
return array;
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function hexToBytes(hex) {
|
||||||
|
for (var bytes = [], c = 0; c < hex.length; c += 2)
|
||||||
|
bytes.push(parseInt(hex.substr(c, 2), 16));
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Doing grpc calls to an orchestrator and uploading to the hodler
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const batchPostStats = async function () {
|
||||||
|
var postData = JSON.stringify({
|
||||||
|
batchResults: batchResults,
|
||||||
|
tag: CONF_FRIENDLY_NAME,
|
||||||
|
key: CONF_PRESHARED_MASTER_KEY,
|
||||||
|
region: CONF_REGION,
|
||||||
|
livepeer_regions: CONF_LP_REGIONS,
|
||||||
|
});
|
||||||
|
var options = {
|
||||||
|
hostname: CONF_MASTER_DOMAIN,
|
||||||
|
port: CONF_MASTER_PORT,
|
||||||
|
path: CONF_MASTER_PATH,
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Content-Length": postData.length,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
console.log("Uploading " + postData.length + " B of stats");
|
||||||
|
var req;
|
||||||
|
if (CONF_MASTER_DOMAIN == "127.0.0.1" || CONF_MASTER_DOMAIN == "localhost") {
|
||||||
|
req = http.request(options, (res) => {
|
||||||
|
res.on("data", (d) => {
|
||||||
|
process.stdout.write(
|
||||||
|
"Received response " + d + " from " + CONF_MASTER_DOMAIN
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
req = https.request(options, (res) => {
|
||||||
|
res.on("data", (d) => {
|
||||||
|
process.stdout.write(
|
||||||
|
"Received response " + d + " from " + CONF_MASTER_DOMAIN
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
req.on("error", (e) => {
|
||||||
|
console.error("err", e);
|
||||||
|
});
|
||||||
|
req.write(postData);
|
||||||
|
req.end();
|
||||||
|
};
|
||||||
|
|
||||||
|
const discoverOrchestrator = async function (target) {
|
||||||
|
if (!target) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Try to prevent the connection from being reused
|
||||||
|
var client = new livepeerProto.Orchestrator(target, ssl_creds, {
|
||||||
|
GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5),
|
||||||
|
});
|
||||||
|
var receivedResults = false;
|
||||||
|
var orchestratorInfo = {};
|
||||||
|
const start = new Date().getTime();
|
||||||
|
var elapsed = 0;
|
||||||
|
await client.GetOrchestrator(
|
||||||
|
{
|
||||||
|
address: hexToBytes(CONF_BROADCASTER),
|
||||||
|
sig: CONF_SIGNATURE,
|
||||||
|
},
|
||||||
|
function (err, res) {
|
||||||
|
if (err) {
|
||||||
|
console.log("Discovery error: ", err.details);
|
||||||
|
orchestratorInfo.err = err.details;
|
||||||
|
} else {
|
||||||
|
orchestratorInfo = res;
|
||||||
|
elapsed = new Date().getTime() - start;
|
||||||
|
}
|
||||||
|
receivedResults = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
// Wait for a max of 4 seconds for a callback from the GRPC call
|
||||||
|
while (!receivedResults && new Date().getTime() - start < CONF_MAX_LATENCY) {
|
||||||
|
await sleep(20);
|
||||||
|
}
|
||||||
|
grpc.closeClient(client);
|
||||||
|
if (!orchestratorInfo) {
|
||||||
|
return {
|
||||||
|
discoveryResults: {
|
||||||
|
transcoder: null,
|
||||||
|
price_info: null,
|
||||||
|
latency: 0,
|
||||||
|
err: "Took too long to respond. Aborted test...",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
discoveryResults: {
|
||||||
|
transcoder: orchestratorInfo.transcoder,
|
||||||
|
price_info: orchestratorInfo.price_info,
|
||||||
|
latency: elapsed,
|
||||||
|
err: orchestratorInfo.err,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
// Resolve hostname to IP
|
||||||
|
async function getIP(hostname) {
|
||||||
|
let obj = await dns.promises.lookup(hostname).catch((error) => {
|
||||||
|
console.error(error);
|
||||||
|
});
|
||||||
|
if (obj) {
|
||||||
|
return obj.address;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const testOrchestrator = async function (id, target) {
|
||||||
|
if (!id.length || !target.length) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const origTarget = new URL(target);
|
||||||
|
target = target.replace(/^https?:\/\//, "");
|
||||||
|
console.log("Testing orchestrator " + target);
|
||||||
|
// Resolve DNS and GEO
|
||||||
|
const now = new Date().getTime();
|
||||||
|
if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) {
|
||||||
|
const resolved = await getIP(origTarget.hostname);
|
||||||
|
const geo = geoip.lookup(resolved);
|
||||||
|
let geoObj = null;
|
||||||
|
if (geo) {
|
||||||
|
geoObj = {
|
||||||
|
// country: geo.country, //< commented out these fields
|
||||||
|
// region: geo.region, // since they're causing issues with JSON.stringify()
|
||||||
|
// city: geo.city, // prob due to special characters
|
||||||
|
latitude: geo.ll[0],
|
||||||
|
longitude: geo.ll[1],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
orchDNS[id] = {
|
||||||
|
originalTarget: origTarget.origin,
|
||||||
|
resolvedTarget: resolved,
|
||||||
|
geoLookup: geoObj,
|
||||||
|
geoFrom: CONF_FRIENDLY_NAME,
|
||||||
|
lastTime: now,
|
||||||
|
};
|
||||||
|
console.log("Updated DNS and GeoIP data for " + id);
|
||||||
|
}
|
||||||
|
// Test orch
|
||||||
|
const { discoveryResults } = await discoverOrchestrator(target);
|
||||||
|
if (
|
||||||
|
discoveryResults &&
|
||||||
|
discoveryResults.err == "insufficient sender reserve"
|
||||||
|
) {
|
||||||
|
console.log("Ignoring " + id + " due to insufficient sender reserve");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Cache results
|
||||||
|
batchResults[id] = {
|
||||||
|
name: id,
|
||||||
|
discovery: discoveryResults,
|
||||||
|
resolv: orchDNS[id],
|
||||||
|
};
|
||||||
|
if (discoveryResults.err) {
|
||||||
|
batchResults[id].discovery.latency = 0;
|
||||||
|
}
|
||||||
|
currentPending--;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Batch test logic
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const batchTestOrchs = async function () {
|
||||||
|
// Clear buff
|
||||||
|
batchResults = {};
|
||||||
|
currentPending = 0;
|
||||||
|
// Keep going until we've got no more todo or pending
|
||||||
|
while (currentPool.length || currentPending) {
|
||||||
|
// Concurrent requests
|
||||||
|
while (currentPending < CONF_CONCURRENCY && currentPool.length) {
|
||||||
|
let currentOrch = currentPool.splice(0, 1)[0];
|
||||||
|
if (!currentOrch.id || !currentOrch.target) {
|
||||||
|
console.log("Skipping Orchestrator with malformed data: ", currentOrch);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
currentPending++;
|
||||||
|
testOrchestrator(currentOrch.id, currentOrch.target);
|
||||||
|
}
|
||||||
|
await sleep(50);
|
||||||
|
}
|
||||||
|
batchPostStats();
|
||||||
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
Refreshing active orchestrators
|
Refreshing active orchestrators
|
||||||
Pulls this data from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql)
|
Pulls this data from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql)
|
||||||
We might want to switch to reading directly from the blockchain
|
|
||||||
but this would require constant watching for uri updates which is a pain to implement
|
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
var activeOrchestrators = [];
|
|
||||||
let lastUpdated = 0;
|
|
||||||
let orchDNS = {};
|
|
||||||
|
|
||||||
/// Does a GQL query to the subgraph for orchestrator data
|
/// Does a GQL query to the subgraph for orchestrator data
|
||||||
const getOrchestrators = async function () {
|
const getOrchestrators = async function () {
|
||||||
console.log("Getting orchestrator data from the subgraph...");
|
console.log("Getting orchestrator data from the subgraph...");
|
||||||
@ -116,190 +328,16 @@ const getOrchestrators = async function () {
|
|||||||
const refreshOrchCache = async function () {
|
const refreshOrchCache = async function () {
|
||||||
const now = new Date().getTime();
|
const now = new Date().getTime();
|
||||||
// Update cmc once their data has expired
|
// Update cmc once their data has expired
|
||||||
if (now - lastUpdated > CONF_ORCHINFO_TIMEOUT) {
|
if (now - orchestratorsLastUpdated > CONF_ORCHINFO_TIMEOUT) {
|
||||||
const data = await getOrchestrators();
|
const data = await getOrchestrators();
|
||||||
if (data) {
|
if (data) {
|
||||||
activeOrchestrators = data;
|
activeOrchestrators = data;
|
||||||
lastUpdated = now;
|
orchestratorsLastUpdated = now;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
// Creates a new working set of orchs to test
|
||||||
|
|
||||||
Doing grpc calls to an orchestrator
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
let currentPool = [];
|
|
||||||
|
|
||||||
const postStatistics = async function (
|
|
||||||
id,
|
|
||||||
discoveryResults,
|
|
||||||
lookupResults,
|
|
||||||
responseTime
|
|
||||||
) {
|
|
||||||
console.log("Posting stats for " + id + " (ping " + responseTime + " ms)");
|
|
||||||
|
|
||||||
// TODO look at response and log error?
|
|
||||||
var postData = JSON.stringify({
|
|
||||||
id,
|
|
||||||
discoveryResults,
|
|
||||||
responseTime,
|
|
||||||
lookupResults,
|
|
||||||
tag: FRIENDLY_NAME,
|
|
||||||
key: PRESHARED_MASTER_KEY,
|
|
||||||
});
|
|
||||||
var options = {
|
|
||||||
hostname: MASTER_DOMAIN,
|
|
||||||
port: MASTER_PORT,
|
|
||||||
path: MASTER_PATH,
|
|
||||||
method: "POST",
|
|
||||||
headers: {
|
|
||||||
"Content-Type": "application/json",
|
|
||||||
"Content-Length": postData.length,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
var req;
|
|
||||||
if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") {
|
|
||||||
req = http.request(options, (res) => {
|
|
||||||
// console.log('statusCode:', res.statusCode);
|
|
||||||
// console.log('headers:', res.headers);
|
|
||||||
|
|
||||||
res.on("data", (d) => {
|
|
||||||
process.stdout.write(
|
|
||||||
"Received response " + d + " from " + MASTER_DOMAIN
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
req = https.request(options, (res) => {
|
|
||||||
// console.log('statusCode:', res.statusCode);
|
|
||||||
// console.log('headers:', res.headers);
|
|
||||||
|
|
||||||
res.on("data", (d) => {
|
|
||||||
process.stdout.write(
|
|
||||||
"Received response " + d + " from " + MASTER_DOMAIN
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
req.on("error", (e) => {
|
|
||||||
console.error("err", e);
|
|
||||||
});
|
|
||||||
req.write(postData);
|
|
||||||
req.end();
|
|
||||||
};
|
|
||||||
|
|
||||||
function hexToBytes(hex) {
|
|
||||||
for (var bytes = [], c = 0; c < hex.length; c += 2)
|
|
||||||
bytes.push(parseInt(hex.substr(c, 2), 16));
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
const discoverOrchestrator = async function (target) {
|
|
||||||
if (!target) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
var client = new livepeerProto.Orchestrator(target, ssl_creds, {
|
|
||||||
GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5),
|
|
||||||
});
|
|
||||||
var receivedResults = false;
|
|
||||||
var orchestratorInfo;
|
|
||||||
const start = new Date().getTime();
|
|
||||||
var elapsed = null;
|
|
||||||
await client.GetOrchestrator(
|
|
||||||
{
|
|
||||||
address: hexToBytes(CONF_BROADCASTER),
|
|
||||||
sig: CONT_SIG,
|
|
||||||
},
|
|
||||||
function (err, res) {
|
|
||||||
if (err) {
|
|
||||||
console.log("Discovery error: ", err.details);
|
|
||||||
orchestratorInfo = err.details;
|
|
||||||
elapsed = null;
|
|
||||||
} else {
|
|
||||||
orchestratorInfo = res;
|
|
||||||
elapsed = new Date().getTime() - start;
|
|
||||||
}
|
|
||||||
receivedResults = true;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
while (!receivedResults && new Date().getTime() - start < 4000) {
|
|
||||||
await sleep(20);
|
|
||||||
}
|
|
||||||
grpc.closeClient(client);
|
|
||||||
return { discoveryResults: orchestratorInfo, elapsed };
|
|
||||||
};
|
|
||||||
|
|
||||||
const pingOrchestrator = async function (target) {
|
|
||||||
if (!target) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
var client = new livepeerProto.Orchestrator(target, ssl_creds);
|
|
||||||
var receivedResults = false;
|
|
||||||
var pingPong;
|
|
||||||
const start = new Date().getTime();
|
|
||||||
var elapsed = null;
|
|
||||||
await client.Ping({ value: "koekjes" }, function (err, res) {
|
|
||||||
if (err) {
|
|
||||||
console.log("Ping err: ", err.details);
|
|
||||||
pingPong = err.details;
|
|
||||||
elapsed = null;
|
|
||||||
} else {
|
|
||||||
pingPong = res;
|
|
||||||
elapsed = new Date().getTime() - start;
|
|
||||||
}
|
|
||||||
receivedResults = true;
|
|
||||||
});
|
|
||||||
while (!receivedResults && new Date().getTime() - start < 4000) {
|
|
||||||
await sleep(20);
|
|
||||||
}
|
|
||||||
return { pingResults: pingPong, elapsed };
|
|
||||||
};
|
|
||||||
|
|
||||||
async function getIP(hostname) {
|
|
||||||
let obj = await dns.promises.lookup(hostname).catch((error) => {
|
|
||||||
console.error(error);
|
|
||||||
});
|
|
||||||
if (obj) {
|
|
||||||
return obj.address;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const testOrchestrator = async function (id, target) {
|
|
||||||
if (!id.length || !target.length) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const origTarget = new URL(target);
|
|
||||||
target = target.replace(/^https?:\/\//, "");
|
|
||||||
console.log("Target is " + target);
|
|
||||||
// Resolve DNS
|
|
||||||
const now = new Date().getTime();
|
|
||||||
if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) {
|
|
||||||
const resolved = await getIP(origTarget.hostname);
|
|
||||||
orchDNS[id] = {
|
|
||||||
originalTarget: origTarget.origin,
|
|
||||||
resolvedTarget: resolved,
|
|
||||||
geoLookup: geoip.lookup(resolved),
|
|
||||||
geoFrom: FRIENDLY_NAME,
|
|
||||||
lastTime: now,
|
|
||||||
};
|
|
||||||
console.log("Updated DNS and GeoIP data: ", orchDNS[id]);
|
|
||||||
}
|
|
||||||
// Test orch
|
|
||||||
const { discoveryResults, elapsed } = await discoverOrchestrator(target);
|
|
||||||
if (discoveryResults && discoveryResults == "insufficient sender reserve") {
|
|
||||||
console.log(
|
|
||||||
"Ignoring " + id + " for stats due to insufficient sender reserve"
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await postStatistics(id, discoveryResults, orchDNS[id], elapsed);
|
|
||||||
};
|
|
||||||
|
|
||||||
const refreshPool = function () {
|
const refreshPool = function () {
|
||||||
currentPool = [];
|
currentPool = [];
|
||||||
for (const thisObj of activeOrchestrators) {
|
for (const thisObj of activeOrchestrators) {
|
||||||
@ -308,51 +346,36 @@ const refreshPool = function () {
|
|||||||
shuffle(currentPool);
|
shuffle(currentPool);
|
||||||
};
|
};
|
||||||
|
|
||||||
const pingNextOrch = async function () {
|
|
||||||
if (!currentPool.length) {
|
|
||||||
refreshPool();
|
|
||||||
}
|
|
||||||
let currentOrch = currentPool.splice(0, 1)[0];
|
|
||||||
if (!currentOrch.id || !currentOrch.target) {
|
|
||||||
console.log("Skipping Orchestrator with malformed data: ", currentOrch);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await testOrchestrator(currentOrch.id, currentOrch.target);
|
|
||||||
};
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
Main Loop
|
Main Loop
|
||||||
Maybe we shouldn't use nodejs as a client...
|
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
let cycle = 0;
|
const runTests = async function () {
|
||||||
let isSyncing = false;
|
|
||||||
// Does the actual looping over last parsed block -> latest block in chain
|
|
||||||
const handleSync = async function () {
|
|
||||||
try {
|
try {
|
||||||
|
const now = new Date().getTime();
|
||||||
|
if (!lastRoundStart || now - lastRoundStart > CONF_ROUNDTIME) {
|
||||||
cycle++;
|
cycle++;
|
||||||
console.log("Starting new cycle #" + cycle);
|
console.log("Starting new cycle #" + cycle);
|
||||||
isSyncing = true;
|
// If stale, retrieve new set of active orchestrators
|
||||||
await refreshOrchCache();
|
await refreshOrchCache();
|
||||||
await pingNextOrch();
|
// Create a new shuffled working set of orchs to test
|
||||||
isSyncing = false;
|
refreshPool();
|
||||||
|
// Test all orchs in working set
|
||||||
|
await batchTestOrchs();
|
||||||
|
lastRoundStart = now;
|
||||||
|
}
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
handleSync();
|
runTests();
|
||||||
}, CONF_SLEEPTIME);
|
}, CONF_SLEEPTIME);
|
||||||
return;
|
return;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.log(err);
|
console.log(err);
|
||||||
isSyncing = false;
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
handleSync();
|
runTests();
|
||||||
}, CONF_SLEEPTIME);
|
}, CONF_SLEEPTIME);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!isSyncing) {
|
|
||||||
console.log("Starting main loop");
|
|
||||||
handleSync();
|
|
||||||
}
|
|
||||||
|
|
||||||
export default orchTesterRouter;
|
exports.runTests = runTests;
|
master/README.md
@@ -2,9 +2,14 @@
 ### Config variables
 ```
 {
-MASTER_PORT: what port the master is listening on
-PRESHARED_MASTER_KEY: clients must provide this value when uploading stats,
-CONF_ENS_TIMEOUT: timeout in ms after which it will pull new ENS domain names from nFrame
+CONF_API_L1_HTTP: HTTP Uri to L1 RPC provider
+CONF_API_L1_KEY: and it's api key,
+CONF_TIMEOUT_ENS_DOMAIN: timeout before refreshing an orchs ENS domain name
+CONF_KEY_EXPIRY: timeout before clearing certain orch instance data
+CONF_MASTER_PORT: port to accept uploads on from probers
+CONF_PRESHARED_MASTER_KEY: password for uploading
+CONF_SCORE_TIMEOUT: timeout for checking if some new test stream results came in
+CONF_SLEEPTIME: Time to sleep in the main loop
 }
 ```
 ### Run production
@@ -16,13 +21,18 @@ Note: this folder has to be placed in `/orchTest/master`
 example `config.js`:
 ```
 export const {
-MASTER_PORT = 42069,
-PRESHARED_MASTER_KEY = "koekjes",
-CONF_ENS_TIMEOUT = 360000
+CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/",
+CONF_API_L1_KEY = "koekjes",
+CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours
+CONF_KEY_EXPIRY = 3600000, //< 1 hour
+CONF_MASTER_PORT = 42069,
+CONF_PRESHARED_MASTER_KEY = "koekjes",
+CONF_SCORE_TIMEOUT = 300000, //< 5 minutes
+CONF_SLEEPTIME = 2000, //< 2 seconds
 } = process.env;
 ```
 
-pm2 start ecosystem.config.js
+pm2 start ecosystem.config.cjs
 
 ### Run development
 
@@ -32,11 +42,17 @@ export const {
 example `config.js`:
 ```
 export const {
-MASTER_PORT = 42069,
-PRESHARED_MASTER_KEY = "koekjes",
-CONF_ENS_TIMEOUT = 360000
+CONF_API_L1_HTTP = "https://eth-mainnet.alchemyapi.io/v2/",
+CONF_API_L1_KEY = "koekjes",
+CONF_TIMEOUT_ENS_DOMAIN = 7200000, //< 2 hours
+CONF_KEY_EXPIRY = 3600000, //< 1 hour
+CONF_MASTER_PORT = 42069,
+CONF_PRESHARED_MASTER_KEY = "koekjes",
+CONF_SCORE_TIMEOUT = 60000, //< 1 minute
+CONF_SLEEPTIME = 2000, //< 2 seconds
 } = process.env;
 ```
 
 npm run dev
||||||
|
|
||||||
|
|
||||||
|
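Once the master is running, the routes mounted in `master.js` and `hodler.js` can be checked quickly. A sketch assuming the development defaults above (`CONF_MASTER_PORT = 42069`, the `/api/master` prefix) and Node 18+ for the global `fetch`:

```js
// Quick smoke test against a locally running master; port and prefix are assumptions
// taken from the example config and master.js further down.
(async () => {
  const base = "http://127.0.0.1:42069/api/master";
  const metrics = await fetch(`${base}/prometheus`).then((r) => r.text()); // Prometheus scrape target
  const cache = await fetch(`${base}/json`).then((r) => r.json()); // raw orchestrator cache as JSON
  console.log(metrics.split("\n").length, "metric lines;", Object.keys(cache).length, "orchestrators cached");
})();
```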
master/ecosystem.config.js
@@ -1,8 +1,8 @@
 module.exports = {
   apps: [
     {
-      name: "hodler",
-      script: "./src/hodler.js",
+      name: "master-hodler",
+      script: "./src/master.js",
       cwd: "/orchTest/master",
       env_production: {
         NODE_ENV: "production"
|
master/package.json
@@ -1,22 +1,23 @@
 {
-  "name": "hodler",
-  "version": "0.0.1",
+  "name": "master-hodler",
+  "version": "0.1.0",
   "description": "",
-  "main": "./src/hodler.js",
-  "module": "./src/hodler.js",
+  "main": "./src/master.js",
+  "type": "commonjs",
+  "module": "./src/master.js",
   "scripts": {
     "prod": "NODE_ENV=production pm2 start ecosystem.config.js",
-    "start": "NODE_ENV=production node ./src/hodler.js",
-    "dev": "NODE_ENV=development nodemon ./src/hodler.js",
-    "local": "NODE_ENV=local nodemon ./src/hodler.js"
+    "start": "NODE_ENV=production node ./src/master.js",
+    "dev": "NODE_ENV=development nodemon ./src/master.js",
+    "local": "NODE_ENV=local nodemon ./src/master.js"
   },
   "keywords": [],
   "author": "Marco van Dijk",
   "license": "WTFPL",
   "dependencies": {
-    "esm": "^3.2.20",
+    "ethers": "^6.8.1",
     "express": "^4.17.1",
-    "node-persist": "^3.1.0",
+    "node-persist": "^3.1.3",
     "npm": "^8.5.2",
     "prom-client": "^14.0.1"
   },
||||||
|
Deleted (the old esm entry point shim):
@@ -1,2 +0,0 @@
-require = require("esm")(module)
-module.exports = require("./server.js")
|
master/src/master.js (new file, 34 lines)
@@ -0,0 +1,34 @@
+const express = require("express");
+const { masterRouter } = require("./routes/index.js");
+const { CONF_MASTER_PORT } = require("./config.js");
+
+(async () => {
+  try {
+    const app = express();
+    app.disable("x-powered-by");
+    app.use(express.urlencoded({ extended: true, limit: "1000kb" }));
+    app.use(express.json({ limit: "1000kb" }));
+    const apiRouter = express.Router();
+    app.use("/api", apiRouter);
+    apiRouter.use("/master", masterRouter);
+    // Error handler
+    app.use(function (err, req, res, next) {
+      res.locals.message = err.message;
+      // Also log it to the console
+      console.log(
+        `${err.status || 500} - ${err.message} - ${req.originalUrl} - ${
+          req.method
+        } - ${req.ip}`
+      );
+      // Render the error page
+      res.status(err.status || 500);
+      res.render("error");
+    });
+
+    app.listen(CONF_MASTER_PORT, "0.0.0.0", function () {
+      console.log(`Listening on port ${CONF_MASTER_PORT}`);
+    });
+  } catch (err) {
+    console.log(err);
+  }
+})();
532
master/src/routes/hodler.js
Normal file
532
master/src/routes/hodler.js
Normal file
@ -0,0 +1,532 @@
|
|||||||
|
const express = require("express");
|
||||||
|
const client = require("prom-client");
|
||||||
|
const { ethers } = require("ethers");
|
||||||
|
const storage = require("node-persist");
|
||||||
|
const {
|
||||||
|
CONF_API_L1_HTTP,
|
||||||
|
CONF_API_L1_KEY,
|
||||||
|
CONF_PRESHARED_MASTER_KEY,
|
||||||
|
CONF_TIMEOUT_ENS_DOMAIN,
|
||||||
|
CONF_KEY_EXPIRY,
|
||||||
|
CONF_SCORE_TIMEOUT,
|
||||||
|
CONF_SLEEPTIME,
|
||||||
|
} = require("../config.js");
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Init
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const l1provider = new ethers.JsonRpcProvider(
|
||||||
|
CONF_API_L1_HTTP + CONF_API_L1_KEY
|
||||||
|
);
|
||||||
|
const masterRouter = express.Router();
|
||||||
|
const register = new client.Registry();
|
||||||
|
|
||||||
|
// Regional stats - prober instance specific
|
||||||
|
const promLatestLatency = new client.Gauge({
|
||||||
|
name: "orch_latest_latency",
|
||||||
|
help: "Latest latency known for a given Orchestrator",
|
||||||
|
labelNames: ["region", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promLatestLatency);
|
||||||
|
const promLatency = new client.Summary({
|
||||||
|
name: "orch_latency",
|
||||||
|
help: "Summary of latency stats",
|
||||||
|
percentiles: [0.01, 0.1, 0.9, 0.99],
|
||||||
|
labelNames: ["region"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promLatency);
|
||||||
|
const promAverageLatency = new client.Gauge({
|
||||||
|
name: "orch_average_latency",
|
||||||
|
help: "Average latency for a given Orchestrator",
|
||||||
|
labelNames: ["region", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promAverageLatency);
|
||||||
|
|
||||||
|
// Regional stats - Livepeer test stream specific
|
||||||
|
const promLatestRTR = new client.Gauge({
|
||||||
|
name: "orch_latest_rtr",
|
||||||
|
help: "Latest realtime ratio as specified by Livepeer inc's regional performance leaderboards",
|
||||||
|
labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promLatestRTR);
|
||||||
|
const promLatestSuccessRate = new client.Gauge({
|
||||||
|
name: "orch_latest_success_rate",
|
||||||
|
help: "Latest success rate as specified by Livepeer inc's regional performance leaderboards",
|
||||||
|
labelNames: ["livepeer_region", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promLatestSuccessRate);
|
||||||
|
|
||||||
|
// Global stats - orchestrator instance specific
|
||||||
|
const promLatestPPP = new client.Gauge({
|
||||||
|
name: "orch_latest_ppp",
|
||||||
|
help: "Latest price per pixel known for a given Orchestrator",
|
||||||
|
labelNames: ["instance", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promLatestPPP);
|
||||||
|
const promAUptimeScore = new client.Gauge({
|
||||||
|
name: "orch_uptime_score",
|
||||||
|
help: "Uptime score for a given orchestrator",
|
||||||
|
labelNames: ["instance", "orchestrator", "latitude", "longitude"],
|
||||||
|
});
|
||||||
|
register.registerMetric(promAUptimeScore);
|
||||||
|
|
||||||
|
function sleep(ms) {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
setTimeout(resolve, ms);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Globals
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
let ensDomainCache = {};
|
||||||
|
let orchCache = {};
|
||||||
|
let lastLeaderboardCheck = 0;
|
||||||
|
let isSynced = false;
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
ENS
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const getEnsDomain = async function (addr) {
|
||||||
|
try {
|
||||||
|
const now = new Date().getTime();
|
||||||
|
const cached = ensDomainCache[addr];
|
||||||
|
if (cached && now - cached.timestamp < CONF_TIMEOUT_ENS_DOMAIN) {
|
||||||
|
return cached.domain ? cached.domain : cached.address;
|
||||||
|
}
|
||||||
|
// Refresh cause not cached or stale
|
||||||
|
const address = ethers.getAddress(addr);
|
||||||
|
const ensDomain = await l1provider.lookupAddress(address);
|
||||||
|
let ensObj;
|
||||||
|
if (!ensDomain) {
|
||||||
|
let domain = null;
|
||||||
|
if (cached) {
|
||||||
|
domain = cached.domain;
|
||||||
|
}
|
||||||
|
ensObj = {
|
||||||
|
domain: domain,
|
||||||
|
address: addr,
|
||||||
|
timestamp: now,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
ensObj = {
|
||||||
|
domain: ensDomain,
|
||||||
|
address: addr,
|
||||||
|
timestamp: now,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
console.log(
|
||||||
|
"Updated ENS domain " +
|
||||||
|
ensObj.domain +
|
||||||
|
" owned by " +
|
||||||
|
ensObj.address +
|
||||||
|
" @ " +
|
||||||
|
ensObj.timestamp
|
||||||
|
);
|
||||||
|
ensDomainCache[addr] = ensObj;
|
||||||
|
await storage.setItem("ensDomainCache", ensDomainCache);
|
||||||
|
return ensObj.domain ? ensObj.domain : ensObj.address;
|
||||||
|
} catch (err) {
|
||||||
|
console.log(err);
|
||||||
|
console.log("Error looking up ENS info, retrying...");
|
||||||
|
await sleep(200);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Update cache & prometheus
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const updatePrometheus = async function (tag, id, instance, orchInfo) {
|
||||||
|
const thisInstance = orchInfo.instances[instance];
|
||||||
|
const regionInfo = orchInfo.regionalStats[tag];
|
||||||
|
if (regionInfo.latestDiscoveryTime) {
|
||||||
|
promLatestLatency.set(
|
||||||
|
{
|
||||||
|
region: tag,
|
||||||
|
orchestrator: orchInfo.name,
|
||||||
|
latitude: thisInstance.latitude,
|
||||||
|
longitude: thisInstance.longitude,
|
||||||
|
},
|
||||||
|
regionInfo.latestDiscoveryTime
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (regionInfo.avgDiscoveryTime) {
|
||||||
|
promAverageLatency.set(
|
||||||
|
{
|
||||||
|
region: tag,
|
||||||
|
orchestrator: orchInfo.name,
|
||||||
|
latitude: thisInstance.latitude,
|
||||||
|
longitude: thisInstance.longitude,
|
||||||
|
},
|
||||||
|
regionInfo.avgDiscoveryTime
|
||||||
|
);
|
||||||
|
}
|
||||||
|
promAUptimeScore.set(
|
||||||
|
{
|
||||||
|
instance: instance,
|
||||||
|
orchestrator: orchInfo.name,
|
||||||
|
latitude: thisInstance.latitude,
|
||||||
|
longitude: thisInstance.longitude,
|
||||||
|
},
|
||||||
|
regionInfo.uptimePercentage
|
||||||
|
);
|
||||||
|
promLatestPPP.set(
|
||||||
|
{
|
||||||
|
instance: instance,
|
||||||
|
orchestrator: orchInfo.name,
|
||||||
|
latitude: thisInstance.latitude,
|
||||||
|
longitude: thisInstance.longitude,
|
||||||
|
},
|
||||||
|
thisInstance.price
|
||||||
|
);
|
||||||
|
promLatency.observe({ region: tag }, regionInfo.latestDiscoveryTime);
|
||||||
|
};
|
||||||
|
|
||||||
|
const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
|
||||||
|
const now = new Date().getTime();
|
||||||
|
// Overwrite name with ENS domain if set
|
||||||
|
let ensDomain = null;
|
||||||
|
while (!ensDomain) {
|
||||||
|
ensDomain = await getEnsDomain(id);
|
||||||
|
}
|
||||||
|
// Retrieve entry to update or init it
|
||||||
|
let newObj = orchCache[id.toLowerCase()];
|
||||||
|
if (!newObj) {
|
||||||
|
newObj = {
|
||||||
|
name: obj.name,
|
||||||
|
regionalStats: {},
|
||||||
|
instances: {},
|
||||||
|
leaderboardResults: { lastTime: now },
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
newObj.name = ensDomain;
|
||||||
|
}
|
||||||
|
// Find region entry or init it
|
||||||
|
let newRegion = newObj.regionalStats[tag];
|
||||||
|
if (!newRegion) {
|
||||||
|
newRegion = {
|
||||||
|
measurements: [],
|
||||||
|
avgDiscoveryTime: -1,
|
||||||
|
uptimePercentage: 1.0,
|
||||||
|
latestDiscoveryTime: -1,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record measurement
|
||||||
|
let measurement = {
|
||||||
|
latency: obj.discovery.latency,
|
||||||
|
timestamp: now,
|
||||||
|
duration: 0,
|
||||||
|
};
|
||||||
|
if (newRegion.measurements.length) {
|
||||||
|
measurement.duration =
|
||||||
|
now - newRegion.measurements[newRegion.measurements.length - 1].timestamp;
|
||||||
|
}
|
||||||
|
newRegion.measurements.push(measurement);
|
||||||
|
if (newRegion.measurements.length > 60) {
|
||||||
|
newRegion.measurements = newRegion.measurements.slice(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recalc average && uptime
|
||||||
|
let uptime = 0;
|
||||||
|
let downtime = 0;
|
||||||
|
let pingSum = 0;
|
||||||
|
let pingEntries = 0;
|
||||||
|
for (const measurement of newRegion.measurements) {
|
||||||
|
if (measurement.latency && measurement.latency > 0) {
|
||||||
|
if (measurement.duration) {
|
||||||
|
uptime += measurement.duration;
|
||||||
|
}
|
||||||
|
pingSum += measurement.latency;
|
||||||
|
pingEntries++;
|
||||||
|
} else {
|
||||||
|
if (measurement.duration) {
|
||||||
|
downtime += measurement.duration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pingEntries > 0) {
|
||||||
|
newRegion.avgDiscoveryTime = pingSum / pingEntries;
|
||||||
|
} else {
|
||||||
|
newRegion.avgDiscoveryTime = measurement.latency;
|
||||||
|
}
|
||||||
|
newRegion.latestDiscoveryTime = measurement.latency;
|
||||||
|
|
||||||
|
if (
|
||||||
|
downtime ||
|
||||||
|
(!newRegion.avgDiscoveryTime && !newRegion.latestDiscoveryTime)
|
||||||
|
) {
|
||||||
|
if (!uptime) {
|
||||||
|
newRegion.uptimePercentage = 0.0;
|
||||||
|
} else {
|
||||||
|
newRegion.uptimePercentage = uptime / (uptime + downtime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find instance entry or init it
|
||||||
|
let newInstance = newObj.instances[obj.resolv.resolvedTarget];
|
||||||
|
if (!newInstance) {
|
||||||
|
newInstance = {
|
||||||
|
price: -1,
|
||||||
|
latitude: -1,
|
||||||
|
longitude: -1,
|
||||||
|
probedFrom: {},
|
||||||
|
regions: {},
|
||||||
|
livepeer_regions: {},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove expired stuff
|
||||||
|
for (const [id, obj] of Object.entries(newInstance.probedFrom)) {
|
||||||
|
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||||
|
newInstance.probedFrom[id] = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const [id, obj] of Object.entries(newInstance.regions)) {
|
||||||
|
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||||
|
newInstance.regions[id] = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const [id, obj] of Object.entries(newInstance.livepeer_regions)) {
|
||||||
|
if (now - obj.lastTime > CONF_KEY_EXPIRY) {
|
||||||
|
newInstance.livepeer_regions[id] = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set last times for instance info
|
||||||
|
newInstance.probedFrom[tag] = {
|
||||||
|
lastTime: now,
|
||||||
|
};
|
||||||
|
newInstance.regions[region] = {
|
||||||
|
lastTime: now,
|
||||||
|
};
|
||||||
|
for (const region of livepeer_regions) {
|
||||||
|
newInstance.livepeer_regions[region] = {
|
||||||
|
lastTime: now,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set location and price info
|
||||||
|
if (obj.discovery.price_info) {
|
||||||
|
newInstance.price =
|
||||||
|
obj.discovery.price_info.pricePerUnit /
|
||||||
|
obj.discovery.price_info.pixelsPerUnit;
|
||||||
|
}
|
||||||
|
if (obj.resolv.geoLookup) {
|
||||||
|
newInstance.latitude = obj.resolv.geoLookup.latitude;
|
||||||
|
newInstance.longitude = obj.resolv.geoLookup.longitude;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update name
|
||||||
|
newObj.name = obj.name;
|
||||||
|
|
||||||
|
// Finished updating
|
||||||
|
newObj.instances[obj.resolv.resolvedTarget] = newInstance;
|
||||||
|
newObj.regionalStats[tag] = newRegion;
|
||||||
|
orchCache[id.toLowerCase()] = newObj;
|
||||||
|
await storage.setItem("orchCache", orchCache);
|
||||||
|
|
||||||
|
// Update prometheus stats
|
||||||
|
updatePrometheus(tag, id, obj.resolv.resolvedTarget, newObj);
|
||||||
|
console.log("Handled results for " + id + " from prober " + tag);
|
||||||
|
};
|
||||||
|
|
||||||
|
const updateCache = async function (
|
||||||
|
batchResults,
|
||||||
|
tag,
|
||||||
|
region,
|
||||||
|
livepeer_regions
|
||||||
|
) {
|
||||||
|
console.log("Parsing stats from " + tag + " (" + region + ")");
|
||||||
|
|
||||||
|
for (const [id, obj] of Object.entries(batchResults)) {
|
||||||
|
onOrchUpdate(id, obj, tag, region, livepeer_regions);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Public endpoints
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Mutate state with new stats
|
||||||
|
masterRouter.post("/collectStats", async (req, res) => {
|
||||||
|
if (!isSynced) {
|
||||||
|
res.send("busy");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const { batchResults, tag, key, region, livepeer_regions } = req.body;
|
||||||
|
if (!batchResults || !tag || !key || !region || !livepeer_regions) {
|
||||||
|
console.log("Received malformed data. Aborting stats update...");
|
||||||
|
console.log(batchResults, tag, key, region, livepeer_regions);
|
||||||
|
res.send(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (CONF_PRESHARED_MASTER_KEY != req.body.key) {
|
||||||
|
console.log("Unauthorized");
|
||||||
|
res.send(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
updateCache(batchResults, tag, region, livepeer_regions);
|
||||||
|
res.send(true);
|
||||||
|
} catch (err) {
|
||||||
|
console.log(err, req.body);
|
||||||
|
res.status(400).send(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Retrieve prom data
|
||||||
|
masterRouter.get("/prometheus", async (req, res) => {
|
||||||
|
try {
|
||||||
|
res.set("Content-Type", register.contentType);
|
||||||
|
const metrics = await register.metrics();
|
||||||
|
res.end(metrics);
|
||||||
|
} catch (err) {
|
||||||
|
res.status(400).send(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Retrieve cache as JSON
|
||||||
|
masterRouter.get("/json", async (req, res) => {
|
||||||
|
try {
|
||||||
|
res.set("Content-Type", "application/json");
|
||||||
|
res.end(JSON.stringify(orchCache));
|
||||||
|
} catch (err) {
|
||||||
|
res.status(400).send(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Main Loop
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
const updateScore = async function (address) {
|
||||||
|
console.log("Checking for new scores for " + address);
|
||||||
|
const thisInstances = orchCache[address.toLowerCase()].instances;
|
||||||
|
const lastTime = orchCache[address.toLowerCase()].leaderboardResults.lastTime;
|
||||||
|
|
||||||
|
let url =
|
||||||
|
"https://leaderboard-serverless.vercel.app/api/raw_stats?orchestrator=" +
|
||||||
|
address;
|
||||||
|
|
||||||
|
const json = await fetch(url).then((res) => res.json());
|
||||||
|
let hasEdited = false;
|
||||||
|
for (const [region, results] of Object.entries(json)) {
|
||||||
|
for (const instance of results) {
|
||||||
|
if (instance.timestamp * 1000 > lastTime) {
|
||||||
|
const newSR = instance.success_rate;
|
||||||
|
const newRTR = instance.round_trip_time / instance.seg_duration;
|
||||||
|
let latitude = null;
|
||||||
|
let longitude = null;
|
||||||
|
for (const [resolvedTarget, instance] of Object.entries(thisInstances)) {
|
||||||
|
if (instance.livepeer_regions[region]) {
|
||||||
|
latitude = instance.latitude;
|
||||||
|
longitude = instance.longitude;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
console.log(
|
||||||
|
"Found new RTR=" +
|
||||||
|
newRTR +
|
||||||
|
" and new success rate of " +
|
||||||
|
newSR * 100 +
|
||||||
|
"%, livepeer region " +
|
||||||
|
instance.region
|
||||||
|
);
|
||||||
|
promLatestRTR.set(
|
||||||
|
{
|
||||||
|
livepeer_region: instance.region,
|
||||||
|
orchestrator: address,
|
||||||
|
latitude: latitude,
|
||||||
|
longitude: longitude,
|
||||||
|
},
|
||||||
|
newRTR
|
||||||
|
);
|
||||||
|
promLatestSuccessRate.set(
|
||||||
|
{
|
||||||
|
livepeer_region: instance.region,
|
||||||
|
orchestrator: address,
|
||||||
|
latitude: latitude,
|
||||||
|
longitude: longitude,
|
||||||
|
},
|
||||||
|
newSR
|
||||||
|
);
|
||||||
|
hasEdited = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (hasEdited) {
|
||||||
|
orchCache[address.toLowerCase()].leaderboardResults.lastTime =
|
||||||
|
new Date().getTime();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const updateOrchScores = async function () {
|
||||||
|
for (const [id, obj] of Object.entries(orchCache)) {
|
||||||
|
await updateScore(id);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const recoverStorage = async function () {
|
||||||
|
await storage.init({
|
||||||
|
stringify: JSON.stringify,
|
||||||
|
parse: JSON.parse,
|
||||||
|
encoding: "utf8",
|
||||||
|
logging: false,
|
||||||
|
ttl: false,
|
||||||
|
forgiveParseErrors: false,
|
||||||
|
});
|
||||||
|
storedDomains = await storage.getItem("ensDomainCache");
|
||||||
|
if (storedDomains) {
|
||||||
|
ensDomainCache = storedDomains;
|
||||||
|
}
|
||||||
|
storedOrchs = await storage.getItem("orchCache");
|
||||||
|
if (storedOrchs) {
|
||||||
|
orchCache = storedOrchs;
|
||||||
|
}
|
||||||
|
isSynced = true;
|
||||||
|
};
|
||||||
|
recoverStorage();
|
||||||
|
|
||||||
|
const runTests = async function () {
|
||||||
|
try {
|
||||||
|
const now = new Date().getTime();
|
||||||
|
if (
|
||||||
|
!lastLeaderboardCheck ||
|
||||||
|
now - lastLeaderboardCheck > CONF_SCORE_TIMEOUT
|
||||||
|
) {
|
||||||
|
await updateOrchScores();
|
||||||
|
lastLeaderboardCheck = now;
|
||||||
|
}
|
||||||
|
setTimeout(() => {
|
||||||
|
runTests();
|
||||||
|
}, CONF_SLEEPTIME);
|
||||||
|
return;
|
||||||
|
} catch (err) {
|
||||||
|
console.log(err);
|
||||||
|
setTimeout(() => {
|
||||||
|
runTests();
|
||||||
|
}, CONF_SLEEPTIME);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
console.log("Starting watcher for test stream results");
|
||||||
|
runTests();
|
||||||
|
|
||||||
|
exports.masterRouter = masterRouter;
|
master/src/routes/index.js
@@ -1,2 +1,2 @@
-import masterRouter from './master';
-export { masterRouter };
+const { masterRouter } = require("./hodler.js");
+exports.masterRouter = masterRouter;
@ -1,309 +0,0 @@
|
|||||||
import express, { response } from "express";
import {
  PRESHARED_MASTER_KEY, CONF_ENS_TIMEOUT
} from "../config";
const https = require('https');
const storage = require('node-persist');
const masterRouter = express.Router();
const client = require('prom-client');
const register = new client.Registry();
const collectDefaultMetrics = client.collectDefaultMetrics;
collectDefaultMetrics({ timeout: 3000 });
const promLatestLatency = new client.Gauge({
  name: 'orch_latest_latency',
  help: 'Latest latency known for a given Orchestrator',
  labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
});
register.registerMetric(promLatestLatency);
const promLatestPPP = new client.Gauge({
  name: 'orch_latest_ppp',
  help: 'Latest price per pixel known for a given Orchestrator',
  labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
});
register.registerMetric(promLatestPPP);
const promLatency = new client.Summary({
  name: 'orch_latency',
  help: 'Summary of latency stats',
  percentiles: [0.01, 0.1, 0.9, 0.99],
  labelNames: ['region']
});
register.registerMetric(promLatency);
const promAverageLatency = new client.Gauge({
  name: 'orch_average_latency',
  help: 'Average latency for a given Orchestrator',
  labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
});
register.registerMetric(promAverageLatency);
const promAUptimeScore = new client.Gauge({
  name: 'orch_uptime_score',
  help: 'Uptime score for a given orchestrator',
  labelNames: ['region', 'orchestrator', 'latitude', 'longitude']
});
register.registerMetric(promAUptimeScore);

let isSynced = false;

/*
  ENS data from nFrame
  [{"domain":null,"address":"0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e","timestamp":1659963951567}]
*/

let ensData = [];
let lastUpdated = 0;
const updateEns = async function () {
  try {
    const url = "https://stronk.rocks/api/livepeer/getEnsDomains";
    console.log("Getting new ENS data from " + url);

    https.get(url, (res2) => {
      let body = "";
      res2.on("data", (chunk) => {
        body += chunk;
      });
      res2.on("end", () => {
        try {
          const data = JSON.parse(body);
          for (const newOrchData of data) {
            if (!newOrchData) { continue; }
            if (!newOrchData.domain) { continue; }
            var found = false;
            for (var orchIdx = 0; orchIdx < ensData.length; orchIdx++) {
              if (ensData[orchIdx].address != newOrchData.address) { continue; }
              ensData[orchIdx] = newOrchData;
              found = true;
              break;
            }
            if (!found) {
              ensData.push(newOrchData);
            }
          }
          storage.setItem('ensData', ensData);
        } catch (error) {
          console.error(error.message);
        }
      });
    }).on("error", (error) => {
      console.error(error.message);
    });
  } catch (err) {
    console.log(err);
  }
}

/*
  Incoming stats parsing
*/
masterRouter.post("/collectStats", async (req, res) => {
  try {
    if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; }
    const { id, discoveryResults, responseTime, lookupResults, tag, key } = req.body;
    if (!id || !tag || !key) {
      console.log("Received malformed data. Aborting stats update...");
      console.log(id, discoveryResults, responseTime, lookupResults, tag, key);
      res.send(false);
      return;
    }
    if (PRESHARED_MASTER_KEY != key) {
      console.log("Unauthorized");
      res.send(false);
      return;
    }
    let latitude = null;
    let longitude = null;
    if (lookupResults && lookupResults.geoLookup && lookupResults.geoLookup.ll) {
      latitude = lookupResults.geoLookup.ll[0];
      longitude = lookupResults.geoLookup.ll[1];
    }
    let thisId = id;
    if (responseTime) {
      // Swap the address for its ENS domain when one is known
      for (const thisEns of ensData) {
        if (!thisEns || !thisEns.domain) { continue; }
        if (thisEns.address != thisId) { continue; }
        thisId = thisEns.domain;
      }
      promLatestLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, responseTime);
      promLatency.observe({ region: tag }, responseTime);
    }
    if (discoveryResults && discoveryResults.price_info) {
      promLatestPPP.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit);
    }
    console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)");
    // Save data point
    const now = new Date().getTime();
    // Update ENS from nframe if expired
    if (now - lastUpdated > CONF_ENS_TIMEOUT) {
      await updateEns();
      lastUpdated = now;
    }
    let thisPing = responseTime;
    if (!discoveryResults || !responseTime) { thisPing = null; }
    let currentDataList = [];
    let orchFound = false;
    let regionFound = false;
    for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) {
      if (orchScores[orchIdx].id != thisId) { continue; }
      orchFound = true;
      for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) {
        if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; }
        regionFound = true;
        if (orchScores[orchIdx].data[regionIdx].data.length > 60) {
          orchScores[orchIdx].data[regionIdx].data = orchScores[orchIdx].data[regionIdx].data.slice(1);
        }
        orchScores[orchIdx].data[regionIdx].data.push({ latency: thisPing, timestamp: now });
        currentDataList = orchScores[orchIdx].data[regionIdx].data;
        break;
      }
      if (!regionFound) {
        currentDataList = [{ latency: thisPing, timestamp: now }];
        orchScores[orchIdx].data.push({ tag, data: currentDataList });
      }
      break;
    }
    if (!orchFound) {
      currentDataList = [{ latency: thisPing, timestamp: now }];
      orchScores.push({ id: thisId, data: [{ tag, data: currentDataList }] });
    }
    await storage.setItem('orchScores', orchScores);
    // Calc new scores
    let prevtime = null;
    let uptime = 0;
    let downtime = 0;
    let pingsum = 0;
    let pingpoints = 0;
    for (const thisData of currentDataList) {
      // Count ping* vars
      if (thisData.latency && thisData.latency != null) {
        pingsum += thisData.latency;
        pingpoints += 1;
      }
      // Only count *time vars if we have timestamps
      if (prevtime && thisData.timestamp) {
        if (thisData.latency && thisData.latency != null && thisData.latency < 1000) {
          uptime += thisData.timestamp - prevtime;
        } else {
          downtime += thisData.timestamp - prevtime;
        }
      }
      prevtime = thisData.timestamp;
    }
    if (pingpoints) {
      promAverageLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, pingsum / pingpoints);
    }
    if (uptime || downtime) {
      let score;
      if (!uptime) { score = 0; }
      else { score = uptime / (uptime + downtime); }
      promAUptimeScore.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, score);
    }
    res.send(true);
  } catch (err) {
    console.log(err, req.body);
    res.status(400).send(err);
  }
});
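To make the scoring in the handler above concrete, here is a worked example as a standalone sketch (sample values are made up): three probes a minute apart where the middle one failed give an average latency over the successful probes, and an uptime score of uptime / (uptime + downtime), where an interval counts as uptime only when it ends in a probe under 1000 ms.

```
// Standalone sketch of the uptime/latency math from /collectStats (made-up samples).
const samples = [
  { latency: 120, timestamp: 60000 },
  { latency: null, timestamp: 120000 }, // failed probe
  { latency: 140, timestamp: 180000 },
];
let prevtime = null, uptime = 0, downtime = 0, pingsum = 0, pingpoints = 0;
for (const s of samples) {
  if (s.latency) { pingsum += s.latency; pingpoints += 1; }
  if (prevtime && s.timestamp) {
    if (s.latency && s.latency < 1000) { uptime += s.timestamp - prevtime; }
    else { downtime += s.timestamp - prevtime; }
  }
  prevtime = s.timestamp;
}
console.log(pingsum / pingpoints);         // 130 -> average latency in ms
console.log(uptime / (uptime + downtime)); // 0.5 -> uptime score
```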

/*
  Public endpoints
*/

masterRouter.get("/prometheus", async (req, res) => {
  try {
    res.set('Content-Type', register.contentType);
    const metrics = await register.metrics();
    res.end(metrics);
  } catch (err) {
    res.status(400).send(err);
  }
});

masterRouter.get("/json", async (req, res) => {
  try {
    res.set('Content-Type', 'application/json');
    res.end(JSON.stringify(orchScores));
  } catch (err) {
    res.status(400).send(err);
  }
});
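Once the router is mounted under /api/master (as in the server entrypoint further down), the two public endpoints can be smoke-tested with a few lines of Node. This is only a sketch; the host and port are assumptions, not values from the repo config.

```
// Quick smoke test of the public endpoints (assumes Node 18+ global fetch;
// host and port are placeholders - point them at your own MASTER_PORT).
(async () => {
  const base = "http://localhost:8080/api/master";
  const metrics = await (await fetch(base + "/prometheus")).text();
  console.log(metrics.split("\n").slice(0, 5).join("\n")); // first few metric lines
  const scores = await (await fetch(base + "/json")).json();
  console.log(scores.length ? scores[0].id : "no scores stored yet");
})();
```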

/*
  Recover from storage
*/

let orchScores;

const recoverStorage = async function () {
  await storage.init({
    stringify: JSON.stringify,
    parse: JSON.parse,
    encoding: 'utf8',
    logging: false,
    ttl: false,
    forgiveParseErrors: false
  });
  ensData = await storage.getItem('ensData');
  if (!ensData) { ensData = []; }
  orchScores = await storage.getItem('orchScores');
  if (!orchScores) { orchScores = []; }
  // Init prometheus from storage
  for (const thisOrch of orchScores) {
    console.log("recovering scores for " + thisOrch.id);
    for (const thisRegion of thisOrch.data) {
      let prevtime = null;
      let uptime = 0;
      let downtime = 0;
      let pingsum = 0;
      let pingpoints = 0;
      for (const thisData of thisRegion.data) {
        // Count ping* vars
        if (thisData.latency) {
          pingsum += thisData.latency;
          pingpoints += 1;
          promLatestLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, thisData.latency);
          promLatency.observe({ region: thisRegion.tag }, thisData.latency);
        }
        // Only count *time vars if we have timestamps
        if (prevtime && thisData.timestamp) {
          if (thisData.latency) {
            uptime += thisData.timestamp - prevtime;
          } else {
            downtime += thisData.timestamp - prevtime;
          }
        }
        prevtime = thisData.timestamp;
      }
      if (pingpoints) {
        promAverageLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, pingsum / pingpoints);
      }
      if (uptime || downtime) {
        const score = uptime / (uptime + downtime);
        promAUptimeScore.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, score);
      }
    }
  }
  isSynced = true;
};
recoverStorage();

export default masterRouter;
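The deleted file above and its hodler.js replacement both follow the same prom-client pattern: register labelled gauges once, update them as measurements arrive, and let the /prometheus route serialise the registry. A minimal standalone sketch of that pattern (the metric name and label values here are illustrative, not the repo's):

```
// Minimal prom-client pattern used throughout: a labelled Gauge plus a registry
// that the /prometheus route serialises (illustrative metric name and values).
const promClient = require("prom-client");
const registry = new promClient.Registry();

const exampleGauge = new promClient.Gauge({
  name: "orch_example_metric",
  help: "Illustrative gauge with the same label style",
  labelNames: ["region", "orchestrator"],
});
registry.registerMetric(exampleGauge);

exampleGauge.set({ region: "Amsterdam", orchestrator: "0xabc" }, 42);

registry.metrics().then((text) => console.log(text)); // Prometheus exposition text
```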

@@ -1,32 +0,0 @@
import express from 'express';
import { masterRouter } from './routes/index';
import { MASTER_PORT } from "./config";
// Env variable which determines which DB to connect to
const { NODE_ENV: mode } = process.env;

(async () => {
  try {
    const app = express();
    app.disable('x-powered-by');
    app.use(express.urlencoded({ extended: true }));
    app.use(express.json());
    const apiRouter = express.Router();
    app.use('/api', apiRouter);
    apiRouter.use('/master', masterRouter);
    // Error handler
    app.use(function (err, req, res, next) {
      res.locals.message = err.message;
      // Also log it to the console
      console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
      // Render the error page
      res.status(err.status || 500);
      res.render('error');
    });

    app.listen(MASTER_PORT, "0.0.0.0", function () {
      console.log(`Listening on port ${MASTER_PORT}`);
    });
  } catch (err) {
    console.log(err);
  }
})();
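Finally, a hedged sketch of what a prober's upload to the /collectStats route looks like, based only on the fields the handler destructures (id, discoveryResults, responseTime, lookupResults, tag, key). The URL, port, and sample values are placeholders.

```
// Sketch of one stats upload from a client to the master
// (assumes Node 18+ global fetch; all values are placeholders).
(async () => {
  const res = await fetch("http://localhost:8080/api/master/collectStats", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      id: "0xc3c7c4c8f7061b7d6a72766eee5359fe4f36e61e", // orchestrator address
      responseTime: 142, // GetOrchestrator round trip in ms
      discoveryResults: { price_info: { pricePerUnit: 1200, pixelsPerUnit: 1 } },
      lookupResults: { geoLookup: { ll: [52.37, 4.9] } }, // [latitude, longitude]
      tag: "Amsterdam", // friendly name / region label of the prober
      key: "<PRESHARED_MASTER_KEY>",
    }),
  });
  console.log(await res.text()); // "true" when the data point was accepted
})();
```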