dpdk收发数据

使用dpdk收发数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>

#include <stdio.h>
#include <arpa/inet.h>


#define NUM_MBUFS (4096-1)

#define BURST_SIZE 32
#define ENABLE_SEND 1

#if ENABLE_SEND

static uint32_t gSrcIp;
static uint32_t gDstIp;
static uint8_t gSrcMac[RTE_ETHER_ADDR_LEN];
static uint8_t gDstMac[RTE_ETHER_ADDR_LEN];
static uint16_t gSrcPort;
static uint16_t gDstPort;

#endif


int gDpdkPortId = 0;

static const struct rte_eth_conf port_conf_default = {
.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN }
};

static void ng_init_port(struct rte_mempool *mbuf_pool) {

uint16_t nb_sys_ports= rte_eth_dev_count_avail(); //
if (nb_sys_ports == 0) {
rte_exit(EXIT_FAILURE, "No Supported eth found\n");
}

struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(gDpdkPortId, &dev_info); //

const int num_rx_queues = 1;
const int num_tx_queues = 1;
struct rte_eth_conf port_conf = port_conf_default;
rte_eth_dev_configure(gDpdkPortId, num_rx_queues, num_tx_queues, &port_conf);

if (rte_eth_rx_queue_setup(gDpdkPortId, 0 , 128, //队列和mbuf_pool有关联
rte_eth_dev_socket_id(gDpdkPortId),NULL, mbuf_pool) < 0) {

rte_exit(EXIT_FAILURE, "Could not setup RX queue\n");
}

#if ENABLE_SEND

struct rte_eth_txconf txq_conf = dev_info.default_txconf;
txq_conf.offloads = port_conf.rxmode.offloads;
if (rte_eth_tx_queue_setup(gDpdkPortId, 0 , 1024, //gDpdkPortId号网卡的第0号队列,最大容纳128个包
rte_eth_dev_socket_id(gDpdkPortId) , &txq_conf) < 0) {
rte_exit(EXIT_FAILURE, "Could not setup TX queue\n");
}

#endif

if (rte_eth_dev_start(gDpdkPortId) < 0 ) {
rte_exit(EXIT_FAILURE, "Could not start\n");
}
//rte_eth_promiscuous_enable(gDpdkPortId); //混杂模式
}

static int ng_encode_udp_pkt(uint8_t* msg , unsigned char* data , uint16_t total_len) {
//在msg所指的位置用data替代

//eth
struct rte_ether_hdr* eth = (struct rte_ether_hdr*)msg;
rte_memcpy(&eth->s_addr , &gSrcMac , RTE_ETHER_ADDR_LEN);
rte_memcpy(&eth->d_addr, &gDstMac, RTE_ETHER_ADDR_LEN);
eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);
//ip
struct rte_ipv4_hdr* ip = (struct rte_ipv4_hdr*)(msg + sizeof(struct rte_ether_hdr));
ip->version_ihl = 0x45;
ip->type_of_service = 0;
ip->total_length = htonl(total_len - sizeof(struct rte_ether_hdr));
ip->packet_id = 0;
ip->fragment_offset = 0;
ip->time_to_live = 64;
ip->next_proto_id = IPPROTO_UDP;
ip->src_addr = gSrcIp;
ip->dst_addr = gDstIp;
ip->hdr_checksum = 0;
ip->hdr_checksum = rte_ipv4_cksum(ip);
//udp
struct rte_udp_hdr* udp = (struct rte_udp_hdr*)(ip + 1);
udp->src_port = gSrcPort;
udp->dst_port = gDstPort;
udp->dgram_len = htons(total_len - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr));
uint16_t udp_len = total_len - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr);
rte_memcpy(udp+1 , data , udp_len);
udp->dgram_cksum= 0;
udp->dgram_cksum = rte_ipv4_udptcp_cksum(ip ,udp);

return 0;
}

static struct rte_mbuf* ng_send(struct rte_mempool* mbuf_pool, unsigned char* msg , uint16_t length) {
//从mbufpool 中获取 mbuf
const unsigned total_len = 14 + 20 + 8 + length; //eth + ip + udp + 应用数据

struct rte_mbuf* mbuf = rte_pktmbuf_alloc(mbuf_pool);
mbuf->pkt_len = total_len;
mbuf->data_len = total_len;

uint8_t* pktdata = rte_pktmbuf_mtod(mbuf, uint8_t*);
ng_encode_udp_pkt(pktdata , msg ,length);

return mbuf;
}


