Handling JSON data with Hadoop
I googled how to handle JSON with Hadoop and didn't find anything useful, so I wrote this up.
The data you can get from Twitter is basically JSON. I hit the yats API and saved the result to a file named search.json.
What I want to do is run each tweet's text through Yahoo!'s morphological analyzer, break it into words, and count them. Since I fetched tweets matching "政治" (politics) from yats, this will show which words tend to appear alongside "政治".
Create a python folder directly under the Hadoop install directory and add the following three files.
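At its core this job is just a word count over morphemes. As a minimal Python 3 sketch of the end result (the token list here is a stand-in, since the real tokens come from Yahoo!'s analyzer):

```python
from collections import Counter

# Stand-in for the morphemes the analyzer would return
# for tweets matching "政治" (politics).
tokens = ["政治", "改革", "選挙", "政治", "選挙", "政治"]

counts = Counter(tokens)
for word, count in counts.most_common():
    print("%s\t%s" % (word, count))
```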
jsonmapper.py
#!/usr/bin/env python
import sys
import simplejson
import yahooMorph

# Read all of stdin as one JSON document (a list of tweets).
# Note: this only works if each map task sees a complete JSON document,
# not an arbitrary line-based split of the input.
json_str = simplejson.loads(sys.stdin.read())
for line in json_str:
    # Tokenize the tweet body with Yahoo!'s morphological analyzer
    result = yahooMorph.morph(sentence=line['content'], ma_filter="9")
    for word in result:
        # Emit "word<TAB>1" for the reducer, UTF-8 encoded
        word = word + "\t1"
        word = word.encode('utf-8')
        print word
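Incidentally, the mapper logic is easy to check without Hadoop or the Yahoo! API if you stub out the tokenizer. A Python 3 sketch, where `fake_morph` is a hypothetical stand-in for `yahooMorph.morph`:

```python
import json

def fake_morph(sentence, ma_filter="9"):
    # Stand-in for yahooMorph.morph: just split on whitespace.
    return sentence.split()

def map_tweets(raw_json):
    """Emit one 'word<TAB>1' line per morpheme, like jsonmapper.py does."""
    lines = []
    for tweet in json.loads(raw_json):
        for word in fake_morph(tweet['content']):
            lines.append(word + "\t1")
    return lines

sample = json.dumps([{"content": "politics reform politics"}])
print("\n".join(map_tweets(sample)))
```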
yahooMorph.py
#!/usr/bin/env python
#coding:utf-8
import urllib
import urllib2
from BeautifulSoup import BeautifulSoup

APPID = "your API ID"
PAGEURL = "http://jlp.yahooapis.jp/MAService/V1/parse"

def morph(sentence, appid=APPID, results="ma",
          ma_filter="1|2|3|4|5|6|7|8|9|10|11|12|13"):
    sentence = sentence.encode('utf-8')
    params = urllib.urlencode({'appid': appid, 'results': results,
                               'ma_filter': ma_filter, 'sentence': sentence})
    # POST to MAService and parse the XML response
    c = urllib2.urlopen(PAGEURL, params)
    soup = BeautifulSoup(c.read())
    # Return just the surface form of each word
    return [w.surface.string for w in soup.ma_result.word_list]
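The MAService response is XML with `ma_result` / `word_list` / `word` / `surface` elements, which is what `soup.ma_result.word_list` walks above. In Python 3 the same extraction can be done with the standard library instead of BeautifulSoup. A sketch against a hand-made sample response (the live response additionally carries an XML namespace, which you would need to account for):

```python
import xml.etree.ElementTree as ET

# Hand-made sample shaped like a MAService response
# (namespace omitted for brevity).
sample = """<ResultSet>
  <ma_result>
    <word_list>
      <word><surface>政治</surface><reading>せいじ</reading><pos>名詞</pos></word>
      <word><surface>改革</surface><reading>かいかく</reading><pos>名詞</pos></word>
    </word_list>
  </ma_result>
</ResultSet>"""

root = ET.fromstring(sample)
surfaces = [w.findtext('surface') for w in root.iter('word')]
print(surfaces)
```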
reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys

word2count = {}
for line in sys.stdin:
    line = line.strip()
    word, count = line.split("\t", 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        pass

sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print "%s\t%s" % (word, count)
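The reducer doesn't depend on Hadoop at all, so it can also be checked in isolation. The same logic as a pure Python 3 function:

```python
from operator import itemgetter

def reduce_counts(lines):
    """Sum 'word<TAB>count' lines and return pairs sorted by word."""
    word2count = {}
    for line in lines:
        line = line.strip()
        word, count = line.split("\t", 1)
        try:
            word2count[word] = word2count.get(word, 0) + int(count)
        except ValueError:
            pass  # skip malformed counts, as reducer.py does
    return sorted(word2count.items(), key=itemgetter(0))

print(reduce_counts(["b\t1", "a\t1", "b\t1"]))
```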
As a quick sanity check, run the pipeline locally without Hadoop:
cat inputjson/search.json | python python/jsonmapper.py | sort | python python/reducer.py
The output looked right.
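The log below is from the actual Hadoop Streaming run. My notes don't include the invocation itself, but for a Hadoop 0.20 install like the one in the log it would look roughly like this (the streaming jar path and the HDFS input/output paths are assumptions):

```shell
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-*.jar \
  -input inputjson \
  -output outputjson \
  -mapper jsonmapper.py \
  -reducer reducer.py \
  -file python/jsonmapper.py \
  -file python/reducer.py
```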
packageJobJar: [/usr/lib/hadoop-0.20/python/jsonmapper.py, /usr/lib/hadoop-0.20/python/reducer.py, /var/lib/hadoop-0.20/cache/hadoop/hadoop-unjar4015039559084905408/] [] /tmp/streamjob7352753504723779858.jar tmpDir=null
10/08/12 14:13:15 INFO mapred.FileInputFormat: Total input paths to process : 3
10/08/12 14:13:15 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/hadoop/mapred/local]
10/08/12 14:13:15 INFO streaming.StreamJob: Running job: job_201008111735_0019
10/08/12 14:13:15 INFO streaming.StreamJob: To kill this job, run:
10/08/12 14:13:15 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008111735_0019
10/08/12 14:13:15 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008111735_0019
10/08/12 14:13:16 INFO streaming.StreamJob: map 0% reduce 0%
10/08/12 14:13:48 INFO streaming.StreamJob: map 100% reduce 100%
10/08/12 14:13:48 INFO streaming.StreamJob: To kill this job, run:
10/08/12 14:13:48 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201008111735_0019
10/08/12 14:13:48 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201008111735_0019
10/08/12 14:13:48 ERROR streaming.StreamJob: Job not Successful!
10/08/12 14:13:48 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
H-huh...? (One suspicious thing in the log: packageJobJar lists only jsonmapper.py and reducer.py, so yahooMorph.py was probably never shipped to the task nodes, and the mapper would die on `import yahooMorph`.)