val spark: SparkSession = ...
spark.sql("select * from t1, t2 where t1.id = t2.id")
Join Operators
From PostgreSQL’s 2.6. Joins Between Tables:
Queries can access multiple tables at once, or access the same table in such a way that multiple rows of the table are being processed at the same time. A query that accesses multiple rows of the same or different tables at one time is called a join query.
You can join datasets using join operators: crossJoin, join, and joinWith.
| Operator | Return Type | Description | 
|---|---|---|
Untyped,   | 
||
Untyped,   | 
||
Used for type-preserving join with two output columns for records for which join condition holds  | 
| 
 Note 
 | 
 You can also use SQL mode to join datasets using good ol' SQL.  | 
You can specify a join condition (aka join expression) as part of join operators or using where operator.
df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")
You can specify the join type as part of join operators (using joinType optional parameter).
df1.join(df2, $"df1Key" === $"df2Key", "inner")
| SQL | Name (joinType) | JoinType | 
|---|---|---|
  | 
  | 
|
  | 
  | 
  | 
  | 
  | 
|
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
Special case for   | 
  | 
Special case for   | 
| 
 Tip 
 | 
Name are case-insensitive and can use the underscore (_) at any position, i.e. left_anti and LEFT_ANTI are equivalent.
 | 
| 
 Note 
 | 
Spark SQL offers different join strategies with Broadcast Joins (aka Map-Side Joins) among them that are supposed to optimize your join queries over large distributed datasets. | 
 join Operators
join(right: Dataset[_]): DataFrame (1)
join(right: Dataset[_], usingColumn: String): DataFrame (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame (4)
join(right: Dataset[_], joinExprs: Column): DataFrame (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame (6)
- 
Condition-less inner join
 - 
Inner join with a single column that exists on both sides
 - 
Inner join with columns that exist on both sides
 - 
Equi-join with explicit join type
 - 
Inner join
 - 
Join with explicit join type. Self-joins are acceptable.
 
join joins two Datasets.
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  0|zero| zero|
+---+----+-----+
scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
   :- LocalTableScan [id#50, left#51]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#60, right#61]
// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  1| one| null|
|  3|null|three|
|  2|null|  two|
|  0|zero| zero|
+---+----+-----+
scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
   :- *Sort [id#50 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#50, 200)
   :     +- LocalTableScan [id#50, left#51]
   +- *Sort [id#60 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#60, 200)
         +- LocalTableScan [id#60, right#61]
// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+
scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]
Internally, join(right: Dataset[_]) creates a DataFrame with a condition-less Join logical operator (in the current SparkSession).
| 
 Note 
 | 
join(right: Dataset[_]) creates a logical plan with a condition-less Join operator with two child logical plans of the both sides of the join.
 | 
| 
 Note 
 | 
join(right: Dataset[_], usingColumns: Seq[String], joinType: String) creates a logical plan with a condition-less Join operator with UsingJoin join type.
 | 
| 
 Note 
 | 
 
 
That is usually considered a trivially true condition and refused as acceptable. With spark.sql.selfJoinAutoResolveAmbiguity option enabled (which it is by default),   | 
 crossJoin Method
crossJoin(right: Dataset[_]): DataFrame
| 
 Note 
 | 
crossJoin creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
 | 
 Type-Preserving Joins — joinWith Operators
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]  (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
- 
Type-safe inner join
 
joinWith creates a Dataset with two columns _1 and _2 that each contains records for which condition holds.
case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)
val people = Seq(Person(0, "Agata", 0), Person(1, "Iweta", 0)).toDS
val cities = Seq(City(0, "Warsaw"), City(1, "Washington")).toDS
val joined = people.joinWith(cities, people("cityId") === cities("id"))
scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- cityId: long (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
scala> joined.show
+-----------+----------+
|         _1|        _2|
+-----------+----------+
|[0,Agata,0]|[0,Warsaw]|
|[1,Iweta,0]|[0,Warsaw]|
+-----------+----------+
| 
 Note 
 | 
joinWith preserves type-safety with the original object types.
 | 
| 
 Note 
 | 
joinWith creates a Dataset with Join logical plan.
 |