[AQUA Telemetry] Update MD Tracking #1193

Open · wants to merge 13 commits into main

Changes from all commits
5 changes: 4 additions & 1 deletion ads/aqua/modeldeployment/constants.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright (c) 2024 Oracle and/or its affiliates.
# Copyright (c) 2024, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

"""
@@ -8,3 +8,6 @@

This module contains constants used in Aqua Model Deployment.
"""

DEFAULT_WAIT_TIME = 12000
DEFAULT_POLL_INTERVAL = 10
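The two new constants cap how long the telemetry status check added in `deployment.py` waits on the deployment work request (12000 seconds) and how often it polls (10 seconds). A minimal sketch of how they are consumed, using only names that appear in this diff; error handling is omitted:

```python
# Sketch only: shows how the new constants feed the status check added below.
from ads.aqua.modeldeployment.constants import (
    DEFAULT_POLL_INTERVAL,  # seconds between work-request polls
    DEFAULT_WAIT_TIME,      # max seconds to wait for the deployment
)
from ads.common.work_request import DataScienceWorkRequest


def wait_for_deployment(work_request_id: str) -> None:
    """Block until the deployment work request finishes or the wait times out."""
    DataScienceWorkRequest(work_request_id).wait_work_request(
        progress_bar_description="Creating model deployment",
        max_wait_time=DEFAULT_WAIT_TIME,
        poll_interval=DEFAULT_POLL_INTERVAL,
    )
```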
77 changes: 76 additions & 1 deletion ads/aqua/modeldeployment/deployment.py
@@ -55,6 +55,7 @@
extract_base_model_from_ft,
extract_fine_tune_artifacts_path,
)
from ads.aqua.modeldeployment.constants import DEFAULT_POLL_INTERVAL, DEFAULT_WAIT_TIME
from ads.aqua.modeldeployment.entities import (
AquaDeployment,
AquaDeploymentConfig,
@@ -65,12 +66,16 @@
ModelDeploymentConfigSummary,
)
from ads.aqua.modeldeployment.utils import MultiModelDeploymentConfigLoader
from ads.common.decorator.threaded import thread_pool
from ads.common.object_storage_details import ObjectStorageDetails
from ads.common.utils import UNKNOWN, get_log_links
from ads.common.work_request import DataScienceWorkRequest
from ads.config import (
AQUA_DEPLOYMENT_CONTAINER_CMD_VAR_METADATA_NAME,
AQUA_DEPLOYMENT_CONTAINER_METADATA_NAME,
AQUA_DEPLOYMENT_CONTAINER_URI_METADATA_NAME,
AQUA_TELEMETRY_BUCKET,
AQUA_TELEMETRY_BUCKET_NS,
COMPARTMENT_OCID,
PROJECT_OCID,
)
@@ -508,6 +513,9 @@ def _create(
if key not in env_var:
env_var.update(env)

env_var.update({"AQUA_TELEMETRY_BUCKET_NS": AQUA_TELEMETRY_BUCKET_NS})
env_var.update({"AQUA_TELEMETRY_BUCKET": AQUA_TELEMETRY_BUCKET})

logger.info(f"Env vars used for deploying {aqua_model.id} :{env_var}")

tags = {**tags, **(create_deployment_details.freeform_tags or {})}
@@ -788,7 +796,15 @@ def _create_deployment(

deployment_id = deployment.id
logger.info(
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}."
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}"
)

thread_pool.submit(
self.get_deployment_status,
deployment_id,
deployment.dsc_model_deployment.workflow_req_id,
model_type,
model_name,
)

# we arbitrarily choose last 8 characters of OCID to identify MD in telemetry
@@ -1313,3 +1329,62 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]:
)
for oci_shape in oci_shapes
]

def get_deployment_status(
self,
model_deployment_id: str,
work_request_id: str,
model_type: str,
model_name: str,
) -> None:
"""Waits for the data science model deployment to be completed and log its status in telemetry.

Parameters
----------
model_deployment_id: str
The OCID of the Aqua model deployment.
work_request_id: str
The work request ID of the model deployment.
model_type: str
The type of Aqua model deployed. Allowed values are: `custom`, `service` and `multi_model`.
model_name: str
The name of the deployed model, recorded as the telemetry event value.

Returns
-------
None
"""
ocid = get_ocid_substring(model_deployment_id, key_len=8)
telemetry_kwargs = {"ocid": ocid}

data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest(
work_request_id
)

try:
data_science_work_request.wait_work_request(
progress_bar_description="Creating model deployment",
max_wait_time=DEFAULT_WAIT_TIME,
poll_interval=DEFAULT_POLL_INTERVAL,
)
except Exception:
if data_science_work_request._error_message:
error_str = ""
for error in data_science_work_request._error_message:
error_str = error_str + " " + error.message

self.telemetry.record_event(
category=f"aqua/{model_type}/deployment/status",
action="FAILED",
detail=error_str,
value=model_name,
**telemetry_kwargs,
)

else:
self.telemetry.record_event_async(
category=f"aqua/{model_type}/deployment/status",
action="SUCCEEDED",
value=model_name,
**telemetry_kwargs,
)
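The status check is fire-and-forget: `_create_deployment` submits `get_deployment_status` to the shared `thread_pool` and returns immediately, and the method records a SUCCEEDED or FAILED telemetry event once the work request settles. A minimal sketch of the same flow using a plain `ThreadPoolExecutor`; the `telemetry` object and `wait_fn` callable are illustrative stand-ins, not APIs from this diff:

```python
# Sketch of the fire-and-forget status check, with ThreadPoolExecutor standing
# in for ads.common.decorator.threaded.thread_pool.
from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(max_workers=4)


def track_deployment_status(telemetry, wait_fn, model_type, model_name, ocid):
    """Wait for the deployment work request, then record its final status."""
    try:
        wait_fn()  # e.g. DataScienceWorkRequest(...).wait_work_request(...)
    except Exception as exc:  # deployment failed or the wait timed out
        telemetry.record_event(
            category=f"aqua/{model_type}/deployment/status",
            action="FAILED",
            detail=str(exc),
            value=model_name,
            ocid=ocid,
        )
    else:
        telemetry.record_event_async(
            category=f"aqua/{model_type}/deployment/status",
            action="SUCCEEDED",
            value=model_name,
            ocid=ocid,
        )


# The caller does not block on the result:
# thread_pool.submit(track_deployment_status, telemetry, wait_fn,
#                    "custom", "my-model", ocid)
```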
77 changes: 39 additions & 38 deletions ads/common/work_request.py
@@ -1,7 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2024 Oracle and/or its affiliates.
# Copyright (c) 2024, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import logging
@@ -12,6 +10,7 @@
import oci
from oci import Signer
from tqdm.auto import tqdm

from ads.common.oci_datascience import OCIDataScienceMixin

logger = logging.getLogger(__name__)
@@ -20,10 +19,10 @@
DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
WORK_REQUEST_PERCENTAGE = 100
# default tqdm progress bar format:
# default tqdm progress bar format:
# {l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ' '{rate_fmt}{postfix}]
# customize the bar format to remove the {n_fmt}/{total_fmt} from the right side
DEFAULT_BAR_FORMAT = '{l_bar}{bar}| [{elapsed}<{remaining}, ' '{rate_fmt}{postfix}]'
DEFAULT_BAR_FORMAT = "{l_bar}{bar}| [{elapsed}<{remaining}, " "{rate_fmt}{postfix}]"


class DataScienceWorkRequest(OCIDataScienceMixin):
@@ -32,13 +31,13 @@ class DataScienceWorkRequest(OCIDataScienceMixin):
"""

def __init__(
self,
id: str,
self,
id: str,
description: str = "Processing",
config: dict = None,
signer: Signer = None,
client_kwargs: dict = None,
**kwargs
config: dict = None,
signer: Signer = None,
client_kwargs: dict = None,
**kwargs,
) -> None:
"""Initializes ADSWorkRequest object.

@@ -49,41 +48,43 @@ def __init__(
description: str
Progress bar initial step description (Defaults to `Processing`).
config : dict, optional
OCI API key config dictionary to initialize
OCI API key config dictionary to initialize
oci.data_science.DataScienceClient (Defaults to None).
signer : oci.signer.Signer, optional
OCI authentication signer to initialize
OCI authentication signer to initialize
oci.data_science.DataScienceClient (Defaults to None).
client_kwargs : dict, optional
Additional client keyword arguments to initialize
Additional client keyword arguments to initialize
oci.data_science.DataScienceClient (Defaults to None).
kwargs:
Additional keyword arguments to initialize
Additional keyword arguments to initialize
oci.data_science.DataScienceClient.
"""
self.id = id
self._description = description
self._percentage = 0
self._status = None
self._error_message = ""
super().__init__(config, signer, client_kwargs, **kwargs)


def _sync(self):
"""Fetches the latest work request information to ADSWorkRequest object."""
work_request = self.client.get_work_request(self.id).data
work_request_logs = self.client.list_work_request_logs(
self.id
).data
work_request_logs = self.client.list_work_request_logs(self.id).data

self._percentage= work_request.percent_complete
self._percentage = work_request.percent_complete
self._status = work_request.status
self._description = work_request_logs[-1].message if work_request_logs else "Processing"
self._description = (
work_request_logs[-1].message if work_request_logs else "Processing"
)
if work_request.status == "FAILED":
self._error_message = self.client.list_work_request_errors(self.id).data

def watch(
self,
self,
progress_callback: Callable,
max_wait_time: int=DEFAULT_WAIT_TIME,
poll_interval: int=DEFAULT_POLL_INTERVAL,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
):
"""Updates the progress bar with realtime message and percentage until the process is completed.

@@ -92,10 +93,10 @@ def watch(
progress_callback: Callable
Progress bar callback function.
It must accept `(percent_change, description)` where `percent_change` is the
work request percent complete and `description` is the latest work request log message.
work request percent complete and `description` is the latest work request log message.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).

@@ -107,7 +108,6 @@

start_time = time.time()
while self._percentage < 100:

seconds_since = time.time() - start_time
if max_wait_time > 0 and seconds_since >= max_wait_time:
logger.error(f"Exceeded max wait time of {max_wait_time} seconds.")
@@ -124,12 +124,14 @@
percent_change = self._percentage - previous_percent_complete
previous_percent_complete = self._percentage
progress_callback(
percent_change=percent_change,
description=self._description
percent_change=percent_change, description=self._description
)

if self._status in WORK_REQUEST_STOP_STATE:
if self._status != oci.work_requests.models.WorkRequest.STATUS_SUCCEEDED:
if (
self._status
!= oci.work_requests.models.WorkRequest.STATUS_SUCCEEDED
):
if self._description:
raise Exception(self._description)
else:
@@ -145,12 +147,12 @@

def wait_work_request(
self,
progress_bar_description: str="Processing",
max_wait_time: int=DEFAULT_WAIT_TIME,
poll_interval: int=DEFAULT_POLL_INTERVAL
progress_bar_description: str = "Processing",
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
):
"""Waits for the work request progress bar to be completed.

Parameters
----------
progress_bar_description: str
@@ -160,7 +162,7 @@ def wait_work_request(
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).

Returns
-------
None
@@ -172,7 +174,7 @@
mininterval=0,
file=sys.stdout,
desc=progress_bar_description,
bar_format=DEFAULT_BAR_FORMAT
bar_format=DEFAULT_BAR_FORMAT,
) as pbar:

def progress_callback(percent_change, description):
@@ -184,6 +186,5 @@ def progress_callback(percent_change, description):
self.watch(
progress_callback=progress_callback,
max_wait_time=max_wait_time,
poll_interval=poll_interval
poll_interval=poll_interval,
)
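Most of this file is formatting cleanup, but it also captures `_error_message` when a work request ends in FAILED and keeps the tqdm-based progress reporting. A minimal sketch of the progress-callback contract that `watch()` expects: the callback receives the change in percent complete plus the latest work-request log message. The callback body shown here is an assumption, since the real one is collapsed in this diff:

```python
# Sketch of the callback wiring used by wait_work_request()/watch().
import sys

from tqdm.auto import tqdm

WORK_REQUEST_PERCENTAGE = 100
DEFAULT_BAR_FORMAT = "{l_bar}{bar}| [{elapsed}<{remaining}, " "{rate_fmt}{postfix}]"

with tqdm(
    total=WORK_REQUEST_PERCENTAGE,  # assumed cap of 100%
    mininterval=0,
    file=sys.stdout,
    desc="Creating model deployment",
    bar_format=DEFAULT_BAR_FORMAT,
) as pbar:

    def progress_callback(percent_change, description):
        if description:
            pbar.set_description(description)  # assumed: refresh the bar label
        pbar.update(percent_change)

    # In the real code this callback is handed to watch():
    # DataScienceWorkRequest(work_request_id).watch(
    #     progress_callback=progress_callback,
    #     max_wait_time=DEFAULT_WAIT_TIME,
    #     poll_interval=DEFAULT_POLL_INTERVAL,
    # )
    progress_callback(50, "Provisioning instances")
    progress_callback(50, "Deployment active")
```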

17 changes: 6 additions & 11 deletions ads/model/service/oci_datascience_model_deployment.py
@@ -185,9 +185,9 @@ def activate(
self.id,
)


self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Activating model deployment",
@@ -233,11 +233,9 @@ def create(
response = self.client.create_model_deployment(create_model_deployment_details)
self.update_from_oci_model(response.data)
logger.info(f"Creating model deployment `{self.id}`.")
print(f"Model Deployment OCID: {self.id}")

self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Creating model deployment",
@@ -287,10 +285,8 @@ def deactivate(
response = self.client.deactivate_model_deployment(
self.id,
)

self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Deactivating model deployment",
@@ -355,10 +351,9 @@ def delete(
response = self.client.delete_model_deployment(
self.id,
)


self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Deleting model deployment",