commit d18c4a6cc811e4bc7236fb1b29b23c536f8e0e7c Author: Marco van Dijk Date: Mon Aug 8 19:23:09 2022 +0200 Init: GetOrchestrator grpc requests, stores latency and uptime stats per region and orchestrator diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fb9a937 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +master/*.md +client/*.md +master/.node-persist/storage/* diff --git a/README.md b/README.md new file mode 100644 index 0000000..cd83e6d --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +This program consists of three components: + - A master which consolidates orchestrator statistics from various clients + - Converts raw results of grpc calls into prometheus data points + - TODO integration with email / telegram + - TODO api to subscribe for notifications and cancel it + - TODO read test stream scores + - TODO allow custom names (ENS, configure via frontend) to be used over orch address + - A client which pushes stats to master + - Gets the list of active Orchestrators from the Livepeer subgraph + - Does GetOrchestrator calls to all Orchestrators one by one + - A frontend where users can (un)subscribe to notifications \ No newline at end of file diff --git a/client/.gitignore b/client/.gitignore new file mode 100644 index 0000000..fae52e8 --- /dev/null +++ b/client/.gitignore @@ -0,0 +1,4 @@ +/node_modules +/src/config.js +genSig.js +package-lock.json \ No newline at end of file diff --git a/client/README.md b/client/README.md new file mode 100644 index 0000000..963d32a --- /dev/null +++ b/client/README.md @@ -0,0 +1,66 @@ +Requires a Broadcaster with some reserve amount set. TODO + +### Config variables +``` + { + CLIENT_PORT: what port the client accepts API requests on + MASTER_PORT: what port the master should be listening on + MASTER_DOMAIN: the domain on which the master is hosted + MASTER_PATH: the path to the collectStats endpoint. Should be `/api/master/collectStats` unless you are running a reverse proxy + FRIENDLY_NAME: `region` label which gets attached to collected data + PRESHARED_MASTER_KEY: must be the same as the `PRESHARED_MASTER_KEY` on the master + CONF_SLEEPTIME: time between testing Orchestrators in milliseconds + CONF_ORCHINFO_TIMEOUT: timeout for refreshing the list of active orchestrators in milliseconds + CONF_BROADCASTER: eth address of the broadcaster + CONT_SIG = broadcasters' signature of their eth address - reach out on how to get this + } +``` +### Run production +Note: this folder has to be placed in `/orchTest/client` + + npm install + nano src/config.js + +example `config.js`: +``` +export const { + CLIENT_PORT = 42068, + MASTER_PORT = 443, + MASTER_DOMAIN = "nframe.nl", + MASTER_PATH = "/orch/collectStats", + FRIENDLY_NAME = "Chicago", + PRESHARED_MASTER_KEY = "koekjes", + CONF_SLEEPTIME = 2000, + CONF_ORCHINFO_TIMEOUT = 600000, + CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", + CONT_SIG = Buffer [array of uint8] + +} = process.env; + +``` + + pm2 start ecosystem.config.js + +### Run development + + npm install + nano src/config.js + +example `config.js`: +``` +export const { + CLIENT_PORT = 42068, + MASTER_PORT = 42069, + MASTER_DOMAIN = "127.0.0.1", + MASTER_PATH = "/api/master/collectStats", + FRIENDLY_NAME = "Leiden", + PRESHARED_MASTER_KEY = "koekjes", + CONF_SLEEPTIME = 5000, + CONF_ORCHINFO_TIMEOUT = 100000, + CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", + CONT_SIG = Buffer [array of uint8] +} = process.env; +``` + + npm run dev + diff --git a/client/ecosystem.config.js b/client/ecosystem.config.js new file mode 100644 index 0000000..06a1bde --- /dev/null +++ b/client/ecosystem.config.js @@ -0,0 +1,18 @@ +module.exports = { + apps: [ + { + name: "orchProber", + script: "./src/orchProber.js", + cwd: "/orchTest/client", + env_production: { + NODE_ENV: "production" + }, + env_development: { + NODE_ENV: "development" + }, + env_local: { + NODE_ENV: "local" + } + } + ] +} diff --git a/client/package.json b/client/package.json new file mode 100644 index 0000000..05b63a5 --- /dev/null +++ b/client/package.json @@ -0,0 +1,29 @@ +{ + "name": "orchProber", + "version": "0.0.1", + "description": "", + "main": "./src/orchProber.js", + "module": "./src/orchProber.js", + "scripts": { + "prod": "NODE_ENV=production pm2 start ecosystem.config.js", + "start": "NODE_ENV=production node ./src/orchProber.js", + "dev": "NODE_ENV=development nodemon ./src/orchProber.js", + "local": "NODE_ENV=local nodemon ./src/orchProber.js" + }, + "keywords": [], + "author": "Marco van Dijk", + "license": "WTFPL", + "dependencies": { + "@grpc/grpc-js": "^1.6.8", + "@grpc/proto-loader": "^0.7.0", + "esm": "^3.2.20", + "express": "^4.17.1", + "graphql": "^16.5.0", + "graphql-request": "^4.3.0", + "npm": "^8.5.2", + "web3": "^1.7.5" + }, + "devDependencies": { + "nodemon": "^1.18.10" + } +} diff --git a/client/src/client.js b/client/src/client.js new file mode 100644 index 0000000..97024df --- /dev/null +++ b/client/src/client.js @@ -0,0 +1,33 @@ +import express from 'express'; +import { orchTesterRouter } from './routes/index'; +import { CLIENT_PORT } from "./config"; +// Env variable which determines which DB to connect to +const { NODE_ENV: mode } = process.env; + +(async () => { + try { + const app = express(); + app.disable('x-powered-by'); + app.use(express.urlencoded({ extended: true })); + app.use(express.json()); + + const apiRouter = express.Router(); + app.use('/api', apiRouter); + apiRouter.use('/tester', orchTesterRouter); + // Error handler + app.use(function (err, req, res, next) { + res.locals.message = err.message; + // Also log it to the console + console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`); + // Render the error page + res.status(err.status || 500); + res.render('error'); + }); + + app.listen(CLIENT_PORT, "0.0.0.0", function () { + console.log(`Listening on port ${CLIENT_PORT}`); + }); + } catch (err) { + console.log(err); + } +})(); \ No newline at end of file diff --git a/client/src/orchProber.js b/client/src/orchProber.js new file mode 100644 index 0000000..4d5caa6 --- /dev/null +++ b/client/src/orchProber.js @@ -0,0 +1,2 @@ +require = require("esm")(module) +module.exports = require("./client.js") \ No newline at end of file diff --git a/client/src/proto/livepeer.proto b/client/src/proto/livepeer.proto new file mode 100644 index 0000000..8a1c9df --- /dev/null +++ b/client/src/proto/livepeer.proto @@ -0,0 +1,456 @@ +syntax = "proto3"; + +package net; + +// RPC calls implemented by the orchestrator +service Orchestrator { + + // Called by the broadcaster to request transcoder info from an orchestrator. + rpc GetOrchestrator(OrchestratorRequest) returns (OrchestratorInfo); + rpc Ping(PingPong) returns (PingPong); +} + +service Transcoder { + + // Called by the transcoder to register to an orchestrator. The orchestrator + // notifies registered transcoders of segments as they come in. + rpc RegisterTranscoder(RegisterRequest) returns (stream NotifySegment); +} + +message PingPong { + // Implementation defined + bytes value = 1; + +} + + +// This request is sent by the broadcaster in `GetTranscoder` to request +// information on which transcoder to use. +message OrchestratorRequest { + + // Ethereum address of the broadcaster + bytes address = 1; + + // Broadcaster's signature over its address + bytes sig = 2; +} + +/* + OSInfo needed to negotiate storages that will be used. + It carries info needed to write to the storage. +*/ +message OSInfo { + + enum StorageType { + DIRECT = 0; + S3 = 1; + GOOGLE = 2; + } + + // Storage type: direct, s3, ipfs. + StorageType storageType = 1; + + S3OSInfo s3info = 16; +} + +message S3OSInfo { + + // Host to use to connect to S3 + string host = 1; + + // Key (prefix) to use when uploading the object. + string key = 2; + + // POST policy that S3 owner node creates to give write access to other node. + string policy = 3; + + // Signature for POST policy. + string signature = 4; + + // Needed for POST policy. + string credential = 5; + + // Needed for POST policy. + string xAmzDate = 6; +} + +// PriceInfo conveys pricing info for transcoding services +message PriceInfo { + // price in wei + int64 pricePerUnit = 1; + + // Pixels covered in the price + // Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price granularity per pixel than 1 wei + int64 pixelsPerUnit = 2; +} + +message Capabilities { + + // Bit string of supported features - one bit per feature + repeated uint64 bitstring = 1; + + // Bit string of features that are required to be supported + repeated uint64 mandatories = 2; + + // Capacity corresponding to each capability + map capacities = 3; + + // Non-binary capability constraints, such as supported ranges. + message Constraints { + // Empty for now + } +} + +// The orchestrator sends this in response to `GetOrchestrator`, containing +// miscellaneous data related to the job. +message OrchestratorInfo { + + // URI of the transcoder to use for submitting segments. + string transcoder = 1; + + // Parameters for probabilistic micropayment tickets + TicketParams ticket_params = 2; + + // Price Info containing the price per pixel to transcode + PriceInfo price_info = 3; + + // ETH address that should sign transcoded results + bytes address = 4; + + // Features and constraints supported by the orchestrator + Capabilities capabilities = 5; + + // Data for transcoding authentication + AuthToken auth_token = 6; + + // Orchestrator returns info about own input object storage, if it wants it to be used. + repeated OSInfo storage = 32; +} + +// Data for transcoding authentication that is included in the OrchestratorInfo message during discovery +message AuthToken { + // Record used to authenticate for a transcode session + // Opaque to the receiver + bytes token = 1; + + // ID of the transcode session that the token is authenticating for + string session_id = 2; + + // Timestamp when the token expires + int64 expiration = 3; +} + +// [EXPERIMENTAL] +// Describes a class that a model is trained to detect +message DetectorClass { + // ID of the class to detect + uint32 class_id = 1; + + // Name of the class to detect + string class_name = 2; +} + +// [EXPERIMENTAL] +// Describes the scene classification configuration +message SceneClassificationProfile { + // Sample rate of the frames picked by the O for scene detection + uint32 sample_rate = 1; + + // List of output classes the model is trained to detect + repeated DetectorClass classes = 2; +} + +// [EXPERIMENTAL] +// Describes the content detection configuration +message DetectorProfile { + oneof value { + SceneClassificationProfile scene_classification = 1; + } +} + +// Data included by the broadcaster when submitting a segment for transcoding. +message SegData { + + // Manifest ID this segment belongs to + bytes manifestId = 1; + + // Sequence number of the segment to be transcoded + int64 seq = 2; + + // Hash of the segment data to be transcoded + bytes hash = 3; + + // Transcoding profiles to use + bytes profiles = 4; + + // Broadcaster signature for the segment. Corresponds to: + // broadcaster.sign(manifestId | seqNo | dataHash | profiles) + bytes sig = 5; + + // Duration of the segment to be transcoded, in milliseconds + int32 duration = 6; + + // Capabilities used by this segment. + Capabilities capabilities = 7; + + // Data for transcoding authentication + AuthToken auth_token = 8; + + // [EXPERIMENTAL] + // Detector enabled for this segment + bool detector_enabled = 9; + + // Calculate perceptual hash for this segment + bool calc_perceptual_hash = 10; + + // Broadcaster's preferred storage medium(s) + // XXX should we include this in a sig somewhere until certs are authenticated? + repeated OSInfo storage = 32; + + // Transcoding profiles to use. Supersedes `profiles` field + // Deprecated by `fullProfiles2` but may still be used for mpegts formats + repeated VideoProfile fullProfiles = 33; + + // Transcoding profiles to use. Supersedes `fullProfiles` field + // Deprecated by `fullProfiles3` but may still be used for integer FPS + repeated VideoProfile fullProfiles2 = 34; + + // Transcoding profiles to use. Supersedes `fullProfiles2` field + repeated VideoProfile fullProfiles3 = 35; + + // Transcoding parameters specific to this segment + SegParameters segment_parameters = 37; + + // [EXPERIMENTAL] + // Detector profiles to use + repeated DetectorProfile detector_profiles = 36; +} + +message SegParameters { + // Start timestamp from which to start encoding + // Milliseconds, from start of the file + uint64 from = 1; + // Skip all frames after that timestamp + // Milliseconds, from start of the file + uint64 to = 2; +} + +message VideoProfile { + // Name of VideoProfile + string name = 16; + + // Width of VideoProfile + int32 width = 17; + + // Height of VideoProfile + int32 height = 18; + + // Bitrate of VideoProfile + int32 bitrate =19; + + // FPS of VideoProfile + uint32 fps = 20; + + // Desired output format + enum Format { + MPEGTS = 0; + MP4 = 1; + } + Format format = 21; + + // FPS Denominator of VideoProfile + uint32 fpsDen = 22; + + enum Profile { + ENCODER_DEFAULT = 0; + H264_BASELINE = 1; + H264_MAIN = 2; + H264_HIGH = 3; + H264_CONSTRAINED_HIGH = 4; + } + // Desired codec profile + Profile profile = 23; + + // GOP interval + int32 gop = 24; + + enum VideoCodec { + H264 = 0; + H265 = 1; + VP8 = 2; + VP9 = 3; + } + + // Encoder (video codec) + VideoCodec encoder = 25; + + int32 colorDepth = 26; + + enum ChromaSubsampling { + CHROMA_420 = 0; + CHROMA_422 = 1; + CHROMA_444 = 2; + } + ChromaSubsampling chromaFormat = 27; +} + +// Individual transcoded segment data. +message TranscodedSegmentData { + + // URL where the transcoded data can be downloaded from. + string url = 1; + + // Amount of pixels processed (output pixels) + int64 pixels = 2; + + // URL where the perceptual hash data can be downloaded from (can be empty) + string perceptual_hash_url = 3; +} + +// [EXPERIMENTAL] +// Describes scene classification results +message SceneClassificationData { + // Probability that the segment is detected as a particular classID (uint32) + map class_probs = 1; +} + +// [EXPERIMENTAL] +// Describes detection results +message DetectData { + oneof value { + SceneClassificationData scene_classification = 1; + } +} + +// A set of transcoded segments following the profiles specified in the job. +message TranscodeData { + + // Transcoded data, in the order specified in the job options + repeated TranscodedSegmentData segments = 1; + + // Signature of the hash of the concatenated hashes + bytes sig = 2; + + // [EXPERIMENTAL] + // Detection result data in same order as SegData.detector_profiles + repeated DetectData detections = 3; +} + +// Response that a transcoder sends after transcoding a segment. +message TranscodeResult { + + // Sequence number of the transcoded results. + int64 seq = 1; + + // Result of transcoding can be an error, or successful with more info + oneof result { + string error = 2; + TranscodeData data = 3; + } + + // Used to notify a broadcaster of updated orchestrator information + OrchestratorInfo info = 16; +} + +// Sent by the transcoder to register itself to the orchestrator. +message RegisterRequest { + + // Shared secret for auth + string secret = 1; + + // Transcoder capacity + int64 capacity = 2; + + // Transcoder capabilities + Capabilities capabilities = 3; +} + +// Sent by the orchestrator to the transcoder +message NotifySegment { + + // URL of the segment to transcode. + string url = 1; + + // Configuration for the transcoding job + SegData segData = 3; + + // ID for this particular transcoding task. + int64 taskId = 16; + + // All fields below are deprecated. May still be populated if necessary + + // Deprecated by segData. Job the segment belongs to. + reserved 2; // Formerly "string job" + + // Deprecated by fullProfiles. Set of presets to transcode into. + // Should be set to an invalid value to induce failures + bytes profiles = 17; + + // Deprecated by segData. Transcoding configuration to use. + reserved 33; // Formerly "repeated VideoProfile fullProfiles" +} + +// Required parameters for probabilistic micropayment tickets +message TicketParams { + // ETH address of the recipient + bytes recipient = 1; + + // Pay out (in Wei) to the recipient if the ticket wins + bytes face_value = 2; + + // Probability that the ticket wins + bytes win_prob = 3; + + // 32 byte keccak-256 hash commitment to a random number provided + // by the recipient + bytes recipient_rand_hash = 4; + + // Value generated by recipient that the recipient can use + // to derive the random number corresponding to the recipient's hash commitment + bytes seed = 5; + + // Block number at which the current set of advertised TicketParams is no longer valid + bytes expiration_block = 6; + + // Expected ticket expiration params + TicketExpirationParams expiration_params = 7; +} + +// Sender Params (nonces and signatures) +message TicketSenderParams { + // Monotonically increasing counter that makes the ticket + // unique relative to a particular hash commitment to a recipient's random number + uint32 sender_nonce = 1; + + // Sender signature over the ticket + bytes sig = 2; +} + +// Ticket params for expiration related validation +message TicketExpirationParams { + // Round during which tickets are created + int64 creation_round = 1; + + // Block hash associated with creation_round + bytes creation_round_block_hash = 2; +} + +// Payment for transcoding video segments +// A payment can constitute of multiple tickets +// A broadcaster might need to send multiple tickets to top up his credit with an Orchestrator +message Payment { + // Probabilistic micropayment ticket parameters + // These remain the same even when sending multiple tickets + TicketParams ticket_params = 1; + + // ETH address of the sender + bytes sender = 2; + + // Ticket params for expiration related validation + TicketExpirationParams expiration_params = 3; + + repeated TicketSenderParams ticket_sender_params = 4; + + // O's last known price + PriceInfo expected_price = 5; +} diff --git a/client/src/routes/index.js b/client/src/routes/index.js new file mode 100644 index 0000000..28a407e --- /dev/null +++ b/client/src/routes/index.js @@ -0,0 +1,2 @@ +import orchTesterRouter from './orchTester'; +export { orchTesterRouter }; \ No newline at end of file diff --git a/client/src/routes/orchTester.js b/client/src/routes/orchTester.js new file mode 100644 index 0000000..48a0640 --- /dev/null +++ b/client/src/routes/orchTester.js @@ -0,0 +1,276 @@ +import express from "express"; + +const orchTesterRouter = express.Router(); +import { + MASTER_DOMAIN, MASTER_PORT, MASTER_PATH, FRIENDLY_NAME, + PRESHARED_MASTER_KEY, CONF_SLEEPTIME, CONT_SIG, + CONF_ORCHINFO_TIMEOUT, CONF_BROADCASTER +} from "../config"; + +process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; + +/* + +INIT +imported modules + +*/ + +import { request, gql } from 'graphql-request'; +const https = require('https'); +const http = require('http'); +var grpc = require('@grpc/grpc-js'); +var protoLoader = require('@grpc/proto-loader'); +var packageDefinition = protoLoader.loadSync( + "src/proto/livepeer.proto", + { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true + }); +var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net; +const ssl_creds = grpc.credentials.createSsl(null, null, null, { + checkServerIdentity: () => undefined, +}); + +/* + +Global helper functions + +*/ + +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +function shuffle(array) { + let currentIndex = array.length, randomIndex; + while (currentIndex != 0) { + randomIndex = Math.floor(Math.random() * currentIndex); + currentIndex--; + [array[currentIndex], array[randomIndex]] = [ + array[randomIndex], array[currentIndex]]; + } + return array; +} + +/* + +Refreshing active orchestrators +Pulls this data from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql) +We might want to switch to reading directly from the blockchain +but this would require constant watching for uri updates which is a pain to implement + +*/ + +var activeOrchestrators = []; +let lastUpdated = 0; + +/// Does a GQL query to the subgraph for orchestrator data +const getOrchestrators = async function () { + console.log("Getting orchestrator data from the subgraph..."); + try { + const orchQuery = gql`{ + transcoders(where: {active: true}, first: 1000) { + id + status + totalStake + serviceURI + } + }`; + let orchData = await request("https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one", orchQuery); + orchData = orchData.transcoders; + if (!orchData) { + console.log("Thegraph is probably acting up..."); + return null; + } + return orchData; + } catch (err) { + console.log(err); + console.log("Thegraph is probably acting up..."); + return null; + } +} + +/// Refreshes orchestrator data if the subgraph is available +const refreshOrchCache = async function () { + const now = new Date().getTime(); + // Update cmc once their data has expired + if (now - lastUpdated > CONF_ORCHINFO_TIMEOUT) { + const data = await getOrchestrators(); + if (data) { + activeOrchestrators = data; + lastUpdated = now; + } + } +} + +/* + +Doing grpc calls to an orchestrator + +*/ + +let currentPool = []; + +const postStatistics = async function (id, discoveryResults, responseTime) { + console.log("Posting stats for " + id + " (ping " + responseTime + " ms)"); + + // TODO look at response and log error? + var postData = JSON.stringify({ + id, discoveryResults, responseTime, + tag: FRIENDLY_NAME, + key: PRESHARED_MASTER_KEY + }); + var options = { + hostname: MASTER_DOMAIN, + port: MASTER_PORT, + path: MASTER_PATH, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': postData.length + } + }; + var req; + if (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") { + req = http.request(options, (res) => { + // console.log('statusCode:', res.statusCode); + // console.log('headers:', res.headers); + + res.on('data', (d) => { + process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN); + }); + }); + } else { + req = https.request(options, (res) => { + // console.log('statusCode:', res.statusCode); + // console.log('headers:', res.headers); + + res.on('data', (d) => { + process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN); + }); + }); + } + req.on('error', (e) => { + console.error('err', e); + }); + req.write(postData); + req.end(); +} + +function hexToBytes(hex) { + for (var bytes = [], c = 0; c < hex.length; c += 2) + bytes.push(parseInt(hex.substr(c, 2), 16)); + return bytes; +} + +const discoverOrchestrator = async function (target) { + var client = new livepeerProto.Orchestrator(target, ssl_creds); + const addr = CONF_BROADCASTER; + var receivedResults = false; + var orchestratorInfo; + const start = new Date().getTime(); + var elapsed; + await client.GetOrchestrator({ + address: hexToBytes(addr), + sig: CONT_SIG + }, function (err, res) { + if (err) { + console.log("discovery err: ", err) + } + elapsed = new Date().getTime() - start; + orchestratorInfo = res; + receivedResults = true; + }); + while (!receivedResults && new Date().getTime() - start < 5000) { await sleep(200); } + return { discoveryResults: orchestratorInfo, elapsed }; +} + +const pingOrchestrator = async function (target) { + var client = new livepeerProto.Orchestrator(target, ssl_creds); + var receivedResults = false; + var pingPong; + const start = new Date().getTime(); + var elapsed; + await client.Ping({ value: "koekjes" }, function (err, res) { + if (err) { + console.log("Ping err: ", err) + } + elapsed = new Date().getTime() - start; + pingPong = res; + receivedResults = true; + }); + while (!receivedResults && new Date().getTime() - start < 5000) { await sleep(200); } + return { pingResults: pingPong, elapsed }; +} + +const testOrchestrator = async function (id, target) { + if (!id.length || !target.length) { return; } + target = target.replace(/^https?:\/\//, ''); + console.log("Target is " + target); + const { discoveryResults, elapsed } = await discoverOrchestrator(target); + await postStatistics(id, discoveryResults, elapsed); +} + +const refreshPool = function () { + currentPool = []; + for (const thisObj of activeOrchestrators) { + currentPool.push({ id: thisObj.id, target: thisObj.serviceURI }); + } + shuffle(currentPool); +} + + +const pingNextOrch = async function () { + if (!currentPool.length) { refreshPool(); } + let currentOrch = currentPool.splice(0, 1)[0]; + if (!currentOrch.id || !currentOrch.target) { + console.log("err: \n", currentOrch); + return; + } + await testOrchestrator(currentOrch.id, currentOrch.target); +} + +/* + +Main Loop +Maybe we shouldn't use nodejs as a client... + +*/ + +let cycle = 0; +let isSyncing = false; +// Does the actual looping over last parsed block -> latest block in chain +const handleSync = async function () { + try { + cycle++; + console.log('Starting new cycle #' + cycle); + isSyncing = true; + await refreshOrchCache(); + await pingNextOrch(); + isSyncing = false; + setTimeout(() => { + handleSync(); + }, CONF_SLEEPTIME); + return; + } + catch (err) { + console.log(err); + isSyncing = false; + setTimeout(() => { + handleSync(); + }, CONF_SLEEPTIME) + } +}; +if (!isSyncing) { + console.log("Starting main loop"); + handleSync(); +} + + +export default orchTesterRouter; \ No newline at end of file diff --git a/master/.gitignore b/master/.gitignore new file mode 100644 index 0000000..29c82a2 --- /dev/null +++ b/master/.gitignore @@ -0,0 +1,3 @@ +/node_modules +/src/config.js +package-lock.json \ No newline at end of file diff --git a/master/README.md b/master/README.md new file mode 100644 index 0000000..c1bba43 --- /dev/null +++ b/master/README.md @@ -0,0 +1,39 @@ + +### Config variables +``` + { + MASTER_PORT: what port the master is listening on + PRESHARED_MASTER_KEY: clients must provide this value when uploading stats + } +``` +### Run production +Note: this folder has to be placed in `/orchTest/master` + + npm install + nano src/config.js + +example `config.js`: +``` +export const { + MASTER_PORT = 42069, + PRESHARED_MASTER_KEY = "koekjes" +} = process.env; +``` + + pm2 start ecosystem.config.js + +### Run development + + npm install + nano src/config.js + +example `config.js`: +``` +export const { + MASTER_PORT = 42069, + PRESHARED_MASTER_KEY = "koekjes" +} = process.env; +``` + + npm run dev + diff --git a/master/ecosystem.config.js b/master/ecosystem.config.js new file mode 100644 index 0000000..0466b2f --- /dev/null +++ b/master/ecosystem.config.js @@ -0,0 +1,18 @@ +module.exports = { + apps: [ + { + name: "hodler", + script: "./src/hodler.js", + cwd: "/orchTest/master", + env_production: { + NODE_ENV: "production" + }, + env_development: { + NODE_ENV: "development" + }, + env_local: { + NODE_ENV: "local" + } + } + ] +} diff --git a/master/package.json b/master/package.json new file mode 100644 index 0000000..98ed400 --- /dev/null +++ b/master/package.json @@ -0,0 +1,26 @@ +{ + "name": "hodler", + "version": "0.0.1", + "description": "", + "main": "./src/hodler.js", + "module": "./src/hodler.js", + "scripts": { + "prod": "NODE_ENV=production pm2 start ecosystem.config.js", + "start": "NODE_ENV=production node ./src/hodler.js", + "dev": "NODE_ENV=development nodemon ./src/hodler.js", + "local": "NODE_ENV=local nodemon ./src/hodler.js" + }, + "keywords": [], + "author": "Marco van Dijk", + "license": "WTFPL", + "dependencies": { + "esm": "^3.2.20", + "express": "^4.17.1", + "node-persist": "^3.1.0", + "npm": "^8.5.2", + "prom-client": "^14.0.1" + }, + "devDependencies": { + "nodemon": "^1.18.10" + } +} diff --git a/master/src/hodler.js b/master/src/hodler.js new file mode 100644 index 0000000..77efc98 --- /dev/null +++ b/master/src/hodler.js @@ -0,0 +1,2 @@ +require = require("esm")(module) +module.exports = require("./server.js") \ No newline at end of file diff --git a/master/src/routes/index.js b/master/src/routes/index.js new file mode 100644 index 0000000..4591eae --- /dev/null +++ b/master/src/routes/index.js @@ -0,0 +1,2 @@ +import masterRouter from './master'; +export { masterRouter }; \ No newline at end of file diff --git a/master/src/routes/master.js b/master/src/routes/master.js new file mode 100644 index 0000000..10f339d --- /dev/null +++ b/master/src/routes/master.js @@ -0,0 +1,227 @@ +import express, { response } from "express"; +import { + PRESHARED_MASTER_KEY +} from "../config"; +const storage = require('node-persist'); +const masterRouter = express.Router(); +const client = require('prom-client'); +const register = new client.Registry(); +const collectDefaultMetrics = client.collectDefaultMetrics; +collectDefaultMetrics({ timeout: 3000 }); +const promLatestLatency = new client.Gauge({ + name: 'orch_latest_latency', + help: 'Latest latency known for a given Orchestrator', + labelNames: ['region', 'orchestrator'] +}); +register.registerMetric(promLatestLatency); +const promLatency = new client.Summary({ + name: 'orch_latency', + help: 'Summary of latency stats', + percentiles: [0.01, 0.1, 0.9, 0.99], + labelNames: ['region'] +}); +register.registerMetric(promLatency); +const promAverageLatency = new client.Gauge({ + name: 'orch_average_latency', + help: 'Average latency for a given Orchestrator', + labelNames: ['region', 'orchestrator'] +}); +register.registerMetric(promAverageLatency); +const promAUptimeScore = new client.Gauge({ + name: 'orch_uptime_score', + help: 'Uptime score for a given orchestrator', + labelNames: ['region', 'orchestrator'] +}); +register.registerMetric(promAUptimeScore); + +let isSynced = false; + +/* + +Incoming stats parsing + +*/ + +masterRouter.post("/collectStats", async (req, res) => { + try { + if (!isSynced){ console.log ("waiting for sync"); res.end('busy'); return;} + const { id, discoveryResults, + responseTime, tag, key } = req.body; + if (!id || !tag || !key) { + console.log("Received malformed data. Aborting stats update..."); + console.log(id, discoveryResults, responseTime, tag, key); + res.send(false); + return; + } + if (PRESHARED_MASTER_KEY != key) { + console.log("Unauthorized"); + res.send(false); + return; + } + console.log('received data for ' + id + ' from ' + tag + ' (' + responseTime + " ms latency)"); + if (responseTime){ + promLatestLatency.set({ region: tag, orchestrator: id }, responseTime); + promLatency.observe({ region: tag }, responseTime); + } + // Save data point + const now = new Date().getTime(); + let thisPing = responseTime; + if (!discoveryResults || !responseTime) { thisPing = null; } + let currentDataList = []; + let orchFound = false; + let regionFound = false; + for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) { + if (orchScores[orchIdx].id != id) { continue; } + orchFound = true; + for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) { + if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; } + regionFound = true; + if (orchScores[orchIdx].data[regionIdx].data.length > 60) { + orchScores[orchIdx].data[regionIdx].data = orchScores[orchIdx].data[regionIdx].data.slice(1); + } + orchScores[orchIdx].data[regionIdx].data.push({ latency: thisPing, timestamp: now }); + currentDataList = orchScores[orchIdx].data[regionIdx].data; + break; + } + if (!regionFound) { + currentDataList = [{ latency: thisPing, timestamp: now }]; + orchScores[orchIdx].data.push({ tag, data: currentDataList }); + } + break; + } + if (!orchFound) { + currentDataList = [{ tag, data: [{ latency: thisPing, timestamp: now }] }]; + orchScores.push({ id, data: currentDataList }); + } + await storage.setItem('orchScores', orchScores); + // Calc new scores + let prevtime = null; + let uptime = 0; + let downtime = 0; + let pingsum = 0; + let pingpoints = 0; + for (const thisData of currentDataList) { + // Count ping* vars + if (thisData.latency) { + pingsum += thisData.latency; + pingpoints += 1; + promLatestLatency.set({ region: tag, orchestrator: id }, thisData.latency); + promLatency.observe({ region: tag }, thisData.latency); + } + // Only count *time vars if we have timestamps + if (prevtime && thisData.timestamp) { + if (thisData.latency) { + uptime += thisData.timestamp - prevtime; + } else { + downtime += thisData.timestamp - prevtime; + } + } + prevtime = thisData.timestamp; + } + if (pingpoints) { + promAverageLatency.set({ region: tag, orchestrator: id }, pingsum / pingpoints); + } + if (uptime || downtime) { + const score = uptime / (uptime + downtime) + promAUptimeScore.set({ region: tag, orchestrator: id }, score); + } + res.send(true); + } catch (err) { + console.log(err); + res.status(400).send(err); + } +}); + + +/* + +Public endpoints + +*/ + + +masterRouter.get("/prometheus", async (req, res) => { + try { + res.set('Content-Type', register.contentType); + const metrics = await register.metrics(); + res.end(metrics); + } catch (err) { + res.status(400).send(err); + } +}); + +masterRouter.get("/json", async (req, res) => { + try { + res.set('Content-Type', 'application/json'); + res.end(orchScores); + } catch (err) { + res.status(400).send(err); + } +}); + + +/* + +Recover from storage + +*/ + + +let orchScores; + +const recoverStorage = async function () { + await storage.init({ + stringify: JSON.stringify, + parse: JSON.parse, + encoding: 'utf8', + logging: false, + ttl: false, + forgiveParseErrors: false + }); + orchScores = await storage.getItem('orchScores'); + if (!orchScores) { orchScores = []; } + // Init prometheus from storage + for (const thisOrch of orchScores) { + console.log("recovering scores for " + thisOrch.id); + for (const thisRegion of thisOrch.data) { + let prevtime = null; + let uptime = 0; + let downtime = 0; + let pingsum = 0; + let pingpoints = 0; + for (const thisData of thisRegion.data) { + // Count ping* vars + if (thisData.latency) { + pingsum += thisData.latency; + pingpoints += 1; + promLatestLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, thisData.latency); + promLatency.observe({ region: thisRegion.tag }, thisData.latency); + } + // Only count *time vars if we have timestamps + if (prevtime && thisData.timestamp) { + if (thisData.latency) { + uptime += thisData.timestamp - prevtime; + } else { + downtime += thisData.timestamp - prevtime; + } + } + prevtime = thisData.timestamp; + } + if (pingpoints) { + promAverageLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, pingsum / pingpoints); + } + if (uptime || downtime) { + const score = uptime / (uptime + downtime) + promAUptimeScore.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, score); + } + } + } + isSynced = true; +} +recoverStorage(); + + + + + +export default masterRouter; \ No newline at end of file diff --git a/master/src/server.js b/master/src/server.js new file mode 100644 index 0000000..9bf6e37 --- /dev/null +++ b/master/src/server.js @@ -0,0 +1,32 @@ +import express from 'express'; +import { masterRouter } from './routes/index'; +import { MASTER_PORT } from "./config"; +// Env variable which determines which DB to connect to +const { NODE_ENV: mode } = process.env; + +(async () => { + try { + const app = express(); + app.disable('x-powered-by'); + app.use(express.urlencoded({ extended: true })); + app.use(express.json()); + const apiRouter = express.Router(); + app.use('/api', apiRouter); + apiRouter.use('/master', masterRouter); + // Error handler + app.use(function (err, req, res, next) { + res.locals.message = err.message; + // Also log it to the console + console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`); + // Render the error page + res.status(err.status || 500); + res.render('error'); + }); + + app.listen(MASTER_PORT, "0.0.0.0", function () { + console.log(`Listening on port ${MASTER_PORT}`); + }); + } catch (err) { + console.log(err); + } +})(); \ No newline at end of file