Dpdk学习过程中记录下来的代码
Udp
启动命令
make && gdb ./build/send
Code
- udp收发包,arp-resp
send.c
#include <stdio.h>
#include <stdint.h>
#include <pthread.h>
#include <inttypes.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <rte_ip.h>
#include <rte_udp.h>
#include <rte_memcpy.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_ether.h>
#include <rte_timer.h>
#include <rte_cycles.h>
#include <arpa/inet.h>
#include <netinet/in.h> // 定义sockaddr_in
#include <sys/socket.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// Buffer size and port constants; not referenced in the visible part of this file
// (presumably used by the application code such as udp_server() — TODO confirm).
#define BUFFER_SIZE 1024
#define PORT 8888
#include "send2.h"
// Descriptor counts for the NIC RX/TX queues; read by init_port().
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
// Mbuf pool sizing; the pool creation is not in the visible part of this file (TODO confirm usage).
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
// Maximum number of mbufs pkt_process() dequeues from the in-ring per iteration.
#define BURST_SIZE 32
// Element count of the per-socket and per-stream rte_rings created by nsocket() and tcp_stream_create().
#define RING_SIZE 1024
// Number of host addresses probed by each run of arp_request_timer_cb().
#define MAX_MASK 10
// Compile-time feature switches tested with #if further down (1 = compiled in).
#define ENABLE_ARP 1
#define ENABLE_ICMP 1
#define ENABLE_DEBUG 1
#define ENABLE_TIMER 1
#define ENABLE_RING 1
#define ENABLE_UDP_APP 1
#define ENABLE_TCP_APP 1
#if ENABLE_ARP
// Pack four dotted-quad octets into a uint32_t with `a` in the least significant
// byte and `d` in the most significant byte. Every argument is parenthesised and
// widened to uint32_t first, so expression arguments group correctly and
// `d << 24` cannot overflow a signed int when d >= 128.
#define MAKE_IPV4_ADDR(a, b, c, d)                       \
    ((uint32_t)(a) + ((uint32_t)(b) << 8) +              \
     ((uint32_t)(c) << 16) + ((uint32_t)(d) << 24))
// IPv4 address this stack answers ARP for and uses as the ARP sender address (192.168.42.128).
static uint32_t gLocalIp = MAKE_IPV4_ADDR(192, 168, 42, 128);
// Broadcast MAC passed to ng_send_arp() when the target's MAC is not in the ARP table yet.
static uint8_t gDefaultArpMac[RTE_ETHER_ADDR_LEN] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF};
#endif
// Addressing of the reply to the most recently received UDP datagram.
// pkt_udp() fills these with source/destination already swapped; encode_udp() reads them.
// IP addresses and ports are stored exactly as they appear in the packet headers.
static uint32_t gSrcIp;
static uint32_t gDstIp;
// gSrcMac is the NIC's own MAC (read in init_port()); gDstMac is the sender MAC of the last UDP datagram.
static uint8_t gSrcMac[RTE_ETHER_ADDR_LEN];
static uint8_t gDstMac[RTE_ETHER_ADDR_LEN];
static uint16_t gSrcPort;
static uint16_t gDstPort;
#if ENABLE_RING
// Pair of rings shared by the packet-processing code in this file.
//   in  - mbufs dequeued by pkt_process() for protocol handling
//   out - mbufs enqueued by the encoders/handlers for transmission
// The code that creates the rings, fills `in` and drains `out` is not in the
// visible part of this file.
struct inout_ring
{
    struct rte_ring *in;
    struct rte_ring *out;
};
static struct inout_ring *rInst = NULL;

// Return the process-wide inout_ring holder, allocating it zero-filled on first use.
// Allocation failure terminates the program via rte_exit(), the same policy the
// ng_send_* helpers use for mbuf allocation, so callers never receive NULL.
struct inout_ring *ringInstance(void)
{
    if (rInst == NULL)
    {
        struct inout_ring *inst = rte_malloc("in/out ring", sizeof(*inst), 0);
        if (inst == NULL)
        {
            // Previously the NULL pointer was handed straight to memset().
            rte_exit(EXIT_FAILURE, "Failed to allocate in/out ring holder\n");
        }
        memset(inst, 0, sizeof(*inst));
        rInst = inst;
    }
    return rInst;
}
#endif
// DPDK port id passed to every rte_eth_* call in this file.
int gDpdkPortId = 0;
#if ENABLE_UDP_APP
// Per-socket state behind the user-space socket API (nsocket/nbind/nrecvfrom/nsendto/nclose).
struct localhost
{
// Descriptor returned by nsocket(); lookup key of get_hostinfo_fromfd().
int fd;
// Bound IPv4 address, copied verbatim from sin_addr.s_addr by nbind().
uint32_t localip;
// Bound port, stored after ntohs() by nbind().
uint16_t localport;
// Copy of gSrcMac taken by nbind().
uint8_t localmac[RTE_ETHER_ADDR_LEN];
// IPPROTO_UDP or IPPROTO_TCP, chosen from the socket type by nsocket().
uint8_t protocol;
// Ring of struct offload*: filled by nsendto(), drained by upd_pkt_out_process().
struct rte_ring *sendbuf;
// Ring of struct offload*: filled by pkt_udp(), drained by nrecvfrom().
struct rte_ring *recvbuf;
// List links; presumably maintained by the LL_ADD/LL_REMOVE macros, which are
// defined outside this excerpt (TODO confirm).
struct localhost *prev;
struct localhost *next;
// pkt_udp() signals cond after queueing a datagram; nrecvfrom() waits on it under mutex.
pthread_cond_t cond;
pthread_mutex_t mutex;
};
// Head of the list of open sockets.
static struct localhost *hosts = NULL;
// One UDP datagram travelling between the socket API and the packet path.
// In both producers (nsendto() and pkt_udp()) sip/sport describe the local side
// and dip/dport the remote peer.
// NOTE(review): the producers do not agree on port byte order — pkt_udp() stores the
// raw header values while nsendto() runs both ports through ntohs(); verify before
// relying on either convention.
struct offload
{
uint32_t sip;
uint32_t dip;
uint16_t sport;
uint16_t dport;
// pkt_udp() sets IPPROTO_UDP; nsendto() does not assign this field.
int protocol;
// Payload buffer obtained from rte_malloc().
unsigned char *data;
// nsendto() stores the payload length; pkt_udp() stores the UDP dgram_len field,
// which also counts the 8-byte UDP header.
uint16_t length;
};
// User-space counterparts of socket()/bind()/recvfrom()/sendto()/close(), defined below.
int nsocket(int domain, int type, int protocol);
int nbind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
ssize_t nrecvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t nsendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
int nclose(int fd);
// Forward declaration: writes an Ethernet/IPv4/UDP frame into msg (defined near the end of the UDP section).
static int encode_udp_pkg(uint8_t *msg, uint32_t sip, uint32_t dip,
uint16_t sport, uint16_t dport, uint8_t *srcmac, uint8_t *dstmac, uint8_t *data, uint16_t length);
// First descriptor handed out; 0-2 are left to stdin/stdout/stderr.
#define DEFAULT_FD_NUM 3

