XML Format: User Documentation
Loading XML Data into Databricks
Prerequisites
Parsing XML Records
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>
"""
val xmlPath = inputPath + "books.xml"
dbutils.fs.put(xmlPath, xmlString)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate = false)
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Reading the same file with rowTag set to "book" instead returns one row per book element:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
df.printSchema()
df.show()
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Data Source Option
XSD Support
Individual rows can be validated against an XSD with the rowValidationXSDPath option (see Read XML with Row Validation below). An XSD can also be converted to a Spark DataFrame schema with XSDToSchema:
import org.apache.spark.sql.catalyst.xml.XSDToSchema
import java.nio.file.Paths
val xsdPath = inputPath + "books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.rm(xsdPath, true)
dbutils.fs.put(xsdPath, xsdString)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(Paths.get("/dbfs" + xsdPath))
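Both calls return a Spark StructType, which can be inspected or passed to spark.read.schema(...) like any other schema; a minimal sketch:

// Sketch: both schemas are derived from the same XSD and should be equivalent.
schema1.printTreeString()
schema2.printTreeString()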
Parsing Nested XML
XML data in a string-valued column of an existing DataFrame can be parsed with schema_of_xml and from_xml, which return the schema and the parsed result as a new struct column, respectively. XML data passed as an argument to schema_of_xml and from_xml must be a single well-formed XML record.
Syntax:
schema_of_xml(xmlStr [, options] )
Arguments
xmlStr: A STRING expression specifying a single well-formed XML record.
options: An optional MAP<STRING,STRING> literal specifying directives.
Returns
A STRING holding a definition of a struct with n fields of strings where the column names are derived
from the XML element and attribute names. The field values hold the derived formatted SQL types.
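For example, a minimal Scala sketch (the XML literal is illustrative):

import org.apache.spark.sql.functions.schema_of_xml
// Sketch: derive the formatted schema string for a single XML record.
val schemaCol = schema_of_xml("""<book id="bk103"><title>Maeve Ascendant</title></book>""")
spark.range(1).select(schemaCol.as("schema")).show(truncate = false)
// prints something like STRUCT<_id: STRING, title: STRING>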
Syntax:
from_xml(xmlStr, schema [, options])
Arguments
xmlStr: A STRING expression specifying a single well-formed XML record.
schema: A STRING expression or invocation of schema_of_xml function.
options: An optional MAP<STRING,STRING> literal specifying directives.
Returns
A struct with field names and types matching the schema definition.
The schema must be defined as comma-separated column name and data type pairs, as used in, for example, CREATE TABLE. Most options shown in the Data Source options are applicable, with the following exceptions:
rowTag: Because there is only one XML record, the rowTag option is not applicable.
mode (default: PERMISSIVE): allows a mode for dealing with corrupt records during parsing.
PERMISSIVE: when the parser encounters a corrupted record, it puts the malformed string into a field configured by columnNameOfCorruptRecord and sets the malformed fields to null. To keep corrupt records, you can set a string-type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have such a field, corrupt records are dropped during parsing. When inferring a schema, a columnNameOfCorruptRecord field is implicitly added to the output schema.
FAILFAST: throws an exception when the parser encounters corrupted records.
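A minimal sketch of keeping corrupt records with from_xml, assuming the java.util.Map options overload as with from_json (column and field names are illustrative):

import org.apache.spark.sql.functions.from_xml
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import scala.jdk.CollectionConverters._

// Sketch: PERMISSIVE parsing with a user-defined corrupt-record field.
// The second record is deliberately malformed.
val recs = Seq("<book><author>Corets, Eva</author></book>", "<book><author>").toDF("payload")
val schemaWithCorrupt = StructType(Seq(
  StructField("author", StringType),
  StructField("_corrupt", StringType)))
val opts = Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_corrupt").asJava
recs.withColumn("parsed", from_xml($"payload", schemaWithCorrupt, opts)).show(truncate = false)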
Structure Conversion
Because of structural differences between DataFrame and XML data, there are conversion rules from XML data to a DataFrame and from a DataFrame to XML data. Note that handling attributes can be disabled with the option excludeAttribute.
Conversion from XML to DataFrame
Attributes: attributes are converted as fields with the prefix given by the attributePrefix option (default _). For example, this XML record:
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produces the schema below:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Values in an element that has attributes but no child elements are put in a separate field named by the valueTag option (default _VALUE). For example, this XML record:
<one>
<two myTwoAttrib="BBBBB">two</two>
<three>three</three>
</one>
produces the schema below:
root
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
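A minimal sketch that reproduces the first schema above (the input path is illustrative; the two options are shown with their defaults):

// Sketch: read records like <one myOneAttrib="AAAA">...</one> with rowTag "one".
// attributePrefix (default "_") names attribute fields such as _myOneAttrib;
// valueTag (default "_VALUE") names the value field when an element has both
// attributes and a text value, as in the second schema above.
val oneDf = spark.read
  .option("rowTag", "one")
  .option("attributePrefix", "_")
  .option("valueTag", "_VALUE")
  .xml("/FileStore/xmltestDir/input/one.xml")
oneDf.printSchema()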
Conversion from DataFrame to XML
Element as an array in an array: writing an XML file from a DataFrame that has a field of ArrayType whose element is also ArrayType produces an additional nested field for that element. This does not happen when reading and writing XML data, but it can happen when writing a DataFrame that was read from another source. Consequently, a round trip of reading and writing XML files preserves the structure, while writing a DataFrame read from another source may produce a different structure. For example, a DataFrame with the schema below:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produces an XML file like the following:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
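A minimal sketch that constructs such a DataFrame and writes it (the output path is illustrative; rootTag and rowTag keep their defaults, ROWS and ROW):

// Sketch: an array-of-arrays column; on write, each inner array becomes
// a nested <item> element, as shown above.
val nested = Seq(Seq(Seq("aa"), Seq("bb"))).toDF("a")
nested.write.xml("/FileStore/xmltestDir/output/nested.xml")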
Rescued data column
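The rescued data column collects data that is not parsed, for example because a field is missing from the provided schema or its type does not match. A minimal sketch, assuming the rescuedDataColumn read option behaves for XML as it does for other Databricks file formats (the option and the _rescued_data column name are assumptions):

// Sketch: enable a rescued data column on read; the option name and the
// "_rescued_data" column name are assumptions based on other formats.
val rescuedDf = spark.read
  .option("rowTag", "book")
  .option("rescuedDataColumn", "_rescued_data")
  .schema("author STRING, title STRING")  // deliberately narrow schema
  .xml("books.xml")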
Schema inference and evolution in Auto Loader
Examples
SQL API
%sql
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
%sql
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Load XML using COPY INTO
%sql
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Scala API
val df = spark.read
  .option("rowTag", "book")
  .xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")
Python API
%python
df = spark.read.format('xml').options(rowTag='book').load('books.xml')
df.select("author", "_id").write \
  .format('xml') \
  .options(rowTag='book', rootTag='books') \
  .save('newbooks.xml')
%python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
customSchema = StructType([
  StructField("_id", StringType(), True),
  StructField("author", StringType(), True),
  StructField("description", StringType(), True),
  StructField("genre", StringType(), True),
  StructField("price", DoubleType(), True),
  StructField("publish_date", StringType(), True),
  StructField("title", StringType(), True)])
df = spark.read \
  .format('xml') \
  .options(rowTag='book') \
  .load('books.xml', schema=customSchema)
df.select("author", "_id").write \
  .format('xml') \
  .options(rowTag='book', rootTag='books') \
  .save('newbooks.xml')
R API
%r
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
%r
customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Read XML with Row Validation
val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema
Parsing Nested XML (from_xml and schema_of_xml)
import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""" .stripMargin
val df = Seq (( 8 , xmlData)).toDF( "number" , "payload" )
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn( "parsed" , from_xml($ "payload" , schema))
parsed.printSchema()
parsed.show()
from_xml and schema_of_xml with the SQL API
%sql
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Loading XML with Auto Loader
import org.apache.spark.sql.streaming.Trigger

val query = spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())
query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()