v1.0 • Lead Management
Secure your webhook endpoints with HMAC signature verification.
Every webhook request from Pollybot includes a cryptographic signature that allows you to verify the request actually came from us and hasn't been tampered with.
POST /your-webhook-endpoint HTTP/1.1
Content-Type: application/json
X-Webhook-Signature: sha256=a8b7c9d2e3f4...
X-Webhook-Timestamp: 1696531200
User-Agent: Pollybot-Webhooks/1.0
{
"event": "LEAD_CREATED",
"data": { ... }
}
const crypto = require('crypto');
/**
 * Verify the HMAC-SHA256 signature of an incoming webhook request.
 *
 * The expected signature is the hex HMAC-SHA256 of `body + timestamp`,
 * keyed with `secret`.
 *
 * @param {string} body - Raw request body, exactly as received.
 * @param {string} signature - Signature header value, optionally prefixed with "sha256=".
 * @param {string} secret - Shared webhook secret used as the HMAC key.
 * @param {string|number} timestamp - Request timestamp in Unix seconds.
 * @returns {boolean} `true` when the timestamp is fresh and the signature matches.
 * @throws {Error} When the timestamp is missing, non-numeric, or more than
 *   5 minutes away from the current time, or when the signature is missing,
 *   malformed, or does not match.
 */
function verifyWebhookSignature(body, signature, secret, timestamp) {
  // Number(undefined) is NaN and every comparison with NaN is false, so a
  // missing header must be rejected explicitly or the freshness check below
  // would be skipped silently.
  const requestTime = Number(timestamp);
  if (timestamp == null || !Number.isFinite(requestTime)) {
    throw new Error('Invalid or missing webhook timestamp');
  }

  // Check timestamp (prevents replay attacks) - 5 minutes tolerance
  const now = Math.floor(Date.now() / 1000);
  if (Math.abs(now - requestTime) > 300) {
    throw new Error('Request timestamp too old');
  }

  // Create the payload string (body + timestamp) and sign it
  const payload = `${body}${timestamp}`;
  const expected = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest();

  if (typeof signature !== 'string') {
    throw new Error('Invalid webhook signature');
  }
  const prefix = 'sha256=';
  const receivedHex = signature.startsWith(prefix)
    ? signature.slice(prefix.length)
    : signature;

  // timingSafeEqual throws a RangeError on buffers of different length, and
  // Buffer.from(..., 'hex') silently truncates at the first non-hex character,
  // so validate the shape before the constant-time comparison.
  const isWellFormed =
    /^[0-9a-f]+$/i.test(receivedHex) &&
    receivedHex.length === expected.length * 2;
  if (!isWellFormed) {
    throw new Error('Invalid webhook signature');
  }

  // Compare signatures securely
  const received = Buffer.from(receivedHex, 'hex');
  if (!crypto.timingSafeEqual(expected, received)) {
    throw new Error('Invalid webhook signature');
  }
  return true;
}
// Express.js middleware example
app.use('/webhook', express.raw({ type: 'application/json' }));
app.post('/webhook/pollybot', (req, res) => {
const signature = req.headers['x-webhook-signature'];
const timestamp = req.headers['x-webhook-timestamp'];
const body = req.body.toString();
try {
verifyWebhookSignature(body, signature, process.env.WEBHOOK_SECRET, timestamp);
// Signature is valid, process the webhook
const payload = JSON.parse(body);
res.status(200).json({ received: true });
} catch (error) {
console.error('Webhook verification failed:', error.message);
res.status(400).json({ error: 'Invalid signature' });
}
});import hmac
import hashlib
import time
from flask import Flask, request, jsonify
app = Flask(__name__)
def verify_webhook_signature(body, signature, secret, timestamp):
    """Verify the HMAC-SHA256 signature of an incoming webhook request.

    The expected signature is the hex HMAC-SHA256 of ``body + timestamp``,
    keyed with ``secret``.

    Args:
        body: Raw request body as text, exactly as received.
        signature: Signature header value, optionally prefixed with "sha256=".
        secret: Shared webhook secret used as the HMAC key.
        timestamp: Request timestamp in Unix seconds (string or int).

    Returns:
        True when the timestamp is fresh and the signature matches.

    Raises:
        ValueError: If the signature or timestamp is missing or malformed,
            the timestamp is more than 5 minutes from the current time, or
            the signature does not match.
    """
    # Callers catch ValueError only, so missing headers must not surface as
    # TypeError/AttributeError from int(None) or None.replace(...).
    if signature is None or timestamp is None:
        raise ValueError("Missing webhook signature or timestamp")
    try:
        request_time = int(timestamp)
    except (TypeError, ValueError):
        raise ValueError("Invalid webhook timestamp") from None

    # Check timestamp (prevents replay attacks) - 5 minutes tolerance
    current_time = int(time.time())
    if abs(current_time - request_time) > 300:
        raise ValueError("Request timestamp too old")

    # Create payload string (body + timestamp)
    payload = body + str(timestamp)

    # Generate expected signature
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    # Strip the optional algorithm prefix from the received header value
    prefix = 'sha256='
    received_signature = str(signature)
    if received_signature.startswith(prefix):
        received_signature = received_signature[len(prefix):]

    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which an attacker-controlled header may.
    if not hmac.compare_digest(
        expected_signature.encode('ascii'),
        received_signature.encode('utf-8'),
    ):
        raise ValueError("Invalid webhook signature")
    return True
@app.route('/webhook/pollybot', methods=['POST'])
def handle_webhook():
signature = request.headers.get('X-Webhook-Signature')
timestamp = request.headers.get('X-Webhook-Timestamp')
body = request.get_data(as_text=True)
try:
verify_webhook_signature(body, signature, os.environ['WEBHOOK_SECRET'], timestamp)
# Process webhook
payload = request.get_json()
print(f"Valid webhook received: {payload}")
return jsonify({"received": True}), 200
except ValueError as e:
print(f"Webhook verification failed: {e}")
return jsonify({"error": "Invalid signature"}), 400Use our test endpoint to see how signatures are generated and verify your implementation:
http://localhost:3000/api/test-webhook