// Allocate a descriptor for a new user-space socket.
// The previous version returned DEFAULT_FD_NUM on every call, so all sockets
// shared descriptor 3 and get_hostinfo_fromfd() could not tell them apart.
// Descriptors are now handed out in increasing order starting at
// DEFAULT_FD_NUM. They are never recycled (nothing releases them), and the
// counter is not protected against concurrent callers.
// Returns the new descriptor, or -1 once the positive int range is used up.
int get_fd_frombitmap(void)
{
    static unsigned int next_fd = DEFAULT_FD_NUM;

    if (next_fd > 0x7FFFFFFFu)
    {
        return -1;
    }
    return (int)next_fd++;
}
struct localhost *get_hostinfo_fromfd(int fd)
{
struct localhost *host;
for (host = hosts; host != NULL; host = host->next)
{
if (host->fd == fd)
{
return host;
}
}
return NULL;
}
struct localhost *get_hostinfo_from_ip_port(uint32_t dip, uint16_t dport, uint8_t protocol)
{
struct localhost *host;
for (host = hosts; host != NULL; host = host->next)
{
printf("port: %d - %d\n", host->localport, dport);
printf("protocol: %d - %d\n", host->protocol, protocol);
printf("IP: %" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 " - ",
host->localip & 0xFF, (host->localip >> 8) & 0xFF,
(host->localip >> 16) & 0xFF, (host->localip >> 24) & 0xFF);
printf("%" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 "\n",
dip & 0xFF, (dip >> 8) & 0xFF,
(dip >> 16) & 0xFF, (dip >> 24) & 0xFF);
if (host->localip == dip && host->localport == dport && host->protocol == protocol)
{
return host;
}
}
return NULL;
}
// Create a user-space socket and link it into the `hosts` list.
//   domain, protocol - accepted for signature compatibility, not used
//   type             - SOCK_DGRAM selects IPPROTO_UDP, SOCK_STREAM selects IPPROTO_TCP;
//                      any other value leaves the socket's protocol at 0
// Returns the new descriptor, or -1 when no descriptor, memory or ring is available.
int nsocket(int domain, int type, int protocol)
{
    // rte_ring_create() refuses a name that is already in use, so each socket
    // gets its own suffix; with fixed names only the first socket could be created.
    static unsigned int ring_seq = 0;
    char ring_name[32];

    (void)domain;
    (void)protocol;

    struct localhost *host = rte_malloc("localhost", sizeof(*host), 0);
    if (host == NULL)
    {
        return -1;
    }
    // Zero the whole structure; sizeof(struct localhost *) only cleared a
    // pointer's worth of bytes and left the rest uninitialised.
    memset(host, 0, sizeof(*host));

    int fd = get_fd_frombitmap();
    if (fd < 0)
    {
        rte_free(host);
        return -1;
    }
    host->fd = fd;

    if (type == SOCK_DGRAM)
    {
        host->protocol = IPPROTO_UDP;
    }
    else if (type == SOCK_STREAM)
    {
        host->protocol = IPPROTO_TCP;
    }

    const unsigned int seq = ring_seq++;

    snprintf(ring_name, sizeof(ring_name), "send buffer %u", seq);
    host->sendbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (host->sendbuf == NULL)
    {
        rte_free(host);
        return -1;
    }

    snprintf(ring_name, sizeof(ring_name), "recv buffer %u", seq);
    host->recvbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (host->recvbuf == NULL)
    {
        rte_ring_free(host->sendbuf);
        rte_free(host);
        return -1;
    }

    // Initialise the synchronisation objects through the pthread API rather
    // than by copying a static initialiser into heap memory.
    pthread_cond_init(&host->cond, NULL);
    pthread_mutex_init(&host->mutex, NULL);

    LL_ADD(host, hosts);
    printf("Socket......\n");
    return fd;
}
// Bind a user-space socket to a local IPv4 address and port.
//   sockfd  - descriptor returned by nsocket()
//   addr    - struct sockaddr_in; sin_port is converted with ntohs(), sin_addr is stored verbatim
//   addrlen - not inspected
// The socket also records the NIC MAC currently held in gSrcMac.
// Returns 0 on success, -1 for an unknown descriptor or a NULL addr.
int nbind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
{
    (void)addrlen;

    if (addr == NULL)
    {
        return -1;
    }

    struct localhost *host = get_hostinfo_fromfd(sockfd);
    if (host == NULL)
    {
        return -1;
    }

    const struct sockaddr_in *laddr = (const struct sockaddr_in *)addr;
    host->localport = ntohs(laddr->sin_port);
    rte_memcpy(&host->localip, &laddr->sin_addr.s_addr, sizeof(uint32_t));
    rte_memcpy(host->localmac, gSrcMac, RTE_ETHER_ADDR_LEN);
    printf("Bind......\n");

    // The function is declared int but used to fall off the end without a value.
    return 0;
}
// Receive one datagram from a user-space socket, blocking until one is queued.
//   sockfd   - descriptor returned by nsocket()
//   buf, len - destination buffer and its capacity
//   flags    - not used
//   src_addr - optional; when non-NULL it receives the peer address as a sockaddr_in
//   addrlen  - optional; when non-NULL (and src_addr is non-NULL) it is set to
//              sizeof(struct sockaddr_in)
// When the datagram is longer than len, the first len bytes are returned and the
// remainder is put back on the receive ring for a later call.
// Returns the number of bytes copied, or -1 for an unknown descriptor or NULL buf.
ssize_t nrecvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen)
{
    (void)flags;

    struct localhost *host = get_hostinfo_fromfd(sockfd);
    if (host == NULL || buf == NULL)
    {
        return -1;
    }

    struct offload *ol = NULL;
    pthread_mutex_lock(&host->mutex);
    while (rte_ring_mc_dequeue(host->recvbuf, (void **)&ol) < 0)
    {
        pthread_cond_wait(&host->cond, &host->mutex);
    }
    pthread_mutex_unlock(&host->mutex);

    // The payload carries no terminating NUL, so bound the print by its length.
    printf("rte_memcpy %.*s:%d\n", (int)ol->length, ol->data, ol->length);

    // recvfrom() semantics allow the caller to pass NULL when it does not want the peer address.
    if (src_addr != NULL)
    {
        struct sockaddr_in *saddr = (struct sockaddr_in *)src_addr;
        // NOTE(review): pkt_udp() stores dport exactly as it appears in the UDP header,
        // so this htons() swaps it once more; nsendto() applies ntohs() to the value it
        // is handed, which undoes the swap on the echo path. Do not change one side alone.
        saddr->sin_port = htons(ol->dport);
        rte_memcpy(&saddr->sin_addr.s_addr, &ol->dip, sizeof(uint32_t));
        if (addrlen != NULL)
        {
            *addrlen = sizeof(struct sockaddr_in);
        }
    }

    if (len < ol->length)
    {
        printf("根据最大尺寸对收到的udp包进行分割,剩余部分单独切割程\n");
        rte_memcpy(buf, ol->data, len);

        // Slide the unread tail to the front of the existing buffer; this replaces
        // an unchecked rte_malloc() plus copy of the remainder.
        ol->length -= len;
        memmove(ol->data, ol->data + len, ol->length);

        if (rte_ring_mp_enqueue(host->recvbuf, (void *)ol) < 0)
        {
            // Ring is full: drop the remainder instead of leaking it.
            rte_free(ol->data);
            rte_free(ol);
        }
        return (ssize_t)len;
    }

    const uint16_t copied = ol->length;
    rte_memcpy(buf, ol->data, copied);
    rte_free(ol->data);
    rte_free(ol);
    return copied;
}
ssize_t nsendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen)
{
struct localhost *host = get_hostinfo_fromfd(sockfd);
if (host == NULL)
return -1;
struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
if (ol == NULL)
return -1;
struct sockaddr_in *daddr = (struct sockaddr_in *)dest_addr;
ol->dip = daddr->sin_addr.s_addr;
ol->dport = ntohs(daddr->sin_port);
ol->sip = host->localip;
ol->sport = ntohs(host->localport);
ol->length = len;
ol->data = rte_malloc("unsigned char *", len, 0);
if (ol->data == NULL)
{
rte_free(ol);
return -1;
}
rte_memcpy(ol->data, buf, len);
rte_ring_mp_enqueue(host->sendbuf, (void *)ol);
return len;
}
int nclose(int fd)
{
struct localhost *host = get_hostinfo_fromfd(fd);
if (host == NULL)
return -1;
LL_REMOVE(host, hosts);
rte_ring_free(host->recvbuf);
rte_ring_free(host->sendbuf);
rte_free(host);
}
#endif
#if ENABLE_TCP_APP
#endif
// Forward declarations. udp_server() is not defined in the visible part of this file.
static int udp_server();
// Timer callback that prints port statistics and sends ARP probes.
static void arp_request_timer_cb(__rte_unused struct rte_timer *tim, __rte_unused void *arg);
// Allocates an mbuf holding an ARP frame; needed by functions defined before it.
static struct rte_mbuf *ng_send_arp(struct rte_mempool *mbuf_pool, uint16_t op, uint8_t *data_mac, uint32_t sip, uint32_t tip);
// Print `what` immediately followed by the textual form of eth_addr to stdout.
// No separator or newline is added.
static inline void print_ether_addr(const char *what, struct rte_ether_addr *eth_addr)
{
    char text[RTE_ETHER_ADDR_FMT_SIZE];

    rte_ether_format_addr(text, sizeof(text), eth_addr);
    printf("%s%s", what, text);
}
// Timer callback: print the port's statistics, then queue one ARP request for each
// of the first MAX_MASK host numbers in gLocalIp's /24.
//   tim - unused
//   arg - the struct rte_mempool* the ARP mbufs are allocated from
// An address already present in the ARP table is probed at its known MAC;
// otherwise the request goes to the broadcast MAC.
static void arp_request_timer_cb(__rte_unused struct rte_timer *tim, void *arg)
{
    printf("Timer Inner......\n");

    // Start from zeroed counters so a failed query prints zeros instead of stack garbage.
    struct rte_eth_stats stats;
    memset(&stats, 0, sizeof(stats));
    rte_eth_stats_get(gDpdkPortId, &stats);

    printf("\n[Statistics]\n");
    printf(" - RX Packets: %" PRIu64 "\n", stats.ipackets);
    printf(" - RX Errors: %" PRIu64 "\n", stats.ierrors);
    printf(" - RX Bytes: %" PRIu64 "\n", stats.ibytes);
    printf(" - TX Packets: %" PRIu64 "\n", stats.opackets);
    printf(" - TX Errors: %" PRIu64 "\n", stats.oerrors);
    printf(" - TX Bytes: %" PRIu64 "\n", stats.obytes);
    printf(" - RX Missed: %" PRIu64 "\n", stats.imissed);
    printf(" - RX No-MBUFs: %" PRIu64 "\n", stats.rx_nombuf);
    printf("\n[Statistics]\n");

    struct rte_mempool *mbuf_pool = (struct rte_mempool *)arg;
    struct inout_ring *ring = ringInstance();

    uint32_t host_no;
    for (host_no = 0; host_no < MAX_MASK; host_no++)
    {
        // The address is kept in wire byte order, so the host number lives in the top byte.
        const uint32_t dstip = (gLocalIp & 0x00FFFFFF) | (host_no << 24);

        struct in_addr addr;
        addr.s_addr = dstip;
        printf("arp send-> %s\n", inet_ntoa(addr));

        uint8_t *dstmac = ng_get_dst_mac(dstip);
        uint8_t *target_mac = (dstmac != NULL) ? dstmac : gDefaultArpMac;
        struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, target_mac, gLocalIp, dstip);

        // The burst call reports how many objects it queued; on 0 the mbuf is still
        // ours and must go back to the pool.
        if (rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL) == 0)
        {
            rte_pktmbuf_free(arpbuf);
        }
    }
}
// NIC configuration handed to rte_eth_dev_configure() in init_port().
// Only the maximum RX packet length is set; every other field stays zero.
static struct rte_eth_conf port_conf = {
.rxmode = {
.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
},
};
// Ones'-complement checksum used for the ICMP header.
//   addr  - start of the data, read as 16-bit words in memory order
//   count - length of the data in bytes; a trailing odd byte is added as-is
// Returns the bitwise complement of the folded 16-bit sum.
static uint16_t ng_checksum(uint16_t *addr, int count)
{
    const uint16_t *word = addr;
    long acc = 0;
    int remaining;

    for (remaining = count; remaining > 1; remaining -= 2)
    {
        acc += *word++;
    }

    if (remaining > 0)
    {
        acc += *(const unsigned char *)word;
    }

    // Fold the carries back into the low 16 bits until none remain.
    while ((acc >> 16) != 0)
    {
        acc = (acc >> 16) + (acc & 0xffff);
    }

    return (uint16_t)~acc;
}
// Bring up port gDpdkPortId with one RX queue and one TX queue.
//   mbuf_pool - pool the RX queue takes its buffers from
// Also stores the port's MAC address in gSrcMac and switches the port to
// promiscuous mode. Any failing setup step terminates the program via rte_exit().
// Returns 0.
static int init_port(struct rte_mempool *mbuf_pool)
{
    struct rte_eth_dev_info dev_info;
    rte_eth_dev_info_get(gDpdkPortId, &dev_info);

    // Remember the NIC's own MAC; the frame encoders use it as the source address.
    rte_eth_macaddr_get(gDpdkPortId, (struct rte_ether_addr *)gSrcMac);

    int ret;
    uint16_t nb_rxd = RX_RING_SIZE;
    uint16_t nb_txd = TX_RING_SIZE;

    ret = rte_eth_dev_configure(gDpdkPortId, 1, 1, &port_conf);
    if (ret < 0)
    {
        rte_exit(EXIT_FAILURE, "Failed to configure port %u\n", gDpdkPortId);
    }

    ret = rte_eth_rx_queue_setup(gDpdkPortId, 0, nb_rxd, rte_eth_dev_socket_id(gDpdkPortId), NULL, mbuf_pool);
    if (ret < 0)
    {
        rte_exit(EXIT_FAILURE, "Failed to setup RX queue for port %u\n", gDpdkPortId);
    }

    struct rte_eth_txconf txq_conf = dev_info.default_txconf;
    txq_conf.offloads = port_conf.txmode.offloads;
    // Size the TX queue from the TX descriptor count. The RX count was passed here
    // before, which only worked because both macros happen to be equal.
    ret = rte_eth_tx_queue_setup(gDpdkPortId, 0, nb_txd, rte_eth_dev_socket_id(gDpdkPortId), &txq_conf);
    if (ret < 0)
    {
        rte_exit(EXIT_FAILURE, "Failed to setup TX queue for port %u\n", gDpdkPortId);
    }

    ret = rte_eth_dev_start(gDpdkPortId);
    if (ret < 0)
    {
        rte_exit(EXIT_FAILURE, "Failed to start port %u\n", gDpdkPortId);
    }

    // Promiscuous mode: accept frames regardless of destination MAC.
    rte_eth_promiscuous_enable(gDpdkPortId);
    printf("Port %u initialized, promiscuous mode ON\n", gDpdkPortId);
    return 0;
}
// Write an Ethernet + ARP frame into msg.
//   msg     - buffer with room for both headers
//   op      - ARP opcode in host byte order (converted with htons here)
//   dst_mac - used both as the Ethernet destination and as the ARP target hardware address
//   sip/tip - sender and target IPv4 addresses, stored verbatim
// The Ethernet source and ARP sender hardware address are gSrcMac.
// Returns 0.
static int encode_arp(uint8_t *msg, uint16_t op, uint8_t *dst_mac, uint32_t sip, uint32_t tip)
{
    // 1. Ethernet header: destination MAC | source MAC | EtherType
    struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg;
    rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN);
    rte_memcpy(eth->d_addr.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN);
    eth->ether_type = htons(RTE_ETHER_TYPE_ARP);

    // 2. ARP header
    struct rte_arp_hdr *hdr = (struct rte_arp_hdr *)(msg + sizeof(struct rte_ether_hdr));
    hdr->arp_hardware = htons(1); // hardware type 1
    hdr->arp_protocol = htons(RTE_ETHER_TYPE_IPV4);
    hdr->arp_hlen = RTE_ETHER_ADDR_LEN;
    hdr->arp_plen = sizeof(uint32_t);
    hdr->arp_opcode = htons(op);
    rte_memcpy(hdr->arp_data.arp_sha.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN);
    hdr->arp_data.arp_sip = sip;
    rte_memcpy(hdr->arp_data.arp_tha.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN);
    hdr->arp_data.arp_tip = tip;

    // The function is declared int but used to end without returning a value.
    return 0;
}
// upd结构体组装
static int encode_udp(uint8_t *msg, unsigned char *data, uint16_t length)
{
// encode
// 1 ethhdr
/*
| Destination MAC | Source MAC | Type |
|-----------------|-----------------|-------|
| 00 1A 2B 3C 4D 5E | 00 0A 0B 0C 0D 0E | 08 00 |
*/
struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg;
rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth->d_addr.addr_bytes, gDstMac, RTE_ETHER_ADDR_LEN);
eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);
// 2 iphdr
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr));
ip->version_ihl = 0x45;
ip->type_of_service = 0;
ip->total_length = htons(length - sizeof(struct rte_ether_hdr));
ip->packet_id = 0;
ip->fragment_offset = 0;
ip->time_to_live = 64;
ip->next_proto_id = IPPROTO_UDP;
ip->src_addr = gSrcIp;
ip->dst_addr = gDstIp;
ip->hdr_checksum = 0;
ip->hdr_checksum = rte_ipv4_cksum(ip);
// 3 udphdr
struct rte_udp_hdr *udp = (struct rte_udp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
udp->src_port = gSrcPort;
udp->dst_port = gDstPort;
uint16_t udplen = length - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr);
udp->dgram_len = htons(udplen);
rte_memcpy((uint8_t *)(udp + 1), data, udplen);
udp->dgram_cksum = 0;
udp->dgram_cksum = rte_ipv4_udptcp_cksum(ip, udp);
}
// Write an Ethernet + IPv4 + ICMP echo-reply frame (no payload) into msg.
//   msg      - buffer with room for the three headers
//   dst_mac  - Ethernet destination
//   sip/tip  - IPv4 source and destination, stored verbatim
//   ident, seq_nb - echo identifier and sequence number exactly as they appeared in
//              the request header (network byte order); the visible caller,
//              pkt_process(), passes the raw header fields
// Returns 0.
static int encode_icmp(uint8_t *msg, uint8_t *dst_mac, uint32_t sip, uint32_t tip, uint16_t ident, uint16_t seq_nb)
{
    // 1. Ethernet header
    struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg;
    rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN);
    rte_memcpy(eth->d_addr.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN);
    eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);

    // 2. IPv4 header
    struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr));
    ip->version_ihl = 0x45;
    ip->type_of_service = 0;
    ip->total_length = htons(sizeof(struct rte_icmp_hdr) + sizeof(struct rte_ipv4_hdr));
    ip->packet_id = 0;
    ip->fragment_offset = 0;
    ip->time_to_live = 64;
    ip->next_proto_id = IPPROTO_ICMP;
    ip->src_addr = sip;
    ip->dst_addr = tip;
    // The header checksum was never filled in, leaving whatever the mbuf held before.
    ip->hdr_checksum = 0;
    ip->hdr_checksum = rte_ipv4_cksum(ip);

    // 3. ICMP header
    struct rte_icmp_hdr *icmp = (struct rte_icmp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
    icmp->icmp_type = RTE_IP_ICMP_ECHO_REPLY;
    icmp->icmp_code = 0;
    // Echo the values back unchanged. They arrive in wire order already, so the
    // former htons() byte-swapped them and the reply no longer matched the request.
    icmp->icmp_ident = ident;
    icmp->icmp_seq_nb = seq_nb;
    icmp->icmp_cksum = 0;
    icmp->icmp_cksum = ng_checksum((uint16_t *)icmp, sizeof(struct rte_icmp_hdr));

    return 0;
}
// Build a UDP reply frame in a freshly allocated mbuf.
//   mbuf_pool - pool to allocate from; the program exits if it is empty
//   data      - payload handed to encode_udp()
//   length    - payload length in bytes
// The mbuf's data_len/pkt_len are set to payload + 14 + 20 + 8 bytes.
// Returns the mbuf; the caller owns it.
static struct rte_mbuf *ng_send_udp(struct rte_mempool *mbuf_pool, uint8_t *data, uint16_t length)
{
    struct rte_mbuf *pkt = rte_pktmbuf_alloc(mbuf_pool);
    if (pkt == NULL)
    {
        rte_exit(EXIT_FAILURE, "Alloc\n");
    }

