cassandra-driver-mapping

Entity Mapper Add-on for the DataStax Java Driver (Driver) for Cassandra (C*).
This Add-on allows you to synchronize schema automatically and persist JPA annotated entities in Cassandra.

No mapping files, no scripts, no configuration files.
No need to create Tables and Indexes for your Entity manually.
Entity definition will be automatically synchronized with Cassandra.

Add-on is not replacement for the Driver but lightweight Object Mapper on top of it.
You still can utilize full power of the Driver API and Datastax documentation.
Mapping Add-on relies on JPA 2.1 and Driver 3+

More Usage Samples in Unit Tests

Spring Framework Example

Table of Contents

Features

The features provided by the module include:

Jump Start

    <dependency>
      <groupId>com.valchkou.datastax</groupId>
      <artifactId>cassandra-driver-mapping</artifactId>
      <version>3.0.0</version>
    </dependency>

All new changes and bugfixes are released within the latest version as soon as coded. Module versioning policy matches underlying datastax driver core versioning.

	import com.datastax.driver.core.Session;
	import com.datastax.driver.core.Cluster;
	import com.datastax.driver.mapping.MappingSession;
	...

	// initialize datastax cluster.
	Cluster cluster = Cluster.builder()
        .withClusterName("myCluster").addContactPoint("127.0.0.1").build();

	// initialize datastax session.
	Session session = cluster.connect();

	// initialize mapping.
	MappingSession mappingSession = new MappingSession("keyspace_name", session);

MappingSession is very lightweight.
Underlying Datastax Session does all the heavylifting and is expansive.
For more info about Datastax Session please refer to Datastax Dcumentation. or Spring Framework Example.

	Entity entity = new Entity();
	mappingSession.save(entity);
	Entity entity = mappingSession.get(Entity.class, id);
	mappingSession.delete(entity);	

Synchronization is a cool feature but you can completely or partially disable it using SyncOptions.
Supported SyncOptions are: DoNotSync, DoNotAddColumns, DoNotDropColumns.
SyncOptions can be set for all or specific entities as shown below:

	/** Turn synchronization with C* off: */
	SyncOptions syncOptions = SyncOptions.withOptions().doNotSync());

	/** Turn synchronization off for specific Entity */
	SyncOptions syncOptions = SyncOptions.withOptions().doNotSync(Entity1.class);

	/** Turn synchronization with C* off except specific Entity: */
	SyncOptions syncOptions = SyncOptions.withOptions().doNotSync().doSync(Entity1.class);

	/** 
	 *  Do not add or drop columns for Entities.   
	 *  This will not affect initial synchronization when Table is created for the first time. 
	 */
	SyncOptions syncOptions = SyncOptions.withOptions().add(SyncOptionTypes.DoNotAddColumns);
	SyncOptions syncOptions = SyncOptions.withOptions().add(SyncOptionTypes.DoNotAddColumns).add(SyncOptionTypes.DoNotDropColumns);
	SyncOptions syncOptions = SyncOptions.withOptions().add(Entity1.class, SyncOptionTypes.DoNotDropColumns);

Pass SyncOptions into the MappingSession

	/** Constractor */
	MappingSession mappingSession = new MappingSession("keyspace_name", session, syncOptions);

	/** Setter */
	mappingSession.setSyncOptions(syncOptions);

Mapping Session API

To explore complete api go to MappingSession.java
Synchronous samples are in UnitTests MappingSessionTest.java
Asynchronous samples are in UnitTests MappingSessionAsyncTest.java

More samples below:

