Bulk Loading Data into Cassandra Using SSTableLoader
Why Use SSTableLoader:
When you want to move a large amount of data from another database into Cassandra, sstableloader is usually a good option. It streams pre-built SSTable files directly to the cluster, so bulk transfers are typically much faster than inserting rows one at a time.

Steps for loading the data into Cassandra:

  • Create a keyspace in Cassandra.
  • Create a table based on your requirements using cqlsh.
  • Create a .csv file from the existing data.
  • Build SSTable files from the .csv file, then use sstableloader to move the data into Cassandra.

Step 1: Creating the keyspace

                CREATE KEYSPACE sample WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1 };

Step 2: Creating the table based on your requirements.

              CREATE TABLE sample.users (
                  key uuid,
                  firstname ascii,
                  lastname ascii,
                  password ascii,
                  age ascii,
                  email ascii,
                  PRIMARY KEY (key, firstname));

Here I am creating the table users. The primary key consists of key (the partition key) and firstname (a clustering column).

Step 3:

Creating the .csv file based on your table.

How to create a CSV file using Java:

Sample program to create the CSV file:

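import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.UUID;

public class CreateCsv {

       public static void main(String[] args) {
              generateCsvFile("E:/csv/records.csv");
       }

       // Writes one line per row in the column order of sample.users:
       // key,firstname,lastname,password,age,email
       // The values are made-up sample data; replace them with your own.
       public static void generateCsvFile(String csvName) {
              // try-with-resources closes (and flushes) the writer even on failure
              try (BufferedWriter writer = new BufferedWriter(new FileWriter(csvName))) {
                     for (int i = 0; i < 1000000; i++) {
                            writer.append(UUID.randomUUID().toString());
                            writer.append(',');
                            writer.append("first" + i);
                            writer.append(',');
                            writer.append("last" + i);
                            writer.append(',');
                            writer.append("secret" + i);
                            writer.append(',');
                            writer.append("26");
                            writer.append(',');
                            writer.append("user" + i + "@example.com");
                            writer.append('\n');
                     }
                     System.out.println("Success");
              } catch (IOException e) {
                     e.printStackTrace();
              }
       }

}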

After creating the Java project for sstableloader, these steps are mandatory:

·        Add all the Cassandra jars to the project. They are in the lib and tools folders of the Cassandra tar or zip distribution provided by DataStax.
·        Add the cassandra.yaml file from the conf folder of the same distribution.
·        Add the .csv file to the project. For example, I put sstable.csv in my project.

Step 4:
Creating the data for sstableloader using a Java program.

package com.cassandra.ramu;


import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.apache.cassandra.utils.UUIDGen.decompose;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.CompositeType.Builder;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;

public class SStableBuilder {

       static String csvfilename = "sstable.csv";

       public static void main(String[] args) {
             
              try {
                     buildSStables();
              } catch (Exception e) {
                     // Print the failure so that a broken run is visible
                     e.printStackTrace();
              }
       }
      
       public static void buildSStables() throws Exception {

              String keyspace = "sample";
              String table = "users";
              File directory = new File(keyspace + "/" + table);
             
              if (!directory.exists()) {
                     directory.mkdirs();
              }

              List<AbstractType<?>> compositeColumnValues = new ArrayList<AbstractType<?>>();
              compositeColumnValues.add(AsciiType.instance);
              compositeColumnValues.add(AsciiType.instance);
              CompositeType compositeColumn = CompositeType.getInstance(compositeColumnValues);

              SSTableSimpleUnsortedWriter bulkWriter = new SSTableSimpleUnsortedWriter(
                           directory, new Murmur3Partitioner(), keyspace, table,
                           compositeColumn, null, 64);

              // Use one timestamp (in microseconds) for every insert
              long timestamp = System.currentTimeMillis() * 1000;

              BufferedReader reader = new BufferedReader(new FileReader(csvfilename));
              String line;
              int lineNumber = 1;
              CsvEntry entry = new CsvEntry();

              while ((line = reader.readLine()) != null) {

                     if (entry.parse(line, lineNumber)) {

                           ByteBuffer uuid = ByteBuffer.wrap(decompose(entry.key));
                           bulkWriter.newRow(uuid);
                          
                           Builder builder = compositeColumn.builder();
                           builder.add(bytes(entry.firstname));
                           builder.add(bytes("firstname"));
                           bulkWriter.addColumn(builder.build(), bytes(entry.firstname), timestamp);
                          
                           builder = compositeColumn.builder();
                           builder.add(bytes(entry.firstname));
                           builder.add(bytes("lastname"));
                           bulkWriter.addColumn(builder.build(), bytes(entry.lastname), timestamp);
                          
                           builder = compositeColumn.builder();
                           builder.add(bytes(entry.firstname));
                           builder.add(bytes("password"));
                           bulkWriter.addColumn(builder.build(), bytes(entry.password), timestamp);
                          
                           builder = compositeColumn.builder();
                           builder.add(bytes(entry.firstname));
                           builder.add(bytes("age"));
                           bulkWriter.addColumn(builder.build(), bytes(String.valueOf(entry.age)), timestamp);
                          
                           builder = compositeColumn.builder();
                           builder.add(bytes(entry.firstname));
                           builder.add(bytes("email"));
                           bulkWriter.addColumn(builder.build(), bytes(entry.email), timestamp);
                          
                        
                          
                     }
                     lineNumber++;
              }

              reader.close();
              System.out.println("Success");
              bulkWriter.close();
              System.exit(0);
       }

       static class CsvEntry {
              UUID key;
              String firstname;
              String lastname;
              String password;
              long age;
              String email;

              boolean parse(String line, int lineNumber) {
                     // Naive CSV parsing: plain split on commas, no support for quoted fields
                     String[] columns = line.split(",");
                     if (columns.length != 6) {
                           System.out.println(String.format(
                                         "Invalid input '%s' at line %d of %s", line,
                                         lineNumber, csvfilename));
                           return false;
                     }
                     try {
                           key = UUID.fromString(columns[0].trim());
                           firstname = columns[1].trim();
                           lastname = columns[2].trim();
                           password = columns[3].trim();
                           age = Long.parseLong(columns[4].trim());
                           email = columns[5].trim();
                           return true;
                     } catch (IllegalArgumentException e) {
                            // Covers a malformed UUID as well as a malformed number
                            System.out.println(String.format(
                                          "Invalid UUID or number in input '%s' at line %d of %s", line,
                                          lineNumber, csvfilename));
                           return false;
                     }
              }
       }

}


The SStableBuilder program above converts the rows of the .csv file into SSTable files that sstableloader can stream into Cassandra.
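
To make the byte-level calls in the program less opaque, here is a minimal sketch in plain Java (no Cassandra jars needed) of the two encodings it relies on. It assumes that UUIDGen.decompose yields the 16 bytes of the UUID with the most significant half first, and that a CompositeType name is each component written as a 2-byte length, the value, and one end-of-component byte set to 0. The names EncodingSketch, uuidBytes and compositeName are made up for illustration; in the real program keep using the Cassandra classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class EncodingSketch {

    // Assumed equivalent of UUIDGen.decompose: 16 bytes, high 8 bytes first.
    static byte[] uuidBytes(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Assumed CompositeType layout: <2-byte length><value><0x00> per component.
    static byte[] compositeName(String... components) {
        byte[][] encoded = new byte[components.length][];
        int size = 0;
        for (int i = 0; i < components.length; i++) {
            encoded[i] = components[i].getBytes(StandardCharsets.US_ASCII);
            size += 2 + encoded[i].length + 1;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (byte[] value : encoded) {
            out.putShort((short) value.length); // component length
            out.put(value);                     // component value
            out.put((byte) 0);                  // end-of-component marker
        }
        return out.array();
    }

    public static void main(String[] args) {
        UUID key = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println("row key bytes: " + uuidBytes(key).length);
        System.out.println("cell name bytes: " + compositeName("john", "email").length);
    }
}
```

Here compositeName("john", "email") stands for the cell that stores the email of the row whose firstname is john, which is the same pairing the builder creates with builder.add(bytes(entry.firstname)) followed by builder.add(bytes("email")).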

Before running SStableBuilder.java, the project contains the source code, the Cassandra jars, cassandra.yaml, and sstable.csv.

After running SStableBuilder.java, the project also contains a sample/users folder, which holds the generated SSTable files.
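
Before moving on to the loader, it can help to confirm that the builder really produced SSTable files. The sketch below lists the data files in the output folder. It assumes the relative path sample/users used by the builder, and that SSTable data components have names ending in -Data.db. The class name OutputCheck and the helper dataFiles are hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutputCheck {

    // Returns the names of the files in dir that look like SSTable data components.
    static List<String> dataFiles(File dir) {
        List<String> names = new ArrayList<String>();
        File[] files = dir.listFiles();
        if (files == null) {
            return names; // folder is missing or cannot be read
        }
        Arrays.sort(files); // stable, alphabetical output
        for (File f : files) {
            if (f.isFile() && f.getName().endsWith("-Data.db")) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        File dir = new File("sample/users");
        List<String> found = dataFiles(dir);
        if (found.isEmpty()) {
            System.out.println("No SSTable data files in " + dir.getPath());
        } else {
            System.out.println("Found " + found.size() + " data file(s): " + found);
        }
    }
}
```

If the list is empty, check that sstable.csv was found and that its lines passed the parse step, since rejected lines are only reported on the console.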


Step 5:

Run the sstableloader command from the command prompt.
CMD:
                     sstableloader -d 127.0.0.1 pathofaboveusers

For example, if the workspace is on the D drive, go to the D drive and give the path up to the users folder.

In my case I passed my own IP address, because that is the address declared in my cassandra.yaml file. If your cassandra.yaml file declares 127.0.0.1, pass 127.0.0.1.
The path must point to the folder where the SStableBuilder program created the data.
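
Because sstableloader takes the keyspace and table names from the last two folders of the path, a wrong path is an easy mistake to make. The following sketch checks the path and assembles the command shown above without running it. LoaderCommand and build are hypothetical names chosen for illustration; only the -d option and the path argument come from the command above.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

public class LoaderCommand {

    // Builds "sstableloader -d <host> <dir>" after checking that dir ends in keyspace/table.
    static List<String> build(String host, File dir, String keyspace, String table) {
        File parent = dir.getParentFile();
        if (parent == null
                || !dir.getName().equals(table)
                || !parent.getName().equals(keyspace)) {
            throw new IllegalArgumentException(
                    "Path must end in " + keyspace + "/" + table + ": " + dir.getPath());
        }
        return Arrays.asList("sstableloader", "-d", host, dir.getPath());
    }

    public static void main(String[] args) {
        List<String> cmd = build("127.0.0.1", new File("sample/users"), "sample", "users");
        // Only prints the command; the list could also be passed to ProcessBuilder.
        System.out.println(String.join(" ", cmd));
    }
}
```

Replace 127.0.0.1 with the address declared in your own cassandra.yaml file, as described above.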
 

Then check in cqlsh.

You can see the data in the table.


5 comments:

  1. This comment has been removed by the author.

  2. Do the first column of the csv data being imported and the first column of the destination table have to be uuid values for this example to work?

    What about a csv file and a Cassandra table without a uuid column? Would you make the newRow call using the concatenated string values of the primary key columns?

    Also, what is "decompose" here? I.e., in what Java package is it defined?

  3. It also works without a UUID, but some changes are required.

    In the CsvEntry class, define the key as follows:

    String key;
    key = columns[0].trim();

    When reading each line of the CSV in the code, change the row creation as follows:

    String sss=entry.key;
    bulkWriter.newRow(ByteBuffer.wrap(sss.getBytes()));

    Builder builder = compositeColumn.builder();
    builder.add(bytes(entry.key)); //Here you need to give the primary key
    builder.add(bytes("ts"));
    bulkWriter.addColumn(builder.build(), bytes(String.valueOf(entry.ts)), timestamp);

  4. How fast is this tool?
    I have a 160 GB csv file to insert. How much time should I expect?

  5. I tried compiling this against Cassandra 3.0.4 and got an error:

    SStableBuilder.java:18: error: SSTableSimpleUnsortedWriter is not public in org.apache.cassandra.io.sstabl