    // Ethernet (14) + IPv4 (20) + UDP (8) headers in front of the payload.
    const unsigned frame_len = length + 14 + 20 + 8;
    pkt->data_len = frame_len;
    pkt->pkt_len = frame_len;

    encode_udp(rte_pktmbuf_mtod(pkt, uint8_t *), data, frame_len);
    return pkt;
}
// Build an ARP frame in a freshly allocated mbuf.
//   mbuf_pool - pool to allocate from; the program exits if it is empty
//   op        - ARP opcode, forwarded to encode_arp()
//   data_mac  - destination / target MAC, forwarded to encode_arp()
//   sip, tip  - sender and target IPv4 addresses
// Returns the mbuf; the caller owns it.
static struct rte_mbuf *ng_send_arp(struct rte_mempool *mbuf_pool, uint16_t op, uint8_t *data_mac, uint32_t sip, uint32_t tip)
{
    struct rte_mbuf *pkt = rte_pktmbuf_alloc(mbuf_pool);
    if (pkt == NULL)
    {
        rte_exit(EXIT_FAILURE, "Alloc\n");
    }

    const unsigned frame_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_arp_hdr);
    pkt->data_len = frame_len;
    pkt->pkt_len = frame_len;

    encode_arp(rte_pktmbuf_mtod(pkt, uint8_t *), op, data_mac, sip, tip);
    return pkt;
}
// Build an ICMP echo-reply frame in a freshly allocated mbuf.
//   mbuf_pool     - pool to allocate from; the program exits if it is empty
//   data_mac      - Ethernet destination
//   sip, tip      - IPv4 source and destination
//   ident, seq_nb - echo identifier and sequence number, forwarded to encode_icmp()
// Returns the mbuf; the caller owns it.
static struct rte_mbuf *ng_send_icmp(struct rte_mempool *mbuf_pool, uint8_t *data_mac, uint32_t sip, uint32_t tip, uint16_t ident, uint16_t seq_nb)
{
    struct rte_mbuf *pkt = rte_pktmbuf_alloc(mbuf_pool);
    if (pkt == NULL)
    {
        rte_exit(EXIT_FAILURE, "Alloc\n");
    }

