注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

避尘巷

沧海一粟渡沧海

 
 
 

日志

 
 

一个基于Mahout与hadoop的聚类搭建  

2012-05-04 09:39:23|  分类: Hadoop |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
    mahout是基于hadoop的数据挖掘工具,因为有了hadoop,所以进行海量数据的挖掘工作显得更为简单。但是因为算法需要支持M/R,所以不是所有常用的数据挖掘算法都会支持。这篇文章会告诉你,如何使用hadoop + mahout搭出一个简易的聚类工具。 

    第一步:搭建hadoop平台。 

我使用的是ubuntu 11.04,如果没有ubuntu的开发环境,就参考我的帖子《Ubuntu 10.10 java 开发环境》 

    #1 在ubuntu下面建立一个用户组与用户 
Java代码  收藏代码
  1. beneo@ubuntu:~$ sudo addgroup hadoop  
  2. beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser  


   #2 安装ssh-server 
Java代码  收藏代码
  1. beneo@ubuntu:~$ sudo apt-get install ssh  
  2. beneo@ubuntu:~$ su - hduser  
  3. hduser@ubuntu:~$ ssh-keygen -t rsa -P ""  
  4. hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys  


   #3 验证ssh通信 
Java代码  收藏代码
  1. hduser@ubuntu:ssh localhost  


ssh localhost 后,选择 yes,如果没有问题,就可以安装hadoop了 
   
   #4 添加java_home 
修改conf/hadoop-env.sh文件,让JAVA_HOME指向正确的地址 

   #5 修改下面的配置 
conf/core-site.xml: 
Java代码  收藏代码
  1. <configuration>  
  2.      <property>  
  3.          <name>fs.default.name</name>  
  4.          <value>hdfs://localhost:9000</value>  
  5.      </property>  
  6. </configuration>  

conf/hdfs-site.xml: 
Java代码  收藏代码
  1. <configuration>  
  2.      <property>  
  3.          <name>dfs.replication</name>  
  4.          <value>1</value>  
  5.      </property>  
  6. </configuration>  

conf/mapred-site.xml: 
Java代码  收藏代码
  1. <configuration>  
  2.      <property>  
  3.          <name>mapred.job.tracker</name>  
  4.          <value>localhost:9001</value>  
  5.      </property>  
  6. </configuration>  


    #6 Format a new distributed-filesystem: 
Java代码  收藏代码
  1. $ bin/hadoop namenode -format  


    #7 Start the hadoop daemons: 
Java代码  收藏代码
  1. $ bin/start-all.sh  


    #8 验证启动成功没有 
Java代码  收藏代码
  1. $ jps  

数一下有没有6个,没有的话,删除logs下面的文件,然后从#6开始 

    #9 别慌,先打开网页,打不开,等!!! 
Java代码  收藏代码
  1. NameNode - http://localhost:50070/  
  2. JobTracker - http://localhost:50030/  


第一步搭建hadoop结束 

    第二步,Mahout的配置 

    #1 下载Mahout,解压 
    #2 .bash_profile里面设置HADOOP_HOME 
    #3 mahout/bin/mahout 看看打印结果 

    第三步,做一个聚类的demo吧 

我的聚类是文本 -> lucene index -> mahout -> clustering dumper 
可以选择的是 sequeneceFile -> mahout -> clustering dumper 

我直接贴代码吧,用的是groovy,可能写的不好 
    #1 text -> lucene index 
