Compare commits

..

No commits in common. "abd90259cee75eb0cfd742e120d667a0990aedb1" and "b6cbf3ea37e5979eb7849d814ef3cf77558307d5" have entirely different histories.

3 changed files with 88 additions and 163 deletions

View File

@ -18,7 +18,6 @@
"@grpc/proto-loader": "^0.7.0", "@grpc/proto-loader": "^0.7.0",
"esm": "^3.2.20", "esm": "^3.2.20",
"express": "^4.17.1", "express": "^4.17.1",
"geoip-lite": "^1.4.8",
"graphql": "^16.5.0", "graphql": "^16.5.0",
"graphql-request": "^4.3.0", "graphql-request": "^4.3.0",
"npm": "^8.5.2", "npm": "^8.5.2",

View File

@ -1,19 +1,10 @@
import express from "express"; import express from "express";
const dns = require("dns");
var geoip = require("geoip-lite");
const orchTesterRouter = express.Router(); const orchTesterRouter = express.Router();
import { import {
MASTER_DOMAIN, MASTER_DOMAIN, MASTER_PORT, MASTER_PATH, FRIENDLY_NAME,
MASTER_PORT, PRESHARED_MASTER_KEY, CONF_SLEEPTIME, CONT_SIG,
MASTER_PATH, CONF_ORCHINFO_TIMEOUT, CONF_BROADCASTER
FRIENDLY_NAME,
PRESHARED_MASTER_KEY,
CONF_SLEEPTIME,
CONT_SIG,
CONF_ORCHINFO_TIMEOUT,
CONF_BROADCASTER,
CONF_DNS_TIMEOUT,
} from "../config"; } from "../config";
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
@ -25,18 +16,20 @@ imported modules
*/ */
import { request, gql } from "graphql-request"; import { request, gql } from 'graphql-request';
const https = require("https"); const https = require('https');
const http = require("http"); const http = require('http');
var grpc = require("@grpc/grpc-js"); var grpc = require('@grpc/grpc-js');
var protoLoader = require("@grpc/proto-loader"); var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", { var packageDefinition = protoLoader.loadSync(
keepCase: true, "src/proto/livepeer.proto",
longs: String, {
enums: String, keepCase: true,
defaults: true, longs: String,
oneofs: true, enums: String,
}); defaults: true,
oneofs: true
});
var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net; var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net;
const ssl_creds = grpc.credentials.createSsl(null, null, null, { const ssl_creds = grpc.credentials.createSsl(null, null, null, {
checkServerIdentity: () => undefined, checkServerIdentity: () => undefined,
@ -55,15 +48,12 @@ function sleep(ms) {
} }
function shuffle(array) { function shuffle(array) {
let currentIndex = array.length, let currentIndex = array.length, randomIndex;
randomIndex;
while (currentIndex != 0) { while (currentIndex != 0) {
randomIndex = Math.floor(Math.random() * currentIndex); randomIndex = Math.floor(Math.random() * currentIndex);
currentIndex--; currentIndex--;
[array[currentIndex], array[randomIndex]] = [ [array[currentIndex], array[randomIndex]] = [
array[randomIndex], array[randomIndex], array[currentIndex]];
array[currentIndex],
];
} }
return array; return array;
} }
@ -79,26 +69,20 @@ but this would require constant watching for uri updates which is a pain to impl
var activeOrchestrators = []; var activeOrchestrators = [];
let lastUpdated = 0; let lastUpdated = 0;
let orchDNS = {};
/// Does a GQL query to the subgraph for orchestrator data /// Does a GQL query to the subgraph for orchestrator data
const getOrchestrators = async function () { const getOrchestrators = async function () {
console.log("Getting orchestrator data from the subgraph..."); console.log("Getting orchestrator data from the subgraph...");
try { try {
const orchQuery = gql` const orchQuery = gql`{
{ transcoders(where: {active: true}, first: 1000) {
transcoders(where: { active: true }, first: 1000) {
id id
status status
totalStake totalStake
serviceURI serviceURI
} }
} }`;
`; let orchData = await request("https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one", orchQuery);
let orchData = await request(
"https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one",
orchQuery
);
orchData = orchData.transcoders; orchData = orchData.transcoders;
if (!orchData) { if (!orchData) {
console.log("Thegraph is probably acting up..."); console.log("Thegraph is probably acting up...");
@ -110,7 +94,7 @@ const getOrchestrators = async function () {
console.log("Thegraph is probably acting up..."); console.log("Thegraph is probably acting up...");
return null; return null;
} }
}; }
/// Refreshes orchestrator data if the subgraph is available /// Refreshes orchestrator data if the subgraph is available
const refreshOrchCache = async function () { const refreshOrchCache = async function () {
@ -123,7 +107,7 @@ const refreshOrchCache = async function () {
lastUpdated = now; lastUpdated = now;
} }
} }
}; }
/* /*
@ -133,32 +117,24 @@ Doing grpc calls to an orchestrator
let currentPool = []; let currentPool = [];
const postStatistics = async function ( const postStatistics = async function (id, discoveryResults, responseTime) {
id,
discoveryResults,
lookupResults,
responseTime
) {
console.log("Posting stats for " + id + " (ping " + responseTime + " ms)"); console.log("Posting stats for " + id + " (ping " + responseTime + " ms)");
// TODO look at response and log error? // TODO look at response and log error?
var postData = JSON.stringify({ var postData = JSON.stringify({
id, id, discoveryResults, responseTime,
discoveryResults,
responseTime,
lookupResults,
tag: FRIENDLY_NAME, tag: FRIENDLY_NAME,
key: PRESHARED_MASTER_KEY, key: PRESHARED_MASTER_KEY
}); });
var options = { var options = {
hostname: MASTER_DOMAIN, hostname: MASTER_DOMAIN,
port: MASTER_PORT, port: MASTER_PORT,
path: MASTER_PATH, path: MASTER_PATH,
method: "POST", method: 'POST',
headers: { headers: {
"Content-Type": "application/json", 'Content-Type': 'application/json',
"Content-Length": postData.length, 'Content-Length': postData.length
}, }
}; };
var req; var req;
if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") { if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") {
@ -166,10 +142,8 @@ const postStatistics = async function (
// console.log('statusCode:', res.statusCode); // console.log('statusCode:', res.statusCode);
// console.log('headers:', res.headers); // console.log('headers:', res.headers);
res.on("data", (d) => { res.on('data', (d) => {
process.stdout.write( process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN);
"Received response " + d + " from " + MASTER_DOMAIN
);
}); });
}); });
} else { } else {
@ -177,19 +151,17 @@ const postStatistics = async function (
// console.log('statusCode:', res.statusCode); // console.log('statusCode:', res.statusCode);
// console.log('headers:', res.headers); // console.log('headers:', res.headers);
res.on("data", (d) => { res.on('data', (d) => {
process.stdout.write( process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN);
"Received response " + d + " from " + MASTER_DOMAIN
);
}); });
}); });
} }
req.on("error", (e) => { req.on('error', (e) => {
console.error("err", e); console.error('err', e);
}); });
req.write(postData); req.write(postData);
req.end(); req.end();
}; }
function hexToBytes(hex) { function hexToBytes(hex) {
for (var bytes = [], c = 0; c < hex.length; c += 2) for (var bytes = [], c = 0; c < hex.length; c += 2)
@ -198,44 +170,33 @@ function hexToBytes(hex) {
} }
const discoverOrchestrator = async function (target) { const discoverOrchestrator = async function (target) {
if (!target) { if (!target) { return; }
return; var client = new livepeerProto.Orchestrator(target, ssl_creds, {"GRPC_ARG_DEFAULT_AUTHORITY": Math.random().toString(36).substr(2, 5)});
}
var client = new livepeerProto.Orchestrator(target, ssl_creds, {
GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5),
});
var receivedResults = false; var receivedResults = false;
var orchestratorInfo; var orchestratorInfo;
const start = new Date().getTime(); const start = new Date().getTime();
var elapsed = null; var elapsed = null;
await client.GetOrchestrator( await client.GetOrchestrator({
{ address: hexToBytes(CONF_BROADCASTER),
address: hexToBytes(CONF_BROADCASTER), sig: CONT_SIG
sig: CONT_SIG, }, function (err, res) {
}, if (err) {
function (err, res) { console.log("Discovery error: ", err.details);
if (err) { orchestratorInfo = err.details;
console.log("Discovery error: ", err.details); elapsed = null;
orchestratorInfo = err.details; } else {
elapsed = null; orchestratorInfo = res;
} else { elapsed = new Date().getTime() - start;
orchestratorInfo = res;
elapsed = new Date().getTime() - start;
}
receivedResults = true;
} }
); receivedResults = true;
while (!receivedResults && new Date().getTime() - start < 4000) { });
await sleep(20); while (!receivedResults && new Date().getTime() - start < 4000) { await sleep(20); }
}
grpc.closeClient(client); grpc.closeClient(client);
return { discoveryResults: orchestratorInfo, elapsed }; return { discoveryResults: orchestratorInfo, elapsed };
}; }
const pingOrchestrator = async function (target) { const pingOrchestrator = async function (target) {
if (!target) { if (!target) { return; }
return;
}
var client = new livepeerProto.Orchestrator(target, ssl_creds); var client = new livepeerProto.Orchestrator(target, ssl_creds);
var receivedResults = false; var receivedResults = false;
var pingPong; var pingPong;
@ -252,53 +213,21 @@ const pingOrchestrator = async function (target) {
} }
receivedResults = true; receivedResults = true;
}); });
while (!receivedResults && new Date().getTime() - start < 4000) { while (!receivedResults && new Date().getTime() - start < 4000) { await sleep(20); }
await sleep(20);
}
return { pingResults: pingPong, elapsed }; return { pingResults: pingPong, elapsed };
};
async function getIP(hostname) {
let obj = await dns.promises.lookup(hostname).catch((error) => {
console.error(error);
});
if (obj) {
return obj.address;
} else {
return null;
}
} }
const testOrchestrator = async function (id, target) { const testOrchestrator = async function (id, target) {
if (!id.length || !target.length) { if (!id.length || !target.length) { return; }
return; target = target.replace(/^https?:\/\//, '');
}
const origTarget = new URL(target);
target = target.replace(/^https?:\/\//, "");
console.log("Target is " + target); console.log("Target is " + target);
// Resolve DNS
const now = new Date().getTime();
if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) {
const resolved = await getIP(origTarget.hostname);
orchDNS[id] = {
originalTarget: origTarget.origin,
resolvedTarget: resolved,
geoLookup: geoip.lookup(resolved),
geoFrom: FRIENDLY_NAME,
lastTime: now,
};
console.log("Updated DNS and GeoIP data: ", orchDNS[id]);
}
// Test orch
const { discoveryResults, elapsed } = await discoverOrchestrator(target); const { discoveryResults, elapsed } = await discoverOrchestrator(target);
if (discoveryResults && discoveryResults == "insufficient sender reserve") { if (discoveryResults && discoveryResults == "insufficient sender reserve") {
console.log( console.log('Ignoring ' + id + ' for stats due to insufficient sender reserve');
"Ignoring " + id + " for stats due to insufficient sender reserve"
);
return; return;
} }
await postStatistics(id, discoveryResults, orchDNS[id], elapsed); await postStatistics(id, discoveryResults, elapsed);
}; }
const refreshPool = function () { const refreshPool = function () {
currentPool = []; currentPool = [];
@ -306,19 +235,18 @@ const refreshPool = function () {
currentPool.push({ id: thisObj.id, target: thisObj.serviceURI }); currentPool.push({ id: thisObj.id, target: thisObj.serviceURI });
} }
shuffle(currentPool); shuffle(currentPool);
}; }
const pingNextOrch = async function () { const pingNextOrch = async function () {
if (!currentPool.length) { if (!currentPool.length) { refreshPool(); }
refreshPool();
}
let currentOrch = currentPool.splice(0, 1)[0]; let currentOrch = currentPool.splice(0, 1)[0];
if (!currentOrch.id || !currentOrch.target) { if (!currentOrch.id || !currentOrch.target) {
console.log("Skipping Orchestrator with malformed data: ", currentOrch); console.log("Skipping Orchestrator with malformed data: ", currentOrch);
return; return;
} }
await testOrchestrator(currentOrch.id, currentOrch.target); await testOrchestrator(currentOrch.id, currentOrch.target);
}; }
/* /*
@ -333,7 +261,7 @@ let isSyncing = false;
const handleSync = async function () { const handleSync = async function () {
try { try {
cycle++; cycle++;
console.log("Starting new cycle #" + cycle); console.log('Starting new cycle #' + cycle);
isSyncing = true; isSyncing = true;
await refreshOrchCache(); await refreshOrchCache();
await pingNextOrch(); await pingNextOrch();
@ -342,12 +270,13 @@ const handleSync = async function () {
handleSync(); handleSync();
}, CONF_SLEEPTIME); }, CONF_SLEEPTIME);
return; return;
} catch (err) { }
catch (err) {
console.log(err); console.log(err);
isSyncing = false; isSyncing = false;
setTimeout(() => { setTimeout(() => {
handleSync(); handleSync();
}, CONF_SLEEPTIME); }, CONF_SLEEPTIME)
} }
}; };
if (!isSyncing) { if (!isSyncing) {
@ -355,4 +284,5 @@ if (!isSyncing) {
handleSync(); handleSync();
} }
export default orchTesterRouter; export default orchTesterRouter;

View File

@ -12,13 +12,13 @@ collectDefaultMetrics({ timeout: 3000 });
const promLatestLatency = new client.Gauge({ const promLatestLatency = new client.Gauge({
name: 'orch_latest_latency', name: 'orch_latest_latency',
help: 'Latest latency known for a given Orchestrator', help: 'Latest latency known for a given Orchestrator',
labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] labelNames: ['region', 'orchestrator']
}); });
register.registerMetric(promLatestLatency); register.registerMetric(promLatestLatency);
const promLatestPPP = new client.Gauge({ const promLatestPPP = new client.Gauge({
name: 'orch_latest_ppp', name: 'orch_latest_ppp',
help: 'Latest price per pixel known for a given Orchestrator', help: 'Latest price per pixel known for a given Orchestrator',
labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] labelNames: ['region', 'orchestrator']
}); });
register.registerMetric(promLatestPPP); register.registerMetric(promLatestPPP);
const promLatency = new client.Summary({ const promLatency = new client.Summary({
@ -31,13 +31,13 @@ register.registerMetric(promLatency);
const promAverageLatency = new client.Gauge({ const promAverageLatency = new client.Gauge({
name: 'orch_average_latency', name: 'orch_average_latency',
help: 'Average latency for a given Orchestrator', help: 'Average latency for a given Orchestrator',
labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] labelNames: ['region', 'orchestrator']
}); });
register.registerMetric(promAverageLatency); register.registerMetric(promAverageLatency);
const promAUptimeScore = new client.Gauge({ const promAUptimeScore = new client.Gauge({
name: 'orch_uptime_score', name: 'orch_uptime_score',
help: 'Uptime score for a given orchestrator', help: 'Uptime score for a given orchestrator',
labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] labelNames: ['region', 'orchestrator']
}); });
register.registerMetric(promAUptimeScore); register.registerMetric(promAUptimeScore);
@ -106,10 +106,10 @@ Incoming stats parsing
masterRouter.post("/collectStats", async (req, res) => { masterRouter.post("/collectStats", async (req, res) => {
try { try {
if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; } if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; }
const { id, discoveryResults, responseTime, lookupResults, tag, key } = req.body; const { id, discoveryResults, responseTime, tag, key } = req.body;
if (!id || !tag || !key) { if (!id || !tag || !key) {
console.log("Received malformed data. Aborting stats update..."); console.log("Received malformed data. Aborting stats update...");
console.log(id, discoveryResults, responseTime, lookupResults, tag, key); console.log(id, discoveryResults, responseTime, tag, key);
res.send(false); res.send(false);
return; return;
} }
@ -118,12 +118,6 @@ masterRouter.post("/collectStats", async (req, res) => {
res.send(false); res.send(false);
return; return;
} }
let latitude = null;
let longitude = null;
if (lookupResults && lookupResults.geoLookup && lookupResults.geoLookup.ll){
latitude = lookupResults.geoLookup.ll[0];
longitude = lookupResults.geoLookup.ll[1];
}
let thisId = id; let thisId = id;
if (responseTime) { if (responseTime) {
for (const thisEns of ensData) { for (const thisEns of ensData) {
@ -131,11 +125,11 @@ masterRouter.post("/collectStats", async (req, res) => {
if (thisEns.address != thisId) { continue; } if (thisEns.address != thisId) { continue; }
thisId = thisEns.domain; thisId = thisEns.domain;
} }
promLatestLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, responseTime); promLatestLatency.set({ region: tag, orchestrator: thisId }, responseTime);
promLatency.observe({ region: tag }, responseTime); promLatency.observe({ region: tag }, responseTime);
} }
if (discoveryResults && discoveryResults.price_info){ if (discoveryResults && discoveryResults.price_info){
promLatestPPP.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit); promLatestPPP.set({ region: tag, orchestrator: thisId }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit);
} }
console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)"); console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)");
// Save data point // Save data point
@ -182,13 +176,15 @@ masterRouter.post("/collectStats", async (req, res) => {
let pingpoints = 0; let pingpoints = 0;
for (const thisData of currentDataList) { for (const thisData of currentDataList) {
// Count ping* vars // Count ping* vars
if (thisData.latency && thisData.latency != null) { if (thisData.latency) {
pingsum += thisData.latency; pingsum += thisData.latency;
pingpoints += 1; pingpoints += 1;
promLatestLatency.set({ region: tag, orchestrator: thisId }, thisData.latency);
promLatency.observe({ region: tag }, thisData.latency);
} }
// Only count *time vars if we have timestamps // Only count *time vars if we have timestamps
if (prevtime && thisData.timestamp) { if (prevtime && thisData.timestamp) {
if (thisData.latency && thisData.latency != null && thisData.latency < 1000) { if (thisData.latency) {
uptime += thisData.timestamp - prevtime; uptime += thisData.timestamp - prevtime;
} else { } else {
downtime += thisData.timestamp - prevtime; downtime += thisData.timestamp - prevtime;
@ -197,17 +193,17 @@ masterRouter.post("/collectStats", async (req, res) => {
prevtime = thisData.timestamp; prevtime = thisData.timestamp;
} }
if (pingpoints) { if (pingpoints) {
promAverageLatency.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, pingsum / pingpoints); promAverageLatency.set({ region: tag, orchestrator: thisId }, pingsum / pingpoints);
} }
if (uptime || downtime) { if (uptime || downtime) {
let score; let score;
if (!uptime) { score = 0; } if (!uptime) { score = 0; }
else { score = uptime / (uptime + downtime); } else { score = uptime / (uptime + downtime); }
promAUptimeScore.set({ region: tag, orchestrator: thisId, latitude: latitude, longitude: longitude }, score); promAUptimeScore.set({ region: tag, orchestrator: thisId }, score);
} }
res.send(true); res.send(true);
} catch (err) { } catch (err) {
console.log(err, req.body); console.log(err);
res.status(400).send(err); res.status(400).send(err);
} }
}); });