Skip to content

Serialize NamedDataStoreOutput into PTD. #9125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions exir/_serialize/_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

# pyre-strict

from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Set, Tuple

from executorch.exir._serialize import _serialize_pte_binary

from executorch.exir._serialize._cord import Cord
from executorch.exir._serialize._named_data_store import NamedDataStoreOutput
from executorch.exir._serialize.data_serializer import (
DataEntry,
DataPayload,
DataSerializer,
TensorEntry,
Expand Down Expand Up @@ -74,39 +75,59 @@ def serialize_for_executorch(
tensor.extra_tensor_info.fully_qualified_name
] = TensorLayout(tensor.scalar_type, tensor.sizes, tensor.dim_order)

if len(fqn_to_tensor_layout) == 0 and (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible for only one of these to be true?

Copy link
Contributor Author

@lucylq lucylq Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes:

fqn_to_tensor_layout is for tensors being saved to a separate file,
named_data.external_data is for named blobs being saved to a separate file

For training, named_data.external_data is empty.
For delegate weight sharing, fqn_to_tensor_layout is empty.

named_data is None or len(named_data.external_data) == 0
):
return pte, ptd_files

# Consolidate tensors and opaque data with the same external tag so they
# can be saved to the same PTD.
all_external_tags: Set[str] = set()
if named_data is not None and len(named_data.external_data) > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar question here, can named_data be not none but the length of the data is > 0 or vice versa?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

named_data stores blobs that are either saved in the PTE file or saved externally.
If all named data is marked to be saved in the PTE file, named_data.pte_data is non-empty, and named_data.external_data is empty

assert (
len(named_data.buffers) > 0
), "External data exists, but there are no buffers provided."
all_external_tags = set(named_data.external_data.keys())

if len(fqn_to_tensor_layout) > 0:
# emitter_output.external_constant_map contains the mapping from
# {file: {fqn: index into external_constant_buffer}}
# Contains the locations of the tensor buffers, and must be non-empty
# if there are external tensors to serialize.
assert emitter_output.external_constant_map is not None
for (
filename,
fqn_to_index,
) in (
# pyre-ignore Undefined attribute [16]: Optional type has no attribute `items`.
emitter_output.external_constant_map.items()
):
# Create a TensorEntry for each external tensor.
fqn_to_tensor_entry: Dict[str, TensorEntry] = {}
for fqn, index in fqn_to_index.items():
assert fqn in fqn_to_tensor_layout
fqn_to_tensor_entry[fqn] = TensorEntry(
buffer_index=index,
layout=fqn_to_tensor_layout[fqn],
)

ptd_files[filename] = data_serializer.serialize(
DataPayload(
buffers=emitter_output.external_constant_buffer,
fqn_to_tensor=fqn_to_tensor_entry,
)
assert (
emitter_output.external_constant_map is not None
), "External exists, but there are no buffers provided."
all_external_tags = all_external_tags | set(
emitter_output.external_constant_map.keys()
)

for tag in all_external_tags:
fqn_to_tensor_entry: Dict[str, TensorEntry] = {}
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `get`.
fqn_to_index = emitter_output.external_constant_map.get(tag, {})
# Create a TensorEntry for each external tensor.
for fqn, index in fqn_to_index.items():
assert fqn in fqn_to_tensor_layout
fqn_to_tensor_entry[fqn] = TensorEntry(
buffer_index=index,
layout=fqn_to_tensor_layout[fqn],
)

if named_data is None or len(named_data.external_data) == 0:
return pte, ptd_files
# Extract external data.
key_to_data: Dict[str, DataEntry] = {}
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `get`.
key_to_buffer_index = named_data.external_data.get(tag, {})
for key, index in key_to_buffer_index.items():
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `buffers`.
key_to_data[key] = DataEntry(index, named_data.buffers[index].alignment)

if len(named_data.buffers) == 0:
raise RuntimeError("External data exists, but there are no buffers provided.")
# Serialize into PTD file.
ptd_files[tag] = data_serializer.serialize(
DataPayload(
buffers=emitter_output.external_constant_buffer,
fqn_to_tensor=fqn_to_tensor_entry,
key_to_data=key_to_data,
)
)

return pte, ptd_files
17 changes: 17 additions & 0 deletions exir/_serialize/data_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ class TensorEntry:
layout: TensorLayout


@dataclass
class DataEntry:
    """Represents a single blob in `DataPayload`, specifying its location
    and metadata.

    Instances are produced during export (one per external named-data blob)
    and consumed by a `DataSerializer` when laying blobs out into segments.

    Attributes:
        buffer_index: The index inside `DataPayload.buffers` that this
            DataEntry refers to.
        alignment: The alignment of the data.
    """

    # Index into `DataPayload.buffers` locating this blob's bytes.
    buffer_index: int
    # Byte alignment the blob must be placed at when serialized.
    # NOTE(review): presumably a positive power of two — confirm with callers.
    alignment: int


@dataclass
class DataPayload:
"""Contains the data and metadata required for serialization.
Expand All @@ -49,10 +64,12 @@ class DataPayload:
Attributes:
buffers: a sequence of tensor buffers.
fqn_to_tensor: a map from fully qualified names to serializable tensors.
key_to_data: a map from unique keys to serializable opaque data.
"""

buffers: Sequence[bytes]
fqn_to_tensor: Dict[str, TensorEntry]
key_to_data: Dict[str, DataEntry]


class DataSerializer(ABC):
Expand Down
84 changes: 70 additions & 14 deletions extension/flat_tensor/serialize/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# pyre-strict

import json
import math
import os
import tempfile
from dataclasses import dataclass
Expand All @@ -19,6 +20,7 @@
from executorch.exir._serialize._flatbuffer import _flatc_compile, _flatc_decompile
from executorch.exir._serialize._program import _insert_flatbuffer_header
from executorch.exir._serialize.data_serializer import (
DataEntry,
DataPayload,
DataSerializer,
TensorEntry,
Expand All @@ -29,6 +31,7 @@
from executorch.extension.flat_tensor.serialize.flat_tensor_schema import (
DataSegment,
FlatTensor,
NamedData,
TensorMetadata,
)

Expand Down Expand Up @@ -202,6 +205,24 @@ def to_bytes(self) -> bytes:
return data


@dataclass
class AlignedData:
    """A cord of bytes paired with the alignment it must be placed at.

    Attributes:
        data: The payload to serialize, held as a Cord.
        alignment: Required byte alignment; 1 means "no constraint".
    """

    data: Cord
    alignment: int

    def __init__(self, data: Cord, alignment: Optional[int] = None) -> None:
        self.data = data
        # A missing (None) or falsy alignment collapses to 1, i.e. unaligned.
        self.alignment = alignment if alignment else 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL you can do this



def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]:
"""Returns the extended header of the flat_tensor data, if present and valid."""
try:
Expand All @@ -216,7 +237,7 @@ def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]:
def _extract_tensors(
fqn_to_tensor: Dict[str, TensorEntry],
buffers: Sequence[bytes],
segments: List[Cord],
segments: List[AlignedData],
tensor_alignment: int,
) -> List[TensorMetadata]:
"""Places tensors into a single segment, aligned to tensor_alignment within
Expand Down Expand Up @@ -265,10 +286,43 @@ def _extract_tensors(
offset=offset,
)
)
segments.append(tensor_data)
segments.append(AlignedData(tensor_data))
return tensors


def _extract_named_data(
    key_to_data: Dict[str, DataEntry],
    buffers: Sequence[bytes],
    segments: List[AlignedData],
) -> List[NamedData]:
    """Places named data into segments and records the alignment for each.

    Multiple keys may reference the same underlying buffer; such keys share
    a single segment rather than duplicating the bytes.

    Args:
        key_to_data: A map from keys to opaque data entries.
        buffers: A sequence of buffers holding opaque blob data.
        segments: A list of segments to append data to. Modified in-place.

    Returns:
        A list of NamedData describing the offsets to the opaque blob data.
    """
    # Tracks which segment each buffer index has already been placed in,
    # so repeated references reuse the existing segment.
    buffer_to_segment: Dict[int, int] = {}

    result: List[NamedData] = []
    for key, entry in key_to_data.items():
        idx = entry.buffer_index
        if idx not in buffer_to_segment:
            # First time this buffer is seen: append a new aligned segment.
            buffer_to_segment[idx] = len(segments)
            segments.append(AlignedData(Cord(buffers[idx]), entry.alignment))
        result.append(NamedData(key=key, segment_index=buffer_to_segment[idx]))
    return result


class FlatTensorSerializer(DataSerializer):
"""A concrete implementation of the DataSerializer interface that
serializes and deserializes data to/from the FlatTensor format.
Expand All @@ -289,35 +343,37 @@ def serialize(
) -> Cord:
"""Serializes a list of tensors and named data into a blob."""

segments: List[Cord] = []
segments: List[AlignedData] = []
tensors = _extract_tensors(
data.fqn_to_tensor,
data.buffers,
segments,
self.config.tensor_alignment,
)
named_data = _extract_named_data(data.key_to_data, data.buffers, segments)

data_segments: List[DataSegment] = []
segment_data = Cord()
aggregated_segment_data = Cord()
for segment in segments:
prev_end = (
(data_segments[-1].offset + data_segments[-1].size)
if data_segments
else 0
)
alignment = math.lcm(self.config.segment_alignment, segment.alignment)
data_segments.append(
DataSegment(
offset=aligned_size(prev_end, self.config.segment_alignment),
size=len(segment),
offset=aligned_size(prev_end, alignment),
size=len(segment.data),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what else is in the segment now besides data? Alignment?

Copy link
Contributor Author

@lucylq lucylq Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same as the PTE file; segment only contains offset+size. Size does not include alignment. We just have to make sure the data is aligned to the LCM of segment_alignment and any delegate-requested alignment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i meant you have segment.data. What else is in that object besides the data.

Copy link
Contributor Author

@lucylq lucylq Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's any additional alignment from the delegate. It's passed from

NamedDataStore (delegates add blobs+optional alignment to the store) -->
DataEntry (buffer index + alignment) -->
here

)
)
# Pad segment_data to segment alignment.
# Pad aggregated_segment_data to segment alignment.
segment_pad_length = padding_required(
len(segment_data), self.config.segment_alignment
len(aggregated_segment_data), alignment
)
if segment_pad_length > 0:
segment_data.append(b"\x00" * segment_pad_length)
segment_data.append(segment)
aggregated_segment_data.append(b"\x00" * segment_pad_length)
aggregated_segment_data.append(segment.data)

# Create FlatTensor, which describes of the contents of the file and
# points to all the data segments. It will be serialized to flatbuffer.
Expand All @@ -326,7 +382,7 @@ def serialize(
tensor_alignment=self.config.tensor_alignment,
tensors=tensors,
segments=data_segments,
named_data=[],
named_data=named_data,
)

flatbuffer_payload = _serialize_to_flatbuffer(flat_tensor)
Expand All @@ -351,7 +407,7 @@ def serialize(
flatbuffer_offset=padded_header_length,
flatbuffer_size=len(flatbuffer_payload),
segment_base_offset=segment_base_offset,
segment_data_size=len(segment_data),
segment_data_size=len(aggregated_segment_data),
).to_bytes()

# Pad header and payload to segment alignment.
Expand All @@ -371,15 +427,15 @@ def serialize(
assert eh.flatbuffer_size == original_flatbuffer_payload_size
assert eh.segment_base_offset == segment_base_offset
assert eh.flatbuffer_offset == padded_header_length
assert eh.segment_data_size == len(segment_data)
assert eh.segment_data_size == len(aggregated_segment_data)

del header_data
del flatbuffer_payload

# Place everything into one segment.
payload = Cord()
payload.append(injected_flatbuffer_data)
payload.append(segment_data)
payload.append(aggregated_segment_data)

return payload

Expand Down
Loading
Loading