난 그냥 사이의 차이 무엇인가 궁금 RDD
하고 DataFrame
(스파크 2.0.0 DataFrame가 단순한 유형의 별칭 Dataset[Row]
) 아파치 스파크에서?
하나를 다른 것으로 바꿀 수 있습니까?
답변
A DataFrame
는 “DataFrame definition”에 대한 Google 검색으로 잘 정의됩니다.
데이터 프레임은 테이블 또는 2 차원 배열과 유사한 구조로, 각 열에는 하나의 변수에 대한 측정 값이 포함되고 각 행에는 하나의 사례가 포함됩니다.
따라서 DataFrame
테이블 형식으로 인해 추가 메타 데이터가 있으므로 Spark는 최종 쿼리에서 특정 최적화를 실행할 수 있습니다.
는 RDD
반면에, 단지 인 R esilient D istributed D ataset 제약으로 더 반대 행할 수있는 조작으로 최적화 될 수없는 데이터의 블랙 박스 인 것이, 아니다.
그러나, 당신은에 DataFrame에서 갈 수 RDD
의를 통해 rdd
방법, 당신은에서 갈 수있는 RDD
A와 DataFrame
비아합니다 (RDD가 표 형식 인 경우) toDF
방법
일반적으로DataFrame
내장 된 쿼리 최적화로 인해 가능한 경우 사용하는 것이 좋습니다 .
답변
첫 번째는
DataFrame
에서 진화되었습니다SchemaRDD
.
예 .. 사이의 변환 Dataframe
과는 RDD
절대적으로 가능하다.
다음은 일부 샘플 코드 스 니펫입니다.
df.rdd
이다RDD[Row]
다음은 데이터 프레임을 만드는 옵션 중 일부입니다.
-
1)
yourrddOffrow.toDF
로 변환합니다DataFrame
. -
2)
createDataFrame
SQL 컨텍스트 사용val df = spark.createDataFrame(rddOfRow, schema)
여기서 좋은 SO 게시물에 설명 된 것처럼 스키마는 아래 옵션 중 일부 일 수 있습니다 .
스칼라 사례 클래스 및 스칼라 반사 API에서import org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
또는 사용
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
Schema에서 설명한대로
StructType
and를
사용하여 만들 수도 있습니다.StructField
val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
실제로 3 개의 Apache Spark API가 있습니다.
-
RDD
API :
RDD
(탄력 분산 데이터 집합) API는 1.0 릴리스 이후 스파크되었습니다.
RDD
API 같은 많은 변환 방법을 제공한다map
()
filter
() 및reduce
()는 데이터에 대해 연산을 수행. 이러한 각 방법으로RDD
변환 된 데이터를 나타내는 새로운 결과가 생성됩니다. 그러나 이러한 메소드는 수행 할 조작을 정의하기 만하며 조치 메소드가 호출 될 때까지 변환이 수행되지 않습니다. 동작 방법의 예는collect
() 및
saveAsObjectFile
()입니다.
RDD 예 :
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
예 : RDD를 사용하여 속성별로 필터링
rdd.filter(_.age > 21)
-
DataFrame
API
Spark 1.3
DataFrame
은 Project Tungsten 이니셔티브의 일환으로 Spark의 성능과 확장 성을 개선하기 위해 새로운 API를 도입했습니다 .DataFrame
API를 소개합니다 스키마의 개념 스키마를 관리하고 단지 자바 직렬화를 사용하는 것보다 훨씬 더 효율적인 방법으로, 노드간에 데이터를 전달하기 위해 불꽃을 수 있도록 데이터를 설명합니다.
DataFrame
API는 근본적으로 다르다RDD
는 스파크의 촉매 최적화 한 후 실행할 수있는 관계형 쿼리 계획을 구축하는 API이기 때문에 API. API는 쿼리 계획 작성에 익숙한 개발자에게 적합합니다.
SQL 스타일 예 :
df.filter("age > 21");
제한 사항 :
코드가 이름으로 데이터 특성을 참조하므로 컴파일러가 오류를 포착 할 수 없습니다. 속성 이름이 올바르지 않으면 쿼리 계획이 작성 될 때 런타임시에만 오류가 감지됩니다.
DataFrame
API의 또 다른 단점 은 매우 스칼라 중심이며 Java를 지원하지만 지원은 제한적이라는 것입니다.
예를 들어, DataFrame
기존 RDD
Java 객체에서 객체를 생성 할 때 Spark의 Catalyst 최적화 프로그램은 스키마를 유추 할 수 없으며 DataFrame의 객체가 scala.Product
인터페이스를 구현한다고 가정합니다 . Scala case class
는이 인터페이스를 구현하기 때문에 기본적으로 작동합니다.
-
Dataset
API
Dataset
스파크 1.6에서 API 미리보기로 출시 API는 두 세계의 최고를 제공하는 것을 목표로;RDD
API 의 친숙한 객체 지향 프로그래밍 스타일 및 컴파일 타임 유형 안전성 이지만 Catalyst 쿼리 최적화 프로그램의 성능 이점이 있습니다. 데이터 셋은 또한DataFrame
API 와 동일한 효율적인 오프 힙 스토리지 메커니즘을 사용합니다
.데이터 직렬화와 관련하여
Dataset
API에는 JVM 표현 (객체)과 Spark의 내부 이진 형식간에 변환 하는 인코더 개념이
있습니다. Spark에는 내장 된 인코더가있어 힙 코드가 아닌 데이터와 상호 작용하고 전체 객체를 직렬화 해제하지 않고도 개별 속성에 대한 주문형 액세스를 제공하기 위해 바이트 코드를 생성한다는 점에서 매우 고급화되었습니다. Spark는 아직 사용자 정의 인코더를 구현하기위한 API를 제공하지 않지만 향후 릴리스를 위해 계획되어 있습니다.또한
Dataset
API는 Java와 Scala 모두에서 동일하게 작동하도록 설계되었습니다. Java 객체로 작업 할 때는 완전히 Bean을 준수하는 것이 중요합니다.
Dataset
API SQL 스타일 예 :
dataset.filter(_.age < 21);
카탈리스트 레벨 흐름. . (Spark Summit에서 DataFrame 및 Dataset 프레젠테이션 이해하기)
더 읽어보기 … databricks article-3 가지 Apache Spark API 이야기 : RDD와 DataFrame 및 데이터 세트
답변
Apache Spark는 세 가지 유형의 API를 제공합니다
- RDD
- DataFrame
- 데이터 세트
다음은 RDD, Dataframe 및 Dataset 간의 API 비교입니다.
RDD
스파크가 제공하는 주요 추상화는 RRD (Resilient Distributed Dataset)로, 클러스터 노드에서 분할되어 병렬로 작동 할 수있는 요소의 모음입니다.
RDD 특징 :-
-
분산 콜렉션 :
RDD는 클러스터에서 병렬 분산 알고리즘을 사용하여 대용량 데이터 세트를 처리하고 생성하는 데 널리 채택되는 MapReduce 조작을 사용합니다. 사용자는 작업 분포 및 내결함성에 대해 걱정할 필요없이 일련의 고급 연산자를 사용하여 병렬 계산을 작성할 수 있습니다. -
불변 : RDD는 파티션 된 레코드 콜렉션으로 구성됩니다. 파티션은 RDD의 기본 병렬 처리 단위이며 각 파티션은 기존 파티션의 일부 변환을 통해 변경 불가능하고 생성되는 데이터의 논리적 분할입니다.
-
내결함성 :
RDD의 일부 파티션이 손실 된 경우 여러 노드에서 데이터 복제를 수행하지 않고 동일한 계산을 달성하기 위해 해당 파티션의 변환을 계보로 재생할 수 있습니다. 데이터 관리 및 복제에 많은 노력을 기울여 더 빠른 계산을 달성합니다. -
지연 평가 : Spark의 모든 변환은 결과가 바로 계산되지 않기 때문에 지연됩니다. 대신, 일부 기본 데이터 세트에 적용된 변환 만 기억합니다. 변환은 조치가 드라이버 프로그램으로 결과를 리턴해야하는 경우에만 계산됩니다.
-
기능 변환 :
RDD는 두 가지 유형의 작업, 즉 기존 데이터에서 새 데이터 집합을 만드는 변환과 데이터 집합에서 계산을 실행 한 후 드라이버 프로그램에 값을 반환하는 작업을 지원합니다. -
데이터 처리 형식 :
구조화되지 않은 데이터뿐만 아니라 구조화 된 데이터도 쉽고 효율적으로 처리 할 수 있습니다. -
프로그래밍 언어 지원 :
RDD API는 Java, Scala, Python 및 R에서 사용할 수 있습니다.
RDD 제한 사항 :-
-
내장 된 최적화 엔진 없음 :
구조화 된 데이터로 작업 할 때 RDD는 촉매 최적화 및 텅스텐 실행 엔진을 포함한 Spark의 고급 최적화 도구를 활용할 수 없습니다. 개발자는 속성에 따라 각 RDD를 최적화해야합니다. -
구조화 된 데이터 처리 : 데이터 프레임
및 데이터 세트와 달리 RDD는 수집 된 데이터의 스키마를 유추하지 않으며 사용자가이를 지정해야합니다.
데이터 프레임
Spark는 Spark 1.3 릴리스에서 데이터 프레임을 도입했습니다. 데이터 프레임은 RDD의 주요 과제를 극복합니다.
DataFrame은 명명 된 열로 구성된 분산 된 데이터 모음입니다. 관계형 데이터베이스 또는 R / Python 데이터 프레임의 테이블과 개념적으로 동일합니다. Spark는 Dataframe과 함께 고급 프로그래밍 기능을 활용하여 확장 가능한 쿼리 최적화 프로그램을 구축하는 촉매 최적화 도구도 도입했습니다.
데이터 프레임 특징 :-
-
행 개체의 분산 컬렉션 :
DataFrame은 명명 된 열로 구성된 분산 데이터 컬렉션입니다. 개념적으로 관계형 데이터베이스의 테이블과 동일하지만 기본적으로 풍부한 최적화 기능이 있습니다. -
데이터 처리 :
정형 및 비정형 데이터 형식 (Avro, CSV, 탄력적 검색 및 Cassandra) 및 스토리지 시스템 (HDFS, HIVE 테이블, MySQL 등) 처리 이 모든 다양한 데이터 소스에서 읽고 쓸 수 있습니다. -
Catalyst Optimizer를 사용한 최적화 :
SQL 쿼리와 DataFrame API를 모두 지원합니다. 데이터 프레임은 4 단계에서 촉매 트리 변환 프레임 워크를 사용합니다.1.Analyzing a logical plan to resolve references 2.Logical plan optimization 3.Physical planning 4.Code generation to compile parts of the query to Java bytecode.
-
Hive 호환성 :
Spark SQL을 사용하면 기존 Hive웨어 하우스에서 수정되지 않은 Hive 쿼리를 실행할 수 있습니다. Hive 프런트 엔드 및 MetaStore를 재사용하고 기존 Hive 데이터, 쿼리 및 UDF와 완벽하게 호환됩니다. -
텅스텐 :
텅스텐은 메모리를 명시 적으로 관리하고 식 평가를 위해 바이트 코드를 동적으로 생성하는 물리적 실행 백엔드를 제공합니다. -
프로그래밍 언어 지원 :
Dataframe API는 Java, Scala, Python 및 R로 제공됩니다.
데이터 프레임 제한 :-
- 컴파일 타임 유형 안전성 :
논의 된 바와 같이, Dataframe API는 구조를 모르는 경우 데이터 조작을 제한하는 컴파일 타임 안전성을 지원하지 않습니다. 다음 예제는 컴파일 시간 동안 작동합니다. 그러나이 코드를 실행할 때 런타임 예외가 발생합니다.
예:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
여러 변환 및 집계 단계로 작업 할 때 특히 어려운 문제입니다.
- 도메인 객체 (손실 된 도메인 객체)에서 작동 할 수 없음 :
도메인 객체를 데이터 프레임으로 변환 한 후에는 도메인 객체를 다시 생성 할 수 없습니다. 다음 예에서는 personRDD에서 personDF를 만든 후에는 원래 RDD of Person 클래스 (RDD [Person])를 복구하지 않습니다.
예:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
데이터 세트 API
Dataset API는 형식에 안전한 객체 지향 프로그래밍 인터페이스를 제공하는 DataFrames의 확장입니다. 관계형 스키마에 매핑되는 강력한 형식의 변경 불가능한 개체 컬렉션입니다.
데이터 세트의 핵심에서 API는 엔코더라고하는 새로운 개념으로, JVM 객체와 테이블 형식 간의 변환을 담당합니다. 테이블 형식 표현은 Spark 내부 텅스텐 이진 형식을 사용하여 저장되므로 직렬화 된 데이터에 대한 작업이 가능하고 메모리 사용률이 향상됩니다. Spark 1.6은 기본 유형 (예 : String, Integer, Long), Scala 케이스 클래스 및 Java Bean을 포함하여 다양한 유형에 대한 인코더 자동 생성을 지원합니다.
데이터 셋 특징 :-
-
RDD (기능 프로그래밍, 형식 안전), DataFrame (관계형 모델, 쿼리 최적화, 텅스텐 실행, 정렬 및 셔플 링) : RDD 및 데이터 프레임 모두를 제공합니다. -
인코더 : 인코더를
사용하면 모든 JVM 객체를 데이터 세트로 쉽게 변환 할 수 있으므로 사용자는 데이터 프레임과 달리 구조화 된 데이터와 구조화되지 않은 데이터를 모두 사용할 수 있습니다. -
프로그래밍 언어 지원 :
Datasets API는 현재 Scala 및 Java에서만 사용할 수 있습니다. Python 및 R은 현재 버전 1.6에서 지원되지 않습니다. Python 지원은 버전 2.0에 예정되어 있습니다. -
Type Safety :
Datasets API는 데이터 프레임에서 사용할 수 없었던 컴파일 시간 안전을 제공합니다. 아래 예제에서 컴파일 람다 함수를 사용하여 도메인 객체에서 데이터 세트가 작동하는 방법을 확인할 수 있습니다.
예:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
- 상호 운용성 : 데이터 세트를 사용하면 기존 RDD 및 데이터 프레임을 상용구 코드없이 데이터 세트로 쉽게 변환 할 수 있습니다.
데이터 세트 API 제한 :-
- String으로 타입 캐스팅이 필요합니다 :
현재 데이터 셋에서 데이터를 쿼리하려면 클래스의 필드를 문자열로 지정해야합니다. 데이터를 쿼리 한 후에는 필요한 데이터 유형으로 열을 캐스트해야합니다. 반면에 데이터 세트에서 맵 작업을 사용하면 Catalyst 옵티마이 저가 사용되지 않습니다.
예:
ds.select(col("name").as[String], $"age".as[Int]).collect()
Python 및 R 지원 안 함 : 릴리스 1.6부터 데이터 세트는 Scala 및 Java 만 지원합니다. Python 2.0에 Spark 지원이 도입 될 예정입니다.
Datasets API는 기존 RDD 및 Dataframe API에 비해 형식 안전성 및 기능 프로그래밍이 향상되어 여러 가지 이점을 제공합니다.
답변
한 장의 모든 사진 (RDD, DataFrame 및 DataSet).
RDD
RDD
병렬로 작동 할 수있는 내결함성 요소 컬렉션입니다.
DataFrame
DataFrame
명명 된 열로 구성된 데이터 집합입니다. 관계형 데이터베이스의 테이블 또는 R / Python의 데이터 프레임과 개념적으로 동일 하지만 후드 아래에서 더 풍부한 최적화를 제공합니다 .
Dataset
Dataset
분산 된 데이터 모음입니다. 데이터 세트는 Spark 1.6에 추가 된 새로운 인터페이스 로 Spark SQL의 최적화 된 실행 엔진의 이점
과 함께 RDD (강력한 타이핑, 강력한 람다 함수 사용 기능)의
이점을 제공 합니다.
노트 :
Dataset[Row]
스칼라 / 자바 의 행 데이터 세트 ( )는 종종 DataFrames라고 합니다.
Nice comparison of all of them with a code snippet.
Q : RDD와 같이 DataFrame으로 또는 그 반대로 변환 할 수 있습니까?
예, 둘 다 가능합니다
1. RDD
~ DataFrame
와.toDF()
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
더 많은 방법 : Spark에서 RDD 객체를 데이터 프레임으로 변환
2. DataFrame
/ DataSet
행 RDD
과 .rdd()
방법
val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
답변
때문에 DataFrame
약하게 입력하고, 개발자는 타입 시스템의 혜택을받지 못하고있다. 예를 들어, SQL에서 무언가를 읽고 그것에 대한 집계를 실행한다고 가정 해 봅시다.
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
당신이 말할 때 people("deptId")
, Int
또는 Long
을 되찾지 않고 Column
조작해야하는 객체를 되 찾습니다. 스칼라와 같은 풍부한 유형 시스템을 사용하는 언어에서는 결국 모든 유형 안전을 잃어 컴파일 타임에 발견 될 수있는 것들에 대한 런타임 오류 수가 증가합니다.
반대로 DataSet[T]
입력됩니다. 당신이 할 때 :
val people: People = val people = sqlContext.read.parquet("...").as[People]
당신은 실제로 돌아오고있다 People
deptId
열 유형이 아닌 실제 정수 유형 인 객체 유형 시스템을 활용합니다.
Spark 2.0부터 DataFrame 및 DataSet API가 통합되며 여기서는에 DataFrame
대한 유형 별칭이됩니다 DataSet[Row]
.
답변
단순히 RDD
핵심 구성 요소이지만DataFrame
spark 1.30에 도입 된 API입니다.
RDD
라는 데이터 파티션 모음 RDD
. 이들은 RDD
다음과 같은 몇 가지 속성을 따라야합니다.
- 불변,
- 내결함성,
- 분산,
- 더.
여기 RDD
구조화되거나 구조화되어 있지 않습니다.
DataFrame
DataFrame
Scala, Java, Python 및 R에서 사용 가능한 API입니다. 모든 유형의 구조적 및 반 구조적 데이터를 처리 할 수 있습니다. 를 정의하기 위해 DataFrame
이라는 명명 된 열로 구성된 분산 데이터 모음입니다 DataFrame
. 당신은 쉽게 최적화 할 수 있습니다 RDDs
에 DataFrame
. 를 사용하여 JSON 데이터, 쪽모이 세공 데이터, HiveQL 데이터를 한 번에 처리 할 수 있습니다 DataFrame
.
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
여기서 Sample_DF는로 간주됩니다 DataFrame
. sampleRDD
은 (원시 데이터) RDD
입니다.
답변
대부분의 대답은 정확합니다. 여기에 한 점만 추가하고 싶습니다.
Spark 2.0에서는 두 API (DataFrame + DataSet)가 단일 API로 통합됩니다.
“데이터 프레임과 데이터 세트 통합 : 스칼라와 자바에서 데이터 프레임과 데이터 세트가 통합되었습니다. 즉, 데이터 프레임은 행의 데이터 세트에 대한 유형 별칭 일뿐입니다. 파이썬과 R에서는 형식 안전성이 부족한 경우 데이터 프레임이 기본 프로그래밍 인터페이스입니다.”
데이터 세트는 RDD와 유사하지만 Java 직렬화 또는 Kryo 대신 특수 인코더를 사용하여 처리 또는 네트워크를 통해 전송하기 위해 객체를 직렬화합니다.
Spark SQL은 기존 RDD를 데이터 세트로 변환하는 두 가지 방법을 지원합니다. 첫 번째 방법은 리플렉션을 사용하여 특정 유형의 객체를 포함하는 RDD의 스키마를 유추합니다. 이 리플렉션 기반 접근 방식은보다 간결한 코드로 연결되며 Spark 애플리케이션을 작성하는 동안 이미 스키마를 알고있을 때 잘 작동합니다.
데이터 집합을 만드는 두 번째 방법은 스키마를 구성한 다음 기존 RDD에 적용 할 수있는 프로그래밍 인터페이스를 사용하는 것입니다. 이 방법은 더 장황하지만 런타임까지 열과 해당 유형을 알 수없는 경우 데이터 집합을 구성 할 수 있습니다.
여기에서 RDD tof 데이터 프레임 대화 답변을 찾을 수 있습니다