Advancing in Big Data: Spark SQL Log Analysis

November 25, 2021
This article walks through log analysis with Spark SQL, covering the basic approach, practical techniques, and the underlying mechanics. I hope it is helpful.


Basic Approach

User behavior logs: all of the behavior data generated each time a user visits the site (visits, page views, searches, clicks, ...).

Also called user behavior trails or traffic logs.

What the log data contains:

  • 1) System attributes of the visit: operating system, browser, and so on
  • 2) Visit characteristics: the URL clicked, the URL it was referred from (referer), time spent on the page, etc.
  • 3) Visit information: session_id, client IP (which resolves to a city), etc.
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243 

Data Processing Pipeline

  • 1) Data collection
    Flume: ship the web logs into HDFS
  • 2) Data cleansing
    filter out dirty data
    with Spark, Hive, MapReduce, or another distributed computing framework
    the cleansed data can then be stored on HDFS (Hive/Spark SQL)
  • 3) Data processing
    run the statistics and analysis the business needs
    with Spark, Hive, MapReduce, or another distributed computing framework
  • 4) Storing the results
    results can be written to an RDBMS or a NoSQL store
  • 5) Data visualization
    present the results graphically: pie charts, bar charts, maps, line charts
    ECharts, HUE, Zeppelin


Data Cleansing

First, step through with the debugger to see which index of the split line corresponds to which field.


  • Error
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

When running this first cleansing step, the data can be printed to the console but cannot be written to a local file, because there is no local Hadoop (pseudo-distributed) installation on Windows.

Installing the missing helper is enough:

https://hiszm.lanzous.com/iWyqmhrgk0f

Download it, then create the directory below and put the file inside:

C:\Data\hadoop\bin

Then add a system environment variable:

HADOOP_HOME

C:\Data\hadoop
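
Alternatively, if you would rather not touch the system environment, the same effect can be had from code; a minimal sketch, assuming winutils.exe sits under C:\Data\hadoop\bin as set up above:

// Workaround for the winutils error when running Spark locally on Windows:
// point hadoop.home.dir at the directory created above, before building the SparkSession.
System.setProperty("hadoop.home.dir", "C:\\Data\\hadoop")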

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkFormatApp {

  def main(args: Array[String]): Unit = {

    // SparkSession is the entry point of a Spark application
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .master("local[2]").getOrCreate()

    val access = spark.sparkContext.textFile("10000_access.log")

    //access.take(10).foreach(println)

    access.map(line => {
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val traffic = splits(9)
      val url = splits(11).replace("\"", "")
      //(ip, DateUtils.parse(time), traffic, url)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("output")

    //.take(10).foreach(println)
    //.saveAsTextFile("output")

    spark.stop()
  }
}
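
SparkFormatApp calls a DateUtils helper that is not listed in this post. A minimal sketch of what it might look like, assuming the raw log stores the time as [10/Nov/2016:00:01:02 +0800] and using the thread-safe FastDateFormat from Apache Commons Lang 3:

package org.sparksql

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

// Hypothetical sketch of the DateUtils helper used above (not part of this post).
object DateUtils {

  // Input format of the raw log time, e.g. [10/Nov/2016:00:01:02 +0800]
  val INPUT_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // Target output format, e.g. 2016-11-10 00:01:02
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(time: String): String = {
    try {
      TARGET_FORMAT.format(new Date(getTime(time)))
    } catch {
      case _: Exception => "" // malformed timestamps become an empty string
    }
  }

  // Strip the surrounding [ ] and convert to epoch milliseconds
  private def getTime(time: String): Long = {
    INPUT_FORMAT
      .parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]")))
      .getTime
  }
}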


For log processing we generally need to partition the data, partitioning on the access time in the log: for example by day (d), by hour (h), or m5 (one partition per five minutes).

Second-Pass Cleansing

  • Input: access time, accessed URL, traffic consumed, client IP
  • Output: URL, cmsType (video/article), cmsId, traffic, ip, city, access time, day
package org.sparksql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Conversion utility for access-log records
object AccessConvertUtils {

  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  // Convert one input line into the output Row format
  def parseLog(log: String) = {
    try {
      val splits = log.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0) // malformed lines fall back to a placeholder row
    }
  }
}


  • IP => province

Use an existing open-source project on GitHub:

1) git clone https://github.com/wzhe06/ipdatabase.git

2) Build the downloaded project: mvn clean package -DskipTests


3) Install the jar into your local Maven repository:

mvn install:install-file -Dfile=C:\Data\ipdatabase\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar


4) Copy the related resource files over, otherwise the error below is thrown:


java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
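
With the jar installed and the resource files copied, the IpUtils.getCity helper called in AccessConvertUtils can simply wrap the library. A minimal sketch (the IpHelper class name is assumed from the ipdatabase project; verify it against the version you built):

package org.sparksql

import com.ggstar.util.ip.IpHelper

// Thin wrapper around the ipdatabase project installed above (class name assumed).
object IpUtils {

  // Resolve an IP address to a province/city name
  def getCity(ip: String): String = {
    IpHelper.findRegionByIp(ip)
  }
}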

5) Test


package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkCleanApp {

  def main(args: Array[String]): Unit = {
    // SparkSession is the entry point of a Spark application
    val spark = SparkSession.builder().appName("SparkCleanApp")
      .master("local[2]").getOrCreate()

    val accessRDD = spark.sparkContext.textFile("access.log")

    //accessRDD.take(10).foreach(println)

    val accessDF = spark.createDataFrame(
      accessRDD.map(x => AccessConvertUtils.parseLog(x)),
      AccessConvertUtils.struct)

    accessDF.printSchema()
    accessDF.show()

    spark.stop()
  }
}

root
|-- url: string (nullable = true)
|-- cmsType: string (nullable = true)
|-- cmsId: long (nullable = true)
|-- traffic: long (nullable = true)
|-- ip: string (nullable = true)
|-- city: string (nullable = true)
|-- time: string (nullable = true)
|-- day: string (nullable = true)




+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
| url|cmsType|cmsId|traffic| ip|city| time| day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....| video| 4500| 304| 218.75.35.226| 浙江省|2017-05-11 14:09:14|20170511|
|http://www.imooc....| video|14623| 69| 202.96.134.133| 广东省|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894| 115| 202.96.134.133| 广东省|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896| 804| 218.75.35.226| 浙江省|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893| 893|222.129.235.182| 北京市|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891| 407| 218.75.35.226| 浙江省|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897| 78| 202.96.134.133| 广东省|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894| 658|222.129.235.182| 北京市|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893| 161| 58.32.19.255| 上海市|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895| 701| 218.22.9.56| 安徽省|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892| 986| 218.75.35.226| 浙江省|2017-05-11 05:53:47|20170511|
|http://www.imooc....| video|14540| 987| 58.32.19.255| 上海市|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892| 610| 218.75.35.226| 浙江省|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893| 0| 218.22.9.56| 安徽省|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891| 262| 58.32.19.255| 上海市|2017-05-11 00:38:01|20170511|
|http://www.imooc....| video| 4600| 465| 218.75.35.226| 浙江省|2017-05-11 17:38:16|20170511|
|http://www.imooc....| video| 4600| 833|222.129.235.182| 北京市|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895| 320|222.129.235.182| 北京市|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898| 460| 202.96.134.133| 广东省|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899| 389|222.129.235.182| 北京市|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+

Tuning points:

  1. Control the size of the output files: coalesce
  2. Adjust how the partition column's data type is inferred: spark.sql.sources.partitionColumnTypeInference.enabled (a write sketch follows this list)
  3. When inserting into the database, submit the rows as a batch
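
The day-partitioned Parquet output that TopNApp reads from output2/ is not shown above; a minimal write sketch, assuming the cleaned accessDF from SparkCleanApp (the path and coalesce(1) are illustrative):

import org.apache.spark.sql.SaveMode

// Write the cleaned data as day-partitioned Parquet; coalesce(1) keeps the
// number of output files small for this sample data set.
accessDF.coalesce(1)
  .write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .partitionBy("day")
  .save("output2/")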
package org.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object TopNApp {

  // Most popular videos (TopN by access count)
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._

    // DataFrame API version
    val videoTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times"))
      .orderBy($"times".desc)
    videoTopNDF.show()

    // SQL version
    accessDF.createOrReplaceTempView("access_log")
    val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log " +
      "where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")
    videoTopNDF1.show()
  }

  def main(args: Array[String]): Unit = {
    // SparkSession is the entry point of a Spark application
    val spark = SparkSession.builder().appName("TopNApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    val accessDF = spark.read.format("parquet").load("output2/")
    accessDF.printSchema()
    accessDF.show(false)

    videoAccessTopN(spark, accessDF)
    spark.stop()
  }
}








+--------+-----+------+
| day|cmsId| times|
+--------+-----+------+
|20170511|14540|111027|
|20170511| 4000| 55734|
|20170511|14704| 55701|
|20170511|14390| 55683|
|20170511|14623| 55621|
|20170511| 4600| 55501|
|20170511| 4500| 55366|
|20170511|14322| 55102|
+--------+-----+------+

Writing the video access TopN to MySQL

package org.sparksql

import java.sql.{Connection, DriverManager, PreparedStatement}

object MySqlUtils {

  def getConnection() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_user?user=root&password=root")
  }

  // Release database connection resources
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Quick connectivity check
    println(getConnection())
  }
}




create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day, cms_id)
);
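
The DAO and TopN code below pass around DayVideoAccessStatistics, DayCityAccessStatistics, and DayTrafficAccessStatistics entities that are not listed in this post. A minimal sketch of the three case classes, with field names assumed from how they are read in the code that follows:

package org.sparksql

// Hypothetical entity classes matching the three result tables.
case class DayVideoAccessStatistics(day: String, cmsId: Long, times: Long)
case class DayCityAccessStatistics(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)
case class DayTrafficAccessStatistics(day: String, cmsId: Long, traffics: Long)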

package org.sparksql

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

object StatisticsDAO {

  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStatistics]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection()

      // Disable auto-commit so the batch goes in as one transaction
      connection.setAutoCommit(false)

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (i <- list) {
        pstmt.setString(1, i.day)
        pstmt.setLong(2, i.cmsId)
        pstmt.setLong(3, i.times)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // batch insert

      // Commit manually
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }
}


package org.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object TopNApp {

  // Most popular videos, now also written to MySQL
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._

    val videoTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times"))
      .orderBy($"times".desc)

    videoTopNDF.show()

    try {
      videoTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStatistics]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          list.append(DayVideoAccessStatistics(day, cmsId, times))
        })
        // One batch insert per partition
        StatisticsDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  // main is unchanged from the previous version
}





java.sql.SQLException: No value specified for parameter 2

Check that the inserted parameters and their types are consistent with the SQL statement.

By Province

create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day, cms_id, city)
);

  // Requires: import org.apache.spark.sql.expressions.Window
  def cityAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._

    val cityTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId").agg(count("cmsId").as("times"))
      .orderBy($"times".desc)
    cityTopNDF.show()

    // Top 3 per city, ranked by access count with a window function
    val top3DF = cityTopNDF.select(
      cityTopNDF("day"),
      cityTopNDF("city"),
      cityTopNDF("cmsId"),
      cityTopNDF("times"),
      row_number().over(Window.partitionBy(cityTopNDF("city"))
        .orderBy(cityTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <= 3") //.show()

    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityAccessStatistics]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")

          list.append(DayCityAccessStatistics(day, cmsId, city, times, timesRank))
        })
        StatisticsDAO.insertCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
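
insertCityVideoAccessTopN is not shown in the post; a minimal sketch added inside StatisticsDAO, assuming it mirrors the batch-insert pattern of insertDayVideoAccessTopN (insertTrafficVideoAccessTopN would look the same against the traffics table):

  // Hypothetical DAO method for the per-city TopN results (same pattern as above).
  def insertCityVideoAccessTopN(list: ListBuffer[DayCityAccessStatistics]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection()
      connection.setAutoCommit(false) // batch the inserts in one transaction

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (i <- list) {
        pstmt.setString(1, i.day)
        pstmt.setLong(2, i.cmsId)
        pstmt.setString(3, i.city)
        pstmt.setLong(4, i.times)
        pstmt.setInt(5, i.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }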



By Traffic

create table day_video_traffics_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day, cms_id)
);

  def trafficAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._

    val trafficTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)

    trafficTopNDF.show()

    try {
      trafficTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayTrafficAccessStatistics]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayTrafficAccessStatistics(day, cmsId, traffics))
        })
        StatisticsDAO.insertTrafficVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }


Optimization

  • Delete the previous results for the day before each new write (see the wiring sketch after the code below)
  def deleteData(day: String) = {
    val tables = Array("day_video_traffics_topn_stat",
      "day_video_city_access_topn_stat",
      "day_video_access_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection()

      for (table <- tables) {
        val deleteSQL = s"delete from $table where day = ?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }
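
A possible way to wire this in (an assumption about the call order, not shown in the post): clear the day's old rows once, then recompute the three statistics, so re-runs do not violate the primary keys.

// Hypothetical driver wiring: wipe the day's previous results, then recompute.
val day = "20170511"
StatisticsDAO.deleteData(day)
videoAccessTopN(spark, accessDF)
cityAccessTopN(spark, accessDF)
trafficAccessTopN(spark, accessDF)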


Data Visualization

Data visualization: the greatest value of a picture is that it lets us see more in the data than we expected to see.

Common visualization frameworks:

1) ECharts

2) Highcharts

3) D3.js

4) HUE

5) Zeppelin

ECharts


package org.sparkSQL.Utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MySqlUtils {

    private static final String USERNAME = "root";
    private static final String PASSWORD = "root";
    private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/imooc_user";

    public static Connection getConnection() {
        Connection connection = null;
        try {
            Class.forName(DRIVERCLASS);
            connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    // Close the resources in reverse order of acquisition: ResultSet, statement, connection
    public static void release(Connection connection, PreparedStatement pstmt, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}


<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>ECharts</title>
    <!-- Load echarts.js -->
    <script src="https://cdn.bootcss.com/echarts/3.7.1/echarts.min.js"></script>
    <script src="https://s3.pstatp.com/cdn/expire-1-M/jquery/3.1.1/jquery.min.js"></script>
</head>
<body>
<!-- Prepare a DOM element with an explicit width and height for ECharts -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">

    // Initialize an ECharts instance on the prepared DOM element
    var myChart = echarts.init(document.getElementById('main'));

    // Chart options and data
    var option = {
        title: {
            text: '最受欢迎的TOPN',
            subtext: '测试',
            left: 'center'
        },
        tooltip: {
            trigger: 'item',
            formatter: '{a} <br/>{b} : {c} ({d}%)'
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: ['直接访问', '邮件营销', '联盟广告', '视频广告', '搜索引擎']
        },
        series: [
            {
                name: '访问次数',
                type: 'pie',
                radius: '55%',
                center: ['50%', '60%'],
                // Load the TopN data synchronously from the stat endpoint
                data: (function () {
                    var courses = [];
                    $.ajax({
                        type: "GET",
                        url: "stat?day=20170511",
                        dataType: 'json',
                        async: false,
                        success: function (result) {
                            for (var i = 0; i < result.length; i++) {
                                courses.push({"value": result[i].value, "name": result[i].name});
                            }
                        }
                    });
                    return courses;
                })(),
                emphasis: {
                    itemStyle: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };

    // Render the chart with the options just defined
    myChart.setOption(option);
</script>
</body>
</html>
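
The pie chart above requests "stat?day=20170511" from a backend that is not included in this post (the original project serves it through a small web layer). As a rough sketch of the query side only, using a hypothetical StatisticsQueryDAO against the Scala MySqlUtils shown earlier, the endpoint just needs to read the day's rows back out of day_video_access_topn_stat:

package org.sparksql

import java.sql.{Connection, PreparedStatement, ResultSet}

import scala.collection.mutable.ListBuffer

// Hypothetical query-side helper backing the "stat?day=..." endpoint.
object StatisticsQueryDAO {

  // Returns (cmsId, times) pairs for one day, most accessed first.
  def queryDayVideoAccessTopN(day: String): ListBuffer[(Long, Long)] = {
    val results = new ListBuffer[(Long, Long)]
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    var rs: ResultSet = null

    try {
      connection = MySqlUtils.getConnection()
      pstmt = connection.prepareStatement(
        "select cms_id, times from day_video_access_topn_stat where day = ? order by times desc")
      pstmt.setString(1, day)
      rs = pstmt.executeQuery()

      while (rs.next()) {
        results.append((rs.getLong("cms_id"), rs.getLong("times")))
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (rs != null) rs.close()
      MySqlUtils.release(connection, pstmt)
    }

    results
  }
}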



Zeppelin

[hadoop@hadoop001 software]$ tar -zxvf zeppelin-0.7.1-bin-all.tgz -C ~/app/


[hadoop@hadoop001 bin]$ ./zeppelin-daemon.sh start
Log dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/logs
Pid dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/run
Zeppelin start
