Handling JSON data with Hadoop

hadoop, python

I Googled for how to handle JSON with Hadoop and couldn't find anything, so I'm writing it up.
The data you get from Twitter is basically JSON. I hit the yats API and saved the result to a file named search.json.
What I want to do is run each tweet's text through Yahoo!'s morphological analysis API, split it into words, and count the words. This time I pulled tweets from yats with the query "政治" (politics), so the counts show which words tend to be used together with "政治".
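
The mapper below assumes search.json is a JSON array of tweet objects, each carrying at least the tweet text in a content field. The values here are made up for illustration, and the real yats response has more fields (user, etc.):

[
  {"user": "someone", "content": "今日の政治ニュースを読んだ"},
  {"user": "someone_else", "content": "政治と経済について"}
]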

Create a python folder directly under the Hadoop installation directory and put the following three files in it.

jsonmapper.py

#!/usr/bin/env python
# coding: utf-8
# Mapper for Hadoop Streaming: read the whole search.json from stdin,
# run each tweet body through the Yahoo! morphological analyzer, and
# emit one "word<TAB>1" line per extracted word.

import sys

import simplejson
import yahooMorph

# The saved yats search result is a JSON array of tweet objects.
# (Reading all of stdin assumes each input file arrives as a single split.)
tweets = simplejson.loads(sys.stdin.read())

for tweet in tweets:
    # ma_filter="9" keeps only nouns (Yahoo! MAService part-of-speech filter).
    words = yahooMorph.morph(sentence=tweet['content'], ma_filter="9")
    for word in words:
        # Emit a tab-separated key/value pair for the reducer.
        print ("%s\t%s" % (word, 1)).encode('utf-8')

yahooMorph.py

#!/usr/bin/env python
# coding: utf-8
# Thin wrapper around the Yahoo! Japan morphological analysis API (MAService).

import urllib
import urllib2
from BeautifulSoup import BeautifulSoup

APPID = "your Yahoo! application ID here"
PAGEURL = "http://jlp.yahooapis.jp/MAService/V1/parse"

def morph(sentence, appid=APPID, results="ma", ma_filter="1|2|3|4|5|6|7|8|9|10|11|12|13"):
    """POST the sentence to MAService and return the list of surface forms."""
    sentence = sentence.encode('utf-8')
    params = urllib.urlencode({'appid': appid, 'results': results,
                               'ma_filter': ma_filter, 'sentence': sentence})
    c = urllib2.urlopen(PAGEURL, params)
    soup = BeautifulSoup(c.read())
    # Each <word> element under <ma_result><word_list> has a <surface> child.
    # To also get reading and part of speech:
    #   return [(w.surface.string, w.reading.string, w.pos.string)
    #           for w in soup.ma_result.word_list]
    return [w.surface.string for w in soup.ma_result.word_list]
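
To sanity-check the wrapper and the API key before wiring it into the mapper, you can call it directly. This is just an illustration; the sample sentence is made up and a valid APPID is required:

#coding:utf-8
# Hypothetical quick check, not part of the job itself.
import yahooMorph

# Extract only nouns (ma_filter="9") from a sample sentence.
for w in yahooMorph.morph(u"政治のニュースを読む", ma_filter="9"):
    print w.encode('utf-8')

With the noun filter, words like 政治 and ニュース should come back as separate lines.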

reducer.py

#!/usr/bin/env python
# Reducer for Hadoop Streaming: sum the counts emitted by the mapper
# and print "word<TAB>total", sorted by word.

from operator import itemgetter
import sys

word2count = {}

for line in sys.stdin:
    line = line.strip()
    if not line:
        # Skip blank lines.
        continue

    # Each input line is "word<TAB>count".
    word, count = line.split("\t", 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # Ignore lines whose count is not an integer.
        pass

# Sort by word (itemgetter(0)) before printing.
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

for word, count in sorted_word2count:
    print "%s\t%s" % (word, count)

First, a quick local check:

cat inputjson/search.json | python python/jsonmapper.py | sort | python python/reducer.py

That confirmed the pipeline works locally.
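
One thing to note: -input input in the streaming command below refers to a directory in HDFS, so the JSON file(s) have to be copied there first. Assuming the default setup, something like:

hadoop fs -mkdir input
hadoop fs -put inputjson/search.json input/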

hadoop jar contrib/streaming/hadoop-streaming-*.jar \
  -input input \
  -output outputs \
  -mapper python/jsonmapper \
  -reducer python/reducer.py \
  -file /usr/lib/hadoop-0.20/python/jsonmapper.py \
  -file /usr/lib/hadoop-0.20/python/reducer.py

packageJobJar: [/usr/lib/hadoop-0.20/python/jsonmapper.py, /usr/lib/hadoop-0.20/python/reducer.py, /var/lib/hadoop-0.20/cache/hadoop/hadoop-unjar4015039559084905408/] [] /tmp/streamjob7352753504723779858.jar tmpDir=null
10/08/12 14:13:15 INFO mapred.FileInputFormat: Total input paths to process : 3
10/08/12 14:13:15 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/hadoop/mapred/local]
10/08/12 14:13:15 INFO streaming.StreamJob: Running job: job_201008111735_0019
10/08/12 14:13:15 INFO streaming.StreamJob: To kill this job, run:
10/08/12 14:13:15 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008111735_0019
10/08/12 14:13:15 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008111735_0019
10/08/12 14:13:16 INFO streaming.StreamJob: map 0% reduce 0%
10/08/12 14:13:48 INFO streaming.StreamJob: map 100% reduce 100%
10/08/12 14:13:48 INFO streaming.StreamJob: To kill this job, run:
10/08/12 14:13:48 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008111735_0019
10/08/12 14:13:48 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008111735_0019
10/08/12 14:13:48 ERROR streaming.StreamJob: Job not Successful!
10/08/12 14:13:48 INFO streaming.StreamJob: killJob...
Streaming Command Failed!

Huh? Wait, what...?