Commit b6137e5

committed
feat - database schema & enqueue/dequeue logic
1 parent fdb06ed commit b6137e5

4 files changed: +1064 -8 lines changed


database/src/lib.rs

Lines changed: 264 additions & 2 deletions
@@ -3,9 +3,9 @@ use chrono::{DateTime, Utc};
 use hashbrown::HashMap;
 use intern::intern;
 use serde::{Deserialize, Serialize};
-use std::fmt;
+use std::fmt::{self, Display, Formatter};
 use std::hash;
-use std::ops::{Add, Sub};
+use std::ops::{Add, Deref, Sub};
 use std::sync::Arc;
 use std::time::Duration;

@@ -19,6 +19,8 @@ pub use pool::{Connection, Pool};
 intern!(pub struct Metric);
 intern!(pub struct Benchmark);
 
+type PgParam<'a> = &'a (dyn tokio_postgres::types::ToSql + Sync);
+
 #[derive(Debug, PartialEq, Eq)]
 pub struct QueuedCommit {
     pub pr: u32,

@@ -152,6 +154,15 @@ impl FromStr for CommitType {
     }
 }
 
+impl Display for CommitType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            CommitType::Try => f.write_str("try"),
+            CommitType::Master => f.write_str("master"),
+        }
+    }
+}
+
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub struct Commit {
     pub sha: String,

@@ -791,3 +802,254 @@ pub struct ArtifactCollection {
     pub duration: Duration,
     pub end_time: DateTime<Utc>,
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum CommitJobType {
+    Try(u32),
+    Master(u32),
+    Release(String),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobEntity {
+    pub sha: String,
+    pub parent_sha: String,
+    pub commit_time: Date,
+    pub target: Target,
+    pub include: Option<String>,
+    pub exclude: Option<String>,
+    pub runs: Option<i32>,
+    pub backends: Option<String>,
+    pub job_type: CommitJobType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobInProgress {
+    pub commit_job: CommitJobEntity,
+    pub machine_id: String,
+    pub started_at: Date,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobFinished {
+    pub commit_job: CommitJobEntity,
+    pub machine_id: String,
+    pub started_at: Date,
+    pub finished_at: Date,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobFailed {
+    pub commit_job: CommitJobEntity,
+    pub machine_id: String,
+    pub started_at: Date,
+    pub finished_at: Date,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum CommitJob {
+    Queued(CommitJobEntity),
+    InProgress(CommitJobInProgress),
+    Finished(CommitJobFinished),
+    Failed(CommitJobFailed),
+}
+
+impl CommitJob {
+    /// Returns `Some(&CommitJobEntity)` only if the job is still queued.
+    pub fn as_queued(&self) -> Option<&CommitJobEntity> {
+        match self {
+            CommitJob::Queued(e) => Some(e),
+            _ => None,
+        }
+    }
+
+    /// Returns `Some(&CommitJobInProgress)` while the job is running.
+    pub fn as_in_progress(&self) -> Option<&CommitJobInProgress> {
+        match self {
+            CommitJob::InProgress(ip) => Some(ip),
+            _ => None,
+        }
+    }
+
+    /// Returns `Some(&CommitJobFinished)` once the job is done.
+    pub fn as_finished(&self) -> Option<&CommitJobFinished> {
+        match self {
+            CommitJob::Finished(fin) => Some(fin),
+            _ => None,
+        }
+    }
+
+    /// Get the status as a string.
+    pub fn status(&self) -> &'static str {
+        match self {
+            CommitJob::Queued(_) => "queued",
+            CommitJob::InProgress(_) => "in_progress",
+            CommitJob::Finished(_) => "finished",
+            CommitJob::Failed(_) => "failed",
+        }
+    }
+
+    /// True when `status == "finished"`.
+    pub fn is_finished(&self) -> bool {
+        matches!(self, CommitJob::Finished(_))
+    }
+
+    /// Composes the column names to insert for this job type.
+    pub fn get_enqueue_column_names(&self) -> Vec<String> {
+        let mut base_columns = vec![
+            String::from("sha"),
+            String::from("parent_sha"),
+            String::from("commit_type"),
+            String::from("commit_time"),
+            String::from("status"),
+            String::from("target"),
+            String::from("include"),
+            String::from("exclude"),
+            String::from("runs"),
+            String::from("backends"),
+        ];
+
+        // The final column depends on the job type.
+        match self.job_type {
+            CommitJobType::Try(_) => base_columns.push("pr".into()),
+            CommitJobType::Master(_) => base_columns.push("pr".into()),
+            CommitJobType::Release(_) => base_columns.push("release_tag".into()),
+        };
+
+        base_columns
+    }
+}
+
+impl Deref for CommitJob {
+    type Target = CommitJobEntity;
+    fn deref(&self) -> &Self::Target {
+        match self {
+            CommitJob::Queued(e) => e,
+            CommitJob::InProgress(ip) => &ip.commit_job,
+            CommitJob::Finished(fin) => &fin.commit_job,
+            CommitJob::Failed(fail) => &fail.commit_job,
+        }
+    }
+}
+
+/// Maps a row from the database to a Rust struct.
+fn commit_job_create(
+    sha: String,
+    parent_sha: String,
+    commit_type: &str,
+    pr: Option<u32>,
+    release_tag: Option<String>,
+    commit_time: Date,
+    target: Target,
+    machine_id: Option<String>,
+    started_at: Option<Date>,
+    finished_at: Option<Date>,
+    status: &str,
+    include: Option<String>,
+    exclude: Option<String>,
+    runs: Option<i32>,
+    backends: Option<String>,
+) -> CommitJob {
+    let job_type = match commit_type {
+        "try" => CommitJobType::Try(pr.expect("`pr` cannot be `None` for a Commit of type `try`")),
+        "master" => {
+            CommitJobType::Master(pr.expect("`pr` cannot be `None` for a Commit of type `master`"))
+        }
+        "release" => CommitJobType::Release(
+            release_tag.expect("`release_tag` cannot be `None` for a Commit of type `release`"),
+        ),
+        _ => panic!("Unhandled commit_type {}", commit_type),
+    };
+
+    let commit_job = CommitJobEntity {
+        sha,
+        parent_sha,
+        commit_time,
+        target,
+        include,
+        exclude,
+        runs,
+        backends,
+        job_type,
+    };
+
+    match status {
+        "queued" => CommitJob::Queued(commit_job),
+
+        "in_progress" => {
+            let started_at =
+                started_at.expect("`started_at` must be Some for an `in_progress` job");
+            let machine_id =
+                machine_id.expect("`machine_id` must be Some for an `in_progress` job");
+
+            CommitJob::InProgress(CommitJobInProgress {
+                commit_job,
+                started_at,
+                machine_id,
+            })
+        }
+
+        "finished" | "failed" => {
+            let started_at =
+                started_at.expect(&format!("`started_at` must be Some for a `{}` job", status));
+            let finished_at = finished_at.expect(&format!(
+                "`finished_at` must be Some for a `{}` job",
+                status
+            ));
+            let machine_id =
+                machine_id.expect(&format!("`machine_id` must be Some for a `{}` job", status));
+
+            if status == "finished" {
+                CommitJob::Finished(CommitJobFinished {
+                    commit_job,
+                    started_at,
+                    finished_at,
+                    machine_id,
+                })
+            } else {
+                CommitJob::Failed(CommitJobFailed {
+                    commit_job,
+                    started_at,
+                    finished_at,
+                    machine_id,
+                })
+            }
+        }
+
+        other => {
+            panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)")
+        }
+    }
+}
+
+/// Given a slice of `CommitJob`s, bucket them into
+/// `try`, `master` and `release` jobs (in that order).
+pub fn split_queued_commit_jobs(
+    commit_jobs: &[CommitJob],
+) -> (
+    Vec<(&CommitJob, u32)>,
+    Vec<(&CommitJob, u32)>,
+    Vec<(&CommitJob, String)>,
+) {
+    // Split jobs by type, as that determines what we enter into the database;
+    // `ToSql` is quite finicky about lifetimes. Moreover, the column names
+    // change depending on the commit job type: `master` and `try` have
+    // a `pr` column whereas `release` has a `release_tag` column.
+    commit_jobs.iter().fold(
+        (vec![], vec![], vec![]),
+        |(mut try_commits, mut master_commits, mut release_commits), job| {
+            let entity = job
+                .as_queued()
+                .expect("Can only enqueue jobs with a status of `queued`");
+
+            match &entity.job_type {
+                crate::CommitJobType::Try(pr) => try_commits.push((job, *pr)),
+                crate::CommitJobType::Master(pr) => master_commits.push((job, *pr)),
+                crate::CommitJobType::Release(release_tag) => {
+                    release_commits.push((job, release_tag.clone()))
+                }
+            }
+            (try_commits, master_commits, release_commits)
+        },
+    )
+}
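
Taken together, these additions model the job queue in types: a `CommitJobEntity` describes what to benchmark, `CommitJob` wraps it with a queue state (`queued`, `in_progress`, `finished`, `failed`), and `split_queued_commit_jobs` buckets queued jobs because `try`/`master` rows carry a `pr` column while `release` rows carry a `release_tag` column. The sketch below is illustrative only and not code from this commit: it assumes the crate is consumed as `database`, that `Date` and `Target` are importable from the crate root, and takes their values as parameters since their constructors are not shown in this diff; `queue_master_job` and `describe` are hypothetical helpers.

use database::{split_queued_commit_jobs, CommitJob, CommitJobEntity, CommitJobType, Date, Target};

// Hypothetical helper: wrap a master commit as a freshly queued job. Jobs only
// start life as `Queued`; the other states come back out of the database via
// `commit_job_create`.
fn queue_master_job(
    sha: &str,
    parent_sha: &str,
    pr: u32,
    commit_time: Date,
    target: Target,
) -> CommitJob {
    CommitJob::Queued(CommitJobEntity {
        sha: sha.to_string(),
        parent_sha: parent_sha.to_string(),
        commit_time,
        target,
        include: None,
        exclude: None,
        runs: None,
        backends: None,
        job_type: CommitJobType::Master(pr),
    })
}

// Hypothetical helper: bucket queued jobs the same way the enqueue path does.
fn describe(jobs: &[CommitJob]) {
    let (try_jobs, master_jobs, release_jobs) = split_queued_commit_jobs(jobs);
    println!(
        "{} try, {} master, {} release job(s) queued",
        try_jobs.len(),
        master_jobs.len(),
        release_jobs.len()
    );
    for (job, pr) in master_jobs {
        // `Deref<Target = CommitJobEntity>` lets us read entity fields directly,
        // and the column list ends in `pr` because this is a master job.
        println!("master {} (#{}) -> columns {:?}", job.sha, pr, job.get_enqueue_column_names());
    }
}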

database/src/pool.rs

Lines changed: 12 additions & 1 deletion
@@ -1,5 +1,6 @@
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark,
+    Target,
 };
 use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
 use chrono::{DateTime, Utc};

@@ -178,6 +179,16 @@ pub trait Connection: Send + Sync {
 
     /// Removes all data associated with the given artifact.
     async fn purge_artifact(&self, aid: &ArtifactId);
+
+    /// Add jobs to the queue.
+    async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]);
+
+    /// Dequeue a job; we pass `machine_id` and `target` in case there are jobs
+    /// the machine was previously doing and can pick up again.
+    async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob>;
+
+    /// Mark the job as finished.
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool;
 }
 
 #[async_trait::async_trait]
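
On the trait side, the intended flow appears to be: enqueue a batch of queued jobs, have each benchmark machine repeatedly dequeue a job for its target, and mark it finished when done. A hypothetical caller sketch (not from this commit) might look like the following; `run_benchmarks` is a stand-in, the crate is assumed to be consumed as `database`, and `Target` is cloned on the assumption that it is cheap to clone (a `Clone` impl is already implied by `CommitJobEntity`'s derive).

use database::{CommitJob, Connection, Target};

// Hypothetical producer side: push a batch of jobs that are still `Queued`.
async fn enqueue(conn: &dyn Connection, jobs: Vec<CommitJob>) {
    conn.enqueue_commit_jobs(&jobs).await;
}

// Hypothetical collector loop; error handling and retry policy are omitted.
async fn work_loop(conn: &dyn Connection, machine_id: &str, target: Target) {
    // Per the doc comment, `dequeue_commit_job` can also hand back a job this
    // machine had already started, so interrupted work is resumed first.
    while let Some(job) = conn.dequeue_commit_job(machine_id, target.clone()).await {
        let sha = job.sha.clone(); // field access via `Deref<Target = CommitJobEntity>`
        run_benchmarks(&job).await; // stand-in for the real benchmarking step
        // Treat the returned `bool` as a success flag (assumption; the diff
        // does not document its meaning).
        let _finished = conn.finish_commit_job(machine_id, target.clone(), sha).await;
    }
}

// Placeholder only.
async fn run_benchmarks(_job: &CommitJob) {}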
