phương pháp pyspark.sql.DataFrame.repartition() được sử dụng để tăng hoặc giảm RDD/DataFrame vách ngăn. Hàm này nhận 2 tham số; numPartitions*colskhi một cái được chỉ định thì cái kia là tùy chọn. repartition() là một chuyển đổi rộng hơn bao gồm việc xáo trộn dữ liệu, do đó, nó được coi là hoạt động tốn kém.

Những điểm chính –

  • phân vùng lại () được sử dụng để tăng hoặc giảm số lượng phân vùng.
  • phân vùng lại () tạo phân vùng đồng đều khi so sánh với coalesce().
  • Đây là một hoạt động tốn kém vì nó liên quan đến việc xáo trộn dữ liệu và tiêu tốn nhiều tài nguyên hơn.
  • phân vùng lại () có thể lấy int hoặc tên cột làm tham số để xác định cách thực hiện các phân vùng.
  • Nếu tham số không được chỉ định, nó sử dụng số lượng phân vùng mặc định.
  • Là một phần của tối ưu hóa hiệu suất, bạn nên tránh sử dụng chức năng này.

1. Ví dụ nhanh về Phân vùng lại PySpark ()

Sau đây là các ví dụ nhanh về Phân vùng lại PySpark () của DataFrame.

   
# Repartition by number
df2 = df.repartition(5)

# Repatition by column name
df2 = df.repartition("state")

# Repatition by column name
df2 = df.repartition(5, "state")

# Repatition by multiple columns
df2 = df.repartition("state","department")

2. DataFrame.repartition ()

phân vùng lại () là một phương thức của lớp pyspark.sql.DataFrame được sử dụng để tăng hoặc giảm số lượng phân vùng của Khung dữ liệu. Khi bạn tạo Khung dữ liệu, dữ liệu hoặc hàng được phân phối trên nhiều phân vùng trên nhiều máy chủ. kể từ đây, Phân vùng lại PySpark () kích hoạt xáo trộn dữ liệu từ tất cả các phân vùng của Khung dữ liệu.

2.1 Cú pháp

Sau đây là cú pháp của DataFrame.repartition ()

  
# Syntax of repartition()
DataFrame.repartition(numPartitions, *cols)
 

2.2 Thông số & Loại trả lại

Sau đây là các thông số của phân vùng lại () và điều này trở lại Khung dữ liệu với dữ liệu được phân vùng lại.

  • numPartitions – Mục tiêu Số lượng phân vùng. Nếu không được chỉ định, số lượng phân vùng mặc định sẽ được sử dụng.
  • *cols – Một hoặc nhiều cột để sử dụng trong phân vùng lại.

3. Phân vùng lại PySpark DataFrame ()

Phân vùng lại phân phối lại dữ liệu từ tất cả các phân vùng thành số lượng phân vùng được chỉ định, dẫn đến xáo trộn toàn bộ dữ liệu, đây là một hoạt động rất tốn kém khi bạn có hàng tỷ hoặc hàng nghìn tỷ dữ liệu. Để xem nó hoạt động như thế nào, hãy tạo DataFrame với một số dữ liệu thử nghiệm.

   
# Imports
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder.appName('SignalFix.com') 
        .master("local[5]").getOrCreate()

# Create PySpark DataFrame
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.show()

# Write to CSV file
df.write.mode("overwrite").csv("/tmp/partition.csv")

Theo mặc định Khung dữ liệu được tạo với các phân vùng mặc định dựa trên môi trường và thiết lập của bạn. Ví dụ sau đây tạo nhiều tệp phần tại vị trí được chỉ định. Số phân vùng = số tập tin bộ phận. Lưu ý rằng nếu bạn chạy điều này nhiều lần, bạn sẽ nhận được các giá trị khác nhau trong các tệp phần cho mỗi lần chạy.

Phân vùng lại PySpark

3.1 Phân vùng lại theo số

Bây giờ hãy phân vùng lại dữ liệu này thành 3 phân vùng bằng cách gửi giá trị 3 tới numPartitions tham số.

  
# repartition()
df2 = df.repartition(numPartitions=3)
print(df2.rdd.getNumPartitions())

# Write DataFrame to CSV file
df2.write.mode("overwrite").csv("/tmp/partition.csv")
 

Nó phân chia lại DataFrame thành 3 phân vùng.

3.2 Phân vùng lại theo cột

Hãy phân vùng lại PySpark DataFrame theo cột. Ví dụ sau đây phân phối lại dữ liệu theo trạng thái tên cột.

  
# repartition by column
df2 = df.repartition("state")
print(df2.rdd.getNumPartitions())

# Write
df2.write.mode("overwrite").csv("/tmp/partition.csv")
 

3.3. Phân vùng lại theo nhiều cột

Hãy phân vùng lại PySpark DataFrame bởi nhiều cột. Ví dụ sau đây phân phối lại dữ liệu theo trạng thái cột và bộ phận.

  
# repartition by multiple column
df2 = df.repartition("state","department")
print(df2.rdd.getNumPartitions())

# Write
df2.write.mode("overwrite").csv("/tmp/partition.csv")
 

4. PySpark RDD Phân vùng lại

Trong RDD, bạn có thể tạo song song tại thời điểm tạo RDD bằng cách sử dụng parallelize(), textFile()wholeTextFiles().

   
rdd = spark.sparkContext.parallelize((0,20))
print("From local[5]"+str(rdd.getNumPartitions()))

rdd1 = spark.sparkContext.parallelize((0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

rddFromFile = spark.sparkContext.textFile("/tmp/test.txt",10)
print("TextFile : "+str(rddFromFile.getNumPartitions()))

Ví dụ trên cho kết quả đầu ra bên dưới.

   
From local[5] : 5
Parallelize : 6
TextFile : 10

spark.sparkContext.parallelize(Range(0,20),6) phân phối RDD thành 6 phân vùng và dữ liệu được phân phối như bên dưới.

  
rdd1.saveAsTextFile("/tmp/partition")
//Writes 6 part files, one for each partition
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
 

1.1 Phân vùng lại RDD ()

phương pháp Phân vùng lại PySpark RDD () được sử dụng để tăng hoặc giảm các phân vùng. Ví dụ dưới đây giảm các phân vùng từ 10 xuống 4 bằng cách di chuyển dữ liệu từ tất cả các phân vùng.

  
rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("/tmp/re-partition")
 

Điều này mang lại sản lượng Repartition size : 4 và phân vùng lại phân phối lại dữ liệu (như được hiển thị bên dưới) từ tất cả các phân vùng bị xáo trộn đầy đủ dẫn đến hoạt động rất tốn kém khi xử lý hàng tỷ và hàng nghìn tỷ dữ liệu.

   
Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18

Trong bài viết này, chúng ta đã khám phá phân vùng PySpark () – Giải thích với các ví dụ. Trong trường hợp bạn có bất kỳ câu hỏi nào, vui lòng hỏi signalfix.net trong phần bình luận bên dưới.