diff --git a/backend/src/models/event.js b/backend/src/models/event.js index 25c74bd..3584ad7 100644 --- a/backend/src/models/event.js +++ b/backend/src/models/event.js @@ -21,6 +21,10 @@ const EventSchema = new mongoose.Schema({ type: Object, required: true }, + blockNumber: { + type: Number, + required: true + } }, { timestamps: true }); const Event = mongoose.model('Event', EventSchema); diff --git a/backend/src/routes/livepeer.js b/backend/src/routes/livepeer.js index 2603cde..ed1741d 100644 --- a/backend/src/routes/livepeer.js +++ b/backend/src/routes/livepeer.js @@ -68,8 +68,9 @@ let orchestratorCache = []; // Listen to smart contract emitters. Resync with DB every 5 minutes const timeoutEvents = 300000; let eventsCache = []; +let latestMissedDuringSync = 0; +let lastBlockDataAdded = 0; let syncCache = []; -let isSyncing = false; let eventsGet = 0; // Set to true to drop the entire collection on boot and get all events const fullSync = false; @@ -79,82 +80,134 @@ const BondingManagerTargetAbi = JSON.parse(BondingManagerTargetJson); const BondingManagerProxyAddr = "0x35Bcf3c30594191d53231E4FF333E8A770453e40"; const contractInstance = new web3layer2WS.eth.Contract(BondingManagerTargetAbi.abi, BondingManagerProxyAddr); +// If fullsync: drop collection on DB +if (fullSync) { + console.log("dropping old data due to full synchronization"); + Event.collection.drop(); +} +// Set special flag to make sure also get blocks that pass us by while we are syncing +let isSyncing = true; +let isSyncRunning = false; +// Start Listening for live updates var BondingManagerProxyListener = contractInstance.events.allEvents(async (error, event) => { try { if (error) { throw error } - console.log('New event emitted on', BondingManagerProxyAddr); + if (isSyncing) { + console.log('Received new Event on block ' + event.blockNumber + " during sync"); + } else { + console.log('Received new Event on block ' + event.blockNumber); + } // Push obj of event to cache and create a new entry for it in the DB const eventObj = { address: event.address, transactionHash: event.transactionHash, transactionUrl: "https://arbiscan.io/tx/" + event.transactionHash, name: event.event, - data: event.returnValues + data: event.returnValues, + blockNumber: event.blockNumber } - if(isSyncing){ - syncCache.push(eventObj); - }else{ + if(!isSyncing){ const dbObj = new Event(eventObj); await dbObj.save(); eventsCache.push(eventObj); + }else{ + syncCache.push(eventObj); } } catch (err) { console.log("FATAL ERROR: ", err); } }); -console.log("listening for events on", BondingManagerProxyAddr); +console.log("Listening for events on " + BondingManagerProxyAddr); -if(fullSync){ - console.log("dropping old data"); - Event.collection.drop(); - console.log("getting all events"); - isSyncing = true; - contractInstance.getPastEvents("allEvents", {fromBlock: 0, toBlock: 'latest'}, async (error, events) => { +// Does the syncing +const doSync = function () { + console.log("Starting sync process"); + isSyncRunning = true; + // Then do a sync from last found until latest known + contractInstance.getPastEvents("allEvents", { fromBlock: lastBlockDataAdded + 1, toBlock: 'latest' }, async (error, events) => { try { if (error) { throw error } - let counter = 1; let size = events.length; + console.log("Parsing " + size + " events"); for (const event of events) { - console.log("Parsing " + counter + " out of " + size + " events"); + if (event.blockNumber > lastBlockDataAdded) { + lastBlockDataAdded = event.blockNumber; + } const eventObj = { address: event.address, transactionHash: event.transactionHash, transactionUrl: "https://arbiscan.io/tx/" + event.transactionHash, name: event.event, - data: event.returnValues + data: event.returnValues, + blockNumber: event.blockNumber } const dbObj = new Event(eventObj); await dbObj.save(); eventsCache.push(eventObj); - counter++; } } catch (err) { console.log("FATAL ERROR: ", err); } - isSyncing = false; - let counter = 1; - let size = syncCache.length; - for (const event of syncCache) { - console.log("Parsing " + counter + " out of " + size + " events received while syncing"); - const eventObj = { - address: event.address, - transactionHash: event.transactionHash, - transactionUrl: "https://arbiscan.io/tx/" + event.transactionHash, - name: event.event, - data: event.returnValues - } + isSyncRunning = false; + }); +} +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +const handleSync = async function () { + // First collection -> cache + eventsCache = await Event.find({}, { + address: 1, + transactionHash: 1, + transactionUrl: 1, + name: 1, + data: 1, + blockNumber: 1, + _id: 0 + }); + console.log("Retrieved existing Events of size " + eventsCache.length); + // Then determine latest block number parsed based on collection + for (var idx = 0; idx < eventsCache.length; idx++) { + const thisBlock = eventsCache[idx]; + if (thisBlock.blockNumber > lastBlockDataAdded) { + lastBlockDataAdded = thisBlock.blockNumber; + } + } + // Get latest block in chain + const latestBlock = await web3layer2.eth.getBlockNumber(); + if (latestBlock > latestMissedDuringSync) { + latestMissedDuringSync = latestBlock; + } + console.log("Parsed up to block " + lastBlockDataAdded + " out of " + latestMissedDuringSync + " blocks"); + doSync(); + while (isSyncRunning) { + await sleep(1000); + console.log("Parsed " + lastBlockDataAdded + " out of " + latestMissedDuringSync + " blocks"); + } + while(syncCache.length){ + const liveEvents = syncCache; + syncCache = []; + for (const eventObj of liveEvents) { + console.log("Parsing event received while syncing"); const dbObj = new Event(eventObj); await dbObj.save(); eventsCache.push(eventObj); - counter++; } - }); + } + console.log('done syncing') + isSyncing = false; +}; +if (!isSyncRunning) { + handleSync(); } // Splits of raw CMC object into coin quote data @@ -414,7 +467,7 @@ const parseOrchestrator = async function (reqAddr) { apiRouter.get("/getOrchestrator", async (req, res) => { try { let reqOrch = req.query.orch; - if (!reqOrch || reqOrch == ""){ + if (!reqOrch || reqOrch == "") { reqOrch = defaultOrch; } const reqObj = await parseOrchestrator(reqOrch); diff --git a/src/eventButton.js b/src/eventButton.js index 7460c5b..1b63008 100644 --- a/src/eventButton.js +++ b/src/eventButton.js @@ -36,10 +36,10 @@ const EventButton = (obj) => { return (