23 #include <amqp_tcp_socket.h> 35 amqp_connection_state_t
conn;
45 #define DEFAULT_CHANNEL 1 47 #define OFFSET(x) offsetof(AMQPContext, x) 48 #define D AV_OPT_FLAG_DECODING_PARAM 49 #define E AV_OPT_FLAG_ENCODING_PARAM 63 const char *user, *password =
NULL;
64 const char *user_decoded, *password_decoded;
66 amqp_rpc_reply_t broker_reply;
67 struct timeval tval = { 0 };
75 hostname,
sizeof(hostname), &port,
NULL, 0, uri);
80 if (hostname[0] ==
'\0' || port <= 0 || port > 65535 ) {
85 p = strchr(credentials,
':');
91 if (!password || *password ==
'\0')
95 if (!password_decoded)
108 s->
conn = amqp_new_connection();
119 goto destroy_connection;
127 ret = amqp_socket_open_noblock(s->
socket, hostname, port, &tval);
131 amqp_error_string2(ret));
132 goto destroy_connection;
135 broker_reply = amqp_login(s->
conn,
"/", 0, s->
pkt_size, 0,
136 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
138 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
140 server_msg = AMQP_ACCESS_REFUSED;
141 goto close_connection;
145 broker_reply = amqp_get_rpc_reply(s->
conn);
147 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
149 server_msg = AMQP_CHANNEL_ERROR;
150 goto close_connection;
154 amqp_bytes_t queuename;
156 amqp_queue_declare_ok_t *
r;
159 0, 0, 0, 1, amqp_empty_table);
160 broker_reply = amqp_get_rpc_reply(s->
conn);
161 if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
163 server_msg = AMQP_RESOURCE_ERROR;
168 queuename.bytes = queuename_buff;
170 memcpy(queuename.bytes, r->queue.bytes, queuename.len);
174 amqp_cstring_bytes(s->
routing_key), amqp_empty_table);
176 broker_reply = amqp_get_rpc_reply(s->
conn);
177 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
179 server_msg = AMQP_INTERNAL_ERROR;
184 0, 1, 0, amqp_empty_table);
186 broker_reply = amqp_get_rpc_reply(s->
conn);
187 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
189 server_msg = AMQP_INTERNAL_ERROR;
201 amqp_connection_close(s->
conn, server_msg);
203 amqp_destroy_connection(s->
conn);
214 int fd = amqp_socket_get_sockfd(s->
socket);
217 amqp_basic_properties_t props;
223 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
224 props.content_type = amqp_cstring_bytes(
"octet/stream");
225 props.delivery_mode = 2;
242 int fd = amqp_socket_get_sockfd(s->
socket);
245 amqp_rpc_reply_t broker_reply;
252 amqp_maybe_release_buffers(s->
conn);
253 broker_reply = amqp_consume_message(s->
conn, &envelope,
NULL, 0);
255 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
258 if (envelope.message.body.len > size) {
261 "Message will be truncated. Setting -pkt_size %d " 264 size =
FFMIN(size, envelope.message.body.len);
266 memcpy(buf, envelope.message.body.bytes, size);
267 amqp_destroy_envelope(&envelope);
276 amqp_connection_close(s->
conn, AMQP_REPLY_SUCCESS);
277 amqp_destroy_connection(s->
conn);
296 .priv_data_class = &amqp_context_class,
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
#define URL_PROTOCOL_FLAG_NETWORK
#define AV_LOG_WARNING
Something somehow does not look correct.
#define LIBAVUTIL_VERSION_INT
int is_streamed
true if streamed (no seek possible), default = false
AVIOInterruptCB interrupt_callback
const char * av_default_item_name(void *ptr)
Return the context name.
#define AVIO_FLAG_READ
read-only
int64_t rw_timeout
maximum time to wait for (network) read/write operation completion, in microseconds
amqp_connection_state_t conn
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
const URLProtocol ff_libamqp_protocol
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes a URL from its percent-encoded form back into its normal representation.
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to 'timeout' microseconds. Uses ff_network_wa...
static void envelope(VectorscopeContext *s, AVFrame *out)
static const AVClass amqp_context_class
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Describe the class of an AVClass context structure.
static const AVOption options[]
#define flags(name, subs,...)
static int amqp_proto_close(URLContext *h)
int max_packet_size
if non-zero, the stream is packetized with this max packet size
unbuffered private I/O API
#define AVERROR_EXTERNAL
Generic error in an external library.
int64_t connection_timeout