[scala] DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까?

Spark 1.4.0에서 Spark SQL 및 DataFrames를 사용하기 시작했습니다. Scala의 DataFrames에서 사용자 지정 파티 셔 너를 정의하고 싶지만이 작업을 수행하는 방법을 보지 못했습니다.

작업중인 데이터 테이블 중 하나에는 다음 예제에 대한 silimar 계정 별 트랜잭션 목록이 포함되어 있습니다.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

적어도 처음에는 대부분의 계산이 계정 내의 트랜잭션간에 발생합니다. 따라서 한 계정의 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶습니다.

그러나 나는 이것을 정의하는 방법을 보지 못하고 있습니다. DataFrame 클래스에는 만들 파티션 수를 지정할 수있는 ‘repartition (Int)’라는 메서드가 있습니다. 하지만 RDD에 대해 지정할 수있는 것과 같이 DataFrame에 대한 사용자 지정 파티 셔 너를 정의하는 데 사용할 수있는 방법이 없습니다.

소스 데이터는 Parquet에 저장됩니다. DataFrame을 Parquet에 쓸 때 분할 할 열을 지정할 수 있으므로 Parquet에게 ‘계정’열을 기준으로 데이터를 분할하도록 지시 할 수 있음을 확인했습니다. 그러나 수백만 개의 계정이있을 수 있으며 Parquet을 올바르게 이해하면 각 계정에 대해 별개의 디렉터리를 생성하므로 합리적인 솔루션처럼 들리지 않았습니다.

계정의 모든 데이터가 동일한 파티션에 있도록 Spark가이 DataFrame을 분할하는 방법이 있습니까?



답변

스파크> = 2.3.0

SPARK-22614 는 범위 분할을 노출합니다.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389데이터 소스 API v2 에서 외부 형식 파티셔닝을 노출합니다 .

스파크> = 1.6.0

Spark> = 1.6에서는 쿼리 및 캐싱을 위해 열별 분할을 사용할 수 있습니다. 참조 : SPARK-11410SPARK-4849 사용 repartition방법 :

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDsSpark Dataset( Dataset[Row]aka 포함 DataFrame) 와 달리 현재로서는 사용자 지정 파티 셔 너를 사용할 수 없습니다. 일반적으로 인위적인 분할 열을 생성하여이 문제를 해결할 수 있지만 동일한 유연성을 제공하지는 않습니다.

Spark <1.6.0 :

당신이 할 수있는 한 가지는 당신이 생성하기 전에 입력 데이터를 미리 분할하는 것입니다. DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5)

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

이후 DataFrame에서 작성 RDD파티션 레이아웃 기존 단지 간단한지도 상 필요 * 보존한다 :

assert(df.rdd.partitions == partitioned.partitions)

동일한 방법으로 기존의 파티션을 다시 분할 할 수 있습니다 DataFrame.

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

그래서 불가능하지 않은 것 같습니다. 그것이 의미가 있다면 문제는 남아 있습니다. 나는 대부분의 경우 다음과 같지 않다고 주장 할 것입니다.

  1. 재 파티션은 비용이 많이 드는 프로세스입니다. 일반적인 시나리오에서 대부분의 데이터는 직렬화, 셔플 및 역 직렬화되어야합니다. 반면에 사전 분할 된 데이터의 이점을 얻을 수있는 작업 수는 상대적으로 적으며 내부 API가이 속성을 활용하도록 설계되지 않은 경우 더 제한됩니다.

    • 일부 시나리오에서는 조인하지만 내부 지원이 필요합니다.
    • 창 함수는 일치하는 파티 셔너로 호출합니다. 위와 동일하며 단일 창 정의로 제한됩니다. 하지만 이미 내부적으로 분할되어 있으므로 사전 분할이 중복 될 수 있습니다.
    • 간단한 집계 GROUP BY-임시 버퍼 **의 메모리 사용량을 줄일 수 있지만 전체 비용은 훨씬 더 높습니다. groupByKey.mapValues(_.reduce)(현재 동작) 대 reduceByKey(사전 분할) 과 다소 동일합니다 . 실제로 유용 할 것 같지 않습니다.
    • 로 데이터 압축 SqlContext.cacheTable. 실행 길이 인코딩을 사용하는 것처럼 보이므로 적용 OrderedRDDFunctions.repartitionAndSortWithinPartitions하면 압축률이 향상 될 수 있습니다.
  2. 성능은 키 배포에 크게 좌우됩니다. 치우친 경우 리소스 사용률이 최적화되지 않습니다. 최악의 시나리오에서는 작업을 전혀 완료 할 수 없습니다.

  3. 높은 수준의 선언적 API 사용의 요점은 낮은 수준의 구현 세부 정보에서 자신을 격리하는 것입니다. @dwysakowicz@RomiKuntsman이 이미 언급했듯이 최적화는 Catalyst Optimizer 의 작업입니다. . 그것은 매우 정교한 짐승이며 내부에 훨씬 더 깊이 들어 가지 않고도 쉽게 향상시킬 수 있을지 의심 스럽습니다.

