blob: 287720b8b2c1f62d6e70e68fb50398a7bf0dd55a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.jsonrpc.downloader
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.buffer.Buffer
import io.vertx.kotlin.coroutines.await
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.app.commons.ApplicationUtils
import org.apache.tuweni.jsonrpc.JSONRPCClient
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.slf4j.LoggerFactory
import java.io.IOException
import java.lang.Exception
import java.lang.Integer.max
import java.lang.Integer.min
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.security.Security
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.math.round
import kotlin.system.exitProcess
// Logger used by both DownloaderApp and Downloader in this file; it is registered under the Downloader class.
val logger = LoggerFactory.getLogger(Downloader::class.java)
/**
 * Application downloading chain data from a JSON-RPC endpoint.
 *
 * Usage: `jsonrpc-downloader <config file>`. When no argument is given the configuration is read from
 * `config.toml`. `--version` and `--help`/`-h` print their message and exit with status 0.
 */
object DownloaderApp {

  /**
   * Entry point: loads the configuration, runs the download and releases the Vert.x instance and the thread pool.
   *
   * The process exits with status 1 when the configuration reports errors or when the download throws.
   *
   * @param args command line arguments; the first one, if present, is the path of the configuration file
   */
  @JvmStatic
  fun main(args: Array<String>) {
    runBlocking {
      if (args.contains("--version")) {
        println("Apache Tuweni JSON-RPC downloader ${ApplicationUtils.version}")
        exitProcess(0)
      }
      if (args.contains("--help") || args.contains("-h")) {
        println("USAGE: jsonrpc-downloader <config file>")
        exitProcess(0)
      }
      ApplicationUtils.renderBanner("Loading JSON-RPC downloader")
      Security.addProvider(BouncyCastleProvider())

      val configFile = Paths.get(if (args.isNotEmpty()) args[0] else "config.toml")
      val config = DownloaderConfig(configFile)
      if (config.config.hasErrors()) {
        for (error in config.config.errors()) {
          println(error.message)
        }
        exitProcess(1)
      }

      val vertx = Vertx.vertx(VertxOptions().setWorkerPoolSize(config.numberOfThreads()))
      val pool = Executors.newFixedThreadPool(config.numberOfThreads()) { runnable ->
        // The runnable has to be handed to the thread: a Thread built from a name only has no target, so the
        // pool worker would start, do nothing and never execute the tasks submitted to the pool.
        Thread(runnable, "downloader").apply { isDaemon = true }
      }
      val downloader = Downloader(vertx, config, pool.asCoroutineDispatcher())

      logger.info("Starting download")
      try {
        downloader.loopDownload()
      } catch (e: Exception) {
        logger.error("Fatal error downloading blocks", e)
        exitProcess(1)
      }
      logger.info("Completed download")
      vertx.close()
      pool.shutdown()
    }
  }
}
/**
 * Downloads blocks from a JSON-RPC endpoint and stores each of them as a pretty-printed JSON file.
 *
 * Blocks are written to `block-<number padded to 16 digits>.json` under the configured output path. The range
 * that was downloaded is persisted in a `.offset` file in the same directory and read back on the next run to
 * work out which intervals are still missing.
 *
 * @param vertx the Vert.x instance used for the JSON-RPC client and for writing block files
 * @param config the downloader configuration (endpoint URL, block range, output path)
 * @param coroutineContext the context of this scope, also handed to the JSON-RPC client
 */
