Feature FileOps Streaming - Gadreel/divconq GitHub Wiki

Background

Often when performing operations on a file or collection of files you want to apply several operations in sequence. Traditionally this means creating one or more temporary files per operation, which is not only potentially slow but can also consume a lot of disk space. When possible we'd like to stream (pipe) these operations in memory.

Consider the following example. We have PGP-encrypted, tarred and split files coming in as part of a deposit. The initial deposit contains:

/uploads/Deposit_2014-09-06_1.tar.enc
/uploads/Deposit_2014-09-06_2.tar.enc
/uploads/Deposit_2014-09-06_3.tar.enc

We need to combine the files to:

/temp/Deposit_2014-09-06.tar.enc

Then we need to decrypt to:

/temp/Deposit_2014-09-06.tar

And finally expand the files into a folder.

/deposits/2014-09-06/File_A.txt
/deposits/2014-09-06/File_B.txt
/deposits/2014-09-06/File_C.txt

dcScript provides a concise way to describe such a task, so let's look at the code. Assume some trigger has started our script and asked that we process the deposit for 2014-09-06. First we need to collect the files:

<LocalFolder Name="Uploads" Path="/uploads" />

<SelectFiles Name="SplitDeposit" In="$Uploads" Sort="Match" SortAs="Number">
	<NameFilter Pattern="Deposit_2014-09-06_(\d+).tar.enc" />
</SelectFiles>

At this point nothing has really happened other than that a variable - SplitDeposit - has been defined. SplitDeposit will be the original source in our stream. Now we need to combine the files.

The Verbose Way

If we want to use the "temp file" approach we still can by providing a file for the destination:

<TempFile Name="CombinedDeposit" />

<FileOps>
	<Join Source="$SplitDeposit" Dest="$CombinedDeposit" />
</FileOps>

The Stream Way

The stream approach is very similar; just provide a name for the next step in the stream:

<FileOps>
	<Join Name="CombinedDeposit" Source="$SplitDeposit" />
</FileOps>

The difference is that the verbose approach is executed then and there: after Join there is a file in the temp folder. With streaming, still nothing has happened except that we have defined another variable - CombinedDeposit - which is a step in our stream.

Now we need to decrypt the combined file. Assume we already have a keyring defined as TheKeyring.

The Verbose Way

If we want to use the "temp file" approach here we still can by providing a file for the destination:

<TempFile Name="DecryptedDeposit" />

<FileOps>
	<PGPDecrypt Source="$CombinedDeposit" Dest="$DecryptedDeposit" Keyring="$TheKeyring" />
</FileOps>

The Stream Way

The stream approach is very similar; just provide a name for the next step in the stream:

<FileOps>
	<PGPDecrypt Name="DecryptedDeposit" Source="$CombinedDeposit" Keyring="$TheKeyring" />
</FileOps>

Again, the difference is that the verbose approach is executed then and there: after PGPDecrypt there is a file in the temp folder. With streaming, still nothing has happened except that we have defined another variable - DecryptedDeposit - which is a step in our stream.

Finally we want to untar the deposit. We need a folder for that and then Untar is given a Dest(ination):

<LocalFolder Name="DepositDest" Path="/deposits/2014-09-06" />

<FileOps>
	<Untar Source="$DecryptedDeposit" Dest="$DepositDest" />
</FileOps>

This is the same for verbose or stream. With streaming, the engine sees that we now have a final dest - not just another handler in a chain - and so it knows to run the stream. A "send" message is sent up the stream chain, causing the original source to start pumping data through the chain.
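To make the execution model concrete, here is a hypothetical Python sketch (not the DivConq implementation): each streaming step is a lazy generator, so chaining steps defines the pipeline without moving any data, and only the terminal destination pulls data through the whole chain - the equivalent of the "send" message described above. The `join`, `decrypt` and `untar_to` names are illustrative stand-ins for the dcScript steps.

```python
def join(sources):
    # Lazily concatenate several chunk streams into one.
    for src in sources:
        for chunk in src:
            yield chunk

def decrypt(stream):
    # Stand-in transform; a real step would decrypt each chunk.
    for chunk in stream:
        yield chunk.lower()

def untar_to(stream, dest):
    # Terminal destination: iterating here is what starts the flow.
    for chunk in stream:
        dest.append(chunk)

parts = [iter(["AB", "CD"]), iter(["EF"])]
combined = join(parts)          # nothing has run yet
decrypted = decrypt(combined)   # still nothing
out = []
untar_to(decrypted, out)        # now data is pumped through the chain
print(out)                      # ['ab', 'cd', 'ef']
```

Note that `combined` and `decrypted` are cheap to create; all the work happens inside `untar_to`, just as nothing happens in dcScript until a step with a Dest is reached.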

Complete Examples

To easily compare the two approaches here is the full code for each.

Complete Verbose Way Source

<Var Name="Date" SetTo="2014-09-06" />

<LocalFolder Name="Uploads" Path="/uploads" />

<SelectFiles Name="SplitDeposit" In="$Uploads" Sort="Match" SortAs="Number">
	<NameFilter Pattern="Deposit_{$Date}_(\d+).tar.enc" />
</SelectFiles>

<TempFile Name="CombinedDeposit" />
<TempFile Name="DecryptedDeposit" />
<LocalFolder Name="DepositDest" Path="/deposits/{$Date}" />

<FileOps>
	<Join Source="$SplitDeposit" Dest="$CombinedDeposit" />
	<PGPDecrypt Source="$CombinedDeposit" Dest="$DecryptedDeposit" Keyring="$TheKeyring" />
	<Untar Source="$DecryptedDeposit" Dest="$DepositDest" />
</FileOps>

Complete Stream Way Source

<Var Name="Date" SetTo="2014-09-06" />

<LocalFolder Name="Uploads" Path="/uploads" />

<SelectFiles Name="SplitDeposit" In="$Uploads" Sort="Match" SortAs="Number">
	<NameFilter Pattern="Deposit_{$Date}_(\d+).tar.enc" />
</SelectFiles>

<LocalFolder Name="DepositDest" Path="/deposits/{$Date}" />

<FileOps>
	<Join Name="CombinedDeposit" Source="$SplitDeposit" />
	<PGPDecrypt Name="DecryptedDeposit" Source="$CombinedDeposit" Keyring="$TheKeyring" />
	<Untar Source="$DecryptedDeposit" Dest="$DepositDest" />
</FileOps>

Limitations

DivConq makes it possible for you to use different [file op implementations](Feature-FileOps-Implementations). Some implementations, such as OpenPGP command-line calls, do not technically support streaming. But this does not matter: you can still pass these implementations streams instead of files; behind the scenes they will make a temporary copy of the file and then clean it up.
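A hypothetical sketch of that fallback: when a step's implementation only accepts real files (say, an external command-line tool), the framework can transparently spill the incoming stream to a temporary file, hand that file to the tool, and clean up afterwards. The `run_on_stream` helper below is illustrative, not DivConq's actual code.

```python
import os
import tempfile

def run_on_stream(stream, file_based_op):
    # Materialize the stream into a temp file...
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "wb") as f:
            for chunk in stream:
                f.write(chunk)
        # ...hand the real file to the non-streaming implementation...
        return file_based_op(path)
    finally:
        # ...and clean up, so the caller never sees the temp file.
        os.remove(path)

# usage: apply a "file only" operation (byte count) to a stream
size = run_on_stream(iter([b"abc", b"de"]), lambda p: os.path.getsize(p))
print(size)  # 5
```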

The good news is YOU CAN ALWAYS USE STREAMS. So don't bother with those pesky temp files when you can use streams.

Progress and Logging

TODO: Source keeps track of the progress too - so we can actually produce % done in the script by having a smart selector...

Inspiration

One inspiration I had for the streaming ops was this [article](java.dzone.com/articles/io-files-arent-files), which happens to be issue #1.

The example from that article uses Named Pipes to enable streaming. Here is the example code:

function processFile {
	# e.g. /path/List.enc -> List.enc and List
	encryptedFile=$(basename ${1})
	decryptedFile=${encryptedFile%.enc}

	# named pipes stand in for temp files
	mkfifo ${encryptedFile}
	mkfifo ${decryptedFile}

	# pull the encrypted file out of HDFS into the first pipe (background)
	hdfs dfs -cat ${1} > ${encryptedFile} &
	# read the decrypted pipe, printing lines whose fifth column matches
	# (someValue must be supplied to awk, e.g. via awk -v someValue=...)
	cat ${decryptedFile} | awk '{ if ( $5 == someValue) print $0}' &

	# decrypt from one pipe into the other, then clean up
	/somepath/decryptFile.sh ${encryptedFile} ${decryptedFile}
	rm ${encryptedFile} ${decryptedFile}
}

Basically this example grabs an encrypted file from HDFS, decrypts it and then searches it for some value in the fifth column. If a match is made the line is printed.

dcScript can provide similar functionality:

<HdfsFileStore Name="HadoopDeposits" RootPath="/lists" Connection="..." />

<SelectFiles Name="MySeptLists" In="$HadoopDeposits">
	<NameFilter Pattern="List_2014-09-\d{2}.txt.enc" />
</SelectFiles>

<FileOps>
	<PGPDecrypt Name="DecryptedLists" Source="$MySeptLists" Keyring="$TheKeyring" />
	<TextReader Name="TextRdr" Source="$DecryptedLists" />
</FileOps>

<ForEach Name="Line" In="$TextRdr">
	<With Target="$Line">
		<AwkSplit Name="Cols" />
	</With>
	
	<If Target="$Cols.5" Equal="$someValue" CaseInsensitive="true">
		<Console>{$Line}</Console>
	</If>
</ForEach>

Above, the entire list is decrypted and then sent through a text reader. Although technically no concatenation is occurring, the text reader will just keep reading file after file from the list as if the files were concatenated.
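A hypothetical sketch of that "virtual concatenation": the reader pulls lines from one file after another, so downstream code sees a single continuous text stream even though no combined file ever exists. The `text_reader` name is illustrative, not the TextReader implementation.

```python
from itertools import chain
import io

# Two in-memory "files" standing in for the decrypted list files
files = [io.StringIO("a1\na2\n"), io.StringIO("b1\n")]

def text_reader(sources):
    # Read file after file as if they were one concatenated file.
    return chain.from_iterable(sources)

lines = [line.strip() for line in text_reader(files)]
print(lines)  # ['a1', 'a2', 'b1']
```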

Or using just one file and AwkFilter:

<HdfsFileStore Name="HadoopDeposits" RootPath="/" Connection="..." />

<File Name="MyList" Path="List_2014-09-12.txt.enc" In="$HadoopDeposits" />

<FileOps>
	<PGPDecrypt Name="DecryptedLists" Source="$MyList" Keyring="$TheKeyring" />
	<TextReader Name="TextRdr" Source="$DecryptedLists">
		<AwkFilter Column="5" Equal="$someValue" />
	</TextReader>
</FileOps>

<ForEach Name="Line" In="$TextRdr">
	<Console>{$Line}</Console>
</ForEach>

The version above instead filters the incoming text, keeping only the lines that match the filter. Thus reading from TextRdr produces only matching lines.
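A hypothetical sketch of what an AwkFilter-style step does: split each line into whitespace-separated columns and pass through only lines whose fifth column equals the wanted value, so downstream readers never see the filtered-out lines. The `awk_filter` helper is illustrative.

```python
def awk_filter(lines, column, value):
    # Yield only lines whose given column matches value.
    for line in lines:
        cols = line.split()
        # awk columns are 1-based, so column 5 is index 4
        if len(cols) >= column and cols[column - 1] == value:
            yield line

rows = [
    "a b c d keep e",
    "a b c d drop e",
    "x y z w keep",
]
matched = list(awk_filter(rows, 5, "keep"))
print(matched)  # ['a b c d keep e', 'x y z w keep']
```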

Other Streaming Commands

TextReader

[more about streaming with TR]

Hash

Hash can be inserted in the middle of a stream, like so:

<FileOps>
	<Join Name="CombinedDeposit" Source="$SplitDeposit" />
	<Hash Name="CombinedHash" Method="SHA256" Source="$CombinedDeposit" StreamTo="HashedDeposit" />
	<PGPDecrypt Name="DecryptedDeposit" Source="$HashedDeposit" Keyring="$TheKeyring" />
	<Untar Source="$DecryptedDeposit" Dest="$DepositDest" />
</FileOps>

  • $CombinedHash is calculated "silently" on the combined files
  • Hash fits into the stream as just another step; it doesn't even take any extra memory - it passes source memory on to dest
  • $CombinedHash is not available until Untar is done, due to the nature of streams only executing when a terminal destination is found
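A hypothetical sketch of such a pass-through hash step: the hasher updates a digest as each chunk flows by and yields the chunk unchanged, so it adds hashing to the pipeline without buffering data. Note the digest is only final once the terminal destination has drained the stream, matching the last point above. The `hash_step` name is illustrative.

```python
import hashlib

def hash_step(stream, digest):
    # Update the digest as data flows by; pass the same chunks on.
    for chunk in stream:
        digest.update(chunk)
        yield chunk

digest = hashlib.sha256()
hashed = hash_step(iter([b"abc", b"def"]), digest)

out = b"".join(hashed)  # the terminal step drains the stream
print(out)              # b'abcdef'
print(digest.hexdigest() == hashlib.sha256(b"abcdef").hexdigest())  # True
```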

It is possible to hash multiple files and put them in a ValueMap - while also streaming:

	<Hash MapName="SplitHashes" Method="SHA256" Source="$SplitDeposit" StreamTo="HashedSplitDeposit" />

Sign

You can create a detached signature in a pass-through stream, like Hash above. The detached signature is stored in a HeapFile object.

<Var Name="Date" SetTo="2014-09-06" />
	
<LocalFolder Name="DepositSrc" Path="/files/{$Date}" />
<CtpFileStore Name="DepositDest" RootPath="/backup/{$Date}" Connection="$conn1" />

<FileOps>
	<Tar Name="CombinedDeposit" Source="$DepositSrc" NameHint="files.bak" />
	<Gzip Name="DeflatedDeposit" Source="$CombinedDeposit" />
	
	<PGPSign Name="SigFile" Source="$DeflatedDeposit" StreamTo="SignedDeposit"
		Keyring="$MyKeyring" Signer="John Smith" />
		
	<Copy Source="$SignedDeposit" Dest="$DepositDest" />
	<Copy Source="$SigFile" Dest="$DepositDest" />
</FileOps>

Where $SigFile is a HeapFile. The NameHint tells Tar to treat the original files as files.bak. Tar adds .tar, so the file becomes files.bak.tar. Gzip adds .gz, so it becomes files.bak.tar.gz. The HeapFile adds .sig, so in the end two files are uploaded; on the remote store they are:

/backup/2014-09-06/files.bak.tar.gz
/backup/2014-09-06/files.bak.tar.gz.sig
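The name chaining described above can be traced step by step; each streaming step simply appends its own extension to the running name hint:

```python
name = "files.bak"         # NameHint given to Tar
name += ".tar"             # Tar appends .tar
deflated = name + ".gz"    # Gzip appends .gz
sig = deflated + ".sig"    # the signature HeapFile appends .sig
print(deflated)            # files.bak.tar.gz
print(sig)                 # files.bak.tar.gz.sig
```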