机遇 research之路其修远兮,我将上下而求索

Hadoop到Spark的踩坑

2017-09-30
cwlseu

引言

在tx新闻实习期间,接触到各种新的事物,Hadoop,spark作为内部开发的基础工具,上手是开展工作的基础。作为一个分布式小白, 战战兢兢开始了在hadoop上的开发之旅。我的任务主要是使用Hadoop中的HBase作为数据存储,spark作数据清洗和模型训练,生成一个针对 腾讯新闻网的圈子模型,上线新的新闻推荐频道,新的小视频推荐频道等。

install jdk in centos

  1. wget –no-cookies –no-check-certificate –header “Cookie:oraclelicense=accept-securebackup-cookie” “http://download.oracle.com/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.rpm”

  2. yum localinstall jdk-8u91-linux-x64.rpm

查看开发端口

netstat -nlp | grep tcp

install Hadoop

重新启动hadoop之后

bin/hdfs namenode -format

运行Demo

下面看MapReduce的例子

package com.tencent.omg;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordMain {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Configuration 类: 读取hadoop的配置文件,如 site-core.xml...;
    //也可以用set方法重新设置(会覆盖): conf.set("fs.defaultFS","hdfs://masterhost:9000")
    Configuration conf = new Configuration();
    // 新建一个job,传入配置信息
    Job job = Job.getInstance(conf, "word count");
    //设置主类
    job.setJarByClass(WordMain.class);
    //设置Mapper类
    job.setMapperClass(TokenizerMapper.class);
    // the output of each map is passed through the local combiner (which is same as the Reducer as per the job configuration) for local aggregation, after being sorted on the *key*s.
    job.setCombinerClass(IntSumReducer.class);
    // 
    job.setReducerClass(IntSumReducer.class);
    // 设置输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // 获取输入参数
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

运行测试程序

hadoop jar wordcount.jar com.tencent.omg.WordMain /user/root/input /user/root/out 其中hadoop jar就是要运行jar的意思,接下来wordcount.jar是输入的jarFile,然后是[mainClass] 及mainClass的参数们 args…; 其中本程序中需要输入两个参数

hadoop出现的问题

Datanode不能够启动的问题

根据there-are-0-datanodes-running-and-no-nodes-are-excluded-in-this-operation

  • remove all temporary files ( by default in /tmp) - sudo rm -R /tmp/*.
  • Now try connecting to all nodes through ssh by using ssh username@host and add keys in your master using ssh-copy-id -i ~/.ssh/id_rsa.pub username@host to give unrestricted access of slaves to the master (not doing so might be the problem for refusing connections).
  • Format the namenode using hadoop namenode -format and try restarting the daemons.

文件不存在问题

bin/hdfs dfs -mkdir -p /user/${username}/output

总体上来说,hdfs命令就是在原来的shell命令之前添加hadoop dfs -就是了,其他的就丢给文件系统去做显示,查找的了。

删除文件夹

/path/to/hadoop dfs -rm -rf /user/hadoop/topics/dumps

hdfs shell官方文档中提及的一些命令已经基本够用了

向spark提交任务

首先将spark程序export为jar包

. /data/pac_hadoop/spark/bin/spark-submit --class com.tencent.Lda --master spark://master:portnum ./xxxx/lda.jar /user/hadoop/topics/dumps /user/hadoop/model_path 1 1 30 > ./xxxx/segment_all_1.log

最后的结果输出,然后根据 Spark start spark官方文档翻译博客,可供参考

reference

  1. Hadoop Commands Guide
  2. 本地编辑,hadoopserver部署运行案例-详细
  3. 一个python写的hadoop map reduce程序

理论部分

LDA是一个主题模型,它能够推理出一个文本文档集合的主题。LDA可以认为是一个聚类算法,原因如下:

  • 主题对应聚类中心,文档对应数据集中的样本(数据行)
  • 主题和文档都在一个特征空间中,其特征向量是词频向量
  • 跟使用传统的距离来评估聚类不一样的是,LDA使用评估方式是一个函数,该函数基于文档如何生成的统计模型。

LDA以词频向量表示的文档集合作为输入,输出结果提供:

  • Topics:推断出的主题,每个主题是单词上的概率分布。
  • Topic distributions for documents:对训练集中的每个文档,LDA给一个在主题上的概率分布。

查看spark集群

在master 节点上使用 w3m http://localhost:8080

问题:

  1. LDA模型的训练集合哪里来, rcd_articles还是pac_articles 发现segment_all中的数据是从pac_articles 因为后面要去读取对应的docid的title

2.ssh连接断掉的原因是什么

/data/pac_hadoop/spark/bin/spark-submit –executor-cores 1 –executor-memory 1g –num-executors 2 –class KafkaWordCount –master spark:// xxxx.xxxx.xxx.xxx:7077 xw_personas_proj-1.0.jar xxxx.xxxx.xxx.xxx:9092,xxxx.xxxx.xxx.xxx:9092,xxxx.xxxx.xxx.xxx:9092 xw_user_logs

执行jar-> spark

/data/pac_hadoop/spark/bin/spark-submit –executor-cores 1 –executor-memory 1g –num-executors 6 –class KafkaWordCount –master spark://xxxx.xxxx.xxx.xxx:7077 xw_personas_proj-1.0.jar xxxx.xxxx.xxx.xxx:9092 xw_user_logs

从remove拉取文件 scp hadoop@xx.xxx.xx.xx:/home/hadoop/xxxx lda_result_500_20

坑:

  1. 上传下载数据,路径使用绝对路径,绝对路径,绝对路径重要的事情说三遍;尤其是使用python os.system()执行的时候。
  2. 文件不要以_开头,下划线开头的文件在spark中表示隐藏文件,根本就不读取其中的内容。很多情况下以下划线开头表示隐藏文件
  3. Scala是也是可以访问数据库的,技术选型很重要

SQL语句:不要有多余的空格 UPDATE cwl_rcd_corpus_info_tx SET topic = '187:0.029912,107:0.027155,142:0.025944,10:0.021934,148:0.016860', update_time_tx=14934342342 WHERE article_id_tx=20170614034267;

#!/bin/sh
export PATH=$PATH:/data/Python-2.7.3/bin
export PYTHONPATH=$PYTHONPATH:/data/pac_hadoop/spark/python

virtualenv topics
source topics/bin/activate

/bin/sh /data/pac_hadoop/spark/bin/spark-submit --class com.tencent.TopicShow --master spark://xxxx.xxxx.xxx.xxx:7077 topicshow.jar

select pac_articles.article_id, title, content from pac_articles left join rcd_corpus_info on rcd_corpus_info.article_id where pac_articles.create_time between '1499244660' and '1499307191' limit 10;

select count(a.article_id) from rcd_corpus_info b, pac_articles a where a.article_id=b.article_id and b.create_time between 1499137016 and 1499309816 order by b.create_time desc limit 30000 offset 0;

ps -ef | grep "topics_corpus_update_hdfs.py" | awk '{print $2}'|xargs kill -9

|版权声明:本文为博主原创文章,未经博主允许不得转载。


相似博文

欢迎评论