class Downloader(val vertx: Vertx, val config: DownloaderConfig, override val coroutineContext: CoroutineContext) :
  CoroutineScope {

  val jsonRpcClient: JSONRPCClient = JSONRPCClient(vertx, config.url(), coroutineContext = this.coroutineContext)
  val objectMapper = ObjectMapper()

  /**
   * Downloads every block of the missing intervals, one coroutine per block, logging progress every 5 seconds.
   *
   * A block that fails to download or to be written is logged and does not stop the other downloads. The
   * `.offset` state is only updated when every block succeeded, so that failed blocks are picked up by the next
   * run instead of being recorded as downloaded.
   */
  suspend fun loopDownload(): Unit = coroutineScope {
    val state = readInitialState()
    val intervals = createMissingIntervals(state)
    logger.info("Working with intervals $intervals")

    val jobs = mutableListOf<Job>()
    // NOTE(review): the counters are plain Ints updated from the child coroutines; this relies on the children
    // running on a single thread (they inherit the caller's context) — confirm for other callers.
    var length = 0
    var completed = 0
    var failed = 0
    for (interval in intervals) {
      // Ranges are inclusive: an interval holds last - first + 1 blocks, and none at all when it is empty.
      if (!interval.isEmpty()) {
        length += interval.last - interval.first + 1
      }
      for (i in interval) {
        jobs.add(
          launch {
            try {
              writeBlock(i, downloadBlock(i))
            } catch (e: CancellationException) {
              // Cancellation is not a download failure; let it propagate so the coroutine stops cooperatively.
              throw e
            } catch (e: Exception) {
              failed++
              logger.error("Error downloading block $i, aborting", e)
            }
            completed++
          }
        )
      }
    }

    val progress = launch {
      while (completed < length) {
        delay(5000)
        logger.info("Progress ${round(completed * 100.0 * 100 / length) / 100}")
      }
    }

    jobs.joinAll()
    // All the work is done: stop the reporter instead of waiting for its current delay to elapse.
    progress.cancel()

    if (failed > 0) {
      logger.error("$failed of $length blocks could not be downloaded, the download state is left unchanged")
    } else {
      writeFinalState()
    }
  }

  /**
   * Fetches a block from the JSON-RPC endpoint.
   *
   * @param blockNumber the number of the block to fetch
   * @return the block serialized as pretty-printed JSON
   */
  private suspend fun downloadBlock(blockNumber: Int): String {
    val blockJson = jsonRpcClient.getBlockByBlockNumber(blockNumber, true)
    return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(blockJson)
  }

  /**
   * Writes a block to its file under the configured output path.
   *
   * @param blockNumber the block number, used left-padded with zeros to 16 characters in the file name
   * @param block the content to write
   */
  private suspend fun writeBlock(blockNumber: Int, block: String) {
    val filePath = Paths.get(config.outputPath(), "block-${blockNumber.toString().padStart(16, '0')}.json")
    vertx.fileSystem().writeFile(filePath.toString(), Buffer.buffer(block)).await()
  }

  /**
   * Computes the block intervals of the configured range that lie outside of the given state.
   *
   * Up to two intervals are returned: one from the configured start up to the start of the state, and one from
   * the end of the state up to the configured end, each clamped to the configured range. Both are inclusive and
   * contain the boundary block of the state they touch.
   *
   * @param state the range recorded by a previous run
   * @return the intervals to download, possibly empty
   */
  fun createMissingIntervals(state: DownloadState): List<IntRange> {
    val intervals = mutableListOf<IntRange>()
    if (config.start() < state.start) {
      intervals.add(config.start()..min(state.start, config.end()))
    }
    if (state.end < config.end()) {
      intervals.add(max(config.start(), state.end)..config.end())
    }
    return intervals
  }

  /**
   * Reads the state stored in the `.offset` file of the output path.
   *
   * @return the stored state, or `DownloadState(0, 0)` when reading or deserializing the file throws an
   * [IOException]
   */
  private fun readInitialState(): DownloadState =
    try {
      val str = Files.readString(Path.of(config.outputPath(), ".offset"))
      objectMapper.readValue(str, object : TypeReference<DownloadState>() {})
    } catch (e: IOException) {
      // No usable state file: start from the default state.
      DownloadState(0, 0)
    }

  /**
   * Stores the configured start and end as the new state in the `.offset` file of the output path.
   */
  private fun writeFinalState() {
    val state = DownloadState(config.start(), config.end())
    val json = objectMapper.writeValueAsString(state)
    Files.writeString(Path.of(config.outputPath(), ".offset"), json)
  }
}