
Inserting Data into Databricks via the databricks-sql-python library (Leveraging SQLALCHEMY) #299

@kopitb

Description
Issue Description: Inserting Data into Databricks via the databricks-sql-python library (Leveraging SQLALCHEMY)

Error Message:

sql

DatabaseError: (databricks.sql.exc.ServerOperationError) Column id is not specified in INSERT
[SQL: INSERT INTO model_integrated (name) VALUES (%(name)s)]
[parameters: {'name': 'Loadsheetname'}]
(Background on this error at: https://sqlalche.me/e/14/4xp6)

Overview:
I'm encountering an error when inserting records into a Databricks table using SQLAlchemy. The error indicates that the id column is not specified in the INSERT statement, which raises a ServerOperationError.

For what it's worth, this works perfectly fine when inserting into a PostgreSQL database.

Steps to Reproduce:

  1. Connect to Databricks using SQLAlchemy.
  2. Define SQLAlchemy models, including an auto-incrementing primary key (id) column.
  3. Attempt to insert records into the model_integrated table.
  4. Encounter the error above.

Expected Behavior:
I expect the records to be inserted successfully into the Databricks database, with the auto-incrementing id column generated by the database.

Environment:

Python Version: 3.11.4
Databricks-sql-python: 3.0.1

I have verified that a similar approach works for a PostgreSQL database but fails in Databricks.
The issue seems to be related to the auto-incrementing primary key behavior.
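For what it may be worth, Databricks Delta tables only auto-generate values for columns declared as GENERATED ALWAYS AS IDENTITY (a BIGINT feature); a plain `Integer` column with `autoincrement=True` does not emit that clause, so the server still expects an explicit id. A hedged sketch of how the mapping might look with SQLAlchemy 1.4+'s `Identity()` construct (not verified against the Databricks dialect specifically):

```python
# Hedged sketch: use Identity() instead of autoincrement=True so the
# dialect can render "id BIGINT GENERATED ALWAYS AS IDENTITY" in DDL.
from sqlalchemy import BigInteger, Column, Identity, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ModelIntegrated(Base):
    __tablename__ = "model_integrated"

    # Identity() marks the column as server-generated, so SQLAlchemy can
    # omit it from INSERT statements on backends that support identity.
    id = Column(BigInteger, Identity(), primary_key=True)
    name = Column(String)
```

Whether the databricks dialect honors `Identity()` this way is an assumption on my part; the table would also need to be created with the identity clause for the server to fill the column in.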
Code Snippet:

python

import pandas as pd
import sqlalchemy.orm
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, Integer, String, ForeignKey, Numeric, DateTime
from databricks import sql

connection = sql.connect(
    server_hostname=HOST,
    http_path=HTTP_PATH,
    access_token=ACCESS_TOKEN)

print("Connection established")   

# SQLAlchemy setup
Base = declarative_base()

# Model class for "model" table
class ModelIntegrated(Base):
    __tablename__ = 'model_integrated'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)    
    periods = relationship('PeriodIntegrated', backref=('model_integrated'))

# Model class for "period" table
class PeriodIntegrated(Base):
    __tablename__ = 'period_integrated'    
    id = Column(Integer, primary_key=True, autoincrement=True)
    model_id = Column(Integer, ForeignKey('model_integrated.id'))
    name = Column(String)
    solution = Column(Numeric)
    start = Column(DateTime)
    end = Column(DateTime)
    period_order = Column(Integer)

catalog = "<catalog>"

conn_string = (
    "databricks://token:{token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}".format(
        token=ACCESS_TOKEN,
        host=HOST,
        catalog=catalog,
        http_path=HTTP_PATH,
        schema="<schema>"
    )
)
print("this is the conn_string", conn_string)

engine = create_engine(conn_string, echo=True)

print("engine executed")

Session = sessionmaker(bind=engine)
session = Session()
excel_file_path = "loadsheet.xlsx"
xls = pd.ExcelFile(excel_file_path)
for tab_name in xls.sheet_names:
    print(f"Processing tab: {tab_name}")
    df = pd.read_excel(excel_file_path, sheet_name=tab_name)

    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    print("Data in the tab:")
    print(df)
    
    if tab_name == 'Model':
        print("Processing Model data")
        for _, row in df.iterrows():
            # PostgreSQL solution
            model = ModelIntegrated(name=row['name'])
            session.add(model)
            session.commit()  # Commit the transaction
            
            # Retrieve the generated ID using a separate query
            model_id = session.query(ModelIntegrated.id).filter_by(name=row['name']).scalar()

            # session.flush()  # Get the auto-generated ID
            # model_id = model.id # Retrieve the ID
            print(f"Inserted Model with name: {model.name}, ID: {model_id}")
    
    elif tab_name == 'Period':
        print("Processing Period data")
        
        # Sort the DataFrame by "start" dates in ascending order
        df_sorted = df.sort_values(by='start')
        
        # Add a new column "period_order" with ascending integer values
        df_sorted['period_order'] = range(1, len(df_sorted) + 1)
        
        for _, row in df_sorted.iterrows():
            period = PeriodIntegrated(
                model_id=model_id,
                name=row['name'],
                solution=row['solution'],
                start=datetime.strptime(row['start'], '%Y-%m-%d %I:%M:%S %p'),  # Convert to datetime
                end=datetime.strptime(row['end'], '%Y-%m-%d %I:%M:%S %p')  # Convert to datetime
            )
            
            # Set the "period_order" attribute with the value from the DataFrame
            period.period_order = row['period_order']
            
            session.add(period)
            print(f"Inserted Period with name: {period.name}, Period Order: {period.period_order}")

# Commit the changes
session.commit()
session.close()
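For reference, the flush-based pattern hinted at by the commented-out lines does populate the generated id without a separate lookup query. A minimal sketch, demonstrated here against an in-memory SQLite database as a stand-in backend rather than Databricks:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Model(Base):
    __tablename__ = "model"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)

engine = create_engine("sqlite://")  # stand-in backend for illustration
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

model = Model(name="Loadsheetname")
session.add(model)
session.flush()   # the INSERT runs; SQLAlchemy refreshes the generated PK
print(model.id)   # populated directly, no filter_by(name=...) lookup
```

Retrieving the id with `filter_by(name=...)` as in the snippet above is also fragile if two rows share a name; `flush()` sidesteps that, provided the backend actually returns server-generated keys.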

Note:
I have also reached out to the Databricks community for assistance.

Thank you,
Brent
