From 95be1e5614e03afce756fb36cfdfcebd66152b79 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Thu, 21 Apr 2022 10:48:12 +0200 Subject: [PATCH] Backend rework complete --- backend/src/models/ActivateEvent.js | 3 +- backend/src/models/StakeEvent.js | 2 +- backend/src/models/TransferEvent.js | 2 +- backend/src/models/WithdrawFeesEvent.js | 6 +- backend/src/models/WithdrawStakeEvent.js | 6 +- backend/src/models/monthlyStat.js | 59 +- backend/src/routes/livepeer.js | 655 +++++++++++++---------- 7 files changed, 434 insertions(+), 299 deletions(-) diff --git a/backend/src/models/ActivateEvent.js b/backend/src/models/ActivateEvent.js index 0bef8ea..0f69553 100644 --- a/backend/src/models/ActivateEvent.js +++ b/backend/src/models/ActivateEvent.js @@ -11,8 +11,7 @@ const ActivateEventSchema = new mongoose.Schema({ }, round: { type: Number, - required: false, - default: true + required: true }, transactionHash: { type: String, diff --git a/backend/src/models/StakeEvent.js b/backend/src/models/StakeEvent.js index 56a8c20..a3be915 100644 --- a/backend/src/models/StakeEvent.js +++ b/backend/src/models/StakeEvent.js @@ -7,7 +7,7 @@ const StakeEventSchema = new mongoose.Schema({ }, from: { type: String, - required: true + required: false }, to: { type: String, diff --git a/backend/src/models/TransferEvent.js b/backend/src/models/TransferEvent.js index 77f2392..19f4526 100644 --- a/backend/src/models/TransferEvent.js +++ b/backend/src/models/TransferEvent.js @@ -10,7 +10,7 @@ const TransferEventSchema = new mongoose.Schema({ required: true }, amount: { - type: number, + type: Number, required: true }, transactionHash: { diff --git a/backend/src/models/WithdrawFeesEvent.js b/backend/src/models/WithdrawFeesEvent.js index 0129d47..a58e14c 100644 --- a/backend/src/models/WithdrawFeesEvent.js +++ b/backend/src/models/WithdrawFeesEvent.js @@ -1,6 +1,6 @@ import mongoose from 'mongoose'; -const WithdrawEventSchema = new mongoose.Schema({ +const WithdrawFeesEventSchema = new mongoose.Schema({ address: { type: String, required: true @@ -23,5 +23,5 @@ const WithdrawEventSchema = new mongoose.Schema({ } }, { timestamps: false }); -const WithdrawEvent = mongoose.model('WithdrawEvent', WithdrawEventSchema); -export default WithdrawEvent; \ No newline at end of file +const WithdrawFeesEvent = mongoose.model('WithdrawFeesEvent', WithdrawFeesEventSchema); +export default WithdrawFeesEvent; \ No newline at end of file diff --git a/backend/src/models/WithdrawStakeEvent.js b/backend/src/models/WithdrawStakeEvent.js index 345a7f1..22b6b27 100644 --- a/backend/src/models/WithdrawStakeEvent.js +++ b/backend/src/models/WithdrawStakeEvent.js @@ -1,6 +1,6 @@ import mongoose from 'mongoose'; -const WithdrawEventSchema = new mongoose.Schema({ +const WithdrawStakeEventSchema = new mongoose.Schema({ address: { type: String, required: true @@ -27,5 +27,5 @@ const WithdrawEventSchema = new mongoose.Schema({ } }, { timestamps: false }); -const WithdrawEvent = mongoose.model('WithdrawEvent', WithdrawEventSchema); -export default WithdrawEvent; \ No newline at end of file +const WithdrawStakeEvent = mongoose.model('WithdrawStakeEvent', WithdrawStakeEventSchema); +export default WithdrawStakeEvent; \ No newline at end of file diff --git a/backend/src/models/monthlyStat.js b/backend/src/models/monthlyStat.js index a08d4f9..1424fe9 100644 --- a/backend/src/models/monthlyStat.js +++ b/backend/src/models/monthlyStat.js @@ -1,5 +1,54 @@ import mongoose from 'mongoose'; +const WinningTicketSchema = new mongoose.Schema({ + address: { + type: String, + required: true + }, + sum: { + type: Number, + required: true + }, + count: { + type: Number, + required: true + }, + }); + + const OrchCommissionSchema = new mongoose.Schema({ + address: { + type: String, + required: true + }, + feeCommission: { + type: Number, + required: true + }, + rewardCommission: { + type: Number, + required: true + }, + timestamp: { + type: Number, + required: true + } + }); + + const OrchStakeSchema = new mongoose.Schema({ + address: { + type: String, + required: true + }, + totalStake: { + type: Number, + required: true + }, + timestamp: { + type: Number, + required: true + } + }); + const MonthlyStatSchema = new mongoose.Schema({ // Static props year: { @@ -123,12 +172,12 @@ const MonthlyStatSchema = new mongoose.Schema({ default: 0 }, winningTicketsReceived: { - type: [Object], + type: [WinningTicketSchema], required: false, default: [] }, winningTicketsSent: { - type: [Object], + type: [WinningTicketSchema], required: false, default: [] }, @@ -144,18 +193,18 @@ const MonthlyStatSchema = new mongoose.Schema({ default: 0 }, winningTicketsRedeemed: { - type: [Object], + type: [WinningTicketSchema], required: false, default: [] }, // History for theGraph current data for that month latestCommission: { - type: [Object], + type: [OrchCommissionSchema], required: false, default: null }, latestTotalStake: { - type: [Object], + type: [OrchStakeSchema], required: false, default: null }, diff --git a/backend/src/routes/livepeer.js b/backend/src/routes/livepeer.js index 7a1a1aa..fce4bac 100644 --- a/backend/src/routes/livepeer.js +++ b/backend/src/routes/livepeer.js @@ -16,13 +16,20 @@ import WithdrawStakeEvent from "../models/WithdrawStakeEvent"; const apiRouter = express.Router(); import { - API_CMC, API_L1_HTTP, API_L2_HTTP, API_L2_WS, + API_CMC, API_L1_HTTP, API_L2_HTTP, CONF_DEFAULT_ORCH, CONF_SIMPLE_MODE, CONF_TIMEOUT_CMC, - CONF_TIMEOUT_ALCHEMY, CONF_TIMEOUT_LIVEPEER, CONF_DISABLE_SYNC, + CONF_TIMEOUT_ALCHEMY, CONF_TIMEOUT_LIVEPEER, CONF_DISABLE_DB, CONF_DISABLE_CMC, CONF_TIMEOUT_ENS_DOMAIN, - CONF_TIMEOUT_ENS_INFO, CONF_DISABLE_ENS + CONF_TIMEOUT_ENS_INFO } from "../config"; +/* + +INIT +imported modules + +*/ + // Do API requests to other API's const https = require('https'); @@ -51,12 +58,9 @@ if (!CONF_DISABLE_CMC) { // Gets blockchain data const { createAlchemyWeb3 } = require("@alch/alchemy-web3"); +console.log("Connecting to HTTP RPC's"); const web3layer1 = createAlchemyWeb3(API_L1_HTTP); const web3layer2 = createAlchemyWeb3(API_L2_HTTP); -let web3layer2WS; -if (!CONF_SIMPLE_MODE) { - web3layer2WS = createAlchemyWeb3(API_L2_WS); -} // ENS stuff TODO: CONF_DISABLE_ENS const { ethers } = require("ethers"); @@ -73,18 +77,25 @@ let TicketBrokerTargetAbi; let TicketBrokerTargetAddr; let ticketBrokerContract; if (!CONF_SIMPLE_MODE) { + console.log("Loading contracts for smart contract events"); // Listen for events on the bonding manager contract BondingManagerTargetJson = fs.readFileSync('src/abi/BondingManagerTarget.json'); BondingManagerTargetAbi = JSON.parse(BondingManagerTargetJson); BondingManagerProxyAddr = "0x35Bcf3c30594191d53231E4FF333E8A770453e40"; - bondingManagerContract = new web3layer2WS.eth.Contract(BondingManagerTargetAbi.abi, BondingManagerProxyAddr); + bondingManagerContract = new web3layer2.eth.Contract(BondingManagerTargetAbi.abi, BondingManagerProxyAddr); // Listen for events on the ticket broker contract TicketBrokerTargetJson = fs.readFileSync('src/abi/TicketBrokerTarget.json'); TicketBrokerTargetAbi = JSON.parse(TicketBrokerTargetJson); TicketBrokerTargetAddr = "0xa8bB618B1520E284046F3dFc448851A1Ff26e41B"; - ticketBrokerContract = new web3layer2WS.eth.Contract(TicketBrokerTargetAbi.abi, TicketBrokerTargetAddr); + ticketBrokerContract = new web3layer2.eth.Contract(TicketBrokerTargetAbi.abi, TicketBrokerTargetAddr); } +/* + +GLOBAL helper functions + +*/ + function sleep(ms) { return new Promise((resolve) => { setTimeout(resolve, ms); @@ -95,6 +106,12 @@ function sleep(ms) { BLOCKCHAIN BLOCKS Stored in mongoDB (block.js) and local cache +Contains a mapping of blockNumber -> blockTime +so that we can attach timestamps to events + +Currently all blocks get loaded from the DB once +on server boot, so if it is not cached, we can +assume it is not in DB */ @@ -131,21 +148,16 @@ Summarized stats stored in mongoDB as monthlyStat.js and local cache */ -// Set special flag to make sure also get blocks that pass us by while we are syncing -let isSyncing = true; +let startedInitSync = false; +let isSyncing = false; let isEventSyncing = false; let isTicketSyncing = false; -// Listening for live updates -var BondingManagerProxyListener; -var TicketBrokerProxyListener; let eventsCache = []; let latestBlockInChain = 0; let lastBlockEvents = 0; let lastBlockTickets = 0; -let syncCache = []; let ticketsCache = []; -let ticketsSyncCache = []; let updateEventCache = []; let rewardEventCache = []; @@ -170,9 +182,9 @@ const updateMonthlyReward = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly Reward stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly Reward stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -203,9 +215,9 @@ const updateMonthlyClaim = async function (blockTime, fees, rewards) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly Claim stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly Claim stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -238,9 +250,9 @@ const updateMonthlyWithdrawStake = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly WithdrawStake stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly WithdrawStake stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -271,9 +283,9 @@ const updateMonthlyWithdrawFees = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly WithdrawFees stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly WithdrawFees stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -304,9 +316,9 @@ const updateMonthlyNewDelegator = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly new Delegator stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly new Delegator stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -337,9 +349,9 @@ const updateMonthlyUnbond = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly new Unbond stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly new Unbond stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -370,9 +382,9 @@ const updateMonthlyReactivated = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() + 1; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly new reactivation stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly new reactivation stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -401,9 +413,9 @@ const updateMonthlyActivation = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly new activation stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly new activation stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -434,9 +446,9 @@ const updateMonthlyMoveStake = async function (blockTime, amount) { var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly stake movement stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly stake movement stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -467,9 +479,9 @@ const updateMonthlyTicketReceived = async function (blockTime, amount, from, to) var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly ticket received stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly ticket received stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -493,13 +505,11 @@ const updateMonthlyTicketReceived = async function (blockTime, amount, from, to) await MonthlyStat.updateOne({ year: thisYear, month: thisMonth, - 'winningTicketsReceived.address': { '$ne': to } + 'winningTicketsReceived.address': to }, { - $inc: { - 'winningTicketsReceived': { - sum: amount, - count: 1 - } + $set: { + 'winningTicketsReceived.$.sum': amount + eventObj.sum, + 'winningTicketsReceived.$.count': 1 + eventObj.count, } }); hasModified = true; @@ -524,19 +534,17 @@ const updateMonthlyTicketReceived = async function (blockTime, amount, from, to) } // Check to see if the doc's embedded winningTicketsSent already contains this address hasModified = false; - for (const eventObj of doc.winningTicketsSent) { + for (var eventObj of doc.winningTicketsSent) { // If so, update that entry in winningTicketsSent if (eventObj.address == from) { await MonthlyStat.updateOne({ year: thisYear, month: thisMonth, - 'winningTicketsSent.address': { '$ne': from } + 'winningTicketsSent.address': from }, { - $inc: { - 'winningTicketsSent': { - sum: amount, - count: 1 - } + $set: { + 'winningTicketsSent.$.sum': amount + eventObj.sum, + 'winningTicketsSent.$.count': 1 + eventObj.count, } }); hasModified = true; @@ -560,26 +568,26 @@ const updateMonthlyTicketReceived = async function (blockTime, amount, from, to) }); } } -} -// Update cached entry if it is cached -for (var idx = 0; idx < monthlyStatCache.length; idx++) { - if (monthlyStatCache[idx].year == thisYear && monthlyStatCache[idx].month == thisMonth) { - monthlyStatCache[idx].winningTicketsReceivedCount += 1; - monthlyStatCache[idx].winningTicketsReceivedSum += amount; - // Check to see if the doc's embedded winningTicketsReceived already contains this address - for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsReceived.length; idx2++) { - if (monthlyStatCache[idx].winningTicketsReceived[idx2].address == to) { - monthlyStatCache[idx].winningTicketsReceived[idx2].count += 1; - monthlyStatCache[idx].winningTicketsReceived[idx2].sum += amount; - break; + // Update cached entry if it is cached + for (var idx = 0; idx < monthlyStatCache.length; idx++) { + if (monthlyStatCache[idx].year == thisYear && monthlyStatCache[idx].month == thisMonth) { + monthlyStatCache[idx].winningTicketsReceivedCount += 1; + monthlyStatCache[idx].winningTicketsReceivedSum += amount; + // Check to see if the doc's embedded winningTicketsReceived already contains this address + for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsReceived.length; idx2++) { + if (monthlyStatCache[idx].winningTicketsReceived[idx2].address == to) { + monthlyStatCache[idx].winningTicketsReceived[idx2].count += 1; + monthlyStatCache[idx].winningTicketsReceived[idx2].sum += amount; + break; + } } - } - // Check to see if the doc's embedded winningTicketsSent already contains this address - for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsSent.length; idx2++) { - if (monthlyStatCache[idx].winningTicketsSent[idx2].address == from) { - monthlyStatCache[idx].winningTicketsSent[idx2].count += 1; - monthlyStatCache[idx].winningTicketsSent[idx2].sum += amount; - break; + // Check to see if the doc's embedded winningTicketsSent already contains this address + for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsSent.length; idx2++) { + if (monthlyStatCache[idx].winningTicketsSent[idx2].address == from) { + monthlyStatCache[idx].winningTicketsSent[idx2].count += 1; + monthlyStatCache[idx].winningTicketsSent[idx2].sum += amount; + break; + } } } } @@ -589,9 +597,9 @@ const updateMonthlyTicketRedeemed = async function (blockTime, amount, address) var dateObj = new Date(0); dateObj.setUTCSeconds(blockTime); // Determine year, month and name - const thisMonth = dateObj.getMonth(); + const thisMonth = dateObj.getMonth() ; const thisYear = dateObj.getFullYear(); - console.log("Updating monthly ticket redeemed stats for " + thisYear + "-" + thisMonth + 1); + console.log("Updating monthly ticket redeemed stats for " + thisYear + "-" + thisMonth); if (!CONF_DISABLE_DB) { // Update DB entry const doc = await MonthlyStat.findOneAndUpdate({ @@ -615,13 +623,11 @@ const updateMonthlyTicketRedeemed = async function (blockTime, amount, address) await MonthlyStat.updateOne({ year: thisYear, month: thisMonth, - 'winningTicketsRedeemed.address': { '$ne': address } + 'winningTicketsRedeemed.address': address }, { - $inc: { - 'winningTicketsRedeemed': { - sum: amount, - count: 1 - } + $set: { + 'winningTicketsRedeemed.$.sum': amount + eventObj.sum, + 'winningTicketsRedeemed.$.count': 1 + eventObj.count, } }); hasModified = true; @@ -645,18 +651,18 @@ const updateMonthlyTicketRedeemed = async function (blockTime, amount, address) }); } } -} -// Update cached entry if it is cached -for (var idx = 0; idx < monthlyStatCache.length; idx++) { - if (monthlyStatCache[idx].year == thisYear && monthlyStatCache[idx].month == thisMonth) { - monthlyStatCache[idx].winningTicketsRedeemedCount += 1; - monthlyStatCache[idx].winningTicketsRedeemedSum += amount; - // Check to see if the doc's embedded winningTicketsRedeemed already contains this address - for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsRedeemed.length; idx2++) { - if (monthlyStatCache[idx].winningTicketsRedeemed[idx2].address == address) { - monthlyStatCache[idx].winningTicketsRedeemed[idx2].count += 1; - monthlyStatCache[idx].winningTicketsRedeemed[idx2].sum += amount; - break; + // Update cached entry if it is cached + for (var idx = 0; idx < monthlyStatCache.length; idx++) { + if (monthlyStatCache[idx].year == thisYear && monthlyStatCache[idx].month == thisMonth) { + monthlyStatCache[idx].winningTicketsRedeemedCount += 1; + monthlyStatCache[idx].winningTicketsRedeemedSum += amount; + // Check to see if the doc's embedded winningTicketsRedeemed already contains this address + for (var idx2 = 0; idx2 < monthlyStatCache[idx].winningTicketsRedeemed.length; idx2++) { + if (monthlyStatCache[idx].winningTicketsRedeemed[idx2].address == address) { + monthlyStatCache[idx].winningTicketsRedeemed[idx2].count += 1; + monthlyStatCache[idx].winningTicketsRedeemed[idx2].sum += amount; + break; + } } } } @@ -780,7 +786,6 @@ const parseAnyEvent = async function (thisEvent) { } // Parse [Bond, Rebond, Unbond, TransferBond, TranscoderActivated] raw events into mongoDB object -let parseTimeout = 3000; let lastTx = ""; let lastTxTime = 0; let parseCache = []; @@ -791,6 +796,9 @@ const parseSequenceEvent = async function () { let eventTo = ""; // address to which X gets sent let eventAmount = 0; let eventWhen = ""; + let currentTx = ""; + let currentBlock = 0; + let currentTime = 0; let eventContainsBond = false; let eventContainsTranscoderActivated = false; let eventContainsUnbond = false; @@ -803,6 +811,11 @@ const parseSequenceEvent = async function () { parseCache = []; // Go through each event and merge their data for (const eventObj of eventSequence) { + if (currentTx === "") { + currentTx = eventObj.transactionHash; + currentBlock = eventObj.blockNumber; + currentTime = eventObj.blockTime; + } const thisName = eventObj.name; if (thisName === "Unbond") { eventContainsUnbond = true; @@ -849,15 +862,15 @@ const parseSequenceEvent = async function () { from: eventFrom, to: eventTo, stake: eventAmount, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new StakeEvent(eventObj); await dbObj.save(); } - updateMonthlyMoveStake(eventObj.blockTime, eventObj.amount); + updateMonthlyMoveStake(eventObj.blockTime, eventObj.stake); stakeEventCache.push(eventObj); } else if (eventContainsBond && eventContainsTranscoderActivated) { console.log('Parsing TranscoderActivated sequence event'); @@ -866,15 +879,15 @@ const parseSequenceEvent = async function () { address: eventCaller, initialStake: eventAmount, round: eventWhen, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new ActivateEvent(eventObj); await dbObj.save(); } - updateMonthlyActivation(eventObj.blockTime, eventObj.amount); + updateMonthlyActivation(eventObj.blockTime, eventObj.initialStake); activateEventCache.push(eventObj); } else if (eventContainsTranscoderActivated) { console.log('Parsing lone TranscoderActivated sequence event'); @@ -882,9 +895,9 @@ const parseSequenceEvent = async function () { const eventObj = { address: eventCaller, round: eventWhen, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new ActivateEvent(eventObj); @@ -900,15 +913,15 @@ const parseSequenceEvent = async function () { from: eventFrom, stake: eventAmount, round: eventWhen, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new UnbondEvent(eventObj); await dbObj.save(); } - updateMonthlyUnbond(eventObj.blockTime, eventObj.amount); + updateMonthlyUnbond(eventObj.blockTime, eventObj.stake); unbondEventCache.push(eventObj); } else if (eventContainsBond) { console.log('Parsing lone bond sequence event'); @@ -918,27 +931,26 @@ const parseSequenceEvent = async function () { from: eventFrom, // Should be 0x0000000000000000000000000000000000000000 to: eventTo, stake: eventAmount, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new StakeEvent(eventObj); await dbObj.save(); } - updateMonthlyNewDelegator(eventObj.blockTime, eventObj.amount); + updateMonthlyNewDelegator(eventObj.blockTime, eventObj.stake); stakeEventCache.push(eventObj); } else if (eventContainsRebond) { console.log('Parsing lone rebond sequence event'); // Lone Rebond: delegator increased their stake (Stake event) const eventObj = { address: eventCaller, - from: eventFrom, // Should be the same as eventTo to: eventTo, stake: eventAmount, - transactionHash: thisEvent.transactionHash, - blockNumber: thisEvent.blockNumber, - blockTime: thisEvent.blockTime + transactionHash: currentTx, + blockNumber: currentBlock, + blockTime: currentTime } if (!CONF_DISABLE_DB) { const dbObj = new StakeEvent(eventObj); @@ -951,22 +963,6 @@ const parseSequenceEvent = async function () { } } -// Angel process which calls parseSequenceEvent after parseTimeout and if lastTime > 0 -const watchSequenceEvents = async function () { - while (true) { - if (lastTxTime > 0) { - const now = new Date().getTime(); - if (now - lastTxTime > parseTimeout) { - parseSequenceEvent(); - } - } - sleep(1000); - } -} -if (!CONF_SIMPLE_MODE) { - watchSequenceEvents(); -} - // Passes incoming event into parseAnyEvent or into parseCache const onNewEvent = async function (thisEvent) { const thisName = thisEvent.name; @@ -983,121 +979,26 @@ const onNewEvent = async function (thisEvent) { /* -SMART CONTRACT EVENTS - LIVE DATA - -*/ - -if (!CONF_SIMPLE_MODE) { - BondingManagerProxyListener = bondingManagerContract.events.allEvents(async (error, event) => { - try { - if (error) { - throw error - } - if (isSyncing) { - console.log('Received new Event on block ' + event.blockNumber + " during sync"); - } else { - console.log('Received new Event on block ' + event.blockNumber); - } - const thisBlock = await getBlock(event.blockNumber); - // Push obj of event to cache and create a new entry for it in the DB - const eventObj = { - address: event.address, - transactionHash: event.transactionHash, - transactionUrl: "https://arbiscan.io/tx/" + event.transactionHash, - name: event.event, - data: event.returnValues, - blockNumber: thisBlock.number, - blockTime: thisBlock.timestamp - } - if (!isSyncing) { - if (!CONF_DISABLE_DB) { - const dbObj = new Event(eventObj); - await dbObj.save(); - } - eventsCache.push(eventObj); - - // Parse old sequence events if TX changes - if (lastTx != event.transactionHash && parseCache.length) { - parseSequenceEvent(); - } - lastTx = event.transactionHash; - // Parse current Event - onNewEvent(eventObj); - - } else { - syncCache.push(eventObj); - } - } - catch (err) { - console.log("FATAL ERROR: ", err); - } - }); - console.log("Listening for events on " + BondingManagerProxyAddr); - TicketBrokerProxyListener = ticketBrokerContract.events.allEvents(async (error, event) => { - try { - if (error) { - throw error - } - if (isSyncing) { - console.log('Received new ticket event on block ' + event.blockNumber + " during sync"); - } else { - console.log('Received new ticket event on block ' + event.blockNumber); - } - const thisBlock = await getBlock(event.blockNumber); - // Push obj of event to cache and create a new entry for it in the DB - const eventObj = { - address: event.address, - transactionHash: event.transactionHash, - transactionUrl: "https://arbiscan.io/tx/" + event.transactionHash, - name: event.event, - data: event.returnValues, - blockNumber: thisBlock.number, - blockTime: thisBlock.timestamp - } - if (!isSyncing) { - if (!CONF_DISABLE_DB) { - const dbObj = new Ticket(eventObj); - await dbObj.save(); - } - ticketsCache.push(eventObj); - - // Parse old sequence events if TX changes - if (lastTx != event.transactionHash && parseCache.length) { - parseSequenceEvent(); - } - lastTx = event.transactionHash; - // Parse current Event - onNewEvent(eventObj); - - } else { - ticketsSyncCache.push(eventObj); - } - } - catch (err) { - console.log("FATAL ERROR: ", err); - } - }); - console.log("Listening for tickets on " + TicketBrokerTargetAddr); -} - -/* - -SMART CONTRACT EVENTS - SYNC OF MISSED BLOCKS +SMART CONTRACT EVENTS - SYNC BLOCKS */ // Syncs events database -const syncEvents = function () { - console.log("Starting sync process for Bonding Manager events"); +const syncEvents = function (toBlock) { + console.log("Starting sync process for Bonding Manager events to block " + toBlock); isEventSyncing = true; + let lastTxSynced = 0; // Then do a sync from last found until latest known - bondingManagerContract.getPastEvents("allEvents", { fromBlock: lastBlockEvents + 1, toBlock: 'latest' }, async (error, events) => { + bondingManagerContract.getPastEvents("allEvents", { fromBlock: lastBlockEvents + 1, toBlock: toBlock }, async (error, events) => { try { if (error) { throw error } let size = events.length; console.log("Parsing " + size + " events"); + if (!size) { + lastBlockEvents = toBlock; + } for (const event of events) { if (event.blockNumber > lastBlockEvents) { lastBlockEvents = event.blockNumber; @@ -1117,6 +1018,18 @@ const syncEvents = function () { await dbObj.save(); } eventsCache.push(eventObj); + + // Parse old sequence events if TX changes + if (lastTxSynced != event.transactionHash && parseCache.length) { + parseSequenceEvent(); + } + lastTxSynced = event.transactionHash; + // Parse current Event + onNewEvent(eventObj); + } + // Parse old sequence events if we have parsed all events in requested blocks + if (parseCache.length) { + parseSequenceEvent(); } } catch (err) { @@ -1126,17 +1039,20 @@ const syncEvents = function () { }); } // Syncs tickets database -const syncTickets = function () { - console.log("Starting sync process for Ticket Broker events"); +const syncTickets = function (toBlock) { + console.log("Starting sync process for Ticket Broker events to block " + toBlock); isTicketSyncing = true; // Then do a sync from last found until latest known - ticketBrokerContract.getPastEvents("allEvents", { fromBlock: lastBlockTickets + 1, toBlock: 'latest' }, async (error, events) => { + ticketBrokerContract.getPastEvents("allEvents", { fromBlock: lastBlockTickets + 1, toBlock: toBlock }, async (error, events) => { try { if (error) { throw error } let size = events.length; console.log("Parsing " + size + " tickets"); + if (!size) { + lastBlockTickets = toBlock; + } for (const event of events) { if (event.blockNumber > lastBlockTickets) { lastBlockTickets = event.blockNumber; @@ -1156,6 +1072,8 @@ const syncTickets = function () { await dbObj.save(); } ticketsCache.push(eventObj); + // Parse current Event + onNewEvent(eventObj); } } catch (err) { @@ -1165,7 +1083,9 @@ const syncTickets = function () { }); } -const handleSync = async function () { +// Retrieves stuff from DB on first boot +const initSync = async function () { + startedInitSync = true; // First collection -> cache // Get all parsed blocks blockCache = await Block.find({}, { @@ -1184,7 +1104,7 @@ const handleSync = async function () { blockTime: 1, _id: 0 }); - console.log("Retrieved existing Events of size " + eventsCache.length); + console.log("Retrieved existing raw Events of size " + eventsCache.length); // Get all parsedTickets ticketsCache = await Ticket.find({}, { address: 1, @@ -1196,7 +1116,7 @@ const handleSync = async function () { blockTime: 1, _id: 0 }); - console.log("Retrieved existing Tickets of size " + ticketsCache.length); + console.log("Retrieved existing raw Tickets of size " + ticketsCache.length); // Then determine latest block number parsed based on collection for (var idx = 0; idx < eventsCache.length; idx++) { const thisBlock = eventsCache[idx]; @@ -1213,51 +1133,213 @@ const handleSync = async function () { } } console.log("Latest Ticket block parsed is " + lastBlockTickets); - // Get latest block in chain - const latestBlock = await web3layer2.eth.getBlockNumber(); - if (latestBlock > latestBlockInChain) { - latestBlockInChain = latestBlock; + // Get all parsed update events and cache them + updateEventCache = await UpdateEvent.find({}, { + address: 1, + rewardCommission: 1, + feeCommission: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed reward events and cache them + rewardEventCache = await RewardEvent.find({}, { + address: 1, + amount: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed claim events and cache them + claimEventCache = await ClaimEvent.find({}, { + address: 1, + fees: 1, + rewards: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed withdraw fees events and cache them + withdrawFeesEventCache = await WithdrawFeesEvent.find({}, { + address: 1, + amount: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed withdraw stake events and cache them + withdrawStakeEventCache = await WithdrawStakeEvent.find({}, { + address: 1, + round: 1, + amount: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed transfer winning ticket events and cache them + transferTicketEventCache = await TransferEvent.find({}, { + address: 1, + to: 1, + amount: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed redeem winning ticket events and cache them + redeemTicketEventCache = await RedeemEvent.find({}, { + address: 1, + amount: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed orchestrator activation events and cache them + activateEventCache = await ActivateEvent.find({}, { + address: 1, + initialStake: 1, + round: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed unbond events and cache them + unbondEventCache = await UnbondEvent.find({}, { + address: 1, + from: 1, + stake: 1, + round: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed stake events and cache them + stakeEventCache = await StakeEvent.find({}, { + address: 1, + from: 1, + to: 1, + stake: 1, + transactionHash: 1, + blockNumber: 1, + blockTime: 1, + _id: 0 + }); + // Get all parsed monthly stats and cache them + monthlyStatCache = await MonthlyStat.find({}, { + year: 1, + month: 1, + reactivationCount: 1, + activationCount: 1, + activationInitialSum: 1, + unbondCount: 1, + unbondStakeSum: 1, + rewardCount: 1, + rewardAmountSum: 1, + claimCount: 1, + claimRewardSum: 1, + claimFeeSum: 1, + withdrawStakeCount: 1, + withdrawStakeAmountSum: 1, + withdrawFeesCount: 1, + withdrawFeesAmountSum: 1, + bondCount: 1, + bondStakeSum: 1, + moveStakeCount: 1, + moveStakeSum: 1, + winningTicketsReceivedCount: 1, + winningTicketsReceivedSum: 1, + winningTicketsReceived: 1, + winningTicketsSent: 1, + winningTicketsRedeemedCount: 1, + winningTicketsRedeemedSum: 1, + winningTicketsRedeemed: 1, + latestCommission: 1, + latestTotalStake: 1, + testScores: 1, + _id: 0 + }); +} + +// Does the actual looping over last parsed block -> latest block in chain +const handleSync = async function () { + if (!CONF_DISABLE_DB && !startedInitSync) { + console.log("Preloading all the things from the database"); + await initSync(); } - console.log("Latest L2 Eth block is " + latestBlockInChain); - console.log("Needs to sync " + (latestBlockInChain - lastBlockEvents) + " blocks for Events sync"); - console.log("Needs to sync " + (latestBlockInChain - lastBlockTickets) + " blocks for Tickets sync"); - syncTickets(); - syncEvents(); - while (isEventSyncing || isTicketSyncing) { - await sleep(3000); - if (isEventSyncing) { - console.log("Parsed " + lastBlockEvents + " out of " + latestBlockInChain + " blocks for Event sync"); + isSyncing = true; + while (true) { + // Get latest block in chain + const latestBlock = await web3layer2.eth.getBlockNumber(); + if (latestBlock > latestBlockInChain) { + latestBlockInChain = latestBlock; + console.log("Latest L2 Eth block changed to " + latestBlockInChain); + } else { + // If there are no new blocks, wait for 10 seconds before retrying + await sleep(10000); + continue; } - if (isTicketSyncing) { - console.log("Parsed " + lastBlockTickets + " out of " + latestBlockInChain + " blocks for Ticket sync"); + console.log("Needs to sync " + (latestBlockInChain - lastBlockEvents) + " blocks for Events sync"); + console.log("Needs to sync " + (latestBlockInChain - lastBlockTickets) + " blocks for Tickets sync"); + // Batch requests when sync is large, mark if we are going to reach latestBlockInChain in this round + let getFinalTickets = false; + let toTickets = 'latest'; + if (latestBlock - lastBlockTickets > 100000) { + toTickets = lastBlockTickets + 100000; + } else { + getFinalTickets = true; } - } - while (syncCache.length || ticketsSyncCache.length) { - const liveEvents = syncCache.slice(); - syncCache = []; - for (const eventObj of liveEvents) { - console.log("Parsing event received while syncing"); - if (!CONF_DISABLE_DB) { - const dbObj = new Event(eventObj); - await dbObj.save(); + let getFinalEvents = false; + let toEvents = 'latest'; + if (latestBlock - lastBlockEvents > 100000) { + toEvents = lastBlockEvents + 100000; + } else { + getFinalEvents = true; + } + // Start initial sync for this sync round + syncTickets(toTickets); + syncEvents(toEvents); + // Then loop until we have reached the last known block + while (isEventSyncing || isTicketSyncing || !getFinalTickets || !getFinalEvents) { + await sleep(500); + if (isEventSyncing) { + console.log("Parsed " + lastBlockEvents + " out of " + latestBlockInChain + " blocks for Event sync"); + } else if (!getFinalEvents) { + // Start next batch for events + toEvents = 'latest'; + if (latestBlock - lastBlockEvents > 100000) { + toEvents = lastBlockEvents + 100000; + } else { + getFinalEvents = true; + } + syncEvents(toEvents); } - eventsCache.push(eventObj); - } - const liveTickets = ticketsSyncCache.slice(); - ticketsSyncCache = []; - for (const eventObj of liveTickets) { - console.log("Parsing ticket received while syncing"); - if (!CONF_DISABLE_DB) { - const dbObj = new Ticket(eventObj); - await dbObj.save(); + if (isTicketSyncing) { + console.log("Parsed " + lastBlockTickets + " out of " + latestBlockInChain + " blocks for Ticket sync"); + } else if (!getFinalTickets) { + // Start next batch for tickets + toTickets = 'latest'; + if (latestBlock - lastBlockTickets > 100000) { + toTickets = lastBlockTickets + 100000; + } else { + getFinalTickets = true; + } + syncTickets(toTickets); } - ticketsCache.push(eventObj); } } console.log('done syncing') isSyncing = false; }; -if (!isEventSyncing && !CONF_SIMPLE_MODE && !CONF_DISABLE_SYNC) { +if (!isSyncing && !CONF_SIMPLE_MODE) { + console.log("Starting sync process"); handleSync(); } @@ -1298,6 +1380,7 @@ const parseCmc = async function () { if (!cmcEnabled) { return; } + console.log("Getting new CMC data"); cmcCache = await cmcClient.getTickers({ limit: 200 }); for (var idx = 0; idx < cmcCache.data.length; idx++) { const coinData = cmcCache.data[idx]; @@ -1404,6 +1487,7 @@ const parseL2Blockchain = async function () { serviceUriFeeCostL2 = (serviceUriFee * l2Gwei) / 1000000000; } const parseEthBlockchain = async function () { + console.log("Getting new blockchain data"); await Promise.all([parseL1Blockchain(), parseL2Blockchain()]); } @@ -1451,6 +1535,7 @@ Latest commission and totalStake stored in mongoDB (monthlyStat.js) and all in l let orchestratorCache = []; const mutateNewCommissionRates = async function (address, feeCommission, rewardCommission) { + console.log("Found new commission rates for " + address); const dateObj = new Date(); const now = dateObj.getTime(); const thisMonth = dateObj.getMonth(); @@ -1484,14 +1569,12 @@ const mutateNewCommissionRates = async function (address, feeCommission, rewardC await MonthlyStat.updateOne({ year: thisYear, month: thisMonth, - 'latestCommission.address': { '$ne': address } + 'latestCommission.address': address }, { $set: { - 'latestCommission': { - feeCommission: feeCommission, - rewardCommission: rewardCommission, - timestamp: now - } + 'latestCommission.$.feeCommission': feeCommission, + 'latestCommission.$.rewardCommission': rewardCommission, + 'latestCommission.$.timestamp': now } }); hasModified = true; @@ -1518,6 +1601,7 @@ const mutateNewCommissionRates = async function (address, feeCommission, rewardC } const mutateNewGlobalStake = async function (address, globalStake) { + console.log("Found new total stake for " + address); const dateObj = new Date(); const now = dateObj.getTime(); const thisMonth = dateObj.getMonth(); @@ -1547,13 +1631,11 @@ const mutateNewGlobalStake = async function (address, globalStake) { await MonthlyStat.updateOne({ year: thisYear, month: thisMonth, - 'latestTotalStake.address': { '$ne': address } + 'latestTotalStake.address': address }, { $set: { - 'latestTotalStake': { - totalStake: globalStake, - timestamp: now - } + 'latestTotalStake.$.totalStake': globalStake, + 'latestTotalStake.$.timestamp': now } }); hasModified = true; @@ -1636,6 +1718,7 @@ const mutateDynamicStatsFromCache = async function (oldOrchestratorObj, newOrche // Gets info on a given Orchestrator const parseOrchestrator = async function (reqAddr) { + console.log("Getting orchestrator data from thegraph for " + reqAddr); try { reqAddr = reqAddr.toLowerCase(); const now = new Date().getTime(); @@ -1790,6 +1873,7 @@ let delegatorCache = []; // Gets info on a given Delegator const parseDelegator = async function (reqAddr) { + console.log("Getting delegator data from thegraph for " + reqAddr); reqAddr = reqAddr.toLowerCase(); const now = new Date().getTime(); // Default assume it's the first time we request this Orchestrator @@ -2110,6 +2194,7 @@ let ensDomainCache = []; let ensInfoCache = []; const getEnsDomain = async function (addr) { + console.log("Getting ENS data for " + addr); const now = new Date().getTime(); let wasInCache = false; // See if it is cached @@ -2154,6 +2239,7 @@ const getEnsDomain = async function (addr) { } const getEnsInfo = async function (addr) { + console.log("Getting ENS info for " + addr); const now = new Date().getTime(); let wasInCache = false; // See if it is cached @@ -2236,6 +2322,7 @@ Only stored in local cache let threeboxCache = []; const getThreeBoxInfo = async function (addr) { + console.log("Getting 3box data for " + addr); const now = new Date().getTime(); // See if it is cached for (const thisAddr of threeboxCache) { @@ -2304,7 +2391,7 @@ const mutateTestScoresToDB = async function (scoreObj, month, year) { const thisMonth = dateObj.getMonth(); const thisYear = dateObj.getFullYear(); // If the test stream result is not in the past, return immediately - if (thisYear == year && thisMonth == month){ + if (thisYear == year && thisMonth == month) { return; } // Immediately mutate Monthly statistics object