Data Science Series (2 of 3): Data Extraction and Transformation in ETL Projects | Kranio

Correct data types

In this case, we create the function set_correct_data_types, which standardizes the data by assigning the correct data type to each column and, at the same time, adjusting values according to the assigned type.

In this code example, we correct numeric values that carry a trailing negative sign (moving the sign to the left so the value casts correctly), incorrect formats in text values, and date formats.

from pyspark.sql.functions import col, concat, length, lit, regexp_replace, to_date, when
from pyspark.sql.types import DateType, DoubleType, IntegerType, StringType

def set_correct_data_types(df):

    columns_types_dict = {
        'DATE': DateType(),
        'QUANTITY': IntegerType(),
        'US_SALES': DoubleType(),
        'NAME': StringType(),
        'PRODUCT_CODE': IntegerType(),
        ...
    }

    for column_name, column_type in columns_types_dict.items():
        if column_type == DoubleType():
            column = col(column_name)
            # Move a trailing '-' to the front so the value casts to a valid double
            negative_col = concat(column.substr(-1, 1), column.substr(lit(1), length(column) - 1))
            df = df.withColumn(
                column_name,
                when(
                    column.endswith('-'),
                    negative_col.cast(column_type)
                ).otherwise(column.cast(column_type))
            )
        elif column_type == DateType():
            df = df.withColumn(
                column_name, to_date(col(column_name), 'yyyyMMdd')
            )
        elif column_type == StringType():
            # Strip stray double quotes from text values
            df = df.withColumn(
                column_name,
                regexp_replace(column_name, '"', '')
            )
    return {'add_correct_data_types': df}
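To see what the sign correction does, here is the same expression mirrored in plain Python (the function name is ours, for illustration only): a trailing '-' — common in mainframe-style extracts — is moved to the front before the cast.

```python
# Plain-Python mirror of the Spark sign-fix expression:
# concat(substr(-1, 1), substr(1, length - 1)) applied when the value ends with '-'
def fix_trailing_minus(value: str) -> float:
    if value.endswith('-'):
        # last character + everything except the last character
        value = value[-1] + value[:-1]
    return float(value)

print(fix_trailing_minus('123.45-'))  # -123.45
print(fix_trailing_minus('99.90'))    # 99.9
```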

Obtain master relationships

In our example, we have a master file called tipo_producto_detail.csv, which contains the descriptive values used to build two of the columns in the final report.

Transactional files like venta_ropa.csv have a column called codigo_producto which, when related to the codigo column of the master file tipo_producto_detail.csv, lets us obtain the product type name and the product description, which we rename descripcion. The result is that we add two additional columns to venta_ropa.csv by relating it to the master file, as shown in the following image:

In the code, we use .join to get the columns we need by matching the column codigo_producto with codigo.
def add_tipo_producto_descr(df):
    # dataframes_dict holds the DataFrames loaded during the extraction step
    master = dataframes_dict['tipo_producto_detail'].select('codigo', 'nombre')
    df = df.join(master, df.codigo_producto == master.codigo, 'left') \
           .withColumnRenamed('nombre', 'tipo_producto') \
           .drop('codigo')
    return {'add_tipo_producto': df}

def add_descripcion_descr(df):
    master = dataframes_dict['tipo_producto_detail'].select('codigo', 'desc')
    df = df.join(master, df.codigo_producto == master.codigo, 'left') \
           .withColumnRenamed('desc', 'descripcion') \
           .drop('codigo')
    return {'add_descripcion': df}

Filter to generate new data

We typically use filters to keep only the information relevant to our goal. However, as we will see in the next case, they can also be used to build new data.

With withColumn() we can modify the value of a column, or create a new one if we pass the name of a column that does not exist. Then, with when(), we add a condition and the value to apply when that condition matches. In the example, we use this to create the CATEGORY column.
def add_category_filter(df):
    df = df.withColumn(
        'CATEGORY',
        when(
            (df.tipo_producto == 'Polera') |
            (df.tipo_producto == 'Poleron') |
            (df.tipo_producto == 'Gorro'),
            lit('Clothing').cast(StringType())
        ).when(
            (df.tipo_producto == 'Lentes') |
            (df.tipo_producto == 'Billetera'),
            lit('Accessories').cast(StringType())
        ).otherwise('')
    )
    # Same return convention as the other transformation steps
    return {'add_category': df}

Understanding the result

Based on what we saw earlier, we can describe the transformations through the following figure:

What we did was take two data sources that share characteristics, venta_ropa.csv and venta_accesorios.csv, perform standardization (Data correction script), obtain new columns by relating a column from the original file to the master source (Master relationships script), and, through a filtering process, generate a new column by matching conditions (Filtering script). Additionally, we could apply the same patterns to perform more complex calculations, for example using the transactional table from costos_margen.csv to obtain Forecast values.

Additionally, since the data is now categorized by the category column, we could obtain the sum of the numerical columns, thus extending the scope and purpose of our transformations.

In the next article of the series, we will look in detail at the next step of the ETL flow, data loading and reconciliation of results. We hope the article has been helpful; if you have any questions or your organization needs help solving projects like this, do not hesitate to contact us.

Ready to optimize your data processes with efficient ETL flows?

At Kranio, we accompany you at every stage of your Data Science projects, from data extraction and transformation to the implementation of analytical solutions. Our team of experts is ready to help you transform your data into strategic decisions. Contact us and discover how we can drive your company's digital transformation.
