mirror of
https://github.com/stronk-dev/LivepeerEvents.git
synced 2025-07-05 02:35:09 +02:00
Hopefully fix the infinite sync stopping after a few days. Else we go back to the WS push bases subscription
Prework on displaying round numbers for events
This commit is contained in:
parent
d513bb9427
commit
9e71c63582
@ -1412,76 +1412,80 @@ const initSync = async function () {
|
||||
});
|
||||
}
|
||||
|
||||
let cycle = 0;
|
||||
// Does the actual looping over last parsed block -> latest block in chain
|
||||
const handleSync = async function () {
|
||||
if (!CONF_DISABLE_DB && !startedInitSync) {
|
||||
console.log("Preloading all the things from the database");
|
||||
await initSync();
|
||||
}
|
||||
cycle++;
|
||||
console.log('Starting new sync cycle #' + cycle);
|
||||
isSyncing = true;
|
||||
while (true) {
|
||||
// Get latest block in chain
|
||||
const latestBlock = await web3layer2.eth.getBlockNumber();
|
||||
if (latestBlock > latestBlockInChain) {
|
||||
latestBlockInChain = latestBlock;
|
||||
console.log("Latest L2 Eth block changed to " + latestBlockInChain);
|
||||
} else {
|
||||
// If there are no new blocks, wait for 10 seconds before retrying
|
||||
console.log("No new blocks. Sleeping for 10 seconds...");
|
||||
await sleep(10000);
|
||||
continue;
|
||||
}
|
||||
console.log("Needs to sync " + (latestBlockInChain - lastBlockEvents) + " blocks for Events sync");
|
||||
console.log("Needs to sync " + (latestBlockInChain - lastBlockTickets) + " blocks for Tickets sync");
|
||||
// Batch requests when sync is large, mark if we are going to reach latestBlockInChain in this round
|
||||
let getFinalTickets = false;
|
||||
let toTickets = 'latest';
|
||||
if (latestBlock - lastBlockTickets > 1000000) {
|
||||
toTickets = lastBlockTickets + 1000000;
|
||||
} else {
|
||||
getFinalTickets = true;
|
||||
}
|
||||
let getFinalEvents = false;
|
||||
let toEvents = 'latest';
|
||||
if (latestBlock - lastBlockEvents > 1000000) {
|
||||
toEvents = lastBlockEvents + 1000000;
|
||||
} else {
|
||||
getFinalEvents = true;
|
||||
}
|
||||
// Start initial sync for this sync round
|
||||
syncTickets(toTickets);
|
||||
syncEvents(toEvents);
|
||||
// Then loop until we have reached the last known block
|
||||
while (isEventSyncing || isTicketSyncing || !getFinalTickets || !getFinalEvents) {
|
||||
await sleep(500);
|
||||
if (isEventSyncing) {
|
||||
console.log("Parsed " + lastBlockEvents + " out of " + latestBlockInChain + " blocks for Event sync");
|
||||
} else if (!getFinalEvents) {
|
||||
// Start next batch for events
|
||||
toEvents = 'latest';
|
||||
if (latestBlock - lastBlockEvents > 1000000) {
|
||||
toEvents = lastBlockEvents + 1000000;
|
||||
} else {
|
||||
getFinalEvents = true;
|
||||
}
|
||||
syncEvents(toEvents);
|
||||
// Get latest block in chain
|
||||
const latestBlock = await web3layer2.eth.getBlockNumber();
|
||||
if (latestBlock > latestBlockInChain) {
|
||||
latestBlockInChain = latestBlock;
|
||||
console.log("Latest L2 Eth block changed to " + latestBlockInChain);
|
||||
} else {
|
||||
// If there are no new blocks, wait for 10 seconds before retrying
|
||||
console.log("No new blocks. Sleeping for 10 seconds...");
|
||||
setTimeout(() => {
|
||||
handleSync();
|
||||
}, 10000)
|
||||
}
|
||||
console.log("Needs to sync " + (latestBlockInChain - lastBlockEvents) + " blocks for Events sync");
|
||||
console.log("Needs to sync " + (latestBlockInChain - lastBlockTickets) + " blocks for Tickets sync");
|
||||
// Batch requests when sync is large, mark if we are going to reach latestBlockInChain in this round
|
||||
let getFinalTickets = false;
|
||||
let toTickets = 'latest';
|
||||
if (latestBlock - lastBlockTickets > 1000000) {
|
||||
toTickets = lastBlockTickets + 1000000;
|
||||
} else {
|
||||
getFinalTickets = true;
|
||||
}
|
||||
let getFinalEvents = false;
|
||||
let toEvents = 'latest';
|
||||
if (latestBlock - lastBlockEvents > 1000000) {
|
||||
toEvents = lastBlockEvents + 1000000;
|
||||
} else {
|
||||
getFinalEvents = true;
|
||||
}
|
||||
// Start initial sync for this sync round
|
||||
syncTickets(toTickets);
|
||||
syncEvents(toEvents);
|
||||
// Then loop until we have reached the last known block
|
||||
while (isEventSyncing || isTicketSyncing || !getFinalTickets || !getFinalEvents) {
|
||||
await sleep(500);
|
||||
if (isEventSyncing) {
|
||||
console.log("Parsed " + lastBlockEvents + " out of " + latestBlockInChain + " blocks for Event sync");
|
||||
} else if (!getFinalEvents) {
|
||||
// Start next batch for events
|
||||
toEvents = 'latest';
|
||||
if (latestBlock - lastBlockEvents > 1000000) {
|
||||
toEvents = lastBlockEvents + 1000000;
|
||||
} else {
|
||||
getFinalEvents = true;
|
||||
}
|
||||
if (isTicketSyncing) {
|
||||
console.log("Parsed " + lastBlockTickets + " out of " + latestBlockInChain + " blocks for Ticket sync");
|
||||
} else if (!getFinalTickets) {
|
||||
// Start next batch for tickets
|
||||
toTickets = 'latest';
|
||||
if (latestBlock - lastBlockTickets > 1000000) {
|
||||
toTickets = lastBlockTickets + 1000000;
|
||||
} else {
|
||||
getFinalTickets = true;
|
||||
}
|
||||
syncTickets(toTickets);
|
||||
syncEvents(toEvents);
|
||||
}
|
||||
if (isTicketSyncing) {
|
||||
console.log("Parsed " + lastBlockTickets + " out of " + latestBlockInChain + " blocks for Ticket sync");
|
||||
} else if (!getFinalTickets) {
|
||||
// Start next batch for tickets
|
||||
toTickets = 'latest';
|
||||
if (latestBlock - lastBlockTickets > 1000000) {
|
||||
toTickets = lastBlockTickets + 1000000;
|
||||
} else {
|
||||
getFinalTickets = true;
|
||||
}
|
||||
syncTickets(toTickets);
|
||||
}
|
||||
}
|
||||
console.log('done syncing')
|
||||
isSyncing = false;
|
||||
setTimeout(() => {
|
||||
handleSync();
|
||||
}, 10000)
|
||||
};
|
||||
if (!isSyncing && !CONF_SIMPLE_MODE) {
|
||||
console.log("Starting sync process");
|
||||
@ -2654,8 +2658,9 @@ Mutates the Event in the database to contain the round number
|
||||
*/
|
||||
|
||||
let roundCache = [];
|
||||
let highestEndblockSeen = 0;
|
||||
|
||||
const mutateRoundToDB = async function (scoreObj) {
|
||||
const mutateRound = async function (scoreObj) {
|
||||
// Immediately mutate new object
|
||||
const doc = await MonthlyStat.findOneAndUpdate({
|
||||
year: year,
|
||||
@ -2666,7 +2671,7 @@ const mutateRoundToDB = async function (scoreObj) {
|
||||
upsert: true,
|
||||
new: true,
|
||||
setDefaultsOnInsert: true
|
||||
});
|
||||
}); k
|
||||
// Then find and mutate all Event objects which fall in this round
|
||||
}
|
||||
|
||||
@ -2674,25 +2679,21 @@ apiRouter.post("/getRoundAtBlock", async (req, res) => {
|
||||
try {
|
||||
const { blockNumber } = req.body;
|
||||
if (blockNumber) {
|
||||
// Since months get counted starting at 0
|
||||
const now = new Date().getTime();
|
||||
let wasInCache = false;
|
||||
// See if it is cached
|
||||
for (const thisAddr of orchScoreCache) {
|
||||
if (thisAddr.year === year && thisAddr.month === month) {
|
||||
// Check timeout
|
||||
if (now - thisAddr.timestamp < 360000) {
|
||||
res.send(thisAddr);
|
||||
return;
|
||||
for (const thisRound of roundCache) {
|
||||
if (thisRound.startBlock <= blockNumber && thisRound.endBlock >= blockNumber) {
|
||||
if (thisRound.endBlock > highestEndblockSeen) {
|
||||
|
||||
}
|
||||
wasInCache = true;
|
||||
res.send(thisAddr);
|
||||
}
|
||||
}
|
||||
// Check DB
|
||||
// If exists, mutate cache and return value
|
||||
// Get from thegraph
|
||||
console.log("Getting new Orchestrator scores for " + year + "-" + month + " @ " + url);
|
||||
// Save to DB
|
||||
mutateRoundToDB();
|
||||
mutateRound();
|
||||
}
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
|
Loading…
x
Reference in New Issue
Block a user