Skip to content

Commit 8cdccca

Browse files
committed
feat(dynamo-run): Basic routing choice
As a first step towards KV routing: - introduce a `--router-mode` in dynamo-run that only does random and round-robin right now. Not that interesting yet. - Make the vllm engine publish the KV events received from our patched vllm. Now we "just" need to connect the two. Easy, right?
1 parent 31ca862 commit 8cdccca

File tree

18 files changed

+265
-103
lines changed

18 files changed

+265
-103
lines changed

launch/dynamo-run/src/flags.rs

+36
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use std::collections::HashMap;
1717
use std::path::PathBuf;
1818
use std::str::FromStr;
1919

20+
use clap::ValueEnum;
21+
use dynamo_runtime::component::RouterMode as RuntimeRouterMode;
22+
2023
/// Required options depend on the in and out choices
2124
#[derive(clap::Parser, Debug, Clone)]
2225
#[command(version, about, long_about = None)]
@@ -92,6 +95,13 @@ pub struct Flags {
9295
#[arg(long)]
9396
pub leader_addr: Option<String>,
9497

98+
/// If using `out=dyn://..` with multiple backends, this says how to route the requests.
99+
///
100+
/// Mostly interesting for KV-aware routing.
101+
/// Defaults to RouterMode::Random
102+
#[arg(long, default_value = "random")]
103+
pub router_mode: RouterMode,
104+
95105
/// Internal use only.
96106
// Start the python vllm engine sub-process.
97107
#[arg(long, hide = true, default_value = "false")]
@@ -198,3 +208,29 @@ fn parse_sglang_flags(s: &str) -> Result<SgLangFlags, String> {
198208
gpu_id: nums[2],
199209
})
200210
}
211+
212+
#[derive(Default, PartialEq, Eq, ValueEnum, Clone, Debug)]
213+
pub enum RouterMode {
214+
#[default]
215+
Random,
216+
#[value(name = "round-robin")]
217+
RoundRobin,
218+
#[value(name = "kv")]
219+
KV,
220+
}
221+
222+
impl RouterMode {
223+
pub fn is_kv_routing(&self) -> bool {
224+
*self == RouterMode::KV
225+
}
226+
}
227+
228+
impl From<RouterMode> for RuntimeRouterMode {
229+
fn from(r: RouterMode) -> RuntimeRouterMode {
230+
match r {
231+
RouterMode::RoundRobin => RuntimeRouterMode::RoundRobin,
232+
RouterMode::KV => todo!("KV not implemented yet"),
233+
_ => RuntimeRouterMode::Random,
234+
}
235+
}
236+
}

launch/dynamo-run/src/input/batch.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use std::time::{Duration, Instant};
3131
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
3232

3333
use crate::input::common;
34-
use crate::EngineConfig;
34+
use crate::{EngineConfig, Flags};
3535