    const unsigned frame_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_icmp_hdr);
    pkt->data_len = frame_len;
    pkt->pkt_len = frame_len;

    encode_icmp(rte_pktmbuf_mtod(pkt, uint8_t *), data_mac, sip, tip, ident, seq_nb);
    return pkt;
}
#if ENABLE_TCP_APP
// Capacity, in 32-bit words, of the option array in struct tcp_fragment.
#define TCP_OPTIONS_LEN 10
// Largest 32-bit value; tcp_stream_create() masks the initial send sequence number with it.
#define TCP_MAX_SEQ 4294967295
// Receive window placed in the SYN-ACK built by tcp_handle_listen().
#define TCP_INIT_WINDOWS 14600
// Connection states of a tcp_stream. Only LISTEN and SYN_RCVD have handlers in the
// visible part of this file; tcp_udp() ignores packets arriving in any other state.
typedef enum _TCP_STATUS
{
TCP_STATUS_CLOSE = 0,
TCP_STATUS_LISTEN,
TCP_STATUS_SYN_RCVD,
TCP_STATUS_SYN_SENT,
TCP_STATUS_ESTABLISHED,
TCP_STATUS_FIN_WAIT_1,
TCP_STATUS_FIN_WAIT_2,
TCP_STATUS_CLOSING,
TCP_STATUS_TIME_WAIT,
TCP_STATUS_TIME_CLOSE_WAIT,
TCP_STATUS_TIME_LAST_ACK
} TCP_STATUS;
// State of one TCP connection, keyed by the 4-tuple taken from incoming packets.
// tcp_udp() creates streams from the received headers, so sip/sport hold the
// packet's source (the remote peer) and dip/dport its destination (this host);
// all four are stored exactly as they appear in the headers.
struct tcp_stream
{
// Not assigned in the visible part of this file.
int fd;
uint32_t sip;
uint32_t dip;
uint16_t sport;
uint16_t dport;
// Always IPPROTO_TCP (set in tcp_stream_create()).
uint8_t protocol;
// Copy of gSrcMac taken in tcp_stream_create(); used as the Ethernet source of outgoing segments.
uint8_t localmac[RTE_ETHER_ADDR_LEN];
// Sequence number of the next segment to send; starts at a rand_r() value.
uint32_t send_next; // seqnum
// Not written in the visible part of this file.
uint32_t recv_next; // acknum
TCP_STATUS status;
// Ring of struct tcp_fragment*: filled by the state handlers, drained by tcp_pkt_out_process().
struct rte_ring *sendbuf;
// Created in tcp_stream_create(); not otherwise used in the visible part of this file.
struct rte_ring *recvbuf;
// List links; presumably maintained by LL_ADD, which is defined outside this excerpt (TODO confirm).
struct tcp_stream *prev;
struct tcp_stream *next;
};
// Description of one outgoing TCP segment, turned into a frame by encode_tcp_pkg().
// Byte-order handling in encode_tcp_pkg(): sport, dport, windows and tcp_urp are
// copied verbatim into the header, while seqnum and acknum are passed through htonl()
// and are therefore kept in host order here despite their rte_be32_t type.
struct tcp_fragment
{
rte_be16_t sport; /**< TCP source port. */
rte_be16_t dport; /**< TCP destination port. */
rte_be32_t seqnum; /**< TX data sequence number. */
rte_be32_t acknum; /**< RX data acknowledgment sequence number. */
uint8_t hdrlen_off; /**< Data offset. */
uint8_t tcp_flags; /**< TCP flags */
rte_be16_t windows; /**< RX flow control window. */
// Not read by encode_tcp_pkg(), which computes the checksum itself.
rte_be16_t cksum; /**< TCP checksum. */
rte_be16_t tcp_urp; /**< TCP urgent pointer, if any. */
// Number of 32-bit option words (frame-length calculations multiply it by sizeof(uint32_t)).
int option_len;
uint32_t option[TCP_OPTIONS_LEN];
// Payload and its length in bytes; data may be NULL for segments without payload.
unsigned char *data;
int length;
};
// Registry of all TCP streams.
struct tcp_table
{
// Number of streams; incremented by tcp_stream_create().
int count;
// Head of the stream list.
struct tcp_stream *tcp_set;
};
// Singleton instance, created on first use by tcpInstance().
struct tcp_table *tInst = NULL;
// Forward declaration: ng_tcp_pkg() calls the encoder before its definition.
static int encode_tcp_pkg(uint8_t *msg, uint32_t sip, uint32_t dip, uint8_t *srcmac, uint8_t *dstmac, struct tcp_fragment *fragment);
// Return the process-wide TCP stream table, allocating it zero-filled on first use.
// Allocation failure terminates the program via rte_exit(), matching the policy the
// ng_send_* helpers use, so callers never receive NULL.
static struct tcp_table *tcpInstance(void)
{
    if (tInst == NULL)
    {
        struct tcp_table *table = rte_malloc("tcp_table", sizeof(*table), 0);
        if (table == NULL)
        {
            // Previously the NULL pointer was handed straight to memset().
            rte_exit(EXIT_FAILURE, "Failed to allocate tcp table\n");
        }
        memset(table, 0, sizeof(*table));
        tInst = table;
    }
    return tInst;
}
struct tcp_stream * tcp_stream_search(uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport)
{
struct tcp_table *table = tcpInstance();
struct tcp_stream *stream;
for(stream = table->tcp_set; stream != NULL; stream = stream->next){
if (stream->sip == sip && stream->dip == dip && stream->sport == sport && stream->dport == dport)
{
return stream;
}
}
return NULL;
}
// Create a stream for the given 4-tuple in LISTEN state and register it in the TCP table.
//   sip, dip, sport, dport - stored verbatim as the stream's key
// The stream gets its own send/receive rings, a rand_r()-derived initial sequence
// number and a copy of gSrcMac.
// Returns the new stream, or NULL when memory or a ring cannot be allocated.
struct tcp_stream * tcp_stream_create(uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport)
{
    // rte_ring_create() refuses a name that is already in use, so each stream gets
    // its own suffix; with fixed names every stream after the first ended up with
    // NULL rings that were later dereferenced.
    static unsigned int stream_seq = 0;
    char ring_name[32];

    struct tcp_stream *stream = rte_malloc("stream", sizeof(*stream), 0);
    if (stream == NULL)
    {
        return NULL;
    }
    memset(stream, 0, sizeof(*stream));

    stream->sip = sip;
    stream->dip = dip;
    stream->sport = sport;
    stream->dport = dport;
    stream->protocol = IPPROTO_TCP;
    // A server-side stream starts out listening.
    stream->status = TCP_STATUS_LISTEN;

    const unsigned int seq = stream_seq++;

