[SPARK-52376] Support `addArtifact(s)?` in `SparkSession`
### What changes were proposed in this pull request?
This PR aims to support the following APIs in `SparkSession`.
- `addArtifact(_ path: String)`
- `addArtifact(_ url: URL)`
- `addArtifacts(_ url: URL...)`
### Why are the changes needed?
For feature parity.
### Does this PR introduce _any_ user-facing change?
No behavior change.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #193 from dongjoon-hyun/SPARK-52376.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 03a6ffe..c1c9bd1 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -735,6 +735,39 @@
return plan
}
+ /// Upload a single local `.jar` file to the server as a session artifact.
+ ///
+ /// The file is read fully into memory and sent as one single-chunk batch
+ /// over the `addArtifacts` client stream, so this is only suitable for
+ /// moderately sized jars.
+ /// - Parameter url: A file URL whose last path component ends in `.jar`.
+ /// - Throws: `SparkConnectError.InvalidArgument` when the URL is not a jar,
+ ///   plus any file-reading or gRPC transport error.
+ func addArtifact(_ url: URL) async throws {
+ guard url.lastPathComponent.hasSuffix(".jar") else {
+ throw SparkConnectError.InvalidArgument
+ }
+
+ // Server-side artifact names are namespaced under the "jars/" prefix.
+ let JAR_PREFIX = "jars"
+ let name = "\(JAR_PREFIX)/" + url.lastPathComponent
+
+ try await withGPRC { client in
+ let service = SparkConnectService.Client(wrapping: client)
+
+ // The entire file becomes one chunk; `crc` is the CRC32 of the payload bytes.
+ var chunk = Spark_Connect_AddArtifactsRequest.ArtifactChunk()
+ chunk.data = try Data(contentsOf: url)
+ chunk.crc = Int64(CRC32.checksum(data: chunk.data))
+
+ var singleChunk = Spark_Connect_AddArtifactsRequest.SingleChunkArtifact()
+ singleChunk.name = name
+ singleChunk.data = chunk
+ var batch = Spark_Connect_AddArtifactsRequest.Batch()
+ batch.artifacts.append(singleChunk)
+
+ // NOTE(review): force-unwraps `sessionID` — assumes a session has been
+ // established before any artifact is added; confirm callers guarantee this.
+ var addArtifactsRequest = Spark_Connect_AddArtifactsRequest()
+ addArtifactsRequest.sessionID = self.sessionID!
+ addArtifactsRequest.userContext = self.userContext
+ addArtifactsRequest.clientType = self.clientType
+ addArtifactsRequest.batch = batch
+ let request = addArtifactsRequest
+ // One-element client stream: write the single request, then finish.
+ _ = try await service.addArtifacts(request: StreamingClientRequest<Spark_Connect_AddArtifactsRequest> { x in
+ try await x.write(contentsOf: [request])
+ })
+ }
+ }
+
/// Add a tag to be assigned to all the operations started by this thread in this session.
/// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
public func addTag(tag: String) throws {
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index c7a8a27..7e7326c 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -267,6 +267,36 @@
return await read.table(tableName)
}
+ /// Add a single artifact to the current session.
+ /// Currently only local files with the `.jar` extension are supported.
+ /// - Parameter url: A URL to the artifact.
+ public func addArtifact(_ url: URL) async throws {
+ try await self.client.addArtifact(url)
+ }
+
+ /// Add a single artifact to the current session.
+ /// Currently only local files with the `.jar` extension are supported.
+ /// - Parameter path: A path to the file, interpreted as a local file path.
+ public func addArtifact(_ path: String) async throws {
+ try await self.client.addArtifact(URL(fileURLWithPath: path))
+ }
+
+ /// Add one or more artifacts to the session.
+ /// Artifacts are uploaded sequentially in the order given; the first
+ /// failure aborts the remaining uploads.
+ /// - Parameter url: One or more URLs of local `.jar` files.
+ public func addArtifacts(_ url: URL...) async throws {
+ for u in url {
+ try await self.client.addArtifact(u)
+ }
+ }
+
+ /// Execute an arbitrary string command inside an external execution engine rather than Spark.
+ /// This could be useful when user wants to execute some commands out of Spark. For example,
+ /// executing custom DDL/DML command for JDBC, creating index for ElasticSearch, creating cores
+ /// for Solr and so on.
+ /// - Parameters:
+ /// - runner: The class name of the runner that implements `ExternalCommandRunner`.
+ /// - command: The target command to be executed
+ /// - options: The options for the runner.
public func executeCommand(_ runner: String, _ command: String, _ options: [String: String])
async throws -> DataFrame
{
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index deece09..1b4a658 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -143,6 +143,49 @@
}
@Test
+ // A non-.jar path must be rejected with InvalidArgument before any upload happens.
+ func addInvalidArtifact() async throws {
+ await SparkSession.builder.clear()
+ let spark = try await SparkSession.builder.getOrCreate()
+ await #expect(throws: SparkConnectError.InvalidArgument) {
+ try await spark.addArtifact("x.txt")
+ }
+ await spark.stop()
+ }
+
+ @Test
+ // Uploads a temporary jar via both the String-path and URL overloads.
+ // Gated to Spark 4.x servers only — presumably older servers do not
+ // accept these artifact uploads (TODO confirm).
+ func addArtifact() async throws {
+ let fm = FileManager()
+ let path = "my.jar"
+ let url = URL(fileURLWithPath: path)
+
+ await SparkSession.builder.clear()
+ let spark = try await SparkSession.builder.getOrCreate()
+ // Create the jar fixture; content need not be a real jar for the upload path.
+ #expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
+ if await spark.version.starts(with: "4.") {
+ try await spark.addArtifact(path)
+ try await spark.addArtifact(url)
+ }
+ // Clean up the fixture regardless of the server version branch taken.
+ try fm.removeItem(atPath: path)
+ await spark.stop()
+ }
+
+ @Test
+ // Exercises the variadic overload, including passing the same URL twice.
+ // Gated to Spark 4.x servers, matching the single-artifact test above.
+ func addArtifacts() async throws {
+ let fm = FileManager()
+ let path = "my.jar"
+ let url = URL(fileURLWithPath: path)
+
+ await SparkSession.builder.clear()
+ let spark = try await SparkSession.builder.getOrCreate()
+ #expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
+ if await spark.version.starts(with: "4.") {
+ try await spark.addArtifacts(url, url)
+ }
+ // Remove the fixture so repeated test runs start clean.
+ try fm.removeItem(atPath: path)
+ await spark.stop()
+ }
+
+ @Test
func executeCommand() async throws {
await SparkSession.builder.clear()
let spark = try await SparkSession.builder.getOrCreate()