
Commit 8b4133a

[SPARK-51934] Add MacOS integration test with Apache Spark 3.5.5
### What changes were proposed in this pull request?

This PR aims to add a `MacOS` integration test with Apache Spark 3.5.5.

### Why are the changes needed?

To have test coverage for Apache Spark 3.5.x.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added `integration-test-mac-spark3` test pipeline.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #92 from dongjoon-hyun/SPARK-51934.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Parent: cab692c

8 files changed (+83, −39 lines)

.github/workflows/build_and_test.yml

Lines changed: 21 additions & 0 deletions
@@ -118,3 +118,24 @@ jobs:
         ./start-connect-server.sh
         cd ../..
         swift test --no-parallel
+
+  integration-test-mac-spark3:
+    runs-on: macos-15
+    steps:
+    - uses: actions/checkout@v4
+    - uses: swift-actions/[email protected]
+      with:
+        swift-version: "6.1"
+    - name: Install Java
+      uses: actions/setup-java@v4
+      with:
+        distribution: zulu
+        java-version: 17
+    - name: Test
+      run: |
+        curl -LO https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
+        tar xvfz spark-3.5.5-bin-hadoop3.tgz
+        cd spark-3.5.5-bin-hadoop3/sbin
+        ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5
+        cd ../..
+        swift test --no-parallel

Tests/SparkConnectTests/DataFrameInternalTests.swift

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ struct DataFrameInternalTests {
     #expect(rows.count == 1)
     #expect(rows[0].length == 1)
     #expect(
-      try rows[0].get(0) as! String == """
+      try (rows[0].get(0) as! String).trimmingCharacters(in: .whitespacesAndNewlines) == """
         +---+
         |id |
         +---+
@@ -73,7 +73,7 @@ struct DataFrameInternalTests {
     #expect(rows[0].length == 1)
     print(try rows[0].get(0) as! String)
     #expect(
-      try rows[0].get(0) as! String == """
+      try (rows[0].get(0) as! String).trimmingCharacters(in: .whitespacesAndNewlines) == """
         -RECORD 0--
         id | 0
         -RECORD 1--
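Note: the trimming above is the whole compatibility fix in this file. Spark 3.5 and 4.0 can differ in trailing whitespace around the string returned for a rendered table, so the assertion normalizes the value before comparing. A self-contained sketch of the idea (the `rendered` value below is hypothetical, not actual server output):

```swift
import Foundation

// Hypothetical rendered table; suppose one server version appends an extra
// trailing newline while the other does not.
let rendered = "+---+\n|id |\n+---+\n|0  |\n+---+\n"
let expected = """
+---+
|id |
+---+
|0  |
+---+
"""
// Trimming leading/trailing whitespace makes the comparison hold either way.
assert(rendered.trimmingCharacters(in: .whitespacesAndNewlines) == expected)
```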

Tests/SparkConnectTests/DataFrameReaderTests.swift

Lines changed: 7 additions & 5 deletions
@@ -48,10 +48,12 @@ struct DataFrameReaderTests {
   @Test
   func xml() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let path = "../examples/src/main/resources/people.xml"
-    #expect(try await spark.read.option("rowTag", "person").format("xml").load(path).count() == 3)
-    #expect(try await spark.read.option("rowTag", "person").xml(path).count() == 3)
-    #expect(try await spark.read.option("rowTag", "person").xml(path, path).count() == 6)
+    if await spark.version >= "4.0.0" {
+      let path = "../examples/src/main/resources/people.xml"
+      #expect(try await spark.read.option("rowTag", "person").format("xml").load(path).count() == 3)
+      #expect(try await spark.read.option("rowTag", "person").xml(path).count() == 3)
+      #expect(try await spark.read.option("rowTag", "person").xml(path, path).count() == 6)
+    }
     await spark.stop()
   }

@@ -80,7 +82,7 @@ struct DataFrameReaderTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     let spark = try await SparkSession.builder.getOrCreate()
     try await SQLHelper.withTable(spark, tableName)({
-      _ = try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count()
+      _ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
      #expect(try await spark.read.table(tableName).count() == 2)
     })
     await spark.stop()
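Note: `spark.version` is a plain `String`, so the `>= "4.0.0"` guard is a lexicographic comparison, not a semantic-version one. It orders "3.5.5" before "4.0.0" as intended, but it would misplace a hypothetical two-digit major version:

```swift
// Lexicographic String comparison in Swift:
print("3.5.5" >= "4.0.0")   // false — so the XML tests are skipped on Spark 3.5
print("4.0.0" >= "4.0.0")   // true
print("10.0.0" >= "4.0.0")  // false, even though 10 > 4 numerically
```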

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 30 additions & 23 deletions
@@ -69,16 +69,20 @@ struct DataFrameTests {
     let spark = try await SparkSession.builder.getOrCreate()

     let schema1 = try await spark.sql("SELECT 'a' as col1").schema
-    #expect(
-      schema1
-        == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
-    )
+    let answer1 = if await spark.version.starts(with: "4.") {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
+    } else {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{}}}]}}"#
+    }
+    #expect(schema1 == answer1)

     let schema2 = try await spark.sql("SELECT 'a' as col1, 'b' as col2").schema
-    #expect(
-      schema2
-        == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
-    )
+    let answer2 = if await spark.version.starts(with: "4.") {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
+    } else {
+      #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{}}},{"name":"col2","dataType":{"string":{}}}]}}"#
+    }
+    #expect(schema2 == answer2)

     let emptySchema = try await spark.sql("DROP TABLE IF EXISTS nonexistent").schema
     #expect(emptySchema == #"{"struct":{}}"#)
@@ -319,11 +323,12 @@ struct DataFrameTests {
     let spark = try await SparkSession.builder.getOrCreate()
     #expect(try await spark.sql("DROP TABLE IF EXISTS t").count() == 0)
     #expect(try await spark.sql("SHOW TABLES").count() == 0)
-    #expect(try await spark.sql("CREATE TABLE IF NOT EXISTS t(a INT)").count() == 0)
+    #expect(try await spark.sql("CREATE TABLE IF NOT EXISTS t(a INT) USING ORC").count() == 0)
     #expect(try await spark.sql("SHOW TABLES").count() == 1)
     #expect(try await spark.sql("SELECT * FROM t").count() == 0)
     #expect(try await spark.sql("INSERT INTO t VALUES (1), (2), (3)").count() == 0)
     #expect(try await spark.sql("SELECT * FROM t").count() == 3)
+    #expect(try await spark.sql("DROP TABLE IF EXISTS t").count() == 0)
     await spark.stop()
   }

@@ -482,20 +487,22 @@ struct DataFrameTests {
   @Test
   func lateralJoin() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let df1 = try await spark.sql("SELECT * FROM VALUES ('a', '1'), ('b', '2') AS T(a, b)")
-    let df2 = try await spark.sql("SELECT * FROM VALUES ('c', '2'), ('d', '3') AS S(c, b)")
-    let expectedCross = [
-      Row("a", "1", "c", "2"),
-      Row("a", "1", "d", "3"),
-      Row("b", "2", "c", "2"),
-      Row("b", "2", "d", "3"),
-    ]
-    #expect(try await df1.lateralJoin(df2).collect() == expectedCross)
-    #expect(try await df1.lateralJoin(df2, joinType: "inner").collect() == expectedCross)
-
-    let expected = [Row("b", "2", "c", "2")]
-    #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b").collect() == expected)
-    #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b", joinType: "inner").collect() == expected)
+    if await spark.version.starts(with: "4.") {
+      let df1 = try await spark.sql("SELECT * FROM VALUES ('a', '1'), ('b', '2') AS T(a, b)")
+      let df2 = try await spark.sql("SELECT * FROM VALUES ('c', '2'), ('d', '3') AS S(c, b)")
+      let expectedCross = [
+        Row("a", "1", "c", "2"),
+        Row("a", "1", "d", "3"),
+        Row("b", "2", "c", "2"),
+        Row("b", "2", "d", "3"),
+      ]
+      #expect(try await df1.lateralJoin(df2).collect() == expectedCross)
+      #expect(try await df1.lateralJoin(df2, joinType: "inner").collect() == expectedCross)
+
+      let expected = [Row("b", "2", "c", "2")]
+      #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b").collect() == expected)
+      #expect(try await df1.lateralJoin(df2, joinExprs: "T.b = S.b", joinType: "inner").collect() == expected)
+    }
     await spark.stop()
   }

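Note on the recurring `USING ORC`: Spark 3.5 still defaults `spark.sql.legacy.createHiveTableByDefault` to `true`, so a `CREATE TABLE` without a `USING` clause attempts to create a Hive table, which a plain Spark Connect server may reject, while Spark 4 defaults to a data-source table. Pinning an explicit provider makes the statement behave the same on both. A minimal sketch (assuming this package's `SparkConnect` module name; the table name is illustrative):

```swift
import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
// An explicit provider sidesteps the version-dependent default table format.
_ = try await spark.sql("CREATE TABLE demo_t(a INT) USING ORC").count()
_ = try await spark.sql("INSERT INTO demo_t VALUES (1), (2)").count()
print(try await spark.sql("SELECT * FROM demo_t").count())  // 2
_ = try await spark.sql("DROP TABLE demo_t").count()
await spark.stop()
```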

Tests/SparkConnectTests/DataFrameWriterTests.swift

Lines changed: 4 additions & 2 deletions
@@ -47,8 +47,10 @@ struct DataFrameWriterTests {
   func xml() async throws {
     let tmpDir = "/tmp/" + UUID().uuidString
     let spark = try await SparkSession.builder.getOrCreate()
-    try await spark.range(2025).write.option("rowTag", "person").xml(tmpDir)
-    #expect(try await spark.read.option("rowTag", "person").xml(tmpDir).count() == 2025)
+    if await spark.version >= "4.0.0" {
+      try await spark.range(2025).write.option("rowTag", "person").xml(tmpDir)
+      #expect(try await spark.read.option("rowTag", "person").xml(tmpDir).count() == 2025)
+    }
     await spark.stop()
   }

Tests/SparkConnectTests/SQLTests.swift

Lines changed: 11 additions & 0 deletions
@@ -69,13 +69,24 @@ struct SQLTests {
     #expect(removeOwner("185") == "*")
   }

+  let queriesForSpark4Only: [String] = [
+    "create_scala_function.sql",
+    "create_table_function.sql",
+    "pipesyntax.sql",
+    "explain.sql",
+  ]
+
   #if !os(Linux)
   @Test
   func runAll() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     for name in try! fm.contentsOfDirectory(atPath: path).sorted() {
       guard name.hasSuffix(".sql") else { continue }
       print(name)
+      if queriesForSpark4Only.contains(name) {
+        print("Skip query \(name) due to the difference between Spark 3 and 4.")
+        continue
+      }

       let sql = try String(contentsOf: URL(fileURLWithPath: "\(path)/\(name)"), encoding: .utf8)
       let answer = cleanUp(try await spark.sql(sql).collect().map { $0.toString() }.joined(separator: "\n"))

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 5 additions & 5 deletions
@@ -84,15 +84,15 @@ struct SparkConnectClientTests {
     await client.stop()
   }

-  #if !os(Linux) // TODO: Enable this with the offical Spark 4 docker image
   @Test
   func jsonToDdl() async throws {
     let client = SparkConnectClient(remote: TEST_REMOTE)
-    let _ = try await client.connect(UUID().uuidString)
-    let json =
+    let response = try await client.connect(UUID().uuidString)
+    if response.sparkVersion.version.starts(with: "4.") {
+      let json =
        #"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
-    #expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
+      #expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
+    }
     await client.stop()
   }
-  #endif
 }
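Note: this hunk also changes the gating strategy, from a compile-time `#if !os(Linux)` that removed the test from some builds entirely, to a runtime check of the server's reported version, so one test binary now covers both Spark 3.5 and 4.x servers. Schematically (names are illustrative, not from this repository):

```swift
// Before: compile-time gate — the test does not exist in excluded builds.
#if !os(Linux)
// @Test func jsonToDdl() async throws { ... }
#endif

// After: runtime gate — compiled everywhere, skipped per connected server.
func exerciseSpark4Only(serverVersion: String) {
  if serverVersion.starts(with: "4.") {
    // Spark-4-only assertions go here.
  }
}
```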

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 3 additions & 2 deletions
@@ -53,7 +53,8 @@ struct SparkSessionTests {
   @Test
   func version() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    #expect(await spark.version.starts(with: "4.0.0"))
+    let version = await spark.version
+    #expect(version.starts(with: "4.0.0") || version.starts(with: "3.5."))
     await spark.stop()
   }

@@ -80,7 +81,7 @@ struct SparkSessionTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     let spark = try await SparkSession.builder.getOrCreate()
     try await SQLHelper.withTable(spark, tableName)({
-      _ = try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count()
+      _ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
       #expect(try await spark.table(tableName).count() == 2)
     })
     await spark.stop()