3636
/// Max tokens in each response.
3737
/// TODO: For batch mode this should be the full context size of the model
@@ -64,11 +64,12 @@ struct Entry {
6464

6565
pub async fn run(
6666
runtime: Runtime,
67-
cancel_token: CancellationToken,
67+
flags: Flags,
6868
maybe_card: Option<ModelDeploymentCard>,
6969
input_jsonl: PathBuf,
7070
engine_config: EngineConfig,
7171
) -> anyhow::Result<()> {
72+
let cancel_token = runtime.primary_token();
7273
// Check if the path exists and is a directory
7374
if !input_jsonl.exists() || !input_jsonl.is_file() {
7475
anyhow::bail!(
@@ -78,7 +79,7 @@ pub async fn run(
7879
}
7980

8081
let (service_name, engine, _inspect_template) =
81-
common::prepare_engine(runtime.clone(), engine_config).await?;
82+
common::prepare_engine(runtime, flags, engine_config).await?;
8283
let service_name_ref = Arc::new(service_name);
8384

8485
let pre_processor = if let Some(card) = maybe_card {

launch/dynamo-run/src/input/common.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use crate::EngineConfig;
16+
use crate::{flags::RouterMode, EngineConfig, Flags};
1717
use dynamo_llm::{
1818
backend::Backend,
1919
preprocessor::OpenAIPreprocessor,
@@ -34,21 +34,29 @@ use std::sync::Arc;
3434
/// Turns an EngineConfig into an OpenAIChatCompletionsStreamingEngine.
3535
pub async fn prepare_engine(
3636
runtime: Runtime,
37+
flags: Flags,
3738
engine_config: EngineConfig,
3839
) -> anyhow::Result<(String, OpenAIChatCompletionsStreamingEngine, bool)> {
3940
match engine_config {
4041
EngineConfig::Dynamic(endpoint_id) => {
4142
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
4243

4344
let endpoint = distributed_runtime
44-
.namespace(endpoint_id.namespace)?
45-
.component(endpoint_id.component)?
46-
.endpoint(endpoint_id.name);
45+
.namespace(endpoint_id.namespace.clone())?
46+
.component(endpoint_id.component.clone())?
47+
.endpoint(endpoint_id.name.clone());
4748

48-
let client = endpoint.client::<NvCreateChatCompletionRequest, Annotated<NvCreateChatCompletionStreamResponse>>().await?;
49-
tracing::info!("Waiting for remote model..");
50-
client.wait_for_endpoints().await?;
51-
tracing::info!("Model discovered");
49+
let mut client = endpoint.client::<NvCreateChatCompletionRequest, Annotated<NvCreateChatCompletionStreamResponse>>().await?;
50+
51+
match &flags.router_mode {
52+
RouterMode::Random | RouterMode::RoundRobin => {
53+
client.set_router_mode(flags.router_mode.into());
54+
tracing::info!("Waiting for remote model..");
55+
client.wait_for_endpoints().await?;
56+
tracing::info!("Model discovered");
57+
}
58+
RouterMode::KV => todo!(),
59+
}
5260

5361
// The service_name isn't used for text chat outside of logs,
5462
// so use the path. That avoids having to listen on etcd for model registration.

launch/dynamo-run/src/input/endpoint.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,22 @@ use dynamo_llm::{
2828
use dynamo_runtime::pipeline::{
2929
network::Ingress, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
3030
};
31-
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime, Runtime};
31+
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime};
3232

3333
use crate::EngineConfig;
3434

3535
pub async fn run(
36-
runtime: Runtime,
36+
distributed_runtime: DistributedRuntime,
3737
path: String,
3838
engine_config: EngineConfig,
3939
) -> anyhow::Result<()> {
4040
// This will attempt to connect to NATS and etcd
41-
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
4241

43-
let cancel_token = runtime.primary_token().clone();
42+
let cancel_token = distributed_runtime.primary_token().clone();
4443
let endpoint_id: Endpoint = path.parse()?;
4544

45+
let etcd_client = distributed_runtime.etcd_client();
46+
4647
let (ingress, service_name) = match engine_config {
4748
EngineConfig::StaticFull {
4849
service_name,
@@ -85,7 +86,7 @@ pub async fn run(
8586
model_type: ModelType::Chat,
8687
};
8788

88-
let component = distributed
89+
let component = distributed_runtime
8990
.namespace(endpoint_id.namespace)?
9091
.component(endpoint_id.component)?;
9192
let endpoint = component
@@ -94,8 +95,8 @@ pub async fn run(
9495
.await?
9596
.endpoint(endpoint_id.name);
9697

97-
if let Some(etcd_client) = distributed.etcd_client() {
98-
let network_name = endpoint.subject();
98+
if let Some(etcd_client) = etcd_client {
99+
let network_name = endpoint.subject_to(etcd_client.lease_id());
99100
tracing::debug!("Registering with etcd as {network_name}");
100101
etcd_client
101102
.kv_create(

launch/dynamo-run/src/input/http.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@ use dynamo_runtime::{
3232
DistributedRuntime, Runtime,
3333
};
3434

35-
use crate::EngineConfig;
35+
use crate::{EngineConfig, Flags};
3636

3737
/// Build and run an HTTP service
3838
pub async fn run(
3939
runtime: Runtime,
40-
http_port: u16,
40+
flags: Flags,
4141
engine_config: EngineConfig,
4242
) -> anyhow::Result<()> {
4343
let http_service = service_v2::HttpService::builder()
44-
.port(http_port)
44+
.port(flags.http_port)
4545
.enable_chat_endpoints(true)
4646
.enable_cmpl_endpoints(true)
4747
.build()?;

launch/dynamo-run/src/input/text.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,24 @@ use futures::StreamExt;
2121
use std::io::{ErrorKind, Write};
2222

2323
use crate::input::common;
24-
use crate::EngineConfig;
24+
use crate::{EngineConfig, Flags};
2525

2626
/// Max response tokens for each single query. Must be less than model context size.
2727
/// TODO: Cmd line flag to overwrite this
2828
const MAX_TOKENS: u32 = 8192;
2929

3030
pub async fn run(
3131
runtime: Runtime,
32-
cancel_token: CancellationToken,
32+
flags: Flags,
3333
single_prompt: Option<String>,
3434
engine_config: EngineConfig,
3535
) -> anyhow::Result<()> {
36+
let cancel_token = runtime.primary_token();
3637
let (service_name, engine, inspect_template): (
3738
String,
3839
OpenAIChatCompletionsStreamingEngine,
3940
bool,
40-
) = common::prepare_engine(runtime.clone(), engine_config).await?;
41+
) = common::prepare_engine(runtime, flags, engine_config).await?;
4142
main_loop(
4243
cancel_token,
4344
&service_name,

0 commit comments

Comments
 (0)