Write

    /** Persist Entity */
    save(entity);

    /** Persist Entity with WriteOptions*/
    save(entity, writeOptions);

    /** Remove an item or items from the Set or List. */
    remove(id, Entity.class, propertyName, item);

    /** Append value to the Set, List or Map. Value can be a single value, a List, Set or a Map. */
    append(id, Entity.class, propertyName, value);

    /** Append value to the Set, List or Map with WriteOptions. Value can be a single value, a List, Set or a Map. */
    append(id, Entity.class, propertyName, value, writeOptions);

    /** Save Individual Value. */
    updateValue(id, Entity.class, propertyName, value);

    /** Save Individual Value with WriteOptions. */
    updateValue(id, Entity.class, propertyName, value, writeOptions);

    /** Place value at the beginning of the List. 
     *  Value can be a single value or a List. */
    prepend(id, Entity.class, propertyName, value);

    /** Place value at the beginning of the List with WriteOptions. 
     *  Value can be a single value or a List. */
    prepend(id, Entity.class, propertyName, value, writeOptions);

    /** Replace item at the specified position in the List. */
    replaceAt(id, Entity.class, propertyName, item, index);

    /** Replace item at the specified position in the List with WriteOptions. */
    replaceAt(id, Entity.class, propertyName, item, index, writeOptions);
    
    /** Asynchronously Persist Entity */
    saveAsync(entity);

    /** Asynchronously Persist Entity with WriteOptions */
    saveAsync(entity, writeOptions);

    /** Asynchronously Remove an item or items from the Set or List. */
    removeAsync(id, Entity.class, propertyName, item);

    /** Asynchronously Append value to the Set, List or Map. Value can be a single value, a List, Set or a Map. */
    appendAsync(id, Entity.class, propertyName, value);

    /** Asynchronously Append value to the Set, List or Map with WriteOptions. Value can be a single value, a List, Set or a Map. */
    appendAsync(id, Entity.class, propertyName, value, writeOptions);

    /** Asynchronously Save Individual Value. */
    updateValueAsync(id, Entity.class, propertyName, value);

    /** Asynchronously Save Individual Value with WriteOptions. */
    updateValueAsync(id, Entity.class, propertyName, value, writeOptions);

    /** Asynchronously Place value at the beginning of the List. 
     *  Value can be a single value or a List. */
    prependAsync(id, Entity.class, propertyName, value);

    /** Asynchronously Place value at the beginning of the List with WriteOptions. 
     *  Value can be a single value or a List. */
    prependAsync(id, Entity.class, propertyName, value, writeOptions);

    /** Asynchronously Replace item at the specified position in the List. */
    replaceAtAsync(id, Entity.class, propertyName, item, index);

    /** Asynchronously Replace item at the specified position in the List with WriteOptions. */
    replaceAtAsync(id, Entity.class, propertyName, item, index, writeOptions);

Save/Upate methods accept "WriteOptions" argument.
Supported write options are: ConsistencyLevel, RetryPolicy, Timestamp, TTL.
Examples:

	import com.datastax.driver.mapping.option.WriteOptions;
	import com.datastax.driver.core.policies.DefaultRetryPolicy;
	import com.datastax.driver.core.ConsistencyLevel;
	...
	// create options
	WriteOptions options = new WriteOptions()
		.setTtl(300)
		.setTimestamp(42)
		.setConsistencyLevel(ConsistencyLevel.ANY)
		.setRetryPolicy(DefaultRetryPolicy.INSTANCE);

	Entity entity = new Entity();
	entity = mappingSession.save(entity, options);
	import com.datastax.driver.mapping.option.WriteOptions;
	...
	Entity entity = new Entity();
	ResultSetFuture f = mappingSession.saveValueAsync(entity, new WriteOptions().setTtl(300));

You can work with your collection properties as you would normally work with other entity properties.
In addition C* provides optimized operations on collections. Those operations do not require to load and save the whole entity. C* allows us directly manipulate collections.

// append item to list
mappingSession.append(id, Entity.class, "cats", "Black Cat");

// append item to be expired in 5 sec
mappingSession.append(id, Entity.class, "cats", "Expired Cat", new WriteOptions().setTtl(5));

// prepend item
mappingSession.prepend(id, Entity.class, "cats", "First Cat");

// replace item at specified index
mappingSession.replaceAt(id, Entity.class, "cats", "Grey Cat", 1);

// append List of items
List<String> addCats = new ArrayList<String>();
addCats.add("Red Cat");
addCats.add("Green Cat");
mappingSession.append(id, Entity.class, "cats", addCats);

