Description
Research
- I have searched the [pandas] tag on StackOverflow for similar questions.
- I have asked my usage related question on StackOverflow.
Link to question on StackOverflow
Question about pandas
The details are in the Stack Overflow post; I'll include it below. If this is something that might be worth bringing into core, I'm more than happy to work on a PR.
Here's how I'm instantiating the class:
from sqlalchemy import create_engine

from stata import SQLStataWriter

engine = create_engine(
    "postgresql+psycopg://{user}@{host}:{port}/{dbname}".format([credentials])
).connect()

sql = "SELECT cusip, permno, permco, bidlo, date FROM crsp_a_stock.dsf LIMIT 1000"

writer = SQLStataWriter(
    "./test.dta",
    sql,
    engine,
    50,  # chunksize
    dtype={
        "date": "datetime64[ns]",
    },
)
writer.write_file()
Here's the related post from Stack Overflow:
I'm trying to subclass the pandas StataWriter to allow passing a SQL query with a chunksize, to avoid running out of memory on very large result sets. I've gotten most of the way there, but I'm getting an error when I try to open the file written by pandas in Stata:

.dta file corrupt: The marker </data> was not found in the file where it was expected to be. This means that the file was written incorrectly or that the file has subsequently become damaged.

From what I can tell, my code should be working; self._update_map("data") is properly updating the location of the tag. Here are the contents of self._map:
{'stata_data': 0, 'map': 158, 'variable_types': 281, 'varnames': 326, 'sortlist': 1121, 'formats': 1156, 'value_label_names': 1517, 'variable_labels': 2330, 'characteristics': 4291, 'data': 4326, 'strls': 52339, 'value_labels': 52354, 'stata_data_close': 52383, 'end-of-file': 52395}
This end-of-file entry matches the byte size of the file (52,395 == 52,395):
-rw------- 1 tallen wrds 52395 Nov 8 08:34 test.dta
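For reference, a quick way to check this (a throwaway snippet run separately, not part of the writer) is to compare the offsets recorded in self._map against where the markers actually land in the written file:

# Throwaway diagnostic: read the output file and locate the section markers.
raw = open("./test.dta", "rb").read()

print(len(raw))               # 52395, matches the end-of-file entry
print(raw.find(b"<data>"))    # compare against self._map["data"] (4326)
print(raw.find(b"</data>"))   # where the closing tag actually ended up
print(raw.find(b"<strls>"))   # compare against self._map["strls"] (52339)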
Here's the code I've come up with. What am I missing to properly position the closing </data> tag?
from collections.abc import Hashable, Sequence
from datetime import datetime
from pandas import read_sql_query
from pandas._typing import (
    CompressionOptions,
    DtypeArg,
    FilePath,
    StorageOptions,
    WriteBuffer,
)
from pandas.io.stata import StataWriterUTF8
from sqlalchemy import text
from sqlalchemy.engine.base import Connection
class SQLStataWriter(StataWriterUTF8):
    """
    Writes a STATA binary file without using an eager dataframe, avoiding loading
    the entire result into memory.

    Only supports writing modern STATA 15 (118) and 16 (119) .dta files since they
    are the only versions to support UTF-8.
    """
    def __init__(
        self,
        fname: FilePath | WriteBuffer[bytes],
        sql: str | None = None,
        engine: Connection | None = None,
        chunksize: int = 10000,
        dtype: DtypeArg | None = None,
        convert_dates: dict[Hashable, str] | None = None,
        write_index: bool = True,
        byteorder: str | None = None,
        time_stamp: datetime | None = None,
        data_label: str | None = None,
        variable_labels: dict[Hashable, str] | None = None,
        convert_strl: Sequence[Hashable] | None = None,
        version: int | None = None,
        compression: CompressionOptions = "infer",
        storage_options: StorageOptions | None = None,
        *,
        value_labels: dict[Hashable, dict[float, str]] | None = None,
    ) -> None:
        # Bind the additional variables to self
        self.sql = text(sql)
        self.engine = engine
        self.chunksize = chunksize
        self.dtype = dtype

        # Create the dataframe for init by pulling the first row only
        for data in read_sql_query(
            self.sql,
            self.engine,
            chunksize=1,
            dtype=self.dtype,
        ):
            break

        super().__init__(
            fname,
            data,
            convert_dates,
            write_index,
            byteorder=byteorder,
            time_stamp=time_stamp,
            data_label=data_label,
            variable_labels=variable_labels,
            convert_strl=convert_strl,
            version=version,
            compression=compression,
            storage_options=storage_options,
            value_labels=value_labels,
        )
    def _prepare_data(self):
        """
        This will be called within _write_data in the loop instead.
        """
        return None
    def _write_data(self, records) -> None:
        """
        Override this to loop over records in chunksize.
        """
        self._update_map("data")
        self._write_bytes(b"<data>")

        # Instead of eagerly loading the entire dataframe, do it in chunks.
        for self.data in read_sql_query(
            self.sql,
            self.engine,
            chunksize=self.chunksize,
            dtype=self.dtype,
        ):
            # Insert an index column, or the expected values will be off by one.
            # Be sure to use len(self.data) in case the last chunk has fewer rows
            # than the chunksize.
            self.data.insert(0, "index", list(range(0, len(self.data))))

            # Call the parent function to prepare rows for this chunk only.
            records = super()._prepare_data()

            # Write the records to the file
            self._write_bytes(records.tobytes())

        self._write_bytes(b"</data>")