
Getting started with the Apache Cassandra database
Believable
Once upon a time, Facebook set itself the goal of offering its users a search option in their inbox, which meant storing a reverse index of all messages. At that time, however, no database would have been able to do this safely and inexpensively. The MySQL database used until then had reached its limits at a size of 7TB and more than 100 million users.
Two Facebook programmers, Avinash Lakshman (one of the authors of Amazon's Dynamo) and Prashant Malik, set out to develop a completely new storage system that would bear up in the face of the huge amount of data and a high growth rate while being fail-safe, fast, and economical [1]. The result was Apache Cassandra [2].
Facebook released Cassandra in 2008 under an open source license, and two former Rackspace employees founded their own company in 2010 to launch a commercial Cassandra offshoot in 2011: DataStax Enterprise (DSE) has several additional features compared with Apache Cassandra and is now used by some of the world's largest companies, including Netflix, Microsoft, Deloitte, The Home Depot, Walgreens, and Facebook and its subsidiary Instagram. (See the "Cassandra Hands-on" box.)
What Is Cassandra?
Cassandra is a Java-implemented, fault-tolerant, distributed, column-oriented NoSQL database that uses the wide column store layout. This type of database relies on column families, groups of related columns that resemble relational tables. Unlike in the relational model, you can add or delete columns per row at any time.
Column families in turn can contain groups of columns and are then known as super column families. In the wide column store universe, the keyspace comes closest to the concept of the schema in the relational world. The keyspace can contain several column families as well as functions or views. In general, each application has one keyspace. The cluster in turn houses the keyspaces.
Cassandra uses every node in the cluster for both reading and writing; together, the nodes form a logical ring and can also be distributed across different data centers. Master and slave roles do not exist, and the node computers can be relatively inexpensive off-the-shelf hardware.
For each data item, a consistent hash algorithm determines its position in the ring; Cassandra traverses the ring clockwise until it finds the first node whose position is greater than the data item's position and identifies that node as the one responsible for the data item. This node, known as the coordinator, also replicates the data item to a configurable number of other nodes. It follows adjustable policies that can, for example, ensure that replication also includes nodes in remote data centers.
Because each node is only responsible for the section of the ring up to its predecessor, the failure or insertion of a node never affects the entire ring, only the immediate neighbors of the affected node.
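Which nodes receive these replicas is governed by the replication strategy defined per keyspace. As a hedged sketch (not the configuration used for the example cluster later in this article, which relies on SimpleStrategy), a keyspace spanning two data centers might be declared roughly like this; the keyspace and data center names are invented and would have to match the names your snitch reports:

-- Three copies in data center DC1, two in data center DC2
CREATE KEYSPACE demo
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'DC1': 3,
    'DC2': 2
  };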
Cassandra cannot achieve all desirable goals at the same time. As is well known, in a distributed system, only two of the three requirements of the CAP theorem – consistency (C), availability (A), and partition tolerance (P, failure tolerance) – can be guaranteed (Figure 1).

Cassandra vouches for A and P, providing only eventual consistency. Simultaneous identical queries on two different nodes can therefore return different results, because data just written has not yet reached all replicas. Instead, the write operation is confirmed as soon as a quorum of the replicas to which the request was forwarded confirms it. The results will eventually be consistent again but might not be at the time of the query. Users have to live with that.
The number of nodes can be increased at any time to prevent bottlenecks, and a number of nodes can always fail without affecting the database function.
Bookkeeping for Books
The first example is a simple table containing books reviewed in the German Linux-Magazin since the beginning of 2018. For each title, the table stores the ISBN, the year of publication, the title, the publisher, the names of up to three authors, the edition number, and the issue in which the review appeared. If such a table existed in MySQL, it could be sorted very easily (e.g., by the issue number; Listing 1). A somewhat more complex question, such as the number of reviewed titles per publisher (Listing 2), could be solved almost as easily.
Listing 1: Sorted List of Titles
+------------------------+----------------------------------+-------------+-------+
| Author1                | Title                            | Publisher   | Issue |
+------------------------+----------------------------------+-------------+-------+
| Tim Philipp Schäfers   | Hacking im Web                   | Franzis     | 18/01 |
| Richard Eisenmenger    | Nur noch dieses Level!           | Rheinwerk   | 18/01 |
| Joachim Zuckarelli     | Statistik mit R                  | O'Reilly    | 18/02 |
| Sebastian Erlhofer     | Website-Konzeption und Relaunch  | Rheinwerk   | 18/02 |
[...]
Listing 2: Reviews per Publisher
# SELECT Publisher, COUNT(*) AS 'Number' FROM books GROUP BY Publisher ORDER BY Number DESC;
+-----------+--------+
| Publisher | Number |
+-----------+--------+
| Rheinwerk |     10 |
| O'Reilly  |      6 |
| Wiley     |      5 |
| Hanser    |      5 |
| Mitp      |      4 |
| C.H. Beck |      2 |
| Tintal    |      1 |
| Franzis   |      1 |
| Dpunkt    |      1 |
| Apress    |      1 |
+-----------+--------+
Not so in Cassandra. Although the database has a SQL-flavored query language, CQL, it is strongly tailored for use in a partitioned, distributed NoSQL database that is designed to handle petabytes of data. Language constructs that require, for example, that a filter criterion be compared with each individual line could lead to incalculable performance problems. Restrictions thus apply to their use, and subqueries don't even exist.
Searches with wildcards are also missing. Worse still, joins like those that make connections between tables in the relational world are not intended in the Cassandra realm. In the MySQL case, for example, the database designer would have moved the names of the publishers to a separate table, so as not to store them redundantly in the main table. The designer would only have stored an ID in the main table that points to a publisher name table. You can't do that with Cassandra; instead, redundant data storage is consciously accepted. Anything a query wants to investigate must be in a column family.
Additionally, the aggregation functions known from SQL (e.g., AVG(), SUM(), COUNT(), MAX(), and MIN()) are not available per se, although they could be retrofitted as user-defined functions. However, it is more common to create a separate table that contains a counter updated by each INSERT or DELETE rather than a COUNT() function.
Cassandra requires a completely different data modeling technique compared with relational databases because of these special features. Whereas database developers in the relational world rely on the table as the basic element and express relationships between tables through joins, Cassandra's starting point is the query.
Often a table is designed to support precisely one query. If it is not possible to merge related data in a table, the application logic must establish the connection.
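As a rough sketch of this query-first approach, the same review data could simply be stored twice, once per query; the table layout below is invented for illustration and is not the schema used later in Listing 4:

-- One table per query: titles looked up by publisher ...
CREATE TABLE titles_by_publisher (
  publisher TEXT,
  issue     TEXT,
  isbn      TEXT,
  title     TEXT,
  PRIMARY KEY (publisher, issue, isbn)
);

-- ... and the same data again, this time looked up by issue.
CREATE TABLE titles_by_issue (
  issue     TEXT,
  publisher TEXT,
  isbn      TEXT,
  title     TEXT,
  PRIMARY KEY (issue, publisher, isbn)
);

The application writes every review into both tables; each query then reads from the table whose partition key matches it.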
For the book reviews example, this could mean that the designer would also create a count table, with one row per publisher, to determine the number of reviewed titles from each publisher without having to rely on COUNT(). The application would then be responsible for updating this count table each time a title is inserted or deleted.
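A minimal sketch of such a count table uses Cassandra's counter type; the table and column names here are assumptions for illustration:

CREATE TABLE reviews_per_publisher (
  publisher TEXT PRIMARY KEY,
  reviewed  COUNTER
);

-- Increment on every inserted review ...
UPDATE reviews_per_publisher SET reviewed = reviewed + 1 WHERE publisher = 'Rheinwerk';

-- ... and decrement when a review is deleted.
UPDATE reviews_per_publisher SET reviewed = reviewed - 1 WHERE publisher = 'Rheinwerk';

Counter columns can only be changed with UPDATE statements like these, so the application issues the matching increment or decrement alongside each insert or delete on the main table.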
If a table like the one in the example only contains a small amount of data, filtering on an arbitrary column will work, too. The first attempt, however, will result in an error message that points to possible performance effects that are difficult to calculate. A user who is sure that this cannot happen in their case therefore needs to explicitly allow filtering with ALLOW FILTERING (Listing 3).
Listing 3: ALLOW FILTERING
# SELECT Authors, Title FROM titles WHERE Publisher='Mitp' ALLOW FILTERING;

 Authors                              | Title
--------------------------------------+------------------------------------------
 {1: 'Ralf Jesse'}                    | Embedded Linux mit Raspberry Pi und Co.
 {1: 'F. Chollet', 2: 'J.J. Allaire'} | Deep Learning mit R und Keras
 {1: 'Aditya Y. Bhargava'}            | Algorithmen kapieren
 {1: 'Winfried Seimert'}              | Inkscape
Books can have a different number of authors. In the relational model – and in Cassandra, as well – a fixed number of fields could be reserved for authors (e.g., three: Author1, Author2, and Author3). If a book has only one author, Author2 and Author3 contain null values. But if a book has four authors, one has to be dropped.
Cassandra has an elegant solution to this problem: maps, which are fields that can contain any number of key-value pairs of the same type. Maps can be used to define a field for the authors, as in Listing 4, which also demonstrates inserts into such a table and a select.
Listing 4: Maps
# CREATE TABLE titles (
    isbn TEXT,
    year INT,
    title TEXT,
    publisher TEXT,
    authors MAP<INT,TEXT>,
    circulation INT,
    issue TEXT,
    PRIMARY KEY(publisher, issue, isbn)
  );

# INSERT INTO titles(isbn, year, title, publisher, authors, circulation, issue)
  VALUES('978-3836244091', 2017, 'Nur noch dieses Level!', 'Rheinwerk',
         {1 : 'Richard Eisenmenger'}, 1, '18/01');

# INSERT INTO titles(isbn, year, title, publisher, authors, circulation, issue)
  VALUES('978-3836262477', 2018, 'Objektorientierte Programmierung', 'Rheinwerk',
         {1 : 'Bernhard Lahres', 2 : 'Gregor Rayman', 3 : 'Stefan Strich'}, 4, '18/09');

# SELECT Title FROM titles WHERE authors CONTAINS 'Stefan Strich' ALLOW FILTERING;

 Title
----------------------------------
 Objektorientierte Programmierung
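Map columns can also be modified in place later. A small sketch that adds, and then removes again, a purely hypothetical fourth author on the second row inserted in Listing 4:

UPDATE titles SET authors = authors + { 4 : 'Erika Mustermann' }
  WHERE publisher = 'Rheinwerk' AND issue = '18/09' AND isbn = '978-3836262477';

DELETE authors[4] FROM titles
  WHERE publisher = 'Rheinwerk' AND issue = '18/09' AND isbn = '978-3836262477';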
Everything Is Fine
Another difference that gives SQL users a stomach ache is sorting. Before you can sort, you first need to look at the different keys of a Cassandra table:
- If a single column is defined as the primary key, this key is also the partition key, which determines the location of the data (i.e., it picks the node of the distributed database on which the data ends up). As already mentioned, an algorithm uses a hash value calculated from the partition key to determine a token that is permanently assigned to a specific node.
- For a compound primary key, the partition key is the first component (i.e., the first expression in the key definition).
- The partition key can also comprise several columns, which in this case have to be bracketed in the key definition (see the sketch after this list).
- The second component of the compound primary key is the clustering key, which determines the sort order within the partition. Data can only be returned sorted according to this criterion.
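To illustrate the bracketed form, here is a hedged sketch of a table (the name and layout are invented for this example) whose partition key combines publisher and year, while issue serves as the first clustering key and thus sorts the rows within each partition:

CREATE TABLE titles_by_publisher_year (
  publisher TEXT,
  year      INT,
  issue     TEXT,
  isbn      TEXT,
  title     TEXT,
  PRIMARY KEY ((publisher, year), issue, isbn)
) WITH CLUSTERING ORDER BY (issue ASC, isbn ASC);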
Therefore, where required, you have to construct the primary key when creating the table such that it comprises several parts, and its second part, the clustering key, reflects the sort sequence that you will need later. Moreover, ORDER BY can only be used if an EQ or IN condition restricts the partition key.
To get something similar to the MySQL example (Listing 1), first define the primary key so that the issue column becomes the clustering key:
PRIMARY KEY (publisher, issue, isbn)
After that, an EQ condition would be needed to restrict the partition key (the publisher column in this case); then, the result could be sorted by the issue number (Listing 5).
Listing 5: Sorted Output
# SELECT authors, title, publisher, issue FROM titles WHERE publisher='Hanser' ORDER BY issue ALLOW FILTERING;

 authors                                   | title                                     | publisher | issue
-------------------------------------------+-------------------------------------------+-----------+-------
 {1: 'Dirk Louis', 2: 'Peter Müller'}      | Java                                      | Hanser    | 18/08
 {1: 'Herbert Dowalil'}                    | Grundlagen des modularen Softwareentwurfs | Hanser    | 18/08
 {1: 'Jonas Freiknecht', 2: 'Stefan Papp'} | Big Data in der Praxis                    | Hanser    | 18/10
 {1: 'Jörg Frochte'}                       | Maschinelles Lernen                       | Hanser    | 18/11
 {1: 'Heiko Kalista'}                      | Python 3                                  | Hanser    | 19/05
In any case, when designing the database, Cassandra users have to consider carefully which queries the database will need to answer later. Unlike in the relational world, the freedom to think about the evaluation only afterward does not exist.
More Data and a Cluster
A second example is intended to illustrate a more typical NoSQL application case. This time, measured values and timestamps need to be saved. Also, instead of a single instance of Cassandra, a small cluster will be used, which can be set up with the help of Docker for demo purposes. The measurement data here are run times of a ping against the Linux-Magazin website.
Cron executes a simple shell script every minute, which essentially comprises only one line:
/bin/ping -c1 -D -W5 www.linux-magazin.de | /home/jcb/Cassandra/pingtest.pl
The script might also need to set some environment variables for Perl. The pingtest.pl Perl script (Listing 6) takes the output from ping, extracts the values with a regular expression, and stores them in the Cassandra database. In a real deployment, error handling would be necessary, but at this point I want to keep things as simple as possible.
Listing 6: pingtest.pl
01 #!/usr/bin/perl
02 use Cassandra::Client;
03
04 my $client=Cassandra::Client->new(
05     contact_points=>['172.21.0.2', '172.21.0.4', '172.21.0.5'],
06     username => 'admin',
07     password => '********',
08     keyspace => 'pingtest'
09 );
10
11 $client->connect;
12
13 foreach my $line ( <STDIN> ) {
14    if($line =~ /\[(\d+.\d+)\](.*)time=(\d+\.\d+)/) {
15       $client->execute("INSERT INTO pingtime (tstamp, tvalue) VALUES(?, ?)",
16          [$1*1000, $3],
17          { consistency => "one" });
18    }
19 }
20
21 $client->shutdown;
The timestamp, which ping prints thanks to the -D option, represents Unix time (i.e., the number of seconds since January 1, 1970, 00:00 hours). It is multiplied here by 1,000 to obtain the value in milliseconds needed by Cassandra. Also bear in mind that this time is in UTC, which explains the time difference of two hours from Central European Summer Time (CEST = UTC+2) if you live in Germany, as I do.
If you later stop and restart the cluster several times, the internal IP addresses of the node computers can change. If this happens, the addresses in the Perl script need to be adapted; otherwise, the database server will not be found. The script uses the Cassandra::Client module, which you can best install from the Comprehensive Perl Archive Network (CPAN). All dependencies – and there are many in this case – are then automatically resolved by the installation routine.
The easiest way to do this is to start an interactive CPAN shell and install the module:
perl -MCPAN -e shell
install Cassandra::Client
A C compiler is also required.
The column family that will store the timestamp and the ping round-trip time (RTT) is simple:
CREATE TABLE pingtime (
  tstamp timestamp,
  tvalue float,
  PRIMARY KEY(tvalue, tstamp)
) WITH default_time_to_live = 259200
  AND CLUSTERING ORDER BY (tstamp DESC);
It will take this setup a long time to generate terabytes of data, but it does create almost 1,500 entries per day (one per minute). If you want to limit the number, you can, as in the example above, specify a time to live (TTL) value (in seconds) for the entire table. Cassandra then automatically deletes the data after this period (three days in this example). Instead of a definition for the whole table, the retention time can also be defined per INSERT. In this case, you would append USING TTL 259200 to the corresponding statement.
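For example, a single measurement with its own three-day TTL could be inserted roughly like this (the values shown are placeholders):

INSERT INTO pingtime (tstamp, tvalue)
  VALUES ('2019-06-01 12:00:00+0000', 23.4)
  USING TTL 259200;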
This useful feature might tempt you to think about using Cassandra even if you are not dealing with a huge volume of data.
Thus far, I have only brushed the surface of one genuine Cassandra highlight. Clustering and replication – thus, high-performance, fail-safe databases – do not require massive overhead on the part of the admin. Most of these tasks are done automatically by the database.
In Practice
You might want to avoid following the steps in this section on a low-powered laptop, because the load might make it difficult to use – at least temporarily.
To set up a small Cassandra cluster based on Docker, you need to install Docker Compose, a tool for managing multicontainer applications, in addition to Docker. If the cluster is to be operated across different physical hosts, you would have to choose Docker Swarm or Kubernetes; however, this example is deliberately restricted to one host, which keeps configuration overhead low and the installation simple.
On Ubuntu, the easiest way to install the required software is to use the package manager. Afterward, you need to enable Docker control over the TCP interface, instead of just through the local socket file, by editing /lib/systemd/system/docker.service. Add a hash mark in front of the existing line, as shown, and add the second line in its place:
#ExecStart=/usr/bin/dockerd -H fd://
ExecStart=/usr/bin/dockerd -H fd:// -H tcp://0.0.0.0:2375
Next, type:
systemctl daemon-reload
service docker restart
Log out as root and test your setup with a non-privileged user ID by typing:
curl http://localhost:2375/version
Working as this user, create a cassandra subdirectory for the cluster configuration somewhere below your home directory and a docker-compose.yml file with the content from Listing 7. This file provides all the information Docker needs to find the right images; download, unpack, and install them on the desired number of nodes; establish a network connection between the nodes; and create storage volumes.
The Compose file defines four services as components of the application, and each service uses exactly one Docker image. Three of the services are Cassandra nodes; the fourth, Portainer, provides a simple web GUI for controlling the service containers.
The environment section sets some environment variables, which later overwrite settings of the same name in the central Cassandra configuration file, /etc/cassandra/cassandra.yaml, on the respective node. In this way, you can manage with just one image for all Cassandra nodes and still configure each node individually. In addition to the variables shown here, you could use a number of other variables, if needed. Details can be found in the documentation for the Cassandra image [4].
In the Compose file, the image name is followed by a fairly cryptic shell command that is used to start the nodes one after another at an interval of one or two minutes at the very first launch, when the Cassandra data directory is still empty. Attempting to start everything at the same time at first launch will just not work.
Now the $DOCKER_HOST environment variable has to be set on the host side:
export DOCKER_HOST=localhost:2375
The installation process, which could take a little while, can be started from the cassandra directory with the command:
docker-compose up -d
Portainer
As soon as everything is finished, the Portainer GUI can be called in the browser on localhost:10001 (Figure 2) for an overview of the running containers, their states, the images they are based on, the networks, and the volumes.

From here, you can click your way to more specific views. For example, from an overview page of the containers (Figure 3), you can start, stop, pause, resume, remove, or add containers. The overview also reveals the IP addresses of the individual containers on the internal network. From here, you can check the container log (Figure 4), display some performance statistics for each container, inspect the container configurations, or log into a shell on a container.


If you dig down even deeper into the details of a single container, you will see that you can create a new image from the running container, which means you can persist changes to the database configuration. The access rights to the container can also be set here.
Cluster Playground
As a last test, if everything is working as desired, you can use Portainer to open a Bash shell on one of the node computers and enter:
nodetool status
Nodetool is a comprehensive tool for managing, monitoring, and repairing the Cassandra cluster. The output of the status subcommand should look like Figure 5. All three nodes must appear, the status should be Up and Normal (UN), and each node should have an equal share of the data.

Figure 5: The nodetool status command indicates that everything is OK.

Finally, you can start playing around with the cluster. From the Docker host, log into the DC1N1 node created in Listing 7 by specifying its IP address and the port reserved for cqlsh in the configuration (port 9042 by default):
Listing 7: docker-compose.yml
version: '3'
services:
  # Configuration for the seed node DC1N1
  # The name could stand for datacenter 1, node 1
  DC1N1:
    image: cassandra:3.10
    command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 0; fi && /docker-entrypoint.sh cassandra -f'
    # Network for communication between nodes
    networks:
      - dc1ring
    # Map the volume to a local directory.
    volumes:
      - ./n1data:/var/lib/cassandra
    # Environment variables for the Cassandra configuration.
    # CASSANDRA_CLUSTER_NAME must be identical on all nodes.
    environment:
      - CASSANDRA_CLUSTER_NAME=Test Cluster
      - CASSANDRA_SEEDS=DC1N1
    # Expose ports for cluster communication
    expose:
      # Intra-node communication
      - 7000
      # TLS intra-node communication
      - 7001
      # JMX
      - 7199
      # CQL
      - 9042
      # Thrift service
      - 9160
    # Recommended Cassandra ulimit settings
    ulimits:
      memlock: -1
      nproc: 32768
      nofile: 100000

  DC1N2:
    image: cassandra:3.10
    command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 60; fi && /docker-entrypoint.sh cassandra -f'
    networks:
      - dc1ring
    volumes:
      - ./n2data:/var/lib/cassandra
    environment:
      - CASSANDRA_CLUSTER_NAME=Test Cluster
      - CASSANDRA_SEEDS=DC1N1
    depends_on:
      - DC1N1
    expose:
      - 7000
      - 7001
      - 7199
      - 9042
      - 9160
    ulimits:
      memlock: -1
      nproc: 32768
      nofile: 100000

  DC1N3:
    image: cassandra:3.10
    command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 120; fi && /docker-entrypoint.sh cassandra -f'
    networks:
      - dc1ring
    volumes:
      - ./n3data:/var/lib/cassandra
    environment:
      - CASSANDRA_CLUSTER_NAME=Test Cluster
      - CASSANDRA_SEEDS=DC1N1
    depends_on:
      - DC1N1
    expose:
      - 7000
      - 7001
      - 7199
      - 9042
      - 9160
    ulimits:
      memlock: -1
      nproc: 32768
      nofile: 100000

  # A web-based GUI for managing containers.
  portainer:
    image: portainer/portainer
    networks:
      - dc1ring
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./portainer-data:/data
    # Access to the web interface from the host via
    # http://localhost:10001
    ports:
      - "10001:9000"

networks:
  dc1ring:
$ cqlsh -uadmin 172.21.0.2 9042
This command requires a local Cassandra installation alongside the cluster, because the CQL shell ships with it. Alternatively, you can use Portainer to open a Linux shell on one of the nodes and launch cqlsh there. Next, entering
CREATE KEYSPACE pingtest WITH replication = {'class':'SimpleStrategy','replication_factor':3};
creates the keyspace for the ping run-time example on the three-node cluster with three replicas.
High Availability, No Hard Work
Without having to do anything else, the database now stores copies of every row in the pingtest keyspace on every node in the cluster. For experimental purposes, the consistency level can be set to different values, either interactively for each session or by using the query in pingtest.pl.
A value of one (Listing 6, line 17) means, for example, that only one node needs to confirm the read or write operation for the transaction to be confirmed. More nodes are required for settings of two or three, and all of the nodes if you select all. A quorum setting means that a majority of the replicas across all the data centers over which the cluster is distributed must confirm the operation, whereas local_quorum means that a quorum of the replicas in the data center that hosts the coordinator node for this row is sufficient.
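Interactively, the level is switched in cqlsh with the CONSISTENCY command, which then applies to all further statements in the session; a short sketch:

-- Switch the session to quorum reads/writes, run a query, then switch back
CONSISTENCY QUORUM
SELECT tstamp, tvalue FROM pingtime LIMIT 3;
CONSISTENCY ONE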
In this way, Cassandra achieves what is known as tunable consistency, which means that users can specify for each individual query what is more important to them. In an Internet of Things application, simple confirmation of a node might be sufficient, the upside being database performance boosts because the database does not have to wait for more nodes to confirm the results. With a financial application, on the other hand, you might prefer to play it safe and have the result of the operation confirmed by a majority of the nodes. In this case, you also have to accept that this will take a few milliseconds longer.
Once you have set up the pingtime table (Listing 6, line 15) in the distributed pingtest keyspace and have enabled the cron job described earlier so that data is received, you should be able to retrieve the same data from the table on all nodes. If this is the case, then replication is working as intended.
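A quick check is to open cqlsh on each node in turn and run the same small query; if replication works, every node returns the same rows:

USE pingtest;
SELECT tstamp, tvalue FROM pingtime LIMIT 5;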
You could now use Portainer to shut down individual nodes. Depending on the consistency setting, the selects will provoke an error message if you address a surviving node that has the data available but cannot find a sufficient number of coworkers to confirm the result.
If you then reactivate a node some time later, no further steps are required in the ideal case; Cassandra takes care of the resynchronization on its own. The prerequisite is that the hinted_handoff_enabled variable in the central cassandra.yaml configuration file is set to true. Cassandra stores, in the form of hints, the write operations the node missed because of its temporary failure and automatically replays them as soon as the node becomes available again. For more complicated cases, such as a node having been down for so long that you run out of space for hints, nodetool has a repair command.
If you browse the Cassandra documentation [5], you will find many interesting commands and procedures that you can try on a small test cluster to get a feel for the database, which will certainly pay dividends when you go live.
Conclusions
Cassandra is a powerful distributed NoSQL database especially suited for environments that need to handle large volumes of data and grow rapidly. Several advanced features, such as built-in failover, automatic replication, and self-healing after node failure, make it interesting for a wide range of application scenarios. However, migration from a traditional relational database management system (RDBMS) to Cassandra is unlikely to be possible without some overhead because of the need to redesign the data structure and queries.