
ghz 4 days ago ⋅ 6 views

How to convert nested Hive query schema into Bigquery schema using Python?

Quick Note Before Question: During the migration from an on-prem Hadoop instance to BigQuery, we needed to convert a lot of Hive schemas to BigQuery schemas. I asked a similar question about unnested schema transformation and @Anjela kindly answered it, which was very useful. But there is another use case: transferring nested struct-type schemas to BigQuery schemas, as detailed below.

Sample Hive Schema:

sample =

"reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"

Required BigQuery Schema:

bigquery.SchemaField("reports", "RECORD", mode="REPEATED",
    fields=(
        bigquery.SchemaField('orderlineid', 'STRING'),
        bigquery.SchemaField('ordernumber', 'STRING'),
        bigquery.SchemaField('price', 'RECORD',
            fields=(
                bigquery.SchemaField('currencyCode', 'STRING'),
                bigquery.SchemaField('value', 'FLOAT'),
            ),
        ),
        bigquery.SchemaField('quantity', 'INTEGER'),
        bigquery.SchemaField('serialnumbers', 'STRING', mode="REPEATED"),
        bigquery.SchemaField('sku', 'STRING'),
    ),
)

What we have from the previous question, which is useful for converting an unnested schema to a BigQuery schema:

import re

from google.cloud import bigquery



def is_even(number):
    if (number % 2) == 0:
        return True
    else:
        return False

def clean_string(str_value):
    return re.sub(r'[\W_]+', '', str_value)

def convert_to_bqdict(api_string):
    """
    This only works for a struct with multiple fields
    This could give you an idea on constructing a schema dict for BigQuery
    """
    num_even = True
    main_dict = {}
    struct_dict = {}
    field_arr = []
    schema_arr = []

    # Hard coded this since not sure what the string will look like if there are more inputs
    init_struct = api_string.split(' ')
    main_dict["name"] = init_struct[0]
    main_dict["type"] = "RECORD"
    main_dict["mode"] = "NULLABLE"

    cont_struct = init_struct[1].split('<')
    num_elem = len(cont_struct)

    # parse fields inside of struct<
    for i in range(0, num_elem):
        num_even = is_even(i)
        # fields are seen on even indices
        if num_even and i != 0:
            temp = list(filter(None, cont_struct[i].split(',')))  # remove blank elements
            for elem in temp:
                fields = list(filter(None, elem.split(':')))

                struct_dict["name"] = clean_string(fields[0])
                # "type" works for STRING as of the moment; refer to
                # https://cloud.google.com/bigquery/docs/schemas#standard_sql_data_types
                # for the accepted data types
                struct_dict["type"] = clean_string(fields[1]).upper()
                struct_dict["mode"] = "NULLABLE"

                field_arr.append(struct_dict)
                struct_dict = {}

    main_dict["fields"] = field_arr  # assign dict to array of fields
    schema_arr.append(main_dict)

    return schema_arr

sample = "reports array<struct<imageUrl:string,reportedBy:string,newfield:bool>>"

bq_dict = convert_to_bqdict(sample)

client = bigquery.Client()
project = client.project
dataset_ref = bigquery.DatasetReference(project, '20211228')
table_ref = dataset_ref.table("20220203")
table = bigquery.Table(table_ref, schema=bq_dict)
table = client.create_table(table)  

The script above from @Anjela B. converts an unnested Hive schema into a BigQuery schema, for input such as the following:

"name":"reports"
"col_type":"array<struct<imageUrl:string,reportedBy:string>>"
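To see why this flat approach cannot recover nesting, it helps to look at what a plain split('<') does to a nested type string (illustrative snippet):

```python
nested = "struct<a:string,b:struct<c:string>,d:int>"

# Splitting on '<' mixes the inner struct's fields into the outer ones:
parts = nested.split('<')
print(parts)  # ['struct', 'a:string,b:struct', 'c:string>,d:int>']
```

The pieces of the inner struct end up scattered across list elements, so the even-index heuristic has no way to reassemble the hierarchy.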

Answer

To convert a nested schema (like a struct containing another struct or an array of structs) from Hive to BigQuery, you need to parse the schema carefully, account for nested fields, and adjust the schema structure accordingly. You can extend the previous script to handle such cases by recognizing nested structures and ensuring that the correct RECORD and REPEATED modes are applied.

Here’s an updated version of your code that handles nested schemas, including arrays and structs, and converts them to a BigQuery-compatible schema.

Updated Script for Handling Nested Structs

from google.cloud import bigquery

# Hive primitive type names differ from BigQuery's, so map them explicitly.
HIVE_TO_BQ = {
    "string": "STRING",
    "varchar": "STRING",
    "char": "STRING",
    "tinyint": "INTEGER",
    "smallint": "INTEGER",
    "int": "INTEGER",
    "bigint": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
    "bool": "BOOLEAN",
    "binary": "BYTES",
    "timestamp": "TIMESTAMP",
    "date": "DATE",
    "decimal": "NUMERIC",
}

def split_top_level(body):
    """Split a struct body on commas, ignoring commas nested inside <...>."""
    parts, current, depth = [], [], 0
    for ch in body:
        if ch == '<':
            depth += 1
        elif ch == '>':
            depth -= 1
        if ch == ',' and depth == 0:
            parts.append(''.join(current))
            current = []
        else:
            current.append(ch)
    parts.append(''.join(current))
    return parts

def parse_type(type_str):
    """Return (bq_type, mode, fields) for one Hive type expression."""
    type_str = type_str.strip()
    if type_str.startswith('array<') and type_str.endswith('>'):
        # array<X> keeps X's type but is REPEATED in BigQuery
        bq_type, _, fields = parse_type(type_str[len('array<'):-1])
        return bq_type, "REPEATED", fields
    if type_str.startswith('struct<') and type_str.endswith('>'):
        # struct<name:type,...> becomes a RECORD with sub-fields
        fields = []
        for pair in split_top_level(type_str[len('struct<'):-1]):
            # split at the first ':' only; nested types contain colons too
            name, _, sub_type_str = pair.partition(':')
            sub_type, sub_mode, sub_fields = parse_type(sub_type_str)
            field = {"name": name.strip(), "type": sub_type, "mode": sub_mode}
            if sub_fields is not None:
                field["fields"] = sub_fields
            fields.append(field)
        return "RECORD", "NULLABLE", fields
    # primitive type
    return HIVE_TO_BQ.get(type_str.lower(), type_str.upper()), "NULLABLE", None

def convert_to_bqdict(api_string):
    """
    Convert a nested Hive schema (with structs and arrays) to a BigQuery
    schema dict. Supports arbitrarily deep structs and arrays of structs.
    """
    name, _, type_str = api_string.partition(' ')
    main_dict = {"name": name}
    main_dict["type"], main_dict["mode"], fields = parse_type(type_str)
    if fields is not None:
        main_dict["fields"] = fields
    return [main_dict]

# Example Schema from Hive
sample = "reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"

# Convert the schema from Hive to BigQuery format
bq_dict = convert_to_bqdict(sample)

# Initialize the BigQuery client
client = bigquery.Client()
project = client.project
dataset_ref = bigquery.DatasetReference(project, '20211228')
table_ref = dataset_ref.table("20220203")

# Create the table schema based on the converted BigQuery dictionary
table = bigquery.Table(table_ref, schema=bq_dict)

# Create the table in BigQuery (this will create a new table with the specified schema)
table = client.create_table(table)

# Output schema to verify correctness
print(table.schema)

Explanation of Updates:

  1. Depth-aware splitting: the field list of a struct is split on commas only at the top nesting level, so the fields of an inner struct are not torn apart.
  2. Recursive type parsing: array<X> keeps X's type but sets the mode to REPEATED; struct<...> becomes a RECORD whose sub-fields are parsed the same way, so arbitrarily deep nesting is handled.
  3. Explicit type mapping: Hive primitive names are translated through a lookup table (string → STRING, int → INTEGER, double → FLOAT), since simply upper-casing them would produce invalid BigQuery types like INT or DOUBLE.
  4. Field parsing: each name:type pair is split at the first colon only, preserving the colons that appear inside nested type expressions.
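When splitting a name:type pair, only the first colon can be used as the separator, because nested type expressions contain colons of their own; Python's str.partition does exactly this (illustrative snippet):

```python
pair = "price:struct<currencycode:string,value:double>"

# partition splits at the FIRST ':' only, keeping the rest intact.
name, _, type_expr = pair.partition(':')
print(name)       # price
print(type_expr)  # struct<currencycode:string,value:double>

# A plain split(':') would shred the nested type expression:
print(pair.split(':'))  # ['price', 'struct<currencycode', 'string,value', 'double>']
```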

Example Input and Output:

Input Schema:

"reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"

Output BigQuery Schema:

[
    {
        "name": "reports",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [
            {"name": "orderlineid", "type": "STRING", "mode": "NULLABLE"},
            {"name": "ordernumber", "type": "STRING", "mode": "NULLABLE"},
            {
                "name": "price",
                "type": "RECORD",
                "mode": "NULLABLE",
                "fields": [
                    {"name": "currencycode", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}
                ]
            },
            {"name": "quantity", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "serialnumbers", "type": "STRING", "mode": "REPEATED"},
            {"name": "sku", "type": "STRING", "mode": "NULLABLE"}
        ]
    }
]
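One way to sanity-check a converted schema dict like the one above is to flatten it into dotted paths; a small illustrative helper (flatten_schema is not part of any library):

```python
def flatten_schema(fields, prefix=""):
    """Yield 'dotted.path: TYPE (MODE)' lines for a BigQuery schema dict."""
    for f in fields:
        path = f"{prefix}{f['name']}"
        yield f"{path}: {f['type']} ({f['mode']})"
        # Recurse into RECORD fields, extending the dotted path.
        for line in flatten_schema(f.get("fields", []), prefix=path + "."):
            yield line

schema = [
    {
        "name": "reports", "type": "RECORD", "mode": "REPEATED",
        "fields": [
            {"name": "price", "type": "RECORD", "mode": "NULLABLE",
             "fields": [
                 {"name": "value", "type": "FLOAT", "mode": "NULLABLE"},
             ]},
            {"name": "serialnumbers", "type": "STRING", "mode": "REPEATED"},
        ],
    },
]

for line in flatten_schema(schema):
    print(line)
# reports: RECORD (REPEATED)
# reports.price: RECORD (NULLABLE)
# reports.price.value: FLOAT (NULLABLE)
# reports.serialnumbers: STRING (REPEATED)
```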

Notes:

  • Array Fields: array types are given the REPEATED mode in BigQuery, which indicates that the field holds multiple values; an array of structs becomes a REPEATED RECORD.
  • Nested Structs: struct types are emitted as RECORD fields, and their internal fields are processed recursively, so any nesting depth works.
  • This solution should work for schemas built from structs, arrays, and the primitive types in the mapping table; Hive map and uniontype columns are not covered and would need additional mapping rules.

Let me know if you'd like further improvements or clarifications!