    snprintf(ring_name, sizeof(ring_name), "sendbuf%u", seq);
    stream->sendbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), 0);
    if (stream->sendbuf == NULL)
    {
        rte_free(stream);
        return NULL;
    }

    snprintf(ring_name, sizeof(ring_name), "recvbuf%u", seq);
    stream->recvbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), 0);
    if (stream->recvbuf == NULL)
    {
        rte_ring_free(stream->sendbuf);
        rte_free(stream);
        return NULL;
    }

    unsigned int next_seed = (unsigned int)time(NULL);
    stream->send_next = rand_r(&next_seed) & TCP_MAX_SEQ;
    rte_memcpy(stream->localmac, gSrcMac, RTE_ETHER_ADDR_LEN);

    struct tcp_table *table = tcpInstance();
    LL_ADD(stream, table->tcp_set);
    table->count++;
    return stream;
}
// Build a TCP frame in a freshly allocated mbuf.
//   mbuf_pool      - pool to allocate from; the program exits if it is empty
//   sip, dip       - IPv4 source and destination, forwarded to encode_tcp_pkg()
//   srcmac, dstmac - Ethernet source and destination
//   fragment       - segment description; its payload length and option word count
//                    determine the frame size
// Addresses and ports are printed to stdout before encoding.
// Returns the mbuf; the caller owns it.
static struct rte_mbuf *ng_tcp_pkg(struct rte_mempool *mbuf_pool, uint32_t sip, uint32_t dip,
                                   uint8_t *srcmac, uint8_t *dstmac, struct tcp_fragment *fragment)
{
    struct rte_mbuf *pkt = rte_pktmbuf_alloc(mbuf_pool);
    if (pkt == NULL)
    {
        rte_exit(EXIT_FAILURE, "Alloc\n");
    }

    // Ethernet + IPv4 + TCP headers, then the payload and the option words.
    const unsigned frame_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr)
                               + fragment->length + fragment->option_len * sizeof(uint32_t);
    pkt->data_len = frame_len;
    pkt->pkt_len = frame_len;

    uint8_t *frame = rte_pktmbuf_mtod(pkt, uint8_t *);

    printf("格式化tcp数据报文\n");
    printf("ng_tcp_pkg Src IP: %" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 "\n",
           sip & 0xFF, (sip >> 8) & 0xFF,
           (sip >> 16) & 0xFF, (sip >> 24) & 0xFF);
    printf("ng_tcp_pkg Dst IP: %" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 "\n",
           dip & 0xFF, (dip >> 8) & 0xFF,
           (dip >> 16) & 0xFF, (dip >> 24) & 0xFF);
    printf("port: %d - %d\n", rte_be_to_cpu_16(fragment->sport), rte_be_to_cpu_16(fragment->dport));

    encode_tcp_pkg(frame, sip, dip, srcmac, dstmac, fragment);
    return pkt;
}
// tcp结构体组装
static int encode_tcp_pkg(uint8_t *msg, uint32_t sip, uint32_t dip, uint8_t *srcmac, uint8_t *dstmac, struct tcp_fragment *fragment)
{
// encode
// 1 ethhdr
/*
| Destination MAC | Source MAC | Type |
|-----------------|-----------------|-------|
| 00 1A 2B 3C 4D 5E | 00 0A 0B 0C 0D 0E | 08 00 |
*/
struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg;
rte_memcpy(eth->s_addr.addr_bytes, srcmac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth->d_addr.addr_bytes, dstmac, RTE_ETHER_ADDR_LEN);
eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);
// 2 iphdr
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr));
ip->version_ihl = 0x45;
ip->type_of_service = 0;
const unsigned total_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr)
+ fragment->length + fragment->option_len * sizeof(uint32_t);
ip->total_length = htons(total_len - sizeof(struct rte_ether_hdr));
ip->packet_id = 0;
ip->fragment_offset = 0;
ip->time_to_live = 64;
ip->next_proto_id = IPPROTO_TCP;
ip->src_addr = sip;
ip->dst_addr = dip;
ip->hdr_checksum = 0;
ip->hdr_checksum = rte_ipv4_cksum(ip);
// 3 tcphdr
struct rte_tcp_hdr *tcp = (struct rte_tcp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
tcp->src_port = fragment->sport;
tcp->dst_port = fragment->dport;
tcp->sent_seq = htonl(fragment->seqnum);
tcp->recv_ack = htonl(fragment->acknum);
tcp->data_off = fragment->hdrlen_off;
tcp->rx_win = fragment->windows;
tcp->tcp_urp = fragment->tcp_urp;
tcp->tcp_flags = fragment->tcp_flags;
if (fragment->data != NULL)
{
uint8_t *payload = (uint8_t *)(tcp + 1) + fragment->option_len * sizeof(uint32_t);
rte_memcpy(payload, fragment->data, fragment->length);
}
tcp->cksum = 0;
tcp->cksum = rte_ipv4_udptcp_cksum(ip, tcp);
return 0;
}
// Send at most one queued segment for every TCP stream.
//   mbuf_pool - pool the outgoing frames are allocated from
// For each stream, one tcp_fragment is taken off its send ring. When the peer's MAC
// is known, the segment is encoded and queued on the out ring and the fragment is
// freed. When it is not, an ARP request is queued and the fragment goes back on the
// send ring for a later pass.
// Returns 0.
static int tcp_pkt_out_process(struct rte_mempool *mbuf_pool)
{
    struct tcp_table *table = tcpInstance();
    struct inout_ring *ring = ringInstance();
    struct tcp_stream *stream;

    for (stream = table->tcp_set; stream != NULL; stream = stream->next)
    {
        struct tcp_fragment *fragment = NULL;
        if (rte_ring_mc_dequeue(stream->sendbuf, (void **)&fragment) < 0)
        {
            continue;
        }

        // stream->sip is the remote peer, stream->dip this host.
        uint8_t *dstmac = ng_get_dst_mac(stream->sip);
        if (dstmac == NULL)
        {
            struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST,
                                                  gDefaultArpMac, stream->dip, stream->sip);
            // A full out ring leaves the mbuf with us; return it to the pool.
            if (rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL) == 0)
            {
                rte_pktmbuf_free(arpbuf);
            }
            // Retry the segment once the ARP reply has arrived; drop it if it cannot be requeued.
            if (rte_ring_mp_enqueue(stream->sendbuf, (void *)fragment) < 0)
            {
                rte_free(fragment);
            }
            continue;
        }

        printf("Get connect\n");
        struct rte_mbuf *tcpbuf = ng_tcp_pkg(mbuf_pool, stream->dip, stream->sip, stream->localmac, dstmac, fragment);
        if (rte_ring_mp_enqueue_burst(ring->out, (void **)&tcpbuf, 1, NULL) == 0)
        {
            rte_pktmbuf_free(tcpbuf);
        }
        rte_free(fragment);
    }

    // The function is declared int but used to end without returning a value.
    return 0;
}
// Handle a segment that arrives while the stream is in SYN_RCVD.
//   stream  - connection state
//   tcp_hdr - header of the received segment
// An ACK whose acknowledgment number equals send_next + 1 (our SYN consumed one
// sequence number) completes the handshake and moves the stream to ESTABLISHED.
// Any other segment leaves the stream unchanged.
// Returns 0.
static int tcp_handle_syn_recv(struct tcp_stream *stream, struct rte_tcp_hdr *tcp_hdr)
{
    if (!(tcp_hdr->tcp_flags & RTE_TCP_ACK_FLAG))
    {
        return 0;
    }
    if (stream->status != TCP_STATUS_SYN_RCVD)
    {
        return 0;
    }

    // recv_ack is a 32-bit field: ntohl(), not ntohs(), which discarded half of it.
    const uint32_t acknum = ntohl(tcp_hdr->recv_ack);

    // The comparison used to guard an empty body, so any ACK established the stream.
    if (acknum == stream->send_next + 1)
    {
        stream->status = TCP_STATUS_ESTABLISHED;
    }
    return 0;
}
// Handle a segment that arrives while the stream is in LISTEN.
//   stream  - connection state
//   tcp_hdr - header of the received segment
// A SYN is answered by queueing a SYN-ACK fragment on the stream's send ring and
// moving the stream to SYN_RCVD. Segments without SYN are ignored.
// Returns 0 on success or when nothing had to be done, -1 when the fragment cannot
// be allocated or queued (the stream then stays in LISTEN so a retransmitted SYN is
// handled again).
static int tcp_handle_listen(struct tcp_stream *stream, struct rte_tcp_hdr *tcp_hdr)
{
    printf("发起的SYN请求\n");
    if (!(tcp_hdr->tcp_flags & RTE_TCP_SYN_FLAG))
    {
        return 0;
    }

    printf("确保首次处理,后续不处理\n");
    if (stream->status != TCP_STATUS_LISTEN)
    {
        return 0;
    }

    struct tcp_fragment *fragment = rte_malloc("fragment", sizeof(*fragment), 0);
    if (fragment == NULL)
    {
        return -1;
    }
    memset(fragment, 0, sizeof(*fragment));

    // Reply direction: swap the ports of the incoming segment (kept in wire order).
    fragment->sport = tcp_hdr->dst_port;
    fragment->dport = tcp_hdr->src_port;
    // Sequence numbers stay in host order; encode_tcp_pkg() applies htonl().
    fragment->seqnum = stream->send_next;
    fragment->acknum = ntohl(tcp_hdr->sent_seq) + 1;
    fragment->tcp_flags = (RTE_TCP_SYN_FLAG | RTE_TCP_ACK_FLAG);
    // `windows` is an rte_be16_t that encode_tcp_pkg() copies verbatim into the
    // header, so it has to be stored in network byte order.
    fragment->windows = htons(TCP_INIT_WINDOWS);
    fragment->hdrlen_off = 0x50; // data offset of 5 words: 20-byte header, no options
    fragment->data = NULL;
    fragment->length = 0;

    printf("组装一个fragment %d\n", stream->send_next);
    if (rte_ring_mp_enqueue(stream->sendbuf, (void *)fragment) < 0)
    {
        // Send ring full: free the fragment rather than leak it.
        rte_free(fragment);
        return -1;
    }
    stream->status = TCP_STATUS_SYN_RCVD;
    return 0;
}
// Entry point for a received TCP segment.
//   mbuf - frame holding Ethernet + IPv4 (assumed to carry no IP options) + TCP headers
// Finds the stream for the segment's 4-tuple, creating one in LISTEN state when none
// exists, and dispatches on the stream's state. Only LISTEN and SYN_RCVD are
// handled; segments for any other state are ignored.
// NOTE(review): the TCP checksum of incoming segments is not verified.
// Returns the state handler's result, 0 when the segment was ignored, or -1 when a
// new stream could not be created.
static int tcp_udp(struct rte_mbuf *mbuf)
{
    struct rte_ipv4_hdr *ip_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
    struct rte_tcp_hdr *tcp_hdr = (struct rte_tcp_hdr *)(ip_hdr + 1);

    struct tcp_stream *stream = tcp_stream_search(ip_hdr->src_addr, ip_hdr->dst_addr, tcp_hdr->src_port, tcp_hdr->dst_port);
    if (stream == NULL)
    {
        stream = tcp_stream_create(ip_hdr->src_addr, ip_hdr->dst_addr, tcp_hdr->src_port, tcp_hdr->dst_port);
        if (stream == NULL)
        {
            return -1;
        }
    }

    int rc = 0;
    switch (stream->status)
    {
    case TCP_STATUS_LISTEN: // server
        rc = tcp_handle_listen(stream, tcp_hdr);
        break;
    case TCP_STATUS_SYN_RCVD: // server
        rc = tcp_handle_syn_recv(stream, tcp_hdr);
        break;
    case TCP_STATUS_CLOSE:           // client
    case TCP_STATUS_SYN_SENT:        // client
    case TCP_STATUS_ESTABLISHED:     // all
    case TCP_STATUS_FIN_WAIT_1:      // client
    case TCP_STATUS_FIN_WAIT_2:      // client
    case TCP_STATUS_CLOSING:         // client
    case TCP_STATUS_TIME_WAIT:       // client
    case TCP_STATUS_TIME_CLOSE_WAIT: // server
    case TCP_STATUS_TIME_LAST_ACK:   // server
    default:
        // No handling for these states yet.
        break;
    }

    // The function is declared int but used to end without returning a value.
    return rc;
}
#endif
#if ENABLE_UDP_APP
static int pkt_udp(struct rte_mbuf *mbuf)
{
struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
struct rte_udp_hdr *udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
// 提取 UDP 负载
uint16_t udp_len = rte_be_to_cpu_16(udp_hdr->dgram_len) - sizeof(struct rte_udp_hdr);
// 从当前报文内获取源和目的信息临时存储
rte_memcpy(gDstMac, eth_hdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&gSrcIp, &ip_hdr->dst_addr, sizeof(uint32_t));
rte_memcpy(&gDstIp, &ip_hdr->src_addr, sizeof(uint32_t));
rte_memcpy(&gSrcPort, &udp_hdr->dst_port, sizeof(uint16_t));
rte_memcpy(&gDstPort, &udp_hdr->src_port, sizeof(uint16_t));
// 打印报文信息
printf("Src Port: %" PRIu16 "\n", rte_be_to_cpu_16(udp_hdr->src_port));
printf("Dst Port: %" PRIu16 "\n", rte_be_to_cpu_16(udp_hdr->dst_port));
printf("Payload Length: %" PRIu16 " bytes\n", udp_len);
struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
if (ol == NULL) return -2;
struct localhost *host = get_hostinfo_from_ip_port(ip_hdr->dst_addr, ntohs(udp_hdr->dst_port), ip_hdr->next_proto_id);
if (host == NULL) return -3;
ol->sip = ip_hdr->dst_addr;
ol->dip = ip_hdr->src_addr;
ol->sport = udp_hdr->dst_port;
ol->dport = udp_hdr->src_port;
ol->protocol = IPPROTO_UDP;
ol->length = ntohs(udp_hdr->dgram_len);
ol->data = rte_malloc("unsigned char*", ol->length - sizeof(struct rte_udp_hdr), 0);
if (ol->data == NULL)
{
rte_free(ol);
return -1;
}
uint8_t *payload = (uint8_t *)(udp_hdr + 1);
rte_memcpy(ol->data, payload, ol->length - sizeof(struct rte_udp_hdr));
rte_ring_mp_enqueue(host->recvbuf, (void *)ol); // 把整理好之后的包放入recvbuf
pthread_mutex_lock(&host->mutex);
pthread_cond_signal(&host->cond);
pthread_mutex_unlock(&host->mutex);
return 0;
}
// Build a UDP frame with explicit addressing in a freshly allocated mbuf.
//   mbuf_pool      - pool to allocate from; the program exits if it is empty
//   sip, dip       - IPv4 source and destination
//   sport, dport   - UDP ports, forwarded unchanged to encode_udp_pkg()
//   srcmac, dstmac - Ethernet source and destination
//   data, length   - payload and its length in bytes
// The mbuf's data_len/pkt_len are set to payload + 14 + 20 + 8 bytes.
// Returns the mbuf; the caller owns it.
static struct rte_mbuf *ng_udp_pkg(struct rte_mempool *mbuf_pool, uint32_t sip, uint32_t dip,
                                   uint16_t sport, uint16_t dport, uint8_t *srcmac, uint8_t *dstmac, uint8_t *data, uint16_t length)
{
    struct rte_mbuf *pkt = rte_pktmbuf_alloc(mbuf_pool);
    if (pkt == NULL)
    {
        rte_exit(EXIT_FAILURE, "Alloc\n");
    }

    // Ethernet (14) + IPv4 (20) + UDP (8) headers in front of the payload.
    const unsigned frame_len = length + 14 + 20 + 8;
    pkt->data_len = frame_len;
    pkt->pkt_len = frame_len;

    encode_udp_pkg(rte_pktmbuf_mtod(pkt, uint8_t *), sip, dip, sport, dport, srcmac, dstmac, data, frame_len);
    return pkt;
}
// upd结构体组装
static int encode_udp_pkg(uint8_t *msg, uint32_t sip, uint32_t dip,
uint16_t sport, uint16_t dport, uint8_t *srcmac, uint8_t *dstmac, uint8_t *data, uint16_t length)
{
// encode
// 1 ethhdr
/*
| Destination MAC | Source MAC | Type |
|-----------------|-----------------|-------|
| 00 1A 2B 3C 4D 5E | 00 0A 0B 0C 0D 0E | 08 00 |
*/
struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg;
rte_memcpy(eth->s_addr.addr_bytes, srcmac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth->d_addr.addr_bytes, dstmac, RTE_ETHER_ADDR_LEN);
eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);
// 2 iphdr
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr));
ip->version_ihl = 0x45;
ip->type_of_service = 0;
ip->total_length = htons(length - sizeof(struct rte_ether_hdr));
ip->packet_id = 0;
ip->fragment_offset = 0;
ip->time_to_live = 64;
ip->next_proto_id = IPPROTO_UDP;
ip->src_addr = sip;
ip->dst_addr = dip;
ip->hdr_checksum = 0;
ip->hdr_checksum = rte_ipv4_cksum(ip);
// 3 udphdr
struct rte_udp_hdr *udp = (struct rte_udp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
udp->src_port = sport;
udp->dst_port = dport;
uint16_t udplen = length - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr);
udp->dgram_len = htons(udplen);
rte_memcpy((uint8_t *)(udp + 1), data, udplen);
udp->dgram_cksum = 0;
udp->dgram_cksum = rte_ipv4_udptcp_cksum(ip, udp);
}
// Send at most one queued datagram for every open socket.
//   mbuf_pool - pool the outgoing frames are allocated from
// For each socket, one struct offload is taken off its send ring. When the
// destination's MAC is in the ARP table, a UDP frame is built and queued on the out
// ring, and the offload is released. When it is not, an ARP request is queued and
// the offload goes back on the send ring for a later pass.
// Returns 0.
static int upd_pkt_out_process(struct rte_mempool *mbuf_pool)
{
    struct inout_ring *ring = ringInstance();
    struct localhost *host;

    for (host = hosts; host != NULL; host = host->next)
    {
        struct offload *ol = NULL;
        if (rte_ring_mc_dequeue(host->sendbuf, (void **)&ol) < 0)
        {
            continue;
        }

        uint8_t *dstmac = ng_get_dst_mac(ol->dip);
        if (dstmac == NULL)
        {
            struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST,
                                                  gDefaultArpMac, ol->sip, ol->dip);
            // A full out ring leaves the mbuf with us; return it to the pool.
            if (rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL) == 0)
            {
                rte_pktmbuf_free(arpbuf);
            }
            // Retry the datagram once the ARP reply has arrived; drop it if it cannot be requeued.
            if (rte_ring_mp_enqueue(host->sendbuf, (void *)ol) < 0)
            {
                rte_free(ol->data);
                rte_free(ol);
            }
            continue;
        }

        struct rte_mbuf *udpbuf = ng_udp_pkg(mbuf_pool, ol->sip, ol->dip, ol->sport, ol->dport, host->localmac, dstmac, ol->data, ol->length);

        struct in_addr addrd;
        addrd.s_addr = ol->dip;
        // The payload carries no terminating NUL, so bound the print by its length.
        printf("[send] %s:%d, data:%.*s\n", inet_ntoa(addrd), ntohs(ol->dport), (int)ol->length, ol->data);

        if (rte_ring_mp_enqueue_burst(ring->out, (void **)&udpbuf, 1, NULL) == 0)
        {
            rte_pktmbuf_free(udpbuf);
        }

        // The payload has been copied into the mbuf; the offload was never released before.
        rte_free(ol->data);
        rte_free(ol);
    }
    return 0;
}
#endif
// Queue one outbound packet on the out ring.
// A NULL packet is ignored; a packet the ring cannot accept is freed here so
// that it does not leak.
static void pkt_out_enqueue(struct rte_ring *out, struct rte_mbuf *pkt)
{
    if (pkt == NULL)
    {
        return;
    }
    if (rte_ring_mp_enqueue_burst(out, (void **)&pkt, 1, NULL) == 0)
    {
        rte_pktmbuf_free(pkt);
    }
}

