引言
在tx新闻实习期间,接触到各种新的事物,Hadoop,spark作为内部开发的基础工具,上手是开展工作的基础。作为一个分布式小白, 战战兢兢开始了在hadoop上的开发之旅。我的任务主要是使用Hadoop中的HBase作为数据存储,spark作数据清洗和模型训练,生成一个针对 腾讯新闻网的圈子模型,上线新的新闻推荐频道,新的小视频推荐频道等。
install jdk in centos
-
wget –no-cookies –no-check-certificate –header “Cookie:oraclelicense=accept-securebackup-cookie” “http://download.oracle.com/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.rpm”
-
yum localinstall jdk-8u91-linux-x64.rpm
查看开发端口
netstat -nlp | grep tcp
install Hadoop
重新启动hadoop之后
bin/hdfs namenode -format
运行Demo
下面看MapReduce的例子
package com.tencent.omg;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordMain {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// Configuration 类: 读取hadoop的配置文件,如 site-core.xml...;
//也可以用set方法重新设置(会覆盖): conf.set("fs.defaultFS","hdfs://masterhost:9000")
Configuration conf = new Configuration();
// 新建一个job,传入配置信息
Job job = Job.getInstance(conf, "word count");
//设置主类
job.setJarByClass(WordMain.class);
//设置Mapper类
job.setMapperClass(TokenizerMapper.class);
// the output of each map is passed through the local combiner (which is same as the Reducer as per the job configuration) for local aggregation, after being sorted on the *key*s.
job.setCombinerClass(IntSumReducer.class);
//
job.setReducerClass(IntSumReducer.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 获取输入参数
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行测试程序
hadoop jar wordcount.jar com.tencent.omg.WordMain /user/root/input /user/root/out
其中hadoop jar
就是要运行jar的意思,接下来wordcount.jar
是输入的jarFile,然后是[mainClass] 及mainClass的参数们 args…;
其中本程序中需要输入两个参数
hadoop出现的问题
Datanode不能够启动的问题
根据there-are-0-datanodes-running-and-no-nodes-are-excluded-in-this-operation
- remove all temporary files ( by default in /tmp) - sudo rm -R /tmp/*.
- Now try connecting to all nodes through ssh by using ssh username@host and add keys in your master using ssh-copy-id -i ~/.ssh/id_rsa.pub username@host to give unrestricted access of slaves to the master (not doing so might be the problem for refusing connections).
- Format the namenode using hadoop namenode -format and try restarting the daemons.
文件不存在问题
bin/hdfs dfs -mkdir -p /user/${username}/output
总体上来说,hdfs命令就是在原来的shell命令之前添加hadoop dfs -
就是了,其他的就丢给文件系统去做显示,查找的了。
删除文件夹
/path/to/hadoop dfs -rm -rf /user/hadoop/topics/dumps
hdfs shell官方文档中提及的一些命令已经基本够用了
向spark提交任务
首先将spark程序export为jar包
. /data/pac_hadoop/spark/bin/spark-submit --class com.tencent.Lda --master spark://master:portnum ./xxxx/lda.jar /user/hadoop/topics/dumps /user/hadoop/model_path 1 1 30 > ./xxxx/segment_all_1.log
最后的结果输出,然后根据 Spark start spark官方文档翻译博客,可供参考
reference
理论部分
LDA是一个主题模型,它能够推理出一个文本文档集合的主题。LDA可以认为是一个聚类算法,原因如下:
- 主题对应聚类中心,文档对应数据集中的样本(数据行)
- 主题和文档都在一个特征空间中,其特征向量是词频向量
- 跟使用传统的距离来评估聚类不一样的是,LDA使用评估方式是一个函数,该函数基于文档如何生成的统计模型。
LDA以词频向量表示的文档集合作为输入,输出结果提供:
- Topics:推断出的主题,每个主题是单词上的概率分布。
- Topic distributions for documents:对训练集中的每个文档,LDA给一个在主题上的概率分布。
查看spark集群
在master 节点上使用 w3m http://localhost:8080
问题:
- LDA模型的训练集合哪里来, rcd_articles还是pac_articles 发现segment_all中的数据是从pac_articles 因为后面要去读取对应的docid的title
2.ssh连接断掉的原因是什么
/data/pac_hadoop/spark/bin/spark-submit –executor-cores 1 –executor-memory 1g –num-executors 2 –class KafkaWordCount –master spark:// xxxx.xxxx.xxx.xxx:7077 xw_personas_proj-1.0.jar xxxx.xxxx.xxx.xxx:9092,xxxx.xxxx.xxx.xxx:9092,xxxx.xxxx.xxx.xxx:9092 xw_user_logs
执行jar-> spark
/data/pac_hadoop/spark/bin/spark-submit –executor-cores 1 –executor-memory 1g –num-executors 6 –class KafkaWordCount –master spark://xxxx.xxxx.xxx.xxx:7077 xw_personas_proj-1.0.jar xxxx.xxxx.xxx.xxx:9092 xw_user_logs
从remove拉取文件 scp hadoop@xx.xxx.xx.xx:/home/hadoop/xxxx lda_result_500_20
坑:
- 上传下载数据,路径使用绝对路径,绝对路径,绝对路径重要的事情说三遍;尤其是使用python os.system()执行的时候。
- 文件不要以_开头,下划线开头的文件在spark中表示隐藏文件,根本就不读取其中的内容。很多情况下以下划线开头表示隐藏文件
- Scala是也是可以访问数据库的,技术选型很重要
SQL语句:不要有多余的空格
UPDATE cwl_rcd_corpus_info_tx SET topic = '187:0.029912,107:0.027155,142:0.025944,10:0.021934,148:0.016860', update_time_tx=14934342342 WHERE article_id_tx=20170614034267;
#!/bin/sh
export PATH=$PATH:/data/Python-2.7.3/bin
export PYTHONPATH=$PYTHONPATH:/data/pac_hadoop/spark/python
virtualenv topics
source topics/bin/activate
/bin/sh /data/pac_hadoop/spark/bin/spark-submit --class com.tencent.TopicShow --master spark://xxxx.xxxx.xxx.xxx:7077 topicshow.jar
select pac_articles.article_id, title, content from pac_articles left join rcd_corpus_info on rcd_corpus_info.article_id where pac_articles.create_time between '1499244660' and '1499307191' limit 10;
select count(a.article_id) from rcd_corpus_info b, pac_articles a where a.article_id=b.article_id and b.create_time between 1499137016 and 1499309816 order by b.create_time desc limit 30000 offset 0;
ps -ef | grep "topics_corpus_update_hdfs.py" | awk '{print $2}'|xargs kill -9
|版权声明:本文为博主原创文章,未经博主允许不得转载。