View Javadoc

1   /*******************************************************************************
2    *   Gisgraphy Project 
3    * 
4    *   This library is free software; you can redistribute it and/or
5    *   modify it under the terms of the GNU Lesser General Public
6    *   License as published by the Free Software Foundation; either
7    *   version 2.1 of the License, or (at your option) any later version.
8    * 
9    *   This library is distributed in the hope that it will be useful,
10   *   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12   *   Lesser General Public License for more details.
13   * 
14   *   You should have received a copy of the GNU Lesser General Public
15   *   License along with this library; if not, write to the Free Software
16   *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
17   * 
18   *  Copyright 2008  Gisgraphy project 
19   *  David Masclet <davidmasclet@gisgraphy.com>
20   *  
21   *  
22   *******************************************************************************/
23  /**
24   *
25   */
26  package com.gisgraphy.domain.geoloc.importer;
27  
28  import java.io.BufferedInputStream;
29  import java.io.BufferedReader;
30  import java.io.File;
31  import java.io.FileInputStream;
32  import java.io.FileNotFoundException;
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.InputStreamReader;
36  import java.io.UnsupportedEncodingException;
37  
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  import org.springframework.beans.factory.annotation.Required;
41  import org.springframework.transaction.PlatformTransactionManager;
42  import org.springframework.transaction.TransactionDefinition;
43  import org.springframework.transaction.TransactionStatus;
44  import org.springframework.transaction.support.DefaultTransactionDefinition;
45  
46  import com.gisgraphy.domain.repository.GisFeatureDao;
47  import com.gisgraphy.domain.valueobject.Constants;
48  import com.gisgraphy.domain.valueobject.ImporterStatus;
49  
50  /**
51   * Base class for all geonames processor. it provides session management and the
52   * ability to process one or more CSV file
53   * 
54   * @author <a href="mailto:david.masclet@gisgraphy.com">David Masclet</a>
55   */
56  public abstract class AbstractImporterProcessor implements IImporterProcessor {
57      protected int totalReadLine = 0;
58      protected int readFileLine = 0;
59      protected String statusMessage = "";
60  
61      protected ImporterStatus status = ImporterStatus.WAITING;
62  
63      /**
64       * @see IImporterProcessor#getNumberOfLinesToProcess()
65       */
66      int numberOfLinesToProcess = 0;
67  
68      /**
69       * This fields is use to generate unique featureid when importing features
70       * because we don't know yet the featureId and this field is required. it
71       * should be multiply by -1 to be sure that it is not in conflict with the
72       * Geonames one which are all positive
73       * 
74       * @see GisFeatureDao#getDirties()
75       */
76      static Long nbGisInserted = 0L;
77  
78      protected ImporterConfig importerConfig;
79  
80      /**
81       * The logger
82       */
83      protected static final Logger logger = LoggerFactory
84  	    .getLogger(AbstractImporterProcessor.class);
85  
86      private File[] filesToProcess;
87  
88      /**
89       * Lines starting with this prefix are considered as comments
90       */
91      protected String COMMENT_START = "#";
92  
93      private boolean hasConsumedFirstLine = false;
94  
95      /**
96       * Whether the end of the document has been reached
97       */
98      private boolean endOfDocument = false;
99  
100     /**
101      * The bufferReader for the current read Geonames file
102      */
103     protected BufferedReader in;
104 
105     /**
106      * The transaction manager
107      */
108     private PlatformTransactionManager transactionManager;
109 
110     /**
111      * Template Method : Whether the processor should ignore the first line of
112      * the input
113      * 
114      * @return true if the processor should ignore first line
115      */
116     protected abstract boolean shouldIgnoreFirstLine();
117 
118     /**
119      * Should flush and clear all the Daos that are used by the processor. This
120      * avoid memory leak
121      */
122     protected abstract void flushAndClear();
123 
124     /**
125      * Will flush after every commit
126      * 
127      * @see #flushAndClear()
128      */
129     protected abstract void setCommitFlushMode();
130 
131     private TransactionStatus txStatus = null;
132 
133     private DefaultTransactionDefinition txDefinition;
134 
135     /**
136      * @return the number of fields the processed Geonames file should have
137      */
138     protected abstract int getNumberOfColumns();
139 
140     /**
141      * Whether the filter should ignore the comments (i.e. lines starting with #)
142      * 
143      * @see AbstractImporterProcessor#COMMENT_START
144      */
145     protected abstract boolean shouldIgnoreComments();
146 
147     /**
148      * Whether we should consider the line as as comment or not (i.e. : it
149      * doesn't start with {@link #COMMENT_START})
150      * 
151      * @param input
152      *                the line we want to know if it is a commented line
153      * @return true is the specified line is a commented line
154      */
155     private boolean isNotComment(String input) {
156 	return (!shouldIgnoreComments())
157 		|| (shouldIgnoreComments() && !input.startsWith(COMMENT_START));
158     }
159 
160     /**
161      * Default constructor
162      */
163     public AbstractImporterProcessor() {
164 	super();
165     }
166 
167     /**
168      * The current processed file
169      */
170     protected File currentFile;
171 
172     /**
173      * Template method that can be override. This method is called before the
174      * process start. it is not called for each file processed.
175      */
176     protected void setup() {
177     }
178 
179     /**
180      * @return The files to be process
181      * @see ImporterHelper
182      */
183     protected abstract File[] getFiles();
184 
185     /*
186      * (non-Javadoc)
187      * 
188      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getCurrentFile()
189      */
190     public String getCurrentFileName() {
191 
192 	if (this.currentFile != null) {
193 	    return this.currentFile.getName();
194 	}
195 	return "unknow";
196     }
197 
198     /**
199      * Process the line if needed (is not a comment, should ignore first line,
200      * is end of document,...)
201      * 
202      * @return The number of lines that have been processed for the current
203      *         processed file
204      * @throws GeonamesProcessorException
205      *                 if an error occurred
206      */
207     public int readLineAndProcessData() throws GeonamesProcessorException {
208 	if (this.isEndOfDocument()) {
209 	    throw new IllegalArgumentException(
210 		    "Must NOT be called when it is the end of the document");
211 	}
212 
213 	String input;
214 	try {
215 	    input = (this.in).readLine();
216 	} catch (IOException e1) {
217 	    throw new GeonamesProcessorException("can not read line ", e1);
218 	}
219 
220 	if (input != null) {
221 	    readFileLine++;
222 	    if (isNotComment(input)) {
223 		if (this.shouldIgnoreFirstLine() && !hasConsumedFirstLine) {
224 		    hasConsumedFirstLine = true;
225 		} else {
226 		    try {
227 			this.processData(input);
228 		    } catch (MissingRequiredFieldException mrfe) {
229 			if (this.importerConfig.isMissingRequiredFieldThrows()) {
230 			    logger.error("A requrired field is missing "
231 				    + mrfe.getMessage());
232 			    throw new GeonamesProcessorException(
233 				    "A requrired field is missing "
234 					    + mrfe.getMessage(), mrfe);
235 			} else {
236 			    logger.warn(mrfe.getMessage());
237 			}
238 		    } catch (WrongNumberOfFieldsException wnofe) {
239 			if (this.importerConfig.isWrongNumberOfFieldsThrows()) {
240 			    logger
241 				    .error("wrong number of fields during import "
242 					    + wnofe.getMessage());
243 			    throw new GeonamesProcessorException(
244 				    "Wrong number of fields during import "
245 					    + wnofe.getMessage(), wnofe);
246 			} else {
247 			    logger.warn(wnofe.getMessage());
248 			}
249 		    } catch (Exception e) {
250 			e.printStackTrace();
251 			logger.error("An Error occurred on Line "
252 				+ readFileLine + " for " + input + " : "
253 				+ e.getCause());
254 			throw new GeonamesProcessorException(
255 				"An Error occurred on Line " + readFileLine
256 					+ " for " + input + " : "
257 					+ e.getCause(), e);
258 		    }
259 		}
260 	    }
261 
262 	} else {
263 	    this.endOfDocument = true;
264 	}
265 	return readFileLine;
266     }
267 
268     /**
269      * Process a read line of the geonames file, must be implemented by the
270      * concrete class
271      * 
272      * @param line
273      *                the line to process
274      */
275     protected abstract void processData(String line)
276 	    throws GeonamesProcessorException;
277 
278     /**
279      * Manage the transaction, flush Daos, and process all files to be processed
280      */
281     public void process() {
282 	try {
283 	    if (shouldBeSkipped()){
284 		this.status = ImporterStatus.SKIPPED;
285 		return;
286 	    }
287 	    this.status = ImporterStatus.PROCESSING;
288 	    this.getNumberOfLinesToProcess();
289 	    setup();
290 	    this.filesToProcess = getFiles();
291 	    if (this.filesToProcess.length == 0) {
292 		logger.info("there is 0 file to process for "
293 			+ this.getClass().getSimpleName());
294 		return;
295 	    }
296 	    for (int i = 0; i < filesToProcess.length; i++) {
297 		currentFile = filesToProcess[i];
298 		this.endOfDocument = false;
299 		getBufferReader(filesToProcess[i]);
300 		processFile();
301 		closeBufferReader();
302 
303 	    }
304 	    
305 	this.status = ImporterStatus.PROCESSED;
306 	} catch (Exception e) {
307 	    e.printStackTrace();
308 	    this.status = ImporterStatus.ERROR;
309 	    this.statusMessage = "An error occurred when processing "
310 		    + this.getClass().getSimpleName() + " on file "
311 		    + getCurrentFileName() + " on line " + getReadFileLine()
312 		    + " : " + e.getCause();
313 	    logger.error(statusMessage);
314 	    throw new GeonamesProcessorException(statusMessage, e.getCause());
315 	} finally {
316 	    try {
317 		tearDown();
318 	    } catch (Exception e) {
319 		this.status = ImporterStatus.ERROR;
320 		this.statusMessage = "An error occured on teardown (the import is done but is not being optimzed) :"+e;
321 		logger.error(statusMessage);
322 	    }
323 	}
324     }
325 
326 
327     /**
328      * @return true if the processor should Not be executed
329      */
330     protected boolean shouldBeSkipped() {
331 	return false;
332     }
333 
334     private void getBufferReader(File file) {
335 	InputStream inInternal = null;
336 	// uses a BufferedInputStream for better performance
337 	try {
338 	    inInternal = new BufferedInputStream(new FileInputStream(file));
339 	} catch (FileNotFoundException e) {
340 	    throw new RuntimeException(e);
341 	}
342 
343 	try {
344 	    this.in = new BufferedReader(new InputStreamReader(inInternal,
345 		    Constants.CHARSET));
346 	} catch (UnsupportedEncodingException e) {
347 	    throw new RuntimeException(e);
348 	}
349     }
350 
351     private void processFile() throws GeonamesProcessorException {
352 	try {
353 	    hasConsumedFirstLine = false;
354 	    readFileLine = 0;
355 	    logger.info("will process " + getCurrentFileName());
356 	    // Transaction Definition
357 	    txDefinition = new DefaultTransactionDefinition();
358 	    txDefinition
359 		    .setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
360 
361 	    txStatus = transactionManager.getTransaction(txDefinition);
362 	    setCommitFlushMode();
363 	    while (!isEndOfDocument()) {
364 		this.readLineAndProcessData();
365 		totalReadLine++;
366 		if (totalReadLine % this.getMaxInsertsBeforeFlush() == 0) {
367 		    logger
368 			    .info("maxInsertsBeforeFlush reached, flushing and clearing: "
369 				    + totalReadLine);
370 		    // and commit !
371 		    commit(txStatus);
372 		    // and re-opens a new transaction
373 		    txStatus = transactionManager.getTransaction(txDefinition);
374 		    setCommitFlushMode();
375 
376 		}
377 	    }
378 	    commit(txStatus);
379 	    totalReadLine--;// remove a processed line because it has been
380 	    // incremented on time more
381 	} catch (Exception e) {
382 	    transactionManager.rollback(txStatus);
383 	    throw new GeonamesProcessorException(
384 		    "An error occurred when processing "
385 			    + getCurrentFileName() + " on line "
386 			    + readFileLine + " : " + e.getCause(), e.getCause());
387 	}
388     }
389 
390     /**
391      * Template method that can be override. This method is called after the end
392      * of the process. it is not called for each file processed.
393      */
394     protected void tearDown() {
395 	closeBufferReader();
396     }
397 
398     private void closeBufferReader() {
399 	if (in != null) {
400 	    try {
401 		in.close();
402 	    } catch (IOException e) {
403 
404 	    }
405 	}
406     }
407 
408     private void commit(TransactionStatus txStatus) {
409 	flushAndClear();
410 	transactionManager.commit(txStatus);
411     }
412 
413     /**
414      * Check that the array is not null, and the fields of the specified
415      * position is not empty (after been trimed)
416      * 
417      * @param fields
418      *                The array to test
419      * @param position
420      *                the position of the field to test in the array
421      * @param required
422      *                if an exception should be thrown if the field is empty
423      * @return true is the field of the specifed position is empty
424      * @throws MissingRequiredFieldException
425      *                 if the fields is empty and required is true
426      */
427     protected static boolean isEmptyField(String[] fields, int position,
428 	    boolean required) {
429 	if (fields == null) {
430 	    throw new MissingRequiredFieldException(
431 		    "can not chek fields if the array is null");
432 	}
433 	if (position < 0) {
434 	    throw new MissingRequiredFieldException(
435 		    "position can not be < 0 => position = " + position);
436 	}
437 	if (fields.length == 0) {
438 	    throw new MissingRequiredFieldException("fields is empty");
439 	}
440 	if (position > (fields.length - 1)) {
441 
442 	    if (!required) {
443 		return true;
444 	    } else {
445 		throw new MissingRequiredFieldException("fields has "
446 			+ (fields.length)
447 			+ " element(s), can not get element with position "
448 			+ (position) + " : " + dumpFields(fields));
449 	    }
450 
451 	}
452 	String string = fields[position];
453 	if (string != null && string.trim().equals("")) {
454 	    if (!required) {
455 		return true;
456 	    } else {
457 		throw new MissingRequiredFieldException("fields[" + position
458 			+ "] is required for featureID " + fields[0] + " : "
459 			+ dumpFields(fields));
460 	    }
461 	}
462 	return false;
463 
464     }
465 
466     /**
467      * @param fields
468      *                The array to process
469      * @return a string which represent a human readable string of the Array
470      */
471     protected static String dumpFields(String[] fields) {
472 	String result = "[";
473 	for (String element : fields) {
474 	    result = result + element + ";";
475 	}
476 	return result + "]";
477     }
478 
479     /**
480      * Utility method which throw an exception if the number of fields is not
481      * the one expected (retrieved by {@link #getNumberOfColumns()})
482      * 
483      * @see #getNumberOfColumns()
484      * @param fields
485      *                The array to check
486      */
487     protected void checkNumberOfColumn(String[] fields) {
488 	if (fields.length != getNumberOfColumns()) {
489 
490 	    throw new WrongNumberOfFieldsException(
491 		    "The number of fields is not correct. expected : "
492 			    + getNumberOfColumns() + ", founds :  "
493 			    + fields.length, new Throwable(
494 			    "The number of fields is not correct. expected : "
495 				    + getNumberOfColumns() + ", founds :  "
496 				    + fields.length));
497 	}
498 
499     }
500 
501     /**
502      * @return true if the end of the document for the current processed file is
503      *         reached
504      */
505     protected boolean isEndOfDocument() {
506 	return endOfDocument;
507     }
508 
509     /*
510      * (non-Javadoc)
511      * 
512      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getReadFileLine()
513      */
514     public int getReadFileLine() {
515 	return this.readFileLine;
516     }
517 
518     /*
519      * (non-Javadoc)
520      * 
521      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getTotalReadedLine()
522      */
523     public int getTotalReadLine() {
524 	return this.totalReadLine;
525     }
526 
527     @Required
528     public void setTransactionManager(
529 	    PlatformTransactionManager transactionManager) {
530 	this.transactionManager = transactionManager;
531     }
532 
533     @Required
534     public void setImporterConfig(ImporterConfig importerConfig) {
535 	this.importerConfig = importerConfig;
536     }
537 
538     /**
539      * @return the number of line to process
540      */
541     protected int countLines() {
542 	int lines = 0;
543 	BufferedReader br = null;
544 	File[] files = getFiles();
545 	BufferedInputStream bis = null;
546 	for (int i = 0; i < files.length; i++) {
547 	    File countfile = files[i];
548 	    try {
549 		bis = new BufferedInputStream(new FileInputStream(countfile));
550 		br = new BufferedReader(new InputStreamReader(bis,
551 			Constants.CHARSET));
552 		while (br.readLine() != null) {
553 		    lines++;
554 		}
555 	    } catch (Exception e) {
556 		String filename = countfile == null ? null : countfile
557 			.getName();
558 		logger.warn("can not count lines for " + filename + " : "
559 			+ e.getMessage(), e);
560 		return lines;
561 	    } finally {
562 		if (bis != null) {
563 		    try {
564 			bis.close();
565 		    } catch (IOException e) {
566 
567 		    }
568 		}
569 		if (br != null) {
570 		    try {
571 			br.close();
572 		    } catch (IOException e) {
573 
574 		    }
575 		}
576 	    }
577 	}
578 
579 	logger.info("There is " + lines + " to process for "
580 		+ this.getClass().getSimpleName());
581 	return lines;
582     }
583 
584     /*
585      * (non-Javadoc)
586      * 
587      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getNumberOfLinesToProcess()
588      */
589     public int getNumberOfLinesToProcess() {
590 	if (this.numberOfLinesToProcess == 0 && this.status == ImporterStatus.PROCESSING) {
591 	    // it may not have been calculated yet
592 	    this.numberOfLinesToProcess = countLines();
593 	}
594 	return this.numberOfLinesToProcess;
595     }
596 
597     /*
598      * (non-Javadoc)
599      * 
600      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getStatus()
601      */
602     public ImporterStatus getStatus() {
603 	return this.status;
604     }
605 
606     /**
607      * @return The option
608      * @see ImporterConfig#setMaxInsertsBeforeFlush(int)
609      */
610     protected int getMaxInsertsBeforeFlush() {
611 	return importerConfig.getMaxInsertsBeforeFlush();
612     }
613 
614     protected void resetStatusFields() {
615 	this.currentFile = null;
616 	this.readFileLine = 0;
617 	this.totalReadLine = 0;
618 	this.numberOfLinesToProcess = 0;
619 	this.status = ImporterStatus.WAITING;
620 	this.statusMessage = "";
621     }
622 
623     /*
624      * (non-Javadoc)
625      * 
626      * @see com.gisgraphy.domain.geoloc.importer.IGeonamesProcessor#getErrorMessage()
627      */
628     public String getStatusMessage() {
629 	return statusMessage;
630     }
631 
632 }