Init: GetOrchestrator grpc requests, stores latency and uptime stats per region and orchestrator

This commit is contained in:
Marco van Dijk 2022-08-08 19:23:09 +02:00
commit d18c4a6cc8
19 changed files with 1249 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
master/*.md
client/*.md
master/.node-persist/storage/*

11
README.md Normal file
View File

@ -0,0 +1,11 @@
This program consists of three components:
- A master which consolidates orchestrator statistics from various clients
- Converts raw results of grpc calls into prometheus data points
- TODO integration with email / telegram
- TODO api to subscribe for notifications and cancel it
- TODO read test stream scores
- TODO allow custom names (ENS, configure via frontend) to be used over orch address
- A client which pushes stats to master
- Gets the list of active Orchestrators from the Livepeer subgraph
- Does GetOrchestrator calls to all Orchestrators one by one
- A frontend where users can (un)subscribe to notifications

4
client/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/node_modules
/src/config.js
genSig.js
package-lock.json

66
client/README.md Normal file
View File

@ -0,0 +1,66 @@
Requires a Broadcaster with some reserve amount set. TODO
### Config variables
```
{
CLIENT_PORT: what port the client accepts API requests on
MASTER_PORT: what port the master should be listening on
MASTER_DOMAIN: the domain on which the master is hosted
MASTER_PATH: the path to the collectStats endpoint. Should be `/api/master/collectStats` unless you are running a reverse proxy
FRIENDLY_NAME: `region` label which gets attached to collected data
PRESHARED_MASTER_KEY: must be the same as the `PRESHARED_MASTER_KEY` on the master
CONF_SLEEPTIME: time between testing Orchestrators in milliseconds
CONF_ORCHINFO_TIMEOUT: timeout for refreshing the list of active orchestrators in milliseconds
CONF_BROADCASTER: eth address of the broadcaster
CONT_SIG: broadcaster's signature of their eth address - reach out on how to get this
}
```
### Run production
Note: this folder has to be placed in `/orchTest/client`
npm install
nano src/config.js
example `config.js`:
```
export const {
CLIENT_PORT = 42068,
MASTER_PORT = 443,
MASTER_DOMAIN = "nframe.nl",
MASTER_PATH = "/orch/collectStats",
FRIENDLY_NAME = "Chicago",
PRESHARED_MASTER_KEY = "koekjes",
CONF_SLEEPTIME = 2000,
CONF_ORCHINFO_TIMEOUT = 600000,
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
CONT_SIG = Buffer [array of uint8]
} = process.env;
```
pm2 start ecosystem.config.js
### Run development
npm install
nano src/config.js
example `config.js`:
```
export const {
CLIENT_PORT = 42068,
MASTER_PORT = 42069,
MASTER_DOMAIN = "127.0.0.1",
MASTER_PATH = "/api/master/collectStats",
FRIENDLY_NAME = "Leiden",
PRESHARED_MASTER_KEY = "koekjes",
CONF_SLEEPTIME = 5000,
CONF_ORCHINFO_TIMEOUT = 100000,
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
CONT_SIG = Buffer [array of uint8]
} = process.env;
```
npm run dev

View File

@ -0,0 +1,18 @@
// PM2 process definition for the orchestrator prober client.
// Start with `pm2 start ecosystem.config.js --env <name>` to select which
// NODE_ENV block below is applied.
module.exports = {
apps: [
{
name: "orchProber",
// Entry point: the esm bootstrap shim (src/orchProber.js)
script: "./src/orchProber.js",
// Hard-coded deploy location; the README requires this folder to live
// at /orchTest/client.
cwd: "/orchTest/client",
env_production: {
NODE_ENV: "production"
},
env_development: {
NODE_ENV: "development"
},
env_local: {
NODE_ENV: "local"
}
}
]
}

29
client/package.json Normal file
View File

@ -0,0 +1,29 @@
{
"name": "orchProber",
"version": "0.0.1",
"description": "",
"main": "./src/orchProber.js",
"module": "./src/orchProber.js",
"scripts": {
"prod": "NODE_ENV=production pm2 start ecosystem.config.js",
"start": "NODE_ENV=production node ./src/orchProber.js",
"dev": "NODE_ENV=development nodemon ./src/orchProber.js",
"local": "NODE_ENV=local nodemon ./src/orchProber.js"
},
"keywords": [],
"author": "Marco van Dijk",
"license": "WTFPL",
"dependencies": {
"@grpc/grpc-js": "^1.6.8",
"@grpc/proto-loader": "^0.7.0",
"esm": "^3.2.20",
"express": "^4.17.1",
"graphql": "^16.5.0",
"graphql-request": "^4.3.0",
"npm": "^8.5.2",
"web3": "^1.7.5"
},
"devDependencies": {
"nodemon": "^1.18.10"
}
}

33
client/src/client.js Normal file
View File

@ -0,0 +1,33 @@
import express from 'express';
import { orchTesterRouter } from './routes/index';
import { CLIENT_PORT } from "./config";
// Env variable which determines which DB to connect to
const { NODE_ENV: mode } = process.env;

// Client entry point: sets up an express app that exposes the tester routes
// under /api/tester and starts the listener on CLIENT_PORT.
(async () => {
  try {
    const app = express();
    app.disable('x-powered-by');
    app.use(express.urlencoded({ extended: true }));
    app.use(express.json());
    const apiRouter = express.Router();
    app.use('/api', apiRouter);
    apiRouter.use('/tester', orchTesterRouter);
    // Error handler.
    // FIX: the original called res.render('error'), but this app configures
    // no view engine, so rendering itself threw and the client never got a
    // response. Send a plain error body instead.
    app.use(function (err, req, res, next) {
      res.locals.message = err.message;
      // Also log it to the console
      console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
      res.status(err.status || 500).send(err.message);
    });
    app.listen(CLIENT_PORT, "0.0.0.0", function () {
      console.log(`Listening on port ${CLIENT_PORT}`);
    });
  } catch (err) {
    console.log(err);
  }
})();

2
client/src/orchProber.js Normal file
View File

@ -0,0 +1,2 @@
// Bootstrap shim: wrap require with the "esm" package so that ./client.js
// (written with ES module import/export syntax) runs under plain CommonJS node.
require = require("esm")(module)
module.exports = require("./client.js")

View File

@ -0,0 +1,456 @@
/*
Livepeer "net" protocol definitions (proto3).
The prober in this repo only calls Orchestrator.GetOrchestrator and
Orchestrator.Ping; the remaining messages describe the wider
broadcaster/orchestrator/transcoder exchange and are kept so the file loads
unmodified with @grpc/proto-loader.
*/
syntax = "proto3";
package net;
// RPC calls implemented by the orchestrator
service Orchestrator {
// Called by the broadcaster to request transcoder info from an orchestrator.
rpc GetOrchestrator(OrchestratorRequest) returns (OrchestratorInfo);
rpc Ping(PingPong) returns (PingPong);
}
service Transcoder {
// Called by the transcoder to register to an orchestrator. The orchestrator
// notifies registered transcoders of segments as they come in.
rpc RegisterTranscoder(RegisterRequest) returns (stream NotifySegment);
}
message PingPong {
// Implementation defined
bytes value = 1;
}
// This request is sent by the broadcaster in `GetTranscoder` to request
// information on which transcoder to use.
message OrchestratorRequest {
// Ethereum address of the broadcaster
bytes address = 1;
// Broadcaster's signature over its address
bytes sig = 2;
}
/*
OSInfo needed to negotiate storages that will be used.
It carries info needed to write to the storage.
*/
message OSInfo {
enum StorageType {
DIRECT = 0;
S3 = 1;
GOOGLE = 2;
}
// Storage type: direct, s3, ipfs.
StorageType storageType = 1;
S3OSInfo s3info = 16;
}
message S3OSInfo {
// Host to use to connect to S3
string host = 1;
// Key (prefix) to use when uploading the object.
string key = 2;
// POST policy that S3 owner node creates to give write access to other node.
string policy = 3;
// Signature for POST policy.
string signature = 4;
// Needed for POST policy.
string credential = 5;
// Needed for POST policy.
string xAmzDate = 6;
}
// PriceInfo conveys pricing info for transcoding services
message PriceInfo {
// price in wei
int64 pricePerUnit = 1;
// Pixels covered in the price
// Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price granularity per pixel than 1 wei
int64 pixelsPerUnit = 2;
}
message Capabilities {
// Bit string of supported features - one bit per feature
repeated uint64 bitstring = 1;
// Bit string of features that are required to be supported
repeated uint64 mandatories = 2;
// Capacity corresponding to each capability
map<uint32, uint32> capacities = 3;
// Non-binary capability constraints, such as supported ranges.
message Constraints {
// Empty for now
}
}
// The orchestrator sends this in response to `GetOrchestrator`, containing
// miscellaneous data related to the job.
message OrchestratorInfo {
// URI of the transcoder to use for submitting segments.
string transcoder = 1;
// Parameters for probabilistic micropayment tickets
TicketParams ticket_params = 2;
// Price Info containing the price per pixel to transcode
PriceInfo price_info = 3;
// ETH address that should sign transcoded results
bytes address = 4;
// Features and constraints supported by the orchestrator
Capabilities capabilities = 5;
// Data for transcoding authentication
AuthToken auth_token = 6;
// Orchestrator returns info about own input object storage, if it wants it to be used.
repeated OSInfo storage = 32;
}
// Data for transcoding authentication that is included in the OrchestratorInfo message during discovery
message AuthToken {
// Record used to authenticate for a transcode session
// Opaque to the receiver
bytes token = 1;
// ID of the transcode session that the token is authenticating for
string session_id = 2;
// Timestamp when the token expires
int64 expiration = 3;
}
// [EXPERIMENTAL]
// Describes a class that a model is trained to detect
message DetectorClass {
// ID of the class to detect
uint32 class_id = 1;
// Name of the class to detect
string class_name = 2;
}
// [EXPERIMENTAL]
// Describes the scene classification configuration
message SceneClassificationProfile {
// Sample rate of the frames picked by the O for scene detection
uint32 sample_rate = 1;
// List of output classes the model is trained to detect
repeated DetectorClass classes = 2;
}
// [EXPERIMENTAL]
// Describes the content detection configuration
message DetectorProfile {
oneof value {
SceneClassificationProfile scene_classification = 1;
}
}
// Data included by the broadcaster when submitting a segment for transcoding.
message SegData {
// Manifest ID this segment belongs to
bytes manifestId = 1;
// Sequence number of the segment to be transcoded
int64 seq = 2;
// Hash of the segment data to be transcoded
bytes hash = 3;
// Transcoding profiles to use
bytes profiles = 4;
// Broadcaster signature for the segment. Corresponds to:
// broadcaster.sign(manifestId | seqNo | dataHash | profiles)
bytes sig = 5;
// Duration of the segment to be transcoded, in milliseconds
int32 duration = 6;
// Capabilities used by this segment.
Capabilities capabilities = 7;
// Data for transcoding authentication
AuthToken auth_token = 8;
// [EXPERIMENTAL]
// Detector enabled for this segment
bool detector_enabled = 9;
// Calculate perceptual hash for this segment
bool calc_perceptual_hash = 10;
// Broadcaster's preferred storage medium(s)
// XXX should we include this in a sig somewhere until certs are authenticated?
repeated OSInfo storage = 32;
// Transcoding profiles to use. Supersedes `profiles` field
// Deprecated by `fullProfiles2` but may still be used for mpegts formats
repeated VideoProfile fullProfiles = 33;
// Transcoding profiles to use. Supersedes `fullProfiles` field
// Deprecated by `fullProfiles3` but may still be used for integer FPS
repeated VideoProfile fullProfiles2 = 34;
// Transcoding profiles to use. Supersedes `fullProfiles2` field
repeated VideoProfile fullProfiles3 = 35;
// Transcoding parameters specific to this segment
SegParameters segment_parameters = 37;
// [EXPERIMENTAL]
// Detector profiles to use
repeated DetectorProfile detector_profiles = 36;
}
message SegParameters {
// Start timestamp from which to start encoding
// Milliseconds, from start of the file
uint64 from = 1;
// Skip all frames after that timestamp
// Milliseconds, from start of the file
uint64 to = 2;
}
message VideoProfile {
// Name of VideoProfile
string name = 16;
// Width of VideoProfile
int32 width = 17;
// Height of VideoProfile
int32 height = 18;
// Bitrate of VideoProfile
int32 bitrate =19;
// FPS of VideoProfile
uint32 fps = 20;
// Desired output format
enum Format {
MPEGTS = 0;
MP4 = 1;
}
Format format = 21;
// FPS Denominator of VideoProfile
uint32 fpsDen = 22;
enum Profile {
ENCODER_DEFAULT = 0;
H264_BASELINE = 1;
H264_MAIN = 2;
H264_HIGH = 3;
H264_CONSTRAINED_HIGH = 4;
}
// Desired codec profile
Profile profile = 23;
// GOP interval
int32 gop = 24;
enum VideoCodec {
H264 = 0;
H265 = 1;
VP8 = 2;
VP9 = 3;
}
// Encoder (video codec)
VideoCodec encoder = 25;
int32 colorDepth = 26;
enum ChromaSubsampling {
CHROMA_420 = 0;
CHROMA_422 = 1;
CHROMA_444 = 2;
}
ChromaSubsampling chromaFormat = 27;
}
// Individual transcoded segment data.
message TranscodedSegmentData {
// URL where the transcoded data can be downloaded from.
string url = 1;
// Amount of pixels processed (output pixels)
int64 pixels = 2;
// URL where the perceptual hash data can be downloaded from (can be empty)
string perceptual_hash_url = 3;
}
// [EXPERIMENTAL]
// Describes scene classification results
message SceneClassificationData {
// Probability that the segment is detected as a particular classID (uint32)
map<uint32, double> class_probs = 1;
}
// [EXPERIMENTAL]
// Describes detection results
message DetectData {
oneof value {
SceneClassificationData scene_classification = 1;
}
}
// A set of transcoded segments following the profiles specified in the job.
message TranscodeData {
// Transcoded data, in the order specified in the job options
repeated TranscodedSegmentData segments = 1;
// Signature of the hash of the concatenated hashes
bytes sig = 2;
// [EXPERIMENTAL]
// Detection result data in same order as SegData.detector_profiles
repeated DetectData detections = 3;
}
// Response that a transcoder sends after transcoding a segment.
message TranscodeResult {
// Sequence number of the transcoded results.
int64 seq = 1;
// Result of transcoding can be an error, or successful with more info
oneof result {
string error = 2;
TranscodeData data = 3;
}
// Used to notify a broadcaster of updated orchestrator information
OrchestratorInfo info = 16;
}
// Sent by the transcoder to register itself to the orchestrator.
message RegisterRequest {
// Shared secret for auth
string secret = 1;
// Transcoder capacity
int64 capacity = 2;
// Transcoder capabilities
Capabilities capabilities = 3;
}
// Sent by the orchestrator to the transcoder
message NotifySegment {
// URL of the segment to transcode.
string url = 1;
// Configuration for the transcoding job
SegData segData = 3;
// ID for this particular transcoding task.
int64 taskId = 16;
// All fields below are deprecated. May still be populated if necessary
// Deprecated by segData. Job the segment belongs to.
reserved 2; // Formerly "string job"
// Deprecated by fullProfiles. Set of presets to transcode into.
// Should be set to an invalid value to induce failures
bytes profiles = 17;
// Deprecated by segData. Transcoding configuration to use.
reserved 33; // Formerly "repeated VideoProfile fullProfiles"
}
// Required parameters for probabilistic micropayment tickets
message TicketParams {
// ETH address of the recipient
bytes recipient = 1;
// Pay out (in Wei) to the recipient if the ticket wins
bytes face_value = 2;
// Probability that the ticket wins
bytes win_prob = 3;
// 32 byte keccak-256 hash commitment to a random number provided
// by the recipient
bytes recipient_rand_hash = 4;
// Value generated by recipient that the recipient can use
// to derive the random number corresponding to the recipient's hash commitment
bytes seed = 5;
// Block number at which the current set of advertised TicketParams is no longer valid
bytes expiration_block = 6;
// Expected ticket expiration params
TicketExpirationParams expiration_params = 7;
}
// Sender Params (nonces and signatures)
message TicketSenderParams {
// Monotonically increasing counter that makes the ticket
// unique relative to a particular hash commitment to a recipient's random number
uint32 sender_nonce = 1;
// Sender signature over the ticket
bytes sig = 2;
}
// Ticket params for expiration related validation
message TicketExpirationParams {
// Round during which tickets are created
int64 creation_round = 1;
// Block hash associated with creation_round
bytes creation_round_block_hash = 2;
}
// Payment for transcoding video segments
// A payment can consist of multiple tickets
// A broadcaster might need to send multiple tickets to top up their credit with an Orchestrator
message Payment {
// Probabilistic micropayment ticket parameters
// These remain the same even when sending multiple tickets
TicketParams ticket_params = 1;
// ETH address of the sender
bytes sender = 2;
// Ticket params for expiration related validation
TicketExpirationParams expiration_params = 3;
// Sender params (nonce + signature), one entry per ticket in this payment
repeated TicketSenderParams ticket_sender_params = 4;
// O's last known price
PriceInfo expected_price = 5;
}

View File

@ -0,0 +1,2 @@
// Re-export the tester router under a named binding for ../client.js.
export { default as orchTesterRouter } from './orchTester';

View File

@ -0,0 +1,276 @@
import express from "express";
const orchTesterRouter = express.Router();
import {
MASTER_DOMAIN, MASTER_PORT, MASTER_PATH, FRIENDLY_NAME,
PRESHARED_MASTER_KEY, CONF_SLEEPTIME, CONT_SIG,
CONF_ORCHINFO_TIMEOUT, CONF_BROADCASTER
} from "../config";
// WARNING: disables TLS certificate validation for the ENTIRE process, not
// just the grpc channel below -- presumably because orchestrators serve
// self-signed certs (TODO confirm), but it also weakens every other https
// request this client makes.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
/*
INIT
imported modules
*/
import { request, gql } from 'graphql-request';
const https = require('https');
const http = require('http');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
// Load the Livepeer protocol definitions. NOTE: the path is resolved
// relative to the process working directory, so the client must be started
// from the project root (see ecosystem.config.js cwd).
var packageDefinition = protoLoader.loadSync(
"src/proto/livepeer.proto",
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// `net` is the proto package name declared in livepeer.proto.
var livepeerProto = grpc.loadPackageDefinition(packageDefinition).net;
// TLS credentials that skip the server identity check entirely.
const ssl_creds = grpc.credentials.createSsl(null, null, null, {
checkServerIdentity: () => undefined,
});
/*
Global helper functions
*/
// Promisified setTimeout: `await sleep(ms)` pauses for ms milliseconds.
function sleep(ms) {
  return new Promise((done) => setTimeout(done, ms));
}
// In-place Fisher-Yates shuffle; returns the same array for convenience.
function shuffle(array) {
  for (let i = array.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    const tmp = array[i];
    array[i] = array[j];
    array[j] = tmp;
  }
  return array;
}
/*
Refreshing active orchestrators
Data is pulled from the Livepeer subgraph (https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one/graphql)
We might want to switch to reading directly from the blockchain
but this would require constant watching for uri updates which is a pain to implement
*/
let activeOrchestrators = [];
let lastUpdated = 0;

/// Queries the subgraph for all active transcoders; returns null on failure.
const getOrchestrators = async function () {
  console.log("Getting orchestrator data from the subgraph...");
  const orchQuery = gql`{
    transcoders(where: {active: true}, first: 1000) {
      id
      status
      totalStake
      serviceURI
    }
  }`;
  try {
    const response = await request("https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one", orchQuery);
    const transcoders = response ? response.transcoders : null;
    if (!transcoders) {
      console.log("Thegraph is probably acting up...");
      return null;
    }
    return transcoders;
  } catch (err) {
    console.log(err);
    console.log("Thegraph is probably acting up...");
    return null;
  }
}
/// Refreshes orchestrator data if the subgraph is available
const refreshOrchCache = async function () {
const now = new Date().getTime();
// Refresh the cached orchestrator list once it has gone stale.
// (The original "Update cmc" comment was a copy-paste leftover.)
if (now - lastUpdated > CONF_ORCHINFO_TIMEOUT) {
const data = await getOrchestrators();
// On subgraph failure keep the previous cache and retry next cycle.
if (data) {
activeOrchestrators = data;
lastUpdated = now;
}
}
}
/*
Doing grpc calls to an orchestrator
*/
let currentPool = [];

/**
 * POSTs a single probe result to the master's collectStats endpoint.
 * @param id eth address of the probed orchestrator
 * @param discoveryResults GetOrchestrator response (may be undefined on timeout)
 * @param responseTime round trip time of the grpc call, in milliseconds
 */
const postStatistics = async function (id, discoveryResults, responseTime) {
  console.log("Posting stats for " + id + " (ping " + responseTime + " ms)");
  // TODO look at response and log error?
  const postData = JSON.stringify({
    id, discoveryResults, responseTime,
    tag: FRIENDLY_NAME,
    key: PRESHARED_MASTER_KEY
  });
  const options = {
    hostname: MASTER_DOMAIN,
    port: MASTER_PORT,
    path: MASTER_PATH,
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      // FIX: must be the BYTE length, not the string length -- they differ
      // whenever the JSON contains multi-byte characters, and a short
      // Content-Length silently truncates the request body.
      'Content-Length': Buffer.byteLength(postData)
    }
  };
  // Local masters are reached over plain http; everything else uses https.
  const transport = (MASTER_DOMAIN == "127.0.0.1" || MASTER_DOMAIN == "localhost") ? http : https;
  const req = transport.request(options, (res) => {
    res.on('data', (d) => {
      process.stdout.write('Received response ' + d + ' from ' + MASTER_DOMAIN);
    });
  });
  req.on('error', (e) => {
    console.error('err', e);
  });
  req.write(postData);
  req.end();
}
// Decode a hex string into an array of byte values (two hex chars per byte).
function hexToBytes(hex) {
  const bytes = [];
  for (let i = 0; i < hex.length; i += 2) {
    bytes.push(parseInt(hex.slice(i, i + 2), 16));
  }
  return bytes;
}
// Calls GetOrchestrator on the target and measures the round trip time.
// Resolves with { discoveryResults, elapsed }: discoveryResults is the
// OrchestratorInfo response (undefined on error or when the 5 s budget below
// runs out) and elapsed is milliseconds until the callback fired.
const discoverOrchestrator = async function (target) {
var client = new livepeerProto.Orchestrator(target, ssl_creds);
const addr = CONF_BROADCASTER;
var receivedResults = false;
var orchestratorInfo;
const start = new Date().getTime();
var elapsed;
// NOTE(review): grpc-js callback APIs do not return a promise, so this
// `await` is effectively a no-op; completion is detected by the poll loop
// below instead.
await client.GetOrchestrator({
address: hexToBytes(addr),
sig: CONT_SIG
}, function (err, res) {
if (err) {
console.log("discovery err: ", err)
}
elapsed = new Date().getTime() - start;
orchestratorInfo = res;
receivedResults = true;
});
// Poll in 200 ms steps until the callback fires or 5 s have passed.
while (!receivedResults && new Date().getTime() - start < 5000) { await sleep(200); }
return { discoveryResults: orchestratorInfo, elapsed };
}
// Calls Ping on the target using the same callback-plus-poll pattern as
// discoverOrchestrator. Resolves with { pingResults, elapsed }; pingResults
// stays undefined on error or after the 5 s timeout.
// NOTE(review): not called by the current main loop (testOrchestrator only
// does discovery); kept for future use.
const pingOrchestrator = async function (target) {
var client = new livepeerProto.Orchestrator(target, ssl_creds);
var receivedResults = false;
var pingPong;
const start = new Date().getTime();
var elapsed;
// The proto declares `value` as bytes; proto-loader coerces this string --
// TODO confirm the orchestrator echoes it back unchanged.
await client.Ping({ value: "koekjes" }, function (err, res) {
if (err) {
console.log("Ping err: ", err)
}
elapsed = new Date().getTime() - start;
pingPong = res;
receivedResults = true;
});
// Poll in 200 ms steps until the callback fires or 5 s have passed.
while (!receivedResults && new Date().getTime() - start < 5000) { await sleep(200); }
return { pingResults: pingPong, elapsed };
}
// Probe a single orchestrator and upload its latency stats to the master.
const testOrchestrator = async function (id, target) {
  if (!id.length || !target.length) { return; }
  // Strip any protocol prefix: grpc dials a bare host:port, not a URL.
  const grpcTarget = target.replace(/^https?:\/\//, '');
  console.log("Target is " + grpcTarget);
  const { discoveryResults, elapsed } = await discoverOrchestrator(grpcTarget);
  await postStatistics(id, discoveryResults, elapsed);
}
// Rebuild the work queue from the cached orchestrator list, in random order.
const refreshPool = function () {
  currentPool = activeOrchestrators.map(
    (orch) => ({ id: orch.id, target: orch.serviceURI })
  );
  shuffle(currentPool);
}
// Pop the next orchestrator off the (shuffled) pool and probe it.
const pingNextOrch = async function () {
  if (!currentPool.length) { refreshPool(); }
  let currentOrch = currentPool.splice(0, 1)[0];
  // FIX: the pool can still be empty right after a refresh (e.g. the
  // subgraph was unreachable, so activeOrchestrators is empty); splice()
  // then yields undefined and the original crashed on currentOrch.id.
  if (!currentOrch || !currentOrch.id || !currentOrch.target) {
    console.log("err: \n", currentOrch);
    return;
  }
  await testOrchestrator(currentOrch.id, currentOrch.target);
}
/*
Main Loop
Maybe we shouldn't use nodejs as a client...
*/
let cycle = 0;
let isSyncing = false;
// Main loop body: refresh the orchestrator cache if stale, probe the next
// orchestrator from the pool, then re-arm via setTimeout (CONF_SLEEPTIME).
// (The original "last parsed block" comment was a copy-paste leftover.)
const handleSync = async function () {
try {
cycle++;
console.log('Starting new cycle #' + cycle);
isSyncing = true;
await refreshOrchCache();
await pingNextOrch();
isSyncing = false;
// Re-arm via setTimeout rather than direct recursion so the stack never grows.
setTimeout(() => {
handleSync();
}, CONF_SLEEPTIME);
return;
}
catch (err) {
// Keep the loop alive on errors; retry after the same delay.
console.log(err);
isSyncing = false;
setTimeout(() => {
handleSync();
}, CONF_SLEEPTIME)
}
};
// Kick off the loop once at module load (isSyncing is always false here).
if (!isSyncing) {
console.log("Starting main loop");
handleSync();
}
export default orchTesterRouter;

3
master/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/node_modules
/src/config.js
package-lock.json

39
master/README.md Normal file
View File

@ -0,0 +1,39 @@
### Config variables
```
{
MASTER_PORT: what port the master is listening on
PRESHARED_MASTER_KEY: clients must provide this value when uploading stats
}
```
### Run production
Note: this folder has to be placed in `/orchTest/master`
npm install
nano src/config.js
example `config.js`:
```
export const {
MASTER_PORT = 42069,
PRESHARED_MASTER_KEY = "koekjes"
} = process.env;
```
pm2 start ecosystem.config.js
### Run development
npm install
nano src/config.js
example `config.js`:
```
export const {
MASTER_PORT = 42069,
PRESHARED_MASTER_KEY = "koekjes"
} = process.env;
```
npm run dev

View File

@ -0,0 +1,18 @@
// PM2 process definition for the stats master ("hodler").
// Start with `pm2 start ecosystem.config.js --env <name>` to select which
// NODE_ENV block below is applied.
module.exports = {
apps: [
{
name: "hodler",
// Entry point: the esm bootstrap shim (src/hodler.js)
script: "./src/hodler.js",
// Hard-coded deploy location; the README requires this folder to live
// at /orchTest/master.
cwd: "/orchTest/master",
env_production: {
NODE_ENV: "production"
},
env_development: {
NODE_ENV: "development"
},
env_local: {
NODE_ENV: "local"
}
}
]
}

26
master/package.json Normal file
View File

@ -0,0 +1,26 @@
{
"name": "hodler",
"version": "0.0.1",
"description": "",
"main": "./src/hodler.js",
"module": "./src/hodler.js",
"scripts": {
"prod": "NODE_ENV=production pm2 start ecosystem.config.js",
"start": "NODE_ENV=production node ./src/hodler.js",
"dev": "NODE_ENV=development nodemon ./src/hodler.js",
"local": "NODE_ENV=local nodemon ./src/hodler.js"
},
"keywords": [],
"author": "Marco van Dijk",
"license": "WTFPL",
"dependencies": {
"esm": "^3.2.20",
"express": "^4.17.1",
"node-persist": "^3.1.0",
"npm": "^8.5.2",
"prom-client": "^14.0.1"
},
"devDependencies": {
"nodemon": "^1.18.10"
}
}

2
master/src/hodler.js Normal file
View File

@ -0,0 +1,2 @@
// Bootstrap shim: wrap require with the "esm" package so that ./server.js
// (written with ES module import/export syntax) runs under plain CommonJS node.
require = require("esm")(module)
module.exports = require("./server.js")

View File

@ -0,0 +1,2 @@
// Re-export the master router under a named binding for ../server.js.
export { default as masterRouter } from './master';

227
master/src/routes/master.js Normal file
View File

@ -0,0 +1,227 @@
import express, { response } from "express";
import {
PRESHARED_MASTER_KEY
} from "../config";
const storage = require('node-persist');
const masterRouter = express.Router();
// Prometheus client: metrics below are registered on a dedicated registry,
// which is what the /prometheus endpoint exposes.
const client = require('prom-client');
const register = new client.Registry();
const collectDefaultMetrics = client.collectDefaultMetrics;
// NOTE(review): without a `register` option the default metrics go to the
// global registry, not `register`, so they are NOT exposed by /prometheus --
// confirm whether that is intended.
collectDefaultMetrics({ timeout: 3000 });
// Most recent latency seen per (region, orchestrator) pair.
const promLatestLatency = new client.Gauge({
name: 'orch_latest_latency',
help: 'Latest latency known for a given Orchestrator',
labelNames: ['region', 'orchestrator']
});
register.registerMetric(promLatestLatency);
// Latency distribution per region.
const promLatency = new client.Summary({
name: 'orch_latency',
help: 'Summary of latency stats',
percentiles: [0.01, 0.1, 0.9, 0.99],
labelNames: ['region']
});
register.registerMetric(promLatency);
// Mean latency over the stored data points per (region, orchestrator).
const promAverageLatency = new client.Gauge({
name: 'orch_average_latency',
help: 'Average latency for a given Orchestrator',
labelNames: ['region', 'orchestrator']
});
register.registerMetric(promAverageLatency);
// Uptime ratio (uptime / (uptime + downtime)) per (region, orchestrator).
// The stray "A" in the variable name is an existing typo, kept because the
// name is referenced throughout this file.
const promAUptimeScore = new client.Gauge({
name: 'orch_uptime_score',
help: 'Uptime score for a given orchestrator',
labelNames: ['region', 'orchestrator']
});
register.registerMetric(promAUptimeScore);
// Set once recoverStorage() has replayed persisted scores; until then
// /collectStats replies 'busy' and drops incoming data.
let isSynced = false;
/*
Incoming stats parsing
*/
// Receives one probe result from a client region and:
//  1) exports the fresh latency to prometheus,
//  2) appends the data point to the persisted per-orchestrator/per-region
//     history (capped at 60 points),
//  3) recalculates average latency and uptime score over that history.
masterRouter.post("/collectStats", async (req, res) => {
  try {
    // Reject data until persisted scores are recovered, otherwise incoming
    // points would be clobbered when recoverStorage() overwrites orchScores.
    if (!isSynced) { console.log("waiting for sync"); res.end('busy'); return; }
    const { id, discoveryResults,
      responseTime, tag, key } = req.body;
    if (!id || !tag || !key) {
      console.log("Received malformed data. Aborting stats update...");
      console.log(id, discoveryResults, responseTime, tag, key);
      res.send(false);
      return;
    }
    if (PRESHARED_MASTER_KEY != key) {
      console.log("Unauthorized");
      res.send(false);
      return;
    }
    console.log('received data for ' + id + ' from ' + tag + ' (' + responseTime + " ms latency)");
    // Export the fresh data point only.
    // FIX: the original also re-exported every STORED point to
    // promLatestLatency / promLatency in the score loop below, double
    // counting all historical samples in the summary on every POST.
    if (responseTime) {
      promLatestLatency.set({ region: tag, orchestrator: id }, responseTime);
      promLatency.observe({ region: tag }, responseTime);
    }
    // Save data point. A failed probe is stored with latency null so it
    // counts as downtime in the uptime calculation.
    const now = new Date().getTime();
    let thisPing = responseTime;
    if (!discoveryResults || !responseTime) { thisPing = null; }
    let currentDataList = [];
    let orchFound = false;
    let regionFound = false;
    for (var orchIdx = 0; orchIdx < orchScores.length; orchIdx++) {
      if (orchScores[orchIdx].id != id) { continue; }
      orchFound = true;
      for (var regionIdx = 0; regionIdx < orchScores[orchIdx].data.length; regionIdx++) {
        if (orchScores[orchIdx].data[regionIdx].tag != tag) { continue; }
        regionFound = true;
        // Cap the history at 60 points by dropping the oldest.
        if (orchScores[orchIdx].data[regionIdx].data.length > 60) {
          orchScores[orchIdx].data[regionIdx].data = orchScores[orchIdx].data[regionIdx].data.slice(1);
        }
        orchScores[orchIdx].data[regionIdx].data.push({ latency: thisPing, timestamp: now });
        currentDataList = orchScores[orchIdx].data[regionIdx].data;
        break;
      }
      if (!regionFound) {
        // First data point for this region on a known orchestrator.
        currentDataList = [{ latency: thisPing, timestamp: now }];
        orchScores[orchIdx].data.push({ tag, data: currentDataList });
      }
      break;
    }
    if (!orchFound) {
      // First data point for this orchestrator.
      // FIX: the original assigned the region-wrapper array to
      // currentDataList, so the score loop below iterated over
      // { tag, data } objects and never computed scores for a new orch.
      currentDataList = [{ latency: thisPing, timestamp: now }];
      orchScores.push({ id, data: [{ tag, data: currentDataList }] });
    }
    await storage.setItem('orchScores', orchScores);
    // Recalculate aggregate scores over the stored history.
    let prevtime = null;
    let uptime = 0;
    let downtime = 0;
    let pingsum = 0;
    let pingpoints = 0;
    for (const thisData of currentDataList) {
      // Count ping* vars
      if (thisData.latency) {
        pingsum += thisData.latency;
        pingpoints += 1;
      }
      // Time between consecutive points counts as uptime when the later
      // probe succeeded, downtime when it failed.
      if (prevtime && thisData.timestamp) {
        if (thisData.latency) {
          uptime += thisData.timestamp - prevtime;
        } else {
          downtime += thisData.timestamp - prevtime;
        }
      }
      prevtime = thisData.timestamp;
    }
    if (pingpoints) {
      promAverageLatency.set({ region: tag, orchestrator: id }, pingsum / pingpoints);
    }
    if (uptime || downtime) {
      const score = uptime / (uptime + downtime);
      promAUptimeScore.set({ region: tag, orchestrator: id }, score);
    }
    res.send(true);
  } catch (err) {
    console.log(err);
    res.status(400).send(err);
  }
});
/*
Public endpoints
*/
// Prometheus scrape endpoint: dumps all metrics registered on `register`.
masterRouter.get("/prometheus", async (req, res) => {
  try {
    res.set('Content-Type', register.contentType);
    res.end(await register.metrics());
  } catch (err) {
    res.status(400).send(err);
  }
});
// Dump the raw stored scores as JSON.
// FIX: res.end() only accepts a string or Buffer; the original passed the
// orchScores array directly, which throws ERR_INVALID_ARG_TYPE at request
// time. Serialize it first.
masterRouter.get("/json", async (req, res) => {
  try {
    res.set('Content-Type', 'application/json');
    res.end(JSON.stringify(orchScores));
  } catch (err) {
    res.status(400).send(err);
  }
});
/*
Recover from storage
*/
// In-memory scores, shape:
// [{ id, data: [{ tag, data: [{ latency, timestamp }, ...] }, ...] }, ...]
let orchScores;
// Loads persisted scores from node-persist and replays every stored data
// point into the prometheus metrics, so gauges/summaries survive restarts.
// Sets isSynced once done, which unblocks the /collectStats endpoint.
const recoverStorage = async function () {
await storage.init({
stringify: JSON.stringify,
parse: JSON.parse,
encoding: 'utf8',
logging: false,
ttl: false,
forgiveParseErrors: false
});
orchScores = await storage.getItem('orchScores');
if (!orchScores) { orchScores = []; }
// Init prometheus from storage
for (const thisOrch of orchScores) {
console.log("recovering scores for " + thisOrch.id);
for (const thisRegion of thisOrch.data) {
let prevtime = null;
let uptime = 0;
let downtime = 0;
let pingsum = 0;
let pingpoints = 0;
for (const thisData of thisRegion.data) {
// Count ping* vars; also replay each point into the latency metrics.
if (thisData.latency) {
pingsum += thisData.latency;
pingpoints += 1;
promLatestLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, thisData.latency);
promLatency.observe({ region: thisRegion.tag }, thisData.latency);
}
// Only count *time vars if we have timestamps: the gap between two
// consecutive points is uptime when the later probe succeeded,
// downtime when it failed (latency stored as null).
if (prevtime && thisData.timestamp) {
if (thisData.latency) {
uptime += thisData.timestamp - prevtime;
} else {
downtime += thisData.timestamp - prevtime;
}
}
prevtime = thisData.timestamp;
}
if (pingpoints) {
promAverageLatency.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, pingsum / pingpoints);
}
if (uptime || downtime) {
const score = uptime / (uptime + downtime)
promAUptimeScore.set({ region: thisRegion.tag, orchestrator: thisOrch.id }, score);
}
}
}
isSynced = true;
}
// Fire-and-forget at module load; /collectStats rejects until it finishes.
recoverStorage();
export default masterRouter;

