-
Notifications
You must be signed in to change notification settings - Fork 553
Serialize NamedDataStoreOutput into PTD. #9125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,14 @@ | |
|
||
# pyre-strict | ||
|
||
from typing import Dict, Optional, Tuple | ||
from typing import Dict, Optional, Set, Tuple | ||
|
||
from executorch.exir._serialize import _serialize_pte_binary | ||
|
||
from executorch.exir._serialize._cord import Cord | ||
from executorch.exir._serialize._named_data_store import NamedDataStoreOutput | ||
from executorch.exir._serialize.data_serializer import ( | ||
DataEntry, | ||
DataPayload, | ||
DataSerializer, | ||
TensorEntry, | ||
|
@@ -74,39 +75,59 @@ def serialize_for_executorch( | |
tensor.extra_tensor_info.fully_qualified_name | ||
] = TensorLayout(tensor.scalar_type, tensor.sizes, tensor.dim_order) | ||
|
||
if len(fqn_to_tensor_layout) == 0 and ( | ||
named_data is None or len(named_data.external_data) == 0 | ||
): | ||
return pte, ptd_files | ||
|
||
# Consolidate tensors and opaque data with the same external tag so they | ||
# can be saved to the same PTD. | ||
all_external_tags: Set[str] = set() | ||
if named_data is not None and len(named_data.external_data) > 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar question here: can named_data be not None but the length of the data be 0, or vice versa? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. named_data stores blobs that are either saved in the PTE file or saved externally. |
||
assert ( | ||
len(named_data.buffers) > 0 | ||
), "External data exists, but there are no buffers provided." | ||
all_external_tags = set(named_data.external_data.keys()) | ||
|
||
if len(fqn_to_tensor_layout) > 0: | ||
# emitter_output.external_constant_map contains the mapping from | ||
# {file: {fqn: index into external_constant_buffer}} | ||
# Contains the locations of the tensor buffers, and must be non-empty | ||
# if there are external tensors to serialize. | ||
assert emitter_output.external_constant_map is not None | ||
for ( | ||
filename, | ||
fqn_to_index, | ||
) in ( | ||
# pyre-ignore Undefined attribute [16]: Optional type has no attribute `items`. | ||
emitter_output.external_constant_map.items() | ||
): | ||
# Create a TensorEntry for each external tensor. | ||
fqn_to_tensor_entry: Dict[str, TensorEntry] = {} | ||
for fqn, index in fqn_to_index.items(): | ||
assert fqn in fqn_to_tensor_layout | ||
fqn_to_tensor_entry[fqn] = TensorEntry( | ||
buffer_index=index, | ||
layout=fqn_to_tensor_layout[fqn], | ||
) | ||
|
||
ptd_files[filename] = data_serializer.serialize( | ||
DataPayload( | ||
buffers=emitter_output.external_constant_buffer, | ||
fqn_to_tensor=fqn_to_tensor_entry, | ||
) | ||
assert ( | ||
emitter_output.external_constant_map is not None | ||
), "External exists, but there are no buffers provided." | ||
all_external_tags = all_external_tags | set( | ||
emitter_output.external_constant_map.keys() | ||
) | ||
|
||
for tag in all_external_tags: | ||
fqn_to_tensor_entry: Dict[str, TensorEntry] = {} | ||
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `get`. | ||
fqn_to_index = emitter_output.external_constant_map.get(tag, {}) | ||
# Create a TensorEntry for each external tensor. | ||
for fqn, index in fqn_to_index.items(): | ||
assert fqn in fqn_to_tensor_layout | ||
fqn_to_tensor_entry[fqn] = TensorEntry( | ||
buffer_index=index, | ||
layout=fqn_to_tensor_layout[fqn], | ||
) | ||
|
||
if named_data is None or len(named_data.external_data) == 0: | ||
return pte, ptd_files | ||
# Extract external data. | ||
key_to_data: Dict[str, DataEntry] = {} | ||
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `get`. | ||
key_to_buffer_index = named_data.external_data.get(tag, {}) | ||
for key, index in key_to_buffer_index.items(): | ||
# pyre-ignore[16]: Undefined attribute: `Optional` has no attribute `buffers`. | ||
key_to_data[key] = DataEntry(index, named_data.buffers[index].alignment) | ||
|
||
if len(named_data.buffers) == 0: | ||
raise RuntimeError("External data exists, but there are no buffers provided.") | ||
# Serialize into PTD file. | ||
ptd_files[tag] = data_serializer.serialize( | ||
DataPayload( | ||
buffers=emitter_output.external_constant_buffer, | ||
fqn_to_tensor=fqn_to_tensor_entry, | ||
key_to_data=key_to_data, | ||
) | ||
) | ||
|
||
return pte, ptd_files |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
# pyre-strict | ||
|
||
import json | ||
import math | ||
import os | ||
import tempfile | ||
from dataclasses import dataclass | ||
|
@@ -19,6 +20,7 @@ | |
from executorch.exir._serialize._flatbuffer import _flatc_compile, _flatc_decompile | ||
from executorch.exir._serialize._program import _insert_flatbuffer_header | ||
from executorch.exir._serialize.data_serializer import ( | ||
DataEntry, | ||
DataPayload, | ||
DataSerializer, | ||
TensorEntry, | ||
|
@@ -29,6 +31,7 @@ | |
from executorch.extension.flat_tensor.serialize.flat_tensor_schema import ( | ||
DataSegment, | ||
FlatTensor, | ||
NamedData, | ||
TensorMetadata, | ||
) | ||
|
||
|
@@ -202,6 +205,24 @@ def to_bytes(self) -> bytes: | |
return data | ||
|
||
|
||
@dataclass | ||
class AlignedData: | ||
""" | ||
Holds data that should be aligned, for serialization. | ||
|
||
Attributes: | ||
data: The data to serialize, as a cord. | ||
alignment: The alignment required for the data. | ||
""" | ||
|
||
data: Cord | ||
alignment: int | ||
|
||
def __init__(self, data: Cord, alignment: Optional[int] = None) -> None: | ||
self.data = data | ||
self.alignment = alignment or 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIL you can do this |
||
|
||
|
||
def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]: | ||
"""Returns the extended header of the flat_tensor data, if present and valid.""" | ||
try: | ||
|
@@ -216,7 +237,7 @@ def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]: | |
def _extract_tensors( | ||
fqn_to_tensor: Dict[str, TensorEntry], | ||
buffers: Sequence[bytes], | ||
segments: List[Cord], | ||
segments: List[AlignedData], | ||
tensor_alignment: int, | ||
) -> List[TensorMetadata]: | ||
"""Places tensors into a single segment, aligned to tensor_alignment within | ||
|
@@ -265,10 +286,43 @@ def _extract_tensors( | |
offset=offset, | ||
) | ||
) | ||
segments.append(tensor_data) | ||
segments.append(AlignedData(tensor_data)) | ||
return tensors | ||
|
||
|
||
def _extract_named_data( | ||
key_to_data: Dict[str, DataEntry], | ||
buffers: Sequence[bytes], | ||
segments: List[AlignedData], | ||
) -> List[NamedData]: | ||
"""Places named data into segments and record the alignment for each. | ||
|
||
Args: | ||
key_to_data: A map from keys to opaque data entries. | ||
buffers: A sequence of buffers holding opaque blob data. | ||
segments: A list of segments to append data to. Modified in-place. | ||
|
||
Returns: | ||
A list of NamedData describing the offsets to the opaque blob data. | ||
""" | ||
|
||
# Map from buffer_idx to segment_idx. | ||
segment_index_map: Dict[int, int] = {} | ||
|
||
named_data: List[NamedData] = [] | ||
for key, data_entry in key_to_data.items(): | ||
buffer_idx = data_entry.buffer_index | ||
segment_index = segment_index_map.get(buffer_idx, None) | ||
if segment_index is None: | ||
segment_index = len(segments) | ||
segment_index_map[buffer_idx] = segment_index | ||
segments.append( | ||
AlignedData(Cord(buffers[buffer_idx]), data_entry.alignment) | ||
) | ||
named_data.append(NamedData(key=key, segment_index=segment_index)) | ||
return named_data | ||
|
||
|
||
class FlatTensorSerializer(DataSerializer): | ||
"""A concrete implementation of the DataSerializer interface that | ||
serializes and deserializes data to/from the FlatTensor format. | ||
|
@@ -289,35 +343,37 @@ def serialize( | |
) -> Cord: | ||
"""Serializes a list of tensors and named data into a blob.""" | ||
|
||
segments: List[Cord] = [] | ||
segments: List[AlignedData] = [] | ||
tensors = _extract_tensors( | ||
data.fqn_to_tensor, | ||
data.buffers, | ||
segments, | ||
self.config.tensor_alignment, | ||
) | ||
named_data = _extract_named_data(data.key_to_data, data.buffers, segments) | ||
|
||
data_segments: List[DataSegment] = [] | ||
segment_data = Cord() | ||
aggregated_segment_data = Cord() | ||
for segment in segments: | ||
prev_end = ( | ||
(data_segments[-1].offset + data_segments[-1].size) | ||
if data_segments | ||
else 0 | ||
) | ||
alignment = math.lcm(self.config.segment_alignment, segment.alignment) | ||
data_segments.append( | ||
DataSegment( | ||
offset=aligned_size(prev_end, self.config.segment_alignment), | ||
size=len(segment), | ||
offset=aligned_size(prev_end, alignment), | ||
size=len(segment.data), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what else is in the segment now besides data? Alignment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's the same as the PTE file; segment only contains offset+size. Size does not include alignment. We just have to make sure the data is aligned to the LCM of segment_alignment and any delegate-requested alignment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh i meant you have segment.data. What else is in that object besides the data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, it's any additional alignment from the delegate. It's passed from NamedDataStore (delegates add blobs+optional alignment to the store) --> |
||
) | ||
) | ||
# Pad segment_data to segment alignment. | ||
# Pad aggregated_segment_data to segment alignment. | ||
segment_pad_length = padding_required( | ||
len(segment_data), self.config.segment_alignment | ||
len(aggregated_segment_data), alignment | ||
) | ||
if segment_pad_length > 0: | ||
segment_data.append(b"\x00" * segment_pad_length) | ||
segment_data.append(segment) | ||
aggregated_segment_data.append(b"\x00" * segment_pad_length) | ||
aggregated_segment_data.append(segment.data) | ||
|
||
# Create FlatTensor, which describes the contents of the file and | ||
# points to all the data segments. It will be serialized to flatbuffer. | ||
|
@@ -326,7 +382,7 @@ def serialize( | |
tensor_alignment=self.config.tensor_alignment, | ||
tensors=tensors, | ||
segments=data_segments, | ||
named_data=[], | ||
named_data=named_data, | ||
) | ||
|
||
flatbuffer_payload = _serialize_to_flatbuffer(flat_tensor) | ||
|
@@ -351,7 +407,7 @@ def serialize( | |
flatbuffer_offset=padded_header_length, | ||
flatbuffer_size=len(flatbuffer_payload), | ||
segment_base_offset=segment_base_offset, | ||
segment_data_size=len(segment_data), | ||
segment_data_size=len(aggregated_segment_data), | ||
).to_bytes() | ||
|
||
# Pad header and payload to segment alignment. | ||
|
@@ -371,15 +427,15 @@ def serialize( | |
assert eh.flatbuffer_size == original_flatbuffer_payload_size | ||
assert eh.segment_base_offset == segment_base_offset | ||
assert eh.flatbuffer_offset == padded_header_length | ||
assert eh.segment_data_size == len(segment_data) | ||
assert eh.segment_data_size == len(aggregated_segment_data) | ||
|
||
del header_data | ||
del flatbuffer_payload | ||
|
||
# Place everything into one segment. | ||
payload = Cord() | ||
payload.append(injected_flatbuffer_data) | ||
payload.append(segment_data) | ||
payload.append(aggregated_segment_data) | ||
|
||
return payload | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible for only one of these to be true?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes:
fqn_to_tensor_layout is for tensors being saved to a separate file,
named_data.external_data is for named blobs being saved to a separate file
For training, named_data.external_data is empty.
For delegate weight sharing, fqn_to_tensor_layout is empty.