1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_DEFAULT_NAME;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_URL;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_PROP;
11  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_TRAN_LAYER;
12  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_QUORUM;
13  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.NATIVE_TRAN_LAYER_OPT;
14  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TRAN_LAYER_OPT;
15  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_HOST;
16  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_DEFAULT;
17  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_PROP;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.PrintWriter;
22  import java.net.URL;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Properties;
29  import java.util.Random;
30  import java.util.Set;
31  import java.util.concurrent.CancellationException;
32  
33  import com.google.common.base.Joiner;
34  import com.google.common.collect.Maps;
35  
36  import org.apache.commons.cli.CommandLine;
37  import org.apache.commons.cli.GnuParser;
38  import org.apache.commons.cli.HelpFormatter;
39  import org.apache.commons.cli.Option;
40  import org.apache.commons.cli.Options;
41  import org.apache.hadoop.fs.FileSystem;
42  import org.openrdf.model.URI;
43  import org.openrdf.model.impl.URIImpl;
44  import org.openrdf.model.vocabulary.RDF;
45  import org.openrdf.model.vocabulary.RDFS;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  
49  import eu.fbk.knowledgestore.data.Data;
50  import eu.fbk.knowledgestore.data.Dictionary;
51  import eu.fbk.knowledgestore.data.Record;
52  import eu.fbk.knowledgestore.data.Stream;
53  import eu.fbk.knowledgestore.data.XPath;
54  import eu.fbk.knowledgestore.datastore.DataTransaction;
55  import eu.fbk.knowledgestore.datastore.hbase.exception.DataTransactionBlockingException;
56  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
57  import eu.fbk.knowledgestore.runtime.Files;
58  import eu.fbk.knowledgestore.vocabulary.KS;
59  
60  /**
61   * Class for testing HBaseDataTransaction
62   */
63  public class HBaseDataTransactionTester
64  {
65  
66      private HBaseDataStore ds;
67  
68      private DataTransaction dt;
69  
70      /** Logger object */
71      private static Logger logger = LoggerFactory.getLogger(HBaseDataTransactionTester.class);
72  
73      private static boolean printCfgFiles = false;
74  
75      private static boolean OmidMode = true;
76  
77      /** regulate the transaction end:
78            if  1 then "commit"     => dataTransaction.end(true);
79            if  0 then "rollback"   => dataTransaction.end(false);
80            if -1 then do-nothing   => empty code
81      **/
82      private static int transactionEndMode = 1;
83  
84      private static String generalPrefix = null;
85  
86      private static String hbaseTableNamePrefix = "";
87  
88      private static String masterHost = "";
89  
90      /**
91       * Constructor.
92       */
93      public HBaseDataTransactionTester(boolean readOnly) {
94          try {
95              final String propertiesFileName = getClass().getSimpleName() + ".properties";
96              final URL url = getClass().getResource(propertiesFileName);
97              logger.info("url is " + url);
98              final InputStream stream = url.openStream();
99              final Properties properties = new Properties();
100             properties.load(stream);
101             stream.close();
102             logger.info("read properties from file");
103 
104 
105 	    /* 
106 	       Override properties from file with those from options
107 	    */
108 
109             // override property HBASE_TRAN_LAYER
110             if (OmidMode) {
111                 properties.setProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT);
112             } else {
113                 properties.setProperty(HBASE_TRAN_LAYER, NATIVE_TRAN_LAYER_OPT);
114             }
115 
116             // override property "transaction.end.mode" (specific to this class)
117 	    properties.setProperty("transaction.end.mode", Integer.toString(transactionEndMode));
118 
119 	    if (! masterHost.equals("")) {
120 		// override property HBASE_ZOOKEEPER_QUORUM
121 		properties.setProperty(HBASE_ZOOKEEPER_QUORUM, masterHost);
122 
123 		// override property HADOOP_FS_URL and HADOOP_FS_DEFAULT_NAME
124 		properties.setProperty(HADOOP_FS_URL, "hdfs://" + masterHost + ":9000/");
125 		properties.setProperty(HADOOP_FS_DEFAULT_NAME, properties.getProperty(HADOOP_FS_URL));
126 
127 		// override property OMID_TSO_HOST
128 		properties.setProperty(OMID_TSO_HOST, masterHost);
129 	    }
130 	    
131             // override property HBASEDATASTORE_TABLEPREFIX_PROP
132 	    if (generalPrefix != null) {
133 		if (generalPrefix.equals("")) {
134 		    hbaseTableNamePrefix = "";
135 		} else {
136 		    hbaseTableNamePrefix = generalPrefix + ".";
137 		}
138 		properties.setProperty(HBASEDATASTORE_TABLEPREFIX_PROP, hbaseTableNamePrefix);
139 
140 		// override property URIDICT_RELATIVEPATH_PROP
141 		String uriDictPath;
142 		if (generalPrefix.equals("")) {
143 		    uriDictPath = "KnowledgeStore/" + URIDICT_RELATIVEPATH_DEFAULT;
144 		} else {
145 		    uriDictPath = "KnowledgeStore." + generalPrefix + "/" + URIDICT_RELATIVEPATH_DEFAULT;
146 		}	    
147 		properties.setProperty(URIDICT_RELATIVEPATH_PROP, uriDictPath);
148 	    }
149 
150 	    // set local variables 
151 	    hbaseTableNamePrefix = properties.getProperty(HBASEDATASTORE_TABLEPREFIX_PROP);
152 	    
153             if (printCfgFiles) {
154                 System.out.println("\nBEGIN OF |origXmlCfg|");
155                 System.out.println(Joiner.on("\n").withKeyValueSeparator("=").join(properties));
156                 System.out.println("END OF |origXmlCfg|\n");
157             }
158 
159             // create filesystem
160             final String fsURL = properties.getProperty(HADOOP_FS_URL);
161             final Map<String, String> fsProperties = Maps.newHashMap();
162             for (Map.Entry<Object, Object> entry : properties.entrySet()) {
163                 if (entry.getKey().toString().startsWith("fs.")) {
164                     fsProperties.put(entry.getKey().toString(), entry.getValue().toString());
165                 }
166             }
167             final FileSystem fileSystem = Files.getFileSystem(fsURL, fsProperties);
168 
169             // create new DataStore
170             ds = new HBaseDataStore(fileSystem, properties);
171 
172             final org.apache.hadoop.conf.Configuration hbaseCfg = ds.hbaseCfg;
173 
174             if (printCfgFiles) {
175                 // print the two conf files
176                 System.out.println("\nBEGIN OF |hbaseCfg|");
177                 if (hbaseCfg != null) {
178                     org.apache.hadoop.conf.Configuration.dumpConfiguration(hbaseCfg,
179                             new PrintWriter(System.out));
180                 } else {
181                     System.out.println("hbaseCfg null");
182                 }
183                 System.out.println("END OF |hbaseCfg|\n");
184 
185                 System.out.println("\nBEGIN OF |xmlCfg|");
186                 System.out.println(Joiner.on("\n").withKeyValueSeparator("=").join(properties));
187                 System.out.println("END OF |xmlCfg|\n");
188             }
189 
190             ds.init();
191 
192             dt = ds.begin(readOnly);
193 	    logger.info("created dt = ds.begin(" + readOnly + ")");
194 	    
195             if (printCfgFiles) {
196                 final Dictionary<URI> dict = ds.getSerializer().getDictionary();
197                 final String dictUrl = dict.getDictionaryURL();
198                 System.out.println("\nDictionary " + dictUrl + " begin");
199                 for (int i = 1; i < 1000; i++) {
200                     try {
201                         final URI val = (URI) dict.objectFor(i, true);
202                         System.out.println(i + " -> " + val);
203                     } catch (Exception e) {
204                         break;
205                     }
206                 }
207                 System.out.println("Dictionary end\n");
208             }
209 
210             System.out.println("end of " + this.getClass().getSimpleName() + " Constructor");
211 
212         } catch (final Exception e) {
213             e.printStackTrace();
214         }
215     }
216 
217     // Generate random integerin range 0..(limit-1)");
218     public static int getRandomInt(final int limit)
219     {
220         final Random randomGenerator = new Random();
221         final int randomInt = randomGenerator.nextInt(limit);
222         return randomInt;
223     }
224 
225     private static String recordToString(final Record r)
226     {
227 	String str = new String(r.toString(Data.getNamespaceMap(), true));
228 	return str;
229     }
230 
231     private static Record createRecordWithRandomValue(final URI type, final int limit, final int id)
232     {
233         final URI uriId = new URIImpl("http://rolexample.org/" + id);
234         final Record r = Record.create();
235         r.setID(uriId);
236         r.set(RDF.TYPE, type);
237         final String value = "comment # " + Integer.toString(getRandomInt(limit));
238         r.set(RDFS.COMMENT, value);
239         return r;
240     }
241 
242     private static void printOccurrenceMap(final Map<String, Integer> occurrenceMap)
243     {
244         int tot = 0;
245         System.out.println("Print occurrenceMap");
246         for (final Map.Entry<String, Integer> entry : occurrenceMap.entrySet()) {
247             final String key = entry.getKey();
248             final Integer value = entry.getValue();
249             tot += value.intValue();
250             System.out.println("  occurrenceMap value: " + key + " -> " + value.toString());
251         }
252         System.out.println("found tot occurrenceMap values " + Integer.toString(tot));
253     }
254 
255     private static void endTransaction (DataTransaction dataTran) throws DataCorruptedException, IOException {
256 	if (transactionEndMode == 1) {
257 	    dataTran.end(true);
258 	    logger.info("doTransactionEnd: dataTran.end(true) [= commit]");
259 	} else if (transactionEndMode == 0) {
260 	    dataTran.end(false);
261 	    logger.info("doTransactionEnd: dataTran.end(false) [= rollback]");
262 	} else {
263 	    logger.info("doTransactionEnd: NOTHING [= neither commit nor rollback]");
264 	}
265     }
266 
267     private URI getUriTypeFromTablename (String tableName) 
268     {
269 	if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME)) {
270 	    return KS.RESOURCE;
271 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME)) {
272 	    return KS.MENTION;
273 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME)) {
274 	    return KS.ENTITY;
275 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME)) {
276 	    return KS.CONTEXT;
277 	} else if (tableName.equalsIgnoreCase(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME)) {
278 	    return KS.USER;
279 	} else {
280 	    System.out.println("unknown tableName " + tableName);
281 	    return null;
282 	}
283     }
284 
285 
286     private static void populateTableRandomly(String tableName, final int num, final int startId, int tem)
287             throws Throwable
288     {
289         System.out.println("populateTableRandomly: tableName " + tableName + ", num " + num + ", startId " + startId + ", tem " + tem);
290         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(false);
291         if (dtt.ds == null) {
292             return;
293         }
294 	URI type = dtt.getUriTypeFromTablename(tableName);
295 	if (type == null) {
296 	    return;
297 	}
298 	long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0;
299         time1 = System.currentTimeMillis();
300         final int randomLimit = 10;
301         final List<Record> recordList = new ArrayList<Record>();
302         for (int i = 0; i < num; i++) {
303             final Record r = createRecordWithRandomValue(type, randomLimit, startId + i);
304             recordList.add(r);
305         }
306         time2 = System.currentTimeMillis();
307 
308         if (num > 0) {
309             // collect statistic of randomly generated value of the "RDFS.COMMENT" attribute of
310             // records
311             final Map<String, Integer> occurrenceMap = new HashMap<String, Integer>();
312             for (int i = 0; i < num; i++) {
313                 final Record r = recordList.get(i);
314 
315                 // the id:
316                 // final String id = r.getID().toString();
317 
318                 // the value of property/attribute RDFS.COMMENT:
319                 final String val = r.getUnique(RDFS.COMMENT, String.class);
320 
321                 // increment the occurrence of attribute value
322                 int occ = 1;
323                 if (occurrenceMap.containsKey(val)) {
324                     occ = occurrenceMap.get(val).intValue() + 1;
325                 }
326                 occurrenceMap.put(val, new Integer(occ));
327             }
328             // print the occurrenceMap
329             printOccurrenceMap(occurrenceMap);
330         }
331         time3 = System.currentTimeMillis();
332 
333         String msg = "";
334         DataTransaction dataTran = dtt.dt;
335 
336 	transactionEndMode = tem;
337         try {
338             for (final Record r : recordList) {
339                 dataTran.store(type, r);
340             }
341             time4 = System.currentTimeMillis();
342 	    // logger.info("sleep(10000)"); Thread.sleep(10000);
343 	    endTransaction(dataTran);
344             dataTran = null;
345             time5 = System.currentTimeMillis();
346             msg = "Added " + num + " records";
347             msg += "\ntime 2-1: " + String.valueOf(time2 - time1) + " ms";
348             msg += "\ntime 3-2: " + String.valueOf(time3 - time2) + " ms";
349             msg += "\ntime 4-3: " + String.valueOf(time4 - time3) + " ms";
350             msg += "\ntime 5-4: " + String.valueOf(time5 - time4) + " ms";
351         } catch (final DataTransactionBlockingException e) {
352             msg = "WARNING: DataTransactionBlockingException";
353             msg += " No records added!";
354         } catch (final CancellationException e) {
355             // not our case: ignore
356             msg = "CancelledException";
357             msg += " No records added!";
358         } catch (final Exception e) {
359             msg = "WARNING: Exception";
360             msg += " No records added!";
361             e.printStackTrace();
362         } finally {
363             if (dataTran != null) {
364                 dataTran.end(false);
365 		logger.info("dataTran.end(false)");
366             }
367         }
368         System.out.println(msg);
369     }
370 
371     private static void retrieveRowsInTable(final String tableName, int maxRecords)
372     {
373         System.out.println("retrieveRowsInTable: tableName " + tableName + ", maxRecords " + maxRecords);
374         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
375         if (dtt.ds == null) {
376             return;
377         }
378 	URI type = dtt.getUriTypeFromTablename(tableName);
379 	if (type == null) {
380 	    return;
381 	}
382         long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0, time6 = 0, time7 = 0;
383         int numRecords = 0;
384         DataTransaction dataTran = dtt.dt;
385         try {
386             time1 = System.currentTimeMillis();
387 	    logger.info("before retrieve() 1");
388             Stream<Record> cur = dataTran.retrieve(type, null, null);
389             try {
390                 time2 = System.currentTimeMillis();
391                 numRecords += cur.count();
392             } finally {
393                 cur.close();
394             }
395             time3 = System.currentTimeMillis();
396 
397 	    logger.info("before retrieve() 2");
398             cur = dataTran.retrieve(type, null, null);
399 	    int numRecords2 = 0;
400             try {
401                 time4 = System.currentTimeMillis();
402                 // collect statistic of randomly generated value of the "RDFS.COMMENT" attribute
403                 // of records
404                 final Map<String, Integer> occurrenceMap = new HashMap<String, Integer>();
405                 for (Record r : cur) {
406 		    numRecords2++;
407 
408 		    // print the first record
409 		    // if (numRecords2 == 1) {System.out.println("first record: " + recordToString(r));}
410 
411                     // the value of property/attribute RDFS.COMMENT:
412                     final String val = r.getUnique(RDFS.COMMENT, String.class);
413 
414                     // increment the occurrence of attribute value
415                     int occ = 1;
416                     if (occurrenceMap.containsKey(val)) {
417                         occ = occurrenceMap.get(val).intValue() + 1;
418                     }
419                     occurrenceMap.put(val, new Integer(occ));
420 		    if ((maxRecords > 0) && (numRecords2 >= maxRecords)) {
421 			break;
422 		    }
423                 }
424                 time5 = System.currentTimeMillis();
425                 endTransaction(dataTran);
426                 time6 = System.currentTimeMillis();
427 
428                 // print the occurrenceMap
429                 printOccurrenceMap(occurrenceMap);
430                 time7 = System.currentTimeMillis();
431             } finally {
432                 cur.close();
433             }
434 	    
435         } catch (final IOException e) {
436             System.out.println("WARNING Exception");
437             e.printStackTrace();
438         }
439 
440         System.out.println("Found " + Integer.toString(numRecords) + " records");
441         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
442         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
443         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
444         System.out.println("time 5-4: " + String.valueOf(time5 - time4) + " ms");
445         System.out.println("time 6-5: " + String.valueOf(time6 - time5) + " ms");
446         System.out.println("time 7-6: " + String.valueOf(time7 - time6) + " ms");
447     }
448 
449     private static void retrieveWithFilter(String tableName, final String conditionString,
450 					   final boolean doFilterOnClientSide)
451     {
452         System.out.println("retrieveWithFilter: tableName " + tableName + ", conditionString " 
453 			   + conditionString + ", doFilterOnClientSide " + doFilterOnClientSide);
454         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
455         final HBaseDataStore ds = dtt.ds;
456         if (dtt.ds == null) {
457             return;
458         }
459 	URI type = dtt.getUriTypeFromTablename(tableName);
460 	if (type == null) {
461 	    return;
462 	}
463         final boolean serverSideFiltering = ds.getServerFilterFlag();
464         if (doFilterOnClientSide != !serverSideFiltering) {
465             String msg = "Unsupported filtering modality: ";
466             msg += "requested " + (doFilterOnClientSide ? "client-side" : "server-side");
467             msg += ", application configured with "
468                     + (serverSideFiltering ? "server-side" : "client-side");
469             System.out.println(msg);
470             return;
471         }
472         long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0, time6 = 0, time7 = 0;
473         int numRecords = 0;
474         DataTransaction dataTran = dtt.dt;
475         try {
476             // Condition cond = Condition.create("/<" + RDFS.COMMENT + "> = 'comment # 0'");
477             final XPath cond = XPath.parse(conditionString);
478             System.out.println("cond is " + cond.toString());
479             time1 = System.currentTimeMillis();
480 	    logger.info("before retrieve() 1");
481             Stream<Record> cur = dataTran.retrieve(type, cond, null);
482             try {
483                 time2 = System.currentTimeMillis();
484                 numRecords += cur.count();
485                 time3 = System.currentTimeMillis();
486                 System.out.println("first retrieve() found " + numRecords + " records");
487             } finally {
488                 cur.close();
489             }
490 	    logger.info("before retrieve() 2");
491             cur = dataTran.retrieve(type, cond, null);
492             try {
493                 time4 = System.currentTimeMillis();
494                 // collect statistic of randomly generated value of the "RDFS.COMMENT" attribute
495                 // of records
496                 final Map<String, Integer> occurrenceMap = new HashMap<String, Integer>();
497                 for (Record m : cur) {
498                     // the value of property/attribute RDFS.COMMENT:
499                     final String val = m.getUnique(RDFS.COMMENT, String.class);
500 
501                     // increment the occurrence of attribute value
502                     int occ = 1;
503                     if (occurrenceMap.containsKey(val)) {
504                         occ = occurrenceMap.get(val).intValue() + 1;
505                     }
506                     occurrenceMap.put(val, new Integer(occ));
507                 }
508                 time5 = System.currentTimeMillis();
509                 endTransaction(dataTran);
510                 time6 = System.currentTimeMillis();
511 
512                 // print the occurrenceMap
513                 printOccurrenceMap(occurrenceMap);
514                 time7 = System.currentTimeMillis();
515             } finally {
516                 cur.close();
517             }
518 
519         } catch (final IOException e) {
520             System.out.println("WARNING Exception");
521             e.printStackTrace();
522         }
523 
524         System.out.println("Found " + numRecords + " records");
525         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
526         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
527         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
528         System.out.println("time 5-4: " + String.valueOf(time5 - time4) + " ms");
529         System.out.println("time 6-5: " + String.valueOf(time6 - time5) + " ms");
530         System.out.println("time 7-6: " + String.valueOf(time7 - time6) + " ms");
531     }
532 
533     private static void retrieveAllAndSelectLocally(String tableName, final String conditionString)
534     {
535         System.out.println("retrieveAllAndSelectLocally: tableName " + tableName + ", conditionString " + conditionString);
536 	XPath cond = XPath.parse(conditionString);
537         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
538         if (dtt.ds == null) {
539             return;
540         }
541 	URI type = dtt.getUriTypeFromTablename(tableName);
542 	if (type == null) {
543 	    return;
544 	}
545         long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0, time6 = 0, time7 = 0;
546         int numRecords = 0;
547         DataTransaction dataTran = dtt.dt;
548         try {
549             time1 = System.currentTimeMillis();
550 	    logger.info("before retrieve() 1");
551             Stream<Record> cur = dataTran.retrieve(type, null, null);
552             try {
553                 time2 = System.currentTimeMillis();
554                 for (Record r : cur) {
555                     if (cond.evalBoolean(r)) {
556                         numRecords++;
557                         // print the first record
558 			// if (numRecords == 1) { System.out.println("first record\n" + recordToString(r)); }
559                     }
560                 }
561                 time3 = System.currentTimeMillis();
562             } finally {
563                 cur.close();
564             }
565             
566 	    logger.info("before retrieve() 2");
567             cur = dataTran.retrieve(type, null, null);
568             try {
569                 time4 = System.currentTimeMillis();
570                 // collect statistic of randomly generated value of the "RDFS.COMMENT" attribute
571                 // of
572                 // records
573                 final Map<String, Integer> occurrenceMap = new HashMap<String, Integer>();
574                 for (Record m : cur) {
575                     if (cond.evalBoolean(m)) {
576                         // the value of property/attribute RDFS.COMMENT:
577                         final String val = m.getUnique(RDFS.COMMENT, String.class);
578 
579                         // increment the occurrence of attribute value
580                         int occ = 1;
581                         if (occurrenceMap.containsKey(val)) {
582                             occ = occurrenceMap.get(val).intValue() + 1;
583                         }
584                         occurrenceMap.put(val, new Integer(occ));
585                     }
586                 }
587                 time5 = System.currentTimeMillis();
588                 endTransaction(dataTran);
589                 time6 = System.currentTimeMillis();
590 
591                 // print the occurrenceMap
592                 printOccurrenceMap(occurrenceMap);
593                 time7 = System.currentTimeMillis();
594             } finally {
595                 cur.close();
596             }
597 
598         } catch (final IOException e) {
599             System.out.println("WARNING Exception");
600             e.printStackTrace();
601         }
602 
603         System.out.println("Found " + numRecords + " records");
604         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
605         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
606         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
607         System.out.println("time 5-4: " + String.valueOf(time5 - time4) + " ms");
608         System.out.println("time 6-5: " + String.valueOf(time6 - time5) + " ms");
609         System.out.println("time 7-6: " + String.valueOf(time7 - time6) + " ms");
610     }
611 
612     private static void countRowsInTable(final String tableName)
613     {
614         System.out.println("countRowsInTable: tableName " + tableName);
615         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
616         if (dtt.ds == null) {
617             return;
618         }
619 	URI type = dtt.getUriTypeFromTablename(tableName);
620 	if (type == null) {
621 	    return;
622 	}
623         long time1 = 0, time2 = 0, time3 = 0;
624         long rowCounts = 0;
625         DataTransaction dataTran = dtt.dt;
626         try {
627             time1 = System.currentTimeMillis();
628 	    logger.info("before dataTran.count() " + type.toString());
629             rowCounts = dataTran.count(type, null);
630 	    logger.info("after dataTran.count()");
631             time2 = System.currentTimeMillis();
632             endTransaction(dataTran);
633             time3 = System.currentTimeMillis();
634 
635         } catch (final IOException e) {
636             System.out.println("WARNING Exception");
637             e.printStackTrace();
638         }
639 
640         System.out.println("Found " + Long.toString(rowCounts) + " entries");
641         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
642         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
643     }
644 
645     private static void lookupIdInTable(final String id, final String tableName)
646     {
647         System.out.println("lookupIdInTable: id " + id + ", tableName " + tableName);
648         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
649         if (dtt.ds == null) {
650             return;
651         }
652 	URI type = dtt.getUriTypeFromTablename(tableName);
653 	if (type == null) {
654 	    return;
655 	}
656 	Set<URIImpl> ids = new HashSet<URIImpl>();
657 	ids.add(new URIImpl(id));
658 
659         long time1 = 0, time2 = 0, time3 = 0, time4 = 0, time5 = 0, time6 = 0;
660 	int numRecords = 0;
661         DataTransaction dataTran = dtt.dt;
662 
663         try {
664             time1 = System.currentTimeMillis();
665 	    logger.info("before lookup() 1");
666             Stream<Record> cur = dataTran.lookup(type, ids, null);
667             try {
668                 time2 = System.currentTimeMillis();
669                 numRecords += cur.count();
670                 time3 = System.currentTimeMillis();
671             } finally {
672                 cur.close();
673             }
674 
675 	    logger.info("before lookup() 2");
676             cur = dataTran.lookup(type, ids, null);
677             try {
678                 time4 = System.currentTimeMillis();
679                 for (Record r : cur) {
680                     String str = "found ";
681                     str += recordToString(r);
682                     System.out.println(str);
683                 }
684                 time5 = System.currentTimeMillis();
685                 endTransaction(dataTran);
686                 time6 = System.currentTimeMillis();
687             } finally {
688                 cur.close();
689             }
690         } catch (final IOException e) {
691             System.out.println("WARNING Exception");
692             e.printStackTrace();
693         }
694 
695         System.out.println("Found " + Integer.toString(numRecords) + " Records");
696         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
697         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
698         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
699         System.out.println("time 5-4: " + String.valueOf(time5 - time4) + " ms");
700         System.out.println("time 6-5: " + String.valueOf(time6 - time5) + " ms");
701     }
702 
703     private static void updateIdInTable(final String id, final String tableName, final String newvalue)
704     {
705         System.out.println("updateIdInTable: id " + id + ", tableName " + tableName + ", newvalue " + newvalue);
706         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(false);
707         if (dtt.ds == null) {
708             return;
709         }
710 	URI type = dtt.getUriTypeFromTablename(tableName);
711 	if (type == null) {
712 	    return;
713 	}
714 	Record r = Record.create();
715 	URI uriID = new URIImpl(id);
716 	r.setID(uriID);
717 	r.set(RDF.TYPE, type);
718         r.set(RDFS.COMMENT, newvalue);
719 
720         long time1 = 0, time2 = 0, time3 = 0;
721         DataTransaction dataTran = dtt.dt;
722 
723         try {
724             time1 = System.currentTimeMillis();
725 	    logger.info("before update()");
726 	    dataTran.store(type, r);
727             time2 = System.currentTimeMillis();
728             endTransaction(dataTran);
729             time3 = System.currentTimeMillis();
730 	    
731         } catch (final IOException e) {
732             System.out.println("WARNING Exception");
733             e.printStackTrace();
734         }
735 
736         System.out.println("Updated record");
737         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
738         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
739     }
740 
741     private static void deleteIdInTable(final String id, final String tableName)
742     {
743         System.out.println("deleteIdInTable: id " + id + ", tableName " + tableName);
744         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(false);
745         if (dtt.ds == null) {
746             return;
747         }
748 	URI type = dtt.getUriTypeFromTablename(tableName);
749 	if (type == null) {
750 	    return;
751 	}
752 	Record r = Record.create();
753 	URI uriID = new URIImpl(id);
754 	r.set(RDF.TYPE, type);
755 	r.setID(uriID);
756 
757         long time1 = 0, time2 = 0, time3 = 0;
758         DataTransaction dataTran = dtt.dt;
759 
760         try {
761             time1 = System.currentTimeMillis();
762 	    logger.info("before delete()");
763 	    dataTran.delete(type, r.getID());
764             time2 = System.currentTimeMillis();
765             endTransaction(dataTran);
766             time3 = System.currentTimeMillis();
767 	    
768         } catch (final IOException e) {
769             System.out.println("WARNING Exception");
770             e.printStackTrace();
771         }
772 
773         System.out.println("Deleted record");
774         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
775         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
776     }
777 
778     private static void onceRetrieveRowsInTable(final String tableName, int maxRecords)
779     {
780         System.out.println("onceRetrieveRowsInTable: tableName " + tableName + ", maxRecords " + maxRecords);
781         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
782         if (dtt.ds == null) {
783             return;
784         }
785 	URI type = dtt.getUriTypeFromTablename(tableName);
786 	if (type == null) {
787 	    return;
788 	}
789         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
790         int numRecords = 0;
791         DataTransaction dataTran = dtt.dt;
792 	Stream<Record> cur = null;
793         try {
794             time1 = System.currentTimeMillis();
795 	    logger.info("before retrieve() ");
796 	    cur = dataTran.retrieve(type, null, null);
797 	    time2 = System.currentTimeMillis();
798 	    for (Record r : cur) {
799 		numRecords++;
800 		System.out.println(recordToString(r));
801 
802 		if ((maxRecords > 0) && (numRecords >= maxRecords)) {
803 		    break;
804                 }
805 	    }
806 	    time3 = System.currentTimeMillis();
807 	    endTransaction(dataTran);
808 	    time4 = System.currentTimeMillis();
809 
810 	} catch (final IOException e) {
811             System.out.println("WARNING Exception");
812             e.printStackTrace();
813 	} finally {
814 	    cur.close();
815         }
816 
817         System.out.println("Found " + Integer.toString(numRecords) + " records");
818         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
819         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
820         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
821     }
822 
823     private static void onceRetrieveRowsInTable_IdOnly(final String tableName, int maxRecords)
824     {
825         System.out.println("onceRetrieveRowsInTable_IdOnly: tableName " + tableName + ", maxRecords " + maxRecords);
826         final HBaseDataTransactionTester dtt = new HBaseDataTransactionTester(true);
827         if (dtt.ds == null) {
828             return;
829         }
830 	URI type = dtt.getUriTypeFromTablename(tableName);
831 	if (type == null) {
832 	    return;
833 	}
834         long time1 = 0, time2 = 0, time3 = 0, time4 = 0;
835         int numRecords = 0;
836         DataTransaction dataTran = dtt.dt;
837 	Stream<Record> cur = null;
838         try {
839             time1 = System.currentTimeMillis();
840 	    logger.info("before retrieve() ");
841 	    cur = dataTran.retrieve(type, null, null);
842 	    time2 = System.currentTimeMillis();
843 	    for (Record r : cur) {
844 		numRecords++;
845 		System.out.println(r.toString());
846 
847 		if ((maxRecords > 0) && (numRecords >= maxRecords)) {
848 		    break;
849                 }
850 	    }
851 	    time3 = System.currentTimeMillis();
852 	    endTransaction(dataTran);
853 	    time4 = System.currentTimeMillis();
854 
855 	} catch (final IOException e) {
856             System.out.println("WARNING Exception");
857             e.printStackTrace();
858 	} finally {
859 	    cur.close();
860         }
861 
862         System.out.println("Found " + Integer.toString(numRecords) + " records");
863         System.out.println("time 2-1: " + String.valueOf(time2 - time1) + " ms");
864         System.out.println("time 3-2: " + String.valueOf(time3 - time2) + " ms");
865         System.out.println("time 4-3: " + String.valueOf(time4 - time3) + " ms");
866     }
867 
868     private static void printUsage(Options options) {
869         int WIDTH = 80;
870         final PrintWriter out = new PrintWriter(System.out);
871         final HelpFormatter formatter = new HelpFormatter();
872 	// String fullClassName = Thread.currentThread().getStackTrace()[1].getClassName();
873 	// String className = fullClassName.split("\\.")[fullClassName.split("\\.").length - 1];
874 	String className = "<thisClass>";
875 	String cmdLineSyntax = className + " [options] cmd table [args*]";
876 	String header = "";
877         String footer = "where cmd:\n"
878 	    + "1 TABLE NUM INDEX: populate with num records starting at index\n"
879 	    + "2 TABLE [MAX_RECORD]: retrieve max_record records (default 0 means all)\n"
880 	    + "3 TABLE CONDITION: filter records with condition on server-side\n"
881 	    + "4 TABLE CONDITION: retrieve all records and select locally with condition\n"
882 	    + "5 TABLE: count rows\n"
883 	    + "6 TABLE ID: lookup identifier\n"
884 	    + "7 TABLE ID NEW_VALUE: update the attribute RDF.COMMENT of identifier with new_value\n"
885 	    + "8 TABLE ID: delete identifier in table\n"
886 	    + "12 TABLE [MAX_RECORD]: once-retrieve max_record records (default 0 means all)\n"
887 	    + "13 TABLE [MAX_RECORD]: once-retrieve_IdOnly max_record records (default 0 means all)\n";
888 
889         formatter.printHelp(out, WIDTH, cmdLineSyntax, header, options, 2, 2, footer);
890         out.flush();
891 
892         System.exit(1);
893     }
894 
895     public static void main(final String[] args) throws Throwable
896     {
897 	final Options options = new Options();
898         options.addOption("cfg", "config_print",          false, "print configuration settings");
899         options.addOption("csf", "client_side_filtering", false, "do filtering on client side (default server)");
900         options.addOption("h", "help",                    false, "print help and exit");
901         options.addOption("n", "native_mode",             false, "set native mode (default omid)");
902         options.addOption("m", "master_host",            true, "the host running hdfs master, zookeeper and omid daemon ");
903         options.addOption("p", "prefix",                 true, "the prefix of the tables and FS");
904         options.addOption("tem", "transaction_end_mode", true, "set the mode of transaction end: 1 commit, 0 rollback, -1 do-nothing (default 1)");
905 	
906 	CommandLine cl = new GnuParser().parse(options, args);
907 	if (cl.hasOption("h")) {
908 	    printUsage(options);
909 	}
910 	if (cl.hasOption("cfg")) {
911 	    printCfgFiles = true;
912 	}
913 	boolean clientSideFlag = false;
914 	if (cl.hasOption("csf")) {
915 	    clientSideFlag = true;
916 	}
917 	if (cl.hasOption("n")) {
918 	    OmidMode = false;
919 	}
920 
921 	if (cl.hasOption("m")) {
922 	    masterHost = cl.getOptionValue("m");
923 	}
924 	if (cl.hasOption("p")) {
925 	    generalPrefix = cl.getOptionValue("p");
926 	}
927 	if (cl.hasOption("tem")) {
928 	    int value = Integer.parseInt(cl.getOptionValue("tem"));
929 	    switch (value) {
930 	    case 1:
931 	    case 0:
932 	    case -1:
933 		transactionEndMode = value;
934 		break;
935 	    default:
936 		System.err.println("error: unknown value for transaction_end_mode " + cl.getOptionValue("tem"));
937 		printUsage(options);
938 		break;
939 	    }
940 	}
941 
942 	String[] leftArgs = cl.getArgs();
943 	if (leftArgs.length < 2) {
944 	    if (leftArgs.length == 0) {
945 		System.err.println("error: missing cmd");
946 	    } else {
947 		System.err.println("error: missing table");
948 	    }
949 	    printUsage(options);
950 	}
951 
952 	int cmd = 0;
953 	cmd = Integer.parseInt(leftArgs[0]);
954 	String tableName = leftArgs[1];
955 
956 	if (printCfgFiles) {
957 	    System.out.println("CommandLine Options");
958 	    for (Option o : cl.getOptions()) {
959 		System.out.println(" " + o.getOpt() + " -> " + o.getValue());
960 	    }
961 	    System.out.println("Args");
962 	    for (String arg : leftArgs) {
963 		System.out.println(" " + arg);
964 	    }
965 	    System.out.println("");
966 	}
967 	
968 
969         switch (cmd) {
970 
971         case 1:
972 	    // generate and store numEntries records from startIndex with random content
973             if (leftArgs.length < 3) {
974 		System.err.println("error: missing num_of_entries");
975                 printUsage(options);
976             } else if (leftArgs.length < 4) {
977 		System.err.println("error: missing start_index");
978                 printUsage(options);
979             }
980 	    {
981 		int numEntries = -1;
982 		int startIndex = -1;
983 		numEntries = Integer.parseInt(leftArgs[2]);
984 		startIndex = Integer.parseInt(leftArgs[3]);
985 	
986                 populateTableRandomly(tableName, numEntries, startIndex, transactionEndMode);
987             }
988             break;
989 
990 	case 2:
991             // retrieve all the rows in the given table
992             {
993 		int maxRecords = 0;
994 		if (leftArgs.length > 2) {
995 		    maxRecords = Integer.parseInt(leftArgs[2]);
996 		}
997 
998 		retrieveRowsInTable(tableName, maxRecords);
999             }
1000             break;
1001 
1002         case 3:
1003             // filter with condition (default on server-side)
1004             if (leftArgs.length < 3) {
1005 		System.err.println("error: missing condition");
1006                 printUsage(options);
1007             }
1008 	    {
1009 		String conditionString = leftArgs[2];
1010 
1011 		retrieveWithFilter(tableName, conditionString, clientSideFlag);
1012 	    }
1013             break;
1014 
1015         case 4:
1016             // retrieve all the records and select locally with condition
1017             if (leftArgs.length < 3) {
1018 		System.err.println("error: missing condition");
1019                 printUsage(options);
1020             }
1021             {
1022 		String conditionString = leftArgs[2];
1023 
1024                 retrieveAllAndSelectLocally(tableName, conditionString);
1025             }
1026             break;
1027 
1028         case 5:
1029             // count the rows in the given table
1030             {
1031                 countRowsInTable(tableName);
1032             }
1033             break;
1034 
1035         case 6:
1036             // lookup in the given table the given id (=URI=rowkey)
1037             if (leftArgs.length < 3) {
1038 		System.err.println("error: missing identifier");
1039                 printUsage(options);
1040             }
1041             {
1042 		String identifier = leftArgs[2];
1043 
1044                 lookupIdInTable(identifier, tableName);
1045             }
1046             break;
1047 
1048         case 7:
1049             // update the attribute RDF.COMMENT of id (=URI=rowkey) in table with newvalue
1050             if (leftArgs.length < 3) {
1051 		System.err.println("error: missing identifier");
1052                 printUsage(options);
1053             } else if (leftArgs.length < 4) {
1054 		System.err.println("error: missing new_value");
1055                 printUsage(options);
1056             }
1057             {
1058 		String identifier = leftArgs[2];
1059 		String newValue   = leftArgs[3];
1060 
1061                 updateIdInTable(identifier, tableName, newValue);
1062             }
1063             break;
1064 
1065         case 8:
1066             // delete in the given table the given id (=URI=rowkey)
1067             if (leftArgs.length < 3) {
1068 		System.err.println("error: missing identifier");
1069                 printUsage(options);
1070             }
1071             {
1072 		String identifier = leftArgs[2];
1073 		
1074                 deleteIdInTable(identifier, tableName);
1075             }
1076             break;
1077 
1078 	case 12:
1079             // once retrieve all the rows in the given table
1080             {
1081 		int maxRecords = 0;
1082 		if (leftArgs.length > 2) {
1083 		    maxRecords = Integer.parseInt(leftArgs[2]);
1084 		}
1085 
1086 		onceRetrieveRowsInTable(tableName, maxRecords);
1087             }
1088             break;
1089 
1090 	case 13:
1091             // once retrieve all the rows in the given table printing only the ID
1092             {
1093 		int maxRecords = 0;
1094 		if (leftArgs.length > 2) {
1095 		    maxRecords = Integer.parseInt(leftArgs[2]);
1096 		}
1097 
1098 		onceRetrieveRowsInTable_IdOnly(tableName, maxRecords);
1099             }
1100             break;
1101 
1102 	default:
1103             printUsage(options);
1104         }
1105 	
1106         System.exit(0);
1107     }
1108 
1109 }