惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

F
Full Disclosure
WordPress大学
WordPress大学
小众软件
小众软件
Cloudbric
Cloudbric
AWS News Blog
AWS News Blog
腾讯CDC
量子位
人人都是产品经理
人人都是产品经理
大猫的无限游戏
大猫的无限游戏
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
V
Vulnerabilities – Threatpost
Scott Helme
Scott Helme
Hugging Face - Blog
Hugging Face - Blog
博客园_首页
C
CXSECURITY Database RSS Feed - CXSecurity.com
The Hacker News
The Hacker News
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
IT之家
IT之家
Jina AI
Jina AI
Attack and Defense Labs
Attack and Defense Labs
S
SegmentFault 最新的问题
Simon Willison's Weblog
Simon Willison's Weblog
The Cloudflare Blog
阮一峰的网络日志
阮一峰的网络日志
T
Tailwind CSS Blog
Last Week in AI
Last Week in AI
博客园 - 【当耐特】
Google Online Security Blog
Google Online Security Blog
美团技术团队
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
V
Visual Studio Blog
罗磊的独立博客
L
LINUX DO - 最新话题
博客园 - Franky
博客园 - 叶小钗
Apple Machine Learning Research
Apple Machine Learning Research
The Last Watchdog
The Last Watchdog
J
Java Code Geeks
AI
AI
C
Cisco Blogs
酷 壳 – CoolShell
酷 壳 – CoolShell
C
Cyber Attacks, Cyber Crime and Cyber Security
Cisco Talos Blog
Cisco Talos Blog
博客园 - 三生石上(FineUI控件)
雷峰网
雷峰网
Help Net Security
Help Net Security
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
云风的 BLOG
云风的 BLOG
I
Intezer
S
Securelist

Christian Hollinger

Building confidence in geospatial data I used my homelab to start an LLC: Meet SkaldMaps How I run and deploy docker services in my homelab with Komodo and a custom CLI More random home lab things I've recently learned New Website & Scala Days 2025 Announcement A Distributed System from scratch, with Scala 3 - Part 3: Job submission, worker scaling, and leader election & consensus with Raft My 2025 Homelab Updates: Quadrupling Capacity Why I still self host my servers (and what I've recently learned) Improving my Distributed System with Scala 3: Consistency Guarantees & Background Tasks (Part 2) Moving a Proxmox host with a SAS HBA as PCI passthrough for zfs + TrueNAS Building a functional, effectful Distributed System from scratch in Scala 3, just to avoid Leetcode (Part 1) Migrating a Home Server to Proxmox, TrueNas, and zfs, or: How to make your home network really complicated for no good reason QGIS is the mapping software you didn't know you needed Tiny Telematics: Tracking my truck's location offline with a Raspberry Pi, redis, Kafka, and Flink (Part 2) Tiny Telematics: Tracking my truck's location offline with a Raspberry Pi, redis, Kafka, and Flink (Part 1) Functional Programming concepts I actually like: A bit of praise for Scala (for once) Scala, Spark, Books, and Functional Programming: An Essay Building a Data Lake with Spark and Iceberg at Home to over-complicate shopping for a House Writing a Telegram Bot to control a Raspberry Pi from afar (to observe Guinea Pigs) Raspberry Pi Gardening: Monitoring a Vegetable Garden using a Raspberry Pi - Part 2: 3D Printing Raspberry Pi Gardening: Monitoring a Vegetable Garden using a Raspberry Pi - Part 1 Bad Data and Data Engineering: Dissecting Google Play Music Takeout Data using Beam, go, Python, and SQL Why I use Linux RE: Throw Away Code? Use go, not Python or Rust! A Data Engineering Perspective on Go vs. Python (Part 2 - Dataflow) A Data Engineering Perspective on Go vs. Python (Part 1) Goodbye, WordPress - Hello, Hugo & nginx How a broken memory module hid in plain sight Tensorflow on edge, or – Building a “smart” security camera with a Raspberry Pi How I built a (tiny) real-time Telematics application on AWS A look at Apache Hadoop in 2019 Building a Home Server Analyzing Reddit’s Top Posts & Images With Google Cloud (Part 2 - AutoML) Analyzing Reddit’s Top Posts & Images With Google Cloud (Part 1) Analyzing Twitter Location Data with Heron, Machine Learning, Google's NLP, and BigQuery Data Lakes: Some thoughts on Hadoop, Hive, HBase, and Spark Storm vs. Heron – Part 2 – Why Heron? A developer’s view Storm vs. Heron, Part 1: Reusing a Storm topology for Heron Update an HBase table with Hive... or sed
(Tiny) Telematics with Spark and Zeppelin
Christian Hollinger · 2017-03-06 · via Christian Hollinger

