# Codegen examples
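These notes collect whole-stage code generation dumps from Spark SQL. Each listing is the Java source of a GeneratedIterator (a BufferedRowIterator subclass) that Spark compiles at runtime for one WholeStageCodegen subtree of the physical plan; the /* NNN */ line numbers are part of Spark's own dump format. The listings below cover a columnar scan stage, a (suspected) sort-merge join stage, and a sort stage.

A minimal, hedged sketch of one way to capture such dumps (the setup below is an assumption, not taken from these notes): the EXPLAIN CODEGEN SQL command prints the generated source of every whole-stage-codegen subtree of a query.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DumpCodegen {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("codegen-dump")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical Parquet input; any table works.
    Dataset<Row> df = spark.read().parquet("/tmp/example.parquet");
    df.createOrReplaceTempView("t");

    // EXPLAIN CODEGEN prints the Java source generated for each
    // WholeStageCodegen subtree of the query.
    spark.sql("EXPLAIN CODEGEN SELECT * FROM t").show(false);
  }
}
```

The first listing is the generated iterator for a columnar scan (most likely the vectorized Parquet read path): it drains ColumnarBatch objects from its input, reads column 0 as an int and column 1 as binary, and packs each row into an UnsafeRow.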
```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator scan_input;
/* 009 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 011 */ private long scan_scanTime1;
/* 012 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 013 */ private int scan_batchIdx;
/* 014 */ private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/* 016 */ private UnsafeRow scan_result;
/* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */ public GeneratedIterator(Object[] references) {
/* 021 */ this.references = references;
/* 022 */ }
/* 023 */
/* 024 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */ partitionIndex = index;
/* 026 */ this.inputs = inputs;
/* 027 */ scan_input = inputs[0];
/* 028 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */ scan_scanTime1 = 0;
/* 031 */ scan_batch = null;
/* 032 */ scan_batchIdx = 0;
/* 033 */ scan_colInstance0 = null;
/* 034 */ scan_colInstance1 = null;
/* 035 */ scan_result = new UnsafeRow(2);
/* 036 */ this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */ this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */ }
/* 040 */
/* 041 */ private void scan_nextBatch() throws java.io.IOException {
/* 042 */ long getBatchStart = System.nanoTime();
/* 043 */ if (scan_input.hasNext()) {
/* 044 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */ scan_numOutputRows.add(scan_batch.numRows());
/* 046 */ scan_batchIdx = 0;
/* 047 */ scan_colInstance0 = scan_batch.column(0);
/* 048 */ scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */ }
/* 051 */ scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */ }
/* 053 */
/* 054 */ protected void processNext() throws java.io.IOException {
/* 055 */ if (scan_batch == null) {
/* 056 */ scan_nextBatch();
/* 057 */ }
/* 058 */ while (scan_batch != null) {
/* 059 */ int numRows = scan_batch.numRows();
/* 060 */ while (scan_batchIdx < numRows) {
/* 061 */ int scan_rowIdx = scan_batchIdx++;
/* 062 */ boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */ int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */ boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */ byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */ scan_holder.reset();
/* 067 */
/* 068 */ scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */ if (scan_isNull) {
/* 071 */ scan_rowWriter.setNullAt(0);
/* 072 */ } else {
/* 073 */ scan_rowWriter.write(0, scan_value);
/* 074 */ }
/* 075 */
/* 076 */ if (scan_isNull1) {
/* 077 */ scan_rowWriter.setNullAt(1);
/* 078 */ } else {
/* 079 */ scan_rowWriter.write(1, scan_value1);
/* 080 */ }
/* 081 */ scan_result.setTotalSize(scan_holder.totalSize());
/* 082 */ append(scan_result);
/* 083 */ if (shouldStop()) return;
/* 084 */ }
/* 085 */ scan_batch = null;
/* 086 */ scan_nextBatch();
/* 087 */ }
/* 088 */ scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */ scan_scanTime1 = 0;
/* 090 */ }
/* 091 */ }
```
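The loop structure above is easier to see without the UnsafeRow packing. Below is a hand-written, condensed restatement of the same batch-draining logic (not Spark output; the RowSink callback is a stand-in for the append()/shouldStop() machinery of BufferedRowIterator):

```java
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import scala.collection.Iterator;

final class ScanLoopSketch {
  /** Stand-in for BufferedRowIterator.append(); assumed for this sketch. */
  interface RowSink {
    void emit(boolean intIsNull, int intValue, boolean binIsNull, byte[] binValue);
  }

  static void drain(Iterator<ColumnarBatch> batches, RowSink sink) {
    while (batches.hasNext()) {
      ColumnarBatch batch = batches.next();
      ColumnVector ints = batch.column(0);   // column 0: INT (scan_colInstance0)
      ColumnVector bins = batch.column(1);   // column 1: BINARY (scan_colInstance1)
      for (int row = 0; row < batch.numRows(); row++) {
        boolean intIsNull = ints.isNullAt(row);
        boolean binIsNull = bins.isNullAt(row);
        sink.emit(intIsNull, intIsNull ? -1 : ints.getInt(row),
                  binIsNull, binIsNull ? null : bins.getBinary(row));
      }
    }
  }
}
```

The generated version additionally serializes each (int, binary) pair into an UnsafeRow through a BufferHolder/UnsafeRowWriter pair and accumulates the batch-fetch time into the scan-time SQL metric.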
The exact origin of the next listing has yet to be located conclusively in the Spark source; I suspect it is the code generated by SortMergeJoinExec.
```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator smj_leftInput;
/* 009 */ private scala.collection.Iterator smj_rightInput;
/* 010 */ private InternalRow smj_leftRow;
/* 011 */ private InternalRow smj_rightRow;
/* 012 */ private int smj_value4;
/* 013 */ private java.util.ArrayList smj_matches;
/* 014 */ private int smj_value5;
/* 015 */ private InternalRow smj_value6;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 017 */ private UnsafeRow smj_result;
/* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter1;
/* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter2;
/* 022 */
/* 023 */ public GeneratedIterator(Object[] references) {
/* 024 */ this.references = references;
/* 025 */ }
/* 026 */
/* 027 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */ partitionIndex = index;
/* 029 */ this.inputs = inputs;
/* 030 */ smj_leftInput = inputs[0];
/* 031 */ smj_rightInput = inputs[1];
/* 032 */
/* 033 */ smj_rightRow = null;
/* 034 */
/* 035 */ smj_matches = new java.util.ArrayList();
/* 036 */
/* 037 */ this.smj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 038 */ smj_result = new UnsafeRow(2);
/* 039 */ this.smj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 64);
/* 040 */ this.smj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 2);
/* 041 */ this.smj_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 5);
/* 042 */ this.smj_rowWriter2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 5);
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ private boolean findNextInnerJoinRows(
/* 047 */ scala.collection.Iterator leftIter,
/* 048 */ scala.collection.Iterator rightIter) {
/* 049 */ smj_leftRow = null;
/* 050 */ int comp = 0;
/* 051 */ while (smj_leftRow == null) {
/* 052 */ if (!leftIter.hasNext()) return false;
/* 053 */ smj_leftRow = (InternalRow) leftIter.next();
/* 054 */
/* 055 */ InternalRow smj_value1 = smj_leftRow.getStruct(0, 5);
/* 056 */ boolean smj_isNull = false;
/* 057 */ int smj_value = -1;
/* 058 */
/* 059 */ if (smj_value1.isNullAt(0)) {
/* 060 */ smj_isNull = true;
/* 061 */ } else {
/* 062 */ smj_value = smj_value1.getInt(0);
/* 063 */ }
/* 064 */ if (smj_isNull) {
/* 065 */ smj_leftRow = null;
/* 066 */ continue;
/* 067 */ }
/* 068 */ if (!smj_matches.isEmpty()) {
/* 069 */ comp = 0;
/* 070 */ if (comp == 0) {
/* 071 */ comp = (smj_value > smj_value5 ? 1 : smj_value < smj_value5 ? -1 : 0);
/* 072 */ }
/* 073 */
/* 074 */ if (comp == 0) {
/* 075 */ return true;
/* 076 */ }
/* 077 */ smj_matches.clear();
/* 078 */ }
/* 079 */
/* 080 */ do {
/* 081 */ if (smj_rightRow == null) {
/* 082 */ if (!rightIter.hasNext()) {
/* 083 */ smj_value5 = smj_value;
/* 084 */ return !smj_matches.isEmpty();
/* 085 */ }
/* 086 */ smj_rightRow = (InternalRow) rightIter.next();
/* 087 */
/* 088 */ InternalRow smj_value3 = smj_rightRow.getStruct(0, 5);
/* 089 */ boolean smj_isNull2 = false;
/* 090 */ int smj_value2 = -1;
/* 091 */
/* 092 */ if (smj_value3.isNullAt(0)) {
/* 093 */ smj_isNull2 = true;
/* 094 */ } else {
/* 095 */ smj_value2 = smj_value3.getInt(0);
/* 096 */ }
/* 097 */ if (smj_isNull2) {
/* 098 */ smj_rightRow = null;
/* 099 */ continue;
/* 100 */ }
/* 101 */ smj_value4 = smj_value2;
/* 102 */ }
/* 103 */
/* 104 */ comp = 0;
/* 105 */ if (comp == 0) {
/* 106 */ comp = (smj_value > smj_value4 ? 1 : smj_value < smj_value4 ? -1 : 0);
/* 107 */ }
/* 108 */
/* 109 */ if (comp > 0) {
/* 110 */ smj_rightRow = null;
/* 111 */ } else if (comp < 0) {
/* 112 */ if (!smj_matches.isEmpty()) {
/* 113 */ smj_value5 = smj_value;
/* 114 */ return true;
/* 115 */ }
/* 116 */ smj_leftRow = null;
/* 117 */ } else {
/* 118 */ smj_matches.add(smj_rightRow.copy());
/* 119 */ smj_rightRow = null;;
/* 120 */ }
/* 121 */ } while (smj_leftRow != null);
/* 122 */ }
/* 123 */ return false; // unreachable
/* 124 */ }
/* 125 */
/* 126 */ protected void processNext() throws java.io.IOException {
/* 127 */ while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 128 */ int smj_size = smj_matches.size();
/* 129 */ smj_value6 = smj_leftRow.getStruct(0, 5);
/* 130 */ for (int smj_i = 0; smj_i < smj_size; smj_i ++) {
/* 131 */ InternalRow smj_rightRow1 = (InternalRow) smj_matches.get(smj_i);
/* 132 */
/* 133 */ smj_numOutputRows.add(1);
/* 134 */
/* 135 */ InternalRow smj_value7 = smj_rightRow1.getStruct(0, 5);
/* 136 */ smj_holder.reset();
/* 137 */
/* 138 */ // Remember the current cursor so that we can calculate how many bytes are
/* 139 */ // written later.
/* 140 */ final int smj_tmpCursor = smj_holder.cursor;
/* 141 */
/* 142 */ if (smj_value6 instanceof UnsafeRow) {
/* 143 */ final int smj_sizeInBytes = ((UnsafeRow) smj_value6).getSizeInBytes();
/* 144 */ // grow the global buffer before writing data.
/* 145 */ smj_holder.grow(smj_sizeInBytes);
/* 146 */ ((UnsafeRow) smj_value6).writeToMemory(smj_holder.buffer, smj_holder.cursor);
/* 147 */ smj_holder.cursor += smj_sizeInBytes;
/* 148 */
/* 149 */ } else {
/* 150 */ smj_rowWriter1.reset();
/* 151 */
/* 152 */ final int smj_fieldName = smj_value6.getInt(0);
/* 153 */ if (smj_value6.isNullAt(0)) {
/* 154 */ smj_rowWriter1.setNullAt(0);
/* 155 */ } else {
/* 156 */ smj_rowWriter1.write(0, smj_fieldName);
/* 157 */ }
/* 158 */
/* 159 */ final long smj_fieldName1 = smj_value6.getLong(1);
/* 160 */ if (smj_value6.isNullAt(1)) {
/* 161 */ smj_rowWriter1.setNullAt(1);
/* 162 */ } else {
/* 163 */ smj_rowWriter1.write(1, smj_fieldName1);
/* 164 */ }
/* 165 */
/* 166 */ final double smj_fieldName2 = smj_value6.getDouble(2);
/* 167 */ if (smj_value6.isNullAt(2)) {
/* 168 */ smj_rowWriter1.setNullAt(2);
/* 169 */ } else {
/* 170 */ smj_rowWriter1.write(2, smj_fieldName2);
/* 171 */ }
/* 172 */
/* 173 */ final float smj_fieldName3 = smj_value6.getFloat(3);
/* 174 */ if (smj_value6.isNullAt(3)) {
/* 175 */ smj_rowWriter1.setNullAt(3);
/* 176 */ } else {
/* 177 */ smj_rowWriter1.write(3, smj_fieldName3);
/* 178 */ }
/* 179 */
/* 180 */ final UTF8String smj_fieldName4 = smj_value6.getUTF8String(4);
/* 181 */ if (smj_value6.isNullAt(4)) {
/* 182 */ smj_rowWriter1.setNullAt(4);
/* 183 */ } else {
/* 184 */ smj_rowWriter1.write(4, smj_fieldName4);
/* 185 */ }
/* 186 */ }
/* 187 */
/* 188 */ smj_rowWriter.setOffsetAndSize(0, smj_tmpCursor, smj_holder.cursor - smj_tmpCursor);
/* 189 */
/* 190 */ // Remember the current cursor so that we can calculate how many bytes are
/* 191 */ // written later.
/* 192 */ final int smj_tmpCursor6 = smj_holder.cursor;
/* 193 */
/* 194 */ if (smj_value7 instanceof UnsafeRow) {
/* 195 */ final int smj_sizeInBytes1 = ((UnsafeRow) smj_value7).getSizeInBytes();
/* 196 */ // grow the global buffer before writing data.
/* 197 */ smj_holder.grow(smj_sizeInBytes1);
/* 198 */ ((UnsafeRow) smj_value7).writeToMemory(smj_holder.buffer, smj_holder.cursor);
/* 199 */ smj_holder.cursor += smj_sizeInBytes1;
/* 200 */
/* 201 */ } else {
/* 202 */ smj_rowWriter2.reset();
/* 203 */
/* 204 */ final int smj_fieldName5 = smj_value7.getInt(0);
/* 205 */ if (smj_value7.isNullAt(0)) {
/* 206 */ smj_rowWriter2.setNullAt(0);
/* 207 */ } else {
/* 208 */ smj_rowWriter2.write(0, smj_fieldName5);
/* 209 */ }
/* 210 */
/* 211 */ final long smj_fieldName6 = smj_value7.getLong(1);
/* 212 */ if (smj_value7.isNullAt(1)) {
/* 213 */ smj_rowWriter2.setNullAt(1);
/* 214 */ } else {
/* 215 */ smj_rowWriter2.write(1, smj_fieldName6);
/* 216 */ }
/* 217 */
/* 218 */ final double smj_fieldName7 = smj_value7.getDouble(2);
/* 219 */ if (smj_value7.isNullAt(2)) {
/* 220 */ smj_rowWriter2.setNullAt(2);
/* 221 */ } else {
/* 222 */ smj_rowWriter2.write(2, smj_fieldName7);
/* 223 */ }
/* 224 */
/* 225 */ final float smj_fieldName8 = smj_value7.getFloat(3);
/* 226 */ if (smj_value7.isNullAt(3)) {
/* 227 */ smj_rowWriter2.setNullAt(3);
/* 228 */ } else {
/* 229 */ smj_rowWriter2.write(3, smj_fieldName8);
/* 230 */ }
/* 231 */
/* 232 */ final UTF8String smj_fieldName9 = smj_value7.getUTF8String(4);
/* 233 */ if (smj_value7.isNullAt(4)) {
/* 234 */ smj_rowWriter2.setNullAt(4);
/* 235 */ } else {
/* 236 */ smj_rowWriter2.write(4, smj_fieldName9);
/* 237 */ }
/* 238 */ }
/* 239 */
/* 240 */ smj_rowWriter.setOffsetAndSize(1, smj_tmpCursor6, smj_holder.cursor - smj_tmpCursor6);
/* 241 */ smj_result.setTotalSize(smj_holder.totalSize());
/* 242 */ append(smj_result.copy());
/* 243 */
/* 244 */ }
/* 245 */ if (shouldStop()) return;
/* 246 */ }
/* 247 */ }
/* 248 */ }
```
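Stripped of the UnsafeRow re-packing and the metric bookkeeping, findNextInnerJoinRows() together with the outer loop in processNext() implements the classic streamed/buffered inner merge join. Here is a hand-written restatement (not Spark output; key extraction is abstracted as a ToIntFunction, and null join keys, which the generated code simply skips, are assumed absent):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.ToIntFunction;

final class MergeJoinSketch {
  static <L, R> void innerJoin(Iterator<L> left, Iterator<R> right,
                               ToIntFunction<L> leftKey, ToIntFunction<R> rightKey,
                               BiConsumer<L, R> emit) {
    L l = left.hasNext() ? left.next() : null;
    R r = right.hasNext() ? right.next() : null;
    List<R> matches = new ArrayList<>();   // plays the role of smj_matches
    int matchedKey = 0;                    // key the buffered matches belong to (smj_value5)
    boolean haveMatches = false;

    while (l != null) {
      int k = leftKey.applyAsInt(l);
      if (!haveMatches || k != matchedKey) {
        matches.clear();
        // Skip right rows whose key is smaller than the current left key.
        while (r != null && rightKey.applyAsInt(r) < k) {
          r = right.hasNext() ? right.next() : null;
        }
        // Buffer every right row whose key equals the current left key.
        while (r != null && rightKey.applyAsInt(r) == k) {
          matches.add(r);
          r = right.hasNext() ? right.next() : null;
        }
        matchedKey = k;
        haveMatches = true;
      }
      // Emit the left row against each buffered match; reused for duplicate left keys.
      for (R m : matches) {
        emit.accept(l, m);                 // corresponds to append(smj_result.copy())
      }
      l = left.hasNext() ? left.next() : null;
    }
  }
}
```

In the generated code each emitted pair is additionally written into a two-field UnsafeRow whose fields are the complete left and right rows as nested structs (smj_rowWriter1/smj_rowWriter2), which is why both sides are read with getStruct(0, 5).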
This is where the shuffle iterator is consumed: the generated SortExec stage drains the shuffled input into an UnsafeExternalRowSorter and then iterates over the sorted rows.
```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean sort_needToSort;
/* 009 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 012 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 013 */ private scala.collection.Iterator inputadapter_input;
/* 014 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ sort_needToSort = true;
/* 026 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */ sort_sorter = sort_plan.createSorter();
/* 028 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */ inputadapter_input = inputs[0];
/* 031 */ this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */ this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ private void sort_addToSorter() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */ if (shouldStop()) return;
/* 042 */ }
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ protected void processNext() throws java.io.IOException {
/* 047 */ if (sort_needToSort) {
/* 048 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */ sort_addToSorter();
/* 050 */ sort_sortedIter = sort_sorter.sort();
/* 051 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */ sort_needToSort = false;
/* 056 */ }
/* 057 */
/* 058 */ while (sort_sortedIter.hasNext()) {
/* 059 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */ append(sort_outputRow);
/* 062 */
/* 063 */ if (shouldStop()) return;
/* 064 */ }
/* 065 */ }
/* 066 */ }
```
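Putting the three listings together: a physical plan of the form Scan -> Exchange -> Sort -> SortMergeJoin produces exactly these stages, with sort stages like the one above feeding the merge join's two inputs. The join output is a two-field row whose fields are the complete left and right rows (five-field structs of int/long/double/float/string), which is the shape Dataset.joinWith(...) produces. A hedged end-to-end sketch of such a query (paths, column names, and the exact schema are assumptions, not taken from these notes):

```java
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortMergeJoinDump {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("smj-codegen")
        .master("local[*]")
        .getOrCreate();

    // Force a SortMergeJoin by disabling broadcast joins.
    spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

    Dataset<Row> a = spark.read().parquet("/tmp/tableA.parquet"); // assumed path
    Dataset<Row> b = spark.read().parquet("/tmp/tableB.parquet"); // assumed path

    // joinWith keeps each side as a nested struct column, matching the
    // getStruct(0, 5) calls in the generated join code above.
    // "intKey" is an assumed column name.
    Column cond = a.col("intKey").equalTo(b.col("intKey"));
    Dataset<?> joined = a.joinWith(b, cond, "inner");

    joined.explain(true);   // physical plan: Exchange -> Sort -> SortMergeJoin
  }
}
```

Running EXPLAIN CODEGEN on a query of this shape (on a Spark 2.x build) should reproduce generated code of the same form as the listings on this page.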