View Javadoc
1   /*******************************************************************************
2    *   Gisgraphy Project 
3    * 
4    *   This library is free software; you can redistribute it and/or
5    *   modify it under the terms of the GNU Lesser General Public
6    *   License as published by the Free Software Foundation; either
7    *   version 2.1 of the License, or (at your option) any later version.
8    * 
9    *   This library is distributed in the hope that it will be useful,
10   *   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12   *   Lesser General Public License for more details.
13   * 
14   *   You should have received a copy of the GNU Lesser General Public
15   *   License along with this library; if not, write to the Free Software
16   *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
17   * 
18   *  Copyright 2008  Gisgraphy project 
19   *  David Masclet <davidmasclet@gisgraphy.com>
20   *  
21   *  
22   *******************************************************************************/
23  /**
24   *
25   */
26  package com.gisgraphy.importer;
27  
28  import java.io.BufferedInputStream;
29  import java.io.BufferedReader;
30  import java.io.File;
31  import java.io.FileInputStream;
32  import java.io.FileNotFoundException;
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.InputStreamReader;
36  import java.io.UnsupportedEncodingException;
37  
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  import org.springframework.beans.factory.annotation.Autowired;
41  import org.springframework.beans.factory.annotation.Required;
42  import org.springframework.transaction.PlatformTransactionManager;
43  import org.springframework.transaction.TransactionDefinition;
44  import org.springframework.transaction.TransactionStatus;
45  import org.springframework.transaction.annotation.Isolation;
46  import org.springframework.transaction.support.DefaultTransactionDefinition;
47  
48  import com.gisgraphy.domain.repository.GisFeatureDao;
49  import com.gisgraphy.domain.valueobject.Constants;
50  import com.gisgraphy.domain.valueobject.ImporterStatus;
51  import com.gisgraphy.service.IInternationalisationService;
52  
53  /**
54   * Base class for all Geonames processors. It provides session management and
55   * the ability to process one or more CSV files
56   * 
57   * @author <a href="mailto:david.masclet@gisgraphy.com">David Masclet</a>
58   */
59  public abstract class AbstractSimpleImporterProcessor implements IImporterProcessor {
60      protected int totalReadLine = 0;
61      protected int readFileLine = 0;
62      protected String statusMessage = "";
63  
64      protected ImporterStatus status = ImporterStatus.WAITING;
65      
66      @Autowired
67      protected IInternationalisationService internationalisationService;
68  
69      /**
70       * @see IImporterProcessor#getNumberOfLinesToProcess()
71       */
72      int numberOfLinesToProcess = 0;
73  
74      /**
75       * This field is used to generate a unique featureId when importing
76       * features, because we don't know the featureId yet and this field is
77       * required. It should be multiplied by -1 to be sure that it does not
78       * conflict with the Geonames ones, which are all positive
79       * 
80       * @see GisFeatureDao#getDirties()
81       */
82      static Long nbGisInserted = 0L;
83  
84      protected ImporterConfig importerConfig;
85  
86      /**
87       * The logger
88       */
89      protected static final Logger logger = LoggerFactory
90  	    .getLogger(AbstractSimpleImporterProcessor.class);
91  
92      private File[] filesToProcess;
93  
94      /**
95       * Lines starting with this prefix are considered as comments
96       */
97      protected String COMMENT_START = "#";
98  
99      private boolean hasConsumedFirstLine = false;
100 
101     /**
102      * Whether the end of the document has been reached
103      */
104     private boolean endOfDocument = false;
105 
106     /**
107      * The bufferReader for the current read Geonames file
108      */
109     protected BufferedReader in;
110 
111     /**
112      * The transaction manager
113      */
114     protected PlatformTransactionManager transactionManager;
115 
116     /**
117      * Template Method : Whether the processor should ignore the first line of
118      * the input
119      * 
120      * @return true if the processor should ignore first line
121      */
122     protected abstract boolean shouldIgnoreFirstLine();
123 
124     /**
125      * Should flush and clear all the Daos that are used by the processor. This
126      * avoids memory leaks
127      */
128     protected abstract void flushAndClear();
129 
130     /**
131      * Will flush after every commit
132      * 
133      * @see #flushAndClear()
134      */
135     protected abstract void setCommitFlushMode();
136 
137     protected TransactionStatus txStatus = null;
138 
139     protected DefaultTransactionDefinition txDefinition;
140 
141     /**
142      * @return the number of fields the processed Geonames file should have
143      */
144     protected abstract int getNumberOfColumns();
145 
146     /**
147      * Whether the filter should ignore the comments (i.e. lines starting with #)
148      * 
149      * @see AbstractSimpleImporterProcessor#COMMENT_START
150      */
151     protected abstract boolean shouldIgnoreComments();
152 
153     /**
154      * Whether we should consider the line as as comment or not (i.e. : it
155      * doesn't start with {@link #COMMENT_START})
156      * 
157      * @param input
158      *                the line we want to know if it is a commented line
159      * @return true is the specified line is a commented line
160      */
161     private boolean isNotComment(String input) {
162 	return (!shouldIgnoreComments())
163 		|| (shouldIgnoreComments() && !input.startsWith(COMMENT_START));
164     }
165 
166     /**
167      * Default constructor
168      */
169     public AbstractSimpleImporterProcessor() {
170 	super();
171     }
172 
173     /**
174      * The current processed file
175      */
176     protected File currentFile;
177 
178     /**
179      * Template method that can be override. This method is called before the
180      * process start. it is not called for each file processed.
181      */
182     protected void setup() {
183     }
184 
185     /**
186      * @return The files to be process
187      * @see ImporterHelper
188      */
189     protected abstract File[] getFiles();
190 
191     /*
192      * (non-Javadoc)
193      * 
194      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getCurrentFile()
195      */
196     public String getCurrentFileName() {
197 
198 	if (this.currentFile != null) {
199 	    return this.currentFile.getName();
200 	}
201 	return "unknow";
202     }
203 
204     /**
205      * Process the line if needed (is not a comment, should ignore first line,
206      * is end of document,...)
207      * 
208      * @return The number of lines that have been processed for the current
209      *         processed file
210      * @throws ImporterException
211      *                 if an error occurred
212      */
213     public int readLineAndProcessData() throws ImporterException {
214 	if (this.isEndOfDocument()) {
215 	    throw new IllegalArgumentException(
216 		    "Must NOT be called when it is the end of the document");
217 	}
218 
219 	String input;
220 	try {
221 	    input = (this.in).readLine();
222 	} catch (IOException e1) {
223 	    throw new ImporterException("can not read line ", e1);
224 	}
225 
226 	if (input != null) {
227 	    readFileLine++;
228 	    if (isNotComment(input)) {
229 		if (this.shouldIgnoreFirstLine() && !hasConsumedFirstLine) {
230 		    hasConsumedFirstLine = true;
231 		} else {
232 		    try {
233 			this.processData(input);
234 		    } catch (MissingRequiredFieldException mrfe) {
235 			if (this.importerConfig.isMissingRequiredFieldThrows()) {
236 			    logger.error("A requrired field is missing "
237 				    + mrfe.getMessage());
238 			    throw new ImporterException(
239 				    "A requrired field is missing "
240 					    + mrfe.getMessage(), mrfe);
241 			} else {
242 			    logger.warn(mrfe.getMessage());
243 			}
244 		    } catch (WrongNumberOfFieldsException wnofe) {
245 			if (this.importerConfig.isWrongNumberOfFieldsThrows()) {
246 			    logger
247 				    .error("wrong number of fields during import "
248 					    + wnofe.getMessage());
249 			    throw new ImporterException(
250 				    "Wrong number of fields during import "
251 					    + wnofe.getMessage(), wnofe);
252 			} else {
253 			    logger.warn(wnofe.getMessage());
254 			}
255 		    } catch (Exception e) {
256 			String message= "An Error occurred on Line "
257 				+ readFileLine + " for " + input + " : "
258 				+ e.getMessage();
259 			throw new ImporterException(
260 				message, e);
261 		    }
262 		}
263 	    }
264 
265 	} else {
266 	    this.endOfDocument = true;
267 	}
268 	return readFileLine;
269     }
270 
271     /**
272      * Process a read line of the geonames file, must be implemented by the
273      * concrete class
274      * 
275      * @param line
276      *                the line to process
277      */
278     protected abstract void processData(String line)
279 	    throws ImporterException;
280 
281     /**
282      * Manage the transaction, flush Daos, and process all files to be processed
283      */
284     public void process() {
285 	try {
286 	    if (shouldBeSkipped()){
287 		this.status = ImporterStatus.SKIPPED;
288 		return;
289 	    }
290 	    this.status = ImporterStatus.PROCESSING;
291 	    this.getNumberOfLinesToProcess();
292 	    setup();
293 	    this.filesToProcess = getFiles();
294 	    if (this.filesToProcess.length == 0) {
295 	    	logger.info("there is 0 file to process for "
296 			+ this.getClass().getSimpleName());
297 	    	this.status= ImporterStatus.SKIPPED;
298 	    	return;
299 	    }
300 	    for (int i = 0; i < filesToProcess.length; i++) {
301 			currentFile = filesToProcess[i];
302 			this.endOfDocument = false;
303 			getBufferReader(filesToProcess[i]);
304 			processFile();
305 			closeBufferReader();
306 			onFileProcessed(filesToProcess[i]);
307 	    }
308 	} catch (Exception e) {
309 	    processError(e);
310 	} finally {
311 	    try {
312 		tearDown();
313 		this.status = this.status==ImporterStatus.PROCESSING ? ImporterStatus.PROCESSED : this.status;
314 		if (this.status!= ImporterStatus.ERROR){
315 		    this.statusMessage="";
316 		}
317 	    } catch (Exception e) {
318 		this.status = ImporterStatus.ERROR;
319 		String teardownErrorMessage= "An error occured on teardown (the import is done but maybe not optimzed) :"+e.getMessage();
320 		this.statusMessage = this.statusMessage != ""? this.statusMessage+ " and "+teardownErrorMessage:teardownErrorMessage ;
321 		logger.error(statusMessage);
322 	    }
323 	}
324     }
325 
326     /**
327      * Method called when there is an exception. 
328      * the teardown method will be call after this
329      * @param e
330      */
331     protected void processError(Exception e) {
332 	this.status = ImporterStatus.ERROR;
333 	this.statusMessage = "An error occurred when processing "
334 	    + this.getClass().getSimpleName()+ " : " + e.getMessage();
335 	logger.error(statusMessage,e);
336 	throw new ImporterException(statusMessage, e.getCause());
337     }
338 
339 
340    
341     /* (non-Javadoc)
342      * @see com.gisgraphy.domain.geoloc.importer.IImporterProcessor#shouldBeSkipped()
343      */
344     public boolean shouldBeSkipped() {
345 	return false;
346     }
347 
348     private void getBufferReader(File file) {
349 	InputStream inInternal = null;
350 	// uses a BufferedInputStream for better performance
351 	try {
352 	    inInternal = new BufferedInputStream(new FileInputStream(file));
353 	} catch (FileNotFoundException e) {
354 	    throw new RuntimeException(e);
355 	}
356 
357 	try {
358 	    this.in = new BufferedReader(new InputStreamReader(inInternal,
359 		    Constants.CHARSET));
360 	} catch (UnsupportedEncodingException e) {
361 	    throw new RuntimeException(e);
362 	}
363     }
364 
365     private void processFile() throws ImporterException {
366 	try {
367 	    hasConsumedFirstLine = false;
368 	    readFileLine = 0;
369 	    logger.info("will process " + getCurrentFileName());
370 	    // Transaction Definition
371 	    txDefinition = new DefaultTransactionDefinition();
372 	    txDefinition
373 		    .setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
374 	    txDefinition.setIsolationLevel(Isolation.READ_UNCOMMITTED.value());
375 	    txDefinition.setReadOnly(false);
376 
377 	    startTransaction();
378 	    setCommitFlushMode();
379 	    while (!isEndOfDocument()) {
380 		this.readLineAndProcessData();
381 		incrementReadedFileLine(1);
382 		if (needCommit()) {
383 		    logger
384 			    .info("We need to commit, flushing and clearing: "
385 				    + totalReadLine);
386 		    // and commit !
387 		    commit();
388 		    startTransaction();
389 		    setCommitFlushMode();
390 
391 		}
392 	    }
393 	    commit();
394 	    decrementReadedFileLine(1);// remove a processed line because it has been
395 	    // incremented on time more
396 	} catch (Exception e) {
397 	    rollbackTransaction();
398 	    throw new ImporterException(
399 		    "An error occurred when processing "
400 			    + getCurrentFileName() + " : " + e.getMessage(), e.getCause());
401 	}
402     }
403 
404     protected int incrementReadedFileLine(int increment) {
405 	totalReadLine = totalReadLine+increment;
406 	return totalReadLine;
407 	
408     }
409     
410     protected int decrementReadedFileLine(int decrement) {
411 	totalReadLine = totalReadLine-decrement;
412 	return totalReadLine;
413 	
414     }
415 
416     protected void rollbackTransaction() {
417 	transactionManager.rollback(txStatus);
418     }
419 
420     protected boolean needCommit() {
421 	return totalReadLine % this.getMaxInsertsBeforeFlush() == 0;
422     }
423 
424     protected void startTransaction() {
425     txDefinition = new DefaultTransactionDefinition();
426 	    txDefinition
427 		    .setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
428 	    txDefinition.setIsolationLevel(Isolation.READ_UNCOMMITTED.value());
429 	    txDefinition.setReadOnly(false);
430 	txStatus = transactionManager.getTransaction(txDefinition);
431 	
432     }
433 
434     /**
435      * Template method that can be override. This method is called after the end
436      * of the process. it is not called for each file processed.
437      * You should always call super.tearDown() when you overide this method
438      */
439     protected void tearDown() {
440 	closeBufferReader();
441     }
442 
443     private void closeBufferReader() {
444 	if (in != null) {
445 	    try {
446 		in.close();
447 	    } catch (IOException e) {
448 
449 	    }
450 	}
451     }
452     
453     /**
454      * hook to do when the file has been processed without error
455      * @param file
456      */
457     protected void onFileProcessed(File file){
458     	if (importerConfig.isRenameFilesAfterProcessing()){
459     		currentFile.renameTo(new File(currentFile.getAbsoluteFile()+".done"));
460     	}
461     }
462 
463     protected void commit() {
464 		flushAndClear();
465 		transactionManager.commit(this.txStatus);
466     }
467 
468     /**
469      * Check that the array is not null, and the fields of the specified
470      * position is not empty (after been trimed)
471      * 
472      * @param fields
473      *                The array to test
474      * @param position
475      *                the position of the field to test in the array
476      * @param required
477      *                if an exception should be thrown if the field is empty
478      * @return true is the field of the specifed position is empty
479      * @throws MissingRequiredFieldException
480      *                 if the fields is empty and required is true
481      */
482     protected static boolean isEmptyField(String[] fields, int position,
483     		boolean required) {
484     	if (fields == null) {
485     		if (!required) {
486     			return true;
487     		} else {
488     			throw new MissingRequiredFieldException(
489     					"can not chek fields if the array is null");
490     		}
491     	}
492     	if (position < 0) {
493     		if (!required) {
494     			return true;
495     		} else {
496     		throw new MissingRequiredFieldException(
497     				"position can not be < 0 => position = " + position);
498     		}
499     	}
500     	if (fields.length == 0) {
501     		if (!required) {
502     			return true;
503     		} else {
504     		throw new MissingRequiredFieldException("fields is empty");
505     		}
506     	}
507     	if (position > (fields.length - 1)) {
508 
509     		if (!required) {
510     			return true;
511     		} else {
512     			throw new MissingRequiredFieldException("fields has "
513     					+ (fields.length)
514     					+ " element(s), can not get element with position "
515     					+ (position) + " : " + dumpFields(fields));
516     		}
517 
518     	}
519     	String string = fields[position];
520     	if (string != null && (string.trim().equals("") || string.equals("\"\""))) {
521     		if (!required) {
522     			return true;
523     		} else {
524     			throw new MissingRequiredFieldException("fields[" + position
525     					+ "] is required for featureID " + fields[0] + " : "
526     					+ dumpFields(fields));
527     		}
528     	}
529     	return false;
530 
531     }
532 
533     /**
534      * @param fields
535      *                The array to process
536      * @return a string which represent a human readable string of the Array
537      */
538     protected static String dumpFields(String[] fields) {
539 	String result = "[";
540 	for (String element : fields) {
541 	    result = result + element + ";";
542 	}
543 	return result + "]";
544     }
545 
546     /**
547      * Utility method which throw an exception if the number of fields is not
548      * the one expected (retrieved by {@link #getNumberOfColumns()})
549      * 
550      * @see #getNumberOfColumns()
551      * @param fields
552      *                The array to check
553      */
554     protected void checkNumberOfColumn(String[] fields) {
555 	if (fields.length != getNumberOfColumns()) {
556 
557 	    throw new WrongNumberOfFieldsException(
558 		    "The number of fields is not correct. expected : "
559 			    + getNumberOfColumns() + ", founds :  "
560 			    + fields.length+ ". details :"+dumpFields(fields));
561 	}
562 
563     }
564 
565     /**
566      * @return true if the end of the document for the current processed file is
567      *         reached
568      */
569     protected boolean isEndOfDocument() {
570 	return endOfDocument;
571     }
572 
573     /*
574      * (non-Javadoc)
575      * 
576      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getReadFileLine()
577      */
578     public long getReadFileLine() {
579 	return this.readFileLine;
580     }
581 
582     /*
583      * (non-Javadoc)
584      * 
585      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getTotalReadedLine()
586      */
587     public long getTotalReadLine() {
588 	return this.totalReadLine;
589     }
590 
591     @Required
592     public void setTransactionManager(
593 	    PlatformTransactionManager transactionManager) {
594 	this.transactionManager = transactionManager;
595     }
596 
597     @Required
598     public void setImporterConfig(ImporterConfig importerConfig) {
599 	this.importerConfig = importerConfig;
600     }
601 
602     /**
603      * @return the number of line to process
604      */
605     protected int countLines(File[] files) {
606     logger.info("counting lines");
607 	int lines = 0;
608 	BufferedReader br = null;
609 	BufferedInputStream bis = null;
610 	for (int i = 0; i < files.length; i++) {
611 	    File countfile = files[i];
612 	    logger.info("counting lines of "+countfile);
613 	    try {
614 		bis = new BufferedInputStream(new FileInputStream(countfile));
615 		br = new BufferedReader(new InputStreamReader(bis,
616 			Constants.CHARSET));
617 		while (br.readLine() != null) {
618 		    lines++;
619 		}
620 	    } catch (Exception e) {
621 		String filename = countfile == null ? null : countfile
622 			.getName();
623 		logger.warn("can not count lines for " + filename + " : "
624 			+ e.getMessage(), e);
625 		logger.info("end of counting lines");
626 		return lines;
627 	    } finally {
628 		if (bis != null) {
629 		    try {
630 			bis.close();
631 		    } catch (IOException e) {
632 
633 		    }
634 		}
635 		if (br != null) {
636 		    try {
637 			br.close();
638 		    } catch (IOException e) {
639 
640 		    }
641 		}
642 	    }
643 	}
644 
645 	logger.info("There is " + lines + " to process for "
646 		+ this.getClass().getSimpleName());
647 	return lines;
648     }
649 
650     /*
651      * (non-Javadoc)
652      * 
653      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getNumberOfLinesToProcess()
654      */
655     public long getNumberOfLinesToProcess() {
656 	if (this.numberOfLinesToProcess == 0 && this.status == ImporterStatus.PROCESSING) {
657 	    // it may not have been calculated yet
658 	    this.numberOfLinesToProcess = countLines(getFiles());
659 	}
660 	return this.numberOfLinesToProcess;
661     }
662 
663     /*
664      * (non-Javadoc)
665      * 
666      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getStatus()
667      */
668     public ImporterStatus getStatus() {
669 	return this.status;
670     }
671 
672     /**
673      * @return The option
674      * @see ImporterConfig#setMaxInsertsBeforeFlush(int)
675      */
676     protected int getMaxInsertsBeforeFlush() {
677     	return importerConfig.getMaxInsertsBeforeFlush();
678     }
679 
680     public void resetStatus() {
681 	this.currentFile = null;
682 	this.readFileLine = 0;
683 	this.totalReadLine = 0;
684 	this.numberOfLinesToProcess = 0;
685 	this.status = ImporterStatus.WAITING;
686 	this.statusMessage = "";
687     }
688 
689     /*
690      * (non-Javadoc)
691      * 
692      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getErrorMessage()
693      */
694     public String getStatusMessage() {
695 	return statusMessage;
696     }
697 
698     public void setInternationalisationService(IInternationalisationService internationalisationService) {
699         this.internationalisationService = internationalisationService;
700     }
701 
702 }