diff --git a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java
index e71c1cc..5e2d8dc 100644
--- a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java
+++ b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java
@@ -4,218 +4,106 @@
import io.confluent.flink.plugin.ConfluentTools;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
+import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
-import static org.apache.flink.table.api.Expressions.withAllColumns;
/**
- * An example that illustrates how to embed a table program into a CI/CD pipeline for continuous
- * testing and rollout.
- *
- * Because we cannot rely on production data in this example, the program sets up some
- * Kafka-backed tables with data during the {@code setup} phase.
+ * An example that illustrates how to structure, test, and deploy a table program for production use
+ * in a CI/CD pipeline.
*
- *
Afterward, the program can operate in two modes: one for integration testing ({@code test}
- * phase) and one for deployment ({@code deploy} phase).
+ *
The example separates two concerns that often end up entangled:
*
- *
A CI/CD workflow could execute the following:
+ *
+ * - The pipeline logic in {@link VendorsPerBrand} is plain Table API code without any
+ * Confluent-specific dependencies. It receives its input table as a parameter instead of
+ * resolving it from a catalog. This makes the logic executable on Apache Flink in local unit
+ * tests, where the input is mocked with {@code fromValues()} (see {@code
+ * Example_08_IntegrationAndDeploymentTest}), as well as on Confluent Cloud for Apache Flink,
+ * where the input is a Kafka-backed table (see {@code
+ * Example_08_IntegrationAndDeploymentIT}).
+ *
- The {@link #main(String[])} method is the deployment entrypoint. It wires the pipeline to
+ * Confluent Cloud and submits it as a long-running background statement.
+ *
*
- *
- * export EXAMPLE_JAR=./target/flink-table-api-java-examples-1.0.jar
- * export EXAMPLE_CLASS=io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
- * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS setup
- * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS test
- * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS deploy
- *
+ * The program is configured with {@link ConfluentSettings#fromArgs(String[])}, which reads
+ * configuration from command-line arguments, with environment variables as a fallback. This also
+ * enables the plugin's built-in CI/CD lifecycle actions: when the JAR is run with an action as the
+ * first argument ({@code list}, {@code describe}, {@code resume}, {@code stop}, or {@code delete}),
+ * the plugin executes the action and exits before the deployment logic runs, so the same JAR both
+ * deploys and manages (see {@code .github/workflows-examples/manage.yml}).
*
- *
NOTE: This example requires write access to a Kafka cluster. Fill out the given variables
- * below with target catalog/database if this is fine for you.
+ *
The statement name and application name are deployment configuration, not source constants:
+ * the pipeline provides them via {@code --statement-name} / {@code --application-name}, which keeps
+ * a single source of truth for both deploy and management and is required for the lifecycle actions
+ * (they read the name at startup, before {@code main()} runs). A program that submits several
+ * statements instead names each one in code via {@link
+ * ConfluentTools#setStatementName(TableEnvironment, String)}.
*
- *
ALSO NOTE: The example submits an unbounded background statement. Make sure to stop the
- * statement in the Web UI afterward to clean up resources.
+ *
Re-running the deployment with unchanged code is idempotent. When the pipeline changed, pass
+ * {@code --on-conflict replace} to replace the existing statement; see the README's CI/CD section
+ * for what that means for stateful pipelines. The README also covers configuration, environment
+ * promotion, and the full set of workflow steps.
*
- *
The complete CI/CD workflow performs the following steps:
+ *
NOTE: This example requires write access to a Kafka cluster, selected with the {@code
+ * sql.current-catalog} (environment name) and {@code sql.current-database} (Kafka cluster name)
+ * configuration options.
*
- *
- * - Create Kafka table 'ProductsMock' and 'VendorsPerBrand'.
- *
- Fill Kafka table 'ProductsMock' with data from marketplace examples table 'products'.
- *
- Test the given SQL on a subset of data in 'ProductsMock' with the help of dynamic options.
- *
- Deploy an unbounded version of the tested SQL that write into 'VendorsPerBrand'.
- *
+ * ALSO NOTE: The example submits an unbounded background statement. Use the lifecycle actions
+ * (see {@code .github/workflows-examples/manage.yml}) or the Web UI to stop and delete the
+ * statement afterward to clean up resources.
*/
public class Example_08_IntegrationAndDeployment {
- // Fill this with an environment you have write access to
- static final String TARGET_CATALOG = "";
-
- // Fill this with a Kafka cluster you have write access to
- static final String TARGET_DATABASE = "";
-
- // Fill this with names of the Kafka Topics you want to create
- static final String SOURCE_TABLE = "ProductsMock";
+ // Name of the table that stores the results
static final String TARGET_TABLE = "VendorsPerBrand";
- // The following SQL will be tested on a finite subset of data before
- // it gets deployed to production.
- // In production, it will run on unbounded input.
- // The '%s' parameterizes the SQL for testing.
- static final String SQL =
- "SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand";
-
- // All logic is defined in a main() method. It can run both in an IDE or CI/CD system.
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- throw new IllegalArgumentException(
- "No mode specified. Possible values are 'setup', 'test', or 'deploy'.");
- }
-
- EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
- TableEnvironment env = TableEnvironment.create(settings);
- env.useCatalog(TARGET_CATALOG);
- env.useDatabase(TARGET_DATABASE);
-
- String mode = args[0];
- switch (mode) {
- case "setup":
- setupProgram(env);
- break;
- case "test":
- testProgram(env);
- break;
- case "deploy":
- deployProgram(env);
- break;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
+ /**
+ * The pipeline logic under test: counts the number of vendors per brand.
+ *
+ *
This class must not reference any {@code io.confluent.flink.plugin} classes so that unit
+ * tests can run it on Apache Flink without the plugin on the classpath.
+ */
+ public static class VendorsPerBrand {
+ public static Table buildPipeline(Table products) {
+ return products.groupBy($("brand")).select($("brand"), lit(1).count().as("vendors"));
}
}
- // --------------------------------------------------------------------------------------------
- // Setup Phase
- // --------------------------------------------------------------------------------------------
-
- private static void setupProgram(TableEnvironment env) throws Exception {
- System.out.println("Running setup...");
+ // The main() method performs the deployment, unless an action argument is present, in which
+ // case ConfluentSettings.fromArgs(...) below executes that action and exits before the rest
+ // of this method runs.
+ public static void main(String[] args) {
+ // All configuration comes from the deployment via fromArgs (with environment variables as
+ // a fallback): the connection settings, the statement and application names, and the
+ // target catalog and database (set with sql.current-catalog / sql.current-database). In
+ // GitHub Actions these map to repository or environment secrets; see the README.
+ EnvironmentSettings settings = ConfluentSettings.fromArgs(args);
+ TableEnvironment env = TableEnvironment.create(settings);
- System.out.println("Creating table..." + SOURCE_TABLE);
- // Create a mock table that has exactly the same schema as the example `products` table.
- // The LIKE clause is very convenient for this task which is why we use SQL here.
- // Since we use little data, a bucket of 1 is important to satisfy the `scan.bounded.mode`
- // during testing.
+ System.out.println("Creating table... " + TARGET_TABLE);
+ // The pipeline owns its output table and creates it on the first deployment.
env.executeSql(
String.format(
"CREATE TABLE IF NOT EXISTS `%s`\n"
- + "DISTRIBUTED INTO 1 BUCKETS\n"
- + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)",
- SOURCE_TABLE));
-
- System.out.println("Start filling table...");
- // Let Flink copy generated data into the mock table. Note that the statement is unbounded
- // and submitted as a background statement by default.
- TableResult pipelineResult =
- env.from("`examples`.`marketplace`.`products`")
- .select(withAllColumns())
- .insertInto(SOURCE_TABLE)
- .execute();
-
- System.out.println("Waiting for at least 200 elements in table...");
- // We start a second Flink statement for monitoring how the copying progresses
- TableResult countResult = env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute();
- // This waits for the condition to be met:
- try (CloseableIterator iterator = countResult.collect()) {
- while (iterator.hasNext()) {
- Row row = iterator.next();
- long count = row.getFieldAs("c");
- if (count >= 200L) {
- System.out.println("200 elements reached. Stopping...");
- break;
- }
- }
- }
-
- // By using a closable iterator, the foreground statement will be stopped automatically when
- // the iterator is closed. But the background statement still needs a manual stop.
- ConfluentTools.stopStatement(pipelineResult);
-
- System.out.println("Creating table..." + TARGET_TABLE);
- // Create a table for storing the results after deployment.
- env.executeSql(
- String.format(
- "CREATE TABLE IF NOT EXISTS `%s` \n"
+ "(brand STRING, vendors BIGINT, PRIMARY KEY(brand) NOT ENFORCED)\n"
+ "DISTRIBUTED INTO 1 BUCKETS",
TARGET_TABLE));
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Phase
- // --------------------------------------------------------------------------------------------
- private static void testProgram(TableEnvironment env) {
- System.out.println("Running test...");
-
- // Dynamic options allow influencing parts of a table scan. In this case, they define a
- // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively,
- // they make the table bounded. If all tables are finite, the statement can terminate.
- // This allows us to run checks on the result.
- String dynamicOptions =
- "/*+ OPTIONS(\n"
- + "'scan.startup.mode' = 'specific-offsets',\n"
- + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n"
- + "'scan.bounded.mode' = 'specific-offsets',\n"
- + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n"
- + ") */";
-
- System.out.println("Requesting test data...");
- TableResult result = env.executeSql(String.format(SQL, dynamicOptions));
- List rows = ConfluentTools.collectMaterialized(result);
+ System.out.println("Deploying statement...");
+ // The same pipeline logic that was tested locally and against Confluent Cloud now runs
+ // unbounded on the continuously generated rows of the examples catalog.
+ Table products = env.from("`examples`.`marketplace`.`products`");
+ TableResult result =
+ VendorsPerBrand.buildPipeline(products).insertInto(TARGET_TABLE).execute();
+ // Print the final submitted name (application prefix included) for use with the lifecycle
+ // actions. If no name was configured, the plugin generates one, which is not addressable
+ // for later management; CI/CD deployments should always pass --statement-name.
System.out.println(
- "Test data:\n"
- + rows.stream().map(Row::toString).collect(Collectors.joining("\n")));
-
- // Use the testing framework of your choice and add checks to verify the
- // correctness of the test data
- boolean testSuccessful =
- rows.stream()
- .map(r -> r.getFieldAs("brand"))
- .anyMatch(brand -> brand.equals("Apple"));
- if (testSuccessful) {
- System.out.println("Success. Ready for deployment.");
- } else {
- throw new IllegalStateException("Test was not successful");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Deploy Phase
- // --------------------------------------------------------------------------------------------
-
- private static void deployProgram(TableEnvironment env) {
- System.out.println("Running deploy...");
-
- // It is possible to give a better statement name for deployment but make sure that the name
- // is unique across environment and region.
- String statementName = "vendors-per-brand-" + UUID.randomUUID();
- env.getConfig().set("client.statement-name", statementName);
-
- // Execute the SQL without dynamic options.
- // The result is unbounded and piped into the target table.
- TableResult insertIntoResult =
- env.sqlQuery(String.format(SQL, "")).insertInto(TARGET_TABLE).execute();
-
- // The API might add suffixes to manual statement names such as '-sql' or '-api'.
- // For the final submitted name, use the provided tools.
- String finalName = ConfluentTools.getStatementName(insertIntoResult);
-
- System.out.println("Statement has been deployed as: " + finalName);
+ "Statement has been deployed as: " + ConfluentTools.getStatementName(result));
}
}
diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java
new file mode 100644
index 0000000..53dab7f
--- /dev/null
+++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java
@@ -0,0 +1,189 @@
+package io.confluent.flink.examples.table;
+
+import io.confluent.flink.plugin.ConfluentPluginOptions;
+import io.confluent.flink.plugin.ConfluentSettings;
+import io.confluent.flink.plugin.ConfluentTools;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.withAllColumns;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}, executed
+ * against Confluent Cloud.
+ *
+ * While the unit tests (see {@code Example_08_IntegrationAndDeploymentTest}) verify the logic
+ * locally, these tests verify it on the real service: the exact Confluent SQL semantics, the
+ * Confluent catalog, and Kafka-backed tables.
+ *
+ *
The tests run during {@code ./mvnw verify} and fail fast when the required environment
+ * variables are not set, so a CI pipeline cannot silently skip its verification step and still
+ * report success. Builds without Confluent Cloud credentials skip them with {@code ./mvnw verify
+ * -DskipITs}. They require the standard connection variables (see the README's "Via Environment
+ * Variables" section) plus TARGET_CATALOG and TARGET_DATABASE pointing to an environment and Kafka
+ * cluster with write access.
+ *
+ *
Because we cannot rely on production data in this example, the test fixture creates a mock
+ * Kafka-backed table and fills it with data from the marketplace examples table. Dynamic options
+ * then make the table bounded, so the pipeline terminates and its result can be asserted.
+ *
+ *
NOTE: Running from the IDE needs the opposite classpath exclusion from the unit tests (the
+ * {@code flink-table-planner-loader} JAR) plus the environment variables in the run configuration;
+ * see the README's testing section.
+ */
+class Example_08_IntegrationAndDeploymentIT {
+
+ // Name of the mock Kafka topic that emulates the production input
+ static final String SOURCE_TABLE = "ProductsMock";
+
+ static TableEnvironment env;
+
+ @BeforeAll
+ // The timeout runs the setup in a separate thread so that it can be interrupted even while
+ // blocked on statement results, e.g. when the compute pool has no capacity for the fill
+ // statement. Without it, a stuck setup would hang until the CI job timeout.
+ @Timeout(value = 15, unit = TimeUnit.MINUTES, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ static void setUpMockTable() throws Exception {
+ requireEnvironment();
+ env = TableEnvironment.create(ConfluentSettings.fromGlobalVariables());
+ env.useCatalog(System.getenv("TARGET_CATALOG"));
+ env.useDatabase(System.getenv("TARGET_DATABASE"));
+
+ System.out.println("Creating table... " + SOURCE_TABLE);
+ // Create a mock table that has exactly the same schema as the example `products` table.
+ // The LIKE clause is very convenient for this task which is why we use SQL here.
+ // Since we use little data, a bucket of 1 is important to satisfy the
+ // `scan.bounded.mode` during testing.
+ env.executeSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s`\n"
+ + "DISTRIBUTED INTO 1 BUCKETS\n"
+ + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)",
+ SOURCE_TABLE));
+
+ System.out.println("Start filling table...");
+ // Let Flink copy generated data into the mock table. Note that the statement is
+ // unbounded and submitted as a background statement by default.
+ TableResult pipelineResult =
+ env.from("`examples`.`marketplace`.`products`")
+ .select(withAllColumns())
+ .insertInto(SOURCE_TABLE)
+ .execute();
+
+ long count = 0;
+ try {
+ System.out.println("Waiting for at least 200 elements in table...");
+ // A second Flink statement monitors how the copying progresses. The foreground
+ // statement is stopped automatically when its iterator is closed.
+ TableResult countResult =
+ env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute();
+ try (CloseableIterator iterator = countResult.collect()) {
+ while (count < 200L && iterator.hasNext()) {
+ count = iterator.next().getFieldAs("c");
+ }
+ }
+ } finally {
+ // The fill statement is unbounded and must always be cleaned up, even when the wait
+ // above fails, as it would otherwise keep running and consuming CFUs. It is also
+ // deleted rather than just stopped: it gets a random name on every run, and stopped
+ // statements would accumulate in the environment.
+ String fillStatement = ConfluentTools.getStatementName(pipelineResult);
+ ConfluentTools.stopStatement(env, fillStatement);
+ ConfluentTools.deleteStatement(env, fillStatement);
+ }
+ if (count < 200L) {
+ fail(
+ "The mock table only reached "
+ + count
+ + " elements before the monitoring statement terminated.");
+ }
+ System.out.println("200 elements reached.");
+ }
+
+ // Fails fast with a clear message instead of skipping, so that a CI pipeline with missing
+ // secrets cannot report a successful verification that never ran. The connection variable
+ // names come from ConfluentPluginOptions, so the list cannot drift from the plugin contract.
+ private static void requireEnvironment() {
+ List required = new ArrayList<>();
+ // A properties file referenced via FLINK_PROPERTIES is a valid alternative to the
+ // discrete connection variables (see the README's "Configuration" section).
+ if (isBlank(System.getenv(ConfluentPluginOptions.VAR_FLINK_PROPERTIES))) {
+ required.add(ConfluentPluginOptions.VAR_CLOUD_PROVIDER);
+ required.add(ConfluentPluginOptions.VAR_CLOUD_REGION);
+ required.add(ConfluentPluginOptions.VAR_FLINK_API_KEY);
+ required.add(ConfluentPluginOptions.VAR_FLINK_API_SECRET);
+ required.add(ConfluentPluginOptions.VAR_ORG_ID);
+ required.add(ConfluentPluginOptions.VAR_ENV_ID);
+ required.add(ConfluentPluginOptions.VAR_COMPUTE_POOL_ID);
+ }
+ required.add("TARGET_CATALOG");
+ required.add("TARGET_DATABASE");
+ List missing =
+ required.stream()
+ .filter(name -> isBlank(System.getenv(name)))
+ .collect(Collectors.toList());
+ if (!missing.isEmpty()) {
+ fail(
+ "Integration tests verify the pipeline against Confluent Cloud and require the"
+ + " environment variables "
+ + missing
+ + ". Set them (see the README section 'Via Environment Variables') or"
+ + " skip the integration tests explicitly with -DskipITs.");
+ }
+ }
+
+ private static boolean isBlank(String value) {
+ return value == null || value.isBlank();
+ }
+
+ @Test
+ void countsVendorsPerBrandOnBoundedData() {
+ // Dynamic options allow influencing parts of a table scan. In this case, they define a
+ // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively,
+ // they make the table bounded. If all tables are finite, the statement can terminate.
+ // This allows us to run checks on the result.
+ Table boundedProducts =
+ env.sqlQuery(
+ String.format(
+ "SELECT * FROM `%s` /*+ OPTIONS(\n"
+ + "'scan.startup.mode' = 'specific-offsets',\n"
+ + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n"
+ + "'scan.bounded.mode' = 'specific-offsets',\n"
+ + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n"
+ + ") */",
+ SOURCE_TABLE));
+
+ // The exact same pipeline logic that the unit tests run locally
+ Table result =
+ Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(boundedProducts);
+
+ List rows = ConfluentTools.collectMaterialized(result.execute());
+
+ assertThat(rows).isNotEmpty();
+ assertThat(rows)
+ .allSatisfy(
+ row -> {
+ assertThat(row.getFieldAs("brand")).isNotBlank();
+ assertThat(row.getFieldAs("vendors")).isPositive();
+ });
+ // The examples data generator produces a fixed set of brands. Checking for a known one
+ // guards against reading the wrong data, not just producing plausible-looking rows.
+ assertThat(rows).extracting(row -> row.getFieldAs("brand")).contains("Apple");
+ }
+}
diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java
new file mode 100644
index 0000000..5e62572
--- /dev/null
+++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java
@@ -0,0 +1,108 @@
+package io.confluent.flink.examples.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}.
+ *
+ * These tests run entirely locally on Apache Flink with mock data from {@code fromValues()}. No
+ * Confluent Cloud connectivity, credentials, or compute pool are required, which makes them
+ * suitable for fast feedback during development and for CI runs on pull requests.
+ *
+ *
NOTE: The Confluent plugin and the Apache Flink planner cannot share a runtime classpath (both
+ * register Executor and Planner factories under the identifier 'default'), so these tests must be
+ * executed via {@code ./mvnw test}, where the surefire configuration excludes the plugin. Running
+ * them directly from the IDE fails with "Multiple factories for identifier 'default'"; see the
+ * README's testing section for the IDE run-configuration setup.
+ *
+ *
ALSO NOTE: Running locally on Apache Flink is not identical to Confluent Cloud.
+ * Confluent-specific features such as the {@code $rowtime} system column, the Confluent catalog,
+ * and Confluent SQL extensions are not available locally. Use the integration tests (see {@code
+ * Example_08_IntegrationAndDeploymentIT}) to verify behavior against the real service.
+ */
+class Example_08_IntegrationAndDeploymentTest {
+
+ private static Table mockProducts(TableEnvironment env) {
+ return env.fromValues(
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("brand", DataTypes.STRING())),
+ row("MacBook", "Apple"),
+ row("iPhone", "Apple"),
+ row("Galaxy", "Samsung"));
+ }
+
+ @Test
+ void countsVendorsPerBrandInBatchMode() throws Exception {
+ // Batch mode computes the final result over the finite mock data, which makes
+ // assertions straightforward.
+ TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+
+ Table result =
+ Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(
+ mockProducts(env));
+
+ assertThat(collectRows(result))
+ .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L));
+ }
+
+ @Test
+ void countsVendorsPerBrandInStreamingMode() throws Exception {
+ // Streaming mode emits a changelog: an insert for the first product of a brand,
+ // followed by update_before/update_after pairs as more products arrive. This mirrors
+ // how the statement behaves on Confluent Cloud.
+ TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+ Table result =
+ Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(
+ mockProducts(env));
+
+ List changelog = collectRows(result);
+ assertThat(materialize(changelog))
+ .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L));
+ }
+
+ private static List collectRows(Table table) throws Exception {
+ List rows = new ArrayList<>();
+ try (CloseableIterator iterator = table.execute().collect()) {
+ iterator.forEachRemaining(rows::add);
+ }
+ return rows;
+ }
+
+ // Applies the changelog to derive the final result, similar to what
+ // ConfluentTools.collectMaterialized() does for statements running on Confluent Cloud.
+ // Rows are copied so that the caller's changelog is left untouched.
+ private static List materialize(List changelog) {
+ List state = new ArrayList<>();
+ for (Row row : changelog) {
+ Row copy = Row.copy(row);
+ copy.setKind(RowKind.INSERT);
+ switch (row.getKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ state.add(copy);
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ state.remove(copy);
+ break;
+ }
+ }
+ return state;
+ }
+}
diff --git a/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java
new file mode 100644
index 0000000..db66f3f
--- /dev/null
+++ b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java
@@ -0,0 +1,43 @@
+package io.confluent.flink.examples.table;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the User-Defined Functions of {@link Example_09_Functions}.
+ *
+ * UDFs are plain Java classes, so their logic can be tested with JUnit alone: no Apache Flink,
+ * no Confluent Cloud connectivity, and no artifact upload are required. This is the fastest test
+ * tier and should cover the bulk of a UDF's business logic before it is registered and exercised on
+ * Confluent Cloud.
+ */
+class Example_09_FunctionsTest {
+
+ @Test
+ void customTaxReturnsRatePerLocation() {
+ Example_09_Functions.CustomTax tax = new Example_09_Functions.CustomTax();
+
+ assertThat(tax.eval("USA")).isEqualTo(10);
+ assertThat(tax.eval("EU")).isEqualTo(5);
+ assertThat(tax.eval("Mars")).isEqualTo(0);
+ }
+
+ @Test
+ void explodeEmitsOneRowPerElement() {
+ Example_09_Functions.Explode explode = new Example_09_Functions.Explode();
+
+ // Table functions emit rows via a collector, which tests can replace with a list
+ List collected = new ArrayList<>();
+ explode.setCollector(new ListCollector<>(collected));
+
+ explode.eval(List.of("Apples", "Bananas"));
+
+ assertThat(collected).containsExactly("Apples", "Bananas");
+ }
+}
From dc0bf5d616a0505aeb037cfea88fe04d61df6daa Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Wed, 1 Jul 2026 13:47:01 +0200
Subject: [PATCH 2/5] Build against Confluent plugin 2.3-1 for Flink 2.3.0
---
pom.xml | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/pom.xml b/pom.xml
index ab53c67..88c037e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,8 +11,7 @@
2.3.0
-
- 2.2-24
+ 2.3-1
17
UTF-8
${target.java.version}
From cbeb95bc991fa4827e60a30817077c385d290c06 Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Wed, 1 Jul 2026 13:47:01 +0200
Subject: [PATCH 3/5] Run Example_08 explicitly via -cp in deploy/manage
workflows
---
.github/workflows-examples/deploy.yml | 4 ++--
.github/workflows-examples/manage.yml | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/.github/workflows-examples/deploy.yml b/.github/workflows-examples/deploy.yml
index 70911df..544a11e 100644
--- a/.github/workflows-examples/deploy.yml
+++ b/.github/workflows-examples/deploy.yml
@@ -53,7 +53,7 @@ jobs:
# pipeline restarts from its configured source offsets without resuming prior state
# (see the README CI/CD section).
run: >
- java -jar target/flink-table-api-java-examples-1.0.jar
+ java -cp target/flink-table-api-java-examples-1.0.jar io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
--statement-name "$STATEMENT_NAME"
--application-name "$APPLICATION_NAME"
--sql.current-catalog "$TARGET_CATALOG"
@@ -64,6 +64,6 @@ jobs:
# Submitting a background statement returns once Confluent Cloud accepts it, so
# list reports the statement and its phase for visibility in the log.
run: >
- java -jar target/flink-table-api-java-examples-1.0.jar
+ java -cp target/flink-table-api-java-examples-1.0.jar io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
list
--application-name "$APPLICATION_NAME"
diff --git a/.github/workflows-examples/manage.yml b/.github/workflows-examples/manage.yml
index 1813745..d3cc81f 100644
--- a/.github/workflows-examples/manage.yml
+++ b/.github/workflows-examples/manage.yml
@@ -67,4 +67,4 @@ jobs:
if [ -n "$STATEMENT_NAME" ]; then
args+=(--statement-name "$STATEMENT_NAME")
fi
- java -jar target/flink-table-api-java-examples-1.0.jar "${args[@]}"
+ java -cp target/flink-table-api-java-examples-1.0.jar io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment "${args[@]}"
From 1ef34a2f34692cf9263e5eec5c3401720bd66399 Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Wed, 1 Jul 2026 14:58:38 +0200
Subject: [PATCH 4/5] Fix flaky integration test: read uncommitted mock data
and drop table after run
---
.../Example_08_IntegrationAndDeploymentIT.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java
index 53dab7f..aec59fd 100644
--- a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java
+++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java
@@ -10,6 +10,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -70,10 +71,15 @@ static void setUpMockTable() throws Exception {
// The LIKE clause is very convenient for this task which is why we use SQL here.
// Since we use little data, a bucket of 1 is important to satisfy the
// `scan.bounded.mode` during testing.
+ // The fill statement writes with exactly-once semantics, so records only become visible to
+ // read-committed consumers once a checkpoint commits (~1 min on Confluent Cloud). This test
+ // reads the freshly filled data well within that interval, so it reads uncommitted records;
+ // committed durability is not what this pipeline test verifies.
env.executeSql(
String.format(
"CREATE TABLE IF NOT EXISTS `%s`\n"
+ "DISTRIBUTED INTO 1 BUCKETS\n"
+ + "WITH ('kafka.consumer.isolation-level' = 'read-uncommitted')\n"
+ "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)",
SOURCE_TABLE));
@@ -186,4 +192,13 @@ void countsVendorsPerBrandOnBoundedData() {
// guards against reading the wrong data, not just producing plausible-looking rows.
assertThat(rows).extracting(row -> row.getFieldAs("brand")).contains("Apple");
}
+
+ @AfterAll
+ // Drop the mock table so the run does not leak a topic into the environment. Runs even when a
+ // test fails, so a failed run cleans up after itself too.
+ static void dropMockTable() {
+ if (env != null) {
+ env.executeSql(String.format("DROP TABLE IF EXISTS `%s`", SOURCE_TABLE));
+ }
+ }
}
From efd25e6455957da1ce83fad482006af5b668befa Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Wed, 1 Jul 2026 14:58:38 +0200
Subject: [PATCH 5/5] Name deploy and manage workflow-example runs after their
target
---
.github/workflows-examples/deploy.yml | 5 +++++
.github/workflows-examples/manage.yml | 6 ++++++
2 files changed, 11 insertions(+)
diff --git a/.github/workflows-examples/deploy.yml b/.github/workflows-examples/deploy.yml
index 544a11e..0818059 100644
--- a/.github/workflows-examples/deploy.yml
+++ b/.github/workflows-examples/deploy.yml
@@ -5,6 +5,11 @@
# required secrets.
name: Deploy Table API Program
+# Show what this pipeline deploys in the runs list. run-name can only read the github and inputs
+# contexts, not env, so the application and statement names are repeated here; keep them in sync
+# with APPLICATION_NAME / STATEMENT_NAME below.
+run-name: "Deploy marketplace-analytics / vendors-per-brand"
+
on:
# Choose the trigger(s) that fit your deployment strategy. This template can be run
# manually from the GitHub Actions UI; uncomment the others to deploy automatically,
diff --git a/.github/workflows-examples/manage.yml b/.github/workflows-examples/manage.yml
index d3cc81f..d388a6a 100644
--- a/.github/workflows-examples/manage.yml
+++ b/.github/workflows-examples/manage.yml
@@ -6,6 +6,12 @@
# secrets. The application name defaults to the one deploy.yml uses; override it per run.
name: Manage Flink Statements
+# Name each run after the action and target so the runs list is distinguishable at a glance
+# (e.g. "stop on marketplace-analytics / vendors-per-brand"). The statement-name part is only
+# added when set, so "list" (which takes no statement) shows no dangling separator. Hyphenated
+# inputs use bracket syntax because a dot would be parsed as subtraction.
+run-name: "${{ inputs.action }} on ${{ inputs['application-name'] }}${{ inputs['statement-name'] != '' && format(' / {0}', inputs['statement-name']) || '' }}"
+
on:
workflow_dispatch:
inputs: