离线实战-网络日志监控分析(四):ETL的第一步:清洗工作3
离线实战-网络日志监控分析(四):ETL的第一步:清洗工作3
在完成了pageView的清洗工作之后,我们需要对用户的每一次访问(visit)进行归纳,例如用户从哪个页面进入、从哪个页面离开之类的数据。
visit编程实现思路
VisitBean:
package cn.leon.Beans;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hadoop {@link Writable} holding the summary of one visit: the session id,
 * the remote address, the entry/exit time and page, the referal, and the
 * number of pages viewed.
 *
 * <p>Fields are serialized in declaration order: the seven strings via
 * {@code writeUTF}/{@code readUTF}, followed by {@code pageVisits} as an int.
 * {@link #toString()} joins the same fields with the {@code \001} separator.
 */
public class VisitBean implements Writable {
    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    /**
     * Assigns every field of this bean in a single call.
     *
     * @param session     session identifier of the visit
     * @param remote_addr remote address of the visitor
     * @param inTime      time of the first page view
     * @param outTime     time of the last page view
     * @param inPage      first page requested
     * @param outPage     last page requested
     * @param referal     referal of the visit
     * @param pageVisits  number of pages viewed during the visit
     */
    public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getInTime() {
        return inTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public String getOutTime() {
        return outTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getInPage() {
        return inPage;
    }

    public void setInPage(String inPage) {
        this.inPage = inPage;
    }

    public String getOutPage() {
        return outPage;
    }

    public void setOutPage(String outPage) {
        this.outPage = outPage;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public int getPageVisits() {
        return pageVisits;
    }

    public void setPageVisits(int pageVisits) {
        this.pageVisits = pageVisits;
    }

    /**
     * Restores all fields from {@code in}, in the order {@link #write} emits them.
     *
     * @param in source to read the serialized fields from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();
    }

    /**
     * Serializes all fields to {@code out}.
     *
     * <p>A {@code null} string field is written as the empty string, because
     * {@code DataOutput.writeUTF} throws {@code NullPointerException} on
     * {@code null}; such a field therefore reads back as {@code ""}.
     *
     * @param out sink to write the serialized fields to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(session));
        out.writeUTF(nullToEmpty(remote_addr));
        out.writeUTF(nullToEmpty(inTime));
        out.writeUTF(nullToEmpty(outTime));
        out.writeUTF(nullToEmpty(inPage));
        out.writeUTF(nullToEmpty(outPage));
        out.writeUTF(nullToEmpty(referal));
        out.writeInt(pageVisits);
    }

    /**
     * Returns all fields joined by the {@code \001} separator, in the same
     * order as they are serialized.
     */
    @Override
    public String toString() {
        return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
    }

    /** Maps {@code null} to the empty string so the value can be passed to {@code writeUTF}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}
Mapper阶段比较简单:把每一行pageView记录按"\001"切分后赋值给PageViewBean,并以session作为key输出。
package cn.leon.transMapper;
import cn.leon.Beans.PageViewBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that parses each input line as a {@code \001}-separated page-view
 * record, loads it into a {@link PageViewBean}, and emits the bean keyed by
 * the value of {@code PageViewBean.getSession()}.
 *
 * <p>Records that cannot be parsed are skipped and counted in the
 * {@code TransMapper} counter group instead of failing the task.
 */
public class TransMapper extends Mapper<LongWritable,Text,Text,PageViewBean> {
    /** Number of fields consumed from each record (indices 0 through 9). */
    private static final int FIELD_COUNT = 10;
    /** Index of the field parsed as the integer step. */
    private static final int STEP_INDEX = 5;
    /** Counter group under which skipped records are reported. */
    private static final String COUNTER_GROUP = "TransMapper";

    /** Output value, refilled for every record. */
    PageViewBean pageViewBean = new PageViewBean();
    /** Output key, refilled for every record to avoid one allocation per line. */
    private final Text sessionKey = new Text();

    /**
     * Parses one record and emits {@code (session, pageViewBean)}.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one {@code \001}-separated page-view record
     * @param context sink for the output pair and the skip counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if writing the output pair is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Limit -1 keeps trailing empty fields; the default split would drop
        // them and make a record with empty last columns look too short.
        String[] fields = value.toString().split("\001", -1);
        if (fields.length < FIELD_COUNT) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_FIELD_COUNT").increment(1);
            return;
        }

        int step;
        try {
            step = Integer.parseInt(fields[STEP_INDEX]);
        } catch (NumberFormatException e) {
            // The bad record is reported through the counter rather than aborting the task.
            context.getCounter(COUNTER_GROUP, "MALFORMED_STEP").increment(1);
            return;
        }

        pageViewBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
        sessionKey.set(pageViewBean.getSession());
        context.write(sessionKey, pageViewBean);
    }
}
reducer阶段
package cn.leon.transReducer;
import cn.leon.Beans.PageViewBean;
import cn.leon.Beans.VisitBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
/**
 * Reducer that collapses all page views of one session into a single
 * {@link VisitBean}: the entry page/time come from the page view with the
 * smallest step, the exit page/time from the one with the largest step.
 */
public class TransReducer extends Reducer<Text, PageViewBean, NullWritable, VisitBean> {
    /**
     * Builds and emits the visit summary for one session.
     *
     * @param key     session identifier
     * @param values  page views belonging to that session, in arbitrary order
     * @param context sink for the resulting {@code (NullWritable, VisitBean)} pair
     * @throws IOException          if a page view cannot be copied or the output cannot be written
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<PageViewBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<PageViewBean> beans = new ArrayList<PageViewBean>();
        for (PageViewBean bean : values) {
            // Each value is copied into a fresh bean before being stored, since
            // Hadoop's reduce-side iterator reuses the value object between iterations.
            PageViewBean copy = new PageViewBean();
            try {
                BeanUtils.copyProperties(copy, bean);
            } catch (IllegalAccessException e) {
                // Dropping the record would silently undercount pageVisits, so fail loudly.
                throw new IOException("Failed to copy page view for session " + key, e);
            } catch (InvocationTargetException e) {
                throw new IOException("Failed to copy page view for session " + key, e);
            }
            beans.add(copy);
        }

        // Nothing to summarize; avoids indexing into an empty list below.
        if (beans.isEmpty()) {
            return;
        }

        Collections.sort(beans, new Comparator<PageViewBean>() {
            @Override
            public int compare(PageViewBean o1, PageViewBean o2) {
                // Integer.compare cannot overflow the way subtraction can.
                return Integer.compare(o1.getStep(), o2.getStep());
            }
        });

        // Take the first and last page view of this visit and fill the VisitBean from them.
        PageViewBean first = beans.get(0);
        PageViewBean last = beans.get(beans.size() - 1);

        VisitBean visitBean = new VisitBean();
        // First record of the visit.
        visitBean.setInPage(first.getRequest());
        visitBean.setInTime(first.getTimestr());
        // Last record of the visit.
        visitBean.setOutPage(last.getRequest());
        visitBean.setOutTime(last.getTimestr());
        // Number of pages viewed during the visit.
        visitBean.setPageVisits(beans.size());
        // Visitor's IP address.
        visitBean.setRemote_addr(first.getRemote_addr());
        // Referal of this visit.
        visitBean.setReferal(first.getReferal());
        visitBean.setSession(key.toString());
        context.write(NullWritable.get(), visitBean);
    }
}
reduce阶段的任务很明确,就是通过步骤step字段来分析用户第一个步骤的行为和最后一个步骤的行为即可,分别提取出来聚合到Session中就OK了。