1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_QUA_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_QUA_NAME;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_QUA_NAME;
11  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
12  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
13  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_QUA_NAME;
14  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
15  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
16  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_QUA_NAME;
17  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
18  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_DEFAULT_NAME;
19  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_URL;
20  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_PROP;
21  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_TRAN_LAYER;
22  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_QUORUM;
23  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.NATIVE_TRAN_LAYER_OPT;
24  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TRAN_LAYER_OPT;
25  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_HOST;
26  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_DEFAULT;
27  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_PROP;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.PrintWriter;
32  import java.net.URL;
33  import java.util.HashMap;
34  import java.util.Map;
35  import java.util.Properties;
36  import java.util.Random;
37  
38  import com.google.common.base.Joiner;
39  import com.google.common.collect.Maps;
40  
41  import org.apache.commons.cli.CommandLine;
42  import org.apache.commons.cli.GnuParser;
43  import org.apache.commons.cli.HelpFormatter;
44  import org.apache.commons.cli.Option;
45  import org.apache.commons.cli.Options;
46  import org.apache.hadoop.fs.FileSystem;
47  import org.apache.hadoop.hbase.client.Delete;
48  import org.apache.hadoop.hbase.client.Get;
49  import org.apache.hadoop.hbase.client.HBaseAdmin;
50  import org.apache.hadoop.hbase.client.HTable;
51  import org.apache.hadoop.hbase.client.Put;
52  import org.apache.hadoop.hbase.client.Result;
53  import org.apache.hadoop.hbase.client.ResultScanner;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.openrdf.model.URI;
57  import org.openrdf.model.impl.URIImpl;
58  import org.openrdf.model.vocabulary.RDF;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  
62  import eu.fbk.knowledgestore.data.Data;
63  import eu.fbk.knowledgestore.data.Dictionary;
64  import eu.fbk.knowledgestore.data.Record;
65  import eu.fbk.knowledgestore.data.Stream;
66  import eu.fbk.knowledgestore.datastore.DataTransaction;
67  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
68  import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
69  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
70  import eu.fbk.knowledgestore.runtime.Files;
71  import eu.fbk.knowledgestore.vocabulary.KS;
72  
73  /**
74   * Class for update the timestamp in a table
75   */
76  public class HBaseLowlevelUtilities
77  {
78  
    /** Data store created by the constructor; stays null if the constructor fails before creating it. */
    private HBaseDataStore ds;

    /** Transaction begun on {@code ds} by the constructor (read-only or not, per constructor argument). */
    private HBaseDataTransaction dt;

    /** Logger object */
    private static Logger logger = LoggerFactory.getLogger(HBaseLowlevelUtilities.class);

    /**
     * Whether to dump configuration, properties and URI dictionary to stdout. Set by the
     * {@code -cfg} command-line option; the constructor may override it with the
     * {@code print.cfg.files} property.
     */
    private static boolean printCfgFiles = false;

    /** If true the constructor configures the OMID transaction layer, otherwise the native one. */
    private static boolean OmidMode = false;

    /**
     * Regulates how {@code endTransaction} ends a transaction:
     *   1               = commit   (dataTransaction.end(true));
     *   0               = rollback (dataTransaction.end(false));
     *   any other value = do nothing (neither commit nor rollback).
     */
    private static int transactionEndMode = 1;

    /**
     * Value of the {@code -p} command-line option. When non-null it overrides the table-name
     * prefix and the URI dictionary path read from the properties file; null keeps the file values.
     */
    private static String generalPrefix = null;

    /** Prefix of the HBase table names; used to recognize the table names given on the command line. */
    private static String hbaseTableNamePrefix = "";

    /**
     * Value of the {@code -m} command-line option. When non-empty it overrides the ZooKeeper quorum,
     * the HDFS URL and the OMID TSO host read from the properties file.
     */
    private static String masterHost = "";
102 
103     /**
104      * Constructor.
105      */
106     public HBaseLowlevelUtilities(boolean readOnly) {
107         try {
108             final String propertiesFileName = getClass().getSimpleName() + ".properties";
109             final URL url = getClass().getResource(propertiesFileName);
110             logger.info("url is " + url);
111             final InputStream stream = url.openStream();
112             final Properties properties = new Properties();
113             properties.load(stream);
114             stream.close();
115             logger.info("read properties from file");
116 
117             // check and set the printCfgFiles variable
118             printCfgFiles = Boolean.parseBoolean(properties.getProperty("print.cfg.files", ""
119                     + printCfgFiles));
120 
121 	    /* 
122 	       Override properties from file with those from options
123 	    */
124 
125             // override property HBASE_TRAN_LAYER
126             if (OmidMode) {
127                 properties.setProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT);
128             } else {
129                 properties.setProperty(HBASE_TRAN_LAYER, NATIVE_TRAN_LAYER_OPT);
130             }
131 
132 	    if (! masterHost.equals("")) {
133 		// override property HBASE_ZOOKEEPER_QUORUM
134 		properties.setProperty(HBASE_ZOOKEEPER_QUORUM, masterHost);
135 
136 		// override property HADOOP_FS_URL and HADOOP_FS_DEFAULT_NAME
137 		properties.setProperty(HADOOP_FS_URL, "hdfs://" + masterHost + ":9000/");
138 		properties.setProperty(HADOOP_FS_DEFAULT_NAME, properties.getProperty(HADOOP_FS_URL));
139 
140 		// override property OMID_TSO_HOST
141 		properties.setProperty(OMID_TSO_HOST, masterHost);
142 	    }
143 	    
144             // override property HBASEDATASTORE_TABLEPREFIX_PROP
145 	    if (generalPrefix != null) {
146 		if (generalPrefix.equals("")) {
147 		    hbaseTableNamePrefix = "";
148 		} else {
149 		    hbaseTableNamePrefix = generalPrefix + ".";
150 		}
151 		properties.setProperty(HBASEDATASTORE_TABLEPREFIX_PROP, hbaseTableNamePrefix);
152 
153 		// override property URIDICT_RELATIVEPATH_PROP
154 		String uriDictPath;
155 		if (generalPrefix.equals("")) {
156 		    uriDictPath = "KnowledgeStore/" + URIDICT_RELATIVEPATH_DEFAULT;
157 		} else {
158 		    uriDictPath = "KnowledgeStore." + generalPrefix + "/" + URIDICT_RELATIVEPATH_DEFAULT;
159 		}	    
160 		properties.setProperty(URIDICT_RELATIVEPATH_PROP, uriDictPath);
161 	    }
162 
163 	    // set local variables 
164 	    hbaseTableNamePrefix = properties.getProperty(HBASEDATASTORE_TABLEPREFIX_PROP);
165 	    
166             logger.info("transactionEndMode = " + transactionEndMode);
167 
168             if (printCfgFiles) {
169                 System.out.println("\nBEGIN OF |origXmlCfg|");
170                 System.out.println(Joiner.on("\n").withKeyValueSeparator("=").join(properties));
171                 System.out.println("END OF |origXmlCfg|\n");
172             }
173 
174             // create filesystem
175             final String fsURL = properties.getProperty("fs.url");
176             final Map<String, String> fsProperties = Maps.newHashMap();
177             for (Map.Entry<Object, Object> entry : properties.entrySet()) {
178                 if (entry.getKey().toString().startsWith("fs.")) {
179                     fsProperties.put(entry.getKey().toString(), entry.getValue().toString());
180                 }
181             }
182             final FileSystem fileSystem = Files.getFileSystem(fsURL, fsProperties);
183 
184             // create new DataStore
185             ds = new HBaseDataStore(fileSystem, properties);
186 
187             final org.apache.hadoop.conf.Configuration hbaseCfg = ds.hbaseCfg;
188 
189             if (printCfgFiles) {
190                 // print the two conf files
191                 System.out.println("\nBEGIN OF |hbaseCfg|");
192                 if (hbaseCfg != null) {
193                     org.apache.hadoop.conf.Configuration.dumpConfiguration(hbaseCfg,
194                             new PrintWriter(System.out));
195                 } else {
196                     System.out.println("hbaseCfg null");
197                 }
198                 System.out.println("END OF |hbaseCfg|\n");
199 
200                 System.out.println("\nBEGIN OF |xmlCfg|");
201                 System.out.println(Joiner.on("\n").withKeyValueSeparator("=").join(properties));
202                 System.out.println("END OF |xmlCfg|\n");
203             }
204 
205             ds.init();
206 
207             dt = (HBaseDataTransaction) ds.begin(readOnly);
208 	    logger.info("created dt = ds.begin(" + readOnly + ")");
209 
210 	    if (printCfgFiles) {
211                 final Dictionary<URI> dict = ds.getSerializer().getDictionary();
212                 final String dictUrl = dict.getDictionaryURL();
213                 System.out.println("\nDictionary " + dictUrl + " begin");
214                 for (int i = 1; i < 1000; i++) {
215                     try {
216                         final URI val = (URI) dict.objectFor(i, true);
217                         System.out.println(i + " -> " + val);
218                     } catch (Exception e) {
219                         break;
220                     }
221                 }
222                 System.out.println("Dictionary end\n");
223             }
224 
225             System.out.println("end of " + this.getClass().getSimpleName() + " Constructor");
226 
227         } catch (final Exception e) {
228             e.printStackTrace();
229         }
230     }
231 
232     private void createRecordWithTimestamp(Record record, String tableName,
233 					   String famName, String quaName, long timestamp) throws IOException 
234     {
235 	AbstractHBaseUtils hbaseUtils = dt.getHbaseUtils();
236         HTable hTable = (HTable) hbaseUtils.getTable(tableName);
237         Put put = null;
238         if (hTable != null) {
239             // Transforming data model record into an Avro record
240             AvroSerializer serializer = hbaseUtils.getSerializer();
241             final byte[] bytes = serializer.toBytes(record);
242             // Resource's Key
243             put = new Put(Bytes.toBytes(record.getID().toString()));
244             // Resource's Value
245             put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), timestamp, bytes);
246         }
247         hTable.put(put);
248     }
249 
250     /*
251       add a delete with the given timestamp (at the whole row)
252     */
253     private void deleteRecordWithTimestamp(Record record, String tableName, long timestamp) 
254 	throws IOException 
255     {
256 	AbstractHBaseUtils hbaseUtils = dt.getHbaseUtils();
257         HTable hTable = (HTable) hbaseUtils.getTable(tableName);
258         Delete del = null;
259         if (hTable != null) {
260             // delete the whole row (i.e. all the column families)
261             del = new Delete(Bytes.toBytes(record.getID().toString()), timestamp);
262         }
263         hTable.delete(del);
264     }
265 
266     private void compactTable(String tableName) 
267 	throws IOException, InterruptedException
268     {
269 	AbstractHBaseUtils hbaseUtils = dt.getHbaseUtils();
270 	HBaseAdmin admin = new HBaseAdmin(hbaseUtils.getHbcfg());
271         admin.flush(tableName);
272         admin.majorCompact(tableName);
273 	admin.close();
274     }
275 
276     class TimestampedRecord {
277 	private Record record;
278 	private long timestamp;
279 	// Constructor
280 	TimestampedRecord(Record r, long t) {
281 	    this.record = r;
282 	    this.timestamp = t;
283 	}
284 	// get Record field
285 	public Record getRecord() {
286 	    return this.record;
287 	}
288 	// get timestamp field
289 	public long getTimestamp() {
290 	    return this.timestamp;
291 	}
292     }
293 
294     private TimestampedRecord getTimestampedRecord(String tableName, String famName, String quaName, String id)
295 	throws IOException {
296 	HBaseLowlevelUtilities.TimestampedRecord tr = null;
297 	AbstractHBaseUtils hbaseUtils = dt.getHbaseUtils();
298 	HTable hTable = (HTable) hbaseUtils.getTable(tableName);
299 	if (hTable != null) {
300 	    Get get = new Get(Bytes.toBytes(id.toString()));
301 	    Result rs = hTable.get(get);
302 	    if (rs == null) {
303 		return null;
304 	    }
305 	    final AvroSerializer serializer = hbaseUtils.getSerializer();
306 	    Record record = (Record) serializer.fromBytes(rs.value());
307 	    long timestamp = rs.getColumnLatest(Bytes.toBytes(famName), Bytes.toBytes(quaName)).getTimestamp();
308 	    tr = new TimestampedRecord(record, timestamp);
309 	}
310 	return tr;
311     }
312 
313     private URI getUriTypeFromTablename (String tableName) 
314     {
315 	if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME)) {
316 	    return KS.RESOURCE;
317 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME)) {
318 	    return KS.MENTION;
319 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME)) {
320 	    return KS.ENTITY;
321 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME)) {
322 	    return KS.CONTEXT;
323 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME)) {
324 	    return KS.USER;
325 	} else {
326 	    System.out.println("getUriTypeFromTablename: unknown tableName " + tableName);
327 	    return null;
328 	}
329     }
330 
331     private String getFamilyNameFromTablename (String tableName) 
332     {
333 	if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME)) {
334 	    return DEFAULT_RES_FAM_NAME;
335 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME)) {
336 	    return DEFAULT_MEN_FAM_NAME;
337 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME)) {
338 	    return DEFAULT_ENT_FAM_NAME;
339 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME)) {
340 	    return DEFAULT_CON_FAM_NAME;
341 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME)) {
342 	    return DEFAULT_USR_FAM_NAME;
343 	} else {
344 	    System.out.println("getFamilyNameFromTablename: unknown tableName " + tableName);
345 	    return null;
346 	}
347     }
348 
349     private String getQualifierNameFromTablename (String tableName) 
350     {
351 	if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME)) {
352 	    return DEFAULT_RES_QUA_NAME;
353 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME)) {
354 	    return DEFAULT_MEN_QUA_NAME;
355 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME)) {
356 	    return DEFAULT_ENT_QUA_NAME;
357 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME)) {
358 	    return DEFAULT_CON_QUA_NAME;
359 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME)) {
360 	    return DEFAULT_USR_QUA_NAME;
361 	} else {
362 	    System.out.println("getQualifierNameFromTablename: unknown tableName " + tableName);
363 	    return null;
364 	}
365     }    
366 
367     private static void endTransaction (DataTransaction dataTran) throws DataCorruptedException, IOException {
368 	if (transactionEndMode == 1) {
369 	    dataTran.end(true);
370 	    logger.info("doTransactionEnd: dataTran.end(true) [= commit]");
371 	} else if (transactionEndMode == 0) {
372 	    dataTran.end(false);
373 	    logger.info("doTransactionEnd: dataTran.end(false) [= rollback]");
374 	} else {
375 	    logger.info("doTransactionEnd: NOTHING [= neither commit nor rollback]");
376 	}
377     }
378 
379     // Generate random integer in range 0..(limit-1)");
380     public static int getRandomInt(final int limit)
381     {
382         final Random randomGenerator = new Random();
383         final int randomInt = randomGenerator.nextInt(limit);
384         return randomInt;
385     }
386 
387     private static void replicateRowkeysInTableWithTimestamp(final String tableName, long timestamp)
388     {
389         System.out.println("replicateRowkeysInTableWithTimestamp: tableName " + tableName + ", timestamp " + timestamp);
390 
391 	/* WARNING: Native mode is required!! */
392 	OmidMode = false;
393 	transactionEndMode = 1;
394 
395         final HBaseLowlevelUtilities hlu = new HBaseLowlevelUtilities(false);
396         if (hlu.ds == null) {
397             return;
398         }
399 
400 	URI type = hlu.getUriTypeFromTablename(tableName);
401 	String famName = hlu.getFamilyNameFromTablename(tableName);
402 	String quaName = hlu. getQualifierNameFromTablename(tableName);
403 	if ((type == null) || (famName == null) || (quaName == null)) {
404 	    return;
405 	}
406 
407         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
408 
409 	int numRecord = 0;
410         try {
411 	    DataTransaction dataTran = hlu.dt;
412             time1 = System.currentTimeMillis();
413             Stream<Record> cur = dataTran.retrieve(type, null, null);
414             try {
415                 time2 = System.currentTimeMillis();
416                 for (Record r : cur) {
417                     hlu.createRecordWithTimestamp(r, tableName, famName, quaName, timestamp);
418                     numRecord++;
419                 }
420                 time3 = System.currentTimeMillis();
421                 endTransaction(dataTran);
422                 time4 = System.currentTimeMillis();
423             } finally {
424                 cur.close();
425             }
426 	    
427         } catch (final IOException e) {
428             System.out.println("WARNING Exception");
429             e.printStackTrace();
430         }
431 
432         System.out.println("Processed " + numRecord + " records");
433         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
434         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
435         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
436     }
437 
438 
439    private static void addDeleteWithTimestampToAllRowkeysInTable(final String tableName, long timestamp)
440     {
441         System.out.println("addDeleteWithTimestampToAllRowkeysInTable: tableName " + tableName + ", timestamp " + timestamp);
442 
443 	/* WARNING: Native mode is required!! */
444 	OmidMode = false;
445 	transactionEndMode = 1;
446 
447         final HBaseLowlevelUtilities hlu = new HBaseLowlevelUtilities(false);
448         if (hlu.ds == null) {
449             return;
450         }
451 
452 	URI type = hlu.getUriTypeFromTablename(tableName);
453 	if (type == null) {
454 	    return;
455 	}
456 
457         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
458 
459 	int numRecord = 0;
460         try {
461 	    DataTransaction dataTran = hlu.dt;
462 
463             time1 = System.currentTimeMillis();
464             Stream<Record> cur = dataTran.retrieve(type, null, null);
465             try {
466                 time2 = System.currentTimeMillis();
467                 for (Record r : cur) {
468                     hlu.deleteRecordWithTimestamp(r, tableName, timestamp);
469                     numRecord++;
470                 }
471                 time3 = System.currentTimeMillis();
472                 endTransaction(dataTran);
473                 time4 = System.currentTimeMillis();
474             } finally {
475                 cur.close();
476             }
477 
478         } catch (final IOException e) {
479             System.out.println("WARNING Exception");
480             e.printStackTrace();
481         }
482 
483         System.out.println("Processed " + numRecord + " records");
484         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
485         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
486         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
487     }
488 
489     private static void addDeleteAndReplicateAllRowkeysInTable(final String tableName, long timestamp1, long timestamp2)
490     {
491         System.out.println("addDeleteAndReplicateAllRowkeysInTable: tableName " + tableName + ", timestamp1 " + timestamp1 + ", timestamp2 " + timestamp2);
492 
493 	/* WARNING: Native mode is required!! */
494 	OmidMode = false;
495 	transactionEndMode = 1;
496 
497         final HBaseLowlevelUtilities hlu = new HBaseLowlevelUtilities(false);
498         if (hlu.ds == null) {
499             return;
500         }
501 
502 	URI type = hlu.getUriTypeFromTablename(tableName);
503 	String famName = hlu.getFamilyNameFromTablename(tableName);
504 	String quaName = hlu. getQualifierNameFromTablename(tableName);
505 	if ((type == null) || (famName == null) || (quaName == null)) {
506 	    return;
507 	}
508 
509         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
510 
511         int numRecord = 0;
512         try {
513             DataTransaction dataTran = hlu.dt;
514 
515             time1 = System.currentTimeMillis();
516             Stream<Record> cur = dataTran.retrieve(type, null, null);
517             try {
518                 time2 = System.currentTimeMillis();
519                 for (Record r : cur) {
520                     hlu.deleteRecordWithTimestamp(r, tableName, timestamp1);
521                     hlu.createRecordWithTimestamp(r, tableName, famName, quaName, timestamp2);
522                     numRecord++;
523                 }
524                 time3 = System.currentTimeMillis();
525                 endTransaction(dataTran);
526                 time4 = System.currentTimeMillis();
527             } finally {
528                 cur.close();
529             }
530 
531         } catch (final IOException e) {
532             System.out.println("WARNING Exception");
533             e.printStackTrace();
534         }
535 
536         System.out.println("Processed " + numRecord + " records");
537         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
538         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
539         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
540     }
541 
542     private static void printIdContentWithTimestamp(final String tableName, final String id)
543     {
544         System.out.println("printIdContentWithTimestamp: tableName " + tableName + ", id " + id);
545 
546 	/* WARNING: Native mode is required!! */
547 	OmidMode = false;
548 	transactionEndMode = 1;
549 
550         final HBaseLowlevelUtilities hlu = new HBaseLowlevelUtilities(true);
551         if (hlu.ds == null) {
552             return;
553         }
554 
555 	String famName = hlu.getFamilyNameFromTablename(tableName);
556 	String quaName = hlu. getQualifierNameFromTablename(tableName);
557 	if ((famName == null) || (quaName == null)) {
558 	    return;
559 	}
560 
561         long time1 = 0, time2 = 0, time3 = 0;
562 
563 	HBaseLowlevelUtilities.TimestampedRecord tRecord = null;
564         try {
565 	    DataTransaction dataTran = hlu.dt;
566 	    
567 	    time1 = System.currentTimeMillis();
568 	    tRecord = hlu.getTimestampedRecord(tableName, famName, quaName, id);
569             time2 = System.currentTimeMillis();
570             endTransaction(dataTran);
571             time3 = System.currentTimeMillis();
572 
573         } catch (final IOException e) {
574             System.out.println("WARNING Exception");
575             e.printStackTrace();
576         }
577 
578 	Record record  = tRecord.getRecord();
579 	long timestamp = tRecord.getTimestamp();
580 	
581 	String str = record.toString(Data.getNamespaceMap(), true);
582 	System.out.println("Found:\n" + str + "\nwith timestamp " + timestamp);
583         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
584         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
585     }
586 
587     private static void omidize(final String tableName)
588     {
589         System.out.println("omidize: tableName " + tableName);
590 
591 	HBaseLowlevelUtilities hlu;
592 	DataTransaction dataTran;
593 	Record r;
594         long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0, time6 = 0;
595 
596 
597 	// a) create a new temporary entry in OMID mode
598 	//
599 	OmidMode = true;
600 	hlu = new HBaseLowlevelUtilities(false);
601 
602 	URI type = hlu.getUriTypeFromTablename(tableName);
603 	String famName = hlu.getFamilyNameFromTablename(tableName);
604 	String quaName = hlu. getQualifierNameFromTablename(tableName);
605 	if ((type == null) || (famName == null) || (quaName == null)) {
606 	    return;
607 	}
608 
609 	transactionEndMode = 1;
610 	time1 = System.currentTimeMillis();
611 
612 	try {
613 	    dataTran = hlu.dt;
614 	    r = Record.create();
615 	    r.set(RDF.TYPE, type);
616 	    URI id = new URIImpl("rol:///tmp_omidize_" + Integer.toString(getRandomInt(6666)));
617 	    r.setID(id);
618 	    dataTran.store(type, r);
619 	    endTransaction(dataTran);
620 	    logger.info("new temporary record created with id " + id);
621 	    time2 = System.currentTimeMillis();
622 
623 	    // b) get the timestamp of the new entry as a OMID VCTS
624 	    //
625 	    OmidMode = false; // WARNING: Native mode is required!
626 	    hlu = new HBaseLowlevelUtilities(true);
627 	    dataTran = hlu.dt;
628 	    TimestampedRecord tr = hlu.getTimestampedRecord(tableName, famName, quaName, id.toString());
629 	    long VCTS = tr.getTimestamp();
630 	    endTransaction(dataTran);
631 	    r = tr.getRecord();
632 	    logger.info("VCTS for " + r.toString() + " is " + VCTS);
633 	    time3 = System.currentTimeMillis();
634 	
635 	    // c) fix all the entries in the table with such VCTS 
636 	    //
637 	    addDeleteAndReplicateAllRowkeysInTable(tableName,VCTS-1,VCTS);
638 	    System.out.println("fixed all entries in table " + tableName);
639 	    time4 = System.currentTimeMillis();
640 	
641 	    // d) + e) delete the temporary entry and compact the table
642 	    //
643 	    OmidMode = false; // WARNING: Native mode is required!
644 	    hlu = new HBaseLowlevelUtilities(false);
645 	    dataTran = hlu.dt;
646 	    r = Record.create();
647 	    r.set(RDF.TYPE, type);
648 	    r.setID(id);
649 	    hlu.deleteRecordWithTimestamp(r, tableName, VCTS);
650 	    time5 = System.currentTimeMillis();
651 	    hlu.compactTable(tableName);
652 	    endTransaction(dataTran);
653 	    logger.info("deleted new temporary record with id " + id + " and compacted table");
654 	    time6 = System.currentTimeMillis();
655 
656 	} catch (final Exception e) {
657             System.out.println("WARNING Exception");
658             e.printStackTrace();
659 	}
660 
661         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
662         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
663         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
664         System.out.println("time 5-4: " + String.valueOf(time5 - time4) + " ms");
665         System.out.println("time 6-5: " + String.valueOf(time6 - time5) + " ms");
666     }
667 
668     private static void printStatisticsOfTimestampsOfTableEntries(final String tableName)
669     {
670         System.out.println("printStatisticsOfTimestampsOfTableEntries: tableName " + tableName);
671 
672 	/* WARNING: Native mode is required!! */
673 	OmidMode = false;
674 	transactionEndMode = 1;
675 
676         final HBaseLowlevelUtilities hlu = new HBaseLowlevelUtilities(false);
677         if (hlu.ds == null) {
678             return;
679         }
680 
681 	URI type = hlu.getUriTypeFromTablename(tableName);
682 	String famName = hlu.getFamilyNameFromTablename(tableName);
683 	String quaName = hlu. getQualifierNameFromTablename(tableName);
684 	if ((type == null) || (famName == null) || (quaName == null)) {
685 	    return;
686 	}
687 
688         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
689 
690         int numRecord = 0;
691 	Map<Long, Integer> tsMap = new HashMap<Long, Integer>();
692         try {
693             time1 = System.currentTimeMillis();
694 
695 	    Scan scan = hlu.ds.getHbaseUtils().getScan(tableName, famName);
696 	    ResultScanner scanner = hlu.ds.getHbaseUtils().getScanner(tableName, scan);
697 	    long ts;
698 	    Long tsL;
699 
700 	    Result r = scanner.next();
701             while (r != null) {
702 		numRecord++;
703 		ts = r.getColumnLatest(Bytes.toBytes(famName), Bytes.toBytes(quaName)).getTimestamp();
704 		tsL = new Long(ts);
705                 // increment the occurrence of attribute value
706                 int occ = 1;
707                 if (tsMap.containsKey(tsL)) {occ = tsMap.get(tsL).intValue() + 1;}
708 		tsMap.put(tsL, new Integer(occ));
709 		r = scanner.next();
710 	    }
711         } catch (final IOException e) {
712             System.out.println("WARNING Exception");
713             e.printStackTrace();
714         }
715 	time2 = System.currentTimeMillis();
716 
717 	int tot = 0;
718 	StringBuffer str = new StringBuffer();
719 	for (Map.Entry<Long, Integer> entry : tsMap.entrySet()) {
720             Long key = entry.getKey();
721 	    Integer value = entry.getValue();
722             tot += value.intValue();
723             str.append(key.toString() + " -> " + value.toString() + "\n");
724 	}
725 	time3 = System.currentTimeMillis();
726         System.out.println("tsMap:\n" + str);
727 	time4 = System.currentTimeMillis();
728 	
729         System.out.println("numRecord " + numRecord + ", tot " + tot + ", size " + tsMap.size());
730         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
731         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
732         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
733     }
734 
735     private static void printUsage(Options options) {
736         int WIDTH = 80;
737         final PrintWriter out = new PrintWriter(System.out);
738         final HelpFormatter formatter = new HelpFormatter();
739 	// String fullClassName = Thread.currentThread().getStackTrace()[1].getClassName();
740 	// String className = fullClassName.split("\\.")[fullClassName.split("\\.").length - 1];
741 	String className = "<thisClass>";
742 	String cmdLineSyntax = className + " [options] cmd table [args]";
743 	String header = "";
744         String footer = "where:\n"
745 	    + "1 TABLE TIMESTAMP: replicate each entry with timestamp TIMESTAMP\n"
746 	    + "2 TABLE TIMESTAMP: for each entry add a delete entry with timestamp TIMESTAMP\n"
747 	    + "3 TABLE TIMESTAMP1 TIMESTAMP2: for each entry add a delete entry with TIMESTAMP1 and replicate it with TIMESTAMP2\n"
748 	    + "4 TABLE ID: print content and most recent timestamp of ID (= entry/rowkey)\n"
749 	    + "5 TABLE: 'OMIDize' all the entries\n"
750 	    + "6 TABLE: print statistics about the entry timestamps\n";
751 
752         formatter.printHelp(out, WIDTH, cmdLineSyntax, header, options, 2, 2, footer);
753         out.flush();
754 
755         System.exit(1);
756     }
757 
758     public static void main(final String[] args) throws Throwable
759     {
760 	final Options options = new Options();
761         options.addOption("cfg", "config_print",          false, "print configuration settings");
762         options.addOption("h", "help",                    false, "print help and exit");
763         options.addOption("m", "master_host",            true, "the host running hdfs master, zookeeper and omid daemon ");
764 	options.addOption("p", "prefix",                 true, "the prefix of the tables and FS");
765 
766 	CommandLine cl = new GnuParser().parse(options, args);
767 	if (cl.hasOption("h")) {
768 	    printUsage(options);
769 	}
770 	if (cl.hasOption("cfg")) {
771 	    printCfgFiles = true;
772 	}
773 	if (cl.hasOption("m")) {
774 	    masterHost = cl.getOptionValue("m");
775 	}
776 	if (cl.hasOption("p")) {
777 	    generalPrefix = cl.getOptionValue("p");
778 	}
779 
780 	String[] leftArgs = cl.getArgs();
781 	if (leftArgs.length < 2) {
782 	    if (leftArgs.length == 0) {
783 		System.err.println("error: missing cmd");
784 	    } else {
785 		System.err.println("error: missing table");
786 	    }
787 	    printUsage(options);
788 	}
789 
790         int cmd = 0;
791 	cmd = Integer.parseInt(leftArgs[0]);
792 	String tableName = leftArgs[1];
793 
794 	if (printCfgFiles) {
795 	    System.out.println("CommandLine Options");
796 	    for (Option o : cl.getOptions()) {
797 		System.out.println(" " + o.getOpt() + " -> " + o.getValue());
798 	    }
799 	    System.out.println("Args");
800 	    for (String arg : leftArgs) {
801 		System.out.println(" " + arg);
802 	    }
803 	    System.out.println("");
804 	}
805 	
806 
807         switch (cmd) {
808 
809         case 1:
810 	    // replicate all the rowkeys in given table with the given timestamp
811             if (leftArgs.length < 3) {
812                 printUsage(options);
813             }
814             {
815                 long timestamp = 0;
816 		try {
817 		    timestamp = Long.parseLong(leftArgs[2]);
818 		} catch (final NumberFormatException e) {
819 		    System.err.println("error in parsing long for timestamp");
820 		    printUsage(options);
821 		}
822                 replicateRowkeysInTableWithTimestamp(tableName, timestamp);
823             }
824             break;
825 
826         case 2:
827 	    // add a delete entry with the given timestamp for all the rowkeys in given table
828             if (leftArgs.length < 3) {
829                 printUsage(options);
830             }
831             {
832                 long timestamp = 0;
833 		try {
834 		    timestamp = Long.parseLong(leftArgs[2]);
835 		} catch (final NumberFormatException e) {
836 		    System.err.println("error in parsing long for timestamp");
837 		    printUsage(options);
838 		}
839                 addDeleteWithTimestampToAllRowkeysInTable(tableName, timestamp);
840             }
841             break;
842 
843         case 3:
844 	    // for ech rowkey in given table: 
845 	    //   a) add a delete entry with the given timestamp1 and 
846 	    //   b) replicate its content with the given timestamp2
847             if (leftArgs.length < 4) {
848                 printUsage(options);
849             }
850             {
851                 long timestamp1 = 0;
852                 long timestamp2 = 0;
853 		try {
854 		    timestamp1 = Long.parseLong(leftArgs[2]);
855 		    timestamp2 = Long.parseLong(leftArgs[3]);
856 		} catch (final NumberFormatException e) {
857 		    System.err.println("error in parsing long for timestamp1 and timestamp2");
858 		    printUsage(options);
859 		}
860                 addDeleteAndReplicateAllRowkeysInTable(tableName, timestamp1, timestamp2);
861             }
862             break;
863 
864         case 4:
865 	    // print the content and most recent timestamp of given id in the given table
866             if (leftArgs.length < 3) {
867                 printUsage(options);
868             }
869             {
870                 String id = new String(leftArgs[2]);
871                 printIdContentWithTimestamp(tableName, id);
872             }
873             break;
874 
875         case 5:
876 	    // OMIDize the given table
877             {
878                 omidize(tableName);
879             }
880             break;
881 
882         case 6:
883 	    // print statistcs of timestamps of table entries
884             {
885                 printStatisticsOfTimestampsOfTableEntries(tableName);
886             }
887             break;
888 
889         default:
890             printUsage(options);
891         }
892         System.exit(0);
893     }
894 
895 }