Trong bài viết này, tôi sẽ giải thích cách thực hiện nối PySpark trên nhiều cột của hai DataFrame và cũng học cách sử dụng PySpark SQL. Ngoài ra, bạn sẽ tìm hiểu các cách khác nhau để cung cấp điều kiện Tham gia trên hai hoặc nhiều cột.

1. Ví dụ nhanh

Sau đây là các ví dụ nhanh về việc kết hợp hai DataFrames trên nhiều cột trong PySpark

   
# Quick examples of PySpark join multiple columns

# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   ( empDF["branch_id"] == deptDF["branch_id"])).show()

# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
    
# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

Trước khi chúng ta chuyển sang cách sử dụng nhiều cột trên biểu thức Tham gia, trước tiên, hãy tạo DataFrames từ tập dữ liệu emp và dept, Trên các cột dept_id và branch_id này có trên cả hai tập dữ liệu và chúng tôi sử dụng các cột này trong biểu thức nối khi tham gia DataFrames.

Dưới đây là Emp DataFrame với các cột emp_id, name, branch_id, dept_id, giới tính, lương.

   
# Import pyspark 
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder 
          .appName('SignalFix.com') 
          .getOrCreate()
         
#EMP DataFrame

empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
  ]
  
empColumns = ["emp_id","name","branch_id","dept_id",
  "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()

#DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  ]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

Sản lượng thấp hơn sản lượng.

Tạo ra Dept DataFrame với các cột dept_name, dept_id, branch_id

  
#DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  ]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

Sản lượng thấp hơn sản lượng.

pyspark nối nhiều cột

2. PySpark Tham gia Nhiều Cột

Cú pháp nối này của PySpark join () lấy, lấy tập dữ liệu bên phải, joinExprs và joinType làm đối số và chúng tôi sử dụng joinExprs để cung cấp điều kiện nối trên nhiều cột.

Ví dụ này kết hợp emptDF DataFrame với deptDF DataFrame trên nhiều cột dept_id và cột branch_id bằng cách sử dụng liên kết bên trong. Ví dụ này in đầu ra dưới đây cho bảng điều khiển.

   
# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   ( empDF["branch_id"] == deptDF["branch_id"]),"inner").show()

Sản lượng thấp hơn sản lượng. Bạn nên sử dụng & / | các toán tử mare cẩn thận và cẩn thận về (== có mức độ ưu tiên thấp hơn so với bitwise AND và OR)

3. Sử dụng Nơi cung cấp Điều kiện Tham gia

Thay vì sử dụng một điều kiện tham gia với join() toán tử, chúng ta có thể sử dụng where() để cung cấp một điều kiện tham gia.

   
#Using Join with multiple columns on where clause 
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()

4. SQL để nối nhiều cột

Cuối cùng, hãy chuyển đoạn mã trên thành truy vấn SQL PySpark để nối trên nhiều cột. Để làm như vậy, trước tiên, bạn cần tạo một dạng xem tạm thời bằng cách sử dụng createOrReplaceTempView () và sử dụng SparkSession.sql () để chạy truy vấn. Bảng sẽ có sẵn để sử dụng cho đến khi bạn kết thúc SparkSession của mình.

   
# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

5. Hoàn thành Ví dụ

Sau đây là ví dụ đầy đủ về việc kết hợp hai DataFrames trên nhiều cột.

   
# Import pyspark 
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder 
          .appName('SignalFix.com') 
          .getOrCreate()
         
#EMP DataFrame
empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
  ]
  
empColumns = ["emp_id","name","branch_id","dept_id",
  "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()

#DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  ]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   ( empDF["branch_id"] == deptDF["branch_id"])).show()

# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
    
# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

Ví dụ đầy đủ có sẵn tại dự án GitHub để tham khảo.

6. Kết luận

Trong bài viết này, bạn đã học cách thực hiện hai phép nối DataFrame trên nhiều cột trong PySpark. Và cũng đã học cách sử dụng các điều kiện tham gia bằng cách sử dụng Tham gia, ở đâu, bộ lọc và biểu thức SQL.

Trong bài viết này, chúng ta đã khám phá PySpark Join Multiple Columns. Trong trường hợp bạn có bất kỳ câu hỏi nào, vui lòng hỏi signalfix.net trong phần bình luận bên dưới.