国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站企業(yè)備案代理短視頻拍攝剪輯培訓(xùn)班

網(wǎng)站企業(yè)備案代理,短視頻拍攝剪輯培訓(xùn)班,谷歌云做網(wǎng)站,網(wǎng)頁(yè)設(shè)計(jì)中滾動(dòng)圖片設(shè)置目錄 一、輸入Source 1)代碼演示最普通的文件讀取方式: 2) 通過(guò)jdbc讀取數(shù)據(jù)庫(kù)數(shù)據(jù) 3) 讀取table中的數(shù)據(jù)【hive】 二、輸出Sink 實(shí)戰(zhàn)一:保存普通格式 實(shí)戰(zhàn)二:保存到數(shù)據(jù)庫(kù)中 實(shí)戰(zhàn)三:將結(jié)果保存在h…

目錄

一、輸入Source

1)代碼演示最普通的文件讀取方式:

2) 通過(guò)jdbc讀取數(shù)據(jù)庫(kù)數(shù)據(jù)

3) 讀取table中的數(shù)據(jù)【hive】

二、輸出Sink

實(shí)戰(zhàn)一:保存普通格式

實(shí)戰(zhàn)二:保存到數(shù)據(jù)庫(kù)中

實(shí)戰(zhàn)三:將結(jié)果保存在hive表中

三、總結(jié)


????????在大數(shù)據(jù)處理領(lǐng)域,SparkSQL 以其強(qiáng)大的數(shù)據(jù)處理能力和豐富的數(shù)據(jù)源支持備受青睞。它能夠高效地讀取和寫(xiě)入多種格式的數(shù)據(jù),無(wú)論是本地文件、分布式文件系統(tǒng)(如 HDFS)上的數(shù)據(jù),還是數(shù)據(jù)庫(kù)、Hive 表中的數(shù)據(jù),都能輕松駕馭。今天,就讓我們深入探究 SparkSQL 讀寫(xiě)數(shù)據(jù)的方式,通過(guò)詳細(xì)的代碼示例和原理講解,助你全面掌握這一關(guān)鍵技能。

?

一、輸入Source

?

  • 類型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
  • 語(yǔ)法:spark.read.format(格式).load(讀取的地址)

方式一:給定讀取數(shù)據(jù)源的類型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接調(diào)用對(duì)應(yīng)數(shù)據(jù)源類型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊參數(shù):option,用于指定讀取時(shí)的一些配置選項(xiàng)

spark.read.format("csv").option("sep", "\t").load(path)jdbcDF = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.load()

?

1)代碼演示最普通的文件讀取方式:

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 構(gòu)建環(huán)境變量# 配置環(huán)境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 獲取sparkSession對(duì)象spark = SparkSession.builder.master("local[2]").appName("第一次構(gòu)建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df01 = spark.read.json("../../datas/resources/people.json")df01.printSchema()df02 = spark.read.format("json").load("../../datas/resources/people.json")df02.printSchema()df03 = spark.read.parquet("../../datas/resources/users.parquet")df03.printSchema()#spark.read.orc("")df04 = spark.read.format("orc").load("../../datas/resources/users.orc")df04.printSchema()df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")df05.printSchema()df06 = spark.read.load(path="../../datas/resources/people.csv",format="csv",sep=";")df06.printSchema()spark.stop()

?

2) 通過(guò)jdbc讀取數(shù)據(jù)庫(kù)數(shù)據(jù)

先在本地?cái)?shù)據(jù)庫(kù)或者linux數(shù)據(jù)庫(kù)中插入一張表:

