RDD

Resilient Distributed Datasets

A Fault - Tolerant Abstraction for In-Memory Cluster Computing

2012 NSDI Best Paper : RDD

spark = RDD + Interface

spark는 분산 프레임워크이다

RDD : memory 내에서 큰 cluster에서 작업할 수 있고, fault-tolerant 함

RDD 만든 이유

  • 기존의 컴퓨팅 프레임워크가 비효율적으로 처리하는 “반복적 알고리즘” 이나 “인터랙티브 데이터 마이닝 도구”를 개선하기 위해 만들어짐

RDD는 “fined-grained”가 아닌 “coarse-grained” 방식을 사용해서 고장시 해결기술을 효율적으로 구축함.

  • coarse-grained : 청소하는 장소가 바뀌더라도 “청소해” 하면 끝나는 방식
  • fined-grained : 청소하는 장소가 집, 호텔, 빌딩등으로 바뀔때마다 명령하는 방식도 수정이 필요함.

1. Introduction

대형 데이터 분석을 위한 MapReduceDryad와 같은 cluster computing framework가 나옴 -> let users write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance

  • cluster computing framework
    • 프로그램 사이의 데이터 채널을 그래프 형태로 구성하여 병렬 데이터 처리를 하는 프레임워크
    • MapReduce : Map 기능과 Reduce 기능을 작성
    • Dryad : 데이터를 처리하는 그래프를 만들어서 처리.

기존 cluster computing framework의 단점

  • lack abstractions for leveraging distributed memory
    • leveraging : 빚을 지다. 분배된 메모리를 끌어다가 씀
  • data reuse가 빈번한 경우에 부적합함
    • machine learning
    • graph algorithm
    • interactive data mining

fault-tolerance에 대한 기존 프레임워크들의 방식 (비용이 큼)

  • 데이터 복제
  • 로그 업데이트

fault-tolerance에 대한 RDD의 방식

  • coarse-grained transformation!
  • 데이터 셋을 만드는데 사용된 변환들을 기록해 두는 방식으로 효율적으로 fault-tolerance를 구축함.

Spark

  • Spark에 RDD를 적용함
  • Spark는 범용적이고 interactive speed로 사용되는 첫번째 시스템이다.

2. RDDs

2.1 RDD란

RDD 생성하는법 단 두가지

  • data로부터
  • 다른 RDD로부터

transformation : map, filter, join..

RDD는 항상 실체화되어야 할 필요가 없고, 다른 데이터셋에서 어떻게 이렇게 변했는지에 대한 정보를 가지고 있으면 된다.

  • RDD는 data로부터 어떻게 현재 상태가 되었는지에 대한 정보를 가지고 있음

  • 이 속성때문에 프로그램은 문제가 생긴다음 재구성 할 수 없는 RDD는 애초에 참조를 못하게 됨

RDD aspects

  • persistence
  • partitioning

2.2 Spark에서의 인터페이스

actions : count, collect, save..

spark : lazy evaluation을 사용해서 action을 할때까지 계산을 하지 않음

persist

  • 재사용할 RDD를 저장해 둘 수 있다
  • default : RAM에 persistent RDD 저장해둠
  • option : disk에 저장, 다른 머신에 복제 등의 옵션 설정 가능

2.2.1 Console Log Mining

HDFS에서 테라바이트 규모의 로그에서 에러를 찾아야 하는 상황이라고 할때, 스파크로 로그들 속 에러메시지만 가져올 수 있다.

// Scala code
lines = spark.textFile("hdfs://...")
// RDD backed by an HDFS file
errors = lines.filter(_.startsWith("ERROR"))
// filtered RDD를 error에
errors.persist()
// error가 여러 query들 사이에서 공유될 수 있도록 해줌.

위의 세 코드를 통해 에러 메시지를 담은 로그를 담은 RDD를 유저가 사용할 수 있게 되었지만, 사실상 여기까지 클러스터에는 어떠한 작업도 일어나지 않았다. action을 해야 작업이 일어남.

errors.count()
// action!!	

위처럼 액션이 발생하면 스파크는 errors의 파티션을 메모리에 저장한다. 처음에 들여온 lines가 아닌 errors만 램에 저장되게 되고, 처리속도는 매우 빨라진다.

또한, 만약 error의 일부분이 손실되면, 일치하는 파티션의 lines를 보고 거기에 filter을 적용해서 그 error의 일부분을 복구한다.

// further transformation
errors.filter(_.contains("MySQL")).count()
//"MySQL" 포함한 error 개수 세기
errors.filter(_.contains("HDFS"))
	  .map(_.split('\t')(3))
	  .collect()
//"HDFS" 포함한 error중 tsv에서 3번째 열에 있는 정보 반환

2.3 RDD의 장점

DSM(Distributed shared memory)의 특징 : 전역 주소 공간의 임의의 위치에서 읽고 쓴다. 이 일반성이 효율성과 fault-tolerance방식의 구현을 어렵게 한다.

