Commit f9dbcb5d authored by Tien-Thinh Nguyen's avatar Tien-Thinh Nguyen
Browse files

Test curl multiple interface with Communication_N1N2MessageTransfer

parent 0162e7a8
......@@ -194,12 +194,11 @@ typedef struct qos_profile_s {
#define NNRF_NFM_BASE "/nnrf-nfm/"
#define NNRF_NF_REGISTER_URL "/nf-instances/"
#define NNRF_NF_STATUS_SUBSCRIBE_URL "/subscriptions"
#define NRF_CURL_TIMEOUT_MS 100L
// for CURL
#define AMF_CURL_TIMEOUT_MS 100L
#define NF_CURL_TIMEOUT_MS 100L
#define MAX_WAIT_MSECS 10000 // 1 second
#define AMF_NUMBER_RETRIES 3
#define UDM_CURL_TIMEOUT_MS 100L
#define UDM_NUMBER_RETRIES 3
constexpr auto CURL_MIME_BOUNDARY = "----Boundary";
......
......@@ -3264,6 +3264,7 @@ bool dnn_context::remove_pdu_session(const uint32_t pdu_session_id) {
}
return false;
}
//------------------------------------------------------------------------------
size_t dnn_context::get_number_pdu_sessions() const {
std::shared_lock lock(m_context);
......
......@@ -155,9 +155,32 @@ smf_sbi::smf_sbi() {
Logger::smf_sbi().error("Cannot create task TASK_SMF_SBI");
throw std::runtime_error("Cannot create task TASK_SMF_SBI");
}
curl_global_init(CURL_GLOBAL_DEFAULT);
curl_multi = curl_multi_init();
handles = {};
headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json");
headers = curl_slist_append(headers, "charsets: utf-8");
Logger::smf_sbi().startup("Started");
}
//------------------------------------------------------------------------------
smf_sbi::~smf_sbi() {
Logger::smf_sbi().debug("Delete SMF SBI instance...");
// Remove handle, free memory
for (auto h : handles) {
curl_multi_remove_handle(curl_multi, h);
curl_easy_cleanup(h);
}
handles.clear();
curl_multi_cleanup(curl_multi);
curl_global_cleanup();
curl_slist_free_all(headers);
}
//------------------------------------------------------------------------------
void smf_sbi::send_n1n2_message_transfer_request(
std::shared_ptr<itti_n11_create_sm_context_response> sm_context_res) {
......@@ -208,7 +231,7 @@ void smf_sbi::send_n1n2_message_transfer_request(
curl_easy_setopt(
curl, CURLOPT_URL, sm_context_res->res.get_amf_url().c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, AMF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_INTERFACE, smf_cfg.sbi.if_name.c_str());
if (sm_context_res->http_version == 2) {
......@@ -324,7 +347,7 @@ void smf_sbi::send_n1n2_message_transfer_request(
curl_easy_setopt(
curl, CURLOPT_URL, sm_session_modification->msg.get_amf_url().c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, AMF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_INTERFACE, smf_cfg.sbi.if_name.c_str());
if (sm_session_modification->http_version == 2) {
......@@ -412,7 +435,7 @@ void smf_sbi::send_n1n2_message_transfer_request(
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, report_msg->res.get_amf_url().c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, AMF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_INTERFACE, smf_cfg.sbi.if_name.c_str());
if (report_msg->http_version == 2) {
......@@ -507,7 +530,7 @@ void smf_sbi::send_sm_context_status_notification(
curl_easy_setopt(
curl, CURLOPT_URL, sm_context_status->amf_status_uri.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, AMF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
if (sm_context_status->http_version == 2) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -704,7 +727,7 @@ void smf_sbi::register_nf_instance(
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NRF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
if (msg->http_version == 2) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -804,7 +827,7 @@ void smf_sbi::update_nf_instance(
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NRF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
if (msg->http_version == 2) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -886,7 +909,7 @@ void smf_sbi::deregister_nf_instance(
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NRF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
if (msg->http_version == 2) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -951,7 +974,7 @@ void smf_sbi::subscribe_upf_status_notify(
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, msg->url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NRF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
if (msg->http_version == 2) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -1021,7 +1044,7 @@ bool smf_sbi::get_sm_data(
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, UDM_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
curl_easy_setopt(curl, CURLOPT_INTERFACE, smf_cfg.sbi.if_name.c_str());
// Response information.
......@@ -1143,3 +1166,213 @@ bool smf_sbi::get_sm_data(
void smf_sbi::subscribe_sm_data() {
// TODO:
}
//------------------------------------------------------------------------------
CURL* smf_sbi::curl_create_handle(
const std::string& uri, const std::string& data,
std::string& response_data) {
// create handle for a curl request
CURL* curl = curl_easy_init();
if (curl) {
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, uri.c_str());
// curl_easy_setopt(curl, CURLOPT_PRIVATE, str);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, NF_CURL_TIMEOUT_MS);
// Hook up data handling function.
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_data);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, data.length());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str());
}
return curl;
}
//------------------------------------------------------------------------------
void smf_sbi::send_curl_multi(
const std::string& uri, const std::string& data,
std::string& response_data) {
// create a new handle and add to the multi handle
// the curl will actually be sent in perform_curl_multi
CURL* tmp = curl_create_handle(uri, data, response_data);
curl_multi_add_handle(curl_multi, tmp);
handles.push_back(tmp);
}
//------------------------------------------------------------------------------
void smf_sbi::perform_curl_multi(uint64_t ms) {
//_unused(ms);
int still_running = 0, numfds = 0;
CURLMcode code = curl_multi_perform(curl_multi, &still_running);
do {
code = curl_multi_wait(curl_multi, NULL, 0, 200000, &numfds);
if (code != CURLM_OK) {
Logger::smf_app().debug("curl_multi_wait() returned %d!", code);
}
curl_multi_perform(curl_multi, &still_running);
} while (still_running);
curl_release_handles();
}
//------------------------------------------------------------------------------
void smf_sbi::wait_curl_end() {
// block until activity is detected on at least one of the handles or
// MAX_WAIT_MSECS has passed.
int still_running = 0, numfds = 0;
do {
CURLMcode code = curl_multi_perform(curl_multi, &still_running);
if (code == CURLM_OK) {
code = curl_multi_wait(curl_multi, NULL, 0, MAX_WAIT_MSECS, &numfds);
if (code != CURLM_OK) break;
} else {
break;
}
} while (still_running);
curl_release_handles();
}
//------------------------------------------------------------------------------
void smf_sbi::curl_release_handles() {
CURLMsg* curl_msg = nullptr;
CURL* curl = nullptr;
CURLcode code = {};
int http_code = 0;
int msgs_left = 0;
while ((curl_msg = curl_multi_info_read(curl_multi, &msgs_left))) {
if (curl_msg && curl_msg->msg == CURLMSG_DONE) {
curl = curl_msg->easy_handle;
code = curl_msg->data.result;
if (code != CURLE_OK) {
Logger::smf_app().debug("CURL error code %d!", curl_msg->data.result);
continue;
}
// Get HTTP code
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
Logger::smf_app().debug("Got response with HTTP code %d!", http_code);
std::string curl_url;
int res = curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &curl_url);
if (res == CURLE_OK) {
std::unique_lock lock(m_curl_handle_promises);
if (curl_handle_promises.count(curl_url) > 0) {
curl_handle_promises[curl_url]->set_value(std::to_string(http_code));
// Remove this promise from list
curl_handle_promises.erase(curl_url);
}
}
// TODO: remove handle from the multi session and end this handle now, or
// later
curl_multi_remove_handle(curl_multi, curl);
curl_easy_cleanup(curl);
std::vector<CURL*>::iterator it;
it = find(handles.begin(), handles.end(), curl);
if (it != handles.end()) {
handles.erase(it);
}
} else if (curl_msg) {
curl = curl_msg->easy_handle;
Logger::smf_app().debug("Error after curl_multi_info_read()");
curl_multi_remove_handle(curl_multi, curl);
curl_easy_cleanup(curl);
std::vector<CURL*>::iterator it;
it = find(handles.begin(), handles.end(), curl);
if (it != handles.end()) {
handles.erase(it);
}
} else {
Logger::smf_app().debug("curl_msg null");
}
}
}
//------------------------------------------------------------------------------
// Example with curl multiple for TESTING PURPOSE
void smf_sbi::send_n1n2_message_transfer_request_curl_multi(
std::shared_ptr<itti_n11_create_sm_context_response> sm_context_res) {
Logger::smf_sbi().debug(
"Send Communication_N1N2MessageTransfer to AMF (HTTP version %d)",
sm_context_res->http_version);
mime_parser parser = {};
std::string n1_message = sm_context_res->res.get_n1_sm_message();
nlohmann::json json_data = {};
std::string body;
sm_context_res->res.get_json_data(json_data);
std::string json_part = json_data.dump();
// add N2 content if available
auto n2_sm_found = json_data.count("n2InfoContainer");
if (n2_sm_found > 0) {
std::string n2_message = sm_context_res->res.get_n2_sm_information();
// prepare the body content for Curl
parser.create_multipart_related_content(
body, json_part, CURL_MIME_BOUNDARY, n1_message, n2_message);
} else {
// prepare the body content for Curl
parser.create_multipart_related_content(
body, json_part, CURL_MIME_BOUNDARY, n1_message,
multipart_related_content_part_e::NAS);
}
Logger::smf_sbi().debug(
"Send Communication_N1N2MessageTransfer to AMF, body %s", body.c_str());
uint32_t str_len = body.length();
char* data = (char*) malloc(str_len + 1);
memset(data, 0, str_len + 1);
memcpy((void*) data, (void*) body.c_str(), str_len);
std::string response_data;
// send_curl_multi(sm_context_res->res.get_amf_url(), body, response_data);
// create a new handle and add to the multi handle
// the curl will actually be sent in perform_curl_multi
CURL* tmp = curl_create_handle(
sm_context_res->res.get_amf_url(), body, response_data);
curl_multi_add_handle(curl_multi, tmp);
handles.push_back(tmp);
boost::shared_ptr<boost::promise<std::string>> p =
boost::make_shared<boost::promise<std::string>>();
boost::shared_future<std::string> f;
f = p->get_future();
// Generate ID for this promise (to be used in SMF-APP)
// uint32_t promise_id = 1; // generate_promise_id();
// Logger::smf_sbi().debug("Promise ID generated %d", promise_id);
add_promise(sm_context_res->res.get_amf_url(), p);
perform_curl_multi(
0); // TODO: current time as parameter if curl is performed per event
// Wait for the response back
std::string response_msg = f.get();
Logger::smf_sbi().debug(
"Got result for promise ID %s",
sm_context_res->res.get_amf_url().c_str());
Logger::smf_sbi().debug("Response data %s", response_data.c_str());
free_wrapper((void**) &data);
}
//---------------------------------------------------------------------------------------------
void smf_sbi::add_promise(
std::string id, boost::shared_ptr<boost::promise<std::string>>& p) {
std::unique_lock lock(m_curl_handle_promises);
curl_handle_promises.emplace(id, p);
}
......@@ -32,6 +32,8 @@
#include <map>
#include <thread>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
#include <curl/curl.h>
#include "3gpp_29.503.h"
#include "smf.h"
......@@ -43,11 +45,21 @@ namespace smf {
class smf_sbi {
private:
CURLM* curl_multi;
std::vector<CURL*> handles;
struct curl_slist* headers;
mutable std::shared_mutex m_curl_handle_promises;
std::map<std::string, boost::shared_ptr<boost::promise<std::string>>>
curl_handle_promises;
std::thread::id thread_id;
std::thread thread;
public:
smf_sbi();
virtual ~smf_sbi();
smf_sbi(smf_sbi const&) = delete;
void operator=(smf_sbi const&) = delete;
......@@ -162,6 +174,54 @@ class smf_sbi {
*
*/
void subscribe_sm_data();
/*
* Create Curl handle for multi curl
* @param [const std::string &] uri: URI of the subscribed NF
* @param [std::string &] data: data to be sent
* @param [std::string &] response_data: response data
* @return pointer to the created curl
*/
CURL* curl_create_handle(
const std::string& uri, const std::string& data,
std::string& response_data);
/*
* Prepare to send a request using curl multi
* @param [const std::string &] uri: URI of the subscribed NF
* @param [std::string &] data: data to be sent
* @param [std::string &] response_data: response data
* @return void
*/
void send_curl_multi(
const std::string& uri, const std::string& data,
std::string& response_data);
/*
* Perform curl multi to actually process the available data
* @param [uint64_t ms] ms: current time
* @return void
*/
void perform_curl_multi(uint64_t ms);
/*
* Finish all the curl transfers
* @param void
* @return void
*/
void wait_curl_end();
/*
* Release all the handles
* @param void
* @return void
*/
void curl_release_handles();
void send_n1n2_message_transfer_request_curl_multi(
std::shared_ptr<itti_n11_create_sm_context_response> sm_context_res);
void add_promise(
std::string id, boost::shared_ptr<boost::promise<std::string>>& p);
};
} // namespace smf
#endif /* FILE_SMF_SBI_HPP_SEEN */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment