diff --git a/client/package.json b/client/package.json index 3a42fab..f7bfb7d 100644 --- a/client/package.json +++ b/client/package.json @@ -15,13 +15,13 @@ "author": "Marco van Dijk", "license": "WTFPL", "dependencies": { - "@grpc/grpc-js": "^1.6.8", + "@grpc/grpc-js": "^1.13.0", "@grpc/proto-loader": "^0.7.0", "geoip-lite": "^1.4.10", - "graphql": "^16.5.0", - "graphql-request": "^4.3.0" + "graphql": "^16.11.0", + "graphql-request": "^7.2.0" }, "devDependencies": { - "nodemon": "^1.18.10" + "nodemon": "^3.1.0" } } diff --git a/client/src/proto/livepeer.proto b/client/src/proto/livepeer.proto index 406d8dd..9a09c28 100644 --- a/client/src/proto/livepeer.proto +++ b/client/src/proto/livepeer.proto @@ -8,7 +8,8 @@ service Orchestrator { // Called by the broadcaster to request transcoder info from an orchestrator. rpc GetOrchestrator(OrchestratorRequest) returns (OrchestratorInfo); - rpc EndTranscodingSession(EndTranscodingSessionRequest) returns (EndTranscodingSessionResponse); + rpc EndTranscodingSession(EndTranscodingSessionRequest) + returns (EndTranscodingSessionResponse); rpc Ping(PingPong) returns (PingPong); } @@ -29,17 +30,16 @@ service Transcoder { message PingPong { // Implementation defined bytes value = 1; - } -// sent by Broadcaster to Orchestrator to terminate the transcoding session and free resources (used for verification sessions) +// sent by Broadcaster to Orchestrator to terminate the transcoding session and +// free resources (used for verification sessions) message EndTranscodingSessionRequest { // Data for transcoding authentication - AuthToken auth_token = 1; + AuthToken auth_token = 1; } -message EndTranscodingSessionResponse { -} +message EndTranscodingSessionResponse {} // This request is sent by the broadcaster in `GetTranscoder` to request // information on which transcoder to use. @@ -49,10 +49,14 @@ message OrchestratorRequest { bytes address = 1; // Broadcaster's signature over its address - bytes sig = 2; + bytes sig = 2; // Features and constraints required by the broadcaster Capabilities capabilities = 3; + + // Don't throw error in case of missing capacity, used for refreshing session + // when we don't need to check capacity + bool ignoreCapacityCheck = 4; } /* @@ -62,9 +66,9 @@ message OrchestratorRequest { message OSInfo { enum StorageType { - DIRECT = 0; - S3 = 1; - GOOGLE = 2; + DIRECT = 0; + S3 = 1; + GOOGLE = 2; } // Storage type: direct, s3, ipfs. @@ -100,42 +104,49 @@ message PriceInfo { int64 pricePerUnit = 1; // Pixels covered in the price - // Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price granularity per pixel than 1 wei + // Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price + // granularity per pixel than 1 wei int64 pixelsPerUnit = 2; + + // Capability price is for + uint32 capability = 3; + + // constraint price is for + string constraint = 4; } message Capabilities { - // Bit string of supported features - one bit per feature - repeated uint64 bitstring = 1; + // Bit string of supported features - one bit per feature + repeated uint64 bitstring = 1; - // Bit string of features that are required to be supported - repeated uint64 mandatories = 2; + // Bit string of features that are required to be supported + repeated uint64 mandatories = 2; - // Capacity corresponding to each capability - map capacities = 3; + // Capacity corresponding to each capability + map capacities = 3; - string version = 4; + string version = 4; - Constraints constraints = 5; + Constraints constraints = 5; - // Non-binary constraints. - message Constraints { - string minVersion = 1; - map PerCapability = 2; + // Non-binary constraints. + message Constraints { + string minVersion = 1; + map PerCapability = 2; + } + + // Non-binary capability constraints, such as supported ranges. + message CapabilityConstraints { + message ModelConstraint { + bool warm = 1; + uint32 capacity = 2; + string runnerVersion = 3; + uint32 capacityInUse = 4; } - // Non-binary capability constraints, such as supported ranges. - message CapabilityConstraints { - message ModelConstraint { - bool warm = 1; - uint32 capacity = 2; - } - - map models = 1; - } - - + map models = 1; + } } // The orchestrator sends this in response to `GetOrchestrator`, containing @@ -143,7 +154,7 @@ message Capabilities { message OrchestratorInfo { // URI of the transcoder to use for submitting segments. - string transcoder = 1; + string transcoder = 1; // Parameters for probabilistic micropayment tickets TicketParams ticket_params = 2; @@ -160,11 +171,19 @@ message OrchestratorInfo { // Data for transcoding authentication AuthToken auth_token = 6; - // Orchestrator returns info about own input object storage, if it wants it to be used. + // Information on Orchestrator hardware + repeated HardwareInformation hardware = 7; + + // Orchestrator returns info about own input object storage, if it wants it to + // be used. repeated OSInfo storage = 32; + + // prices for capabilities + repeated PriceInfo capabilities_prices = 33; } -// Data for transcoding authentication that is included in the OrchestratorInfo message during discovery +// Data for transcoding authentication that is included in the OrchestratorInfo +// message during discovery message AuthToken { // Record used to authenticate for a transcode session // Opaque to the receiver @@ -184,7 +203,7 @@ message SegData { bytes manifestId = 1; // Sequence number of the segment to be transcoded - int64 seq = 2; + int64 seq = 2; // Hash of the segment data to be transcoded bytes hash = 3; @@ -194,7 +213,7 @@ message SegData { // Broadcaster signature for the segment. Corresponds to: // broadcaster.sign(manifestId | seqNo | dataHash | profiles) - bytes sig = 5; + bytes sig = 5; // Duration of the segment to be transcoded, in milliseconds int32 duration = 6; @@ -209,7 +228,8 @@ message SegData { bool calc_perceptual_hash = 10; // Broadcaster's preferred storage medium(s) - // XXX should we include this in a sig somewhere until certs are authenticated? + // XXX should we include this in a sig somewhere until certs are + // authenticated? repeated OSInfo storage = 32; // Transcoding profiles to use. Supersedes `profiles` field @@ -250,15 +270,15 @@ message VideoProfile { int32 height = 18; // Bitrate of VideoProfile - int32 bitrate =19; + int32 bitrate = 19; // FPS of VideoProfile uint32 fps = 20; // Desired output format enum Format { - MPEGTS = 0; - MP4 = 1; + MPEGTS = 0; + MP4 = 1; } Format format = 21; @@ -266,10 +286,10 @@ message VideoProfile { uint32 fpsDen = 22; enum Profile { - ENCODER_DEFAULT = 0; - H264_BASELINE = 1; - H264_MAIN = 2; - H264_HIGH = 3; + ENCODER_DEFAULT = 0; + H264_BASELINE = 1; + H264_MAIN = 2; + H264_HIGH = 3; H264_CONSTRAINED_HIGH = 4; } // Desired codec profile @@ -281,8 +301,8 @@ message VideoProfile { enum VideoCodec { H264 = 0; H265 = 1; - VP8 = 2; - VP9 = 3; + VP8 = 2; + VP9 = 3; } // Encoder (video codec) @@ -302,115 +322,118 @@ message VideoProfile { // Individual transcoded segment data. message TranscodedSegmentData { - // URL where the transcoded data can be downloaded from. - string url = 1; + // URL where the transcoded data can be downloaded from. + string url = 1; - // Amount of pixels processed (output pixels) - int64 pixels = 2; + // Amount of pixels processed (output pixels) + int64 pixels = 2; - // URL where the perceptual hash data can be downloaded from (can be empty) - string perceptual_hash_url = 3; + // URL where the perceptual hash data can be downloaded from (can be empty) + string perceptual_hash_url = 3; } // A set of transcoded segments following the profiles specified in the job. message TranscodeData { - // Transcoded data, in the order specified in the job options - repeated TranscodedSegmentData segments = 1; + // Transcoded data, in the order specified in the job options + repeated TranscodedSegmentData segments = 1; - // Signature of the hash of the concatenated hashes - bytes sig = 2; + // Signature of the hash of the concatenated hashes + bytes sig = 2; } // Response that a transcoder sends after transcoding a segment. message TranscodeResult { - // Sequence number of the transcoded results. - int64 seq = 1; + // Sequence number of the transcoded results. + int64 seq = 1; - // Result of transcoding can be an error, or successful with more info - oneof result { - string error = 2; - TranscodeData data = 3; - } + // Result of transcoding can be an error, or successful with more info + oneof result { + string error = 2; + TranscodeData data = 3; + } - // Used to notify a broadcaster of updated orchestrator information - OrchestratorInfo info = 16; + // Used to notify a broadcaster of updated orchestrator information + OrchestratorInfo info = 16; } // Response that an orchestrator sends after processing a payment. message PaymentResult { - // Used to notify a broadcaster of updated orchestrator information - OrchestratorInfo info = 16; + // Used to notify a broadcaster of updated orchestrator information + OrchestratorInfo info = 16; } // Sent by the transcoder to register itself to the orchestrator. message RegisterRequest { - // Shared secret for auth - string secret = 1; + // Shared secret for auth + string secret = 1; - // Transcoder capacity - int64 capacity = 2; + // Transcoder capacity + int64 capacity = 2; - // Transcoder capabilities - Capabilities capabilities = 3; + // Transcoder capabilities + Capabilities capabilities = 3; } // Sent by the orchestrator to the transcoder message NotifySegment { - // URL of the segment to transcode. - string url = 1; + // URL of the segment to transcode. + string url = 1; - // Configuration for the transcoding job - SegData segData = 3; + // Configuration for the transcoding job + SegData segData = 3; - // ID for this particular transcoding task. - int64 taskId = 16; + // ID for this particular transcoding task. + int64 taskId = 16; - // Orchestrator identifier for segment metadata - string orchId = 18; + // Orchestrator identifier for segment metadata + string orchId = 18; - // All fields below are deprecated. May still be populated if necessary + // All fields below are deprecated. May still be populated if necessary - // Deprecated by segData. Job the segment belongs to. - reserved 2; // Formerly "string job" + // Deprecated by segData. Job the segment belongs to. + reserved 2; // Formerly "string job" - // Deprecated by fullProfiles. Set of presets to transcode into. - // Should be set to an invalid value to induce failures - bytes profiles = 17; + // Deprecated by fullProfiles. Set of presets to transcode into. + // Should be set to an invalid value to induce failures + bytes profiles = 17; - // Deprecated by segData. Transcoding configuration to use. - reserved 33; // Formerly "repeated VideoProfile fullProfiles" + // Deprecated by segData. Transcoding configuration to use. + reserved 33; // Formerly "repeated VideoProfile fullProfiles" } // Sent by the aiworker to register itself to the orchestrator. message RegisterAIWorkerRequest { - // Shared secret for auth - string secret = 1; + // Shared secret for auth + string secret = 1; - // AIWorker capabilities - Capabilities capabilities = 2; + // AIWorker capabilities + Capabilities capabilities = 2; + + // AIWorker hardware info + repeated HardwareInformation hardware = 3; } // Data included by the gateway when submitting a AI job. message AIJobData { - // pipeline to use for the job - string pipeline = 1; + // pipeline to use for the job + string pipeline = 1; - // AI job request data - bytes requestData = 2; + // AI job request data + bytes requestData = 2; } // Sent by the orchestrator to the aiworker message NotifyAIJob { - // Configuration for the AI job - AIJobData AIJobData = 1; + // Configuration for the AI job + AIJobData AIJobData = 1; - // ID for this particular AI task. - int64 taskId = 2; + // ID for this particular AI task. + int64 taskId = 2; } // Required parameters for probabilistic micropayment tickets @@ -429,10 +452,12 @@ message TicketParams { bytes recipient_rand_hash = 4; // Value generated by recipient that the recipient can use - // to derive the random number corresponding to the recipient's hash commitment + // to derive the random number corresponding to the recipient's hash + // commitment bytes seed = 5; - // Block number at which the current set of advertised TicketParams is no longer valid + // Block number at which the current set of advertised TicketParams is no + // longer valid bytes expiration_block = 6; // Expected ticket expiration params @@ -442,7 +467,8 @@ message TicketParams { // Sender Params (nonces and signatures) message TicketSenderParams { // Monotonically increasing counter that makes the ticket - // unique relative to a particular hash commitment to a recipient's random number + // unique relative to a particular hash commitment to a recipient's random + // number uint32 sender_nonce = 1; // Sender signature over the ticket @@ -460,7 +486,8 @@ message TicketExpirationParams { // Payment for transcoding video segments // A payment can constitute of multiple tickets -// A broadcaster might need to send multiple tickets to top up his credit with an Orchestrator +// A broadcaster might need to send multiple tickets to top up his credit with +// an Orchestrator message Payment { // Probabilistic micropayment ticket parameters // These remain the same even when sending multiple tickets @@ -476,4 +503,37 @@ message Payment { // O's last known price PriceInfo expected_price = 5; +} + +// GPU information used by worker on network +message GPUComputeInfo { + // unique id + string id = 1; + + // gpu name + string name = 2; + + // compute version supported - major + uint32 major = 4; + + // compute version supported - minor + uint32 minor = 5; + + // GPU memory free + int64 memory_free = 6; + + // GPU memory total + int64 memory_total = 7; +} + +// Hardware information for worker on network +message HardwareInformation { + // pipeline hardware supports + string pipeline = 1; + + // model_id hardware supports + string model_id = 2; + + // hardware for pipeline and model_id + map gpu_info = 3; } \ No newline at end of file diff --git a/master/.gitignore b/master/.gitignore index 29c82a2..507c6f1 100644 --- a/master/.gitignore +++ b/master/.gitignore @@ -1,3 +1,4 @@ /node_modules /src/config.js -package-lock.json \ No newline at end of file +package-lock.json +/data/ \ No newline at end of file diff --git a/master/package.json b/master/package.json index 9e49e9d..dd40311 100644 --- a/master/package.json +++ b/master/package.json @@ -15,13 +15,13 @@ "author": "Marco van Dijk", "license": "WTFPL", "dependencies": { - "ethers": "5.7.2", - "express": "^4.17.1", - "node-persist": "^4.0.3", - "npm": "^8.5.2", - "prom-client": "^14.0.1" + "better-sqlite3": "^11.10.0", + "ethers": "^6.14.0", + "express": "^5.1.0", + "prom-client": "^15.1.0" }, "devDependencies": { - "nodemon": "^1.18.10" + "nodemon": "^3.1.0", + "npm": "^11.4.0" } } diff --git a/master/src/storage.js b/master/src/storage.js new file mode 100644 index 0000000..6e60b6a --- /dev/null +++ b/master/src/storage.js @@ -0,0 +1,140 @@ +const Database = require('better-sqlite3'); +const path = require('path'); +const fs = require('fs'); + +class Storage { + constructor(dbPath = './data/storage.db') { + // Ensure directory exists + const dir = path.dirname(dbPath); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + + this.db = new Database(dbPath); + this.db.pragma('journal_mode = WAL'); // Write-Ahead Logging for better concurrency + this.db.pragma('synchronous = NORMAL'); // Balance between safety and performance + + this.init(); + } + + init() { + // Create table if it doesn't exist + const createTable = this.db.prepare(` + CREATE TABLE IF NOT EXISTS storage ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at INTEGER NOT NULL + ) + `); + createTable.run(); + + // Prepare statements for better performance + this.getStmt = this.db.prepare('SELECT value FROM storage WHERE key = ?'); + this.setStmt = this.db.prepare(` + INSERT OR REPLACE INTO storage (key, value, updated_at) + VALUES (?, ?, ?) + `); + this.deleteStmt = this.db.prepare('DELETE FROM storage WHERE key = ?'); + this.getAllStmt = this.db.prepare('SELECT key, value FROM storage'); + } + + async getItem(key) { + try { + const row = this.getStmt.get(key); + if (!row) return null; + + return JSON.parse(row.value); + } catch (error) { + console.error(`Error getting item ${key}:`, error.message); + return null; + } + } + + async setItem(key, value) { + try { + const jsonValue = JSON.stringify(value); + const timestamp = Date.now(); + + this.setStmt.run(key, jsonValue, timestamp); + console.log(`Stored data for key: ${key}`); + return true; + } catch (error) { + console.error(`Error setting item ${key}:`, error.message); + return false; + } + } + + async removeItem(key) { + try { + this.deleteStmt.run(key); + return true; + } catch (error) { + console.error(`Error removing item ${key}:`, error.message); + return false; + } + } + + async clear() { + try { + this.db.prepare('DELETE FROM storage').run(); + return true; + } catch (error) { + console.error('Error clearing storage:', error.message); + return false; + } + } + + async getAllKeys() { + try { + const rows = this.getAllStmt.all(); + return rows.map(row => row.key); + } catch (error) { + console.error('Error getting all keys:', error.message); + return []; + } + } + + // Batch operations for better performance + async setItems(items) { + const transaction = this.db.transaction((items) => { + const timestamp = Date.now(); + for (const [key, value] of items) { + const jsonValue = JSON.stringify(value); + this.setStmt.run(key, jsonValue, timestamp); + } + }); + + try { + transaction(items); + console.log(`Batch stored ${items.length} items`); + return true; + } catch (error) { + console.error('Error in batch set:', error.message); + return false; + } + } + + // Get storage statistics + getStats() { + try { + const countStmt = this.db.prepare('SELECT COUNT(*) as count FROM storage'); + const sizeStmt = this.db.prepare('SELECT SUM(LENGTH(value)) as size FROM storage'); + + const count = countStmt.get().count; + const size = sizeStmt.get().size || 0; + + return { count, size }; + } catch (error) { + console.error('Error getting storage stats:', error.message); + return { count: 0, size: 0 }; + } + } + + close() { + if (this.db) { + this.db.close(); + } + } +} + +module.exports = Storage; \ No newline at end of file diff --git a/master/worker.js b/master/worker.js index 6e685d2..fd4180f 100644 --- a/master/worker.js +++ b/master/worker.js @@ -1,7 +1,7 @@ const { parentPort } = require("worker_threads"); const client = require("prom-client"); -const storage = require("node-persist"); -const ethers = require("ethers"); +const Storage = require("./src/storage.js"); +const { JsonRpcProvider } = require("ethers"); const { CONF_API_L1_HTTP, CONF_API_L1_KEY, @@ -12,8 +12,11 @@ const { } = require("./src/config.js"); const taskQueue = []; +// Initialize Storage +const storage = new Storage(); + // Initialize Ethereum Provider -const l1provider = new ethers.providers.JsonRpcProvider( +const l1provider = new JsonRpcProvider( CONF_API_L1_HTTP + CONF_API_L1_KEY ); @@ -91,11 +94,11 @@ function sleep(ms) { // Recover persistent storage async function recoverStorage(initPrometheus) { console.log("Retrieving state from storage."); - storedDomains = await storage.getItem("ensDomainCache"); + const storedDomains = await storage.getItem("ensDomainCache"); if (storedDomains) { ensDomainCache = storedDomains; } - storedOrchs = await storage.getItem("orchCache"); + const storedOrchs = await storage.getItem("orchCache"); if (storedOrchs) { orchCache = storedOrchs; } @@ -164,9 +167,7 @@ async function recoverStorage(initPrometheus) { async function writeToStorage(key, data) { try { - const serialized = JSON.stringify(data); - if (!serialized || !serialized.length) throw new Error("Invalid JSON data"); - await storage.setItem(key, JSON.parse(serialized)); + await storage.setItem(key, data); console.log(`Stored data for key: ${key}`); } catch (err) { console.error(`Error writing to storage for key: ${key}`, err.message); @@ -174,23 +175,19 @@ async function writeToStorage(key, data) { } } -async function flushCachesToStorage() { - try { - if (staleENSCache) { - writeToStorage("ensDomainCache", ensDomainCache); - } - if (staleOrchCache) { - writeToStorage("orchCache", orchCache); - } - console.log("Flushed caches to storage."); - } catch (err) { - console.error("Error flushing caches to storage:", err.message); - } -} - process.on("SIGINT", async () => { console.log("Shutting down worker..."); - await flushCachesToStorage(); + if (staleENSCache) { + console.log("Writing ENS cache to storage..."); + await writeToStorage("ensDomainCache", ensDomainCache); + staleENSCache = false; + } + if (staleOrchCache) { + console.log("Writing orchestrator cache to storage..."); + await writeToStorage("orchCache", orchCache); + staleOrchCache = false; + } + storage.close(); process.exit(); }); @@ -662,15 +659,7 @@ parentPort.on("message", async (message) => { // Initialize and Start Worker (async () => { console.log("Opening storage."); - await storage.init({ - stringify: JSON.stringify, - parse: JSON.parse, - encoding: "utf8", - logging: false, - ttl: false, - forgiveParseErrors: false, - }); - recoverStorage(true); + await recoverStorage(true); console.log("Worker initialized."); runTests(); processQueue();