A V8 doesn’t need computers

Cars these days are quite the experience. Or to use Mike Skinner’s words, they are full of “Space Invader”-technology.

Even entry-level models have more wiring, computers and sensors built in than a medium-sized plane. Well, while that might be a slight exaggeration, but a modern car collects data. A lot of it. Different sources, different numbers – this paper from 2014 quotes a dead link with “approximately 4 megabytes to over 5 gigabytes of data per vehicle per month”, other sources bring up even higher figures. However you put it – there is a lot of information available.

Location, speed, hundreds of sensors for tire pressure, MPG, wiper fluid, you get the idea. Some manufacturers might use this data, some insurance companies might and you might even see some of it in your car’s dashboard.

That’s the modern world. And then there’s my car. A 7-year-old, 130k mi battle-proven Ford Crown Victoria Police Interceptor with an all-American 4.6l V8 (“Turbos on an engine are just god’s way of telling you it ain’t got enough cylinders” –Mike Skinner, again, in character on The Grand Tour).

Not a connected car

And given its lack of fancy technology, it collects and shows exactly one metric to me: The fact that it actually has 130k miles on it. But what if we could get just a little bit more information? What if we could live up to the spirit of 2017 and not think about what we need, but what we might need? Introducing: A homegrown telematics solution!

Telematics

To get a bit more serious, what exactly is telematics? In general, it means transmitting information via telecommunication networks, but when we refer to the term “telematics”, we usually talk about collecting and transmitting certain information related to a vehicle to a corporate-backend in order enable a variety of use cases. Insurance companies use telematics data to offer discounts to people who drive responsibly, car manufacturers might use it to (anonymously) collect and do all sorts of data mining and gather insights on profiles or defects and for corporate fleet, it might be used for fleet management – the potential use cases are as complex as they are plentiful.

For the scope of this article, I would like to talk about what data we can gather and analyze by simple using a car and an iPhone – feel free to extend the idea, the Macchina M2 Kickstarter might be a good starting point. This article will lay the groundwork and give some ideas on how to do it at home.

My maximum speeds in Georgia

 Scope & Technology

For this project, here’s what we like to achieve:

  • Collect all available driving data while on the road – only a car and a phone are required
  • Send and store the data on a server controlled by the user
  • Analyze the data and get all insights from it that we reasonably can
  • Show the results in a dashboard

And here’s what we’ll use:

  • SensorLog, a $2.99 app available for iOS
  • Any form of web-server that accepts HTTP POST and can store data
  • Hadoop 2.7.3 for HDFS
  • Spark 2.0 for ETL and number crunching
  • Apache Hive for data access
  • Apache Zeppelin for visualization and exploration

Yes, this is still a Big Data blog, so we will use classical Hadoop-related technologies. But please be advised that this is intended to be a proof of concept for home use and not a corporate architecture for a huge, multi-user scenario. You will see why in just a second.

As I can already see the emails coming in: “Christian, why not just use any of the dozens of apps and startups offering exactly that? You can get an ODB stick with an app for a couple of bucks! And it can do so much more!”. And you know what? Entirely true! But given the scope of this blog, this is a technology blog, first and foremost. Secondly, all data will wind up on your computers and not in any cloud or with a vendor who needs to analyze your data to make some profit.

Architecture

Have a look at this very much simplified architecture diagram:

We will follow 5 major steps:

  1. Collect the data using SensorLog
  2. Transmit the data via your phone’s mobile network (in this case, via VPN)
  3. Accept & store it using node.js with express
  4. Extract, transform, analyze and persist the data using Spark 2.0 with SparkSQL on Hive
  5. Visualize the data using Apache Zeppelin with Spark, Hive and Angular.js

The architecture for this article is batch-driven via Oozie, but you could simply transform the Spark code into a SparkStreaming application to enable a real-time use case.

