Paul,

> It basically runs a local-mode (in-process) Spark to read the log.

That's unfortunately not particularly scalable unless I'm missing something. I think the easiest path to accomplish this would be to build a NiFi flow that generates Parquet files and uploads them to an S3 bucket, and then periodically run a Spark job that reads the entire bucket and merges the files into the Delta table. I've done something simple like this in a personal experiment. It's particularly easy now that we have Record API components that will generate Parquet output files directly from an Avro schema and a record set.
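For illustration, a minimal PySpark sketch of that periodic merge job (the bucket layout, paths, and app name are made up, and it assumes the Delta Lake package is on the Spark classpath):

from pyspark.sql import SparkSession

# Assumes Spark is launched with the Delta Lake package available,
# e.g. spark-submit --packages io.delta:delta-core_2.12:<version>
spark = SparkSession.builder.appName("nifi-parquet-to-delta").getOrCreate()

# Hypothetical locations: NiFi drops Parquet files into the staging prefix,
# and the Delta table lives under table_path.
staging_path = "s3a://example-bucket/nifi-staging/"
table_path = "s3a://example-bucket/delta/events"

# Read everything NiFi has written so far and append it to the Delta table
# as a single atomic commit.
new_data = spark.read.parquet(staging_path)
new_data.write.format("delta").mode("append").save(table_path)

You would also want to move or delete the staged files after a successful commit so the next run does not re-ingest them.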

On Tue, Mar 31, 2020 at 11:08 AM Paul Parker <nifi.surfer@gmail.com> wrote:
Let me share answers from the Delta community:

Answer to Q1: 
Structured Streaming queries can commit every minute, or even every 20-30 seconds. That definitely creates small files, but that is okay, because it is expected that people will periodically compact the files. The same timing should work fine for NiFi and any other streaming engine. It does create roughly 1,000 versions per day, but that is okay.
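For reference, that compaction can be a small periodic Spark job; a rough PySpark sketch (the table path is made up and the target file count would need tuning):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-compaction").getOrCreate()

table_path = "s3a://example-bucket/delta/events"  # hypothetical table location

# Rewrite the table into fewer, larger files. dataChange=false marks the
# rewrite as not changing any data, so streaming readers of the table
# are not disturbed by the new version.
(spark.read.format("delta").load(table_path)
    .repartition(16)  # target number of output files
    .write
    .option("dataChange", "false")
    .format("delta")
    .mode("overwrite")
    .save(table_path))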

Answer to Q2:
That is up to the sink implementation. Both are okay. In fact, it can probably be a combination of both, as long as we don't commit every second; that may not scale well.

Answer to Q3:
You need a primary node which is responsible for managing the Delta table. That node would be responsible for reading the log, parsing it, updating it, etc. Unfortunately, we have no good non-Spark way to read the log, much less write to it.
There is an experimental uber jar that tries to package Delta + Spark together into a single jar, which you could use to read the log. It's available here: https://github.com/delta-io/connectors/

It basically runs a local-mode (in-process) Spark to read the log. This is what we are using to build a Hive connector that will allow Hive to read Delta files. The goal of that was only to read; your goal is to write, which is definitely more complicated, because you have to do much more. That uber jar has all the necessary code to do the writing, which you could use, but there has to be a driver node that collects all the Parquet files written by other nodes and atomically commits them to the Delta log to make them visible to all readers.
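To make "reading the log" a bit more concrete: each commit under _delta_log is a numbered JSON file of actions (add, remove, metaData, commitInfo). A rough Python sketch that replays those actions to list the currently active data files (the table path is made up, and it ignores Parquet checkpoint files for brevity):

import json
import os

# Hypothetical local path to a Delta table; on S3 you would list the objects
# under <table>/_delta_log/ instead.
log_dir = "/data/delta/events/_delta_log"

# Commit files are zero-padded, so a lexicographic sort gives commit order.
active_files = set()
for name in sorted(f for f in os.listdir(log_dir) if f.endswith(".json")):
    with open(os.path.join(log_dir, name)) as commit:
        for line in commit:
            action = json.loads(line)
            if "add" in action:
                active_files.add(action["add"]["path"])
            elif "remove" in action:
                active_files.discard(action["remove"]["path"])

print(f"{len(active_files)} active data files")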

Does this initial guidance help?


Mike Thomsen <mikerthomsen@gmail.com> wrote on Sun, Mar 29, 2020, 20:29:
It looks like a lot of their connectors rely on external management by Spark. That is true of Hive, and also of Athena/Presto unless I misread the documentation. Read some of the fine print near the bottom of this to get an idea of what I mean:


Hypothetically, we could build a connector for NiFi, but there are some things about the design of Delta Lake that I am not sure about based on my own research and what not. Roughly, they are the following:

1. What is a good timing strategy for doing commits to Delta Lake?
2. What would trigger a commit in the first place?
3. Is there a good way to trigger commits that would work within the masterless cluster design of clustered NiFi instead of requiring a special "primary node only" processor for executing commits?

Based on my experimentation, one of the biggest questions around the first point is: do you really want potentially thousands or tens of thousands of table versions to be created throughout the day? A record processor that reads in a ton of small record sets and injects them into Delta Lake would create a ton of these versions, and they'd be largely meaningless to people trying to make sense of them for the purpose of going back and forth in time between versions.

Do we trigger a commit per record set or set a timer?

Most of us on the NiFi dev side have no real experience here. It would be helpful to get some ideas from the community that we can form use cases around, because there are some big gaps in how we'd even start to shape the requirements.

On Sun, Mar 29, 2020 at 1:28 PM Paul Parker <nifi.surfer@gmail.com> wrote:
Hi Mike,
Your alternate suggestion sounds good. But how does it work if I want to keep this running continuously? In other words, the Delta table should be continuously updated. After all, this is one of the biggest advantages of Delta: you can ingest batch and streaming data into one table.

I am also thinking about workarounds (using Athena, Presto, or Redshift with NiFi):
"Here is the list of integrations that enable you to access Delta tables from external data processing engines."
Source:

I am looking forward to further ideas from the community.

Mike Thomsen <mikerthomsen@gmail.com> wrote on Sun, Mar 29, 2020, 17:23:
I think there is a connector for Hive that works with Delta. You could try setting up Hive to work with Delta and then using NiFi to feed Hive. Alternatively, you can simply convert the results of the JDBC query into a Parquet file, push it to the desired location, and run a Spark job to convert from Parquet to Delta (that should be pretty fast, because Delta stores its data as Parquet files under the hood).
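If the Parquet files are already laid out the way you want the table, that conversion can even be done in place; a rough PySpark sketch using the Delta Python API (the path is made up, and it assumes an unpartitioned layout and the Delta Lake package on the classpath):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("convert-parquet-to-delta").getOrCreate()

# Hypothetical location where NiFi pushed the Parquet output of the JDBC query.
parquet_path = "s3a://example-bucket/jdbc-export"

# Writes a _delta_log next to the existing Parquet files instead of
# rewriting the data.
DeltaTable.convertToDelta(spark, f"parquet.`{parquet_path}`")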

On Fri, Mar 27, 2020 at 2:13 PM Paul Parker <nifi.surfer@gmail.com> wrote:
We read data via JDBC from a database and want to save the results as a Delta table and then read them again. How can I accomplish this with NiFi and Hive or the Glue Metastore?