장점 1 : 위의 표를 보면, RDD와 DSM의 가장 큰 차이점은 Write(Rdd의 생성)을 할때 RDD는 coarse-grained 방식으로만 가능하다는 점이다.

  • 이 점이 RDD가 효율적인 fault-tolerance가 가능하도록 한다.
  • RDD는 check point를 만드느라 높은 오버헤드를 발생시키는 대신에, lineage를 기록해둠으로서 해결한다
  • RDD 전체를 다시 계산할 필요없이, 손실된 파티션만 다시 계산하면 된다.

장점 2: MapReduce처럼 백업 사본을 작업하느라 느려질 필요가 없음

장점 3 : 대량 작업에서 지역성을 기반으로 작업을 스케쥴링해서 성능을 향상시킬 수 있다

장점 4 : 충분한 메모리가 없는 경우에, RAM에 들어갈 수 없는 파티션은 디스크에 저장됨으로서, 비슷한 성능을 보일 수 있다.

2.4 RDD의 한계

RDD에 적합하지 않은 어플리케이션들도 있다.

RDD에 가장 적합한 application

  • batch application : 일괄 어플리케이션

RDD에 부적절한 application

  • 비동기식 어플리케이션.
    • ex. 웹 어플리케이션이나 웹 크롤러를 위한 저장 시스템
    • 이런 경우에는 전통적인 check point 만드는 식의 시스템이 더 적합하다

3. Spark Programing Interface

spark는 Scala의 간결성 때문에 Scala로 구현되었지만, 파이썬, 자바, R등 다양한 언어를 지원하는 SDK를 가지고 있다.

위 그림의 Driver가 Spark Context를 생성하고, 이 Spark Context가 각 worker의 클러스터에 연결된다. Spark Context가 실행할 task들을 worker에 보내는 방식으로 처리된다.

RDD 는 정적으로 type이 정해져 있다.

  • RDD[Int] : 정수 타입의 RDD
  • 그러나 Scala는 type inference를 지원하기 때문에 type 생략 가능

3.1 RDD Operations in Spark

위 표를 보면 RDD의 주요 transformation 과 action들을 볼 수 있다.

transformation은 lazy execution이 되다가, action이 실행될때에야 진짜로 계산이 실행된다.

참고

  • join : key-value 쌍을 가지고 있는 RDD에만 사용 가능

3.2 Example Appplication

3.2.1 Logistic Regression

val points = spark.textFile(...)
				.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS){
    val gradient = points.map{ p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
    }.reduce((a,b) => a+b)
    w -= gradient
}
  • points 라는 spark 만들기
  • for문 안에서 points 에 map, reduce를 사용하여 gradient 계산

3.2.2 PageRank

** PageRank : 영향력 있는 페이지가 인용할 수록 페이지 랭크가 올라감

위 사진의 그래프는 PageRank의 Lineage graph인데, iteration의 횟수가 많아질수록 그래프가 세로로 길어짐을 확인할 수 있다. 그러므로 많이 반복하는 일의 경우에는 복사본을 만들어두는것이 fault recovery time을 줄이는데에 효과적이다.

(PageRank의 결과물은 반복된 join의 결과물이다. 그렇기 때문에 checkpoint를 만들어두는 것이 좋다.)

RDD의 파티셔닝을 제어함으로서 최적화 시키는 것도 가능하다

  • hash-partitioning : 동일한 해시값을 갖는 키들이 동일한 노드에 옴
  • range-partitioning : 같은 범위의 키가 같은 노드에 옴
links = spark.textFile(...).map(...)
			.partitionBy(myPartFunc).persist()
// scala,java의 경우에는 myPartFunc가 지정가능하지만 python은 불가능

link를 정의할때 위의 코드를 호출해주면, 파티셔닝이 가능하다

4. Representing RDDS

RDD간의 dependency

  • narrow dependency : parent RDD의 각 파티션이 최대 한개의 child 파티션으로부터 쓰인 경우
    • ex) map
  • wide dependency : 다수의 child 파티션들이 의존하는 경우
    • ex) join
  • 이렇게 구분하는 이유
    • narrow dependency의 경우 파이프라인 실행이 가능해짐
    • node failure 의 복구가 narrow dependency의 경우에 더 효과적임 (wide dependency의 경우에는 checkpoint를 만들어 두는것이 좋음)

5. Implementation

스파크는 Mesos cluster manager위에서 동작한다

** Mesos

  • 분산시스템 커널
  • 네트워크로 묶여있는 여러개의 컴퓨터 자원을 하나로 묶어서 resource pool로 만들어서 하나의 컴퓨터처럼 보이게 함

5.1 Job Scheduling

위 그림의 실선으로 되어있는 박스들은 RDD들이고, 그안의 파티션이 사각형으로 나뉘어 있다. 유저가 action을 실행하면, 스케쥴러는 RDD의 lineage graph를 보고 만들어가며 DAG에 나온 실행할 순서대로 계산한다. narrow dependency인 경우에는 최대한 병렬화 되도록 DAG를 구성한다.

