[...]
// Kafka Streams (Java): count the records seen for each distinct value.
builder
    // Consume "input-stream" using String serdes for both key and value.
    .stream("input-stream", Consumed.with(Serdes.String(), Serdes.String()))
    // Re-key every record by its value so that equal values share a group.
    // NOTE(review): no Grouped serdes are passed here, so the regrouping step
    // presumably falls back to the application's default serdes -- confirm
    // they are configured for String keys.
    .groupBy((recordKey, recordValue) -> recordValue)
    // Count the records in each group.
    .count()
    // Turn the counts back into a stream and write them to "counts"
    // with a String key serde and a Long value serde.
    .toStream()
    .to("counts", Produced.with(Serdes.String(), Serdes.Long()));

// ksqlDB (SQL): a per-x count written as a query that emits changes.
SELECT x, count(*) FROM stream GROUP BY x EMIT CHANGES;
[...]