[...]
// Kafka Streams (Java):
// Count how many times each distinct record value appears on "input-stream"
// and publish the running totals to the "counts" topic.
builder
  .stream("input-stream",
    Consumed.with(Serdes.String(), Serdes.String()))
  // groupBy re-keys every record by its value, so the grouped data goes
  // through a repartition topic. Pass the serdes explicitly (Grouped lives in
  // org.apache.kafka.streams.kstream, alongside Consumed and Produced);
  // otherwise the application's default serdes are used, which fails at
  // runtime when they are unset or are not String serdes.
  .groupBy((key, value) -> value,
    Grouped.with(Serdes.String(), Serdes.String()))
  .count()
  // count() yields a table of (value -> Long count); turn its updates back
  // into a stream so they can be written to an output topic.
  .toStream()
  .to("counts", Produced.with(Serdes.String(), Serdes.Long()));

// ksqlDB (SQL):

-- Push query: emits an updated row count for a value of x whenever that
-- group's count changes.
SELECT
    x,
    COUNT(*)
FROM stream
GROUP BY x
EMIT CHANGES;
[...]