int main(int argc, char *argv[]) {

if (rte_eal_init(argc, argv) < 0) {
rte_exit(EXIT_FAILURE, "Error with EAL init\n");
}
printf("init success!! \n");

struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbuf pool", NUM_MBUFS,
0, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL) {
rte_exit(EXIT_FAILURE, "Could not create mbuf pool\n");
}

rte_eth_macaddr_get(gDpdkPortId , (struct rte_ether_addr*)gSrcMac);

ng_init_port(mbuf_pool);
printf("mbuf_pool init success!! \n");

while(1) {
struct rte_mbuf* mbufs[BUFSIZ];
unsigned num_recvd = rte_eth_rx_burst(gDpdkPortId , 0 , mbufs , BUFSIZ);
if(num_recvd > BUFSIZ) {
rte_exit(EXIT_FAILURE , "Error receiving from eth \n");
}

unsigned int i = 0;
for(i=0 ; i<num_recvd ; ++i) {
struct rte_ether_hdr* ehdr = rte_pktmbuf_mtod(mbufs[i] , struct rte_ether_hdr*);
if(ehdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
rte_pktmbuf_free(mbufs[i]);
continue;
}
struct rte_ipv4_hdr* iphdr = rte_pktmbuf_mtod_offset(mbufs[i] , struct rte_ipv4_hdr* , sizeof(struct rte_ether_hdr));
if(iphdr->next_proto_id == IPPROTO_UDP) {
struct rte_udp_hdr* udphdr = (struct rte_udp_hdr*)((char*)iphdr + sizeof(struct rte_ipv4_hdr));
#if ENABLE_SEND
rte_memcpy(&gDstMac , &ehdr->d_addr , RTE_ETHER_ADDR_LEN);
gSrcIp = iphdr->dst_addr;
gDstIp = iphdr->src_addr;
gSrcPort = udphdr->dst_port;
gDstPort = udphdr->src_port;
#endif

uint16_t length = htons(udphdr->dgram_len);
*((char*)(udphdr + length)) = '\0';

struct in_addr addr;
addr.s_addr = iphdr->src_addr;
printf("src:%s %d " , inet_ntoa(addr) , ntohs(udphdr->src_port));

addr.s_addr = iphdr->dst_addr;
printf("dst:%s %d length=%d:%s\n" , inet_ntoa(addr) , ntohs(udphdr->dst_port) ,
length , (char*)((char*)udphdr + sizeof(udphdr)));

#if ENABLE_SEND
struct rte_mbuf* txbuf = ng_send(mbuf_pool , (unsigned char*)(udphdr + 1) , length);
rte_eth_tx_burst(gDpdkPortId , 0 , &txbuf , 1);
rte_pktmbuf_free(txbuf);
#endif
rte_pktmbuf_free(mbufs[i]);
}
}
}
}

使用API

在DPDK中,rte前缀代表Runtime Environment,即运行环境。DPDK的主要对外函数接口都以rte_作为前缀,抽象化函数接口是典型软件设计思路,可以帮助DPDK运行在多个操作系统上。

1
int rte_eal_init	(	int 	argc,    char ** 	argv )	;

此函数用于初始化环境抽象层,函数成功时返回值大于或等于0,所有参数 argv[x] (x < 返回值) 可能已经被这个函数修改,失败返回 -1,并设置rte_error

1
void rte_exit(int exit_code , const char* format);

此函数用于立即终止应用程序,打印错误信息并将退出码返回shell

1
2
struct rte_mempool* rte_pktmbuf_pool_create(const char* name , unsigned cache_size , uint16_t priv_size 
, uint16_t data_room_size , int socket_id);
  • name:内存池的名称。
  • n:内存池中的元素数量。
  • cache_size:每个 CPU 缓存的大小,单位为元素数目,如果为 0 则表示禁用缓存。
  • priv_size:每个元素的私有数据空间大小。可以使用 0 表示没有私有数据。
  • data_room_size:每个元素中存储数据的空间大小。
  • socket_id:内存池所在的 NUMA 节点编号。

