Offline Practice - Web Log Monitoring and Analysis (4): The First Step of ETL: Cleaning (Part 3)

Having finished the pageview cleaning work, the next step is to summarize each user visit (session): for example, which page the user entered the site on, which page they left from, and how many pages they viewed in between. In other words, the pageview records belonging to one session are collapsed into a single visit record.

Implementation approach for the visit step

VisitBean:

package cn.leon.Beans;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Writable bean describing one visit (session): entry/exit page and time,
 * referrer, and the number of pages viewed.
 */
public class VisitBean implements Writable {

    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getInTime() {
        return inTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public String getOutTime() {
        return outTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getInPage() {
        return inPage;
    }

    public void setInPage(String inPage) {
        this.inPage = inPage;
    }

    public String getOutPage() {
        return outPage;
    }

    public void setOutPage(String outPage) {
        this.outPage = outPage;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public int getPageVisits() {
        return pageVisits;
    }

    public void setPageVisits(int pageVisits) {
        this.pageVisits = pageVisits;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialization must read the fields in exactly the same order as write() wrote them
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);
    }

    @Override
    public String toString() {
        // The eight fields are joined with the \001 (SOH) delimiter used throughout this ETL pipeline
        return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
    }
}
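
As a quick sanity check of the output format, a throwaway snippet like the following can print the \001-delimited line that the reducer below will write for one session. The VisitBeanDemo class and all sample values here are made up purely for illustration:

package cn.leon.Beans;

public class VisitBeanDemo {

    public static void main(String[] args) {
        VisitBean bean = new VisitBean();
        // Illustrative values only; real records are produced by the reducer below
        bean.set("session-0001", "192.168.33.66",
                "2013-09-18 06:49:18", "2013-09-18 06:52:33",
                "/hadoop-hive-intro/", "/hadoop-mapreduce-intro/",
                "\"http://www.google.com/\"", 3);
        // toString() joins the eight fields with the \001 delimiter
        System.out.println(bean);
    }
}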

Mapper phase. There is not much to explain here: each cleaned pageview line is split into its fields, copied into a PageViewBean, and emitted with the session id as the key.

package cn.leon.transMapper;

import cn.leon.Beans.PageViewBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TransMapper extends Mapper<LongWritable,Text,Text,PageViewBean> {

    // Reused across map() calls; Hadoop serializes the value when context.write() is called
    PageViewBean pageViewBean = new PageViewBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Cleaned pageview records are \001-delimited; the visit step number sits at index 5
        String[] lines = value.toString().split("\001");

        int step = Integer.parseInt(lines[5]);
        pageViewBean.set(lines[0], lines[1], lines[2], lines[3], lines[4], step, lines[6], lines[7], lines[8], lines[9]);
        // Key by session so that all pageviews of one visit reach the same reducer
        context.write(new Text(pageViewBean.getSession()), pageViewBean);
    }
}

Reducer phase

package cn.leon.transReducer;

import cn.leon.Beans.PageViewBean;
import cn.leon.Beans.VisitBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

public class TransReducer extends Reducer<Text, PageViewBean, NullWritable, VisitBean> {

    @Override
    protected void reduce(Text key, Iterable<PageViewBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<PageViewBean> beans = new ArrayList<PageViewBean>();
        for (PageViewBean bean : values) {
            // Hadoop reuses the same value object while iterating, so each record
            // must be copied into a fresh bean before it is stored in the list
            PageViewBean pageViewBean = new PageViewBean();
            try {
                BeanUtils.copyProperties(pageViewBean, bean);
                beans.add(pageViewBean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }


        // Sort the pageviews of this session by step so the list is in visit order
        Collections.sort(beans, new Comparator<PageViewBean>() {
            @Override
            public int compare(PageViewBean o1, PageViewBean o2) {

                return o1.getStep() - o2.getStep();
            }
        });


        // Take the first and last pageview records of this visit and fold them into a VisitBean
        VisitBean visitBean = new VisitBean();

        // Entry record of the visit
        visitBean.setInPage(beans.get(0).getRequest());
        visitBean.setInTime(beans.get(0).getTimestr());
        // Exit record of the visit
        visitBean.setOutPage(beans.get(beans.size() - 1).getRequest());
        visitBean.setOutTime(beans.get(beans.size() - 1).getTimestr());
        // Number of pages viewed during the visit
        visitBean.setPageVisits(beans.size());
        // Visitor's IP address
        visitBean.setRemote_addr(beans.get(0).getRemote_addr());
        // Referrer of this visit
        visitBean.setReferal(beans.get(0).getReferal());
        visitBean.setSession(key.toString());

        context.write(NullWritable.get(), visitBean);
    }
}

The reduce phase has a clear job: sort each session's pageview records by the step field, take the first record as the entry behavior and the last record as the exit behavior, and aggregate them into a single visit record per session.
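
The post does not include the driver class, so here is a minimal job-configuration sketch. The VisitJobDriver class name and the command-line paths are placeholders, and it assumes the input directory holds the \001-delimited pageview output of the previous cleaning step:

package cn.leon;

import cn.leon.Beans.PageViewBean;
import cn.leon.Beans.VisitBean;
import cn.leon.transMapper.TransMapper;
import cn.leon.transReducer.TransReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class VisitJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "pageview-to-visit");
        job.setJarByClass(VisitJobDriver.class);

        job.setMapperClass(TransMapper.class);
        job.setReducerClass(TransReducer.class);

        // Map output types differ from the final output types, so both pairs must be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageViewBean.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);

        // Placeholder paths: the pageview output of the previous cleaning step goes in,
        // the visit records come out
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Declaring the map output classes separately matters here because the map output pair (Text, PageViewBean) is not the same as the job's final output pair (NullWritable, VisitBean).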
