sparkdataframe(sparkdataframe转换成字节流)

本篇文章给大家谈谈sparkdataframe,以及sparkdataframe转换成字节流对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

如何打印 spark dataframe

打印DataFrame里面的模式

在创建完DataFrame之后,我前顷信们一般都会慧轮查看里面数据的模式,我们可以通过printSchema函数来查乎厅看。它会打印出列的名称和类型:

students.printSchema

root

|--

id:

string (nullable =

true)

|--

studentName:

string (nullable =

true)

|--

phone:

string (nullable =

true)

|--

email:

string (nullable =

true)

如果采用的是load方式参见DataFrame的,students.printSchema的输出则如下:

root

|--

id|studentName|phone|email:

string (nullable =

true)

spark中什么是dataframe

sparksql中为了将普通的rdd可以进行执行sql的操作,而将rdd封装成一个结构化的模型, 就是dataframe, 获得凯笑dataframe后就可以创建临时表进行sql操段孙宏作了。握册

[img]

spark dataframe可以干什么

DataFrame是Spark SQL的一种编程抽象,它是一简大张分布式的表,是数据类型为Row的DataSet,可以简单认为:DataFrame是DataSet[Row]的别名。

你说我们得到了一张表可以做些什么呢?那些数据库的操作都可以,比如增删改查,联结操作等等,都是可以的。

推荐你去Spark官网查看官方文档,然后结合官方文档圆团、博客还拦腔竖有相关书籍,这样学起来比较快。

spark dataframe 字段可以有几种数据类型

import scala.collection.mutable.ArrayBuffer

import scala.io.Source

import java.io.PrintWriter

import util.control.Breaks._

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import java.sql.DriverManager

import java.sql.PreparedStatement

import java.sql.Connection

import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.Row

import java.util.Properties

import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {

val sc = new SparkContext("local[*]", "test")

val sqlc = new SQLContext(sc)

val driverUrl = "jdbc:mysql://ip:3306/ding?user=rootpassword=rootzeroDateTimeBehavior=convertToNullcharacterEncoding=utf-8"

val tableName = "tbaclusterresult"

//把数据转化为DataFrame,并注册为一个表

val df = sqlc.read.json("G:/data/json.txt")

df.registerTempTable("user")

val res = sqlc.sql("select * from user"亩核)

println(res.count() + "---------------------------")

res.collect().map { row =

{

println(row.toString())

}

}

//从MYSQL读取数据

val jdbcDF = sqlc.read

.options(Map("url" - driverUrl,

// "user" - "root",

// "password" - "root",

"dbtable" - tableName))

.format("jdbc")

.load()

println(jdbcDF.count() + "---------------------------")

jdbcDF.collect().map { row =轮陆

{

println(row.toString())

}

}

//插入数据至MYSQL

val schema = StructType(

StructField("name", StringType) ::

StructField("age", IntegerType)

:: Nil)

val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),

("com", 40), ("bt", 33), ("www", 23))).

map(item = Row.apply(item._1, item._2))

import sqlc.implicits._

val df1 = sqlc.createDataFrame(data1, schema)

// df1.write.jdbc(driverUrl, "sparktomysql"迅桐掘, new Properties)

df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

//DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:

//def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

//插入数据到MYSQL

val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))

data.foreachPartition(myFun)

case class Blog(name: String, count: Int)

def myFun(iterator: Iterator[(String, Int)]): Unit = {

var conn: Connection = null

var ps: PreparedStatement = null

val sql = "insert into blog(name, count) values (?, ?)"

try {

conn = DriverManager.getConnection(driverUrl, "root", "root")

iterator.foreach(data = {

ps = conn.prepareStatement(sql)

ps.setString(1, data._1)

ps.setInt(2, data._2)

ps.executeUpdate()

})

} catch {

case e: Exception = e.printStackTrace()

} finally {

if (ps != null) {

ps.close()

}

if (conn != null) {

conn.close()

}

}

}

}

关于sparkdataframe和sparkdataframe转换成字节流的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表