离线实战-网络日志监控分析(三):ETL的第一步:清洗工作2

离线实战-网络日志监控分析(三):ETL的第一步:清洗工作2

接下来我们就要开始创建PageView表了。所谓PageView,就是展示用户的行为:从哪个网页过来,跳转了几个网页,在每个页面停留了多长时间等。下面我们用MapReduce程序来实现它。

pageviews编程实现思路

在这里我们省去了运行的main程序,这部分可以参考上一节我们在PreCleanData中的main方法。因为都是标准化代码,就不再展示了。现在就开始归纳怎么将标准化数据变成点击流模型:

mapper的话没有什么好说的。主要是把该赋值的属性参数赋值,因为这里主要涉及到聚合的操作。在mapper阶段就相当简化了。

package cn.leon.transMapper;

import cn.leon.pageViewBean.CleanBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the page-view job: parses one pre-cleaned log line into a
 * {@link CleanBean} and emits it keyed by the bean's remote address, so that
 * the reducer receives all records of one address together.
 *
 * <p>Input lines are expected to carry at least {@value #FIELD_COUNT} fields
 * separated by the \001 character; the first field is the validity flag
 * ("true" means valid). Shorter lines and records whose bean reports
 * {@code isValid() == false} are dropped without output.
 */
public class TransMapper extends Mapper<LongWritable,Text, Text, CleanBean> {

    /** Minimum number of \001-separated fields a line must contain. */
    private static final int FIELD_COUNT = 9;

    /** Output value, reused for every record. */
    CleanBean cleanBean = new CleanBean();

    /** Output key, reused for every record in the same way as {@link #cleanBean}. */
    private final Text outKey = new Text();

    /**
     * Splits the line, fills the reusable bean and writes
     * {@code (remote address, bean)} when the bean is valid.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one \001-separated log line
     * @param context job context used to emit the pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] fields = value.toString().split("\001");

        // Lines with fewer than FIELD_COUNT fields cannot fill the bean; skip them.
        if (fields.length < FIELD_COUNT) {
            return;
        }

        // Field 0 is the validity flag; the remaining eight fields are passed through in order.
        boolean valid = "true".equals(fields[0]);
        cleanBean.set(valid, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);

        // Only valid records take part in the page-view aggregation.
        if (cleanBean.isValid()) {
            outKey.set(cleanBean.getRemote_addr());
            context.write(outKey, cleanBean);
        }
    }
}

接下来就是重头戏Reduce阶段了。我们要按用户的ip地址进行聚合,统计相同ip下用户的操作步骤和停留时间。那么就可以用ip作为key、cleanBean作为value进行Reduce的聚合,然后按时间排序,比较相邻两条记录的时间差:如果时间差<30分钟,则这两条记录属于同一个session,否则属于不同的session。

package cn.leon.transReducer;

import cn.leon.pageViewBean.CleanBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.codehaus.jackson.map.util.BeanUtil;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Reducer for the page-view job: receives every {@link CleanBean} of one key
 * (the remote address emitted by the mapper), orders them by request time and
 * splits them into visits (sessions).
 *
 * <p>Two consecutive requests belong to the same session when they are less
 * than 30 minutes apart. Within a session each page gets an increasing step
 * number starting at 1. The stay time of a page is the number of seconds until
 * the next request of the same session; the last page of a session gets the
 * default of 60 seconds.
 *
 * <p>Each output value is one \001-separated line:
 * session, key, remote_user, time_local, request, step, stay seconds,
 * http_referer, http_user_agent, body_bytes_sent, status.
 */
public class TransReducer extends Reducer<Text, CleanBean, NullWritable, Text> {

    /** Separator between the fields of an output line. */
    private static final String SEP = "\001";

    /** Pattern of the {@code time_local} field. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** A gap of at least this many milliseconds between two requests starts a new session. */
    private static final long SESSION_GAP_MILLIS = 30 * 60 * 1000L;

    /** Stay time in seconds reported for the last page of a session. */
    private static final long DEFAULT_STAY_SECONDS = 60L;

    /** Counter group / name used for records dropped because of a bad timestamp. */
    private static final String COUNTER_GROUP = "TransReducer";
    private static final String COUNTER_BAD_TIME = "UNPARSEABLE_TIME";

    /**
     * Builds the page-view lines for all records of one key.
     *
     * @param key     grouping key; written as the second field of every output line
     * @param values  records of that key, in arbitrary order
     * @param context job context used to emit lines and to count skipped records
     * @throws IOException          if a bean cannot be copied, or propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<CleanBean> values, Context context) throws IOException, InterruptedException {
        // One formatter per call: every timestamp is parsed exactly once, instead of
        // building a new formatter on each comparison during the sort.
        SimpleDateFormat timeFormat = new SimpleDateFormat(TIME_PATTERN, Locale.US);
        List<TimedBean> visits = new ArrayList<TimedBean>();

        for (CleanBean bean : values) {
            // The iterated bean is copied into a fresh object before it is stored;
            // adding it directly would let later iterations overwrite its property values.
            CleanBean copy = new CleanBean();
            try {
                BeanUtils.copyProperties(copy, bean);
            } catch (IllegalAccessException e) {
                // A half-copied bean must not reach the output; fail the task with the cause.
                throw new IOException("Cannot copy CleanBean for key " + key, e);
            } catch (InvocationTargetException e) {
                throw new IOException("Cannot copy CleanBean for key " + key, e);
            }

            try {
                visits.add(new TimedBean(copy, parseMillis(timeFormat, copy.getTime_local())));
            } catch (ParseException e) {
                // A record without a usable timestamp cannot be ordered or assigned to a
                // session: skip it and make the loss visible through a job counter.
                context.getCounter(COUNTER_GROUP, COUNTER_BAD_TIME).increment(1);
            }
        }

        // Order by request time; the sort is stable, so equal timestamps keep arrival order.
        Collections.sort(visits, new Comparator<TimedBean>() {
            @Override
            public int compare(TimedBean a, TimedBean b) {
                return a.millis < b.millis ? -1 : (a.millis == b.millis ? 0 : 1);
            }
        });

        Text out = new Text();
        int step = 1;
        // A random UUID identifies each session.
        String session = UUID.randomUUID().toString();
        int last = visits.size() - 1;

        for (int i = 0; i <= last; i++) {
            TimedBean current = visits.get(i);

            // The page continues its session only if a next request exists and is close enough.
            boolean sameSession = false;
            long staySeconds = DEFAULT_STAY_SECONDS;
            if (i < last) {
                long gap = visits.get(i + 1).millis - current.millis;
                if (gap < SESSION_GAP_MILLIS) {
                    sameSession = true;
                    staySeconds = gap / 1000;
                }
            }

            out.set(formatLine(session, key, current.bean, step, staySeconds));
            context.write(NullWritable.get(), out);

            if (sameSession) {
                step++;
            } else if (i < last) {
                // The next record opens a new visit: restart numbering under a new session id.
                step = 1;
                session = UUID.randomUUID().toString();
            }
        }
    }

    /** Joins the output fields of one page view with the \001 separator. */
    private static String formatLine(String session, Text key, CleanBean bean, int step, long staySeconds) {
        return session + SEP + key.toString() + SEP + bean.getRemote_user() + SEP + bean.getTime_local() + SEP
                + bean.getRequest() + SEP + step + SEP + staySeconds + SEP + bean.getHttp_referer() + SEP
                + bean.getHttp_user_agent() + SEP + bean.getBody_bytes_sent() + SEP + bean.getStatus();
    }

    /**
     * Parses a {@code time_local} string into epoch milliseconds.
     *
     * @throws ParseException if the string is {@code null} or does not match the pattern
     */
    private static long parseMillis(SimpleDateFormat format, String timeStr) throws ParseException {
        if (timeStr == null) {
            throw new ParseException("time_local is null", 0);
        }
        return format.parse(timeStr).getTime();
    }

    /** A copied bean together with its already parsed request time. */
    private static final class TimedBean {
        private final CleanBean bean;
        private final long millis;

        private TimedBean(CleanBean bean, long millis) {
            this.bean = bean;
            this.millis = millis;
        }
    }
}

PageViewBean对象

package cn.leon.pageViewBean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop {@link Writable} holding one page-view record: the session it belongs
 * to, the remote address, the request and its position (step) inside the
 * session, plus the stay time and the remaining request attributes.
 *
 * <p>Serialization writes the fields in declaration order. String fields that
 * are still {@code null} are written as empty strings, so a partly populated
 * bean can be serialized; such fields read back as {@code ""}.
 */
public class PageViewBean implements Writable {

    private String session; // session id
    private String remote_addr;
    private String timestr;
    private String request;
    private int step;
    private String staylong;
    private String referal;
    private String useragent;
    private String bytes_send;
    private String status;

    /** @return the session id */
    public String getSession() {
        return session;
    }

    /** @param session the session id */
    public void setSession(String session) {
        this.session = session;
    }

    /** @return the remote address */
    public String getRemote_addr() {
        return remote_addr;
    }

    /** @param remote_addr the remote address */
    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    /** @return the request time as a string */
    public String getTimestr() {
        return timestr;
    }

    /** @param timestr the request time as a string */
    public void setTimestr(String timestr) {
        this.timestr = timestr;
    }

    /** @return the request */
    public String getRequest() {
        return request;
    }

    /** @param request the request */
    public void setRequest(String request) {
        this.request = request;
    }

    /** @return the step number */
    public int getStep() {
        return step;
    }

    /** @param step the step number */
    public void setStep(int step) {
        this.step = step;
    }

    /** @return the stay time, stored as a string */
    public String getStaylong() {
        return staylong;
    }

    /** @param staylong the stay time, stored as a string */
    public void setStaylong(String staylong) {
        this.staylong = staylong;
    }

    /** @return the referrer */
    public String getReferal() {
        return referal;
    }

    /** @param referal the referrer */
    public void setReferal(String referal) {
        this.referal = referal;
    }

    /** @return the user agent */
    public String getUseragent() {
        return useragent;
    }

    /** @param useragent the user agent */
    public void setUseragent(String useragent) {
        this.useragent = useragent;
    }

    /** @return the bytes-sent value, stored as a string */
    public String getBytes_send() {
        return bytes_send;
    }

    /** @param bytes_send the bytes-sent value, stored as a string */
    public void setBytes_send(String bytes_send) {
        this.bytes_send = bytes_send;
    }

    /** @return the status */
    public String getStatus() {
        return status;
    }

    /** @param status the status */
    public void setStatus(String status) {
        this.status = status;
    }

    /**
     * Writes all fields in declaration order.
     *
     * @param dataOutput sink to write to
     * @throws IOException propagated from the sink
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // writeUTF rejects null, so unset string fields are written as "".
        dataOutput.writeUTF(nullToEmpty(this.session));
        dataOutput.writeUTF(nullToEmpty(this.remote_addr));
        dataOutput.writeUTF(nullToEmpty(this.timestr));
        dataOutput.writeUTF(nullToEmpty(this.request));
        dataOutput.writeInt(this.step);
        dataOutput.writeUTF(nullToEmpty(this.staylong));
        dataOutput.writeUTF(nullToEmpty(this.referal));
        dataOutput.writeUTF(nullToEmpty(this.useragent));
        dataOutput.writeUTF(nullToEmpty(this.bytes_send));
        dataOutput.writeUTF(nullToEmpty(this.status));
    }

    /**
     * Reads all fields in the order used by {@link #write(DataOutput)}.
     *
     * @param dataInput source to read from
     * @throws IOException propagated from the source
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.session = dataInput.readUTF();
        this.remote_addr = dataInput.readUTF();
        this.timestr = dataInput.readUTF();
        this.request = dataInput.readUTF();
        this.step = dataInput.readInt();
        this.staylong = dataInput.readUTF();
        this.referal = dataInput.readUTF();
        this.useragent = dataInput.readUTF();
        this.bytes_send = dataInput.readUTF();
        this.status = dataInput.readUTF();
    }

    /** Returns {@code ""} for {@code null}, otherwise the value itself. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}

相关新闻