该函数返回一个指向新创建的 mempool 的指针。这个 mempool 可以通过 rte_pktmbuf_alloc 和 rte_pktmbuf_free 命令进行分配和释放。

dpdk中一个进程确定一个内存池,将发送数据和接收数据都放在内存池中

1
unsigned int rte_socket_id(void);

返回当前正常运行此函数网卡对应的socketid

1
uint16_t rte_eth_dev_count_avail(void);

获取可用于dpdk的网卡的个数

1
2
int rte_eth_dev_configure	(uint16_t  port_id , uint16_t  nb_rx_queue,
uint16_t nb_tx_queue , const struct rte_eth_conf* eth_conf )

此函数用于配置网卡设备,此函数必须被调用在任何与网卡api有关的接口之前

  • port_id:要配置的网卡对应的id
  • nb_rx_queue:接收队列的个数
  • nb_tx_queue:发送队列的个数
  • eth_conf:指向要对网卡进行的配置操作的结构体

成功返回 0 失败返回 < 0 的错误号

1
2
3
int rte_eth_rx_queue_setup	(uint16_t 	port_id  ,  uint16_t 	rx_queue_id,
uint16_t nb_rx_desc , unsigned int socket_id,
const struct rte_eth_rxconf *rx_conf , struct rte_mempool *mb_pool )

该函数从与 socket_id 关联的内存区域中分配一个连续的内存块,用于存放 nb_rx_desc 个接收描述符,并使用从内存池 mb_pool 中分配的网络缓冲区初始化每个接收描述符。

  • port_id:以太网设备的端口标识符。每个物理网卡都有一个唯一的端口 ID,用于在 DPDK 中标识该设备。
  • rx_queue_id:要设置的接收队列的索引。每个物理网卡可以有多个接收队列,用于并行处理接收到的数据包。该值必须在之前调用 rte_eth_dev_configure() 函数时指定的范围 [0, nb_rx_queue - 1] 内。
  • nb_rx_desc:要为接收环分配的接收描述符的数量。接收描述符用于描述数据包在接收环中的位置和状态。该值通常由应用程序的性能需求和物理网卡的特性决定。
  • socket_id:在 NUMA 系统中,指定用于分配接收描述符的内存所在的 NUMA 节点 ID。如果没有 NUMA 限制,则可以设置为 SOCKET_ID_ANY。
  • rx_conf:指向要用于接收队列的配置数据的指针。如果设置为 NULL,则使用默认的接收配置
  • mb_pool:指向内存池的指针,用于分配 rte_mbuf 网络内存缓冲区,以填充接收环的每个描述符。rte_mbuf 是 DPDK 中用于存储数据包数据的结构。

返回 0 表示成功

1
2
static uint16_t rte_eth_rx_burst(uint16_t port_id,uint16_t queue_id,
struct rte_mbuf** rx_pkts , const uint16_t nb_pkts )

从对应的网卡对应的队列中收数据包,被检索到的数据包被存放到rx_pkts数组中指向的rte_mbuf结构体中,此函数是非阻塞调用,这意味着它会立即返回,即使没有可用的输入数据包。如果没有可用的输入数据包,它将返回 0。

  • port_id:以太网设备端口标识符。
  • queue_id:要检索数据包的接收队列索引。该值必须在之前提供给 rte_eth_dev_configure() 的范围 [0, nb_rx_queue - 1] 内。
  • rx_pkts:用于存储检索到的数据包的 rte_mbuf 结构指针数组。
  • nb_pkts:要检索的最大数据包数。

返回实际检索到的数据包数,指示 rx_pkts 数组中填充了多少 rte_mbuf 指针。无错误通知:该函数不提供错误通知以避免开销。

1
2
3
#define rte_pktmbuf_mtod(m, t) rte_pktmbuf_mtod_offset(m, t, 0)
#define rte_pktmbuf_mtod_offset(m, t, o) \
((t)((char *)(m)->buf_addr + (m)->data_off + (o)))

rte_pktmbuf_mtod() 是 DPDK 中用于将 rte_mbuf 结构指针转换为特定类型指针的接口。全称是 rte_mbuf to data pointer,即“rte_mbuf 结构指针到数据指针”。