Further extensions or different tools are possible. You could replace node.js, use SparkStreaming with Kafka, put the data in HBase, have a mlib pipeline for Machine Learning set up – be creative.

Step 1: Collecting data

The first thing we need to define is how we collect data – and there’s an app for that. Bernd Thomas’ SensorLog for iOS collects all available metrics from your iOS device and is even able to send that data via TCP/IP sockets and HTTP, enabling our foundation for collecting data.

!

SensorLog collects 69 different metrics – basically everything your iPhone’s motion and GPS sensors have available. For the scope of this article, we will focus on time, location and speed.

In its standard configuration, it will gather data with a frequency of 30Hz, equal to roughly 1MB/minute in my tests. While that might not seem much, do a bit of number crunching for your driving profile: At 50minutes/day, roughly the average American commute, that would be 50MB a day just for your location profile. If we include the weekends, that’s 1.5GB a month for a single car’s movements. Multiply that by any number of users and add an imaginary overhead for more information from e.g. ODB, you quickly see why the cry for “Big Data” becomes legitimate.

Granted – a modern car will be able collect much more data than that, but we work with what we have.

Given iOS’ aggressive scheduler, the app needs to be running in order to collect data, making the process semi-automatic – you would have to start the app before and leave it open while driving. I’m sure Android has you covered if you need something that can run in the background, but for the sake of simplicity, we will live with it.

Step 2: Transmitting & storing data

In order to receive the data on our backend, you will need an HTTP server that accepts POST requests. I will not go into too much detail for this topic, as there are simply too many options available. Just make sure you limit the accepted IP range and/or a VPN and accept & store the incoming data somewhere. Here, we use CSV, as it is also SensorLog’s default format for storing the data on the phone.

// dependencies

var WebHDFS = require('webhdfs');

var express = require('express');

var fs = require('fs');

// App

var app = express();

// HTTP POST

var app = express();

app.post('/', function(req, res){

console.log('POST /');

console.dir(req.body);

res.writeHead(200, {'Content-Type': 'text/html'});

res.end('END');

// Write to HDFS or FS

fs.writeFile('data.csv', req.body, function (err) {

if (err) return console.log(err);

});

});

For our article, I used node.js with webhdfs and had this running on my Debian-powered NAS, which sits behind an openWRT router, giving me the flexibility I needed for this. Basically, anything from a massive AWS EC2 cluster to a RaspPi would work for accepting data. In the real world, you could actually use node.js – it has proven to be quite scalable.

You won’t be able to use this node.js code - I am still trying to figure out the best practices, so use it as a rough guideline.

Step 3: Analyzing the data

As so often these days, Spark to the rescue. We will use Spark 2.0, as it offers a new API for working with DataFrames and CSV files. The example code is written in Scala, but feel free to use Java, Python or R.

The first step is to initialize our SparkSQL context and build the CSV’s schema using Spark’s org.apache.spark.sql.types.StructType. We use a combination of String and Double values for this. We then load the data from a pre-defined directory on the local file system, HDFS would naturally be preferred for larger amounts of data.

val sc = SparkSession

.builder()

.appName("TinyTelematics")

.master("local[2]")

.config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse")

.enableHiveSupport()

.getOrCreate()

// Build the schema

// I honestly don't know how to dynamically infer the schema by type - the old spark-csv library from databricks was able to do that as far as I recall...

val schemaStringMeta = "loggingTime loggingSample identifierForVendor deviceID locationTimestamp_since1970"

val schemaStringData = "locationLatitude locationLongitude locationAltitude locationSpeed locationCourse locationVerticalAccuracy locationHorizontalAccuracy locationHeadingTimestamp_since1970 locationHeadingX locationHeadingY locationHeadingZ locationTrueHeading locationMagneticHeading locationHeadingAccuracy accelerometerTimestamp_sinceReboot accelerometerAccelerationX accelerometerAccelerationY accelerometerAccelerationZ gyroTimestamp_sinceReboot gyroRotationX gyroRotationY gyroRotationZ motionTimestamp_sinceReboot motionYaw motionRoll motionPitch motionRotationRateX motionRotationRateY motionRotationRateZ motionUserAccelerationX motionUserAccelerationY motionUserAccelerationZ"

