sparkdataframe(sparkdataframe 转pandas 不能有空值吗)
本篇文章给大家谈谈sparkdataframe,以及sparkdataframe 转pandas 不能有空值吗对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、spark dataframe可以干什么
- 2、spark dataframe 字段可以有几种数据类型
- 3、如何理解spark中RDD和DataFrame的结构
- 4、spark中什么是dataframe
- 5、如何打印 spark dataframe
- 6、sparkdataframe转换成字节流
spark dataframe可以干什么
DataFrame是Spark SQL的一种编程抽象,它是一张分布式的表,是数据类型为Row的DataSet,可以简单认为:DataFrame是DataSet[Row]的别名。
你说我们得到了一张表可以做些什么呢?那些数据库的操作都可以,比如增删改查,联结操作等等,都是可以的。
推荐你去Spark官网查看官方文档,然后结合官方文档、博客还有相关书籍,这样学起来比较快。
[img]spark dataframe 字段可以有几种数据类型
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode
object SimpleDemo extends App {
val sc = new SparkContext("local[*]", "test")
val sqlc = new SQLContext(sc)
val driverUrl = "jdbc:mysql://ip:3306/ding?user=rootpassword=rootzeroDateTimeBehavior=convertToNullcharacterEncoding=utf-8"
val tableName = "tbaclusterresult"
//把数据转化为DataFrame,并注册为一个表
val df = sqlc.read.json("G:/data/json.txt")
df.registerTempTable("user")
val res = sqlc.sql("select * from user"亩核)
println(res.count() + "---------------------------")
res.collect().map { row =
{
println(row.toString())
}
}
//从MYSQL读取数据
val jdbcDF = sqlc.read
.options(Map("url" - driverUrl,
// "user" - "root",
// "password" - "root",
"dbtable" - tableName))
.format("jdbc")
.load()
println(jdbcDF.count() + "---------------------------")
jdbcDF.collect().map { row =轮陆
{
println(row.toString())
}
}
//插入数据至MYSQL
val schema = StructType(
StructField("name", StringType) ::
StructField("age", IntegerType)
:: Nil)
val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
("com", 40), ("bt", 33), ("www", 23))).
map(item = Row.apply(item._1, item._2))
import sqlc.implicits._
val df1 = sqlc.createDataFrame(data1, schema)
// df1.write.jdbc(driverUrl, "sparktomysql"迅桐掘, new Properties)
df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)
//DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:
//def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit
//插入数据到MYSQL
val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
data.foreachPartition(myFun)
case class Blog(name: String, count: Int)
def myFun(iterator: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"
try {
conn = DriverManager.getConnection(driverUrl, "root", "root")
iterator.foreach(data = {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setInt(2, data._2)
ps.executeUpdate()
})
} catch {
case e: Exception = e.printStackTrace()
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
}
如何理解spark中RDD和DataFrame的结构
RDD、DataFrame和DataSet是册丛容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同。
RDD和DataFrame
RDD-DataFrame
上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解
Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使前友得Spark
SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的
Java对象的集合。DataFrame是分慧姿槐布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效
率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
spark中什么是dataframe
sparksql中为了将普通的rdd可以进行执行sql的操作,而将rdd封装成一个结构化的模型, 就是dataframe, 获得凯笑dataframe后就可以创建临时表进行sql操段孙宏作了。握册
如何打印 spark dataframe
打印DataFrame里面的模式
在创建完DataFrame之后,我前顷信们一般都会慧轮查看里面数据的模式,我们可以通过printSchema函数来查乎厅看。它会打印出列的名称和类型:
students.printSchema
root
|--
id:
string (nullable =
true)
|--
studentName:
string (nullable =
true)
|--
phone:
string (nullable =
true)
|--
email:
string (nullable =
true)
如果采用的是load方式参见DataFrame的,students.printSchema的输出则如下:
root
|--
id|studentName|phone|email:
string (nullable =
true)
sparkdataframe转换成字节流
本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。
json字符串转DataFrame
spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:
* 若列数据全为null会用String类型
* 整数默认会用Long类型
* 浮点数默认会用Double类型 val json1 = """{"a":null, "b": 23.1, "c": 1}""" val json2 =
"""{"a":null, "b": "hello", "d": 1.2}""" val ds =
spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show
df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null
|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string
(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)
|-- d: double (nullable =true)
若指定schema会按照schema生成DF:
* schema中不存在州明举的列会被忽略
* 可以用两种方法指定schema,StructType和String,具体对应关系看后面
*
若数据无法册碧匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常
val schema = StructType(List( StructField("a", ByteType, true), StructField("b"
, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b
float, c short" val df = spark.read.schema(schema).json(ds) df.show
df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null
|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float
(nullable =true) |-- c: short (nullable = true)
json解析相关配置参数
primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.
allowComments (default false): 忽略json字符串槐迟中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false):
允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。
mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。
PERMISSIVE
:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。
如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord
(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE
mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord
dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.
只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):
自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件
以上参数可用option方法配置:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|
+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|
+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string
(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)
二进制类型会自动用base64编码方式表示
‘Man’(ascci) base64编码后为:”TWFu”
val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =
spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",
base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema
+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|
+----------+----+ root |-- value: binary (nullable =true) |-- b64: string
(nullable =true) //=================================================
dsWithB64.toJSON.show(false) +-----------------------------+ |value |
+-----------------------------+ |{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+
//================================================= val json =
"""{"value":"TWFu"}""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF
= spark.read.schema("value binary").json(jsonDs ) binaryDF.show
binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|
+----------+ root |-- value: binary (nullable =true)
指定schema示例:
以下是Spark SQL支持的所有基本类型:
val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3,
"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,
"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12
11:22:22.123123"}""" val ds = spark.createDataset(Seq(json)) val schema =
"stringc string, shortc short, integerc int, longc long, floatc float, doublec
double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,
datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)
df.show(false) df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc
|datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11
:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)
|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--
floatc: float (nullable =true) |-- doublec: double (nullable = true) |--
decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true
) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--
datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)
复合类型:
val json = """ { "arrayc" : [ 1, 2, 3 ], "structc" : { "strc" : "efg",
"decimalc" : 1.1 }, "mapc" : { "key1" : 1.2, "key2" : 1.1 } } """ val ds =
spark.createDataset(Seq(json))val schema = "arrayc array, structc
struct, mapc map" val df =
spark.read.schema(schema).json(ds) df.show(false) df.printSchema
+---------+--------+--------------------------+ |arrayc |structc |mapc |
+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 - 1.2
, key2 -1.1]| +---------+--------+--------------------------+ root |-- arrayc:
array (nullable =true) | |-- element: short (containsNull = true) |-- structc:
struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:
decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:
string | |-- value: float (valueContainsNull =true)
SparkSQL数据类型
基本类型:
DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)
“decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”
三个复合类型:
DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array array ARRAY 4 array
{“type”:”array”,”elementType”:”integer”,”containsNull”:true}
MapType(StringType, LongType, true) map map MAP
28 map
{“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true}
StructType(StructField(“sf”, DoubleType)::Nil) struct struct
STRUCT 8 struct
{“type”:”struct”,”fields”:[{“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:{}}]}
关于sparkdataframe和sparkdataframe 转pandas 不能有空值吗的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。