DRILL-6191: Add acknowledgement sequence number and flags fields, details for flags
closes #1134
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40894225
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40894225
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40894225
Branch: refs/heads/master
Commit: 408942259800d9987f4e84b3cdbd47e29920e934
Parents: 4bd3cc2
Author: Ted Dunning <ted.dunning@gmail.com>
Authored: Tue Jan 2 16:20:35 2018 -0800
Committer: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Committed: Sat Mar 3 19:47:43 2018 +0200
----------------------------------------------------------------------
.../drill/exec/store/pcap/PcapDrillTable.java | 2 +
.../drill/exec/store/pcap/PcapRecordReader.java | 75 ++++++++++++++
.../drill/exec/store/pcap/decoder/Packet.java | 102 ++++++++++++++++++-
.../store/pcap/decoder/PacketConstants.java | 3 +
.../drill/exec/store/pcap/schema/PcapTypes.java | 1 +
.../drill/exec/store/pcap/schema/Schema.java | 14 +++
.../exec/store/pcap/TestPcapRecordReader.java | 40 ++++++--
.../src/test/resources/store/pcap/synscan.pcap | Bin 0 -> 148872 bytes
8 files changed, 224 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
index 2fbf67d..20e7e93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
@@ -53,6 +53,8 @@ public class PcapDrillTable extends DrillTable {
return typeFactory.createSqlType(SqlTypeName.BIGINT);
case INTEGER:
return typeFactory.createSqlType(SqlTypeName.INTEGER);
+ case BOOLEAN:
+ return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
case STRING:
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
case TIMESTAMP:
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 26e1e65..d01b746 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -280,6 +280,76 @@ public class PcapRecordReader extends AbstractRecordReader {
setIntegerColumnValue(packet.getSequenceNumber(), pci, count);
}
break;
+ case "tcp_ack":
+ if (packet.isTcpPacket()) {
+ setIntegerColumnValue(packet.getAckNumber(), pci, count);
+ }
+ break;
+ case "tcp_flags":
+ if (packet.isTcpPacket()) {
+ setIntegerColumnValue(packet.getFlags(), pci, count);
+ }
+ break;
+ case "tcp_parsed_flags":
+ if (packet.isTcpPacket()) {
+ setStringColumnValue(packet.getParsedFlags(), pci, count);
+ }
+ break;
+ case "tcp_flags_ns":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_cwr":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_ece ":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_ece_ecn_capable":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, count);
+ }
+ break;
+ case "tcp_flags_ece_congestion_experienced":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, count);
+ }
+ break;
+ case "tcp_flags_urg":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_ack":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_psh":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_rst":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_syn":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count);
+ }
+ break;
+ case "tcp_flags_fin":
+ if (packet.isTcpPacket()) {
+ setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count);
+ }
+ break;
case "packet_length":
setIntegerColumnValue(packet.getPacketLength(), pci, count);
break;
@@ -305,6 +375,11 @@ public class PcapRecordReader extends AbstractRecordReader {
.setSafe(count, data);
}
+ private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) {
+ ((NullableIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(count, data ? 1 : 0);
+ }
+
private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) {
((NullableTimeStampVector.Mutator) pci.vv.getMutator())
.setSafe(count, data);
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 0a45290..9cc98de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Formatter;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertInt;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertShort;
@@ -43,7 +44,9 @@ public class Packet {
private byte[] raw;
+ // index into the raw data where the current ethernet packet starts
private int etherOffset;
+ // index into the raw data where the current IP packet starts. Should be just after etherOffset
private int ipOffset;
private int packetLength;
@@ -180,13 +183,104 @@ public class Packet {
public int getSequenceNumber() {
if (isTcpPacket()) {
- int sequenceOffset = PacketConstants.ETHER_HEADER_LENGTH + getIPHeaderLength() + getTCPHeaderLength(raw) + 4;
- return Math.abs(convertInt(raw, sequenceOffset));
+ return convertInt(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_SEQUENCE_OFFSET);
} else {
return 0;
}
}
+ public int getAckNumber() {
+ if (isTcpPacket()) {
+ return convertInt(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_ACK_OFFSET);
+ } else {
+ return 0;
+ }
+ }
+
+ public int getFlags() {
+ if (isTcpPacket()) {
+ return convertShort(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_FLAG_OFFSET) & 0xfff;
+ } else {
+ return 0;
+ }
+ }
+
+ public String getParsedFlags() {
+ return formatFlags(getFlags());
+ }
+
+ public static String formatFlags(int flags) {
+ int mask = 0x100;
+ StringBuilder r = new StringBuilder();
+ String separator = "";
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("NS");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("CWR");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("ECE");
+ if ((flags & 2) != 0) {
+ r.append(" (ECN capable)");
+ } else {
+ r.append(" (Congestion experienced)");
+ }
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("URG");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("ACK");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("PSH");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("RST");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("SYN");
+ separator = "|";
+ }
+ mask = mask >> 1;
+
+ if ((flags & mask) != 0) {
+ r.append(separator);
+ r.append("FIN");
+ }
+ return r.toString();
+ }
+
public int getSrc_port() {
if (isPPPoV6Packet()) {
return getPort(64);
@@ -361,9 +455,9 @@ public class Packet {
private String getEthernetAddress(int offset) {
byte[] r = new byte[6];
System.arraycopy(raw, etherOffset + offset, r, 0, 6);
- StringBuilder sb = new StringBuilder();
+ Formatter sb = new Formatter();
for (int i = 0; i < r.length; i++) {
- sb.append(String.format("%02X%s", r[i], (i < r.length - 1) ? ":" : ""));
+ sb.format("%02X%s", r[i], (i < r.length - 1) ? ":" : "");
}
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
index 2c87623..6f29253 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
@@ -65,4 +65,7 @@ public final class PacketConstants {
public static final int PPPoV6_IP_OFFSET = 28;
+ public static final int TCP_SEQUENCE_OFFSET = 4;
+ public static final int TCP_ACK_OFFSET = 8;
+ public static final int TCP_FLAG_OFFSET = 12;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
index 5c6df71..fc6e029 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.pcap.schema;
public enum PcapTypes {
+ BOOLEAN,
INTEGER,
STRING,
LONG,
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
index b3e7722..89bd08f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -41,6 +41,20 @@ public class Schema {
columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+ columns.add(new ColumnDto("tcp_ack", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.INTEGER ));
+ columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
columns.add(new ColumnDto("data", PcapTypes.STRING));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index bb81469..385c0e0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -16,16 +16,17 @@
*/
package org.apache.drill.exec.store.pcap;
+import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.file.Paths;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+
public class TestPcapRecordReader extends BaseTestQuery {
@BeforeClass
public static void setupTestFiles() {
@@ -52,21 +53,42 @@ public class TestPcapRecordReader extends BaseTestQuery {
runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1);
}
+ @Test
+ public void testFlagFormatting() {
+ assertEquals("NS", Packet.formatFlags(0x100));
+ assertEquals("CWR", Packet.formatFlags(0x80));
+ assertEquals("ECE", Packet.formatFlags(0x40).substring(0, 3));
+ assertEquals("ECE", Packet.formatFlags(0x42).substring(0, 3));
+ assertEquals("URG", Packet.formatFlags(0x20));
+ assertEquals("ACK", Packet.formatFlags(0x10));
+ assertEquals("PSH", Packet.formatFlags(0x8));
+ assertEquals("RST", Packet.formatFlags(0x4));
+ assertEquals("SYN", Packet.formatFlags(0x2));
+ assertEquals("FIN", Packet.formatFlags(0x1));
+ assertEquals("RST|SYN|FIN", Packet.formatFlags(0x7));
+ }
+
+ @Test
+ public void checkFlags() throws Exception {
+ runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`store/pcap/synscan.pcap`", 2011);
+ }
+
private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
List<QueryDataBatch> results = runSQLWithResults(sql);
- printResultAndVerifyRowCount(results, expectedRowCount);
+ verifyRowCount(results, expectedRowCount);
}
private List<QueryDataBatch> runSQLWithResults(String sql) throws Exception {
return testSqlWithResults(sql);
}
- private void printResultAndVerifyRowCount(List<QueryDataBatch> results,
- int expectedRowCount) throws SchemaChangeException {
- setColumnWidth(35);
- int rowCount = printResult(results);
- if (expectedRowCount != -1) {
- Assert.assertEquals(expectedRowCount, rowCount);
+ private void verifyRowCount(List<QueryDataBatch> results, int expectedRowCount) {
+ int count = 0;
+ for (final QueryDataBatch result : results) {
+ count += result.getHeader().getRowCount();
+ result.release();
}
+ assertEquals(expectedRowCount, count);
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/resources/store/pcap/synscan.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap
new file mode 100644
index 0000000..8c2ca36
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap differ
|