// User-space protocol stack worker.
//
// arg is the mbuf pool used to build replies. The loop never returns:
//   1. dequeue a burst of received frames from the in ring,
//   2. answer ARP requests / record ARP replies, answer ICMP echo requests,
//      and hand UDP/TCP packets to their handlers,
//   3. free every dequeued frame (all paths go through FREE_BUF),
//   4. run the outbound UDP/TCP hooks, even when no frame was received, so
//      application data is not held back until the next inbound burst.
static int pkt_process(void *arg)
{
    struct rte_mempool *mbuf_pool = (struct rte_mempool *)arg;
    struct inout_ring *ring = ringInstance();

    while (1)
    {
        // Fetch received frames from the in ring (may be zero).
        struct rte_mbuf *mbufs[BURST_SIZE];
        unsigned int num_recvd = rte_ring_mc_dequeue_burst(ring->in, (void **)mbufs, BURST_SIZE, NULL);

        unsigned int i;
        for (i = 0; i < num_recvd; i++)
        {
            // Link layer: Ethernet header.
            struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr *);
            printf("\n=== GET Packet ===[%u]===\n", ntohs(eth_hdr->ether_type));
            printf("Src MAC: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
                   eth_hdr->s_addr.addr_bytes[0], eth_hdr->s_addr.addr_bytes[1],
                   eth_hdr->s_addr.addr_bytes[2], eth_hdr->s_addr.addr_bytes[3],
                   eth_hdr->s_addr.addr_bytes[4], eth_hdr->s_addr.addr_bytes[5]);
            printf("Dst MAC: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
                   eth_hdr->d_addr.addr_bytes[0], eth_hdr->d_addr.addr_bytes[1],
                   eth_hdr->d_addr.addr_bytes[2], eth_hdr->d_addr.addr_bytes[3],
                   eth_hdr->d_addr.addr_bytes[4], eth_hdr->d_addr.addr_bytes[5]);
#if ENABLE_ARP
            // ARP is recognised by the EtherType carried in the Ethernet header.
            if (eth_hdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP))
            {
                // The ARP header follows the Ethernet header directly.
                struct rte_arp_hdr *arp_hdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_arp_hdr *, sizeof(struct rte_ether_hdr));
                // Not addressed to this host: drop it, but still release the mbuf.
                if (arp_hdr->arp_data.arp_tip != gLocalIp)
                {
                    goto FREE_BUF;
                }
                if (arp_hdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST))
                {
                    // Build the reply from the request with source and target swapped.
                    struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REPLY,
                                                          arp_hdr->arp_data.arp_sha.addr_bytes, arp_hdr->arp_data.arp_tip, arp_hdr->arp_data.arp_sip);
                    pkt_out_enqueue(ring->out, arpbuf);
                }
                else if (arp_hdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY))
                {
                    struct arp_table *table = arp_table_instance();
                    // Only learn the sender if its IP is not in the ARP table yet.
                    uint8_t *mac = ng_get_dst_mac(arp_hdr->arp_data.arp_sip);
                    if (mac == NULL)
                    {
                        struct arp_entry *entry = rte_malloc("arp entry", sizeof(struct arp_entry), 0);
                        if (entry)
                        {
                            memset(entry, 0, sizeof(struct arp_entry));
                            entry->ip = arp_hdr->arp_data.arp_sip;
                            rte_memcpy(entry->mac, arp_hdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN);
                            entry->type = ARP_ENTRY_STATUS_DYNAMIC;
                            LL_ADD(entry, table->entries);
                            table->count++;
                        }
                    }
#if ENABLE_DEBUG
                    // Dump the whole ARP table after every reply.
                    struct arp_entry *iter;
                    printf("ARP TABLE\n");
                    printf("MAC | IP \n");
                    for (iter = table->entries; iter != NULL; iter = iter->next)
                    {
                        print_ether_addr("", (struct rte_ether_addr *)iter->mac);
                        struct in_addr addr;
                        addr.s_addr = iter->ip;
                        printf(" %s\n", inet_ntoa(addr));
                    }
#endif
                }
                goto FREE_BUF;
            }
