[Spark Series 1] delta.io: What Problems Does Delta Lake Solve?

This article is reproduced from https://mp.weixin.qq.com/s/ZN…

On October 16, 2019, at the Spark + AI Summit Europe in Amsterdam, the Netherlands, Databricks officially announced that Delta Lake had been donated to the Linux Foundation and had become an official project of the foundation. We expect that this year (2019), or very soon after, Delta Lake will become the mainstream choice for data lakes, or the de facto standard.

Delta Lake was also on the list of best open source software awards for 2019 released in September. As the official award comments on Delta Lake put it, everyone was surprised that Databricks open-sourced its core product. Delta Lake was created to solve many pain points of Spark as a big data analysis platform. We believe it will benefit the whole Spark community and other big data communities, and truly solve key problems of data lake management.

I was fortunate to take part in the early development of Delta Lake, especially the design and implementation of key DML operations such as merge, update and delete. The project started in June 2017. At that time, many users complained to us about Spark's shortcomings and how inconvenient it was to use. After some discussion, our engineers concluded that it was time to propose our own storage architecture. Spark is a compute engine that separates storage from compute. Until then, we had mainly relied on other open source or proprietary projects to solve data storage problems. In practice, however, we found that in users' production environments the existing storage solutions could not really solve the problems of the data lake. So we worked with our customers to solve the problems they faced in production. After four months of rapid development, we officially announced Delta Lake as a product in October 2017. At the Spark + AI Summit in June of the following year, engineers from Apple and our engineer Michael gave a keynote speech and shared Apple's experience of using Delta Lake. For example, they used Delta Lake to solve the problem of reading and writing very large tables holding trillions of records.

After the summit, we received praise from many sides. At present, more than 3,000 customers use Delta Lake in their production environments. Given this, we thought we should extend it to the whole Spark community to help the big data community solve its data management pain points. So, in April 2019, we decided to open-source it.

After it was open-sourced, however, we received a lot of feedback from the Spark community along these lines: Delta Lake is a GitHub repository owned by your company, you may change the open source license at any time, and your governance model is not very transparent. To address this, we decided to donate it to the Linux Foundation and make it a standard open platform, so that more people can take part in developing and using Delta Lake.

Today we will walk through some typical scenarios and explain why Delta Lake can solve these pain points. We will then cover the basic principles of Delta Lake and the Delta architecture, and how it can replace the widely used Lambda architecture.

The data engineer's dilemma and the operations mess
Project managers always tell engineers, "We have a very simple requirement." In reality, these simple requirements are quite hard to deliver.

In a production big data system, you as an engineer will often face a project manager who says: "I want a data pipeline that processes data continuously and incrementally. Whenever new data arrives, it should be processed. Instead of reprocessing all historical data every time, it should process only the incremental data, efficiently and fast. And remember, users must not need to know whether this is batch processing or stream processing. In short, get the right results quickly."

As a data engineer, you then need to build a basic data pipeline. According to the project manager, it is very simple. We use Spark to read from Kafka, Kinesis and the various formats of the data lake, use Spark to do some data cleaning and transformation, and save the results to a data lake. Another Spark job then analyzes the content of the data lake, for training or for other kinds of analysis, and finally generates a report for the end user. This is a very simple pipeline, but it has a catch. If you only use Spark's batch processing, the latency may not meet requirements, and you are not doing incremental processing.

So the second solution is to use Spark Structured Streaming. Structured Streaming has Trigger.Once, which records where the last run stopped. This reduces latency and processes only the increment, and you don't need to record and manage the last processed position yourself. However, we then hit a new problem: with Structured Streaming, each micro-batch produces multiple small result files. As small files accumulate, the whole pipeline gets slower and slower, and the latency eventually becomes unacceptable.

This forces us to the next plan. Since we have small files, we have to compact them regularly. But while compaction runs, the whole pipeline has to go offline. Why? Without atomic reads and writes, it is impossible to read the data while the compaction is writing. A long compaction cycle also affects the timeliness of the final report; the business may not accept, say, half an hour or an hour of delay. At this point you will naturally choose the most classic architecture, the Lambda architecture. That is, you deploy a batch pipeline and a streaming pipeline side by side. The batch pipeline can be slower, but its results are complete and accurate. The streaming pipeline produces results for the latest increment as fast as possible. The batch and streaming results are then combined to produce a global result.

But the Lambda architecture requires operating two different pipelines at the same time, which greatly increases both the resources consumed and the human cost.

We also need to validate both pipelines, especially when the data comes from unstructured sources and is not particularly clean or consistent.

For errors found during validation, we don't want to shut down the pipeline; we want it to be repaired automatically. One solution is to avoid modifying the whole table and reprocess only some partitions. Data reprocessing generally adds latency to the entire pipeline, and further increases the load on hardware and the complexity of the pipeline.

After that, because of business adjustments or for many other reasons, you may want to update and merge data in the data lake. Since current data lakes do not support update and delete, you may need to implement update and merge yourself. We found that different users implement this in very different ways. These solutions are not only error-prone, but also complex and slow, and most of them are not general-purpose.

Complex as it is, after half a year of development the solution can finally go live, which should be a happy moment. However, once the Lambda architecture is in production, you receive endless complaints. For example, data loading is too slow. While some metadata operation runs, other concurrent commands and queries are blocked and can do nothing; users have to wait for the large load or the metadata operation to finish before they can do anything else. Or, when users update the data lake, they get a lot of FileNotFound errors. Perhaps the file paths have been updated but the metadata cache has not, so the file cannot be found and you need to refresh the cache. But sometimes customers complain that refresh doesn't seem to work, and ask when it will. If you use an object store, after digging to the bottom you may find that the cause is eventual consistency, and you may have to wait half an hour before the file becomes visible. All in all, there are all kinds of errors.

Operations are hard, and the pressure is constant. The Lambda architecture is expensive and laborious. A lot of time is wasted working around the deficiencies and limitations of the system instead of extracting value from the data. The loss outweighs the gain.

But let's look back. The first plan was actually very simple and elegant. So what is wrong with it? What makes it so complicated? What are we missing? How can we simplify things to get a simple, maintainable architecture?

Here we list five points:

1) First, support simultaneous reads and writes. You can still read while data is being written, and you never read a wrong result. It should also support multiple concurrent writers while keeping the data consistent;

2) Second, read from large tables with high throughput. A big data solution can't have many restrictions. For example, I have heard that some solutions support only a handful of concurrent reads, or refuse to let you submit a job that reads too many files. If so, the whole design does not meet the needs of the business;

3) Third, errors are inevitable. You need to support rollback, redo, and deleting or modifying results. You can't ask the business side to change its business logic just so that deletion or modification can be supported;

4) Fourth, when the business logic changes, the data has to be reprocessed. The business cannot go offline while this happens. Until reprocessing finishes, the data in the data lake must remain accessible;

5) Fifth, data may arrive late for many reasons. You should be able to handle late data without delaying the next stage of processing.

Based on these five points, we built a new architecture on Delta Lake and Structured Streaming, called the Delta architecture. It overturns the Lambda architecture, or you can see it as an upgrade.

In the Delta architecture, batch and streaming are unified and data is processed continuously. Historical data is reprocessed on demand, and compute or storage resources scale elastically on demand, using the capabilities of a public or private cloud.

The basic principle of Delta Lake

The basic principle of Delta Lake is extremely simple. An ordinary partitioned table is generally a set of partition directories plus some data files. Delta Lake is based on the same structure. The only difference is that it has a transaction log that records table versions and the change history.

Now let's take a fresh look at what constitutes a table. A table is really the result of a series of operations, such as changing metadata, changing the name, changing the schema, adding or deleting partitions, and adding or removing files. The current state of any table is the result of this series of actions. It contains the current metadata, the file list, the transaction history, and version information.

So how is atomicity achieved? This is also very simple: it is enough to guarantee that commit files are ordered and that each commit is atomic.

For example, the first version of the table adds two files. The second version removes these two files and adds a new one. A reader always sees the result of a whole commit: either the first version or the second, never something in between.
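
The two-commit example can be played through with a toy model. This is only a sketch: the tuple layout, the file names and the replay function are made up for illustration and are not Delta Lake's on-disk format or code. It replays an ordered list of commits to find the files a reader of a given version would see.

```python
# Toy model of a transaction log: each commit is a list of (action, file) pairs.
# The names and layout are illustrative assumptions, not Delta Lake's real format.

def replay(log, version=None):
    """Return the set of live data files after applying commits 0..version."""
    if version is None:
        version = len(log) - 1
    files = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                files.add(path)
            elif action == "remove":
                files.discard(path)
            else:
                raise ValueError(f"unknown action: {action}")
    return files


log = [
    # first commit: two files are added
    [("add", "part-1.parquet"), ("add", "part-2.parquet")],
    # second commit: both files are removed and a new one is added
    [("remove", "part-1.parquet"), ("remove", "part-2.parquet"),
     ("add", "part-3.parquet")],
]

print(sorted(replay(log, 0)))  # files visible to a reader of the first commit
print(sorted(replay(log)))     # files visible to a reader of the latest commit
```

Because a reader only ever replays whole commits, it never observes a state in which, say, part-1.parquet is gone but part-3.parquet has not yet appeared.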

How is concurrency between multiple writers achieved? Generally speaking, a Spark pipeline has highly concurrent reads and few concurrent writes. In this situation, optimistic concurrency control is the better fit. It is actually very simple. Each writer first records which version of the data it read. If two writers commit at the same time, only one can succeed. The other must check whether the winner's commit touched any file it had read. If not, it simply renames its commit file to the next version and commits. If so, it has to redo its work. This can be done automatically by Delta Lake, or by the party that submitted the transaction (the business side).
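
The conflict check can be sketched in a few lines. This is a toy, in-memory model under simplifying assumptions: the ToyLog class, ConflictError and the file names are hypothetical, and a real implementation also needs an atomic "create the next commit file only if it does not exist" step that is not modeled here.

```python
# Toy optimistic concurrency control over an in-memory commit log.
# All class and method names are hypothetical; this is not Delta Lake's code.

class ConflictError(Exception):
    pass


class ToyLog:
    def __init__(self):
        self.commits = []  # commits[i] is the set of files touched by version i

    def latest_version(self):
        return len(self.commits) - 1

    def try_commit(self, read_version, files_read, files_touched):
        """Commit on top of read_version, or fail if a winner touched our reads."""
        # Check every commit that landed after the version this writer read.
        for winner in self.commits[read_version + 1:]:
            if winner & set(files_read):
                raise ConflictError("a concurrent commit changed files we read")
        # No overlap: the writer simply takes the next version number.
        self.commits.append(set(files_touched))
        return self.latest_version()


log = ToyLog()
log.try_commit(-1, [], ["a.parquet", "b.parquet"])    # version 0

# Two writers both start from version 0.
v1 = log.try_commit(0, ["a.parquet"], ["a.parquet"])  # first writer wins
v2 = log.try_commit(0, ["b.parquet"], ["c.parquet"])  # no overlap, lands as next version
try:
    log.try_commit(0, ["a.parquet"], ["d.parquet"])   # read a file the winner rewrote
except ConflictError as err:
    print("conflict:", err)
```

The second writer succeeds without redoing any work because the winner never touched the file it read; the third must start over from the latest version.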

Another classic problem Delta Lake has to solve is handling metadata at large scale. You end up with a large number of commit log files, because every commit generates a file; this is itself a classic small-files problem. How is it solved? The standard answer is to use Spark. Delta Lake uses Spark to process its own metadata. In the example just mentioned, two files are added, then those two files are removed and a new Parquet file is added. Afterwards, Spark reads all the commits and writes a new file that summarizes them, which we call a checkpoint.
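
The effect of a checkpoint can be shown with a single-process toy. Delta Lake does this work with Spark in a distributed way; the functions and data layout below are illustrative assumptions only. The point is that a reader replays the checkpoint plus the few commits after it, instead of every small commit file.

```python
# Toy checkpointing: fold many small commits into one snapshot so that readers
# replay only the checkpoint plus the commits after it.
# Function names and data layout are illustrative assumptions.

def apply(files, commit):
    """Apply one commit's add/remove actions to a set of live files."""
    files = set(files)
    for action, path in commit:
        if action == "add":
            files.add(path)
        else:  # "remove"
            files.discard(path)
    return files


def checkpoint(log, upto):
    """Collapse commits 0..upto into a single (version, live files) snapshot."""
    files = set()
    for commit in log[: upto + 1]:
        files = apply(files, commit)
    return upto, files


def snapshot(log, ckpt):
    """Rebuild the latest state from a checkpoint plus the commits after it."""
    version, files = ckpt
    for commit in log[version + 1:]:
        files = apply(files, commit)
    return files


log = [
    [("add", "f1"), ("add", "f2")],
    [("remove", "f1"), ("remove", "f2"), ("add", "f3")],
    [("add", "f4")],
]
ckpt = checkpoint(log, 1)
print(ckpt)
print(sorted(snapshot(log, ckpt)))
```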

That's Delta Lake. It really is that simple.

Delta architecture
Introduction to Delta architecture
Let's look at the Delta architecture and how it can replace the classic Lambda architecture.

1) First, read and write at the same time while keeping the data consistent

Our first requirement above was to support transactions. As long as you have snapshot isolation between reads and writes, you can focus on your data flow without worrying about reading partial results, or about errors like FileNotFound. Delta Lake handles all of this for you.

Delta Lake supports both streaming and batch reads and writes. It uses the standard APIs, so it is easy to adopt. You can find the specific APIs in the documentation.

2) Read from large tables with high throughput

Anyone who has worked with big data has probably run into this classic pain point. I have dealt with this problem for customers many times. Without Delta Lake it is almost unbearable.

Without Delta Lake, the location paths of a table with millions of partitions have to be read row by row from the Hive metastore, and fetching a million rows is extremely slow. Then, for each partition's location, the file system has to list all the files it contains. On an object store, this kind of operation is also expensive and slow.

Isn't this a typical big data problem? A big data system that can't solve its own big data problem would be a joke.

The solution is simple: store the file paths in Parquet, the standard Spark way, and read them with Spark's distributed, vectorized reader. This is how Delta Lake solves the pain point. Our customers have easily seen performance improve by hundreds or even thousands of times, precisely because the Hive metastore and the file system's list operations are so slow.

3) Support rollback and deletion

Data is often dirty, so the need for rollback and deletion is hard to avoid. Delta Lake provides time travel. Because the transaction log contains the whole history of changes, this is easy for Delta Lake to implement. We provide two APIs: you can travel by timestamp or by version number. Time travel is a very useful feature. It can be used not only to correct errors, but also to debug, reproduce past reports, audit, run complex temporal queries, and query earlier versions of tables whose data changes quickly.
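
Why the transaction log makes time travel cheap can be seen in a toy model. The log layout, timestamps and function names below are hypothetical and are not Delta Lake's API (in Delta Lake's DataFrame reader the two lookups correspond to the versionAsOf and timestampAsOf options). A timestamp is first resolved to the last commit at or before it, and the table is then read as of that version.

```python
# Toy time travel: every commit carries a timestamp, and a reader can ask for
# the table as of a version number or as of a point in time.
# The log layout and function names are hypothetical.
from bisect import bisect_right

# (commit timestamp in seconds, actions), in commit order
log = [
    (100, [("add", "f1"), ("add", "f2")]),
    (200, [("remove", "f1"), ("add", "f3")]),
    (300, [("remove", "f2")]),
]


def as_of_version(version):
    """Replay commits 0..version and return the live files."""
    files = set()
    for _, actions in log[: version + 1]:
        for action, path in actions:
            if action == "add":
                files.add(path)
            else:
                files.discard(path)
    return files


def as_of_timestamp(ts):
    """Resolve ts to the last commit made at or before it, then read that version."""
    timestamps = [t for t, _ in log]
    version = bisect_right(timestamps, ts) - 1
    if version < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return as_of_version(version)


print(sorted(as_of_version(0)))
print(sorted(as_of_timestamp(250)))
```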

Delta Lake also supports update / delete / merge, but at present Delta does not have its own SQL syntax. We could copy Spark's SQL syntax support wholesale, but the maintenance cost would be very high. Once Spark 3.0 arrives, this problem is solved. To support Spark 2.4, however, Delta would need to add its own SQL parser, and we are still discussing whether to do this.

4) Reprocess historical data without taking online business offline

Just delete the relevant results in Delta Lake, change the business logic, and batch-process the historical data again to get the latest results. Meanwhile, because Delta Lake supports ACID, downstream applications can keep reading the previous version of the data.

5) Handle late data without delaying the next stage of processing

Handling late data is not a problem. As long as merge is supported, that is, update the row if it exists and insert it if it doesn't, late data can be merged in without disturbing your existing writes to Delta Lake.
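
The "update if it exists, insert if it doesn't" rule is easy to illustrate on plain Python rows. This is a stand-in for the behaviour only, not Delta Lake's merge API; the merge function and the event_id and status fields are invented for the example.

```python
# Toy upsert ("merge"): update rows whose key already exists, insert the rest.
# A plain-Python stand-in for the behaviour described above, not Delta Lake's API.

def merge(target, updates, key):
    """Return a new list of rows with updates merged into target by key."""
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        if row[key] in merged:
            merged[row[key]].update(row)   # matched: update the existing row
        else:
            merged[row[key]] = dict(row)   # not matched: insert a new row
    return list(merged.values())


table = [
    {"event_id": 1, "status": "open"},
    {"event_id": 2, "status": "open"},
]
late = [
    {"event_id": 2, "status": "closed"},   # late correction to an existing row
    {"event_id": 3, "status": "open"},     # late arrival of a new row
]
table = merge(table, late, key="event_id")
print(table)
```

Running the same late batch twice gives the same table, which is why late or replayed data does not need special handling downstream.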

As shown above, Delta Lake meets all five of our needs and makes your data pipeline simple and elegant again, with no need for a Lambda architecture that complex.

How do you get the most out of the Delta architecture? Based on many discussions with customers, we have summarized the following points.

You need Delta Lake tables at multiple stages. The basic idea is this. The first stage ensures that no raw data is lost. The raw data is stored in Delta Lake, so if some important information is lost during data cleaning, you can easily recover it. The second stage cleans the data: cleansing, transformation and filtering. The third stage is data that is truly ready for analysis. In other words, data is divided into multiple levels, or states, according to its quality. How many stages a real production pipeline needs depends on business complexity, SLA, and latency requirements.

Characteristics of Delta architecture
Let's look at the characteristics of the Delta architecture.

1) Continuous data flow

It sounds lofty, but it is easy to understand with a little explanation.

Unified batch and streaming. Streaming and batch use the same engine, so there is no need to maintain more than one. They use the same API; you can even solve problems with the streaming API alone, without the batch API. They use the same user code: without the Lambda architecture, a single pipeline solves everything.

Efficient incremental data loading. When new data keeps arriving, you can use Structured Streaming's Trigger.Once, which records where the last run stopped. You just restart the Trigger.Once job and it processes the data that has arrived since the last run, which is very convenient.

Fast, low-latency stream processing. You can choose among different trigger modes. Trigger.Once is the most cost-effective. You can also trigger at a fixed interval for lower latency, or use the continuous trigger for the lowest latency. You can turn batch processing into continuous stream processing with little effort. Moreover, because Delta Lake commits are atomic, it can guarantee exactly-once semantics, which is very important and which other data sources can't guarantee.
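
The "remember where the last run stopped" idea behind Trigger.Once can be sketched without Spark. This is a toy under stated assumptions: the JSON offset file, the run_once function and the uppercase "processing" step are invented for illustration and are not Spark's checkpoint format.

```python
# Toy model of "run once, remember where you stopped": each run processes only
# the records that arrived since the previous run. The JSON checkpoint file and
# function names are assumptions for illustration, not Spark's checkpoint format.
import json
import os
import tempfile


def run_once(source, checkpoint_path, sink):
    """Process records after the saved offset, then persist the new offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]
    new_records = source[offset:]
    sink.extend(r.upper() for r in new_records)      # stand-in for real processing
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": len(source)}, f)
    return len(new_records)


checkpoint_path = os.path.join(tempfile.mkdtemp(), "offset.json")
source, sink = ["a", "b"], []
first = run_once(source, checkpoint_path, sink)    # processes a and b
source += ["c"]
second = run_once(source, checkpoint_path, sink)   # processes only c
print(first, second, sink)
```

Note that this toy writes the output and the offset in two separate steps, so a crash between them could reprocess records. Closing that gap is exactly where an atomic commit in the sink matters.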

2) Materialized intermediate results

This departs a little from the traditional model. We suggest materializing your intermediate results several times, that is, at multiple stages. Each stage stores its intermediate results in files. This has the following advantages.

Fault recovery. When something goes wrong, you can go back to a given version and restart from there, rather than from the raw data. This matters a lot in a pipeline.

Easier troubleshooting. You know which step went wrong. Without materialized results, when a business report is wrong you don't know where the problem is, and you can't debug it or trace it back.

Reuse. When you have many complex pipelines, you may reuse some intermediate results, which is very convenient. For example, two pipelines that are identical before step T3 can share that part.

If your transformations are complex, you can materialize several times. How many times depends on your trade-off between reliability / SLA and end-to-end latency. If you want better reliability / SLA, you have to materialize more often, but writes have a cost, so end-to-end latency increases. The details depend on your needs.

3) Trade-off between cost and latency

Stream processing: data flows in and is processed continuously, no job scheduling is needed, and the cluster must always be online.

Frequent batch processing: data arrives and is processed at intervals of minutes, and very low latency is not required, for example half an hour is acceptable. This needs a warm pool of machines that shut down when idle and start on demand. You can use Spark Structured Streaming's Trigger.Once mode.

Infrequent batch processing: data arrives and is processed in batches every several hours or days. Machines shut down when idle and start on demand, and you can again use Structured Streaming's Trigger.Once mode. This saves a lot of resources.

4) Optimize the physical storage of data

Based on the predicates of common queries, the physical layout of data can be optimized to speed up reads, for example with partitioning and Z-ordering. Everyone should be familiar with partitioning. Low-cardinality columns are the better choice, and each partition should not exceed 1 GB. Date is a commonly used partition column, and within each date you can further partition by something like eventType. This way, no partition is too large and there are not too many partitions. If, on the other hand, you use a timestamp as the partition column, you generate countless partition values, which is a disaster: you can easily blow up the Hive metastore. We don't recommend it in Delta Lake either, even though Delta Lake does not rely on the metastore. The second technique is Z-ordering, which has not yet reached the open source version. What does it solve? It targets high-cardinality columns, that is, columns with many distinct values, which suit a Z-ordering index.
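
For readers unfamiliar with Z-ordering, the general technique is to sort rows by a key that interleaves the bits of several columns (a Morton code), so that rows close in all of those columns end up in the same files. The sketch below shows that general idea only; the z_value function is hypothetical, and this article does not describe how Delta Lake implements Z-ordering internally.

```python
# Toy Z-order (Morton) key: interleave the bits of two column values so that rows
# close in both columns get nearby sort keys. This shows the general technique;
# how Delta Lake implements Z-ordering internally is not described in this article.

def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y (x in even bit positions)."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


rows = [(x, y) for x in range(4) for y in range(4)]
rows.sort(key=lambda r: z_value(*r))
print(rows[:4])   # the first four rows form a compact 2x2 block of (x, y) values
```

If every four consecutive rows went into one file, a filter on either x or y could skip files by their min/max statistics, which a sort on a single column cannot offer for the other column.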

5) Reprocessing historical data

What is the advantage of keeping the result of every stage? You delete the result, run the Trigger.Once job again, and the result is regenerated. If your system is deployed in the cloud, this is also easy to speed up. For a quick backfill, just add more machines, for example scaling from 10 machines to 100, and the result comes out faster.

6) Data quality adjustment

This is another place where we need to change our way of thinking.

In the early stages, the priority is data integrity. The schema can be merged automatically to avoid losing data. In the final stage, we enforce that the schema does not change, data types do not change, and data expectations hold, for example that a column can't contain null. Data quality is crucial to the accuracy of data analysis.
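
The contrast between the permissive early stage and the strict final stage can be sketched as follows. This is a toy: schemas are plain dicts of column name to type name, and merge_schema and enforce are hypothetical helpers, not Delta Lake functions.

```python
# Toy schema handling: early stages merge new columns in, the final stage
# rejects anything unexpected. Schemas are plain dicts of column -> type name;
# the function names are hypothetical.

def merge_schema(table_schema, batch_schema):
    """Permissive: add new columns, but never silently change an existing type."""
    merged = dict(table_schema)
    for col, typ in batch_schema.items():
        if col in merged and merged[col] != typ:
            raise TypeError(f"column {col}: {merged[col]} vs {typ}")
        merged[col] = typ
    return merged


def enforce(table_schema, rows, not_null=()):
    """Strict: every row must have exactly the schema's columns and no forbidden nulls."""
    for row in rows:
        if set(row) != set(table_schema):
            raise ValueError(f"schema mismatch: {sorted(row)}")
        for col in not_null:
            if row[col] is None:
                raise ValueError(f"null in column {col}")
    return rows


raw_schema = merge_schema({"id": "long"}, {"id": "long", "device": "string"})
print(raw_schema)

good = enforce(raw_schema, [{"id": 1, "device": "x"}], not_null=["id"])
try:
    enforce(raw_schema, [{"id": None, "device": "x"}], not_null=["id"])
except ValueError as err:
    print("rejected:", err)
```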

These characteristics are not hard to understand, but they do require a change in thinking.

Advantages of Delta architecture
1) Shorter end-to-end pipeline SLA. Several users (customers) have reduced their data pipeline SLA from hours to minutes.

2) Lower pipeline maintenance cost. The original Lambda architecture is time-consuming and laborious. The Delta architecture achieves the same minute-level latency for these use cases without that complexity.

3) Easier data updates and deletes. Change data capture, GDPR, sessionization and data deduplication are all simplified, and all can be implemented with Delta Lake much more conveniently.

4) Lower infrastructure cost, thanks to the separation and elasticity of compute and storage. Several users have cut their infrastructure cost by more than ten times.

Classic cases of Delta architecture
Here are three classic cases of the Delta architecture.

The first is Comcast, a telecommunications company comparable to China Mobile, which collects massive amounts of user data in the United States. With Delta Lake in its petabyte-scale jobs, it went from 640 servers to 64 and from 84 jobs to 34, and latency was cut in half.

The second is Sam's Club, which also uses Delta Lake. Previously it could not achieve data consistency at all; now it can. Latency dropped from one hour to six seconds.

The third is Healthdirect in Australia. Its data is cleaner and more consistent. The accuracy of data analysis and matching rose from 80% to 95%, and data loading time fell from one day to 20 minutes.

These are all cases shared by Delta Lake users at Spark Summit.

Using Delta Lake is very easy. Just replace the parquet keyword with delta.

How do you add Delta Lake? Just add the package. For details, see the demo: Delta Lake Primer (https://dbricks.co/dlw-01).

Delta Community
Delta Lake iterates very quickly. Internally we have a large number of features that are not yet open source. We will open-source them gradually and increase our R&D investment.

Alibaba's team is contributing to Delta Lake so that Hive can read data in Delta Lake.

The Delta Lake community is growing really fast. It has been open source since April 2019, and it currently has 3,700 customers. Maven downloads are approaching 20,000. Our own customers have read and written more than 2 exabytes with it.

You are welcome to create your own Delta Lake and join the Delta Lake community at https://delta.io/. There is a Slack channel where you can see all kinds of questions, and our engineers and experts actively respond to them.
