confluentinc/flink-table-api-python-examples
Python Examples for running Apache Flink® Table API on Confluent Cloud
Apache Flink® Table API on Confluent Cloud - Examples
This repository contains examples for running Apache Flink's Table API on Confluent Cloud.
Introduction to Table API for Python
The Table API enables a programmatic
way of developing, testing, and submitting Flink pipelines for processing data streams.
Streams can be finite or infinite, with insert-only or changelog data. The latter allows for dealing with Change Data
Capture (CDC) events.
Within the API, you conceptually work with tables that change over time - inspired by relational databases. Write
a Table Program as a declarative and structured graph of data transformations. Table API is inspired by SQL and complements
it with additional tools for juggling real-time data. You can mix and match Flink SQL with Table API at any time as they
go hand in hand.
Table API on Confluent Cloud
Table API on Confluent Cloud is a client-side library that delegates Flink API calls to Confluent’s public
REST API. It submits Statements and retrieves
StatementResults.
Table programs are implemented against Flink's open source Table API for Python.
The provided Confluent pip packages repackage Flink's Python API and bundle the Confluent-specific components for powering the TableEnvironment without the need
for a local Flink cluster. While using those packages, Flink internal components such as
CatalogStore, Catalog, Planner, Executor, and configuration are managed by the plugin and fully integrate with
Confluent Cloud, including access to Apache Kafka®, Schema Registry, and Flink Compute Pools.
Note: The Table API plugin is in Open Preview stage. Take a look at the Known Limitations section below.
Motivating Example
The following code shows how a Table API program is structured. Subsequent sections go into more detail on how you
can use the examples in this repository to experiment with Flink on Confluent Cloud.
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment, Row
from pyflink.table.expressions import col, row
def run():
    # Set up connection properties to Confluent Cloud
    settings = ConfluentSettings.from_global_variables()
    env = TableEnvironment.create(settings)

    # Run your first Flink statement in Table API
    env.from_elements([row("Hello world!")]).execute().print()

    # Or use SQL
    env.sql_query("SELECT 'Hello world!'").execute().print()

    # Structure your code with Table objects - the main ingredient of Table API.
    table = env.from_path("examples.marketplace.clicks") \
        .filter(col("user_agent").like("Mozilla%")) \
        .select(col("click_id"), col("user_id"))

    table.print_schema()
    print(table.explain())

    # Use the provided tools to test on a subset of the streaming data
    actual = ConfluentTools.collect_materialized_limit(table, 50)
    expected = [Row(42, 500)]
    if actual != expected:
        print("Results don't match!")


if __name__ == "__main__":
    run()
Getting Started
Prerequisites
- Sign up for Confluent Cloud at https://confluent.cloud
- Create a compute pool in the web UI of Confluent's Cloud Console
- Generate an API Key for the region where you created your compute pool
- Optional: Create a Kafka cluster if you want to run examples that store data in Kafka
- Have the correct environment variables set as per the documentation
- We recommend using a tool like uv to manage your Python versions and environments. Python 3.9-3.11 are the only versions currently supported.
Run Examples
All example files are located in examples. Each file contains a run()
function that can be executed directly or in __main__. Each has multiple table programs that will be executed individually. Every example program covers a different topic to learn
more about how Table API can be used. It is recommended to go through the examples in the defined order as they partially
build on top of each other.
Clone this repository to your local computer, or download it as a ZIP file and extract it.
git clone https://github.com/confluentinc/flink-table-api-python-examples.git
Change the current directory.
cd flink-table-api-python-examples
We recommend using uv to run the scripts; it will automatically create a virtualenv with the required dependencies.
Note: Flink's Python API communicates with a Java process under the hood. Make sure you also have at least Java 11
installed. Check that your JAVA_HOME environment variable is correctly set. Only checking java -version might not
be enough.
echo $JAVA_HOME
If required, install OpenJDK and export JAVA_HOME:
brew install openjdk && export JAVA_HOME=$(/usr/libexec/java_home) && echo $JAVA_HOME
Run an example script. Don't worry: the program is read-only, so it won't affect your existing
Kafka clusters. All results will be printed to the console.
uv run examples/example_00_hello_world
An output similar to the following means that you are able to run the examples:
io.confluent.flink.plugin.ConfluentFlinkException: Parameter 'client.organization-id' not found.
Configuration will be covered in the next section.
Configure the settings parameters in the ConfluentSettings class.
The Table API plugin needs a set of configuration options for establishing a connection to Confluent Cloud. These can be provided in a properties file, passed as command-line arguments, defined in code, or set via environment variables. This example uses environment variables. For more details, please see the documentation.
All required information can be found in the web UI of Confluent's Cloud Console:
- client.organization-id | ORG_ID from Menu → Settings → Organizations
- client.environment-id | ENV_ID from Menu → Environments
- client.cloud | CLOUD_PROVIDER, client.region | CLOUD_REGION, client.compute-pool-id | COMPUTE_POOL_ID from Menu → Environments → your environment → Flink → your compute pool
- client.flink-api-key | FLINK_API_KEY, client.flink-api-secret | FLINK_API_SECRET from Menu → Settings → API keys
Export the environment variables as shown below:
export CLOUD_PROVIDER="<my_cloud>"
export CLOUD_REGION="<my_region>"
export FLINK_API_KEY="<my_key>"
export FLINK_API_SECRET="<my_secret>"
export ORG_ID="<my_organization>"
export ENV_ID="<my_environment>"
export COMPUTE_POOL_ID="<my_compute_pool>"
Examples should be runnable after setting all configuration options correctly.
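Before running an example, it can help to confirm that every required variable is actually visible to Python. The following is a minimal sketch using only the standard library; the helper name `missing_variables` is hypothetical and not part of the Confluent packages, while the variable names are the ones listed in this README.

```python
import os

# Property keys and their environment variables, as listed in this README.
REQUIRED_VARIABLES = {
    "client.cloud": "CLOUD_PROVIDER",
    "client.region": "CLOUD_REGION",
    "client.flink-api-key": "FLINK_API_KEY",
    "client.flink-api-secret": "FLINK_API_SECRET",
    "client.organization-id": "ORG_ID",
    "client.environment-id": "ENV_ID",
    "client.compute-pool-id": "COMPUTE_POOL_ID",
}


def missing_variables(environ=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; not part of the Confluent plugin.
    """
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARIABLES.values() if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_variables()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Running this in the same shell where the variables were exported shows which ones, if any, still need to be set.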
Table API Playground using Python Interactive Shell
For convenience, the repository also contains an init script for playing around with
Table API in an interactive manner.
- Create a virtualenv with uv sync and activate it with source .venv/bin/activate.
- Run python -i start_pyshell.py to start an interactive REPL to explore Table API.
- The TableEnvironment is pre-initialized from environment variables and available under env.
- Run your first "Hello world!" using env.execute_sql("SELECT 'Hello world!'").print()
Configuration
The Table API plugin needs a set of configuration options for establishing a connection to Confluent Cloud.
The ConfluentSettings class is a utility for providing configuration options from various sources.
For production, external input, code, and environment variables can be combined.
Precedence order (highest to lowest):
- Properties File
- Code
- Environment Variables
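To make the precedence order concrete, here is a small sketch in plain Python that merges three option sources so that higher-precedence sources overwrite lower ones. It only illustrates the ordering described above and does not reproduce the plugin's implementation; the function `merge_options` and the sample values are hypothetical.

```python
def merge_options(env_options, code_options, file_options):
    """Merge option dicts so that higher-precedence sources win.

    Hypothetical helper for illustration only; not part of the Confluent plugin.
    """
    merged = {}
    # Apply sources from lowest to highest precedence:
    # environment variables, then code, then properties file.
    for source in (env_options, code_options, file_options):
        merged.update(source)
    return merged


if __name__ == "__main__":
    env_options = {"client.cloud": "aws", "client.region": "us-west-2"}
    code_options = {"client.region": "eu-west-1", "client.context": "MyTableProgram"}
    file_options = {"client.region": "us-east-1"}
    print(merge_options(env_options, code_options, file_options))
```

In this sketch, `client.region` ends up with the value from the properties file, while options that appear in only one source are kept unchanged.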
A multi-layered configuration can look like:
from pyflink.table.confluent import ConfluentSettings
from pyflink.table import TableEnvironment
def run():
    # Properties file might set cloud, region, org, env, and compute pool.
    # Environment variables might pass key and secret.
    # Code sets the session name and SQL-specific options.
    settings = ConfluentSettings.new_builder_from_file(...) \
        .set_context_name("MyTableProgram") \
        .set_option("sql.local-time-zone", "UTC") \
        .build()
    env = TableEnvironment.create(settings)
Via Properties File
Store options (or some options) in a cloud.properties file:
# Cloud region
client.cloud=aws
client.region=us-east-1
# Access & compute resources
client.flink-api-key=key
client.flink-api-secret=secret
client.organization-id=b0b21724-4586-4a07-b787-d0bb5aacbf87
client.environment-id=env-z3y2x1
client.compute-pool-id=lfcp-8m03rm
Reference the cloud.properties file:
from pyflink.table.confluent import ConfluentSettings
# Arbitrary file location in file system
settings = ConfluentSettings.from_file("/path/to/cloud.properties")
A path to a properties file can also be specified by setting the environment variable FLINK_PROPERTIES.
Via Code
Pass all options (or some options) in code:
from pyflink.table.confluent import ConfluentSettings
settings = ConfluentSettings.new_builder() \
    .set_cloud("aws") \
    .set_region("us-east-1") \
    .set_flink_api_key("key") \
    .set_flink_api_secret("secret") \
    .set_organization_id("b0b21724-4586-4a07-b787-d0bb5aacbf87") \
    .set_environment_id("env-z3y2x1") \
    .set_compute_pool_id("lfcp-8m03rm") \
    .build()
Via Environment Variables
Pass all options (or some options) as variables:
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="key"
export FLINK_API_SECRET="secret"
export ORG_ID="b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="env-z3y2x1"
export COMPUTE_POOL_ID="lfcp-8m03rm"
poetry run example
In code call:
from pyflink.table.confluent import ConfluentSettings
settings = ConfluentSettings.from_global_variables()
A path to a properties file can also be specified by setting the environment variable FLINK_PROPERTIES.
Configuration Options
The following configuration needs to be provided:
| Property key | Environment variable | Required | Comment |
|---|---|---|---|
| client.cloud | CLOUD_PROVIDER | Y | Confluent identifier for a cloud provider. For example: aws |
| client.region | CLOUD_REGION | Y | Confluent identifier for a cloud provider's region. For example: us-east-1 |
| client.flink-api-key | FLINK_API_KEY | Y | API key for Flink access. |
| client.flink-api-secret | FLINK_API_SECRET | Y | API secret for Flink access. |
| client.organization-id | ORG_ID | Y | ID of the organization. For example: b0b21724-4586-4a07-b787-d0bb5aacbf87 |
| client.environment-id | ENV_ID | Y | ID of the environment. For example: env-z3y2x1 |
| client.compute-pool-id | COMPUTE_POOL_ID | Y | ID of the compute pool. For example: lfcp-8m03rm |
Additional configuration:
| Property key | Environment variable | Required | Comment |
|---|---|---|---|
| client.endpoint-template | ENDPOINT_TEMPLATE | N | A template for the endpoint URL. For example: https://flinkpls-dom123.{region}.{cloud}.confluent.cloud |
| client.principal-id | PRINCIPAL_ID | N | Principal that runs submitted statements. For example: sa-23kgz4 (for a service account) |
| client.context | | N | A name for this Table API session. For example: my_table_program |
| client.statement-name | | N | Unique name for statement submission. By default, generated using a UUID. |
| client.rest-endpoint | REST_ENDPOINT | N | URL to the REST endpoint. For example: proxyto.confluent.cloud |
| client.catalog-cache | | N | Expiration time for catalog objects. For example: '5 min'. '1 min' by default. '0' disables the caching. |
Endpoint Configuration
The Confluent Flink plugin provides options to configure endpoints for connecting to Confluent Cloud services. The template-based approach is the recommended method.
client.endpoint-template
This option provides a template for constructing the Flink statement API endpoint URL.
- Default: https://flink.{region}.{cloud}.confluent.cloud
- Example: https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
- Usage: The template supports placeholders {region} and {cloud} that are replaced with the configured region and cloud provider values.
- Environment Variable: ENDPOINT_TEMPLATE
client.rest-endpoint (Discouraged)
This option specifies the base domain for REST API calls to Confluent Cloud. While still supported, using the template-based configuration above is preferred.
- Default: No default value
- Example: proxy.confluent.cloud
- Usage: When specified, the plugin constructs the full Flink statement API endpoint URL as https://flink.{region}.{cloud}.{rest-endpoint} where {region} and {cloud} are replaced with the configured region and cloud provider values.
- Important: client.endpoint-template and client.rest-endpoint are mutually exclusive. If both are set, an exception is thrown.
- Environment Variable: REST_ENDPOINT
Relationship and Default Behavior
- Mutual Exclusivity: client.endpoint-template and client.rest-endpoint cannot be set simultaneously
- Default Behavior:
  - If neither client.rest-endpoint nor client.endpoint-template is configured, the default template https://flink.{region}.{cloud}.confluent.cloud is used for statement API
  - If endpoint templates are used, each endpoint is constructed independently with the provided templates
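The resolution rules above can be summarized in a few lines of plain Python. This is a sketch of the documented behavior only, not the plugin's code: the function `resolve_endpoint` is hypothetical, and `ValueError` merely stands in for whatever exception the plugin actually throws.

```python
DEFAULT_TEMPLATE = "https://flink.{region}.{cloud}.confluent.cloud"


def resolve_endpoint(cloud, region, endpoint_template=None, rest_endpoint=None):
    """Resolve the statement API endpoint following the rules described above.

    Hypothetical helper for illustration; the plugin's real logic may differ.
    """
    if endpoint_template and rest_endpoint:
        # The README states that setting both options raises an exception.
        raise ValueError(
            "client.endpoint-template and client.rest-endpoint are mutually exclusive"
        )
    if rest_endpoint:
        # Discouraged option: the base domain is appended to the fixed prefix.
        template = "https://flink.{region}.{cloud}." + rest_endpoint
    else:
        template = endpoint_template or DEFAULT_TEMPLATE
    return template.replace("{region}", region).replace("{cloud}", cloud)


if __name__ == "__main__":
    print(resolve_endpoint("aws", "us-east-1"))
    print(resolve_endpoint(
        "aws", "us-east-1",
        endpoint_template="https://flinkpls-dom123.{region}.{cloud}.confluent.cloud",
    ))
```

Passing both `endpoint_template` and `rest_endpoint` raises an error in this sketch, mirroring the mutual exclusivity rule.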
Example
Here's a simple example showing how to configure an endpoint:
from pyflink.table.confluent import ConfluentSettings

# cloud.properties:
# client.region=us-east-1
# client.cloud=aws
# client.endpoint-template=https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
# Resolved endpoints:
# - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
settings = ConfluentSettings.from_file("/cloud.properties")
Documentation for Confluent Utilities
Confluent Tools
The ConfluentTools class adds additional methods that can be useful when developing and testing Table API programs.
ConfluentTools.collect_changelog / ConfluentTools.print_changelog
Executes the given table transformations on Confluent Cloud and returns the results locally
as a list of changelog rows. Or prints to the console in a table style.
This method performs table.execute().collect() under the hood and consumes a fixed
number of rows from the returned iterator.
Note: The method can work on both finite and infinite input tables. If the pipeline is
potentially unbounded, it will stop fetching after the desired number of rows has been
reached.
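The idea of consuming a fixed number of rows from a potentially endless iterator can be shown with the standard library alone. The sketch below is not the code behind ConfluentTools; `take` and `unbounded_rows` are hypothetical names, and the generator simply stands in for an unbounded result iterator.

```python
from itertools import count, islice


def take(rows, limit):
    """Consume at most `limit` rows from an iterator and return them as a list."""
    return list(islice(rows, limit))


def unbounded_rows():
    """Stand-in for an unbounded result iterator (hypothetical data)."""
    for i in count(1):
        yield ("click", i)


if __name__ == "__main__":
    # Stops after three rows even though the generator never ends.
    print(take(unbounded_rows(), 3))
```

For a finite iterator with fewer rows than the limit, `take` simply returns everything that is available.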
Examples:
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
# On Table object
table = env.from_path("examples.marketplace.customers")
rows = ConfluentTools.collect_changelog_limit(table, 100)
ConfluentTools.print_changelog_limit(table, 100)
# On TableResult object
table_result = env.execute_sql("SELECT * FROM examples.marketplace.customers")
rows = ConfluentTools.collect_changelog_limit(table_result, 100)
ConfluentTools.print_changelog_limit(table_result, 100)
Shortcuts:
# For finite (i.e. bounded) tables
ConfluentTools.collect_changelog(table)
ConfluentTools.print_changelog(table)
ConfluentTools.collect_materialized / ConfluentTools.print_materialized
Executes the given table transformations on Confluent Cloud and returns the results locally
as a materialized changelog. In other words: changes are applied to an in-memory table and
returned as a list of insert-only rows. Or printed to the console in a table style.
This method performs table.execute().collect() under the hood and consumes a fixed
number of rows from the returned iterator.
Note: The method can work on both finite and infinite input tables. If the pipeline is
potentially unbounded, it will stop fetching after the desired number of rows has been
reached.
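To illustrate what "materializing" a changelog means, the following sketch applies changelog entries to an in-memory list in plain Python. It assumes Flink's four row kinds (+I, -U, +U, -D) and is only a conceptual model; `materialize` is a hypothetical function, not the plugin's implementation, and the sample rows are made up.

```python
def materialize(changelog):
    """Apply changelog entries to an in-memory list and return the final rows.

    Each entry is a (kind, row) tuple where kind is one of Flink's row kinds:
    "+I" (insert), "-U" (update before), "+U" (update after), "-D" (delete).
    Illustrative only; not the plugin's implementation.
    """
    table = []
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            table.append(row)
        elif kind in ("-U", "-D"):
            table.remove(row)  # drops the first matching row
        else:
            raise ValueError(f"Unknown row kind: {kind}")
    return table


if __name__ == "__main__":
    changelog = [
        ("+I", ("Alice", 1)),
        ("+I", ("Bob", 1)),
        ("-U", ("Alice", 1)),
        ("+U", ("Alice", 2)),
        ("-D", ("Bob", 1)),
    ]
    print(materialize(changelog))
```

The result contains only the rows that remain after all updates and deletions have been applied, which is why it can be treated as insert-only.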
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
# On Table object
table = env.from_path("examples.marketplace.customers")
rows = ConfluentTools.collect_materialized_limit(table, 100)
ConfluentTools.print_materialized_limit(table, 100)
# On TableResult object
table_result = env.execute_sql("SELECT * FROM examples.marketplace.customers")
rows = ConfluentTools.collect_materialized_limit(table_result, 100)
ConfluentTools.print_materialized_limit(table_result, 100)
Shortcuts:
# For finite (i.e. bounded) tables
ConfluentTools.collect_materialized(table)
ConfluentTools.print_materialized(table)
ConfluentTools.get_statement_name / ConfluentTools.stop_statement
Additional lifecycle methods are available to control statements on Confluent Cloud after they have
been submitted.
# On TableResult object
table_result = env.execute_sql("SELECT * FROM examples.marketplace.customers")
statement_name = ConfluentTools.get_statement_name(table_result)
ConfluentTools.stop_statement(table_result)
# Based on statement name
ConfluentTools.stop_statement_by_name(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql")
Confluent Table Descriptor
A table descriptor for creating tables located in Confluent Cloud programmatically.
Compared to the regular Flink one, this class adds support for Confluent's system columns
and convenience methods for working with Confluent tables.
for_managed corresponds to TableDescriptor.for_connector("confluent").
from pyflink.table.confluent import ConfluentTableDescriptor
from pyflink.table import Schema, DataTypes
from pyflink.table.expressions import col, lit
descriptor = ConfluentTableDescriptor.for_managed() \
    .schema(
        Schema.new_builder()
        .column("i", DataTypes.INT())
        .column("s", DataTypes.INT())
        .watermark("$rowtime", col("$rowtime").minus(lit(5).seconds))  # Access $rowtime system column
        .build()) \
    .build()
env.create_table("t1", descriptor)
Known Limitations
The Table API plugin is in Open Preview stage.
Unsupported by Table API Plugin
The following features are currently not supported:
- Temporary catalog objects (including tables, views, functions)
- Custom modules
- Custom catalogs
- User-defined functions (including system functions)
- Anonymous, inline objects (including functions, data types)
- CompiledPlan features are not supported
- Batch mode
- Restrictions coming from Confluent Cloud
  - custom connectors/formats, including:
    - from_elements with Python objects
    - converting to/from_pandas
  - processing time operations
  - structured data types
  - many configuration options
  - limited SQL syntax
  - batch execution mode
Issues in Open Source Flink
- Both catalog/database must be set or identifiers must be fully qualified. A mixture of setting a current catalog and
using two-part identifiers can lead to errors.
- String concatenation with .plus leads to errors. Use Expressions.concat.
- Selecting .rowtime in windows leads to errors.
- Using .limit() can lead to errors.
- Python API is not fully on par with the Java API. The API lacks support for: TablePipeline, ResolvedSchema
Supported API
The following API methods are considered stable and ready to be used:
// TableEnvironment
TableEnvironment.create_statement_set()
TableEnvironment.create_table(String, TableDescriptor)
TableEnvironment.execute_sql(String)
TableEnvironment.explain_sql(String)
TableEnvironment.from_path(String)
TableEnvironment.get_config()
TableEnvironment.get_current_catalog()
TableEnvironment.get_current_database()
TableEnvironment.list_catalogs()
TableEnvironment.list_databases()
TableEnvironment.list_functions()
TableEnvironment.list_tables()
TableEnvironment.list_views()
TableEnvironment.sql_query(String)
TableEnvironment.use_catalog(String)
TableEnvironment.use_database(String)
// from_elements works partially; it should be safe to use it in combination with
// pyflink.table.expressions, but passing Python objects is not supported
TableEnvironment.from_elements(...)
// Table: SQL equivalents
Table.select(...)
Table.alias(...)
Table.filter(...)
Table.where(...)
Table.group_by(...)
Table.distinct()
Table.join(...)
Table.left_outer_join(...)
Table.right_outer_join(...)
Table.full_outer_join(...)
Table.minus(...)
Table.minus_all(...)
Table.union(...)
Table.union_all(...)
Table.intersect(...)
Table.intersect_all(...)
Table.order_by(...)
Table.offset(...)
Table.fetch(...)
Table.limit(...)
Table.window(...)
// Table: API extensions
Table.print_schema()
Table.add_columns(...)
Table.add_or_replace_columns(...)
Table.rename_columns(...)
Table.drop_columns(...)
Table.explain()
Table.execute()
Table.execute_insert(...)
// StatementSet
StatementSet.execute()
StatementSet.add_insert(...)
StatementSet.add_insert_sql(...)
// TableResult
TableResult.get_job_client().cancel()
TableResult.wait(...)
TableResult.collect()
TableResult.print()
// TableConfig
TableConfig.set(...)
// Expressions
Expressions.* (except for call())
// Others
TableDescriptor.*
FormatDescriptor.*
Tumble.*
Slide.*
Session.*
Over.*
Confluent adds the following classes for more convenience:
ConfluentSettings.*
ConfluentTools.*
ConfluentTableDescriptor.*
Support
Table API goes hand in hand with Flink SQL on Confluent Cloud.
For feature requests or support tickets, use one of the established channels.
Frequent Issues
1. py4j.protocol.Py4JError: ConfluentSettings does not exist in the JVM
This indicates that the Python API was unable to find a working Java runtime for starting a JVM process.
The plugin requires at least Java 11. Check that your JAVA_HOME environment variable is correctly set:
echo "$JAVA_HOME"
It should look similar to:
/Users/Bob/.jenv/versions/11.0
Note: Only checking java -version might not be enough. It might be that it shows a correct Java version, but JAVA_HOME
still points to an invalid version. Consider using jenv.
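A quick way to see what Python itself finds under JAVA_HOME is a small standard-library check like the one below. It is only a sketch: `java_home_problem` is a hypothetical helper, and it verifies that a java executable exists without checking the Java version.

```python
import os


def java_home_problem(environ=None):
    """Return a description of what is wrong with JAVA_HOME, or None if it looks usable.

    Only checks that a java executable exists; it does not verify the Java version.
    """
    environ = os.environ if environ is None else environ
    java_home = environ.get("JAVA_HOME")
    if not java_home:
        return "JAVA_HOME is not set"
    if not os.path.isdir(java_home):
        return f"JAVA_HOME does not point to a directory: {java_home}"
    # Look for the launcher under bin/ (java on Unix-like systems, java.exe on Windows).
    candidates = [os.path.join(java_home, "bin", name) for name in ("java", "java.exe")]
    if not any(os.path.isfile(path) for path in candidates):
        return f"No java executable found under {os.path.join(java_home, 'bin')}"
    return None


if __name__ == "__main__":
    problem = java_home_problem()
    print(problem or "JAVA_HOME looks usable")
```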
2. io.confluent.flink.plugin.ConfluentFlinkException: Parameter 'client.organization-id' not found.
This indicates that something is wrong with your configuration. Make sure to fill out the ./config/cloud.properties file
with the required connection information to Confluent Cloud, or set all properties via environment variables as described
above.