-
Notifications
You must be signed in to change notification settings - Fork 934
Expand file tree
/
Copy pathinitial_program.py
More file actions
189 lines (145 loc) · 5.86 KB
/
initial_program.py
File metadata and controls
189 lines (145 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# EVOLVE-BLOCK-START
"""
Real-Time Adaptive Signal Processing Algorithm for Non-Stationary Time Series
This algorithm implements a sliding window approach to filter volatile, non-stationary
time series data while minimizing noise and preserving signal dynamics.
"""
import numpy as np
from scipy import signal
from collections import deque
def adaptive_filter(x, window_size=20):
    """
    Baseline sliding-window filter: an unweighted moving average.

    Output sample ``i`` is the arithmetic mean of ``x[i : i + window_size]``,
    so only positions where a full window fits produce an output.

    Args:
        x: Input signal (1D array-like of real-valued samples)
        window_size: Size of the sliding window (W samples), must be >= 1

    Returns:
        y: Filtered output signal with length = len(x) - window_size + 1

    Raises:
        ValueError: If window_size is smaller than 1, or if the input has
            fewer samples than window_size.
    """
    # A non-positive window would otherwise yield empty or misaligned windows
    # (NaN means, wrong output length) instead of a clear error.
    if window_size < 1:
        raise ValueError(f"window_size ({window_size}) must be >= 1")
    if len(x) < window_size:
        raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})")

    samples = np.asarray(x, dtype=float)

    # Uniform kernel; "valid" mode keeps only fully overlapping positions,
    # which gives exactly len(x) - window_size + 1 outputs.
    kernel = np.full(window_size, 1.0 / window_size)
    return np.convolve(samples, kernel, mode="valid")
def enhanced_filter_with_trend_preservation(x, window_size=20):
    """
    Sliding-window filter using an exponentially weighted moving average.

    Within each window the weights grow from exp(-2) on the first (oldest)
    sample to exp(0) on the last (newest) sample and are normalised to sum
    to 1, so recent samples dominate each output value.

    Args:
        x: Input signal (1D array-like of real-valued samples)
        window_size: Size of the sliding window, must be >= 1

    Returns:
        y: Filtered output signal with length = len(x) - window_size + 1

    Raises:
        ValueError: If window_size is smaller than 1, or if the input has
            fewer samples than window_size.
    """
    # window_size == 0 would otherwise silently produce an all-zero output
    # that is longer than the input.
    if window_size < 1:
        raise ValueError(f"window_size ({window_size}) must be >= 1")
    if len(x) < window_size:
        raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})")

    samples = np.asarray(x, dtype=float)

    # Create weights that emphasize recent samples
    weights = np.exp(np.linspace(-2, 0, window_size))
    weights = weights / np.sum(weights)

    # "valid" cross-correlation evaluates sum_k samples[i + k] * weights[k]
    # for every position where the whole window fits; no kernel flip is
    # applied, so the last sample of each window receives the largest weight.
    return np.correlate(samples, weights, mode="valid")
def process_signal(input_signal, window_size=20, algorithm_type="enhanced"):
    """
    Entry point that routes the input to one of the two sliding-window filters.

    Args:
        input_signal: Input time series data
        window_size: Window size handed to the selected filter
        algorithm_type: "enhanced" selects
            enhanced_filter_with_trend_preservation; every other value
            (including "basic") selects adaptive_filter

    Returns:
        Filtered signal as returned by the selected filter
    """
    wants_enhanced = algorithm_type == "enhanced"
    # Anything that is not exactly "enhanced" falls back to the basic filter.
    selected_filter = (
        enhanced_filter_with_trend_preservation if wants_enhanced else adaptive_filter
    )
    return selected_filter(input_signal, window_size)
