Skip to content

Commit f718110

Browse files
committed
[SPARK-51942] Support selectExpr in DataFrame
### What changes were proposed in this pull request? This PR aims to support `selectExpr` API in `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #93 from dongjoon-hyun/SPARK-51942. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8b4133a commit f718110

File tree

4 files changed

+39
-0
lines changed

4 files changed

+39
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,13 @@ public actor DataFrame: Sendable {
269269
return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
270270
}
271271

272+
/// Projects a set of expressions and returns a new ``DataFrame``.
273+
/// - Parameter exprs: Expression strings
274+
/// - Returns: A ``DataFrame`` with subset of columns.
275+
public func selectExpr(_ exprs: String...) -> DataFrame {
276+
return DataFrame(spark: self.spark, plan: SparkConnectClient.getProjectExprs(self.plan.root, exprs))
277+
}
278+
272279
/// Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column name.
273280
/// - Parameter cols: Column names
274281
/// - Returns: A ``DataFrame`` with subset of columns.

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,22 @@ public actor SparkConnectClient {
390390
return plan
391391
}
392392

393+
static func getProjectExprs(_ child: Relation, _ exprs: [String]) -> Plan {
394+
var project = Project()
395+
project.input = child
396+
let expressions: [Spark_Connect_Expression] = exprs.map {
397+
var expression = Spark_Connect_Expression()
398+
expression.exprType = .expressionString($0.toExpressionString)
399+
return expression
400+
}
401+
project.expressions = expressions
402+
var relation = Relation()
403+
relation.project = project
404+
var plan = Plan()
405+
plan.opType = .root(relation)
406+
return plan
407+
}
408+
393409
static func getWithColumnRenamed(_ child: Relation, _ colsMap: [String: String]) -> Plan {
394410
var withColumnsRenamed = WithColumnsRenamed()
395411
withColumnsRenamed.input = child

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,20 @@ struct DataFrameTests {
208208
try await #require(throws: Error.self) {
209209
let _ = try await spark.range(1).select("invalid").schema
210210
}
211+
try await #require(throws: Error.self) {
212+
let _ = try await spark.range(1).select("id + 1").schema
213+
}
214+
await spark.stop()
215+
}
216+
217+
@Test
218+
func selectExpr() async throws {
219+
let spark = try await SparkSession.builder.getOrCreate()
220+
let schema = try await spark.range(1).selectExpr("id + 1 as id2").schema
221+
#expect(
222+
schema
223+
== #"{"struct":{"fields":[{"name":"id2","dataType":{"long":{}}}]}}"#
224+
)
211225
await spark.stop()
212226
}
213227

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ struct SparkConnectClientTests {
8484
await client.stop()
8585
}
8686

87+
#if !os(Linux) // TODO: Enable this with the offical Spark 4 docker image
8788
@Test
8889
func jsonToDdl() async throws {
8990
let client = SparkConnectClient(remote: TEST_REMOTE)
@@ -95,4 +96,5 @@ struct SparkConnectClientTests {
9596
}
9697
await client.stop()
9798
}
99+
#endif
98100
}

0 commit comments

Comments
 (0)