Trong phần này, tôi sẽ giải thích một số Biến đổi RDD với ví dụ đếm từ trong Spark với scala, trước khi bắt đầu, hãy tạo một RDD bằng cách đọc một tệp văn bản. Tệp văn bản được sử dụng ở đây có sẵn trên GitHub.

   
# Imports
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

# Create SparkSession 
val spark = SparkSession.builder()
      .master("local[3]")
      .appName("SignalFix.com")
      .getOrCreate()

# Prepare Data
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]

# Create RDD
rdd = spark.sparkContext.parallelize(data)
ví dụ đếm từ spark

1. Bản đồ phẳng() Chuyển đổi

flatMap() phép biến đổi làm phẳng RDD sau khi áp dụng hàm và trả về một RDD mới. Trong ví dụ dưới đây, đầu tiên, nó chia từng bản ghi theo khoảng trắng trong RDD và cuối cùng làm phẳng nó. Kết quả RDD bao gồm một từ duy nhất trên mỗi bản ghi.

   
val rdd2 = rdd.flatMap(f=>f.split(" "))

2. bản đồ() Chuyển đổi

map() phép chuyển đổi được sử dụng để áp dụng bất kỳ hoạt động phức tạp nào như thêm cột, cập nhật cột, v.v., đầu ra của phép biến đổi bản đồ sẽ luôn có cùng số bản ghi với đầu vào.

Trong ví dụ đếm từ của chúng tôi, chúng tôi đang thêm một cột mới với giá trị 1 cho mỗi từ, kết quả của RDD là CặpRDDFunctions chứa các cặp khóa-giá trị, từ loại Chuỗi là Khóa và 1 loại Int là giá trị. Để bạn hiểu, tôi đã định nghĩa biến rdd3 với kiểu.

   
val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))

3. filter () Chuyển đổi

filter() phép biến đổi được sử dụng để lọc các bản ghi trong RDD. Trong ví dụ của chúng tôi, chúng tôi đang lọc tất cả các từ bắt đầu bằng a.

   
val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))

4. ReduceByKey () Chuyển đổi

reduceByKey() hợp nhất các giá trị cho mỗi khóa với chức năng được chỉ định. Trong ví dụ của chúng tôi, nó làm giảm chuỗi từ bằng cách áp dụng hàm sum cho giá trị. Kết quả RDD của chúng tôi chứa các từ duy nhất và số lượng của chúng.

   
val rdd5 = rdd3.reduceByKey(_ + _)

5. Chuyển đổi sortByKey ()

sortByKey() phép biến đổi được sử dụng để sắp xếp các phần tử RDD trên khóa. Trong ví dụ của chúng tôi, đầu tiên, chúng tôi chuyển đổi RDD[(String,Int]) sang RDD[(Int,String]) bằng cách sử dụng phép biến đổi bản đồ và áp dụng sortByKey mà lý tưởng là sắp xếp trên một giá trị số nguyên. Và cuối cùng, câu lệnh foreach với println in tất cả các từ trong RDD và số lượng của chúng là cặp khóa-giá trị vào bảng điều khiển.

  
val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()

//Print rdd6 result to console
rdd6.foreach(println)
 

6. Ví dụ về số lượng từ Spark

Sau đây là một ví dụ đầy đủ về ví dụ đếm từ trong Scala bằng cách sử dụng một số phép biến đổi RDD.

   
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WordCountExample {

  def main(args:Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SignalFix.com")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd:RDD[String] = sc.textFile("src/main/resources/test.txt")
    println("initial partition count:"+rdd.getNumPartitions)

    val reparRdd = rdd.repartition(4)
    println("re-partition count:"+reparRdd.getNumPartitions)

    //rdd.coalesce(3)

    rdd.collect().foreach(println)

    // rdd flatMap transformation
    val rdd2 = rdd.flatMap(f=>f.split(" "))
    rdd2.foreach(f=>println(f))

    //Create a Tuple by adding 1 to each word
    val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))
    rdd3.foreach(println)

    //Filter transformation
    val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))
    rdd4.foreach(println)

    //ReduceBy transformation
    val rdd5 = rdd3.reduceByKey(_ + _)
    rdd5.foreach(println)

    //Swap word,count and sortByKey transformation
    val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()
    println("Final Result")

    //Action - foreach
    rdd6.foreach(println)

    //Action - count
    println("Count : "+rdd6.count())

    //Action - first
    val firstRec = rdd6.first()
    println("First Record : "+firstRec._1 + ","+ firstRec._2)

    //Action - max
    val datMax = rdd6.max()
    println("Max Record : "+datMax._1 + ","+ datMax._2)

    //Action - reduce
    val totalWordCount = rdd6.reduce((a,b) => (a._1+b._1,a._2))
    println("dataReduce Record : "+totalWordCount._1)
    //Action - take
    val data3 = rdd6.take(3)
    data3.foreach(f=>{
      println("data3 Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - collect
    val data = rdd6.collect()
    data.foreach(f=>{
      println("Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - saveAsTextFile
    rdd5.saveAsTextFile("c:/tmp/wordCount")
    
  }
}

Sự kết luận

Trong hướng dẫn Spark RDD Transformations này, bạn đã học các hàm biến đổi khác nhau và cách sử dụng chúng với các ví dụ về scala và dự án GitHub để tham khảo nhanh.

Trên đây là toàn bộ thông tin chi tiết nhất về Spark Word Count Explained với Ví dụ. Hướng dẫn này có trả lời câu hỏi của bạn không? Để cho signalfix.net biết trong các ý kiến ​​dưới đây.