Compare commits

..

5 Commits

5 changed files with 125 additions and 115 deletions

View File

@ -2,23 +2,22 @@ Requires a Broadcaster with some reserve amount set. TODO
### Config variables ### Config variables
``` ```
{ exports.CONF_MASTER_PORT: Port to upload results to
CONF_MASTER_PORT: Port to upload results to exports.CONF_MASTER_DOMAIN: Domain to upload results to
CONF_MASTER_DOMAIN: Domain to upload results to exports.CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy
CONF_MASTER_PATH: Path to the upload endpoint in case there's a reverse proxy exports.CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works
CONF_FRIENDLY_NAME: Name to attach to the prober, the city name usually works exports.CONF_REGION: Continent the prober is located in
CONF_REGION: Continent the prober is located in exports.CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators
CONF_LP_REGIONS: Livepeer leaderboard regions which map to locally resolved orchestrators exports.CONF_CONCURRENCY: Amount of orchestrators to test at the same time
CONF_CONCURRENCY: Amount of orchestrators to test at the same time exports.CONF_SLEEPTIME: Time to sleep in the main loop
CONF_SLEEPTIME: Time to sleep in the main loop exports.CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0
CONF_MAX_LATENCY: Max GetOrch discovery time before we set it to 0 exports.CONF_ROUNDTIME: Minimum amount of time between batch-checks
CONF_ROUNDTIME: Minimum amount of time between batch-checks exports.CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list
CONF_ORCHINFO_TIMEOUT: Timeout between refreshing the active O list exports.CONF_BROADCASTER: Public address of the broadcaster to mimic
CONF_BROADCASTER: Public address of the broadcaster to mimic exports.CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances
CONF_DNS_TIMEOUT: Timeout between DNS & GEO resolving for orchestrator instances exports.CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler
CONF_PRESHARED_MASTER_KEY: password for uploading to the hodler exports.CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address
CONF_SIGNATURE: Broadcaster signature over a message containing it's own public address exports.CONF_GRAPH_URI: Full URL including API key to the Livepeer subgraph;
}
``` ```
Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now it add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature from the private key would be nice to have here... Obtaining the `CONF_SIGNATURE` is unfortunately a pain in the ass. The only way for now it add a print statement to the `livepeer/server/rpc.go/genOrchestratorReq` function in `go-livepeer` and then run the Broadcaster. All web3 libraries prepend a prefix to any message they sign for security purposes, so a script which can generate the signature from the private key would be nice to have here...
@ -31,24 +30,22 @@ Note: this folder has to be placed in `/orchTest/client`, or edit `ecosystem.con
example `config.js`: example `config.js`:
``` ```
export const { exports.CONF_MASTER_PORT = 443;
CONF_MASTER_PORT = 443, exports.CONF_MASTER_DOMAIN = "stronk.rocks";
CONF_MASTER_DOMAIN = "stronk.rocks", exports.CONF_MASTER_PATH = "/orch/collectStats";
CONF_MASTER_PATH = "/orch/collectStats", exports.CONF_FRIENDLY_NAME = "Michigan";
CONF_FRIENDLY_NAME = "Michigan", exports.CONF_REGION = "Europe";
CONF_REGION = "Europe", exports.CONF_LP_REGIONS = ["FRA", "LON", "PRA"];
CONF_LP_REGIONS = ["FRA", "LON", "PRA"], exports.CONF_CONCURRENCY = 6;
CONF_CONCURRENCY = 6, exports.CONF_SLEEPTIME = 2000; //< 2 seconds
CONF_SLEEPTIME = 2000, //< 2 seconds exports.CONF_MAX_LATENCY = 2000; //< 2 seconds
CONF_MAX_LATENCY = 2000, //< 2 seconds exports.CONF_ROUNDTIME = 60000; //< 1 minute
CONF_ROUNDTIME = 60000, //< 1 minute exports.CONF_ORCHINFO_TIMEOUT = 14400000; //< 4 hours
CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours exports.CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e";
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", exports.CONF_DNS_TIMEOUT = 600000; //< 10 minutes
CONF_DNS_TIMEOUT = 600000, //< 10 minutes exports.CONF_PRESHARED_MASTER_KEY = "koekjes";
CONF_PRESHARED_MASTER_KEY = "koekjes", exports.CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], exports.CONF_GRAPH_URI = "https://gateway-arbitrum.network.thegraph.com/api/REDACTED/subgraphs/id/REDACTED";
} = process.env;
``` ```
pm2 start ecosystem.config.cjs pm2 start ecosystem.config.cjs
@ -60,23 +57,22 @@ export const {
example `config.js`: example `config.js`:
``` ```
export const { exports.CONF_MASTER_PORT = 42069;
CONF_MASTER_PORT = 42069, exports.CONF_MASTER_DOMAIN = "127.0.0.1";
CONF_MASTER_DOMAIN = "127.0.0.1", exports.CONF_MASTER_PATH = "/api/master/collectStats";
CONF_MASTER_PATH = "/api/master/collectStats", exports.CONF_FRIENDLY_NAME = "Leiden";
CONF_FRIENDLY_NAME = "Leiden", exports.CONF_REGION = "Europe";
CONF_REGION = "Europe", exports.CONF_LP_REGIONS = ["FRA", "LON", "PRA"];
CONF_LP_REGIONS = ["FRA", "LON", "PRA"], exports.CONF_CONCURRENCY = 6;
CONF_CONCURRENCY = 6, exports.CONF_SLEEPTIME = 2000; //< 2 seconds
CONF_SLEEPTIME = 2000, //< 2 seconds exports.CONF_MAX_LATENCY = 2000; //< 2 seconds
CONF_MAX_LATENCY = 2000, //< 2 seconds exports.CONF_ROUNDTIME = 60000; //< 1 minute
CONF_ROUNDTIME = 60000, //< 1 minute exports.CONF_ORCHINFO_TIMEOUT = 14400000; //< 4 hours
CONF_ORCHINFO_TIMEOUT = 14400000, //< 4 hours exports.CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e";
CONF_BROADCASTER = "847791cBF03be716A7fe9Dc8c9Affe17Bd49Ae5e", exports.CONF_DNS_TIMEOUT = 600000; //< 10 minutes
CONF_DNS_TIMEOUT = 600000, //< 10 minutes exports.CONF_PRESHARED_MASTER_KEY = "koekjes";
CONF_PRESHARED_MASTER_KEY = "koekjes", exports.CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
CONF_SIGNATURE = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], exports.CONF_GRAPH_URI = "https://gateway-arbitrum.network.thegraph.com/api/REDACTED/subgraphs/id/REDACTED"
} = process.env;
``` ```
npm run dev npm run dev

View File

@ -17,7 +17,7 @@
"dependencies": { "dependencies": {
"@grpc/grpc-js": "^1.6.8", "@grpc/grpc-js": "^1.6.8",
"@grpc/proto-loader": "^0.7.0", "@grpc/proto-loader": "^0.7.0",
"geoip-lite": "^1.4.8", "geoip-lite": "^1.4.10",
"graphql": "^16.5.0", "graphql": "^16.5.0",
"graphql-request": "^4.3.0" "graphql-request": "^4.3.0"
}, },

View File

@ -1,6 +1,7 @@
syntax = "proto3"; syntax = "proto3";
package net; package net;
option go_package = "./net";
// RPC calls implemented by the orchestrator // RPC calls implemented by the orchestrator
service Orchestrator { service Orchestrator {
@ -11,6 +12,13 @@ service Orchestrator {
rpc Ping(PingPong) returns (PingPong); rpc Ping(PingPong) returns (PingPong);
} }
service AIWorker {
// Called by the aiworker to register to an orchestrator. The orchestrator
// notifies registered aiworkers of jobs as they come in.
rpc RegisterAIWorker(RegisterAIWorkerRequest) returns (stream NotifyAIJob);
}
service Transcoder { service Transcoder {
// Called by the transcoder to register to an orchestrator. The orchestrator // Called by the transcoder to register to an orchestrator. The orchestrator
@ -42,6 +50,9 @@ message OrchestratorRequest {
// Broadcaster's signature over its address // Broadcaster's signature over its address
bytes sig = 2; bytes sig = 2;
// Features and constraints required by the broadcaster
Capabilities capabilities = 3;
} }
/* /*
@ -104,10 +115,27 @@ message Capabilities {
// Capacity corresponding to each capability // Capacity corresponding to each capability
map<uint32, uint32> capacities = 3; map<uint32, uint32> capacities = 3;
// Non-binary capability constraints, such as supported ranges. string version = 4;
Constraints constraints = 5;
// Non-binary constraints.
message Constraints { message Constraints {
// Empty for now string minVersion = 1;
map<uint32, CapabilityConstraints> PerCapability = 2;
} }
// Non-binary capability constraints, such as supported ranges.
message CapabilityConstraints {
message ModelConstraint {
bool warm = 1;
uint32 capacity = 2;
}
map<string, ModelConstraint> models = 1;
}
} }
// The orchestrator sends this in response to `GetOrchestrator`, containing // The orchestrator sends this in response to `GetOrchestrator`, containing
@ -149,34 +177,6 @@ message AuthToken {
int64 expiration = 3; int64 expiration = 3;
} }
// [EXPERIMENTAL]
// Describes a class that a model is trained to detect
message DetectorClass {
// ID of the class to detect
uint32 class_id = 1;
// Name of the class to detect
string class_name = 2;
}
// [EXPERIMENTAL]
// Describes the scene classification configuration
message SceneClassificationProfile {
// Sample rate of the frames picked by the O for scene detection
uint32 sample_rate = 1;
// List of output classes the model is trained to detect
repeated DetectorClass classes = 2;
}
// [EXPERIMENTAL]
// Describes the content detection configuration
message DetectorProfile {
oneof value {
SceneClassificationProfile scene_classification = 1;
}
}
// Data included by the broadcaster when submitting a segment for transcoding. // Data included by the broadcaster when submitting a segment for transcoding.
message SegData { message SegData {
@ -205,10 +205,6 @@ message SegData {
// Data for transcoding authentication // Data for transcoding authentication
AuthToken auth_token = 8; AuthToken auth_token = 8;
// [EXPERIMENTAL]
// Detector enabled for this segment
bool detector_enabled = 9;
// Calculate perceptual hash for this segment // Calculate perceptual hash for this segment
bool calc_perceptual_hash = 10; bool calc_perceptual_hash = 10;
@ -232,10 +228,6 @@ message SegData {
// Force HW Session Reinit // Force HW Session Reinit
bool ForceSessionReinit = 38; bool ForceSessionReinit = 38;
// [EXPERIMENTAL]
// Detector profiles to use
repeated DetectorProfile detector_profiles = 36;
} }
message SegParameters { message SegParameters {
@ -320,21 +312,6 @@ message TranscodedSegmentData {
string perceptual_hash_url = 3; string perceptual_hash_url = 3;
} }
// [EXPERIMENTAL]
// Describes scene classification results
message SceneClassificationData {
// Probability that the segment is detected as a particular classID (uint32)
map<uint32, double> class_probs = 1;
}
// [EXPERIMENTAL]
// Describes detection results
message DetectData {
oneof value {
SceneClassificationData scene_classification = 1;
}
}
// A set of transcoded segments following the profiles specified in the job. // A set of transcoded segments following the profiles specified in the job.
message TranscodeData { message TranscodeData {
@ -343,10 +320,6 @@ message TranscodeData {
// Signature of the hash of the concatenated hashes // Signature of the hash of the concatenated hashes
bytes sig = 2; bytes sig = 2;
// [EXPERIMENTAL]
// Detection result data in same order as SegData.detector_profiles
repeated DetectData detections = 3;
} }
// Response that a transcoder sends after transcoding a segment. // Response that a transcoder sends after transcoding a segment.
@ -365,6 +338,12 @@ message TranscodeResult {
OrchestratorInfo info = 16; OrchestratorInfo info = 16;
} }
// Response that an orchestrator sends after processing a payment.
message PaymentResult {
// Used to notify a broadcaster of updated orchestrator information
OrchestratorInfo info = 16;
}
// Sent by the transcoder to register itself to the orchestrator. // Sent by the transcoder to register itself to the orchestrator.
message RegisterRequest { message RegisterRequest {
@ -390,6 +369,9 @@ message NotifySegment {
// ID for this particular transcoding task. // ID for this particular transcoding task.
int64 taskId = 16; int64 taskId = 16;
// Orchestrator identifier for segment metadata
string orchId = 18;
// All fields below are deprecated. May still be populated if necessary // All fields below are deprecated. May still be populated if necessary
// Deprecated by segData. Job the segment belongs to. // Deprecated by segData. Job the segment belongs to.
@ -403,6 +385,34 @@ message NotifySegment {
reserved 33; // Formerly "repeated VideoProfile fullProfiles" reserved 33; // Formerly "repeated VideoProfile fullProfiles"
} }
// Sent by the aiworker to register itself to the orchestrator.
message RegisterAIWorkerRequest {
// Shared secret for auth
string secret = 1;
// AIWorker capabilities
Capabilities capabilities = 2;
}
// Data included by the gateway when submitting a AI job.
message AIJobData {
// pipeline to use for the job
string pipeline = 1;
// AI job request data
bytes requestData = 2;
}
// Sent by the orchestrator to the aiworker
message NotifyAIJob {
// Configuration for the AI job
AIJobData AIJobData = 1;
// ID for this particular AI task.
int64 taskId = 2;
}
// Required parameters for probabilistic micropayment tickets // Required parameters for probabilistic micropayment tickets
message TicketParams { message TicketParams {
// ETH address of the recipient // ETH address of the recipient
@ -466,4 +476,4 @@ message Payment {
// O's last known price // O's last known price
PriceInfo expected_price = 5; PriceInfo expected_price = 5;
} }

View File

@ -22,6 +22,7 @@ const {
CONF_DNS_TIMEOUT, CONF_DNS_TIMEOUT,
CONF_PRESHARED_MASTER_KEY, CONF_PRESHARED_MASTER_KEY,
CONF_SIGNATURE, CONF_SIGNATURE,
CONF_GRAPH_URI,
} = require("../config.js"); } = require("../config.js");
/* /*
@ -180,6 +181,7 @@ const discoverOrchestrator = async function (target) {
discoveryResults: { discoveryResults: {
transcoder: null, transcoder: null,
price_info: null, price_info: null,
version: null,
latency: 0, latency: 0,
err: "Took too long to respond. Aborted test...", err: "Took too long to respond. Aborted test...",
}, },
@ -189,6 +191,7 @@ const discoverOrchestrator = async function (target) {
discoveryResults: { discoveryResults: {
transcoder: orchestratorInfo.transcoder, transcoder: orchestratorInfo.transcoder,
price_info: orchestratorInfo.price_info, price_info: orchestratorInfo.price_info,
version: orchestratorInfo.capabilities?.version,
latency: elapsed, latency: elapsed,
err: orchestratorInfo.err, err: orchestratorInfo.err,
}, },
@ -309,10 +312,7 @@ const getOrchestrators = async function () {
} }
} }
`; `;
let orchData = await request( let orchData = await request(CONF_GRAPH_URI, orchQuery);
"https://api.thegraph.com/subgraphs/name/livepeer/arbitrum-one",
orchQuery
);
orchData = orchData.transcoders; orchData = orchData.transcoders;
if (!orchData) { if (!orchData) {
console.log("Thegraph is probably acting up..."); console.log("Thegraph is probably acting up...");

View File

@ -296,6 +296,7 @@ const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
price: -1, price: -1,
latitude: -1, latitude: -1,
longitude: -1, longitude: -1,
version: "",
probedFrom: {}, probedFrom: {},
regions: {}, regions: {},
livepeer_regions: {}, livepeer_regions: {},
@ -365,6 +366,9 @@ const onOrchUpdate = async function (id, obj, tag, region, livepeer_regions) {
obj.discovery.price_info.pricePerUnit / obj.discovery.price_info.pricePerUnit /
obj.discovery.price_info.pixelsPerUnit; obj.discovery.price_info.pixelsPerUnit;
} }
if (obj.discovery.version) {
newInstance.version = obj.discovery.version;
}
if (obj.resolv.geoLookup) { if (obj.resolv.geoLookup) {
newInstance.latitude = obj.resolv.geoLookup.latitude; newInstance.latitude = obj.resolv.geoLookup.latitude;
newInstance.longitude = obj.resolv.geoLookup.longitude; newInstance.longitude = obj.resolv.geoLookup.longitude;