Catching Data in Motion: Working with Real-Time Streams

Most data science feels like studying history. You analyze what already happened. But what if you could observe and react to data as it’s being born? That’s the magic of real-time analytics. Instead of reading yesterday’s news, you’re watching the live broadcast.

From monitoring financial markets to tracking delivery trucks, from detecting security breaches to watching social media trends unfold—real-time data lets you move from hindsight to insight to foresight.

The Basics: Tapping into Live Data Feeds

Before we dive into complex systems, let’s start with something simple: watching files as they grow. This is surprisingly useful for monitoring log files, sensor outputs, or any continuously updated text source.

The Humble Log File Watcher

r

# Think of this as a security camera for your data files
watch_log_file <- function(log_path, check_interval = 1) {
  conn <- file(log_path, open = "r")

  # Start reading from the end of the file
  current_position <- file.info(log_path)$size

  cat("Watching", log_path, "for new entries...\n")
  cat("Press Ctrl + C to stop\n\n")

  tryCatch({
    while (TRUE) {
      latest_size <- file.info(log_path)$size

      # If the file shrank, it was truncated in place: start again from the top
      if (latest_size < current_position) {
        current_position <- 0
      }

      # Check if file has grown
      if (latest_size > current_position) {
        # Read new content
        seek(conn, current_position)
        new_lines <- readLines(conn, warn = FALSE)

        # Process each new non-empty line
        for (line in new_lines) {
          if (nzchar(trimws(line))) {
            process_log_entry(line)  # Your custom processing function
          }
        }
        current_position <- latest_size
      }

      # Take a breather
      Sys.sleep(check_interval)
    }
  }, finally = {
    # Runs on Ctrl + C as well as on errors
    close(conn)
    cat("\nStopped watching", log_path, "\n")
  })
}

# Example: Monitor web server errors in real-time
process_log_entry <- function(log_line) {
  # Match case-insensitively: nginx writes levels in lowercase, e.g. [error], [warn]
  if (grepl("error", log_line, ignore.case = TRUE)) {
    # Send alert for errors
    cat("🚨 ERROR DETECTED:", log_line, "\n")
    # Could also send email, Slack message, etc.
    # send_slack_alert(paste("Server Error:", log_line))
  } else if (grepl("warn", log_line, ignore.case = TRUE)) {
    cat("⚠️  Warning:", log_line, "\n")
  }
}

# Start watching
# watch_log_file("/var/log/nginx/error.log")

This approach is simple, but it covers a lot of ground: system monitoring, IoT devices, or any application where data is appended to files.
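
If you would rather poll on your own schedule than sit in an infinite loop, the same idea can be packaged as a single call that returns whatever is new. The sketch below is one possible way to do it, not part of the watcher above: read_new_lines is a hypothetical helper name, it tracks a byte offset, and it holds back a trailing line that has not yet received its newline so a half-written entry is never processed.

```r
# Hypothetical helper: return complete lines appended since `offset` (in bytes),
# plus the offset to pass in on the next call.
read_new_lines <- function(path, offset = 0) {
  size <- file.info(path)$size
  if (is.na(size)) stop("File not found: ", path)
  if (size < offset) offset <- 0  # file was truncated, start over
  nothing_new <- list(lines = character(0), offset = offset)
  if (size == offset) return(nothing_new)

  con <- file(path, open = "rb")
  on.exit(close(con))
  seek(con, where = offset)
  bytes <- readBin(con, what = "raw", n = size - offset)

  # Only consume up to the last newline; a partial final line waits for next time
  newlines <- which(bytes == as.raw(10L))
  if (length(newlines) == 0) return(nothing_new)
  last <- max(newlines)

  text <- rawToChar(bytes[seq_len(last)])
  lines <- strsplit(text, "\n", fixed = TRUE)[[1]]
  lines <- sub("\r$", "", lines)  # tolerate Windows line endings

  list(lines = lines, offset = offset + last)
}

# Usage: simulate a log that grows between two polls
log_file <- tempfile(fileext = ".log")
writeLines(c("INFO start", "ERROR disk full"), log_file)
first <- read_new_lines(log_file, 0)
cat("WARN slow response\n", file = log_file, append = TRUE)
second <- read_new_lines(log_file, first$offset)
print(second$lines)
```

Because the function returns the next offset instead of keeping state, you can store that offset on disk and resume after a restart without reprocessing old entries.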

WebSockets: The Real-Time Web

When you need truly live data from web services, WebSockets provide a persistent two-way connection. Think stock prices, live sports scores, or real-time collaboration tools.

Live Crypto Price Monitor

r

library(websocket)
library(jsonlite)

create_crypto_monitor <- function(symbols = c("btcusdt", "ethusdt")) {
  # Create a monitoring environment
  monitor_env <- new.env()
  monitor_env$price_history <- list()
  monitor_env$last_alert_prices <- list()

  # Build WebSocket URL for multiple symbols
  streams <- paste0(symbols, "@trade")
  url <- paste0("wss://stream.binance.com:9443/ws/", paste(streams, collapse = "/"))

  ws <- WebSocket$new(url)

  ws$onMessage(function(event) {
    tryCatch({
      trade_data <- fromJSON(event$data)
      symbol <- trade_data$s
      price <- as.numeric(trade_data$p)
      quantity <- as.numeric(trade_data$q)
      # "T" is the trade time in milliseconds since the Unix epoch
      timestamp <- as.POSIXct(trade_data[["T"]] / 1000, origin = "1970-01-01")

      # Store in history
      if (is.null(monitor_env$price_history[[symbol]])) {
        monitor_env$price_history[[symbol]] <- data.frame()
      }

      new_row <- data.frame(
        timestamp = timestamp,
        price = price,
        quantity = quantity,
        stringsAsFactors = FALSE
      )

      monitor_env$price_history[[symbol]] <- rbind(
        monitor_env$price_history[[symbol]],
        new_row
      )

      # Keep only last 100 trades
      if (nrow(monitor_env$price_history[[symbol]]) > 100) {
        monitor_env$price_history[[symbol]] <- tail(monitor_env$price_history[[symbol]], 100)
      }

      # Check for significant price movements
      check_price_alert(symbol, price, monitor_env)

      # Print update
      cat(format(timestamp, "%H:%M:%S"), "|", symbol, ":", price, "\n")
    }, error = function(e) {
      message("Error processing message: ", e$message)
    })
  })

  ws$onOpen(function(event) {
    cat("Connected to Binance WebSocket. Monitoring:", paste(symbols, collapse = ", "), "\n")
  })

  ws$onClose(function(event) {
    cat("WebSocket connection closed\n")
  })

  ws$onError(function(event) {
    message("WebSocket error: ", event$message)
  })

  return(list(
    ws = ws,
    env = monitor_env,
    stop = function() { ws$close() }
  ))
}

check_price_alert <- function(symbol, current_price, env) {
  history <- env$price_history[[symbol]]

  if (nrow(history) < 10) return(invisible(NULL))  # Need some history first

  previous_prices <- head(history$price, -1)  # All but current price
  avg_price <- mean(previous_prices)
  price_change <- (current_price - avg_price) / avg_price * 100

  # Alert on moves greater than 2%
  if (abs(price_change) > 2) {
    direction <- ifelse(price_change > 0, "📈 UP", "📉 DOWN")
    cat("🚨 SIGNIFICANT MOVE:", symbol, direction,
        sprintf("%.2f%%", price_change),
        "Current:", current_price, "\n")
  }
}

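# Usage
# monitor <- create_crypto_monitor(c("btcusdt", "ethusdt", "adausdt"))
#
# The websocket package delivers callbacks through the later event loop, so a
# plain Sys.sleep(60) may leave messages unprocessed. Pumping the loop is safer:
# end_time <- Sys.time() + 60  # Monitor for 60 seconds
# while (Sys.time() < end_time) later::run_now(timeoutSecs = 1)
# monitor$stop()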
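
It helps to test the message-handling logic without a live connection. The sketch below parses one hand-written message and keeps a capped history, mirroring what the onMessage handler does. The payload values are made up for illustration, and parse_trade and append_capped are hypothetical helper names, not part of any package.

```r
library(jsonlite)

# Illustrative payload only: field names match the ones parsed above
# (s, p, q, T), but the values are invented.
sample_message <- '{"e":"trade","s":"BTCUSDT","p":"42000.50","q":"0.010","T":1700000000000}'

# Hypothetical helper: turn one raw message into a one-row data frame
parse_trade <- function(raw_json) {
  trade <- fromJSON(raw_json)
  data.frame(
    symbol = trade$s,
    price = as.numeric(trade$p),       # prices arrive as strings
    quantity = as.numeric(trade$q),
    timestamp = as.POSIXct(trade[["T"]] / 1000, origin = "1970-01-01", tz = "UTC"),
    stringsAsFactors = FALSE
  )
}

# Hypothetical helper: append a row and keep only the newest `max_rows`
append_capped <- function(history, new_row, max_rows = 100) {
  tail(rbind(history, new_row), max_rows)
}

# Usage: feed the same message three times into a history capped at two rows
history <- data.frame()
for (i in 1:3) {
  history <- append_capped(history, parse_trade(sample_message), max_rows = 2)
}
print(history)
```

Keeping parsing separate from the socket callback means a malformed message can be reproduced and debugged offline.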

Social Media Pulse: Tracking Trends as They Happen

Social media streams give you a real-time window into public opinion, breaking news, and cultural trends.

Live Twitter Sentiment Tracker

r

library(rtweet)
library(syuzhet)
library(dplyr)

create_sentiment_tracker <- function(search_terms, duration_minutes = 15) {
  cat("Starting sentiment tracking for:", paste(search_terms, collapse = ", "), "\n")

  # Create storage for tweets
  all_tweets <- data.frame()
  start_time <- Sys.time()

  while (difftime(Sys.time(), start_time, units = "mins") < duration_minutes) {
    tryCatch({
      # Stream tweets for 60 seconds
      # Note: stream_tweets() belongs to the older rtweet interface; newer
      # rtweet releases may require a different streaming function.
      stream <- stream_tweets(
        q = search_terms,
        timeout = 60,
        verbose = FALSE
      )

      if (nrow(stream) > 0) {
        # Calculate sentiment for each tweet
        stream_with_sentiment <- stream %>%
          mutate(
            clean_text = gsub("http\\S+", "", text),  # Remove URLs
            clean_text = gsub("@\\w+", "", clean_text),  # Remove mentions
            sentiment = get_sentiment(clean_text, method = "syuzhet"),
            sentiment_category = case_when(
              sentiment > 0.1 ~ "positive",
              sentiment < -0.1 ~ "negative",
              TRUE ~ "neutral"
            )
          ) %>%
          select(created_at, text, sentiment, sentiment_category, favorite_count, retweet_count)

        all_tweets <- bind_rows(all_tweets, stream_with_sentiment)

        # Print summary
        sentiment_summary <- all_tweets %>%
          count(sentiment_category) %>%
          mutate(percentage = n / nrow(all_tweets) * 100)

        cat("\n--- Sentiment Summary ---\n")
        print(sentiment_summary)
        cat("Total tweets:", nrow(all_tweets), "\n")
        cat("Latest:", format(max(all_tweets$created_at), "%H:%M:%S"), "\n\n")

        # Alert on sentiment shifts
        if (nrow(all_tweets) > 50) {
          recent_tweets <- tail(all_tweets, 50)
          recent_positive <- mean(recent_tweets$sentiment_category == "positive")

          if (recent_positive > 0.7) {
            cat("🔥 TRENDING POSITIVE: Recent tweets are",
                sprintf("%.1f%%", recent_positive * 100), "positive!\n")
          } else if (recent_positive < 0.3) {
            cat("💀 TRENDING NEGATIVE: Recent tweets are only",
                sprintf("%.1f%%", recent_positive * 100), "positive\n")
          }
        }
      }
    }, error = function(e) {
      message("Error in streaming cycle: ", e$message)
      Sys.sleep(5)  # Wait before retrying
    })
  }

  return(all_tweets)
}

# Usage (requires Twitter API credentials)
# tweets <- create_sentiment_tracker(c("artificial intelligence", "AI"), duration_minutes = 10)
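
The thresholding and trend-alert rules are easy to check on their own, with no API access at all. The sketch below uses base R and invented scores; categorize_sentiment and recent_positive_share are hypothetical helper names that mirror the cutoffs used in the tracker (above 0.1 is positive, below -0.1 is negative, and the trend is judged on the last 50 items).

```r
# Hypothetical helper: map numeric sentiment scores to labels
categorize_sentiment <- function(score, threshold = 0.1) {
  ifelse(score > threshold, "positive",
         ifelse(score < -threshold, "negative", "neutral"))
}

# Hypothetical helper: share of "positive" labels among the most recent items
recent_positive_share <- function(categories, window = 50) {
  mean(tail(categories, window) == "positive")
}

# Usage with invented scores: 60 negative readings followed by 40 positive ones
scores <- c(rep(-0.5, 60), rep(0.8, 40))
labels <- categorize_sentiment(scores)
share <- recent_positive_share(labels, window = 50)

if (share > 0.7) {
  cat("Trending positive:", sprintf("%.1f%%", share * 100), "\n")
} else if (share < 0.3) {
  cat("Trending negative:", sprintf("%.1f%%", share * 100), "\n")
}
```

The overall share of positive labels here is 40%, yet the last 50 items are 80% positive. That gap is why the tracker looks at a recent window rather than the running total.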

Building Real-Time Dashboards with Shiny

Sometimes you need to share real-time insights with others. Shiny dashboards can update in real-time, giving your team a live view of what’s happening.

Live System Monitoring Dashboard

r

library(shiny)
library(ggplot2)
library(plotly)

# Simulate real-time system metrics
generate_system_metrics <- function() {
  data.frame(
    timestamp = Sys.time(),
    cpu_usage = runif(1, 0, 100),
    memory_usage = runif(1, 50, 95),
    active_connections = sample(100:500, 1),
    request_rate = rexp(1, 1/50)  # Exponential draw with a mean of 50
  )
}

ui <- fluidPage(
  titlePanel("🖥️ Live System Monitor"),
  sidebarLayout(
    sidebarPanel(
      width = 3,
      h4("System Status"),
      verbatimTextOutput("current_status"),
      br(),
      actionButton("start", "Start Monitoring", class = "btn-success"),
      actionButton("stop", "Stop Monitoring", class = "btn-danger")
    ),
    mainPanel(
      fluidRow(
        column(6, plotlyOutput("cpu_plot")),
        column(6, plotlyOutput("memory_plot"))
      ),
      fluidRow(
        column(6, plotlyOutput("connections_plot")),
        column(6, plotlyOutput("requests_plot"))
      )
    )
  )
)

server <- function(input, output, session) {
  # Reactive value for metrics data
  metrics <- reactiveVal(data.frame())
  monitoring_active <- reactiveVal(FALSE)

  # Start/stop monitoring
  observeEvent(input$start, {
    monitoring_active(TRUE)
    cat("Monitoring started\n")
  })

  observeEvent(input$stop, {
    monitoring_active(FALSE)
    cat("Monitoring stopped\n")
  })

  # Generate new metrics periodically
  observe({
    if (monitoring_active()) {
      new_metrics <- generate_system_metrics()

      # isolate() reads the data without taking a reactive dependency on it;
      # otherwise updating metrics below would re-trigger this observer at once
      current_data <- isolate(metrics())
      updated_data <- rbind(current_data, new_metrics)

      # Keep only last 100 data points
      if (nrow(updated_data) > 100) {
        updated_data <- tail(updated_data, 100)
      }

      metrics(updated_data)
      invalidateLater(1000)  # Update every second
    }
  })

  # Current status output
  output$current_status <- renderPrint({
    data <- metrics()
    if (nrow(data) > 0) {
      latest <- tail(data, 1)
      cat("Last Update:", format(latest$timestamp, "%H:%M:%S"), "\n")
      cat("CPU Usage:", sprintf("%.1f%%", latest$cpu_usage), "\n")
      cat("Memory Usage:", sprintf("%.1f%%", latest$memory_usage), "\n")
      cat("Active Connections:", latest$active_connections, "\n")
      cat("Requests/sec:", sprintf("%.1f", latest$request_rate), "\n")
    } else {
      cat("No data yet\nClick 'Start Monitoring'")
    }
  })

  # Plot functions
  output$cpu_plot <- renderPlotly({
    data <- metrics()
    if (nrow(data) > 0) {
      p <- ggplot(data, aes(x = timestamp, y = cpu_usage)) +
        geom_line(color = ifelse(tail(data$cpu_usage, 1) > 80, "red", "blue")) +
        geom_hline(yintercept = 80, linetype = "dashed", color = "red") +
        labs(title = "CPU Usage", x = "Time", y = "Usage %") +
        theme_minimal()
      ggplotly(p)
    }
  })

  # Similar renderPlotly functions for memory, connections, requests…
  output$memory_plot <- renderPlotly({
    data <- metrics()
    if (nrow(data) > 0) {
      p <- ggplot(data, aes(x = timestamp, y = memory_usage)) +
        geom_line(color = ifelse(tail(data$memory_usage, 1) > 85, "red", "green")) +
        geom_hline(yintercept = 85, linetype = "dashed", color = "red") +
        labs(title = "Memory Usage", x = "Time", y = "Usage %") +
        theme_minimal()
      ggplotly(p)
    }
  })
}

# Run the app
# shinyApp(ui, server)

Production Considerations for Real-Time Systems

Building reliable real-time systems requires thinking about failure scenarios such as dropped connections, repeated retries, and clean shutdown:

r

# Robust stream processor with error handling
# process_stream(), send_incident_alert(), save_recovery_point(), and
# close_all_connections() are placeholders for your own functions.
create_resilient_stream_processor <- function(stream_config) {
  list(
    start = function() {
      attempts <- 0
      max_attempts <- 5

      while (attempts < max_attempts) {
        tryCatch({
          process_stream(stream_config)
          # If we get here, the stream ended normally
          break
        }, error = function(e) {
          attempts <<- attempts + 1
          message("Stream attempt ", attempts, " failed: ", e$message)

          if (attempts < max_attempts) {
            wait_time <- 2 ^ attempts  # Exponential backoff: 2, 4, 8, 16 seconds
            message("Retrying in ", wait_time, " seconds...")
            Sys.sleep(wait_time)
          } else {
            message("Max retries reached. Giving up.")
            # Send alert to operations team
            send_incident_alert("Stream processor failed after multiple retries")
          }
        })
      }
    },

    # Graceful shutdown handler
    shutdown = function() {
      # Close connections, flush buffers, save state
      save_recovery_point()
      close_all_connections()
    }
  )
}
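
The retry logic is worth pulling out into something you can exercise without a real stream. Here is a minimal sketch, assuming you wrap the work in a zero-argument function: with_backoff is a hypothetical helper name, and the sleep function is passed in as an argument so a test can record the delays instead of actually waiting.

```r
# Hypothetical helper: call `task` until it succeeds or attempts run out,
# waiting base_delay ^ attempt seconds between tries.
with_backoff <- function(task, max_attempts = 5, base_delay = 2,
                         sleep_fn = Sys.sleep) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(
      list(ok = TRUE, value = task()),
      error = function(e) list(ok = FALSE, error = conditionMessage(e))
    )
    if (result$ok) return(result$value)

    message("Attempt ", attempt, " failed: ", result$error)
    if (attempt < max_attempts) sleep_fn(base_delay ^ attempt)
  }
  stop("All ", max_attempts, " attempts failed; last error: ", result$error)
}

# Usage: a task that fails twice, then succeeds. Delays are recorded, not slept.
calls <- 0
delays <- c()
flaky_task <- function() {
  calls <<- calls + 1
  if (calls < 3) stop("connection reset")
  "stream finished"
}

outcome <- with_backoff(flaky_task, sleep_fn = function(s) delays <<- c(delays, s))
print(outcome)
print(delays)
```

Injecting the sleep function is a design choice for testability; in production you would leave the default so the waits are real.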

Conclusion: From Observing to Participating

Working with real-time data transforms your role from data archaeologist to data participant. You’re no longer just studying what happened—you’re observing what’s happening right now, and potentially influencing what happens next.

The real power comes when you combine these approaches:

  • Monitor system logs while watching business metrics
  • Track social sentiment while monitoring your website traffic
  • Watch financial markets while following news streams

The patterns we’ve covered—file watching, WebSockets, social streams, and live dashboards—give you the foundation to build responsive, intelligent systems. But remember: with great power comes great responsibility. Real-time systems can be resource-intensive, and you need to be mindful of API rate limits, system resources, and the ethical implications of constant monitoring.

Start small. Monitor one log file. Watch one WebSocket feed. Build one simple dashboard. As you get comfortable with the patterns and pitfalls, you’ll discover endless possibilities for creating systems that don’t just analyze history, but actively participate in the present.