#endif
            // Anything that is not IPv4 is dropped; it must still be freed.
            if (eth_hdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4))
            {
                goto FREE_BUF;
            }
            // Network layer: IPv4 header right after the Ethernet header.
            struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
            printf("Src IP: %" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 "\n",
                   ip_hdr->src_addr & 0xFF, (ip_hdr->src_addr >> 8) & 0xFF,
                   (ip_hdr->src_addr >> 16) & 0xFF, (ip_hdr->src_addr >> 24) & 0xFF);
            printf("Dst IP: %" PRIu8 ".%" PRIu8 ".%" PRIu8 ".%" PRIu8 "\n",
                   ip_hdr->dst_addr & 0xFF, (ip_hdr->dst_addr >> 8) & 0xFF,
                   (ip_hdr->dst_addr >> 16) & 0xFF, (ip_hdr->dst_addr >> 24) & 0xFF);
#if ENABLE_ICMP
            // ICMP is carried inside IPv4 and identified by the IP protocol field.
            if (ip_hdr->next_proto_id == IPPROTO_ICMP)
            {
                struct rte_icmp_hdr *icmp_hdr = (struct rte_icmp_hdr *)(ip_hdr + 1);
                if (icmp_hdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST)
                {
                    struct rte_mbuf *tx_pkts = ng_send_icmp(mbuf_pool, eth_hdr->s_addr.addr_bytes,
                                                            ip_hdr->dst_addr, ip_hdr->src_addr, icmp_hdr->icmp_ident, icmp_hdr->icmp_seq_nb);
                    pkt_out_enqueue(ring->out, tx_pkts);
                }
                goto FREE_BUF;
            }
