High-performance weather data platform processing 1M+ API calls daily with real-time analytics, machine learning forecasting, and comprehensive weather visualization. Built with Python Django, Celery task processing, and advanced data pipeline architecture for enterprise weather services.
Real-time weather data processing with machine learning forecasting
High-performance Django REST API with advanced caching, database optimization, and scalable architecture handling millions of weather data requests daily.
Celery-powered distributed task processing with Redis message broker enabling real-time weather data ingestion and analysis at scale.
TensorFlow-based machine learning models providing 95% accurate weather predictions with LSTM networks and ensemble forecasting techniques.
Sub-100ms API response times through intelligent caching strategies, database indexing, and optimized query patterns for weather data retrieval.
Comprehensive weather analytics with historical trend analysis, anomaly detection, and predictive insights for business intelligence.
Intelligent weather alerting system with custom thresholds, multi-channel notifications, and emergency response integration.
Advanced Python patterns and data processing architecture
from django.views.decorators.cache import cache_page
from django.core.cache import cache
from rest_framework.views import APIView
from rest_framework.response import Response
from celery import shared_task
import pandas as pd
import numpy as np
class WeatherDataAPIView(APIView):
"""High-performance weather data API with intelligent caching"""
@cache_page(60 * 5) # Cache for 5 minutes
def get(self, request):
location = request.query_params.get('location')
start_date = request.query_params.get('start_date')
end_date = request.query_params.get('end_date')
# Generate cache key
cache_key = f"weather_data_{location}_{start_date}_{end_date}"
# Try cache first
cached_data = cache.get(cache_key)
if cached_data:
return Response(cached_data)
# Query database with optimized select
weather_data = WeatherReading.objects.filter(
location__iexact=location,
timestamp__range=[start_date, end_date]
).select_related('location').values(
'timestamp', 'temperature', 'humidity',
'pressure', 'wind_speed', 'precipitation'
)
# Process data with pandas for performance
df = pd.DataFrame(weather_data)
if not df.empty:
# Calculate aggregations
daily_stats = df.groupby(df['timestamp'].dt.date).agg({
'temperature': ['min', 'max', 'mean'],
'humidity': 'mean',
'precipitation': 'sum'
}).round(2)
result = {
'location': location,
'period': f"{start_date} to {end_date}",
'readings': weather_data,
'daily_statistics': daily_stats.to_dict(),
'total_readings': len(df)
}
else:
result = {'message': 'No data found for specified parameters'}
# Cache result for 15 minutes
cache.set(cache_key, result, 60 * 15)
return Response(result)
@shared_task
def process_weather_data_batch(data_batch):
"""Celery task for processing weather data in batches"""
try:
processed_readings = []
for reading in data_batch:
# Data validation and cleaning
cleaned_reading = validate_weather_reading(reading)
# Anomaly detection
if detect_anomaly(cleaned_reading):
trigger_alert.delay(cleaned_reading)
# Store processed reading
weather_reading = WeatherReading.objects.create(**cleaned_reading)
processed_readings.append(weather_reading.id)
# Update forecasting model if needed
if should_retrain_model(cleaned_reading):
retrain_forecast_model.delay(cleaned_reading['location'])
return {
'status': 'success',
'processed_count': len(processed_readings),
'reading_ids': processed_readings
}
except Exception as e:
logger.error(f"Error processing weather batch: {str(e)}")
return {'status': 'error', 'message': str(e)}
def validate_weather_reading(reading):
"""Advanced weather data validation"""
# Temperature range validation
if not -50 <= reading.get('temperature', 0) <= 60:
raise ValueError("Temperature out of valid range")
# Humidity validation
if not 0 <= reading.get('humidity', 0) <= 100:
raise ValueError("Humidity out of valid range")
# Pressure validation
if not 800 <= reading.get('pressure', 1013) <= 1100:
raise ValueError("Pressure out of valid range")
return reading
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import numpy as np
from sklearn.preprocessing import MinMaxScaler
class WeatherForecastModel:
"""TensorFlow LSTM model for weather forecasting"""
def __init__(self):
self.model = None
self.scaler = MinMaxScaler()
self.sequence_length = 24 # 24 hours of data
def build_model(self, input_features=5):
"""Build LSTM neural network for weather prediction"""
self.model = Sequential([
LSTM(128, return_sequences=True,
input_shape=(self.sequence_length, input_features)),
Dropout(0.2),
LSTM(64, return_sequences=True),
Dropout(0.2),
LSTM(32, return_sequences=False),
Dropout(0.2),
Dense(16, activation='relu'),
Dense(input_features, activation='linear') # Predict all features
])
self.model.compile(
optimizer='adam',
loss='mse',
metrics=['mae']
)
return self.model
def prepare_training_data(self, weather_data):
"""Prepare time series data for LSTM training"""
# Convert to numpy array
data = np.array(weather_data)
# Normalize features
scaled_data = self.scaler.fit_transform(data)
# Create sequences
X, y = [], []
for i in range(self.sequence_length, len(scaled_data)):
X.append(scaled_data[i-self.sequence_length:i])
y.append(scaled_data[i])
return np.array(X), np.array(y)
def train(self, weather_data, epochs=100, batch_size=32):
"""Train the forecasting model"""
X, y = self.prepare_training_data(weather_data)
# Split data
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# Train model
history = self.model.fit(
X_train, y_train,
batch_size=batch_size,
epochs=epochs,
validation_data=(X_test, y_test),
verbose=1
)
return history
def predict_weather(self, recent_data, hours_ahead=24):
"""Generate weather forecast"""
# Prepare input data
scaled_input = self.scaler.transform(recent_data)
input_sequence = scaled_input[-self.sequence_length:].reshape(1, self.sequence_length, -1)
predictions = []
current_sequence = input_sequence.copy()
# Generate multi-step forecast
for _ in range(hours_ahead):
next_prediction = self.model.predict(current_sequence)
predictions.append(next_prediction[0])
# Update sequence for next prediction
current_sequence = np.roll(current_sequence, -1, axis=1)
current_sequence[0, -1, :] = next_prediction[0]
# Inverse transform predictions
forecast = self.scaler.inverse_transform(predictions)
return forecast
@shared_task
def generate_daily_forecast(location_id):
"""Celery task to generate daily weather forecast"""
try:
# Get recent weather data
recent_data = WeatherReading.objects.filter(
location_id=location_id,
timestamp__gte=timezone.now() - timedelta(days=7)
).values_list(
'temperature', 'humidity', 'pressure',
'wind_speed', 'precipitation'
)
# Load trained model
forecast_model = WeatherForecastModel()
forecast_model.load_model(f'models/weather_model_{location_id}.h5')
# Generate 24-hour forecast
forecast = forecast_model.predict_weather(recent_data, hours_ahead=24)
# Store forecast in database
for i, prediction in enumerate(forecast):
forecast_time = timezone.now() + timedelta(hours=i+1)
WeatherForecast.objects.create(
location_id=location_id,
forecast_time=forecast_time,
temperature=prediction[0],
humidity=prediction[1],
pressure=prediction[2],
wind_speed=prediction[3],
precipitation=prediction[4],
confidence_score=calculate_confidence(prediction)
)
return {'status': 'success', 'forecasts_generated': len(forecast)}
except Exception as e:
logger.error(f"Forecast generation failed: {str(e)}")
return {'status': 'error', 'message': str(e)}
Real-world metrics demonstrating platform capabilities
Delivering value through weather intelligence
Interested in creating high-performance data processing systems? Let's discuss how these Python and machine learning patterns can transform your data infrastructure.