// remove item
mappingSession.remove(id, Entity.class, "cats", "Grey Cat");

// remove List of items
List<String> removeCats = new ArrayList<String>();
removeCats.add("Red Cat");
removeCats.add("Green Cat");
mappingSession.remove(id, Entity.class, "cats", removeCats);

// remove all items
mappingSession.deleteValue(id, Entity.class, "cats");
// append item
mappingSession.append(id, Entity.class, "dogs", "Black Dog");

// append item to be expired in 5 sec
mappingSession.append(id, Entity.class, "dogs", "Expired Dog", new WriteOptions().setTtl(5));

// append Set of items
Set<String> addDogs = new HashSet<String>();
addDogs.add("Red Dog");
addDogs.add("Green Dog");
mappingSession.append(id, Entity.class, "dogs", addDogs);

// remove item
mappingSession.remove(id, Entity.class, "dogs", "Black Dog");

// remove Set of items
Set<String> removeDogs = new HashSet<String>();
removeDogs.add("Red Dog");
removeDogs.add("Green Dog");
mappingSession.remove(id, Entity.class, "dogs", removeDogs);

// remove all items
mappingSession.deleteValue(id, Entity.class, "dogs");
/** append item */
Map<String, BigInteger> pets = new HashMap<String, BigInteger>();
pets.put("Red Dogs", 25);
pets.put("Black Cats", 50);
mappingSession.append(id, Entity.class, "pets", pets);

/** append items to be expired in 5 sec */
Map<String, BigInteger> pets = new HashMap<String, BigInteger>();
pets.put("Green Dogs", 25);
pets.put("Brown Cats", 50);
mappingSession.append(id, Entity.class, "pets", pets, new WriteOptions().setTtl(5));

/** remove all items */
mappingSession.deleteValue(id, Entity.class, "pets");

Read

    /** Get Entity by Id(Primary Key) */
    Entity e = mappingSession.get(Entity.class, id);

    /** Get Entity by Id(Primary Key) with Options */
    Entity e = mappingSession.get(Entity.class, id, readOptions);

    /** Get Collection of Entities by custom Query Statement  */
    List<Entity> list = mappingSession.getByQuery(Entity.class,  queryStatement);

    /** Get Collection of Entities by custom Query String  */
    List<Entity> list = mappingSession.getByQuery(Entity.class,  queryString);


    /** Convert custom ResultSet into Collection of Entities */
    List<Entity> list = mappingSession.getFromResultSet(Entity.class, resultSet);

    /** Convert Row of ResultSet into Entity instance. */
    Entity e =  mappingSession.getFromRow(Entity.class, row);

    /** Convert Rows of ResultSet into Collection of Entities. */
    List<Entity> list = mappingSession.getFromRows(Entity.class, rows);
    
	import com.datastax.driver.mapping.option.ReadOptions;
	import com.datastax.driver.core.policies.DefaultRetryPolicy;
	import com.datastax.driver.core.ConsistencyLevel;
	...
	// using options
	ReadOptions options = new ReadOptions()
		.setConsistencyLevel(ConsistencyLevel.ANY)
		.setRetryPolicy(DefaultRetryPolicy.INSTANCE);

	Entity entity = mappingSession.get(Entity.class, id, options);
  1. run using mapping session
import com.datastax.driver.mapping.MappingSession;
...
List<Entity> result = mappingSession.getByQuery(Entity.class, query);
  1. run using DataStax session and map the ResultSet
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.MappingSession;
...
ResultSet rs = session.execute(query);
List<Entity> result = mappingSession.getFromResultSet(Entity.class, rs);

