Spark 성능 튜닝 - 2. cache(), persist(), unpersist()
Spark의 Performance 튜닝을 수행 해 보자. 2023-05-24

오늘은 Spark 성능 튜닝에 필요한, cache()persist() 에 대해서 알아 보도록 하겠습니다.

RDDTransformation (ex: map(), filter(), distinct() 등)을 이용 하여 새로운 RDD를 만들 수 있습니다. 하지만, Action (ex: collection(), count(), foreach() 등)이 호출 되기 전까지는, 실제 연산을 수행 하지 않죠.

다음 예제를 함께 봅시다!

// 데이터 프레임 생성
val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35),
  ("David", 40)
)

val df = spark.createDataFrame(data).toDF("name", "age")

// age 컬럼이 30보다 큰 행만 선택 하는 Transformation, 하지만 여기는 계산이 되지 않아요!
val filteredDf = df.filter($"age" > 30)

// 여기는 Action 함수 이므로, 실제 계산이 수행 됩니다!
filteredDf.count()

하지만, 우리가 다음과 같은 상황을 가정해 보죠. 만약, 동일하게 TransformationRDD에 대해, 여러 개의 Action을 수행 한다고 가정 해 봅시다!

val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35),
  ("David", 40)
)

val df = spark.createDataFrame(data).toDF("name", "age")
val filteredDf = df.filter($"age" > 30)

val resultCount = filteredDf.count()  // action
val maxAge = df.agg(max("Age")).first().getInt(0)  // action

그러면 연산이 몇 번 발생할까요? filter() 연산을 통해, 새로운 RDD를 생성 하고, count(), agg() 를 호출하였으니 이렇게 연산이 되었을 것이라 예상 할 수 있습니다.

  • filter() -> count(), agg()

아니요, 틀렸습니다. count(), agg() 같은 액션을 호출 할 때마다, 모든 의존성을 재연산 하게 됩니다. 실제 연산은 다음과 같습니다.

  • filter() -> count()
  • filter() -> agg()

그렇다면, 우리는 어떻게 하는 것이 좋을까요? 이럴 때, 메모리 혹은 디스크에 계속 DataFrame을 저장 할 수 있도록 만든 것이 cache() 입니다. 다음과 같이 연산 하게 되면, 재연산을 막을 수가 있겠죠. 더 이상 캐시가 필요 하지 않을 때는, unpersist()를 통해 캐시를 release 하여야 합니다.

val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35),
  ("David", 40)
)

val df = spark.createDataFrame(data).toDF("name", "age")

df.cache()  // cache를 통해 연산 결과를 메모리에 남기기! df와, filteredDf 모두가 저장 된다.

val filteredDf = df.filter($"age" > 30)

val resultCount = filteredDf.count()  // action
val maxAge = df.agg(max("Age")).first().getInt(0)  // action

df.unpersist() // cache release

cache()persist(storageLevel = MEMORY_AND_DISK)와 똑같습니다. persist() 도, 연산이 끝난 데이터에 대해 영속성을 유지 하게 해 줍니다. 사용 방법은 cache()와 같지만, storageLevel을 기입 하여 주어야 합니다. 이는 다음과 같으며, _SER 이 붙어 있는 옵션은, 데이터를 Serialize (직렬화) 하여 저장 하는지에 대한 여부 입니다. (직렬화를 하면 역직렬화도 진행 하여야 하기 때문에, 메모리를 적게 사용하게 되나, CPU를 많이 사용 하게 됩니다.)

DISK (HDD, SDD)는 당연히, MEMORY (RAM) 보다 가져오는 속도가 느릴 것이니, 참고 하여야 합니다! (Sparkin-memory 연산 입니다.)

Storage Level    Space used  CPU time  In memory  On-disk  Serialized   Recompute some partitions
----------------------------------------------------------------------------------------------------
MEMORY_ONLY          High        Low       Y          N        N         Y    
MEMORY_ONLY_SER      Low         High      Y          N        Y         Y
MEMORY_AND_DISK      High        Medium    Some       Some     Some      N
MEMORY_AND_DISK_SER  Low         High      Some       Some     Y         N
DISK_ONLY            Low         High      N          Y        Y         N

(출처: https://sparkbyexamples.com/spark/spark-persistence-storage-levels/)