Showing posts with the label hdfs. Show all posts

Friday, May 20, 2016

[PySpark] MLlib Regression example



[hadoop@master01 spark-1.6.0]$ cd /opt/spark-1.6.0/python/
[hadoop@master01 python]$ pyspark
Python 2.7.5 (default, Nov 20 2015, 02:00:19) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
16/05/19 20:10:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.0
/_/

Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
>>> rdd = sc.textFile('file:/opt/data/Sacramentorealestatetransactions.csv')
>>> rdd = rdd.map(lambda line: line.split(","))

Now we can see that each line has been split into a list of fields inside the RDD, which is what we want. However, we'll want to remove the header before converting to a DataFrame, since there's no straightforward way (that I know of) to tell Spark to interpret that header as a list of column names.

>>> header = rdd.first()
>>> rdd = rdd.filter(lambda line:line != header)

Now we can see that the header has been removed.
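The split-and-filter steps above can be tried without Spark. The following is a minimal plain-Python sketch on made-up sample lines (the column names and values are placeholders, not rows from the real file). It also shows one caveat of line.split(","): a quoted field that contains a comma gets split in two, which the standard csv module avoids.

```python
import csv

# Made-up sample lines; placeholders, not rows from the real CSV file.
lines = [
    'street,city,price',
    '1 EXAMPLE ST,SACRAMENTO,100',
    '"2 EXAMPLE AVE, UNIT B",SACRAMENTO,200',
]

# Same idea as the transcript: split every line, then drop the header row.
rows = [line.split(",") for line in lines]
header = rows[0]
data = [row for row in rows if row != header]

# Naive splitting breaks the quoted street into two fields.
naive_lengths = [len(row) for row in data]

# csv.reader honours the quotes and keeps three fields per row.
parsed = list(csv.reader(lines))
csv_data = [row for row in parsed if row != parsed[0]]
csv_lengths = [len(row) for row in csv_data]

print(naive_lengths)
print(csv_lengths)
```

If the real file has no quoted fields, the simple split used in the session is enough.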

>>> df = rdd.map(lambda line: Row(street = line[0], city = line[1], zip=line[2], beds=line[4], baths=line[5], sqft=line[6], price=line[9])).toDF()
16/05/19 20:11:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:08 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/05/19 20:11:08 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/05/19 20:11:10 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/05/19 20:11:10 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>>> 
>>> 
>>> 
>>> favorite_zip = df[df.zip == 95815]
>>> favorite_zip.show(5)
+-----+----+----------+------+----+----------------+-----+
|baths|beds|      city| price|sqft|          street|  zip|
+-----+----+----------+------+----+----------------+-----+
|    1|   2|SACRAMENTO| 68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO| 69307| 852|2805 JANETTE WAY|95815|
|    1|   1|SACRAMENTO|106852| 871| 2930 LA ROSA RD|95815|
|    1|   2|SACRAMENTO| 78000| 800|    3132 CLAY ST|95815|
|    2|   4|SACRAMENTO| 89000|1316| 483 ARCADE BLVD|95815|
+-----+----+----------+------+----+----------------+-----+
only showing top 5 rows

>>> 
>>> 
>>> import pyspark.mllib
>>> import pyspark.mllib.regression
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.sql.functions import *

Let's remove the rows that have suspicious 0 values for any of the features we want to use for prediction.

>>> df = df.select('price','baths','beds','sqft')
>>> df = df[df.baths > 0]
>>> df = df[df.beds > 0]
>>> df = df[df.sqft > 0]
>>> df.describe(['baths','beds','price','sqft']).show()

+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               814|               814|               814|               814|
|   mean|1.9606879606879606|3.2444717444717446| 229448.3697788698|1591.1461916461917|
| stddev|0.6698038253879438|0.8521372615281976|119825.57606009026| 663.8419297942894|
|    min|                 1|                 1|            100000|              1000|
|    max|                 5|                 8|             99000|               998|
+-------+------------------+------------------+------------------+------------------+
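The min and max rows above look odd (a max price of 99000 is below the mean, for example). The most likely reason is that every column was loaded as a string, so min and max compare text character by character, while mean and stddev are computed on the values as numbers. Here is a small plain-Python sketch of the effect, using a few price values that appear in the output in this post. Casting to float first, for instance when building each Row, is one possible fix; that is my suggestion and not part of the original session.

```python
# Price values as strings, the way line.split(",") produces them.
prices = ['59222', '68880', '100000', '99000']

# String comparison goes character by character, so '100000' sorts first
# (it starts with '1') and '99000' sorts last (it starts with '9').
string_min = min(prices)
string_max = max(prices)

# Converting to float first gives the numeric extremes.
numeric = [float(p) for p in prices]
numeric_min = min(numeric)
numeric_max = max(numeric)

print(string_min, string_max)
print(numeric_min, numeric_max)
```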

Labeled Points and Scaling Data


>>> 
>>> temp = df.map(lambda line:LabeledPoint(line[0],[line[1:]]))
>>> temp.take(5)
[LabeledPoint(59222.0, [1.0,2.0,836.0]), LabeledPoint(68212.0, [1.0,3.0,1167.0]), LabeledPoint(68880.0, [1.0,2.0,796.0]), LabeledPoint(69307.0, [1.0,2.0,852.0]), LabeledPoint(81900.0, [1.0,2.0,797.0])]
>>> 
>>> 
>>> 
>>> from pyspark.mllib.util import MLUtils
>>> from pyspark.mllib.linalg import Vectors
>>> from pyspark.mllib.feature import StandardScaler
>>> 
>>> features = df.map(lambda row: row[1:])
>>> features.take(5)
[(u'1', u'2', u'836'), (u'1', u'3', u'1167'), (u'1', u'2', u'796'), (u'1', u'2', u'852'), (u'1', u'2', u'797')]
>>> 
>>> 
>>> 
>>> standardizer = StandardScaler()
>>> model = standardizer.fit(features)
>>> features_transform = model.transform(features)
>>> 
>>> features_transform.take(5)
[DenseVector([1.493, 2.347, 1.2593]), DenseVector([1.493, 3.5206, 1.7579]), DenseVector([1.493, 2.347, 1.1991]), DenseVector([1.493, 2.347, 1.2834]), DenseVector([1.493, 2.347, 1.2006])]
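The scaled values can be reproduced by hand. With its default settings, MLlib's StandardScaler scales each column to unit standard deviation without centering it, so each feature is simply divided by that column's standard deviation. A plain-Python sketch, using the stddev figures printed by df.describe() earlier and the first feature row (1, 2, 836); the helper name scale_row is hypothetical:

```python
# Column standard deviations as printed by df.describe() earlier
# (order: baths, beds, sqft).
stddev = [0.6698038253879438, 0.8521372615281976, 663.8419297942894]

def scale_row(row, stddev):
    """Divide each feature by its column standard deviation (no centering)."""
    return [float(value) / s for value, s in zip(row, stddev)]

# First feature row from features.take(5); the values arrive as strings.
scaled = scale_row(['1', '2', '836'], stddev)
print([round(x, 4) for x in scaled])
```

The rounded result should agree with the first DenseVector returned by features_transform.take(5).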
>>> 
>>> 
>>> lab = df.map(lambda row: row[0])
>>> lab.take(5)
[u'59222', u'68212', u'68880', u'69307', u'81900']
>>> 
>>> transformedData = lab.zip(features_transform)
>>> transformedData.take(5)
[(u'59222', DenseVector([1.493, 2.347, 1.2593])), (u'68212', DenseVector([1.493, 3.5206, 1.7579])), (u'68880', DenseVector([1.493, 2.347, 1.1991])), (u'69307', DenseVector([1.493, 2.347, 1.2834])), (u'81900', DenseVector([1.493, 2.347, 1.2006]))]
>>> 
>>> 
>>> transformedData = transformedData.map(lambda row: LabeledPoint(row[0],[row[1]]))
>>> transformedData.take(5)
[LabeledPoint(59222.0, [1.49297445326,2.34703972035,1.25933593899]), LabeledPoint(68212.0, [1.49297445326,3.52055958053,1.7579486134]), LabeledPoint(68880.0, [1.49297445326,2.34703972035,1.19908063091]), LabeledPoint(69307.0, [1.49297445326,2.34703972035,1.28343806223]), LabeledPoint(81900.0, [1.49297445326,2.34703972035,1.20058701361])]
>>> 
>>> 
>>> trainingData, testingData = transformedData.randomSplit([.8,.2],seed=1234)
>>> from pyspark.mllib.regression import LinearRegressionWithSGD
>>> linearModel = LinearRegressionWithSGD.train(trainingData,1000,.2)
16/05/19 20:13:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/05/19 20:13:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
>>> linearModel.weights
DenseVector([15098.627, 3792.023, 70216.8097])
>>> 
>>> 
>>> testingData.take(10)
[LabeledPoint(100309.0, [2.98594890652,3.52055958053,1.36930187625]), LabeledPoint(124100.0, [2.98594890652,3.52055958053,2.41171870613]), LabeledPoint(148750.0, [2.98594890652,4.69407944071,2.21739533756]), LabeledPoint(150000.0, [1.49297445326,1.17351986018,1.14485085363]), LabeledPoint(161500.0, [2.98594890652,4.69407944071,2.3906293483]), LabeledPoint(166357.0, [1.49297445326,4.69407944071,2.94497818269]), LabeledPoint(168000.0, [2.98594890652,3.52055958053,2.22492725107]), LabeledPoint(178480.0, [2.98594890652,3.52055958053,1.78506350204]), LabeledPoint(181872.0, [1.49297445326,3.52055958053,1.73535287287]), LabeledPoint(182587.0, [4.47892335978,4.69407944071,2.78831438167])]
>>> 
>>> 
>>> linearModel.predict([1.49297445326,3.52055958053,1.73535287287])
157742.84989605084
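The prediction above can be checked by hand: a linear regression model predicts the dot product of the weights and the feature vector, plus an intercept. The sketch below assumes the intercept is zero, which I believe is the default for LinearRegressionWithSGD.train unless intercept=True is passed. The weights are the rounded values printed by linearModel.weights, so the result only agrees with Spark's to within rounding.

```python
# Weights as printed by linearModel.weights (rounded for display).
weights = [15098.627, 3792.023, 70216.8097]
# Assumption: no intercept term was fitted.
intercept = 0.0

# The scaled feature vector passed to linearModel.predict above.
features = [1.49297445326, 3.52055958053, 1.73535287287]

# Dot product of weights and features, plus the intercept.
prediction = sum(w * x for w, x in zip(weights, features)) + intercept
print(prediction)
```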
>>> 
>>> 
>>> from pyspark.mllib.evaluation import RegressionMetrics
>>> prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))
>>> metrics = RegressionMetrics(prediObserRDDin)
>>> 
>>> 
>>> metrics.r2
0.4969184679643588 
>>> 
>>> 
>>> prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))
>>> metrics = RegressionMetrics(prediObserRDDout)
>>> 
>>> 
>>> metrics.rootMeanSquaredError
94895.10434498572
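For reference, the two metrics read above can be written out directly. This is a plain-Python sketch of the textbook definitions (root mean squared error, and R² as one minus the ratio of the residual to the total sum of squares), evaluated on made-up (prediction, observation) pairs. I assume, but have not verified, that RegressionMetrics computes the same quantities.

```python
import math

def rmse(pairs):
    """Root mean squared error over (prediction, observation) pairs."""
    return math.sqrt(sum((p - o) ** 2 for p, o in pairs) / len(pairs))

def r2(pairs):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    observations = [o for _, o in pairs]
    mean_obs = sum(observations) / len(observations)
    ss_res = sum((p - o) ** 2 for p, o in pairs)
    ss_tot = sum((o - mean_obs) ** 2 for o in observations)
    return 1.0 - ss_res / ss_tot

# Made-up pairs in the same (prediction, observation) order used above.
pairs = [(2.0, 1.0), (4.0, 5.0), (6.0, 6.0)]
print(rmse(pairs), r2(pairs))
```

RMSE is in the same units as the label, so the test-set value of about 94,895 can be read against the mean price of about 229,448 reported by df.describe().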



[Reference]
http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/

Tuesday, May 5, 2015

[Hadoop] "Browse the filesystem" link cannot connect

Clicking "Browse the filesystem" produces an error: the page cannot be reached.



[ Solution 1 ] Run on master01

cd /opt/hadoop/etc/hadoop
vi hdfs-site.xml

Add the following to hdfs-site.xml. This had no effect.

<property>
<name>dfs.datanode.http.address</name>
<value>10.0.0.234:50075</value>
</property>



[ Solution 2 ] Run on the local machine

cd /etc
sudo vi hosts

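Append the following to the end of the file (remember to substitute your own IP addresses); after that the filesystem can be browsed. Presumably this works because the link sends the browser to a cluster node by hostname, which the local machine could not resolve before.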

192.168.70.101 master01
192.168.70.102 slave01
192.168.70.103 slave02
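A quick way to confirm the fix from the local machine is to check that each cluster hostname now resolves. The sketch below is plain Python; the hostnames are the ones from the entries above, and the helper names parse_hosts and resolves are hypothetical.

```python
import socket

def parse_hosts(text):
    """Map hostname -> IP from hosts-file text, ignoring comments and blanks."""
    mapping = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        parts = line.split()
        ip, names = parts[0], parts[1:]
        for name in names:
            mapping[name] = ip
    return mapping

def resolves(hostname):
    """Return True if the operating system can resolve the hostname."""
    try:
        socket.gethostbyname(hostname)
        return True
    except socket.error:
        return False

# The three entries added to /etc/hosts above.
sample = """
192.168.70.101 master01
192.168.70.102 slave01
192.168.70.103 slave02
"""
entries = parse_hosts(sample)
print(entries)
```

On the real machine, calling resolves('master01') after editing the file should return True if the entry was picked up.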

[References]

http://www.cnblogs.com/hzmark/p/hadoop_browsethefilesystem.html

http://kurthung1224.pixnet.net/blog/post/170147913