Update deps and the proto file, and replace node-persist with an SQLite-backed storage layer.

Marco van Dijk, 2025-06-06 13:02:54 +02:00
parent ca7ff2ef55
commit bf5a7647d9
6 changed files with 341 additions and 151 deletions


@@ -15,13 +15,13 @@
   "author": "Marco van Dijk",
   "license": "WTFPL",
   "dependencies": {
-    "@grpc/grpc-js": "^1.6.8",
+    "@grpc/grpc-js": "^1.13.0",
     "@grpc/proto-loader": "^0.7.0",
     "geoip-lite": "^1.4.10",
-    "graphql": "^16.5.0",
-    "graphql-request": "^4.3.0"
+    "graphql": "^16.11.0",
+    "graphql-request": "^7.2.0"
   },
   "devDependencies": {
-    "nodemon": "^1.18.10"
+    "nodemon": "^3.1.0"
   }
 }


@@ -8,7 +8,8 @@ service Orchestrator {
   // Called by the broadcaster to request transcoder info from an orchestrator.
   rpc GetOrchestrator(OrchestratorRequest) returns (OrchestratorInfo);
-  rpc EndTranscodingSession(EndTranscodingSessionRequest) returns (EndTranscodingSessionResponse);
+  rpc EndTranscodingSession(EndTranscodingSessionRequest)
+      returns (EndTranscodingSessionResponse);
   rpc Ping(PingPong) returns (PingPong);
 }
@@ -29,17 +30,16 @@ service Transcoder {
 message PingPong {
   // Implementation defined
   bytes value = 1;
 }

-// sent by Broadcaster to Orchestrator to terminate the transcoding session and free resources (used for verification sessions)
+// sent by Broadcaster to Orchestrator to terminate the transcoding session and
+// free resources (used for verification sessions)
 message EndTranscodingSessionRequest {
   // Data for transcoding authentication
-  AuthToken auth_token = 1;
+  AuthToken auth_token = 1;
 }

-message EndTranscodingSessionResponse {
-}
+message EndTranscodingSessionResponse {}

 // This request is sent by the broadcaster in `GetTranscoder` to request
 // information on which transcoder to use.
@@ -49,10 +49,14 @@ message OrchestratorRequest {
   bytes address = 1;

   // Broadcaster's signature over its address
-  bytes sig = 2;
+  bytes sig = 2;

   // Features and constraints required by the broadcaster
   Capabilities capabilities = 3;
+
+  // Don't throw error in case of missing capacity, used for refreshing session
+  // when we don't need to check capacity
+  bool ignoreCapacityCheck = 4;
 }

 /*
@@ -62,9 +66,9 @@ message OrchestratorRequest {
 message OSInfo {
   enum StorageType {
-    DIRECT = 0;
-    S3 = 1;
-    GOOGLE = 2;
+    DIRECT = 0;
+    S3 = 1;
+    GOOGLE = 2;
   }

   // Storage type: direct, s3, ipfs.
@@ -100,42 +104,49 @@ message PriceInfo {
   int64 pricePerUnit = 1;

   // Pixels covered in the price
-  // Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price granularity per pixel than 1 wei
+  // Set price to 1 wei and pixelsPerUnit > 1 to have a smaller price
+  // granularity per pixel than 1 wei
   int64 pixelsPerUnit = 2;

   // Capability price is for
   uint32 capability = 3;

   // constraint price is for
   string constraint = 4;
 }

 message Capabilities {
-  // Bit string of supported features - one bit per feature
-  repeated uint64 bitstring = 1;
+  // Bit string of supported features - one bit per feature
+  repeated uint64 bitstring = 1;

-  // Bit string of features that are required to be supported
-  repeated uint64 mandatories = 2;
+  // Bit string of features that are required to be supported
+  repeated uint64 mandatories = 2;

-  // Capacity corresponding to each capability
-  map<uint32, uint32> capacities = 3;
+  // Capacity corresponding to each capability
+  map<uint32, uint32> capacities = 3;

-  string version = 4;
+  string version = 4;

-  Constraints constraints = 5;
+  Constraints constraints = 5;

-  // Non-binary constraints.
-  message Constraints {
-    string minVersion = 1;
-    map<uint32, CapabilityConstraints> PerCapability = 2;
-  }
-
-  // Non-binary capability constraints, such as supported ranges.
-  message CapabilityConstraints {
-    message ModelConstraint {
-      bool warm = 1;
-      uint32 capacity = 2;
-    }
-    map<string, ModelConstraint> models = 1;
-  }
+  // Non-binary constraints.
+  message Constraints {
+    string minVersion = 1;
+    map<uint32, CapabilityConstraints> PerCapability = 2;
+  }
+
+  // Non-binary capability constraints, such as supported ranges.
+  message CapabilityConstraints {
+    message ModelConstraint {
+      bool warm = 1;
+      uint32 capacity = 2;
+      string runnerVersion = 3;
+      uint32 capacityInUse = 4;
+    }
+
+    map<string, ModelConstraint> models = 1;
+  }
 }

 // The orchestrator sends this in response to `GetOrchestrator`, containing
@@ -143,7 +154,7 @@ message Capabilities {
 message OrchestratorInfo {
   // URI of the transcoder to use for submitting segments.
-  string transcoder = 1;
+  string transcoder = 1;

   // Parameters for probabilistic micropayment tickets
   TicketParams ticket_params = 2;
@@ -160,11 +171,19 @@ message OrchestratorInfo {
   // Data for transcoding authentication
   AuthToken auth_token = 6;

-  // Orchestrator returns info about own input object storage, if it wants it to be used.
+  // Information on Orchestrator hardware
+  repeated HardwareInformation hardware = 7;
+
+  // Orchestrator returns info about own input object storage, if it wants it to
+  // be used.
   repeated OSInfo storage = 32;

   // prices for capabilities
   repeated PriceInfo capabilities_prices = 33;
 }

-// Data for transcoding authentication that is included in the OrchestratorInfo message during discovery
+// Data for transcoding authentication that is included in the OrchestratorInfo
+// message during discovery
 message AuthToken {
   // Record used to authenticate for a transcode session
   // Opaque to the receiver
@@ -184,7 +203,7 @@ message SegData {
   bytes manifestId = 1;

   // Sequence number of the segment to be transcoded
-  int64 seq = 2;
+  int64 seq = 2;

   // Hash of the segment data to be transcoded
   bytes hash = 3;
@@ -194,7 +213,7 @@ message SegData {
   // Broadcaster signature for the segment. Corresponds to:
   // broadcaster.sign(manifestId | seqNo | dataHash | profiles)
-  bytes sig = 5;
+  bytes sig = 5;

   // Duration of the segment to be transcoded, in milliseconds
   int32 duration = 6;
@@ -209,7 +228,8 @@ message SegData {
   bool calc_perceptual_hash = 10;

   // Broadcaster's preferred storage medium(s)
-  // XXX should we include this in a sig somewhere until certs are authenticated?
+  // XXX should we include this in a sig somewhere until certs are
+  // authenticated?
   repeated OSInfo storage = 32;

   // Transcoding profiles to use. Supersedes `profiles` field
@@ -250,15 +270,15 @@ message VideoProfile {
   int32 height = 18;

   // Bitrate of VideoProfile
-  int32 bitrate =19;
+  int32 bitrate = 19;

   // FPS of VideoProfile
   uint32 fps = 20;

   // Desired output format
   enum Format {
-    MPEGTS = 0;
-    MP4 = 1;
+    MPEGTS = 0;
+    MP4 = 1;
   }

   Format format = 21;
@@ -266,10 +286,10 @@ message VideoProfile {
   uint32 fpsDen = 22;

   enum Profile {
-    ENCODER_DEFAULT = 0;
-    H264_BASELINE = 1;
-    H264_MAIN = 2;
-    H264_HIGH = 3;
+    ENCODER_DEFAULT = 0;
+    H264_BASELINE = 1;
+    H264_MAIN = 2;
+    H264_HIGH = 3;
     H264_CONSTRAINED_HIGH = 4;
   }
   // Desired codec profile
@@ -281,8 +301,8 @@ message VideoProfile {
   enum VideoCodec {
     H264 = 0;
     H265 = 1;
-    VP8 = 2;
-    VP9 = 3;
+    VP8 = 2;
+    VP9 = 3;
   }

   // Encoder (video codec)
@@ -302,115 +322,118 @@ message VideoProfile {
 // Individual transcoded segment data.
 message TranscodedSegmentData {
-  // URL where the transcoded data can be downloaded from.
-  string url = 1;
+  // URL where the transcoded data can be downloaded from.
+  string url = 1;

-  // Amount of pixels processed (output pixels)
-  int64 pixels = 2;
+  // Amount of pixels processed (output pixels)
+  int64 pixels = 2;

-  // URL where the perceptual hash data can be downloaded from (can be empty)
-  string perceptual_hash_url = 3;
+  // URL where the perceptual hash data can be downloaded from (can be empty)
+  string perceptual_hash_url = 3;
 }

 // A set of transcoded segments following the profiles specified in the job.
 message TranscodeData {
-  // Transcoded data, in the order specified in the job options
-  repeated TranscodedSegmentData segments = 1;
+  // Transcoded data, in the order specified in the job options
+  repeated TranscodedSegmentData segments = 1;

-  // Signature of the hash of the concatenated hashes
-  bytes sig = 2;
+  // Signature of the hash of the concatenated hashes
+  bytes sig = 2;
 }

 // Response that a transcoder sends after transcoding a segment.
 message TranscodeResult {
-  // Sequence number of the transcoded results.
-  int64 seq = 1;
+  // Sequence number of the transcoded results.
+  int64 seq = 1;

-  // Result of transcoding can be an error, or successful with more info
-  oneof result {
-    string error = 2;
-    TranscodeData data = 3;
-  }
+  // Result of transcoding can be an error, or successful with more info
+  oneof result {
+    string error = 2;
+    TranscodeData data = 3;
+  }

-  // Used to notify a broadcaster of updated orchestrator information
-  OrchestratorInfo info = 16;
+  // Used to notify a broadcaster of updated orchestrator information
+  OrchestratorInfo info = 16;
 }

 // Response that an orchestrator sends after processing a payment.
 message PaymentResult {
-  // Used to notify a broadcaster of updated orchestrator information
-  OrchestratorInfo info = 16;
+  // Used to notify a broadcaster of updated orchestrator information
+  OrchestratorInfo info = 16;
 }

 // Sent by the transcoder to register itself to the orchestrator.
 message RegisterRequest {
-  // Shared secret for auth
-  string secret = 1;
+  // Shared secret for auth
+  string secret = 1;

-  // Transcoder capacity
-  int64 capacity = 2;
+  // Transcoder capacity
+  int64 capacity = 2;

-  // Transcoder capabilities
-  Capabilities capabilities = 3;
+  // Transcoder capabilities
+  Capabilities capabilities = 3;
 }

 // Sent by the orchestrator to the transcoder
 message NotifySegment {
-  // URL of the segment to transcode.
-  string url = 1;
+  // URL of the segment to transcode.
+  string url = 1;

-  // Configuration for the transcoding job
-  SegData segData = 3;
+  // Configuration for the transcoding job
+  SegData segData = 3;

-  // ID for this particular transcoding task.
-  int64 taskId = 16;
+  // ID for this particular transcoding task.
+  int64 taskId = 16;

-  // Orchestrator identifier for segment metadata
-  string orchId = 18;
+  // Orchestrator identifier for segment metadata
+  string orchId = 18;

-  // All fields below are deprecated. May still be populated if necessary
+  // All fields below are deprecated. May still be populated if necessary

-  // Deprecated by segData. Job the segment belongs to.
-  reserved 2; // Formerly "string job"
+  // Deprecated by segData. Job the segment belongs to.
+  reserved 2; // Formerly "string job"

-  // Deprecated by fullProfiles. Set of presets to transcode into.
-  // Should be set to an invalid value to induce failures
-  bytes profiles = 17;
+  // Deprecated by fullProfiles. Set of presets to transcode into.
+  // Should be set to an invalid value to induce failures
+  bytes profiles = 17;

-  // Deprecated by segData. Transcoding configuration to use.
-  reserved 33; // Formerly "repeated VideoProfile fullProfiles"
+  // Deprecated by segData. Transcoding configuration to use.
+  reserved 33; // Formerly "repeated VideoProfile fullProfiles"
 }

 // Sent by the aiworker to register itself to the orchestrator.
 message RegisterAIWorkerRequest {
-  // Shared secret for auth
-  string secret = 1;
+  // Shared secret for auth
+  string secret = 1;

-  // AIWorker capabilities
-  Capabilities capabilities = 2;
+  // AIWorker capabilities
+  Capabilities capabilities = 2;
+
+  // AIWorker hardware info
+  repeated HardwareInformation hardware = 3;
 }

 // Data included by the gateway when submitting a AI job.
 message AIJobData {
-  // pipeline to use for the job
-  string pipeline = 1;
+  // pipeline to use for the job
+  string pipeline = 1;

-  // AI job request data
-  bytes requestData = 2;
+  // AI job request data
+  bytes requestData = 2;
 }

 // Sent by the orchestrator to the aiworker
 message NotifyAIJob {
-  // Configuration for the AI job
-  AIJobData AIJobData = 1;
+  // Configuration for the AI job
+  AIJobData AIJobData = 1;

-  // ID for this particular AI task.
-  int64 taskId = 2;
+  // ID for this particular AI task.
+  int64 taskId = 2;
 }

 // Required parameters for probabilistic micropayment tickets
@@ -429,10 +452,12 @@ message TicketParams {
   bytes recipient_rand_hash = 4;

   // Value generated by recipient that the recipient can use
-  // to derive the random number corresponding to the recipient's hash commitment
+  // to derive the random number corresponding to the recipient's hash
+  // commitment
   bytes seed = 5;

-  // Block number at which the current set of advertised TicketParams is no longer valid
+  // Block number at which the current set of advertised TicketParams is no
+  // longer valid
   bytes expiration_block = 6;

   // Expected ticket expiration params
@@ -442,7 +467,8 @@
 // Sender Params (nonces and signatures)
 message TicketSenderParams {
   // Monotonically increasing counter that makes the ticket
-  // unique relative to a particular hash commitment to a recipient's random number
+  // unique relative to a particular hash commitment to a recipient's random
+  // number
   uint32 sender_nonce = 1;

   // Sender signature over the ticket
@@ -460,7 +486,8 @@ message TicketExpirationParams {
 // Payment for transcoding video segments
 // A payment can constitute of multiple tickets
-// A broadcaster might need to send multiple tickets to top up his credit with an Orchestrator
+// A broadcaster might need to send multiple tickets to top up his credit with
+// an Orchestrator
 message Payment {
   // Probabilistic micropayment ticket parameters
   // These remain the same even when sending multiple tickets
@@ -476,4 +503,37 @@ message Payment {
   // O's last known price
   PriceInfo expected_price = 5;
 }
+
+// GPU information used by worker on network
+message GPUComputeInfo {
+  // unique id
+  string id = 1;
+
+  // gpu name
+  string name = 2;
+
+  // compute version supported - major
+  uint32 major = 4;
+
+  // compute version supported - minor
+  uint32 minor = 5;
+
+  // GPU memory free
+  int64 memory_free = 6;
+
+  // GPU memory total
+  int64 memory_total = 7;
+}
+
+// Hardware information for worker on network
+message HardwareInformation {
+  // pipeline hardware supports
+  string pipeline = 1;
+
+  // model_id hardware supports
+  string model_id = 2;
+
+  // hardware for pipeline and model_id
+  map<string, GPUComputeInfo> gpu_info = 3;
+}
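For orientation, this is roughly how a client could exercise the updated service definition with the @grpc/grpc-js and @grpc/proto-loader dependencies from the package.json above. A minimal sketch only: the proto file path, the `net` package name, and the endpoint are assumptions (none of them appear in this diff), and a real GetOrchestrator call also needs a broadcaster-signed address, which is elided here.

const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");

// Load the updated proto definition (path assumed).
const packageDefinition = protoLoader.loadSync("./net.proto", {
  keepCase: true,
  longs: String,
  defaults: true,
});
const net = grpc.loadPackageDefinition(packageDefinition).net; // package name assumed

// Hypothetical orchestrator endpoint; orchestrators serve gRPC over TLS.
const orch = new net.Orchestrator(
  "orchestrator.example.com:8935",
  grpc.credentials.createSsl()
);

// Empty address/sig only demonstrate the call shape; note the new
// ignoreCapacityCheck field introduced by this diff.
orch.GetOrchestrator(
  { address: Buffer.alloc(0), sig: Buffer.alloc(0), ignoreCapacityCheck: true },
  (err, info) => {
    if (err) return console.error("GetOrchestrator failed:", err.message);
    console.log("transcoder URI:", info.transcoder);
  }
);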

master/.gitignore

@@ -1,3 +1,4 @@
 /node_modules
 /src/config.js
-package-lock.json
+package-lock.json
+/data/


@@ -15,13 +15,13 @@
   "author": "Marco van Dijk",
   "license": "WTFPL",
   "dependencies": {
-    "ethers": "5.7.2",
-    "express": "^4.17.1",
-    "node-persist": "^4.0.3",
-    "npm": "^8.5.2",
-    "prom-client": "^14.0.1"
+    "better-sqlite3": "^11.10.0",
+    "ethers": "^6.14.0",
+    "express": "^5.1.0",
+    "prom-client": "^15.1.0"
   },
   "devDependencies": {
-    "nodemon": "^1.18.10"
+    "nodemon": "^3.1.0",
+    "npm": "^11.4.0"
   }
 }

master/src/storage.js (new file, +140)

@@ -0,0 +1,140 @@
const Database = require('better-sqlite3');
const path = require('path');
const fs = require('fs');

class Storage {
  constructor(dbPath = './data/storage.db') {
    // Ensure directory exists
    const dir = path.dirname(dbPath);
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }

    this.db = new Database(dbPath);
    this.db.pragma('journal_mode = WAL'); // Write-Ahead Logging for better concurrency
    this.db.pragma('synchronous = NORMAL'); // Balance between safety and performance

    this.init();
  }

  init() {
    // Create table if it doesn't exist
    const createTable = this.db.prepare(`
      CREATE TABLE IF NOT EXISTS storage (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL,
        updated_at INTEGER NOT NULL
      )
    `);
    createTable.run();

    // Prepare statements for better performance
    this.getStmt = this.db.prepare('SELECT value FROM storage WHERE key = ?');
    this.setStmt = this.db.prepare(`
      INSERT OR REPLACE INTO storage (key, value, updated_at)
      VALUES (?, ?, ?)
    `);
    this.deleteStmt = this.db.prepare('DELETE FROM storage WHERE key = ?');
    this.getAllStmt = this.db.prepare('SELECT key, value FROM storage');
  }

  async getItem(key) {
    try {
      const row = this.getStmt.get(key);
      if (!row) return null;
      return JSON.parse(row.value);
    } catch (error) {
      console.error(`Error getting item ${key}:`, error.message);
      return null;
    }
  }

  async setItem(key, value) {
    try {
      const jsonValue = JSON.stringify(value);
      const timestamp = Date.now();
      this.setStmt.run(key, jsonValue, timestamp);
      console.log(`Stored data for key: ${key}`);
      return true;
    } catch (error) {
      console.error(`Error setting item ${key}:`, error.message);
      return false;
    }
  }

  async removeItem(key) {
    try {
      this.deleteStmt.run(key);
      return true;
    } catch (error) {
      console.error(`Error removing item ${key}:`, error.message);
      return false;
    }
  }

  async clear() {
    try {
      this.db.prepare('DELETE FROM storage').run();
      return true;
    } catch (error) {
      console.error('Error clearing storage:', error.message);
      return false;
    }
  }

  async getAllKeys() {
    try {
      const rows = this.getAllStmt.all();
      return rows.map(row => row.key);
    } catch (error) {
      console.error('Error getting all keys:', error.message);
      return [];
    }
  }

  // Batch operations for better performance
  async setItems(items) {
    const transaction = this.db.transaction((items) => {
      const timestamp = Date.now();
      for (const [key, value] of items) {
        const jsonValue = JSON.stringify(value);
        this.setStmt.run(key, jsonValue, timestamp);
      }
    });

    try {
      transaction(items);
      console.log(`Batch stored ${items.length} items`);
      return true;
    } catch (error) {
      console.error('Error in batch set:', error.message);
      return false;
    }
  }

  // Get storage statistics
  getStats() {
    try {
      const countStmt = this.db.prepare('SELECT COUNT(*) as count FROM storage');
      const sizeStmt = this.db.prepare('SELECT SUM(LENGTH(value)) as size FROM storage');
      const count = countStmt.get().count;
      const size = sizeStmt.get().size || 0;
      return { count, size };
    } catch (error) {
      console.error('Error getting storage stats:', error.message);
      return { count: 0, size: 0 };
    }
  }

  close() {
    if (this.db) {
      this.db.close();
    }
  }
}

module.exports = Storage;
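Since storage.js is new, a short usage sketch may help; it assumes only what the class above defines (the default ./data/storage.db path comes from the constructor, and the key/value content is illustrative):

const Storage = require("./src/storage.js");

(async () => {
  const storage = new Storage(); // opens ./data/storage.db, creates the table if missing

  await storage.setItem("orchCache", { someOrchAddress: { score: 1 } });
  const cache = await storage.getItem("orchCache"); // parsed back from JSON

  console.log(await storage.getAllKeys()); // [ "orchCache" ]
  console.log(storage.getStats()); // { count: 1, size: <bytes of stored JSON> }

  storage.close(); // release the SQLite handle
})();

The methods are declared async only so node-persist's await-based call sites keep working unchanged; better-sqlite3 itself is fully synchronous, which is also why the prepared statements can be reused directly.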


@@ -1,7 +1,7 @@
 const { parentPort } = require("worker_threads");
 const client = require("prom-client");
-const storage = require("node-persist");
-const ethers = require("ethers");
+const Storage = require("./src/storage.js");
+const { JsonRpcProvider } = require("ethers");
 const {
   CONF_API_L1_HTTP,
   CONF_API_L1_KEY,
@@ -12,8 +12,11 @@
 } = require("./src/config.js");

 const taskQueue = [];

+// Initialize Storage
+const storage = new Storage();
+
 // Initialize Ethereum Provider
-const l1provider = new ethers.providers.JsonRpcProvider(
+const l1provider = new JsonRpcProvider(
   CONF_API_L1_HTTP + CONF_API_L1_KEY
 );
@@ -91,11 +94,11 @@ function sleep(ms) {
 // Recover persistent storage
 async function recoverStorage(initPrometheus) {
   console.log("Retrieving state from storage.");
-  storedDomains = await storage.getItem("ensDomainCache");
+  const storedDomains = await storage.getItem("ensDomainCache");
   if (storedDomains) {
     ensDomainCache = storedDomains;
   }
-  storedOrchs = await storage.getItem("orchCache");
+  const storedOrchs = await storage.getItem("orchCache");
   if (storedOrchs) {
     orchCache = storedOrchs;
   }
@@ -164,9 +167,7 @@
 async function writeToStorage(key, data) {
   try {
-    const serialized = JSON.stringify(data);
-    if (!serialized || !serialized.length) throw new Error("Invalid JSON data");
-    await storage.setItem(key, JSON.parse(serialized));
+    await storage.setItem(key, data);
     console.log(`Stored data for key: ${key}`);
   } catch (err) {
     console.error(`Error writing to storage for key: ${key}`, err.message);
@@ -174,23 +175,19 @@
   }
 }

-async function flushCachesToStorage() {
-  try {
-    if (staleENSCache) {
-      writeToStorage("ensDomainCache", ensDomainCache);
-    }
-    if (staleOrchCache) {
-      writeToStorage("orchCache", orchCache);
-    }
-    console.log("Flushed caches to storage.");
-  } catch (err) {
-    console.error("Error flushing caches to storage:", err.message);
-  }
-}
-
 process.on("SIGINT", async () => {
   console.log("Shutting down worker...");
-  await flushCachesToStorage();
+  if (staleENSCache) {
+    console.log("Writing ENS cache to storage...");
+    await writeToStorage("ensDomainCache", ensDomainCache);
+    staleENSCache = false;
+  }
+  if (staleOrchCache) {
+    console.log("Writing orchestrator cache to storage...");
+    await writeToStorage("orchCache", orchCache);
+    staleOrchCache = false;
+  }
+  storage.close();
   process.exit();
 });
@@ -662,15 +659,7 @@ parentPort.on("message", async (message) => {
 // Initialize and Start Worker
 (async () => {
   console.log("Opening storage.");
-  await storage.init({
-    stringify: JSON.stringify,
-    parse: JSON.parse,
-    encoding: "utf8",
-    logging: false,
-    ttl: false,
-    forgiveParseErrors: false,
-  });
-  recoverStorage(true);
+  await recoverStorage(true);
   console.log("Worker initialized.");
   runTests();
   processQueue();
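One note on the ethers change above: v6 removed the ethers.providers namespace, so the provider class is now imported from the package top level. A minimal before/after sketch, with a placeholder URL standing in for CONF_API_L1_HTTP + CONF_API_L1_KEY:

const { JsonRpcProvider } = require("ethers");

// Placeholder endpoint; the worker builds its URL from config values.
const url = "https://example-l1-rpc.invalid/key";

// ethers v5 (removed): new ethers.providers.JsonRpcProvider(url)
// ethers v6 (current): the class sits at the package top level.
const l1provider = new JsonRpcProvider(url);

l1provider.getBlockNumber().then((n) => console.log("L1 block:", n));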