Add new module, JSON-RPC downloader
diff --git a/jsonrpc-downloader/README.md b/jsonrpc-downloader/README.md
new file mode 100644
index 0000000..a8753b8
--- /dev/null
+++ b/jsonrpc-downloader/README.md
@@ -0,0 +1,20 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+License. You may obtain a copy of the License at
+ *
+http://www.apache.org/licenses/LICENSE-2.0
+ *
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+ --->
+# JSONRPC downloader
+
+| Status | |
+|----------------|---------------|
+| Stability | [development] |
+| Component Type | [application] |
+
+The `jsonrpc downloader` application downloads blocks and transactions from a JSON-RPC endpoint and stores them in files.
diff --git a/jsonrpc-downloader/build.gradle b/jsonrpc-downloader/build.gradle
new file mode 100644
index 0000000..e056252
--- /dev/null
+++ b/jsonrpc-downloader/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+plugins { id 'application' }
+
+description = 'Ethereum JSON-RPC downloader'
+
+dependencies {
+ implementation project(':app-commons')
+ implementation project(':bytes')
+ implementation project(':config')
+ implementation project(':crypto')
+ implementation project(':concurrent')
+ implementation project(':eth')
+ implementation project(':jsonrpc')
+ implementation project(':kv')
+ implementation project(':metrics')
+ implementation project(':net')
+ implementation project(':units')
+
+ implementation 'com.fasterxml.jackson.core:jackson-databind'
+ implementation 'org.bouncycastle:bcprov-jdk15on'
+ implementation 'io.vertx:vertx-core'
+ implementation 'io.vertx:vertx-lang-kotlin-coroutines'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
+ implementation 'org.slf4j:slf4j-api'
+ implementation 'org.jetbrains.kotlin:kotlin-stdlib'
+ implementation 'org.postgresql:postgresql'
+ implementation 'javax.xml.bind:jaxb-api'
+
+ testImplementation project(':junit')
+ testImplementation 'org.junit.jupiter:junit-jupiter-api'
+ testImplementation 'org.junit.jupiter:junit-jupiter-params'
+
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+
+ runtimeOnly 'ch.qos.logback:logback-classic'
+}
+
+application {
+ mainClassName = 'org.apache.tuweni.jsonrpc.downloader.DownloaderApp'
+ applicationName = 'jsonrpc-downloader'
+}
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt
new file mode 100644
index 0000000..8b45fa0
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.jsonrpc.downloader
+
+data class DownloadState(val start: Int, val end: Int)
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt
new file mode 100644
index 0000000..287720b
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.jsonrpc.downloader
+
+import com.fasterxml.jackson.core.type.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.vertx.core.Vertx
+import io.vertx.core.VertxOptions
+import io.vertx.core.buffer.Buffer
+import io.vertx.kotlin.coroutines.await
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.joinAll
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.app.commons.ApplicationUtils
+import org.apache.tuweni.jsonrpc.JSONRPCClient
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.slf4j.LoggerFactory
+import java.io.IOException
+import java.lang.Exception
+import java.lang.Integer.max
+import java.lang.Integer.min
+import java.nio.file.Files
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.security.Security
+import java.util.concurrent.Executors
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.round
+import kotlin.system.exitProcess
+
+val logger = LoggerFactory.getLogger(Downloader::class.java)
+
+/**
+ * Application downloading chain data from a JSON-RPC endpoint.
+ */
+object DownloaderApp {
+
+ @JvmStatic
+ fun main(args: Array<String>) {
+ runBlocking {
+ if (args.contains("--version")) {
+ println("Apache Tuweni JSON-RPC downloader ${ApplicationUtils.version}")
+ exitProcess(0)
+ }
+ if (args.contains("--help") || args.contains("-h")) {
+ println("USAGE: jsonrpc-downloader <config file>")
+ exitProcess(0)
+ }
+ ApplicationUtils.renderBanner("Loading JSON-RPC downloader")
+ Security.addProvider(BouncyCastleProvider())
+ val configFile = Paths.get(if (args.isNotEmpty()) args[0] else "config.toml")
+ Security.addProvider(BouncyCastleProvider())
+
+ val config = DownloaderConfig(configFile)
+ if (config.config.hasErrors()) {
+ for (error in config.config.errors()) {
+ println(error.message)
+ }
+ System.exit(1)
+ }
+ val vertx = Vertx.vertx(VertxOptions().setWorkerPoolSize(config.numberOfThreads()))
+ val pool = Executors.newFixedThreadPool(
+ config.numberOfThreads()
+ ) {
+ val thread = Thread("downloader")
+ thread.isDaemon = true
+ thread
+ }
+ val downloader = Downloader(vertx, config, pool.asCoroutineDispatcher())
+ logger.info("Starting download")
+ try {
+ downloader.loopDownload()
+ } catch (e: Exception) {
+ logger.error("Fatal error downloading blocks", e)
+ exitProcess(1)
+ }
+ logger.info("Completed download")
+
+ vertx.close()
+ pool.shutdown()
+ }
+ }
+}
+
+class Downloader(val vertx: Vertx, val config: DownloaderConfig, override val coroutineContext: CoroutineContext) :
+ CoroutineScope {
+
+ val jsonRpcClient: JSONRPCClient
+ val objectMapper = ObjectMapper()
+
+ init {
+ jsonRpcClient = JSONRPCClient(vertx, config.url(), coroutineContext = this.coroutineContext)
+ }
+
+ suspend fun loopDownload() = coroutineScope {
+ val state = readInitialState()
+ val intervals = createMissingIntervals(state)
+ logger.info("Working with intervals $intervals")
+ val jobs = mutableListOf<Job>()
+ var length = 0
+ var completed = 0
+ for (interval in intervals) {
+ length += interval.last - interval.first
+ for (i in interval) {
+ val job = launch {
+ try {
+ val block = downloadBlock(i)
+ writeBlock(i, block)
+ } catch (e: Exception) {
+ logger.error("Error downloading block $i, aborting", e)
+ }
+ completed++
+ }
+ jobs.add(job)
+ }
+ }
+ launch {
+ while (completed < length) {
+ delay(5000)
+ logger.info("Progress ${round(completed * 100.0 * 100 / length) / 100}")
+ }
+ }
+ jobs.joinAll()
+ writeFinalState()
+ }
+
+ private suspend fun downloadBlock(blockNumber: Int): String {
+ val blockJson = jsonRpcClient.getBlockByBlockNumber(blockNumber, true)
+ return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(blockJson)
+ }
+
+ private suspend fun writeBlock(blockNumber: Int, block: String) {
+ val filePath = Paths.get(config.outputPath(), "block-${blockNumber.toString().padStart(16, '0')}.json")
+ coroutineScope {
+ vertx.fileSystem().writeFile(filePath.toString(), Buffer.buffer(block)).await()
+ }
+ }
+
+ fun createMissingIntervals(state: DownloadState): List<IntRange> {
+ val intervals = mutableListOf<IntRange>()
+ if (config.start() < state.start) {
+ intervals.add(config.start()..min(state.start, config.end()))
+ }
+ if (state.end < config.end()) {
+ intervals.add(max(config.start(), state.end)..config.end())
+ }
+
+ return intervals
+ }
+
+ private fun readInitialState(): DownloadState {
+ // read the initial state
+ var initialState = DownloadState(0, 0)
+ try {
+ val str = Files.readString(Path.of(config.outputPath(), ".offset"))
+ initialState = objectMapper.readValue(str, object : TypeReference<DownloadState>() {})
+ } catch (e: IOException) {
+ // ignored
+ }
+ return initialState
+ }
+
+ private fun writeFinalState() {
+ val state = DownloadState(config.start(), config.end())
+ val json = objectMapper.writeValueAsString(state)
+ Files.writeString(Path.of(config.outputPath(), ".offset"), json)
+ }
+}
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt
new file mode 100644
index 0000000..44c6f36
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.jsonrpc.downloader
+
+import org.apache.tuweni.config.Configuration
+import org.apache.tuweni.config.SchemaBuilder
+import java.nio.file.Path
+
+class DownloaderConfig(filePath: Path? = null, configContents: String? = null) {
+
+ companion object {
+
+ fun schema() = SchemaBuilder.create()
+ .addInteger("numberOfThreads", 10, "Number of threads for each thread pool", null)
+ .addString("outputPath", "", "Path to output block data", null)
+ .addInteger("start", 0, "First block to scrape", null)
+ .addInteger("end", null, "Last block to scrape. If unset, the scrape will continue to ask for new blocks", null)
+ .addString("url", null, "URL of the JSON-RPC service to query for information", null)
+ .toSchema()
+ }
+
+ val config = if (filePath != null) {
+ Configuration.fromToml(filePath, schema())
+ } else if (configContents != null) {
+ Configuration.fromToml(configContents)
+ } else {
+ Configuration.empty(schema())
+ }
+
+ fun numberOfThreads() = config.getInteger("numberOfThreads")
+ fun outputPath() = config.getString("outputPath")
+ fun start() = config.getInteger("start")
+ fun end() = config.getInteger("end")
+ fun url() = config.getString("url")
+}
diff --git a/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt b/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt
new file mode 100644
index 0000000..044d633
--- /dev/null
+++ b/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.jsonrpc.downloader
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+
+@ExtendWith(VertxExtension::class)
+class DownloaderTest {
+ @Test
+ fun testIntervals(@VertxInstance vertx: Vertx) {
+ val config = DownloaderConfig(
+ configContents = """
+ start=10
+ end=20
+ url="example.com"
+ """.trimIndent()
+ )
+ val downloader = Downloader(vertx, config, Dispatchers.Default)
+ var intervals = downloader.createMissingIntervals(DownloadState(0, 0))
+ assertEquals(1, intervals.size)
+ assertEquals(10..20, intervals[0])
+ intervals = downloader.createMissingIntervals(DownloadState(10, 20))
+ assertEquals(0, intervals.size)
+ intervals = downloader.createMissingIntervals(DownloadState(25, 40))
+ assertEquals(1, intervals.size)
+ assertEquals(10..20, intervals[0])
+ intervals = downloader.createMissingIntervals(DownloadState(5, 15))
+ assertEquals(1, intervals.size)
+ assertEquals(15..20, intervals[0])
+ intervals = downloader.createMissingIntervals(DownloadState(15, 25))
+ assertEquals(1, intervals.size)
+ assertEquals(10..15, intervals[0])
+ intervals = downloader.createMissingIntervals(DownloadState(12, 18))
+ assertEquals(2, intervals.size)
+ assertEquals(10..12, intervals[0])
+ assertEquals(18..20, intervals[1])
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index fd46989..648e74d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -43,6 +43,7 @@
include 'io'
include 'jsonrpc'
include 'jsonrpc-app'
+include 'jsonrpc-downloader'
include 'junit'
include 'kademlia'
include 'kv'