
QST: Using a chunksize iterator with StataWriter for very large queries #55881

Open
@FlipperPA

Description


Research

  • I have searched the [pandas] tag on StackOverflow for similar questions.

  • I have asked my usage related question on StackOverflow.

Link to question on StackOverflow

https://stackoverflow.com/questions/77446083/pandas-statawriter-writing-an-iterator-for-large-queries-dta-file-corrupt

Question about pandas

The details are in the Stack Overflow post; I'll include it below. If this is something we might be interested in bringing into core, I'm more than happy to work on a PR.

Here's how I'm instantiating the class:

import numpy as np
from pandas.io.stata import StataWriter
from sqlalchemy import create_engine
from stata import SQLStataWriter


engine = create_engine("postgresql+psycopg://{user}@{host}:{port}/{dbname}".format([credentials])).connect()

sql = "SELECT cusip, permno, permco, bidlo, date FROM crsp_a_stock.dsf LIMIT 1000"

writer = SQLStataWriter(
     "./test.dta",
    sql,
    engine,
    50,
    dtype={
        "date": "datetime64[ns]",
    },
)
writer.write_file()
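For reference, this is roughly what the eager equivalent would look like (just a sketch; it loads the whole result set into memory before writing, which is exactly what I'm trying to avoid on large queries):

from pandas import read_sql_query
from pandas.io.stata import StataWriterUTF8

# Eager version: read_sql_query materializes the full result set before
# anything is written to disk.
df = read_sql_query(sql, engine, dtype={"date": "datetime64[ns]"})
StataWriterUTF8("./test_eager.dta", df).write_file()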

Here's the related post from Stack Overflow:

I'm trying to subclass the pandas StataWriter to allow passing a SQL query with a chunksize, to avoid running out of memory on very large result sets. I've gotten most of the way there, but I get an error when I open the file written by pandas in Stata: ".dta file corrupt: The marker </data> was not found in the file where it was expected to be. This means that the file was written incorrectly or that the file has subsequently become damaged."

From what I can tell, my code should be working; self._update_map("data") properly updates the location of the tag. Here are the contents of self._map:

{'stata_data': 0, 'map': 158, 'variable_types': 281, 'varnames': 326, 'sortlist': 1121, 'formats': 1156, 'value_label_names': 1517, 'variable_labels': 2330, 'characteristics': 4291, 'data': 4326, 'strls': 52339, 'value_labels': 52354, 'stata_data_close': 52383, 'end-of-file': 52395}

This end-of-file entry matches the byte-size of the file (52,395 == 52,395):

-rw------- 1 tallen wrds 52395 Nov  8 08:34 test.dta
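One quick sanity check (a rough diagnostic sketch; it pokes at the private _map attribute of the writer instance from above) is to read the bytes at each recorded offset and confirm that the expected tag actually starts there:

with open("./test.dta", "rb") as fh:
    raw = fh.read()

# Each offset should point at the opening of the corresponding section tag;
# 'end-of-file' should equal len(raw), so its slice prints as b''.
for name, offset in writer._map.items():
    print(f"{name:>20} {offset:>8} {raw[offset:offset + 12]!r}")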

Here's the code I've come up with. What am I missing to properly position the closing </data> tag?

from collections.abc import Hashable, Sequence
from datetime import datetime

from pandas import read_sql_query
from pandas._typing import (
    CompressionOptions,
    DtypeArg,
    FilePath,
    StorageOptions,
    WriteBuffer,
)
from pandas.io.stata import StataWriterUTF8
from sqlalchemy import text
from sqlalchemy.engine.base import Connection


class SQLStataWriter(StataWriterUTF8):
    """
    Writes a STATA binary file without using an eager dataframe, avoiding loading the
    entire result into memory.

    Only supports writing modern STATA 15 (118) and 16 (119) .dta files since they are
    the only versions to support UTF-8.
    """

    def __init__(
        self,
        fname: FilePath | WriteBuffer[bytes],
        sql: str | None = None,
        engine: Connection | None = None,
        chunksize: int = 10000,
        dtype: DtypeArg | None = None,
        convert_dates: dict[Hashable, str] | None = None,
        write_index: bool = True,
        byteorder: str | None = None,
        time_stamp: datetime | None = None,
        data_label: str | None = None,
        variable_labels: dict[Hashable, str] | None = None,
        convert_strl: Sequence[Hashable] | None = None,
        version: int | None = None,
        compression: CompressionOptions = "infer",
        storage_options: StorageOptions | None = None,
        *,
        value_labels: dict[Hashable, dict[float, str]] | None = None,
    ) -> None:
        # Bind the additional variables to self
        self.sql = text(sql)
        self.engine = engine
        self.chunksize = chunksize
        self.dtype = dtype

        # Create the dataframe for init by pulling the first row only
        for data in read_sql_query(
            self.sql,
            self.engine,
            chunksize=1,
            dtype=self.dtype,
        ):
            break

        super().__init__(
            fname,
            data,
            convert_dates,
            write_index,
            byteorder=byteorder,
            time_stamp=time_stamp,
            data_label=data_label,
            variable_labels=variable_labels,
            convert_strl=convert_strl,
            version=version,
            compression=compression,
            storage_options=storage_options,
            value_labels=value_labels,
        )

    def _prepare_data(self):
        """
        No-op override: the parent's _prepare_data is called per chunk
        inside _write_data instead.
        """
        return None

    def _write_data(self, records) -> None:
        """
        Override this to loop over records in chunksize.
        """
        self._update_map("data")
        self._write_bytes(b"<data>")

        # Instead of eagerly loading the entire dataframe, do it in chunks.
        for self.data in read_sql_query(
            self.sql,
            self.engine,
            chunksize=self.chunksize,
            dtype=self.dtype,
        ):
            # Insert an index column, or the expected values will be off by
            # one. Use len(self.data) in case the last chunk has fewer rows
            # than the chunksize.
            self.data.insert(0, "index", list(range(0, len(self.data))))

            # Call the parent function to prepare rows for this chunk only.
            records = super()._prepare_data()
            # Write the records to the file
            self._write_bytes(records.tobytes())

        self._write_bytes(b"</data>")

Metadata

Assignees

No one assigned

Labels

Needs Triage (Issue that has not been reviewed by a pandas team member), Usage Question
