IP Geolocation for Fraud Prevention: Detecting Suspicious Locations
Leverage IP geolocation data for fraud detection by identifying suspicious location patterns and impossible travel scenarios.
Table of Contents
Table of Contents
IP Geolocation for Fraud Prevention: Detecting Suspicious Locations
IP geolocation is a practical signal for fraud prevention when combined with behavior and device intelligence. Below are actionable techniques, code snippets, and queries to detect impossible travel, high‑risk networks, and geo anomalies.
IP Geolocation for Fraud Prevention Overview
- Impossible travel: user appears in far‑apart locations within an infeasible time window.
- corporate/VPN/Proxy/Hosting ASNs: traffic originates from data centers or anonymity networks.
- Country/region policy mismatch: geo not allowed by business rules or licensing.
- Geo‑velocity spikes: abnormal movement speed vs historical baseline.
Data Sources and Accuracy
- Geo DB: authoritative IP→(country, region, city, lat/lon, ASN).
- ASN catalogs: provider→list of hosting/VPN ASNs.
- Velocity models: store last known login coordinates and timestamps per user.
- Cache hot lookups via Redis for <5ms p95.
Node.js: fast IP→Geo lookup with caching
import Redis from 'ioredis'
import { getGeoByIp } from './geoProvider'
const redis = new Redis(process.env.REDIS_URL!)
export async function geoLookup(ip: string) {
const key = 'geo:' + ip
const cached = await redis.get(key)
if (cached) return JSON.parse(cached)
const geo = await getGeoByIp(ip)
await redis.set(key, JSON.stringify(geo), 'EX', 3600)
return geo
}Upserting geo on login
import { geoLookup } from './geo'
import { sql } from './db'
export async function recordLogin(userId: string, ip: string, ts: Date) {
const g = await geoLookup(ip)
// ... insert into user_login_events ...
}Risk Scoring Model
type Signals = { speedKmh?: number; isHostingAsn?: boolean; deniedCountry?: boolean }
export function score(signals: Signals): number {
let s = 0
if ((signals.speedKmh ?? 0) > 900) s += 50
if (signals.isHostingAsn) s += 30
if (signals.deniedCountry) s += 40
return Math.min(100, s)
}
export function action(score: number) {
if (score >= 70) return 'block'
if (score >= 40) return 'step_up_auth'
return 'allow'
}Practical SQL Examples
with last_two as (
select user_id, ts, lat, lon,
lag(ts) over (partition by user_id order by ts) as prev_ts,
lag(lat) over (partition by user_id order by ts) as prev_lat,
lag(lon) over (partition by user_id order by ts) as prev_lon
from user_login_events
)
select user_id, ts, prev_ts,
earth_distance(ll_to_earth(lat, lon), ll_to_earth(prev_lat, prev_lon)) / 1000 as km,
extract(epoch from (ts - prev_ts))/3600 as hours,
case when extract(epoch from (ts - prev_ts))>0
then (earth_distance(ll_to_earth(lat, lon), ll_to_earth(prev_lat, prev_lon)) / 1000)
/(extract(epoch from (ts - prev_ts))/3600)
else null end as kmh
from last_two
where prev_ts is not null and (earth_distance(ll_to_earth(lat, lon), ll_to_earth(prev_lat, prev_lon)) / 1000) / nullif(extract(epoch from (ts - prev_ts))/3600,0) > 900;Prerequisite extensions:
create extension if not exists cube;
create extension if not exists earthdistance;Flag hosting/VPN ASNs
create table if not exists hosting_asns (asn int primary key);
-- Upsert new ASN list daily
-- insert into hosting_asns (asn) values (13335), (15169), ...;
select e.user_id, e.ts, e.asn
from user_login_events e
join hosting_asns h on h.asn = e.asn;Security and Privacy
- Hash IPs for long‑term storage (e.g., SHA-256 with salt) if retention is required.
- Respect regional laws for geo‑based decisions; provide appeals/verification paths.
Monitoring and Alerts
Expose metrics and alerts:
- Rate of impossible travel per 1k logins.
- Share of hosting/VPN ASN traffic by region.
- Step‑up and block rates; false positive ratio.
journalctl -u auth.service -n 1000 | grep "country_change" | wc -lOperational Playbooks
- Excess hosting ASN traffic: enforce CAPTCHA on signup/login, tighten rate limits for flagged ASNs.
- Country policy bypass attempts: enable step‑up auth and manual review for affected cohort.
Implementation Guide
Postgres schema for velocity and geo decisions
create table if not exists user_login_events (
user_id uuid not null,
ts timestamptz not null,
ip inet not null,
country text,
region text,
city text,
lat double precision,
lon double precision,
asn int,
);
create index if not exists idx_login_user_ts on user_login_events(user_id, ts desc);Upserting geo on login
import { geoLookup } from './geo'
import { sql } from './db'
export async function recordLogin(userId: string, ip: string, ts: Date) {
const g = await geoLookup(ip)
// ... insert into user_login_events ...
}Risk Scoring Model
type Signals = { speedKmh?: number; isHostingAsn?: boolean; deniedCountry?: boolean }
export function score(signals: Signals): number {
let s = 0
if ((signals.speedKmh ?? 0) > 900) s += 50
if (signals.isHostingAsn) s += 30
if (signals.deniedCountry) s += 40
return Math.min(100, s)
}
export function action(score: number) {
if (score >= 70) return 'block'
if (score >= 40) return 'step_up_auth'
return 'allow'
}Advanced Analytics
User Journey Mapping
-- Track user location changes over time
with user_journey as (
select
user_id,
ts,
country,
region,
city,
lat, lon,
-- Calculate time since last location
lag(ts) over (partition by user_id order by ts) as prev_ts,
lag(country) over (partition by user_id order by ts) as prev_country,
-- Flag location changes
case when lag(country) over (partition by user_id order by ts) != country
then 'COUNTRY_CHANGE' else 'SAME_COUNTRY' end as location_change
from user_login_events
where ts >= now() - interval '30 days'
)
select
user_id,
count(*) as total_logins,
count(distinct country) as countries_visited,
array_agg(distinct country order by min(ts)) as country_sequence,
max(case when location_change = 'COUNTRY_CHANGE' then 1 else 0 end) as has_country_changes,
min(ts) as first_login,
max(ts) as last_login
from user_journey
group by user_id
having count(*) >= 5 -- Only users with multiple logins
order by countries_visited desc, total_logins desc;Anomaly Detection with Machine Learning
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
def detect_anomalous_locations(login_history):
"""
Use machine learning to detect unusual location patterns
"""
if len(login_history) < 10:
return [] # Need more data for ML
# Extract features
features = []
for i in range(1, len(login_history)):
prev = login_history[i-1]
curr = login_history[i]
# Time difference in hours
time_diff = (curr['timestamp'] - prev['timestamp']).total_seconds() / 3600
# Distance in km
distance = haversine(prev['lat'], prev['lon'], curr['lat'], curr['lon'])
# Speed
speed = distance / max(time_diff, 0.001)
features.append([time_diff, distance, speed])
if not features:
return []
# Scale features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# Train isolation forest
clf = IsolationForest(contamination=0.1, random_state=42)
predictions = clf.fit_predict(features_scaled)
# Return indices of anomalous logins
return [i+1 for i, pred in enumerate(predictions) if pred == -1]
# Example usage
login_history = [
{'lat': 40.7128, 'lon': -74.0060, 'timestamp': datetime.now() - timedelta(days=1)},
{'lat': 34.0522, 'lon': -118.2437, 'timestamp': datetime.now()}, # LA from NYC
# ... more logins
]
anomalies = detect_anomalous_locations(login_history)
print(f"Anomalous logins at indices: {anomalies}")Integration Examples
Express.js Middleware
import { geoLookup, score, action } from './geo-fraud'
app.use('/api/*', async (req, res, next) => {
const clientIP = req.ip || req.connection.remoteAddress
const userId = req.user?.id
if (userId) {
const geo = await geoLookup(clientIP)
const riskScore = await score({ speedKmh: 0, isHostingAsn: false, deniedCountry: false })
const recommendedAction = action(riskScore)
// Store for logging
req.geoContext = { geo, riskScore, action: recommendedAction }
if (recommendedAction === 'block') {
return res.status(403).json({
error: 'GEOGRAPHIC_RESTRICTION',
message: 'Access denied due to geographic restrictions.'
})
}
if (recommendedAction === 'step_up_auth') {
// Require additional verification
return res.status(401).json({
error: 'ADDITIONAL_VERIFICATION_REQUIRED',
message: 'Please verify your identity to continue.'
})
}
}
next()
})Python FastAPI Integration
from fastapi import HTTPException, Request
from .geo_fraud import calculate_risk_score, get_action
@app.middleware("http")
async def geo_fraud_middleware(request: Request, call_next):
client_ip = request.client.host if request.client else None
if client_ip and hasattr(request.state, 'user_id'):
user_id = request.state.user_id
geo = await get_geo_data(client_ip)
risk_score = await calculate_risk_score({
'speed_kmh': 0,
'is_hosting_asn': False,
'denied_country': False
})
recommended_action = get_action(risk_score)
if recommended_action == 'block':
raise HTTPException(
status_code=403,
detail="Access denied due to geographic restrictions."
)
if recommended_action == 'step_up_auth':
raise HTTPException(
status_code=401,
detail="Additional verification required."
)
response = await call_next(request)
return responseBest Practices
1. Combine Signals: Never rely on geo alone - combine with device fingerprinting, behavior analysis.
2. Regular Updates: Refresh ASN lists and geo databases weekly.
3. Graceful Degradation: Handle API failures and missing data gracefully.
4. User Communication: Provide clear explanations for blocks and appeals processes.
5. Privacy by Design: Minimize data collection and retention.
6. A/B Testing: Test different thresholds and policies with small user groups.
Conclusion
This comprehensive approach balances fraud prevention with user experience, ensuring legitimate users aren't unnecessarily blocked while protecting your platform from abuse.