Python on Hadoop
I've given up on HBase for now and decided to write Hadoop jobs in Python instead.
mapper.py
#!/usr/bin/env python
import sys

# Emit "word<TAB>1" for every word read from stdin.
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)
reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys

# Accumulate counts per word from "word<TAB>count" lines on stdin.
word2count = {}
for line in sys.stdin:
    line = line.strip()
    word, count = line.split("\t", 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # Skip lines whose count is not a number.
        pass

# Sort by word and print the totals.
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print "%s\t%s" % (word, count)
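As an aside, Hadoop Streaming sorts the mapper output by key before it reaches the reducer, so the reducer doesn't actually need to hold every word in a dict. A sketch of an alternative streaming-style reducer using `itertools.groupby` (written in Python 3 syntax; `parse` and `reduce_counts` are names I made up for this sketch):

```python
#!/usr/bin/env python
# Alternative reducer sketch: since Streaming delivers reducer input
# already sorted by key, consecutive lines with the same word can be
# summed with groupby instead of accumulating a dict in memory.
import sys
from itertools import groupby


def parse(lines):
    # Split each "word<TAB>count" line into a (word, count) pair,
    # skipping lines whose count is not a number.
    for line in lines:
        word, _, count = line.strip().partition("\t")
        if count.isdigit():
            yield word, int(count)


def reduce_counts(lines):
    # Sum counts per word; the input must already be sorted by word.
    for word, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    for word, total in reduce_counts(sys.stdin):
        print("%s\t%d" % (word, total))
```

This keeps memory usage constant regardless of how many distinct words there are, which matters once the input no longer fits on one machine.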
Prepare some suitable input data and run:
hadoop jar contrib/streaming/hadoop-streaming-0.20.2+320.jar -mapper python/mapper.py -reducer python/reducer.py -input input -output output
The result:
packageJobJar: [/var/lib/hadoop-0.20/cache/hadoop/hadoop-unjar8477778014084911721/] [] /tmp/streamjob2926389444165168943.jar tmpDir=null
10/08/10 16:38:36 INFO mapred.FileInputFormat: Total input paths to process : 3
10/08/10 16:38:36 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/hadoop/mapred/local]
10/08/10 16:38:36 INFO streaming.StreamJob: Running job: job_201008101314_0014
10/08/10 16:38:36 INFO streaming.StreamJob: To kill this job, run:
10/08/10 16:38:36 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008101314_0014
10/08/10 16:38:36 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008101314_0014
10/08/10 16:38:37 INFO streaming.StreamJob: map 0% reduce 0%
10/08/10 16:39:09 INFO streaming.StreamJob: map 100% reduce 100%
10/08/10 16:39:09 INFO streaming.StreamJob: To kill this job, run:
10/08/10 16:39:09 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008101314_0014
10/08/10 16:39:09 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008101314_0014
10/08/10 16:39:09 ERROR streaming.StreamJob: Job not Successful!
10/08/10 16:39:09 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
It failed...
cat input/sample.txt | python python/mapper.py | sort | python python/reducer.py
This works fine.
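When the local pipeline works but the Streaming job fails, one common cause is that the mapper and reducer scripts never get shipped to the task nodes, so the tasks can't find them. A hedged sketch of an invocation that sends the scripts along with the job via the `-file` option (paths and jar version taken from the command above; whether this was the actual cause here is an assumption):

```shell
# Sketch: ship mapper.py and reducer.py to the task nodes with -file,
# and make sure they are executable so Streaming can launch them.
# With -file, the -mapper/-reducer arguments use the bare file names.
chmod +x python/mapper.py python/reducer.py
hadoop jar contrib/streaming/hadoop-streaming-0.20.2+320.jar \
  -file python/mapper.py -mapper mapper.py \
  -file python/reducer.py -reducer reducer.py \
  -input input -output output
```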
- Update
(from /usr/lib/hadoop-0.20)
hadoop jar contrib/streaming/hadoop-streaming-0.20.2+320.jar \
This worked. The first job got killed, but the results seem to have been output anyway.