32
master/src/server.js Normal file
View File

@ -0,0 +1,32 @@
import express from 'express';
import { masterRouter } from './routes/index';
import { MASTER_PORT } from "./config";
// Env variable which determines which DB to connect to
const { NODE_ENV: mode } = process.env;

// Master entry point: sets up an express app that exposes the master routes
// under /api/master and starts the listener on MASTER_PORT.
(async () => {
  try {
    const app = express();
    app.disable('x-powered-by');
    app.use(express.urlencoded({ extended: true }));
    app.use(express.json());
    const apiRouter = express.Router();
    app.use('/api', apiRouter);
    apiRouter.use('/master', masterRouter);
    // Error handler.
    // FIX: the original called res.render('error'), but this app configures
    // no view engine, so rendering itself threw and the client never got a
    // response. Send a plain error body instead.
    app.use(function (err, req, res, next) {
      res.locals.message = err.message;
      // Also log it to the console
      console.log(`${err.status || 500} - ${err.message} - ${req.originalUrl} - ${req.method} - ${req.ip}`);
      res.status(err.status || 500).send(err.message);
    });
    app.listen(MASTER_PORT, "0.0.0.0", function () {
      console.log(`Listening on port ${MASTER_PORT}`);
    });
  } catch (err) {
    console.log(err);
  }
})();