관련 개념

JDBC 소스로 파티셔닝 :

JDBC 데이터 소스는 predicates인수를 지원 합니다. 다음과 같이 사용할 수 있습니다.

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

술어 당 하나의 JDBC 파티션을 작성합니다. 개별 조건자를 사용하여 생성 된 집합이 분리되지 않은 경우 결과 테이블에 중복 항목이 표시됩니다.

partitionBy 방법 DataFrameWriter :

Spark DataFrameWriterpartitionBy쓰기시 데이터를 “분할”하는 데 사용할 수있는 방법을 제공합니다 . 제공된 열 세트를 사용하여 쓰기시 데이터를 분리합니다.

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

이를 통해 키 기반 쿼리에 대해 읽기시 술어 푸시 다운을 사용할 수 있습니다.

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

그러나 DataFrame.repartition. 특히 다음과 같은 집계 :

val cnts = df1.groupBy($"k").sum()

여전히 다음이 필요합니다 TungstenExchange.

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy 방법 DataFrameWriter (스파크> = 2.0) :

bucketBy와 유사한 응용 프로그램이 partitionBy있지만 테이블 ( saveAsTable) 에만 사용할 수 있습니다 . 버 케팅 정보를 사용하여 조인을 최적화 할 수 있습니다.

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 파티션 레이아웃 이란 데이터 배포만을 의미합니다. partitionedRDD에는 더 이상 파티 셔 너가 없습니다. ** 초기 예측이 없다고 가정합니다. 집계가 열의 작은 하위 집합 만 포함하는 경우에는 전혀 이득이 없습니다.


답변

Spark <1.6 HiveContext에서 일반 오래된 SqlContext것이 아니라 를 생성하는 경우 HiveQL을 사용할 수 있습니다 DISTRIBUTE BY colX...(N 감속기 각각이 x의 겹치지 않는 범위를 얻도록 보장) & CLUSTER BY colX...(배포 기준 및 정렬 기준의 단축키).

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

이것이 Spark DF api와 어떻게 맞는지 잘 모르겠습니다. 이러한 키워드는 일반 SqlContext에서 지원되지 않습니다 (HiveContext를 사용하기 위해 하이브 메타 저장소가 필요하지 않음).

편집 : Spark 1.6 이상은 이제 네이티브 DataFrame API에 있습니다.


답변

그래서 어떤 종류의 대답으로 시작하려면 :)-당신은 할 수 없습니다

나는 전문가는 아니지만 DataFrames를 이해하는 한 rdd와 같지 않으며 DataFrame에는 Partitioner와 같은 것이 없습니다.

일반적으로 DataFrame의 아이디어는 이러한 문제 자체를 처리하는 또 다른 수준의 추상화를 제공하는 것입니다. DataFrame에 대한 쿼리는 RDD에 대한 작업으로 추가로 변환되는 논리적 계획으로 변환됩니다. 제안한 파티셔닝은 아마도 자동으로 적용되거나 적어도 적용되어야합니다.

SparkSQL이 어떤 종류의 최적의 작업을 제공 할 것이라고 신뢰하지 않는 경우 의견에서 제안한대로 DataFrame을 RDD [Row]로 변환 할 수 있습니다.


답변

다음에서 반환 된 DataFrame을 사용합니다.

yourDF.orderBy(account)

명시적인 사용 방법이 없습니다. partitionByDataFrame에서 하는 은 PairRDD에서만 사용할 수 있지만 DataFrame을 정렬 할 때 LogicalPlan에서 사용하고 각 계정에 대해 계산해야 할 때 도움이됩니다.

계정별로 분할하려는 데이터 프레임으로 똑같은 문제를 발견했습니다. “계정에 대한 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶다”고 말할 때 규모와 성능을 위해 데이터를 분할하고 싶지만 코드가 이에 의존하지 않는다고 가정합니다 (예 : mapPartitions()등), 맞습니까?


답변

나는 RDD를 사용하여 이것을 할 수 있었다. 그러나 이것이 당신에게 허용 가능한 해결책인지 모르겠습니다. DF를 RDD로 사용할 수 있으면 신청 repartitionAndSortWithinPartitions하여 데이터의 사용자 지정 재 파티션을 수행 할 수 있습니다 .

다음은 내가 사용한 샘플입니다.

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)


답변