大数据实战项目记录--农村、城市居民主要疾病死亡率及死因数据分析(国家数据网站)
1爬取医疗行业数据并输出保存为csv格式文件目的:编写爬虫完成爬取国家统计局网站年度数据之卫生行业分类数据,具体爬取数据为:(1)城市居民主要疾病死因构成(2)农村居民主要疾病死因构成将爬取的数据进行初步的分析、整合,要求输出为csv格式文件,供Spark程序处理分析。1.1网站数据查找与分析国家数据网站(国家统计局) :https://data.stats.gov.cn/easyquery.ht
1爬取医疗行业数据并输出保存为csv格式文件
目的:编写爬虫完成爬取国家统计局网站年度数据之卫生行业分类数据,具体爬取数据为:
(1)城市居民主要疾病死因构成
(2)农村居民主要疾病死因构成
将爬取的数据进行初步的分析、整合,要求输出为csv格式文件,供Spark程序处理分析。
1.1网站数据查找与分析
国家数据网站(国家统计局) :https://data.stats.gov.cn/easyquery.htm
选择“年度数据”,然后选择“卫生”行业数据下的“城市居民主要疾病死亡率及死因构成”和“农村居民主要疾病死亡率及死因构成”。
再进一层获取下一级的“城市居民主要疾病死因构成”和“农村居民主要疾病死因构成”数据,均为近十年数据。
数据显示如下:
打开浏览器调试窗口找到具体的请求url地址信息。
通过preview页签核实是否是自己需要的数据。
datanodes:对应的是数据
wdnodes:对应的就是数据标签、或者数据对照表,这里面有疾病指标代码对照、年份列表等等
最终锁定爬取的url目标地址:
Request URL:
https://data.stats.gov.cn/easyquery.htm?m=QueryData&dbcode=hgnd&rowcode=zb&colcode=sj&wds=%5B%5D&dfwds=%5B%7B%22wdcode%22%3A%22zb%22%2C%22valuecode%22%3A%22A0O0G02%22%7D%5D&k1=1625457548015&h=1
参数传递:
Query String Parameters 其实就是get请求中url后面要带的参数。也就是说,向对应网址服务器传递这些参数就可以获得请求的内容。
当发起一次GET请求时,参数会以url string的形式进行传递。即?后的字符串则为其请求参数,并以&作为分隔符。
# coding=utf-8
import requests
import json
import csv
def crawl(url, save_file):
'''
数据爬取,将数据爬取下来保存为json文件,开发时每次读取json文件,降低被封的几率
:param url: 爬取页面url
:param save_file: 保存路径
'''
response = requests.get(url, verify=False) # 直接get会有问题, 设置verify=False移除SSL认证
cont = response.content.decode('utf-8') #HTTP响应内容的二进制(bytes) 形式以utf-8解码
m = json.loads(cont)
print(m)
with open(save_file, 'w', encoding='utf-8') as f:
json.dump(m, f, ensure_ascii=False)
def load_jsondata(file):
'''
读取本地json文件
:param file:
:return: 返回读取的文本数据转换成的字典对象
'''
f = open(file, encoding='utf-8')
m = json.load(f)
f.close()
return m
def save_csvfile(lst, savefile):
'''
将传入的列表数据保存为csv文件
:param lst:
:param savefile:
:return:
'''
with open(savefile, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
for n in lst:
writer.writerow(n)
def merge(m):
'''
将json数据中的文本数据与列标签进行合并
:param m:
:return:
'''
#datanode格式样例:{'code': 'zb.A0O0G0201_sj.2020', 'data': {'data': 0.0, 'dotcount': 2, 'hasdata': False, 'strdata': ''}, 'wds': [{'valuecode': 'A0O0G0201', 'wdcode': 'zb'}, {'valuecode': '2020', 'wdcode': 'sj'}]}
#wdnode格式样例:{'cname': '城市传染病(不含呼吸道结核)死亡人数占总死亡人数的比重', 'code': 'A0O0G0201', 'dotcount': 2, 'exp': '', 'ifshowcode': False, 'memo': '', 'name': '城市传染病(不含呼吸道结核)死亡人数占总死亡人数的比重', 'nodesort': '1', 'sortcode': 51658, 'tag': '', 'unit': '%'}
#我们需要的数据在strdata中,但strdata外包裹着一层datanodes
datanodes = m['returndata']['datanodes'] # 剥开datanodes,获取数据节点列表
wdnodes = m['returndata']['wdnodes'][0]['nodes'] # 获取列标签(年份)信息列表
print(datanodes[0])
print(wdnodes[4])
# print(len(datanodes))
# print(len(wdnodes[0]['nodes']))
一个wdnode对应十个(十年)datanode,每个wdnode中需要提取两项,cname和code;每个datanode需要提取两项,data和wds[0]的valuecode
# 获取wdnode中的编号和标签名,标签名中含有英文半角逗号,会对结果造成问题,因此转换为全角逗号
urban_l = [[node['code'], node['cname'].replace(',', ',')] for node in wdnodes]
# 获取datanode中的编号和数值
# urban_data_l = [(node['wds'][0]['valuecode'], node['data']['strdata']) if node['data']['strdata'] != '' else
# (node['wds'][0]['valuecode'], '0.0') for node in datanodes]
urban_data_l = [(node['wds'][0]['valuecode'], node['data']['strdata']) for node in datanodes]
tmp_m = {}
tmp_l = []
key = ''
for idx, node in enumerate(urban_data_l):
key = node[0]
tmp_l.append(node[1])
if (idx + 1) % 10 == 0:
tmp_m[key] = tmp_l.copy()
tmp_l.clear()
for n in urban_l:
key = n[0]
n.extend(tmp_m[key])
return urban_l
if __name__ == '__main__':
# 爬取城市居民主要疾病死因和城市居民主要疾病死因(近十年,2011~2020)
urban_url = "https://data.stats.gov.cn/easyquery.htm?m=QueryData&dbcode=hgnd&rowcode=zb&colcode=sj&wds=%5B%5D&dfwds=%5B%7B%22wdcode%22%3A%22zb%22%2C%22valuecode%22%3A%22A0O0G02%22%7D%5D&k1=1623569580790&h=1"
urban_save = 'urban.json'
crawl(urban_url, urban_save)
# 爬取农村居民主要疾病死因和城市居民主要疾病死因(近十年,2011~2020)
rural_url = 'https://data.stats.gov.cn/easyquery.htm?m=QueryData&dbcode=hgnd&rowcode=zb&colcode=sj&wds=%5B%5D&dfwds=%5B%7B%22wdcode%22%3A%22zb%22%2C%22valuecode%22%3A%22A0O0H02%22%7D%5D&k1=1623572236704&h=1'
rural_save = 'rural.json'
crawl(rural_url, rural_save)
urban_m = load_jsondata('urban.json')
rural_m = load_jsondata('rural.json')
urban_l = merge(urban_m)
rural_l = merge(rural_m)
save_csvfile(urban_l, 'urban.csv')
save_csvfile(rural_l, 'rural.csv')
结果展示:
json文件:
csv文件:
2使用spark技术完成数据的统计分析
2.1数据排序
(1)去除数据中为“”的记录,并将数据按照死亡人数由高到低排序
(2)男性按照死亡人数由高到低排序
(3)女性按照死亡人数由高到低排序
将数据爬取处理后的两个csv格式文件放到当前工程的某个文件夹下,此处建立文件夹:data/original
创建Sort.scala程序完成数据处理。除了总排序外,还需实现男性、女性死亡率的排序问题
import org.apache.spark.{SparkConf, SparkContext}
object Sort {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
.setMaster("local[*]")
.setAppName("sort")
val sc=new SparkContext(conf)
val datas=sc.textFile("./data/original/")
.map(_.split(","))
.filter(arr=>{if (arr(3)!="") true
else false
}).map(arr=>{
(arr(3).toFloat,arr) // 保存到缓存中,提高后面男性和女性分别计算的效率
}).persist()
// 根据key排倒序,并合并到一个partition
datas.sortByKey(ascending = false,1)
.map(_._2)
.map(_.mkString(","))
.saveAsTextFile("./data/sort/total")
datas.filter(_._2(1).contains("男性")).sortByKey(ascending = false,1)
.map(_._2)
.map(_.mkString(","))
.saveAsTextFile("data/sort/man/")
datas.filter(_._2(1).contains("女性")).sortByKey(ascending = false,1)
.map(_._2)
.map(_.mkString(","))
.saveAsTextFile("data/sort/woman")
}
}




2.2计算每种疾病死亡率最高的年份
先创建一个列表用于存储最终的结果(包括疾病编号 疾病名称),然后对读取的数据文件中的数据进行初步的筛选(只保留死亡率数据),然后进行数据的转换(将数据转换为float类型),对数据进行比较取最大值最后整合输出。
package com.fy.spark.sql
import org.apache.spark.{SparkConf, SparkContext}
import scala.::
//计算每种疾病哪一年的死亡率最高
object DeathRate {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("analyze1").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.textFile("data/original")
.map(_.split(","))
.map(arr => {
// linelist用于保存结果数据
var linelist = arr(1) :: arr(0) :: Nil
// 去掉编号和标签列,只保留数据列
val sub = arr.slice(2, arr.length)
// 将所有数据转换为float类型
sub.foreach(v => {
if (v == "") 0.0
else v.toFloat
})
val years = (2011 to 2020).reverse
// 年份与数据的映射关系 (年份,数据)
val merge = years.zip(sub)
// 计算最大值
val r = merge.reduce((a, b) => {
if (a._2 > b._2) a else b
})
//保存到linelist
((r._1 + ":" + r._2) :: linelist).reverse
})
.map(_.mkString(","))
.saveAsTextFile("data/deathrate")
}
}


2.3计算比较每种疾病城市和农村每年的死亡率哪个更高
import org.apache.spark.{SparkConf, SparkContext}
object Compare {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("analyze1")
val sc = new SparkContext(conf)
val ruralRDD = sc.textFile("./data/original/rural.csv")
val ruralmapRDD = ruralRDD.map(_.split(","))
.filter(arr=>{
!(arr(1).contains("男性")) && !(arr(1).contains("女性"))
})
.sortBy(arr=>arr(0),true,1)
.zipWithIndex()
.map(t=>(t._2,t._1))
val urbanRDD = sc.textFile("./data/original//urban.csv")
val urbanmapRDD = urbanRDD.map(_.split(","))
.filter(arr=>{
!(arr(1).contains("男性")) && !(arr(1).contains("女性"))
})
.sortBy(arr=>arr(0),true,1)
.zipWithIndex()
.map(t=>(t._2,t._1))
ruralmapRDD.join(urbanmapRDD)
//.foreach(println)
.map(_._2)
//.foreach(println)
.map(t=>{
val rural = t._1
val urban = t._2
rural.zip(urban)
})
.map(arr=>{
val merge = arr.slice(1,arr.length)
//merge.foreach(println)
val resultline = new Array[String](merge.length)
for(i<-merge.indices){
if(i==0){
resultline(i)=merge(i)._1.replace("农村","")
}else{
val one = if(merge(i)._1=="") 0.0 else merge(i)._1.toFloat
val two = if(merge(i)._2=="") 0.0 else merge(i)._2.toFloat
if(one>two)
resultline(i)="农村"
else if(one<two)
resultline(i)="城市"
else resultline(i)="相同"
}
}
resultline
})
.map(_.mkString(","))
.saveAsTextFile("data/compare")
}
}

