[hadoop] Hadoop에서 여러 MapReduce 작업 연결

MapReduce를 적용하는 많은 실제 상황에서 최종 알고리즘은 여러 MapReduce 단계가됩니다.

즉, Map1, Reduce1, Map2, Reduce2 등.

따라서 다음 맵에 대한 입력으로 필요한 마지막 감소의 출력이 있습니다.

중간 데이터는 파이프 라인이 성공적으로 완료되면 (일반적으로) 유지하고 싶지 않은 것입니다. 또한이 중간 데이터는 일반적으로 일부 데이터 구조 (예 : ‘맵’또는 ‘세트’)이기 때문에 이러한 키-값 쌍을 작성하고 읽는 데 너무 많은 노력을 기울이고 싶지 않습니다.

Hadoop에서 권장하는 방법은 무엇입니까?

나중에 정리를 포함하여이 중간 데이터를 올바른 방식으로 처리하는 방법을 보여주는 (간단한) 예가 있습니까?



답변

야후의 개발자 네트워크에 대한이 튜토리얼이 이것에 도움이 될 것이라고 생각합니다 : Chaining Jobs

당신은 JobClient.runJob(). 첫 번째 작업의 데이터 출력 경로가 두 번째 작업의 입력 경로가됩니다. 이를 구문 분석하고 작업에 대한 매개 변수를 설정하려면 적절한 코드를 사용하여 작업에 인수로 전달해야합니다.

그러나 위의 방법이 지금은 이전에 매핑 된 API가 수행 한 방식 일 수 있지만 여전히 작동해야한다고 생각합니다. 새로운 mapreduce API에도 비슷한 방법이 있지만 그것이 무엇인지 잘 모르겠습니다.

작업이 완료된 후 중간 데이터를 제거하는 한 코드에서이를 수행 할 수 있습니다. 내가 전에 한 방식은 다음과 같은 것을 사용하는 것입니다.

FileSystem.delete(Path f, boolean recursive);

경로는 데이터의 HDFS에있는 위치입니다. 다른 작업에 필요하지 않은 경우에만이 데이터를 삭제해야합니다.


답변

할 수있는 방법에는 여러 가지가 있습니다.

(1) 계단식 작업

첫 번째 작업에 대한 JobConf 개체 “job1″을 만들고 “input”을 inputdirectory로, “temp”를 출력 디렉터리로 모든 매개 변수를 설정합니다. 이 작업 실행 :

JobClient.run(job1).

바로 아래에 두 번째 작업에 대한 JobConf 개체 “job2″를 만들고 “temp”를 inputdirectory로, “output”을 출력 디렉터리로 모든 매개 변수를 설정합니다. 이 작업 실행 :

JobClient.run(job2).

(2) 두 개의 JobConf 객체를 만들고 JobClient.run을 사용하지 않는다는 점을 제외하고는 (1) 과 같이 모든 매개 변수를 설정합니다 .

그런 다음 jobconf를 매개 변수로 사용하여 두 개의 Job 객체를 만듭니다.

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

jobControl 개체를 사용하여 작업 종속성을 지정한 다음 작업을 실행합니다.

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Map + | 감소 | Map *에서는 Hadoop 버전 0.19 이상과 함께 제공되는 ChainMapper 및 ChainReducer 클래스를 사용할 수 있습니다.


답변

실제로이를 수행하는 방법에는 여러 가지가 있습니다. 두 가지에 집중하겠습니다.

하나는 Riffle ( http://github.com/cwensel/riffle )을 통해 종속 항목을 식별하고 종속성 (토폴로지) 순서로 ‘실행’하는 주석 라이브러리입니다.

또는 Cascading ( http://www.cascading.org/ ) 에서 Cascade (및 MapReduceFlow)를 사용할 수 있습니다 . 향후 버전은 Riffle 주석을 지원하지만 이제는 원시 MR JobConf 작업에서 잘 작동합니다.

이에 대한 변형은 MR 작업을 전혀 관리하지 않고 Cascading API를 사용하여 애플리케이션을 개발하는 것입니다. 그런 다음 JobConf 및 작업 체인은 Cascading planner 및 Flow 클래스를 통해 내부적으로 처리됩니다.

이렇게하면 Hadoop 작업 등을 관리하는 메커니즘이 아니라 문제에 집중하는 데 시간을 할애 할 수 있습니다. 다른 언어 (예 : clojure 또는 jruby)를 계층화하여 개발 및 애플리케이션을 더욱 단순화 할 수도 있습니다. http://www.cascading.org/modules.html


답변

JobConf 객체를 차례로 사용하여 작업 체인을 수행했습니다. 작업을 연결하기 위해 WordCount 예제를 사용했습니다. 한 작업은 주어진 출력에서 ​​단어가 몇 번 반복되는지 알아냅니다. 두 번째 작업은 첫 번째 작업 출력을 입력으로 사용하고 주어진 입력에서 총 단어를 계산합니다. 다음은 Driver 클래스에 배치해야하는 코드입니다.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

이러한 작업을 실행하는 명령은 다음과 같습니다.

bin / hadoop jar TotalWords.

명령에 대한 최종 작업 이름을 제공해야합니다. 위의 경우 TotalWords입니다.


답변

코드에 주어진 방식으로 MR 체인을 실행할 수 있습니다.

참고 : 드라이버 코드 만 제공되었습니다.

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

순서는

( JOB1 ) MAP-> REDUCE-> ( JOB2 ) MAP
키를 정렬하기 위해 수행되었지만 트리 맵을 사용하는 것과 같은 더 많은 방법
이 있지만 작업이 연결된 방식에주의를 집중하고 싶습니다! !
감사합니다


답변

MapReduce 작업을 처리하는 barch에 oozie를 사용할 수 있습니다. http://issues.apache.org/jira/browse/HADOOP-5303


답변

Apache Mahout 프로젝트에는 여러 MapReduce 작업을 연결하는 예제가 있습니다. 예 중 하나는 다음에서 찾을 수 있습니다.

RecommenderJob.java

http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob