This is a follow-up post on building a de-duplication service. Click here to start from the top.
Currently we store the entire message in the map as the key; for very short messages this is fine.
What if we could compress the data and store the compressed form in the map as the key?
It seems like a good option, but lossless compression still grows with the input, and with lossy compression two different items could end up with the same compressed output.
This points us in the direction of hashing, which doesn’t really compress the data but does reduce its size, which is exactly what we want.
The difference between hashing and compression is that hashing is one-way, whereas compression is reversible: decompressing gives you back the original data.
Lossy compression reduces the file size by removing some of the data, so an exact match of the original data cannot be recreated; quality is lost.
https://www.computersciencecafe.com/17-data-compression.html
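To make the size difference concrete, here is a quick standalone sketch (mine, not part of the service) comparing gzip, whose output grows with the input, against SHA-256, whose output is always 32 bytes. Random bytes are a worst case for compression, but they make the contrast obvious:

package main

import (
    "bytes"
    "compress/gzip"
    "crypto/rand"
    "crypto/sha256"
    "fmt"
)

func main() {
    // A 4 KiB payload of random bytes stands in for an arbitrary message.
    msg := make([]byte, 4096)
    rand.Read(msg)

    // Lossless compression is reversible, so its output must scale with the input.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    zw.Write(msg)
    zw.Close()
    fmt.Printf("original: %d bytes, gzipped: %d bytes\n", len(msg), buf.Len())

    // A hash is one-way and a fixed 32 bytes, no matter how large the input.
    sum := sha256.Sum256(msg)
    fmt.Printf("sha256: %d bytes\n", len(sum))
}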
Let’s try to solve these problems:
- Reduce the key size by using a hash of the data as the key
Code:
package main

import (
    "bufio"
    "crypto/sha256"
    "flag"
    "fmt"
    "hash"
    "io"
    "log"
    "os"
    "sync"
    "time"
)

// Deduplicator holds the state for tracking seen message keys.
// The seen map is safe for concurrent use; the hasher is not, and
// must only be used from the single reader goroutine.
type Deduplicator struct {
    // The duration to keep a key before it can be seen again.
    // A zero value means permanent deduplication.
    period time.Duration
    // mu protects the seen map from concurrent access.
    mu sync.RWMutex
    // seen stores the last time a key was observed.
    // The key is of type interface{} to handle various types (numbers, strings).
    seen map[interface{}]time.Time
    // hasher is reused across messages to avoid re-initialising it.
    hasher hash.Hash
}

// NewDeduplicator creates and initializes a new Deduplicator instance.
// It also starts a background cleanup goroutine if a deduplication period is specified.
func NewDeduplicator(period time.Duration) *Deduplicator {
    d := &Deduplicator{
        period: period,
        seen:   make(map[interface{}]time.Time),
        hasher: sha256.New(),
    }
    // If a period is set, we need to periodically clean up old entries
    // from the 'seen' map to prevent memory from growing indefinitely.
    if period > 0 {
        go d.startCleanup()
    }
    return d
}

// ProcessMessages reads messages from the provided reader, deduplicates them,
// and writes the results to the writer.
func (d *Deduplicator) ProcessMessages(writer io.Writer, reader io.Reader) {
    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        lineBytes := scanner.Bytes()
        // An empty line is not valid JSON, so we skip it.
        if len(lineBytes) == 0 {
            continue
        }
        // Hash the message; the fixed-size hash becomes the map key.
        d.hasher.Reset()
        d.hasher.Write(lineBytes)
        key := d.hasher.Sum(nil)
        fmt.Fprintf(writer, "hashed to: %x\n", key)
        if !d.isDuplicate(string(key)) {
            // First time we've seen this message (within the period).
            fmt.Fprintln(writer, "not a duplicate")
        }
    }
    if err := scanner.Err(); err != nil {
        log.Printf("Error reading from STDIN: %v", err)
    }
}

// isDuplicate checks if a key has been seen before within the deduplication period.
// It returns true if the message is a duplicate, and false otherwise.
// If the message is not a duplicate, it records the key and the current time.
func (d *Deduplicator) isDuplicate(key interface{}) bool {
    // Permanent deduplication (period is 0)
    if d.period == 0 {
        d.mu.Lock()
        defer d.mu.Unlock()
        if _, found := d.seen[key]; found {
            return true // Found, so it's a duplicate.
        }
        d.seen[key] = time.Time{} // Store it permanently. The time value doesn't matter.
        return false
    }
    // Timed deduplication
    d.mu.Lock()
    defer d.mu.Unlock()
    lastSeen, found := d.seen[key]
    now := time.Now()
    // If found and the time since last seen is less than the period, it's a duplicate.
    if found && now.Sub(lastSeen) < d.period {
        return true
    }
    // Otherwise, it's not a duplicate. Record the time we saw it.
    d.seen[key] = now
    return false
}

// startCleanup runs a periodic task to remove expired keys from the 'seen' map.
// This prevents the map from growing indefinitely in timed deduplication mode.
func (d *Deduplicator) startCleanup() {
    // The cleanup interval can be the same as the period.
    // For very long periods (e.g., 1 week), you might choose a shorter interval
    // like once an hour. For this implementation, the period itself is a good default.
    ticker := time.NewTicker(d.period)
    defer ticker.Stop()
    for range ticker.C {
        d.cleanupExpired()
    }
}

// cleanupExpired iterates over the map and removes keys that are older than the period.
func (d *Deduplicator) cleanupExpired() {
    d.mu.Lock()
    defer d.mu.Unlock()
    now := time.Now()
    for key, lastSeen := range d.seen {
        if now.Sub(lastSeen) > d.period {
            delete(d.seen, key)
        }
    }
}

func main() {
    // Define and parse command-line flags
    dedupPeriod := flag.Duration("period", 0, "Optional: Deduplication period (e.g., '10s', '5m', '1h'). If not set, deduplication is permanent.")
    flag.Parse()
    // Validate inputs
    if *dedupPeriod < 0 {
        log.Println("Error: The -period cannot be negative.")
        os.Exit(1)
    }
    // Create and run the service
    log.Printf("Starting deduplication service with period '%v'", *dedupPeriod)
    deduplicator := NewDeduplicator(*dedupPeriod)
    deduplicator.ProcessMessages(os.Stdout, os.Stdin)
    log.Println("Deduplication service finished.")
}
Output
sudopower@MacBookAir deduplication-service % go run main.go
2025/07/04 22:13:46 Starting deduplication service with period '0s'
{ "vote": "pizza", "employeeDetails": { "employeeId": "EMP001", "firstName": "Alice", "lastName": "Smithson", "department": "Marketing and Communications", "position": "Senior Marketing Manager for Digital Initiatives", "email": "alice.smithson.digital@example.com", "phoneNumber": "+49 151 1234567890", "hireDate": "2020-03-15", "officeLocation": "Berlin Headquarters, Building C, Floor 4", "team": "Global Brand Strategy Team" } }
hashed to: b39fd0a572b8725672069a8642de84714e7a6805a26212fe3eaaad5bb2c553bc
not a duplicate
{ "vote": "pizza", "employeeDetails": { "employeeId": "EMP001", "firstName": "Alice", "lastName": "Smithson", "department": "Marketing and Communications", "position": "Senior Marketing Manager for Digital Initiatives", "email": "alice.smithson.digital@example.com", "phoneNumber": "+49 151 1234567890", "hireDate": "2020-03-15", "officeLocation": "Berlin Headquarters, Building C, Floor 4", "team": "Global Brand Strategy Team" } }
hashed to: b39fd0a572b8725672069a8642de84714e7a6805a26212fe3eaaad5bb2c553bc
{ "vote": "salad", "employeeDetails": { "employeeId": "EMP002", "firstName": "Bob", "lastName": "Johnsonville", "department": "Research and Development", "position": "Lead Software Engineer for AI Solutions", "email": "bob.johnsonville.ai@example.com", "phoneNumber": "+49 160 9876543210", "hireDate": "2018-07-22", "officeLocation": "Berlin Headquarters, Building A, Floor 2", "team": "Advanced Robotics Development Unit" } }
hashed to: 53f4d8edc1e210cfb927b942ac383431b64285bbb14ca9e780b649efc7eeebac
not a duplicate
{ "vote": "salad", "employeeDetails": { "employeeId": "EMP002", "firstName": "Bob", "lastName": "Johnsonville", "department": "Research and Development", "position": "Lead Software Engineer for AI Solutions", "email": "bob.johnsonville.ai@example.com", "phoneNumber": "+49 160 9876543210", "hireDate": "2018-07-22", "officeLocation": "Berlin Headquarters, Building A, Floor 2", "team": "Advanced Robotics Development Unit" } }
hashed to: 53f4d8edc1e210cfb927b942ac383431b64285bbb14ca9e780b649efc7eeebac
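The run above used the default permanent mode (the JSON lines are the terminal echoing what was typed into STDIN). To try timed deduplication instead, pass the -period flag defined in main, for example:

go run main.go -period 10s

With this, a repeated message is reported as "not a duplicate" again once ten seconds have passed since it was last seen.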
To avoid re-initialising the hasher for every message, we keep it on the service and reset it before each use.
You can see in the output above that the hash value is much smaller than the original data and is consistent: the same message always produces the same hash.
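As a side note, the standard library also offers a one-shot helper, sha256.Sum256, which avoids keeping a hasher around at all. A minimal sketch of both styles (mine, for illustration):

package main

import (
    "crypto/sha256"
    "fmt"
)

func main() {
    msg := []byte(`{ "vote": "pizza" }`)

    // Style 1: reuse one hasher across messages, as the service does.
    h := sha256.New()
    h.Reset() // clear any state left over from the previous message
    h.Write(msg)
    fmt.Printf("%x\n", h.Sum(nil))

    // Style 2: one-shot helper, no state to manage.
    sum := sha256.Sum256(msg) // returns a [32]byte
    fmt.Printf("%x\n", sum)
}

Both print the same hash; the reusable hasher simply avoids allocating a new one per message.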
Accuracy & Performance
There are various hashing algorithms; MD5 is potentially faster than SHA-256, but uniqueness is what matters here.
We need a hashing algorithm with a low chance of collisions, i.e. of two different inputs producing the same hash.
With SHA-256’s 256-bit output, the birthday bound puts the collision probability for n distinct messages at roughly n^2 / 2^257, which is negligible for any realistic workload.
Another important thing to consider is the computation speed of the hash, since we compute it for every incoming message.
Many modern CPUs have built-in hardware instructions for SHA-256, which significantly accelerate it.
SHA-256 also produces the same hash on different systems, so if you decide to move the hash table to an external database, all instances of the service will produce the same hash for the same input.
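If you want to check the speed trade-off on your own hardware, a small benchmark file (hypothetical name dedup_bench_test.go, run with go test -bench=.) could compare the two:

package main

import (
    "crypto/md5"
    "crypto/sha256"
    "testing"
)

// sample stands in for a typical message; adjust the size to match your payloads.
var sample = make([]byte, 512)

func BenchmarkMD5(b *testing.B) {
    for i := 0; i < b.N; i++ {
        _ = md5.Sum(sample)
    }
}

func BenchmarkSHA256(b *testing.B) {
    for i := 0; i < b.N; i++ {
        _ = sha256.Sum256(sample)
    }
}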
Problem
- What if the system crashes and the seen map is lost?
There are a few ways to go from here:
- You can store it in an external database, but that database needs fast read and write access and low latency.
- You can store it in a file, but the file needs to be written and backed up regularly so that a crash loses as little as possible; a rough sketch follows below.
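As a taste of the file option, here is a minimal, untested sketch that snapshots the map with encoding/gob. It assumes the keys are plain strings (as our hash keys are), the helper names saveSeen and loadSeen are made up for illustration, and the hard part of deciding when and how often to snapshot is left out entirely:

package main

import (
    "encoding/gob"
    "fmt"
    "os"
    "time"
)

// saveSeen writes a snapshot of the seen map to disk so it can be
// restored after a crash.
func saveSeen(path string, seen map[string]time.Time) error {
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()
    return gob.NewEncoder(f).Encode(seen)
}

// loadSeen restores the most recent snapshot, if one exists.
func loadSeen(path string) (map[string]time.Time, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    seen := make(map[string]time.Time)
    err = gob.NewDecoder(f).Decode(&seen)
    return seen, err
}

func main() {
    if err := saveSeen("seen.gob", map[string]time.Time{"somehash": time.Now()}); err != nil {
        fmt.Println("save failed:", err)
        return
    }
    restored, err := loadSeen("seen.gob")
    fmt.Println(restored, err)
}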
These are very interesting problems, and they deserve posts of their own to discuss in detail. Comment if you’re interested in more.