Ride-Hailing Big Data Project: Data Analysis with Spark

Level 1: Top 10 cancellation reasons among cancelled orders

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    public class CancelReasonTop10 {
        public static void main(String[] args) {
            /********** Begin **********/
            Logger.getLogger("org").setLevel(Level.ERROR);
            SparkSession spark = SparkSession.builder().master("local").appName("CancelReasonTop10").getOrCreate();
            // Read the pipe-delimited cancellation data and give the columns meaningful names
            Dataset<Row> cancelData = spark.read().option("delimiter", "|").csv("/data/workspace/myshixun/data/canceldata.txt")
                    .toDF("companyid", "address", "districtname", "orderid", "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason");
            cancelData.registerTempTable("data");
            // Count each cancellation reason (excluding '未知'), keep the top 10, and append the result to MySQL
            spark.sql("select cancelreason,count(*) num from data where cancelreason != '未知' group by cancelreason order by num desc limit 10")
                    .write()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                    .option("dbtable", "cancelreason")
                    .option("user", "root")
                    .option("password", "123123")
                    .mode(SaveMode.Append)
                    .save();
            /********** End **********/
        }
    }
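
As a quick check, the freshly written `cancelreason` table can be read back over the same JDBC connection and printed. This is only a verification sketch under the same assumptions as the solution above (local MySQL, database `mydb`, user root/123123); the class name `CancelReasonCheck` is just illustrative.

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    public class CancelReasonCheck {
        public static void main(String[] args) {
            Logger.getLogger("org").setLevel(Level.ERROR);
            SparkSession spark = SparkSession.builder().master("local").appName("CancelReasonCheck").getOrCreate();
            // Read the table written by CancelReasonTop10 back over JDBC and show its rows
            Dataset<Row> result = spark.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                    .option("dbtable", "cancelreason")
                    .option("user", "root")
                    .option("password", "123123")
                    .load();
            result.orderBy(result.col("num").desc()).show(10, false);
            spark.stop();
        }
    }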

Level 2: Find the 10 districts with the most successful orders

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class OrderByCreateTop10 {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderByCreateTop10").getOrCreate();
        // Read the tab-delimited order data and give the columns meaningful names
        Dataset<Row> orderdata = spark.read().option("delimiter", "\t").csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Count successful orders per district, keep the top 10, and append the result to MySQL
        spark.sql("select districtname,count(*) num from data group by districtname order by num desc limit 10")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_district")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        /********** End **********/
    }
}
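
For reference, the same top-10-districts aggregation can also be written with the DataFrame API instead of SQL. The sketch below is an equivalent, optional formulation of the query above; the class name `OrderByCreateTop10Alt` is illustrative, and it only prints the result rather than writing to MySQL.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.desc;
public class OrderByCreateTop10Alt {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderByCreateTop10Alt").getOrCreate();
        Dataset<Row> orderdata = spark.read().option("delimiter", "\t").csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        // groupBy/count/orderBy is the DataFrame-API form of the SQL used in the solution above
        Dataset<Row> top10 = orderdata.groupBy("districtname")
                .count()                 // adds a "count" column
                .orderBy(desc("count"))
                .limit(10);
        top10.show(false);
        spark.stop();
    }
}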

Level 3: Find the five most frequently travelled order routes

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class LinesTop5 {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("LinesTop5").getOrCreate();
        // Read the tab-delimited order data and give the columns meaningful names
        Dataset<Row> orderdata = spark.read().option("delimiter", "\t").csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
spark.udf().register("compare", (UDF1) s -> {
            String ss = "";
            int i = s.split("\\*")[0].compareTo(s.split("\\*")[1]);
            if (s.split("\\*").length == 2) {
                if (i >= 0) {
                    ss = s.split("\\*")[0] + "*" + s.split("\\*")[1];
                } else {
                    ss = s.split("\\*")[1] + "*" + s.split("\\*")[0];
                }
            } else if (s.split("\\*").length == 6) {
                if (i >= 0) {
                    ss = s.split("\\*")[0] + "*" + s.split("\\*")[1] + "*" + s.split("\\*")[2] + "*" + s.split("\\*")[3] + "*" + s.split("\\*")[4] + "*" + s.split("\\*")[5];
                } else {
                    ss = s.split("\\*")[1] + "*" + s.split("\\*")[0] + "*" + s.split("\\*")[4] + "*" + s.split("\\*")[5] + "*" + s.split("\\*")[2] + "*" + s.split("\\*")[3];
                }
            }
            return ss;
        }, DataTypes.StringType);
        spark.sql("select compare(concat_ws('*',departure,destination))line,count(*) num from data where departure is not null and destination is not null group by compare(concat_ws('*',departure,destination)) order by num desc limit 5")
                .registerTempTable("t1");
        spark.sql("select concat_ws('*',split(compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)),'[*]')[0],split(compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)),'[*]')[1])line,compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)) bb,count(*) num from data where departure is not null and destination is not null group by compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)) order by num desc").registerTempTable("t2");
        spark.sql("select split(bb,'[*]')[0] departure,split(bb,'[*]')[2] deplongitude,split(bb,'[*]')[3] deplatitude,split(bb,'[*]')[1] destination,split(bb,'[*]')[4] destlongitude,split(bb,'[*]')[5] destlatitude,num from(select t1.line,t2.bb,t2.num count,t1.num, Row_Number() OVER (partition by t1.line ORDER BY t2.num desc) rank from t1 left join t2 on t1.line = t2.line order by t1.num desc) where rank=1")                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "orderline")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
       
       
        /********** End **********/
    }
}
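
The core of Level 3 is the `compare` UDF: it puts the two endpoints of a route into a fixed lexicographic order, so that A->B and B->A are grouped as one line. The stand-alone sketch below illustrates just that canonicalisation with made-up route strings; the helper `canon` mirrors the 2-field branch of the UDF and is purely illustrative.

public class CompareDemo {
    // Same canonicalisation idea as the "compare" UDF: order the two endpoints lexicographically
    static String canon(String route) {
        String[] p = route.split("\\*");
        if (p.length != 2) {
            return "";
        }
        return p[0].compareTo(p[1]) >= 0 ? p[0] + "*" + p[1] : p[1] + "*" + p[0];
    }

    public static void main(String[] args) {
        // Hypothetical routes, not taken from the data set
        System.out.println(canon("PlaceA*PlaceB"));  // PlaceB*PlaceA
        System.out.println(canon("PlaceB*PlaceA"));  // PlaceB*PlaceA -> both directions map to the same key
    }
}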

Level 4: Total order volume for each city in Hunan

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class OrderCountByCity {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderCountByCity").getOrCreate();
        // Order data: tab-delimited
        Dataset<Row> orderdata = spark.read().option("delimiter", "\t").csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Cancellation data: pipe-delimited
        Dataset<Row> canceldata = spark.read().option("delimiter", "|").csv("/data/workspace/myshixun/data/canceldata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason");
        canceldata.registerTempTable("data1");
        spark.udf().register("city", (UDF1) s -> {
            String city = "";
            if (s.contains("自治州")){
                city = s.split("自治州")[0] + "自治州";
            }else {
                city = s.split("市")[0] + "市";
            }
            return city;
        }, DataTypes.StringType);
        spark.sql("select city(districtname) city,count(*) count from data where districtname like '湖南省%' group by city(districtname)").registerTempTable("order");
        spark.sql("select city(districtname) city,count(*) count from data1 where districtname like '湖南省%' group by city(districtname)").registerTempTable("cancel");
        spark.sql("select order.city,(order.count+cancel.count) num from order left join cancel on order.city == cancel.city order by num desc")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "orderbycity")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        spark.stop();
        /********** End **********/
    }
}
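
The `city` UDF keeps only the city-level prefix of a district name, with a special case so that autonomous prefectures (…自治州) are not cut at the first '市'. The stand-alone sketch below shows the same truncation on hypothetical district names.

public class CityDemo {
    // Same prefix logic as the "city" UDF registered above
    static String city(String districtname) {
        if (districtname.contains("自治州")) {
            return districtname.split("自治州")[0] + "自治州";
        }
        return districtname.split("市")[0] + "市";
    }

    public static void main(String[] args) {
        // Hypothetical inputs, only to illustrate the truncation
        System.out.println(city("湖南省长沙市岳麓区"));               // 湖南省长沙市
        System.out.println(city("湖南省湘西土家族苗族自治州吉首市")); // 湖南省湘西土家族苗族自治州
    }
}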

Level 5: Order totals per time slot for Hunan province and per city for the day

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class OrderHourCity {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderHourCity").getOrCreate();
        // Order data: tab-delimited
        Dataset<Row> orderdata = spark.read().option("delimiter", "\t").csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Cancellation data: pipe-delimited
        Dataset<Row> canceldata = spark.read().option("delimiter", "|").csv("/data/workspace/myshixun/data/canceldata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason");
        canceldata.registerTempTable("data1");
        spark.udf().register("city", (UDF1) s -> {
            String city = "";
            if (s.contains("自治州")) {
                city = s.split("自治州")[0] + "自治州";
            } else {
                city = s.split("市")[0] + "市";
            }
            return city;
        }, DataTypes.StringType);
        spark.sql("select hour(ordertime) hour,city(districtname)city,count(*) count from data1 where districtname like '湖南省%' group by hour(ordertime),city(districtname) order by hour").registerTempTable("t1");
        spark.sql("select hour(ordertime) hour,city(districtname)city,count(*) count from data where districtname like '湖南省%' group by hour(ordertime),city(districtname) order by hour").registerTempTable("t2");
        spark.sql("select (case when t1.hour is null then t2.hour when t2.hour is null then t1.hour else t2.hour end)hour,(case when t1.city is null then t2.city when t2.city is null then t1.city else t2.city end)city,(case when t1.count is null then t2.count when t2.count is null then t1.count else t2.count+t1.count end)num from t1 full join t2 on concat_ws('*',t1.hour,t1.city) = concat_ws('*',t2.hour,t2.city) order by hour,city")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_city_hour")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        spark.sql("select (case when t1 is null then t2 when t2 is null then t1 else t2 end) as time ,(case when count1 is null then count2 when count2 is null then count1 else count2+count1 end) as num from(select * from (SELECT DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm') as t1,count(DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as count1 FROM data GROUP BY DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as a FULL OUTER JOIN (SELECT DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm') as t2,count(DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as count2 FROM data1 GROUP BY DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as b on a.t1=b.t2) as c order by time")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_quantity_time")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        spark.stop();
        /********** End **********/
    }
}
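
Level 5 buckets orders twice: per hour with hour(ordertime) and per minute with date_format(ordertime,'yyyy-MM-dd HH:mm'). A quick way to confirm what those functions produce is to evaluate them on a literal timestamp, as in the sketch below; the timestamp is an arbitrary example, not taken from the data, and the class name is illustrative.

import org.apache.spark.sql.SparkSession;
public class TimeBucketCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("TimeBucketCheck").getOrCreate();
        // hour() gives the 0-23 hour bucket, date_format() the minute-level bucket used above:
        // this prints 8 and 2020-06-01 08:15
        spark.sql("select hour('2020-06-01 08:15:30') as hour_bucket, date_format('2020-06-01 08:15:30','yyyy-MM-dd HH:mm') as minute_bucket")
                .show(false);
        spark.stop();
    }
}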
