How to validate the number of fields in a CSV file with Akka Stream and Alpakka CSV

In this article, we’re going to focus on CSV (Comma-Separated Values) data and show how to validate the number of fields in each row.

CSV stands for comma-separated values: a file type that stores data in a tabular format. CSV files are plain text, which makes them easier to create and process than spreadsheets. On the other hand, they only have a single sheet and don’t allow formulae.

If you’re unfamiliar with Akka Streams, Sources, Sinks, and Flows, I recommend this article for a great introduction to the topic: Introduction to Akka Streams.

In this section we’re going to use Akka Streams to read the content of a CSV file, obtaining, for each row, a Map[String, String], where the keys are the headers (i.e. the first row in the file) and the values are the actual values at a given row. While the predefined connectors allow us to create this Map, they don’t help us perform basic checks on the number of fields.

Let’s start from the code and see how it behaves.
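The original listing is embedded as a gist in the post; below is a minimal, self-contained sketch of it. The file content and column names (col1, col2, col3) are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object CsvReader extends App {
  // A basic CSV "file" with three columns and two rows (made-up data)
  val csvContent = "col1,col2,col3\na1,a2,a3\nb1,b2,b3"

  implicit val system: ActorSystem = ActorSystem("csv-reader")
  import system.dispatcher

  // A source that emits the whole file as a single ByteString
  val source: Source[ByteString, NotUsed] = Source.single(ByteString(csvContent))

  val result: Future[Seq[Map[String, String]]] =
    source
      .via(CsvParsing.lineScanner())   // split the bytes into CSV lines (one List[ByteString] per row)
      .via(CsvToMap.toMapAsStrings())  // use the first row as the keys for the following rows
      .runWith(Sink.seq)

  result.foreach { rows =>
    rows.foreach(println)
    system.terminate()
  }
}
```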

The first line of the sketch simply creates a basic CSV string with three columns and two rows. Source.single then turns this string into a Source[ByteString, NotUsed], where NotUsed simply means the source doesn’t materialize any value: it just produces a stream of ByteString. We then process this stream of ByteString using two built-in flows (which can be thought of as functions). First, we split it into lines, using \n as a line separator, with lineScanner(). Second, we turn each line into a Map[String, String], using toMapAsStrings(). We then materialize the stream, running it and printing each line (represented as a map). The runWith() method returns a Future[Seq[Map[String, String]]] that is completed when the stream ends.

If run, this simple program prints out the following output:
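(With the made-up data from the sketch above:)

```
Map(col1 -> a1, col2 -> a2, col3 -> a3)
Map(col1 -> b1, col2 -> b2, col3 -> b3)
```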

This code suffers from a problem though: it works correctly only if the number of fields for each line matches the number of columns (defined as the number of values in the first row).

Let’s see what it prints with the following CSVs (fewer and more fields than headers, respectively):
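The original files are shown as images in the post; assuming the same headers as in the sketch above, they would look like this:

```
col1,col2,col3
a1,a2
b1,b2
```

and

```
col1,col2,col3
a1,a2,a3,a4
b1,b2,b3
```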

The two outputs are as follows:
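```
Map(col1 -> a1, col2 -> a2)
Map(col1 -> b1, col2 -> b2)
```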

and
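```
Map(col1 -> a1, col2 -> a2, col3 -> a3)
Map(col1 -> b1, col2 -> b2, col3 -> b3)
```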

As you can see, in the former case the third column is just discarded, and in the latter the additional value in the first row is dropped. This is a safe approach, but sometimes it’s necessary to stop reading the file if the number of fields isn’t consistent across rows.

Let’s see how we can solve this problem by implementing a custom Flow, whose type will be: Flow[List[ByteString], Either[WrongNumberOfFields, Map[String, String]], NotUsed]. What this type means is that we have a function that takes as input a List[ByteString] (i.e. one row of the CSV, as the list of its fields) and outputs an Either[WrongNumberOfFields, Map[String, String]]. WrongNumberOfFields is a custom type we return when the number of fields is inconsistent across the rows. Map[String, String], on the other hand, represents a map whose keys are the headers of the CSV and whose values are the values at a given row. In brief, this Flow takes the rows and produces downstream either one map per row or some information about the error.

Let’s take a look at the code:
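The post’s listing is again an embedded gist; the sketch below reconstructs it from the description that follows. The exact shape of WrongNumberOfFields isn’t shown in the post, so its fields (expected, actual, and the row index) are assumptions.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// Error type carrying the information described below; its exact fields are an assumption
final case class WrongNumberOfFields(expected: Int, actual: Int, rowIndex: Long)

val validateFields: Flow[List[ByteString], Either[WrongNumberOfFields, Map[String, String]], NotUsed] =
  Flow[List[ByteString]]
    .prefixAndTail(1) // emits a (Seq(headerRow), Source(remaining rows)) pair
    .flatMapConcat { case (hs, rows) =>
      val headers  = hs.head.map(_.utf8String) // assumes the file has at least a header row
      val expected = headers.size
      rows.zipWithIndex.map { case (row, index) =>
        val actual = row.size
        if (actual == expected)
          Right(headers.zip(row.map(_.utf8String)).toMap) // zip headers with stringified values
        else
          Left(WrongNumberOfFields(expected, actual, index)) // zero-based index among data rows
      }
    }
```

This flow plugs in after CsvParsing.lineScanner(), replacing CsvToMap.toMapAsStrings() in the pipeline above.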

prefixAndTail(1) is used as a first step to split the stream into two parts: the headers and all the remaining rows. It gives us a (Seq[List[ByteString]], Source[List[ByteString], NotUsed]) pair, on which we invoke flatMapConcat to transform each input element into a Source of output elements that is flattened by concatenation. Using pattern matching, case (hs, rows), we then extract the headers and the rows. Another step is needed before getting to the validation logic: WrongNumberOfFields also needs the line where the error occurred, so we invoke zipWithIndex on the rows. This gives us a (List[ByteString], Long), where the first element of the tuple is a row and the second is its zero-based index among the data rows. These two lines do pretty much all the dirty work. The rest of the function simply introduces some handy local variables to make the code more readable and checks, for each row, that the expected and actual number of fields are equal. If so, we create a Map[String, String] by zipping headers and values after stringifying both of them. Otherwise, if the row contains fewer or more fields than expected, we return an instance of WrongNumberOfFields.

Let’s see how this Flow behaves with the three CSVs above:
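With the three sample files above (and the sketched WrongNumberOfFields), the stream would emit:

```
Right(Map(col1 -> a1, col2 -> a2, col3 -> a3))
Right(Map(col1 -> b1, col2 -> b2, col3 -> b3))
```

for the valid file,

```
Left(WrongNumberOfFields(3,2,0))
Left(WrongNumberOfFields(3,2,1))
```

for the fewer-fields one, and

```
Left(WrongNumberOfFields(3,4,0))
Right(Map(col1 -> b1, col2 -> b2, col3 -> b3))
```

for the extra-fields one.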

It’s then up to the next stage in the computation to take proper actions, such as throwing an exception to stop the stream.
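For example, a downstream stage could fail the stream on the first Left; here’s a sketch, assuming the WrongNumberOfFields case class above:

```scala
source
  .via(CsvParsing.lineScanner())
  .via(validateFields)
  .map {
    case Right(row) => row
    case Left(WrongNumberOfFields(expected, actual, index)) =>
      // Throwing inside a stage fails the whole stream
      throw new IllegalStateException(
        s"Row $index: expected $expected fields, found $actual")
  }
  .runWith(Sink.seq)
```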

In this post we saw how to implement a custom Flow to be used together with Alpakka CSV’s built-in functions. What we implemented simply checks that the number of fields is consistent across the rows, but there’s no limit to the validations we can perform during this stage.