Section below describes how you can build Custom Queries.

	import com.datastax.driver.mapping.MappingSession;
	...

	// build query
	String query = "SELECT name, age, birth_date, salary FROM person");

	// run query						
	List<Entity> result = mappingSession.getByQuery(Entity.class, query);	
	import com.datastax.driver.core.Statement;
	import com.datastax.driver.core.querybuilder.QueryBuilder;
	import com.datastax.driver.mapping.MappingSession;
	...

	// build query
	Statement query = QueryBuilder.select().all().from("your_keyspace", "your_table").where(eq("column", value));

	// run query						
	List<Entity> result = mappingSession.getByQuery(Entity.class, query);
	import com.datastax.driver.core.Statement;
	import com.datastax.driver.core.querybuilder.QueryBuilder;
	import com.datastax.driver.mapping.MappingSession;
	import com.datastax.driver.mapping.EntityFieldMetaData;
	import com.datastax.driver.mapping.EntityTypeMetadata;
	...

	// get Entity Metadata
	EntityTypeMetadata emeta = EntityTypeParser.getEntityMetadata(Entity.class);

	// get field metadata by property/field name
	EntityFieldMetaData fmeta = emeta.getFieldMetadata(field_name);

	// build query.
	Statement query = QueryBuilder.select().all()
		.from("your_keyspace", emeta.getTableName()).where(eq(fmeta.getColumnName(), value));

	// run query
	List<Entity> result = mappingSession.getByQuery(Entity.class, query);

This is the coolest feature of the module. Your Entity doesn't have to match the table.
You can populate any entity from any query (Any-to-Any).
Consider example:

	public class AnyObject {
		private String name;
		private int age;
		// public getters/setters ...
	}

You can populate this object from any ResultSet which contains 'name' and 'age' columns.

	ResultSet rs = session.execute("SELECT name, age, birth_date, salary FROM person");
	List<AnyObject> result = mappingSession.getFromResultSet(AnyObject.class, rs);

In this particular case 'name' and 'age' will be populated on 'AnyObject'. 'birth_date' and 'salary' will be ignored and no errors will be thrown.
The biggest advantage that we can reuse the same entity to query different results from even different tables. Entity doesn't have to map, match or relate to the table at all. Many thank to magic gnomes under the hood making all these work.

Delete

    /** Delete Entity  */
    delete(entity);

    /** Delete Entity by ID(Primary key) */
    delete(Entity.class, id);

    /** Asynchronously delete Entity  */
    deleteAsync(entity);

    /** Asynchronously Delete Entity by ID(Primary key) */
    deleteAsync(Entity.class, id);

    /** Delete Individual Value */
    deleteValue(id, Entity.class, propertyName);
            
    /** Asynchronously Delete Individual Value */
    deleteValueAsync(id, Entity.class, propertyName);
    

Batch

	mappingSession.withBatch()
		.save(entityA)
		.save(entityB, writeOptions)
		.delete(entityD)
		.execute();
	ResultSetFuture f = mappingSession.withBatch()
		.save(entityA)
		.save(entityB, writeOptions)
		.delete(entityD)
		.executeAsync();

Various Mappings

IMPORTANT!!!
- Each persistant field MUST have publlic Getter/Setter.
- If entity or field is not annotated it will provide its name as default.
- Id field is required and must be annotated with @Id or @EmbeddedId.
- Index name must be unique within the keyspace.
- C* supports only single-column-index.

Basic Mapping

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Column

@Table (name="mytable")
public class Entity {

	@Id
	private long Id;

	@Column(name = "myname")
	private String name;

	// @Column is not required
	private int age;

	@Transient
	private BigDecimal calculable;

	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.mytable (id bigint, myname text, age int, PRIMARY KEY(id))

Mapping Indexes

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Column
import javax.persistence.Index
import java.util.UUID

@Table (name="mytable",
indexes = {
	@Index(name="entity_email_idx", columnList="email" ),
	@Index(name="entity_name_idx", columnList="myname" )
})
public class Entity {

	@Id
	private java.util.UUID code;

	@Column(name = "myname")
	private String name;
	private String email;
	// public getters/setters ...
}

CQL3 Statement

