from_json
If you have a JSON object stored as a string in a column and need to transform it, you can use from_json. from_json parses a column containing a JSON string into a MapType with StringType as the key type, or into a StructType or ArrayType with the specified schema. If it encounters an unparsable value, it returns null.
Function : pyspark.sql.functions.from_json(col, schema, options=None) Syntax : from_json(col, schema, options=None)
Note :
- "col" should be well-formed with respect to the schema and options.
- It is not possible for map values to have different types. Use a struct for this situation.
Example code
# Example: parsing a JSON string column with pyspark.sql.functions.from_json.
#
# The same JSON column is parsed twice: first into a map (all values become
# strings), then into a struct with an explicit per-field schema. The expected
# console output is reproduced in comments after each call that prints.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

spark = SparkSession.builder.appName('www.freshers.in training').getOrCreate()

# Each row holds a serial number, a JSON document kept as a plain string,
# and a currency code.
data = [
    (1, """{"country":"USA","country_id":100}""", "USD"),
    (2, """{"country":"UK","country_id":200}""", "EURO"),
    (3, """{"country":"INDIA","country_id":300}""", "INR"),
]
schema = ["sino", "cntry_json", "currency"]
df = spark.createDataFrame(data, schema)

df.show(20, False)
# +----+------------------------------------+--------+
# |sino|cntry_json                          |currency|
# +----+------------------------------------+--------+
# |1   |{"country":"USA","country_id":100}  |USD     |
# |2   |{"country":"UK","country_id":200}   |EURO    |
# |3   |{"country":"INDIA","country_id":300}|INR     |
# +----+------------------------------------+--------+

df.printSchema()
# root
#  |-- sino: long (nullable = true)
#  |-- cntry_json: string (nullable = true)
#  |-- currency: string (nullable = true)

# Converting using from_json.
# With a MapType(StringType, StringType) schema every value is read as a
# string, and the un-aliased result column is named "entries" (see the
# output below).
df2 = df.select(
    df.sino,
    from_json(df.cntry_json, MapType(StringType(), StringType())),
    df.currency,
)

df2.show(20, False)
# +----+-------------------------------------+--------+
# |sino|entries                              |currency|
# +----+-------------------------------------+--------+
# |1   |[country -> USA, country_id -> 100]  |USD     |
# |2   |[country -> UK, country_id -> 200]   |EURO    |
# |3   |[country -> INDIA, country_id -> 300]|INR     |
# +----+-------------------------------------+--------+

df2.printSchema()
# root
#  |-- sino: long (nullable = true)
#  |-- entries: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- currency: string (nullable = true)

# Here we are defining the schema for the JSON column.
# A struct lets each field keep its own type, which a map cannot do:
# country stays a string while country_id is parsed as an integer.
schema2 = StructType([
    StructField("country", StringType(), True),
    StructField("country_id", IntegerType(), True),
])

# Replace the raw JSON string column in place with the parsed struct.
df3 = df.withColumn("cntry_json", from_json(df.cntry_json, schema2))

df3.show()
# +----+------------+--------+
# |sino|  cntry_json|currency|
# +----+------------+--------+
# |   1|  [USA, 100]|     USD|
# |   2|   [UK, 200]|    EURO|
# |   3|[INDIA, 300]|     INR|
# +----+------------+--------+

df3.printSchema()
# root
#  |-- sino: long (nullable = true)
#  |-- cntry_json: struct (nullable = true)
#  |    |-- country: string (nullable = true)
#  |    |-- country_id: integer (nullable = true)
#  |-- currency: string (nullable = true)
Reference