#endif
            if (ip_hdr->next_proto_id == IPPROTO_UDP)
            {
                // Hand the packet to the UDP handler.
                pkt_udp(mbufs[i]);
            }
            else if (ip_hdr->next_proto_id == IPPROTO_TCP)
            {
                // Hand the packet to the TCP handler.
                tcp_udp(mbufs[i]);
            }
        FREE_BUF:
            printf("释放内存\n");
            rte_pktmbuf_free(mbufs[i]); // release the received frame
        }
        // Run the outbound hooks for data queued by the UDP/TCP applications.
        upd_pkt_out_process(mbuf_pool);
        tcp_pkt_out_process(mbuf_pool);
    }
}
// 主循环:接收并处理报文
static void brust_loop(struct rte_mempool *mbuf_pool)
{
int lcore_id = rte_lcore_id();
#if ENABLE_TIMER
// 创建一个timer
rte_timer_subsystem_init();
struct rte_timer arp_timer;
rte_timer_init(&arp_timer);
uint64_t hz = rte_get_timer_hz();
rte_timer_reset(&arp_timer, hz, PERIODICAL, lcore_id, arp_request_timer_cb, mbuf_pool);
#endif
#if ENABLE_RING
// 创建环对象
struct inout_ring *ring = ringInstance();
if (ring == NULL)
rte_exit(EXIT_FAILURE, "ring buffer init failed\n");
ring->in = rte_ring_create("in ring", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
ring->out = rte_ring_create("out ring", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
#endif
// 启动用户态的协议处置函数,交给一个单独的lcore执行
printf("启动用户态的协议处置函数,交给一个单独的lcore执行\n");
lcore_id = rte_get_next_lcore(lcore_id, 1, 0);
rte_eal_remote_launch(pkt_process, mbuf_pool, lcore_id);
#if ENABLE_UDP_APP
// 实现一个UDP的server
printf("实现一个UDP的server\n");
lcore_id = rte_get_next_lcore(lcore_id, 1, 0);
rte_eal_remote_launch(udp_server, mbuf_pool, lcore_id);
#endif
struct rte_mbuf *rx_pkts[BURST_SIZE];
printf("Starting packet capture on port %u...\n", gDpdkPortId);
while (1)
{
// 接收报文
uint16_t nb_rx = rte_eth_rx_burst(gDpdkPortId, 0, rx_pkts, BURST_SIZE);
if (nb_rx == 0)
continue;
// 接收到的报文提交到in等待后续消费
rte_ring_sp_enqueue_burst(ring->in, (void **)rx_pkts, nb_rx, NULL);
// 发送报文
struct rte_mbuf *tx_pkts[BURST_SIZE];
// 从out接收报文
unsigned int nb_tx = rte_ring_sc_dequeue_burst(ring->out, (void **)tx_pkts, BURST_SIZE, NULL);
if (nb_tx == 0)
continue;
// 把收到的报文发出去
uint16_t res_pkts = rte_eth_tx_burst(gDpdkPortId, 0, tx_pkts, nb_tx);
printf("发送成功 %d......\n", res_pkts);
// 释放
unsigned i = 0;
for (i = 0; i < nb_tx; i++)
{
rte_pktmbuf_free(tx_pkts[i]);
}
#if ENABLE_TIMER
static uint64_t prev_tsc = 0, cur_tsc, diff_tsc;
uint64_t timer_resolution_cycles = hz * 11; /* around 11s */
cur_tsc = rte_get_timer_cycles();
diff_tsc = cur_tsc - prev_tsc;
if (diff_tsc > timer_resolution_cycles)
{
printf("Timer.....\n");
rte_timer_manage();
prev_tsc = cur_tsc;
}
#endif
}
}
#if ENABLE_UDP_APP
#define BUFFER_SIZE 1024
#define PORT 8888
// UDP echo server built on the user-space socket API (nsocket/nbind/
// nrecvfrom/nsendto/nclose). It binds 192.168.42.128:PORT and echoes every
// datagram back to its sender.
// Returns -1 if the socket cannot be created; otherwise it loops forever.
static int udp_server()
{
    // socket(int domain, int type, int protocol);
    int listen_fd = nsocket(AF_INET, SOCK_DGRAM, 0);
    if (listen_fd == -1)
    {
        return -1;
    }
    // bind
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_addr.s_addr = inet_addr("192.168.42.128"),
        .sin_port = htons(PORT)};
    nbind(listen_fd, (struct sockaddr *)&addr, sizeof(addr));
    // recv
    struct sockaddr_in clientaddr;
    char buffer[BUFFER_SIZE] = {0};
    while (1)
    {
        // The address length is passed by pointer, so reset it before each call.
        socklen_t addrlen = sizeof(struct sockaddr_in);
        // Leave one byte free so the payload can be NUL-terminated for printing.
        ssize_t recv_c = nrecvfrom(listen_fd, buffer, BUFFER_SIZE - 1, 0, (struct sockaddr *)&clientaddr, &addrlen);
        if (recv_c < 0)
        {
            continue;
        }
        if ((size_t)recv_c >= sizeof(buffer))
        {
            recv_c = (ssize_t)(sizeof(buffer) - 1);
        }
        // Terminate at the received length so bytes left over from an earlier,
        // longer datagram are neither printed nor echoed.
        buffer[recv_c] = '\0';
        // sin_port is in network byte order; convert it for display.
        printf("[recv] %s:%d, data:%s\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port), buffer);
        // Echo exactly the bytes received rather than strlen(buffer).
        nsendto(listen_fd, buffer, (size_t)recv_c, 0, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
    }
    // Not reached while the loop above is unconditional; kept so the function
    // is well-formed if the loop ever gains an exit.
    nclose(listen_fd);
    return 0;
}
#endif
#if ENABLE_TCP_APP
// TCP server entry point. Not implemented yet: it does nothing and returns 0,
// so a caller that reads the result gets a defined value.
static int tcp_server()
{
    return 0;
}
#endif
// Print a human-readable summary of one Ethernet port: MAC address, driver,
// link state, queue limits, traffic counters and a few offload capabilities.
// Also lists every enabled lcore with the socket it belongs to.
// port_id must be lower than the number of available ports; otherwise an
// error line is printed and nothing else happens.
void print_port_info(uint16_t port_id)
{
    // Check that the port id is in range.
    if (port_id >= rte_eth_dev_count_avail())
    {
        printf("Error: Invalid port ID %u\n", port_id);
        return;
    }
    // Show lcore-to-socket placement. A dedicated loop variable is required:
    // iterating with port_id would overwrite the parameter and make every
    // query below target the wrong port.
    unsigned int lcore_id;
    RTE_LCORE_FOREACH(lcore_id)
    {
        printf("lcore %u is using socket %u\n",
               lcore_id, rte_lcore_to_socket_id(lcore_id));
    }
    // Result structures, zeroed so a failed query prints zeros, not garbage.
    struct rte_eth_dev_info dev_info;
    struct rte_eth_link link;
    struct rte_eth_stats stats;
    struct rte_ether_addr mac_addr;
    memset(&dev_info, 0, sizeof(dev_info));
    memset(&link, 0, sizeof(link));
    memset(&stats, 0, sizeof(stats));
    memset(&mac_addr, 0, sizeof(mac_addr));
    // Query the port.
    rte_eth_macaddr_get(port_id, &mac_addr);
    rte_eth_dev_info_get(port_id, &dev_info);
    rte_eth_link_get_nowait(port_id, &link);
    rte_eth_stats_get(port_id, &stats);

    printf("\n===== Port %u Information =====\n", port_id);
    // 1. Basic information
    printf("[Basic Info]\n");
    printf(" - MAC Address: %02x:%02x:%02x:%02x:%02x:%02x\n",
           mac_addr.addr_bytes[0], mac_addr.addr_bytes[1],
           mac_addr.addr_bytes[2], mac_addr.addr_bytes[3],
           mac_addr.addr_bytes[4], mac_addr.addr_bytes[5]);
    // driver_name stays NULL if the device info query failed.
    printf(" - Driver Name: %s\n", dev_info.driver_name ? dev_info.driver_name : "(unknown)");
    // 2. Link status
    printf("[Link Status]\n");
    printf(" - Status: %s\n", link.link_status ? "UP" : "DOWN");
    printf(" - Speed: %u Mbps\n", link.link_speed);
    // 3. Queue capabilities
    printf("[Queue Capabilities]\n");
    printf(" - Max RX Queues: %u\n", dev_info.max_rx_queues);
    printf(" - Max TX Queues: %u\n", dev_info.max_tx_queues);
    printf(" - Max RX Descriptors: %u\n", dev_info.rx_desc_lim.nb_max);
    printf(" - Max TX Descriptors: %u\n", dev_info.tx_desc_lim.nb_max);
    // 4. Statistics (64-bit counters, printed with PRIu64)
    printf("[Statistics]\n");
    printf(" - RX Packets: %" PRIu64 "\n", stats.ipackets);
    printf(" - RX Errors: %" PRIu64 "\n", stats.ierrors);
    printf(" - RX Bytes: %" PRIu64 "\n", stats.ibytes);
    printf(" - TX Packets: %" PRIu64 "\n", stats.opackets);
    printf(" - TX Errors: %" PRIu64 "\n", stats.oerrors);
    printf(" - TX Bytes: %" PRIu64 "\n", stats.obytes);
    printf(" - RX Missed: %" PRIu64 "\n", stats.imissed);
    printf(" - RX No-MBUFs: %" PRIu64 "\n", stats.rx_nombuf);
    // 5. Advanced features
    printf("[Advanced Features]\n");
    printf(" - RSS Support: %s\n", (dev_info.flow_type_rss_offloads != 0) ? "Yes" : "No");
    printf(" - VLAN Strip: %s\n", (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) ? "Yes" : "No");
    printf("================================\n\n");
}
// Program entry point: bring up the EAL, create the packet buffer pool,
// verify that at least one Ethernet port exists, report on and initialise
// the port, then enter the packet loop.
int main(int argc, char **argv)
{
    // Initialise the DPDK environment.
    printf("初始化 DPDK 环境\n");
    const int eal_rc = rte_eal_init(argc, argv);
    if (eal_rc < 0)
        rte_exit(EXIT_FAILURE, "Failed to initialize DPDK EAL %d\n", eal_rc);

    // Report the main lcore and how many lcores remain besides it.
    unsigned int main_core = rte_get_main_lcore();
    unsigned int workers = rte_lcore_count() - 1;
    printf("Using %u - %u worker lcores\n", main_core, workers);

    // Create the mbuf pool.
    struct rte_mempool *pool = rte_pktmbuf_pool_create("MBUF_POOL",
                                                       NUM_MBUFS,
                                                       MBUF_CACHE_SIZE,
                                                       0,
                                                       RTE_MBUF_DEFAULT_BUF_SIZE,
                                                       rte_socket_id());
    if (!pool)
        rte_exit(EXIT_FAILURE, "Failed to create mbuf pool\n");

    // Count the available Ethernet ports.
    uint16_t port_count = rte_eth_dev_count_avail();
    if (port_count == 0)
        rte_exit(EXIT_FAILURE, "No available Ethernet ports\n");
    printf("nb_ports: %d\n", port_count);

    print_port_info(gDpdkPortId);

    // Initialise the port, then hand control to the main loop.
    init_port(pool);
    brust_loop(pool);
    return 0;
}
send.h
#include <rte_ether.h>
#ifndef __NG_ARP_H__
#define __NG_ARP_H__
#define ARP_ENTRY_STATUS_DYNAMIC 0
#define ARP_ENTRY_STATUS_STATIC 1
/*
 * LL_ADD(item, list): push `item` onto the front of a doubly linked list.
 * `list` is the head pointer and is updated in place, so it must be an lvalue.
 * Both arguments are evaluated more than once; pass expressions without side
 * effects. The macro expands to a single statement and needs a trailing ';'
 * at the call site.
 */
#define LL_ADD(item, list) do {                         \
    (item)->prev = NULL;                                \
    (item)->next = (list);                              \
    if ((list) != NULL) (list)->prev = (item);          \
    (list) = (item);                                    \
} while (0)

/*
 * LL_REMOVE(item, list): unlink `item` from the doubly linked list headed by
 * `list`, advancing the head when `item` was the first node, and clear the
 * node's own links. Same argument rules as LL_ADD.
 */
#define LL_REMOVE(item, list) do {                                      \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;        \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;        \
    if ((list) == (item)) (list) = (item)->next;                        \
    (item)->prev = (item)->next = NULL;                                 \
} while (0)
/* One IP-to-MAC mapping; a node of the doubly linked list used by LL_ADD/LL_REMOVE. */
struct arp_entry {
    uint32_t ip;                     /* IPv4 address, stored as read from the ARP packet */
    uint8_t mac[RTE_ETHER_ADDR_LEN]; /* hardware address belonging to ip */
    uint8_t type;                    /* ARP_ENTRY_STATUS_DYNAMIC or ARP_ENTRY_STATUS_STATIC */
    struct arp_entry *next;          /* following node, NULL at the tail */
    struct arp_entry *prev;          /* preceding node, NULL at the head */
};
/* The ARP table: head of the entry list plus a running entry count. */
struct arp_table {
    struct arp_entry *entries; /* first node of the list, NULL when empty */
    int count;                 /* number of entries added so far */
};
/* The single ARP table of this translation unit, created lazily. */
static struct arp_table *arpt = NULL;

/*
 * Return the ARP table, allocating and zeroing it on first use.
 * The process is terminated through rte_exit() if the allocation fails, so
 * the result is never NULL.
 */
static struct arp_table *arp_table_instance(void)
{
    if (arpt != NULL)
        return arpt;

    arpt = rte_malloc("arp_table", sizeof(struct arp_table), 0);
    if (!arpt)
        rte_exit(EXIT_FAILURE, "Malloc art_table\n");

    memset(arpt, 0, sizeof(struct arp_table));
    return arpt;
}
static uint8_t * ng_get_dst_mac(uint32_t dip)
{
struct arp_entry *iter;
struct arp_table *table = arp_table_instance();
for(iter = table->entries;iter != NULL;iter = iter->next)
{
if (dip == iter->ip)
{
return iter->mac;
}
}
return NULL;
}
#endif
Makefile
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation

# binary name
APP = send

# all source are stored in SRCS-y
SRCS-y := send2.c

PKGCONF ?= pkg-config

# Build using pkg-config variables if possible
ifneq ($(shell $(PKGCONF) --exists libdpdk && echo 0),0)
$(error "no installation of DPDK found")
endif

# Default target: link against the shared DPDK libraries.
all: shared
.PHONY: all shared static

# build/$(APP) is a symlink to whichever variant was built last.
# Recipe lines must start with a TAB character.
shared: build/$(APP)-shared
	ln -sf $(APP)-shared build/$(APP)
static: build/$(APP)-static
	ln -sf $(APP)-static build/$(APP)

PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)

CFLAGS += -DALLOW_EXPERIMENTAL_API

# The build directory is an order-only prerequisite: it must exist, but its
# timestamp does not trigger a rebuild.
build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build
	$(CC) $(CFLAGS) $(SRCS-y) -g -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)

build/$(APP)-static: $(SRCS-y) Makefile $(PC_FILE) | build
	$(CC) $(CFLAGS) $(SRCS-y) -g -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)

build:
	@mkdir -p $@

.PHONY: clean
clean:
	rm -f build/$(APP) build/$(APP)-static build/$(APP)-shared
	test -d build && rmdir -p build || true