Java代码  收藏代码
  1. def assembleDoc = {  
  2.     label, content ->  
  3.     assert !label.toString().trim().empty  
  4.     assert !content.toString().trim().empty  
  5.   
  6.     def doc = new Document()  
  7.     doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))  
  8.     doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, TermVector.YES))  
  9.     doc  
  10. }  
  11.   
  12. def mockContent = {  
  13.     def set = []  
  14.     new File("""/home/beneo/text.txt""").newReader().eachLine {  
  15.         String line ->  
  16.         set << line  
  17.     }  
  18.     set  
  19. }  
  20.   
  21.   
  22. def mockExpandoSet = {  
  23.   
  24.     def lst = []  
  25.   
  26.     mockContent()?.each {  
  27.         content ->  
  28.         // 过滤掉所有非中文字符  
  29.         def line = content.replaceAll("[^\一-\龥]+""")  
  30.         if (line != null && line.trim().length() > 2) {  
  31.             println(content)  
  32.             def expando = new Expando()  
  33.             expando.label = content  
  34.             expando.content = line  
  35.             lst << expando  
  36.         }  
  37.     }  
  38.     lst  
  39. }  
  40.   
  41. //创建一个dic  
  42. def directory = FSDirectory.open(new File("""/home/beneo/index"""), NoLockFactory.getNoLockFactory())  
  43. // 用的是 IK分词  
  44. def analyzer = new IKAnalyzer()  
  45. //创建一个indexWriter,这个wirter就是用来产生出index  
  46. def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)  
  47.   
  48. //从本地获得文本  
  49. mockExpandoSet().each {  
  50.     expando ->  
  51.     indexWriter.addDocument(assembleDoc(expando.label, expando.content))  
  52. }  
  53.   
  54. indexWriter.commit()  
  55. indexWriter.close()  
  56. directory.close()  
         

    #2 lucene index -> mahout vector 
Java代码  收藏代码
  1. mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2  

  
    #3 mahout vector -> mahout canopy 
Java代码  收藏代码
  1. mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow  


    #4 mahout canopy -> mahout kmeans 
Java代码  收藏代码
  1. mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl  

    
   #5 mahout keamns -> 结果分析 
Java代码  收藏代码
  1. String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"  
  2. String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"  
  3.   
  4. def conf = new Configuration()  
  5.   
  6. FileSystem fs = new Path(seqFileDir).getFileSystem(conf)  
  7.   
  8. Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), new Configuration());  
  9.   
  10. for (FileStatus seqFile: fs.globStatus(new Path(seqFileDir, "part-*"))) {  
  11.     Path path = seqFile.getPath()  
  12.   
  13.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);  
  14.   
  15.     org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();  
  16.     org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();  
  17.   
  18.     while (reader.next(key, value)) {  
  19.         Cluster cluster = (Cluster) value;  
  20.         int id = cluster.getId()  
  21.         int np = cluster.getNumPoints()  
  22.   
  23.         List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId());  
  24.         if (points != null && points.size() > 4) {  
  25.             for (Iterator<WeightedVectorWritable> iterator = points.iterator(); iterator.hasNext();) {  
  26.                 println(((NamedVector) iterator.next().getVector()).getName())  
  27.             }  
  28.             println "======================================"  
  29.         }  
  30.     }  
  31. }  
  32.   
  33.   
  34. private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)  
  35. throws IOException {  
  36.     Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();  
  37.   
  38.     FileSystem fs = pointsPathDir.getFileSystem(conf);  
  39.     FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {  
  40.         @Override  
  41.         public boolean accept(Path path) {  
  42.             String name = path.getName();  
  43.             return !(name.endsWith(".crc") || name.startsWith("_"));  
  44.         }  
  45.     });  
  46.   
  47.     for (FileStatus file: children) {  
  48.         Path path = file.getPath();  
  49.         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);  
  50.   
  51.         IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();  
  52.         WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();  
  53.         while (reader.next(key, value)) {  
  54.             // value is the cluster id as an int, key is the name/id of the  
  55.             // vector, but that doesn't matter because we only care about printing  
  56.             // it  
  57.             // String clusterId = value.toString();  
  58.             List<WeightedVectorWritable> pointList = result.get(key.get());  
  59.             if (pointList == null) {  
  60.                 pointList = new ArrayList<WeightedVectorWritable>();  
  61.                 result.put(key.get(), pointList);  
  62.             }  
  63.             pointList.add(value);  
  64.             value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();  
  65.         }  
  66.   
  67.     }  
  68.   
  69.     return result;  
  70. }  


效果我就不展示了 
  评论这张
 
阅读(827)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017