Spark也是一个集群计算系统,提供Python,Java,Scala,R语言的高级API进行数据操作。Spark有各种优点,请自行搜索。
一、下载安装spark
下载编译好的二进制版本,目前还用不到hadoop选择第一个安装。
wget http://mirrors.cnnic.cn/apache/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz
解压
tar zxvf spark-1.5.2-bin-hadoop2.6.tgz
使用默认参数进入python交互模式。启动参数可以指定Spark集群的地址,处理的线程数等值。
park-1.5.2-bin-hadoop2.6/bin/pyspark
交互模式下,Spark会默认给你启动一个SparkContext,名字为sc。你可以执行以下命令查看版本和应用名字。
>>> sc.version
u'1.5.2'
>>> sc.appName
u'PySparkShell'
二、分析nginx log文件
在交互模式下输入,创建一个RDD,textFile会按行读入一个文件,这种方式读入的文件,不会随文件的更新数据进行更新。
第二篇将进行脚本的编写和使用Stream进行实时数据的统计。
>>>log=sc.textFile('./app_access.log')
查看文档有多少行,对应nginx收到多少请求。count是一个action,返回值。
>>> log.count()
2186762
返回文档第一行
>>> log.first()
u'117.136.40.185 [11/Nov/2015:06:25:52 +0800] "POST /zmw/v2/favorite_status HTTP/1.1" 200 0.013 61 "-" "%E6%B2%B3%E7%8B%B8%E5%AE%B6/2891 CFNetwork/758.1.6 Darwin/15.0.0" "-" 0.013 Upstream:"10.0.10.135:8700"'
查看IP为"117.136.40.185"的访问次数。filter是一个transformations,返回一个新的RDD,然后对这个新的rdd进行count求值
>>> log.filter(lambda line: "117.136.40.185" == line.split()[0]).count()
637
可以使用cache操作将经常使用RDD尽可能的存放在内存中,加快计算速度。
>>> log.cache()
统计每个ip的访问次数,还可以统计每个url的访问次数,HTTP status code分别的访问次数等相关类似统计
>>> ips = log.map(lambda line: (line.split()[0], 1)).reduceByKey(lambda a, b: a + b)
>>> ips
PythonRDD[16] at RDD at PythonRDD.scala:43
>>> ips.collect()
验证之前统计过的,数据是否一致
>>> b=ips.filter(lambda a: a[0]=="117.136.40.185")
>>> b.collect()
[(u'117.136.40.185', 637)]
三、监控
可以从WEB访问,查看job执行情况和统计数据
http://172.16.117.0:4040/jobs/
Read more...