Compare commits

...

5 Commits

5 changed files with 125 additions and 115 deletions

View File

@ -2,23 +2,22 @@ Requires a Broadcaster with some reserve amount set. TODO
### Config variables
```
{
CONF_MASTER_PORT: Port to upload results to
CONF_MASTER_DOMAIN: Domain to upload results to
CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy
CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works
CONF_REGION: Continent the prober is located in
CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators
CONF_CONCURRENCY: Amount of orchestrators to test at the same time
CONF_SLEEPTIME: Time to sleep in the main loop
CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0
CONF_ROUNDTIME: Minimum amount of time between batch-checks
CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list
CONF_BROADCASTER: Public address of the broadcaster to mimic
CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances
CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler
CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address
}
exports.CONF_MASTER_PORT: Port to upload results to
exports.CONF_MASTER_DOMAIN: Domain to upload results to
exports.CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy
exports.CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works
exports.CONF_REGION: Continent the prober is located in
exports.CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators
exports.CONF_CONCURRENCY: Amount of orchestrators to test at the same time
exports.CONF_SLEEPTIME: Time to sleep in the main loop
exports.CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0
exports.CONF_ROUNDTIME: Minimum amount of time between batch-checks
exports.CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list
exports.CONF_BROADCASTER: Public address of the broadcaster to mimic
exports.CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances
exports.CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler
exports.CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address
exports.CONF_GRAPH_URI: Full URL including API key to the Livepeer subgraph;
```
Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now it add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature from the private key would be nice to have here...
@ -31,24 +30,22 @@ Note: this folder has to be placed in `/orchTest/client`, or edit `ecosystem.con
example `config.js`:
```
export const {
CONF_MASTER_PORT = 443,
CONF_MASTER_DOMAIN = "stronk.rocks",
CONF_MASTER_PATH = "/orch/collectStats",
CONF_FRIENDLY_NAME = "Michigan",
CONF_REGION = "Europe",
CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
CONF_CONCURRENCY = 6,
CONF_SLEEPTIME = 2000, //< 2 seconds
CONF_MAX_LATENCY = 2000, //< 2 seconds
CONF_ROUNDTIME = 60000, //< 1 minute
CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
CONF_DNS_TIMEOUT = 600000, //< 10 minutes
CONF_PRESHARED_MASTER_KEY = "koekjes",
CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
} = process.env;
exports.CONF_MASTER_PORT = 443;
exports.CONF_MASTER_DOMAIN = "stronk.rocks";
exports.CONF_MASTER_PATH = "/orch/collectStats";
exports.CONF_FRIENDLY_NAME = "Michigan";
exports.CONF_REGION = "Europe";
exports.CONF_LP_REGIONS = ["FRA", "LON", "PRA"];
exports.CONF_CONCURRENCY = 6;
exports.CONF_SLEEPTIME = 2000; //< 2 seconds
exports.CONF_MAX_LATENCY = 2000; //< 2 seconds
exports.CONF_ROUNDTIME = 60000; //< 1 minute
exports.CONF_ORCHINFO_TIMEOUT = 14400000; //< 4 hours
exports.CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e";
exports.CONF_DNS_TIMEOUT = 600000; //< 10 minutes
exports.CONF_PRESHARED_MASTER_KEY = "koekjes";
exports.CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
exports.CONF_GRAPH_URI = "https://gateway-arbitrum.network.thegraph.com/api/REDACTED/subgraphs/id/REDACTED";
```
pm2 start ecosystem.config.cjs
@ -60,23 +57,22 @@ export const {
example `config.js`:
```
export const {
CONF_MASTER_PORT = 42069,
CONF_MASTER_DOMAIN = "127.0.0.1",
CONF_MASTER_PATH = "/api/master/collectStats",
CONF_FRIENDLY_NAME = "Leiden",
CONF_REGION = "Europe",
CONF_LP_REGIONS = ["FRA", "LON", "PRA"],
CONF_CONCURRENCY = 6,
CONF_SLEEPTIME = 2000, //< 2 seconds
CONF_MAX_LATENCY = 2000, //< 2 seconds
CONF_ROUNDTIME = 60000, //< 1 minute
CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e",
CONF_DNS_TIMEOUT = 600000, //< 10 minutes
CONF_PRESHARED_MASTER_KEY = "koekjes",
CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
} = process.env;
exports.CONF_MASTER_PORT = 42069;
exports.CONF_MASTER_DOMAIN = "127.0.0.1";
exports.CONF_MASTER_PATH = "/api/master/collectStats";
exports.CONF_FRIENDLY_NAME = "Leiden";
exports.CONF_REGION = "Europe";
exports.CONF_LP_REGIONS = ["FRA", "LON", "PRA"];
exports.CONF_CONCURRENCY = 6;
exports.CONF_SLEEPTIME = 2000; //< 2 seconds
exports.CONF_MAX_LATENCY = 2000; //< 2 seconds
exports.CONF_ROUNDTIME = 60000; //< 1 minute
exports.CONF_ORCHINFO_TIMEOUT = 14400000; //< 4 hours
exports.CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e";
exports.CONF_DNS_TIMEOUT = 600000; //< 10 minutes
exports.CONF_PRESHARED_MASTER_KEY = "koekjes";
exports.CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
exports.CONF_GRAPH_URI = "https://gateway-arbitrum.network.thegraph.com/api/REDACTED/subgraphs/id/REDACTED"
```
npm run dev

View File

@ -17,7 +17,7 @@
"dependencies": {
"@grpc/grpc-js": "^1.6.8",
"@grpc/proto-loader": "^0.7.0",
"geoip-lite": "^1.4.8",
"geoip-lite": "^1.4.10",
"graphql": "^16.5.0",
"graphql-request": "^4.3.0"
},

View File

@ -1,6 +1,7 @@
syntax = "proto3";
package net;
option go_package = "./net";
// RPC calls implemented by the orchestrator
service Orchestrator {
@ -11,6 +12,13 @@ service Orchestrator {
rpc Ping(PingPong) returns (PingPong);
}
service AIWorker {
// Called by the aiworker to register to an orchestrator. The orchestrator
// notifies registered aiworkers of jobs as they come in.
rpc RegisterAIWorker(RegisterAIWorkerRequest) returns (stream NotifyAIJob);
}
service Transcoder {
// Called by the transcoder to register to an orchestrator. The orchestrator
@ -42,6 +50,9 @@ message OrchestratorRequest {
// Broadcaster's signature over its address
bytes sig = 2;
// Features and constraints required by the broadcaster
Capabilities capabilities = 3;
}
/*
@ -104,10 +115,27 @@ message Capabilities {
// Capacity corresponding to each capability
map<uint32, uint32> capacities = 3;
// Non-binary capability constraints, such as supported ranges.
string version = 4;
Constraints constraints = 5;
// Non-binary constraints.
message Constraints {
// Empty for now
string minVersion = 1;
map<uint32, CapabilityConstraints> PerCapability = 2;
}
// Non-binary capability constraints, such as supported ranges.
message CapabilityConstraints {
message ModelConstraint {
bool warm = 1;
uint32 capacity = 2;
}
map<string, ModelConstraint> models = 1;
}
}
// The orchestrator sends this in response to `GetOrchestrator`, containing
@ -149,34 +177,6 @@ message AuthToken {
int64 expiration = 3;
}
// [EXPERIMENTAL]
// Describes a class that a model is trained to detect
message DetectorClass {
// ID of the class to detect
uint32 class_id = 1;
// Name of the class to detect
string class_name = 2;
}
// [EXPERIMENTAL]
// Describes the scene classification configuration
message SceneClassificationProfile {
// Sample rate of the frames picked by the O for scene detection
uint32 sample_rate = 1;
// List of output classes the model is trained to detect
repeated DetectorClass classes = 2;
}
// [EXPERIMENTAL]
// Describes the content detection configuration
message DetectorProfile {
oneof value {
SceneClassificationProfile scene_classification = 1;
}
}
// Data included by the broadcaster when submitting a segment for transcoding.
message SegData {
@ -205,10 +205,6 @@ message SegData {
// Data for transcoding authentication
AuthToken auth_token = 8;
// [EXPERIMENTAL]
// Detector enabled for this segment
bool detector_enabled = 9;
// Calculate perceptual hash for this segment
bool calc_perceptual_hash = 10;
@ -232,10 +228,6 @@ message SegData {
// Force HW Session Reinit
bool ForceSessionReinit = 38;
// [EXPERIMENTAL]
// Detector profiles to use
repeated DetectorProfile detector_profiles = 36;
}
message SegParameters {
@ -320,21 +312,6 @@ message TranscodedSegmentData {
string perceptual_hash_url = 3;
}
// [EXPERIMENTAL]
// Describes scene classification results
message SceneClassificationData {
// Probability that the segment is detected as a particular classID (uint32)
map<uint32, double> class_probs = 1;
}
// [EXPERIMENTAL]
// Describes detection results
message DetectData {
oneof value {
SceneClassificationData scene_classification = 1;
}
}
// A set of transcoded segments following the profiles specified in the job.
message TranscodeData {
@ -343,10 +320,6 @@ message TranscodeData {
// Signature of the hash of the concatenated hashes
bytes sig = 2;
// [EXPERIMENTAL]
// Detection result data in same order as SegData.detector_profiles
repeated DetectData detections = 3;
}
// Response that a transcoder sends after transcoding a segment.
@ -365,6 +338,12 @@ message TranscodeResult {
OrchestratorInfo info = 16;
}
// Response that an orchestrator sends after processing a payment.
message PaymentResult {
// Used to notify a broadcaster of updated orchestrator information
OrchestratorInfo info = 16;
}
// Sent by the transcoder to register itself to the orchestrator.
message RegisterRequest {
@ -390,6 +369,9 @@ message NotifySegment {
// ID for this particular transcoding task.
int64 taskId = 16;
// Orchestrator identifier for segment metadata
string orchId = 18;
// All fields below are deprecated. May still be populated if necessary
// Deprecated by segData. Job the segment belongs to.
@ -403,6 +385,34 @@ message NotifySegment {
reserved 33; // Formerly "repeated VideoProfile fullProfiles"
}
// Sent by the aiworker to register itself to the orchestrator.
message RegisterAIWorkerRequest {
// Shared secret for auth
string secret = 1;
// AIWorker capabilities
Capabilities capabilities = 2;
}
// Data included by the gateway when submitting a AI job.
message AIJobData {
// pipeline to use for the job
string pipeline = 1;
// AI job request data
bytes requestData = 2;
}
// Sent by the orchestrator to the aiworker
message NotifyAIJob {
// Configuration for the AI job
AIJobData AIJobData = 1;
// ID for this particular AI task.
int64 taskId = 2;
}
// Required parameters for probabilistic micropayment tickets
message TicketParams {
// ETH address of the recipient
@ -466,4 +476,4 @@ message Payment {
// O's last known price
PriceInfo expected_price = 5;
}
}

View File

@ -22,6 +22,7 @@ const {
CONF_DNS_TIMEOUT,
CONF_PRESHARED_MASTER_KEY,
CONF_SIGNATURE,
CONF_GRAPH_URI,
} = require("../config.js");
/*
@ -180,6 +181,7 @@ const discoverOrchestrator = async function (target) {
discoveryResults: {
transcoder: null,
price_info: null,
version: null,
latency: 0,
err: "Took too long to respond. Aborted test...",
},
@ -189,6 +191,7 @@ const discoverOrchestrator = async function (target) {
discoveryResults: {
transcoder: orchestratorInfo.transcoder,
price_info: orchestratorInfo.price_info,
version: orchestratorInfo.capabilities?.version,
latency: elapsed,
err: orchestratorInfo.err,
},
@ -309,10 +312,7 @@ const getOrchestrators = async function () {
}
}
`;
let orchData = await request(
"https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one",
orchQuery
);
let orchData = await request(CONF_GRAPH_URI, orchQuery);
orchData = orchData.transcoders;
if (!orchData) {
console.log("Thegraph is probably acting up...");

View File

@ -296,6 +296,7 @@ const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
price: -1,
latitude: -1,
longitude: -1,
version: "",
probedFrom: {},
regions: {},
livepeer_regions: {},
@ -365,6 +366,9 @@ const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
obj.discovery.price_info.pricePerUnit /
obj.discovery.price_info.pixelsPerUnit;
}
if (obj.discovery.version) {
newInstance.version = obj.discovery.version;
}
if (obj.resolv.geoLookup) {
newInstance.latitude = obj.resolv.geoLookup.latitude;
newInstance.longitude = obj.resolv.geoLookup.longitude;