val schemaStringActivityStrings = "motionAttitudeReferenceFrame motionQuaternionX motionQuaternionY motionQuaternionZ motionQuaternionW motionGravityX motionGravityY motionGravityZ motionMagneticFieldX motionMagneticFieldY motionMagneticFieldZ motionMagneticFieldCalibrationAccuracy " +

"activityTimestamp_sinceReboot activity activityActivityConfidence activityActivityStartDate pedometerStartDate pedometerNumberofSteps pedometerDistance pedometerFloorAscended pedometerFloorDescended pedometerEndDate altimeterTimestamp_sinceReboot altimeterReset altimeterRelativeAltitude altimeterPressure IP_en0 IP_pdp_ip0 deviceOrientation batteryState batteryLevel state"

// String

val fieldsMeta = schemaStringMeta.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable = true))

// Double

val fieldsData = schemaStringData.split(" ")

.map(fieldName => StructField(fieldName, DoubleType, nullable = true))

// String

val fieldsActivity = schemaStringActivityStrings.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fieldsMeta ++ fieldsData ++ fieldsActivity)

// Read the input files

val input = sc.read.format("csv").schema(schema)

.load("/Users/christian/Documents/SensorLog/tiny/*.csv")

.filter(col => !col.get(0).equals("loggingTime"))

.dropDuplicates("locationTimestamp_since1970")

Next, we try to cluster our raw data into trips. A trip is defined as a continuous flow of data. If the data collection stops, a new trip segment will be started.

To do that, we use SparkSQL’s windowing function to analyze more than one row per query. The basic idea is to Split the DataFrame when a certain metric’s difference is >= threshold (i.e. x_n - x_n+1 > t , whereas x is the metric, t is the threshold and n is its offset).

One approach for this is to use the timestamp, another would be to check if your current speed remains 0m/s for more than x seconds or a combination of the two.

input.createOrReplaceTempView("input")

var cols = "loggingSample, identifierForVendor, deviceID, locationTimestamp_since1970, locationLatitude, locationLongitude, locationAltitude, locationSpeed, locationCourse"

// The number of rows to lag can optionally be specified. If the number of rows to lag is not specified, the lag is one row.

val lag = 1

val splitBy = "locationTimestamp_since1970"

// Add a column that includes the next row in a window, i.e. the next timestamp

val dfDiffRaw = sc.sql("SELECT "+cols+", " +

"(lag("+splitBy+", "+lag+") " +

"OVER (PARTITION BY identifierForVendor ORDER BY "+splitBy+" ASC)) AS ts1 " +

"FROM input")

// Calculate the timestamp difference to the previous data point

// Convert EPOCH to normal format for Hive partionining

val dfDiff = dfDiffRaw.filter(dfDiffRaw.col("ts1").isNotNull)

.withColumn("diff", dfDiffRaw.col("locationTimestamp_since1970")-dfDiffRaw.col("ts1"))

.withColumn("year", org.apache.spark.sql.functions.from_unixtime(dfDiffRaw.col("locationTimestamp_since1970")).substr(0,4))

.withColumn("month", org.apache.spark.sql.functions.from_unixtime(dfDiffRaw.col("locationTimestamp_since1970")).substr(6,2))

// Cluster the data based on a minimum idle time between data points

// If you did not collect data for x seconds, start a new "trip"

val minTripLengthInS = 60

val tripBy = "diff"

cols = cols + ", year, month"

dfDiff.createOrReplaceTempView("diffInput")

val fullTrips = sc.sql("select "+cols+", diff, segment, " +

" sum(case when "+tripBy+" > "+minTripLengthInS+" then locationTimestamp_since1970 else 0 end) " +

" over (partition by identifierForVendor order by "+splitBy+") as trip" +

" from (select "+cols+",diff," +

" lag("+tripBy+", "+lag+") " +

" over (partition by identifierForVendor order by "+tripBy+" asc) as segment" +

" from diffInput" +

" ) ue")

This adds a trip column, in our case just the first timestamp of the trip.

Last but not least we calculate certain KPIs that might be interesting later on. More specifically, we want to see the following KPIs per trip:

  • Top speed
  • Average speed
  • Points where we went over the speed limit

For the latter, we abstract the concept and define an absolute speed limit – for Georgia and Florida (where I collected my sample data), that would be 70mph (or 31.3m/s). In a real life scenario, you would want to think about matching the GPS location to a map provider that offers specific speed limits – because 70mph in a 30mph zone is not a good idea.