   	CREATE TABLE IF NOT EXISTS ks.mytable (code uuid, myname text, email text,  PRIMARY KEY(code));
	CREATE INDEX IF NOT EXISTS entity_email_idx ON ks.mytable(email);
	CREATE INDEX IF NOT EXISTS entity_name_idx ON ks.mytable(myname);

Compound Primary Key

public class CompositeKey {
	private String name;
	private int rank;
	// public getters/setters ...
}
import javax.persistence.Table;
import javax.persistence.EmbeddedId;

@Table(name="entity")
public class Entity {
	@EmbeddedId
	private CompositeKey key;
	private String email;
	// public getters/setters ...
}

CQL3 Statement

CREATE TABLE IF NOT EXISTS ks.entity (name text,  rank int, email text,  PRIMARY KEY(name, rank))

Composite Partition Key

public class PartitionKey {
	private String firstName;
	private String lastName;
	// public getters/setters ...
}
public class CompositeKey {
	@EmbeddedId
	private PartitionKey key;
	private int age;
	// public getters/setters ...
}
import javax.persistence.Table;
import javax.persistence.EmbeddedId;

@Table(name="entity")
public class Entity {
	@EmbeddedId
	private CompositeKey key;
	private String email;
	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.entity (firstname text, lastname text, age int, email text,  PRIMARY KEY((firstname, lastname), age))

Table Properties

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Column

import com.datastax.driver.mapping.annotation.TableProperties;
import com.datastax.driver.mapping.annotation.TableProperty;

@Table (name="mytable")
@TableProperties(values = {
	@TableProperty("comment='Important records'"),
	@TableProperty("read_repair_chance = 1.0"),
	@TableProperty("compression ={ 'sstable_compression' : 'DeflateCompressor', 'chunk_length_kb' : 64 }")
})
public class Entity {

	@Id
	private long Id;
	private String name;
	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.mytable (id bigint, name text, PRIMARY KEY(id)) WITH comment='Important records' AND read_repair_chance = 1.0 AND compression ={ 'sstable_compression' : 'DeflateCompressor', 'chunk_length_kb' : 64 }
public class CompositeKey {
	private String name;
	private int rank;
	// public getters/setters ...
}
import javax.persistence.Table;
import javax.persistence.EmbeddedId;
import com.datastax.driver.mapping.annotation.TableProperties;
import com.datastax.driver.mapping.annotation.TableProperty;

@Table(name="entity")
@TableProperties(values = {
		@TableProperty("CLUSTERING ORDER BY (rank DESC)")
	})
public class Entity {
	@EmbeddedId
	private CompositeKey key;
	private String email;
	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.entity (name text,  rank int, email text,  PRIMARY KEY(name, rank)) WITH CLUSTERING ORDER BY (rank DESC)

Override Column Type, TIMEUUID.

In case you don't like defaults you are able to override the type on the column level.
For example you want to leverage "time UUID" for timeseries data instead of "random UUID".

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Column

@Table (name="mytable")
public class Entity {

	@Id
	@Column(name="uid", columnDefinition="timeuuid") // case insensitive
	private UUID uid;

	@Column(name="name", columnDefinition="VarChaR") // case insensitive
	private String name;
	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.mytable (uid timeuuid, name varchar, PRIMARY KEY(uid))

Mixed Case for Column Names

Cassandra converts all names to lowercase. This is default and recommended approach.
But in case you need enforce the case you will need to wrap you names in double quotes.

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Column

@Table (name="mytable")
public class Entity {

	@Id
	@Column(name = "\"KEY\"")
	private int id;
	private String firstName;

	@Column(name = "\"last_NAME\"")
	private String lastName;

	@Column(name = "AGE")
	private int age;
	// public getters/setters ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.mytable ("KEY" int, firstName text, "last_NAME" text, AGE int, PRIMARY KEY("KEY"))

Collections

Collections must have generic type defined. Only java.util.List, Map and Set are allowed.
By default implementation of HashMap, HashSet and ArrayList are used.

@Table (name="entity")
public class Entity {
	...
	private List<String> cats;
	private Set<Date> dogs;
	private Map<String, BigInteger> pets;
	...
}

If you are unhappy with defaults and would like your data to be baked with specific collection implementation you can apply an annotation as shown below.
NOTE: this is strictly java side feature and does not effect how your data stored in C*.

import com.datastax.driver.mapping.annotation.CollectionType;
	...
@Table (name="entity")
public class Entity {
	...
	@CollectionType(LinkedList.class)
	private List<String> cats;

