Skip to content

Commit d1eb861

Browse files
committed
feat - database schema & enqueue/dequeue logic
1 parent fdb06ed commit d1eb861

File tree

4 files changed

+1052
-8
lines changed

4 files changed

+1052
-8
lines changed

database/src/lib.rs

Lines changed: 267 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use chrono::{DateTime, Utc};
33
use hashbrown::HashMap;
44
use intern::intern;
55
use serde::{Deserialize, Serialize};
6-
use std::fmt;
6+
use std::fmt::{self, Display, Formatter};
77
use std::hash;
8-
use std::ops::{Add, Sub};
8+
use std::ops::{Add, Deref, Sub};
99
use std::sync::Arc;
1010
use std::time::Duration;
1111

@@ -152,6 +152,15 @@ impl FromStr for CommitType {
152152
}
153153
}
154154

155+
impl Display for CommitType {
156+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
157+
match self {
158+
CommitType::Try => f.write_str("try"),
159+
CommitType::Master => f.write_str("master"),
160+
}
161+
}
162+
}
163+
155164
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
156165
pub struct Commit {
157166
pub sha: String,
@@ -791,3 +800,259 @@ pub struct ArtifactCollection {
791800
pub duration: Duration,
792801
pub end_time: DateTime<Utc>,
793802
}
803+
804+
#[derive(Debug, Clone, Serialize, Deserialize)]
805+
pub enum CommitJobType {
806+
Try(u32),
807+
Master(u32),
808+
Release(String),
809+
}
810+
811+
#[derive(Debug, Clone, Serialize, Deserialize)]
812+
pub struct CommitJobEntity {
813+
pub sha: String,
814+
pub parent_sha: String,
815+
pub commit_time: Date,
816+
pub target: Target,
817+
pub include: Option<String>,
818+
pub exclude: Option<String>,
819+
pub runs: Option<i32>,
820+
pub backends: Option<String>,
821+
pub job_type: CommitJobType,
822+
}
823+
824+
#[derive(Debug, Clone, Serialize, Deserialize)]
825+
pub struct CommitJobInProgress {
826+
pub commit_job: CommitJobEntity,
827+
pub machine_id: String,
828+
pub started_at: Date,
829+
}
830+
831+
#[derive(Debug, Clone, Serialize, Deserialize)]
832+
pub struct CommitJobFinished {
833+
pub commit_job: CommitJobEntity,
834+
pub machine_id: String,
835+
pub started_at: Date,
836+
pub finished_at: Date,
837+
}
838+
839+
#[derive(Debug, Clone, Serialize, Deserialize)]
840+
pub struct CommitJobFailed {
841+
pub commit_job: CommitJobEntity,
842+
pub machine_id: String,
843+
pub started_at: Date,
844+
pub finished_at: Date,
845+
}
846+
847+
#[derive(Debug, Clone, Serialize, Deserialize)]
848+
pub enum CommitJob {
849+
Queued(CommitJobEntity),
850+
InProgress(CommitJobInProgress),
851+
Finished(CommitJobFinished),
852+
Failed(CommitJobFailed),
853+
}
854+
855+
impl CommitJob {
856+
/// Returns `Some(&CommitJobEntity)` only if the job is still queued.
857+
pub fn as_queued(&self) -> Option<&CommitJobEntity> {
858+
match self {
859+
CommitJob::Queued(e) => Some(e),
860+
_ => None,
861+
}
862+
}
863+
864+
/// Returns `Some(&CommitJobInProgress)` while the job is running.
865+
pub fn as_in_progress(&self) -> Option<&CommitJobInProgress> {
866+
match self {
867+
CommitJob::InProgress(ip) => Some(ip),
868+
_ => None,
869+
}
870+
}
871+
872+
/// Returns `Some(&CommitJobFinished)` once the job is done.
873+
pub fn as_finished(&self) -> Option<&CommitJobFinished> {
874+
match self {
875+
CommitJob::Finished(fin) => Some(fin),
876+
_ => None,
877+
}
878+
}
879+
880+
/// Get the status as a string
881+
pub fn status(&self) -> &'static str {
882+
match self {
883+
CommitJob::Queued(_) => "queued",
884+
CommitJob::InProgress(_) => "in_progress",
885+
CommitJob::Finished(_) => "finished",
886+
CommitJob::Failed(_) => "failed",
887+
}
888+
}
889+
890+
/// True when `status == "finished"`.
891+
pub fn is_finished(&self) -> bool {
892+
matches!(self, CommitJob::Finished(_))
893+
}
894+
895+
/// Will compose the column names for the job type
896+
pub fn get_enqueue_column_names(&self) -> Vec<String> {
897+
let mut base_columns = vec![
898+
String::from("sha"),
899+
String::from("parent_sha"),
900+
String::from("commit_type"),
901+
String::from("commit_time"),
902+
String::from("status"),
903+
String::from("target"),
904+
String::from("include"),
905+
String::from("exclude"),
906+
String::from("runs"),
907+
String::from("backends"),
908+
];
909+
910+
/* This is the last column */
911+
match self.job_type {
912+
CommitJobType::Try(_) => base_columns.push("pr".into()),
913+
CommitJobType::Master(_) => base_columns.push("pr".into()),
914+
CommitJobType::Release(_) => base_columns.push("release_tag".into()),
915+
};
916+
917+
base_columns
918+
}
919+
}
920+
921+
impl Deref for CommitJob {
922+
type Target = CommitJobEntity;
923+
fn deref(&self) -> &Self::Target {
924+
match self {
925+
CommitJob::Queued(e) => e,
926+
CommitJob::InProgress(ip) => &ip.commit_job,
927+
CommitJob::Finished(fin) => &fin.commit_job,
928+
CommitJob::Failed(fail) => &fail.commit_job,
929+
}
930+
}
931+
}
932+
933+
/// Maps from the database to a Rust struct
934+
#[allow(clippy::too_many_arguments)]
935+
fn commit_job_create(
936+
sha: String,
937+
parent_sha: String,
938+
commit_type: &str,
939+
pr: Option<u32>,
940+
release_tag: Option<String>,
941+
commit_time: Date,
942+
target: Target,
943+
machine_id: Option<String>,
944+
started_at: Option<Date>,
945+
finished_at: Option<Date>,
946+
status: &str,
947+
include: Option<String>,
948+
exclude: Option<String>,
949+
runs: Option<i32>,
950+
backends: Option<String>,
951+
) -> CommitJob {
952+
let job_type = match commit_type {
953+
"try" => CommitJobType::Try(pr.expect("`pr` cannot be `None` for a Commit of type `try`")),
954+
"master" => {
955+
CommitJobType::Master(pr.expect("`pr` cannot be `None` for a Commit of type `master`"))
956+
}
957+
"release" => CommitJobType::Release(
958+
release_tag.expect("`release_tag` cannot be `None` for a Commit of type `release`"),
959+
),
960+
_ => panic!("Unhandled commit_type {}", commit_type),
961+
};
962+
963+
let commit_job = CommitJobEntity {
964+
sha,
965+
parent_sha,
966+
commit_time,
967+
target,
968+
include,
969+
exclude,
970+
runs,
971+
backends,
972+
job_type,
973+
};
974+
975+
match status {
976+
"queued" => CommitJob::Queued(commit_job),
977+
978+
"in_progress" => {
979+
let started_at =
980+
started_at.expect("`started_at` must be Some for an `in_progress` job");
981+
let machine_id =
982+
machine_id.expect("`machine_id` must be Some for an `in_progress` job");
983+
984+
CommitJob::InProgress(CommitJobInProgress {
985+
commit_job,
986+
started_at,
987+
machine_id,
988+
})
989+
}
990+
991+
"finished" | "failed" => {
992+
let started_at =
993+
started_at.expect("`started_at` must be Some for finished or failed job");
994+
let finished_at =
995+
finished_at.expect("`finished_at` must be Some for finished or failed");
996+
let machine_id =
997+
machine_id.expect("`machine_id` must be Some for finished or failed a job");
998+
999+
if status == "finished" {
1000+
CommitJob::Finished(CommitJobFinished {
1001+
commit_job,
1002+
started_at,
1003+
finished_at,
1004+
machine_id,
1005+
})
1006+
} else {
1007+
CommitJob::Failed(CommitJobFailed {
1008+
commit_job,
1009+
started_at,
1010+
finished_at,
1011+
machine_id,
1012+
})
1013+
}
1014+
}
1015+
1016+
other => {
1017+
panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)")
1018+
}
1019+
}
1020+
}
1021+
1022+
pub struct CommitsByType<'a> {
1023+
pub r#try: Vec<(&'a CommitJob, u32)>,
1024+
pub master: Vec<(&'a CommitJob, u32)>,
1025+
pub release: Vec<(&'a CommitJob, String)>,
1026+
}
1027+
1028+
/// Given a vector of `CommitJobs` bucket them out into;
1029+
/// `try`, `master` and `release` (in that order)
1030+
pub fn split_queued_commit_jobs(commit_jobs: &[CommitJob]) -> CommitsByType<'_> {
1031+
// Split jobs by type as that determines what we enter into the database,
1032+
// `ToSql` is quite finiky about lifetimes. Moreover the column names
1033+
// change depending on the commit job type. `master` and `try` have
1034+
// a `pr` column whereas `release` has a `release_rag` column
1035+
let (try_commits, master_commits, release_commits) = commit_jobs.iter().fold(
1036+
(vec![], vec![], vec![]),
1037+
|(mut try_commits, mut master_commits, mut release_commits), job| {
1038+
let entity = job
1039+
.as_queued()
1040+
.expect("Can only enqueue jobs with a status of `queued`");
1041+
1042+
match &entity.job_type {
1043+
crate::CommitJobType::Try(pr) => try_commits.push((job, *pr)),
1044+
crate::CommitJobType::Master(pr) => master_commits.push((job, *pr)),
1045+
crate::CommitJobType::Release(release_tag) => {
1046+
release_commits.push((job, release_tag.clone()))
1047+
}
1048+
}
1049+
(try_commits, master_commits, release_commits)
1050+
},
1051+
);
1052+
1053+
CommitsByType {
1054+
r#try: try_commits,
1055+
master: master_commits,
1056+
release: release_commits,
1057+
}
1058+
}

database/src/pool.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
2-
ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target,
2+
ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark,
3+
Target,
34
};
45
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
56
use chrono::{DateTime, Utc};
@@ -178,6 +179,16 @@ pub trait Connection: Send + Sync {
178179

179180
/// Removes all data associated with the given artifact.
180181
async fn purge_artifact(&self, aid: &ArtifactId);
182+
183+
/// Add a jobs to the queue
184+
async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]);
185+
186+
/// Dequeue jobs, we pass `machine_id` and `target` in case there are jobs
187+
/// the machine was previously doing and can pick up again
188+
async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob>;
189+
190+
/// Mark the job as finished
191+
async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool;
181192
}
182193

183194
#[async_trait::async_trait]

0 commit comments

Comments
 (0)