#apache-flink #stream-processing #data-engineering #aws #kafka #checkpointing #cdc #iceberg

Apache Flink in Depth: Stream API, Table API, CDC, Connectors and Checkpointing

Cavan Page

Apache Flink is the de facto standard for stateful stream processing at scale. If you are building pipelines that need sub-second latency, exactly-once semantics or complex event-driven logic over unbounded data, Flink is the right tool. This post is a practical deep dive - the two programming models, the connectors worth knowing, how Flink CDC turns database transaction logs into real-time streams, how to configure checkpointing properly, and what managed services like AWS Managed Flink actually take off your plate.

Two APIs, One Runtime

Flink exposes two programming surfaces: the DataStream API and the Table API (which includes Flink SQL). Both compile down to the same underlying dataflow runtime, but the abstraction level and use cases differ significantly.

The DataStream API

The DataStream API is Flink’s low-level interface. You work directly with streams of typed records, define your own operators, manage state explicitly and control exactly how windows, watermarks and event time semantics behave.

Use the DataStream API when:

  • You need custom stateful logic that cannot be expressed as a SQL query
  • You are implementing complex event processing - pattern detection, session reconstruction, fraud signals
  • You need fine-grained control over state backends and eviction policies
  • Your data model does not map cleanly to a row/column structure

A minimal DataStream job in Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<PageEvent> source = KafkaSource.<PageEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("page-events")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new PageEventDeserializer())
    .build();

DataStream<PageEvent> events = env.fromSource(
    source,
    WatermarkStrategy.<PageEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp()),
    "Kafka Source"
);

events
    .keyBy(PageEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(new PageCountAggregator())
    .uid("page-count-aggregator")
    .addSink(JdbcSink.sink(...));

env.execute("Page Event Aggregator");

You have full control here. Watermark strategy, key selection, window type, aggregation logic - all explicit.
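
The job references a PageCountAggregator that the snippet leaves undefined. One minimal way it could be implemented - a sketch assuming all we want is an event count per user per window; everything beyond the class name implied above is illustrative:

import org.apache.flink.api.common.functions.AggregateFunction;

// Counts PageEvents per key per window. The accumulator and the output are both
// a plain Long; widen the output type if the sink also needs the user id or window bounds.
public class PageCountAggregator implements AggregateFunction<PageEvent, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;                      // every window starts at zero
    }

    @Override
    public Long add(PageEvent event, Long accumulator) {
        return accumulator + 1;         // one increment per event
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;             // emitted when the window fires
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;                   // needed if window panes are merged
    }
}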

The Table API

The Table API gives you a relational abstraction over streams. You define tables backed by Kafka topics, files, JDBC sources and so on using DDL, then query them with either the fluent Table API or plain SQL.

Use the Table API when:

  • Your transformation logic maps cleanly to SQL: joins, aggregations, filters, projections
  • Your team has SQL fluency and not everyone knows Java or Scala
  • You are building analytical pipelines rather than event-driven applications
  • You want faster iteration - SQL queries are quicker to write and easier to modify

The same aggregation in Flink SQL:

CREATE TABLE page_events (
    user_id STRING,
    page   STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic'     = 'page-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'    = 'json'
);

CREATE TABLE page_counts (
    user_id      STRING,
    window_start TIMESTAMP(3),
    window_end   TIMESTAMP(3),
    page_count   BIGINT
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://localhost:5432/analytics',
    'table-name' = 'page_counts'
);

INSERT INTO page_counts
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE),
    TUMBLE_END(event_time,   INTERVAL '5' MINUTE),
    COUNT(*) AS page_count
FROM page_events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

Mixing Both in One Job

You can bridge them in a single job using StreamTableEnvironment. Convert a DataStream to a Table, apply SQL, then convert back. This is useful when you need custom state logic for enrichment but the rest of the pipeline is straightforward enough to express in SQL.

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

Table enrichedTable = tenv.fromDataStream(
    enrichedStream,
    Schema.newBuilder()
        .columnByExpression("event_time", "CAST(ts AS TIMESTAMP_LTZ(3))")
        .watermark("event_time", "event_time - INTERVAL '3' SECOND")
        .build()
);

tenv.createTemporaryView("enriched_events", enrichedTable);

Table result = tenv.sqlQuery(
    "SELECT user_id, COUNT(*) FROM enriched_events GROUP BY user_id"
);
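
The snippet stops at the SQL query. To complete the "convert back" half of the round trip, bridge the Table into a DataStream again - and because an unwindowed GROUP BY emits updates, the changelog variant is the one to use. A minimal sketch:

// The aggregation produces an updating result, so use toChangelogStream rather
// than toDataStream (which only accepts insert-only tables).
DataStream<Row> resultStream = tenv.toChangelogStream(result);

resultStream
    .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE)  // drop retraction rows, keep inserts and new values
    .print();

env.execute("Mixed DataStream and SQL job");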

The rule of thumb: reach for the DataStream API when the logic requires it; default to SQL when the logic fits, because it is easier to read, faster to change and easier to hand off.

Connectors Worth Knowing

Flink’s connector ecosystem has matured significantly. These are the ones you will actually reach for:

Apache Kafka - the default choice for both source and sink. The flink-connector-kafka library provides exactly-once sink semantics via Kafka transactions. The newer KafkaSource / KafkaSink API (Flink 1.14+) supersedes the older FlinkKafkaConsumer / FlinkKafkaProducer - use the new API for any new work.

Amazon Kinesis - flink-connector-kinesis for reading from Kinesis Data Streams. Works well for AWS-native architectures. If you are sinking to Firehose, there is a separate flink-connector-kinesis-firehose connector. Note that enhanced fan-out (EFO) is supported and significantly reduces per-shard read latency compared to the polling consumer.

JDBC - flink-connector-jdbc handles Postgres, MySQL, MariaDB and Oracle. Writes are at-least-once by default and effectively exactly-once when the sink issues idempotent upserts keyed on a primary key. Useful for small-to-medium dimensional data and operational sinks. Do not use it for high-throughput sinks - you will bottleneck on the database.

Apache Iceberg - the iceberg-flink-runtime connector from the Iceberg project lets you write to Iceberg tables with ACID semantics. The right choice if you are building a lakehouse and need time travel, schema evolution or partition evolution on your streaming data.

Amazon S3 / GCS / Azure Blob - Flink ships with flink-s3-fs-hadoop and flink-s3-fs-presto. Used primarily as checkpoint storage but also for file sources and sinks - writing Parquet or Avro to object storage in rolling intervals.

Elasticsearch / OpenSearch - flink-connector-elasticsearch7 handles writes to ES-compatible clusters. It supports buffered batch writes and configurable retry behavior. Use it when you need low-latency search over your stream output.

Apache Pulsar - flink-connector-pulsar if you are using Pulsar instead of Kafka. Pulsar’s native cursor-based consumption maps cleanly to Flink’s source interface and gives you per-key ordering guarantees that Kafka only approximates with partitioning.

Redis - no official Flink connector, but bahir-flink provides one. More commonly Redis is used as a lookup source via AsyncDataStream.unorderedWait to enrich stream events against external state rather than as a sink.
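
For reference, that lookup pattern looks roughly like this. EnrichFromRedis is a hypothetical AsyncFunction that queries Redis with a non-blocking client (Lettuce, for example) and completes the result future when the reply arrives - a sketch, not a drop-in implementation:

// Async enrichment against Redis. EnrichFromRedis must implement
// AsyncFunction<PageEvent, EnrichedEvent> and never block the task thread.
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
    events,                       // input stream
    new EnrichFromRedis(),        // hypothetical async lookup function
    500, TimeUnit.MILLISECONDS,   // timeout per lookup
    100                           // max in-flight requests
);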

For the DataStream API, you instantiate connector objects directly. For the Table API, you declare them in DDL WITH properties. The same connectors support both surfaces.

Flink CDC

Flink CDC (Change Data Capture) is a separate connector library - flink-cdc-connectors - that reads change events directly from database transaction logs. No Kafka in the middle, no separate Kafka Connect cluster running Debezium (several of the connectors embed the Debezium engine internally). The database binlog or WAL becomes the source stream.

Supported databases include MySQL (binlog), PostgreSQL (WAL via logical replication), MongoDB (oplog), Oracle (LogMiner or XStream), SQL Server (CDC tables) and TiDB.

How It Works

For MySQL, Flink CDC connects as a replica, registers for binlog events, performs an initial full snapshot of the table, and then switches to streaming binlog events as they arrive. The binlog offset is stored in Flink operator state and checkpointed with the rest of the job. On recovery, the source resumes from the checkpointed offset - no duplicate reads, no missed events.

This is the key integration point with checkpointing. The CDC source does not manage offsets independently. Flink’s checkpoint mechanism is what gives you exactly-once semantics across the entire pipeline - from the database change through to your sink.

Using CDC in the DataStream API

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("mysql-host")
    .port(3306)
    .databaseList("orders_db")
    .tableList("orders_db.orders")
    .username("flink-user")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .startupOptions(StartupOptions.initial())
    .build();

DataStream<String> cdcStream = env.fromSource(
    mySqlSource,
    WatermarkStrategy.noWatermarks(),
    "MySQL CDC Source"
);

The StartupOptions.initial() setting performs a full snapshot first and then tails the binlog. Use StartupOptions.latest() to skip the snapshot and only consume changes from the point the job starts - useful when you only care about ongoing changes and have already bootstrapped the target.

The deserialized record contains the before and after state of the row, plus the operation type (INSERT, UPDATE, DELETE). The raw format follows the Debezium envelope structure, which you either parse yourself or handle with one of the built-in deserializers.

Using CDC in the Table API

This is where the Table API earns its keep for CDC use cases. Declaring a CDC source in SQL requires almost no boilerplate:

CREATE TABLE orders_cdc (
    order_id    INT,
    customer_id INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    updated_at  TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'mysql-host',
    'port'          = '3306',
    'username'      = 'flink-user',
    'password'      = 'password',
    'database-name' = 'orders_db',
    'table-name'    = 'orders'
);

Flink treats this as a changelog stream. INSERT, UPDATE and DELETE operations from MySQL map to Flink’s RowKind enum (+I, -U, +U, -D). Downstream operators that understand changelog semantics - aggregations, joins, Iceberg sinks, Paimon sinks - handle the retraction and upsert logic automatically.
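
If you want to see those change flags directly, one option - a sketch, assuming the orders_cdc table above is registered in the same StreamTableEnvironment - is to bridge the changelog back into the DataStream API:

// Print each change event with its RowKind short string (+I, -U, +U, -D).
DataStream<Row> changelog = tenv.toChangelogStream(tenv.from("orders_cdc"));

changelog
    .map(row -> row.getKind().shortString() + " order_id=" + row.getField("order_id"))
    .returns(Types.STRING)
    .print();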

You can then join the CDC stream with a Kafka topic or another CDC table in real time:

INSERT INTO order_summary
SELECT
    o.order_id,
    o.amount,
    c.name AS customer_name,
    o.status
FROM orders_cdc AS o
JOIN customers_cdc AS c ON o.customer_id = c.customer_id;

Flink maintains the join state in its state backend, updating the result whenever either side changes. This is a continuous, incremental join - not a batch query.

Parallel Snapshot Reading

Early versions of Flink CDC had a significant limitation: the initial snapshot phase for MySQL required parallelism 1. You could not distribute the table scan across task slots, which meant large tables took a long time to snapshot and you could not increase throughput by adding resources.

Flink CDC 2.x introduced chunk-based snapshot reading. The initial snapshot is split into primary-key-range chunks, each chunk is read independently by different task slots, and the results are merged before the job switches to binlog mode. A 500M row table that took 4 hours to snapshot at parallelism 1 might complete in 30 minutes at parallelism 8.

The chunk-based approach also handles the consistency problem: each chunk is read with a consistent snapshot, and binlog events that arrive during the snapshot are buffered and replayed after the chunk is complete. You get a globally consistent initial state without taking a database-level lock.
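
With the chunk-based source, parallel snapshotting is mostly a matter of giving the source more slots. A sketch, assuming Flink CDC 2.x incremental snapshots (the default) and a MySQL server-id range wide enough for the chosen parallelism:

// Eight parallel readers pull primary-key-range chunks during the snapshot phase;
// once the snapshot finishes, binlog tailing continues on a single reader.
DataStream<String> cdcStream = env
    .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
    .setParallelism(8);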

Flink CDC 3.0 Pipelines

Flink CDC 3.0 introduced a pipeline API designed for whole-database synchronization. Instead of defining a separate Flink job for each table, you define a pipeline that maps a source database (or a set of tables selected by regex patterns) to a target:

source:
  type: mysql
  hostname: mysql-host
  port: 3306
  username: flink-user
  password: password
  tables: orders_db\..*

sink:
  type: doris
  fenodes: doris-fe:8030
  username: root
  password: ""

pipeline:
  name: MySQL to Doris Sync
  parallelism: 4

The pipeline handles schema evolution automatically - if you add a column to a MySQL table, the pipeline propagates the DDL change to the target. This is the right tool when you are replicating an entire operational database to an analytics system and do not want to manage per-table Flink jobs.
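
Submitting a pipeline is a one-liner with the flink-cdc.sh script that ships in the Flink CDC distribution - the YAML file name here is illustrative, and FLINK_HOME must point at the target cluster's Flink installation:

# Submit the YAML-defined pipeline as a Flink job
bash bin/flink-cdc.sh mysql-to-doris.yaml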

CDC + Iceberg: The Real-Time Lakehouse Pattern

The most common production use of Flink CDC is replicating operational databases into a lakehouse in near real-time. The combination of CDC source and Iceberg sink gives you:

  • Continuous replication of INSERT, UPDATE and DELETE from MySQL or Postgres
  • ACID writes to S3 or GCS via Iceberg
  • Time travel and snapshot queries on the target table
  • Schema evolution on the Iceberg side as your source schema changes

CREATE TABLE orders_iceberg (
    order_id    INT,
    customer_id INT,
    amount      DECIMAL(10, 2),
    status      STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'            = 'iceberg',
    'catalog-name'         = 'hive_catalog',
    'catalog-type'         = 'hive',
    'uri'                  = 'thrift://hive-metastore:9083',
    'warehouse'            = 's3://my-bucket/warehouse',
    'format-version'       = '2',
    'write.upsert.enabled' = 'true'
);

INSERT INTO orders_iceberg SELECT * FROM orders_cdc;

Iceberg format version 2 supports row-level deletes, which is what allows the upsert behavior. Without it, DELETE operations from CDC have nowhere to go.

Apache Paimon is worth knowing here as an alternative to Iceberg for this pattern. Paimon (developed by the Flink community, originally called Flink Table Store) was built specifically for streaming updates. It handles CDC change events as first-class operations and has tighter integration with Flink’s changelog semantics than Iceberg does out of the box.

Flink CDC vs Debezium + Kafka

The natural question is when to use Flink CDC directly versus the more traditional Debezium + Kafka approach.

Use Flink CDC directly when:

  • You have a single destination - replicate database to lakehouse, database to another database, database to search index
  • You want fewer moving parts - no Kafka cluster to operate, no Debezium connector to configure
  • Latency matters - removing Kafka cuts end-to-end latency by eliminating broker write and consumer lag

Use Debezium + Kafka when:

  • Multiple downstream systems need the same change stream - the Kafka topic becomes a durable, replayable event log for any consumer
  • You need the CDC events to be available to non-Flink consumers - other services, other languages
  • You need long-term retention of the change stream - Kafka’s configurable retention gives you a replay buffer that Flink CDC does not

The architecture is not either/or. A common pattern is Debezium + Kafka for databases that multiple systems consume, and Flink CDC directly for dedicated single-destination replication pipelines.

Checkpointing from Scratch

Checkpointing is how Flink achieves fault tolerance. At configurable intervals, Flink snapshots the state of every operator in your job and writes it to durable storage. If the job fails, it restarts from the last completed checkpoint and replays events from there.

Getting this right from day one saves you a painful afternoon during your first production incident.

Basic Configuration

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Trigger a checkpoint every 60 seconds
env.enableCheckpointing(60_000);

CheckpointConfig config = env.getCheckpointConfig();

// EXACTLY_ONCE is the default and what you want for most pipelines
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Abort the checkpoint attempt if it runs longer than 30 seconds
config.setCheckpointTimeout(30_000);

// Only one checkpoint in-flight at a time
config.setMaxConcurrentCheckpoints(1);

// Wait at least 30 seconds after a checkpoint completes before starting the next
// Prevents checkpointing from consuming all available I/O
config.setMinPauseBetweenCheckpoints(30_000);

// Retain checkpoints on disk when the job is cancelled
// Without this you lose your recovery point on a clean stop
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

AT_LEAST_ONCE is faster because Flink does not need to align checkpoint barriers across streams, but it means operators may process some records twice on recovery. For idempotent sinks this is fine. For anything that counts money or drives a state machine, use EXACTLY_ONCE.
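
If the pain point is barrier alignment stalling under backpressure rather than the delivery guarantee itself, unaligned checkpoints (Flink 1.11+) are a middle ground worth knowing about - a sketch:

// Keep EXACTLY_ONCE semantics but let barriers overtake buffered records.
// The cost: in-flight data becomes part of the checkpoint, so checkpoints grow.
config.enableUnalignedCheckpoints();

// Optional: start aligned, switch to unaligned only if alignment takes too long.
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));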

Choosing a State Backend

The state backend determines where operator state lives during execution.

HashMapStateBackend - stores state in the JVM heap. Fast access, but you are bounded by available memory per task slot. The default. Right for jobs where state per key is small or the total keyspace is manageable.

EmbeddedRocksDBStateBackend - stores state on local disk using RocksDB, with recent entries cached in memory. Handles large state (hundreds of GBs per task slot) that would OOM with the heap backend. Incremental checkpoints are only available with RocksDB.

// true = enable incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

Incremental checkpoints with RocksDB are a significant operational win when your state is large. Instead of snapshotting the entire state on every checkpoint, Flink only writes the delta since the last checkpoint. Checkpoint time drops from minutes to seconds for large stateful jobs.

The tradeoff: RocksDB is slower to read and write than the heap backend due to serialization overhead and disk I/O. If your job is small state and latency-sensitive, stick with HashMapStateBackend.

Checkpoint Storage

You also need to configure where Flink persists checkpoint data:

// Local filesystem - dev and test only
config.setCheckpointStorage(
    new FileSystemCheckpointStorage("file:///tmp/flink/checkpoints")
);

// S3 - production
config.setCheckpointStorage(
    new FileSystemCheckpointStorage("s3://my-bucket/flink/checkpoints")
);

Or via flink-conf.yaml:

state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.backend: rocksdb
state.backend.incremental: true

Never use local filesystem for production checkpoints. If a TaskManager node goes down, you lose that checkpoint and fall back to an older one - or have no recovery point at all.

For S3, make sure you have the right Flink S3 filesystem plugin on the classpath (flink-s3-fs-presto or flink-s3-fs-hadoop). Drop the jar into $FLINK_HOME/plugins/s3-fs-presto/ and Flink will pick it up automatically.
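
Concretely, with the standard Flink distribution layout that looks like this (jar version glob illustrative):

# The S3 filesystem jars ship in opt/; plugins load from their own subdirectory
mkdir -p $FLINK_HOME/plugins/s3-fs-presto
cp $FLINK_HOME/opt/flink-s3-fs-presto-*.jar $FLINK_HOME/plugins/s3-fs-presto/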

Savepoints vs Checkpoints

Checkpoints are automatic, Flink-managed, and cleaned up after a newer checkpoint completes. Savepoints are manual, application-managed, and survive job cancellation by default.

Use savepoints when:

  • Upgrading your job to a new version with code changes
  • Migrating to a different cluster or Flink version
  • Changing job parallelism
  • Making schema changes to your operator state

# Trigger a savepoint while the job is running
flink savepoint <job-id> s3://my-bucket/flink/savepoints

# Restart from a savepoint
flink run -s s3://my-bucket/flink/savepoints/<savepoint-id> my-job.jar

For savepoints to survive across job versions, assign explicit uid values to your operators. Without them, Flink auto-generates uids from the job graph topology. Change the topology - add an operator, reorder a chain - and Flink cannot match old state to the new operators.

events
    .keyBy(PageEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(new PageCountAggregator())
    .uid("page-count-aggregator")  // stable across job versions
    .name("Page Count Aggregator");

Assign uids to every stateful operator. This is the most commonly skipped step, and it bites teams on their first production upgrade when they discover their savepoint is incompatible with the new job graph.
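
A related flag worth knowing: if a new job graph intentionally drops a stateful operator, restoring from the old savepoint fails unless you explicitly allow the leftover state to be skipped:

# Restore from a savepoint but skip state that no longer maps to any operator
flink run -s s3://my-bucket/flink/savepoints/<savepoint-id> --allowNonRestoredState my-job.jar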

What Managed Services Provide

Running Flink yourself means operating JobManagers, TaskManagers, high-availability coordination (ZooKeeper or Kubernetes leader election), checkpoint storage, metrics pipelines, and scaling. Managed services absorb most of this.

AWS Managed Service for Apache Flink (MSF)

Formerly Kinesis Data Analytics for Apache Flink. You provide a JAR or Python script, AWS handles the cluster.

What you get:

  • Checkpoints automatically written to S3 - you specify the bucket, AWS handles the rest
  • Application-level HA with automatic recovery on failure
  • CloudWatch metrics and logging without any configuration
  • Snapshot support (savepoints via the MSF console or API)
  • Auto-scaling based on KPU utilization - configurable min/max parallelism
  • No cluster to manage - billing is per Kinesis Processing Unit (KPU)

What you give up:

  • No access to the underlying infrastructure - no JVM flag tuning beyond what MSF config exposes
  • Connector compatibility is limited to what the MSF runtime supports. Not every third-party connector works without bundling it into your JAR and dealing with classpath conflicts
  • Flink version upgrades lag the community by 6-12 months - you take the version AWS ships
  • Cold start time for paused applications can be 2-3 minutes

MSF is the right call if you are AWS-native and Kinesis-first. The Kinesis connector is first-class, IAM integration is seamless, and VPC support is solid. If you are Kafka-first, MSF still works but the MSK/Kafka setup requires more networking configuration, and you lose some of the tight integration benefits.

The managed checkpointing is particularly valuable. AWS handles checkpoint storage path configuration, lifecycle management, and retention policies. You get the fault tolerance benefit without managing any of the checkpoint infrastructure yourself.

Confluent Cloud for Apache Flink

Confluent’s managed Flink is SQL-first. You write Flink SQL, Confluent runs the compute. Tables map directly to Kafka topics in your Confluent environment, and Schema Registry integration is automatic.

What you get:

  • Zero cluster management
  • SQL-native interface - no JAR deployment, no build process
  • Topic-to-table mapping out of the box - a Kafka topic becomes a Flink table with schema pulled from Schema Registry
  • Auto-scaling
  • Compute pools that scale independently from your Kafka partitions

The tradeoff: this is the Table API and SQL world. If your pipeline needs DataStream-level logic - custom state management, complex event pattern matching, CEP - this is not the right platform. Confluent managed Flink is best for analytics and operational pipelines that are naturally SQL-shaped.

Ververica Platform

Ververica was founded by the original Apache Flink creators and is now owned by Alibaba Cloud, with the platform also available as a standalone enterprise product. It is the most operationally complete managed Flink environment.

What you get over running Flink yourself:

  • SQL gateway and visual workbench for authoring Flink SQL jobs
  • Application lifecycle management with blue/green deployments and savepoint-based upgrades
  • Advanced state management tooling - state size inspection, eviction controls
  • Better observability than vanilla Flink - operator-level metrics, flame graph support, backpressure visualization
  • Support for both DataStream and Table API deployments

Ververica is the choice when you need DataStream-level control without the operational overhead of running the cluster yourself, and you are not locked into a single cloud provider. It is also the natural fit for teams running Flink at a scale where the raw open-source tooling becomes a bottleneck.

Google Cloud Dataflow

Worth a mention even though it is Apache Beam, not native Flink. Dataflow can use Beam’s Flink runner under the hood, but the default runner is Dataflow’s own. If you are building on GCP, Dataflow is the managed streaming platform - you get auto-scaling, managed state, and tight integration with Pub/Sub, GCS and BigQuery. The tradeoff is that you are writing Beam pipelines, not Flink-native code. The concepts are similar but the APIs are not the same, and the operational model differs enough that Flink experience does not transfer directly.

Picking Your Setup

Self-hosted Flink on Kubernetes via the Flink Kubernetes Operator is the most flexible path. Every connector, every config option, any Flink version you need. The operational cost is real but manageable if your team already runs Kubernetes workloads.

AWS Managed Service for Apache Flink is the right call if you are AWS-native, want minimal ops overhead, and your connectors fit within what MSF supports. The managed checkpointing and recovery alone save significant operational time.

Confluent managed Flink is the fastest path from idea to running pipeline if your logic is SQL-expressible and you are already on Confluent Kafka.

Ververica is the choice for DataStream-level control without cluster management, particularly in multi-cloud or on-prem environments.

Whichever path you take: configure checkpointing from day one, assign UIDs to every stateful operator, and test a full recovery from checkpoint before your pipeline goes live. Discovering your checkpoint storage was misconfigured - or that your state was not actually being persisted - during a production incident is not a situation you want to be in.

FAQ

Do I have to choose between the DataStream API and Table API?

No. You can mix them in a single job using StreamTableEnvironment. The two surfaces are bridged over one shared runtime, not separate systems. In practice, most jobs use one or the other for 90% of the logic with occasional crossover.

What Flink version should I start with?

As of 2026, Flink 1.19 / 1.20 are the stable choices for new projects. Flink 2.0 introduced breaking changes to some APIs. Check connector compatibility for your version before upgrading - not all connectors release on the same cadence as Flink core.

How do I size TaskManagers?

Start with 2-4 CPU cores and 4-8 GB memory per TaskManager, then adjust based on your state size and processing latency. For RocksDB state backends, provision additional disk - RocksDB will use it. For heap state backends, add memory and monitor GC pressure.
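
As a starting point in flink-conf.yaml terms (numbers illustrative, not a recommendation):

taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 8g
taskmanager.memory.managed.fraction: 0.4   # managed memory backs RocksDB when that backend is in use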

What happens if a checkpoint fails?

A failed checkpoint does not lose data; the job keeps running and tries again at the next interval. If checkpoints keep failing (for example, because they run longer than the configured timeout), Flink fails the job once the failure count exceeds what setTolerableCheckpointFailureNumber allows, then recovers from the last successful checkpoint. Consistently failing checkpoints are a signal that the checkpoint interval, timeout, or state size needs tuning.
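
The programmatic knob for that tolerance (value illustrative):

// Tolerate up to 3 checkpoint failures before failing the job and
// recovering from the last successful checkpoint.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);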

Can I run Flink SQL without a Flink cluster?

Yes - the Flink SQL gateway and SQL Client can be run locally against a mini-cluster for development. For production, you need a proper cluster or a managed service.