
Friday, May 20, 2016

[PySpark] MLlib Regression example



[hadoop@master01 spark-1.6.0]$ cd /opt/spark-1.6.0/python/
[hadoop@master01 python]$ pyspark
Python 2.7.5 (default, Nov 20 2015, 02:00:19) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
16/05/19 20:10:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
>>> rdd = sc.textFile('file:/opt/data/Sacramentorealestatetransactions.csv')
>>> rdd = rdd.map(lambda line: line.split(","))

Now we can see that each line has been split into a list of fields, which is what we want. However, we'll want to remove the header before we convert to a DataFrame, since there's no straightforward way (that I know of) to tell Spark to interpret that header as a list of column names.

>>> header = rdd.first()
>>> rdd = rdd.filter(lambda line:line != header)

Now we can see that the header has been removed.
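
As a quick sanity check (these two lines are my addition, not part of the logged session), you can look at the first elements again and confirm that a data row, rather than the column-name header, now comes first:

rdd.first()    # should start with a street address instead of 'street'
rdd.take(2)    # peek at the first two parsed rows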

>>> df = rdd.map(lambda line: Row(street = line[0], city = line[1], zip=line[2], beds=line[4], baths=line[5], sqft=line[6], price=line[9])).toDF()
16/05/19 20:11:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:08 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/05/19 20:11:08 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/05/19 20:11:10 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:10 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>>> 
>>> 
>>> 
>>> favorite_zip = df[df.zip == 95815]
>>> favorite_zip.show(5)
+-----+----+----------+------+----+----------------+-----+
|baths|beds| city| price|sqft| street| zip|
+-----+----+----------+------+----+----------------+-----+
| 1| 2|SACRAMENTO| 68880| 796| 2796 BRANCH ST|95815|
| 1| 2|SACRAMENTO| 69307| 852|2805 JANETTE WAY|95815|
| 1| 1|SACRAMENTO|106852| 871| 2930 LA ROSA RD|95815|
| 1| 2|SACRAMENTO| 78000| 800| 3132 CLAY ST|95815|
| 2| 4|SACRAMENTO| 89000|1316| 483 ARCADE BLVD|95815|
+-----+----+----------+------+----+----------------+-----+
only showing top 5 rows

>>> 
>>> 
>>> import pyspark.mllib
>>> import pyspark.mllib.regression
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.sql.functions import *

Let's remove those rows that have suspicious 0 values for any of the features we want to use for prediction.

>>> df = df.select('price','baths','beds','sqft')
>>> df = df[df.baths > 0]
>>> df = df[df.beds > 0]
>>> df = df[df.sqft > 0]
>>> df.describe(['baths','beds','price','sqft']).show()

+-------+------------------+------------------+------------------+------------------+
|summary| baths| beds| price| sqft|
+-------+------------------+------------------+------------------+------------------+
| count| 814| 814| 814| 814|
| mean|1.9606879606879606|3.2444717444717446| 229448.3697788698|1591.1461916461917|
| stddev|0.6698038253879438|0.8521372615281976|119825.57606009026| 663.8419297942894|
| min| 1| 1| 100000| 1000|
| max| 5| 8| 99000| 998|
+-------+------------------+------------------+------------------+------------------+
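
The min and max above look odd because these columns are still strings straight out of the CSV, so the comparison is lexicographic rather than numeric. The rest of this walkthrough lets LabeledPoint do the numeric conversion, but if you wanted describe() to behave numerically you could cast the columns first, something like:

df_num = df.select(df.price.cast('float').alias('price'),
                   df.baths.cast('float').alias('baths'),
                   df.beds.cast('float').alias('beds'),
                   df.sqft.cast('float').alias('sqft'))
df_num.describe().show()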

Labeled Points and Scaling Data


>>> 
>>> temp = df.map(lambda line:LabeledPoint(line[0],[line[1:]]))
>>> temp.take(5)
[LabeledPoint(59222.0, [1.0,2.0,836.0]), LabeledPoint(68212.0, [1.0,3.0,1167.0]), LabeledPoint(68880.0, [1.0,2.0,796.0]), LabeledPoint(69307.0, [1.0,2.0,852.0]), LabeledPoint(81900.0, [1.0,2.0,797.0])]
>>> 
>>> 
>>> 
>>> from pyspark.mllib.util import MLUtils
>>> from pyspark.mllib.linalg import Vectors
>>> from pyspark.mllib.feature import StandardScaler
>>> 
>>> features = df.map(lambda row: row[1:])
>>> features.take(5)
[(u'1', u'2', u'836'), (u'1', u'3', u'1167'), (u'1', u'2', u'796'), (u'1', u'2', u'852'), (u'1', u'2', u'797')]
>>> 
>>> 
>>> 
>>> standardizer = StandardScaler()
>>> model = standardizer.fit(features)
>>> features_transform = model.transform(features)
>>> 
>>> features_transform.take(5)
[DenseVector([1.493, 2.347, 1.2593]), DenseVector([1.493, 3.5206, 1.7579]), DenseVector([1.493, 2.347, 1.1991]), DenseVector([1.493, 2.347, 1.2834]), DenseVector([1.493, 2.347, 1.2006])]
>>> 
>>> 
>>> lab = df.map(lambda row: row[0])
>>> lab.take(5)
[u'59222', u'68212', u'68880', u'69307', u'81900']
>>> 
>>> transformedData = lab.zip(features_transform)
>>> transformedData.take(5)
[(u'59222', DenseVector([1.493, 2.347, 1.2593])), (u'68212', DenseVector([1.493, 3.5206, 1.7579])), (u'68880', DenseVector([1.493, 2.347, 1.1991])), (u'69307', DenseVector([1.493, 2.347, 1.2834])), (u'81900', DenseVector([1.493, 2.347, 1.2006]))]
>>> 
>>> 
>>> transformedData = transformedData.map(lambda row: LabeledPoint(row[0],[row[1]]))
>>> transformedData.take(5)
[LabeledPoint(59222.0, [1.49297445326,2.34703972035,1.25933593899]), LabeledPoint(68212.0, [1.49297445326,3.52055958053,1.7579486134]), LabeledPoint(68880.0, [1.49297445326,2.34703972035,1.19908063091]), LabeledPoint(69307.0, [1.49297445326,2.34703972035,1.28343806223]), LabeledPoint(81900.0, [1.49297445326,2.34703972035,1.20058701361])]
>>> 
>>> 
>>> trainingData, testingData = transformedData.randomSplit([.8,.2],seed=1234)
>>> from pyspark.mllib.regression import LinearRegressionWithSGD
>>> linearModel = LinearRegressionWithSGD.train(trainingData,1000,.2)
16/05/19 20:13:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/05/19 20:13:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
>>> linearModel.weights
DenseVector([15098.627, 3792.023, 70216.8097])
>>> 
>>> 
>>> testingData.take(10)
[LabeledPoint(100309.0, [2.98594890652,3.52055958053,1.36930187625]), LabeledPoint(124100.0, [2.98594890652,3.52055958053,2.41171870613]), LabeledPoint(148750.0, [2.98594890652,4.69407944071,2.21739533756]), LabeledPoint(150000.0, [1.49297445326,1.17351986018,1.14485085363]), LabeledPoint(161500.0, [2.98594890652,4.69407944071,2.3906293483]), LabeledPoint(166357.0, [1.49297445326,4.69407944071,2.94497818269]), LabeledPoint(168000.0, [2.98594890652,3.52055958053,2.22492725107]), LabeledPoint(178480.0, [2.98594890652,3.52055958053,1.78506350204]), LabeledPoint(181872.0, [1.49297445326,3.52055958053,1.73535287287]), LabeledPoint(182587.0, [4.47892335978,4.69407944071,2.78831438167])]
>>> 
>>> 
>>> linearModel.predict([1.49297445326,3.52055958053,1.73535287287])
157742.84989605084
>>> 
>>> 
>>> from pyspark.mllib.evaluation import RegressionMetrics
>>> prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))
>>> metrics = RegressionMetrics(prediObserRDDin)
>>> 
>>> 
>>> metrics.r2
0.4969184679643588 
>>> 
>>> 
>>> prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))
>>> metrics = RegressionMetrics(prediObserRDDout)
>>> 
>>> 
>>> metrics.rootMeanSquaredError
94895.10434498572



[Reference]
http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/

Monday, May 9, 2016

[Spark] Collaborative Filtering, alternating least squares (ALS) practice


Collaborative Filtering - spark.mllib
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering

In the following example we load rating data. Each row consists of a user, a product and a rating. We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation model by measuring the Mean Squared Error of rating prediction.
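
A minimal PySpark version of that example, closely following the Python snippet in the linked spark.mllib documentation (the data path and the rank/iteration values are the documentation's defaults, not values tuned for this cluster):

from pyspark.mllib.recommendation import ALS, Rating

# Load and parse the rating data; each line is "user,product,rating"
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(',')) \
              .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using alternating least squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on the training data with Mean Squared Error
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))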







Result :
Mean Squared Error = 5.491294660658085E-6



-------------------------------------------------------------------------------------------------------

ERROR: TaskSchedulerImpl: Initial job has not accepted any resources
http://www.datastax.com/dev/blog/common-spark-troubleshooting
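
This usually means the job is asking for more memory or cores than the workers have available. A hedged sketch of the kind of resource settings worth lowering when it happens (the values below are placeholders, not recommendations):

from pyspark import SparkConf, SparkContext

# Keep the requested resources within what the workers actually have free.
conf = (SparkConf()
        .setAppName("als-practice")               # hypothetical app name
        .set("spark.executor.memory", "512m")     # must fit in each worker's free memory
        .set("spark.cores.max", "2"))             # must not exceed the cluster's free cores
sc = SparkContext(conf=conf)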






-------------------------------------------------------------------------------------------------------

ALS
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS$

ALS.scala
https://github.com/apache/spark/blob/v1.6.1/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Movie Recommendations with MLlib
https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html

Dataset - MovieLens 1M Dataset
http://grouplens.org/datasets/movielens/



Sunday, May 17, 2015

Cluster monitoring tools (continuously updated)

Sematext - SPM

I think its UI looks really nice, but it is a paid service.


Ambari includes Ganglia & Nagios

Installing a Hadoop Cluster with three Commands

Ambari (the graphical monitoring and management environment for Hadoop)


Notes on installing Ambari


Quickly deploy a Hadoop big data environment with Ambari

http://www.cnblogs.com/scotoma/archive/2013/05/18/3085248.html

Introduction to Ganglia

http://www.ascc.sinica.edu.tw/iascc/articals.php?_section=2.4&_op=?articalID:5134


RPi-Monitor


A monitoring tool made specifically for the Raspberry Pi

  • CPU Loads
  • Network
  • Disk Boot
  • Disk Root
  • Swap
  • Memory
  • Uptime
  • Temperature




2015/05/17

I am looking for tools that can monitor Hadoop and Spark performance as well as the cluster's power consumption.
So far I have not found an ideal solution.

[Paper Note] Papers related to the Raspberry Pi


Heterogeneity: The Key to Achieve Power-Proportional Computing


da Costa, G. ; IRIT, Univ. de Toulouse, Toulouse, France

The Smart 2020 report on low carbon economy in the information age shows that 2% of the global CO2 footprint will come from ICT in 2020. Out of these, 18% will be caused by data-centers, while 45% will come from personal computers. Classical research to reduce this footprint usually focuses on new consolidation techniques for global data-centers. In reality, personal computers and private computing infrastructures are here to stay. They are subject to irregular workload, and are usually largely under-loaded. Most of these computers waste tremendous amount of energy as nearly half of their maximum power consumption comes from simply being switched on. The ideal situation would be to use proportional computers that use nearly 0W when lightly loaded. This article shows the gains of using a perfectly proportional hardware on different type of data-centers: 50% gains for the servers used during 98 World Cup, 20% to the already optimized Google servers. Gains would attain up to 80% for personal computers. As such perfect hardware still does not exist, a real platform composed of Intel I7, Intel Atom and Raspberry Pi is evaluated. Using this infrastructure, gains are of 20% for the World Cup data-center, 5% for Google data-centers and up to 60% for personal computers.
This paper compares the performance of Intel processors against the Pi, which makes it a useful reference for comparing heterogeneous environments.

Published in:

Cluster, Cloud and Grid Computing (CCGrid), 2013 13th IEEE/ACM International Symposium on

Date of Conference:

13-16 May 2013


Affordable and Energy-Efficient Cloud Computing Clusters: The Bolzano Raspberry Pi Cloud Cluster Experiment


Abrahamsson, P. ; Fac. of Comput. Sci., Free Univ. of Bozen-Bolzano, Bolzano, Italy ; Helmer, S. ; Phaphoom, N. ; Nicolodi, L. 

We present our ongoing work building a Raspberry Pi cluster consisting of 300 nodes. The unique characteristics of this single board computer pose several challenges, but also offer a number of interesting opportunities. On the one hand, a single Raspberry Pi can be purchased cheaply and has a low power consumption, which makes it possible to create an affordable and energy-efficient cluster. On the other hand, it lacks in computing power, which makes it difficult to run computationally intensive software on it. Nevertheless, by combining a large number of Raspberries into a cluster, this drawback can be (partially) offset. Here we report on the first important steps of creating our cluster: how to set up and configure the hardware and the system software, and how to monitor and maintain the system. We also discuss potential use cases for our cluster, the two most important being an inexpensive and green test bed for cloud computing research and a robust and mobile data center for operating in adverse environments.

Published in:

Cloud Computing Technology and Science (CloudCom), 2013 IEEE 5th International Conference on (Volume:2 )

Date of Conference:

2-5 Dec. 2013


Technical development and socioeconomic implications of the Raspberry Pi as a learning tool in developing countries


Ali, M. ; Sch. of Eng., Univ. of Warwick, Coventry, UK ; Vlaskamp, J.H.A. ; Eddin, N.N. ; Falconer, B. 

The recent development of the Raspberry Pi mini computer has provided new opportunities to enhance tools for education. The low cost means that it could be a viable option to develop solutions for education sectors in developing countries. This study describes the design, development and manufacture of a prototype solution for educational use within schools in Uganda whilst considering the social implications of implementing such solutions. This study aims to show the potential for providing an educational tool capable of teaching science, engineering and computing in the developing world. During the design and manufacture of the prototype, software and hardware were developed as well as testing performed to define the performance and limitation of the technology. This study showed that it is possible to develop a viable modular based computer systems for educational and teaching purposes. In addition to science, engineering and computing; this study considers the socioeconomic implications of introducing the EPi within developing countries. From a sociological perspective, it is shown that the success of EPi is dependant on understanding the social context, therefore a next phase implementation strategy is proposed.

Published in:

Computer Science and Electronic Engineering Conference (CEEC), 2013 5th

Date of Conference:

17-18 Sept. 2013



Raspberry PI Hadoop Cluster


A blog with an installation tutorial.

Tuesday, May 5, 2015

[Hadoop] Browse the filesystem link fails to connect


Clicking Browse the filesystem produces an error and the page cannot be reached.



[Solution 1] Run on master01

cd /opt/hadoop/etc/hadoop
vi hdfs-site.xml

Add the following to hdfs-site.xml (this turned out not to work):

<property>
  <name>dfs.datanode.http.address</name>
  <value>10.0.0.234:50075</value>
</property>



[Solution 2] Run on the local machine

cd /etc
sudo vi hosts

Add the following at the end of the file (remember to use your own IPs), and then you can browse it:

192.168.70.101 master01
192.168.70.102 slave01
192.168.70.103 slave02


[References]

http://www.cnblogs.com/hzmark/p/hadoop_browsethefilesystem.html

http://kurthung1224.pixnet.net/blog/post/170147913

Monday, May 4, 2015

[Spark] Word count practice

First, go to the Spark directory
cd /opt/spark/

Start the Spark standalone cluster (master and workers)
sbin/start-all.sh

Open the spark-shell
bin/spark-shell



Set path to the file we want to read
val path = "/in/123.txt"

Read the file in; sc stands for SparkContext
val file = sc.textFile(path)

file is now an RDD; use collect to see what is inside it
file.collect



Split every line into words on spaces
val line1 = file.flatMap(_.split(" "))

line1.collect

Filter out the empty strings
val line2 = line1.filter(_ != "")

line2.collect

Map each word to a (word, 1) pair
val line3 = line2.map(s=> (s,1))

line3.collect

Sum the counts for each word
val line4 = line3.reduceByKey(_ + _)

line4.collect

line4.take(10)

line4.take(10).foreach(println)


The one-liner from the official docs

val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

wordCounts.collect()


Let's take a look at the execution status:
http://[node IP]:4040/jobs/


[References]

https://spark.apache.org/docs/latest/quick-start.html

http://kurthung1224.pixnet.net/blog/post/275207950


[Hadoop] Word count example walkthrough


Hadoop on cloudera quickstart vm test example 01 wordcount

mkdir temp

cd temp

ls -ltr

echo "this is huiming and you can call me juiming or killniu i am good at statistical modeling and data analysis" > wordcount.txt



hdfs dfs -mkdir /user/cloudera/input

hdfs dfs -ls /user/cloudera/input

hdfs dfs -put /home/cloudera/temp/wordcount.txt /user/cloudera/input

hdfs dfs -ls /user/cloudera/input

The wordcount.txt file we just created should appear here.



ls -ltr /usr/lib/hadoop-mapreduce/

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount /user/cloudera/input/wordcount.txt /user/cloudera/output



hdfs dfs -ls /user/cloudera/output

hdfs dfs -cat /user/cloudera/output/part-r-00000

Finally, the word count results will appear.