diff --git a/Makefile b/Makefile index 70bed4fc..92546046 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ GOIMPORTS_VERSION ?= v0.33.0 # (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A # package may hold multiple .proto files (e.g. an RPC contract plus messagequeue # contracts); all generated stubs land in the same protopb/ dir. -PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe +PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe stovepipe/core/messagequeue # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) @@ -51,7 +51,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-init-stovepipe-schemas local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -221,6 +221,19 @@ local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Run done @echo "✅ Runway queue schema applied successfully" +local-init-stovepipe-schemas: ## Apply storage (mysql-app) and queue (mysql-queue) schemas for Stovepipe compose stacks + @echo "Applying storage schema to mysql-app..." + @for file in stovepipe/extension/storage/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "Applying queue schema to mysql-queue..." + @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "✅ Stovepipe schemas applied successfully" + local-runway-start: build-runway-linux ## Start Runway locally (runway + MySQL queue) @echo "Starting Runway with compose..." @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait @@ -310,9 +323,11 @@ local-stop: ## Stop all services (keep data) local-stovepipe-logs: ## View logs from the running Stovepipe service @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) logs -f -local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (single Ping-only gRPC service) +local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (gRPC service + MySQL storage + MySQL queue) @echo "Starting Stovepipe service with compose..." @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait + @echo "Applying storage and queue schemas..." + @$(MAKE) -s local-init-stovepipe-schemas @echo "" @echo "✅ Stovepipe service is running!" @echo "" diff --git a/example/stovepipe/docker-compose.yml b/example/stovepipe/docker-compose.yml index aa3866d1..7bc64cc9 100644 --- a/example/stovepipe/docker-compose.yml +++ b/example/stovepipe/docker-compose.yml @@ -1,7 +1,9 @@ # Docker Compose for the Stovepipe service. # -# Stovepipe is currently a single Ping-only gRPC service with no storage or -# queue dependencies, so this stack runs just the one service. +# Stovepipe ingests a queue's head commit, persists the Request, and publishes it +# to the process stage over a messaging queue; the process consumer reloads the +# Request. The stack therefore runs the service plus two MySQL databases: one for +# storage (request, request_uri) and one for the messaging queue. # # IMPORTANT: Before running compose, build the Linux binary: # make build-stovepipe-linux @@ -12,8 +14,40 @@ # Quick start: # make local-stovepipe-start # +# After `up`, the storage and queue schemas are applied (local-init-stovepipe-schemas). services: + # Storage database - Stovepipe's request and request_uri tables. + mysql-app: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + + # Queue database - messaging infrastructure (messages, offsets, partition leases). + mysql-queue: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + stovepipe-service: build: context: ${REPO_ROOT} @@ -22,3 +56,11 @@ services: - "8080" # Random ephemeral port to avoid conflicts environment: - PORT=:8080 + - STORAGE_MYSQL_DSN=root:root@tcp(mysql-app:3306)/submitqueue?parseTime=true + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=stovepipe-dev + depends_on: + mysql-app: + condition: service_healthy + mysql-queue: + condition: service_healthy diff --git a/example/stovepipe/server/BUILD.bazel b/example/stovepipe/server/BUILD.bazel index 017d8a96..68572b79 100644 --- a/example/stovepipe/server/BUILD.bazel +++ b/example/stovepipe/server/BUILD.bazel @@ -7,7 +7,19 @@ go_library( visibility = ["//visibility:private"], deps = [ "//api/stovepipe/protopb", + "//platform/consumer", + "//platform/errs", + "//platform/errs/generic", + "//platform/errs/mysql", + "//platform/extension/messagequeue", + "//platform/extension/messagequeue/mysql", "//stovepipe/controller", + "//stovepipe/controller/process", + "//stovepipe/core/messagequeue", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/sourcecontrol/fake", + "//stovepipe/extension/storage/mysql", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//reflection", diff --git a/example/stovepipe/server/main.go b/example/stovepipe/server/main.go index 8ab97ae1..e082ba5d 100644 --- a/example/stovepipe/server/main.go +++ b/example/stovepipe/server/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "database/sql" "errors" "fmt" "net" @@ -25,9 +26,21 @@ import ( "syscall" "time" + _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + genericerrs "github.com/uber/submitqueue/platform/errs/generic" + mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" + extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" "github.com/uber/submitqueue/stovepipe/controller" + "github.com/uber/submitqueue/stovepipe/controller/process" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + sourcecontrolfake "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol/fake" + storageMySQL "github.com/uber/submitqueue/stovepipe/extension/storage/mysql" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -70,6 +83,15 @@ func (c *inMemoryCounter) Next(_ context.Context, domain string) (int64, error) return c.values[domain], nil } +// fakeSourceControlFactory is the example SourceControl factory. It seeds each queue with a +// deterministic single-commit history so ingest resolves a stable head URI (and re-ingesting +// the same queue exercises the dedup path). A real deployment supplies a VCS-backed factory. +type fakeSourceControlFactory struct{} + +func (fakeSourceControlFactory) For(cfg sourcecontrol.Config) (sourcecontrol.SourceControl, error) { + return sourcecontrolfake.New([]string{fmt.Sprintf("git://%s/HEAD", cfg.QueueName)}), nil +} + func main() { code := 0 if err := run(); err != nil { @@ -131,12 +153,85 @@ func run() error { metricsWgDone.Wait() }() + // Storage database (request + request_uri tables). + storageDSN := os.Getenv("STORAGE_MYSQL_DSN") + if storageDSN == "" { + return fmt.Errorf("STORAGE_MYSQL_DSN environment variable is required") + } + storageDB, err := sql.Open("mysql", storageDSN) + if err != nil { + return fmt.Errorf("failed to open storage database: %w", err) + } + defer storageDB.Close() + + store, err := storageMySQL.NewStorage(storageDB, scope.SubScope("storage")) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + defer store.Close() + + // Queue database (messaging infrastructure for the process stage). + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("stovepipe-%d", time.Now().Unix()) + } + + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + // Consumer running the process stage. + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + processController := process.NewController(logger.Sugar(), scope, store, stovepipemq.TopicKeyProcess, "stovepipe-process") + if err := primaryConsumer.Register(processController); err != nil { + return fmt.Errorf("failed to register process controller: %w", err) + } + + if err := primaryConsumer.Start(ctx); err != nil { + return fmt.Errorf("failed to start consumer: %w", err) + } + logger.Info("consumer started") + // Create gRPC server grpcServer := grpc.NewServer() // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - ingestController := controller.NewIngestController(logger.Sugar(), scope, newInMemoryCounter()) + ingestController := controller.NewIngestController( + logger.Sugar(), + scope, + newInMemoryCounter(), + fakeSourceControlFactory{}, + store, + registry, + ) srv := &StovepipeServer{ pingController: pingController, ingestController: ingestController, @@ -166,16 +261,13 @@ func run() error { }() // Wait for interrupt signal or server critical error - // If interruption is signaled, gracefully stop the server - // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { case <-ctx.Done(): fmt.Println("Shutting down stovepipe server due to interruption signal...") // Set the error to the context cancellation error to be surfaced as a desired exit code by the main function - // to indicate that the server was stopped as intended - // It may be overridden by the server error if any + // to indicate that the server was stopped as intended. It may be overridden by the server error if any. err = ctx.Err() // stop GRPC server and wait for it to exit @@ -183,11 +275,36 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down stovepipe server due to critical GRPC server error...") + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + consumerStopErr := primaryConsumer.Stop(30000) + if consumerStopErr != nil { + consumerStopErr = fmt.Errorf("failed to stop consumer: %w", consumerStopErr) + } + + if consumerStopErr != nil || serverErr != nil { + err = errors.Join(err, consumerStopErr, serverErr) } return err } + +// newTopicRegistry builds the TopicRegistry for Stovepipe's internal pipeline queues. ingest +// publishes to the process topic and the process consumer subscribes to it. +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + return consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: stovepipemq.TopicKeyProcess, + Name: "process", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "stovepipe-process", + ), + }, + }) +} diff --git a/stovepipe/controller/BUILD.bazel b/stovepipe/controller/BUILD.bazel index 51aa4636..f0faffa5 100644 --- a/stovepipe/controller/BUILD.bazel +++ b/stovepipe/controller/BUILD.bazel @@ -10,10 +10,15 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/stovepipe/protopb", + "//platform/base/messagequeue", + "//platform/consumer", "//platform/errs", "//platform/extension/counter", "//platform/metrics", + "//stovepipe/core/messagequeue", "//stovepipe/entity", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -28,7 +33,15 @@ go_test( embed = [":controller"], deps = [ "//api/stovepipe/protopb", + "//platform/consumer", "//platform/extension/counter/mock", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/messagequeue", + "//stovepipe/entity", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/sourcecontrol/mock", + "//stovepipe/extension/storage", + "//stovepipe/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/stovepipe/controller/ingest.go b/stovepipe/controller/ingest.go index 8d1fd180..697432c9 100644 --- a/stovepipe/controller/ingest.go +++ b/stovepipe/controller/ingest.go @@ -21,10 +21,15 @@ import ( "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" "github.com/uber/submitqueue/platform/extension/counter" "github.com/uber/submitqueue/platform/metrics" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + "github.com/uber/submitqueue/stovepipe/extension/storage" "go.uber.org/zap" ) @@ -40,25 +45,48 @@ func IsInvalidRequest(err error) bool { // IngestController handles ingest business logic for stovepipe: it admits a queue's newly // observed commit into the validation pipeline. // -// This is the thin entry point. It mints a request ID namespaced by the queue and records the -// resulting Request. Resolving the commit URI via the SourceControl extension, persisting the -// Request, and publishing it onto the pipeline are deliberately out of scope for now. +// It resolves the queue's head commit URI via the SourceControl extension, dedups on the +// (queue, URI) pair, persists the Request and its URI mapping via storage, and publishes the +// request ID onto the process stage. Ingestion is idempotent: a re-reported head resolves to the +// already-minted request and no new work is published. type IngestController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - counter counter.Counter + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + sourceControl sourcecontrol.Factory + store storage.Storage + registry consumer.TopicRegistry } -// NewIngestController creates a new instance of the stovepipe ingest controller. -func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter) *IngestController { +// NewIngestController creates a new instance of the stovepipe ingest controller. It publishes +// accepted requests to the topic registered under messagequeue.TopicKeyProcess in the registry. +func NewIngestController( + logger *zap.SugaredLogger, + scope tally.Scope, + counter counter.Counter, + sourceControl sourcecontrol.Factory, + store storage.Storage, + registry consumer.TopicRegistry, +) *IngestController { return &IngestController{ - logger: logger, - metricsScope: scope.SubScope("ingest_controller"), - counter: counter, + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + sourceControl: sourceControl, + store: store, + registry: registry, } } -// Ingest admits a queue's newly observed commit into the validation pipeline and returns the minted request ID. +// Ingest admits a queue's newly observed commit into the validation pipeline and returns the +// request ID validating it. +// +// It is idempotent and runs to completion on every call, each step tolerant of having already +// run: it resolves (or claims) the (queue, URI) mapping, ensures the Request row exists, and +// publishes the request to the process stage. A retry after a partial failure — for example the +// URI mapping committed but the request write failed — completes the missing steps instead of +// returning a dangling reference. The (queue, URI) mapping is the dedup gate, so concurrent +// ingests of the same head converge on one request. func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) { const opName = "ingest" @@ -68,29 +96,142 @@ func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (r if req.Queue == "" { return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest) } - queue := req.Queue - // Generate a globally unique request ID namespaced by the queue. The counter domain + // Resolve the queue's current head commit to its opaque URI via SourceControl. + // An unresolvable queue/ref is a caller error (unknown queue), not infrastructure. + sc, err := c.sourceControl.For(sourcecontrol.Config{QueueName: queue}) + if err != nil { + return nil, fmt.Errorf("IngestController failed to resolve source control for queue=%s: %w", queue, err) + } + uri, err := sc.Latest(ctx) + if err != nil { + if sourcecontrol.IsNotFound(err) { + return nil, fmt.Errorf("IngestController could not resolve head for queue=%s: %w: %w", queue, err, ErrInvalidRequest) + } + return nil, fmt.Errorf("IngestController failed to resolve head for queue=%s: %w", queue, err) + } + + // The (queue, URI) mapping is the dedup gate and the source of truth for "does this head + // have a request id". + id, err := c.resolveID(ctx, queue, uri) + if err != nil { + return nil, err + } + + // Ensure the request row exists, healing a prior partial write where the mapping committed + // but the request did not. + request, err := c.ensureRequest(ctx, id, queue, uri) + if err != nil { + return nil, err + } + + // Publish while the request is still pre-pipeline (Accepted). The process consumer is + // idempotent (keyed on the request id, at-least-once), so re-publishing on a retry or a + // duplicate report is safe and closes the "request created but publish failed" gap. Once + // process advances the request past Accepted, ingest stops re-publishing. + if request.State == entity.RequestStateAccepted { + if err := c.publishProcess(ctx, id, queue); err != nil { + return nil, fmt.Errorf("IngestController failed to publish request %s to process: %w", id, err) + } + } + + c.logger.Infow("ingested request", + "id", request.ID, + "queue", request.Queue, + "uri", request.URI, + "state", request.State, + ) + + return &pb.IngestResponse{Id: id}, nil +} + +// resolveID returns the request id mapped to (queue, URI), minting and claiming a new one if the +// pair is not yet mapped. Claiming the mapping is the dedup gate: a concurrent ingest that loses +// the claim re-reads and returns the winner's id, so no orphan request row is created (only the +// minted counter value is spent). +func (c *IngestController) resolveID(ctx context.Context, queue, uri string) (string, error) { + uriStore := c.store.GetRequestURIStore() + + if id, err := uriStore.GetIDByURI(ctx, queue, uri); err == nil { + return id, nil + } else if !errors.Is(err, storage.ErrNotFound) { + return "", fmt.Errorf("IngestController failed to look up existing request for queue=%s: %w", queue, err) + } + + // Mint a globally unique request ID namespaced by the queue. The counter domain // ("request/") doubles as the ID prefix, so the ID is "/". domain := "request/" + queue seq, err := c.counter.Next(ctx, domain) if err != nil { - return nil, fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err) + return "", fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err) + } + id := fmt.Sprintf("%s/%d", domain, seq) + + if err := uriStore.Create(ctx, queue, uri, id); err != nil { + if errors.Is(err, storage.ErrAlreadyExists) { + existing, getErr := uriStore.GetIDByURI(ctx, queue, uri) + if getErr != nil { + return "", fmt.Errorf("IngestController failed to resolve raced request for queue=%s: %w", queue, getErr) + } + return existing, nil + } + return "", fmt.Errorf("IngestController failed to map URI for queue=%s: %w", queue, err) + } + return id, nil +} + +// ensureRequest returns the request for id, creating it in the Accepted state if it does not yet +// exist. A concurrent creator (ErrAlreadyExists) is resolved by re-reading the canonical row. +func (c *IngestController) ensureRequest(ctx context.Context, id, queue, uri string) (entity.Request, error) { + reqStore := c.store.GetRequestStore() + + got, err := reqStore.Get(ctx, id) + if err == nil { + return got, nil + } + if !errors.Is(err, storage.ErrNotFound) { + return entity.Request{}, fmt.Errorf("IngestController failed to load request %s: %w", id, err) } request := entity.Request{ - ID: fmt.Sprintf("%s/%d", domain, seq), + ID: id, Queue: queue, + URI: uri, State: entity.RequestStateAccepted, Version: 1, } + if err := reqStore.Create(ctx, request); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + return entity.Request{}, fmt.Errorf("IngestController failed to persist request %s: %w", id, err) + } + // Raced with a concurrent creator; read the canonical row. + return reqStore.Get(ctx, id) + } + return request, nil +} + +// publishProcess publishes the request ID to the process stage, partitioned by queue so a +// queue's requests stay ordered. +func (c *IngestController) publishProcess(ctx context.Context, id, queue string) error { + payload, err := stovepipemq.Marshal(&stovepipemq.ProcessRequest{Id: id}) + if err != nil { + return fmt.Errorf("failed to serialize process request: %w", err) + } - c.logger.Infow("accepted request", - "id", request.ID, - "queue", request.Queue, - "state", request.State, - ) + msg := entityqueue.NewMessage(id, payload, queue, nil) - return &pb.IngestResponse{Id: request.ID}, nil + q, ok := c.registry.Queue(stovepipemq.TopicKeyProcess) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", stovepipemq.TopicKeyProcess) + } + topicName, ok := c.registry.TopicName(stovepipemq.TopicKeyProcess) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", stovepipemq.TopicKeyProcess) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish process request: %w", err) + } + return nil } diff --git a/stovepipe/controller/ingest_test.go b/stovepipe/controller/ingest_test.go index c0500251..484fbee1 100644 --- a/stovepipe/controller/ingest_test.go +++ b/stovepipe/controller/ingest_test.go @@ -23,48 +23,209 @@ import ( "github.com/stretchr/testify/require" "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + "github.com/uber/submitqueue/platform/consumer" countermock "github.com/uber/submitqueue/platform/extension/counter/mock" + mqmock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + scmock "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol/mock" + "github.com/uber/submitqueue/stovepipe/extension/storage" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap" ) -func newIngestController(t *testing.T, c *countermock.MockCounter) *IngestController { - t.Helper() - return NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), c) +const ( + testQueue = "monorepo/main" + testURI = "git://repo/monorepo/main/abc123" +) + +// ingestMocks bundles the mocks an Ingest test case wires expectations on. +type ingestMocks struct { + counter *countermock.MockCounter + factory *scmock.MockFactory + sc *scmock.MockSourceControl + reqStore *storagemock.MockRequestStore + uriStore *storagemock.MockRequestURIStore + publisher *mqmock.MockPublisher } -func TestIngestController_Ingest(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - mockCounter.EXPECT().Next(gomock.Any(), "request/monorepo/main").Return(int64(7), nil) +func newIngestController(t *testing.T, ctrl *gomock.Controller) (*IngestController, ingestMocks) { + t.Helper() - c := newIngestController(t, mockCounter) + m := ingestMocks{ + counter: countermock.NewMockCounter(ctrl), + factory: scmock.NewMockFactory(ctrl), + sc: scmock.NewMockSourceControl(ctrl), + reqStore: storagemock.NewMockRequestStore(ctrl), + uriStore: storagemock.NewMockRequestURIStore(ctrl), + publisher: mqmock.NewMockPublisher(ctrl), + } - resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) - require.NoError(t, err) - assert.Equal(t, "request/monorepo/main/7", resp.Id) -} + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(m.reqStore).AnyTimes() + store.EXPECT().GetRequestURIStore().Return(m.uriStore).AnyTimes() + + queue := mqmock.NewMockQueue(ctrl) + queue.EXPECT().Publisher().Return(m.publisher).AnyTimes() -func TestIngestController_Ingest_EmptyQueue(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - // Counter must not be consulted when the queue is missing. + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: stovepipemq.TopicKeyProcess, Name: "process", Queue: queue}, + }) + require.NoError(t, err) - c := newIngestController(t, mockCounter) + c := NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), m.counter, m.factory, store, registry) + return c, m +} - _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: ""}) - require.Error(t, err) - assert.True(t, IsInvalidRequest(err)) +// expectResolve wires the SourceControl factory + Latest happy path returning testURI. +func expectResolve(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return(testURI, nil) } -func TestIngestController_Ingest_CounterError(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - mockCounter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable")) +func TestIngestController_Ingest(t *testing.T) { + tests := []struct { + name string + queue string + setup func(m ingestMocks) + wantID string + wantErr bool + wantInvalid bool + }{ + { + name: "happy path mints persists and publishes", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), "request/"+testQueue).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, "request/monorepo/main/7").Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/7").Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/7", + }, + { + name: "dedup with existing accepted request republishes without minting", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{ID: "request/monorepo/main/3", State: entity.RequestStateAccepted}, nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "heals when uri mapped but request missing", + queue: testQueue, + setup: func(m ingestMocks) { + // Prior attempt committed the URI mapping but failed before the request write. + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "dedup race returns winner id and completes", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), "request/"+testQueue).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, "request/monorepo/main/7").Return(storage.ErrAlreadyExists) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{ID: "request/monorepo/main/3", State: entity.RequestStateAccepted}, nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "empty queue is invalid", + queue: "", + setup: func(m ingestMocks) {}, + wantErr: true, + wantInvalid: true, + }, + { + name: "unknown queue head is invalid", + queue: testQueue, + setup: func(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return("", sourcecontrol.ErrNotFound) + }, + wantErr: true, + wantInvalid: true, + }, + { + name: "source control infra error is not invalid", + queue: testQueue, + setup: func(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return("", errors.New("sc unavailable")) + }, + wantErr: true, + }, + { + name: "counter error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable")) + }, + wantErr: true, + }, + { + name: "request store create error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, gomock.Any()).Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(errors.New("db down")) + }, + wantErr: true, + }, + { + name: "publish error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, gomock.Any()).Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(errors.New("queue down")) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + c, m := newIngestController(t, ctrl) + tt.setup(m) - c := newIngestController(t, mockCounter) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: tt.queue}) - _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) - require.Error(t, err) - assert.False(t, IsInvalidRequest(err)) + if tt.wantErr { + require.Error(t, err) + assert.Equal(t, tt.wantInvalid, IsInvalidRequest(err)) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantID, resp.Id) + }) + } } diff --git a/stovepipe/controller/process/BUILD.bazel b/stovepipe/controller/process/BUILD.bazel new file mode 100644 index 00000000..8298b0fc --- /dev/null +++ b/stovepipe/controller/process/BUILD.bazel @@ -0,0 +1,38 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "process", + srcs = ["process.go"], + importpath = "github.com/uber/submitqueue/stovepipe/controller/process", + visibility = ["//visibility:public"], + deps = [ + "//platform/consumer", + "//platform/errs", + "//platform/metrics", + "//stovepipe/core/messagequeue", + "//stovepipe/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "process_test", + srcs = ["process_test.go"], + embed = [":process"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/errs", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/messagequeue", + "//stovepipe/entity", + "//stovepipe/extension/storage", + "//stovepipe/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/controller/process/process.go b/stovepipe/controller/process/process.go new file mode 100644 index 00000000..71da8a54 --- /dev/null +++ b/stovepipe/controller/process/process.go @@ -0,0 +1,121 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package process holds the process-stage queue controller. It consumes the +// request ids ingest publishes, reloads the Request from storage, and (in a +// future change) decides the build strategy by asking SourceControl how the new +// head relates to the queue's last-green URI. For now it is a thin consumer that +// reloads and logs the request, establishing the stage and its wiring. +package process + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/metrics" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/extension/storage" + "go.uber.org/zap" +) + +// Controller consumes ProcessRequest messages from the process stage, reloads the +// referenced Request from storage, and logs it. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new process controller. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("process_controller"), + metricsScope: scope.SubScope("process_controller"), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reloads the request referenced by the delivery and logs it. Returns nil +// to ack (success) or an error to nack (retry). A not-yet-visible request is +// retryable: ingest persists and publishes, but a stale read may not see the row +// yet, so redelivery converges. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + pr := &stovepipemq.ProcessRequest{} + if err := stovepipemq.Unmarshal(msg.Payload, pr); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: a malformed message will never succeed regardless of retries. + return fmt.Errorf("failed to deserialize process request: %w", err) + } + + request, err := c.store.GetRequestStore().Get(ctx, pr.Id) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + if errors.Is(err, storage.ErrNotFound) { + // Retryable: the request row may not be visible yet; redelivery converges. + return errs.NewRetryableError(fmt.Errorf("request %s not found yet: %w", pr.Id, err)) + } + return fmt.Errorf("failed to load request %s: %w", pr.Id, err) + } + + c.logger.Infow("processing request", + "request_id", request.ID, + "queue", request.Queue, + "uri", request.URI, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "process" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/stovepipe/controller/process/process_test.go b/stovepipe/controller/process/process_test.go new file mode 100644 index 00000000..e9083bbc --- /dev/null +++ b/stovepipe/controller/process/process_test.go @@ -0,0 +1,99 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package process + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +const testID = "request/monorepo/main/7" + +func newController(t *testing.T, ctrl *gomock.Controller) (*Controller, *storagemock.MockRequestStore) { + t.Helper() + reqStore := storagemock.NewMockRequestStore(ctrl) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + c := NewController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), store, stovepipemq.TopicKeyProcess, "stovepipe-process") + return c, reqStore +} + +// delivery wraps raw payload bytes in a MockDelivery (which satisfies consumer.Delivery). +func delivery(t *testing.T, ctrl *gomock.Controller, payload []byte) consumer.Delivery { + t.Helper() + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(entityqueue.NewMessage(testID, payload, "monorepo/main", nil)).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +func processPayload(t *testing.T, id string) []byte { + t.Helper() + b, err := stovepipemq.Marshal(&stovepipemq.ProcessRequest{Id: id}) + require.NoError(t, err) + return b +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{ID: testID, Queue: "monorepo/main", URI: "git://x", State: entity.RequestStateAccepted, Version: 1}, nil) + + require.NoError(t, c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID)))) +} + +func TestProcess_NotFoundIsRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{}, storage.ErrNotFound) + + err := c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID))) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err), "a not-yet-visible request must be retryable") +} + +func TestProcess_StorageErrorNotRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{}, errors.New("db down")) + + err := c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID))) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestProcess_MalformedPayload(t *testing.T) { + ctrl := gomock.NewController(t) + c, _ := newController(t, ctrl) + + err := c.Process(context.Background(), delivery(t, ctrl, []byte("not-json"))) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} diff --git a/stovepipe/core/messagequeue/BUILD.bazel b/stovepipe/core/messagequeue/BUILD.bazel new file mode 100644 index 00000000..f52c5df2 --- /dev/null +++ b/stovepipe/core/messagequeue/BUILD.bazel @@ -0,0 +1,29 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "messagequeue", + srcs = [ + "messagequeue.go", + "topics.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue", + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/protopb", # keep + "//platform/consumer", + "//stovepipe/core/messagequeue/protopb", # keep + "@org_golang_google_protobuf//encoding/protojson", + "@org_golang_google_protobuf//proto", + ], +) + +go_test( + name = "messagequeue_test", + srcs = ["process_test.go"], + embed = [":messagequeue"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//proto", + ], +) diff --git a/stovepipe/core/messagequeue/README.md b/stovepipe/core/messagequeue/README.md new file mode 100644 index 00000000..2a4b32a3 --- /dev/null +++ b/stovepipe/core/messagequeue/README.md @@ -0,0 +1,11 @@ +# Stovepipe internal message-queue contract + +Wire payloads for the queues internal to the Stovepipe pipeline. It is **internal** — used only within the Stovepipe domain — so it lives under `stovepipe/core` rather than `api/` (Bazel visibility keeps it domain-scoped). + +Payloads are defined in proto3 (`proto/`, generated into `protopb/`) and serialized as **protobuf JSON** (protojson), so the MySQL-backed queue keeps storing self-describing JSON. The contract package adds only generic glue — `Marshal`/`Unmarshal` and the `TopicKeys` reflection lookup — and owns the `TopicKey` constants for the stages it carries. Each payload declares the topic key(s) that carry it via the `topic_keys` proto option (defined in `api/base/messagequeue`); a contract test round-trips every payload and asserts each topic key is bound to exactly one message. + +## Stages + +- **process** (`TopicKeyProcess`, `ProcessRequest`) — ingest publishes the minted request id here once it accepts a new head; the process controller reloads the `Request` from storage and decides the build strategy. Only the id travels: producer and consumer share the store, so messages stay small and redelivery is idempotent. + +See [doc/rfc/messagequeue-contract.md](../../../doc/rfc/messagequeue-contract.md) for the contract conventions and `api/runway/messagequeue` for the external reference example. diff --git a/stovepipe/core/messagequeue/messagequeue.go b/stovepipe/core/messagequeue/messagequeue.go new file mode 100644 index 00000000..f8e03fb6 --- /dev/null +++ b/stovepipe/core/messagequeue/messagequeue.go @@ -0,0 +1,81 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package messagequeue holds Stovepipe's internal message-queue contract: the +// wire payloads for the pipeline queues Stovepipe owns, defined by the proto +// files in proto/ and generated into protopb/. The proto is the language-neutral +// authority; the generated Go types in protopb are the binding for Go callers. +// +// It is internal — used only within the Stovepipe domain — so it lives under +// stovepipe/core rather than api/. The message types are generated into protopb; +// this package adds only generic protojson glue (Marshal/Unmarshal) and the +// topic-key reflection lookup (TopicKeys), so there is no per-message +// serialization code. Payloads are serialized as protobuf JSON, not binary, so +// the MySQL-backed queue keeps storing self-describing JSON. The topic key that +// carries each payload is declared on the message itself via the topic_keys +// proto option (see api/base/messagequeue). +package messagequeue + +import ( + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb" + "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb" +) + +// Wire payload types. These alias the generated protobuf bindings so callers +// reference the contract through this curated package rather than protopb. +type ( + // ProcessRequest is the payload ingest publishes to the process stage: the + // minted request id to validate. + ProcessRequest = protopb.ProcessRequest +) + +// marshalOpts keeps the JSON field names identical to the proto field names +// (snake_case), so the wire shape matches the declared contract rather than +// protojson's default lowerCamelCase. Zero-valued fields are omitted. +var marshalOpts = protojson.MarshalOptions{UseProtoNames: true} + +// unmarshalOpts tolerates unknown fields so an additive contract change (a new +// field a producer sends but this consumer does not yet know) is ignored rather +// than rejected. +var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true} + +// Marshal serializes any contract message to protojson bytes for the queue +// payload, keeping the proto field names (snake_case) on the wire. +func Marshal(m proto.Message) ([]byte, error) { + return marshalOpts.Marshal(m) +} + +// Unmarshal deserializes protojson bytes into the contract message m, tolerating +// unknown fields so an additive contract change is ignored rather than rejected. +func Unmarshal[T proto.Message](b []byte, m T) error { + return unmarshalOpts.Unmarshal(b, m) +} + +// TopicKeys returns the stable logical topic keys bound to a message via the +// topic_keys proto option — not concrete wire names; a caller maps each key to +// its backend's topic name. Returns nil for a message that declares no keys. +func TopicKeys(m proto.Message) []string { + opts := m.ProtoReflect().Descriptor().Options() + if opts == nil { + return nil + } + keys, ok := proto.GetExtension(opts, basemqpb.E_TopicKeys).([]string) + if !ok { + return nil + } + return keys +} diff --git a/stovepipe/core/messagequeue/process_test.go b/stovepipe/core/messagequeue/process_test.go new file mode 100644 index 00000000..4c9345f3 --- /dev/null +++ b/stovepipe/core/messagequeue/process_test.go @@ -0,0 +1,70 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestProcessRequestRoundTrip(t *testing.T) { + req := &ProcessRequest{Id: "request/monorepo/main/42"} + + data, err := Marshal(req) + require.NoError(t, err) + + got := &ProcessRequest{} + require.NoError(t, Unmarshal(data, got)) + assert.True(t, proto.Equal(req, got), "round-tripped ProcessRequest should equal the original") +} + +// TestWireFormat locks the protojson encoding decision the contract relies on: +// snake_case field names (UseProtoNames). +func TestWireFormat(t *testing.T) { + data, err := Marshal(&ProcessRequest{Id: "request/monorepo/main/42"}) + require.NoError(t, err) + + assert.Contains(t, string(data), `"id"`, "fields must serialize as snake_case") +} + +// TestTopicKeysBindEveryTopicKey is the topic-binding drift guard: every +// Stovepipe topic key is carried by exactly one message's topic_keys option, and +// no topic_keys option names an unknown key. +func TestTopicKeysBindEveryTopicKey(t *testing.T) { + bound := map[string]int{} + for _, m := range []proto.Message{&ProcessRequest{}} { + keys := TopicKeys(m) + require.NotEmpty(t, keys, "message must declare a non-empty topic_keys option") + for _, key := range keys { + bound[key]++ + } + } + + keys := []TopicKey{ + TopicKeyProcess, + } + + valid := map[string]bool{} + for _, k := range keys { + valid[k.String()] = true + assert.Equalf(t, 1, bound[k.String()], "topic key %q must be bound to exactly one message via the topic_keys option", k) + } + for key := range bound { + assert.Truef(t, valid[key], "topic_keys option names unknown key %q", key) + } +} diff --git a/stovepipe/core/messagequeue/proto/BUILD.bazel b/stovepipe/core/messagequeue/proto/BUILD.bazel new file mode 100644 index 00000000..047be015 --- /dev/null +++ b/stovepipe/core/messagequeue/proto/BUILD.bazel @@ -0,0 +1,47 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +exports_files( + ["process.proto"], + visibility = ["//tool/proto:__pkg__"], +) + +proto_library( + name = "processpb_proto", + srcs = ["process.proto"], + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/proto:messagequeuepb_proto", + ], +) + +# keep +go_proto_library( + name = "processpb_go_proto", + compilers = [ + "@rules_go//proto:go_proto", + "@rules_go//proto:go_grpc_v2", + ], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/proto", + proto = ":processpb_proto", + visibility = ["//visibility:public"], + # keep + deps = [ + "//api/base/messagequeue/proto:messagequeuepb_go_proto", + ], +) + +go_library( + name = "proto", + embed = [":processpb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "protopb", + embed = [":processpb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb", + visibility = ["//visibility:public"], +) diff --git a/stovepipe/core/messagequeue/proto/process.proto b/stovepipe/core/messagequeue/proto/process.proto new file mode 100644 index 00000000..999e149c --- /dev/null +++ b/stovepipe/core/messagequeue/proto/process.proto @@ -0,0 +1,35 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package uber.stovepipe.messagequeue; + +import "api/base/messagequeue/proto/messagequeue.proto"; + +option go_package = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb"; +option java_multiple_files = true; +option java_outer_classname = "ProcessProto"; +option java_package = "com.uber.submitqueue.stovepipe.messagequeue"; + +// ProcessRequest is the payload ingest publishes to the process stage once it has +// accepted a new head: only the minted request id travels on the queue. process +// reloads the full Request from storage by this id (producer and consumer share +// the store, so the id is enough and redelivery stays idempotent). +message ProcessRequest { + option (uber.base.messagequeue.topic_keys) = "process"; + + // id is the minted request id to process. Format: "request//". + string id = 1; +} diff --git a/stovepipe/core/messagequeue/protopb/BUILD.bazel b/stovepipe/core/messagequeue/protopb/BUILD.bazel new file mode 100644 index 00000000..4ae8bd74 --- /dev/null +++ b/stovepipe/core/messagequeue/protopb/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "protopb", + srcs = ["process.pb.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb", + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/protopb", # keep + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + ], +) diff --git a/stovepipe/core/messagequeue/protopb/process.pb.go b/stovepipe/core/messagequeue/protopb/process.pb.go new file mode 100644 index 00000000..5cffa3c2 --- /dev/null +++ b/stovepipe/core/messagequeue/protopb/process.pb.go @@ -0,0 +1,144 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: process.proto + +package protopb + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + _ "github.com/uber/submitqueue/api/base/messagequeue/protopb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// ProcessRequest is the payload ingest publishes to the process stage once it has +// accepted a new head: only the minted request id travels on the queue. process +// reloads the full Request from storage by this id (producer and consumer share +// the store, so the id is enough and redelivery stays idempotent). +type ProcessRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // id is the minted request id to process. Format: "request//". + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProcessRequest) Reset() { + *x = ProcessRequest{} + mi := &file_process_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessRequest) ProtoMessage() {} + +func (x *ProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_process_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessRequest.ProtoReflect.Descriptor instead. +func (*ProcessRequest) Descriptor() ([]byte, []int) { + return file_process_proto_rawDescGZIP(), []int{0} +} + +func (x *ProcessRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +var File_process_proto protoreflect.FileDescriptor + +const file_process_proto_rawDesc = "" + + "\n" + + "\rprocess.proto\x12\x1buber.stovepipe.messagequeue\x1a.api/base/messagequeue/proto/messagequeue.proto\"-\n" + + "\x0eProcessRequest\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id:\v\x8a\xb5\x18\aprocessB~\n" + + "+com.uber.submitqueue.stovepipe.messagequeueB\fProcessProtoP\x01Z?github.com/uber/submitqueue/stovepipe/core/messagequeue/protopbb\x06proto3" + +var ( + file_process_proto_rawDescOnce sync.Once + file_process_proto_rawDescData []byte +) + +func file_process_proto_rawDescGZIP() []byte { + file_process_proto_rawDescOnce.Do(func() { + file_process_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_process_proto_rawDesc), len(file_process_proto_rawDesc))) + }) + return file_process_proto_rawDescData +} + +var file_process_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_process_proto_goTypes = []any{ + (*ProcessRequest)(nil), // 0: uber.stovepipe.messagequeue.ProcessRequest +} +var file_process_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_process_proto_init() } +func file_process_proto_init() { + if File_process_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_process_proto_rawDesc), len(file_process_proto_rawDesc)), + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_process_proto_goTypes, + DependencyIndexes: file_process_proto_depIdxs, + MessageInfos: file_process_proto_msgTypes, + }.Build() + File_process_proto = out.File + file_process_proto_goTypes = nil + file_process_proto_depIdxs = nil +} diff --git a/stovepipe/core/messagequeue/topics.go b/stovepipe/core/messagequeue/topics.go new file mode 100644 index 00000000..f4a0109c --- /dev/null +++ b/stovepipe/core/messagequeue/topics.go @@ -0,0 +1,30 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import "github.com/uber/submitqueue/platform/consumer" + +// TopicKey is the typed identifier used to look up a queue backend, topic name, +// and subscription config in a consumer.TopicRegistry. The constants below are +// the logical topic keys for Stovepipe's internal pipeline stages; they are the +// same strings each message lists in its topic_keys option. +type TopicKey = consumer.TopicKey + +const ( + // TopicKeyProcess carries newly accepted requests from ingest to the process + // stage. ingest publishes a ProcessRequest (the request id) here; the process + // controller consumes it, reloads the Request, and decides the build strategy. + TopicKeyProcess TopicKey = "process" +) diff --git a/test/integration/stovepipe/BUILD.bazel b/test/integration/stovepipe/BUILD.bazel index f91f8042..8f8acfef 100644 --- a/test/integration/stovepipe/BUILD.bazel +++ b/test/integration/stovepipe/BUILD.bazel @@ -7,6 +7,8 @@ go_test( "//:MODULE.bazel", "//:go.mod", "//example/stovepipe:docker-compose.yml", + "//platform/extension/messagequeue/mysql/schema", + "//stovepipe/extension/storage/mysql/schema", ], tags = [ "external", @@ -15,6 +17,7 @@ go_test( deps = [ "//api/stovepipe/protopb", "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", diff --git a/test/integration/stovepipe/suite_test.go b/test/integration/stovepipe/suite_test.go index 871fd78a..79f9edc3 100644 --- a/test/integration/stovepipe/suite_test.go +++ b/test/integration/stovepipe/suite_test.go @@ -18,8 +18,9 @@ package stovepipe // // These tests use compose from example/stovepipe/docker-compose.yml and require // a pre-built Linux binary (make integration-test runs //test/integration/... -// and builds all Linux binaries via build-all-linux). Stovepipe is currently a -// Ping-only service with no storage or queue dependencies. +// and builds all Linux binaries via build-all-linux). The stack runs the +// Stovepipe gRPC service plus a storage MySQL (request, request_uri) and a queue +// MySQL (process stage). // // Run with: // make integration-test @@ -28,9 +29,11 @@ package stovepipe import ( "context" + "database/sql" "path/filepath" "testing" + _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -41,10 +44,12 @@ import ( type StovepipeIntegrationSuite struct { suite.Suite - ctx context.Context - log *testutil.TestLogger - stack *testutil.ComposeStack - client pb.StovepipeClient + ctx context.Context + log *testutil.TestLogger + stack *testutil.ComposeStack + client pb.StovepipeClient + db *sql.DB // storage database (request, request_uri) + queueDB *sql.DB // queue database } func TestStovepipeIntegration(t *testing.T) { @@ -67,6 +72,17 @@ func (s *StovepipeIntegrationSuite) SetupSuite() { err := s.stack.Up() require.NoError(t, err, "failed to start compose stack") + s.db, err = s.stack.ConnectMySQLService("mysql-app") + require.NoError(t, err, "failed to connect to storage MySQL") + + s.queueDB, err = s.stack.ConnectMySQLService("mysql-queue") + require.NoError(t, err, "failed to connect to queue MySQL") + + // Apply schemas after the stack is up; the service connects lazily and the + // consumer retries, so the boot ordering is tolerated. + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("stovepipe/extension/storage/mysql/schema")) + testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("platform/extension/messagequeue/mysql/schema")) + var conn *grpc.ClientConn conn, err = s.stack.ConnectGRPC("stovepipe-service", 8080) require.NoError(t, err, "failed to connect to stovepipe service") @@ -86,3 +102,46 @@ func (s *StovepipeIntegrationSuite) TestPingAPI() { assert.NotEmpty(t, resp.Message) assert.NotZero(t, resp.Timestamp) } + +// TestIngestAPI exercises the full ingest path: the controller resolves the head +// URI via the (fake) SourceControl, persists the Request and its (queue, URI) +// mapping, and publishes the request id to the process stage. A second ingest of +// the same queue resolves the same head and dedups to the same id. +func (s *StovepipeIntegrationSuite) TestIngestAPI() { + t := s.T() + + const queue = "monorepo/main" + + resp, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: queue}) + require.NoError(t, err, "Ingest failed") + require.NotEmpty(t, resp.Id, "minted request id should not be empty") + id := resp.Id + + // Request persisted. + var reqCount int + require.NoError(t, s.db.QueryRow("SELECT COUNT(*) FROM request WHERE id = ?", id).Scan(&reqCount)) + assert.Equal(t, 1, reqCount, "request row should be persisted") + + // (queue, URI) mapping persisted and points at the minted id. + var mappedID string + require.NoError(t, s.db.QueryRow("SELECT request_id FROM request_uri WHERE queue = ?", queue).Scan(&mappedID)) + assert.Equal(t, id, mappedID, "URI mapping should point at the minted request id") + + // Message published to the process topic. + var msgCount int + require.NoError(t, s.queueDB.QueryRow("SELECT COUNT(*) FROM queue_messages WHERE id = ?", id).Scan(&msgCount)) + assert.Equal(t, 1, msgCount, "should have published one process message") + + // Re-ingesting the same queue resolves the same head URI and dedups. + resp2, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: queue}) + require.NoError(t, err, "second Ingest failed") + assert.Equal(t, id, resp2.Id, "re-ingest of the same head should dedup to the same id") +} + +// TestIngestEmptyQueue verifies the request-validation error surfaces over gRPC. +func (s *StovepipeIntegrationSuite) TestIngestEmptyQueue() { + t := s.T() + + _, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: ""}) + require.Error(t, err, "Ingest with empty queue should fail") +} diff --git a/tool/proto/BUILD.bazel b/tool/proto/BUILD.bazel index 5719d284..f710f7c0 100644 --- a/tool/proto/BUILD.bazel +++ b/tool/proto/BUILD.bazel @@ -63,6 +63,17 @@ go_proto_generated_files( out_dir = "api_stovepipe", ) +# Stovepipe internal queue contract (message-only, no RPC service). +go_proto_generated_files( + name = "stovepipe_core_messagequeue", + srcs = ["//stovepipe/core/messagequeue/proto:process.proto"], + gen_services = False, + imports = [ + "//api/base/messagequeue/proto:messagequeue.proto", + ], + out_dir = "stovepipe_core_messagequeue", +) + filegroup( name = "generated", srcs = [ @@ -74,5 +85,6 @@ filegroup( ":api_stovepipe", ":api_submitqueue_gateway", ":api_submitqueue_orchestrator", + ":stovepipe_core_messagequeue", ], )