2.4计算各种疾病死亡的年增长率,及每种疾病死亡率的平均增长率
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Increaserate {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("analyze1").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.textFile("data/original").map(_.split(",")).map(arr => {
// 汇总每年的增长率,用于计算年平均增长率
var rateList: List[Float] = Nil
// 将每年的增长率整合为列表,用于文本输出
var lineList: List[String] = Nil
// 拆分成的数组前三列为 id,label,2020年数据,因为2020年数据都为"",因此也需要跳过
val endColumn = 3
// 去掉编号和标签列,只保留各年的数据列
val arrCleared = arr.slice(endColumn, arr.length)
// 用于映射年份,key为下面迭代过程的索引
val dataYearMap =
mutable.Map[Int, String](
8 -> "2011",
7 -> "2012",
6 -> "2013",
5 -> "2014",
4 -> "2015",
3 -> "2016",
2 -> "2017",
1 -> "2018",
0 -> "2019")
// 用于保存每条记录共计算几次增长率,用于最后计算每年平均增长率
var flag = 0
// 迭代计算各年增长率,公式为:(当前数据-下一年数据)/当前数据*100%
for (i <- arrCleared.indices.reverse if i > 0) {
//print(arrCleared(i)+":"+arrCleared(i-1)+",")
val strCurData = arrCleared(i)
val strPreData = arrCleared(i - 1)
var s = ""
if (strCurData != "" && strPreData != "") {
val curData = strCurData.toFloat
val preData = strPreData.toFloat
// 计算增长率
val rate = (preData - curData) / curData * 100
// 计算结果转换为字符串,最后保存到linelist中
s = dataYearMap(i) + "~" + dataYearMap(i - 1) + ":" + rate
// 将每年增长率保存到ratelist,用于最后计算平均增长率
rateList = rate :: rateList
flag += 1
} else {
// 如果当年或下一年数据为空字符串,则标识为0.0
s = dataYearMap(i) + "~" + dataYearMap(i - 1) + ":" + 0.0
}
// 将各年增长率生成的字符串保存到linelist
lineList = s :: lineList
}
// 平均增长率并生成字符串
val avg_total = "avg:" + rateList.sum / flag
// 将平均增长率以及整条记录的编号和标签添加到linelist后,通过mkstring转换为以逗号分隔的字符串
(arr(0) :: arr(1) :: (avg_total :: lineList).reverse).mkString(",")
}).saveAsTextFile("data/increaserate")
}
}


更多推荐


所有评论(0)