	@CollectionType(TreeSet.class)
	private Set<Date> dogs;

	@CollectionType(TreeMap.class)
	private Map<String, BigInteger> pets;
	...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.entity (id uuid, cats list<text>, dogs set<timestamp>, pets map<text, varint>,  PRIMARY KEY(id))

TTL

import com.datastax.driver.mapping.annotation.Ttl;
...
@Ttl(300) // expires in 5 minutes
@Table (name="mytable")
public class Entity {
   ...
}

This is TTL for the entity will be applied whenever entity of this type is saved. NOTE: @Ttl does not set table property: 'default_time_to_live'. You can set default with @TableProperty
Also you can override @TTL or default_time_to_live at the time you save entity as:

mappingSession.save(entity, new WriteOptions().setTtl(600)); // expires in 10 minutes

Static columns

import com.datastax.driver.mapping.annotation.Static;
...

@Table (name="mytable")
public class Entity {
    @EmbeddedId
    private ClusteringKey key;

    @Static
    private long balance;
    ...
}
public class ClusteringKey {
    private String user;
    private int expense_id;
    ...
}

CQL3 Statement

   CREATE TABLE IF NOT EXISTS ks.mytable (user text, expense_id int, balance bigint static,  PRIMARY KEY(user, expense_id))

ID, UUID and TimeUUID.

uuid and timeuuid are often used in Primary Key.
This section describes few important features working with uuid type.

    private UUID someUuid;

    @Column(columnDefinition="timeuuid")
    private UUID someTimeuuid;    
    @GeneratedValue
    private UUID someUuid;

    @GeneratedValue
    @Column(columnDefinition="timeuuid")
    private UUID someTimeuuid;    

If uuid or timeuuid are NULL on insert the Mapping Module will build CQL with uuid() or now() functions respectively.
The drowback of this approach that CQL does not return generated id back.
It means when you do obj = save(obj) the obj will not have its uuid set though it will be set in C*.
This approach works fine for write-and-forget. But in case you need to know id you have to set it manually.

UUID id = UUID.randomUUID();
import com.datastax.driver.core.utils.UUIDs;
import java.util.UUID;

UUID id = UUIDs.timeBased();
import java.util.Date;
import java.util.UUID;
import com.datastax.driver.core.utils.UUIDs;

public class DateUtil {
    public static Date timeUUID2Date(UUID uuid) {
        long time = UUIDs.unixTimestamp(uuid);
        return new Date(time);
    }
}

Optimistic Lock & @Version

Cassandra does not have built-in locking. But it supports conditional writes with Lightweight Transactions.

Mapping Add-on enables optimistic locking using annotation @Version which utilizes Lightweight Transactions feature.
The property must be of "long" data type. Whenever you save entity the version gets incremented and as result of operation the updated entity is returned. On attempt to save not-the-latest one the "null" will be returned and no error will be thrown.

	import javax.persistence.Id;
	import javax.persistence.Table;
	import javax.persistence.Version;

	@Table(name="entity")
	public class EntityWithVersion {
		@Id
		private java.util.UUID id;

		@Version
		private long version;
		// public getters/setters ...
	}

	@Test
	public void entityWithVersionTest() throws Exception {
		UUID id = UUID.randomUUID();
		EntityWithVersion obj = new EntityWithVersion();
		obj.setId(id);
		obj.setName("ver1");

		EntityWithVersion loaded = mappingSession.get(EntityWithVersion.class, id);
		assertNull(loaded);

		// save object ver1 
		EntityWithVersion saved = mappingSession.save(obj);

		// get object ver1
		EntityWithVersion obj1 = mappingSession.get(EntityWithVersion.class, id);
		assertEquals(obj1, saved);
		assertEquals(1, saved.getVersion());

		// save object ver2
		saved = mappingSession.save(saved);
		EntityWithVersion obj2 = mappingSession.get(EntityWithVersion.class, id);
		assertEquals(obj2, saved);
		assertEquals(2, saved.getVersion());

		saved = mappingSession.save(obj1);
		assertNull(saved);
	}