매분 데이터 세트를 생성하는 스파크 스트리밍 응용 프로그램이 있습니다. 처리 된 데이터의 결과를 저장 / 덮어 쓰기해야합니다.
org.apache.hadoop.mapred.FileAlreadyExistsException 데이터 세트를 덮어 쓰려고하면 실행이 중지됩니다.
Spark 속성을 설정 set("spark.files.overwrite","true")
했지만 운이 없습니다.
Spark에서 파일을 덮어 쓰거나 미리 삭제하는 방법은 무엇입니까?
답변
업데이트 : 사용 제안 Dataframes
및 ... .write.mode(SaveMode.Overwrite) ...
.
핸디 포주 :
implicit class PimpedStringRDD(rdd: RDD[String]) {
def write(p: String)(implicit ss: SparkSession): Unit = {
import ss.implicits._
rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
}
}
이전 버전의 경우
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)
1.1.0에서는 –conf 플래그와 함께 spark-submit 스크립트를 사용하여 conf 설정을 지정할 수 있습니다.
경고 (이전 버전) : @piggybox에 따르면 Spark에는 파일을 작성하는 데 필요한 파일 만 덮어 쓰는 버그 part-
가 있으며 다른 파일은 제거되지 않은 상태로 유지됩니다.
답변
이후 df.save(path, source, mode)
사용되지 않으며, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )
사용 df.write.format(source).mode("overwrite").save(path)
df.write이 DataFrameWriter입니다
‘source’는 ( “com.databricks.spark.avro”| “parquet”| “json”) 일 수 있습니다.
답변
매개 변수에 대한 문서는 spark.files.overwrite
” SparkContext.addFile()
대상 파일이 존재하고 그 내용이 소스의 내용과 일치하지 않을 때 추가 된 파일을 덮어 쓸지 여부”라고 말합니다 . 따라서 saveAsTextFiles 메서드에는 영향을주지 않습니다.
파일을 저장하기 전에 다음을 수행 할 수 있습니다.
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Aas는 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696에 설명되어 있습니다
. HTML
답변
로부터 pyspark.sql.DataFrame.save의 문서 (현재 1.3.1에서)을 지정할 수 있습니다 mode='overwrite'
DataFrame을 저장할 때 :
myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
나는 이것이 남은 파티션 파일도 제거한다는 것을 확인했습니다. 따라서 원래 10 개의 파티션 / 파일이 있다고 말했지만 6 개의 파티션 만있는 DataFrame으로 폴더를 덮어 쓴 경우 결과 폴더에는 6 개의 파티션 / 파일이 있습니다.
참고 항목 스파크 SQL 설명서를 모드 옵션에 대한 자세한 내용은.
답변
df.write.mode('overwrite').parquet("/output/folder/path")
파이썬을 사용하여 마루 파일을 덮어 쓰려면 작동합니다. 이것은 스파크 1.6.2에 있습니다. API는 이후 버전에서 다를 수 있습니다.
답변
val jobName = "WordCount";
//overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false")
val conf = new
SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
val sc = new SparkContext(conf)
답변
이 오버로드 된 버전의 저장 기능은 저에게 효과적입니다.
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ( “덮어 쓰기”))
위의 예는 기존 폴더를 덮어 씁니다. savemode는 다음 매개 변수도 사용할 수 있습니다 ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ) :
Append : Append 모드는 DataFrame을 데이터 소스에 저장할 때 데이터 / 테이블이 이미 존재하는 경우 DataFrame의 내용이 기존 데이터에 추가되는 것을 의미합니다.
ErrorIfExists : 데이터 소스에 DataFrame를 저장할 때 데이터가 이미 존재하는 경우 ErrorIfExists 모드 수단은, 예외가 발생 될 것으로 예상된다.
Ignore : Ignore 모드는 DataFrame을 데이터 소스에 저장할 때 데이터가 이미 존재하는 경우 저장 작업이 DataFrame의 내용을 저장하지 않고 기존 데이터를 변경하지 않을 것으로 예상됨을 의미합니다.