diff --git a/client/package.json b/client/package.json index 05b63a5..9f5bd60 100644 --- a/client/package.json +++ b/client/package.json @@ -18,6 +18,7 @@ "@grpc/proto-loader": "^0.7.0", "esm": "^3.2.20", "express": "^4.17.1", + "geoip-lite": "^1.4.8", "graphql": "^16.5.0", "graphql-request": "^4.3.0", "npm": "^8.5.2", diff --git a/client/src/routes/orchTester.js b/client/src/routes/orchTester.js index e39b2c0..aed6068 100644 --- a/client/src/routes/orchTester.js +++ b/client/src/routes/orchTester.js @@ -1,10 +1,19 @@ import express from "express"; +const dns = require("dns"); +var geoip = require("geoip-lite"); const orchTesterRouter = express.Router(); import { - MASTER_DOMAIN, MASTER_PORT, MASTER_PATH, FRIENDLY_NAME, - PRESHARED_MASTER_KEY, CONF_SLEEPTIME, CONT_SIG, - CONF_ORCHINFO_TIMEOUT, CONF_BROADCASTER + MASTER_DOMAIN, + MASTER_PORT, + MASTER_PATH, + FRIENDLY_NAME, + PRESHARED_MASTER_KEY, + CONF_SLEEPTIME, + CONT_SIG, + CONF_ORCHINFO_TIMEOUT, + CONF_BROADCASTER, + CONF_DNS_TIMEOUT, } from "../config"; process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; @@ -16,20 +25,18 @@ imported modules */ -import { request, gql } from 'graphql-request'; -const https = require('https'); -const http = require('http'); -var grpc = require('@grpc/grpc-js'); -var protoLoader = require('@grpc/proto-loader'); -var packageDefinition = protoLoader.loadSync( - "src/proto/livepeer.proto", - { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true - }); +import { request, gql } from "graphql-request"; +const https = require("https"); +const http = require("http"); +var grpc = require("@grpc/grpc-js"); +var protoLoader = require("@grpc/proto-loader"); +var packageDefinition = protoLoader.loadSync("src/proto/livepeer.proto", { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, +}); var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net; const ssl_creds = grpc.credentials.createSsl(null, null, null, { checkServerIdentity: () => undefined, @@ -48,12 +55,15 @@ function sleep(ms) { } function shuffle(array) { - let currentIndex = array.length, randomIndex; + let currentIndex = array.length, + randomIndex; while (currentIndex != 0) { randomIndex = Math.floor(Math.random() * currentIndex); currentIndex--; [array[currentIndex], array[randomIndex]] = [ - array[randomIndex], array[currentIndex]]; + array[randomIndex], + array[currentIndex], + ]; } return array; } @@ -69,20 +79,26 @@ but this would require constant watching for uri updates which is a pain to impl var activeOrchestrators = []; let lastUpdated = 0; +let orchDNS = {}; /// Does a GQL query to the subgraph for orchestrator data const getOrchestrators = async function () { console.log("Getting orchestrator data from the subgraph..."); try { - const orchQuery = gql`{ - transcoders(where: {active: true}, first: 1000) { + const orchQuery = gql` + { + transcoders(where: { active: true }, first: 1000) { id status totalStake serviceURI } - }`; - let orchData = await request("https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one", orchQuery); + } + `; + let orchData = await request( + "https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one", + orchQuery + ); orchData = orchData.transcoders; if (!orchData) { console.log("Thegraph is probably acting up..."); @@ -94,7 +110,7 @@ const getOrchestrators = async function () { console.log("Thegraph is probably acting up..."); return null; } -} +}; /// Refreshes orchestrator data if the subgraph is available const refreshOrchCache = async function () { @@ -107,7 +123,7 @@ const refreshOrchCache = async function () { lastUpdated = now; } } -} +}; /* @@ -117,24 +133,32 @@ Doing grpc calls to an orchestrator let currentPool = []; -const postStatistics = async function (id, discoveryResults, responseTime) { +const postStatistics = async function ( + id, + discoveryResults, + lookupResults, + responseTime +) { console.log("Posting stats for " + id + " (ping " + responseTime + " ms)"); // TODO look at response and log error? var postData = JSON.stringify({ - id, discoveryResults, responseTime, + id, + discoveryResults, + responseTime, + lookupResults, tag: FRIENDLY_NAME, - key: PRESHARED_MASTER_KEY + key: PRESHARED_MASTER_KEY, }); var options = { hostname: MASTER_DOMAIN, port: MASTER_PORT, path: MASTER_PATH, - method: 'POST', + method: "POST", headers: { - 'Content-Type': 'application/json', - 'Content-Length': postData.length - } + "Content-Type": "application/json", + "Content-Length": postData.length, + }, }; var req; if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") { @@ -142,8 +166,10 @@ const postStatistics = async function (id, discoveryResults, responseTime) { // console.log('statusCode:', res.statusCode); // console.log('headers:', res.headers); - res.on('data', (d) => { - process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN); + res.on("data", (d) => { + process.stdout.write( + "Received response " + d + " from " + MASTER_DOMAIN + ); }); }); } else { @@ -151,17 +177,19 @@ const postStatistics = async function (id, discoveryResults, responseTime) { // console.log('statusCode:', res.statusCode); // console.log('headers:', res.headers); - res.on('data', (d) => { - process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN); + res.on("data", (d) => { + process.stdout.write( + "Received response " + d + " from " + MASTER_DOMAIN + ); }); }); } - req.on('error', (e) => { - console.error('err', e); + req.on("error", (e) => { + console.error("err", e); }); req.write(postData); req.end(); -} +}; function hexToBytes(hex) { for (var bytes = [], c = 0; c < hex.length; c += 2) @@ -170,33 +198,44 @@ function hexToBytes(hex) { } const discoverOrchestrator = async function (target) { - if (!target) { return; } - var client = new livepeerProto.Orchestrator(target, ssl_creds, {"GRPC_ARG_DEFAULT_AUTHORITY": Math.random().toString(36).substr(2, 5)}); + if (!target) { + return; + } + var client = new livepeerProto.Orchestrator(target, ssl_creds, { + GRPC_ARG_DEFAULT_AUTHORITY: Math.random().toString(36).substr(2, 5), + }); var receivedResults = false; var orchestratorInfo; const start = new Date().getTime(); var elapsed = null; - await client.GetOrchestrator({ - address: hexToBytes(CONF_BROADCASTER), - sig: CONT_SIG - }, function (err, res) { - if (err) { - console.log("Discovery error: ", err.details); - orchestratorInfo = err.details; - elapsed = null; - } else { - orchestratorInfo = res; - elapsed = new Date().getTime() - start; + await client.GetOrchestrator( + { + address: hexToBytes(CONF_BROADCASTER), + sig: CONT_SIG, + }, + function (err, res) { + if (err) { + console.log("Discovery error: ", err.details); + orchestratorInfo = err.details; + elapsed = null; + } else { + orchestratorInfo = res; + elapsed = new Date().getTime() - start; + } + receivedResults = true; } - receivedResults = true; - }); - while (!receivedResults && new Date().getTime() - start < 4000) { await sleep(20); } + ); + while (!receivedResults && new Date().getTime() - start < 4000) { + await sleep(20); + } grpc.closeClient(client); return { discoveryResults: orchestratorInfo, elapsed }; -} +}; const pingOrchestrator = async function (target) { - if (!target) { return; } + if (!target) { + return; + } var client = new livepeerProto.Orchestrator(target, ssl_creds); var receivedResults = false; var pingPong; @@ -213,21 +252,53 @@ const pingOrchestrator = async function (target) { } receivedResults = true; }); - while (!receivedResults && new Date().getTime() - start < 4000) { await sleep(20); } + while (!receivedResults && new Date().getTime() - start < 4000) { + await sleep(20); + } return { pingResults: pingPong, elapsed }; +}; + +async function getIP(hostname) { + let obj = await dns.promises.lookup(hostname).catch((error) => { + console.error(error); + }); + if (obj) { + return obj.address; + } else { + return null; + } } const testOrchestrator = async function (id, target) { - if (!id.length || !target.length) { return; } - target = target.replace(/^https?:\/\//, ''); - console.log("Target is " + target); - const { discoveryResults, elapsed } = await discoverOrchestrator(target); - if (discoveryResults && discoveryResults == "insufficient sender reserve") { - console.log('Ignoring ' + id + ' for stats due to insufficient sender reserve'); + if (!id.length || !target.length) { return; } - await postStatistics(id, discoveryResults, elapsed); -} + const origTarget = new URL(target); + target = target.replace(/^https?:\/\//, ""); + console.log("Target is " + target); + // Resolve DNS + const now = new Date().getTime(); + if (!orchDNS[id] || now - orchDNS[id].lastTime > CONF_DNS_TIMEOUT) { + const resolved = await getIP(origTarget.hostname); + orchDNS[id] = { + originalTarget: origTarget.origin, + resolvedTarget: resolved, + geoLookup: geoip.lookup(resolved), + geoFrom: FRIENDLY_NAME, + lastTime: now, + }; + console.log("Updated DNS and GeoIP data: ", orchDNS[id]); + } + // Test orch + const { discoveryResults, elapsed } = await discoverOrchestrator(target); + if (discoveryResults && discoveryResults == "insufficient sender reserve") { + console.log( + "Ignoring " + id + " for stats due to insufficient sender reserve" + ); + return; + } + await postStatistics(id, discoveryResults, orchDNS[id], elapsed); +}; const refreshPool = function () { currentPool = []; @@ -235,18 +306,19 @@ const refreshPool = function () { currentPool.push({ id: thisObj.id, target: thisObj.serviceURI }); } shuffle(currentPool); -} - +}; const pingNextOrch = async function () { - if (!currentPool.length) { refreshPool(); } + if (!currentPool.length) { + refreshPool(); + } let currentOrch = currentPool.splice(0, 1)[0]; if (!currentOrch.id || !currentOrch.target) { console.log("Skipping Orchestrator with malformed data: ", currentOrch); return; } await testOrchestrator(currentOrch.id, currentOrch.target); -} +}; /* @@ -261,7 +333,7 @@ let isSyncing = false; const handleSync = async function () { try { cycle++; - console.log('Starting new cycle #' + cycle); + console.log("Starting new cycle #" + cycle); isSyncing = true; await refreshOrchCache(); await pingNextOrch(); @@ -270,13 +342,12 @@ const handleSync = async function () { handleSync(); }, CONF_SLEEPTIME); return; - } - catch (err) { + } catch (err) { console.log(err); isSyncing = false; setTimeout(() => { handleSync(); - }, CONF_SLEEPTIME) + }, CONF_SLEEPTIME); } }; if (!isSyncing) { @@ -284,5 +355,4 @@ if (!isSyncing) { handleSync(); } - -export default orchTesterRouter; \ No newline at end of file +export default orchTesterRouter; diff --git a/master/src/routes/master.js b/master/src/routes/master.js index 7c08dda..0b0e5eb 100644 --- a/master/src/routes/master.js +++ b/master/src/routes/master.js @@ -12,13 +12,13 @@ collectDefaultMetrics({ timeout: 3000 }); const promLatestLatency = new client.Gauge({ name: 'orch_latest_latency', help: 'Latest latency known for a given Orchestrator', - labelNames: ['region', 'orchestrator'] + labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] }); register.registerMetric(promLatestLatency); const promLatestPPP = new client.Gauge({ name: 'orch_latest_ppp', help: 'Latest price per pixel known for a given Orchestrator', - labelNames: ['region', 'orchestrator'] + labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] }); register.registerMetric(promLatestPPP); const promLatency = new client.Summary({ @@ -31,13 +31,13 @@ register.registerMetric(promLatency); const promAverageLatency = new client.Gauge({ name: 'orch_average_latency', help: 'Average latency for a given Orchestrator', - labelNames: ['region', 'orchestrator'] + labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] }); register.registerMetric(promAverageLatency); const promAUptimeScore = new client.Gauge({ name: 'orch_uptime_score', help: 'Uptime score for a given orchestrator', - labelNames: ['region', 'orchestrator'] + labelNames: ['region', 'orchestrator', 'latitude', 'longitude'] }); register.registerMetric(promAUptimeScore); @@ -106,10 +106,10 @@ Incoming stats parsing masterRouter.post("/collectStats", async (req, res) => { try { if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; } - const { id, discoveryResults, responseTime, tag, key } = req.body; + const { id, discoveryResults, responseTime, lookupResults, tag, key } = req.body; if (!id || !tag || !key) { console.log("Received malformed data. Aborting stats update..."); - console.log(id, discoveryResults, responseTime, tag, key); + console.log(id, discoveryResults, responseTime, lookupResults, tag, key); res.send(false); return; } @@ -118,6 +118,12 @@ masterRouter.post("/collectStats", async (req, res) => { res.send(false); return; } + const latitude = null; + const longitude = null; + if (lookupResults.ll){ + latitude = lookupResults.ll[0]; + longitude = lookupResults.ll[1]; + } let thisId = id; if (responseTime) { for (const thisEns of ensData) { @@ -125,11 +131,11 @@ masterRouter.post("/collectStats", async (req, res) => { if (thisEns.address != thisId) { continue; } thisId = thisEns.domain; } - promLatestLatency.set({ region: tag, orchestrator: thisId }, responseTime); + promLatestLatency.set({ region: tag, orchestrator: thisId, latitude: longitude, longitude: longitude }, responseTime); promLatency.observe({ region: tag }, responseTime); } if (discoveryResults && discoveryResults.price_info){ - promLatestPPP.set({ region: tag, orchestrator: thisId }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit); + promLatestPPP.set({ region: tag, orchestrator: thisId, latitude: longitude, longitude: longitude }, discoveryResults.price_info.pricePerUnit / discoveryResults.price_info.pixelsPerUnit); } console.log('received data for ' + thisId + ' from ' + tag + ' (' + responseTime + " ms latency)"); // Save data point @@ -179,8 +185,6 @@ masterRouter.post("/collectStats", async (req, res) => { if (thisData.latency) { pingsum += thisData.latency; pingpoints += 1; - promLatestLatency.set({ region: tag, orchestrator: thisId }, thisData.latency); - promLatency.observe({ region: tag }, thisData.latency); } // Only count *time vars if we have timestamps if (prevtime && thisData.timestamp) { @@ -193,13 +197,13 @@ masterRouter.post("/collectStats", async (req, res) => { prevtime = thisData.timestamp; } if (pingpoints) { - promAverageLatency.set({ region: tag, orchestrator: thisId }, pingsum / pingpoints); + promAverageLatency.set({ region: tag, orchestrator: thisId, latitude: longitude, longitude: longitude }, pingsum / pingpoints); } if (uptime || downtime) { let score; if (!uptime) { score = 0; } else { score = uptime / (uptime + downtime); } - promAUptimeScore.set({ region: tag, orchestrator: thisId }, score); + promAUptimeScore.set({ region: tag, orchestrator: thisId, latitude: longitude, longitude: longitude }, score); } res.send(true); } catch (err) {