Python on Hadoop

I've given up on HBase for now and decided to write the job with Hadoop + Python instead.

mapper.py

#!/usr/bin/env python
# mapper: emit "<word>\t1" for every word read from stdin

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print("%s\t%s" % (word, 1))
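Just to see what the mapper emits, here is a quick check of the same logic on one hypothetical input line (the string stands in for sys.stdin):

```python
# Hypothetical sanity check of the mapper logic.
line = "hello hadoop hello"
pairs = [(word, 1) for word in line.strip().split()]
for word, count in pairs:
    print("%s\t%s" % (word, count))  # one tab-separated pair per line
```

Each word is emitted with a count of 1; duplicates like "hello" appear once per occurrence, and the reducer sums them later.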

reducer.py

#!/usr/bin/env python
# reducer: sum the counts per word and print the totals sorted by word

from operator import itemgetter
import sys

word2count = {}

for line in sys.stdin:
    line = line.strip()

    word, count = line.split("\t", 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number -- skip the line
        pass

# sort by word so the output is deterministic
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

for word, count in sorted_word2count:
    print("%s\t%s" % (word, count))
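
An alternative reducer sketch (not the version used above): Hadoop Streaming delivers the mapper output sorted by key, so equal words arrive contiguously and itertools.groupby can sum each run without holding a dict of every word in memory.

```python
#!/usr/bin/env python
# Streaming reducer sketch: relies on the input being sorted by key,
# which Hadoop Streaming (and `sort` in a local test) guarantees.
import sys
from itertools import groupby


def parse(lines):
    # turn "word\tcount" lines into (word, int(count)) pairs
    for line in lines:
        word, _, count = line.strip().partition("\t")
        yield word, int(count)


def count_words(pairs):
    # assumes pairs arrive sorted by word; each groupby run is one word
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    for word, total in count_words(parse(sys.stdin)):
        print("%s\t%d" % (word, total))
```

This keeps memory usage flat no matter how many distinct words there are, at the cost of depending on the sort order.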

Prepare some suitable input data and run:

hadoop jar contrib/streaming/hadoop-streaming-0.20.2+320.jar -mapper python/mapper.py -reducer python/reducer.py -input input -output output

The output:

packageJobJar: [/var/lib/hadoop-0.20/cache/hadoop/hadoop-unjar8477778014084911721/] [] /tmp/streamjob2926389444165168943.jar tmpDir=null
10/08/10 16:38:36 INFO mapred.FileInputFormat: Total input paths to process : 3
10/08/10 16:38:36 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/hadoop/mapred/local]
10/08/10 16:38:36 INFO streaming.StreamJob: Running job: job_201008101314_0014
10/08/10 16:38:36 INFO streaming.StreamJob: To kill this job, run:
10/08/10 16:38:36 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008101314_0014
10/08/10 16:38:36 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008101314_0014
10/08/10 16:38:37 INFO streaming.StreamJob: map 0% reduce 0%
10/08/10 16:39:09 INFO streaming.StreamJob: map 100% reduce 100%
10/08/10 16:39:09 INFO streaming.StreamJob: To kill this job, run:
10/08/10 16:39:09 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008101314_0014
10/08/10 16:39:09 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008101314_0014
10/08/10 16:39:09 ERROR streaming.StreamJob: Job not Successful!
10/08/10 16:39:09 INFO streaming.StreamJob: killJob...
Streaming Command Failed!

Failed...

cat input/sample.txt | python python/mapper.py | sort | python python/reducer.py

This works fine, so the scripts themselves are OK.
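What that shell pipeline does can be sketched in pure Python: map each line to (word, 1) pairs, sort them by key (the job `sort` does locally, and what Hadoop's shuffle phase does in a real job), then sum each run of equal keys.

```python
from itertools import groupby

# stand-ins for the lines of input/sample.txt
lines = ["hello hadoop", "hello python"]

# map phase: one (word, 1) pair per word
mapped = [(word, 1) for line in lines for word in line.split()]

# shuffle/sort: what `sort` provides in the pipeline above
shuffled = sorted(mapped)

# reduce phase: sum each run of equal words
reduced = [(word, sum(c for _, c in group))
           for word, group in groupby(shuffled, key=lambda kv: kv[0])]

print(reduced)  # [('hadoop', 1), ('hello', 2), ('python', 1)]
```

The only thing Hadoop adds on top of this picture is distributing the map and reduce steps across nodes.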

  • Update

hadoop jar contrib/streaming/hadoop-streaming-0.20.2+320.jar \
    -mapper python/mapper.py \
    -reducer python/reducer.py \
    -input input \
    -output outputs \
    -file python/mapper.py \
    -file python/reducer.py

(run from /usr/lib/hadoop-0.20)

This worked. The first job attempt was killed, but the results seem to have been written out. The -file options ship mapper.py and reducer.py to the task nodes, which is presumably why the earlier run without them failed.