1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.gisgraphy.domain.geoloc.importer;
27
28 import java.io.BufferedInputStream;
29 import java.io.BufferedReader;
30 import java.io.File;
31 import java.io.FileInputStream;
32 import java.io.FileNotFoundException;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.UnsupportedEncodingException;
37
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Required;
41 import org.springframework.transaction.PlatformTransactionManager;
42 import org.springframework.transaction.TransactionDefinition;
43 import org.springframework.transaction.TransactionStatus;
44 import org.springframework.transaction.support.DefaultTransactionDefinition;
45
46 import com.gisgraphy.domain.repository.GisFeatureDao;
47 import com.gisgraphy.domain.valueobject.Constants;
48 import com.gisgraphy.domain.valueobject.ImporterStatus;
49
50
51
52
53
54
55
56 public abstract class AbstractImporterProcessor implements IImporterProcessor {
57 protected int totalReadLine = 0;
58 protected int readFileLine = 0;
59 protected String statusMessage = "";
60
61 protected ImporterStatus status = ImporterStatus.WAITING;
62
63
64
65
66 int numberOfLinesToProcess = 0;
67
68
69
70
71
72
73
74
75
76 static Long nbGisInserted = 0L;
77
78 protected ImporterConfig importerConfig;
79
80
81
82
83 protected static final Logger logger = LoggerFactory
84 .getLogger(AbstractImporterProcessor.class);
85
86 private File[] filesToProcess;
87
88
89
90
91 protected String COMMENT_START = "#";
92
93 private boolean hasConsumedFirstLine = false;
94
95
96
97
98 private boolean endOfDocument = false;
99
100
101
102
103 protected BufferedReader in;
104
105
106
107
108 private PlatformTransactionManager transactionManager;
109
110
111
112
113
114
115
/**
 * Tells whether the first non-comment line of each file must be skipped.
 * When this returns true, readLineAndProcessData() consumes the first
 * non-comment line of a file without passing it to processData().
 *
 * @return true if the first line should not be processed
 */
protected abstract boolean shouldIgnoreFirstLine();

/**
 * Called by commit() right before the transaction manager commits.
 * NOTE(review): presumably flushes and clears the persistence session held
 * by the implementation; confirm in subclasses.
 */
protected abstract void flushAndClear();

/**
 * Called by processFile() each time a transaction has just been obtained
 * from the transaction manager.
 * NOTE(review): the name suggests it switches the session flush mode to
 * "commit"; confirm in subclasses.
 */
protected abstract void setCommitFlushMode();

// Status of the transaction last obtained by processFile(); null until a file is processed.
private TransactionStatus txStatus = null;

// Transaction definition re-created by processFile() for each file (propagation REQUIRED).
private DefaultTransactionDefinition txDefinition;

/**
 * @return the number of fields a line is expected to contain; compared with
 *         the actual number of fields by checkNumberOfColumn()
 */
protected abstract int getNumberOfColumns();

/**
 * Tells whether comment lines must be skipped. When this returns true,
 * lines starting with COMMENT_START are not passed to processData().
 *
 * @return true if comment lines should be ignored
 */
protected abstract boolean shouldIgnoreComments();
146
147
148
149
150
151
152
153
154
155 private boolean isNotComment(String input) {
156 return (!shouldIgnoreComments())
157 || (shouldIgnoreComments() && !input.startsWith(COMMENT_START));
158 }
159
160
161
162
163 public AbstractImporterProcessor() {
164 super();
165 }
166
167
168
169
/** File currently processed; set by process() for each file and cleared by resetStatusFields(). */
protected File currentFile;

/**
 * Hook called by process() once the status is PROCESSING, just before the
 * files to import are retrieved with getFiles(). Does nothing by default.
 */
protected void setup() {
}

/**
 * @return the files to import; process() reads them in array order and
 *         countLines() uses them to count the lines to process
 */
protected abstract File[] getFiles();
184
185
186
187
188
189
190 public String getCurrentFileName() {
191
192 if (this.currentFile != null) {
193 return this.currentFile.getName();
194 }
195 return "unknow";
196 }
197
198
199
200
201
202
203
204
205
206
207 public int readLineAndProcessData() throws GeonamesProcessorException {
208 if (this.isEndOfDocument()) {
209 throw new IllegalArgumentException(
210 "Must NOT be called when it is the end of the document");
211 }
212
213 String input;
214 try {
215 input = (this.in).readLine();
216 } catch (IOException e1) {
217 throw new GeonamesProcessorException("can not read line ", e1);
218 }
219
220 if (input != null) {
221 readFileLine++;
222 if (isNotComment(input)) {
223 if (this.shouldIgnoreFirstLine() && !hasConsumedFirstLine) {
224 hasConsumedFirstLine = true;
225 } else {
226 try {
227 this.processData(input);
228 } catch (MissingRequiredFieldException mrfe) {
229 if (this.importerConfig.isMissingRequiredFieldThrows()) {
230 logger.error("A requrired field is missing "
231 + mrfe.getMessage());
232 throw new GeonamesProcessorException(
233 "A requrired field is missing "
234 + mrfe.getMessage(), mrfe);
235 } else {
236 logger.warn(mrfe.getMessage());
237 }
238 } catch (WrongNumberOfFieldsException wnofe) {
239 if (this.importerConfig.isWrongNumberOfFieldsThrows()) {
240 logger
241 .error("wrong number of fields during import "
242 + wnofe.getMessage());
243 throw new GeonamesProcessorException(
244 "Wrong number of fields during import "
245 + wnofe.getMessage(), wnofe);
246 } else {
247 logger.warn(wnofe.getMessage());
248 }
249 } catch (Exception e) {
250 e.printStackTrace();
251 logger.error("An Error occurred on Line "
252 + readFileLine + " for " + input + " : "
253 + e.getCause());
254 throw new GeonamesProcessorException(
255 "An Error occurred on Line " + readFileLine
256 + " for " + input + " : "
257 + e.getCause(), e);
258 }
259 }
260 }
261
262 } else {
263 this.endOfDocument = true;
264 }
265 return readFileLine;
266 }
267
268
269
270
271
272
273
274
/**
 * Processes one data line of the current file. Called by
 * readLineAndProcessData() for every line that is neither a skipped comment
 * nor the skipped first line.
 *
 * @param line the raw line read from the file, never null
 * @throws GeonamesProcessorException if the line can not be processed
 */
protected abstract void processData(String line)
        throws GeonamesProcessorException;
277
278
279
280
281 public void process() {
282 try {
283 if (shouldBeSkipped()){
284 this.status = ImporterStatus.SKIPPED;
285 return;
286 }
287 this.status = ImporterStatus.PROCESSING;
288 this.getNumberOfLinesToProcess();
289 setup();
290 this.filesToProcess = getFiles();
291 if (this.filesToProcess.length == 0) {
292 logger.info("there is 0 file to process for "
293 + this.getClass().getSimpleName());
294 return;
295 }
296 for (int i = 0; i < filesToProcess.length; i++) {
297 currentFile = filesToProcess[i];
298 this.endOfDocument = false;
299 getBufferReader(filesToProcess[i]);
300 processFile();
301 closeBufferReader();
302
303 }
304
305 this.status = ImporterStatus.PROCESSED;
306 } catch (Exception e) {
307 e.printStackTrace();
308 this.status = ImporterStatus.ERROR;
309 this.statusMessage = "An error occurred when processing "
310 + this.getClass().getSimpleName() + " on file "
311 + getCurrentFileName() + " on line " + getReadFileLine()
312 + " : " + e.getCause();
313 logger.error(statusMessage);
314 throw new GeonamesProcessorException(statusMessage, e.getCause());
315 } finally {
316 try {
317 tearDown();
318 } catch (Exception e) {
319 this.status = ImporterStatus.ERROR;
320 this.statusMessage = "An error occured on teardown (the import is done but is not being optimzed) :"+e;
321 logger.error(statusMessage);
322 }
323 }
324 }
325
326
327
328
329
/**
 * Tells process() whether this importer must do nothing at all. When it
 * returns true, process() sets the status to SKIPPED and returns.
 *
 * @return false by default; subclasses may override
 */
protected boolean shouldBeSkipped() {
    return false;
}
333
334 private void getBufferReader(File file) {
335 InputStream inInternal = null;
336
337 try {
338 inInternal = new BufferedInputStream(new FileInputStream(file));
339 } catch (FileNotFoundException e) {
340 throw new RuntimeException(e);
341 }
342
343 try {
344 this.in = new BufferedReader(new InputStreamReader(inInternal,
345 Constants.CHARSET));
346 } catch (UnsupportedEncodingException e) {
347 throw new RuntimeException(e);
348 }
349 }
350
351 private void processFile() throws GeonamesProcessorException {
352 try {
353 hasConsumedFirstLine = false;
354 readFileLine = 0;
355 logger.info("will process " + getCurrentFileName());
356
357 txDefinition = new DefaultTransactionDefinition();
358 txDefinition
359 .setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
360
361 txStatus = transactionManager.getTransaction(txDefinition);
362 setCommitFlushMode();
363 while (!isEndOfDocument()) {
364 this.readLineAndProcessData();
365 totalReadLine++;
366 if (totalReadLine % this.getMaxInsertsBeforeFlush() == 0) {
367 logger
368 .info("maxInsertsBeforeFlush reached, flushing and clearing: "
369 + totalReadLine);
370
371 commit(txStatus);
372
373 txStatus = transactionManager.getTransaction(txDefinition);
374 setCommitFlushMode();
375
376 }
377 }
378 commit(txStatus);
379 totalReadLine--;
380
381 } catch (Exception e) {
382 transactionManager.rollback(txStatus);
383 throw new GeonamesProcessorException(
384 "An error occurred when processing "
385 + getCurrentFileName() + " on line "
386 + readFileLine + " : " + e.getCause(), e.getCause());
387 }
388 }
389
390
391
392
393
/**
 * Called by process() in its finally block, whatever the outcome of the
 * import. The default implementation only closes the current reader.
 */
protected void tearDown() {
    closeBufferReader();
}
397
398 private void closeBufferReader() {
399 if (in != null) {
400 try {
401 in.close();
402 } catch (IOException e) {
403
404 }
405 }
406 }
407
/**
 * Calls flushAndClear() and then commits the given transaction.
 *
 * @param txStatus the status of the transaction to commit
 */
private void commit(TransactionStatus txStatus) {
    // flush pending work before the commit so that it belongs to this transaction
    flushAndClear();
    transactionManager.commit(txStatus);
}
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427 protected static boolean isEmptyField(String[] fields, int position,
428 boolean required) {
429 if (fields == null) {
430 throw new MissingRequiredFieldException(
431 "can not chek fields if the array is null");
432 }
433 if (position < 0) {
434 throw new MissingRequiredFieldException(
435 "position can not be < 0 => position = " + position);
436 }
437 if (fields.length == 0) {
438 throw new MissingRequiredFieldException("fields is empty");
439 }
440 if (position > (fields.length - 1)) {
441
442 if (!required) {
443 return true;
444 } else {
445 throw new MissingRequiredFieldException("fields has "
446 + (fields.length)
447 + " element(s), can not get element with position "
448 + (position) + " : " + dumpFields(fields));
449 }
450
451 }
452 String string = fields[position];
453 if (string != null && string.trim().equals("")) {
454 if (!required) {
455 return true;
456 } else {
457 throw new MissingRequiredFieldException("fields[" + position
458 + "] is required for featureID " + fields[0] + " : "
459 + dumpFields(fields));
460 }
461 }
462 return false;
463
464 }
465
466
467
468
469
470
471 protected static String dumpFields(String[] fields) {
472 String result = "[";
473 for (String element : fields) {
474 result = result + element + ";";
475 }
476 return result + "]";
477 }
478
479
480
481
482
483
484
485
486
487 protected void checkNumberOfColumn(String[] fields) {
488 if (fields.length != getNumberOfColumns()) {
489
490 throw new WrongNumberOfFieldsException(
491 "The number of fields is not correct. expected : "
492 + getNumberOfColumns() + ", founds : "
493 + fields.length, new Throwable(
494 "The number of fields is not correct. expected : "
495 + getNumberOfColumns() + ", founds : "
496 + fields.length));
497 }
498
499 }
500
501
502
503
504
505 protected boolean isEndOfDocument() {
506 return endOfDocument;
507 }
508
509
510
511
512
513
514 public int getReadFileLine() {
515 return this.readFileLine;
516 }
517
518
519
520
521
522
523 public int getTotalReadLine() {
524 return this.totalReadLine;
525 }
526
/**
 * Injects the transaction manager used to open, commit and roll back the
 * transactions of the import. Marked as a required Spring property.
 *
 * @param transactionManager the transaction manager to use
 */
@Required
public void setTransactionManager(
        PlatformTransactionManager transactionManager) {
    this.transactionManager = transactionManager;
}
532
/**
 * Injects the import configuration (flush threshold and error policy are
 * read from it). Marked as a required Spring property.
 *
 * @param importerConfig the configuration to use
 */
@Required
public void setImporterConfig(ImporterConfig importerConfig) {
    this.importerConfig = importerConfig;
}
537
538
539
540
541 protected int countLines() {
542 int lines = 0;
543 BufferedReader br = null;
544 File[] files = getFiles();
545 BufferedInputStream bis = null;
546 for (int i = 0; i < files.length; i++) {
547 File countfile = files[i];
548 try {
549 bis = new BufferedInputStream(new FileInputStream(countfile));
550 br = new BufferedReader(new InputStreamReader(bis,
551 Constants.CHARSET));
552 while (br.readLine() != null) {
553 lines++;
554 }
555 } catch (Exception e) {
556 String filename = countfile == null ? null : countfile
557 .getName();
558 logger.warn("can not count lines for " + filename + " : "
559 + e.getMessage(), e);
560 return lines;
561 } finally {
562 if (bis != null) {
563 try {
564 bis.close();
565 } catch (IOException e) {
566
567 }
568 }
569 if (br != null) {
570 try {
571 br.close();
572 } catch (IOException e) {
573
574 }
575 }
576 }
577 }
578
579 logger.info("There is " + lines + " to process for "
580 + this.getClass().getSimpleName());
581 return lines;
582 }
583
584
585
586
587
588
589 public int getNumberOfLinesToProcess() {
590 if (this.numberOfLinesToProcess == 0 && this.status == ImporterStatus.PROCESSING) {
591
592 this.numberOfLinesToProcess = countLines();
593 }
594 return this.numberOfLinesToProcess;
595 }
596
597
598
599
600
601
602 public ImporterStatus getStatus() {
603 return this.status;
604 }
605
606
607
608
609
610 protected int getMaxInsertsBeforeFlush() {
611 return importerConfig.getMaxInsertsBeforeFlush();
612 }
613
614 protected void resetStatusFields() {
615 this.currentFile = null;
616 this.readFileLine = 0;
617 this.totalReadLine = 0;
618 this.numberOfLinesToProcess = 0;
619 this.status = ImporterStatus.WAITING;
620 this.statusMessage = "";
621 }
622
623
624
625
626
627
628 public String getStatusMessage() {
629 return statusMessage;
630 }
631
632 }