core.py (17756B)
1 import csv 2 import math 3 from pathlib import Path 4 import re 5 6 7 TEMIMAGE_REQUIRED_COLUMNS = {'X', 'Y', 'Z', 'DOI', 'DataResidual', 'NumLayers'} 8 AARHUS_REQUIRED_COLUMNS = { 9 'LINE_NO', 10 'X', 11 'Y', 12 'ELEVATION', 13 'RECORD', 14 'RESDATA', 15 'RHO_1', 16 'THK_1', 17 'DOI_STANDARD', 18 } 19 SCI_REQUIRED_COLUMNS = { 20 'Line_No', 21 'Layer_No', 22 'X', 23 'Y', 24 'Elevation_Cell', 25 'Resistivity', 26 'Depth_top', 27 'Depth_bottom', 28 'Thickness', 29 } 30 ATEM_SCI_REQUIRED_COLUMNS = { 31 'FID', 32 'LINE', 33 'E', 34 'N', 35 'DEM', 36 'RESI1', 37 'DOI', 38 'Res001', 39 'Elev001', 40 } 41 42 EPSG_PATTERN = re.compile(r"\bepsg\s*:\s*(\d+)\b", re.IGNORECASE) 43 HEADER_ALIASES = { 44 'UTMX': 'X', 45 'UTMY': 'Y', 46 } 47 48 ABOVE_DOI_OPACITY = 100 49 BELOW_DOI_OPACITY = 10 50 51 52 def validate_opacity(value): 53 try: 54 opacity = int(value) 55 except (TypeError, ValueError): 56 raise ValueError('opacity must be between 0 and 100') from None 57 if opacity < 0 or opacity > 100: 58 raise ValueError('opacity must be between 0 and 100') 59 return opacity 60 61 62 def layer_opacity( 63 depth_mid, 64 doi, 65 mask_below_doi=True, 66 below_doi_opacity=BELOW_DOI_OPACITY, 67 ): 68 below_doi_opacity = validate_opacity(below_doi_opacity) 69 if doi is None or not mask_below_doi: 70 return ABOVE_DOI_OPACITY 71 return ABOVE_DOI_OPACITY if depth_mid <= doi else below_doi_opacity 72 73 74 # Resistivity ranges mirror the graduated symbology in styles/layers.qml. 75 # Each entry is (upper_exclusive, '#RRGGBB'); the final entry is the catch-all. 76 RESISTIVITY_COLOR_RAMP = ( 77 (0.1, '#000091'), 78 (1.0, '#000091'), 79 (2.0, '#0000b4'), 80 (4.0, '#0032dc'), 81 (7.0, '#005af5'), 82 (10.0, '#008cff'), 83 (15.0, '#00beff'), 84 (20.0, '#00dcff'), 85 (25.0, '#00ffff'), 86 (30.0, '#00ff96'), 87 (35.0, '#00ff00'), 88 (40.0, '#96ff00'), 89 (50.0, '#d2ff00'), 90 (60.0, '#ffff00'), 91 (75.0, '#ffb500'), 92 (90.0, '#ff7300'), 93 (120.0, '#ff0000'), 94 (150.0, '#ff1c8d'), 95 (200.0, '#ff6aff'), 96 (250.0, '#f200f2'), 97 (300.0, '#ca00ca'), 98 (400.0, '#a600a6'), 99 (600.0, '#800080'), 100 (1600.0, '#750075'), 101 (float('inf'), '#540054'), 102 ) 103 UNKNOWN_RESISTIVITY_COLOR = '#ffffff' 104 105 106 def resistivity_color(value): 107 if value is None: 108 return UNKNOWN_RESISTIVITY_COLOR 109 try: 110 v = float(value) 111 except (ValueError, TypeError): 112 return UNKNOWN_RESISTIVITY_COLOR 113 if math.isnan(v): 114 return UNKNOWN_RESISTIVITY_COLOR 115 for upper, color in RESISTIVITY_COLOR_RAMP: 116 if v < upper: 117 return color 118 return RESISTIVITY_COLOR_RAMP[-1][1] 119 120 121 def normalize_header_tokens(line): 122 normalized = [] 123 for token in line.split(): 124 token = token.lstrip('/') 125 token = HEADER_ALIASES.get(token, token) 126 if token: 127 normalized.append(token) 128 return normalized 129 130 131 def detect_format(headers): 132 header_set = set(headers) 133 if TEMIMAGE_REQUIRED_COLUMNS.issubset(header_set): 134 return 'temimage' 135 if AARHUS_REQUIRED_COLUMNS.issubset(header_set): 136 return 'aarhus' 137 if SCI_REQUIRED_COLUMNS.issubset(header_set): 138 return 'sci' 139 if ATEM_SCI_REQUIRED_COLUMNS.issubset(header_set): 140 return 'atem_sci' 141 return None 142 143 144 def is_header_line(line): 145 return detect_format(normalize_header_tokens(line)) is not None 146 147 148 def detect_source_epsg(path, comment_char='/'): 149 with open(path, 'r') as f: 150 for line in f: 151 stripped = line.lstrip() 152 if is_header_line(stripped): 153 break 154 if not stripped.startswith(comment_char): 155 break 156 match = EPSG_PATTERN.search(stripped) 157 if match is not None: 158 return f'EPSG:{match.group(1)}' 159 return None 160 161 162 def get_numbered_columns(headers, prefix): 163 numbered = [] 164 for name in headers: 165 if not name.startswith(prefix): 166 continue 167 suffix = name[len(prefix):] 168 if suffix.isdigit(): 169 numbered.append((int(suffix), name)) 170 return [name for _, name in sorted(numbered)] 171 172 173 def parse_num_layers(value): 174 try: 175 parsed = float(value) 176 except (ValueError, TypeError): 177 return None 178 if math.isnan(parsed): 179 return None 180 return int(parsed) 181 182 183 def count_valid_layers(row, res_cols, thick_cols): 184 count = 0 185 for res_col, thick_col in zip(res_cols, thick_cols): 186 try: 187 res = float(row.get(res_col, '')) 188 thick = float(row.get(thick_col, '')) 189 except (ValueError, TypeError): 190 break 191 if math.isnan(res) or math.isnan(thick): 192 break 193 count += 1 194 return count 195 196 197 def _clip_depth_to_doi(depth_top, depth_bottom, doi, clip_below_doi): 198 if not clip_below_doi: 199 return depth_bottom, False 200 if doi is None: 201 return depth_bottom, False 202 try: 203 if math.isnan(doi): 204 return depth_bottom, False 205 except TypeError: 206 return depth_bottom, False 207 if depth_top >= doi: 208 return depth_bottom, True 209 if depth_bottom > doi: 210 return doi, False 211 return depth_bottom, False 212 213 214 def process_xyz( 215 path, 216 mask_below_doi=True, 217 below_doi_opacity=BELOW_DOI_OPACITY, 218 clip_below_doi=True, 219 ): 220 below_doi_opacity = validate_opacity(below_doi_opacity) 221 222 points = [] 223 doi_points = [] 224 layers = [] 225 headers = None 226 227 with open(path, 'r') as f: 228 for line_number, raw_line in enumerate(f, start=1): 229 stripped = raw_line.strip() 230 left_stripped = raw_line.lstrip() 231 232 if headers is None: 233 if is_header_line(left_stripped): 234 headers = normalize_header_tokens(left_stripped) 235 source_format = detect_format(headers) 236 if source_format == 'sci': 237 return _process_sci_rows(f, headers, line_number) 238 if source_format == 'atem_sci': 239 res_cols = get_numbered_columns(headers, 'Res') 240 elev_cols = get_numbered_columns(headers, 'Elev') 241 thick_cols = [] 242 elif source_format == 'temimage': 243 res_cols = get_numbered_columns(headers, 'Res_') 244 thick_cols = get_numbered_columns(headers, 'Thick_') 245 else: 246 res_cols = get_numbered_columns(headers, 'RHO_') 247 thick_cols = get_numbered_columns(headers, 'THK_') 248 dep_top_cols = get_numbered_columns(headers, 'DEP_TOP_') 249 dep_bot_cols = get_numbered_columns(headers, 'DEP_BOT_') 250 continue 251 if not left_stripped.startswith('/'): 252 raise ValueError('XYZ file does not contain a supported header row') 253 continue 254 255 if not stripped: 256 continue 257 258 values = stripped.split() 259 # When the data has one more column than the header, pandas would 260 # treat the first column as the row index. Replicate that here. 261 if len(values) == len(headers) + 1: 262 values = values[1:] 263 if len(values) != len(headers): 264 raise ValueError( 265 f'Row {line_number} has {len(values)} columns, ' 266 f'expected {len(headers)}' 267 ) 268 269 row = dict(zip(headers, values)) 270 if source_format == 'atem_sci': 271 _append_atem_sci_row( 272 row, 273 res_cols, 274 elev_cols, 275 points, 276 doi_points, 277 layers, 278 mask_below_doi, 279 below_doi_opacity, 280 clip_below_doi, 281 ) 282 continue 283 284 x = float(row['X']) 285 y = float(row['Y']) 286 287 if source_format == 'temimage': 288 z = float(row['Z']) 289 doi = float(row['DOI']) 290 data_residual = float(row['DataResidual']) 291 n_layers = parse_num_layers(row['NumLayers']) 292 line = row['Line'] 293 station_no = row['StationNo'] 294 else: 295 z = float(row['ELEVATION']) 296 doi = float(row['DOI_STANDARD']) 297 data_residual = float(row['RESDATA']) 298 line = str(int(float(row['LINE_NO']))) 299 record = int(float(row['RECORD'])) 300 station_no = f'{line}_{record:05d}' 301 n_layers = count_valid_layers(row, res_cols, thick_cols) 302 303 z_doi = z - doi 304 point_wkt = f'POINT Z ({x} {y} {z})' 305 doi_wkt = f'POINT Z ({x} {y} {z_doi})' 306 307 points.append({ 308 'X': x, 309 'Y': y, 310 'Z': z, 311 'Line': line, 312 'StationNo': station_no, 313 'DataResidual': data_residual, 314 'NumLayers': n_layers, 315 'Geometry': point_wkt, 316 }) 317 doi_points.append({ 318 'X': x, 319 'Y': y, 320 'Z': z_doi, 321 'DOI': doi, 322 'ZDOI': z_doi, 323 'Geometry': doi_wkt, 324 }) 325 326 max_layers = min(len(res_cols), len(thick_cols)) 327 if n_layers is not None: 328 max_layers = min(max_layers, n_layers) 329 330 cum_depth = 0.0 331 for i in range(max_layers): 332 res_col = res_cols[i] 333 thick_col = thick_cols[i] 334 res_val = row.get(res_col, '') 335 thick_val = row.get(thick_col, '') 336 try: 337 res = float(res_val) 338 thick = float(thick_val) 339 except (ValueError, TypeError): 340 break 341 if math.isnan(res) or math.isnan(thick): 342 break 343 344 dep_top_col = dep_top_cols[i] if i < len(dep_top_cols) else None 345 dep_bot_col = dep_bot_cols[i] if i < len(dep_bot_cols) else None 346 if dep_top_col and dep_bot_col: 347 try: 348 depth_top = float(row[dep_top_col]) 349 depth_bottom = float(row[dep_bot_col]) 350 if math.isnan(depth_top) or math.isnan(depth_bottom): 351 raise ValueError 352 except (ValueError, TypeError): 353 depth_top = cum_depth 354 depth_bottom = cum_depth + thick 355 else: 356 depth_top = cum_depth 357 depth_bottom = cum_depth + thick 358 359 cum_depth = depth_bottom 360 clipped_bottom, skip = _clip_depth_to_doi( 361 depth_top, depth_bottom, doi, clip_below_doi, 362 ) 363 if skip: 364 continue 365 depth_bottom = clipped_bottom 366 367 depth_mid = (depth_top + depth_bottom) / 2 368 z_top = z - depth_top 369 z_bot = z - depth_bottom 370 z_mid = (z_top + z_bot) / 2 371 372 layer_wkt = f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})' 373 layers.append({ 374 'X': x, 375 'Y': y, 376 'Z': z, 377 'ZTop': z_top, 378 'ZMid': z_mid, 379 'ZBottom': z_bot, 380 'DepthTop': depth_top, 381 'DepthBottom': depth_bottom, 382 'Resistivity': res, 383 'Color': resistivity_color(res), 384 'Opacity': layer_opacity( 385 depth_mid, 386 doi, 387 mask_below_doi, 388 below_doi_opacity, 389 ), 390 'Layer': i + 1, 391 'Geometry': layer_wkt, 392 }) 393 394 if headers is None: 395 raise ValueError('XYZ file does not contain a supported header row') 396 397 return points, doi_points, layers 398 399 400 def _append_atem_sci_row( 401 row, 402 res_cols, 403 elev_cols, 404 points, 405 doi_points, 406 layers, 407 mask_below_doi=True, 408 below_doi_opacity=BELOW_DOI_OPACITY, 409 clip_below_doi=True, 410 ): 411 x = float(row['E']) 412 y = float(row['N']) 413 z = float(row['DEM']) 414 doi = float(row['DOI']) 415 line = str(int(float(row['LINE']))) 416 fid = int(float(row['FID'])) 417 station_no = f'{line}_{fid:05d}' 418 419 z_tops = [] 420 for elev_col in elev_cols: 421 z_top = float(row[elev_col]) 422 if math.isnan(z_top): 423 break 424 z_tops.append(z_top) 425 426 layer_rows = [] 427 max_layers = min(len(res_cols), len(z_tops)) 428 for i in range(max_layers): 429 res = float(row[res_cols[i]]) 430 if math.isnan(res): 431 break 432 if i + 1 < len(z_tops): 433 z_bot = z_tops[i + 1] 434 elif i > 0: 435 z_bot = z_tops[i] - (z_tops[i - 1] - z_tops[i]) 436 else: 437 break 438 z_top = z_tops[i] 439 depth_top = z - z_top 440 depth_bottom = z - z_bot 441 clipped_bottom, skip = _clip_depth_to_doi( 442 depth_top, depth_bottom, doi, clip_below_doi, 443 ) 444 if skip: 445 continue 446 depth_bottom = clipped_bottom 447 z_bot = z - depth_bottom 448 z_mid = (z_top + z_bot) / 2 449 depth_mid = (depth_top + depth_bottom) / 2 450 layer_rows.append({ 451 'X': x, 452 'Y': y, 453 'Z': z, 454 'ZTop': z_top, 455 'ZMid': z_mid, 456 'ZBottom': z_bot, 457 'DepthTop': depth_top, 458 'DepthBottom': depth_bottom, 459 'Resistivity': res, 460 'Color': resistivity_color(res), 461 'Opacity': layer_opacity( 462 depth_mid, 463 doi, 464 mask_below_doi, 465 below_doi_opacity, 466 ), 467 'Layer': i + 1, 468 'Geometry': f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})', 469 }) 470 471 points.append({ 472 'X': x, 473 'Y': y, 474 'Z': z, 475 'Line': line, 476 'StationNo': station_no, 477 'DataResidual': float(row['RESI1']), 478 'NumLayers': len(layer_rows), 479 'Geometry': f'POINT Z ({x} {y} {z})', 480 }) 481 doi_points.append({ 482 'X': x, 483 'Y': y, 484 'Z': z - doi, 485 'DOI': doi, 486 'ZDOI': z - doi, 487 'Geometry': f'POINT Z ({x} {y} {z - doi})', 488 }) 489 layers.extend(layer_rows) 490 491 492 def _process_sci_rows(f, headers, header_line_number): 493 soundings = {} 494 for offset, raw_line in enumerate(f, start=1): 495 line_number = header_line_number + offset 496 stripped = raw_line.strip() 497 if not stripped: 498 continue 499 500 values = stripped.split() 501 if len(values) == len(headers) + 1: 502 values = values[1:] 503 if len(values) != len(headers): 504 raise ValueError( 505 f'Row {line_number} has {len(values)} columns, ' 506 f'expected {len(headers)}' 507 ) 508 509 row = dict(zip(headers, values)) 510 key = (int(float(row['Line_No'])), row['X'], row['Y']) 511 soundings.setdefault(key, []).append(row) 512 513 points = [] 514 layers = [] 515 line_ordinals = {} 516 517 for (line_no, x_str, y_str), rows in soundings.items(): 518 rows.sort(key=lambda r: int(float(r['Layer_No']))) 519 ordinal = line_ordinals.get(line_no, 0) + 1 520 line_ordinals[line_no] = ordinal 521 522 line = str(line_no) 523 station_no = f'{line}_{ordinal:05d}' 524 x = float(x_str) 525 y = float(y_str) 526 527 layer1 = rows[0] 528 surface_z = float(layer1['Elevation_Cell']) + float(layer1['Thickness']) / 2.0 529 530 n_layers = 0 531 for r in rows: 532 try: 533 res = float(r['Resistivity']) 534 thick = float(r['Thickness']) 535 depth_top = float(r['Depth_top']) 536 depth_bottom = float(r['Depth_bottom']) 537 except (ValueError, TypeError): 538 break 539 if math.isnan(res) or math.isnan(thick): 540 break 541 if math.isnan(depth_top) or math.isnan(depth_bottom): 542 break 543 544 z_top = surface_z - depth_top 545 z_bot = surface_z - depth_bottom 546 z_mid = (z_top + z_bot) / 2 547 layer_no = int(float(r['Layer_No'])) 548 549 layer_wkt = f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})' 550 layers.append({ 551 'X': x, 552 'Y': y, 553 'Z': surface_z, 554 'ZTop': z_top, 555 'ZMid': z_mid, 556 'ZBottom': z_bot, 557 'DepthTop': depth_top, 558 'DepthBottom': depth_bottom, 559 'Resistivity': res, 560 'Color': resistivity_color(res), 561 'Opacity': layer_opacity(depth_bottom, None), 562 'Layer': layer_no, 563 'Geometry': layer_wkt, 564 }) 565 n_layers += 1 566 567 point_wkt = f'POINT Z ({x} {y} {surface_z})' 568 points.append({ 569 'X': x, 570 'Y': y, 571 'Z': surface_z, 572 'Line': line, 573 'StationNo': station_no, 574 'NumLayers': n_layers, 575 'Geometry': point_wkt, 576 }) 577 578 return points, [], layers 579 580 581 def write_csv(rows, base_path, suffix): 582 out_path = Path(base_path).with_suffix(suffix) 583 if not rows: 584 out_path.write_text('') 585 return out_path 586 with open(out_path, 'w', newline='') as f: 587 writer = csv.DictWriter(f, fieldnames=list(rows[0].keys())) 588 writer.writeheader() 589 writer.writerows(rows) 590 return out_path