[scala] DataFrame을 Hive에 직접 저장하는 방법은 무엇입니까?

DataFrameSpark에서 Hive에 직접 저장할 수 있습니까?

변환 DataFrame을 시도한 Rdd다음 텍스트 파일로 저장 한 다음 하이브에로드했습니다. 하지만 dataframe벌집에 직접 저장할 수 있는지 궁금합니다.



답변

메모리 내 임시 테이블을 만들고 sqlContext를 사용하여 하이브 테이블에 저장할 수 있습니다.

데이터 프레임이 myDf라고 가정 해 보겠습니다. 다음을 사용하여 하나의 임시 테이블을 만들 수 있습니다.

myDf.createOrReplaceTempView("mytempTable")

그런 다음 간단한 하이브 문을 사용하여 테이블을 만들고 임시 테이블에서 데이터를 덤프 할 수 있습니다.

sqlContext.sql("create table mytable as select * from mytempTable");


답변

사용 DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) Spark SQL 및 DataFrame 가이드를 참조하십시오 .


답변

df.write.saveAsTable(...)Spark 2.0 문서에서 더 이상 사용되지 않는 항목이 표시 되지 않습니다. Amazon EMR에서 우리를 위해 일했습니다. 우리는 S3에서 데이터 프레임으로 데이터를 읽고, 처리하고, 결과에서 테이블을 생성하고, MicroStrategy로 읽을 수있었습니다. Vinays 답변도 작동했습니다.


답변

HiveContext를 가지고 / 생성해야합니다.

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

그런 다음 데이터 프레임을 직접 저장하거나 하이브 테이블로 저장할 열을 선택합니다.

df는 데이터 프레임입니다.

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

또는

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

또는

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveMode는 Append / Ignore / Overwrite / ErrorIfExists입니다.

여기에 Spark Documentation의 HiveContext에 대한 정의를 추가했습니다.

기본 SQLContext 외에도 기본 SQLContext에서 제공하는 기능의 상위 집합을 제공하는 HiveContext를 만들 수도 있습니다. 추가 기능에는보다 완전한 HiveQL 파서를 사용하여 쿼리를 작성하는 기능, Hive UDF에 대한 액세스 및 Hive 테이블에서 데이터를 읽는 기능이 포함됩니다. HiveContext를 사용하기 위해 기존 Hive 설정이 필요하지 않으며 SQLContext에 사용 가능한 모든 데이터 소스를 계속 사용할 수 있습니다. HiveContext는 기본 Spark 빌드에 Hive의 모든 종속성을 포함하지 않도록 개별적으로 만 패키징됩니다.


Spark 버전 1.6.2에서 “dbName.tableName”을 사용하면 다음 오류가 발생합니다.

org.apache.spark.sql.AnalysisException : 임시 테이블에는 데이터베이스 이름 또는 기타 규정자를 지정할 수 없습니다. 테이블 이름에 마침표 (.)가있는 경우 백틱 ()으로 테이블 이름을 인용하십시오 .`


답변

Hive에 저장하는 것은 write()SQLContext의 메소드를 사용 하는 것입니다.

df.write.saveAsTable(tableName)

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String) 참조

Spark 2.2에서 : DataFrame 대신 DataSet을 사용합니다.


답변

게시물에 늦게 작성하여 죄송하지만 수락 된 답변이 없습니다.

df.write().saveAsTable던지고 AnalysisExceptionHIVE 테이블과 호환되지 않습니다.

DF를 저장 df.write().format("hive")하면 트릭이 필요합니다!

그러나 그것이 작동하지 않으면 이전 의견과 답변으로 이동하면 이것이 내 의견으로는 가장 좋은 해결책입니다 (그러나 제안에 개방적입니다).

가장 좋은 방법은 HIVE 테이블 (PARTITIONED 테이블 포함)을 명시 적으로 생성하는 것입니다.

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

DF를 임시 테이블로 저장하고,

df.createOrReplaceTempView("$tempTableName")

PARTITIONED HIVE 테이블에 삽입하십시오.

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Offcourse 에서 DF 의 LAST COLUMNPARTITION COLUMN이 되므로 이에 따라 HIVE 테이블을 생성하십시오!

작동한다면 댓글을 남겨주세요! 또는 아닙니다.


–최신 정보–

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE


답변

다음은 parquet 파일에서 Hive 테이블을 만드는 PySpark 버전입니다. 추론 된 스키마를 사용하여 Parquet 파일을 생성했으며 이제 정의를 Hive 메타 스토어에 푸시하려고 할 수 있습니다. 정의를 Hive 메타 스토어뿐만 아니라 AWS Glue 또는 AWS Athena와 같은 시스템에 푸시 할 수도 있습니다. 여기에서는 spark.sql을 사용하여 영구 테이블을 푸시 / 생성합니다.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);