CREATE TABLE `emp`  (`empno` int(11) NULL DEFAULT NULL,`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`mgr` int(11) NULL DEFAULT NULL,`hiredate` date NULL DEFAULT NULL,`sal` decimal(7, 2) NULL DEFAULT NULL,`comm` decimal(7, 2) NULL DEFAULT NULL,`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的數(shù)據(jù):

CREATE TABLE `dept`  (`deptno` int(11) NULL DEFAULT NULL,`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

接著放驅(qū)動(dòng)程序:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

Python環(huán)境放入MySQL連接驅(qū)動(dòng)

  • 找到工程中pyspark庫(kù)包所在的環(huán)境,將驅(qū)動(dòng)包放入環(huán)境所在的jars目錄中
  • 如果是Linux上:注意集群模式所有節(jié)點(diǎn)都要放。

第一種情況:

假如你是windows環(huán)境:

最終的路徑是在這里:

第二種情況:linux環(huán)境下,按照如下方式進(jìn)行

# 進(jìn)入目錄
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars# 上傳jar包:mysql-connector-java-5.1.32.jar

代碼練習(xí):

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongTypeif __name__ == '__main__':# 獲取sparkSession對(duì)象# 設(shè)置 任務(wù)的環(huán)境變量os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession對(duì)象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 處理邏輯# 讀取json 數(shù)據(jù)df1 = spark.read.format("json").load("../../datas/sql/person.json")df1.show()# 另一種寫(xiě)法,推薦使用這一種df2 = spark.read.json("../../datas/sql/person.json")df2.show()df3 = spark.read.csv("../../datas/dept.csv")df4 = spark.read.format("csv").load("../../datas/dept.csv")# 讀取分隔符為別的分隔符的文件user_schema = StructType([StructField(name="emp_id", dataType=StringType(), nullable=False),StructField(name="emp_name", dataType=StringType(), nullable=True),StructField(name="salary", dataType=DoubleType(), nullable=True),StructField(name="comm", dataType=DoubleType(), nullable=True),StructField(name="dept_id", dataType=LongType(), nullable=True)])# 使用csv 讀取了一個(gè) \t 為分隔符的文件,讀取的數(shù)據(jù)字段名很隨意,所以可以自定義df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)df5.show()# 昨天的作業(yè)是否也可以有另一個(gè)寫(xiě)法movie_schema = StructType([StructField(name="movie_id", dataType=LongType(), nullable=False),StructField(name="movie_name", dataType=StringType(), nullable=True),StructField(name="movie_type", dataType=StringType(), nullable=True)])movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)movieDF.show()spark.read.load(path="../../datas/zuoye/movies.dat",format="csv",sep="::",schema=movie_schema).show()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一種寫(xiě)法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 讀取hive表中的數(shù)據(jù)# 關(guān)閉spark.stop()

?

3) 讀取table中的數(shù)據(jù)【hive】

海量數(shù)據(jù),如何處理,存儲(chǔ)在hdfs上

第一種:

使用spark讀取hdfs上的數(shù)據(jù)(可以使用sparkCore讀取,也可以使用sparksql讀取),將數(shù)據(jù)變?yōu)楸怼緮?shù)據(jù)+Schema】,然后編寫(xiě)sql或者sparkCore代碼。

rdd --> dataFrame

第二種:推薦

將hdfs上的數(shù)據(jù)映射成hive的表,然后通過(guò)sparkSql連接hive, 編寫(xiě) sql 處理需求。

  • 場(chǎng)景:Hive底層默認(rèn)是MR引擎,計(jì)算性能特別差,一般用Hive作為數(shù)據(jù)倉(cāng)庫(kù),使用SparkSQL對(duì)Hive中的數(shù)據(jù)進(jìn)行計(jì)算
    • 存儲(chǔ):數(shù)據(jù)倉(cāng)庫(kù):Hive:將HDFS文件映射成表
    • 計(jì)算:計(jì)算引擎:SparkSQL、Impala、Presto:對(duì)Hive中的數(shù)據(jù)表進(jìn)行處理
  • 問(wèn)題:SparkSQL怎么能訪問(wèn)到Hive中有哪些表,以及如何知道Hive中表對(duì)應(yīng)的HDFS的地址?

Hive中的表存在哪里?元數(shù)據(jù)--MySQL , 啟動(dòng)metastore服務(wù)即可。

本質(zhì)上:SparkSQL訪問(wèn)了Metastore服務(wù)獲取了Hive元數(shù)據(jù),基于元數(shù)據(jù)提供的地址進(jìn)行計(jì)算

先退出base環(huán)境:conda deactivate
啟動(dòng)服務(wù):
啟動(dòng)hdfs:  start-dfs.sh  因?yàn)閔ive的數(shù)據(jù)在那里存儲(chǔ)著
啟動(dòng)yarn:  start-yarn.sh 因?yàn)閟park是根據(jù)yarn部署的,假如你的spark是standalone模式,不需要啟動(dòng)yarn.
日志服務(wù)也需要啟動(dòng)一下:
mapred --daemon start historyserver
# 啟動(dòng)Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
啟動(dòng)metastore服務(wù): 因?yàn)閟parkSQL需要知道表結(jié)構(gòu),和表數(shù)據(jù)的位置
hive-server-manager.sh start metastore
啟動(dòng)spark服務(wù): 啥服務(wù)也沒(méi)有了,已經(jīng)啟動(dòng)完了。
查看metastore服務(wù):
hive-server-manager.sh status metastore

修改配置:

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml在這個(gè)文件中,編寫(xiě)如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://bigdata01:9083</value></property>
</configuration>接著將該文件進(jìn)行分發(fā):
xsync.sh hive-site.xml

操作sparkSQL:

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此處的pyspark更像是一個(gè)客戶端,里面可以通過(guò)python編寫(xiě)spark代碼而已。而我們以前安裝的pyspark更像是spark的python運(yùn)行環(huán)境。

進(jìn)入后,通過(guò)內(nèi)置對(duì)象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德楓|
|  2|  劉浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

開(kāi)發(fā)環(huán)境如何編寫(xiě)代碼,操作hive:

Pycharm工具集成Hive開(kāi)發(fā)SparkSQL,必須申明Metastore的地址和啟用Hive的支持

spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport()\.getOrCreate()

代碼實(shí)戰(zhàn):

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 構(gòu)建環(huán)境變量# 配置環(huán)境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 防止在本地操作hdfs的時(shí)候,出現(xiàn)權(quán)限問(wèn)題os.environ['HADOOP_USER_NAME'] = 'root'# 獲取sparkSession對(duì)象spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from yhdb.student").show()spark.stop()

代碼還可以這樣寫(xiě):

方式二:加載Hive表的數(shù)據(jù)變成DF,可以調(diào)用DSL或者SQL的方式來(lái)實(shí)現(xiàn)計(jì)算

# 讀取Hive表構(gòu)建DataFrame

hiveData = spark.read.table("yhdb.student")

hiveData.printSchema()

hiveData.show()

# 讀取hive表中的數(shù)據(jù)spark2 = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://192.168.233.128:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://192.168.233.128:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()#spark2.sql("show databases").show()#spark2.sql("show  tables").show()#spark2.sql("select * from yhdb.t_user").show()spark2.read.table("t_user2").show()

不要在一個(gè)python 文件中,創(chuàng)建兩個(gè)不同的sparkSession對(duì)象,否則對(duì)于sparksql獲取hive的元數(shù)據(jù),有影響。另外,記得添加一個(gè)權(quán)限校驗(yàn)的語(yǔ)句:

# 防止在本地操作hdfs的時(shí)候,出現(xiàn)權(quán)限問(wèn)題
os.environ['HADOOP_USER_NAME'] = 'root'

為什么有些平臺(tái)不支持,不兼容 sqoop flume datax 這些工具呢?

spark 可以讀取日志數(shù)據(jù)

spark 可以讀取數(shù)據(jù)庫(kù)數(shù)據(jù)

spark 可以讀取 hdfs 數(shù)據(jù)

spark 可以讀取 hive 數(shù)據(jù)

------------------------------------

spark 可以讀取日志數(shù)據(jù),形成一個(gè) A 表,讀取 mysql 數(shù)據(jù),形成一個(gè) B 表

A 表和 B 表還可以相互關(guān)聯(lián),此時(shí)也就不需要 sqoop、flume、datax 去導(dǎo)入導(dǎo)出了。

spark 還可以將統(tǒng)計(jì)出來(lái)的結(jié)果直接放入 mysql 或者直接放入 hive

--------------------

我們后面學(xué)習(xí)的內(nèi)容還是沿用 將日志數(shù)據(jù),數(shù)據(jù)庫(kù)數(shù)據(jù)等所有數(shù)據(jù)抽取到 hive ,然后呢,使用 spark 去統(tǒng)計(jì),統(tǒng)計(jì)完之后還是放入 hive ,使用 datax 等工具將結(jié)果導(dǎo)出 mysql。

?

二、輸出Sink

?

sink --> 下沉 --> 落盤(pán) --> 保存起來(lái)

如果輸出路徑或者表已經(jīng)存在了怎么辦

  • 類型:text /csv【所有具有固定分隔符的文件】/ json/ orc/ parquet / jdbc / table【Hive表】
  • 語(yǔ)法:DataFrame.write.format(保存的類型).save(保存到哪)
    • 方法:save-保存到文件save(path)或者數(shù)據(jù)庫(kù)表save()中,saveAsTable-用于保存到Hive表

方式一:給定輸出數(shù)據(jù)源的類型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接調(diào)用對(duì)應(yīng)數(shù)據(jù)源類型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊參數(shù):option,用于指定輸出時(shí)的一些配置選項(xiàng)

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

輸出模式:Save Mode

append: 追加模式,當(dāng)數(shù)據(jù)存在時(shí),繼續(xù)追加
overwrite: 覆寫(xiě)模式,當(dāng)數(shù)據(jù)存在時(shí),覆寫(xiě)以前數(shù)據(jù),存儲(chǔ)當(dāng)前最新數(shù)據(jù);
error/errorifexists: 如果目標(biāo)存在就報(bào)錯(cuò),默認(rèn)的模式
ignore: 忽略,數(shù)據(jù)存在時(shí)不做任何操作

代碼如何編寫(xiě):

df.write.mode(saveMode="append").format("csv").save(path)

?

實(shí)戰(zhàn)一:保存普通格式

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.json("../../datas/person.json")# 獲取年齡最大的人的名字df.createOrReplaceTempView("persons")rsDf = spark.sql("""select name,age from persons where age = (select max(age) from persons)""")# 將結(jié)果打印到控制臺(tái)#rsDf.write.format("console").save()#rsDf.write.json("../../datas/result",mode="overwrite")#rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")#rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")#rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")#rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")# text 保存路徑為hdfs 直接報(bào)錯(cuò),不支持#rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")#rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")spark.stop()
假如:
spark.sql("select concat(name,' ',age) from person").write.text("hdfs://bigdata01:9820/spark/result")
直接報(bào)錯(cuò):假如你的輸出類型是text類型,直接報(bào)錯(cuò)
pyspark.sql.utils.AnalysisException: Text data source does not support bigint data type.
假如修改為parquet等類型,是可以直接保存的:
rsDf.write.parquet("hdfs://bigdata01:9820/result")                                                                

?

實(shí)戰(zhàn)二:保存到數(shù)據(jù)庫(kù)中

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("id","name","sal","comm","deptno")# 獲取年齡最大的人的名字df.createOrReplaceTempView("emps")rsDf = spark.sql("""select * from emps where comm is not null""")# 不需要事先將表創(chuàng)建好,它可以幫助我們創(chuàng)建rsDf.write.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8") \.option("user","root") \.option("password", "123456") \.option("dbtable", "emp1") \.save(mode="overwrite")spark.stop()

?

實(shí)戰(zhàn)三:將結(jié)果保存在hive表中

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個(gè)路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'os.environ['HADOOP_USER_NAME'] = 'root'spark = SparkSession \.builder \.appName("測(cè)試本地連接hive") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()df = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv").toDF("id", "name", "sal","comm", "deptno")# 獲取年齡最大的人的名字df.createOrReplaceTempView("emps")rsDf = spark.sql("""select * from emps where comm is not null""")rsDf.write.saveAsTable("yhdb03.emp")spark.stop()

三、總結(jié)

????????SparkSQL 讀寫(xiě)數(shù)據(jù)功能豐富強(qiáng)大,涵蓋多種數(shù)據(jù)源與格式,理解其原理、語(yǔ)法和操作細(xì)節(jié),結(jié)合不同業(yè)務(wù)場(chǎng)景(如數(shù)據(jù)分析、數(shù)據(jù)遷移、數(shù)據(jù)存儲(chǔ)優(yōu)化等)靈活運(yùn)用,能極大提升大數(shù)據(jù)處理效率,助力在大數(shù)據(jù)領(lǐng)域深挖數(shù)據(jù)價(jià)值、攻克業(yè)務(wù)難題,為數(shù)據(jù)驅(qū)動(dòng)決策筑牢根基。后續(xù)實(shí)踐中,多嘗試不同數(shù)據(jù)、場(chǎng)景組合,深化掌握程度。

http://m.aloenet.com.cn/news/42743.html

相關(guān)文章:

  • 溫州網(wǎng)站制作多少錢(qián)谷歌google 官網(wǎng)下載
  • 手機(jī)html5網(wǎng)站源碼廣告投放的方式有哪些
  • 深圳網(wǎng)站建設(shè)培訓(xùn)班深圳最新通告今天
  • 技術(shù)支持:淄博網(wǎng)站建設(shè)優(yōu)化設(shè)計(jì)三年級(jí)上冊(cè)語(yǔ)文答案
  • 山東省建設(shè)工程招標(biāo)中心網(wǎng)站當(dāng)日網(wǎng)站收錄查詢統(tǒng)計(jì)
  • 網(wǎng)站建設(shè)需求分析寫(xiě)什么茶葉seo網(wǎng)站推廣與優(yōu)化方案
  • 網(wǎng)站程序組成seo搜狗排名點(diǎn)擊
  • 辛集seo網(wǎng)站優(yōu)化電話靠譜的免費(fèi)建站
  • 建立手機(jī)個(gè)人網(wǎng)站營(yíng)銷網(wǎng)站建設(shè)制作
  • 視頻資源的網(wǎng)站怎么做站長(zhǎng)資訊
  • 網(wǎng)站建設(shè)課程設(shè)計(jì)內(nèi)容淘寶店鋪轉(zhuǎn)讓價(jià)格表
  • wordpress評(píng)論框文件采集站seo課程
  • 自己做網(wǎng)站外包百度熱搜高考大數(shù)據(jù)
  • 企業(yè)做網(wǎng)站需要什么軟件百度品牌廣告收費(fèi)標(biāo)準(zhǔn)
  • 網(wǎng)站制作預(yù)付款會(huì)計(jì)分錄小程序運(yùn)營(yíng)推廣公司
  • 大慶網(wǎng)站制作營(yíng)銷策劃方案包括哪些內(nèi)容
  • 在百度做網(wǎng)站多少錢(qián)網(wǎng)站推廣營(yíng)銷
  • 網(wǎng)站站內(nèi)鏈接濰坊住房公積金管理中心
  • 設(shè)計(jì)網(wǎng)頁(yè)推薦萬(wàn)秀服務(wù)不錯(cuò)的seo推廣
  • 網(wǎng)站的功能和作用百度seo排名帝搜軟件
  • 寶雞哪有有做網(wǎng)站的專業(yè)網(wǎng)絡(luò)推廣公司
  • 網(wǎng)站開(kāi)發(fā)的工作總結(jié)google搜索優(yōu)化方法
  • 怎么做賭博網(wǎng)站代理承德seo
  • html做網(wǎng)站實(shí)戰(zhàn)教程軟文寫(xiě)作經(jīng)驗(yàn)是什么
  • 余姚物流做網(wǎng)站鄭州本地seo顧問(wèn)
  • 醫(yī)院網(wǎng)站建設(shè) 費(fèi)用百度認(rèn)證官網(wǎng)
  • 四川建設(shè)廳官方網(wǎng)站四庫(kù)一平臺(tái)網(wǎng)絡(luò)運(yùn)營(yíng)商
  • 建設(shè)銀行手機(jī)查詢網(wǎng)站合肥seo優(yōu)化排名公司
  • 在北京大學(xué)生做家教的網(wǎng)站關(guān)鍵詞調(diào)詞平臺(tái)哪個(gè)好
  • 東莞電商頁(yè)面設(shè)計(jì)公司福州短視頻seo機(jī)會(huì)