# EVOLVE-BLOCK-END
def generate_test_signal(length=1000, noise_level=0.3, seed=42):
    """
    Generate synthetic test signal with known characteristics.

    The clean signal is a sum of sinusoids, a decaying oscillation, a slowly
    varying trend and a random walk, sampled at ``length`` evenly spaced
    points of t in [0, 10]. Gaussian noise is then added to obtain the noisy
    signal.

    Args:
        length: Length of the signal
        noise_level: Standard deviation of noise to add
        seed: Random seed for reproducibility

    Returns:
        Tuple of (noisy_signal, clean_signal)
    """
    # A private generator instead of np.random.seed(): the legacy RandomState
    # yields the same random stream for a given seed, but the process-wide
    # NumPy RNG state is left untouched for callers.
    rng = np.random.RandomState(seed)
    t = np.linspace(0, 10, length)

    # Create a complex signal with multiple components
    clean_signal = (
        2 * np.sin(2 * np.pi * 0.5 * t)  # Low frequency component
        + 1.5 * np.sin(2 * np.pi * 2 * t)  # Medium frequency component
        + 0.5 * np.sin(2 * np.pi * 5 * t)  # Higher frequency component
        + 0.8 * np.exp(-t / 5) * np.sin(2 * np.pi * 1.5 * t)  # Decaying oscillation
    )

    # Add non-stationary behavior
    trend = 0.1 * t * np.sin(0.2 * t)  # Slowly varying trend
    clean_signal += trend

    # Add random walk component for non-stationarity
    random_walk = np.cumsum(rng.randn(length) * 0.05)
    clean_signal += random_walk

    # Add noise (drawn after the random walk, so the draw order is fixed)
    noise = rng.normal(0, noise_level, length)
    noisy_signal = clean_signal + noise

    return noisy_signal, clean_signal
def run_signal_processing(
    signal_length=1000,
    noise_level=0.3,
    window_size=20,
    algorithm_type="enhanced",
    seed=42,
):
    """
    Run the signal processing algorithm on a test signal.

    Args:
        signal_length: Number of samples in the generated test signal
        noise_level: Standard deviation of the noise added to the test signal
        window_size: Sliding-window size passed to the filter
        algorithm_type: Algorithm selector forwarded to process_signal
            (defaults to "enhanced", the previously hard-coded choice)
        seed: Random seed forwarded to generate_test_signal

    Returns:
        Dictionary containing results and metrics: "filtered_signal",
        "clean_signal", "noisy_signal" (all trimmed to a common length),
        "correlation", "noise_reduction" and "signal_length"
    """
    # Generate test signal
    noisy_signal, clean_signal = generate_test_signal(signal_length, noise_level, seed)

    # Process the signal
    filtered_signal = process_signal(noisy_signal, window_size, algorithm_type)

    # Nothing to score: report empty results with neutral metrics
    if len(filtered_signal) == 0:
        return {
            "filtered_signal": [],
            "clean_signal": [],
            "noisy_signal": [],
            "correlation": 0,
            "noise_reduction": 0,
            "signal_length": 0,
        }

    # Align signals for comparison (account for processing delay): output
    # sample i is compared against the last input sample of its window.
    delay = window_size - 1
    aligned_clean = clean_signal[delay:]
    aligned_noisy = noisy_signal[delay:]

    # Ensure same length
    min_length = min(len(filtered_signal), len(aligned_clean))
    filtered_signal = filtered_signal[:min_length]
    aligned_clean = aligned_clean[:min_length]
    aligned_noisy = aligned_noisy[:min_length]

    # Calculate correlation with clean signal (needs at least two samples)
    correlation = np.corrcoef(filtered_signal, aligned_clean)[0, 1] if min_length > 1 else 0

    # Calculate noise reduction as the relative drop in error variance
    noise_before = np.var(aligned_noisy - aligned_clean)
    noise_after = np.var(filtered_signal - aligned_clean)
    noise_reduction = (noise_before - noise_after) / noise_before if noise_before > 0 else 0

    return {
        "filtered_signal": filtered_signal,
        "clean_signal": aligned_clean,
        "noisy_signal": aligned_noisy,
        "correlation": correlation,
        "noise_reduction": noise_reduction,
        "signal_length": min_length,
    }
if __name__ == "__main__":
    # Smoke run with default settings; print the headline metrics
    report = run_signal_processing()
    summary_lines = (
        "Signal processing completed!",
        f"Correlation with clean signal: {report['correlation']:.3f}",
        f"Noise reduction: {report['noise_reduction']:.3f}",
        f"Processed signal length: {report['signal_length']}",
    )
    for line in summary_lines:
        print(line)