Initialisation

  • Set input and output paths
  • Define a schema for the source file
  • Create a dictionary that maps each field to its correct data type (Note: another way of achieving this would be to define the data type for each column in the previous step, where the schema is defined. I have kept the dictionary approach here as a reference; a sketch of the typed-schema alternative follows the dictionary cell below)
In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import year, month, col

spark = SparkSession.builder.appName("Price Paid Data").getOrCreate()
In [2]:
# input path
price_paid_data_path = "s3://open-data-uk/land-registry/raw-data/pp-complete.csv"

# output path where the processed data will be written to
price_paid_data_output_path = "s3://open-data-uk/land-registry/processed-data/parquet/pp-complete"
In [3]:
# The source file doesn't have a header row that can be used to derive column names
column_names = ['transaction_id', 'price', 'date_of_transfer', 
                'postcode','property_type', 'old_or_new', 'duration', 'paon',
                'saon', 'street', 'locality', 'town_city', 'district', 'county', 'ppd_category']
In [4]:
# Set schema with String as default data type for all columns
report_fields = [StructField(field, StringType(), True) for field in column_names]
report_schema = StructType(report_fields)
report_schema
Out[4]:
StructType(List(StructField(transaction_id,StringType,true),StructField(price,StringType,true),StructField(date_of_transfer,StringType,true),StructField(postcode,StringType,true),StructField(property_type,StringType,true),StructField(old_or_new,StringType,true),StructField(duration,StringType,true),StructField(paon,StringType,true),StructField(saon,StringType,true),StructField(street,StringType,true),StructField(locality,StringType,true),StructField(town_city,StringType,true),StructField(district,StringType,true),StructField(county,StringType,true),StructField(ppd_category,StringType,true)))
In [5]:
# Dictionary object with the target data type for each column
dict_data_types = {
    'transaction_id': 'string', 
    'price': 'long', 
    'date_of_transfer': 'timestamp', 
    'postcode': 'string',
    'property_type': 'string', 
    'old_or_new': 'string', 
    'duration': 'string', 
    'paon': 'string', 
    'saon': 'string', 
    'street': 'string', 
    'locality': 'string', 
    'town_city': 'string', 
    'district': 'string', 
    'county': 'string', 
    'ppd_category': 'string'
}
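
As mentioned above, the same result can be achieved by typing each column directly in the schema instead of casting afterwards. A minimal sketch of that alternative (illustrative only; the field names mirror dict_data_types, and this cell is not executed in this notebook):

# Alternative: define the correct type per column up front in the schema.
# transaction_id stays a string; price becomes long; date_of_transfer
# becomes timestamp; every remaining column is a plain string.
typed_fields = [
    StructField('transaction_id', StringType(), True),
    StructField('price', LongType(), True),
    StructField('date_of_transfer', TimestampType(), True),
] + [StructField(name, StringType(), True) for name in column_names[3:]]
typed_schema = StructType(typed_fields)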

Read data into a DataFrame

  • Read the CSV file stored in S3
  • Apply the dictionary created in the previous step to cast each column to its correct data type
  • Drop extra columns that are not needed for analysis
  • Add a new field, year, derived from the date_of_transfer field. This field will be used to partition the data in Athena
  • Sort the data to improve compression of the Parquet files
In [7]:
df_price_data = (
    spark.read.csv(price_paid_data_path, schema=report_schema)
    # cast every column to the type defined in dict_data_types
    .select(*(col(c).cast(dict_data_types[c]).alias(c) for c in dict_data_types))
    # ppd_category is not needed for the analysis
    .drop('ppd_category')
    # derive the partition column from the transfer date
    .withColumn('year', year('date_of_transfer'))
    # sorting groups similar rows together, which improves parquet compression
    .orderBy(['year', 'date_of_transfer', 'county', 'town_city', 'postcode'])
)
df_price_data.show(3)
+--------+----------------+--------------------+----------+-------------------+--------------+--------+----+--------+-----+--------------------+-----------+---------+-------------+----+
|locality|          street|      transaction_id|old_or_new|   date_of_transfer|        county|postcode|saon|duration|price|            district|       paon|town_city|property_type|year|
+--------+----------------+--------------------+----------+-------------------+--------------+--------+----+--------+-----+--------------------+-----------+---------+-------------+----+
| BRISTOL|      CANADA WAY|{E32B0A70-13E1-4F...|         N|1995-01-01 00:00:00|          AVON| BS1 6XF|   7|       L|50000|             BRISTOL|WEARE COURT|  BRISTOL|            F|1995|
| BRISTOL|BARROWMEAD DRIVE|{03849A0F-DB07-45...|         N|1995-01-01 00:00:00|          AVON|BS11 0JH|null|       F|49000|             BRISTOL|        21A|  BRISTOL|            D|1995|
|MELDRETH|      THE GRANGE|{D05D3E49-9F92-47...|         N|1995-01-01 00:00:00|CAMBRIDGESHIRE| SG8 6JZ|null|       F|77000|SOUTH CAMBRIDGESHIRE|         13|  ROYSTON|            S|1995|
+--------+----------------+--------------------+----------+-------------------+--------------+--------+----+--------+-----+--------------------+-----------+---------+-------------+----+
only showing top 3 rows
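
To confirm the casts took effect (price as long, date_of_transfer as timestamp, the rest as strings), the resulting schema can be inspected; a quick optional check:

df_price_data.printSchema()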

Write Processed Data Partitioned by Year in Parquet Format

In [8]:
df_price_data.write.partitionBy('year').parquet(price_paid_data_output_path, mode='overwrite')
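
As a sanity check, the partitioned output can be read back with Spark; the year column is recovered from the partition directories. A minimal sketch, assuming the write above has completed:

# Read the partitioned parquet back and count rows per year partition
df_check = spark.read.parquet(price_paid_data_output_path)
df_check.groupBy('year').count().orderBy('year').show(5)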