-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy path265_pyskparkWriteDataframeAzure__1_.py
123 lines (97 loc) · 4.31 KB
/
265_pyskparkWriteDataframeAzure__1_.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# # WriteDF2Parquet
#
# This is the simplest PySpark example: create a DataFrame and write it out.
#
# For a new environment you'll need to:
#   - add the user ID to the groups that have permission to access ML;
#   - update the IDBroker mappings so you have write access to the storage
#     buckets (S3) / containers (ADLS);
#   - do NOT update IDBroker to allow Ranger access -- that fails with an error
#     because it creates two roles for one user;
#   - sync users to FreeIPA to fix "HTTP ERROR 403 Forbidden";
#   - update the Ranger policy so you can write to the default Hive database
#     (commonly policy #14);
#   - update the Ranger policy so you can use URLs (commonly policy #13).
#
from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
# If Java is not in the default location, tell the executors where it is, e.g.
#   spark.executorEnv.JAVA_HOME=/usr/java/<version>
#
# Listing the storage location in spark.yarn.access.hadoopFileSystems is what
# gets rid of:
#   java.lang.IllegalStateException: Authentication with IDBroker failed.
#   Please ensure you have a Kerberos token by using kinit.
# The S3 (AWS) form of the same setting was:
#   .config("spark.yarn.access.hadoopFileSystems", "s3a://cdp-sandbox-default-se/datalake/warehouse")
#
# NOTE(review): "[email protected]" in the abfs URL below looks like an
# email-obfuscation artifact from a web scrape; the real authority is presumably
# <container>@<account>.dfs.core.windows.net -- confirm before running.
spark = (
    SparkSession.builder
    .config('job.local.dir', 'file:///home/cdsw/')
    .appName("WriteDF2Parquet")
    .config("spark.authenticate", "true")
    .config(
        "spark.yarn.access.hadoopFileSystems",
        "abfs://[email protected]/warehouse",
    )
    .getOrCreate()
)
# Create a file named spark-defaults.conf in the project or update the existing file with property:
#spark.yarn.access.hadoopFileSystems=s3a://<STORAGE LOCATION OF ENV>
# spark.yarn.access.hadoopFileSystems='s3a://cdp-sandbox-default-se/datalake/warehouse/tablespace/managed/hive/martydropme'
# this is where a create table landed
#(Note: This is the same S3 location as defined under Data Access)
# this error is from Ranger permissions
# AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException:
# MetaException(message:Permission denied:
# user [marty] does not have [SELECT] privilege on [default]);'
# syncfreeipa required after "marty" added as dwadmin
#
!klist
foo=spark.sql("show tables")
foo.take(50)
spark.sql("select * from pysparktab").take(20)
# Ranger allows the sampletxt queries below, but SDX rejects them unless the
# right mapping is in place:
#   spark.sql("show create table sampletxt").take(50)
#   spark.sql("select * from sampletxt").take(50)
# In Ranger, user "marty" was given full access to the default database.
# Removing the table's files (path from the earlier S3 setup):
#   !hadoop fs -rm -r s3a://cdp-sandbox-default-se/datalake/warehouse/tablespace/external/hive/pysparktab
#
# Create the Hive table if needed, append two rows, then read them back.
for _stmt in (
    "CREATE TABLE IF NOT EXISTS pysparktab (key INT, value STRING) USING hive",
    "insert into pysparktab values (22,'created in MLx')",
    "insert into pysparktab values (22,concat ( 'created in pyspark mlx', current_timestamp() ) )",
):
    spark.sql(_stmt)
spark.sql("select * from pysparktab").take(20)
# DataFrames can be saved as Parquet files, maintaining the schema information.
!klist
# old debugging !hadoop fs -rm -r hdfs://se-sandbox-dl-12apr-master0.se-sandb.a465-9q4k.cloudera.site:8020/tmp/age.parquet
# old debugging !rm -rf age.parquet
# try to write without usinig hive
df = spark.createDataFrame([("10", ), ("11", ), ("13", )], ["age"])
df.show()
#!hadoop fs -rm -r s3a://cdp-sandbox-default-se/datalake/martyparquet
!hadoop fs -rm -r abfs://[email protected]/warehouse/martyparquet
#df.write.parquet("s3a://cdp-sandbox-default-se/datalake/martyparquet")
df.write.parquet("abfs://[email protected]/warehouse/martyparquet")
# this is where a create table landed hdfs://se-sandbox-dl-12apr-master0.se-sandb.a465-9q4k.cloudera.site:8020/tmp/age.parquet")
# file:/home/cdsw/age.parquet")
#!hadoop fs -ls s3a://cdp-sandbox-default-se/datalake/
!hadoop fs -ls abfs://[email protected]/warehouse/
#spark.stop()
# Inspect the available tables and a sample of the source table.
spark.sql("show tables").take(50)
spark.sql("select * from martynifi1").take(3)
# Target table for rows derived from martynifi1: (key1, key2, key3, datestamp).
spark.sql("""CREATE TABLE IF NOT EXISTS
pysparknifi3 (key1 INT,
key2 int,
key3 int,
datestamp STRING) USING hive"""
)
# Populate the table created just above. This previously inserted into
# pysparknifi2, which is not created anywhere in this script, so pysparknifi3
# never received the martynifi1 rows. Columns map positionally:
# val2 -> key1, val3 -> key2, val1*2 -> key3, datestring -> datestamp.
spark.sql("""insert into pysparknifi3
select val2, val3, val1*2,
datestring from martynifi1
"""
)
# NOTE(review): pysparknifi is not created in this script; this copy assumes it
# already exists with a compatible 4-column schema -- confirm.
spark.sql("""insert into pysparknifi3
select * from pysparknifi
"""
)
# NOTE(review): this reads martynifi2, not the table populated above -- confirm
# that is intended.
spark.sql("select * from martynifi2").take(3)