sudopower

De-Duplication Service: Adding Time-Based Uniqueness and Cleanup

This is a follow-up post on building a de-duplication service; start with the first post if you haven't read it yet.

In many cases, duplicates only matter within a certain time window, and once that window has passed you want to accept the same data again.
For example, when voting on the office lunch, you want to discard the votes after one day.
For sensor data where you want only one reading per hour, the uniqueness period is one hour.

Let’s try to solve two problems:

  1. Expiring entries after a certain period.
  2. Cleaning up expired entries to reclaim memory.

Adding expiry and cleanup features

  • Store the arrival time of each message as the value for its key.
  • Run a background task that scans the seen map and removes entries older than the deduplication period.


Let’s add these modifications to the code from the previous post.


Code:

package main

import (
	"bufio"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"
)

// Deduplicator holds the state for tracking seen message keys.
// It is safe for concurrent use.
type Deduplicator struct {

	// The duration to keep a key before it can be seen again.
	// A zero value means permanent deduplication.
	period time.Duration

	// mu protects the seen map from concurrent access.
	// A plain Mutex is enough: every access below takes the exclusive lock.
	mu sync.Mutex

	// seen stores the last time a key was observed.
	// The key is of type interface{} to handle various types (numbers, strings).
	seen map[interface{}]time.Time
}

// NewDeduplicator creates and initializes a new Deduplicator instance.
// It also starts a background cleanup goroutine if a deduplication period is specified.
func NewDeduplicator(period time.Duration) *Deduplicator {
	d := &Deduplicator{
		period: period,
		seen:   make(map[interface{}]time.Time),
	}

	// If a period is set, we need to periodically clean up old entries
	// from the 'seen' map to prevent memory from growing indefinitely.
	if period > 0 {
		go d.startCleanup()
	}

	return d
}

// ProcessMessages reads messages from the provided reader, deduplicates them,
// and writes the unique messages to the writer.
func (d *Deduplicator) ProcessMessages(writer io.Writer, reader io.Reader) {
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		lineBytes := scanner.Bytes()

		// Skip empty lines; there is nothing to deduplicate.
		if len(lineBytes) == 0 {
			continue
		}

		// Check if the key is a duplicate.
		if !d.isDuplicate(string(lineBytes)) {
			// If not a duplicate, write the original message to the output.
			fmt.Fprintln(writer, "output: "+string(lineBytes))
		}
	}

	if err := scanner.Err(); err != nil {
		log.Printf("Error reading input: %v", err)
	}
}

// isDuplicate checks if a key has been seen before within the deduplication period.
// It returns true if the message is a duplicate, and false otherwise.
// If the message is not a duplicate, it records the key and the current time.
func (d *Deduplicator) isDuplicate(key interface{}) bool {
	// Permanent deduplication (period is 0)
	if d.period == 0 {
		d.mu.Lock()
		defer d.mu.Unlock()

		if _, found := d.seen[key]; found {
			return true // Found, so it's a duplicate.
		}
		d.seen[key] = time.Time{} // Store it permanently. The time value doesn't matter.
		return false
	}

	// Timed deduplication
	d.mu.Lock()
	defer d.mu.Unlock()

	lastSeen, found := d.seen[key]
	now := time.Now()

	// If found and the time since last seen is less than the period, it's a duplicate.
	if found && now.Sub(lastSeen) < d.period {
		return true
	}

	// Otherwise, it's not a duplicate. Record the time we saw it.
	d.seen[key] = now
	return false
}

// startCleanup runs a periodic task to remove expired keys from the 'seen' map.
// This prevents the map from growing indefinitely in timed deduplication mode.
func (d *Deduplicator) startCleanup() {
	// The cleanup interval can be the same as the period.
	// For very long periods (e.g., 1 week), you might choose a shorter interval
	// like once an hour. For this implementation, the period itself is a good default.
	ticker := time.NewTicker(d.period)
	defer ticker.Stop()

	for range ticker.C {
		d.cleanupExpired()
	}
}

// cleanupExpired iterates over the map and removes keys that are older than the period.
func (d *Deduplicator) cleanupExpired() {
	d.mu.Lock()
	defer d.mu.Unlock()

	now := time.Now()
	for key, lastSeen := range d.seen {
		if now.Sub(lastSeen) >= d.period { // same boundary as isDuplicate
			delete(d.seen, key)
		}
	}
}

func main() {
	// Define and parse command-line flags
	dedupPeriod := flag.Duration("period", 0, "Optional: Deduplication period (e.g., '10s', '5m', '1h'). If not set, deduplication is permanent.")

	flag.Parse()

	// Validate inputs
	if *dedupPeriod < 0 {
		log.Println("Error: The -period cannot be negative.")
		os.Exit(1)
	}

	// Create and run the service
	log.Printf("Starting deduplication service with period '%v'", *dedupPeriod)

	deduplicator := NewDeduplicator(*dedupPeriod)
	deduplicator.ProcessMessages(os.Stdout, os.Stdin)

	log.Println("Deduplication service finished.")
}

Input & Output: we read from STDIN and paste the input by hand, to make the behavior easy to follow.

sudopower@MacBookAir deduplication-service % go run main.go -period=10s 
2025/07/01 20:06:06 Starting deduplication service with period '10s'
Dwight voted for Salad 🥗
Stanely voted for Salad 🥗
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 
Pam voted for Salad 🥗
Jim voted for Salad 🥗

output: Dwight voted for Salad 🥗
output: Stanely voted for Salad 🥗
output: Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 
output: Pam voted for Salad 🥗
output: Jim voted for Salad 🥗

# paste this again within 10 seconds
Dwight voted for Salad 🥗
Stanely voted for Salad 🥗
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 

# and no output

# paste this again more than 10 seconds after the first paste (duplicates do not refresh the timestamp)
Dwight voted for Salad 🥗
Stanely voted for Salad 🥗
Stanely voted for Salad 🥗
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 
Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 


output: Dwight voted for Salad 🥗
output: Stanely voted for Salad 🥗
output: Michael voted for 🍕 Pizza, but he may vote again, coz he's naughty and doesn't want Salad 

In this implementation, we have to be careful about locking the seen map, because it is shared by two goroutines: the main one, which checks for duplicates, and the cleanup goroutine. Every read or write of the map must hold the mutex.

Problems

  1. What if the system crashes and the seen map is lost?
  2. What if the data is too big to be stored as a key in the map?
  3. What if throughput is very high and the map suddenly grows beyond the available memory?

There are a few ways to go from here:

  • You can keep the seen keys in an external database, but it must offer fast reads and writes with low latency.
  • You can store them in a file, but the file must be persisted and backed up reliably so it survives an application crash.
  • You can use a Bloom filter instead of a map to test whether a key was seen before. This saves a lot of memory, at the cost of occasional false positives: a new message may be reported as already seen and dropped.
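To make the Bloom filter idea concrete, here is a minimal, single-goroutine sketch rather than a production implementation. It assumes a fixed bit array of m bits and k probe positions derived from one 64-bit FNV-1a hash by double hashing (index = h1 + i*h2 mod m); the type bloomFilter and its methods are hypothetical names. A "not seen" answer is always correct, while a "seen" answer can be a false positive. A plain Bloom filter also cannot delete individual keys, so on its own it does not provide the time-based expiry described above.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloomFilter is a hypothetical minimal Bloom filter: m bits, k probe positions per key.
// It is not safe for concurrent use.
type bloomFilter struct {
	bits []uint64
	m    uint64 // number of bits
	k    uint64 // number of probe positions per key
}

func newBloomFilter(m, k uint64) *bloomFilter {
	return &bloomFilter{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// positions derives k bit indexes from one 64-bit FNV-1a hash using
// double hashing: index_i = (h1 + i*h2) mod m.
func (b *bloomFilter) positions(key string) []uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	sum := h.Sum64()
	h1 := sum & 0xffffffff
	h2 := (sum >> 32) | 1 // force an odd, non-zero step

	idx := make([]uint64, b.k)
	for i := uint64(0); i < b.k; i++ {
		idx[i] = (h1 + i*h2) % b.m
	}
	return idx
}

// testAndAdd reports whether key was possibly seen before, then records it.
func (b *bloomFilter) testAndAdd(key string) bool {
	seen := true
	for _, p := range b.positions(key) {
		word, mask := p/64, uint64(1)<<(p%64)
		if b.bits[word]&mask == 0 {
			seen = false // at least one bit was unset: definitely new
			b.bits[word] |= mask
		}
	}
	return seen
}

func main() {
	// 1<<20 bits (128 KiB) and 7 probes; sizes chosen for illustration only.
	bf := newBloomFilter(1<<20, 7)
	fmt.Println(bf.testAndAdd("Dwight voted for Salad")) // false: definitely new
	fmt.Println(bf.testAndAdd("Dwight voted for Salad")) // true: probably seen
}
```

Note that the memory used here is fixed at m bits regardless of how long the messages are, which is what makes this approach attractive for problems 2 and 3.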

These are very interesting problems and require more posts to discuss them in detail. Comment if you’re interested in more.