val trips = fullTrips.groupBy("trip")

// Calculate KPIs

val avgSpeeds = trips.avg("locationSpeed").withColumnRenamed("avg(locationSpeed)", "avgSpeed")

val topSpeeds = trips.max("locationSpeed").withColumnRenamed("max(locationSpeed)", "topSpeed")

val result = fullTrips.join(avgSpeeds, "trip").join(topSpeeds, "trip")

// Save

result.createOrReplaceTempView("result")

result.write.format("parquet").mode(SaveMode.Append).saveAsTable("telematics")

Top and average speed are simple – group the result by trip, use avg(1) and max(1) and join the result with the original DataFrame on column “trip”. The result winds up in a Hive table called “telematics”.

Accessing the data

For Hive, make sure you have Hiveserver2 running and not just a local Hive with derby, as we will be using JDBC for our visualization. Verify that setup with

mysql.server start

hiveserver2

beeline

beeline> !connect jdbc:hive2://localhost:10000

Beeline Sidenode: If you get something along the lines of “Plugin ‘*4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC’ is not loaded” - restore the hive user in mysql. Setting up Hiveserver2 on Mac is not fun.

Visualizing the data

First off, I tried using R with shiny, but failed. See my StackOverflow post here. An alternative is Tableau, as it can use JDBC as of version 10, but that might a bit pricey for a small PoC on your local machine.

Example trips in Georgia

For the sake of simplicity, we will use Apache Zeppelin. It is standard in Hortonworks HDP, but also runs fine as standalone. To configure Zeppelin, refer to your environment. For me, I had to manually configure JDBC to use the hive driver and reference said jar in ZEPPELIN_CLASSPATH via the zeppelin-deamon.sh file and some other details.

We use this angular.js example to visualize the data on a map.

In order to do that, we pull the data back from Hive using SparkSQL on scala directly in Zeppelin and map the result to a “trip” object which we bind to the global scope using z.angularBind(2).

%spark

sc.hadoopConfiguration.set("fs.defaultFS","hdfs://localhost:9000")

val data = sqlContext.sql("select * from telematics")

val fullTrips = data.collect

case class trip(id: String, ts: String, speed: Double, lat: Double, long: Double)

val items = fullTrips.zipWithIndex.filter(_._2%10==0).map(_._1)

val result = items.map(r => trip(

r.get(0).toString,

r.get(4).toString,

r.get(7).toString.toDouble,

r.get(5).toString.toDouble,

r.get(6).toString.toDouble

))

z.angularBind("locations", result)

Since the map is rendered on the client, we filter every nth result before actually showing the result:

Example trip in Atlanta, GA with Spark

As we can see in this example, I went around 70mph on 01/08/17.

We can also use Zeppelin to analyze our data with any other form of SQL and visualization. For instance, this example, we took all trips currently in the system and plotted them against their top- and average speeds.

Average and top speeds by trip

There’s more to be done here, of course - a fun exercise would be to link my last.fm profile (which is connected to Spotify) with my top speeds. Now is Watain making me floor it? Or does a CCR playlist have the same effect?

Conclusion

This is a very simple showcase. It uses a variety of technologies to bring a bit of connectivity to my old Crown Vic, but is it worth it?

Absolutely not. Sure - it is a cool proof of concept for me to have a look on how I drive where, to explore the data we can generate using and iPhone and to find out that there are a couple of 1 mile trips to Popeye’s (which maybe, just maybe, one could avoid). But after all, it is just one car with location and speed data.

But: As soon as we extend this concept by collecting actual ODB data (see the kickstarter link above!), link more data to it or add more users, we have a very interesting concept on our hands. The more actual insights we are able to collect, the more we can learn. How to my car’s sensor metrics relate to the driving style? What is my driving profile, how does it relate to that of my friends and their cars? Is there open maintenance, should I schedule it, has it anything to do with how I drive? If we can we enrich the GPS data with speedlimits, how much would the police like my driving style?

The foundation is there. We can collect, send, format, store and analyze data. Adding just more data to it is trivial (given we are able to extract it).

But for now - a fun little project and maybe a foundation for more.

The full source is available on GitHub.