** DAG : directed acyclic graph

스케쥴러는 데이터의 지역성을 기반으로 노드에 파티션을 부여한다.

wide dependency의 경우, checkpoint를 만든다.

5.2 Interpreter Integration

Scala에도 python처럼 interactive shell이 있다

Scala interpreter는 일반적으로 유저가 입력한 라인을 JVM에서 실행하는 형식으로 동작한다.

x = 5
println(x)

위와같은 코드를 유저가 입력했다면, Scala interpreter는 먼저 Line1이라는 클래스를 만들어서 x 라는 객체를 만든 후, 두번째 라인을 println(Line1.getInstance().x) 와 같은 형식으로 실행시키게 된다

Spark Interpreter의 두가지 변화점

  • Class shipping
    • 클래스들을 HTTP를 통해 옮김
  • Modified code generation
    • 일반적으로는 각 singleton object는 일치하는 클래스의 정적인 메소드로 접근되지만, 그대신에 각 라인의 오브젝트로 직접적으로 접근하도록 수정하였다.

5.3 Memory Management

스파크는 persistent RDD를 위한 세가지 옵션을 제공한다

  • in-memory storage as deserialized Java objects
    • 세 방식중 가장 빠르다 (java VM 이 각 RDD 요소에 접근 가능하기 때문에)
  • in-memory storage as serialized data
    • 공간이 제한되어 있을때 사용 (그대신 성능은 저하됨)
  • on-disk storage
    • 램에 두기에는 RDD가 너무 크지만 사용할때마다 다시 계산하기에는 비용이 클때 사용

5.4 Support for Checkpointing

lineage가 항상 RDD의 복구를 위해 쓰일 수 있지만, 특정 경우에는 이렇게 하는 것이 매우 큰 시간을 소모할 수 있다. 그러므로 특정 RDD의 경우에는 checkpoint를 만들어 두는 것이 더 효율적일 수 있다

  • 일반적으로, 긴 lineage graph를 가지는 RDD에게 체크포인트를 만들어 두는 방식이 효과적이다
    • ex) PageRank
    • 이러한 경우에, node failure 는 전체를 다시 계산해야하게 만듬. (narrow dependency의 경우에는 아니기 때문에 체크포인트를 만들기 보다 다시 계산하는 것이 빠름)
  • Spark는 checkpoint를 위한 API를 제공함
    • persist 메소드를 쓸때 REPLICATE flag를 1로 설정해줌으로서 사용할 수 있음
    • RDD의 read-only 성질 때문에 checkpoint를 만들어 두는 것도 간단해진다.

6. Evaluation

여러 벤치마크 어플리케이션을 수행했을때의 결과

  • Spark는 하둡보다 20-40배 높은 성능을 보였다
  • node failure가 일어났을때 스파크는 손실된 파티션만 다시 계산함으로서 빠르게 복구할 수 있다
  • 스파크는 5-7초정도의 지연시간으로 1TB의 데이터를 query .

6.1 Iterative Machine Learning Application

6.2 Page Rank

6.3 Fault Recovery

6.4 Behavior with Insufficient Memory

6.5 User Applications Built with Spark

6.6 Interactive Data Mining

7. Discussion

기존에는 MapReduce로 그래프를 그리고, 그걸로 Pregel을 돌렸다면, RDD는 수많은 일들을 하나의 프로그램에서 돌릴 수 있다.

7.1 Expressing Existing Programming Models

Scala는 data sharing abstraction의 부족에 집중해서 여러 문제들을 범용적으로 풀 수 있게 됨.

RDD를 사용하는 모델이 포함하는 것

  • MapReduce
  • DraydLINQ
  • SQL
  • Pregel
    • Google이 만든 iterative graph application
    • pregel은 같은 유저의 function을 모든 verices에 모든 iteration에서 활용하고 이 때 RDD를 사용함
  • Iterative MapReduce
  • Batched Stream Processing
    • 15분 마다 update되는 어플리케이션
    • application 의 변화하는 상태를 RDD에 저장
    • 중간단계의 RDD 상태의 저장이 속도 향상에 도움
  • Explaining the Expressivity of RDDs

7.2 Leveraging RDDs for Debugging

RDD를 fault-tolerance를 위해 다시 계산할 수 있도록 설계하였는데, 이 성질이 디버깅에도 도움을 준다.

  • 이 RDD를 나중에 다시 만들때 lineage를 보고 다시 만들 수 있음
  • RDD의 파티션을 다시 돌림으로서 디버깅할 수 있음.

Conclusion

RDD

  • 범용성
  • 복제가 필요없이 fault-tolerance를 구현해냄
  • iterative computation에도 사용가능

참고자료

[coarse-grained vs fine-grained] : http://moonlighting.tistory.com/90

[Dryad] : https://d2.naver.com/helloworld/29533

[spark partitioning] : http://ourcstory.tistory.com/156

[Mesos란] : http://brownbears.tistory.com/261

Comments