Quick Overview
Crosscorrelation on USArray Data
Assuming you've already installed Anxcor according to the installation instructions, this section walks you through computing some crosscorrelations on USArray data.
Download the Data from IRIS
First, download the data. We'll use a roughly east-west sampling of stations starting near Berkeley, CA, and keep only the vertical broadband channel ('BHZ').
```python
from obspy.core import UTCDateTime
from obspy.clients.fdsn.mass_downloader import RectangularDomain, \
    Restrictions, MassDownloader

# Rectangular domain containing parts of the Western US.
domain = RectangularDomain(minlatitude=30, maxlatitude=50,
                           minlongitude=-123, maxlongitude=-110)

## download the data and format the files in a particular way
for network in ['BK', 'TA']:
    for station in ["CMB", "BRK", "BRIB", "BKS", "BDM", "FARB", "R07C", "R08A",
                    "Q08A", "Q09A", "Q10A", "Q11A", "Q12A"]:
        print('{}.{}'.format(network, station))
        restrictions = Restrictions(
            # Get data for a single day.
            starttime=UTCDateTime("2008-03-01 00:00:00"),
            endtime=UTCDateTime("2008-03-02 00:00:00"),
            chunklength_in_sec=86400,
            network=network,
            station=station,
            channel="BHZ",
            reject_channels_with_gaps=False,
            minimum_length=0.9,
            minimum_interstation_distance_in_m=100.0)
        mdl = MassDownloader(providers=['NCEDC', 'IRIS'])
        mdl.download(domain, restrictions,
                     mseed_storage="data/{}/{}".format(network, station),
                     stationxml_storage="data/metadata/{}/{}".format(network, station))
```
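If you want a quick sanity check that the downloader wrote files where the next step expects them (the `mseed_storage` and `stationxml_storage` format strings above), something like this should list a few of them:

```python
import glob

# waveforms land in data/<network>/<station>/*.mseed
print(glob.glob('data/*/*/*.mseed')[:5])
# station metadata lands in data/metadata/<network>/<station>/*.xml
print(glob.glob('data/metadata/*/*/*.xml')[:5])
```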
Connect Anxcor to the Data with a Class
Anxcor needs to be given an instantiated class with `get_waveforms()` and `get_stations()` methods. Here's an example of such a class, with some complementary preprocessing methods thrown in. As long as you implement those two methods, you can define your data class in any way that makes sense to you.
```python
from obspy.core import UTCDateTime, Stream
import pandas as pd
import numpy as np
from anxcor.containers import AnxcorDatabase
from obspy import read_inventory, read
import anxcor.anxcor_utils as anxcor_utils
import os
import glob


class ArrayReader(AnxcorDatabase):
    """
    Example class for reading from file after mass-downloading data
    """

    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.df = pd.DataFrame(columns=['latitude', 'longitude', 'network',
                                        'station', 'station_id'])

    def get_waveforms(self, starttime=0, endtime=0, station=0, network=0):
        """
        get_waveforms yields a stream of traces given a starttime and endtime
        as UTCDateTime floats
        """
        stream = read('{}/{}/{}/*.mseed'.format(self.data_dir, network, station))
        stream.merge(method=1, interpolation_samples=-1)
        # pad the window before tapering and response removal; we trim back to the
        # requested start/end times further down
        delta = endtime - starttime
        stream = stream.trim(starttime=UTCDateTime(starttime - delta),
                             endtime=UTCDateTime(endtime + delta),
                             fill_value=np.nan)
        stream.detrend(type='constant')
        stream.detrend(type='linear')
        stream.taper(0.05)
        pre_filt = (0.001, 0.01, 5.0, 10.0)
        stream = anxcor_utils.remove_response(stream, output='DISP', inventory=self.inv,
                                              pre_filt=pre_filt, zero_mean=True, taper=True)
        stream.taper(0.05)
        stream.trim(starttime=UTCDateTime(starttime),
                    endtime=UTCDateTime(endtime),
                    fill_value=np.nan)
        traces = []
        for trace in stream:
            if max(abs(trace.data)) > 0.0:
                self.assign_coordinates(trace)
                self.cast_to_simple_channel(trace)
                traces.append(trace)
        new_stream = Stream(traces=traces)
        return new_stream

    def assign_coordinates(self, trace):
        """
        assigns latitude and longitude coordinates to trace.stats.coordinates from the
        attached pandas dataframe for every trace read in. Modify to suit your needs,
        but Anxcor will only use elevation, latitude, and longitude.
        """
        station = trace.stats.station
        latitude = self.df.loc[self.df['station'] == station]['latitude'].values[0]
        longitude = self.df.loc[self.df['station'] == station]['longitude'].values[0]
        trace.stats.coordinates = {'latitude': latitude, 'longitude': longitude}

    def cast_to_simple_channel(self, trace):
        """
        assigns the channel to a simple string. This helps reduce the sparseness of the
        resultant crosscorrelation tensor
        """
        channel = trace.stats.channel
        target_channel = 'na'
        if 'z' in channel[-1].lower():
            target_channel = 'z'
        elif 'n' in channel[-1].lower():
            target_channel = 'n'
        elif 'e' in channel[-1].lower():
            target_channel = 'e'
        trace.stats.channel = target_channel

    def _build_df(self):
        """
        builds a pandas dataframe of metadata from the database you build with the
        mass downloader
        """
        xml_files = glob.glob('{}/metadata/*/*/*.xml'.format(self.data_dir))
        prev_inv = None
        for xml in xml_files:
            inv = read_inventory(xml)
            if prev_inv is None:
                prev_inv = inv
            else:
                prev_inv += inv
        self.inv = prev_inv
        for network in prev_inv:
            for station in network:
                latitude = station.latitude
                longitude = station.longitude
                station_id = '{}.{}'.format(network.code, station.code)
                if station_id not in self.df['station_id'].unique():
                    new_row = pd.DataFrame([{'network': str(network.code),
                                             'station': str(station.code),
                                             'latitude': latitude,
                                             'longitude': longitude,
                                             'station_id': station_id}])
                    # pd.concat replaces DataFrame.append(), which was removed in pandas 2.0
                    self.df = pd.concat([self.df, new_row], ignore_index=True)

    def get_stations(self):
        """
        returns a list of stations formatted as '{}.{}'.format(network, station)
        """
        self._build_df()
        print(self.df)
        return list(self.df['station_id'].values)
```
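If your waveforms live somewhere other than mass-downloaded miniSEED (a relational database, a cloud bucket, and so on), the adapter only has to satisfy that two-method contract. A bare-bones sketch, with the station ids and data-fetching logic left as hypothetical placeholders you would fill in:

```python
from anxcor.containers import AnxcorDatabase
from obspy import Stream


class MyDataSource(AnxcorDatabase):
    """Minimal sketch of an Anxcor-compatible data source (hypothetical example)."""

    def get_stations(self):
        # must return a list of 'network.station' strings
        return ['XX.STA1', 'XX.STA2']  # placeholder station ids

    def get_waveforms(self, starttime=0, endtime=0, station=0, network=0):
        # must return an obspy Stream covering [starttime, endtime] (UTCDateTime floats),
        # with trace.stats.coordinates set and channels renamed to e.g. 'z'/'n'/'e'
        return Stream()  # placeholder: fetch and preprocess your own data here
```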
Define auxiliary processing routines as needed.
As every crosscorrelation window will have different maximum amplitudes, it's probably a good idea to normalize across every time window. We'll define an extension class `XArrayCustomComponentNormalizer` to do the task.
```python
from anxcor.xarray_routines import XArrayProcessor
import numpy as np
import xarray as xr


class XArrayCustomComponentNormalizer(XArrayProcessor):
    """
    normalizes preprocessed data based on a single component
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, xarray, *args, **kwargs):
        # scale the whole correlogram by its maximum absolute amplitude
        norm_chan_max = np.amax(np.abs(xarray))
        xarray /= norm_chan_max
        return xarray

    def _add_operation_string(self):
        return 'channel normer'

    def get_name(self):
        return 'channel normer'

    def _persist_metadata(self, first_data, *args, **kwargs):
        return first_data.attrs
```
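In isolation, the `execute()` step just divides the array by its global maximum absolute value. A toy illustration with plain xarray (dummy data, not actual Anxcor output):

```python
import numpy as np
import xarray as xr

# dummy 'correlogram': 2 channel pairs x 5 lag samples
toy = xr.DataArray(np.array([[1.0, -4.0,  2.0, 0.5, -1.0],
                             [0.2,  0.1, -0.3, 0.4,  0.0]]),
                   dims=('pair', 'time'))
normalized = toy / np.amax(np.abs(toy))   # same operation as execute()
print(float(np.abs(normalized).max()))    # 1.0
```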
Set up Crosscorrelation Study Parameters
Important parameters for a crosscorrelation study are:
- window length
- window overlap
- processing routine order and associated arguments
- how much of the correlogram to keep (the maximum lag shift)
- the actual start and end times, as UTCDateTime timestamps, to draw your windows from
```python
from anxcor.core import Anxcor
from anxcor.xarray_routines import XArrayRemoveMeanTrend, XArrayResample, XArrayTaper
from obspy.core import UTCDateTime

tau = 6 * 60
taper_ratio = 0.05
correlate_kwargs = dict(max_tau_shift=tau, taper=taper_ratio)

# start and end times are float seconds relative to UTCDateTime value 0.0
starttime = UTCDateTime("2008-03-01 00:00:00").timestamp
endtime = UTCDateTime("2008-03-02 00:00:00").timestamp

window_length = 10 * 60  # 10 minute windows
overlap = 0.50           # 50% overlap

# make your ArrayReader class with the directory holding the data
reader = ArrayReader('data')
# make an anxcor object
anxcor_main = Anxcor()
# add the database to it
anxcor_main.add_dataset(reader, 'BK&TA')
# add your processing routines in the order you want
anxcor_main.add_process(XArrayRemoveMeanTrend())
anxcor_main.add_process(XArrayTaper(taper=taper_ratio))
# specify particulars of the crosscorrelation routine
anxcor_main.set_task_kwargs('crosscorrelate', correlate_kwargs)
# add any specialty processing routines where you want them to apply
anxcor_main.set_task('post-combine', XArrayCustomComponentNormalizer())
# set the window length for the study
anxcor_main.set_window_length(window_length)
# generate a list of start times
starttime_list = anxcor_main.get_starttimes(starttime, endtime, overlap)
```
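For intuition, `get_starttimes()` should produce something close to a simple sliding window: successive windows stepping forward by `window_length * (1 - overlap)` seconds. A rough, hand-rolled equivalent (assuming that convention and reusing the variables defined above; use Anxcor's own list for the actual run):

```python
step = window_length * (1 - overlap)   # 300 s steps for 10 min windows at 50% overlap
manual_starttimes = []
t = starttime
while t + window_length <= endtime:
    manual_starttimes.append(t)
    t += step
print(len(manual_starttimes))  # number of overlapping windows in the day
```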
Spin up a Dask Cluster (or connect to one if you have it)
While Anxcor can run without any parallelism, Dask gives us a handy interface for distributing the work and monitoring it as it runs.
```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=20)
c = Client(cluster)
```
Navigating to `localhost:8787` in a browser will let us monitor the ongoing computation on the Dask dashboard.
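If you already have a cluster running elsewhere, you can point the client at its scheduler instead of starting a local one (the address below is a placeholder for your own scheduler):

```python
from distributed import Client

# connect to an existing Dask scheduler instead of a LocalCluster
c = Client('tcp://my-scheduler-host:8786')  # hypothetical scheduler address
```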
Execute our plan!
To execute the study, we need to specify the number of time windows to compute in parallel (the `stack` argument below), so adjust this to your needs. Start small and monitor memory usage.
```python
result = anxcor_main.process(starttime_list, stack=100, dask_client=c)
anxcor_main.save_result(result, 'result')
```
Our data is now stored on disk as a netcdf4 file `result.nc`. Load it up using xarray, or use the `result` variable to view it in memory.
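For example, reading the file back with plain xarray (independently of Anxcor) might look like this:

```python
import xarray as xr

# open the saved correlation results as an xarray Dataset
ds = xr.open_dataset('result.nc')
print(ds)
```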
Visualize the Data
Whether by loading the netcdf4 file from disk or using the result still in live memory, let's pick a source station and visualize the moveout.
```python
# some imports and things to make plots look nice
from anxcor.core import Anxcor
from anxcor.xarray_routines import XArrayBandpass, XArrayTaper
import xarray as xr
from obspy.geodetics.base import gps2dist_azimuth
from scipy.signal import find_peaks, hilbert
from obspy.core import UTCDateTime
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

font = {'family': 'normal',
        'size': 22}
matplotlib.rc('font', **font)

# load from file if you need to; if the result from process() is still in memory,
# skip the load and use it directly
anxcor_main = Anxcor()
result = anxcor_main.load_result('result')

# extract the metadata from our study
df = result.attrs['df']


def get_dist(src, rec, df):
    """
    helper function to calculate the arc distance based on a given source and receiver
    """
    df_subset = df[(df['src'] == src) & (df['rec'] == rec)]
    if df_subset.empty:
        df_subset = df[(df['rec'] == src) & (df['src'] == rec)]
    src_latitude = df_subset['src_latitude'].values[0]
    src_longitude = df_subset['src_longitude'].values[0]
    rec_latitude = df_subset['rec_latitude'].values[0]
    rec_longitude = df_subset['rec_longitude'].values[0]
    return gps2dist_azimuth(src_latitude, src_longitude, rec_latitude, rec_longitude)[0]


# create a list of all stations
srcs = list(result.coords['src'].values)
recs = list(result.coords['rec'].values)
all_stations = set(srcs + recs)

# extract the data we want from the original result
data_var = result['src:BK&TA rec:BK&TA']

# create a bandpass filter to examine the signals of interest
center_frequency = 1 / 8
data_var = XArrayBandpass(freqmin=center_frequency - 0.2 * center_frequency,
                          freqmax=center_frequency + 0.2 * center_frequency,
                          order=4, zerophase=True)(data_var)

# Source station to compare against
main_src = 'BK.BRK'

# helper arrays for plotting
datetime_array = result.coords['time'].values
time_min = UTCDateTime(str(datetime_array[0])).timestamp
time_max = UTCDateTime(str(datetime_array[-1])).timestamp
time = np.linspace(time_min, time_max, num=len(datetime_array))
multiplier = 60

# loop through every source-receiver pair, plot it on a record section along with its
# Hilbert-transform envelope and the main envelope peaks, then print derived group velocities
plt.figure(figsize=(25, 4))
station_latitudes = []
station_longitudes = []
for station in all_stations:
    data = data_var.loc[dict(src_chan='z', rec_chan='z', src=main_src, rec=station)].data.ravel()[::-1]
    if np.sum(data) == 0:
        data = data_var.loc[dict(src_chan='z', rec_chan='z', src=station, rec=main_src)].data.ravel()
        distance_to_src = get_dist(station, main_src, df)
        longitude = df[df['src'] == station]['src_longitude'].values[0]
        latitude = df[df['src'] == station]['src_latitude'].values[0]
    else:
        longitude = df[df['rec'] == station]['rec_longitude'].values[0]
        latitude = df[df['rec'] == station]['rec_latitude'].values[0]
        distance_to_src = get_dist(main_src, station, df)
    if station == main_src:
        src_lat = latitude
        src_lon = longitude
    else:
        station_latitudes.append(latitude)
        station_longitudes.append(longitude)
    data /= np.amax(np.abs(data))
    distance = distance_to_src / 1000
    envelope = np.abs(hilbert(data))
    peaks, others = find_peaks(envelope, height=0.55)
    peaks = np.sort(peaks)
    velocities = distance / time[peaks]
    print('group velocities for {}: {}'.format(station, np.sort(velocities)))
    plt.plot(time, multiplier * data + distance, color='black')
    plt.plot(time, multiplier * envelope + distance, color='blue')
    plt.plot(time, -multiplier * envelope + distance, color='blue')
    plt.scatter(time[peaks], -multiplier * envelope[peaks] + distance, color='red', marker='o', s=100)

plt.xlim([-360, 360])
plt.yticks([0, 160, 325, 480, 650])
plt.gca().invert_yaxis()
plt.xlabel('Time (s)')
plt.ylabel('Distance (km)')
plt.gca().yaxis.label.set_color('white')
plt.gca().xaxis.label.set_color('white')
plt.gca().tick_params(axis='x', labelcolor='white')
plt.gca().tick_params(axis='y', labelcolor='white')
plt.show()
```