from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import year, col
spark = SparkSession.builder.appName("Price Paid Data").getOrCreate()
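# When running outside a managed cluster, a master URL can be set explicitly;
# a minimal sketch (not needed on EMR/Databricks, where the master is provided):
# spark = SparkSession.builder.master('local[*]').appName("Price Paid Data").getOrCreate()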
# input path
price_paid_data_path = "s3://open-data-uk/land-registry/raw-data/pp-complete.csv"
# output path where the processed data will be written to
price_paid_data_output_path = "s3://open-data-uk/land-registry/processed-data/parquet/pp-complete"
# The source file doesn't have a header row that can be used to derive column names
column_names = ['transaction_id', 'price', 'date_of_transfer',
                'postcode', 'property_type', 'old_or_new', 'duration', 'paon',
                'saon', 'street', 'locality', 'town_city', 'district', 'county',
                'ppd_category']
# Build the schema with StringType as the default data type for all columns
report_fields = [StructField(field, StringType(), True) for field in column_names]
report_schema = StructType(report_fields)
# Inspect the generated schema
print(report_schema)
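# Equivalently, Spark's CSV reader also accepts a DDL-formatted schema string;
# an alternative sketch (not used below), assuming the same all-string layout:
# report_schema_ddl = ', '.join(f'{name} STRING' for name in column_names)
# spark.read.csv(price_paid_data_path, schema=report_schema_ddl)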
# Dictionary mapping each column name to its target data type
dict_data_types = {
    'transaction_id': 'string',
    'price': 'long',
    'date_of_transfer': 'timestamp',
    'postcode': 'string',
    'property_type': 'string',
    'old_or_new': 'string',
    'duration': 'string',
    'paon': 'string',
    'saon': 'string',
    'street': 'string',
    'locality': 'string',
    'town_city': 'string',
    'district': 'string',
    'county': 'string',
    'ppd_category': 'string'
}
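# Quick sanity check: the cast below should cover every schema column exactly
assert set(dict_data_types) == set(column_names)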
# Read the raw CSV with the all-string schema, cast each column to its target
# type, drop the unused ppd_category column, derive a year column for
# partitioning, and sort the rows for better locality within each partition
df_price_data = spark.read.csv(price_paid_data_path, schema=report_schema) \
    .select(*(col(c).cast(dict_data_types[c]).alias(c) for c in dict_data_types)) \
    .drop('ppd_category') \
    .withColumn('year', year('date_of_transfer')) \
    .orderBy(['year', 'date_of_transfer', 'county', 'town_city', 'postcode'])
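# Note: the plain cast to 'timestamp' assumes date_of_transfer arrives in a
# format Spark can cast directly; if not, an explicit pattern would be needed
# instead, e.g. (hypothetical pattern, adjust to the actual source format):
# from pyspark.sql.functions import to_timestamp
# .withColumn('date_of_transfer', to_timestamp('date_of_transfer', 'yyyy-MM-dd HH:mm'))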
df_price_data.show(3)
df_price_data.write.partitionBy('year').parquet(price_paid_data_output_path, mode='overwrite')
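# Optional sanity check, a minimal sketch: read the partitioned output back and
# confirm row counts per year (assumes read access to the output path)
df_check = spark.read.parquet(price_paid_data_output_path)
df_check.groupBy('year').count().orderBy('year').show()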